cadmus_core/task/
mod.rs

1//! Long-running background task infrastructure.
2//!
3//! This module provides a trait-based system for defining and managing
4//! background tasks that run alongside the main application loop.
5//!
6//! # Architecture
7//!
8//! - [`BackgroundTask`] trait defines the interface for long-running tasks
9//! - [`TaskManager`] spawns and manages task lifecycles
10//! - [`ShutdownSignal`] provides graceful shutdown coordination
11//!
12//! # Example
13//!
14//! ```ignore
//! use cadmus_core::task::{BackgroundTask, TaskId, ShutdownSignal};
//! use std::sync::mpsc::Sender;
//! use std::time::Duration;
//! use cadmus_core::view::Event;
18//!
19//! struct MyTask;
20//!
21//! impl BackgroundTask for MyTask {
22//!     fn id(&self) -> TaskId {
23//!         TaskId::MyTask
24//!     }
25//!
26//!     fn run(&mut self, hub: &Sender<Event>, shutdown: &ShutdownSignal) {
27//!         while !shutdown.should_stop() {
28//!             // Do work...
29//!             if shutdown.wait(Duration::from_secs(60)) {
30//!                 break;
31//!             }
32//!         }
33//!     }
34//! }
35//! ```
36
37#[cfg(any(all(feature = "test", feature = "kobo"), doc))]
38mod dbus_monitor;
39#[cfg(any(feature = "test", doc))]
40mod hello_world;
41#[cfg(any(feature = "kobo", doc))]
42mod wifi_status_monitor;
43
44use std::collections::HashMap;
45use std::sync::atomic::{AtomicBool, Ordering};
46use std::sync::mpsc::{self, Receiver, Sender};
47use std::thread::{self, JoinHandle};
48use std::time::Duration;
49
50use thiserror::Error;
51
52use crate::settings::Settings;
53use crate::view::Event;
54
55/// Errors that can occur during task management operations.
56#[derive(Error, Debug)]
57pub enum TaskError {
58    /// A task with the given ID is already running.
59    #[error("task '{0}' is already running")]
60    AlreadyRunning(TaskId),
61
62    /// A task with the given ID is not running.
63    #[error("task '{0}' is not running")]
64    NotRunning(TaskId),
65}
66
/// Identifies a background task.
///
/// The [`TaskManager`] keys its running tasks by this value, so at most one
/// task per variant can be running at a time. Most variants only exist in
/// particular build configurations; see the `cfg` attribute on each one.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum TaskId {
    /// Temporary placeholder that keeps the enum non-empty until a task
    /// exists that is available in every build configuration.
    Placeholder,
    /// Example task that prints periodically (`test` feature or doc builds).
    #[cfg(any(feature = "test", doc))]
    HelloWorld,
    /// D-Bus system bus monitor (`test` + `kobo` features, or doc builds).
    #[cfg(any(all(feature = "test", feature = "kobo"), doc))]
    DbusMonitor,
    /// WiFi status monitor using dhcpcd-dbus (`kobo` feature or doc builds).
    #[cfg(any(feature = "kobo", doc))]
    WifiStatusMonitor,
    /// First ID reserved for this module's unit tests.
    #[cfg(test)]
    TestTask,
    /// Second ID reserved for this module's unit tests.
    #[cfg(test)]
    TestTask2,
}
88
89impl std::fmt::Display for TaskId {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        match self {
92            TaskId::Placeholder => write!(f, "placeholder"),
93            #[cfg(feature = "test")]
94            TaskId::HelloWorld => write!(f, "hello_world"),
95            #[cfg(all(feature = "test", feature = "kobo"))]
96            TaskId::DbusMonitor => write!(f, "dbus_monitor"),
97            #[cfg(feature = "kobo")]
98            TaskId::WifiStatusMonitor => write!(f, "wifi_status_monitor"),
99            #[cfg(test)]
100            TaskId::TestTask => write!(f, "test_task"),
101            #[cfg(test)]
102            TaskId::TestTask2 => write!(f, "test_task_2"),
103        }
104    }
105}
106
/// Signal for coordinating graceful shutdown of background tasks.
///
/// Tasks should periodically check [`should_stop`](Self::should_stop) or use
/// [`wait`](Self::wait) to interrupt sleep when shutdown is requested.
///
/// Shutdown counts as requested when a message arrives on the underlying
/// channel **or** when the sending side has been dropped. Once either has
/// been observed the state is latched and every later call reports shutdown.
pub struct ShutdownSignal {
    /// Receiving end of the shutdown channel.
    receiver: Receiver<()>,
    /// Latched to `true` the first time shutdown is observed, because the
    /// channel message itself is consumed by the call that sees it.
    stopped: AtomicBool,
}

