Skip to content

Commit 090bfd2

Browse files
author
Jamil Maqdis Anton
committed
Rename Append.fs to AppendRaw.fs, same with Read.fs. Fix code accordingly.
1 parent acba815 commit 090bfd2

File tree

9 files changed

+198
-131
lines changed

9 files changed

+198
-131
lines changed

src/Append.fs

Lines changed: 14 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,20 @@
11
namespace SqlStreamStore.FSharp
22

3-
open SqlStreamStore.FSharp
43
open SqlStreamStore.Streams
54

65
module Append =
7-
let private stringIdToGuid: StreamMessageId -> System.Guid =
8-
function
9-
| StreamMessageId.Custom guid -> guid
10-
| StreamMessageId.Auto -> System.Guid.NewGuid()
6+
let appendNewMessage (store: SqlStreamStore.IStreamStore)
7+
(streamName: StreamName)
8+
(appendVersion: AppendVersion)
9+
(messageDetails: MessageDetails)
10+
: Async<Result<AppendResult, string>> =
11+
AppendRaw.appendNewMessage store streamName appendVersion messageDetails
12+
|> ExceptionHandler.simpleExceptionHandler
1113

12-
let private newStreamMessageFromMessageDetails: MessageDetails -> NewStreamMessage =
13-
fun msg ->
14-
match msg.jsonMetadata with
15-
| "" -> NewStreamMessage(stringIdToGuid msg.id, msg.type_, msg.jsonData)
16-
| metadata -> NewStreamMessage(stringIdToGuid msg.id, msg.type_, msg.jsonData, metadata)
17-
18-
let private fromAppendVersion: AppendVersion -> int =
19-
function
20-
| AppendVersion.Any -> ExpectedVersion.Any
21-
| AppendVersion.EmptyStream -> ExpectedVersion.EmptyStream
22-
| AppendVersion.NoStream -> ExpectedVersion.NoStream
23-
| AppendVersion.SpecificVersion version -> version
24-
25-
let appendNewMessage: SqlStreamStore.IStreamStore -> StreamName -> AppendVersion -> MessageDetails -> Async<AppendResult> =
26-
fun store streamName appendVersion messageDetails ->
27-
store.AppendToStream
28-
(StreamId(streamName),
29-
fromAppendVersion appendVersion,
30-
[| newStreamMessageFromMessageDetails messageDetails |])
31-
|> Async.AwaitTask
32-
33-
let appendNewMessages: SqlStreamStore.IStreamStore -> StreamName -> AppendVersion -> List<MessageDetails> -> Async<AppendResult> =
34-
fun store streamName appendVersion messages ->
35-
store.AppendToStream
36-
(StreamId(streamName),
37-
fromAppendVersion appendVersion,
38-
messages
39-
|> List.map newStreamMessageFromMessageDetails
40-
|> List.toArray)
41-
|> Async.AwaitTask
14+
let appendNewMessages (store: SqlStreamStore.IStreamStore)
15+
(streamName: StreamName)
16+
(appendVersion: AppendVersion)
17+
(messages: MessageDetails list)
18+
: Async<Result<AppendResult, string>> =
19+
AppendRaw.appendNewMessages store streamName appendVersion messages
20+
|> ExceptionHandler.simpleExceptionHandler

src/AppendExtras.fs

Lines changed: 0 additions & 26 deletions
This file was deleted.

