Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,25 @@ await item.Value.CloudConnection.Filter(cp => cp.IsActive).ForEachAsync(
});
}
}

// After releasing the write lock, trigger reconnection
_ = Task.Run(async () =>
{
await Task.Delay(TimeSpan.FromSeconds(1)); // Brief delay
await this.TriggerReconnectionForAllDevices();
});
}

async Task TriggerReconnectionForAllDevices()
{
var devicesSnapshot = this.devices.ToArray();
var reconnectionTasks = devicesSnapshot
.Where(kvp => kvp.Value.DeviceConnection.Filter(dc => dc.IsActive).HasValue)
.Select(kvp => this.TryGetCloudConnectionInternal(kvp.Key));

await Task.WhenAll(reconnectionTasks);
}

ConnectedDevice GetOrCreateConnectedDevice(IIdentity identity)
{
string deviceId = Preconditions.CheckNotNull(identity, nameof(identity)).Id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1446,6 +1446,113 @@ static async Task<IEntityStore<string, string>> GetPopulatedEntityStore(string e

static IIdentityProvider GetIdentityProvider() => new IdentityProvider(IotHubHostName);

[Fact]
[Unit]
public async Task TestProactiveReconnectionAfterDeviceDisconnected()
{
// Arrange
string iotHubHostName = "iotHubName";
string edgeDeviceId = "edge";
var module1Credentials = new TokenCredentials(new ModuleIdentity(iotHubHostName, edgeDeviceId, "module1"), "xyz", DummyProductInfo, Option.None<string>(), Option.None<string>(), false);
var module2Credentials = new TokenCredentials(new ModuleIdentity(iotHubHostName, edgeDeviceId, "module2"), "xyz", DummyProductInfo, Option.None<string>(), Option.None<string>(), false);
var device1Credentials = new TokenCredentials(new DeviceIdentity(iotHubHostName, "device1"), "pqr", DummyProductInfo, Option.None<string>(), Option.None<string>(), false);

// Track connection creation calls
var connectionCreationCount = new Dictionary<string, int>();
var cloudConnectionProvider = Mock.Of<ICloudConnectionProvider>();
Action<string, CloudConnectionStatus> callback = null;
Mock.Get(cloudConnectionProvider)
.Setup(c => c.Connect(It.IsAny<IClientCredentials>(), It.IsAny<Action<string, CloudConnectionStatus>>()))
.Callback<IClientCredentials, Action<string, CloudConnectionStatus>>((creds, c) =>
{
callback = c;
var id = creds.Identity.Id;
connectionCreationCount[id] = connectionCreationCount.ContainsKey(id) ? connectionCreationCount[id] + 1 : 1;
})
.ReturnsAsync(() => Try.Success(GetCloudConnectionMock()));

var credentialsManager = Mock.Of<ICredentialsCache>();
var deviceConnectivityManager = new DeviceConnectivityManager();

var connectionManager = new ConnectionManager(cloudConnectionProvider, credentialsManager, GetIdentityProvider(), deviceConnectivityManager);
// Create device proxies that remain active (simulating device connections that stay connected)
var deviceProxy1 = CreateActiveDeviceProxy();
var deviceProxy2 = CreateActiveDeviceProxy();
var deviceProxy3 = CreateActiveDeviceProxy();

// Act - Create cloud connections
Try<ICloudProxy> module1CloudProxy = await connectionManager.CreateCloudConnectionAsync(module1Credentials);
Try<ICloudProxy> module2CloudProxy = await connectionManager.CreateCloudConnectionAsync(module2Credentials);
Try<ICloudProxy> device1CloudProxy = await connectionManager.CreateCloudConnectionAsync(device1Credentials);

// Add device connections (simulating MQTT/AMQP connections to EdgeHub)
await connectionManager.AddDeviceConnection(module1Credentials.Identity, deviceProxy1);
await connectionManager.AddDeviceConnection(module2Credentials.Identity, deviceProxy2);
await connectionManager.AddDeviceConnection(device1Credentials.Identity, deviceProxy3);

// Verify initial state
Assert.True(module1CloudProxy.Success);
Assert.True(module2CloudProxy.Success);
Assert.True(device1CloudProxy.Success);
Assert.True(module1CloudProxy.Value.IsActive);
Assert.True(module2CloudProxy.Value.IsActive);
Assert.True(device1CloudProxy.Value.IsActive);

// Verify initial connection counts
Assert.Equal(1, connectionCreationCount[module1Credentials.Identity.Id]);
Assert.Equal(1, connectionCreationCount[module2Credentials.Identity.Id]);
Assert.Equal(1, connectionCreationCount[device1Credentials.Identity.Id]);

// Act - Trigger device disconnected event (simulating network connectivity loss)
deviceConnectivityManager.InvokeDeviceDisconnected();

// Verify that cloud connections are closed
Assert.False(module1CloudProxy.Value.IsActive);
Assert.False(module2CloudProxy.Value.IsActive);
Assert.False(device1CloudProxy.Value.IsActive);

// Verify that device connections are still active (EdgeHub connections remain)
Assert.True(deviceProxy1.IsActive);
Assert.True(deviceProxy2.IsActive);
Assert.True(deviceProxy3.IsActive);

// Wait for potential proactive reconnection (in original code, this won't happen)
await Task.Delay(TimeSpan.FromSeconds(1));

// Verify that no proactive reconnection occurred in the original implementation
Assert.Equal(1, connectionCreationCount[module1Credentials.Identity.Id]);
Assert.Equal(1, connectionCreationCount[module2Credentials.Identity.Id]);
Assert.Equal(1, connectionCreationCount[device1Credentials.Identity.Id]);

// Act - Manually request connections (should trigger reactive reconnection)
Option<ICloudProxy> reconnectedModule1 = await connectionManager.GetCloudConnection(module1Credentials.Identity.Id);
Option<ICloudProxy> reconnectedModule2 = await connectionManager.GetCloudConnection(module2Credentials.Identity.Id);
Option<ICloudProxy> reconnectedDevice1 = await connectionManager.GetCloudConnection(device1Credentials.Identity.Id);

// Assert - Verify reactive reconnection happened
Assert.True(reconnectedModule1.HasValue);
Assert.True(reconnectedModule2.HasValue);
Assert.True(reconnectedDevice1.HasValue);
Assert.True(reconnectedModule1.OrDefault().IsActive);
Assert.True(reconnectedModule2.OrDefault().IsActive);
Assert.True(reconnectedDevice1.OrDefault().IsActive);

// Verify new connections were created (reactive reconnection)
Assert.Equal(2, connectionCreationCount[module1Credentials.Identity.Id]);
Assert.Equal(2, connectionCreationCount[module2Credentials.Identity.Id]);
Assert.Equal(2, connectionCreationCount[device1Credentials.Identity.Id]);
}

private static IDeviceProxy CreateActiveDeviceProxy()
{
var deviceProxy = new Mock<IDeviceProxy>();
deviceProxy.SetupGet(dp => dp.IsActive).Returns(true);
deviceProxy.Setup(dp => dp.CloseAsync(It.IsAny<Exception>()))
.Callback(() => deviceProxy.SetupGet(dp => dp.IsActive).Returns(false))
.Returns(Task.CompletedTask);
return deviceProxy.Object;
}

class DeviceConnectivityManager : IDeviceConnectivityManager
{
public event EventHandler DeviceConnected;
Expand Down
Loading
Loading