impl ShutdownSignal {
    /// Wraps the receiving end of a shutdown channel; starts un-latched.
    fn new(receiver: Receiver<()>) -> Self {
        Self {
            receiver,
            stopped: AtomicBool::new(false),
        }
    }

    /// Returns `true` if shutdown has been requested.
    ///
    /// Once `true` is returned, all subsequent calls also return `true`
    /// (the shutdown state is latched). This is non-blocking and suitable
    /// for polling in tight loops.
    ///
    /// A disconnected channel (sender dropped without sending) is treated as
    /// a shutdown request, exactly as [`wait`](Self::wait) does; otherwise a
    /// polling task could never be told to stop.
    pub fn should_stop(&self) -> bool {
        if self.stopped.load(Ordering::Acquire) {
            return true;
        }
        match self.receiver.try_recv() {
            Ok(()) | Err(std::sync::mpsc::TryRecvError::Disconnected) => {
                self.stopped.store(true, Ordering::Release);
                true
            }
            Err(std::sync::mpsc::TryRecvError::Empty) => false,
        }
    }

    /// Waits for the given duration or until shutdown is requested.
    ///
    /// Returns `true` if shutdown was requested (a message arrived, the
    /// sender was dropped, or shutdown had already been latched) and `false`
    /// if the duration elapsed first.
    ///
    /// This is the preferred method for tasks that sleep between work cycles.
    pub fn wait(&self, duration: Duration) -> bool {
        // Already latched: return immediately instead of blocking on a
        // channel whose only message has been consumed.
        if self.stopped.load(Ordering::Acquire) {
            return true;
        }
        match self.receiver.recv_timeout(duration) {
            Ok(()) | Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
                self.stopped.store(true, Ordering::Release);
                true
            }
            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => false,
        }
    }
}
158
159/// A long-running background task.
160///
161/// Implement this trait to define tasks that run in dedicated threads
162/// alongside the main application loop. Tasks receive the event hub
163/// to dispatch events and a shutdown signal for graceful termination.
164pub trait BackgroundTask: Send {
165    /// Returns the unique identifier for this task.
166    fn id(&self) -> TaskId;
167
168    /// Runs the task until shutdown is requested.
169    ///
170    /// This method is called in a dedicated thread. Use `hub` to send
171    /// events to the main loop and `shutdown` to check for termination.
172    fn run(&mut self, hub: &Sender<Event>, shutdown: &ShutdownSignal);
173
174    /// Called when the task is being stopped.
175    ///
176    /// Override this to perform cleanup. The default implementation does nothing.
177    fn stop(&mut self) {}
178}
179
/// Bookkeeping the manager keeps for one started task.
struct RunningTask {
    /// Handle of the thread executing the task; joined when the task is stopped.
    handle: JoinHandle<()>,
    /// Sending end of the task's shutdown channel; a `()` message asks it to stop.
    shutdown: Sender<()>,
}
184
185/// Manages the lifecycle of background tasks.
186///
187/// The task manager spawns tasks in dedicated threads and provides
188/// methods to stop individual tasks or all tasks at once.
189pub struct TaskManager {
190    tasks: HashMap<TaskId, RunningTask>,
191}
192
193impl TaskManager {
194    /// Creates a new empty task manager.
195    pub fn new() -> Self {
196        Self {
197            tasks: HashMap::new(),
198        }
199    }
200
201    /// Starts a background task in a new thread.
202    ///
203    /// The task receives a clone of `hub` for sending events and a
204    /// [`ShutdownSignal`] for graceful termination.
205    ///
206    /// Returns an error if a task with the same ID is already running.
207    #[cfg_attr(feature = "otel", tracing::instrument(skip(self, task, hub), fields(task_id = tracing::field::Empty), ret))]
208    pub fn start(
209        &mut self,
210        task: Box<dyn BackgroundTask>,
211        hub: Sender<Event>,
212    ) -> Result<TaskId, TaskError> {
213        let id = task.id();
214
215        #[cfg(feature = "otel")]
216        tracing::Span::current().record("task_id", tracing::field::display(&id));
217
218        if self.is_running(&id) {
219            return Err(TaskError::AlreadyRunning(id));
220        }
221
222        let (shutdown_tx, shutdown_rx) = mpsc::channel();
223        let shutdown_signal = ShutdownSignal::new(shutdown_rx);
224
225        let handle = thread::spawn(move || {
226            let mut task = task;
227            tracing::info!("task started");
228            task.run(&hub, &shutdown_signal);
229            task.stop();
230            tracing::info!("task stopped");
231        });
232
233        self.tasks.insert(
234            id.clone(),
235            RunningTask {
236                handle,
237                shutdown: shutdown_tx,
238            },
239        );
240
241        tracing::info!("task registered");
242        Ok(id)
243    }
244
245    /// Stops a running task by ID.
246    ///
247    /// Sends the shutdown signal and waits for the task thread to finish.
248    /// Returns an error if the task is not running.
249    #[cfg_attr(feature = "otel", tracing::instrument(skip(self), fields(task_id = %id), ret))]
250    pub fn stop(&mut self, id: &TaskId) -> Result<(), TaskError> {
251        self.cleanup_finished();
252        if let Some(task) = self.tasks.remove(id) {
253            tracing::info!("sending shutdown signal");
254            if let Err(e) = task.shutdown.send(()) {
255                tracing::error!(error = %e, "failed to send shutdown signal");
256            }
257            if task.handle.join().is_err() {
258                tracing::error!("task thread panicked");
259            }
260            Ok(())
261        } else {
262            Err(TaskError::NotRunning(id.clone()))
263        }
264    }
265
266    /// Stops all running tasks.
267    ///
268    /// Sends shutdown signals to all tasks and waits for them to finish.
269    #[cfg_attr(feature = "otel", tracing::instrument(skip(self), fields(task_count = tracing::field::Empty)))]
270    pub fn stop_all(&mut self) {
271        let tasks: Vec<_> = self.tasks.drain().collect();
272
273        #[cfg(feature = "otel")]
274        tracing::Span::current().record("task_count", tasks.len());
275
276        if !tasks.is_empty() {
277            tracing::info!("stopping all tasks");
278        }
279        for (_, task) in &tasks {
280            if let Err(e) = task.shutdown.send(()) {
281                tracing::error!(error = %e, "failed to send shutdown signal");
282            }
283        }
284        for (_, task) in tasks {
285            if task.handle.join().is_err() {
286                tracing::error!("task thread panicked");
287            }
288        }
289    }
290
291    /// Removes entries for tasks whose threads have finished.
292    fn cleanup_finished(&mut self) {
293        self.tasks.retain(|_, task| !task.handle.is_finished());
294    }
295
296    /// Returns `true` if a task with the given ID is running.
297    pub fn is_running(&mut self, id: &TaskId) -> bool {
298        self.cleanup_finished();
299        self.tasks.contains_key(id)
300    }
301
302    /// Returns the IDs of all running tasks.
303    pub fn running_tasks(&mut self) -> Vec<TaskId> {
304        self.cleanup_finished();
305        self.tasks.keys().cloned().collect()
306    }
307}
308
309impl Default for TaskManager {
310    fn default() -> Self {
311        Self::new()
312    }
313}
314
315impl Drop for TaskManager {
316    fn drop(&mut self) {
317        self.stop_all();
318    }
319}
320
321/// Registers background tasks that run at startup.
322///
323/// Call this during startup to add background tasks.
324/// Currently registers:
325/// - [`wifi_status_monitor::WifiStatusMonitorTask`] - monitors WiFi status via dhcpcd-dbus (kobo only)
326/// - [`hello_world::HelloWorldTask`] - prints "Hello world!" every minute (test only)
327/// - [`dbus_monitor::DbusMonitorTask`] - monitors D-Bus signals (test + kobo only, when `settings.logging.enable_dbus_log` is true)
328pub fn register_startup_tasks(manager: &mut TaskManager, hub: Sender<Event>, _settings: &Settings) {
329    #[cfg(feature = "kobo")]
330    {
331        let task = Box::new(wifi_status_monitor::WifiStatusMonitorTask);
332        if let Err(e) = manager.start(task, hub.clone()) {
333            tracing::warn!(error = %e, "failed to start wifi_status_monitor task");
334        }
335    }
336
337    #[cfg(feature = "test")]
338    {
339        let settings = _settings;
340
341        let task = Box::new(hello_world::HelloWorldTask);
342        if let Err(e) = manager.start(task, hub.clone()) {
343            tracing::warn!(error = %e, "failed to start hello_world task");
344        }
345
346        #[cfg(feature = "kobo")]
347        if settings.logging.enable_dbus_log {
348            let task = Box::new(dbus_monitor::DbusMonitorTask);
349            if let Err(e) = manager.start(task, hub) {
350                tracing::warn!(error = %e, "failed to start dbus_monitor task");
351            }
352        }
353    }
354}
355
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;
    use std::time::{Duration, Instant};

    /// Builds an empty manager together with an event channel.
    ///
    /// The receiver is handed back so the caller can keep the channel open
    /// for the duration of the test.
    fn fixture() -> (TaskManager, Sender<Event>, mpsc::Receiver<Event>) {
        let manager = TaskManager::new();
        let (hub, events) = mpsc::channel();
        (manager, hub, events)
    }

    /// Polls `manager` until `id` is no longer reported as running.
    ///
    /// Panics if the task is still running after five seconds.
    fn wait_until_not_running(manager: &mut TaskManager, id: &TaskId) {
        let timeout = Duration::from_secs(5);
        let started = Instant::now();
        while started.elapsed() < timeout {
            if !manager.is_running(id) {
                return;
            }
            std::thread::sleep(Duration::from_millis(1));
        }
        panic!("task '{id}' did not finish within timeout");
    }

    /// Task whose `run` returns immediately.
    struct InstantTask;

    impl BackgroundTask for InstantTask {
        fn id(&self) -> TaskId {
            TaskId::TestTask2
        }

        fn run(&mut self, _hub: &Sender<Event>, _shutdown: &ShutdownSignal) {}
    }

    /// Task that blocks until shutdown is requested (or 60 s elapse).
    struct WaitingTask;

    impl BackgroundTask for WaitingTask {
        fn id(&self) -> TaskId {
            TaskId::TestTask
        }

        fn run(&mut self, _hub: &Sender<Event>, shutdown: &ShutdownSignal) {
            shutdown.wait(Duration::from_secs(60));
        }
    }

    #[test]
    fn start_and_stop() {
        let (mut manager, hub, _events) = fixture();

        let task_id = manager.start(Box::new(WaitingTask), hub).unwrap();
        assert!(manager.is_running(&task_id));

        manager.stop(&task_id).unwrap();
        assert!(!manager.is_running(&task_id));
    }

    #[test]
    fn duplicate_start_returns_error() {
        let (mut manager, hub, _events) = fixture();

        manager.start(Box::new(WaitingTask), hub.clone()).unwrap();
        let second = manager.start(Box::new(WaitingTask), hub);

        assert!(matches!(
            second,
            Err(TaskError::AlreadyRunning(TaskId::TestTask))
        ));
    }

    #[test]
    fn finished_task_is_cleaned_up() {
        let (mut manager, hub, _events) = fixture();

        let task_id = manager.start(Box::new(InstantTask), hub).unwrap();
        wait_until_not_running(&mut manager, &task_id);

        assert!(!manager.is_running(&task_id));
    }

    #[test]
    fn stop_finished_task_returns_not_running() {
        let (mut manager, hub, _events) = fixture();

        let task_id = manager.start(Box::new(InstantTask), hub).unwrap();
        wait_until_not_running(&mut manager, &task_id);

        let outcome = manager.stop(&task_id);
        assert!(matches!(
            outcome,
            Err(TaskError::NotRunning(TaskId::TestTask2))
        ));
    }

    #[test]
    fn running_tasks_excludes_finished() {
        let (mut manager, hub, _events) = fixture();

        manager.start(Box::new(WaitingTask), hub.clone()).unwrap();
        let finished_id = manager.start(Box::new(InstantTask), hub).unwrap();
        wait_until_not_running(&mut manager, &finished_id);

        let still_running = manager.running_tasks();
        assert_eq!(still_running, vec![TaskId::TestTask]);

        manager.stop_all();
    }

    #[test]
    fn stop_all_stops_everything() {
        let (mut manager, hub, _events) = fixture();

        manager.start(Box::new(WaitingTask), hub).unwrap();
        manager.stop_all();

        assert!(!manager.is_running(&TaskId::TestTask));
    }
}
469}