// Printer status aggregator.
//
// Reads `config.toml`, starts one background polling task per configured
// printer (Prusa over HTTP, Bambu over MQTT/TLS), and serves the collected
// per-printer state as JSON on `GET /` at 0.0.0.0:3000.
//
// NOTE(review): several generic argument lists in this file appear to have been
// lost (the text between angle brackets is missing). Each affected line is
// flagged below; restore them from the original source before compiling.

use axum::{Json, Router, extract::State, routing::get};
use native_tls::TlsConnector;
use printstats::*;
use reqwest::Client;
use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS, Transport};
use std::collections::HashMap;
use std::fs;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

/// Shared state handed to every polling task and to the HTTP handler.
///
/// Built in `main` as `Arc::new(Mutex::new(HashMap::new()))` and keyed by the
/// printer name (see the `entry(name...)` calls below).
// NOTE(review): generic arguments missing here; the value type of the map is
// not visible in this file (presumably a type from `printstats` — confirm).
type StateMap = Arc>>;

/// Entry point: loads the configuration, spawns one poller per printer and
/// runs the HTTP server.
///
/// # Panics
///
/// Panics if `config.toml` cannot be read, if the listener cannot bind to
/// `0.0.0.0:3000`, or if `axum::serve` returns an error.
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();
    let content = fs::read_to_string("config.toml").expect("Couldn't read config.toml");
    // `Config` and `Printer` are not defined in this file; presumably they come
    // from the `printstats::*` glob import.
    let config = Config::load(&content);
    let state: StateMap = Arc::new(Mutex::new(HashMap::new()));
    for printer in config.printers {
        match printer {
            Printer::Prusa {
                name,
                host,
                api_key,
            } => {
                tracing::info!(name, host, "Found Prusa");
                // The join handle is dropped, so the task runs detached.
                tokio::spawn(poll_prusa(name, host, api_key, state.clone()));
            }
            Printer::Bambu {
                name,
                host,
                serial_number,
                access_code,
            } => {
                tracing::info!(name, host, "Found Bambu");
                // The join handle is dropped, so the task runs detached.
                tokio::spawn(poll_bambu(
                    name,
                    host,
                    serial_number,
                    access_code,
                    state.clone(),
                ));
            }
        }
    }
    let app = Router::new().route("/", get(root)).with_state(state);
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

/// Performs a single status request against a Prusa printer and merges the
/// result into the shared state.
///
/// Sends `GET http://{host}/api/v1/status` with the `X-Api-Key` header, decodes
/// the body as JSON, then updates (creating via `Default` if absent) the entry
/// stored under `name`.
///
/// # Errors
///
/// Returns an error if sending the request or decoding the JSON body fails.
// NOTE(review): generic argument of `Box` is missing in the return type
// (presumably a boxed `dyn Error` — confirm).
// NOTE(review): the HTTP status code is not checked before decoding; a non-2xx
// response would surface only as a JSON decoding error — confirm this is intended.
async fn fetch_prusa(
    client: &Client,
    name: &str,
    host: &str,
    api_key: &str,
    state: &StateMap,
) -> Result<(), Box> {
    let response = client
        .get(format!("http://{}/api/v1/status", host))
        .header("X-Api-Key", api_key)
        .send()
        .await?
        // NOTE(review): the target type of `json` is missing here.
        .json::()
        .await?;
    // The lock is taken only after the network round-trip has completed.
    let mut lock = state.lock().await;
    let entry = lock.entry(name.to_owned()).or_default();
    entry.update_from(&response);
    Ok(())
}

/// Polls a Prusa printer forever, once every 5 seconds.
///
/// Failures of an individual fetch are logged and do not stop the loop.
// NOTE(review): `Client::new()` is used as-is, with no request timeout set in
// this file; a stalled printer could presumably block this task — confirm
// against the reqwest defaults.
async fn poll_prusa(name: String, host: String, api_key: String, state: StateMap) {
    let client = Client::new();
    loop {
        if let Err(e) = fetch_prusa(&client, &name, &host, &api_key, &state).await {
            tracing::error!(name, error = %e, "Error polling Prusa printer");
        }
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}

/// Connects to a Bambu printer over MQTT (TLS, port 8883) and keeps the shared
/// state updated from the messages it publishes.
///
/// The printer `name` is passed as the first argument to `MqttOptions::new`
/// and is also the key used in the shared state map. Credentials are the fixed
/// user `bblp` with `access_code` as the password.
///
/// # Panics
///
/// Panics if the TLS connector cannot be built, or if the subscribe/publish
/// calls issued after a `ConnAck` fail.
// NOTE(review): a panic here only terminates this spawned task, after which the
// printer is silently no longer updated — consider logging instead of `unwrap()`.
async fn poll_bambu(
    name: String,
    host: String,
    serial_number: String,
    access_code: String,
    state: StateMap,
) {
    let mut mqttoptions = MqttOptions::new(&name, &host, 8883);
    mqttoptions.set_keep_alive(Duration::from_secs(5));
    mqttoptions.set_credentials("bblp", &access_code);
    // Bambu printers use TLS with a self-signed certificate, so certificate
    // validation is disabled for this connection.
    let connector = TlsConnector::builder()
        .danger_accept_invalid_certs(true)
        .build()
        .unwrap();
    mqttoptions.set_transport(Transport::tls_with_config(connector.into()));
    let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
    loop {
        // eventloop.poll() yields back to Tokio when there's no data
        match eventloop.poll().await {
            // Handled on every ConnAck, so the subscription and the full-state
            // request are issued again whenever a new ConnAck arrives.
            Ok(Event::Incoming(Packet::ConnAck(_))) => {
                client
                    .subscribe(format!("device/{}/report", serial_number), QoS::AtMostOnce)
                    .await
                    .unwrap();
                // Send a request to push all values to us; otherwise we'd have to
                // wait for the values to come in through incremental updates.
                client
                    .publish(
                        format!("device/{}/request", serial_number),
                        QoS::AtMostOnce,
                        false,
                        r#"{ "pushing": { "sequence_id": "0", "command": "pushall", "version": 1, "push_target": 1 } }"#,
                    )
                    .await
                    .unwrap();
            }
            Ok(Event::Incoming(Packet::Publish(p))) => {
                tracing::debug!(payload = ?p.payload, "Received Bambu payload");
                // NOTE(review): the target type of `from_slice` is missing here
                // (the error message below suggests `BambuStatus` — confirm).
                match serde_json::from_slice::(&p.payload) {
                    Ok(msg) => {
                        let mut lock = state.lock().await;
                        let entry = lock.entry(name.clone()).or_default();
                        entry.update_from(&msg);
                        tracing::debug!(name, payload = ?p.payload, "Updated state");
                    }
                    // A payload that fails to decode is logged and skipped.
                    Err(e) => tracing::error!(error = %e, "Failed to deserialize BambuStatus"),
                }
            }
            // All other events are ignored.
            Ok(_) => {}
            Err(e) => {
                tracing::error!(error = ?e, "MQTT error");
                // Back off before the next loop iteration polls again.
                tokio::time::sleep(Duration::from_secs(5)).await; // Simple retry
            }
        }
    }
}

/// `GET /` handler: returns a JSON snapshot of the state of all printers.
///
/// The map is cloned while the lock is held and the clone is what gets
/// wrapped in `Json`.
// NOTE(review): generic arguments of `State` and `Json` are missing in the
// signature below.
async fn root(State(state): State) -> Json> {
    let lock = state.lock().await;
    Json(lock.clone())
}