1use crate::sysmod::SystemModules;
4use anyhow::Result;
5use chrono::prelude::*;
6use log::{error, info, trace};
7use std::future::Future;
8use std::sync::Arc;
9use tokio::select;
10use tokio::signal::unix::{SignalKind, signal};
11use tokio::sync::watch;
12
/// Sending half of the `bool` watch channel used to broadcast cancellation.
type CancelTx = watch::Sender<bool>;
/// Receiving half of the `bool` watch channel used to observe cancellation.
type CancelRx = watch::Receiver<bool>;
17
/// Shared state handed to every spawned task.
pub struct Controller {
    /// Tokio runtime on which tasks are spawned.
    pub rt: tokio::runtime::Runtime,
    /// System modules, exposed through [`Controller::sysmods`].
    sysmods: SystemModules,
    /// Cancellation receiver that waiting tasks clone.
    /// It is `Some` after construction and is taken (left as `None`) by `TaskServer::run`
    /// once cancellation has been broadcast.
    cancel_rx: std::sync::Mutex<Option<CancelRx>>,
}
32
/// Reference-counted handle to the shared [`Controller`]; this is what task closures receive.
pub type Control = Arc<Controller>;
34
/// Outcome reported by `TaskServer::run` once its signal loop ends.
///
/// The derives make the value loggable (`Debug`), cheap to pass around (`Clone`, `Copy`)
/// and comparable (`PartialEq`, `Eq`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunResult {
    /// Returned after SIGINT or SIGTERM.
    Shutdown,
    /// Returned after SIGHUP.
    Reboot,
}
40
/// Owner of the shared controller and of the sending side of the cancellation channel.
pub struct TaskServer {
    /// Shared controller; a clone of this handle is passed to each spawned task.
    ctrl: Control,
    /// Sender used by `run` to broadcast cancellation and to wait until all receivers are dropped.
    cancel_tx: CancelTx,
}
50
51impl Controller {
52 pub fn sysmods(&self) -> &SystemModules {
54 &self.sysmods
55 }
56
57 pub async fn wait_cancel_rx(&self) {
59 let org = self.cancel_rx.lock().unwrap().as_mut().map(|rx| rx.clone());
61 if let Some(mut rx) = org {
62 rx.changed().await.unwrap();
64 } else {
65 }
67 }
68}
69
70pub fn spawn_oneshot_task<F, T>(ctrl: &Control, name: &str, f: F)
77where
78 F: Fn(Control) -> T + Send + Sync + 'static,
79 T: Future<Output = Result<()>> + Send,
80{
81 let name = name.to_string();
83 let ctrl_move = Arc::clone(ctrl);
84
85 ctrl.rt.spawn(async move {
86 info!("[{name}] start (one-shot)");
87
88 let future = f(Arc::clone(&ctrl_move));
90 let result = future.await;
91 if let Err(e) = result {
94 error!("[{name}] finish (error): {e:?}");
95 } else {
96 info!("[{name}] finish (success)");
97 }
98 });
100}
101
102pub fn spawn_oneshot_fn<F>(ctrl: &Control, name: &str, f: F)
103where
104 F: Future<Output = Result<()>> + Send + 'static,
105{
106 let name = name.to_string();
108
109 ctrl.rt.spawn(async move {
110 info!("[{name}] start (one-shot)");
111
112 let result = f.await;
113
114 if let Err(e) = result {
115 error!("[{name}] finish (error): {e:?}");
116 } else {
117 info!("[{name}] finish (success)");
118 }
119 });
121}
122
123pub fn spawn_periodic_task<F, T>(ctrl: &Control, name: &str, wakeup_list: &[NaiveTime], f: F)
134where
135 F: Fn(Control) -> T + Send + Sync + 'static,
136 T: Future<Output = Result<()>> + Send + Sync + 'static,
137{
138 let name = name.to_string();
140 let ctrl_move = Arc::clone(ctrl);
141
142 assert!(!wakeup_list.is_empty(), "time list is empty");
145 assert!(wakeup_list.is_sorted(), "time list is not sorted");
146 let today = Local::now().date_naive();
147 let mut dt_list: Vec<_> = wakeup_list
148 .iter()
149 .map(|time| {
150 assert_eq!(time.second(), 0);
151 assert_eq!(time.nanosecond(), 0);
152 today.and_time(*time)
153 })
154 .collect();
155
156 const LOG_LIMIT: usize = 5;
158 let log_iter = wakeup_list.iter().take(LOG_LIMIT);
159 let mut str = log_iter.enumerate().fold(String::new(), |sum, (i, v)| {
160 let str = if i == 0 {
161 format!("{v}")
162 } else {
163 format!(", {v}")
164 };
165 sum + &str
166 });
167 if wakeup_list.len() > LOG_LIMIT {
168 str += &format!(", ... ({} items)", wakeup_list.len());
169 }
170 info!("[{name}] registered as a periodic task");
171 info!("[{name}] wakeup time: {str}");
172
173 ctrl.rt.spawn(async move {
175 type CDuration = chrono::Duration;
176 type TDuration = tokio::time::Duration;
177
178 loop {
179 let now = Local::now().naive_local();
181 let now_hmd = now.date().and_hms_opt(now.hour(), now.minute(), 0).unwrap();
182 let next_min = now_hmd + CDuration::try_minutes(1).unwrap();
183 trace!("[{name}] periodic task check: {now_hmd}");
184
185 match dt_list.binary_search(&now_hmd) {
187 Ok(_ind) => {
188 trace!("[{name}] hit in time list: {now_hmd}");
190 }
191 Err(ind) => {
192 trace!("[{name}] not found in time list: {now_hmd}");
194 if ind < dt_list.len() {
196 let target_dt = dt_list[ind] + CDuration::try_seconds(1).unwrap();
197 let sleep_duration = target_dt - Local::now().naive_local();
198 let sleep_sec = sleep_duration.num_seconds().clamp(0, i64::MAX) as u64;
199 trace!("[{name}] target: {target_dt}, sleep_sec: {sleep_sec}");
200 select! {
201 _ = tokio::time::sleep(TDuration::from_secs(sleep_sec)) => {}
202 _ = ctrl_move.wait_cancel_rx() => {
203 info!("[{name}] cancel periodic task");
204 return;
205 }
206 }
207
208 trace!("[{name}] wake up");
209 } else {
210 for dt in dt_list.iter_mut() {
213 let tomorrow = dt.date() + CDuration::try_days(1).unwrap();
214 let time = dt.time();
215 *dt = tomorrow.and_time(time);
216 trace!("[{name}] advance time list by 1 day");
217 }
218 }
219 continue;
220 }
221 }
222
223 let future = f(ctrl_move.clone());
225 info!("[{name}] start (periodic)");
226 let result = future.await;
227 if let Err(e) = result {
229 error!("[{name}] finish (error): {e:?}");
230 } else {
231 info!("[{name}] finish (success)");
232 }
233
234 let target_dt = next_min + CDuration::try_seconds(5).unwrap();
237 let sleep_duration = target_dt - Local::now().naive_local();
241 let sleep_sec = sleep_duration.num_seconds().clamp(0, i64::MAX) as u64;
242 trace!("[{name}] target: {target_dt}, sleep_sec: {sleep_sec}");
243 select! {
244 _ = tokio::time::sleep(tokio::time::Duration::from_secs(sleep_sec)) => {}
245 _ = ctrl_move.wait_cancel_rx() => {
246 info!("[{name}] cancel periodic task");
247 return;
248 }
249 }
250 trace!("[{name}] wake up");
251 }
252 });
254}
255
256impl TaskServer {
257 pub fn new(sysmods: SystemModules) -> Self {
259 let rt = tokio::runtime::Builder::new_multi_thread()
260 .enable_all()
261 .build()
262 .unwrap();
263
264 let (cancel_tx, cancel_rx) = watch::channel(false);
267
268 let internal = Controller {
269 rt,
270 sysmods,
271 cancel_rx: std::sync::Mutex::new(Some(cancel_rx)),
272 };
273 let ctrl = Arc::new(internal);
274
275 TaskServer { ctrl, cancel_tx }
276 }
277
278 pub fn spawn_oneshot_task<F, T>(&self, name: &str, f: F)
280 where
281 F: Fn(Control) -> T + Send + Sync + 'static,
282 T: Future<Output = Result<()>> + Send,
283 {
284 spawn_oneshot_task(&self.ctrl, name, f);
285 }
286
287 pub fn run<F1, F2>(self, usr1_handler: F1, usr2_handler: F2) -> RunResult
293 where
294 F1: Fn() -> Option<RunResult>,
295 F2: Fn() -> Option<RunResult>,
296 {
297 let ctrl = Arc::clone(&self.ctrl);
298 self.ctrl.rt.block_on(async move {
299 ctrl.sysmods.on_start(&ctrl).await;
301
302 let mut sigint = signal(SignalKind::interrupt()).unwrap();
304 let mut sigterm = signal(SignalKind::terminate()).unwrap();
305 let mut sighup = signal(SignalKind::hangup()).unwrap();
306 let mut sigusr1 = signal(SignalKind::user_defined1()).unwrap();
307 let mut sigusr2 = signal(SignalKind::user_defined2()).unwrap();
308
309 let run_result = loop {
310 tokio::select! {
311 _ = sigint.recv() => {
312 info!("[signal] SIGINT");
313 break RunResult::Shutdown
314 },
315 _ = sigterm.recv() => {
316 info!("[signal] SIGTERM");
317 break RunResult::Shutdown
318 },
319 _ = sighup.recv() => {
320 info!("[signal] SIGHUP");
321 break RunResult::Reboot
322 },
323 _ = sigusr1.recv() => {
324 info!("[signal] SIGUSR1");
325 if let Some(result) = usr1_handler() {
326 break result
327 }
328 }
329 _ = sigusr2.recv() => {
330 info!("[signal] SIGUSR2");
331 if let Some(result) = usr2_handler() {
332 break result
333 }
334 }
335 }
336 };
337
338 self.cancel_tx.send_replace(true);
340 drop(ctrl.cancel_rx.lock().unwrap().take());
343 info!("waiting for all tasks to be completed....");
345 self.cancel_tx.closed().await;
346 info!("OK: all tasks are completed");
347
348 run_result
349 })
350 }
352}