cadmus_core/task/
dbus_monitor.rs1use std::sync::mpsc::Sender;
8use std::time::Duration;
9
10use futures_util::stream::StreamExt;
11
12use crate::task::{BackgroundTask, ShutdownSignal, TaskId};
13use crate::view::Event;
14
/// Pause between consecutive checks of the shutdown signal while the monitor
/// is waiting for D-Bus traffic.
const SHUTDOWN_POLL_INTERVAL: Duration = Duration::from_millis(200);
16
/// Background task that listens on the D-Bus system bus and logs every signal
/// it receives.
///
/// The type carries no state; all work happens in its `BackgroundTask::run`
/// implementation.
pub struct DbusMonitorTask;
24
25impl BackgroundTask for DbusMonitorTask {
26 fn id(&self) -> TaskId {
27 TaskId::DbusMonitor
28 }
29
30 fn run(&mut self, _hub: &Sender<Event>, shutdown: &ShutdownSignal) {
31 let rt = match tokio::runtime::Runtime::new() {
32 Ok(rt) => rt,
33 Err(e) => {
34 tracing::error!(error = %e, "failed to create tokio runtime");
35 return;
36 }
37 };
38
39 rt.block_on(async {
40 if let Err(e) = monitor(shutdown).await {
41 tracing::error!(error = %e, "dbus monitor exited with error");
42 }
43 });
44 }
45}
46
47async fn monitor(shutdown: &ShutdownSignal) -> Result<(), Box<dyn std::error::Error>> {
48 let connection = zbus::Connection::system().await?;
49 tracing::info!("connected to system bus");
50
51 let rule = zbus::MatchRule::builder()
52 .msg_type(zbus::message::Type::Signal)
53 .build();
54
55 let mut stream = zbus::MessageStream::for_match_rule(rule, &connection, Some(100)).await?;
56
57 tracing::info!("subscribed to all signals");
58
59 loop {
60 tokio::select! {
61 biased;
62
63 _ = async {
64 loop {
65 if shutdown.should_stop() {
66 return;
67 }
68 tokio::time::sleep(SHUTDOWN_POLL_INTERVAL).await;
69 }
70 } => {
71 tracing::info!("shutdown requested");
72 break;
73 }
74
75 msg = stream.next() => {
76 let Some(msg) = msg else { break };
77 let msg = match msg {
78 Ok(m) => m,
79 Err(e) => {
80 tracing::warn!(error = %e, "failed to read dbus message");
81 continue;
82 }
83 };
84
85 let body = msg.body();
86 let body: Option<zbus::zvariant::Structure> = match body.deserialize() {
87 Ok(b) => Some(b),
88 Err(e) => {
89 tracing::warn!(error = %e, "failed to deserialize dbus message body");
90 None
91 }
92 };
93
94 let header = msg.header();
95 tracing::debug!(
96 dbus_message = ?msg,
97 dbus_sender = ?header.sender(),
98 dbus_path = ?header.path(),
99 dbus_interface = ?header.interface(),
100 dbus_member = ?header.member(),
101 dbus_body = ?body,
102
103 "dbus signal"
104 );
105 }
106 }
107 }
108
109 tracing::info!("dbus monitor stopped");
110 Ok(())
111}