Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 45 additions & 8 deletions hasql-queue.cabal
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
cabal-version: 1.12

-- This file has been generated from package.yaml by hpack version 0.31.2.
-- This file has been generated from package.yaml by hpack version 0.34.4.
--
-- see: https://github.com/sol/hpack
--
-- hash: 956ae93525f9dafcc0c9c8149cd2bbc8cfcfe4e63310adec92ce40f995e4cbf4
-- hash: 30a78bb71c0fb6470ad0d6b6788b23f19801ab253d1c65e008a48e329e01b914

name: hasql-queue
version: 1.2.0.1
version: 1.3.0.0
synopsis: A PostgreSQL backed queue
description: A PostgreSQL backed queue. Please see README.md
category: Web
Expand All @@ -18,7 +18,8 @@ maintainer: jonathangfischoff@gmail.com
copyright: 2020 Jonathan Fischoff
license: BSD3
license-file: LICENSE
tested-with: GHC ==8.8.1
tested-with:
GHC ==8.8.1
build-type: Simple
extra-source-files:
README.md
Expand All @@ -42,7 +43,16 @@ library
Paths_hasql_queue
hs-source-dirs:
src
default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes
default-extensions:
OverloadedStrings
LambdaCase
RecordWildCards
TupleSections
GeneralizedNewtypeDeriving
QuasiQuotes
ScopedTypeVariables
TypeApplications
AllowAmbiguousTypes
ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls
build-depends:
aeson
Expand All @@ -67,7 +77,16 @@ executable benchmark
Paths_hasql_queue
hs-source-dirs:
benchmarks
default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes
default-extensions:
OverloadedStrings
LambdaCase
RecordWildCards
TupleSections
GeneralizedNewtypeDeriving
QuasiQuotes
ScopedTypeVariables
TypeApplications
AllowAmbiguousTypes
ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls -O2 -threaded -rtsopts -with-rtsopts=-N
build-depends:
aeson
Expand Down Expand Up @@ -98,7 +117,16 @@ executable hasql-queue-tmp-db
Paths_hasql_queue
hs-source-dirs:
hasql-queue-tmp-db
default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes
default-extensions:
OverloadedStrings
LambdaCase
RecordWildCards
TupleSections
GeneralizedNewtypeDeriving
QuasiQuotes
ScopedTypeVariables
TypeApplications
AllowAmbiguousTypes
ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls -O2 -threaded -rtsopts -with-rtsopts=-N -g2
build-depends:
aeson
Expand Down Expand Up @@ -137,7 +165,16 @@ test-suite unit-tests
Paths_hasql_queue
hs-source-dirs:
test
default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes
default-extensions:
OverloadedStrings
LambdaCase
RecordWildCards
TupleSections
GeneralizedNewtypeDeriving
QuasiQuotes
ScopedTypeVariables
TypeApplications
AllowAmbiguousTypes
ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls -O2 -threaded -rtsopts -with-rtsopts=-N
build-depends:
aeson
Expand Down
2 changes: 1 addition & 1 deletion package.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: hasql-queue
version: '1.2.0.2'
version: '1.3.0.0'
synopsis: A PostgreSQL backed queue
description: A PostgreSQL backed queue. Please see README.md
category: Web
Expand Down
23 changes: 3 additions & 20 deletions src/Hasql/Queue/High/ExactlyOnce.hs
Original file line number Diff line number Diff line change
Expand Up @@ -58,30 +58,13 @@ dequeue valueDecoder count
| count <= 0 = pure []
| otherwise = do
let multipleQuery = [here|
DELETE FROM payloads
WHERE id in
( SELECT p1.id
FROM payloads AS p1
WHERE p1.state='enqueued'
ORDER BY p1.modified_at ASC
FOR UPDATE SKIP LOCKED
LIMIT $1
)
RETURNING value
SELECT value FROM dequeue_payload($1)
|]

multipleEncoder = E.param $ E.nonNullable $ fromIntegral >$< E.int4

