use std::time::SystemTime; use log::{error, info}; use tokio::select; use tokio::sync::mpsc::{Receiver, Sender}; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use atomic_float::AtomicF32; use std::sync::{Arc, Mutex}; use bioz_icd_rs::MeasurementPointSet; use crate::icd; use crate::client::WorkbookClient; use crate::plot::{TimeSeriesPlot, BodePlot}; use crate::signals::{LoggingSignal, StartStopSignal}; pub async fn communicate_with_hardware( mut run_impedancemeter_rx: Receiver, run_impedancemeter_tx: Sender, magnitude: Arc>, phase: Arc>, magnitude_series: Arc>, phase_series: Arc>, bode_series: Arc>, connected: Arc, data_frequency: Arc, periods_per_dft: Arc>>, periods_per_dft_sweep: Arc, Option>)>>, gui_logging_enabled: Arc, log_tx: Sender, ) { let data_counter = Arc::new(AtomicU32::new(0)); let data_counter_clone = data_counter.clone(); tokio::spawn(async move { loop { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; data_frequency.store(data_counter.load(Ordering::Relaxed) as f32, Ordering::Relaxed); data_counter.store(0, Ordering::Relaxed); } }); #[derive(Default, Clone, Copy)] struct Settings { frequency: Option, } let settings = Arc::new(Mutex::new(Settings::default())); loop { let workbook_client = match WorkbookClient::new() { Ok(client) => { info!("Connected to hardware successfully."); if let Some(frequency) = settings.lock().unwrap().frequency { run_impedancemeter_tx.send(frequency).await.unwrap(); } connected.store(true, Ordering::Relaxed); client }, Err(e) => { error!("Failed to connect to hardware: {:?}", e); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; continue; } }; // Subscribe to SingleImpedanceOutputTopic let mut single_impedance_sub = workbook_client .client .subscribe_multi::(8) .await .unwrap(); let data = (magnitude_series.clone(), phase_series.clone(), magnitude.clone(), phase.clone()); let data_counter_clone_single = data_counter_clone.clone(); // Clone log_tx for the task let gui_logging_enabled_clone = gui_logging_enabled.clone(); let log_tx_clone = log_tx.clone(); let settings_clone = settings.clone(); tokio::spawn(async move { while let Ok(val) = single_impedance_sub.recv().await { { let mut mag_plot = data.0.lock().unwrap(); let mut phase_plot = data.1.lock().unwrap(); let mut mag_val = data.2.lock().unwrap(); let mut phase_val = data.3.lock().unwrap(); *mag_val = val.magnitude; *phase_val = val.phase; mag_plot.add(val.magnitude as f64); phase_plot.add(val.phase as f64); data_counter_clone_single.fetch_add(1, Ordering::Relaxed); } // Send logging signal if gui_logging_enabled_clone.load(Ordering::Relaxed) { let settings = *settings_clone.lock().unwrap(); match settings.frequency { Some(StartStopSignal::StartSingle(freq, _, _)) => { if let Err(e) = log_tx_clone.send(LoggingSignal::SingleImpedance(SystemTime::now(), freq, val.magnitude, val.phase)).await { error!("Failed to send logging signal: {:?}", e); } }, _ => { error!("Frequency not set for single impedance logging."); }, } // if let Err(e) = log_tx_clone.send(LoggingSignal::SingleImpedance(SystemTime::now(), settings.frequency, val.magnitude, val.phase)).await { // error!("Failed to send logging signal: {:?}", e); // } } } info!("SingleImpedanceOutputTopic subscription ended."); }); // Subscribe to SweepImpedanceOutputTopic8 let mut sweep_impedance_sub = workbook_client .client .subscribe_multi::(8) .await .unwrap(); let data = bode_series.clone(); let data_counter_clone_sweep = data_counter_clone.clone(); // Clone log_tx for the task let gui_logging_enabled_clone = gui_logging_enabled.clone(); let log_tx_clone = log_tx.clone(); tokio::spawn(async move { while let Ok(val) = sweep_impedance_sub.recv().await { match val.points { MeasurementPointSet::Eight => { let magnitudes: Vec = val.magnitudes_8.into_iter().collect(); let phases: Vec = val.phases_8.into_iter().collect(); { let mut bode_plot = data.lock().unwrap(); bode_plot.update_magnitudes(MeasurementPointSet::Eight, magnitudes.clone()); bode_plot.update_phases(MeasurementPointSet::Eight, phases.clone()); } if gui_logging_enabled_clone.load(Ordering::Relaxed) { if let Err(e) = log_tx_clone.send(LoggingSignal::SweepImpedance(SystemTime::now(), MeasurementPointSet::Eight.values().to_vec(), magnitudes.clone(), phases.clone())).await { error!("Failed to send logging signal: {:?}", e); } } }, MeasurementPointSet::Eighteen => { let magnitudes: Vec = val.magnitudes_18.into_iter().collect(); let phases: Vec = val.phases_18.into_iter().collect(); { let mut bode_plot = data.lock().unwrap(); bode_plot.update_magnitudes(MeasurementPointSet::Eighteen, magnitudes.clone()); bode_plot.update_phases(MeasurementPointSet::Eighteen, phases.clone()); } if gui_logging_enabled_clone.load(Ordering::Relaxed) { if let Err(e) = log_tx_clone.send(LoggingSignal::SweepImpedance(SystemTime::now(), MeasurementPointSet::Eighteen.values().to_vec(), magnitudes.clone(), phases.clone())).await { error!("Failed to send logging signal: {:?}", e); } } }, } data_counter_clone_sweep.fetch_add(1, Ordering::Relaxed); } }); loop { select! { Some(frequency) = run_impedancemeter_rx.recv() => { match frequency { StartStopSignal::StartSingle(freq, lead_mode, dft_num) => { match workbook_client.start_impedancemeter_single(freq, lead_mode, dft_num).await { Ok(Ok(periods)) => { info!("Impedance meter started at frequency: {} with periods per DFT: {}", freq, periods); settings.lock().unwrap().frequency = Some(StartStopSignal::StartSingle(freq, lead_mode, dft_num)); *periods_per_dft.lock().unwrap() = Some(periods); }, Ok(Err(e)) => { error!("Failed to init on hardware: {:?}", e); *periods_per_dft.lock().unwrap() = None; }, Err(e) => { error!("Communication error when starting impedancemeter: {:?}", e); *periods_per_dft.lock().unwrap() = None; } } }, StartStopSignal::StartSweep(lead_mode, num_points) => { match workbook_client.start_impedancemeter_sweep(lead_mode, num_points).await { Ok(Ok(periods)) => { settings.lock().unwrap().frequency = Some(StartStopSignal::StartSweep(lead_mode, num_points)); info!("Sweep Impedancemeter started."); match num_points { MeasurementPointSet::Eight => { *periods_per_dft_sweep.lock().unwrap() = (num_points.values().iter().copied().collect(), Some(periods.periods_per_dft_8.into_iter().collect())); }, MeasurementPointSet::Eighteen => { *periods_per_dft_sweep.lock().unwrap() = (num_points.values().iter().copied().collect(), Some(periods.periods_per_dft_18.into_iter().collect())); }, } }, Ok(Err(e)) => { error!("Failed to sweep-init on hardware: {:?}", e); *periods_per_dft_sweep.lock().unwrap() = (num_points.values().iter().copied().collect(), None); }, Err(e) => { error!("Communication error when starting impedancemeter: {:?}", e); *periods_per_dft_sweep.lock().unwrap() = (num_points.values().iter().copied().collect(), None); } } }, StartStopSignal::Stop => { if let Err(e) = workbook_client.stop_impedancemeter().await { error!("Failed to stop impedancemeter: {:?}", e); } else { settings.lock().unwrap().frequency = Some(StartStopSignal::Stop); *periods_per_dft.lock().unwrap() = None; let (freq, _) = periods_per_dft_sweep.lock().unwrap().clone(); *periods_per_dft_sweep.lock().unwrap() = (freq, None); info!("Impedancemeter stopped."); } }, } } _ = workbook_client.wait_closed() => { // Handle client closure info!("Client connection closed."); break; } else => { // All channels closed break; } } } info!("Communication with hardware ended."); connected.store(false, Ordering::Relaxed); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } }