use native_tls::TlsConnector; use printstats::*; use reqwest::Client; use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS, Transport}; use std::collections::HashMap; use std::fs; use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex; #[tokio::main] async fn main() { let content = fs::read_to_string("config.toml").expect("Couldn't read config.toml"); let config = Config::load(&content); let state: Arc>> = Arc::new(Mutex::new(HashMap::new())); for printer in config.printers { match printer { Printer::Prusa { name, host, api_key, .. } => { let state_clone = state.clone(); tokio::spawn(async move { loop { let result: Result<(), Box> = async { let client = Client::new(); let response = client .get(format!("http://{}/api/v1/status", host)) .header("X-Api-Key", &api_key) .send() .await? .json::() .await?; let mut lock = state_clone.lock().await; lock.entry(name.clone()) .and_modify(|prs| *prs = extract_status_from_prusa(&response)) .or_insert_with(|| { extract_status_from_prusa(&response) } ); Ok(()) } .await; if let Err(e) = result { eprintln!("Error polling Prusa printer {}: {}", name, e); } tokio::time::sleep(Duration::from_secs(5)).await; } }); } Printer::Bambu { name, host, serial_number, access_code, } => { println!("Found Bambu: {} at {}", name, host); let state_clone = Arc::clone(&state); tokio::spawn(async move { let mut mqttoptions = MqttOptions::new(&name, &host, 8883); mqttoptions.set_keep_alive(Duration::from_secs(5)); mqttoptions.set_credentials("bblp", &access_code); // Bambu printers use TLS with a self-signed certificate let connector = TlsConnector::builder() .danger_accept_invalid_certs(true) .build() .unwrap(); mqttoptions.set_transport(Transport::tls_with_config(connector.into())); let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); client .subscribe(format!("device/{}/report", serial_number), QoS::AtMostOnce) .await .unwrap(); loop { // eventloop.poll() yields back to Tokio when there's no data match eventloop.poll().await { Ok(notification) => { if let Event::Incoming(Packet::Publish(p)) = notification { let mut lock = state_clone.lock().await; println!("{:?}", &p.payload); if let Ok(msg) = serde_json::from_slice::(&p.payload) { lock.entry(name.clone()) .and_modify(|prs| prs.bed_temp = msg.print.bed_temper) .or_insert_with(|| PrinterState { bed_temp: msg.print.bed_temper, ..Default::default() }); println!("Updated state for {}: {:?}", &name, p.payload); } } } Err(e) => { eprintln!("MQTT error: {:?}", e); tokio::time::sleep(Duration::from_secs(5)).await; // Simple retry } } } }); } } } loop { tokio::select! { _ = tokio::signal::ctrl_c() => { println!("Shutting down..."); break; } _ = tokio::time::sleep(Duration::from_secs(1)) => { println!("-- PRINTER STATE --"); let lock = state.lock().await; for (key, value) in lock.iter() { println!("{}: {:?}", key, value); } } } } }