-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathStreaming.fs
More file actions
111 lines (87 loc) · 2.96 KB
/
Streaming.fs
File metadata and controls
111 lines (87 loc) · 2.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
module ImageProcessing.Streaming
open ImageProcessing.ImageProcessing
/// Returns the entries reported by `System.IO.Directory.GetFiles` for `dir`
/// as an F# list.
let listAllFiles dir =
    System.IO.Directory.GetFiles dir |> Array.toList
/// Messages exchanged with the image saver and image processor agents
/// defined in this module.
type msg =
/// An image to be handled by the receiving agent.
| Img of Image
/// End-of-stream request; the receiving agent calls `Reply` on the carried
/// channel once it has finished.
| EOS of AsyncReplyChannel<unit>
/// Starts an agent that hands every received `Img` to `saveImage`.
/// When it receives `EOS` it replies on the carried channel and stops
/// receiving further messages.
let imgSaver saveImage =
    let agentBody (inbox: MailboxProcessor<msg>) =
        let rec receiveNext () =
            async {
                match! inbox.Receive() with
                | Img img ->
                    printfn $"Save: %A{img.Name}"
                    saveImage img
                    // Only an image keeps the agent alive; EOS falls out of the loop.
                    return! receiveNext ()
                | EOS replyChannel ->
                    printfn "Image saver is finished!"
                    replyChannel.Reply()
            }

        receiveNext ()

    MailboxProcessor.Start(agentBody)
/// Starts an agent that applies `filterApplicator` to every received `Img`
/// and posts the filtered image to `imgSaver`.
///
/// On `EOS` the agent sends an `EOS` request to `imgSaver`, waits for the
/// saver's reply, then replies on the channel it received and stops
/// receiving further messages.
let imgProcessor filterApplicator (imgSaver: MailboxProcessor<_>) =
    MailboxProcessor.Start(fun inbox ->
        let rec loop () =
            async {
                let! msg = inbox.Receive()

                match msg with
                | EOS ch ->
                    printfn "Image processor is ready to finish!"
                    // Await the saver's reply asynchronously rather than blocking
                    // the current thread inside the async workflow.
                    do! imgSaver.PostAndAsyncReply EOS
                    printfn "Image processor is finished!"
                    ch.Reply()
                | Img img ->
                    printfn $"Filter: %A{img.Name}"
                    let filtered = filterApplicator img
                    imgSaver.Post(Img filtered)
                    return! loop ()
            }

        loop ()
    )
/// Loads every file listed by `listAllFiles inDir`, sends each image to one of
/// the processing agents (one agent plus its own saver per element of
/// `filterApplicators`), and saves the results under `outDir` using the
/// image's `Name` as the file name.
///
/// Returns after every processing agent has replied to `EOS`.
let processAllFiles inDir outDir filterApplicators =
    // An image is only loaded and posted when the least busy processor has
    // fewer than this many queued messages, which bounds how many loaded
    // images are waiting at any time.
    let maxQueueLength = 3

    let outFile (imgName: string) = System.IO.Path.Combine(outDir, imgName)
    let saveImageToDir (img: Image) = saveImage img (outFile img.Name)

    let imgProcessors =
        filterApplicators
        |> List.map (fun applicator -> imgProcessor applicator (imgSaver saveImageToDir))
        |> Array.ofList

    // An array gives constant-time length and indexing in the dispatch loop.
    let filesToProcess = listAllFiles inDir |> Array.ofList

    let mutable cnt = 0

    while cnt < filesToProcess.Length do
        let p = imgProcessors |> Array.minBy (fun p -> p.CurrentQueueLength)

        if p.CurrentQueueLength < maxQueueLength then
            p.Post(Img(loadAsImage (filesToProcess[cnt])))
            cnt <- cnt + 1
        else
            // Every queue is full: give up the time slice instead of hot-spinning.
            System.Threading.Thread.Yield() |> ignore

    // Ask every processor to finish and wait for each acknowledgement.
    for processor in imgProcessors do
        processor.PostAndReply EOS
/// Sends every message in `inImages` to one of the processing agents (one
/// agent plus its own saver per element of `filterApplicators`); the savers
/// append the images they receive to an in-memory `ResizeArray`.
///
/// Returns after every processing agent has replied to `EOS`.
/// NOTE(review): the collected images are not returned to the caller —
/// confirm whether that is intended.
let processAllLoadedFiles inImages filterApplicators =
    // Same queue bound as used when dispatching: a message is posted only when
    // the least busy processor has fewer than this many queued messages.
    let maxQueueLength = 3

    let result = new ResizeArray<_>()

    // Each saver agent calls this from its own workflow, so appends to the
    // shared collection are serialised with a lock.
    let saveImageToArr (img: Image) = lock result (fun () -> result.Add img)

    let imgProcessors =
        filterApplicators
        |> List.map (fun applicator -> imgProcessor applicator (imgSaver saveImageToArr))
        |> Array.ofList

    // An array gives constant-time length and indexing in the dispatch loop.
    let images = Array.ofList inImages

    let mutable cnt = 0

    while cnt < images.Length do
        let p = imgProcessors |> Array.minBy (fun p -> p.CurrentQueueLength)

        if p.CurrentQueueLength < maxQueueLength then
            p.Post(images[cnt])
            cnt <- cnt + 1
        else
            // Every queue is full: give up the time slice instead of hot-spinning.
            System.Threading.Thread.Yield() |> ignore

    // Ask every processor to finish and wait for each acknowledgement.
    for processor in imgProcessors do
        processor.PostAndReply EOS