1#[cfg(any(all(feature = "test", feature = "kobo"), doc))]
38mod dbus_monitor;
39pub mod dictionary_index;
40#[cfg(any(feature = "test", doc))]
41mod hello_world;
42pub mod import;
43pub mod thumbnail;
44#[cfg(any(feature = "kobo", doc))]
45mod wifi_status_monitor;
46
47use std::collections::{HashMap, VecDeque};
48use std::sync::atomic::{AtomicBool, Ordering};
49use std::sync::mpsc::{self, Receiver, Sender};
50use std::thread::{self, JoinHandle};
51use std::time::Duration;
52
53use thiserror::Error;
54
55use crate::db::Database;
56use crate::settings::Settings;
57use crate::view::Event;
58
/// Errors returned by [`TaskManager::start`] and [`TaskManager::stop`].
#[derive(Error, Debug)]
pub enum TaskError {
    /// Returned by `start` when a task with the same [`TaskId`] is still tracked as running.
    #[error("task '{0}' is already running")]
    AlreadyRunning(TaskId),

    /// Returned by `stop` when no task with the given [`TaskId`] is tracked as running.
    #[error("task '{0}' is not running")]
    NotRunning(TaskId),
}
70
/// Key under which [`TaskManager`] tracks a background task.
///
/// The manager stores running tasks in a map keyed by this type, so at most
/// one task per identifier is tracked at any time.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TaskId {
    /// NOTE(review): not referenced by the code visible in this file — confirm its purpose.
    Placeholder,
    /// Key the manager checks before starting a library import.
    Import,
    /// Key the manager checks before starting a thumbnail extraction.
    ThumbnailExtraction,
    /// Key the manager checks before (re)starting a dictionary index run.
    DictionaryIndex,
    /// Only present with the `test` feature (or when building docs).
    #[cfg(any(feature = "test", doc))]
    HelloWorld,
    /// Only present with both the `test` and `kobo` features (or when building docs).
    #[cfg(any(all(feature = "test", feature = "kobo"), doc))]
    DbusMonitor,
    /// Only present with the `kobo` feature (or when building docs).
    #[cfg(any(feature = "kobo", doc))]
    WifiStatusMonitor,
    /// Identifier reserved for unit tests.
    #[cfg(test)]
    TestTask,
    /// Second identifier reserved for unit tests.
    #[cfg(test)]
    TestTask2,
}

impl std::fmt::Display for TaskId {
    /// Writes the snake_case name of the identifier.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every arm carries exactly the same `cfg` predicate as its variant,
        // so the match stays exhaustive in every configuration in which the
        // variant exists (including `cfg(doc)` without the cargo features).
        let name = match self {
            TaskId::Placeholder => "placeholder",
            TaskId::Import => "import",
            TaskId::ThumbnailExtraction => "thumbnail_extraction",
            TaskId::DictionaryIndex => "dictionary_index",
            #[cfg(any(feature = "test", doc))]
            TaskId::HelloWorld => "hello_world",
            #[cfg(any(all(feature = "test", feature = "kobo"), doc))]
            TaskId::DbusMonitor => "dbus_monitor",
            #[cfg(any(feature = "kobo", doc))]
            TaskId::WifiStatusMonitor => "wifi_status_monitor",
            #[cfg(test)]
            TaskId::TestTask => "test_task",
            #[cfg(test)]
            TaskId::TestTask2 => "test_task_2",
        };
        f.write_str(name)
    }
}
119
/// Cooperative stop flag handed to a running [`BackgroundTask`].
///
/// The signal is backed by an mpsc channel. A stop is observed when a `()`
/// message arrives or when every sender has been dropped. Once observed it is
/// latched, so all later queries answer `true` without touching the channel.
pub struct ShutdownSignal {
    /// Receiving end of the shutdown channel.
    receiver: Receiver<()>,
    /// Sender kept alive by [`ShutdownSignal::never`] so that its channel can
    /// neither deliver a message nor report disconnection.
    _sender_anchor: Option<Sender<()>>,
    /// Latched to `true` the first time a stop is observed.
    stopped: AtomicBool,
}

