Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion src-tauri/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src-tauri/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ uuid = { version = "1", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
anyhow = "1"
thiserror = "1"
base64 = "0.21"
base64 = "0.22"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "chrono"] }
network-interface = "2.0.5"

71 changes: 71 additions & 0 deletions src-tauri/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,21 @@ pub async fn connect_to_master(
// Create SlaveClient
let slave_client = Arc::new(SlaveClient::new(config.host.clone(), config.port));

// Set up connection status callback to emit Tauri events
let app_handle_for_callback = state.app_handle.clone();
slave_client
.set_connection_status_callback(move |is_connected| {
let app_handle = app_handle_for_callback.clone();
tokio::spawn(async move {
if let Some(handle) = app_handle.read().await.as_ref() {
if let Err(e) = handle.emit("slave-connection-status", is_connected) {
eprintln!("Failed to emit slave connection status event: {}", e);
}
}
});
})
.await;

// Connect to master and get sync message receiver and sender
let (sync_rx, send_tx) = slave_client
.connect()
Expand Down Expand Up @@ -536,6 +551,15 @@ pub async fn disconnect_from_master(state: State<'_, AppState>) -> Result<(), St
Ok(())
}

#[tauri::command]
pub async fn is_slave_connected(state: State<'_, AppState>) -> Result<bool, String> {
if let Some(client) = state.slave_client.read().await.as_ref() {
Ok(client.is_connected().await)
} else {
Ok(false)
}
}

#[tauri::command]
pub async fn get_slave_reconnection_status(
state: State<'_, AppState>,
Expand Down Expand Up @@ -692,6 +716,53 @@ pub async fn get_performance_metrics(
Ok(state.performance_monitor.get_metrics().await)
}

#[tauri::command]
pub fn get_local_ip_address() -> Result<String, String> {
use network_interface::{NetworkInterface, NetworkInterfaceConfig};

let interfaces =
NetworkInterface::show().map_err(|e| format!("Failed to get network interfaces: {}", e))?;

// Prefer Ethernet interfaces (eth*, en* on Linux/macOS, ETHERNET on Windows)
// Fallback to any non-loopback IPv4 address
for iface in &interfaces {
let name_lower = iface.name.to_lowercase();

// Skip loopback interfaces (lo, loopback, etc.)
if name_lower.contains("loopback") || name_lower.starts_with("lo") {
continue;
}

// Check for IPv4 address (addr is Vec<Addr> in v2.0.5)
for addr in &iface.addr {
if let network_interface::Addr::V4(v4_addr) = addr {
// Prefer Ethernet interfaces (starts with "eth", "en", or contains "ethernet")
if name_lower.starts_with("eth")
|| name_lower.starts_with("en")
|| name_lower.contains("ethernet")
{
return Ok(v4_addr.ip.to_string());
}
}
}
}

// Fallback: return first non-loopback IPv4 address
for iface in interfaces {
let name_lower = iface.name.to_lowercase();

if !name_lower.contains("loopback") && !name_lower.starts_with("lo") {
for addr in &iface.addr {
if let network_interface::Addr::V4(v4_addr) = addr {
return Ok(v4_addr.ip.to_string());
}
}
}
}

Err("No network interface with IPv4 address found".to_string())
}

#[tauri::command]
pub fn greet(name: &str) -> String {
format!("Hello, {}! You've been greeted from Rust!", name)
Expand Down
2 changes: 2 additions & 0 deletions src-tauri/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub fn run() {
commands::stop_master_server,
commands::connect_to_master,
commands::disconnect_from_master,
commands::is_slave_connected,
commands::set_sync_targets,
commands::get_connected_clients_count,
commands::get_connected_clients_info,
Expand All @@ -69,6 +70,7 @@ pub fn run() {
commands::get_log_file_path,
commands::open_log_file,
commands::get_performance_metrics,
commands::get_local_ip_address,
])
.run(tauri::generate_context!())
.expect("error while running tauri application");
Expand Down
92 changes: 90 additions & 2 deletions src-tauri/src/network/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ pub struct ReconnectionStatus {
pub last_error: Option<String>,
}

type ConnectionStatusCallback = Arc<dyn Fn(bool) + Send + Sync>;

#[derive(Clone)]
pub struct SlaveClient {
host: String,
port: u16,
Expand All @@ -29,6 +32,8 @@ pub struct SlaveClient {
sync_message_tx: Arc<RwLock<Option<mpsc::UnboundedSender<SyncMessage>>>>,
reconnection_status: Arc<RwLock<ReconnectionStatus>>,
current_attempt: Arc<AtomicU32>,
is_connected: Arc<AtomicBool>,
connection_status_callback: Arc<RwLock<Option<ConnectionStatusCallback>>>,
}

impl SlaveClient {
Expand All @@ -48,6 +53,33 @@ impl SlaveClient {
last_error: None,
})),
current_attempt: Arc::new(AtomicU32::new(0)),
is_connected: Arc::new(AtomicBool::new(false)),
connection_status_callback: Arc::new(RwLock::new(None)),
}
}

pub async fn set_connection_status_callback<F>(&self, callback: F)
where
F: Fn(bool) + Send + Sync + 'static,
{
*self.connection_status_callback.write().await = Some(Arc::new(callback));
}

pub async fn is_connected(&self) -> bool {
self.is_connected.load(Ordering::SeqCst)
}

async fn set_connected(&self, connected: bool) {
let old_value = self.is_connected.swap(connected, Ordering::SeqCst);
// Only notify if status actually changed
if old_value != connected {
let callback_opt = {
let callback = self.connection_status_callback.read().await;
callback.clone()
};
if let Some(cb) = callback_opt.as_ref() {
cb(connected);
}
}
}

Expand Down Expand Up @@ -85,6 +117,10 @@ impl SlaveClient {
let message_tx_for_send = self.message_tx.clone();
let sync_message_tx_for_store = self.sync_message_tx.clone();

// Channel to notify when first connection is established
let (first_connection_tx, mut first_connection_rx) =
mpsc::unbounded_channel::<Result<(), String>>();

// Spawn task to handle sending messages (will be connected when WebSocket is ready)
let send_tx_for_sending = send_tx.clone();
let (send_ready_tx, mut send_ready_rx) =
Expand Down Expand Up @@ -127,8 +163,11 @@ impl SlaveClient {
// Spawn connection task with auto-reconnect
let reconnection_status_for_task = self.reconnection_status.clone();
let current_attempt_for_task = self.current_attempt.clone();
let first_connection_tx_for_task = first_connection_tx.clone();
let client_for_status = Arc::new(self.clone());
tokio::spawn(async move {
let mut attempt = 0;
let mut is_first_connection = true;

loop {
if !should_reconnect.load(Ordering::SeqCst) {
Expand All @@ -140,6 +179,7 @@ impl SlaveClient {
status.last_error = None;
}
current_attempt_for_task.store(0, Ordering::SeqCst);
client_for_status.clone().set_connected(false).await;
break;
}

Expand Down Expand Up @@ -177,6 +217,14 @@ impl SlaveClient {
));
}
current_attempt_for_task.store(0, Ordering::SeqCst);
client_for_status.clone().set_connected(false).await;
// Notify first connection failure
if is_first_connection {
let _ = first_connection_tx_for_task.send(Err(format!(
"Failed to connect after {} attempts",
max_attempts
)));
}
break;
}

Expand All @@ -185,7 +233,8 @@ impl SlaveClient {
Ok((ws_stream, _)) => {
println!("Connected to master: {}", url);
attempt = 0; // Reset attempt counter on successful connection
// Update status: connected successfully
client_for_status.clone().set_connected(true).await;
// Update status: connected successfully
{
let mut status = reconnection_status_for_task.write().await;
status.is_reconnecting = false;
Expand All @@ -194,6 +243,12 @@ impl SlaveClient {
}
current_attempt_for_task.store(0, Ordering::SeqCst);

// Notify first connection success
if is_first_connection {
is_first_connection = false;
let _ = first_connection_tx_for_task.send(Ok(()));
}

let (ws_sender, mut ws_receiver) = ws_stream.split();
let tx_clone = tx.clone();

Expand All @@ -211,6 +266,7 @@ impl SlaveClient {
let message_tx_for_cleanup = message_tx_for_send.clone();
let sync_message_tx_for_cleanup = sync_message_tx_for_store.clone();
let reconnection_status_for_incoming = reconnection_status_for_task.clone();
let client_for_disconnect = client_for_status.clone();
tokio::spawn(async move {
while let Some(msg) = ws_receiver.next().await {
match msg {
Expand Down Expand Up @@ -260,6 +316,7 @@ impl SlaveClient {
status.attempt_count = 0;
status.last_error = Some("Connection lost".to_string());
}
client_for_disconnect.set_connected(false).await;
});

// Wait for connection to break
Expand All @@ -280,17 +337,48 @@ impl SlaveClient {
status.last_error = Some(format!("{}", e));
}
current_attempt_for_task.store(attempt, Ordering::SeqCst);
client_for_status.clone().set_connected(false).await;
// Notify first connection failure
if is_first_connection && attempt >= max_attempts {
let _ = first_connection_tx_for_task.send(Err(format!("{}", e)));
}
}
}
}
});

Ok((rx, send_tx))
// Wait for first connection to be established (with timeout)
let timeout = tokio::time::Duration::from_secs(30);
match tokio::time::timeout(timeout, first_connection_rx.recv()).await {
Ok(Some(Ok(()))) => {
// Connection established successfully
Ok((rx, send_tx))
}
Ok(Some(Err(e))) => {
// Connection failed
self.set_connected(false).await;
Err(anyhow::anyhow!("Failed to connect to master: {}", e))
}
Ok(None) => {
// Channel closed unexpectedly
self.set_connected(false).await;
Err(anyhow::anyhow!("Connection attempt was cancelled"))
}
Err(_) => {
// Timeout
self.set_connected(false).await;
Err(anyhow::anyhow!(
"Connection timeout after {} seconds",
timeout.as_secs()
))
}
}
}

pub async fn disconnect(&self) {
// Stop reconnection attempts
self.should_reconnect.store(false, Ordering::SeqCst);
self.set_connected(false).await;

// Update status: not reconnecting
{
Expand Down
Loading