1use super::{ActixError, WebResult};
6use crate::sysmod::line::{FunctionContext, Line};
7use crate::sysmod::openai::{OpenAi, OpenAiErrorKind, Role, SearchContextSize, Tool, UserLocation};
8use crate::taskserver::Control;
9
10use actix_web::{HttpRequest, HttpResponse, Responder, http::header::ContentType, web};
11use anyhow::{Context, Result, bail, ensure};
12use base64::{Engine, engine::general_purpose};
13use log::{error, info};
14use serde::{Deserialize, Serialize};
15use std::time::Duration;
16use utils::netutil;
17
18#[derive(Debug, Serialize, Deserialize)]
19struct WebHookRequest {
20 destination: String,
21 events: Vec<WebhookEvent>,
23}
24
25#[derive(Debug, Serialize, Deserialize)]
26struct WebhookEvent {
27 #[serde(flatten)]
28 common: WebhookEventCommon,
29 #[serde(flatten)]
30 body: WebhookEventBody,
31}
32
33#[derive(Debug, Serialize, Deserialize)]
34#[serde(rename_all = "camelCase")]
35struct WebhookEventCommon {
36 mode: String,
38 timestamp: u64,
39 source: Option<Source>,
40 webhook_event_id: String,
41 delivery_context: DeliveryContext,
42}
43
44#[derive(Debug, Serialize, Deserialize)]
45#[serde(
46 tag = "type",
47 rename_all = "camelCase",
48 rename_all_fields = "camelCase"
49)]
50enum Source {
51 User {
52 user_id: String,
53 },
54 Group {
55 group_id: String,
56 user_id: Option<String>,
57 },
58 Room {
59 room_id: String,
60 user_id: Option<String>,
61 },
62}
63
64#[derive(Debug, Serialize, Deserialize)]
65#[serde(rename_all = "camelCase")]
66struct DeliveryContext {
67 is_redelivery: bool,
68}
69
70#[derive(Debug, Serialize, Deserialize)]
71#[serde(
72 tag = "type",
73 rename_all = "camelCase",
74 rename_all_fields = "camelCase"
75)]
76enum WebhookEventBody {
77 Message {
78 reply_token: String,
79 message: Message,
80 },
81 Unsend {
82 message_id: String,
83 },
84 Follow {
85 reply_token: String,
86 },
87 Unfollow,
88 Join {
89 reply_token: String,
90 },
91 Leave,
92 MemberJoined {
93 joined: Members,
94 reply_token: String,
95 },
96 MemberLeft {
97 left: Members,
98 reply_token: String,
99 },
100 #[serde(other)]
101 Other,
102}
103
104#[derive(Debug, Serialize, Deserialize)]
105#[serde(tag = "type", rename_all = "camelCase")]
106struct Members {
107 members: Vec<Source>,
109}
110
111#[derive(Debug, Serialize, Deserialize)]
112#[serde(
113 tag = "type",
114 rename_all = "camelCase",
115 rename_all_fields = "camelCase"
116)]
117enum Message {
118 Text {
120 id: String,
122 quote_token: String,
126 text: String,
128 mention: Option<Mention>,
133 quoted_message_id: Option<String>,
136 },
137 Image {
139 id: String,
141 quote_token: String,
145 content_provider: ContentProvider,
147 },
148 #[serde(other)]
149 Other,
150}
151
152#[derive(Debug, Serialize, Deserialize)]
153#[serde(tag = "type", rename_all = "camelCase")]
154struct Mention {
155 mentionees: Vec<Mentionee>,
156}
157
158#[derive(Debug, Serialize, Deserialize)]
159#[serde(tag = "type", rename_all = "camelCase")]
160struct Mentionee {
161 index: usize,
162 length: usize,
163 #[serde(flatten)]
164 target: MentioneeTarget,
165 quoted_message_id: Option<String>,
166}
167
168#[derive(Debug, Serialize, Deserialize)]
169#[serde(
170 tag = "type",
171 rename_all = "camelCase",
172 rename_all_fields = "camelCase"
173)]
174enum MentioneeTarget {
175 User { user_id: Option<String> },
176 All,
177}
178
179#[derive(Debug, Serialize, Deserialize)]
181#[serde(
182 tag = "type",
183 rename_all = "camelCase",
184 rename_all_fields = "camelCase"
185)]
186enum ContentProvider {
187 Line,
191 External {
197 original_content_url: String,
202 preview_image_url: String,
207 },
209}
210
211#[actix_web::get("/line/")]
212async fn index_get() -> impl Responder {
213 let body = r#"<!DOCTYPE html>
214<html lang="en">
215 <head>
216 <title>LINE Webhook</title>
217 </head>
218 <body>
219 <h1>LINE Webhook</h1>
220 <p>Your request is GET.</p>
221 </body>
222</html>
223"#;
224
225 HttpResponse::Ok()
226 .content_type(ContentType::html())
227 .body(body)
228}
229
230#[actix_web::post("/line/")]
231async fn index_post(req: HttpRequest, body: String, ctrl: web::Data<Control>) -> WebResult {
232 info!("LINE github webhook");
233
234 let headers = req.headers();
235 let signature = headers.get("x-line-signature");
236 if signature.is_none() {
237 return Err(ActixError::new("x-line-signature required", 400));
238 }
239 let signature = signature.unwrap().to_str();
240 if signature.is_err() {
241 return Err(ActixError::new("Bad x-line-signature header", 400));
242 }
243 let signature = signature.unwrap();
244 info!("x-line-signature: {signature}");
245
246 let channel_secret = {
248 let line = ctrl.sysmods().line.lock().await;
249 line.config.channel_secret.clone()
250 };
251 if let Err(err) = verify_signature(signature, &channel_secret, &body) {
252 return Err(ActixError::new(&err.to_string(), 401));
253 }
254
255 if let Err(e) = process_post(&ctrl, &body).await {
256 Err(ActixError::new(&e.to_string(), 400))
257 } else {
258 Ok(HttpResponse::Ok()
259 .content_type(ContentType::plaintext())
260 .body(""))
261 }
262}
263
264fn verify_signature(signature: &str, channel_secret: &str, body: &str) -> Result<()> {
266 let key = channel_secret.as_bytes();
267 let data = body.as_bytes();
268 let expected = general_purpose::STANDARD.decode(signature)?;
269
270 netutil::hmac_sha256_verify(key, data, &expected)
271}
272
273async fn process_post(ctrl: &Control, json_body: &str) -> Result<()> {
275 let req = serde_json::from_str::<WebHookRequest>(json_body).inspect_err(|err| {
277 error!("[line] Json parse error: {err}");
278 error!("[line] {json_body}");
279 })?;
280 info!("{req:?}");
281
282 for ev in req.events.iter() {
284 if ev.common.mode != "active" {
286 info!(
287 "[line] Ignore event because mode is not active: {}",
288 ev.common.mode
289 );
290 continue;
291 }
292
293 match &ev.body {
295 WebhookEventBody::Message {
296 reply_token,
297 message,
298 } => match message {
299 Message::Text {
300 id: _,
301 quote_token: _,
302 text,
303 mention: _,
304 quoted_message_id: _,
305 } => {
306 info!("[line] Receive text message: {text}");
307 on_text_message(ctrl, &ev.common.source, reply_token, text).await?;
308 }
309 Message::Image {
310 id,
311 quote_token: _,
312 content_provider,
313 } => {
314 info!("[line] Receive image message");
315 on_image_message(ctrl, &ev.common.source, id, reply_token, content_provider)
316 .await?;
317 }
318 other => {
319 info!("[line] Ignore message type: {other:?}");
320 }
321 },
322 other => {
323 info!("[line] Ignore event: {other:?}");
324 }
325 }
326 }
327
328 Ok(())
329}
330
331async fn source_to_display_name(line: &Line, src: &Source) -> Result<String> {
332 let display_name = match src {
333 Source::User { user_id } => {
334 if let Some(name) = line.config.id_name_map.get(user_id) {
335 name.to_string()
336 } else {
337 line.get_profile(user_id).await?.display_name
338 }
339 }
340 Source::Group { group_id, user_id } => {
341 if let Some(user_id) = user_id {
342 if let Some(name) = line.config.id_name_map.get(user_id) {
343 name.to_string()
344 } else {
345 line.get_group_profile(group_id, user_id)
346 .await?
347 .display_name
348 }
349 } else {
350 bail!("userId is null");
351 }
352 }
353 Source::Room {
354 room_id: _,
355 user_id: _,
356 } => bail!("Source::Room is not supported"),
357 };
358
359 Ok(display_name)
360}
361
362async fn on_text_message(
363 ctrl: &Control,
364 src: &Option<Source>,
365 reply_token: &str,
366 text: &str,
367) -> Result<()> {
368 ensure!(src.is_some(), "Field 'source' is required");
369 let src = src.as_ref().unwrap();
370
371 let prompt = {
372 let mut line = ctrl.sysmods().line.lock().await;
373 let prompt = line.config.prompt.clone();
374
375 let display_name = source_to_display_name(&line, src).await?;
377
378 line.check_history_timeout(ctrl).await;
380
381 let imgs = line.image_buffer.remove(&display_name).unwrap_or_default();
383
384 let sysmsg = prompt.each.join("").replace("${user}", &display_name);
386 line.chat_history_mut(ctrl)
387 .await
388 .push_input_message(Role::Developer, &sysmsg)?;
389 line.chat_history_mut(ctrl)
390 .await
391 .push_input_and_images(Role::User, text, imgs)?;
392
393 prompt
394 };
395
396 let inst = prompt.instructions.join("");
398 let mut tools = vec![];
400 tools.push(Tool::WebSearchPreview {
402 search_context_size: Some(SearchContextSize::Medium),
403 user_location: Some(UserLocation::default()),
404 });
405 {
407 let mut line = ctrl.sysmods().line.lock().await;
408 for f in line.func_table(ctrl).await.function_list() {
409 tools.push(Tool::Function(f.clone()));
410 }
411 }
412
413 let reply_to = match src {
415 Source::User { user_id } => user_id,
416 Source::Group {
417 group_id,
418 user_id: _,
419 } => group_id,
420 Source::Room {
421 room_id: _,
422 user_id: _,
423 } => bail!("Source::Room is not supported"),
424 };
425
426 let mut func_trace = String::new();
427 let reply_msg = loop {
428 let mut line = ctrl.sysmods().line.lock().await;
429
430 let resp = {
431 let mut ai = ctrl.sysmods().openai.lock().await;
432
433 let input = Vec::from_iter(line.chat_history(ctrl).await.iter().cloned());
435 ai.chat_with_tools(Some(&inst), input, &tools).await
437 };
438
439 match resp {
440 Ok(resp) => {
441 for fc in resp.func_call_iter() {
443 let call_id = &fc.call_id;
444 let func_name = &fc.name;
445 let func_args = &fc.arguments;
446
447 let ctx = FunctionContext {
449 reply_to: reply_to.clone(),
450 };
451 let func_out = line
453 .func_table(ctrl)
454 .await
455 .call(ctx, func_name, func_args)
456 .await;
457 if line.func_table(ctrl).await.debug_mode() {
459 if !func_trace.is_empty() {
460 func_trace.push('\n');
461 }
462 func_trace += &format!(
463 "function call: {func_name}\nparameters: {func_args}\nresult: {func_out}"
464 );
465 }
466 line.chat_history_mut(ctrl)
468 .await
469 .push_function(call_id, func_name, func_args, &func_out)?;
470 }
471 let text = resp.output_text();
473 let text = if text.is_empty() { None } else { Some(text) };
474 line.chat_history_mut(ctrl)
475 .await
476 .push_output_and_tools(text.as_deref(), resp.web_search_iter().cloned())?;
477
478 if let Some(text) = text {
479 break Ok(text);
480 }
481 }
482 Err(err) => {
483 error!("{err:#?}");
485 break Err(err);
486 }
487 }
488 };
489
490 {
492 let mut line = ctrl.sysmods().line.lock().await;
493
494 let mut msgs: Vec<&str> = Vec::new();
495 if !func_trace.is_empty() {
496 msgs.push(&func_trace);
497 }
498 match reply_msg {
499 Ok(reply_msg) => {
500 msgs.push(&reply_msg);
501 for msg in msgs.iter() {
502 info!("[line] openai reply: {msg}");
503 }
504 line.reply_multi(reply_token, &msgs).await?;
505
506 line.postpone_timeout();
508 }
509 Err(err) => {
510 error!("[line] openai error: {err:#?}");
511 let errmsg = match OpenAi::error_kind(&err) {
512 OpenAiErrorKind::Timeout => prompt.timeout_msg,
513 OpenAiErrorKind::RateLimit => prompt.ratelimit_msg,
514 OpenAiErrorKind::QuotaExceeded => prompt.quota_msg,
515 _ => prompt.error_msg,
516 };
517 msgs.push(&errmsg);
518 for msg in msgs.iter() {
519 info!("[line] openai reply: {msg}");
520 }
521 line.reply_multi(reply_token, &msgs).await?;
522 }
523 }
524 }
525
526 Ok(())
527}
528
529async fn on_image_message(
530 ctrl: &Control,
531 src: &Option<Source>,
532 id: &str,
533 _reply_token: &str,
534 content_provider: &ContentProvider,
535) -> Result<()> {
536 ensure!(src.is_some(), "Field 'source' is required");
537 let src = src.as_ref().unwrap();
538
539 let mut line = ctrl.sysmods().line.lock().await;
540
541 let display_name = source_to_display_name(&line, src).await?;
543
544 let bin = match content_provider {
546 ContentProvider::Line => line.get_content(id).await?,
547 ContentProvider::External {
548 original_content_url,
549 preview_image_url: _,
550 } => {
551 const TIMEOUT: Duration = Duration::from_secs(30);
552 let client = reqwest::ClientBuilder::new().timeout(TIMEOUT).build()?;
553 let resp = client
554 .get(original_content_url)
555 .send()
556 .await
557 .context("URL get network error")?;
558
559 netutil::check_http_resp_bin(resp)
560 .await
561 .context("URL get network error")?
562 }
563 };
564 let input_content = OpenAi::to_image_input(&bin)?;
565
566 line.check_history_timeout(ctrl).await;
568 if let Some(v) = line.image_buffer.get_mut(&display_name) {
570 v.push(input_content);
571 } else {
572 line.image_buffer.insert(display_name, vec![input_content]);
573 }
574
575 Ok(())
576}
577
578#[cfg(test)]
579mod tests {
580 use super::*;
581
582 #[test]
583 fn parse_text_message() {
584 let src = r#"
586{
587 "destination": "xxxxxxxxxx",
588 "events": [
589 {
590 "replyToken": "nHuyWiB7yP5Zw52FIkcQobQuGDXCTA",
591 "type": "message",
592 "mode": "active",
593 "timestamp": 1462629479859,
594 "source": {
595 "type": "group",
596 "groupId": "Ca56f94637c...",
597 "userId": "U4af4980629..."
598 },
599 "webhookEventId": "01FZ74A0TDDPYRVKNK77XKC3ZR",
600 "deliveryContext": {
601 "isRedelivery": false
602 },
603 "message": {
604 "id": "444573844083572737",
605 "type": "text",
606 "quoteToken": "q3Plxr4AgKd...",
607 "text": "@All @example Good Morning!! (love)",
608 "emojis": [
609 {
610 "index": 29,
611 "length": 6,
612 "productId": "5ac1bfd5040ab15980c9b435",
613 "emojiId": "001"
614 }
615 ],
616 "mention": {
617 "mentionees": [
618 {
619 "index": 0,
620 "length": 4,
621 "type": "all"
622 },
623 {
624 "index": 5,
625 "length": 8,
626 "userId": "U49585cd0d5...",
627 "type": "user",
628 "isSelf": false
629 }
630 ]
631 }
632 }
633 }
634 ]
635}"#;
636 let _: WebHookRequest = serde_json::from_str(src).unwrap();
637 }
638
639 #[test]
640 fn parse_image_message() {
641 let src1 = r#"
643{
644 "destination": "xxxxxxxxxx",
645 "events": [
646 {
647 "type": "message",
648 "message": {
649 "type": "image",
650 "id": "354718705033693859",
651 "quoteToken": "q3Plxr4AgKd...",
652 "contentProvider": {
653 "type": "line"
654 },
655 "imageSet": {
656 "id": "E005D41A7288F41B65593ED38FF6E9834B046AB36A37921A56BC236F13A91855",
657 "index": 1,
658 "total": 2
659 }
660 },
661 "timestamp": 1627356924513,
662 "source": {
663 "type": "user",
664 "userId": "U4af4980629..."
665 },
666 "webhookEventId": "01FZ74A0TDDPYRVKNK77XKC3ZR",
667 "deliveryContext": {
668 "isRedelivery": false
669 },
670 "replyToken": "7840b71058e24a5d91f9b5726c7512c9",
671 "mode": "active"
672 }
673 ]
674}"#;
675 let src2 = r#"
676{
677 "destination": "xxxxxxxxxx",
678 "events": [
679 {
680 "type": "message",
681 "message": {
682 "type": "image",
683 "id": "354718705033693861",
684 "quoteToken": "yHAz4Ua2wx7...",
685 "contentProvider": {
686 "type": "line"
687 },
688 "imageSet": {
689 "id": "E005D41A7288F41B65593ED38FF6E9834B046AB36A37921A56BC236F13A91855",
690 "index": 2,
691 "total": 2
692 }
693 },
694 "timestamp": 1627356924722,
695 "source": {
696 "type": "user",
697 "userId": "U4af4980629..."
698 },
699 "webhookEventId": "01FZ74A0TDDPYRVKNK77XKC3ZR",
700 "deliveryContext": {
701 "isRedelivery": false
702 },
703 "replyToken": "fbf94e269485410da6b7e3a5e33283e8",
704 "mode": "active"
705 }
706 ]
707}"#;
708 let _: WebHookRequest = serde_json::from_str(src1).unwrap();
709 let _: WebHookRequest = serde_json::from_str(src2).unwrap();
710 }
711
712 #[test]
713 fn base64_decode() {
714 let line_signature = "A+JCmhu7Tg6f4lwANmLGirCS2rY8kHBmSG18ctUtvjQ=";
715 let res = verify_signature(line_signature, "1234567890", "test");
716 assert!(res.is_err());
717 assert!(res.unwrap_err().to_string().contains("MAC"));
719 }
720}