cadmus_core/task/
wifi_status_monitor.rs1use std::collections::HashMap;
8use std::sync::mpsc::Sender;
9use std::time::Duration;
10
11use futures_util::stream::StreamExt;
12
13#[cfg(feature = "otel")]
14use opentelemetry::trace::Status;
15#[cfg(feature = "otel")]
16use tracing_opentelemetry::OpenTelemetrySpanExt;
17
18use crate::input::DeviceEvent;
19use crate::task::{BackgroundTask, ShutdownSignal, TaskId};
20use crate::view::Event;
21
// NOTE(review): the `DHCPCCD_` prefix looks like a typo of `DHCPCD` (the literal
// values spell "dhcpcd"); the names are kept because the rest of the file uses them.

/// Bus name of the dhcpcd D-Bus service; passed as the destination when the
/// proxy for the initial `GetStatus` call is created.
const DHCPCCD_SERVICE: &str = "name.marples.roy.dhcpcd";
/// Object path of the dhcpcd D-Bus object; used both for the proxy and for the
/// `WpaStatus` signal match rule.
const DHCPCCD_PATH: &str = "/name/marples/roy/dhcpcd";
/// Interface name passed to the proxy used for the initial `GetStatus` call.
const DHCPCCD_INTERFACE: &str = "name.marples.roy.dhcpcd";
/// Delay between two consecutive `ShutdownSignal::should_stop` checks while the
/// monitor is waiting for D-Bus messages.
const SHUTDOWN_POLL_INTERVAL: Duration = Duration::from_millis(200);
26
/// Background task that watches dhcpcd over the system D-Bus and forwards
/// "network is up" notifications to the application event hub.
///
/// The type carries no state; all work happens in its `BackgroundTask::run`
/// implementation.
#[derive(Debug, Default)]
pub struct WifiStatusMonitorTask;
29
30impl BackgroundTask for WifiStatusMonitorTask {
31 fn id(&self) -> TaskId {
32 TaskId::WifiStatusMonitor
33 }
34
35 fn run(&mut self, hub: &Sender<Event>, shutdown: &ShutdownSignal) {
36 let rt = match tokio::runtime::Runtime::new() {
37 Ok(rt) => rt,
38 Err(e) => {
39 tracing::error!(error = %e, "failed to create tokio runtime");
40 return;
41 }
42 };
43
44 rt.block_on(async {
45 if let Err(e) = monitor(hub, shutdown).await {
46 tracing::error!(error = %e, "wifi status monitor exited with error");
47 }
48 });
49 }
50}
51
52#[cfg_attr(feature = "otel", tracing::instrument(skip(connection, hub), ret(level=tracing::Level::TRACE)))]
53async fn check_initial_status(
54 connection: &zbus::Connection,
55 hub: &Sender<Event>,
56) -> Result<(), Box<dyn std::error::Error>> {
57 let proxy =
58 zbus::Proxy::new(connection, DHCPCCD_SERVICE, DHCPCCD_PATH, DHCPCCD_INTERFACE).await?;
59
60 let status: String = proxy.call("GetStatus", &()).await?;
61
62 tracing::debug!(status = %status, "initial dhcpcd status");
63
64 if status == "connected" {
65 tracing::info!("network already up at startup, sending NetUp event");
66 hub.send(Event::Device(DeviceEvent::NetUp)).ok();
67 }
68
69 Ok(())
70}
71
72async fn monitor(
73 hub: &Sender<Event>,
74 shutdown: &ShutdownSignal,
75) -> Result<(), Box<dyn std::error::Error>> {
76 let connection = zbus::Connection::system().await?;
77 tracing::info!("connected to system bus");
78
79 if let Err(e) = check_initial_status(&connection, hub).await {
80 tracing::warn!(error = %e, "failed to check initial dhcpcd status, will rely on signals");
81 }
82
83 let rule = zbus::MatchRule::builder()
84 .msg_type(zbus::message::Type::Signal)
85 .path(DHCPCCD_PATH)?
86 .member("WpaStatus")?
87 .build();
88
89 let mut stream = zbus::MessageStream::for_match_rule(rule, &connection, Some(100)).await?;
90
91 tracing::info!(
92 path = DHCPCCD_PATH,
93 member = "WpaStatus",
94 "subscribed to wifi status signals"
95 );
96
97 loop {
98 tokio::select! {
99 biased;
100
101 _ = async {
102 loop {
103 if shutdown.should_stop() {
104 return;
105 }
106 tokio::time::sleep(SHUTDOWN_POLL_INTERVAL).await;
107 }
108 } => {
109 tracing::info!("shutdown requested");
110 break;
111 }
112
113 msg = stream.next() => {
114 #[cfg(feature = "otel")]
115 let span = tracing::info_span!("wifi_status_monitor: received dbus message").entered();
116
117 let Some(msg) = msg else { break };
118 let msg = match msg {
119 Ok(m) => m,
120 Err(e) => {
121 #[cfg(feature = "otel")]
122 span.set_status(Status::error("failed to read dbus message"));
123 tracing::warn!(error = %e, "failed to read dbus message");
124 continue;
125 }
126 };
127
128 if let Err(e) = process_wpa_status(&msg, hub) {
129 #[cfg(feature = "otel")]
130 span.set_status(Status::error("failed to process wpa status"));
131 tracing::warn!(error = %e, "failed to process wpa status");
132 }
133 }
134 }
135 }
136
137 tracing::info!("wifi status monitor stopped");
138 Ok(())
139}
140
141#[cfg_attr(feature = "otel", tracing::instrument(skip(msg, hub), ret(level=tracing::Level::TRACE)))]
142fn process_wpa_status(
143 msg: &zbus::Message,
144 hub: &Sender<Event>,
145) -> Result<(), Box<dyn std::error::Error>> {
146 let body = msg.body();
147
148 let interfaces: HashMap<String, HashMap<String, String>> = body.deserialize()?;
149
150 check_interfaces(&interfaces, hub);
151
152 Ok(())
153}
154
155fn check_interfaces(interfaces: &HashMap<String, HashMap<String, String>>, hub: &Sender<Event>) {
160 for (interface_name, properties) in interfaces {
161 if let Some(wpa_state) = properties.get("wpa_state") {
162 tracing::debug!(
163 interface = %interface_name,
164 status = %wpa_state,
165 "WpaStatus received"
166 );
167
168 let has_ip = properties
169 .get("ip_address")
170 .is_some_and(|ip| !ip.is_empty());
171
172 if wpa_state == "COMPLETED" && has_ip {
173 tracing::info!("network up detected via dhcpcd-dbus");
174 hub.send(Event::Device(DeviceEvent::NetUp)).ok();
175 }
176 }
177 }
178}
179
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;

    /// Builds one interface's property map from `(key, value)` pairs.
    fn props(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    /// Builds a signal payload containing a single interface.
    fn single_interface(
        name: &str,
        pairs: &[(&str, &str)],
    ) -> HashMap<String, HashMap<String, String>> {
        HashMap::from([(name.to_string(), props(pairs))])
    }

    #[test]
    fn check_interfaces_sends_netup_when_wpa_completed() {
        let (tx, rx) = mpsc::channel();

        let interfaces = single_interface(
            "wlan0",
            &[("wpa_state", "COMPLETED"), ("ip_address", "127.0.0.1")],
        );

        check_interfaces(&interfaces, &tx);

        let event = rx.try_recv().expect("should receive NetUp event");
        assert!(matches!(event, Event::Device(DeviceEvent::NetUp)));
        assert!(
            rx.try_recv().is_err(),
            "a single connected interface should produce exactly one event"
        );
    }

    #[test]
    fn check_interfaces_handles_multiple_interfaces() {
        let (tx, rx) = mpsc::channel();

        let mut interfaces = HashMap::new();
        interfaces.insert(
            "wlan0".to_string(),
            props(&[("wpa_state", "COMPLETED"), ("ip_address", "127.0.0.1")]),
        );
        interfaces.insert(
            "wlan1".to_string(),
            props(&[("wpa_state", "SCANNING"), ("ip_address", "")]),
        );

        check_interfaces(&interfaces, &tx);

        // Only wlan0 is connected, so exactly one NetUp must be emitted
        // regardless of the map's iteration order.
        let event = rx
            .try_recv()
            .expect("should receive NetUp for the connected interface");
        assert!(matches!(event, Event::Device(DeviceEvent::NetUp)));
        assert!(
            rx.try_recv().is_err(),
            "should not send NetUp for the scanning interface"
        );
    }

    #[test]
    fn check_interfaces_does_not_send_netup_without_ip_address() {
        let (tx, rx) = mpsc::channel();

        let interfaces = single_interface("wlan0", &[("wpa_state", "COMPLETED")]);

        check_interfaces(&interfaces, &tx);

        assert!(
            rx.try_recv().is_err(),
            "should not send NetUp before DHCP binding"
        );
    }

    #[test]
    fn check_interfaces_does_not_send_netup_with_empty_ip_address() {
        let (tx, rx) = mpsc::channel();

        let interfaces = single_interface(
            "wlan0",
            &[("wpa_state", "COMPLETED"), ("ip_address", "")],
        );

        check_interfaces(&interfaces, &tx);

        assert!(
            rx.try_recv().is_err(),
            "should not send NetUp with empty ip_address"
        );
    }
}
255}