Skip to main content

cadmus_core/task/
mod.rs

//! Long-running background task infrastructure.
//!
//! This module provides a trait-based system for defining and managing
//! background tasks that run alongside the main application loop.
//!
//! # Architecture
//!
//! - [`BackgroundTask`] trait defines the interface for long-running tasks
//! - [`TaskManager`] spawns and manages task lifecycles
//! - [`ShutdownSignal`] provides graceful shutdown coordination
//!
//! # Example
//!
//! ```ignore
//! use cadmus_core::task::{BackgroundTask, TaskId, ShutdownSignal};
//! use std::sync::mpsc::Sender;
//! use std::time::Duration;
//! use cadmus_core::view::Event;
//!
//! struct MyTask;
//!
//! impl BackgroundTask for MyTask {
//!     fn id(&self) -> TaskId {
//!         TaskId::MyTask
//!     }
//!
//!     fn run(&mut self, hub: &Sender<Event>, shutdown: &ShutdownSignal) {
//!         while !shutdown.should_stop() {
//!             // Do work...
//!             if shutdown.wait(Duration::from_secs(60)) {
//!                 break;
//!             }
//!         }
//!     }
//! }
//! ```
36
37#[cfg(any(all(feature = "test", feature = "kobo"), doc))]
38mod dbus_monitor;
39pub mod dictionary_index;
40#[cfg(any(feature = "test", doc))]
41mod hello_world;
42pub mod import;
43pub mod thumbnail;
44#[cfg(any(feature = "kobo", doc))]
45mod wifi_status_monitor;
46
47use std::collections::{HashMap, VecDeque};
48use std::sync::atomic::{AtomicBool, Ordering};
49use std::sync::mpsc::{self, Receiver, Sender};
50use std::thread::{self, JoinHandle};
51use std::time::Duration;
52
53use thiserror::Error;
54
55use crate::db::Database;
56use crate::settings::Settings;
57use crate::view::Event;
58
59/// Errors that can occur during task management operations.
60#[derive(Error, Debug)]
61pub enum TaskError {
62    /// A task with the given ID is already running.
63    #[error("task '{0}' is already running")]
64    AlreadyRunning(TaskId),
65
66    /// A task with the given ID is not running.
67    #[error("task '{0}' is not running")]
68    NotRunning(TaskId),
69}
70
/// Identifies a background task.
///
/// The [`TaskManager`] keys its running tasks by this value, so at most one
/// task per variant can be running at a time.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TaskId {
    /// Temporary placeholder, kept until there is a task that is always available.
    Placeholder,
    /// Imports a library.
    Import,
    /// Extracts thumbnails in the background.
    ThumbnailExtraction,
    /// Indexes dictionaries in the background.
    DictionaryIndex,
    /// Example task that prints periodically (test builds only).
    #[cfg(any(feature = "test", doc))]
    HelloWorld,
    /// Monitors the D-Bus system bus (test + kobo builds only).
    #[cfg(any(all(feature = "test", feature = "kobo"), doc))]
    DbusMonitor,
    /// Monitors WiFi status through dhcpcd-dbus (kobo builds only).
    #[cfg(any(feature = "kobo", doc))]
    WifiStatusMonitor,
    /// First task ID reserved for unit tests.
    #[cfg(test)]
    TestTask,
    /// Second task ID reserved for unit tests.
    #[cfg(test)]
    TestTask2,
}
98
99impl std::fmt::Display for TaskId {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        match self {
102            TaskId::Placeholder => write!(f, "placeholder"),
103            TaskId::Import => write!(f, "import"),
104            TaskId::ThumbnailExtraction => write!(f, "thumbnail_extraction"),
105            TaskId::DictionaryIndex => write!(f, "dictionary_index"),
106            #[cfg(feature = "test")]
107            TaskId::HelloWorld => write!(f, "hello_world"),
108            #[cfg(all(feature = "test", feature = "kobo"))]
109            TaskId::DbusMonitor => write!(f, "dbus_monitor"),
110            #[cfg(feature = "kobo")]
111            TaskId::WifiStatusMonitor => write!(f, "wifi_status_monitor"),
112            #[cfg(test)]
113            TaskId::TestTask => write!(f, "test_task"),
114            #[cfg(test)]
115            TaskId::TestTask2 => write!(f, "test_task_2"),
116        }
117    }
118}
119
/// Signal for coordinating graceful shutdown of background tasks.
///
/// Tasks should periodically check [`should_stop`](Self::should_stop) or use
/// [`wait`](Self::wait) to interrupt sleep when shutdown is requested.
///
/// Shutdown counts as requested when a `()` message arrives on the underlying
/// channel, or when every sender of that channel has been dropped (no message
/// could ever arrive after that). Both [`should_stop`](Self::should_stop) and
/// [`wait`](Self::wait) apply this rule, and once either observes shutdown the
/// state is latched for all later calls.
pub struct ShutdownSignal {
    receiver: Receiver<()>,
    /// Keeps the sender alive when no external owner exists, preventing
    /// spurious `Disconnected` errors in `wait()` and `should_stop()`.
    _sender_anchor: Option<Sender<()>>,
    /// Latched to `true` the first time shutdown is observed.
    stopped: AtomicBool,
}

