1#[cfg(any(all(feature = "test", feature = "kobo"), doc))]
38mod dbus_monitor;
39#[cfg(any(feature = "test", doc))]
40mod hello_world;
41#[cfg(any(feature = "kobo", doc))]
42mod wifi_status_monitor;
43
44use std::collections::HashMap;
45use std::sync::atomic::{AtomicBool, Ordering};
46use std::sync::mpsc::{self, Receiver, Sender};
47use std::thread::{self, JoinHandle};
48use std::time::Duration;
49
50use thiserror::Error;
51
52use crate::settings::Settings;
53use crate::view::Event;
54
/// Errors returned by [`TaskManager`] when a start or stop request cannot be honored.
#[derive(Error, Debug)]
pub enum TaskError {
    /// Returned by [`TaskManager::start`] when a task with the same id is still registered
    /// as running; the new task is not spawned.
    #[error("task '{0}' is already running")]
    AlreadyRunning(TaskId),

    /// Returned by [`TaskManager::stop`] when no running task is registered under the id.
    #[error("task '{0}' is not running")]
    NotRunning(TaskId),
}
66
/// Identifier of a background task.
///
/// [`TaskManager`] uses it as the key under which a running task is tracked, so at most
/// one task per id can be running at a time. Most variants only exist under specific
/// feature/cfg combinations (or when building documentation).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TaskId {
    /// Variant that is available in every build configuration.
    // NOTE(review): presumably keeps the enum non-empty when no task feature is enabled — confirm.
    Placeholder,
    /// Id of the `hello_world` task (`test` feature, or when building docs).
    #[cfg(any(feature = "test", doc))]
    HelloWorld,
    /// Id of the `dbus_monitor` task (`test` and `kobo` features together, or when building docs).
    #[cfg(any(all(feature = "test", feature = "kobo"), doc))]
    DbusMonitor,
    /// Id of the `wifi_status_monitor` task (`kobo` feature, or when building docs).
    #[cfg(any(feature = "kobo", doc))]
    WifiStatusMonitor,
    /// Id used by this module's unit tests only.
    #[cfg(test)]
    TestTask,
    /// Second id used by this module's unit tests only.
    #[cfg(test)]
    TestTask2,
}
88
89impl std::fmt::Display for TaskId {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 match self {
92 TaskId::Placeholder => write!(f, "placeholder"),
93 #[cfg(feature = "test")]
94 TaskId::HelloWorld => write!(f, "hello_world"),
95 #[cfg(all(feature = "test", feature = "kobo"))]
96 TaskId::DbusMonitor => write!(f, "dbus_monitor"),
97 #[cfg(feature = "kobo")]
98 TaskId::WifiStatusMonitor => write!(f, "wifi_status_monitor"),
99 #[cfg(test)]
100 TaskId::TestTask => write!(f, "test_task"),
101 #[cfg(test)]
102 TaskId::TestTask2 => write!(f, "test_task_2"),
103 }
104 }
105}
106
/// Cooperative shutdown handle handed to a running [`BackgroundTask`].
///
/// It wraps the receiving end of a `()` channel. Once a stop condition has been observed
/// (a message arrived, or the sending side was dropped) the signal latches, and every
/// later call to [`should_stop`](Self::should_stop) or [`wait`](Self::wait) returns `true`
/// immediately.
pub struct ShutdownSignal {
    /// Receiving end of the shutdown channel.
    receiver: Receiver<()>,
    /// Latched to `true` the first time a stop condition is observed.
    stopped: AtomicBool,
}

impl ShutdownSignal {
    /// Wraps `receiver` in a signal that has not yet observed a stop request.
    fn new(receiver: Receiver<()>) -> Self {
        Self {
            receiver,
            stopped: AtomicBool::new(false),
        }
    }

    /// Non-blocking check for a stop request.
    ///
    /// Returns `true` if a stop was already observed, if a shutdown message is pending,
    /// or if the sending side of the channel has been dropped. The last case mirrors
    /// [`wait`](Self::wait): with no sender left, no stop request could ever arrive, so a
    /// task that only polls would otherwise run forever. Returns `false` when the channel
    /// is merely empty.
    pub fn should_stop(&self) -> bool {
        if self.stopped.load(Ordering::Acquire) {
            return true;
        }
        match self.receiver.try_recv() {
            Ok(()) | Err(mpsc::TryRecvError::Disconnected) => {
                self.stopped.store(true, Ordering::Release);
                true
            }
            Err(mpsc::TryRecvError::Empty) => false,
        }
    }

