diff --git a/changelog.d/3-bug-fixes/WPB-25906 b/changelog.d/3-bug-fixes/WPB-25906 new file mode 100644 index 00000000000..9387647ac65 --- /dev/null +++ b/changelog.d/3-bug-fixes/WPB-25906 @@ -0,0 +1 @@ +Make migration locks release safely on failure diff --git a/libs/wire-subsystems/src/Wire/ConversationStore/Cassandra.hs b/libs/wire-subsystems/src/Wire/ConversationStore/Cassandra.hs index ee25f5f3acb..dbddef3cd34 100644 --- a/libs/wire-subsystems/src/Wire/ConversationStore/Cassandra.hs +++ b/libs/wire-subsystems/src/Wire/ConversationStore/Cassandra.hs @@ -53,6 +53,7 @@ import Polysemy.Conc import Polysemy.Embed import Polysemy.Error (Error, mapError, throw) import Polysemy.Input +import Polysemy.Resource (Resource) import Polysemy.Time import Polysemy.TinyLog import System.Logger qualified as Log @@ -1086,7 +1087,8 @@ interpretConversationStoreToCassandraAndPostgres :: PGConstraints r, Member Async r, Member (Error MigrationError) r, - Member Race r + Member Race r, + Member Resource r ) => ClientState -> Sem (ConversationStore ': r) a -> @@ -1550,7 +1552,7 @@ instance APIError MigrationError where toResponse _ = waiErrorToJSONResponse $ WaiError.mkError status500 "internal-server-error" "Internal Server Error" withMigrationLockAndCleanup :: - (PGConstraints r, Member Async r, Member TinyLog r, Member Race r, Member (Error MigrationError) r) => + (PGConstraints r, Member Async r, Member TinyLog r, Member Race r, Member Resource r, Member (Error MigrationError) r) => ClientState -> LockType -> Either ConvId UserId -> @@ -1566,6 +1568,7 @@ withMigrationLocksAndConvCleanup :: Member Async r, Member TinyLog r, Member Race r, + Member Resource r, Member (Error MigrationError) r, TimeUnit u ) => @@ -1587,6 +1590,7 @@ withMigrationLocksAndUserCleanup :: Member Async r, Member TinyLog r, Member Race r, + Member Resource r, Member (Error MigrationError) r, TimeUnit u ) => diff --git a/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs b/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs index 881ccd38ea3..1ddee3759b6 100644 --- a/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs +++ b/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs @@ -46,6 +46,7 @@ import Polysemy.Async import Polysemy.Conc import Polysemy.Error import Polysemy.Input +import Polysemy.Resource (Resource, resourceToIOFinal) import Polysemy.State import Polysemy.Time import Polysemy.TinyLog @@ -83,6 +84,7 @@ type EffectStack = [ State Int, Input ClientState, Input Hasql.Pool, + Resource, Async, Race, TinyLog, @@ -137,6 +139,7 @@ interpreter cassClient pgPool logger name = . raiseUnder . interpretRace . asyncToIOFinal + . resourceToIOFinal . runInputConst pgPool . runInputConst cassClient . runState 0 @@ -148,6 +151,7 @@ migrateAllConversations :: Member TinyLog r, Member Async r, Member Race r, + Member Resource r, Member (State Int) r, Member (Concurrency Unsafe) r ) => @@ -170,6 +174,7 @@ migrateAllUsers :: Member TinyLog r, Member Async r, Member Race r, + Member Resource r, Member (State Int) r, Member (Concurrency 'Unsafe) r ) => @@ -210,7 +215,8 @@ migrateConversation :: Member TinyLog r, Member Async r, Member (Error MigrationLockError) r, - Member Race r + Member Race r, + Member Resource r ) => Prometheus.Counter -> ConvId -> @@ -443,7 +449,7 @@ saveConvToPostgres allConvData = do -- * Users -migrateUser :: (PGConstraints r, Member (Input ClientState) r, Member TinyLog r, Member Async r, Member (Error MigrationLockError) r, Member Race r) => Prometheus.Counter -> UserId -> Sem r () +migrateUser :: (PGConstraints r, Member (Input ClientState) r, Member TinyLog r, Member Async r, Member (Error MigrationLockError) r, Member Race r, Member Resource r) => Prometheus.Counter -> UserId -> Sem r () migrateUser migCounter uid = do withMigrationLocks LockExclusive (Seconds 10) [uid] $ do statusses <- getRemoteMemberStatusFromCassandra uid diff --git a/libs/wire-subsystems/src/Wire/DomainRegistrationStore/DualWrite.hs b/libs/wire-subsystems/src/Wire/DomainRegistrationStore/DualWrite.hs index 435a4c418f8..fb8c1b4dd3f 100644 --- a/libs/wire-subsystems/src/Wire/DomainRegistrationStore/DualWrite.hs +++ b/libs/wire-subsystems/src/Wire/DomainRegistrationStore/DualWrite.hs @@ -26,6 +26,7 @@ import Polysemy import Polysemy.Async import Polysemy.Conc.Effect.Race import Polysemy.Error +import Polysemy.Resource (Resource) import Polysemy.Time import Polysemy.TinyLog import Wire.DomainRegistrationStore @@ -40,6 +41,7 @@ interpretDomainRegistrationStoreToCassandraAndPostgres :: Member TinyLog r, Member Async r, Member Race r, + Member Resource r, Member (Error MigrationLockError) r ) => ClientState -> diff --git a/libs/wire-subsystems/src/Wire/DomainRegistrationStore/Migration.hs b/libs/wire-subsystems/src/Wire/DomainRegistrationStore/Migration.hs index 8bebc7aa2e0..bfb98de7d77 100644 --- a/libs/wire-subsystems/src/Wire/DomainRegistrationStore/Migration.hs +++ b/libs/wire-subsystems/src/Wire/DomainRegistrationStore/Migration.hs @@ -35,6 +35,7 @@ import Polysemy.Conc (interpretRace) import Polysemy.Conc.Effect.Race hiding (Timeout) import Polysemy.Error import Polysemy.Input +import Polysemy.Resource (Resource, resourceToIOFinal) import Polysemy.State import Polysemy.Time import Polysemy.TinyLog @@ -57,6 +58,7 @@ type EffectStack = [ State Int, Input ClientState, Input Hasql.Pool, + Resource, Async, Race, TinyLog, @@ -91,6 +93,7 @@ interpreter cassClient pgPool logger name = . raiseUnder . interpretRace . asyncToIOFinal + . resourceToIOFinal . runInputConst pgPool . runInputConst cassClient . runState 0 @@ -102,7 +105,8 @@ migrateAllDomainRegistrations :: Member TinyLog r, Member (State Int) r, Member Async r, - Member Race r + Member Race r, + Member Resource r ) => MigrationOptions -> Prometheus.Counter -> @@ -124,7 +128,8 @@ migrateDomainRegistrationRow :: Member TinyLog r, Member Async r, Member (Error MigrationLockError) r, - Member Race r + Member Race r, + Member Resource r ) => Prometheus.Counter -> StoredDomainRegistration -> diff --git a/libs/wire-subsystems/src/Wire/MigrationLock.hs b/libs/wire-subsystems/src/Wire/MigrationLock.hs index a1e18b5099d..678d8b9cd16 100644 --- a/libs/wire-subsystems/src/Wire/MigrationLock.hs +++ b/libs/wire-subsystems/src/Wire/MigrationLock.hs @@ -37,6 +37,7 @@ import Polysemy.Async import Polysemy.Conc.Effect.Race import Polysemy.Error import Polysemy.Input +import Polysemy.Resource (Resource, bracket) import Polysemy.Time.Data.TimeUnit import Polysemy.TinyLog (TinyLog) import Polysemy.TinyLog qualified as TinyLog @@ -77,6 +78,7 @@ withMigrationLocks :: Member Async r, Member TinyLog r, Member Race r, + Member Resource r, Member (Error MigrationLockError) r, TimeUnit u, MigrationLockable x @@ -87,42 +89,44 @@ withMigrationLocks :: Sem r a -> Sem r a withMigrationLocks lockType maxWait lockables action = do - lockAcquired <- embed newEmptyMVar - actionCompleted <- embed newEmptyMVar - - pool <- input - lockThread <- async . embed . Hasql.use pool $ do - let lockIds = fmap lockKey lockables - Session.statement lockIds acquireLocks - - liftIO $ putMVar lockAcquired () - liftIO $ takeMVar actionCompleted - - Session.statement lockIds releaseLocks - - void . timeout (cancel lockThread >> throw TimedOutAcquiringLock) maxWait $ embed (takeMVar lockAcquired) - res <- action - embed $ putMVar actionCompleted () - - mEithErr <- timeout (cancel lockThread) (Seconds 1) $ await lockThread - let logFirstLock = - case lockables of - [] -> id - (x : _) -> Log.field ("first_" <> lockScope @x) (lockKey x) - logError errorStr = - TinyLog.warn $ - Log.msg (Log.val "Failed to cleanly unlock the migration locks") - . logFirstLock - . Log.field "numberOfLocks" (length lockables) - . Log.field "error" errorStr - case mEithErr of - Left () -> logError "timed out waiting for unlock" - Right (Nothing) -> logError "lock/unlock thread didn't finish" - Right (Just (Left e)) -> logError (show e) - Right (Just (Right ())) -> pure () - - pure res + bracket acquire release (const action) where + acquire = do + lockAcquired <- embed newEmptyMVar + actionCompleted <- embed newEmptyMVar + + pool <- input + lockThread <- async . embed . Hasql.use pool $ do + let lockIds = fmap lockKey lockables + Session.statement lockIds acquireLocks + + liftIO $ putMVar lockAcquired () + liftIO $ takeMVar actionCompleted + + Session.statement lockIds releaseLocks + + void . timeout (cancel lockThread >> throw TimedOutAcquiringLock) maxWait $ embed (takeMVar lockAcquired) + pure (actionCompleted, lockThread) + + release (actionCompleted, lockThread) = do + let logFirstLock = + case lockables of + [] -> id + (x : _) -> Log.field ("first_" <> lockScope @x) (lockKey x) + logError errorStr = + TinyLog.warn $ + Log.msg (Log.val "Failed to cleanly unlock the migration locks") + . logFirstLock + . Log.field "numberOfLocks" (length lockables) + . Log.field "error" errorStr + _ <- embed $ tryPutMVar actionCompleted () + mEithErr <- timeout (cancel lockThread) (Seconds 1) $ await lockThread + case mEithErr of + Left () -> logError "timed out waiting for unlock" + Right (Nothing) -> logError "lock/unlock thread didn't finish" + Right (Just (Left e)) -> logError (show e) + Right (Just (Right ())) -> pure () + acquireLocks :: Hasql.Statement [Int64] () acquireLocks = lmapPG @(Vector _) diff --git a/libs/wire-subsystems/src/Wire/TeamFeatureStore/Migrating.hs b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Migrating.hs index e85b883c9ed..e35c8f2fe8c 100644 --- a/libs/wire-subsystems/src/Wire/TeamFeatureStore/Migrating.hs +++ b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Migrating.hs @@ -27,6 +27,7 @@ import Polysemy.Async import Polysemy.Conc.Effect.Race import Polysemy.Error import Polysemy.Input +import Polysemy.Resource (Resource) import Polysemy.Time import Polysemy.TinyLog import Wire.API.Team.Feature @@ -46,6 +47,7 @@ interpretTeamFeatureStoreToCassandraAndPostgres :: Member TinyLog r, Member Async r, Member Race r, + Member Resource r, Member (Error MigrationLockError) r ) => Sem (TeamFeatureStore ': r) a -> @@ -66,6 +68,7 @@ getDbFeatureImpl :: Member Async r, Member Race r, Member (Input ClientState) r, + Member Resource r, Member (Error MigrationLockError) r ) => FeatureSingleton cfg -> @@ -105,6 +108,7 @@ setDbFeatureImpl :: Member Async r, Member Race r, Member (Input ClientState) r, + Member Resource r, Member (Error MigrationLockError) r ) => FeatureSingleton cfg -> @@ -121,6 +125,7 @@ setFeatureLockStatusImpl :: Member Async r, Member Race r, Member (Input ClientState) r, + Member Resource r, Member (Error MigrationLockError) r ) => FeatureSingleton cfg -> @@ -137,6 +142,7 @@ patchDbFeatureImpl :: Member Async r, Member Race r, Member (Input ClientState) r, + Member Resource r, Member (Error MigrationLockError) r ) => FeatureSingleton cfg -> @@ -159,6 +165,7 @@ withWritePathUnderLock :: Member Async r, Member Race r, Member (Input ClientState) r, + Member Resource r, Member (Error MigrationLockError) r, IsFeatureConfig cfg ) => @@ -184,6 +191,7 @@ withSharedLock :: Member TinyLog r, Member Async r, Member Race r, + Member Resource r, Member (Error MigrationLockError) r, MigrationLockable x ) => diff --git a/libs/wire-subsystems/src/Wire/TeamFeatureStore/Migration.hs b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Migration.hs index d072cc590ca..5e5a7d95382 100644 --- a/libs/wire-subsystems/src/Wire/TeamFeatureStore/Migration.hs +++ b/libs/wire-subsystems/src/Wire/TeamFeatureStore/Migration.hs @@ -29,6 +29,7 @@ import Polysemy.Async import Polysemy.Conc import Polysemy.Error import Polysemy.Input +import Polysemy.Resource (Resource, resourceToIOFinal) import Polysemy.State import Polysemy.Time import Polysemy.TinyLog @@ -50,7 +51,8 @@ migrateAllTeamFeatures :: Member TinyLog r, Member (State Int) r, Member Async r, - Member Race r + Member Race r, + Member Resource r ) => MigrationOptions -> Prometheus.Counter -> @@ -65,6 +67,7 @@ type EffectStack = [ State Int, Input ClientState, Input Hasql.Pool, + Resource, Async, Race, TinyLog, @@ -99,6 +102,7 @@ interpreter cassClient pgPool logger name = . raiseUnder . interpretRace . asyncToIOFinal + . resourceToIOFinal . runInputConst pgPool . runInputConst cassClient . runState 0 @@ -108,7 +112,8 @@ migrateTeamFeature :: Member TinyLog r, Member Async r, Member (Error MigrationLockError) r, - Member Race r + Member Race r, + Member Resource r ) => Prometheus.Counter -> (TeamId, Text, Maybe FeatureStatus, Maybe LockStatus, Maybe DbConfig) -> diff --git a/services/brig/src/Brig/CanonicalInterpreter.hs b/services/brig/src/Brig/CanonicalInterpreter.hs index 06eb36f10ff..83803ab3b1e 100644 --- a/services/brig/src/Brig/CanonicalInterpreter.hs +++ b/services/brig/src/Brig/CanonicalInterpreter.hs @@ -50,6 +50,7 @@ import Polysemy.Embed (runEmbedded) import Polysemy.Error (Error, errorToIOFinal, mapError, runError) import Polysemy.Input (Input, runInputConst) import Polysemy.Internal.Kind +import Polysemy.Resource import Polysemy.TinyLog (TinyLog) import Wire.API.Error (ErrorS, errorToWai) import Wire.API.Error.Galley @@ -275,6 +276,7 @@ type BrigLowerLevelEffects = Embed IO, Race, Async, + Resource, Concurrency 'Unsafe, Final IO ] @@ -408,6 +410,7 @@ runBrigToIO e (AppT ma) = do ( either throwM pure <=< ( runFinal . unsafelyPerformConcurrency + . resourceToIOFinal . asyncToIOFinal . interpretRace . embedToFinal diff --git a/services/galley/src/Galley/App.hs b/services/galley/src/Galley/App.hs index cc38ae92501..b9cf3fe3fa8 100644 --- a/services/galley/src/Galley/App.hs +++ b/services/galley/src/Galley/App.hs @@ -408,7 +408,8 @@ evalGalley e = PGConstraints r, Member Async r, Member (Error MigrationError) r, - Member Race r + Member Race r, + Member Resource r ) => Sem (ConversationStore ': r) a -> Sem r a