Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3-bug-fixes/WPB-25906
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make migration locks release safely on failure
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import Polysemy.Conc
import Polysemy.Embed
import Polysemy.Error (Error, mapError, throw)
import Polysemy.Input
import Polysemy.Resource (Resource)
import Polysemy.Time
import Polysemy.TinyLog
import System.Logger qualified as Log
Expand Down Expand Up @@ -1086,7 +1087,8 @@ interpretConversationStoreToCassandraAndPostgres ::
PGConstraints r,
Member Async r,
Member (Error MigrationError) r,
Member Race r
Member Race r,
Member Resource r
) =>
ClientState ->
Sem (ConversationStore ': r) a ->
Expand Down Expand Up @@ -1550,7 +1552,7 @@ instance APIError MigrationError where
toResponse _ = waiErrorToJSONResponse $ WaiError.mkError status500 "internal-server-error" "Internal Server Error"

withMigrationLockAndCleanup ::
(PGConstraints r, Member Async r, Member TinyLog r, Member Race r, Member (Error MigrationError) r) =>
(PGConstraints r, Member Async r, Member TinyLog r, Member Race r, Member Resource r, Member (Error MigrationError) r) =>
ClientState ->
LockType ->
Either ConvId UserId ->
Expand All @@ -1566,6 +1568,7 @@ withMigrationLocksAndConvCleanup ::
Member Async r,
Member TinyLog r,
Member Race r,
Member Resource r,
Member (Error MigrationError) r,
TimeUnit u
) =>
Expand All @@ -1587,6 +1590,7 @@ withMigrationLocksAndUserCleanup ::
Member Async r,
Member TinyLog r,
Member Race r,
Member Resource r,
Member (Error MigrationError) r,
TimeUnit u
) =>
Expand Down
10 changes: 8 additions & 2 deletions libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import Polysemy.Async
import Polysemy.Conc
import Polysemy.Error
import Polysemy.Input
import Polysemy.Resource (Resource, resourceToIOFinal)
import Polysemy.State
import Polysemy.Time
import Polysemy.TinyLog
Expand Down Expand Up @@ -83,6 +84,7 @@ type EffectStack =
[ State Int,
Input ClientState,
Input Hasql.Pool,
Resource,
Async,
Race,
TinyLog,
Expand Down Expand Up @@ -137,6 +139,7 @@ interpreter cassClient pgPool logger name =
. raiseUnder
. interpretRace
. asyncToIOFinal
. resourceToIOFinal
. runInputConst pgPool
. runInputConst cassClient
. runState 0
Expand All @@ -148,6 +151,7 @@ migrateAllConversations ::
Member TinyLog r,
Member Async r,
Member Race r,
Member Resource r,
Member (State Int) r,
Member (Concurrency Unsafe) r
) =>
Expand All @@ -170,6 +174,7 @@ migrateAllUsers ::
Member TinyLog r,
Member Async r,
Member Race r,
Member Resource r,
Member (State Int) r,
Member (Concurrency 'Unsafe) r
) =>
Expand Down Expand Up @@ -210,7 +215,8 @@ migrateConversation ::
Member TinyLog r,
Member Async r,
Member (Error MigrationLockError) r,
Member Race r
Member Race r,
Member Resource r
) =>
Prometheus.Counter ->
ConvId ->
Expand Down Expand Up @@ -443,7 +449,7 @@ saveConvToPostgres allConvData = do

-- * Users