    /// Blocks for at most `duration` waiting for a stop request.
    ///
    /// Returns `true` if a stop was already observed, if a shutdown message arrives within
    /// `duration`, or if the sending side of the channel has been dropped. Returns `false`
    /// when the timeout elapses without any of those happening.
    pub fn wait(&self, duration: Duration) -> bool {
        if self.stopped.load(Ordering::Acquire) {
            return true;
        }
        match self.receiver.recv_timeout(duration) {
            Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => {
                self.stopped.store(true, Ordering::Release);
                true
            }
            Err(mpsc::RecvTimeoutError::Timeout) => false,
        }
    }
}
158
/// A unit of work that [`TaskManager`] runs on its own thread.
///
/// The `Send` bound is required because the boxed task is moved into the spawned thread.
pub trait BackgroundTask: Send {
    /// Returns the id under which this task is registered; [`TaskManager::start`] refuses
    /// to start a task whose id is already running.
    fn id(&self) -> TaskId;

    /// Body of the task, called once on the task's thread.
    ///
    /// `hub` is the event sender supplied to [`TaskManager::start`]; `shutdown` is the
    /// signal the implementation can poll or wait on to learn that it should return.
    fn run(&mut self, hub: &Sender<Event>, shutdown: &ShutdownSignal);

    /// Hook called on the task's thread right after [`run`](Self::run) returns.
    /// The default implementation does nothing.
    fn stop(&mut self) {}
}
179
/// Bookkeeping kept by [`TaskManager`] for one spawned task.
struct RunningTask {
    /// Join handle of the thread executing the task.
    handle: JoinHandle<()>,
    /// Sending end of the channel whose receiver backs the task's [`ShutdownSignal`].
    shutdown: Sender<()>,
}
184
/// Spawns background tasks on dedicated threads and tracks them by [`TaskId`].
///
/// Dropping the manager stops every task that is still registered.
pub struct TaskManager {
    /// Tasks that have been started and not yet removed, keyed by their id.
    tasks: HashMap<TaskId, RunningTask>,
}
192
193impl TaskManager {
194 pub fn new() -> Self {
196 Self {
197 tasks: HashMap::new(),
198 }
199 }
200
201 #[cfg_attr(feature = "otel", tracing::instrument(skip(self, task, hub), fields(task_id = tracing::field::Empty), ret))]
208 pub fn start(
209 &mut self,
210 task: Box<dyn BackgroundTask>,
211 hub: Sender<Event>,
212 ) -> Result<TaskId, TaskError> {
213 let id = task.id();
214
215 #[cfg(feature = "otel")]
216 tracing::Span::current().record("task_id", tracing::field::display(&id));
217
218 if self.is_running(&id) {
219 return Err(TaskError::AlreadyRunning(id));
220 }
221
222 let (shutdown_tx, shutdown_rx) = mpsc::channel();
223 let shutdown_signal = ShutdownSignal::new(shutdown_rx);
224
225 let handle = thread::spawn(move || {
226 let mut task = task;
227 tracing::info!("task started");
228 task.run(&hub, &shutdown_signal);
229 task.stop();
230 tracing::info!("task stopped");
231 });
232
233 self.tasks.insert(
234 id.clone(),
235 RunningTask {
236 handle,
237 shutdown: shutdown_tx,
238 },
239 );
240
241 tracing::info!("task registered");
242 Ok(id)
243 }
244
245 #[cfg_attr(feature = "otel", tracing::instrument(skip(self), fields(task_id = %id), ret))]
250 pub fn stop(&mut self, id: &TaskId) -> Result<(), TaskError> {
251 self.cleanup_finished();
252 if let Some(task) = self.tasks.remove(id) {
253 tracing::info!("sending shutdown signal");
254 if let Err(e) = task.shutdown.send(()) {
255 tracing::error!(error = %e, "failed to send shutdown signal");
256 }
257 if task.handle.join().is_err() {
258 tracing::error!("task thread panicked");
259 }
260 Ok(())
261 } else {
262 Err(TaskError::NotRunning(id.clone()))
263 }
264 }
265
266 #[cfg_attr(feature = "otel", tracing::instrument(skip(self), fields(task_count = tracing::field::Empty)))]
270 pub fn stop_all(&mut self) {
271 let tasks: Vec<_> = self.tasks.drain().collect();
272
273 #[cfg(feature = "otel")]
274 tracing::Span::current().record("task_count", tasks.len());
275
276 if !tasks.is_empty() {
277 tracing::info!("stopping all tasks");
278 }
279 for (_, task) in &tasks {
280 if let Err(e) = task.shutdown.send(()) {
281 tracing::error!(error = %e, "failed to send shutdown signal");
282 }
283 }
284 for (_, task) in tasks {
285 if task.handle.join().is_err() {
286 tracing::error!("task thread panicked");
287 }
288 }
289 }
290
291 fn cleanup_finished(&mut self) {
293 self.tasks.retain(|_, task| !task.handle.is_finished());
294 }
295
296 pub fn is_running(&mut self, id: &TaskId) -> bool {
298 self.cleanup_finished();
299 self.tasks.contains_key(id)
300 }
301
302 pub fn running_tasks(&mut self) -> Vec<TaskId> {
304 self.cleanup_finished();
305 self.tasks.keys().cloned().collect()
306 }
307}
308
309impl Default for TaskManager {
310 fn default() -> Self {
311 Self::new()
312 }
313}
314
315impl Drop for TaskManager {
316 fn drop(&mut self) {
317 self.stop_all();
318 }
319}
320
321pub fn register_startup_tasks(manager: &mut TaskManager, hub: Sender<Event>, _settings: &Settings) {
329 #[cfg(feature = "kobo")]
330 {
331 let task = Box::new(wifi_status_monitor::WifiStatusMonitorTask);
332 if let Err(e) = manager.start(task, hub.clone()) {
333 tracing::warn!(error = %e, "failed to start wifi_status_monitor task");
334 }
335 }
336
337 #[cfg(feature = "test")]
338 {
339 let settings = _settings;
340
341 let task = Box::new(hello_world::HelloWorldTask);
342 if let Err(e) = manager.start(task, hub.clone()) {
343 tracing::warn!(error = %e, "failed to start hello_world task");
344 }
345
346 #[cfg(feature = "kobo")]
347 if settings.logging.enable_dbus_log {
348 let task = Box::new(dbus_monitor::DbusMonitorTask);
349 if let Err(e) = manager.start(task, hub) {
350 tracing::warn!(error = %e, "failed to start dbus_monitor task");
351 }
352 }
353 }
354}
355
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;
    use std::time::{Duration, Instant};

    /// Polls the manager until `id` is no longer reported as running, panicking if that
    /// does not happen within five seconds.
    fn wait_until_not_running(manager: &mut TaskManager, id: &TaskId) {
        let limit = Duration::from_secs(5);
        let poll_interval = Duration::from_millis(1);
        let started = Instant::now();

        while started.elapsed() < limit {
            if !manager.is_running(id) {
                return;
            }
            std::thread::sleep(poll_interval);
        }
        panic!("task '{id}' did not finish within timeout");
    }

    /// Task whose `run` returns immediately.
    struct InstantTask;

    impl BackgroundTask for InstantTask {
        fn id(&self) -> TaskId {
            TaskId::TestTask2
        }

        fn run(&mut self, _hub: &Sender<Event>, _shutdown: &ShutdownSignal) {}
    }

    /// Task that blocks on its shutdown signal for up to a minute.
    struct WaitingTask;

    impl BackgroundTask for WaitingTask {
        fn id(&self) -> TaskId {
            TaskId::TestTask
        }

        fn run(&mut self, _hub: &Sender<Event>, shutdown: &ShutdownSignal) {
            shutdown.wait(Duration::from_secs(60));
        }
    }

    #[test]
    fn start_and_stop() {
        let (events_tx, _events_rx) = mpsc::channel();
        let mut mgr = TaskManager::new();

        let task_id = mgr.start(Box::new(WaitingTask), events_tx).unwrap();
        assert!(mgr.is_running(&task_id));

        mgr.stop(&task_id).unwrap();
        assert!(!mgr.is_running(&task_id));
    }

    #[test]
    fn duplicate_start_returns_error() {
        let (events_tx, _events_rx) = mpsc::channel();
        let mut mgr = TaskManager::new();

        mgr.start(Box::new(WaitingTask), events_tx.clone()).unwrap();
        let second_attempt = mgr.start(Box::new(WaitingTask), events_tx);

        let err = second_attempt.unwrap_err();
        assert!(matches!(err, TaskError::AlreadyRunning(TaskId::TestTask)));
    }

    #[test]
    fn finished_task_is_cleaned_up() {
        let (events_tx, _events_rx) = mpsc::channel();
        let mut mgr = TaskManager::new();

        let task_id = mgr.start(Box::new(InstantTask), events_tx).unwrap();
        wait_until_not_running(&mut mgr, &task_id);

        assert!(!mgr.is_running(&task_id));
    }

    #[test]
    fn stop_finished_task_returns_not_running() {
        let (events_tx, _events_rx) = mpsc::channel();
        let mut mgr = TaskManager::new();

        let task_id = mgr.start(Box::new(InstantTask), events_tx).unwrap();
        wait_until_not_running(&mut mgr, &task_id);

        let err = mgr.stop(&task_id).unwrap_err();
        assert!(matches!(err, TaskError::NotRunning(TaskId::TestTask2)));
    }

    #[test]
    fn running_tasks_excludes_finished() {
        let (events_tx, _events_rx) = mpsc::channel();
        let mut mgr = TaskManager::new();

        mgr.start(Box::new(WaitingTask), events_tx.clone()).unwrap();
        let finished_id = mgr.start(Box::new(InstantTask), events_tx).unwrap();
        wait_until_not_running(&mut mgr, &finished_id);

        let still_running = mgr.running_tasks();
        assert_eq!(still_running, vec![TaskId::TestTask]);

        mgr.stop_all();
    }

    #[test]
    fn stop_all_stops_everything() {
        let (events_tx, _events_rx) = mpsc::channel();
        let mut mgr = TaskManager::new();

        mgr.start(Box::new(WaitingTask), events_tx).unwrap();
        mgr.stop_all();

        assert!(!mgr.is_running(&TaskId::TestTask));
    }
}