sys/sysmod/http/
line_hook.rs

1//! LINE Webhook.
2//!
3//! <https://developers.line.biz/ja/docs/messaging-api/>
4
5use super::{ActixError, WebResult};
6use crate::sysmod::line::{FunctionContext, Line};
7use crate::sysmod::openai::{OpenAi, OpenAiErrorKind, Role, SearchContextSize, Tool, UserLocation};
8use crate::taskserver::Control;
9
10use actix_web::{HttpRequest, HttpResponse, Responder, http::header::ContentType, web};
11use anyhow::{Context, Result, bail, ensure};
12use base64::{Engine, engine::general_purpose};
13use log::{error, info};
14use serde::{Deserialize, Serialize};
15use std::time::Duration;
16use utils::netutil;
17
18#[derive(Debug, Serialize, Deserialize)]
19struct WebHookRequest {
20    destination: String,
21    // may be empty
22    events: Vec<WebhookEvent>,
23}
24
25#[derive(Debug, Serialize, Deserialize)]
26struct WebhookEvent {
27    #[serde(flatten)]
28    common: WebhookEventCommon,
29    #[serde(flatten)]
30    body: WebhookEventBody,
31}
32
33#[derive(Debug, Serialize, Deserialize)]
34#[serde(rename_all = "camelCase")]
35struct WebhookEventCommon {
36    /// "active" or "standby"
37    mode: String,
38    timestamp: u64,
39    source: Option<Source>,
40    webhook_event_id: String,
41    delivery_context: DeliveryContext,
42}
43
44#[derive(Debug, Serialize, Deserialize)]
45#[serde(
46    tag = "type",
47    rename_all = "camelCase",
48    rename_all_fields = "camelCase"
49)]
50enum Source {
51    User {
52        user_id: String,
53    },
54    Group {
55        group_id: String,
56        user_id: Option<String>,
57    },
58    Room {
59        room_id: String,
60        user_id: Option<String>,
61    },
62}
63
64#[derive(Debug, Serialize, Deserialize)]
65#[serde(rename_all = "camelCase")]
66struct DeliveryContext {
67    is_redelivery: bool,
68}
69
70#[derive(Debug, Serialize, Deserialize)]
71#[serde(
72    tag = "type",
73    rename_all = "camelCase",
74    rename_all_fields = "camelCase"
75)]
76enum WebhookEventBody {
77    Message {
78        reply_token: String,
79        message: Message,
80    },
81    Unsend {
82        message_id: String,
83    },
84    Follow {
85        reply_token: String,
86    },
87    Unfollow,
88    Join {
89        reply_token: String,
90    },
91    Leave,
92    MemberJoined {
93        joined: Members,
94        reply_token: String,
95    },
96    MemberLeft {
97        left: Members,
98        reply_token: String,
99    },
100    #[serde(other)]
101    Other,
102}
103
104#[derive(Debug, Serialize, Deserialize)]
105#[serde(tag = "type", rename_all = "camelCase")]
106struct Members {
107    /// type = "user"
108    members: Vec<Source>,
109}
110
111#[derive(Debug, Serialize, Deserialize)]
112#[serde(
113    tag = "type",
114    rename_all = "camelCase",
115    rename_all_fields = "camelCase"
116)]
117enum Message {
118    /// 送信元から送られたテキストを含むメッセージオブジェクトです。
119    Text {
120        /// メッセージID
121        id: String,
122        /// メッセージの引用トークン。
123        /// 詳しくは、『Messaging APIドキュメント』の「引用トークンを取得する」を
124        /// 参照してください。
125        quote_token: String,
126        /// メッセージのテキスト
127        text: String,
128        // emojis
129        /// メンションの情報を含むオブジェクト。
130        /// textプロパティにメンションが含まれる場合のみ、
131        /// メッセージイベントに含まれます。
132        mention: Option<Mention>,
133        /// 引用されたメッセージのメッセージID。
134        /// 過去のメッセージを引用している場合にのみ含まれます。
135        quoted_message_id: Option<String>,
136    },
137    /// 送信元から送られた画像を含むメッセージオブジェクトです。
138    Image {
139        /// メッセージID
140        id: String,
141        /// メッセージの引用トークン。
142        /// 詳しくは、『Messaging APIドキュメント』の「引用トークンを取得する」を
143        /// 参照してください。
144        quote_token: String,
145        /// 画像ファイルの提供元。
146        content_provider: ContentProvider,
147    },
148    #[serde(other)]
149    Other,
150}
151
152#[derive(Debug, Serialize, Deserialize)]
153#[serde(tag = "type", rename_all = "camelCase")]
154struct Mention {
155    mentionees: Vec<Mentionee>,
156}
157
158#[derive(Debug, Serialize, Deserialize)]
159#[serde(tag = "type", rename_all = "camelCase")]
160struct Mentionee {
161    index: usize,
162    length: usize,
163    #[serde(flatten)]
164    target: MentioneeTarget,
165    quoted_message_id: Option<String>,
166}
167
168#[derive(Debug, Serialize, Deserialize)]
169#[serde(
170    tag = "type",
171    rename_all = "camelCase",
172    rename_all_fields = "camelCase"
173)]
174enum MentioneeTarget {
175    User { user_id: Option<String> },
176    All,
177}
178
179/// 画像ファイルの提供元。
180#[derive(Debug, Serialize, Deserialize)]
181#[serde(
182    tag = "type",
183    rename_all = "camelCase",
184    rename_all_fields = "camelCase"
185)]
186enum ContentProvider {
187    /// LINEユーザーが画像ファイルを送信しました。
188    /// 画像ファイルのバイナリデータは、メッセージIDを指定してコンテンツを取得する
189    /// エンドポイントを使用することで取得できます。
190    Line,
191    /// 画像ファイルのURLは `contentProvider.originalContentUrl`
192    /// プロパティに含まれます。
193    /// なお、画像ファイルの提供元がexternalの場合、
194    /// 画像ファイルのバイナリデータはコンテンツを取得するエンドポイントで
195    /// 取得できません。
196    External {
197        /// 画像ファイルのURL。
198        /// contentProvider.typeがexternalの場合にのみ含まれます。
199        /// 画像ファイルが置かれているサーバーは、
200        /// LINEヤフー株式会社が提供しているものではありません。
201        original_content_url: String,
202        /// プレビュー画像のURL。
203        /// contentProvider.typeがexternalの場合にのみ含まれます。
204        /// プレビュー画像が置かれているサーバーは、
205        /// LINEヤフー株式会社が提供しているものではありません。
206        preview_image_url: String,
207        // image_set
208    },
209}
210
211#[actix_web::get("/line/")]
212async fn index_get() -> impl Responder {
213    let body = r#"<!DOCTYPE html>
214<html lang="en">
215  <head>
216    <title>LINE Webhook</title>
217  </head>
218  <body>
219    <h1>LINE Webhook</h1>
220    <p>Your request is GET.</p>
221  </body>
222</html>
223"#;
224
225    HttpResponse::Ok()
226        .content_type(ContentType::html())
227        .body(body)
228}
229
230#[actix_web::post("/line/")]
231async fn index_post(req: HttpRequest, body: String, ctrl: web::Data<Control>) -> WebResult {
232    info!("LINE github webhook");
233
234    let headers = req.headers();
235    let signature = headers.get("x-line-signature");
236    if signature.is_none() {
237        return Err(ActixError::new("x-line-signature required", 400));
238    }
239    let signature = signature.unwrap().to_str();
240    if signature.is_err() {
241        return Err(ActixError::new("Bad x-line-signature header", 400));
242    }
243    let signature = signature.unwrap();
244    info!("x-line-signature: {signature}");
245
246    // verify signature
247    let channel_secret = {
248        let line = ctrl.sysmods().line.lock().await;
249        line.config.channel_secret.clone()
250    };
251    if let Err(err) = verify_signature(signature, &channel_secret, &body) {
252        return Err(ActixError::new(&err.to_string(), 401));
253    }
254
255    if let Err(e) = process_post(&ctrl, &body).await {
256        Err(ActixError::new(&e.to_string(), 400))
257    } else {
258        Ok(HttpResponse::Ok()
259            .content_type(ContentType::plaintext())
260            .body(""))
261    }
262}
263
264/// 署名検証。
265fn verify_signature(signature: &str, channel_secret: &str, body: &str) -> Result<()> {
266    let key = channel_secret.as_bytes();
267    let data = body.as_bytes();
268    let expected = general_purpose::STANDARD.decode(signature)?;
269
270    netutil::hmac_sha256_verify(key, data, &expected)
271}
272
273/// 署名検証後の POST request 処理本体。
274async fn process_post(ctrl: &Control, json_body: &str) -> Result<()> {
275    // JSON parse
276    let req = serde_json::from_str::<WebHookRequest>(json_body).inspect_err(|err| {
277        error!("[line] Json parse error: {err}");
278        error!("[line] {json_body}");
279    })?;
280    info!("{req:?}");
281
282    // WebhookEvent が最大5個入っている
283    for ev in req.events.iter() {
284        // mode == "active" の時のみ処理
285        if ev.common.mode != "active" {
286            info!(
287                "[line] Ignore event because mode is not active: {}",
288                ev.common.mode
289            );
290            continue;
291        }
292
293        // イベントタイプに応じた処理にディスパッチ
294        match &ev.body {
295            WebhookEventBody::Message {
296                reply_token,
297                message,
298            } => match message {
299                Message::Text {
300                    id: _,
301                    quote_token: _,
302                    text,
303                    mention: _,
304                    quoted_message_id: _,
305                } => {
306                    info!("[line] Receive text message: {text}");
307                    on_text_message(ctrl, &ev.common.source, reply_token, text).await?;
308                }
309                Message::Image {
310                    id,
311                    quote_token: _,
312                    content_provider,
313                } => {
314                    info!("[line] Receive image message");
315                    on_image_message(ctrl, &ev.common.source, id, reply_token, content_provider)
316                        .await?;
317                }
318                other => {
319                    info!("[line] Ignore message type: {other:?}");
320                }
321            },
322            other => {
323                info!("[line] Ignore event: {other:?}");
324            }
325        }
326    }
327
328    Ok(())
329}
330
331async fn source_to_display_name(line: &Line, src: &Source) -> Result<String> {
332    let display_name = match src {
333        Source::User { user_id } => {
334            if let Some(name) = line.config.id_name_map.get(user_id) {
335                name.to_string()
336            } else {
337                line.get_profile(user_id).await?.display_name
338            }
339        }
340        Source::Group { group_id, user_id } => {
341            if let Some(user_id) = user_id {
342                if let Some(name) = line.config.id_name_map.get(user_id) {
343                    name.to_string()
344                } else {
345                    line.get_group_profile(group_id, user_id)
346                        .await?
347                        .display_name
348                }
349            } else {
350                bail!("userId is null");
351            }
352        }
353        Source::Room {
354            room_id: _,
355            user_id: _,
356        } => bail!("Source::Room is not supported"),
357    };
358
359    Ok(display_name)
360}
361
362async fn on_text_message(
363    ctrl: &Control,
364    src: &Option<Source>,
365    reply_token: &str,
366    text: &str,
367) -> Result<()> {
368    ensure!(src.is_some(), "Field 'source' is required");
369    let src = src.as_ref().unwrap();
370
371    let prompt = {
372        let mut line = ctrl.sysmods().line.lock().await;
373        let prompt = line.config.prompt.clone();
374
375        // source フィールドからプロフィール情報を取得
376        let display_name = source_to_display_name(&line, src).await?;
377
378        // タイムアウト処理
379        line.check_history_timeout(ctrl).await;
380
381        // 画像バッファから同一ユーザに対する画像リストを抽出
382        let imgs = line.image_buffer.remove(&display_name).unwrap_or_default();
383
384        // 今回の発言をヒストリに追加 (システムメッセージ + 本体)
385        let sysmsg = prompt.each.join("").replace("${user}", &display_name);
386        line.chat_history_mut(ctrl)
387            .await
388            .push_input_message(Role::Developer, &sysmsg)?;
389        line.chat_history_mut(ctrl)
390            .await
391            .push_input_and_images(Role::User, text, imgs)?;
392
393        prompt
394    };
395
396    // システムメッセージ
397    let inst = prompt.instructions.join("");
398    // ツール (function + built-in tools)
399    let mut tools = vec![];
400    // web search
401    tools.push(Tool::WebSearchPreview {
402        search_context_size: Some(SearchContextSize::Medium),
403        user_location: Some(UserLocation::default()),
404    });
405    // function
406    {
407        let mut line = ctrl.sysmods().line.lock().await;
408        for f in line.func_table(ctrl).await.function_list() {
409            tools.push(Tool::Function(f.clone()));
410        }
411    }
412
413    // function 用コンテキスト情報
414    let reply_to = match src {
415        Source::User { user_id } => user_id,
416        Source::Group {
417            group_id,
418            user_id: _,
419        } => group_id,
420        Source::Room {
421            room_id: _,
422            user_id: _,
423        } => bail!("Source::Room is not supported"),
424    };
425
426    let mut func_trace = String::new();
427    let reply_msg = loop {
428        let mut line = ctrl.sysmods().line.lock().await;
429
430        let resp = {
431            let mut ai = ctrl.sysmods().openai.lock().await;
432
433            // ヒストリの中身全部を追加
434            let input = Vec::from_iter(line.chat_history(ctrl).await.iter().cloned());
435            // ChatGPT API
436            ai.chat_with_tools(Some(&inst), input, &tools).await
437        };
438
439        match resp {
440            Ok(resp) => {
441                // function 呼び出しがあれば履歴に追加
442                for fc in resp.func_call_iter() {
443                    let call_id = &fc.call_id;
444                    let func_name = &fc.name;
445                    let func_args = &fc.arguments;
446
447                    // 関数に渡すコンテキスト情報 (LINE reply_to ID)
448                    let ctx = FunctionContext {
449                        reply_to: reply_to.clone(),
450                    };
451                    // call function
452                    let func_out = line
453                        .func_table(ctrl)
454                        .await
455                        .call(ctx, func_name, func_args)
456                        .await;
457                    // debug trace
458                    if line.func_table(ctrl).await.debug_mode() {
459                        if !func_trace.is_empty() {
460                            func_trace.push('\n');
461                        }
462                        func_trace += &format!(
463                            "function call: {func_name}\nparameters: {func_args}\nresult: {func_out}"
464                        );
465                    }
466                    // function の結果を履歴に追加
467                    line.chat_history_mut(ctrl)
468                        .await
469                        .push_function(call_id, func_name, func_args, &func_out)?;
470                }
471                // アシスタント応答と web search があれば履歴に追加
472                let text = resp.output_text();
473                let text = if text.is_empty() { None } else { Some(text) };
474                line.chat_history_mut(ctrl)
475                    .await
476                    .push_output_and_tools(text.as_deref(), resp.web_search_iter().cloned())?;
477
478                if let Some(text) = text {
479                    break Ok(text);
480                }
481            }
482            Err(err) => {
483                // エラーが発生した
484                error!("{err:#?}");
485                break Err(err);
486            }
487        }
488    };
489
490    // LINE へ返信
491    {
492        let mut line = ctrl.sysmods().line.lock().await;
493
494        let mut msgs: Vec<&str> = Vec::new();
495        if !func_trace.is_empty() {
496            msgs.push(&func_trace);
497        }
498        match reply_msg {
499            Ok(reply_msg) => {
500                msgs.push(&reply_msg);
501                for msg in msgs.iter() {
502                    info!("[line] openai reply: {msg}");
503                }
504                line.reply_multi(reply_token, &msgs).await?;
505
506                // タイムアウト延長
507                line.postpone_timeout();
508            }
509            Err(err) => {
510                error!("[line] openai error: {err:#?}");
511                let errmsg = match OpenAi::error_kind(&err) {
512                    OpenAiErrorKind::Timeout => prompt.timeout_msg,
513                    OpenAiErrorKind::RateLimit => prompt.ratelimit_msg,
514                    OpenAiErrorKind::QuotaExceeded => prompt.quota_msg,
515                    _ => prompt.error_msg,
516                };
517                msgs.push(&errmsg);
518                for msg in msgs.iter() {
519                    info!("[line] openai reply: {msg}");
520                }
521                line.reply_multi(reply_token, &msgs).await?;
522            }
523        }
524    }
525
526    Ok(())
527}
528
529async fn on_image_message(
530    ctrl: &Control,
531    src: &Option<Source>,
532    id: &str,
533    _reply_token: &str,
534    content_provider: &ContentProvider,
535) -> Result<()> {
536    ensure!(src.is_some(), "Field 'source' is required");
537    let src = src.as_ref().unwrap();
538
539    let mut line = ctrl.sysmods().line.lock().await;
540
541    // source フィールドからプロフィール情報を取得
542    let display_name = source_to_display_name(&line, src).await?;
543
544    // 画像を取得
545    let bin = match content_provider {
546        ContentProvider::Line => line.get_content(id).await?,
547        ContentProvider::External {
548            original_content_url,
549            preview_image_url: _,
550        } => {
551            const TIMEOUT: Duration = Duration::from_secs(30);
552            let client = reqwest::ClientBuilder::new().timeout(TIMEOUT).build()?;
553            let resp = client
554                .get(original_content_url)
555                .send()
556                .await
557                .context("URL get network error")?;
558
559            netutil::check_http_resp_bin(resp)
560                .await
561                .context("URL get network error")?
562        }
563    };
564    let input_content = OpenAi::to_image_input(&bin)?;
565
566    // タイムアウト処理
567    line.check_history_timeout(ctrl).await;
568    // 今回の発言を一時バッファに追加
569    if let Some(v) = line.image_buffer.get_mut(&display_name) {
570        v.push(input_content);
571    } else {
572        line.image_buffer.insert(display_name, vec![input_content]);
573    }
574
575    Ok(())
576}
577
578#[cfg(test)]
579mod tests {
580    use super::*;
581
582    #[test]
583    fn parse_text_message() {
584        // https://developers.line.biz/ja/reference/messaging-api/#wh-text
585        let src = r#"
586{
587  "destination": "xxxxxxxxxx",
588  "events": [
589    {
590      "replyToken": "nHuyWiB7yP5Zw52FIkcQobQuGDXCTA",
591      "type": "message",
592      "mode": "active",
593      "timestamp": 1462629479859,
594      "source": {
595        "type": "group",
596        "groupId": "Ca56f94637c...",
597        "userId": "U4af4980629..."
598      },
599      "webhookEventId": "01FZ74A0TDDPYRVKNK77XKC3ZR",
600      "deliveryContext": {
601        "isRedelivery": false
602      },
603      "message": {
604        "id": "444573844083572737",
605        "type": "text",
606        "quoteToken": "q3Plxr4AgKd...",
607        "text": "@All @example Good Morning!! (love)",
608        "emojis": [
609          {
610            "index": 29,
611            "length": 6,
612            "productId": "5ac1bfd5040ab15980c9b435",
613            "emojiId": "001"
614          }
615        ],
616        "mention": {
617          "mentionees": [
618            {
619              "index": 0,
620              "length": 4,
621              "type": "all"
622            },
623            {
624              "index": 5,
625              "length": 8,
626              "userId": "U49585cd0d5...",
627              "type": "user",
628              "isSelf": false
629            }
630          ]
631        }
632      }
633    }
634  ]
635}"#;
636        let _: WebHookRequest = serde_json::from_str(src).unwrap();
637    }
638
639    #[test]
640    fn parse_image_message() {
641        // https://developers.line.biz/ja/reference/messaging-api/#wh-image
642        let src1 = r#"
643{
644    "destination": "xxxxxxxxxx",
645    "events": [
646        {
647            "type": "message",
648            "message": {
649                "type": "image",
650                "id": "354718705033693859",
651                "quoteToken": "q3Plxr4AgKd...",
652                "contentProvider": {
653                    "type": "line"
654                },
655                "imageSet": {
656                    "id": "E005D41A7288F41B65593ED38FF6E9834B046AB36A37921A56BC236F13A91855",
657                    "index": 1,
658                    "total": 2
659                }
660            },
661            "timestamp": 1627356924513,
662            "source": {
663                "type": "user",
664                "userId": "U4af4980629..."
665            },
666            "webhookEventId": "01FZ74A0TDDPYRVKNK77XKC3ZR",
667            "deliveryContext": {
668                "isRedelivery": false
669            },
670            "replyToken": "7840b71058e24a5d91f9b5726c7512c9",
671            "mode": "active"
672        }
673    ]
674}"#;
675        let src2 = r#"
676{
677    "destination": "xxxxxxxxxx",
678    "events": [
679        {
680            "type": "message",
681            "message": {
682                "type": "image",
683                "id": "354718705033693861",
684                "quoteToken": "yHAz4Ua2wx7...",
685                "contentProvider": {
686                    "type": "line"
687                },
688                "imageSet": {
689                    "id": "E005D41A7288F41B65593ED38FF6E9834B046AB36A37921A56BC236F13A91855",
690                    "index": 2,
691                    "total": 2
692                }
693            },
694            "timestamp": 1627356924722,
695            "source": {
696                "type": "user",
697                "userId": "U4af4980629..."
698            },
699            "webhookEventId": "01FZ74A0TDDPYRVKNK77XKC3ZR",
700            "deliveryContext": {
701                "isRedelivery": false
702            },
703            "replyToken": "fbf94e269485410da6b7e3a5e33283e8",
704            "mode": "active"
705        }
706    ]
707}"#;
708        let _: WebHookRequest = serde_json::from_str(src1).unwrap();
709        let _: WebHookRequest = serde_json::from_str(src2).unwrap();
710    }
711
712    #[test]
713    fn base64_decode() {
714        let line_signature = "A+JCmhu7Tg6f4lwANmLGirCS2rY8kHBmSG18ctUtvjQ=";
715        let res = verify_signature(line_signature, "1234567890", "test");
716        assert!(res.is_err());
717        // base64 decode に成功し、MAC 検証に失敗する
718        assert!(res.unwrap_err().to_string().contains("MAC"));
719    }
720}