src/AppendRaw.fs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
namespace SqlStreamStore.FSharp
2+
3+
open SqlStreamStore.FSharp
4+
open SqlStreamStore.Streams
5+
6+
module AppendRaw =
7+
let private stringIdToGuid: StreamMessageId -> System.Guid =
8+
function
9+
| StreamMessageId.Custom guid -> guid
10+
| StreamMessageId.Auto -> System.Guid.NewGuid()
11+
12+
let private newStreamMessageFromMessageDetails: MessageDetails -> NewStreamMessage =
13+
fun msg ->
14+
match msg.jsonMetadata with
15+
| "" -> NewStreamMessage(stringIdToGuid msg.id, msg.type_, msg.jsonData)
16+
| metadata -> NewStreamMessage(stringIdToGuid msg.id, msg.type_, msg.jsonData, metadata)
17+
18+
let private fromAppendVersion: AppendVersion -> int =
19+
function
20+
| AppendVersion.Any -> ExpectedVersion.Any
21+
| AppendVersion.EmptyStream -> ExpectedVersion.EmptyStream
22+
| AppendVersion.NoStream -> ExpectedVersion.NoStream
23+
| AppendVersion.SpecificVersion version -> version
24+
25+
let appendNewMessage: SqlStreamStore.IStreamStore -> StreamName -> AppendVersion -> MessageDetails -> Async<AppendResult> =
26+
fun store streamName appendVersion messageDetails ->
27+
store.AppendToStream
28+
(StreamId(streamName),
29+
fromAppendVersion appendVersion,
30+
[| newStreamMessageFromMessageDetails messageDetails |])
31+
|> Async.AwaitTask
32+
33+
let appendNewMessages: SqlStreamStore.IStreamStore -> StreamName -> AppendVersion -> List<MessageDetails> -> Async<AppendResult> =
34+
fun store streamName appendVersion messages ->
35+
store.AppendToStream
36+
(StreamId(streamName),
37+
fromAppendVersion appendVersion,
38+
messages
39+
|> List.map newStreamMessageFromMessageDetails
40+
|> List.toArray)
41+
|> Async.AwaitTask

src/Read.fs

Lines changed: 16 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,16 @@
1-
namespace SqlStreamStore.FSharp
2-
3-
open System.Threading
4-
open SqlStreamStore.Streams
5-
6-
module Read =
7-
let private fromReadVersion: ReadVersion -> int =
8-
function
9-
| ReadVersion.Start -> int (Position.Start)
10-
| ReadVersion.End -> int (Position.End)
11-
| ReadVersion.SpecificVersion version -> int (version)
12-
13-
let private fromStartPositionInclusive: StartPosition -> int64 =
14-
function
15-
| StartPosition.Start -> 0L
16-
| StartPosition.End -> -1L
17-
| StartPosition.SpecificPosition position -> position
18-
19-
let readFromAllStream: SqlStreamStore.IStreamStore -> ReadingDirection -> StartPosition -> MessageCount -> Async<ReadAllPage> =
20-
fun store readingDirection startPositionInclusive msgCount ->
21-
match readingDirection with
22-
| ReadingDirection.Forward ->
23-
store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount)
24-
| ReadingDirection.Backward ->
25-
store.ReadAllBackwards(fromStartPositionInclusive startPositionInclusive, msgCount)
26-
|> Async.AwaitTask
27-
28-
let readFromAllStream': SqlStreamStore.IStreamStore -> ReadingDirection -> StartPosition -> MessageCount -> bool -> CancellationToken -> Async<ReadAllPage> =
29-
fun store readingDirection startPositionInclusive msgCount prefetchJson cancellationToken ->
30-
match readingDirection with
31-
| ReadingDirection.Forward ->
32-
store.ReadAllForwards
33-
(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson, cancellationToken)
34-
| ReadingDirection.Backward ->
35-
store.ReadAllBackwards
36-
(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson, cancellationToken)
37-
|> Async.AwaitTask
38-
39-
let readFromStream: SqlStreamStore.IStreamStore -> ReadingDirection -> StreamName -> ReadVersion -> MessageCount -> Async<ReadStreamPage> =
40-
fun store readingDirection streamName readVersion msgCount ->
41-
match readingDirection with
42-
| ReadingDirection.Forward ->
43-
store.ReadStreamForwards(StreamId(streamName), fromReadVersion readVersion, msgCount)
44-
| ReadingDirection.Backward ->
45-
store.ReadStreamBackwards(StreamId(streamName), fromReadVersion readVersion, msgCount)
46-
|> Async.AwaitTask
47-
48-
let readFromStream': SqlStreamStore.IStreamStore -> ReadingDirection -> StreamName -> ReadVersion -> MessageCount -> bool -> CancellationToken -> Async<ReadStreamPage> =
49-
fun store readingDirection streamName readVersion msgCount prefetchJson cancellationToken ->
50-
match readingDirection with
51-
| ReadingDirection.Forward ->
52-
store.ReadStreamForwards
53-
(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson, cancellationToken)
54-
| ReadingDirection.Backward ->
55-
store.ReadStreamBackwards
56-
(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson, cancellationToken)
57-
|> Async.AwaitTask
1+
namespace SqlStreamStore.FSharp
2+
3+
open SqlStreamStore.Streams
4+
5+
module Read =
6+
let readFromAllStream (store: SqlStreamStore.IStreamStore)
7+
(readingDirection: ReadingDirection)
8+
(startPositionInclusive: StartPosition)
9+
(msgCount: MessageCount)
10+
: Async<Result<ReadAllPage, string>> =
11+
ReadRaw.readFromAllStream store readingDirection startPositionInclusive msgCount
12+
|> ExceptionHandler.simpleExceptionHandler
13+
14+
let readFromStream store readingDirection streamName readVersion msgCount =
15+
ReadRaw.readFromStream store readingDirection streamName readVersion msgCount
16+
|> ExceptionHandler.simpleExceptionHandler