impl ShutdownSignal {
    /// Wraps `receiver`; the matching `Sender` is owned by whoever triggers shutdown.
    fn new(receiver: Receiver<()>) -> Self {
        Self {
            receiver,
            _sender_anchor: None,
            stopped: AtomicBool::new(false),
        }
    }

    /// Creates a shutdown signal that never fires.
    ///
    /// Intended for use in tests and one-shot contexts where graceful shutdown
    /// is not needed. The signal owns its own sender, so the channel is never
    /// disconnected and nothing is ever sent on it.
    pub fn never() -> Self {
        let (tx, rx) = mpsc::channel();
        Self {
            receiver: rx,
            _sender_anchor: Some(tx),
            stopped: AtomicBool::new(false),
        }
    }

    /// Creates a shutdown signal from a raw receiver, for use in tests.
    ///
    /// Prefer [`never`](Self::never) when no shutdown is needed. Use this
    /// when the test needs to trigger shutdown explicitly by sending `()` on
    /// the corresponding `Sender`. Dropping that `Sender` also counts as a
    /// shutdown request.
    #[cfg(test)]
    pub fn new_for_test(receiver: Receiver<()>) -> Self {
        Self::new(receiver)
    }

    /// Returns `true` if shutdown has been requested.
    ///
    /// Once `true` is returned, all subsequent calls also return `true`
    /// (the shutdown state is latched). This is non-blocking and suitable
    /// for polling in tight loops.
    ///
    /// A disconnected channel is treated as a shutdown request, matching
    /// [`wait`](Self::wait).
    pub fn should_stop(&self) -> bool {
        if self.stopped.load(Ordering::Acquire) {
            return true;
        }
        match self.receiver.try_recv() {
            Ok(()) | Err(mpsc::TryRecvError::Disconnected) => self.latch(),
            Err(mpsc::TryRecvError::Empty) => false,
        }
    }

    /// Waits for the given duration or until shutdown is requested.
    ///
    /// Returns `true` if shutdown was requested, `false` if the duration elapsed.
    /// Returns immediately with `true` if shutdown was already observed.
    ///
    /// This is the preferred method for tasks that sleep between work cycles.
    pub fn wait(&self, duration: Duration) -> bool {
        if self.stopped.load(Ordering::Acquire) {
            return true;
        }
        match self.receiver.recv_timeout(duration) {
            Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => self.latch(),
            Err(mpsc::RecvTimeoutError::Timeout) => false,
        }
    }