migrateUser :: (PGConstraints r, Member (Input ClientState) r, Member TinyLog r, Member Async r, Member (Error MigrationLockError) r, Member Race r) => Prometheus.Counter -> UserId -> Sem r ()
migrateUser :: (PGConstraints r, Member (Input ClientState) r, Member TinyLog r, Member Async r, Member (Error MigrationLockError) r, Member Race r, Member Resource r) => Prometheus.Counter -> UserId -> Sem r ()
migrateUser migCounter uid = do
withMigrationLocks LockExclusive (Seconds 10) [uid] $ do
statusses <- getRemoteMemberStatusFromCassandra uid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import Polysemy
import Polysemy.Async
import Polysemy.Conc.Effect.Race
import Polysemy.Error
import Polysemy.Resource (Resource)
import Polysemy.Time
import Polysemy.TinyLog
import Wire.DomainRegistrationStore
Expand All @@ -40,6 +41,7 @@ interpretDomainRegistrationStoreToCassandraAndPostgres ::
Member TinyLog r,
Member Async r,
Member Race r,
Member Resource r,
Member (Error MigrationLockError) r
) =>
ClientState ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import Polysemy.Conc (interpretRace)
import Polysemy.Conc.Effect.Race hiding (Timeout)
import Polysemy.Error
import Polysemy.Input
import Polysemy.Resource (Resource, resourceToIOFinal)
import Polysemy.State
import Polysemy.Time
import Polysemy.TinyLog
Expand All @@ -57,6 +58,7 @@ type EffectStack =
[ State Int,
Input ClientState,
Input Hasql.Pool,
Resource,
Async,
Race,
TinyLog,
Expand Down Expand Up @@ -91,6 +93,7 @@ interpreter cassClient pgPool logger name =
. raiseUnder
. interpretRace
. asyncToIOFinal
. resourceToIOFinal
. runInputConst pgPool
. runInputConst cassClient
. runState 0
Expand All @@ -102,7 +105,8 @@ migrateAllDomainRegistrations ::
Member TinyLog r,
Member (State Int) r,
Member Async r,
Member Race r
Member Race r,
Member Resource r
) =>
MigrationOptions ->
Prometheus.Counter ->
Expand All @@ -124,7 +128,8 @@ migrateDomainRegistrationRow ::
Member TinyLog r,
Member Async r,
Member (Error MigrationLockError) r,
Member Race r
Member Race r,
Member Resource r
) =>
Prometheus.Counter ->
StoredDomainRegistration ->
Expand Down
74 changes: 39 additions & 35 deletions libs/wire-subsystems/src/Wire/MigrationLock.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import Polysemy.Async
import Polysemy.Conc.Effect.Race
import Polysemy.Error
import Polysemy.Input
import Polysemy.Resource (Resource, bracket)
import Polysemy.Time.Data.TimeUnit
import Polysemy.TinyLog (TinyLog)
import Polysemy.TinyLog qualified as TinyLog
Expand Down Expand Up @@ -77,6 +78,7 @@ withMigrationLocks ::
Member Async r,
Member TinyLog r,
Member Race r,
Member Resource r,
Member (Error MigrationLockError) r,
TimeUnit u,
MigrationLockable x
Expand All @@ -87,42 +89,44 @@ withMigrationLocks ::
Sem r a ->
Sem r a
withMigrationLocks lockType maxWait lockables action = do
  -- NOTE(review): this text is a rendered diff, so two bodies follow one
  -- another. The straight-line body directly below (35 lines, matching the
  -- "35 deletions" in the change summary) looks like the removed version: it
  -- only reaches `putMVar actionCompleted ()` when `action` returns normally.
  -- The `bracket`-based body further down is its replacement.
  lockAcquired <- embed newEmptyMVar
  actionCompleted <- embed newEmptyMVar

  pool <- input
  lockThread <- async . embed . Hasql.use pool $ do
    let lockIds = fmap lockKey lockables
    Session.statement lockIds acquireLocks

    liftIO $ putMVar lockAcquired ()
    liftIO $ takeMVar actionCompleted

    Session.statement lockIds releaseLocks

  void . timeout (cancel lockThread >> throw TimedOutAcquiringLock) maxWait $ embed (takeMVar lockAcquired)
  res <- action
  embed $ putMVar actionCompleted ()

  mEithErr <- timeout (cancel lockThread) (Seconds 1) $ await lockThread
  let logFirstLock =
        case lockables of
          [] -> id
          (x : _) -> Log.field ("first_" <> lockScope @x) (lockKey x)
      logError errorStr =
        TinyLog.warn $
          Log.msg (Log.val "Failed to cleanly unlock the migration locks")
            . logFirstLock
            . Log.field "numberOfLocks" (length lockables)
            . Log.field "error" errorStr
  case mEithErr of
    Left () -> logError "timed out waiting for unlock"
    Right (Nothing) -> logError "lock/unlock thread didn't finish"
    Right (Just (Left e)) -> logError (show e)
    Right (Just (Right ())) -> pure ()

  pure res
  -- Replacement body: run `action` between `acquire` and `release` using
  -- `bracket`, so that `release` is also run when `action` fails. The value
  -- produced by `acquire` is not needed by `action`, hence `const`.
  bracket acquire release (const action)
  where
    -- Starts the lock thread and waits until it reports that the locks are
    -- held. Returns the MVar used to tell the thread to unlock, together with
    -- the handle of the thread.
    acquire = do
      -- One-shot signals: lock thread -> caller ("locks are held") and
      -- caller -> lock thread ("action is done, release the locks").
      lockAcquired <- embed newEmptyMVar
      actionCompleted <- embed newEmptyMVar

      pool <- input
      -- Acquire and release are issued from the same `Hasql.use` call; the
      -- thread sits blocked on `actionCompleted` in between.
      -- NOTE(review): presumably this is needed so both statements run on the
      -- same database connection -- confirm against `acquireLocks` /
      -- `releaseLocks`.
      lockThread <- async . embed . Hasql.use pool $ do
        let lockIds = fmap lockKey lockables
        Session.statement lockIds acquireLocks

        liftIO $ putMVar lockAcquired ()
        liftIO $ takeMVar actionCompleted

        Session.statement lockIds releaseLocks

      -- Wait for the "locks are held" signal for at most `maxWait`; otherwise
      -- cancel the lock thread and fail with `TimedOutAcquiringLock`.
      -- NOTE(review): if the lock thread dies before signalling (e.g. the
      -- acquire statement fails), nothing fills `lockAcquired`, so this looks
      -- like it would still wait for `maxWait` and report a timeout -- confirm.
      void . timeout (cancel lockThread >> throw TimedOutAcquiringLock) maxWait $ embed (takeMVar lockAcquired)
      pure (actionCompleted, lockThread)

    -- Tells the lock thread to release the locks, waits briefly for it to
    -- finish, and logs a warning for every outcome other than a clean finish.
    -- It never throws on an unclean unlock; it only logs.
    release (actionCompleted, lockThread) = do
      let logFirstLock =
            -- Adds the key of the first lockable to the log line; adds
            -- nothing when the list of lockables is empty.
            case lockables of
              [] -> id
              (x : _) -> Log.field ("first_" <> lockScope @x) (lockKey x)
          logError errorStr =
            TinyLog.warn $
              Log.msg (Log.val "Failed to cleanly unlock the migration locks")
                . logFirstLock
                . Log.field "numberOfLocks" (length lockables)
                . Log.field "error" errorStr
      -- Non-blocking signal to the lock thread; the Bool result (whether the
      -- MVar was empty) is deliberately ignored.
      _ <- embed $ tryPutMVar actionCompleted ()
      -- Give the lock thread one second to finish; cancel it if it does not.
      mEithErr <- timeout (cancel lockThread) (Seconds 1) $ await lockThread
      case mEithErr of
        -- The one-second wait elapsed (the thread has been cancelled above).
        Left () -> logError "timed out waiting for unlock"
        -- The thread ended without producing a result.
        Right (Nothing) -> logError "lock/unlock thread didn't finish"
        -- The `Hasql.use` call returned an error.
        Right (Just (Left e)) -> logError (show e)
        -- Locks were acquired and released without error.
        Right (Just (Right ())) -> pure ()