src/ReadRaw.fs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
namespace SqlStreamStore.FSharp
2+
3+
open System.Threading
4+
open SqlStreamStore.Streams
5+
6+
module ReadRaw =
7+
let private fromReadVersion: ReadVersion -> int =
8+
function
9+
| ReadVersion.Start -> int (Position.Start)
10+
| ReadVersion.End -> int (Position.End)
11+
| ReadVersion.SpecificVersion version -> int (version)
12+
13+
let private fromStartPositionInclusive: StartPosition -> int64 =
14+
function
15+
| StartPosition.Start -> 0L
16+
| StartPosition.End -> -1L
17+
| StartPosition.SpecificPosition position -> position
18+
19+
let readFromAllStream: SqlStreamStore.IStreamStore -> ReadingDirection -> StartPosition -> MessageCount -> Async<ReadAllPage> =
20+
fun store readingDirection startPositionInclusive msgCount ->
21+
match readingDirection with
22+
| ReadingDirection.Forward ->
23+
store.ReadAllForwards(fromStartPositionInclusive startPositionInclusive, msgCount)
24+
| ReadingDirection.Backward ->
25+
store.ReadAllBackwards(fromStartPositionInclusive startPositionInclusive, msgCount)
26+
|> Async.AwaitTask
27+
28+
let readFromAllStream': SqlStreamStore.IStreamStore -> ReadingDirection -> StartPosition -> MessageCount -> bool -> CancellationToken -> Async<ReadAllPage> =
29+
fun store readingDirection startPositionInclusive msgCount prefetchJson cancellationToken ->
30+
match readingDirection with
31+
| ReadingDirection.Forward ->
32+
store.ReadAllForwards
33+
(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson, cancellationToken)
34+
| ReadingDirection.Backward ->
35+
store.ReadAllBackwards
36+
(fromStartPositionInclusive startPositionInclusive, msgCount, prefetchJson, cancellationToken)
37+
|> Async.AwaitTask
38+
39+
let readFromStream: SqlStreamStore.IStreamStore -> ReadingDirection -> StreamName -> ReadVersion -> MessageCount -> Async<ReadStreamPage> =
40+
fun store readingDirection streamName readVersion msgCount ->
41+
match readingDirection with
42+
| ReadingDirection.Forward ->
43+
store.ReadStreamForwards(StreamId(streamName), fromReadVersion readVersion, msgCount)
44+
| ReadingDirection.Backward ->
45+
store.ReadStreamBackwards(StreamId(streamName), fromReadVersion readVersion, msgCount)
46+
|> Async.AwaitTask
47+
48+
let readFromStream': SqlStreamStore.IStreamStore -> ReadingDirection -> StreamName -> ReadVersion -> MessageCount -> bool -> CancellationToken -> Async<ReadStreamPage> =
49+
fun store readingDirection streamName readVersion msgCount prefetchJson cancellationToken ->
50+
match readingDirection with
51+
| ReadingDirection.Forward ->
52+
store.ReadStreamForwards
53+
(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson, cancellationToken)
54+
| ReadingDirection.Backward ->
55+
store.ReadStreamBackwards
56+
(StreamId(streamName), fromReadVersion readVersion, msgCount, prefetchJson, cancellationToken)
57+
|> Async.AwaitTask

