Skip to content

Commit ee01d4b

Browse files
committed
feat(Source Device): add async implementation
1 parent 46dcdc2 commit ee01d4b

35 files changed

Lines changed: 716 additions & 632 deletions

src/dbus/interface/source/iio_imu.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ impl SourceIioImuInterface {
2121
pub async fn listen_on_dbus(
2222
conn: Connection,
2323
device: UdevDevice,
24-
) -> Result<(), Box<dyn Error>> {
24+
) -> Result<(), Box<dyn Error + Send + Sync>> {
2525
let iface = SourceIioImuInterface::new(device);
2626
let Ok(id) = iface.id() else {
2727
return Ok(());

src/drivers/iio_imu/driver.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
use std::{collections::HashMap, error::Error, time::Duration};
1+
use std::{
2+
collections::HashMap,
3+
error::Error,
4+
sync::{Arc, Mutex},
5+
time::Duration,
6+
};
27

38
use industrial_io::{Channel, ChannelType, Device, Direction};
49

@@ -12,9 +17,9 @@ use super::{
1217
/// Driver for reading IIO IMU data
1318
pub struct Driver {
1419
mount_matrix: MountMatrix,
15-
accel: HashMap<String, Channel>,
20+
accel: HashMap<String, Arc<Mutex<Channel>>>,
1621
accel_info: HashMap<String, AxisInfo>,
17-
gyro: HashMap<String, Channel>,
22+
gyro: HashMap<String, Arc<Mutex<Channel>>>,
1823
gyro_info: HashMap<String, AxisInfo>,
1924
pub sample_delay: Duration,
2025
}
@@ -116,7 +121,7 @@ impl Driver {
116121
let Some(info) = self.accel_info.get(id) else {
117122
continue;
118123
};
119-
let data = channel.attr_read_int("raw")?;
124+
let data = channel.lock().unwrap().attr_read_int("raw")?;
120125

121126
// processed_value = (raw + offset) * scale
122127
let value = (data + info.offset) as f64 * info.scale;
@@ -144,7 +149,7 @@ impl Driver {
144149
let Some(info) = self.gyro_info.get(id) else {
145150
continue;
146151
};
147-
let data = channel.attr_read_int("raw")?;
152+
let data = channel.lock().unwrap().attr_read_int("raw")?;
148153

149154
// processed_value = (raw + offset) * scale
150155
let value = (data + info.offset) as f64 * info.scale;
@@ -194,7 +199,10 @@ impl Driver {
194199
fn get_channels_with_type(
195200
device: &Device,
196201
channel_type: ChannelType,
197-
) -> (HashMap<String, Channel>, HashMap<String, AxisInfo>) {
202+
) -> (
203+
HashMap<String, Arc<Mutex<Channel>>>,
204+
HashMap<String, AxisInfo>,
205+
) {
198206
let mut channels = HashMap::new();
199207
let mut channel_info = HashMap::new();
200208
device
@@ -276,8 +284,11 @@ fn get_channels_with_type(
276284
scales_avail,
277285
};
278286
channel_info.insert(id.clone(), info);
279-
channels.insert(id, channel);
287+
channels.insert(id, Arc::new(Mutex::new(channel)));
280288
});
281289

282290
(channels, channel_info)
283291
}
292+
293+
// A mutex is used to access channels
294+
unsafe impl Send for Driver {}

src/input/composite_device/mod.rs

Lines changed: 75 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use zbus::Connection;
2222

2323
use crate::{
2424
config::{
25-
capability_map::CapabilityMapConfig, path::get_profiles_path, CompositeDeviceConfig,
25+
self, capability_map::CapabilityMapConfig, path::get_profiles_path, CompositeDeviceConfig,
2626
DeviceProfile, ProfileMapping,
2727
},
2828
dbus::interface::{
@@ -40,7 +40,7 @@ use crate::{
4040
evdev::EventDevice, hidraw::HidRawDevice, iio::IioDevice, led::LedDevice, SourceDevice,
4141
},
4242
},
43-
udev::{device::UdevDevice, hide_device, unhide_device},
43+
udev::device::UdevDevice,
4444
};
4545

4646
use self::{client::CompositeDeviceClient, command::CompositeCommand};
@@ -156,7 +156,7 @@ pub struct CompositeDevice {
156156
}
157157

158158
impl CompositeDevice {
159-
pub fn new(
159+
pub async fn new(
160160
conn: Connection,
161161
manager: mpsc::Sender<ManagerCommand>,
162162
config: CompositeDeviceConfig,
@@ -240,7 +240,7 @@ impl CompositeDevice {
240240
}
241241
}
242242

243-
if let Err(e) = device.add_source_device(device_info) {
243+
if let Err(e) = device.add_source_device(device_info).await {
244244
return Err(e.to_string().into());
245245
}
246246

@@ -277,9 +277,6 @@ impl CompositeDevice {
277277

278278
let dbus_path = self.dbus_path.clone();
279279

280-
// Start all source devices
281-
self.run_source_devices().await?;
282-
283280
// Set persist value from config if set, used to determine
284281
// if CompositeDevice self-closes after all SourceDevices have
285282
// been removed.
@@ -537,18 +534,6 @@ impl CompositeDevice {
537534
log::debug!("Stopping target devices");
538535
self.targets.stop().await;
539536

540-
// Unhide all source devices
541-
for source_path in self.source_device_paths.clone() {
542-
if source_path.starts_with("/sys/bus/iio/devices") {
543-
log::debug!("Skipping unhiding IIO device: {source_path}");
544-
continue;
545-
}
546-
log::debug!("Un-hiding device: {}", source_path);
547-
if let Err(e) = unhide_device(source_path.clone()).await {
548-
log::debug!("Unable to unhide device {source_path}: {:?}", e);
549-
}
550-
}
551-
552537
// Send stop command to all source devices
553538
for (path, source) in &self.source_devices {
554539
log::debug!("Stopping source device: {path}");
@@ -590,54 +575,6 @@ impl CompositeDevice {
590575
self.source_device_paths.clone()
591576
}
592577

593-
/// Start and run the source devices that this composite device will
594-
/// consume.
595-
async fn run_source_devices(&mut self) -> Result<(), Box<dyn Error>> {
596-
// Hide the device if specified
597-
for source_path in self.source_devices_to_hide.drain(..) {
598-
log::debug!("Hiding device: {}", source_path);
599-
if let Err(e) = hide_device(source_path.as_str()).await {
600-
log::warn!("Failed to hide device '{source_path}': {e:?}");
601-
}
602-
log::debug!("Finished hiding device: {source_path}");
603-
}
604-
605-
log::debug!("Starting new source devices");
606-
// Start listening for events from all source devices
607-
let sources = self.source_devices_discovered.drain(..);
608-
for source_device in sources {
609-
let device_id = source_device.get_id();
610-
// If the source device is blocked, don't bother running it
611-
if self.source_devices_blocked.contains(&device_id) {
612-
log::debug!("Source device '{device_id}' blocked. Skipping running.");
613-
continue;
614-
}
615-
616-
let source_tx = source_device.client();
617-
self.source_devices.insert(device_id.clone(), source_tx);
618-
let tx = self.tx.clone();
619-
620-
// Add the IIO IMU Dbus interface. We do this here because it needs the source
621-
// device transmitter and this is the only place we can refrence it at the moment.
622-
let device = source_device.get_device_ref().clone();
623-
if let SourceDevice::Iio(_) = source_device {
624-
SourceIioImuInterface::listen_on_dbus(self.conn.clone(), device.clone()).await?;
625-
}
626-
627-
self.source_device_tasks.spawn(async move {
628-
if let Err(e) = source_device.run().await {
629-
log::error!("Failed running device: {:?}", e);
630-
}
631-
log::debug!("Source device closed");
632-
if let Err(e) = tx.send(CompositeCommand::SourceDeviceStopped(device)).await {
633-
log::error!("Failed to send device stop command: {:?}", e);
634-
}
635-
});
636-
}
637-
log::debug!("All source device tasks started");
638-
Ok(())
639-
}
640-
641578
/// Process a single event from a source device. Events are piped through
642579
/// a translation layer, then dispatched to the appropriate target device(s)
643580
async fn process_event(
@@ -1459,10 +1396,9 @@ impl CompositeDevice {
14591396

14601397
/// Executed whenever a source device is added to this [CompositeDevice].
14611398
async fn on_source_device_added(&mut self, device: UdevDevice) -> Result<(), Box<dyn Error>> {
1462-
if let Err(e) = self.add_source_device(device) {
1399+
if let Err(e) = self.add_source_device(device).await {
14631400
return Err(e.to_string().into());
14641401
}
1465-
self.run_source_devices().await?;
14661402

14671403
// Signal to DBus that source devices have changed
14681404
self.signal_sources_changed().await;
@@ -1525,72 +1461,52 @@ impl CompositeDevice {
15251461
Ok(())
15261462
}
15271463

1528-
/// Creates and adds a source device using the given [SourceDeviceInfo]
1529-
fn add_source_device(
1464+
/// Creates, adds, and starts a source device using the given [SourceDeviceInfo]
1465+
async fn add_source_device(
15301466
&mut self,
15311467
device: UdevDevice,
15321468
) -> Result<(), Box<dyn Error + Send + Sync>> {
15331469
// Check to see if this source device should be blocked.
1534-
let mut is_blocked = false;
1535-
let mut is_blocked_evdev = false;
15361470
let source_config = self.config.get_matching_device(&device);
1537-
if let Some(source_config) = source_config.as_ref() {
1538-
if let Some(blocked) = source_config.blocked {
1539-
is_blocked = blocked;
1540-
}
1541-
}
1542-
1543-
let subsystem = device.subsystem();
1544-
1545-
// Hide the device if specified
1546-
let should_passthru = source_config
1471+
let is_blocked = source_config
15471472
.as_ref()
1548-
.and_then(|c| c.passthrough)
1473+
.and_then(|conf| conf.blocked)
15491474
.unwrap_or(false);
1550-
let should_hide = !should_passthru && subsystem.as_str() != "iio";
1551-
if should_hide {
1552-
let source_path = device.devnode();
1553-
self.source_devices_to_hide.push(source_path);
1475+
1476+
// Create the source device
1477+
let client = self.client();
1478+
let source_device = Self::create_source_device(client, device.clone(), source_config)?;
1479+
let id = source_device.get_id();
1480+
let capabilities = source_device.get_capabilities()?;
1481+
let path = source_device.get_device_path();
1482+
let source_tx = source_device.client();
1483+
self.source_devices.insert(id.clone(), source_tx);
1484+
1485+
// Add the IIO IMU Dbus interface. We do this here because it needs the source
1486+
// device transmitter and this is the only place we can refrence it at the moment.
1487+
if let SourceDevice::Iio(_) = source_device {
1488+
SourceIioImuInterface::listen_on_dbus(self.conn.clone(), device.clone()).await?;
15541489
}
15551490

1556-
let source_device = match subsystem.as_str() {
1557-
"input" => {
1558-
log::debug!("Adding EVDEV source device: {:?}", device.name());
1559-
if is_blocked {
1560-
is_blocked_evdev = true;
1561-
}
1562-
let device = EventDevice::new(device, self.client(), source_config.clone())?;
1563-
SourceDevice::Event(device)
1564-
}
1565-
"hidraw" => {
1566-
log::debug!("Adding HIDRAW source device: {:?}", device.name());
1567-
let device = HidRawDevice::new(device, self.client(), source_config.clone())?;
1568-
SourceDevice::HidRaw(device)
1569-
}
1570-
"iio" => {
1571-
log::debug!("Adding IIO source device: {:?}", device.name());
1572-
let device = IioDevice::new(device, self.client(), source_config.clone())?;
1573-
SourceDevice::Iio(device)
1574-
}
1575-
"leds" => {
1576-
log::debug!("Adding LED source device: {:?}", device.sysname());
1577-
let device = LedDevice::new(device, self.client(), source_config.clone())?;
1578-
SourceDevice::Led(device)
1579-
}
1580-
_ => {
1581-
return Err(format!(
1582-
"Unspported subsystem: {subsystem}, unable to add source device {}",
1583-
device.name()
1584-
)
1585-
.into())
1491+
// Spawn the task and run the source device
1492+
let composite_device_tx = self.tx.clone();
1493+
let device_info = device.clone();
1494+
let device_path = path.clone();
1495+
tokio::task::spawn(async move {
1496+
// Run the source device
1497+
if let Err(e) = source_device.run().await {
1498+
log::error!("Failed running device: {e:?}");
15861499
}
1587-
};
1500+
log::debug!("Source device closed: {device_path}");
1501+
composite_device_tx
1502+
.send(CompositeCommand::SourceDeviceStopped(device_info))
1503+
.await;
1504+
});
15881505

15891506
// Get the capabilities of the source device.
15901507
// TODO: When we *remove* a source device, we also need to remove
15911508
// capabilities
15921509
if !is_blocked {
1593-
let capabilities = source_device.get_capabilities()?;
15941510
for cap in capabilities {
15951511
if self.translatable_capabilities.contains(&cap) {
15961512
continue;
@@ -1600,15 +1516,11 @@ impl CompositeDevice {
16001516
}
16011517

16021518
// Check if this device should be blocked from sending events to target devices.
1603-
let id = source_device.get_id();
1604-
if let Some(device_config) = self
1605-
.config
1606-
.get_matching_device(source_device.get_device_ref())
1607-
{
1519+
if let Some(device_config) = self.config.get_matching_device(&device) {
16081520
if let Some(blocked) = device_config.blocked {
16091521
// Blocked event devices should still be run so they can be
16101522
// EVIOGRAB'd
1611-
if blocked && !is_blocked_evdev {
1523+
if blocked {
16121524
self.source_devices_blocked.insert(id.clone());
16131525
}
16141526
}
@@ -1617,14 +1529,48 @@ impl CompositeDevice {
16171529
// TODO: Based on the capability map in the config, translate
16181530
// the capabilities.
16191531
// Keep track of the source device
1620-
let device_path = source_device.get_device_path();
1621-
self.source_devices_discovered.push(source_device);
1622-
self.source_device_paths.push(device_path);
1532+
self.source_device_paths.push(path);
16231533
self.source_devices_used.push(id);
16241534

16251535
Ok(())
16261536
}
16271537

1538+
/// Create the source device from the given udev device and configuration
1539+
fn create_source_device(
1540+
client: CompositeDeviceClient,
1541+
device: UdevDevice,
1542+
config: Option<config::SourceDevice>,
1543+
) -> Result<SourceDevice, Box<dyn Error + Send + Sync>> {
1544+
let subsystem = device.subsystem();
1545+
match subsystem.as_str() {
1546+
"input" => {
1547+
log::debug!("Adding EVDEV source device: {:?}", device.name());
1548+
let device = EventDevice::new(device, client, config)?;
1549+
Ok(SourceDevice::Event(device))
1550+
}
1551+
"hidraw" => {
1552+
log::debug!("Adding HIDRAW source device: {:?}", device.name());
1553+
let device = HidRawDevice::new(device, client, config)?;
1554+
Ok(SourceDevice::HidRaw(device))
1555+
}
1556+
"iio" => {
1557+
log::debug!("Adding IIO source device: {:?}", device.name());
1558+
let device = IioDevice::new(device, client, config)?;
1559+
Ok(SourceDevice::Iio(device))
1560+
}
1561+
"leds" => {
1562+
log::debug!("Adding LED source device: {:?}", device.sysname());
1563+
let device = LedDevice::new(device, client, config)?;
1564+
Ok(SourceDevice::Led(device))
1565+
}
1566+
_ => Err(format!(
1567+
"Unspported subsystem: {subsystem}, unable to add source device {}",
1568+
device.name()
1569+
)
1570+
.into()),
1571+
}
1572+
}
1573+
16281574
/// Load the given device profile
16291575
pub fn load_device_profile(
16301576
&mut self,

src/input/manager.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,8 @@ impl Manager {
551551
device,
552552
self.next_composite_dbus_path()?,
553553
capability_map,
554-
)?;
554+
)
555+
.await?;
555556

556557
// Check to see if there's already a CompositeDevice for
557558
// these source devices.
@@ -809,7 +810,7 @@ impl Manager {
809810
let composite_path = String::from(device.dbus_path());
810811
let composite_path_clone = composite_path.clone();
811812
let tx = self.tx.clone();
812-
let task = tokio::spawn(async move {
813+
let task = tokio::task::spawn(async move {
813814
if let Err(e) = device.run().await {
814815
log::error!("Error running {composite_path}: {}", e.to_string());
815816
}

0 commit comments

Comments
 (0)