singleQuery = [here|
DELETE FROM payloads
WHERE id =
( SELECT p1.id
FROM payloads AS p1
WHERE p1.state='enqueued'
ORDER BY p1.modified_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING value
SELECT value FROM dequeue_payload(1)
|]

singleEncoder = mempty
Expand Down
42 changes: 8 additions & 34 deletions src/Hasql/Queue/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ newtype PayloadId = PayloadId { unPayloadId :: Int64 }
data Payload a = Payload
{ pId :: PayloadId
, pState :: State
-- TODO do I need this?
, pAttempts :: Int
, pModifiedAt :: Int
-- TODO rename. I don't need this either.
, pValue :: a
} deriving (Show, Eq)

Expand All @@ -75,8 +71,6 @@ payloadDecoder thePayloadDecoder
= Payload
<$> payloadIdRow
<*> D.column (D.nonNullable stateDecoder)
<*> D.column (D.nonNullable $ fromIntegral <$> D.int4)
<*> D.column (D.nonNullable $ fromIntegral <$> D.int4)
<*> D.column (D.nonNullable thePayloadDecoder)

payloadIdEncoder :: E.Value PayloadId
Expand All @@ -92,9 +86,7 @@ payloadIdRow = D.column (D.nonNullable payloadIdDecoder)
enqueuePayload :: E.Value a -> [a] -> Session [PayloadId]
enqueuePayload theEncoder values = do
let theQuery = [here|
INSERT INTO payloads (attempts, value)
SELECT 0, * FROM unnest($1)
RETURNING id
SELECT id FROM enqueue_payload($1)
|]
encoder = E.param $ E.nonNullable $ E.foldableArray $ E.nonNullable theEncoder
decoder = D.rowList (D.column (D.nonNullable payloadIdDecoder))
Expand All @@ -105,30 +97,15 @@ enqueuePayload theEncoder values = do
dequeuePayload :: D.Value a -> Int -> Session [Payload a]
dequeuePayload valueDecoder count = do
let multipleQuery = [here|
DELETE FROM payloads
WHERE id in
( SELECT p1.id
FROM payloads AS p1
WHERE p1.state='enqueued'
ORDER BY p1.modified_at ASC
FOR UPDATE SKIP LOCKED
LIMIT $1
)
RETURNING id, state, attempts, modified_at, value
SELECT id, state, value
FROM dequeue_payload($1)
|]

multipleEncoder = E.param $ E.nonNullable $ fromIntegral >$< E.int4

singleQuery = [here|
DELETE FROM payloads
WHERE id =
( SELECT p1.id
FROM payloads AS p1
WHERE p1.state='enqueued'
ORDER BY p1.modified_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING id, state, attempts, modified_at, value
SELECT id, state, value
FROM dequeue_payload(1)
|]

singleEncoder = mempty
Expand All @@ -144,7 +121,7 @@ dequeuePayload valueDecoder count = do
getPayload :: D.Value a -> PayloadId -> Session (Maybe (Payload a))
getPayload decoder payloadId = do
let theQuery = [here|
SELECT id, state, attempts, modified_at, value
SELECT id, state, value
FROM payloads
WHERE id = $1
|]
Expand All @@ -168,10 +145,7 @@ getCount = do
incrementAttempts :: Int -> [PayloadId] -> Session ()
incrementAttempts retryCount pids = do
let theQuery = [here|
UPDATE payloads
SET state=CASE WHEN attempts >= $1 THEN 'failed' :: state_t ELSE 'enqueued' END
, attempts=attempts+1
WHERE id = ANY($2)
SELECT increment_payload_attempts($1, $2)
|]
encoder = (fst >$< E.param (E.nonNullable E.int4)) <>
(snd >$< E.param (E.nonNullable $ E.foldableArray $ E.nonNullable payloadIdEncoder))
Expand Down
64 changes: 64 additions & 0 deletions src/Hasql/Queue/Migrate.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,36 @@ migrationQueryString valueType = [i|
CREATE INDEX IF NOT EXISTS active_modified_at_idx ON payloads USING btree (modified_at)
WHERE (state = 'enqueued');

CREATE OR REPLACE FUNCTION dequeue_payload(limit_ INT) RETURNS SETOF payloads AS
$$
WITH available AS
( SELECT p1.id
FROM payloads AS p1
WHERE p1.state='enqueued'
ORDER BY p1.modified_at ASC
FOR UPDATE SKIP LOCKED
LIMIT limit_
)
DELETE FROM payloads
USING available
WHERE payloads.id = available.id
RETURNING payloads.*
$$ LANGUAGE SQL VOLATILE;

CREATE OR REPLACE FUNCTION increment_payload_attempts(threshold_ INT, ids_ BIGINT[]) RETURNS VOID AS
$$
UPDATE payloads
SET state=CASE WHEN attempts >= threshold_ THEN 'failed' :: state_t ELSE 'enqueued' END
, attempts=attempts+1
WHERE id = ANY(ids_)
$$ LANGUAGE SQL VOLATILE;

CREATE OR REPLACE FUNCTION enqueue_payload(values_ ${valueType}[]) RETURNS SETOF payloads AS
$$
INSERT INTO payloads (attempts, value)
SELECT 0, * FROM unnest(values_)
RETURNING *
$$ LANGUAGE SQL VOLATILE;
|]

{-| This function creates a table and enumeration type that is
Expand Down Expand Up @@ -106,6 +136,37 @@ migrationQueryString valueType = [i|

CREATE INDEX IF NOT EXISTS active_modified_at_idx ON payloads USING btree (modified_at, state)
WHERE (state = 'enqueued');

CREATE OR REPLACE FUNCTION dequeue_payload(limit_ INT) RETURNS SETOF payloads AS
$$
WITH available AS
( SELECT p1.id
FROM payloads AS p1
WHERE p1.state='enqueued'
ORDER BY p1.modified_at ASC
FOR UPDATE SKIP LOCKED
LIMIT limit_
)
DELETE FROM payloads
USING available
WHERE payloads.id = available.id
RETURNING payloads.*
$$ LANGUAGE SQL VOLATILE;

CREATE OR REPLACE FUNCTION increment_payload_attempts(threshold_ INT, ids_ BIGINT[]) RETURNS VOID AS
$$
UPDATE payloads
SET state=CASE WHEN attempts >= threshold_ THEN 'failed' :: state_t ELSE 'enqueued' END
, attempts=attempts+1
WHERE id = ANY(ids_)
$$ LANGUAGE SQL VOLATILE;

CREATE OR REPLACE FUNCTION enqueue_payload(values_ ${valueType}[]) RETURNS SETOF payloads AS
$$
INSERT INTO payloads (attempts, value)
SELECT 0, * FROM unnest(values_)
RETURNING *
$$ LANGUAGE SQL VOLATILE;
@

The @VALUE_TYPE@ needs to passed in through the second argument.
Expand All @@ -123,6 +184,9 @@ Drop everything created by 'migrate'
teardown :: Connection -> IO ()
teardown conn = do
let theQuery = [i|
DROP FUNCTION IF EXISTS enqueue_payload;
DROP FUNCTION IF EXISTS dequeue_payload;
DROP FUNCTION IF EXISTS increment_payload_attempts;
DROP TABLE IF EXISTS payloads;
DROP TYPE IF EXISTS state_t;
DROP SEQUENCE IF EXISTS modified_index;
Expand Down
4 changes: 2 additions & 2 deletions test/Hasql/Queue/Low/AtLeastOnceSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ spec = describe "Hasql.Queue.Low.AtLeastOnce" $ aroundAll withSetup $ describe "
let Just decoded = mapM (decode . encode) xs
sort decoded `shouldBe` sort expected

it "enqueue returns a PayloadId that cooresponds to the entry it added" $ withConnection $ \conn -> do
it "enqueue returns a PayloadId that corresponds to the entry it added" $ withConnection $ \conn -> do
[payloadId] <- I.runThrow (I.enqueuePayload E.int4 [1]) conn
Just actual <- getPayload conn D.int4 payloadId

pValue actual `shouldBe` 1
pId actual `shouldBe` payloadId