diff --git a/changelog.d/5-internal/WPB-25915 b/changelog.d/5-internal/WPB-25915 new file mode 100644 index 00000000000..e69de29bb2d diff --git a/docs/src/developer/reference/config-options.md b/docs/src/developer/reference/config-options.md index 4a269a7fe45..48d238ec239 100644 --- a/docs/src/developer/reference/config-options.md +++ b/docs/src/developer/reference/config-options.md @@ -1908,6 +1908,13 @@ time. For conversations, this is necessary for channel search and management of channels from the team-management UI. It is highly recommended to take a backup of the affected Cassandra data before triggering a migration. +When migrating conversations, `background-worker.config.migrateConversationsOptions.timeout` +should be configured as well. It sets a per-conversation upper bound for the +migration attempt, so a stuck conversation does not keep the migration run +blocked indefinitely. Start with a value that is comfortably above the normal +time for one conversation migration, then adjust it based on observed runtime +and the size of your dataset. + Migrations are independent and can be run separately, in batches, or all at once. This is expected, because migrations will be released over time. The pattern below applies per `postgresMigration` setting. A single setting may @@ -2082,6 +2089,13 @@ migrateConversationCodes: false migrateTeamFeatures: false migrateDomainRegistration: false +# conversation migration settings +migrateConversationsOptions: + pageSize: 10000 + parallelism: 2 + # (optional) migration timeout in seconds, applies to a single conversation + timeout: 60 + # Background jobs consumer backgroundJobs: concurrency: 8 # in-flight jobs per process @@ -2092,6 +2106,20 @@ backgroundJobs: federationDomain: example.org ``` +The optional `migrateConversationsOptions.timeout` setting limits how long a single +conversation migration attempt may run after it has acquired the migration +lock. The value is a plain number of seconds, so `60` means 1 minute. +If the timeout is exceeded, that conversation migration is aborted and the +whole migration run is treated as failed. + +Choose a value that is comfortably above the normal time for one conversation +migration, but still low enough to catch a genuinely stuck migration in a +reasonable time. A good starting point for most deployments is `60` seconds +(1 minute), then adjust based on observed migration durations. + +If the setting is omitted, no timeout will be enforced. If a conversation +migration stalls, this can lead to leaked exclusive advisory locks. + Secrets - Set `background-worker.secrets.pgPassword` to pass the PostgreSQL password. The chart mounts it to `/etc/wire/background-worker/secrets/pgPassword` and sets `postgresqlPassword` accordingly. diff --git a/libs/types-common/default.nix b/libs/types-common/default.nix index 63392d3321b..b16153fde72 100644 --- a/libs/types-common/default.nix +++ b/libs/types-common/default.nix @@ -36,6 +36,7 @@ , optparse-applicative , pem , polysemy +, polysemy-time , protobuf , QuickCheck , quickcheck-instances @@ -95,6 +96,7 @@ mkDerivation { optparse-applicative pem polysemy + polysemy-time protobuf QuickCheck quickcheck-instances diff --git a/libs/types-common/src/Util/Timeout.hs b/libs/types-common/src/Util/Timeout.hs index 6a5c4218fc1..e0c19e9336d 100644 --- a/libs/types-common/src/Util/Timeout.hs +++ b/libs/types-common/src/Util/Timeout.hs @@ -26,12 +26,13 @@ import Data.Aeson.Types import Data.Scientific import Data.Time.Clock import Imports +import Polysemy.Time import Test.QuickCheck (Arbitrary (arbitrary), choose) newtype Timeout = Timeout { timeoutDiff :: NominalDiffTime } - deriving newtype (Eq, Enum, Ord, Num, Real, Fractional, RealFrac, Show) + deriving newtype (Eq, Enum, Ord, Num, Real, Fractional, RealFrac, Show, TimeUnit) instance Arbitrary Timeout where arbitrary = Timeout . fromIntegral <$> choose (60 :: Int, 10 * 24 * 3600) diff --git a/libs/types-common/types-common.cabal b/libs/types-common/types-common.cabal index 8832be94802..31ef91e6d86 100644 --- a/libs/types-common/types-common.cabal +++ b/libs/types-common/types-common.cabal @@ -125,6 +125,7 @@ library , optparse-applicative >=0.10 , pem , polysemy + , polysemy-time , protobuf >=0.2 , QuickCheck >=2.9 , quickcheck-instances >=0.3.16 diff --git a/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs b/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs index 1ddee3759b6..c8f79aaa6fd 100644 --- a/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs +++ b/libs/wire-subsystems/src/Wire/ConversationStore/Migration.hs @@ -25,6 +25,7 @@ import Control.Error (lastMay) import Data.Conduit import Data.Conduit.List qualified as C import Data.Domain +import Data.IORef qualified as IORef import Data.Id import Data.IntMap qualified as IntMap import Data.Map qualified as Map @@ -46,12 +47,14 @@ import Polysemy.Async import Polysemy.Conc import Polysemy.Error import Polysemy.Input -import Polysemy.Resource (Resource, resourceToIOFinal) +import Polysemy.Resource (Resource, bracket, resourceToIOFinal) import Polysemy.State import Polysemy.Time import Polysemy.TinyLog import Prometheus qualified import System.Logger qualified as Log +import UnliftIO.Exception qualified as UnliftIO +import Util.Timeout import Wire.API.Conversation hiding (Member) import Wire.API.Conversation.CellsState import Wire.API.Conversation.Protocol @@ -101,15 +104,16 @@ migrateConvsLoop :: Prometheus.Counter -> Prometheus.Counter -> Prometheus.Counter -> + Prometheus.Vector Text Prometheus.Histogram -> IO () -migrateConvsLoop migOpts cassClient pgPool logger migCounter migFinished migFailed = +migrateConvsLoop migOpts cassClient pgPool logger migCounter migFinished migFailed migDuration = migrationLoop logger "conversations" migFinished migFailed (interpreter cassClient pgPool logger "conversations") - (migrateAllConversations migOpts migCounter) + (migrateAllConversations migOpts migCounter migDuration) migrateUsersLoop :: MigrationOptions -> @@ -157,12 +161,13 @@ migrateAllConversations :: ) => MigrationOptions -> Prometheus.Counter -> + Prometheus.Vector Text Prometheus.Histogram -> ConduitM () Void (Sem r) () -migrateAllConversations migOpts migCounter = do +migrateAllConversations migOpts migCounter migDuration = do lift $ info $ Log.msg (Log.val "migrateAllConversations") withCount (paginateSem select (paramsP LocalQuorum () migOpts.pageSize) x5) .| logRetrievedPage migOpts.pageSize runIdentity - .| C.mapM_ (unsafePooledMapConcurrentlyN_ migOpts.parallelism (handleErrors (migrateConversation migCounter) "conv")) + .| C.mapM_ (unsafePooledMapConcurrentlyN_ migOpts.parallelism (handleErrors (migrateConversationWithLock migOpts.timeout migCounter migDuration) "conv")) where select :: PrepQuery R () (Identity ConvId) select = "select conv from conversation" @@ -209,7 +214,7 @@ handleError action lockType id_ = do -- * Conversations -migrateConversation :: +migrateConversationWithLock :: ( PGConstraints r, Member (Input ClientState) r, Member TinyLog r, @@ -218,17 +223,61 @@ migrateConversation :: Member Race r, Member Resource r ) => + Maybe Timeout -> Prometheus.Counter -> + Prometheus.Vector Text Prometheus.Histogram -> ConvId -> Sem r () -migrateConversation migCounter cid = do - void . withMigrationLocks LockExclusive (Seconds 10) [cid] $ do - mConvData <- withCassandra $ getAllConvData cid - for_ mConvData $ \convData -> do - saveConvToPostgres convData - withCassandra $ deleteConv convData - markDeletionComplete DeleteConv cid - liftIO $ Prometheus.incCounter migCounter +migrateConversationWithLock mTimeout migCounter migDuration cid = do + outcomeRef <- liftIO $ IORef.newIORef @Text "error" + bracket + (liftIO getCurrentTime) + (observeDuration migDuration outcomeRef) + ( const do + result <- + runError $ + withMigrationLocks LockExclusive (Seconds 10) [cid] $ do + case mTimeout of + Just to -> do + timeoutResult <- Polysemy.Conc.timeout (to <$ handleTimeout to) to $ migrateConversation + case timeoutResult of + Left timedOutAfter -> do + markOutcome outcomeRef "timeout" + -- this aborts the whole migration process + liftIO . UnliftIO.throwIO $ MigrationTimedOut (idToText cid) timedOutAfter + Right () -> do + markOutcome outcomeRef "success" + Nothing -> do + migrateConversation + markOutcome outcomeRef "success" + + case result of + Left TimedOutAcquiringLock -> do + markOutcome outcomeRef "lock_timeout" + throw TimedOutAcquiringLock + Right () -> pure () + ) + where + migrateConversation = do + mConvData <- withCassandra $ getAllConvData cid + for_ mConvData $ \convData -> do + saveConvToPostgres convData + withCassandra $ deleteConv convData + markDeletionComplete DeleteConv cid + liftIO $ Prometheus.incCounter migCounter + + handleTimeout to = do + err $ + Log.msg (Log.val "conversation migrations timed out") + . Log.field "conv" (idToText cid) + . Log.field "timeout" (show to) + + markOutcome ref outcome = liftIO $ IORef.writeIORef ref outcome + + observeDuration metric outcomeRef start = do + outcome <- liftIO $ IORef.readIORef outcomeRef + end <- liftIO getCurrentTime + liftIO $ Prometheus.withLabel metric outcome (`Prometheus.observe` realToFrac (diffUTCTime end start)) deleteConvFromCassandra :: (Member (Input ClientState) r, Member TinyLog r, Member (Embed IO) r) => AllConvData -> Sem r () deleteConvFromCassandra allConvData = withCassandra $ do diff --git a/libs/wire-subsystems/src/Wire/Migration.hs b/libs/wire-subsystems/src/Wire/Migration.hs index d2ae1573e33..3d92945e2a6 100644 --- a/libs/wire-subsystems/src/Wire/Migration.hs +++ b/libs/wire-subsystems/src/Wire/Migration.hs @@ -34,15 +34,25 @@ import Polysemy.TinyLog import Prometheus qualified import System.Logger qualified as Log import UnliftIO qualified +import Util.Timeout (Timeout) import Wire.Util (embedClient) data MigrationOptions = MigrationOptions { pageSize :: Int32, - parallelism :: Int + parallelism :: Int, + -- optional timeout that applies to a single conversation and + -- limits how long a single conversation migration attempt may run + -- after it has acquired the migration lock + timeout :: Maybe Timeout } deriving (Show, Eq, Generic) deriving (FromJSON) via Generically MigrationOptions +data MigrationTimedOut = MigrationTimedOut Text Timeout + deriving stock (Show) + +instance Exception MigrationTimedOut + migrationLoop :: Log.Logger -> ByteString -> diff --git a/services/background-worker/src/Wire/BackgroundWorker.hs b/services/background-worker/src/Wire/BackgroundWorker.hs index 315bea5bd3b..d630d2b81a6 100644 --- a/services/background-worker/src/Wire/BackgroundWorker.hs +++ b/services/background-worker/src/Wire/BackgroundWorker.hs @@ -62,21 +62,21 @@ run opts galleyOpts = do then runAppT env $ withNamedLogger "migrate-conversation-codes" $ - Migrations.conversationCodes (MigrationOptions 1000 1) + Migrations.conversationCodes (MigrationOptions 1000 1 Nothing) else pure $ pure () cleanupTeamFeaturesMigration <- if opts.migrateTeamFeatures then runAppT env $ withNamedLogger "migrate-team-features" $ - Migrations.teamFeatures (MigrationOptions 1000 1) + Migrations.teamFeatures (MigrationOptions 1000 1 Nothing) else pure $ pure () cleanupDomainRegistrationMigration <- if opts.migrateDomainRegistration then runAppT env $ withNamedLogger "migrate-domain-registration" $ - Migrations.domainRegistration (MigrationOptions 1000 1) + Migrations.domainRegistration (MigrationOptions 1000 1 Nothing) else pure $ pure () cleanupJobs <- runAppT env $ diff --git a/services/background-worker/src/Wire/PostgresMigrations.hs b/services/background-worker/src/Wire/PostgresMigrations.hs index ea8212a9d35..fe531f30df2 100644 --- a/services/background-worker/src/Wire/PostgresMigrations.hs +++ b/services/background-worker/src/Wire/PostgresMigrations.hs @@ -39,11 +39,12 @@ conversations migOpts = do convMigCounter <- register $ counter $ Prometheus.Info "wire_local_convs_migrated_to_pg" "Number of local conversations migrated to Postgresql" convMigFinished <- register $ counter $ Prometheus.Info "wire_local_convs_migration_finished" "Whether the conversation migration to Postgresql is finished successfully" convMigFailed <- register $ counter $ Prometheus.Info "wire_local_convs_migration_failed" "Whether the conversation migration to Postgresql has failed" + convMigDuration <- register $ vector "outcome" $ histogram (Prometheus.Info "wire_local_convs_migration_duration_seconds" "Duration of local conversation migration attempts") defaultBuckets userMigCounter <- register $ counter $ Prometheus.Info "wire_user_remote_convs_migrated_to_pg" "Number of users whose remote conversation membership data is migrated to Postgresql" userMigFinished <- register $ counter $ Prometheus.Info "wire_user_remote_convs_migration_finished" "Whether the migration of remote conversation membership data to Postgresql is finished successfully" userMigFailed <- register $ counter $ Prometheus.Info "wire_user_remote_convs_migration_failed" "Whether the migration of remote conversation membership data to Postgresql has failed" - convLoop <- async . lift $ migrateConvsLoop migOpts cassClient pgPool logger convMigCounter convMigFinished convMigFailed + convLoop <- async . lift $ migrateConvsLoop migOpts cassClient pgPool logger convMigCounter convMigFinished convMigFailed convMigDuration userLoop <- async . lift $ migrateUsersLoop migOpts cassClient pgPool logger userMigCounter userMigFinished userMigFailed Log.info logger $ Log.msg (Log.val "started conversation migration")