1#![allow(clippy::identity_op)]
4
5use super::SystemModule;
6use super::openai::{InputContent, ParameterType};
7use super::openai::{
8 ParameterElement,
9 chat_history::ChatHistory,
10 function::{self, BasicContext, FuncArgs, FunctionTable},
11};
12use crate::config;
13use crate::sysmod::openai::{Function, Parameters, function::FUNCTION_TOKEN};
14use crate::sysmod::openai::{OpenAi, OpenAiErrorKind, Role, SearchContextSize, Tool, UserLocation};
15use crate::taskserver::{self, Control};
16
17use actix_web::http::header::ContentType;
18use anyhow::{Context, Result, anyhow, bail, ensure};
19use base64::{Engine, engine::general_purpose};
20use log::{error, info, warn};
21use reqwest::{Client, StatusCode};
22use serde::{Deserialize, Serialize};
23use std::time::Duration;
24use std::vec;
25use std::{collections::HashMap, fmt::Debug, sync::Arc, time::Instant};
26use utils::netutil;
27
28const TIMEOUT: Duration = Duration::from_secs(30);
30const MSG_SPLIT_LEN: usize = 5000 - 128;
33
34#[derive(Default, Debug, Clone, Serialize, Deserialize)]
36pub struct LineConfig {
37 enabled: bool,
39 token: String,
41 pub channel_secret: String,
43 pub privileged_user_ids: Vec<String>,
45 pub id_name_map: HashMap<String, String>,
47 #[serde(default)]
49 pub prompt: LinePrompt,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct LinePrompt {
54 pub instructions: Vec<String>,
56 pub each: Vec<String>,
58 pub history_timeout_min: u32,
60 pub timeout_msg: String,
62 pub ratelimit_msg: String,
64 pub quota_msg: String,
66 pub error_msg: String,
68}
69
70const DEFAULT_TOML: &str =
72 include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/res/openai_line.toml"));
73impl Default for LinePrompt {
74 fn default() -> Self {
75 toml::from_str(DEFAULT_TOML).unwrap()
76 }
77}
78
79pub struct FunctionContext {
81 pub reply_to: String,
83 pub privileged: bool,
85}
86
87pub struct Line {
91 pub config: LineConfig,
93 client: reqwest::Client,
95 tx: tokio::sync::mpsc::Sender<WebhookEvent>,
97 rx: Option<tokio::sync::mpsc::Receiver<WebhookEvent>>,
101
102 pub image_buffer: HashMap<String, Vec<InputContent>>,
104 pub chat_history: Option<ChatHistory>,
106 pub history_timeout: Option<Instant>,
108 pub func_table: Option<FunctionTable<FunctionContext>>,
110}
111
112impl Line {
113 const TASKQUEUE_SIZE: usize = 8;
114
115 pub fn new() -> Result<Self> {
117 info!("[line] initialize");
118 let config = config::get(|cfg| cfg.line.clone());
119 let client = Client::builder().timeout(TIMEOUT).build()?;
120 let (tx, rx) = tokio::sync::mpsc::channel(Self::TASKQUEUE_SIZE);
121
122 Ok(Self {
123 config,
124 client,
125 tx,
126 rx: Some(rx),
127 image_buffer: Default::default(),
128 chat_history: None,
129 history_timeout: None,
130 func_table: None,
131 })
132 }
133
134 async fn init_openai(&mut self, ctrl: &Control) {
135 if self.chat_history.is_some() && self.func_table.is_some() {
136 return;
137 }
138
139 let (model_info, reserved) = {
142 let openai = ctrl.sysmods().openai.lock().await;
143
144 (
145 openai.model_info_offline(),
146 openai.get_output_reserved_token(),
147 )
148 };
149
150 let mut chat_history = ChatHistory::new(model_info.name);
151 assert!(chat_history.get_total_limit() == model_info.context_window);
152 let pre_token: usize = self
153 .config
154 .prompt
155 .instructions
156 .iter()
157 .map(|text| chat_history.token_count(text))
158 .sum();
159 let reserved = FUNCTION_TOKEN + pre_token + reserved;
160 chat_history.reserve_tokens(reserved);
161 info!("[line] OpenAI token limit");
162 info!("[line] {:6} total", model_info.context_window);
163 info!("[line] {reserved:6} reserved");
164 info!("[line] {:6} chat history", chat_history.usage().1);
165
166 let mut func_table = FunctionTable::new(Arc::clone(ctrl), Some("line"));
167 func_table.register_basic_functions();
168 register_camera(&mut func_table);
169 register_draw_picture(&mut func_table);
170
171 let _ = self.chat_history.insert(chat_history);
172 let _ = self.func_table.insert(func_table);
173 }
174
175 pub async fn chat_history(&mut self, ctrl: &Control) -> &ChatHistory {
176 self.init_openai(ctrl).await;
177 self.chat_history.as_ref().unwrap()
178 }
179
180 pub async fn chat_history_mut(&mut self, ctrl: &Control) -> &mut ChatHistory {
181 self.init_openai(ctrl).await;
182 self.chat_history.as_mut().unwrap()
183 }
184
185 pub async fn func_table(&mut self, ctrl: &Control) -> &FunctionTable<FunctionContext> {
186 self.init_openai(ctrl).await;
187 self.func_table.as_ref().unwrap()
188 }
189}
190
191async fn line_worker(
193 ctrl: Control,
194 mut rx: tokio::sync::mpsc::Receiver<WebhookEvent>,
195) -> Result<()> {
196 loop {
197 tokio::select! {
198 _ = ctrl.wait_cancel_rx() => {
199 info!("[line-worker] cancel");
200 break;
201 }
202 ev = rx.recv() => {
203 if let Some(ev) = ev {
204 info!("[line-worker] process webhook event start");
205 if let Err(why) = process_webhook_event(&ctrl, ev).await {
206 error!("[line-worker] process webhook event error: {why:#?}");
207 } else {
208 info!("[line-worker] process webhook event ok");
209 }
210 } else {
211 info!("[line-worker] sender closed");
212 break;
213 }
214 }
215 }
216 }
217
218 let rest = rx.len();
219 if rest > 0 {
220 warn!("[line-worker] {} remaining task(s) will be dropped", rest);
221 }
222 Ok(())
225}
226
227async fn process_webhook_event(ctrl: &Control, ev: WebhookEvent) -> Result<()> {
228 match &ev.body {
229 WebhookEventBody::Message {
230 reply_token,
231 message,
232 } => match message {
233 RecvMessage::Text { text, .. } => {
234 info!("[line-worker] Receive text message: {text}");
235 on_text_message(ctrl, &ev.common.source, reply_token, text).await
236 }
237 RecvMessage::Image {
238 id,
239 content_provider,
240 ..
241 } => {
242 info!("[line-worker] Receive image message");
243 on_image_message(ctrl, &ev.common.source, id, reply_token, content_provider).await
244 }
245 other => {
246 info!("[line-worker] Ignore message type: {other:?}");
247 Ok(())
248 }
249 },
250 other => {
251 info!("[line-worker] Ignore event: {other:?}");
252 Ok(())
253 }
254 }
255}
256
257impl SystemModule for Line {
258 fn on_start(&mut self, ctrl: &Control) {
259 info!("[line] on_start");
260
261 let rx = self.rx.take().unwrap();
262 let ctrl_for_worker = ctrl.clone();
263 taskserver::spawn_oneshot_fn(ctrl, "line-worker", async move {
264 line_worker(ctrl_for_worker, rx).await
265 });
266 }
267}
268
269#[derive(Debug, Serialize, Deserialize)]
272struct WebHookRequest {
273 destination: String,
274 events: Vec<WebhookEvent>,
276}
277
278#[derive(Debug, Serialize, Deserialize)]
279struct WebhookEvent {
280 #[serde(flatten)]
281 common: WebhookEventCommon,
282 #[serde(flatten)]
283 body: WebhookEventBody,
284}
285
286#[derive(Debug, Serialize, Deserialize)]
287#[serde(rename_all = "camelCase")]
288struct WebhookEventCommon {
289 mode: String,
291 timestamp: u64,
292 source: Option<Source>,
293 webhook_event_id: String,
294 delivery_context: DeliveryContext,
295}
296
297#[derive(Debug, Serialize, Deserialize)]
298#[serde(
299 tag = "type",
300 rename_all = "camelCase",
301 rename_all_fields = "camelCase"
302)]
303enum Source {
304 User {
305 user_id: String,
306 },
307 Group {
308 group_id: String,
309 user_id: Option<String>,
310 },
311 Room {
312 room_id: String,
313 user_id: Option<String>,
314 },
315}
316
317#[derive(Debug, Serialize, Deserialize)]
318#[serde(rename_all = "camelCase")]
319struct DeliveryContext {
320 is_redelivery: bool,
321}
322
323#[derive(Debug, Serialize, Deserialize)]
324#[serde(
325 tag = "type",
326 rename_all = "camelCase",
327 rename_all_fields = "camelCase"
328)]
329enum WebhookEventBody {
330 Message {
331 reply_token: String,
332 message: RecvMessage,
333 },
334 Unsend {
335 message_id: String,
336 },
337 Follow {
338 reply_token: String,
339 },
340 Unfollow,
341 Join {
342 reply_token: String,
343 },
344 Leave,
345 MemberJoined {
346 joined: Members,
347 reply_token: String,
348 },
349 MemberLeft {
350 left: Members,
351 reply_token: String,
352 },
353 #[serde(other)]
354 Other,
355}
356
357#[derive(Debug, Serialize, Deserialize)]
358#[serde(tag = "type", rename_all = "camelCase")]
359struct Members {
360 members: Vec<Source>,
362}
363
364#[derive(Debug, Serialize, Deserialize)]
365#[serde(
366 tag = "type",
367 rename_all = "camelCase",
368 rename_all_fields = "camelCase"
369)]
370enum RecvMessage {
371 Text {
373 id: String,
375 quote_token: String,
379 text: String,
381 mention: Option<Mention>,
386 quoted_message_id: Option<String>,
389 },
390 Image {
392 id: String,
394 quote_token: String,
398 content_provider: ContentProvider,
400 },
401 #[serde(other)]
402 Other,
403}
404
405#[derive(Debug, Serialize, Deserialize)]
406#[serde(tag = "type", rename_all = "camelCase")]
407struct Mention {
408 mentionees: Vec<Mentionee>,
409}
410
411#[derive(Debug, Serialize, Deserialize)]
412#[serde(tag = "type", rename_all = "camelCase")]
413struct Mentionee {
414 index: usize,
415 length: usize,
416 #[serde(flatten)]
417 target: MentioneeTarget,
418 quoted_message_id: Option<String>,
419}
420
421#[derive(Debug, Serialize, Deserialize)]
422#[serde(
423 tag = "type",
424 rename_all = "camelCase",
425 rename_all_fields = "camelCase"
426)]
427enum MentioneeTarget {
428 User { user_id: Option<String> },
429 All,
430}
431
432#[derive(Debug, Serialize, Deserialize)]
434#[serde(
435 tag = "type",
436 rename_all = "camelCase",
437 rename_all_fields = "camelCase"
438)]
439enum ContentProvider {
440 Line,
444 External {
450 original_content_url: String,
455 preview_image_url: String,
460 },
462}
463
464impl Line {
466 pub fn verify_signature(&self, signature: &str, body: &str) -> Result<()> {
468 let channel_secret = self.config.channel_secret.as_str();
469
470 verify_signature(signature, channel_secret, body)
471 }
472
473 pub fn process_post(&self, json_body: &str) -> Result<()> {
475 let req = serde_json::from_str::<WebHookRequest>(json_body).inspect_err(|err| {
477 error!("[line] Json parse error: {err}");
478 error!("[line] {json_body}");
479 })?;
480 info!("{req:?}");
481
482 for ev in req.events {
484 if ev.common.mode != "active" {
486 info!(
487 "[line] Ignore event because mode is not active: {}",
488 ev.common.mode
489 );
490 continue;
491 }
492 if self.tx.try_send(ev).is_err() {
495 warn!("[line] Task queue full, drop message");
496 }
497 }
498
499 Ok(())
500 }
501}
502
503fn verify_signature(signature: &str, channel_secret: &str, body: &str) -> Result<()> {
505 let key = channel_secret.as_bytes();
506 let data = body.as_bytes();
507 let expected = general_purpose::STANDARD.decode(signature)?;
508
509 utils::netutil::hmac_sha256_verify(key, data, &expected)
510}
511
512async fn on_text_message(
513 ctrl: &Control,
514 src: &Option<Source>,
515 reply_token: &str,
516 text: &str,
517) -> Result<()> {
518 ensure!(src.is_some(), "Field 'source' is required");
519 let src = src.as_ref().unwrap();
520
521 let (prompt, privileged) = {
522 let mut line = ctrl.sysmods().line.lock().await;
523 let prompt = line.config.prompt.clone();
524
525 let (display_name, privileged) = source_to_display_name_and_permission(&line, src).await?;
527
528 line.check_history_timeout(ctrl).await;
530
531 let imgs = line.image_buffer.remove(&display_name).unwrap_or_default();
533
534 let sysmsg = prompt.each.join("").replace("${user}", &display_name);
536 line.chat_history_mut(ctrl)
537 .await
538 .push_input_message(Role::Developer, &sysmsg)?;
539 line.chat_history_mut(ctrl)
540 .await
541 .push_input_and_images(Role::User, text, imgs)?;
542
543 (prompt, privileged)
544 };
545
546 let inst = prompt.instructions.join("");
548 let mut tools = vec![];
550 tools.push(Tool::WebSearchPreview {
552 search_context_size: Some(SearchContextSize::Medium),
553 user_location: Some(UserLocation::default()),
554 });
555 {
557 let mut line = ctrl.sysmods().line.lock().await;
558 for f in line.func_table(ctrl).await.function_list() {
559 tools.push(Tool::Function(f.clone()));
560 }
561 }
562
563 let reply_to = match src {
565 Source::User { user_id } => user_id,
566 Source::Group {
567 group_id,
568 user_id: _,
569 } => group_id,
570 Source::Room {
571 room_id: _,
572 user_id: _,
573 } => bail!("Source::Room is not supported"),
574 };
575
576 let mut func_trace = String::new();
577 let reply_msg = loop {
578 let mut line = ctrl.sysmods().line.lock().await;
579
580 let resp = {
581 let mut ai = ctrl.sysmods().openai.lock().await;
582
583 let input = Vec::from_iter(line.chat_history(ctrl).await.iter().cloned());
585 ai.chat_with_tools(Some(&inst), input, &tools).await
587 };
588
589 match resp {
590 Ok(resp) => {
591 for fc in resp.func_call_iter() {
593 let call_id = &fc.call_id;
594 let func_name = &fc.name;
595 let func_args = &fc.arguments;
596
597 let ctx = FunctionContext {
599 reply_to: reply_to.clone(),
600 privileged,
601 };
602 let func_out = line
604 .func_table(ctrl)
605 .await
606 .call(ctx, func_name, func_args)
607 .await;
608 if line.func_table(ctrl).await.debug_mode() {
610 if !func_trace.is_empty() {
611 func_trace.push('\n');
612 }
613 func_trace += &format!(
614 "function call: {func_name}\nparameters: {func_args}\nresult: {func_out}"
615 );
616 }
617 line.chat_history_mut(ctrl)
619 .await
620 .push_function(call_id, func_name, func_args, &func_out)?;
621 }
622 let text = resp.output_text();
624 let text = if text.is_empty() { None } else { Some(text) };
625 line.chat_history_mut(ctrl)
626 .await
627 .push_output_and_tools(text.as_deref(), resp.web_search_iter().cloned())?;
628
629 if let Some(text) = text {
630 break Ok(text);
631 }
632 }
633 Err(err) => {
634 error!("{err:#?}");
636 break Err(err);
637 }
638 }
639 };
640
641 {
643 let mut line = ctrl.sysmods().line.lock().await;
644
645 let mut msgs: Vec<&str> = Vec::new();
646 if !func_trace.is_empty() {
647 msgs.push(&func_trace);
648 }
649 match reply_msg {
650 Ok(reply_msg) => {
651 msgs.push(&reply_msg);
652 for msg in msgs.iter() {
653 info!("[line] openai reply: {msg}");
654 }
655 line.reply_multi(reply_token, &msgs).await?;
656
657 line.postpone_timeout();
659 }
660 Err(err) => {
661 error!("[line] openai error: {err:#?}");
662 let errmsg = match OpenAi::error_kind(&err) {
663 OpenAiErrorKind::Timeout => prompt.timeout_msg,
664 OpenAiErrorKind::RateLimit => prompt.ratelimit_msg,
665 OpenAiErrorKind::QuotaExceeded => prompt.quota_msg,
666 _ => prompt.error_msg,
667 };
668 msgs.push(&errmsg);
669 for msg in msgs.iter() {
670 info!("[line] openai reply: {msg}");
671 }
672 line.reply_multi(reply_token, &msgs).await?;
673 }
674 }
675 }
676
677 Ok(())
678}
679
680async fn on_image_message(
681 ctrl: &Control,
682 src: &Option<Source>,
683 id: &str,
684 _reply_token: &str,
685 content_provider: &ContentProvider,
686) -> Result<()> {
687 ensure!(src.is_some(), "Field 'source' is required");
688 let src = src.as_ref().unwrap();
689
690 let mut line = ctrl.sysmods().line.lock().await;
691
692 let (display_name, _privileged) = source_to_display_name_and_permission(&line, src).await?;
694
695 let bin = match content_provider {
697 ContentProvider::Line => line.get_content(id).await?,
698 ContentProvider::External {
699 original_content_url,
700 preview_image_url: _,
701 } => {
702 const CONN_TIMEOUT: Duration = Duration::from_secs(30);
703 const TIMEOUT: Duration = Duration::from_secs(30);
704
705 let client = reqwest::ClientBuilder::new()
706 .connect_timeout(CONN_TIMEOUT)
707 .timeout(TIMEOUT)
708 .build()?;
709 netutil::checked_get_url_bin(&client, original_content_url)
710 .await
711 .context("URL get network error")?
712 }
713 };
714 let input_content = OpenAi::to_image_input(&bin)?;
715
716 line.check_history_timeout(ctrl).await;
718 if let Some(v) = line.image_buffer.get_mut(&display_name) {
720 v.push(input_content);
721 } else {
722 line.image_buffer.insert(display_name, vec![input_content]);
723 }
724
725 Ok(())
726}
727
728async fn source_to_display_name_and_permission(
729 line: &Line,
730 src: &Source,
731) -> Result<(String, bool)> {
732 let user_id = match src {
733 Source::User { user_id } => user_id,
734 Source::Group {
735 group_id: _,
736 user_id,
737 } => {
738 if let Some(user_id) = user_id {
739 user_id
740 } else {
741 bail!("userId is null");
742 }
743 }
744 Source::Room {
745 room_id: _,
746 user_id: _,
747 } => bail!("Source::Room is not supported"),
748 };
749 let display_name = if let Some(name) = line.config.id_name_map.get(user_id) {
750 name.to_string()
751 } else {
752 line.get_profile(user_id).await?.display_name
753 };
754 let privileged = line.config.privileged_user_ids.contains(user_id);
755
756 Ok((display_name, privileged))
757}
758
759#[derive(Debug, Serialize, Deserialize)]
762#[serde(tag = "type")]
763struct ErrorResp {
764 message: String,
765 details: Option<Vec<Detail>>,
766}
767
768#[derive(Debug, Serialize, Deserialize)]
769struct Detail {
770 message: Option<String>,
771 property: Option<String>,
772}
773
774#[derive(Debug, Serialize, Deserialize)]
775#[serde(rename_all = "camelCase")]
776pub struct ProfileResp {
777 pub display_name: String,
778 pub user_id: String,
779 pub language: Option<String>,
780 pub picture_url: Option<String>,
781 pub status_message: Option<String>,
782}
783
784#[derive(Debug, Serialize, Deserialize)]
785#[serde(rename_all = "camelCase")]
786#[serde_with::skip_serializing_none]
787struct ReplyReq {
788 reply_token: String,
789 messages: Vec<SendMessage>,
791 notification_disabled: Option<bool>,
792}
793
794#[derive(Debug, Serialize, Deserialize)]
795#[serde(rename_all = "camelCase")]
796pub struct ReplyResp {
797 sent_messages: Vec<SentMessage>,
798}
799
800#[derive(Debug, Serialize, Deserialize)]
801#[serde(rename_all = "camelCase")]
802#[serde_with::skip_serializing_none]
803struct PushReq {
804 to: String,
805 messages: Vec<SendMessage>,
807 notification_disabled: Option<bool>,
808 custom_aggregation_units: Option<Vec<String>>,
809}
810
811#[derive(Debug, Serialize, Deserialize)]
812#[serde(rename_all = "camelCase")]
813pub struct PushResp {
814 sent_messages: Vec<SentMessage>,
815}
816
817#[derive(Debug, Serialize, Deserialize)]
818#[serde(rename_all = "camelCase")]
819struct SentMessage {
820 id: String,
821 quote_token: Option<String>,
822}
823
824const IMAGE_ORIGINAL_SIZE_MAX: usize = 1 * 1000 * 1000;
827const IMAGE_PREVIEW_SIZE_MAX: usize = 200 * 1000;
830
831#[derive(Debug, Serialize, Deserialize)]
832#[serde(rename_all = "camelCase")]
833#[serde(tag = "type")]
834#[serde_with::skip_serializing_none]
835enum SendMessage {
836 #[serde(rename_all = "camelCase")]
837 Text { text: String },
838 #[serde(rename_all = "camelCase")]
839 Image {
840 original_content_url: String,
845 preview_image_url: String,
850 },
851}
852
853fn url_profile(user_id: &str) -> String {
854 format!("https://api.line.me/v2/bot/profile/{user_id}")
855}
856
857fn url_group_profile(group_id: &str, user_id: &str) -> String {
858 format!("https://api.line.me/v2/bot/group/{group_id}/member/{user_id}")
859}
860
861fn url_content(message_id: &str) -> String {
862 format!("https://api-data.line.me/v2/bot/message/{message_id}/content")
863}
864
865const URL_REPLY: &str = "https://api.line.me/v2/bot/message/reply";
866const URL_PUSH: &str = "https://api.line.me/v2/bot/message/push";
867
868impl Line {
869 pub async fn get_profile(&self, user_id: &str) -> Result<ProfileResp> {
870 self.get_auth_json(&url_profile(user_id)).await
871 }
872
873 pub async fn get_group_profile(&self, group_id: &str, user_id: &str) -> Result<ProfileResp> {
874 self.get_auth_json(&url_group_profile(group_id, user_id))
875 .await
876 }
877
878 pub async fn get_content(&self, message_id: &str) -> Result<Vec<u8>> {
895 let bin = loop {
896 let (status, bin) = self.get_auth_bin(&url_content(message_id)).await?;
897 match status {
898 StatusCode::OK => {
899 info!("OK ({} bytes)", bin.len());
900 break bin;
901 }
902 StatusCode::ACCEPTED => {
903 info!("202: Not ready yet");
905 tokio::time::sleep(Duration::from_secs(5)).await;
906 }
907 _ => {
908 bail!("Invalid status: {status}");
909 }
910 }
911 };
912
913 Ok(bin)
914 }
915
916 pub fn postpone_timeout(&mut self) {
917 let now = Instant::now();
918 let timeout = now + Duration::from_secs(self.config.prompt.history_timeout_min as u64 * 60);
919 self.history_timeout = Some(timeout);
920 }
921
922 pub async fn check_history_timeout(&mut self, ctrl: &Control) {
924 let now = Instant::now();
925
926 if let Some(timeout) = self.history_timeout
927 && now > timeout
928 {
929 self.image_buffer.clear();
930 self.chat_history_mut(ctrl).await.clear();
931 self.history_timeout = None;
932 }
933 }
934
935 pub async fn reply(&self, reply_token: &str, text: &str) -> Result<ReplyResp> {
938 let texts = [text];
939
940 self.reply_multi(reply_token, &texts).await
941 }
942
943 pub async fn reply_multi(&self, reply_token: &str, texts: &[&str]) -> Result<ReplyResp> {
951 let mut messages = Vec::new();
952 for text in texts {
953 ensure!(!text.is_empty(), "text must not be empty");
954 let splitted = split_message(text);
955 messages.extend(splitted.iter().map(|&chunk| SendMessage::Text {
956 text: chunk.to_string(),
957 }));
958 }
959 ensure!(messages.len() <= 5, "text too long: {}", texts.len());
960
961 let req = ReplyReq {
962 reply_token: reply_token.to_string(),
963 messages,
964 notification_disabled: None,
965 };
966 let resp = self.post_auth_json(URL_REPLY, &req).await?;
967 info!("{resp:?}");
968
969 Ok(resp)
970 }
971
972 #[allow(unused)]
976 pub async fn push_message(&self, to: &str, text: &str) -> Result<ReplyResp> {
977 ensure!(!text.is_empty(), "text must not be empty");
978
979 let messages: Vec<_> = split_message(text)
980 .iter()
981 .map(|&chunk| SendMessage::Text {
982 text: chunk.to_string(),
983 })
984 .collect();
985 ensure!(messages.len() <= 5, "text too long: {}", text.len());
986
987 let req = PushReq {
988 to: to.to_string(),
989 messages,
990 notification_disabled: None,
991 custom_aggregation_units: None,
992 };
993 let resp = self.post_auth_json(URL_PUSH, &req).await?;
994 info!("{resp:?}");
995
996 Ok(resp)
997 }
998
999 pub async fn push_image_message(
1001 &self,
1002 to: &str,
1003 url_original: &str,
1004 url_preview: &str,
1005 ) -> Result<ReplyResp> {
1006 let messages = vec![SendMessage::Image {
1007 original_content_url: url_original.to_string(),
1008 preview_image_url: url_preview.to_string(),
1009 }];
1010
1011 let req = PushReq {
1012 to: to.to_string(),
1013 messages,
1014 notification_disabled: None,
1015 custom_aggregation_units: None,
1016 };
1017 let resp = self.post_auth_json(URL_PUSH, &req).await?;
1018 info!("{resp:?}");
1019
1020 Ok(resp)
1021 }
1022
1023 async fn check_resp_json<'a, T>(resp: reqwest::Response) -> Result<T>
1033 where
1034 T: for<'de> Deserialize<'de>,
1035 {
1036 let status = resp.status();
1038 let body = resp.text().await?;
1039
1040 if status.is_success() {
1041 Ok(serde_json::from_reader::<_, T>(body.as_bytes())?)
1042 } else {
1043 match serde_json::from_str::<ErrorResp>(&body) {
1044 Ok(obj) => bail!("{status}: {:?}", obj),
1045 Err(json_err) => bail!("{status} - {json_err}: {body}"),
1046 }
1047 }
1048 }
1049
1050 async fn get_auth_json<'a, T>(&self, url: &str) -> Result<T>
1051 where
1052 T: for<'de> Deserialize<'de>,
1053 {
1054 info!("[line] GET {url}");
1055 let token = &self.config.token;
1056 let resp = netutil::send_with_retry(|| {
1057 self.client
1058 .get(url)
1059 .header("Authorization", format!("Bearer {token}"))
1060 })
1061 .await?;
1062
1063 Self::check_resp_json(resp).await
1064 }
1065
1066 async fn post_auth_json<T, R>(&self, url: &str, body: &T) -> Result<R>
1067 where
1068 T: Serialize + Debug,
1069 R: for<'de> Deserialize<'de>,
1070 {
1071 info!("[line] POST {url} {body:?}");
1072 let token = &self.config.token;
1073 let resp = netutil::send_with_retry(|| {
1074 self.client
1075 .post(url)
1076 .header("Authorization", format!("Bearer {token}"))
1077 .json(body)
1078 })
1079 .await?;
1080
1081 Self::check_resp_json(resp).await
1082 }
1083
1084 async fn get_auth_bin(&self, url: &str) -> Result<(StatusCode, Vec<u8>)> {
1085 info!("[line] GET {url}");
1086 let token = &self.config.token;
1087
1088 let resp = netutil::send_with_retry(|| {
1089 self.client
1090 .get(url)
1091 .header("Authorization", format!("Bearer {token}"))
1092 })
1093 .await?;
1094
1095 let status = resp.status();
1096 if status.is_success() {
1097 let body = resp.bytes().await?.to_vec();
1098
1099 Ok((status, body))
1100 } else {
1101 let body = resp.text().await?;
1102
1103 match serde_json::from_str::<ErrorResp>(&body) {
1104 Ok(obj) => bail!("{status}: {:?}", obj),
1105 Err(json_err) => bail!("{status} - {json_err}: {body}"),
1106 }
1107 }
1108 }
1109}
1110
1111fn split_message(text: &str) -> Vec<&str> {
1112 let mut result = Vec::new();
1114 let mut s = 0;
1116 let mut len = 0;
1118 for (ind, c) in text.char_indices() {
1119 let clen = c.len_utf16();
1121 if len + clen > MSG_SPLIT_LEN {
1123 result.push(&text[s..ind]);
1124 s = ind;
1125 len = 0;
1126 }
1127 len += clen;
1128 }
1129 if len > 0 {
1130 result.push(&text[s..]);
1131 }
1132
1133 result
1134}
1135
1136fn register_draw_picture(func_table: &mut FunctionTable<FunctionContext>) {
1138 let mut properties = HashMap::new();
1139 properties.insert(
1140 "keywords".to_string(),
1141 ParameterElement {
1142 type_: vec![ParameterType::String],
1143 description: Some("Keywords for drawing. They must be in English.".to_string()),
1144 ..Default::default()
1145 },
1146 );
1147 func_table.register_function(
1148 Function {
1149 name: "draw".to_string(),
1150 description: Some("Draw a picture".to_string()),
1151 parameters: Parameters {
1152 properties,
1153 required: vec!["keywords".to_string()],
1154 ..Default::default()
1155 },
1156 ..Default::default()
1157 },
1158 move |bctx, ctx, args| Box::pin(draw_picture(bctx, ctx, args)),
1159 );
1160}
1161
1162async fn draw_picture(
1163 bctx: Arc<BasicContext>,
1164 ctx: FunctionContext,
1165 args: &FuncArgs,
1166) -> Result<String> {
1167 let keywords = function::get_arg_str(args, "keywords")?.to_string();
1168
1169 let ctrl = bctx.ctrl.clone();
1170 taskserver::spawn_oneshot_fn(&ctrl, "line_draw_picture", async move {
1171 let url = {
1172 let mut ai = bctx.ctrl.sysmods().openai.lock().await;
1173
1174 ai.generate_image(&keywords, 1)
1175 .await?
1176 .pop()
1177 .ok_or_else(|| anyhow!("parse error"))?
1178 };
1179 {
1180 let line = bctx.ctrl.sysmods().line.lock().await;
1181 line.push_image_message(&ctx.reply_to, &url, &url).await?;
1182 }
1183 Ok(())
1184 });
1185
1186 Ok("Accepted. The result will be automatially posted later. Assistant should not draw for now.".to_string())
1187}
1188
1189fn register_camera(func_table: &mut FunctionTable<FunctionContext>) {
1191 func_table.register_function(
1192 Function {
1193 name: "camera".to_string(),
1194 description: Some("Take a picture".to_string()),
1195 parameters: Parameters {
1196 properties: Default::default(),
1197 required: Default::default(),
1198 ..Default::default()
1199 },
1200 ..Default::default()
1201 },
1202 move |bctx, ctx, args| Box::pin(camera(bctx, ctx, args)),
1203 );
1204}
1205
1206async fn camera(bctx: Arc<BasicContext>, ctx: FunctionContext, _args: &FuncArgs) -> Result<String> {
1207 use crate::sysmod::camera;
1208
1209 anyhow::ensure!(ctx.privileged, "Permission denied");
1210
1211 let mut w = camera::PIC_DEF_W;
1212 let mut h = camera::PIC_DEF_H;
1213 info!("[line] take a picture: {}x{}", w, h);
1214 let mut orig = camera::take_a_pic(camera::TakePicOption::new().width(w).height(h)).await?;
1215 info!("[line] take a picture OK, size={}", orig.len());
1216 while orig.len() > IMAGE_ORIGINAL_SIZE_MAX {
1218 w /= 2;
1219 h /= 2;
1220 orig = camera::resize(&orig, w, h)?;
1221 info!("[line] resize, size={}", orig.len());
1222 }
1223 let mut preview = orig.clone();
1224 while preview.len() > IMAGE_PREVIEW_SIZE_MAX {
1225 w /= 2;
1226 h /= 2;
1227 preview = camera::resize(&preview, w, h)?;
1228 }
1229 let (url_original, url_preview) = {
1230 let mut http = bctx.ctrl.sysmods().http.lock().await;
1231
1232 (
1233 http.export_tmp_data(ContentType::jpeg(), orig)?,
1234 http.export_tmp_data(ContentType::jpeg(), preview)?,
1235 )
1236 };
1237
1238 let ctrl = bctx.ctrl.clone();
1239 taskserver::spawn_oneshot_fn(&ctrl, "line_camera_send", async move {
1240 {
1241 let line = bctx.ctrl.sysmods().line.lock().await;
1242
1243 line.push_image_message(&ctx.reply_to, &url_original, &url_preview)
1244 .await?;
1245 }
1246 Ok(())
1247 });
1248
1249 Ok("OK. Now the users can see the picture.".to_string())
1250}
1251
1252#[cfg(test)]
1253mod tests {
1254 use super::*;
1255
1256 #[test]
1257 fn split_long_message() {
1258 let mut src = String::new();
1259 assert!(split_message(&src).is_empty());
1260
1261 for i in 0..MSG_SPLIT_LEN {
1262 let a = 'A' as u32;
1263 src.push(char::from_u32(a + (i as u32 % 26)).unwrap());
1264 }
1265 let res = split_message(&src);
1266 assert_eq!(1, res.len());
1267 assert_eq!(src, res[0]);
1268
1269 src.push('0');
1270 let res = split_message(&src);
1271 assert_eq!(2, res.len());
1272 assert_eq!(&src[..MSG_SPLIT_LEN], res[0]);
1273 assert_eq!(&src[MSG_SPLIT_LEN..], res[1]);
1274
1275 src.pop();
1276 src.pop();
1277 src.push('😀');
1278 let res = split_message(&src);
1279 assert_eq!(2, res.len());
1280 assert_eq!(&src[..MSG_SPLIT_LEN - 1], res[0]);
1281 assert_eq!(&src[MSG_SPLIT_LEN - 1..], res[1]);
1282 }
1283
1284 #[test]
1285 fn parse_text_message() {
1286 let src = r#"
1288{
1289 "destination": "xxxxxxxxxx",
1290 "events": [
1291 {
1292 "replyToken": "nHuyWiB7yP5Zw52FIkcQobQuGDXCTA",
1293 "type": "message",
1294 "mode": "active",
1295 "timestamp": 1462629479859,
1296 "source": {
1297 "type": "group",
1298 "groupId": "Ca56f94637c...",
1299 "userId": "U4af4980629..."
1300 },
1301 "webhookEventId": "01FZ74A0TDDPYRVKNK77XKC3ZR",
1302 "deliveryContext": {
1303 "isRedelivery": false
1304 },
1305 "message": {
1306 "id": "444573844083572737",
1307 "type": "text",
1308 "quoteToken": "q3Plxr4AgKd...",
1309 "text": "@All @example Good Morning!! (love)",
1310 "emojis": [
1311 {
1312 "index": 29,
1313 "length": 6,
1314 "productId": "5ac1bfd5040ab15980c9b435",
1315 "emojiId": "001"
1316 }
1317 ],
1318 "mention": {
1319 "mentionees": [
1320 {
1321 "index": 0,
1322 "length": 4,
1323 "type": "all"
1324 },
1325 {
1326 "index": 5,
1327 "length": 8,
1328 "userId": "U49585cd0d5...",
1329 "type": "user",
1330 "isSelf": false
1331 }
1332 ]
1333 }
1334 }
1335 }
1336 ]
1337}"#;
1338 let _: WebHookRequest = serde_json::from_str(src).unwrap();
1339 }
1340
1341 #[test]
1342 fn parse_image_message() {
1343 let src1 = r#"
1345{
1346 "destination": "xxxxxxxxxx",
1347 "events": [
1348 {
1349 "type": "message",
1350 "message": {
1351 "type": "image",
1352 "id": "354718705033693859",
1353 "quoteToken": "q3Plxr4AgKd...",
1354 "contentProvider": {
1355 "type": "line"
1356 },
1357 "imageSet": {
1358 "id": "E005D41A7288F41B65593ED38FF6E9834B046AB36A37921A56BC236F13A91855",
1359 "index": 1,
1360 "total": 2
1361 }
1362 },
1363 "timestamp": 1627356924513,
1364 "source": {
1365 "type": "user",
1366 "userId": "U4af4980629..."
1367 },
1368 "webhookEventId": "01FZ74A0TDDPYRVKNK77XKC3ZR",
1369 "deliveryContext": {
1370 "isRedelivery": false
1371 },
1372 "replyToken": "7840b71058e24a5d91f9b5726c7512c9",
1373 "mode": "active"
1374 }
1375 ]
1376}"#;
1377 let src2 = r#"
1378{
1379 "destination": "xxxxxxxxxx",
1380 "events": [
1381 {
1382 "type": "message",
1383 "message": {
1384 "type": "image",
1385 "id": "354718705033693861",
1386 "quoteToken": "yHAz4Ua2wx7...",
1387 "contentProvider": {
1388 "type": "line"
1389 },
1390 "imageSet": {
1391 "id": "E005D41A7288F41B65593ED38FF6E9834B046AB36A37921A56BC236F13A91855",
1392 "index": 2,
1393 "total": 2
1394 }
1395 },
1396 "timestamp": 1627356924722,
1397 "source": {
1398 "type": "user",
1399 "userId": "U4af4980629..."
1400 },
1401 "webhookEventId": "01FZ74A0TDDPYRVKNK77XKC3ZR",
1402 "deliveryContext": {
1403 "isRedelivery": false
1404 },
1405 "replyToken": "fbf94e269485410da6b7e3a5e33283e8",
1406 "mode": "active"
1407 }
1408 ]
1409}"#;
1410 let _: WebHookRequest = serde_json::from_str(src1).unwrap();
1411 let _: WebHookRequest = serde_json::from_str(src2).unwrap();
1412 }
1413
1414 #[test]
1415 fn base64_decode() {
1416 let line_signature = "A+JCmhu7Tg6f4lwANmLGirCS2rY8kHBmSG18ctUtvjQ=";
1417 let res = verify_signature(line_signature, "1234567890", "test");
1418 assert!(res.is_err());
1419 assert!(res.unwrap_err().to_string().contains("MAC"));
1421 }
1422}