Files
bioz-host-rs/src/communication.rs

203 lines
9.0 KiB
Rust

use log::{error, info};
use tokio::select;
use tokio::sync::mpsc::{Sender, Receiver};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use atomic_float::AtomicF32;
use std::sync::{Arc, Mutex};
use bioz_icd_rs::MeasurementPointSet;
use crate::icd;
use crate::client::WorkbookClient;
use crate::plot::{TimeSeriesPlot, BodePlot};
use crate::signals::StartStopSignal;
pub async fn communicate_with_hardware(
mut run_impedancemeter_rx: Receiver<StartStopSignal>,
run_impedancemeter_tx: Sender<StartStopSignal>,
magnitude: Arc<Mutex<f32>>,
phase: Arc<Mutex<f32>>,
magnitude_series: Arc<Mutex<TimeSeriesPlot>>,
phase_series: Arc<Mutex<TimeSeriesPlot>>,
bode_series: Arc<Mutex<BodePlot>>,
connected: Arc<AtomicBool>,
data_frequency: Arc<AtomicF32>,
periods_per_dft: Arc<Mutex<Option<f32>>>,
periods_per_dft_multi: Arc<Mutex<(Vec<f32>, Option<Vec<f32>>)>>,
) {
let data_counter = Arc::new(AtomicU32::new(0));
let data_counter_clone = data_counter.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
data_frequency.store(data_counter.load(Ordering::Relaxed) as f32, Ordering::Relaxed);
data_counter.store(0, Ordering::Relaxed);
}
});
#[derive(Default)]
struct Settings {
frequency: Option<StartStopSignal>,
}
let mut settings = Settings::default();
loop {
let workbook_client = match WorkbookClient::new() {
Ok(client) => {
info!("Connected to hardware successfully.");
if let Some(frequency) = settings.frequency {
run_impedancemeter_tx.send(frequency).await.unwrap();
}
connected.store(true, Ordering::Relaxed);
client
},
Err(e) => {
error!("Failed to connect to hardware: {:?}", e);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
continue;
}
};
// Subscribe to SingleImpedanceOutputTopic
let mut single_impedance_sub = workbook_client
.client
.subscribe_multi::<icd::SingleImpedanceOutputTopic>(8)
.await
.unwrap();
let data = (magnitude_series.clone(), phase_series.clone(), magnitude.clone(), phase.clone());
let data_counter_clone_single = data_counter_clone.clone();
tokio::spawn(async move {
while let Ok(val) = single_impedance_sub.recv().await {
let mut mag_plot = data.0.lock().unwrap();
let mut phase_plot = data.1.lock().unwrap();
let mut mag_val = data.2.lock().unwrap();
let mut phase_val = data.3.lock().unwrap();
*mag_val = val.magnitude;
*phase_val = val.phase;
mag_plot.add(val.magnitude as f64);
phase_plot.add(val.phase as f64);
data_counter_clone_single.fetch_add(1, Ordering::Relaxed);
}
info!("SingleImpedanceOutputTopic subscription ended.");
});
// Subscribe to MultiImpedanceOutputTopic8
let mut multi_impedance_sub = workbook_client
.client
.subscribe_multi::<icd::MultiImpedanceOutputTopic>(8)
.await
.unwrap();
let data = bode_series.clone();
let data_counter_clone_multi = data_counter_clone.clone();
tokio::spawn(async move {
while let Ok(val) = multi_impedance_sub.recv().await {
let mut bode_plot = data.lock().unwrap();
match val.points {
MeasurementPointSet::Eight => {
let magnitudes: Vec<f32> = val.magnitudes_8.into_iter().collect();
let phases: Vec<f32> = val.phases_8.into_iter().collect();
bode_plot.update_magnitudes(MeasurementPointSet::Eight, magnitudes);
bode_plot.update_phases(MeasurementPointSet::Eight, phases);
},
MeasurementPointSet::Eighteen => {
let magnitudes: Vec<f32> = val.magnitudes_18.into_iter().collect();
let phases: Vec<f32> = val.phases_18.into_iter().collect();
bode_plot.update_magnitudes(MeasurementPointSet::Eighteen, magnitudes);
bode_plot.update_phases(MeasurementPointSet::Eighteen, phases);
},
}
data_counter_clone_multi.fetch_add(1, Ordering::Relaxed);
}
});
loop {
select! {
Some(frequency) = run_impedancemeter_rx.recv() => {
match frequency {
StartStopSignal::StartSingle(freq, dft_num) => {
match workbook_client.start_impedancemeter_single(freq, dft_num).await {
Ok(Ok(periods)) => {
info!("Impedance meter started at frequency: {} with periods per DFT: {}", freq, periods);
settings.frequency = Some(StartStopSignal::StartSingle(freq, dft_num));
*periods_per_dft.lock().unwrap() = Some(periods);
},
Ok(Err(e)) => {
error!("Failed to init on hardware: {:?}", e);
*periods_per_dft.lock().unwrap() = None;
},
Err(e) => {
error!("Communication error when starting impedancemeter: {:?}", e);
*periods_per_dft.lock().unwrap() = None;
}
}
},
StartStopSignal::StartMulti(num_points) => {
match workbook_client.start_impedancemeter_multi(num_points).await {
Ok(Ok(periods)) => {
settings.frequency = Some(StartStopSignal::StartMulti(num_points));
info!("Multi-point Impedancemeter started.");
match num_points {
MeasurementPointSet::Eight => {
*periods_per_dft_multi.lock().unwrap() = (num_points.values().iter().copied().collect(), Some(periods.periods_per_dft_8.into_iter().collect()));
},
MeasurementPointSet::Eighteen => {
*periods_per_dft_multi.lock().unwrap() = (num_points.values().iter().copied().collect(), Some(periods.periods_per_dft_18.into_iter().collect()));
},
}
},
Ok(Err(e)) => {
error!("Failed to multi-init on hardware: {:?}", e);
*periods_per_dft_multi.lock().unwrap() = (num_points.values().iter().copied().collect(), None);
},
Err(e) => {
error!("Communication error when starting impedancemeter: {:?}", e);
*periods_per_dft_multi.lock().unwrap() = (num_points.values().iter().copied().collect(), None);
}
}
},
StartStopSignal::Stop => {
if let Err(e) = workbook_client.stop_impedancemeter().await {
error!("Failed to stop impedancemeter: {:?}", e);
} else {
settings.frequency = Some(StartStopSignal::Stop);
*periods_per_dft.lock().unwrap() = None;
let (freq, _) = periods_per_dft_multi.lock().unwrap().clone();
*periods_per_dft_multi.lock().unwrap() = (freq, None);
info!("Impedancemeter stopped.");
}
},
}
}
_ = workbook_client.wait_closed() => {
// Handle client closure
info!("Client connection closed.");
break;
}
else => {
// All channels closed
break;
}
}
}
info!("Communication with hardware ended.");
connected.store(false, Ordering::Relaxed);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}