impl ShutdownSignal {
    /// Wraps the receiving end of a shutdown channel whose sender is owned elsewhere.
    fn new(receiver: Receiver<()>) -> Self {
        Self {
            receiver,
            _sender_anchor: None,
            stopped: AtomicBool::new(false),
        }
    }

    /// Creates a signal that never asks the task to stop.
    ///
    /// The signal owns the only sender of its channel and never uses it, so
    /// no message is ever delivered and the channel never disconnects.
    pub fn never() -> Self {
        let (tx, rx) = mpsc::channel();
        Self {
            receiver: rx,
            _sender_anchor: Some(tx),
            stopped: AtomicBool::new(false),
        }
    }

    /// Test-only constructor exposing [`ShutdownSignal::new`].
    #[cfg(test)]
    pub fn new_for_test(receiver: Receiver<()>) -> Self {
        Self::new(receiver)
    }

    /// Returns `true` once a stop has been requested, without blocking.
    ///
    /// A dropped sender counts as a stop request, exactly as in
    /// [`ShutdownSignal::wait`]; otherwise a task polling this method would
    /// keep running forever after its controller went away.
    pub fn should_stop(&self) -> bool {
        if self.stopped.load(Ordering::Acquire) {
            return true;
        }
        match self.receiver.try_recv() {
            Ok(()) | Err(mpsc::TryRecvError::Disconnected) => {
                self.stopped.store(true, Ordering::Release);
                true
            }
            Err(mpsc::TryRecvError::Empty) => false,
        }
    }

    /// Blocks for up to `duration` waiting for a stop request.
    ///
    /// Returns `true` if a stop was requested (message received, sender
    /// dropped, or already latched) and `false` if the timeout elapsed first.
    pub fn wait(&self, duration: Duration) -> bool {
        if self.stopped.load(Ordering::Acquire) {
            return true;
        }
        match self.receiver.recv_timeout(duration) {
            Ok(()) | Err(mpsc::RecvTimeoutError::Disconnected) => {
                self.stopped.store(true, Ordering::Release);
                true
            }
            Err(mpsc::RecvTimeoutError::Timeout) => false,
        }
    }
}
198
/// A unit of work that [`TaskManager`] runs on a dedicated thread.
///
/// `Send` is required because the boxed task is moved into the spawned thread.
pub trait BackgroundTask: Send {
    /// Identifier under which the manager tracks this task.
    fn id(&self) -> TaskId;

    /// Performs the task's work on the task thread.
    ///
    /// `hub` is the event channel the task may send on; `shutdown` reports
    /// when the task has been asked to stop.
    fn run(&mut self, hub: &Sender<Event>, shutdown: &ShutdownSignal);

    /// Hook called on the task thread right after [`run`](Self::run) returns.
    ///
    /// The default implementation does nothing.
    fn stop(&mut self) {}