acquireLocks :: Hasql.Statement [Int64] ()
acquireLocks =
lmapPG @(Vector _)
Expand Down
8 changes: 8 additions & 0 deletions libs/wire-subsystems/src/Wire/TeamFeatureStore/Migrating.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import Polysemy.Async
import Polysemy.Conc.Effect.Race
import Polysemy.Error
import Polysemy.Input
import Polysemy.Resource (Resource)
import Polysemy.Time
import Polysemy.TinyLog
import Wire.API.Team.Feature
Expand All @@ -46,6 +47,7 @@ interpretTeamFeatureStoreToCassandraAndPostgres ::
Member TinyLog r,
Member Async r,
Member Race r,
Member Resource r,
Member (Error MigrationLockError) r
) =>
Sem (TeamFeatureStore ': r) a ->
Expand All @@ -66,6 +68,7 @@ getDbFeatureImpl ::
Member Async r,
Member Race r,
Member (Input ClientState) r,
Member Resource r,
Member (Error MigrationLockError) r
) =>
FeatureSingleton cfg ->
Expand Down Expand Up @@ -105,6 +108,7 @@ setDbFeatureImpl ::
Member Async r,
Member Race r,
Member (Input ClientState) r,
Member Resource r,
Member (Error MigrationLockError) r
) =>
FeatureSingleton cfg ->
Expand All @@ -121,6 +125,7 @@ setFeatureLockStatusImpl ::
Member Async r,
Member Race r,
Member (Input ClientState) r,
Member Resource r,
Member (Error MigrationLockError) r
) =>
FeatureSingleton cfg ->
Expand All @@ -137,6 +142,7 @@ patchDbFeatureImpl ::
Member Async r,
Member Race r,
Member (Input ClientState) r,
Member Resource r,
Member (Error MigrationLockError) r
) =>
FeatureSingleton cfg ->
Expand All @@ -159,6 +165,7 @@ withWritePathUnderLock ::
Member Async r,
Member Race r,
Member (Input ClientState) r,
Member Resource r,
Member (Error MigrationLockError) r,
IsFeatureConfig cfg
) =>
Expand All @@ -184,6 +191,7 @@ withSharedLock ::
Member TinyLog r,
Member Async r,
Member Race r,
Member Resource r,
Member (Error MigrationLockError) r,
MigrationLockable x
) =>
Expand Down
9 changes: 7 additions & 2 deletions libs/wire-subsystems/src/Wire/TeamFeatureStore/Migration.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import Polysemy.Async
import Polysemy.Conc
import Polysemy.Error
import Polysemy.Input
import Polysemy.Resource (Resource, resourceToIOFinal)
import Polysemy.State
import Polysemy.Time
import Polysemy.TinyLog
Expand All @@ -50,7 +51,8 @@ migrateAllTeamFeatures ::
Member TinyLog r,
Member (State Int) r,
Member Async r,
Member Race r
Member Race r,
Member Resource r
) =>
MigrationOptions ->
Prometheus.Counter ->
Expand All @@ -65,6 +67,7 @@ type EffectStack =
[ State Int,
Input ClientState,
Input Hasql.Pool,
Resource,
Async,
Race,
TinyLog,
Expand Down Expand Up @@ -99,6 +102,7 @@ interpreter cassClient pgPool logger name =
. raiseUnder
. interpretRace
. asyncToIOFinal
. resourceToIOFinal
. runInputConst pgPool
. runInputConst cassClient
. runState 0
Expand All @@ -108,7 +112,8 @@ migrateTeamFeature ::
Member TinyLog r,
Member Async r,
Member (Error MigrationLockError) r,
Member Race r
Member Race r,
Member Resource r
) =>
Prometheus.Counter ->
(TeamId, Text, Maybe FeatureStatus, Maybe LockStatus, Maybe DbConfig) ->
Expand Down
3 changes: 3 additions & 0 deletions services/brig/src/Brig/CanonicalInterpreter.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import Polysemy.Embed (runEmbedded)
import Polysemy.Error (Error, errorToIOFinal, mapError, runError)
import Polysemy.Input (Input, runInputConst)
import Polysemy.Internal.Kind
import Polysemy.Resource
import Polysemy.TinyLog (TinyLog)
import Wire.API.Error (ErrorS, errorToWai)
import Wire.API.Error.Galley
Expand Down Expand Up @@ -275,6 +276,7 @@ type BrigLowerLevelEffects =
Embed IO,
Race,
Async,
Resource,
Concurrency 'Unsafe,
Final IO
]
Expand Down Expand Up @@ -408,6 +410,7 @@ runBrigToIO e (AppT ma) = do
( either throwM pure
<=< ( runFinal
. unsafelyPerformConcurrency
. resourceToIOFinal
. asyncToIOFinal
. interpretRace
. embedToFinal
Expand Down
Loading