use axum::{Json, Router, extract::State, routing::get}; use clap::Parser; use native_tls::TlsConnector; use printstats::*; use reqwest::Client; use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS, Transport}; use std::collections::HashMap; use std::fs; use std::process; use std::sync::Arc; use std::sync::RwLock; use std::time::Duration; use tracing::{debug, error, info, warn}; type StateMap = Arc>>; #[derive(Parser, Debug)] #[command(about = "Print statistics scraper")] struct Args { /// Path to the configuration file #[arg(long, default_value = "config.toml")] config: String, /// Address to bind the HTTP server to #[arg(long, default_value = "0.0.0.0:3000")] bind: String, } #[tokio::main] async fn main() { tracing_subscriber::fmt::init(); let args = Args::parse(); let content = fs::read_to_string(&args.config).unwrap_or_else(|e| { error!(path = %args.config, error = %e, "Failed to read config file"); process::exit(1); }); let config = Config::load(&content).unwrap_or_else(|e| { error!(path = %args.config, error = %e, "Failed to parse config file"); process::exit(1); }); let state: StateMap = Arc::new(RwLock::new(HashMap::new())); for printer in config.printers { let state = state.clone(); match printer { Printer::Prusa { name, host, api_key, } => { info!(name, host, "Found Prusa"); tokio::spawn(async move { match tokio::spawn(poll_prusa(name.clone(), host, api_key, state)).await { Ok(()) => warn!(name, "Prusa polling task exited unexpectedly"), Err(e) => error!(name, error = ?e, "Prusa polling task panicked"), } }); } Printer::Bambu { name, host, serial_number, access_code, } => { info!(name, host, "Found Bambu"); tokio::spawn(async move { match tokio::spawn(poll_bambu( name.clone(), host, serial_number, access_code, state, )) .await { Ok(()) => warn!(name, "Bambu polling task exited unexpectedly"), Err(e) => error!(name, error = ?e, "Bambu polling task panicked"), } }); } } } let app = Router::new().route("/", get(root)).with_state(state); let listener = tokio::net::TcpListener::bind(&args.bind) .await .unwrap_or_else(|e| { error!(addr = %args.bind, error = %e, "Failed to bind to address"); process::exit(1); }); info!(addr = %args.bind, "Listening"); axum::serve(listener, app).await.unwrap_or_else(|e| { error!(error = %e, "HTTP server error"); process::exit(1); }); } async fn fetch_prusa( client: &Client, name: &str, host: &str, api_key: &str, state: &StateMap, ) -> Result<(), Box> { let response = client .get(format!("http://{}/api/v1/status", host)) .header("X-Api-Key", api_key) .send() .await? .json::() .await?; let mut lock = state.write().unwrap(); lock.entry(name.to_owned()) .or_default() .update_from(&response); Ok(()) } async fn poll_prusa(name: String, host: String, api_key: String, state: StateMap) { let client = Client::builder() .timeout(Duration::from_secs(10)) .build() .unwrap_or_else(|e| { error!(name, error = %e, "Failed to build HTTP client"); process::exit(1); }); loop { if let Err(e) = fetch_prusa(&client, &name, &host, &api_key, &state).await { error!(name, error = %e, "Error polling Prusa printer"); } tokio::time::sleep(Duration::from_secs(5)).await; } } async fn poll_bambu( name: String, host: String, serial_number: String, access_code: String, state: StateMap, ) { let mut mqttoptions = MqttOptions::new(&name, &host, 8883); mqttoptions.set_keep_alive(Duration::from_secs(5)); mqttoptions.set_credentials("bblp", &access_code); // Bambu printers use TLS with a self-signed certificate let connector = match TlsConnector::builder() .danger_accept_invalid_certs(true) .build() { Ok(c) => c, Err(e) => { error!(name, error = %e, "Failed to build TLS connector"); process::exit(1); } }; mqttoptions.set_transport(Transport::tls_with_config(connector.into())); let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); loop { // eventloop.poll() yields back to Tokio when there's no data match eventloop.poll().await { Ok(Event::Incoming(Packet::ConnAck(_))) => { if let Err(e) = client .subscribe(format!("device/{}/report", serial_number), QoS::AtMostOnce) .await { error!(name, error = %e, "Failed to subscribe to Bambu MQTT topic"); } // Send a request to push all values to us; otherwise we'll have to wait for the values to come in incremental updates. if let Err(e) = client .publish( format!("device/{}/request", serial_number), QoS::AtMostOnce, false, serde_json::json!({ "pushing": { "sequence_id": "0", "command": "pushall", "version": 1, "push_target": 1 } }) .to_string(), ) .await { error!(name, error = %e, "Failed to send pushall to Bambu printer"); } } Ok(Event::Incoming(Packet::Publish(p))) => { debug!(payload = ?p.payload, "Received Bambu payload"); match serde_json::from_slice::(&p.payload) { Ok(msg) => { let mut lock = state.write().unwrap(); lock.entry(name.clone()).or_default().update_from(&msg); debug!(name, "Updated state"); } Err(e) => error!(error = %e, "Failed to deserialize BambuStatus"), } } Ok(_) => {} Err(e) => { error!(error = ?e, "MQTT error"); tokio::time::sleep(Duration::from_secs(5)).await; // Simple retry } } } } async fn root(State(state): State) -> Json> { Json(state.read().unwrap().clone()) }