sys/
taskserver.rs

1//! 非同期タスクを管理する。
2
3use crate::sysmod::SystemModules;
4use anyhow::Result;
5use chrono::prelude::*;
6use log::{error, info, trace};
7use std::future::Future;
8use std::sync::Arc;
9use tokio::select;
10use tokio::signal::unix::{SignalKind, signal};
11use tokio::sync::watch;
12
13/// システムシャットダウン開始通知送信側 (単数)
14type CancelTx = watch::Sender<bool>;
15/// システムシャットダウン開始通知受信側 (複数)
16type CancelRx = watch::Receiver<bool>;
17
18/// [TaskServer] と各非同期タスク間で共有されるデータ。
19///
20/// インスタンスは1つだけ生成され、[Arc] により共有される。
21pub struct Controller {
22    /// Tokio ランタイム。
23    pub rt: tokio::runtime::Runtime,
24    /// 全システムモジュールのリスト。
25    sysmods: SystemModules,
26    /// システムシャットダウン時、true が設定送信される。
27    ///
28    /// また、シャットダウンシーケンスにおいて、全タスクの完了待ちのためにも使う。
29    /// サーバ側は clone されたこれがすべて drop されるまで待機する。
30    cancel_rx: std::sync::Mutex<Option<CancelRx>>,
31}
32
33pub type Control = Arc<Controller>;
34
35/// [TaskServer::run] の返す実行終了種別。
36pub enum RunResult {
37    Shutdown,
38    Reboot,
39}
40
41/// タスクサーバ本体。
42pub struct TaskServer {
43    /// 各タスクに clone して渡すオリジナルの [Control]。
44    ctrl: Control,
45    /// システムシャットダウン時の中断リクエストの送信側。
46    /// <https://tokio.rs/tokio/topics/shutdown>
47    /// "Telling things to shut down" + "Waiting for things to finish shutting down"
48    cancel_tx: CancelTx,
49}
50
51impl Controller {
52    /// [crate::sysmod::SystemModule] リストへの参照を取得する。
53    pub fn sysmods(&self) -> &SystemModules {
54        &self.sysmods
55    }
56
57    /// キャンセル通知を待つ。
58    pub async fn wait_cancel_rx(&self) {
59        // mutex をロックして Receiver を取得し、その clone を作る
60        let org = self.cancel_rx.lock().unwrap().as_mut().map(|rx| rx.clone());
61        if let Some(mut rx) = org {
62            // clone した Receiver 上で待つ
63            rx.changed().await.unwrap();
64        } else {
65            // 既に drop されていた場合はすぐに返る
66        }
67    }
68}
69
70/// 1回限りのタスクを生成して実行開始する。
71///
72/// F: [Control] を引数に、T を返す関数。
73/// T: Future<Output = anyhow::Result<()> かつスレッド間移動可能。
74///
75/// つまり、F は [Control] を引数に、anyhow::Result<()> を返す async function。
76pub fn spawn_oneshot_task<F, T>(ctrl: &Control, name: &str, f: F)
77where
78    F: Fn(Control) -> T + Send + Sync + 'static,
79    T: Future<Output = Result<()>> + Send,
80{
81    // move するデータを準備する
82    let name = name.to_string();
83    let ctrl_move = Arc::clone(ctrl);
84
85    ctrl.rt.spawn(async move {
86        info!("[{name}] start (one-shot)");
87
88        // ctrl を clone して future へ move する
89        let future = f(Arc::clone(&ctrl_move));
90        let result = future.await;
91        // drop clone of ctrl
92
93        if let Err(e) = result {
94            error!("[{name}] finish (error): {e:?}");
95        } else {
96            info!("[{name}] finish (success)");
97        }
98        // drop ctrl
99    });
100}
101
102pub fn spawn_oneshot_fn<F>(ctrl: &Control, name: &str, f: F)
103where
104    F: Future<Output = Result<()>> + Send + 'static,
105{
106    // move するデータを準備する
107    let name = name.to_string();
108
109    ctrl.rt.spawn(async move {
110        info!("[{name}] start (one-shot)");
111
112        let result = f.await;
113
114        if let Err(e) = result {
115            error!("[{name}] finish (error): {e:?}");
116        } else {
117            info!("[{name}] finish (success)");
118        }
119        // drop ctrl
120    });
121}
122
123/// 周期タスクを生成する。
124///
125/// wakeup_list: 起動時刻。以下を満たさないと panic する。
126/// * second 以下が 0 である。(分単位)
127/// * 昇順ソート済みである。
128///
129/// F: [Control] を引数に、T を返す関数。
130/// T: Future<Output = anyhow::Result<()> かつスレッド間移動可能。
131///
132/// つまり、F は [Control] を引数に、anyhow::Result<()> を返す async function。
133pub fn spawn_periodic_task<F, T>(ctrl: &Control, name: &str, wakeup_list: &[NaiveTime], f: F)
134where
135    F: Fn(Control) -> T + Send + Sync + 'static,
136    T: Future<Output = Result<()>> + Send + Sync + 'static,
137{
138    // move するデータを準備する
139    let name = name.to_string();
140    let ctrl_move = Arc::clone(ctrl);
141
142    // 空でなくソート済み、秒以下がゼロなのを確認後
143    // 今日のその時刻からなる NaiveDateTime に変換する
144    assert!(!wakeup_list.is_empty(), "time list is empty");
145    assert!(wakeup_list.is_sorted(), "time list is not sorted");
146    let today = Local::now().date_naive();
147    let mut dt_list: Vec<_> = wakeup_list
148        .iter()
149        .map(|time| {
150            assert_eq!(time.second(), 0);
151            assert_eq!(time.nanosecond(), 0);
152            today.and_time(*time)
153        })
154        .collect();
155
156    // wakeup time list を最初の LOG_LIMIT 個までログに出力する
157    const LOG_LIMIT: usize = 5;
158    let log_iter = wakeup_list.iter().take(LOG_LIMIT);
159    let mut str = log_iter.enumerate().fold(String::new(), |sum, (i, v)| {
160        let str = if i == 0 {
161            format!("{v}")
162        } else {
163            format!(", {v}")
164        };
165        sum + &str
166    });
167    if wakeup_list.len() > LOG_LIMIT {
168        str += &format!(", ... ({} items)", wakeup_list.len());
169    }
170    info!("[{name}] registered as a periodic task");
171    info!("[{name}] wakeup time: {str}");
172
173    // spawn async task
174    ctrl.rt.spawn(async move {
175        type CDuration = chrono::Duration;
176        type TDuration = tokio::time::Duration;
177
178        loop {
179            // 現在時刻を取得して分までに切り捨てる
180            let now = Local::now().naive_local();
181            let now_hmd = now.date().and_hms_opt(now.hour(), now.minute(), 0).unwrap();
182            let next_min = now_hmd + CDuration::try_minutes(1).unwrap();
183            trace!("[{name}] periodic task check: {now_hmd}");
184
185            // 起動時刻リスト内で二分探索
186            match dt_list.binary_search(&now_hmd) {
187                Ok(_ind) => {
188                    // 一致するものを発見したので続行
189                    trace!("[{name}] hit in time list: {now_hmd}");
190                }
191                Err(ind) => {
192                    // ind = insertion point
193                    trace!("[{name}] not found in time list: {now_hmd}");
194                    // 起きるべき時刻は dt_list[ind]
195                    if ind < dt_list.len() {
196                        let target_dt = dt_list[ind] + CDuration::try_seconds(1).unwrap();
197                        let sleep_duration = target_dt - Local::now().naive_local();
198                        let sleep_sec = sleep_duration.num_seconds().clamp(0, i64::MAX) as u64;
199                        trace!("[{name}] target: {target_dt}, sleep_sec: {sleep_sec}");
200                        select! {
201                            _ = tokio::time::sleep(TDuration::from_secs(sleep_sec)) => {}
202                            _ = ctrl_move.wait_cancel_rx() => {
203                                info!("[{name}] cancel periodic task");
204                                return;
205                            }
206                        }
207
208                        trace!("[{name}] wake up");
209                    } else {
210                        // 一番後ろよりも現在時刻が後
211                        // 起動時刻リストをすべて1日ずつ後ろにずらす
212                        for dt in dt_list.iter_mut() {
213                            let tomorrow = dt.date() + CDuration::try_days(1).unwrap();
214                            let time = dt.time();
215                            *dt = tomorrow.and_time(time);
216                            trace!("[{name}] advance time list by 1 day");
217                        }
218                    }
219                    continue;
220                }
221            }
222
223            // ctrl を clone して future 内に move する
224            let future = f(ctrl_move.clone());
225            info!("[{name}] start (periodic)");
226            let result = future.await;
227            // drop clone of ctrl
228            if let Err(e) = result {
229                error!("[{name}] finish (error): {e:?}");
230            } else {
231                info!("[{name}] finish (success)");
232            }
233
234            // 次の "分" を狙って sleep する
235            // 目標は安全のため hh:mm:05 を狙う
236            let target_dt = next_min + CDuration::try_seconds(5).unwrap();
237            // タスクの実行に1分以上かかると負になるが、
238            // chrono::Duration は負数を許している
239            // その場合は 0 に補正する
240            let sleep_duration = target_dt - Local::now().naive_local();
241            let sleep_sec = sleep_duration.num_seconds().clamp(0, i64::MAX) as u64;
242            trace!("[{name}] target: {target_dt}, sleep_sec: {sleep_sec}");
243            select! {
244                _ = tokio::time::sleep(tokio::time::Duration::from_secs(sleep_sec)) => {}
245                _ = ctrl_move.wait_cancel_rx() => {
246                    info!("[{name}] cancel periodic task");
247                    return;
248                }
249            }
250            trace!("[{name}] wake up");
251        }
252        // drop ctrl
253    });
254}
255
256impl TaskServer {
257    /// タスクサーバを生成して初期化する。
258    pub fn new(sysmods: SystemModules) -> Self {
259        let rt = tokio::runtime::Builder::new_multi_thread()
260            .enable_all()
261            .build()
262            .unwrap();
263
264        // tx は self へ move
265        // rx は root Control へ move
266        let (cancel_tx, cancel_rx) = watch::channel(false);
267
268        let internal = Controller {
269            rt,
270            sysmods,
271            cancel_rx: std::sync::Mutex::new(Some(cancel_rx)),
272        };
273        let ctrl = Arc::new(internal);
274
275        TaskServer { ctrl, cancel_tx }
276    }
277
278    /// [spawn_oneshot_task] を内蔵の [Self::ctrl] を使って呼び出す。
279    pub fn spawn_oneshot_task<F, T>(&self, name: &str, f: F)
280    where
281        F: Fn(Control) -> T + Send + Sync + 'static,
282        T: Future<Output = Result<()>> + Send,
283    {
284        spawn_oneshot_task(&self.ctrl, name, f);
285    }
286
287    /// 実行を開始し、完了するまでブロックする。
288    ///
289    /// self の所有権を consume するため、一度しか実行できない。
290    ///
291    /// F: シグナルハンドラ。実行を終了するならば Some, 続行するならば None を返す。
292    pub fn run<F1, F2>(self, usr1_handler: F1, usr2_handler: F2) -> RunResult
293    where
294        F1: Fn() -> Option<RunResult>,
295        F2: Fn() -> Option<RunResult>,
296    {
297        let ctrl = Arc::clone(&self.ctrl);
298        self.ctrl.rt.block_on(async move {
299            // SystemModule 全体に on_start イベントを配送
300            ctrl.sysmods.on_start(&ctrl).await;
301
302            // この async block をシグナル処理に使う
303            let mut sigint = signal(SignalKind::interrupt()).unwrap();
304            let mut sigterm = signal(SignalKind::terminate()).unwrap();
305            let mut sighup = signal(SignalKind::hangup()).unwrap();
306            let mut sigusr1 = signal(SignalKind::user_defined1()).unwrap();
307            let mut sigusr2 = signal(SignalKind::user_defined2()).unwrap();
308
309            let run_result = loop {
310                tokio::select! {
311                    _ = sigint.recv() => {
312                        info!("[signal] SIGINT");
313                         break RunResult::Shutdown
314                    },
315                    _ = sigterm.recv() => {
316                        info!("[signal] SIGTERM");
317                        break RunResult::Shutdown
318                    },
319                    _ = sighup.recv() => {
320                        info!("[signal] SIGHUP");
321                        break RunResult::Reboot
322                    },
323                    _ = sigusr1.recv() => {
324                        info!("[signal] SIGUSR1");
325                        if let Some(result) = usr1_handler() {
326                            break result
327                        }
328                    }
329                    _ = sigusr2.recv() => {
330                        info!("[signal] SIGUSR2");
331                        if let Some(result) = usr2_handler() {
332                            break result
333                        }
334                    }
335                }
336            };
337
338            // 値を true に設定して全タスクにキャンセルリクエストを通知する
339            self.cancel_tx.send_replace(true);
340            // 全タスク完了待ち
341            // オリジナルの cancel_rx を self.ctrl から奪って drop しておく
342            drop(ctrl.cancel_rx.lock().unwrap().take());
343            // 全 cancel_rx が drop されるまで待つ
344            info!("waiting for all tasks to be completed....");
345            self.cancel_tx.closed().await;
346            info!("OK: all tasks are completed");
347
348            run_result
349        })
350        // drop self (self.ctrl)
351    }
352}