sys/sysmod/
line.rs

1//! LINE API。
2
3#![allow(clippy::identity_op)]
4
5use super::SystemModule;
6use super::openai::{InputContent, ParameterType};
7use super::openai::{
8    ParameterElement,
9    chat_history::ChatHistory,
10    function::{self, BasicContext, FuncArgs, FunctionTable},
11};
12use crate::config;
13use crate::sysmod::openai::{Function, Parameters, function::FUNCTION_TOKEN};
14use crate::sysmod::openai::{OpenAi, OpenAiErrorKind, Role, SearchContextSize, Tool, UserLocation};
15use crate::taskserver::{self, Control};
16
17use actix_web::http::header::ContentType;
18use anyhow::{Context, Result, anyhow, bail, ensure};
19use base64::{Engine, engine::general_purpose};
20use log::{error, info, warn};
21use reqwest::{Client, StatusCode};
22use serde::{Deserialize, Serialize};
23use std::time::Duration;
24use std::vec;
25use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Instant};
26use utils::netutil;
27
28/// LINE API タイムアウト。
29const TIMEOUT: Duration = Duration::from_secs(30);
30/// [Message::Text] の最大文字数。
31/// mention 関連でのずれが少し怖いので余裕を持たせる。
32const MSG_SPLIT_LEN: usize = 5000 - 128;
33
34/// Discord 設定データ。toml 設定に対応する。
35#[derive(Default, Debug, Clone, Serialize, Deserialize)]
36pub struct LineConfig {
37    /// 機能を有効化するなら true。
38    enabled: bool,
39    /// アクセストークン。Developer Portal で入手できる。
40    token: String,
41    /// チャネルシークレット。
42    pub channel_secret: String,
43    /// 特権ユーザ LINE ID リスト。
44    pub privileged_user_ids: Vec<String>,
45    /// LINE ID から名前への固定マップ。
46    pub id_name_map: HashMap<String, String>,
47    // OpenAI プロンプト。
48    #[serde(default)]
49    pub prompt: LinePrompt,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct LinePrompt {
54    /// 最初に一度だけ与えられるシステムメッセージ。
55    pub instructions: Vec<String>,
56    /// 個々のメッセージの直前に一度ずつ与えらえるシステムメッセージ。
57    pub each: Vec<String>,
58    /// 会話履歴をクリアするまでの時間。
59    pub history_timeout_min: u32,
60    /// OpenAI API タイムアウト時のメッセージ。
61    pub timeout_msg: String,
62    /// OpenAI API レートリミットエラーのメッセージ。
63    pub ratelimit_msg: String,
64    /// OpenAI API クレジット枯渇エラーのメッセージ。
65    pub quota_msg: String,
66    /// OpenAI API エラー時のメッセージ。
67    pub error_msg: String,
68}
69
70/// [LinePrompt] のデフォルト値。
71const DEFAULT_TOML: &str =
72    include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/res/openai_line.toml"));
73impl Default for LinePrompt {
74    fn default() -> Self {
75        toml::from_str(DEFAULT_TOML).unwrap()
76    }
77}
78
79/// OpenAI function 呼び出し時のコンテキスト情報。
80pub struct FunctionContext {
81    /// userId, groupIdm or roomId
82    pub reply_to: String,
83    /// 特権ユーザの発言なら true
84    pub privileged: bool,
85}
86
87/// LINE システムモジュール。
88///
89/// [Option] は遅延初期化。
90pub struct Line {
91    /// 設定データ。
92    pub config: LineConfig,
93    /// HTTP クライアント。
94    client: reqwest::Client,
95    /// タスクキューの送信口。
96    tx: tokio::sync::mpsc::Sender<WebhookEvent>,
97    /// タスクキューの受信口。
98    /// コンストラクタで生成、設定されるが、
99    /// タスクスレッド起動時にその中へ move される。
100    rx: Option<tokio::sync::mpsc::Receiver<WebhookEvent>>,
101
102    /// 受信した画像リストを display_name をキーとしてバッファする。
103    pub image_buffer: HashMap<String, Vec<InputContent>>,
104    /// ai コマンドの会話履歴。
105    pub chat_history: Option<ChatHistory>,
106    /// [Self::image_buffer] [Self::chat_history] の有効期限。
107    pub history_timeout: Option<Instant>,
108    /// OpenAI function 機能テーブル
109    pub func_table: Option<FunctionTable<FunctionContext>>,
110}
111
112impl Line {
113    const TASKQUEUE_SIZE: usize = 8;
114
115    /// コンストラクタ。
116    pub fn new() -> Result<Self> {
117        info!("[line] initialize");
118        let config = config::get(|cfg| cfg.line.clone());
119        let client = Client::builder().timeout(TIMEOUT).build()?;
120        let (tx, rx) = tokio::sync::mpsc::channel(Self::TASKQUEUE_SIZE);
121
122        Ok(Self {
123            config,
124            client,
125            tx,
126            rx: Some(rx),
127            image_buffer: Default::default(),
128            chat_history: None,
129            history_timeout: None,
130            func_table: None,
131        })
132    }
133
134    async fn init_openai(&mut self, ctrl: &Control) {
135        if self.chat_history.is_some() && self.func_table.is_some() {
136            return;
137        }
138
139        // トークン上限を算出
140        // Function 定義 + 前文 + (使用可能上限) + 出力
141        let (model_info, reserved) = {
142            let openai = ctrl.sysmods().openai.lock().await;
143
144            (
145                openai.model_info_offline(),
146                openai.get_output_reserved_token(),
147            )
148        };
149
150        let mut chat_history = ChatHistory::new(model_info.name);
151        assert!(chat_history.get_total_limit() == model_info.context_window);
152        let pre_token: usize = self
153            .config
154            .prompt
155            .instructions
156            .iter()
157            .map(|text| chat_history.token_count(text))
158            .sum();
159        let reserved = FUNCTION_TOKEN + pre_token + reserved;
160        chat_history.reserve_tokens(reserved);
161        info!("[line] OpenAI token limit");
162        info!("[line] {:6} total", model_info.context_window);
163        info!("[line] {reserved:6} reserved");
164        info!("[line] {:6} chat history", chat_history.usage().1);
165
166        let mut func_table = FunctionTable::new(Arc::clone(ctrl), Some("line"));
167        func_table.register_basic_functions();
168        register_camera(&mut func_table);
169        register_draw_picture(&mut func_table);
170
171        let _ = self.chat_history.insert(chat_history);
172        let _ = self.func_table.insert(func_table);
173    }
174
175    pub async fn chat_history(&mut self, ctrl: &Control) -> &ChatHistory {
176        self.init_openai(ctrl).await;
177        self.chat_history.as_ref().unwrap()
178    }
179
180    pub async fn chat_history_mut(&mut self, ctrl: &Control) -> &mut ChatHistory {
181        self.init_openai(ctrl).await;
182        self.chat_history.as_mut().unwrap()
183    }
184
185    pub async fn func_table(&mut self, ctrl: &Control) -> &FunctionTable<FunctionContext> {
186        self.init_openai(ctrl).await;
187        self.func_table.as_ref().unwrap()
188    }
189}
190
191/// WebHook Request 処理タスク。
192async fn line_worker(
193    ctrl: Control,
194    mut rx: tokio::sync::mpsc::Receiver<WebhookEvent>,
195) -> Result<()> {
196    loop {
197        tokio::select! {
198        _ = ctrl.wait_cancel_rx() => {
199            info!("[line-worker] cancel");
200            break;
201        }
202        ev = rx.recv() => {
203            if let Some(ev) = ev {
204                    info!("[line-worker] process webhook event start");
205                    if let Err(why) = process_webhook_event(&ctrl, ev).await {
206                        error!("[line-worker] process webhook event error: {why:#?}");
207                    } else {
208                        info!("[line-worker] process webhook event ok");
209                    }
210                } else {
211                    info!("[line-worker] sender closed");
212                    break;
213                }
214            }
215        }
216    }
217
218    let rest = rx.len();
219    if rest > 0 {
220        warn!("[line-worker] {} remaining task(s) will be dropped", rest);
221    }
222    // drop rx (残りタスクがあっても捨てる)
223
224    Ok(())
225}
226
227async fn process_webhook_event(ctrl: &Control, ev: WebhookEvent) -> Result<()> {
228    match &ev.body {
229        WebhookEventBody::Message {
230            reply_token,
231            message,
232        } => match message {
233            RecvMessage::Text { text, .. } => {
234                info!("[line-worker] Receive text message: {text}");
235                on_text_message(ctrl, &ev.common.source, reply_token, text).await
236            }
237            RecvMessage::Image {
238                id,
239                content_provider,
240                ..
241            } => {
242                info!("[line-worker] Receive image message");
243                on_image_message(ctrl, &ev.common.source, id, reply_token, content_provider).await
244            }
245            other => {
246                info!("[line-worker] Ignore message type: {other:?}");
247                Ok(())
248            }
249        },
250        other => {
251            info!("[line-worker] Ignore event: {other:?}");
252            Ok(())
253        }
254    }
255}
256
257impl SystemModule for Line {
258    fn on_start(&mut self, ctrl: &Control) {
259        info!("[line] on_start");
260
261        let rx = self.rx.take().unwrap();
262        let ctrl_for_worker = ctrl.clone();
263        taskserver::spawn_oneshot_fn(ctrl, "line-worker", async move {
264            line_worker(ctrl_for_worker, rx).await
265        });
266    }
267}
268
269////////////////////////////////////////////////////////////////////////////////
270
271#[derive(Debug, Serialize, Deserialize)]
272struct WebHookRequest {
273    destination: String,
274    // may be empty
275    events: Vec<WebhookEvent>,
276}
277
278#[derive(Debug, Serialize, Deserialize)]
279struct WebhookEvent {
280    #[serde(flatten)]
281    common: WebhookEventCommon,
282    #[serde(flatten)]
283    body: WebhookEventBody,
284}
285
286#[derive(Debug, Serialize, Deserialize)]
287#[serde(rename_all = "camelCase")]
288struct WebhookEventCommon {
289    /// "active" or "standby"
290    mode: String,
291    timestamp: u64,
292    source: Option<Source>,
293    webhook_event_id: String,
294    delivery_context: DeliveryContext,
295}
296
297#[derive(Debug, Serialize, Deserialize)]
298#[serde(
299    tag = "type",
300    rename_all = "camelCase",
301    rename_all_fields = "camelCase"
302)]
303enum Source {
304    User {
305        user_id: String,
306    },
307    Group {
308        group_id: String,
309        user_id: Option<String>,
310    },
311    Room {
312        room_id: String,
313        user_id: Option<String>,
314    },
315}
316
317#[derive(Debug, Serialize, Deserialize)]
318#[serde(rename_all = "camelCase")]
319struct DeliveryContext {
320    is_redelivery: bool,
321}
322
323#[derive(Debug, Serialize, Deserialize)]
324#[serde(
325    tag = "type",
326    rename_all = "camelCase",
327    rename_all_fields = "camelCase"
328)]
329enum WebhookEventBody {
330    Message {
331        reply_token: String,
332        message: RecvMessage,
333    },
334    Unsend {
335        message_id: String,
336    },
337    Follow {
338        reply_token: String,
339    },
340    Unfollow,
341    Join {
342        reply_token: String,
343    },
344    Leave,
345    MemberJoined {
346        joined: Members,
347        reply_token: String,
348    },
349    MemberLeft {
350        left: Members,
351        reply_token: String,
352    },
353    #[serde(other)]
354    Other,
355}
356
357#[derive(Debug, Serialize, Deserialize)]
358#[serde(tag = "type", rename_all = "camelCase")]
359struct Members {
360    /// type = "user"
361    members: Vec<Source>,
362}
363
364#[derive(Debug, Serialize, Deserialize)]
365#[serde(
366    tag = "type",
367    rename_all = "camelCase",
368    rename_all_fields = "camelCase"
369)]
370enum RecvMessage {
371    /// 送信元から送られたテキストを含むメッセージオブジェクトです。
372    Text {
373        /// メッセージID
374        id: String,
375        /// メッセージの引用トークン。
376        /// 詳しくは、『Messaging APIドキュメント』の「引用トークンを取得する」を
377        /// 参照してください。
378        quote_token: String,
379        /// メッセージのテキスト
380        text: String,
381        // emojis
382        /// メンションの情報を含むオブジェクト。
383        /// textプロパティにメンションが含まれる場合のみ、
384        /// メッセージイベントに含まれます。
385        mention: Option<Mention>,
386        /// 引用されたメッセージのメッセージID。
387        /// 過去のメッセージを引用している場合にのみ含まれます。
388        quoted_message_id: Option<String>,
389    },
390    /// 送信元から送られた画像を含むメッセージオブジェクトです。
391    Image {
392        /// メッセージID
393        id: String,
394        /// メッセージの引用トークン。
395        /// 詳しくは、『Messaging APIドキュメント』の「引用トークンを取得する」を
396        /// 参照してください。
397        quote_token: String,
398        /// 画像ファイルの提供元。
399        content_provider: ContentProvider,
400    },
401    #[serde(other)]
402    Other,
403}
404
405#[derive(Debug, Serialize, Deserialize)]
406#[serde(tag = "type", rename_all = "camelCase")]
407struct Mention {
408    mentionees: Vec<Mentionee>,
409}
410
411#[derive(Debug, Serialize, Deserialize)]
412#[serde(tag = "type", rename_all = "camelCase")]
413struct Mentionee {
414    index: usize,
415    length: usize,
416    #[serde(flatten)]
417    target: MentioneeTarget,
418    quoted_message_id: Option<String>,
419}
420
421#[derive(Debug, Serialize, Deserialize)]
422#[serde(
423    tag = "type",
424    rename_all = "camelCase",
425    rename_all_fields = "camelCase"
426)]
427enum MentioneeTarget {
428    User { user_id: Option<String> },
429    All,
430}
431
432/// 画像ファイルの提供元。
433#[derive(Debug, Serialize, Deserialize)]
434#[serde(
435    tag = "type",
436    rename_all = "camelCase",
437    rename_all_fields = "camelCase"
438)]
439enum ContentProvider {
440    /// LINEユーザーが画像ファイルを送信しました。
441    /// 画像ファイルのバイナリデータは、メッセージIDを指定してコンテンツを取得する
442    /// エンドポイントを使用することで取得できます。
443    Line,
444    /// 画像ファイルのURLは `contentProvider.originalContentUrl`
445    /// プロパティに含まれます。
446    /// なお、画像ファイルの提供元がexternalの場合、
447    /// 画像ファイルのバイナリデータはコンテンツを取得するエンドポイントで
448    /// 取得できません。
449    External {
450        /// 画像ファイルのURL。
451        /// contentProvider.typeがexternalの場合にのみ含まれます。
452        /// 画像ファイルが置かれているサーバーは、
453        /// LINEヤフー株式会社が提供しているものではありません。
454        original_content_url: String,
455        /// プレビュー画像のURL。
456        /// contentProvider.typeがexternalの場合にのみ含まれます。
457        /// プレビュー画像が置かれているサーバーは、
458        /// LINEヤフー株式会社が提供しているものではありません。
459        preview_image_url: String,
460        // image_set
461    },
462}
463
464// web サーバからの呼び出し部分
465impl Line {
466    /// 署名検証。
467    pub fn verify_signature(&self, signature: &str, body: &str) -> Result<()> {
468        let channel_secret = self.config.channel_secret.as_str();
469
470        verify_signature(signature, channel_secret, body)
471    }
472
473    /// 署名検証後の POST request 処理本体。
474    pub fn process_post(&self, json_body: &str) -> Result<()> {
475        // JSON parse
476        let req = serde_json::from_str::<WebHookRequest>(json_body).inspect_err(|err| {
477            error!("[line] Json parse error: {err}");
478            error!("[line] {json_body}");
479        })?;
480        info!("{req:?}");
481
482        // WebhookEvent が最大5個入っている
483        for ev in req.events {
484            // mode == "active" の時のみ処理
485            if ev.common.mode != "active" {
486                info!(
487                    "[line] Ignore event because mode is not active: {}",
488                    ev.common.mode
489                );
490                continue;
491            }
492            // 処理タスクキューへディスパッチ
493            // キューがいっぱいの場合は警告だけログに出して成功扱いにする
494            if self.tx.try_send(ev).is_err() {
495                warn!("[line] Task queue full, drop message");
496            }
497        }
498
499        Ok(())
500    }
501}
502
503/// 署名検証。
504fn verify_signature(signature: &str, channel_secret: &str, body: &str) -> Result<()> {
505    let key = channel_secret.as_bytes();
506    let data = body.as_bytes();
507    let expected = general_purpose::STANDARD.decode(signature)?;
508
509    utils::netutil::hmac_sha256_verify(key, data, &expected)
510}
511
512async fn on_text_message(
513    ctrl: &Control,
514    src: &Option<Source>,
515    reply_token: &str,
516    text: &str,
517) -> Result<()> {
518    ensure!(src.is_some(), "Field 'source' is required");
519    let src = src.as_ref().unwrap();
520
521    let (prompt, privileged) = {
522        let mut line = ctrl.sysmods().line.lock().await;
523        let prompt = line.config.prompt.clone();
524
525        // source フィールドからプロフィール情報を取得
526        let (display_name, privileged) = source_to_display_name_and_permission(&line, src).await?;
527
528        // タイムアウト処理
529        line.check_history_timeout(ctrl).await;
530
531        // 画像バッファから同一ユーザに対する画像リストを抽出
532        let imgs = line.image_buffer.remove(&display_name).unwrap_or_default();
533
534        // 今回の発言をヒストリに追加 (システムメッセージ + 本体)
535        let sysmsg = prompt.each.join("").replace("${user}", &display_name);
536        line.chat_history_mut(ctrl)
537            .await
538            .push_input_message(Role::Developer, &sysmsg)?;
539        line.chat_history_mut(ctrl)
540            .await
541            .push_input_and_images(Role::User, text, imgs)?;
542
543        (prompt, privileged)
544    };
545
546    // システムメッセージ
547    let inst = prompt.instructions.join("");
548    // ツール (function + built-in tools)
549    let mut tools = vec![];
550    // web search
551    tools.push(Tool::WebSearchPreview {
552        search_context_size: Some(SearchContextSize::Medium),
553        user_location: Some(UserLocation::default()),
554    });
555    // function
556    {
557        let mut line = ctrl.sysmods().line.lock().await;
558        for f in line.func_table(ctrl).await.function_list() {
559            tools.push(Tool::Function(f.clone()));
560        }
561    }
562
563    // function 用コンテキスト情報
564    let reply_to = match src {
565        Source::User { user_id } => user_id,
566        Source::Group {
567            group_id,
568            user_id: _,
569        } => group_id,
570        Source::Room {
571            room_id: _,
572            user_id: _,
573        } => bail!("Source::Room is not supported"),
574    };
575
576    let mut func_trace = String::new();
577    let reply_msg = loop {
578        let mut line = ctrl.sysmods().line.lock().await;
579
580        let resp = {
581            let mut ai = ctrl.sysmods().openai.lock().await;
582
583            // ヒストリの中身全部を追加
584            let input = Vec::from_iter(line.chat_history(ctrl).await.iter().cloned());
585            // ChatGPT API
586            ai.chat_with_tools(Some(&inst), input, &tools).await
587        };
588
589        match resp {
590            Ok(resp) => {
591                // function 呼び出しがあれば履歴に追加
592                for fc in resp.func_call_iter() {
593                    let call_id = &fc.call_id;
594                    let func_name = &fc.name;
595                    let func_args = &fc.arguments;
596
597                    // 関数に渡すコンテキスト情報 (LINE reply_to ID)
598                    let ctx = FunctionContext {
599                        reply_to: reply_to.clone(),
600                        privileged,
601                    };
602                    // call function
603                    let func_out = line
604                        .func_table(ctrl)
605                        .await
606                        .call(ctx, func_name, func_args)
607                        .await;
608                    // debug trace
609                    if line.func_table(ctrl).await.debug_mode() {
610                        if !func_trace.is_empty() {
611                            func_trace.push('\n');
612                        }
613                        func_trace += &format!(
614                            "function call: {func_name}\nparameters: {func_args}\nresult: {func_out}"
615                        );
616                    }
617                    // function の結果を履歴に追加
618                    line.chat_history_mut(ctrl)
619                        .await
620                        .push_function(call_id, func_name, func_args, &func_out)?;
621                }
622                // アシスタント応答と web search があれば履歴に追加
623                let text = resp.output_text();
624                let text = if text.is_empty() { None } else { Some(text) };
625                line.chat_history_mut(ctrl)
626                    .await
627                    .push_output_and_tools(text.as_deref(), resp.web_search_iter().cloned())?;
628
629                if let Some(text) = text {
630                    break Ok(text);
631                }
632            }
633            Err(err) => {
634                // エラーが発生した
635                error!("{err:#?}");
636                break Err(err);
637            }
638        }
639    };
640
641    // LINE へ返信
642    {
643        let mut line = ctrl.sysmods().line.lock().await;
644
645        let mut msgs: Vec<&str> = Vec::new();
646        if !func_trace.is_empty() {
647            msgs.push(&func_trace);
648        }
649        match reply_msg {
650            Ok(reply_msg) => {
651                msgs.push(&reply_msg);
652                for msg in msgs.iter() {
653                    info!("[line] openai reply: {msg}");
654                }
655                line.reply_multi(reply_token, &msgs).await?;
656
657                // タイムアウト延長
658                line.postpone_timeout();
659            }
660            Err(err) => {
661                error!("[line] openai error: {err:#?}");
662                let errmsg = match OpenAi::error_kind(&err) {
663                    OpenAiErrorKind::Timeout => prompt.timeout_msg,
664                    OpenAiErrorKind::RateLimit => prompt.ratelimit_msg,
665                    OpenAiErrorKind::QuotaExceeded => prompt.quota_msg,
666                    _ => prompt.error_msg,
667                };
668                msgs.push(&errmsg);
669                for msg in msgs.iter() {
670                    info!("[line] openai reply: {msg}");
671                }
672                line.reply_multi(reply_token, &msgs).await?;
673            }
674        }
675    }
676
677    Ok(())
678}
679
680async fn on_image_message(
681    ctrl: &Control,
682    src: &Option<Source>,
683    id: &str,
684    _reply_token: &str,
685    content_provider: &ContentProvider,
686) -> Result<()> {
687    ensure!(src.is_some(), "Field 'source' is required");
688    let src = src.as_ref().unwrap();
689
690    let mut line = ctrl.sysmods().line.lock().await;
691
692    // source フィールドからプロフィール情報を取得
693    let (display_name, _privileged) = source_to_display_name_and_permission(&line, src).await?;
694
695    // 画像を取得
696    let bin = match content_provider {
697        ContentProvider::Line => line.get_content(id).await?,
698        ContentProvider::External {
699            original_content_url,
700            preview_image_url: _,
701        } => {
702            const CONN_TIMEOUT: Duration = Duration::from_secs(30);
703            const TIMEOUT: Duration = Duration::from_secs(30);
704
705            let client = reqwest::ClientBuilder::new()
706                .connect_timeout(CONN_TIMEOUT)
707                .timeout(TIMEOUT)
708                .build()?;
709            netutil::checked_get_url_bin(&client, original_content_url)
710                .await
711                .context("URL get network error")?
712        }
713    };
714    let input_content = OpenAi::to_image_input(&bin)?;
715
716    // タイムアウト処理
717    line.check_history_timeout(ctrl).await;
718    // 今回の発言を一時バッファに追加
719    if let Some(v) = line.image_buffer.get_mut(&display_name) {
720        v.push(input_content);
721    } else {
722        line.image_buffer.insert(display_name, vec![input_content]);
723    }
724
725    Ok(())
726}
727
728async fn source_to_display_name_and_permission(
729    line: &Line,
730    src: &Source,
731) -> Result<(String, bool)> {
732    let user_id = match src {
733        Source::User { user_id } => user_id,
734        Source::Group {
735            group_id: _,
736            user_id,
737        } => {
738            if let Some(user_id) = user_id {
739                user_id
740            } else {
741                bail!("userId is null");
742            }
743        }
744        Source::Room {
745            room_id: _,
746            user_id: _,
747        } => bail!("Source::Room is not supported"),
748    };
749    let display_name = if let Some(name) = line.config.id_name_map.get(user_id) {
750        name.to_string()
751    } else {
752        line.get_profile(user_id).await?.display_name
753    };
754    let privileged = line.config.privileged_user_ids.contains(user_id);
755
756    Ok((display_name, privileged))
757}
758
759////////////////////////////////////////////////////////////////////////////////
760
761#[derive(Debug, Serialize, Deserialize)]
762#[serde(tag = "type")]
763struct ErrorResp {
764    message: String,
765    details: Option<Vec<Detail>>,
766}
767
768#[derive(Debug, Serialize, Deserialize)]
769struct Detail {
770    message: Option<String>,
771    property: Option<String>,
772}
773
774#[derive(Debug, Serialize, Deserialize)]
775#[serde(rename_all = "camelCase")]
776pub struct ProfileResp {
777    pub display_name: String,
778    pub user_id: String,
779    pub language: Option<String>,
780    pub picture_url: Option<String>,
781    pub status_message: Option<String>,
782}
783
784#[derive(Debug, Serialize, Deserialize)]
785#[serde(rename_all = "camelCase")]
786#[serde_with::skip_serializing_none]
787struct ReplyReq {
788    reply_token: String,
789    /// len = 1..=5
790    messages: Vec<SendMessage>,
791    notification_disabled: Option<bool>,
792}
793
794#[derive(Debug, Serialize, Deserialize)]
795#[serde(rename_all = "camelCase")]
796pub struct ReplyResp {
797    sent_messages: Vec<SentMessage>,
798}
799
800#[derive(Debug, Serialize, Deserialize)]
801#[serde(rename_all = "camelCase")]
802#[serde_with::skip_serializing_none]
803struct PushReq {
804    to: String,
805    /// len = 1..=5
806    messages: Vec<SendMessage>,
807    notification_disabled: Option<bool>,
808    custom_aggregation_units: Option<Vec<String>>,
809}
810
811#[derive(Debug, Serialize, Deserialize)]
812#[serde(rename_all = "camelCase")]
813pub struct PushResp {
814    sent_messages: Vec<SentMessage>,
815}
816
817#[derive(Debug, Serialize, Deserialize)]
818#[serde(rename_all = "camelCase")]
819struct SentMessage {
820    id: String,
821    quote_token: Option<String>,
822}
823
824/// 画像サイズ制限。
825/// 仕様ぴったりの 10 MB はスマホの通信料に対して大きすぎるので控えめにする。
826const IMAGE_ORIGINAL_SIZE_MAX: usize = 1 * 1000 * 1000;
827/// プレビュー画像サイズ制限。
828/// 仕様は 1 MB。
829const IMAGE_PREVIEW_SIZE_MAX: usize = 200 * 1000;
830
831#[derive(Debug, Serialize, Deserialize)]
832#[serde(rename_all = "camelCase")]
833#[serde(tag = "type")]
834#[serde_with::skip_serializing_none]
835enum SendMessage {
836    #[serde(rename_all = "camelCase")]
837    Text { text: String },
838    #[serde(rename_all = "camelCase")]
839    Image {
840        /// url len <= 5000
841        /// protocol = https (>= TLS 1.2)
842        /// format = jpeg | png
843        /// size <= 10 MB
844        original_content_url: String,
845        /// url len <= 5000
846        /// protocol = https (>= TLS 1.2)
847        /// format = jpeg | png
848        /// size <= 1 MB
849        preview_image_url: String,
850    },
851}
852
853fn url_profile(user_id: &str) -> String {
854    format!("https://api.line.me/v2/bot/profile/{user_id}")
855}
856
857fn url_group_profile(group_id: &str, user_id: &str) -> String {
858    format!("https://api.line.me/v2/bot/group/{group_id}/member/{user_id}")
859}
860
861fn url_content(message_id: &str) -> String {
862    format!("https://api-data.line.me/v2/bot/message/{message_id}/content")
863}
864
865const URL_REPLY: &str = "https://api.line.me/v2/bot/message/reply";
866const URL_PUSH: &str = "https://api.line.me/v2/bot/message/push";
867
868impl Line {
869    pub async fn get_profile(&self, user_id: &str) -> Result<ProfileResp> {
870        self.get_auth_json(&url_profile(user_id)).await
871    }
872
873    pub async fn get_group_profile(&self, group_id: &str, user_id: &str) -> Result<ProfileResp> {
874        self.get_auth_json(&url_group_profile(group_id, user_id))
875            .await
876    }
877
878    /// コンテンツを取得する。
879    ///
880    /// <https://developers.line.biz/ja/reference/messaging-api/#get-content>
881    ///
882    /// Webhookで受信したメッセージIDを使って、ユーザーが送信した画像、動画、音声、
883    /// およびファイルを取得するエンドポイントです。
884    /// このエンドポイントは、Webhookイベントオブジェクトの
885    /// contentProvider.typeプロパティがlineの場合にのみ利用できます。
886    /// ユーザーからデータサイズが大きい動画または音声が送られた場合に、
887    /// コンテンツのバイナリデータを取得できるようになるまで時間がかかるときがあります。
888    /// バイナリデータの準備中にコンテンツを取得しようとすると、
889    /// ステータスコード202が返されバイナリデータは取得できません。
890    /// バイナリデータが取得できるかどうかは、
891    /// 動画または音声の取得準備の状況を確認するエンドポイントで確認できます。
892    /// なお、ユーザーが送信したコンテンツは、
893    /// 縮小などの変換が内部的に行われる場合があります。
894    pub async fn get_content(&self, message_id: &str) -> Result<Vec<u8>> {
895        let bin = loop {
896            let (status, bin) = self.get_auth_bin(&url_content(message_id)).await?;
897            match status {
898                StatusCode::OK => {
899                    info!("OK ({} bytes)", bin.len());
900                    break bin;
901                }
902                StatusCode::ACCEPTED => {
903                    // 202 Accepted
904                    info!("202: Not ready yet");
905                    tokio::time::sleep(Duration::from_secs(5)).await;
906                }
907                _ => {
908                    bail!("Invalid status: {status}");
909                }
910            }
911        };
912
913        Ok(bin)
914    }
915
916    pub fn postpone_timeout(&mut self) {
917        let now = Instant::now();
918        let timeout = now + Duration::from_secs(self.config.prompt.history_timeout_min as u64 * 60);
919        self.history_timeout = Some(timeout);
920    }
921
922    /// [Self::image_buffer] [Self::chat_history] にタイムアウトを適用する。
923    pub async fn check_history_timeout(&mut self, ctrl: &Control) {
924        let now = Instant::now();
925
926        if let Some(timeout) = self.history_timeout
927            && now > timeout
928        {
929            self.image_buffer.clear();
930            self.chat_history_mut(ctrl).await.clear();
931            self.history_timeout = None;
932        }
933    }
934
935    /// [Self::reply_multi] のシンプル版。
936    /// 文字列が長すぎるならば分割して送信する。
937    pub async fn reply(&self, reply_token: &str, text: &str) -> Result<ReplyResp> {
938        let texts = [text];
939
940        self.reply_multi(reply_token, &texts).await
941    }
942
943    /// <https://developers.line.biz/ja/reference/messaging-api/#send-reply-message>
944    ///
945    /// <https://developers.line.biz/ja/docs/messaging-api/text-character-count/>
946    ///
947    /// `texts` のそれぞれについて、長すぎるならば分割し、
948    /// 文字列メッセージ配列として送信する。
949    /// 配列の最大サイズは 5。
950    pub async fn reply_multi(&self, reply_token: &str, texts: &[&str]) -> Result<ReplyResp> {
951        let mut messages = Vec::new();
952        for text in texts {
953            ensure!(!text.is_empty(), "text must not be empty");
954            let splitted = split_message(text);
955            messages.extend(splitted.iter().map(|&chunk| SendMessage::Text {
956                text: chunk.to_string(),
957            }));
958        }
959        ensure!(messages.len() <= 5, "text too long: {}", texts.len());
960
961        let req = ReplyReq {
962            reply_token: reply_token.to_string(),
963            messages,
964            notification_disabled: None,
965        };
966        let resp = self.post_auth_json(URL_REPLY, &req).await?;
967        info!("{resp:?}");
968
969        Ok(resp)
970    }
971
972    /// <https://developers.line.biz/ja/reference/messaging-api/#send-push-message>
973    ///
974    /// <https://developers.line.biz/ja/docs/messaging-api/text-character-count/>
975    #[allow(unused)]
976    pub async fn push_message(&self, to: &str, text: &str) -> Result<ReplyResp> {
977        ensure!(!text.is_empty(), "text must not be empty");
978
979        let messages: Vec<_> = split_message(text)
980            .iter()
981            .map(|&chunk| SendMessage::Text {
982                text: chunk.to_string(),
983            })
984            .collect();
985        ensure!(messages.len() <= 5, "text too long: {}", text.len());
986
987        let req = PushReq {
988            to: to.to_string(),
989            messages,
990            notification_disabled: None,
991            custom_aggregation_units: None,
992        };
993        let resp = self.post_auth_json(URL_PUSH, &req).await?;
994        info!("{resp:?}");
995
996        Ok(resp)
997    }
998
999    /// <https://developers.line.biz/ja/reference/messaging-api/#send-push-message>
1000    pub async fn push_image_message(
1001        &self,
1002        to: &str,
1003        url_original: &str,
1004        url_preview: &str,
1005    ) -> Result<ReplyResp> {
1006        let messages = vec![SendMessage::Image {
1007            original_content_url: url_original.to_string(),
1008            preview_image_url: url_preview.to_string(),
1009        }];
1010
1011        let req = PushReq {
1012            to: to.to_string(),
1013            messages,
1014            notification_disabled: None,
1015            custom_aggregation_units: None,
1016        };
1017        let resp = self.post_auth_json(URL_PUSH, &req).await?;
1018        info!("{resp:?}");
1019
1020        Ok(resp)
1021    }
1022
1023    /// レスポンスの内容を確認しながら json に変換する。
1024    ///
1025    /// * HTTP Status が成功の場合
1026    ///   * Response body JSON を T に変換できればそれを返す。
1027    ///   * 変換に失敗したらエラー。
1028    /// * HTTP Status が失敗の場合
1029    ///   * Response body JSON を [ErrorResp] にパースできればその [Debug] を
1030    ///     メッセージとしてエラーを返す。
1031    ///   * 変換に失敗した場合、JSON ソースをメッセージとしてエラーを返す。
1032    async fn check_resp_json<'a, T>(resp: reqwest::Response) -> Result<T>
1033    where
1034        T: for<'de> Deserialize<'de>,
1035    {
1036        // https://qiita.com/kzee/items/d01e6f3e00dfadb9a00b
1037        let status = resp.status();
1038        let body = resp.text().await?;
1039
1040        if status.is_success() {
1041            Ok(serde_json::from_reader::<_, T>(body.as_bytes())?)
1042        } else {
1043            match serde_json::from_str::<ErrorResp>(&body) {
1044                Ok(obj) => bail!("{status}: {:?}", obj),
1045                Err(json_err) => bail!("{status} - {json_err}: {body}"),
1046            }
1047        }
1048    }
1049
1050    async fn get_auth_json<'a, T>(&self, url: &str) -> Result<T>
1051    where
1052        T: for<'de> Deserialize<'de>,
1053    {
1054        info!("[line] GET {url}");
1055        let token = &self.config.token;
1056        let resp = netutil::send_with_retry(|| {
1057            self.client
1058                .get(url)
1059                .header("Authorization", format!("Bearer {token}"))
1060        })
1061        .await?;
1062
1063        Self::check_resp_json(resp).await
1064    }
1065
1066    async fn post_auth_json<T, R>(&self, url: &str, body: &T) -> Result<R>
1067    where
1068        T: Serialize + Debug,
1069        R: for<'de> Deserialize<'de>,
1070    {
1071        info!("[line] POST {url} {body:?}");
1072        let token = &self.config.token;
1073        let resp = netutil::send_with_retry(|| {
1074            self.client
1075                .post(url)
1076                .header("Authorization", format!("Bearer {token}"))
1077                .json(body)
1078        })
1079        .await?;
1080
1081        Self::check_resp_json(resp).await
1082    }
1083
1084    async fn get_auth_bin(&self, url: &str) -> Result<(StatusCode, Vec<u8>)> {
1085        info!("[line] GET {url}");
1086        let token = &self.config.token;
1087
1088        let resp = netutil::send_with_retry(|| {
1089            self.client
1090                .get(url)
1091                .header("Authorization", format!("Bearer {token}"))
1092        })
1093        .await?;
1094
1095        let status = resp.status();
1096        if status.is_success() {
1097            let body = resp.bytes().await?.to_vec();
1098
1099            Ok((status, body))
1100        } else {
1101            let body = resp.text().await?;
1102
1103            match serde_json::from_str::<ErrorResp>(&body) {
1104                Ok(obj) => bail!("{status}: {:?}", obj),
1105                Err(json_err) => bail!("{status} - {json_err}: {body}"),
1106            }
1107        }
1108    }
1109}
1110
1111fn split_message(text: &str) -> Vec<&str> {
1112    // UTF-16 5000 文字で分割
1113    let mut result = Vec::new();
1114    // 左端
1115    let mut s = 0;
1116    // utf-16 文字数
1117    let mut len = 0;
1118    for (ind, c) in text.char_indices() {
1119        // 1 or 2
1120        let clen = c.len_utf16();
1121        // 超えそうなら [s, ind) の部分文字列を出力
1122        if len + clen > MSG_SPLIT_LEN {
1123            result.push(&text[s..ind]);
1124            s = ind;
1125            len = 0;
1126        }
1127        len += clen;
1128    }
1129    if len > 0 {
1130        result.push(&text[s..]);
1131    }
1132
1133    result
1134}
1135
1136/// 固有関数: 画像生成
1137fn register_draw_picture(func_table: &mut FunctionTable<FunctionContext>) {
1138    let mut properties = HashMap::new();
1139    properties.insert(
1140        "keywords".to_string(),
1141        ParameterElement {
1142            type_: vec![ParameterType::String],
1143            description: Some("Keywords for drawing. They must be in English.".to_string()),
1144            ..Default::default()
1145        },
1146    );
1147    func_table.register_function(
1148        Function {
1149            name: "draw".to_string(),
1150            description: Some("Draw a picture".to_string()),
1151            parameters: Parameters {
1152                properties,
1153                required: vec!["keywords".to_string()],
1154                ..Default::default()
1155            },
1156            ..Default::default()
1157        },
1158        move |bctx, ctx, args| Box::pin(draw_picture(bctx, ctx, args)),
1159    );
1160}
1161
1162async fn draw_picture(
1163    bctx: Arc<BasicContext>,
1164    ctx: FunctionContext,
1165    args: &FuncArgs,
1166) -> Result<String> {
1167    let keywords = function::get_arg_str(args, "keywords")?.to_string();
1168
1169    let ctrl = bctx.ctrl.clone();
1170    taskserver::spawn_oneshot_fn(&ctrl, "line_draw_picture", async move {
1171        let url = {
1172            let mut ai = bctx.ctrl.sysmods().openai.lock().await;
1173
1174            ai.generate_image(&keywords, 1)
1175                .await?
1176                .pop()
1177                .ok_or_else(|| anyhow!("parse error"))?
1178        };
1179        {
1180            let line = bctx.ctrl.sysmods().line.lock().await;
1181            line.push_image_message(&ctx.reply_to, &url, &url).await?;
1182        }
1183        Ok(())
1184    });
1185
1186    Ok("Accepted. The result will be automatially posted later. Assistant should not draw for now.".to_string())
1187}
1188
1189/// 固有関数: カメラで写真を撮る
1190fn register_camera(func_table: &mut FunctionTable<FunctionContext>) {
1191    func_table.register_function(
1192        Function {
1193            name: "camera".to_string(),
1194            description: Some("Take a picture".to_string()),
1195            parameters: Parameters {
1196                properties: Default::default(),
1197                required: Default::default(),
1198                ..Default::default()
1199            },
1200            ..Default::default()
1201        },
1202        move |bctx, ctx, args| Box::pin(camera(bctx, ctx, args)),
1203    );
1204}
1205
1206async fn camera(bctx: Arc<BasicContext>, ctx: FunctionContext, _args: &FuncArgs) -> Result<String> {
1207    use crate::sysmod::camera;
1208
1209    anyhow::ensure!(ctx.privileged, "Permission denied");
1210
1211    let mut w = camera::PIC_DEF_W;
1212    let mut h = camera::PIC_DEF_H;
1213    info!("[line] take a picture: {}x{}", w, h);
1214    let mut orig = camera::take_a_pic(camera::TakePicOption::new().width(w).height(h)).await?;
1215    info!("[line] take a picture OK, size={}", orig.len());
1216    // サイズ制限に収まるまで小さくする
1217    while orig.len() > IMAGE_ORIGINAL_SIZE_MAX {
1218        w /= 2;
1219        h /= 2;
1220        orig = camera::resize(&orig, w, h)?;
1221        info!("[line] resize, size={}", orig.len());
1222    }
1223    let mut preview = orig.clone();
1224    while preview.len() > IMAGE_PREVIEW_SIZE_MAX {
1225        w /= 2;
1226        h /= 2;
1227        preview = camera::resize(&preview, w, h)?;
1228    }
1229    let (url_original, url_preview) = {
1230        let mut http = bctx.ctrl.sysmods().http.lock().await;
1231
1232        (
1233            http.export_tmp_data(ContentType::jpeg(), orig)?,
1234            http.export_tmp_data(ContentType::jpeg(), preview)?,
1235        )
1236    };
1237
1238    let ctrl = bctx.ctrl.clone();
1239    taskserver::spawn_oneshot_fn(&ctrl, "line_camera_send", async move {
1240        {
1241            let line = bctx.ctrl.sysmods().line.lock().await;
1242
1243            line.push_image_message(&ctx.reply_to, &url_original, &url_preview)
1244                .await?;
1245        }
1246        Ok(())
1247    });
1248
1249    Ok("OK. Now the users can see the picture.".to_string())
1250}
1251
1252#[cfg(test)]
1253mod tests {
1254    use super::*;
1255
1256    #[test]
1257    fn split_long_message() {
1258        let mut src = String::new();
1259        assert!(split_message(&src).is_empty());
1260
1261        for i in 0..MSG_SPLIT_LEN {
1262            let a = 'A' as u32;
1263            src.push(char::from_u32(a + (i as u32 % 26)).unwrap());
1264        }
1265        let res = split_message(&src);
1266        assert_eq!(1, res.len());
1267        assert_eq!(src, res[0]);
1268
1269        src.push('0');
1270        let res = split_message(&src);
1271        assert_eq!(2, res.len());
1272        assert_eq!(&src[..MSG_SPLIT_LEN], res[0]);
1273        assert_eq!(&src[MSG_SPLIT_LEN..], res[1]);
1274
1275        src.pop();
1276        src.pop();
1277        src.push('😀');
1278        let res = split_message(&src);
1279        assert_eq!(2, res.len());
1280        assert_eq!(&src[..MSG_SPLIT_LEN - 1], res[0]);
1281        assert_eq!(&src[MSG_SPLIT_LEN - 1..], res[1]);
1282    }
1283
1284    #[test]
1285    fn parse_text_message() {
1286        // https://developers.line.biz/ja/reference/messaging-api/#wh-text
1287        let src = r#"
1288{
1289  "destination": "xxxxxxxxxx",
1290  "events": [
1291    {
1292      "replyToken": "nHuyWiB7yP5Zw52FIkcQobQuGDXCTA",
1293      "type": "message",
1294      "mode": "active",
1295      "timestamp": 1462629479859,
1296      "source": {
1297        "type": "group",
1298        "groupId": "Ca56f94637c...",
1299        "userId": "U4af4980629..."
1300      },
1301      "webhookEventId": "01FZ74A0TDDPYRVKNK77XKC3ZR",
1302      "deliveryContext": {
1303        "isRedelivery": false
1304      },
1305      "message": {
1306        "id": "444573844083572737",
1307        "type": "text",
1308        "quoteToken": "q3Plxr4AgKd...",
1309        "text": "@All @example Good Morning!! (love)",
1310        "emojis": [
1311          {
1312            "index": 29,
1313            "length": 6,
1314            "productId": "5ac1bfd5040ab15980c9b435",
1315            "emojiId": "001"
1316          }
1317        ],
1318        "mention": {
1319          "mentionees": [
1320            {
1321              "index": 0,
1322              "length": 4,
1323              "type": "all"
1324            },
1325            {
1326              "index": 5,
1327              "length": 8,
1328              "userId": "U49585cd0d5...",
1329              "type": "user",
1330              "isSelf": false
1331            }
1332          ]
1333        }
1334      }
1335    }
1336  ]
1337}"#;
1338        let _: WebHookRequest = serde_json::from_str(src).unwrap();
1339    }
1340
1341    #[test]
1342    fn parse_image_message() {
1343        // https://developers.line.biz/ja/reference/messaging-api/#wh-image
1344        let src1 = r#"
1345{
1346    "destination": "xxxxxxxxxx",
1347    "events": [
1348        {
1349            "type": "message",
1350            "message": {
1351                "type": "image",
1352                "id": "354718705033693859",
1353                "quoteToken": "q3Plxr4AgKd...",
1354                "contentProvider": {
1355                    "type": "line"
1356                },
1357                "imageSet": {
1358                    "id": "E005D41A7288F41B65593ED38FF6E9834B046AB36A37921A56BC236F13A91855",
1359                    "index": 1,
1360                    "total": 2
1361                }
1362            },
1363            "timestamp": 1627356924513,
1364            "source": {
1365                "type": "user",
1366                "userId": "U4af4980629..."
1367            },
1368            "webhookEventId": "01FZ74A0TDDPYRVKNK77XKC3ZR",
1369            "deliveryContext": {
1370                "isRedelivery": false
1371            },
1372            "replyToken": "7840b71058e24a5d91f9b5726c7512c9",
1373            "mode": "active"
1374        }
1375    ]
1376}"#;
1377        let src2 = r#"
1378{
1379    "destination": "xxxxxxxxxx",
1380    "events": [
1381        {
1382            "type": "message",
1383            "message": {
1384                "type": "image",
1385                "id": "354718705033693861",
1386                "quoteToken": "yHAz4Ua2wx7...",
1387                "contentProvider": {
1388                    "type": "line"
1389                },
1390                "imageSet": {
1391                    "id": "E005D41A7288F41B65593ED38FF6E9834B046AB36A37921A56BC236F13A91855",
1392                    "index": 2,
1393                    "total": 2
1394                }
1395            },
1396            "timestamp": 1627356924722,
1397            "source": {
1398                "type": "user",
1399                "userId": "U4af4980629..."
1400            },
1401            "webhookEventId": "01FZ74A0TDDPYRVKNK77XKC3ZR",
1402            "deliveryContext": {
1403                "isRedelivery": false
1404            },
1405            "replyToken": "fbf94e269485410da6b7e3a5e33283e8",
1406            "mode": "active"
1407        }
1408    ]
1409}"#;
1410        let _: WebHookRequest = serde_json::from_str(src1).unwrap();
1411        let _: WebHookRequest = serde_json::from_str(src2).unwrap();
1412    }
1413
1414    #[test]
1415    fn base64_decode() {
1416        let line_signature = "A+JCmhu7Tg6f4lwANmLGirCS2rY8kHBmSG18ctUtvjQ=";
1417        let res = verify_signature(line_signature, "1234567890", "test");
1418        assert!(res.is_err());
1419        // base64 decode に成功し、MAC 検証に失敗する
1420        assert!(res.unwrap_err().to_string().contains("MAC"));
1421    }
1422}