    /// Event the manager sends to the hub after the task thread has finished
    /// without panicking.
    ///
    /// The manager queries this once, before the thread is spawned. The
    /// default implementation returns `None`.
    fn finished_event(&self) -> Option<Event> {
        None
    }
}
227
/// Bookkeeping the manager keeps for each task it has spawned.
struct RunningTask {
    /// Join handle of the thread executing the task.
    handle: JoinHandle<()>,
    /// Sender used to deliver the shutdown request to the task's [`ShutdownSignal`].
    shutdown: Sender<()>,
    /// Event to buffer for the hub once the thread has finished without panicking.
    finished_event: Option<Event>,
}
234
/// Owns the background task threads and the work queued behind them.
pub struct TaskManager {
    /// Tasks currently tracked as running, keyed by identifier.
    tasks: HashMap<TaskId, RunningTask>,
    /// Import requests received while an import was already running.
    pending_import_indices: VecDeque<Option<usize>>,
    /// Thumbnail extraction requests received while one was already running.
    pending_thumbnail_indices: VecDeque<Option<usize>>,
    /// Finished events collected during cleanup and not yet sent to the hub.
    buffered_events: Vec<Event>,
}
248
249impl TaskManager {
250 pub fn new() -> Self {
252 Self {
253 tasks: HashMap::new(),
254 pending_import_indices: VecDeque::new(),
255 pending_thumbnail_indices: VecDeque::new(),
256 buffered_events: Vec::new(),
257 }
258 }
259
260 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self, task, hub), fields(task_id = tracing::field::Empty), ret))]
267 pub fn start(
268 &mut self,
269 task: Box<dyn BackgroundTask>,
270 hub: Sender<Event>,
271 ) -> Result<TaskId, TaskError> {
272 let id = task.id();
273
274 #[cfg(feature = "tracing")]
275 tracing::Span::current().record("task_id", tracing::field::display(&id));
276
277 if self.is_running(&id) {
278 return Err(TaskError::AlreadyRunning(id));
279 }
280
281 let (shutdown_tx, shutdown_rx) = mpsc::channel();
282 let shutdown_signal = ShutdownSignal::new(shutdown_rx);
283
284 let finished_event = task.finished_event();
285
286 let handle = thread::spawn(move || {
287 let mut task = task;
288 tracing::info!("task started");
289 task.run(&hub, &shutdown_signal);
290 task.stop();
291 tracing::info!("task stopped");
292 });
293
294 self.tasks.insert(
295 id.clone(),
296 RunningTask {
297 handle,
298 shutdown: shutdown_tx,
299 finished_event,
300 },
301 );
302
303 tracing::info!("task registered");
304 Ok(id)
305 }
306
307 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(task_id = %id), ret))]
312 pub fn stop(&mut self, id: &TaskId) -> Result<(), TaskError> {
313 self.cleanup_finished();
314 if let Some(task) = self.tasks.remove(id) {
315 tracing::info!("sending shutdown signal");
316 if let Err(e) = task.shutdown.send(()) {
317 tracing::error!(error = %e, "failed to send shutdown signal");
318 }
319 if task.handle.join().is_err() {
320 tracing::error!("task thread panicked");
321 }
322 Ok(())
323 } else {
324 Err(TaskError::NotRunning(id.clone()))
325 }
326 }
327
328 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self), fields(task_count = tracing::field::Empty)))]
332 pub fn stop_all(&mut self) {
333 let tasks: Vec<_> = self.tasks.drain().collect();
334
335 #[cfg(feature = "tracing")]
336 tracing::Span::current().record("task_count", tasks.len());
337
338 if !tasks.is_empty() {
339 tracing::info!("stopping all tasks");
340 }
341 for (_, task) in &tasks {
342 if let Err(e) = task.shutdown.send(()) {
343 tracing::error!(error = %e, "failed to send shutdown signal");
344 }
345 }
346 for (_, task) in tasks {
347 if task.handle.join().is_err() {
348 tracing::error!("task thread panicked");
349 }
350 }
351 }
352
353 fn cleanup_finished(&mut self) {
356 let finished: Vec<TaskId> = self
357 .tasks
358 .iter()
359 .filter(|(_, task)| task.handle.is_finished())
360 .map(|(id, _)| id.clone())
361 .collect();
362
363 for id in finished {
364 if let Some(task) = self.tasks.remove(&id) {
365 if task.handle.join().is_ok() {
366 if let Some(evt) = task.finished_event {
367 self.buffered_events.push(evt);
368 }
369 } else {
370 tracing::error!(task_id = %id, "task thread panicked");
371 }
372 }
373 }
374 }
375
376 fn flush_buffered_events(&mut self, hub: &Sender<Event>) {
378 for evt in self.buffered_events.drain(..) {
379 hub.send(evt).ok();
380 }
381 }
382
383 #[cfg_attr(
388 feature = "tracing",
389 tracing::instrument(skip(self, hub, database, settings))
390 )]
391 pub fn handle_event(
392 &mut self,
393 evt: &Event,
394 hub: &Sender<Event>,
395 database: &Database,
396 settings: &Settings,
397 ) -> bool {
398 self.cleanup_finished();
399 self.flush_buffered_events(hub);
400
401 match evt {
402 Event::ImportLibrary { library_index } => {
403 self.schedule_import(*library_index, hub, database, settings);
404 }
405 Event::ImportFinished { library_index } => {
406 self.drain_pending_imports(hub, database, settings);
407 self.schedule_thumbnail_extraction(*library_index, hub, database, settings);
408 }
409 Event::ThumbnailExtractionFinished { .. } => {
410 self.drain_pending_thumbnails(hub, database, settings);
411 }
412 Event::ReindexDictionaries => {
413 self.schedule_dictionary_index(hub, database);
414 }
415 _ => {}
416 }
417 false
418 }
419
420 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
422 fn schedule_import(
423 &mut self,
424 library_index: Option<usize>,
425 hub: &Sender<Event>,
426 database: &Database,
427 settings: &Settings,
428 ) {
429 if self.is_running(&TaskId::Import) {
430 tracing::info!(library_index = ?library_index, "import already running, queueing");
431 self.pending_import_indices.push_back(library_index);
432 return;
433 }
434
435 self.flush_buffered_events(hub);
436
437 let task = Box::new(import::ImportTask::new(
438 database.clone(),
439 settings.clone(),
440 library_index,
441 ));
442
443 if let Err(e) = self.start(task, hub.clone()) {
444 tracing::warn!(error = %e, "failed to start import task");
445 }
446 }
447
448 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
450 fn drain_pending_imports(
451 &mut self,
452 hub: &Sender<Event>,
453 database: &Database,
454 settings: &Settings,
455 ) {
456 if self.is_running(&TaskId::Import) || self.pending_import_indices.is_empty() {
457 return;
458 }
459
460 let Some(next) = self.pending_import_indices.pop_front() else {
461 return;
462 };
463 self.schedule_import(next, hub, database, settings);
464 }
465
466 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
468 fn schedule_dictionary_index(&mut self, hub: &Sender<Event>, database: &Database) {
469 if self.is_running(&TaskId::DictionaryIndex) {
470 tracing::debug!("stopping running dictionary index task for restart");
471 if let Err(e) = self.stop(&TaskId::DictionaryIndex) {
472 tracing::warn!(error = %e, "failed to stop dictionary_index task for restart");
473 }
474 }
475
476 self.flush_buffered_events(hub);
477
478 let task = Box::new(dictionary_index::DictionaryIndexTask::new(database.clone()));
479
480 if let Err(e) = self.start(task, hub.clone()) {
481 tracing::warn!(error = %e, "failed to start dictionary_index task");
482 }
483 }
484
485 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
487 pub fn schedule_thumbnail_extraction(
488 &mut self,
489 library_index: Option<usize>,
490 hub: &Sender<Event>,
491 database: &Database,
492 settings: &Settings,
493 ) {
494 if self.is_running(&TaskId::ThumbnailExtraction) {
495 tracing::info!(library_index = ?library_index, "thumbnail extraction already running, queueing");
496 self.pending_thumbnail_indices.push_back(library_index);
497 return;
498 }
499
500 self.flush_buffered_events(hub);
501
502 let task = Box::new(thumbnail::ThumbnailExtractionTask::new(
503 database.clone(),
504 settings.clone(),
505 library_index,
506 ));
507
508 if let Err(e) = self.start(task, hub.clone()) {
509 tracing::warn!(error = %e, "failed to start thumbnail extraction task");
510 }
511 }
512
513 #[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
515 fn drain_pending_thumbnails(
516 &mut self,
517 hub: &Sender<Event>,
518 database: &Database,
519 settings: &Settings,
520 ) {
521 if self.is_running(&TaskId::ThumbnailExtraction)
522 || self.pending_thumbnail_indices.is_empty()
523 {
524 return;
525 }
526
527 let Some(next) = self.pending_thumbnail_indices.pop_front() else {
528 return;
529 };
530 self.schedule_thumbnail_extraction(next, hub, database, settings);
531 }
532
533 pub fn is_running(&mut self, id: &TaskId) -> bool {
535 self.cleanup_finished();
536 self.tasks.contains_key(id)
537 }
538
539 pub fn running_tasks(&mut self) -> Vec<TaskId> {
541 self.cleanup_finished();
542 self.tasks.keys().cloned().collect()
543 }
544}
545
546impl Default for TaskManager {
547 fn default() -> Self {
548 Self::new()
549 }
550}
551
impl Drop for TaskManager {
    /// Signals every tracked task to shut down and joins its thread before
    /// the manager goes away.
    fn drop(&mut self) {
        self.stop_all();
    }
}
557
558pub fn register_startup_tasks(
568 manager: &mut TaskManager,
569 hub: Sender<Event>,
570 settings: &Settings,
571 database: &Database,
572) {
573 #[cfg(feature = "kobo")]
574 {
575 let task = Box::new(wifi_status_monitor::WifiStatusMonitorTask);
576 if let Err(e) = manager.start(task, hub.clone()) {
577 tracing::warn!(error = %e, "failed to start wifi_status_monitor task");
578 }
579 }
580
581 #[cfg(feature = "test")]
582 {
583 let task = Box::new(hello_world::HelloWorldTask);
584 if let Err(e) = manager.start(task, hub.clone()) {
585 tracing::warn!(error = %e, "failed to start hello_world task");
586 }
587
588 #[cfg(feature = "kobo")]
589 if settings.logging.enable_dbus_log {
590 let task = Box::new(dbus_monitor::DbusMonitorTask);
591 if let Err(e) = manager.start(task, hub.clone()) {
592 tracing::warn!(error = %e, "failed to start dbus_monitor task");
593 }
594 }
595 }
596
597 if settings.import.startup_trigger {
598 manager.schedule_import(None, &hub, database, settings);
599 }
600
601 let task = Box::new(dictionary_index::DictionaryIndexTask::new(database.clone()));
602 if let Err(e) = manager.start(task, hub.clone()) {
603 tracing::warn!(error = %e, "failed to start dictionary_index task");
604 }
605}
606
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;
    use std::time::{Duration, Instant};

    /// Polls `manager` until the task `id` is no longer tracked as running.
    ///
    /// Panics if the task is still running after five seconds.
    fn wait_until_not_running(manager: &mut TaskManager, id: &TaskId) {
        let deadline = Instant::now() + Duration::from_secs(5);
        while Instant::now() < deadline {
            if !manager.is_running(id) {
                return;
            }
            std::thread::sleep(Duration::from_millis(1));
        }
        panic!("task '{id}' did not finish within timeout");
    }

    /// Task whose `run` returns immediately.
    struct InstantTask;

    impl BackgroundTask for InstantTask {
        fn id(&self) -> TaskId {
            TaskId::TestTask2
        }

        fn run(&mut self, _hub: &Sender<Event>, _shutdown: &ShutdownSignal) {}
    }

    /// Task that blocks until it is asked to stop (or 60 seconds elapse).
    struct WaitingTask;

    impl BackgroundTask for WaitingTask {
        fn id(&self) -> TaskId {
            TaskId::TestTask
        }

        fn run(&mut self, _hub: &Sender<Event>, shutdown: &ShutdownSignal) {
            shutdown.wait(Duration::from_secs(60));
        }
    }

    /// A started task is reported as running until it is explicitly stopped.
    #[test]
    fn start_and_stop() {
        let mut manager = TaskManager::new();
        let (hub, _rx) = mpsc::channel();

        let id = manager.start(Box::new(WaitingTask), hub).unwrap();
        assert!(manager.is_running(&id));

        manager.stop(&id).unwrap();
        assert!(!manager.is_running(&id));
    }

    /// Starting a second task with the same identifier is rejected.
    #[test]
    fn duplicate_start_returns_error() {
        let mut manager = TaskManager::new();
        let (hub, _rx) = mpsc::channel();

        manager.start(Box::new(WaitingTask), hub.clone()).unwrap();
        let err = manager.start(Box::new(WaitingTask), hub).unwrap_err();

        assert!(matches!(err, TaskError::AlreadyRunning(TaskId::TestTask)));
    }

    /// A task that returns on its own stops being reported as running.
    #[test]
    fn finished_task_is_cleaned_up() {
        let mut manager = TaskManager::new();
        let (hub, _rx) = mpsc::channel();

        let id = manager.start(Box::new(InstantTask), hub).unwrap();

        wait_until_not_running(&mut manager, &id);
        assert!(!manager.is_running(&id));
    }

    /// Stopping a task that already finished yields `NotRunning`.
    #[test]
    fn stop_finished_task_returns_not_running() {
        let mut manager = TaskManager::new();
        let (hub, _rx) = mpsc::channel();

        let id = manager.start(Box::new(InstantTask), hub).unwrap();

        wait_until_not_running(&mut manager, &id);
        let err = manager.stop(&id).unwrap_err();

        assert!(matches!(err, TaskError::NotRunning(TaskId::TestTask2)));
    }

    /// `running_tasks` lists only the task that is still blocked.
    #[test]
    fn running_tasks_excludes_finished() {
        let mut manager = TaskManager::new();
        let (hub, _rx) = mpsc::channel();

        manager.start(Box::new(WaitingTask), hub.clone()).unwrap();
        let instant_id = manager.start(Box::new(InstantTask), hub).unwrap();

        wait_until_not_running(&mut manager, &instant_id);
        let running = manager.running_tasks();

        assert_eq!(running.len(), 1);
        assert_eq!(running[0], TaskId::TestTask);

        manager.stop_all();
    }

    /// `stop_all` leaves no task running.
    #[test]
    fn stop_all_stops_everything() {
        let mut manager = TaskManager::new();
        let (hub, _rx) = mpsc::channel();

        manager.start(Box::new(WaitingTask), hub).unwrap();
        manager.stop_all();

        assert!(!manager.is_running(&TaskId::TestTask));
    }

    /// A thumbnail extraction scheduled against an in-memory database runs
    /// to completion and is then no longer stoppable.
    #[test]
    fn test_thumbnail_extraction_task_lifecycle() {
        let mut manager = TaskManager::new();
        let (hub, _rx) = mpsc::channel();
        let database = Database::new(":memory:").unwrap();
        database.migrate().unwrap();
        let settings = Settings::default();

        manager.schedule_thumbnail_extraction(None, &hub, &database, &settings);

        wait_until_not_running(&mut manager, &TaskId::ThumbnailExtraction);
        assert!(!manager.is_running(&TaskId::ThumbnailExtraction));

        let err = manager.stop(&TaskId::ThumbnailExtraction).unwrap_err();
        assert!(matches!(
            err,
            TaskError::NotRunning(TaskId::ThumbnailExtraction)
        ));
    }

    /// Requests made while an extraction is running are queued and then
    /// started one at a time as the queue is drained.
    #[test]
    fn thumbnail_extraction_queues_when_running() {
        let mut manager = TaskManager::new();
        let (hub, _rx) = mpsc::channel();

        // Register a stand-in thread under the extraction id so the manager
        // sees an extraction as running until the shutdown message arrives.
        let (shutdown_tx, shutdown_rx) = mpsc::channel();
        let blocking_handle = thread::spawn(move || {
            let _ = shutdown_rx.recv();
        });
        manager.tasks.insert(
            TaskId::ThumbnailExtraction,
            RunningTask {
                handle: blocking_handle,
                shutdown: shutdown_tx,
                finished_event: None,
            },
        );

        let database = Database::new(":memory:").unwrap();
        database.migrate().unwrap();
        let settings = Settings::default();

        // Both requests arrive while the stand-in is running, so both are queued.
        manager.schedule_thumbnail_extraction(Some(0), &hub, &database, &settings);
        manager.schedule_thumbnail_extraction(Some(1), &hub, &database, &settings);

        assert_eq!(manager.pending_thumbnail_indices.len(), 2);

        manager.stop(&TaskId::ThumbnailExtraction).unwrap();

        // The first drain starts the oldest request and leaves one queued.
        manager.drain_pending_thumbnails(&hub, &database, &settings);
        assert_eq!(manager.pending_thumbnail_indices.len(), 1);

        wait_until_not_running(&mut manager, &TaskId::ThumbnailExtraction);

        // The second drain starts the remaining request and empties the queue.
        manager.drain_pending_thumbnails(&hub, &database, &settings);
        assert!(manager.pending_thumbnail_indices.is_empty());

        wait_until_not_running(&mut manager, &TaskId::ThumbnailExtraction);
    }
}