From 409828278eba21c506abfac8c13851e1274ebc95 Mon Sep 17 00:00:00 2001 From: Serguey Parkhomovsky Date: Tue, 17 Mar 2026 20:40:37 -0700 Subject: Extract printer polling into separate functions --- src/main.rs | 163 ++++++++++++++++++++++++++++++------------------------------ 1 file changed, 81 insertions(+), 82 deletions(-) diff --git a/src/main.rs b/src/main.rs index f093e0a..9710430 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,92 +17,14 @@ async fn main() { for printer in config.printers { match printer { - Printer::Prusa { - name, - host, - api_key, - .. - } => { + Printer::Prusa { name, host, api_key, .. } => { let state_clone = state.clone(); - tokio::spawn(async move { - loop { - let result: Result<(), Box> = async { - let client = Client::new(); - let response = client - .get(format!("http://{}/api/v1/status", host)) - .header("X-Api-Key", &api_key) - .send() - .await? - .json::() - .await?; - let mut lock = state_clone.lock().await; - let entry = lock.entry(name.clone()).or_default(); - extract_status_from_prusa(&response, entry); - Ok(()) - } - .await; - if let Err(e) = result { - eprintln!("Error polling Prusa printer {}: {}", name, e); - } - tokio::time::sleep(Duration::from_secs(5)).await; - } - }); + tokio::spawn(poll_prusa(name, host, api_key, state_clone)); } - Printer::Bambu { - name, - host, - serial_number, - access_code, - } => { + Printer::Bambu { name, host, serial_number, access_code } => { println!("Found Bambu: {} at {}", name, host); let state_clone = Arc::clone(&state); - tokio::spawn(async move { - let mut mqttoptions = MqttOptions::new(&name, &host, 8883); - mqttoptions.set_keep_alive(Duration::from_secs(5)); - mqttoptions.set_credentials("bblp", &access_code); - - // Bambu printers use TLS with a self-signed certificate - let connector = TlsConnector::builder() - .danger_accept_invalid_certs(true) - .build() - .unwrap(); - mqttoptions.set_transport(Transport::tls_with_config(connector.into())); - - let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); - client - .subscribe(format!("device/{}/report", serial_number), QoS::AtMostOnce) - .await - .unwrap(); - - loop { - // eventloop.poll() yields back to Tokio when there's no data - match eventloop.poll().await { - Ok(notification) => { - if let Event::Incoming(Packet::Publish(p)) = notification { - let mut lock = state_clone.lock().await; - println!("{:?}", &p.payload); - match serde_json::from_slice::(&p.payload) { - Ok(msg) => { - let entry = lock.entry(name.clone()).or_default(); - extract_status_from_bambu(&msg, entry); - println!( - "Updated state for {}: {:?}", - &name, p.payload - ); - } - Err(e) => { - eprintln!("Failed to deserialize BambuStatus: {}", e); - } - } - } - } - Err(e) => { - eprintln!("MQTT error: {:?}", e); - tokio::time::sleep(Duration::from_secs(5)).await; // Simple retry - } - } - } - }); + tokio::spawn(poll_bambu(name, host, serial_number, access_code, state_clone)); } } } @@ -117,6 +39,83 @@ async fn main() { let _ = axum::serve(listener, app).await; } +async fn poll_prusa( + name: String, + host: String, + api_key: String, + state: Arc>>, +) { + let client = Client::new(); + loop { + let result: Result<(), Box> = async { + let response = client + .get(format!("http://{}/api/v1/status", host)) + .header("X-Api-Key", &api_key) + .send() + .await? + .json::() + .await?; + let mut lock = state.lock().await; + let entry = lock.entry(name.clone()).or_default(); + extract_status_from_prusa(&response, entry); + Ok(()) + } + .await; + if let Err(e) = result { + eprintln!("Error polling Prusa printer {}: {}", name, e); + } + tokio::time::sleep(Duration::from_secs(5)).await; + } +} + +async fn poll_bambu( + name: String, + host: String, + serial_number: String, + access_code: String, + state: Arc>>, +) { + let mut mqttoptions = MqttOptions::new(&name, &host, 8883); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + mqttoptions.set_credentials("bblp", &access_code); + + // Bambu printers use TLS with a self-signed certificate + let connector = TlsConnector::builder() + .danger_accept_invalid_certs(true) + .build() + .unwrap(); + mqttoptions.set_transport(Transport::tls_with_config(connector.into())); + + let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); + client + .subscribe(format!("device/{}/report", serial_number), QoS::AtMostOnce) + .await + .unwrap(); + + loop { + // eventloop.poll() yields back to Tokio when there's no data + match eventloop.poll().await { + Ok(Event::Incoming(Packet::Publish(p))) => { + println!("{:?}", &p.payload); + match serde_json::from_slice::(&p.payload) { + Ok(msg) => { + let mut lock = state.lock().await; + let entry = lock.entry(name.clone()).or_default(); + extract_status_from_bambu(&msg, entry); + println!("Updated state for {}: {:?}", &name, p.payload); + } + Err(e) => eprintln!("Failed to deserialize BambuStatus: {}", e), + } + } + Ok(_) => {} + Err(e) => { + eprintln!("MQTT error: {:?}", e); + tokio::time::sleep(Duration::from_secs(5)).await; // Simple retry + } + } + } +} + async fn root( State(state): State>>>, ) -> Json> { -- cgit v1.2.3