    /// Records that shutdown was observed and returns `true` for convenience.
    fn latch(&self) -> bool {
        self.stopped.store(true, Ordering::Release);
        true
    }
}
198
199/// A long-running background task.
200///
201/// Implement this trait to define tasks that run in dedicated threads
202/// alongside the main application loop. Tasks receive the event hub
203/// to dispatch events and a shutdown signal for graceful termination.
204pub trait BackgroundTask: Send {
205    /// Returns the unique identifier for this task.
206    fn id(&self) -> TaskId;
207
208    /// Runs the task until shutdown is requested.
209    ///
210    /// This method is called in a dedicated thread. Use `hub` to send
211    /// events to the main loop and `shutdown` to check for termination.
212    fn run(&mut self, hub: &Sender<Event>, shutdown: &ShutdownSignal);
213
214    /// Called when the task is being stopped.
215    ///
216    /// Override this to perform cleanup. The default implementation does nothing.
217    fn stop(&mut self) {}
218
219    /// Returns a "finished" event to send after the task thread exits.
220    ///
221    /// The [`TaskManager`] sends this event after
222    /// observing the task's thread as finished. The default returns `None`.
223    fn finished_event(&self) -> Option<Event> {
224        None
225    }
226}
227
228struct RunningTask {
229    handle: JoinHandle<()>,
230    shutdown: Sender<()>,
231    /// Event to emit when the task is observed as naturally finished.
232    finished_event: Option<Event>,
233}
234
235/// Manages the lifecycle of background tasks.
236///
237/// The task manager spawns tasks in dedicated threads and provides
238/// methods to stop individual tasks or all tasks at once.
239pub struct TaskManager {
240    tasks: HashMap<TaskId, RunningTask>,
241    /// Library indices awaiting import while one is already running.
242    pending_import_indices: VecDeque<Option<usize>>,
243    /// Library indices awaiting thumbnail extraction while a run is in progress.
244    pending_thumbnail_indices: VecDeque<Option<usize>>,
245    /// Events from naturally finished tasks, waiting to be sent.
246    buffered_events: Vec<Event>,
247}
248
249impl TaskManager {
250    /// Creates a new empty task manager.
251    pub fn new() -> Self {
252        Self {
253            tasks: HashMap::new(),
254            pending_import_indices: VecDeque::new(),
255            pending_thumbnail_indices: VecDeque::new(),
256            buffered_events: Vec::new(),
257        }
258    }
259
260    /// Starts a background task in a new thread.
261    ///
262    /// The task receives a clone of `hub` for sending events and a
263    /// [`ShutdownSignal`] for graceful termination.
264    ///
265    /// Returns an error if a task with the same ID is already running.
266    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self, task, hub), fields(task_id = tracing::field::Empty), ret))]
267    pub fn start(
268        &mut self,
269        task: Box<dyn BackgroundTask>,
270        hub: Sender<Event>,
271    ) -> Result<TaskId, TaskError> {
272        let id = task.id();
273
274        #[cfg(feature = "tracing")]
275        tracing::Span::current().record("task_id", tracing::field::display(&id));
276
277        if self.is_running(&id) {
278            return Err(TaskError::AlreadyRunning(id));
279        }
280
281        let (shutdown_tx, shutdown_rx) = mpsc::channel();
282        let shutdown_signal = ShutdownSignal::new(shutdown_rx);
283
284        let finished_event = task.finished_event();
285
286        let handle = thread::spawn(move || {
287            let mut task = task;
288            tracing::info!("task started");
289            task.run(&hub, &shutdown_signal);
290            task.stop();
291            tracing::info!("task stopped");
292        });
293
294        self.tasks.insert(
295            id.clone(),
296            RunningTask {
297                handle,
298                shutdown: shutdown_tx,
299                finished_event,
300            },
301        );
302
303        tracing::info!("task registered");
304        Ok(id)
305    }
306
307    /// Stops a running task by ID.
308    ///
309    /// Sends the shutdown signal and waits for the task thread to finish.
310    /// Returns an error if the task is not running.
311    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(task_id = %id), ret))]
312    pub fn stop(&mut self, id: &TaskId) -> Result<(), TaskError> {
313        self.cleanup_finished();
314        if let Some(task) = self.tasks.remove(id) {
315            tracing::info!("sending shutdown signal");
316            if let Err(e) = task.shutdown.send(()) {
317                tracing::error!(error = %e, "failed to send shutdown signal");
318            }
319            if task.handle.join().is_err() {
320                tracing::error!("task thread panicked");
321            }
322            Ok(())
323        } else {
324            Err(TaskError::NotRunning(id.clone()))
325        }
326    }
327
328    /// Stops all running tasks.
329    ///
330    /// Sends shutdown signals to all tasks and waits for them to finish.
331    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(task_count = tracing::field::Empty)))]
332    pub fn stop_all(&mut self) {
333        let tasks: Vec<_> = self.tasks.drain().collect();
334
335        #[cfg(feature = "tracing")]
336        tracing::Span::current().record("task_count", tasks.len());
337
338        if !tasks.is_empty() {
339            tracing::info!("stopping all tasks");
340        }
341        for (_, task) in &tasks {
342            if let Err(e) = task.shutdown.send(()) {
343                tracing::error!(error = %e, "failed to send shutdown signal");
344            }
345        }
346        for (_, task) in tasks {
347            if task.handle.join().is_err() {
348                tracing::error!("task thread panicked");
349            }
350        }
351    }
352
353    /// Removes entries for tasks whose threads have finished, buffering
354    /// their completion events only if the thread exited successfully.
355    fn cleanup_finished(&mut self) {
356        let finished: Vec<TaskId> = self
357            .tasks
358            .iter()
359            .filter(|(_, task)| task.handle.is_finished())
360            .map(|(id, _)| id.clone())
361            .collect();
362
363        for id in finished {
364            if let Some(task) = self.tasks.remove(&id) {
365                if task.handle.join().is_ok() {
366                    if let Some(evt) = task.finished_event {
367                        self.buffered_events.push(evt);
368                    }
369                } else {
370                    tracing::error!(task_id = %id, "task thread panicked");
371                }
372            }
373        }
374    }
375
376    /// Sends any buffered completion events from naturally finished tasks.
377    fn flush_buffered_events(&mut self, hub: &Sender<Event>) {
378        for evt in self.buffered_events.drain(..) {
379            hub.send(evt).ok();
380        }
381    }
382
383    /// Observes an event without consuming it.
384    ///
385    /// Must be called for every event before passing it to the view tree.
386    /// Always returns `false` — it never consumes events.
387    #[cfg_attr(
388        feature = "tracing",
389        tracing::instrument(skip(self, hub, database, settings))
390    )]
391    pub fn handle_event(
392        &mut self,
393        evt: &Event,
394        hub: &Sender<Event>,
395        database: &Database,
396        settings: &Settings,
397    ) -> bool {
398        self.cleanup_finished();
399        self.flush_buffered_events(hub);
400
401        match evt {
402            Event::ImportLibrary { library_index } => {
403                self.schedule_import(*library_index, hub, database, settings);
404            }
405            Event::ImportFinished { library_index } => {
406                self.drain_pending_imports(hub, database, settings);
407                self.schedule_thumbnail_extraction(*library_index, hub, database, settings);
408            }
409            Event::ThumbnailExtractionFinished { .. } => {
410                self.drain_pending_thumbnails(hub, database, settings);
411            }
412            Event::ReindexDictionaries => {
413                self.schedule_dictionary_index(hub, database);
414            }
415            _ => {}
416        }
417        false
418    }
419
420    /// Schedules an import task, queuing the index if one is already running.
421    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
422    fn schedule_import(
423        &mut self,
424        library_index: Option<usize>,
425        hub: &Sender<Event>,
426        database: &Database,
427        settings: &Settings,
428    ) {
429        if self.is_running(&TaskId::Import) {
430            tracing::info!(library_index = ?library_index, "import already running, queueing");
431            self.pending_import_indices.push_back(library_index);
432            return;
433        }
434
435        self.flush_buffered_events(hub);
436
437        let task = Box::new(import::ImportTask::new(
438            database.clone(),
439            settings.clone(),
440            library_index,
441        ));
442
443        if let Err(e) = self.start(task, hub.clone()) {
444            tracing::warn!(error = %e, "failed to start import task");
445        }
446    }
447
448    /// Starts the next pending import when the current one finishes.
449    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
450    fn drain_pending_imports(
451        &mut self,
452        hub: &Sender<Event>,
453        database: &Database,
454        settings: &Settings,
455    ) {
456        if self.is_running(&TaskId::Import) || self.pending_import_indices.is_empty() {
457            return;
458        }
459
460        let Some(next) = self.pending_import_indices.pop_front() else {
461            return;
462        };
463        self.schedule_import(next, hub, database, settings);
464    }
465
466    /// Schedules a dictionary index scan, stopping any running instance first.
467    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
468    fn schedule_dictionary_index(&mut self, hub: &Sender<Event>, database: &Database) {
469        if self.is_running(&TaskId::DictionaryIndex) {
470            tracing::debug!("stopping running dictionary index task for restart");
471            if let Err(e) = self.stop(&TaskId::DictionaryIndex) {
472                tracing::warn!(error = %e, "failed to stop dictionary_index task for restart");
473            }
474        }
475
476        self.flush_buffered_events(hub);
477
478        let task = Box::new(dictionary_index::DictionaryIndexTask::new(database.clone()));
479
480        if let Err(e) = self.start(task, hub.clone()) {
481            tracing::warn!(error = %e, "failed to start dictionary_index task");
482        }
483    }
484
485    /// Schedules a thumbnail extraction task, queuing the index if one is already running.
486    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
487    pub fn schedule_thumbnail_extraction(
488        &mut self,
489        library_index: Option<usize>,
490        hub: &Sender<Event>,
491        database: &Database,
492        settings: &Settings,
493    ) {
494        if self.is_running(&TaskId::ThumbnailExtraction) {
495            tracing::info!(library_index = ?library_index, "thumbnail extraction already running, queueing");
496            self.pending_thumbnail_indices.push_back(library_index);
497            return;
498        }
499
500        self.flush_buffered_events(hub);
501
502        let task = Box::new(thumbnail::ThumbnailExtractionTask::new(
503            database.clone(),
504            settings.clone(),
505            library_index,
506        ));
507
508        if let Err(e) = self.start(task, hub.clone()) {
509            tracing::warn!(error = %e, "failed to start thumbnail extraction task");
510        }
511    }
512
513    /// Starts the next pending thumbnail extraction when the current one finishes.
514    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
515    fn drain_pending_thumbnails(
516        &mut self,
517        hub: &Sender<Event>,
518        database: &Database,
519        settings: &Settings,
520    ) {
521        if self.is_running(&TaskId::ThumbnailExtraction)
522            || self.pending_thumbnail_indices.is_empty()
523        {
524            return;
525        }
526
527        let Some(next) = self.pending_thumbnail_indices.pop_front() else {
528            return;
529        };
530        self.schedule_thumbnail_extraction(next, hub, database, settings);
531    }
532
533    /// Returns `true` if a task with the given ID is running.
534    pub fn is_running(&mut self, id: &TaskId) -> bool {
535        self.cleanup_finished();
536        self.tasks.contains_key(id)
537    }
538
539    /// Returns the IDs of all running tasks.
540    pub fn running_tasks(&mut self) -> Vec<TaskId> {
541        self.cleanup_finished();
542        self.tasks.keys().cloned().collect()
543    }
544}
545
546impl Default for TaskManager {
547    fn default() -> Self {
548        Self::new()
549    }
550}
551
552impl Drop for TaskManager {
553    fn drop(&mut self) {
554        self.stop_all();
555    }
556}
557
558/// Registers background tasks that run at startup.
559///
560/// Call this during startup to add background tasks.
561/// Currently registers:
562/// - [`wifi_status_monitor::WifiStatusMonitorTask`] - monitors WiFi status via dhcpcd-dbus (kobo only)
563/// - [`hello_world::HelloWorldTask`] - prints "Hello world!" every minute (test only)
564/// - [`dbus_monitor::DbusMonitorTask`] - monitors D-Bus signals (test + kobo only, when `settings.logging.enable_dbus_log` is true)
565/// - [`import::ImportTask`] - imports all libraries if `settings.import.startup_trigger` is set
566/// - [`dictionary_index::DictionaryIndexTask`] - indexes `.index` dictionary files into SQLite
567pub fn register_startup_tasks(
568    manager: &mut TaskManager,
569    hub: Sender<Event>,
570    settings: &Settings,
571    database: &Database,
572) {
573    #[cfg(feature = "kobo")]
574    {
575        let task = Box::new(wifi_status_monitor::WifiStatusMonitorTask);
576        if let Err(e) = manager.start(task, hub.clone()) {
577            tracing::warn!(error = %e, "failed to start wifi_status_monitor task");
578        }
579    }
580
581    #[cfg(feature = "test")]
582    {
583        let task = Box::new(hello_world::HelloWorldTask);
584        if let Err(e) = manager.start(task, hub.clone()) {
585            tracing::warn!(error = %e, "failed to start hello_world task");
586        }
587
588        #[cfg(feature = "kobo")]
589        if settings.logging.enable_dbus_log {
590            let task = Box::new(dbus_monitor::DbusMonitorTask);
591            if let Err(e) = manager.start(task, hub.clone()) {
592                tracing::warn!(error = %e, "failed to start dbus_monitor task");
593            }
594        }
595    }
596
597    if settings.import.startup_trigger {
598        manager.schedule_import(None, &hub, database, settings);
599    }
600
601    let task = Box::new(dictionary_index::DictionaryIndexTask::new(database.clone()));
602    if let Err(e) = manager.start(task, hub.clone()) {
603        tracing::warn!(error = %e, "failed to start dictionary_index task");
604    }
605}
606
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;
    use std::time::{Duration, Instant};

    /// Polls `manager` every millisecond until `id` is no longer running.
    ///
    /// Panics if the task is still running after five seconds.
    fn wait_until_not_running(manager: &mut TaskManager, id: &TaskId) {
        let started = Instant::now();
        let timeout = Duration::from_secs(5);
        while started.elapsed() < timeout {
            if !manager.is_running(id) {
                return;
            }
            std::thread::sleep(Duration::from_millis(1));
        }
        panic!("task '{id}' did not finish within timeout");
    }

    /// Task whose `run` returns immediately.
    struct InstantTask;

    impl BackgroundTask for InstantTask {
        fn id(&self) -> TaskId {
            TaskId::TestTask2
        }

        fn run(&mut self, _hub: &Sender<Event>, _shutdown: &ShutdownSignal) {}
    }

    /// Task that blocks until shutdown is requested (or 60 seconds pass).
    struct WaitingTask;

    impl BackgroundTask for WaitingTask {
        fn id(&self) -> TaskId {
            TaskId::TestTask
        }

        fn run(&mut self, _hub: &Sender<Event>, shutdown: &ShutdownSignal) {
            shutdown.wait(Duration::from_secs(60));
        }
    }

    #[test]
    fn start_and_stop() {
        let (hub, _rx) = mpsc::channel();
        let mut manager = TaskManager::new();

        let id = manager.start(Box::new(WaitingTask), hub).unwrap();
        assert!(manager.is_running(&id));

        manager.stop(&id).unwrap();
        assert!(!manager.is_running(&id));
    }

    #[test]
    fn duplicate_start_returns_error() {
        let (hub, _rx) = mpsc::channel();
        let mut manager = TaskManager::new();

        manager.start(Box::new(WaitingTask), hub.clone()).unwrap();
        let second = manager.start(Box::new(WaitingTask), hub);

        let err = second.unwrap_err();
        assert!(matches!(err, TaskError::AlreadyRunning(TaskId::TestTask)));
    }

    #[test]
    fn finished_task_is_cleaned_up() {
        let (hub, _rx) = mpsc::channel();
        let mut manager = TaskManager::new();

        let id = manager.start(Box::new(InstantTask), hub).unwrap();

        wait_until_not_running(&mut manager, &id);
        assert!(!manager.is_running(&id));
    }

    #[test]
    fn stop_finished_task_returns_not_running() {
        let (hub, _rx) = mpsc::channel();
        let mut manager = TaskManager::new();

        let id = manager.start(Box::new(InstantTask), hub).unwrap();

        wait_until_not_running(&mut manager, &id);
        let err = manager.stop(&id).unwrap_err();

        assert!(matches!(err, TaskError::NotRunning(TaskId::TestTask2)));
    }

    #[test]
    fn running_tasks_excludes_finished() {
        let (hub, _rx) = mpsc::channel();
        let mut manager = TaskManager::new();

        manager.start(Box::new(WaitingTask), hub.clone()).unwrap();
        let instant_id = manager.start(Box::new(InstantTask), hub).unwrap();

        wait_until_not_running(&mut manager, &instant_id);
        let running = manager.running_tasks();

        // Only the blocking task should still be registered.
        assert_eq!(running.len(), 1);
        assert_eq!(running[0], TaskId::TestTask);

        manager.stop_all();
    }

    #[test]
    fn stop_all_stops_everything() {
        let (hub, _rx) = mpsc::channel();
        let mut manager = TaskManager::new();

        manager.start(Box::new(WaitingTask), hub).unwrap();
        manager.stop_all();

        assert!(!manager.is_running(&TaskId::TestTask));
    }

    #[test]
    fn test_thumbnail_extraction_task_lifecycle() {
        let (hub, _rx) = mpsc::channel();
        let mut manager = TaskManager::new();
        let database = Database::new(":memory:").unwrap();
        database.migrate().unwrap();
        let settings = Settings::default();

        manager.schedule_thumbnail_extraction(None, &hub, &database, &settings);

        // The task exits quickly on an unseeded database, so wait for
        // completion instead of asserting the transient running state.
        wait_until_not_running(&mut manager, &TaskId::ThumbnailExtraction);
        assert!(!manager.is_running(&TaskId::ThumbnailExtraction));

        let err = manager.stop(&TaskId::ThumbnailExtraction).unwrap_err();
        assert!(matches!(
            err,
            TaskError::NotRunning(TaskId::ThumbnailExtraction)
        ));
    }

    #[test]
    fn thumbnail_extraction_queues_when_running() {
        let (hub, _rx) = mpsc::channel();
        let mut manager = TaskManager::new();

        // Fake a running ThumbnailExtraction task: a thread that blocks until
        // its shutdown channel receives a message or is disconnected.
        let (shutdown_tx, shutdown_rx) = mpsc::channel();
        let blocking_handle = thread::spawn(move || {
            let _ = shutdown_rx.recv();
        });
        let fake_entry = RunningTask {
            handle: blocking_handle,
            shutdown: shutdown_tx,
            finished_event: None,
        };
        manager.tasks.insert(TaskId::ThumbnailExtraction, fake_entry);

        let database = Database::new(":memory:").unwrap();
        database.migrate().unwrap();
        let settings = Settings::default();

        // Both requests arrive while the fake task is running, so both queue.
        manager.schedule_thumbnail_extraction(Some(0), &hub, &database, &settings);
        manager.schedule_thumbnail_extraction(Some(1), &hub, &database, &settings);

        assert_eq!(manager.pending_thumbnail_indices.len(), 2);

        manager.stop(&TaskId::ThumbnailExtraction).unwrap();

        // First drain starts the request for index 0 and leaves index 1 queued.
        manager.drain_pending_thumbnails(&hub, &database, &settings);
        assert_eq!(manager.pending_thumbnail_indices.len(), 1);

        wait_until_not_running(&mut manager, &TaskId::ThumbnailExtraction);

        // Second drain starts the remaining request.
        manager.drain_pending_thumbnails(&hub, &database, &settings);
        assert!(manager.pending_thumbnail_indices.is_empty());

        wait_until_not_running(&mut manager, &TaskId::ThumbnailExtraction);
    }
}