diff --git a/app/src/main/java/to/bitkit/repositories/LightningRepo.kt b/app/src/main/java/to/bitkit/repositories/LightningRepo.kt index ce3845225..8c03b0443 100644 --- a/app/src/main/java/to/bitkit/repositories/LightningRepo.kt +++ b/app/src/main/java/to/bitkit/repositories/LightningRepo.kt @@ -25,6 +25,7 @@ import kotlinx.coroutines.flow.update import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.tasks.await import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeoutOrNull @@ -45,6 +46,7 @@ import org.lightningdevkit.ldknode.SpendableUtxo import org.lightningdevkit.ldknode.Txid import to.bitkit.data.CacheStore import to.bitkit.data.SettingsStore +import to.bitkit.data.backup.VssBackupClient import to.bitkit.data.keychain.Keychain import to.bitkit.di.BgDispatcher import to.bitkit.env.Env @@ -91,6 +93,7 @@ class LightningRepo @Inject constructor( private val cacheStore: CacheStore, private val preActivityMetadataRepo: PreActivityMetadataRepo, private val connectivityRepo: ConnectivityRepo, + private val vssBackupClient: VssBackupClient, ) { private val _lightningState = MutableStateFlow(LightningState()) val lightningState = _lightningState.asStateFlow() @@ -109,6 +112,7 @@ class LightningRepo @Inject constructor( private val syncMutex = Mutex() private val syncPending = AtomicBoolean(false) private val syncRetryJob = AtomicReference(null) + private val lifecycleMutex = Mutex() init { observeConnectivityForSyncRetry() @@ -263,6 +267,7 @@ class LightningRepo @Inject constructor( customRgsServerUrl: String? = null, eventHandler: NodeEventHandler? = null, channelMigration: ChannelDataMigration? = null, + shouldValidateGraph: Boolean = true, ): Result = withContext(bgDispatcher) { if (_isRecoveryMode.value) { return@withContext Result.failure(RecoveryModeError()) @@ -270,88 +275,132 @@ class LightningRepo @Inject constructor( eventHandler?.let { _eventHandlers.add(it) } - val initialLifecycleState = _lightningState.value.nodeLifecycleState - if (initialLifecycleState.isRunningOrStarting()) { - Logger.info("LDK node start skipped, lifecycle state: $initialLifecycleState", context = TAG) - return@withContext Result.success(Unit) - } + // Track retry state outside mutex to avoid deadlock (Mutex is non-reentrant) + var shouldRetryStart = false + var shouldRestartForGraphReset = false + var initialLifecycleState: NodeLifecycleState = NodeLifecycleState.Stopped - runCatching { - _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Starting) } - - // Setup if needed - if (lightningService.node == null) { - val setupResult = setup(walletIndex, customServerUrl, customRgsServerUrl, channelMigration) - if (setupResult.isFailure) { - _lightningState.update { - it.copy( - nodeLifecycleState = NodeLifecycleState.ErrorStarting( - setupResult.exceptionOrNull() ?: NodeSetupError() + val result = lifecycleMutex.withLock { + initialLifecycleState = _lightningState.value.nodeLifecycleState + if (initialLifecycleState.isRunningOrStarting()) { + Logger.info("LDK node start skipped, lifecycle state: $initialLifecycleState", context = TAG) + return@withLock Result.success(Unit) + } + + runCatching { + _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Starting) } + + // Setup if needed + if (lightningService.node == null) { + val setupResult = setup(walletIndex, customServerUrl, customRgsServerUrl, channelMigration) + if (setupResult.isFailure) { + _lightningState.update { + it.copy( + nodeLifecycleState = NodeLifecycleState.ErrorStarting( + setupResult.exceptionOrNull() ?: NodeSetupError() + ) ) - ) + } + return@withLock setupResult } - return@withContext setupResult } - } - if (getStatus()?.isRunning == true) { - Logger.info("LDK node already running", context = TAG) - _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) } - lightningService.startEventListener(::onEvent).onFailure { - Logger.warn("Failed to start event listener", it, context = TAG) - return@withContext Result.failure(it) + if (getStatus()?.isRunning == true) { + Logger.info("LDK node already running", context = TAG) + _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) } + lightningService.startEventListener(::onEvent).onFailure { + Logger.warn("Failed to start event listener", it, context = TAG) + return@withLock Result.failure(it) + } + return@withLock Result.success(Unit) } - return@withContext Result.success(Unit) - } - // Start node - lightningService.start(timeout, ::onEvent) + // Start node + lightningService.start(timeout, ::onEvent) - _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) } + _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) } - // Initial state sync - syncState() - updateGeoBlockState() - refreshChannelCache() + // Initial state sync + syncState() + updateGeoBlockState() + refreshChannelCache() - // Post-startup tasks (non-blocking) - connectToTrustedPeers().onFailure { - Logger.error("Failed to connect to trusted peers", it, context = TAG) - } + // Validate network graph has trusted peers (RGS cache can become stale) + if (shouldValidateGraph && !lightningService.validateNetworkGraph()) { + Logger.warn("Network graph is stale, resetting and restarting...", context = TAG) + lightningService.stop() + lightningService.resetNetworkGraph(walletIndex) + // Also clear stale graph from VSS to prevent fallback restoration + runCatching { + vssBackupClient.setup(walletIndex).getOrThrow() + vssBackupClient.deleteObject("network_graph").getOrThrow() + Logger.info("Cleared stale network graph from VSS", context = TAG) + }.onFailure { + Logger.warn("Failed to clear graph from VSS", it, context = TAG) + } + _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Stopped) } + shouldRestartForGraphReset = true + return@withLock Result.success(Unit) + } - sync().onFailure { e -> - Logger.warn("Initial sync failed, event-driven sync will retry", e, context = TAG) - } - scope.launch { registerForNotifications() } - Unit - }.onFailure { e -> - val currentLifecycleState = _lightningState.value.nodeLifecycleState - if (currentLifecycleState.isRunning()) { - Logger.warn("Start error occurred but node is $currentLifecycleState, skipping retry", e, context = TAG) - return@withContext Result.success(Unit) - } + // Post-startup tasks (non-blocking) + connectToTrustedPeers().onFailure { + Logger.error("Failed to connect to trusted peers", it, context = TAG) + } - if (shouldRetry) { - val retryDelay = 2.seconds - Logger.warn("Start error, retrying after $retryDelay...", e, context = TAG) - _lightningState.update { it.copy(nodeLifecycleState = initialLifecycleState) } - - delay(retryDelay) - return@withContext start( - walletIndex = walletIndex, - timeout = timeout, - shouldRetry = false, - customServerUrl = customServerUrl, - customRgsServerUrl = customRgsServerUrl, - channelMigration = channelMigration, - ) - } else { - _lightningState.update { - it.copy(nodeLifecycleState = NodeLifecycleState.ErrorStarting(e)) + sync().onFailure { e -> + Logger.warn("Initial sync failed, event-driven sync will retry", e, context = TAG) + } + scope.launch { registerForNotifications() } + Result.success(Unit) + }.getOrElse { e -> + val currentState = _lightningState.value.nodeLifecycleState + if (currentState.isRunning()) { + Logger.warn("Start error but node is $currentState, skipping retry", e, context = TAG) + return@withLock Result.success(Unit) + } + + if (shouldRetry) { + Logger.warn("Start error, will retry...", e, context = TAG) + _lightningState.update { it.copy(nodeLifecycleState = initialLifecycleState) } + shouldRetryStart = true + Result.failure(e) + } else { + _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.ErrorStarting(e)) } + Result.failure(e) } - return@withContext Result.failure(e) } } + + // Retry OUTSIDE the mutex to avoid deadlock (Kotlin Mutex is non-reentrant) + if (shouldRetryStart) { + delay(2.seconds) + return@withContext start( + walletIndex = walletIndex, + timeout = timeout, + shouldRetry = false, + customServerUrl = customServerUrl, + customRgsServerUrl = customRgsServerUrl, + channelMigration = channelMigration, + shouldValidateGraph = shouldValidateGraph, + ) + } + + // Restart after graph reset OUTSIDE the mutex to avoid deadlock + if (shouldRestartForGraphReset) { + return@withContext start( + walletIndex = walletIndex, + timeout = timeout, + shouldRetry = shouldRetry, + customServerUrl = customServerUrl, + customRgsServerUrl = customRgsServerUrl, + eventHandler = eventHandler, + channelMigration = channelMigration, + shouldValidateGraph = false, // Prevent infinite loop + ) + } + + result } private suspend fun onEvent(event: Event) { @@ -375,16 +424,27 @@ class LightningRepo @Inject constructor( } suspend fun stop(): Result = withContext(bgDispatcher) { - if (_lightningState.value.nodeLifecycleState.isStoppedOrStopping()) { - return@withContext Result.success(Unit) - } + lifecycleMutex.withLock { + if (_lightningState.value.nodeLifecycleState.isStoppedOrStopping()) { + return@withLock Result.success(Unit) + } - runCatching { - _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Stopping) } - lightningService.stop() - _lightningState.update { LightningState(nodeLifecycleState = NodeLifecycleState.Stopped) } - }.onFailure { - Logger.error("Node stop error", it, context = TAG) + runCatching { + _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Stopping) } + lightningService.stop() + _lightningState.update { LightningState(nodeLifecycleState = NodeLifecycleState.Stopped) } + }.onFailure { + Logger.error("Node stop error", it, context = TAG) + // On failure, check actual node state and update accordingly + // If node is still running, revert to Running state to allow retry + if (lightningService.node != null && lightningService.status?.isRunning == true) { + Logger.warn("Stop failed but node is still running, reverting to Running state", context = TAG) + _lightningState.update { it.copy(nodeLifecycleState = NodeLifecycleState.Running) } + } else { + // Node appears stopped, update state + _lightningState.update { LightningState(nodeLifecycleState = NodeLifecycleState.Stopped) } + } + } } } diff --git a/app/src/main/java/to/bitkit/services/LightningService.kt b/app/src/main/java/to/bitkit/services/LightningService.kt index 8862e580e..4df47a556 100644 --- a/app/src/main/java/to/bitkit/services/LightningService.kt +++ b/app/src/main/java/to/bitkit/services/LightningService.kt @@ -262,6 +262,63 @@ class LightningService @Inject constructor( Logger.info("LDK storage wiped", context = TAG) } + /** + * Resets the network graph cache, forcing a full RGS sync on next startup. + * This is useful when the cached graph is stale or missing nodes. + * Note: Node must be stopped before calling this. + */ + fun resetNetworkGraph(walletIndex: Int) { + if (node != null) throw ServiceError.NodeStillRunning() + Logger.warn("Resetting network graph cache…", context = TAG) + val ldkPath = Path(Env.ldkStoragePath(walletIndex)).toFile() + val graphFile = ldkPath.resolve("network_graph_cache") + if (graphFile.exists()) { + graphFile.delete() + Logger.info("Network graph cache deleted", context = TAG) + } else { + Logger.info("No network graph cache found", context = TAG) + } + } + + /** + * Validates that all trusted peers are present in the network graph. + * Returns false if all trusted peers are missing, indicating the graph cache is stale. + */ + fun validateNetworkGraph(): Boolean { + val node = this.node ?: return true + val graph = node.networkGraph() + val graphNodes = graph.listNodes().toSet() + if (graphNodes.isEmpty()) { + val rgsTimestamp = node.status().latestRgsSnapshotTimestamp + if (rgsTimestamp != null) { + Logger.warn("Network graph is empty despite RGS timestamp $rgsTimestamp", context = TAG) + return false + } + Logger.debug("Network graph is empty, skipping validation", context = TAG) + return true + } + val missingPeers = trustedPeers.filter { it.nodeId !in graphNodes } + if (missingPeers.size == trustedPeers.size) { + Logger.warn( + "Network graph missing all ${trustedPeers.size} trusted peers", + context = TAG, + ) + return false + } + if (missingPeers.isNotEmpty()) { + Logger.debug( + "Network graph missing ${missingPeers.size}/${trustedPeers.size} trusted peers", + context = TAG, + ) + } + val presentCount = trustedPeers.size - missingPeers.size + Logger.debug( + "Network graph validated: $presentCount/${trustedPeers.size} trusted peers present", + context = TAG, + ) + return true + } + suspend fun sync() { val node = this.node ?: throw ServiceError.NodeNotSetup() diff --git a/app/src/test/java/to/bitkit/androidServices/LightningNodeServiceTest.kt b/app/src/test/java/to/bitkit/androidServices/LightningNodeServiceTest.kt index f8c6bd9aa..918e31ac8 100644 --- a/app/src/test/java/to/bitkit/androidServices/LightningNodeServiceTest.kt +++ b/app/src/test/java/to/bitkit/androidServices/LightningNodeServiceTest.kt @@ -101,6 +101,7 @@ class LightningNodeServiceTest : BaseUnitTest() { anyOrNull(), anyOrNull(), anyOrNull(), + any(), ) } doAnswer { capturedHandler = it.getArgument(5) as? NodeEventHandler diff --git a/app/src/test/java/to/bitkit/repositories/LightningRepoTest.kt b/app/src/test/java/to/bitkit/repositories/LightningRepoTest.kt index 10ed883c3..703263fe0 100644 --- a/app/src/test/java/to/bitkit/repositories/LightningRepoTest.kt +++ b/app/src/test/java/to/bitkit/repositories/LightningRepoTest.kt @@ -32,6 +32,7 @@ import to.bitkit.data.AppCacheData import to.bitkit.data.CacheStore import to.bitkit.data.SettingsData import to.bitkit.data.SettingsStore +import to.bitkit.data.backup.VssBackupClient import to.bitkit.data.keychain.Keychain import to.bitkit.ext.createChannelDetails import to.bitkit.ext.of @@ -65,6 +66,7 @@ class LightningRepoTest : BaseUnitTest() { private val preActivityMetadataRepo = mock() private val lnurlService = mock() private val connectivityRepo = mock() + private val vssBackupClient = mock() @Before fun setUp() = runBlocking { @@ -82,6 +84,7 @@ class LightningRepoTest : BaseUnitTest() { cacheStore = cacheStore, preActivityMetadataRepo = preActivityMetadataRepo, connectivityRepo = connectivityRepo, + vssBackupClient = vssBackupClient, ) } @@ -91,6 +94,7 @@ class LightningRepoTest : BaseUnitTest() { whenever(lightningService.setup(any(), anyOrNull(), anyOrNull(), anyOrNull(), anyOrNull())).thenReturn(Unit) whenever(lightningService.start(anyOrNull(), any())).thenReturn(Unit) whenever(lightningService.sync()).thenReturn(Unit) + whenever(lightningService.validateNetworkGraph()).thenReturn(true) whenever(settingsStore.data).thenReturn(flowOf(SettingsData())) val blocktank = mock() whenever(coreService.blocktank).thenReturn(blocktank) @@ -107,6 +111,7 @@ class LightningRepoTest : BaseUnitTest() { whenever(lightningService.node).thenReturn(mock()) whenever(lightningService.setup(any(), anyOrNull(), anyOrNull(), anyOrNull(), anyOrNull())).thenReturn(Unit) whenever(lightningService.start(anyOrNull(), any())).thenReturn(Unit) + whenever(lightningService.validateNetworkGraph()).thenReturn(true) val blocktank = mock() whenever(coreService.blocktank).thenReturn(blocktank) whenever(blocktank.info(any())).thenReturn(null) @@ -388,6 +393,7 @@ class LightningRepoTest : BaseUnitTest() { whenever(lightningService.node).thenReturn(mock()) whenever(lightningService.setup(any(), anyOrNull(), anyOrNull(), anyOrNull(), anyOrNull())).thenReturn(Unit) whenever(lightningService.start(anyOrNull(), any())).thenReturn(Unit) + whenever(lightningService.validateNetworkGraph()).thenReturn(true) whenever(lightningService.sync()).thenThrow(RuntimeException("Sync failed")) whenever(settingsStore.data).thenReturn(flowOf(SettingsData())) val blocktank = mock() @@ -621,6 +627,7 @@ class LightningRepoTest : BaseUnitTest() { whenever(lightningService.node).thenReturn(null) whenever(lightningService.setup(any(), anyOrNull(), anyOrNull(), anyOrNull(), anyOrNull())).thenReturn(Unit) whenever(lightningService.start(anyOrNull(), any())).thenReturn(Unit) + whenever(lightningService.validateNetworkGraph()).thenReturn(true) whenever(settingsStore.data).thenReturn(flowOf(SettingsData())) val blocktank = mock() @@ -665,6 +672,7 @@ class LightningRepoTest : BaseUnitTest() { whenever(lightningService.node).thenReturn(null) whenever(lightningService.setup(any(), anyOrNull(), anyOrNull(), anyOrNull(), anyOrNull())).thenReturn(Unit) whenever(lightningService.start(anyOrNull(), any())).thenReturn(Unit) + whenever(lightningService.validateNetworkGraph()).thenReturn(true) whenever(settingsStore.data).thenReturn(flowOf(SettingsData())) val blocktank = mock() @@ -690,6 +698,7 @@ class LightningRepoTest : BaseUnitTest() { // lightningService.start() succeeds (state becomes Running at line 241) whenever(lightningService.start(anyOrNull(), any())).thenReturn(Unit) + whenever(lightningService.validateNetworkGraph()).thenReturn(true) // lightningService.nodeId throws during syncState() (called at line 244, AFTER state = Running) whenever(lightningService.nodeId).thenThrow(RuntimeException("error during syncState")) diff --git a/app/src/test/java/to/bitkit/ui/WalletViewModelTest.kt b/app/src/test/java/to/bitkit/ui/WalletViewModelTest.kt index 579fc2287..fdfbacb49 100644 --- a/app/src/test/java/to/bitkit/ui/WalletViewModelTest.kt +++ b/app/src/test/java/to/bitkit/ui/WalletViewModelTest.kt @@ -241,8 +241,18 @@ class WalletViewModelTest : BaseUnitTest() { whenever(testWalletRepo.walletExists()).thenReturn(true) whenever(testLightningRepo.lightningState).thenReturn(lightningState) whenever(testLightningRepo.isRecoveryMode).thenReturn(isRecoveryMode) - whenever(testLightningRepo.start(any(), anyOrNull(), any(), anyOrNull(), anyOrNull(), anyOrNull(), anyOrNull())) - .thenReturn(Result.success(Unit)) + whenever( + testLightningRepo.start( + any(), + anyOrNull(), + any(), + anyOrNull(), + anyOrNull(), + anyOrNull(), + anyOrNull(), + any(), + ), + ).thenReturn(Result.success(Unit)) val testSut = WalletViewModel( context = context, @@ -262,7 +272,16 @@ class WalletViewModelTest : BaseUnitTest() { testSut.start() advanceUntilIdle() - verify(testLightningRepo).start(any(), anyOrNull(), any(), anyOrNull(), anyOrNull(), anyOrNull(), anyOrNull()) + verify(testLightningRepo).start( + any(), + anyOrNull(), + any(), + anyOrNull(), + anyOrNull(), + anyOrNull(), + anyOrNull(), + any(), + ) verify(testWalletRepo).refreshBip21() } @@ -282,8 +301,18 @@ class WalletViewModelTest : BaseUnitTest() { whenever(testWalletRepo.restoreWallet(any(), anyOrNull())).thenReturn(Result.success(Unit)) whenever(testLightningRepo.lightningState).thenReturn(lightningState) whenever(testLightningRepo.isRecoveryMode).thenReturn(isRecoveryMode) - whenever(testLightningRepo.start(any(), anyOrNull(), any(), anyOrNull(), anyOrNull(), anyOrNull(), anyOrNull())) - .thenReturn(Result.success(Unit)) + whenever( + testLightningRepo.start( + any(), + anyOrNull(), + any(), + anyOrNull(), + anyOrNull(), + anyOrNull(), + anyOrNull(), + any(), + ), + ).thenReturn(Result.success(Unit)) val testSut = WalletViewModel( context = context,