src/SqlStreamStore.FSharp.fsproj

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,14 @@
1111
</PropertyGroup>
1212

1313
<ItemGroup>
14+
<Compile Include="AsyncExtras.fs" />
15+
<Compile Include="ExceptionHandler.fs" />
1416
<Compile Include="Types.fs" />
17+
<Compile Include="AppendRaw.fs" />
18+
<Compile Include="ReadRaw.fs" />
19+
<Compile Include="Postgres.fs" />
1520
<Compile Include="Append.fs" />
1621
<Compile Include="Read.fs" />
17-
<Compile Include="Postgres.fs" />
1822
</ItemGroup>
1923

2024
<ItemGroup>

tests/AppendExtrasTests.fs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
module SqlStreamStore.FSharp.Tests.AppendExtrasTests
2+
3+
open Expecto
4+
5+
open SqlStreamStore.FSharp
6+
7+
8+
[<Tests>]
9+
let tests =
10+
testList
11+
"AppendExtras Tests"
12+
[ testAsync "Should append one message to stream." {
13+
let inMemStore = new SqlStreamStore.InMemoryStreamStore()
14+
15+
let streamName = "test"
16+
17+
let appendVersion = AppendVersion.NoStream
18+
19+
let msg =
20+
{ id = StreamMessageId.Auto
21+
type_ = "testing"
22+
jsonData = "{}"
23+
jsonMetadata = "{}" }
24+
25+
let! appendResult = AppendRaw.appendNewMessage inMemStore streamName appendVersion msg
26+
27+
Expect.equal appendResult.CurrentVersion 0 "Error: message version doesn't match."
28+
}
29+
30+
testAsync "Should append a list of messages to stream." {
31+
let inMemStore = new SqlStreamStore.InMemoryStreamStore()
32+
33+
let streamName = "test"
34+
35+
let appendVersion = AppendVersion.NoStream
36+
37+
let msg1 =
38+
{ id = StreamMessageId.Auto
39+
type_ = "testing"
40+
jsonData = "{}"
41+
jsonMetadata = "{}" }
42+
43+
let msg2 =
44+
{ id = StreamMessageId.Auto
45+
type_ = "testing"
46+
jsonData = "{}"
47+
jsonMetadata = "{}" }
48+
49+
let msgList = [ msg1; msg2 ]
50+
51+
let! appendResult = AppendRaw.appendNewMessages inMemStore streamName appendVersion msgList
52+
Expect.equal appendResult.CurrentVersion 1 "Error: message version doesn't match."
53+
} ]

tests/AppendTests.fs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ let tests =
2121
jsonData = "{}"
2222
jsonMetadata = "{}" }
2323

24-
let! appendResult = Append.appendNewMessage inMemStore streamName appendVersion msg
24+
let! appendResult = AppendRaw.appendNewMessage inMemStore streamName appendVersion msg
2525

2626
Expect.equal appendResult.CurrentVersion 0 "Error: message version doesn't match."
2727
}
@@ -47,6 +47,6 @@ let tests =
4747

4848
let msgList = [ msg1; msg2 ]
4949

50-
let! appendResult = Append.appendNewMessages inMemStore streamName appendVersion msgList
50+
let! appendResult = AppendRaw.appendNewMessages inMemStore streamName appendVersion msgList
5151
Expect.equal appendResult.CurrentVersion 1 "Error: message version doesn't match."
5252
} ]

0 commit comments

Comments
 (0)