-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path03_document_processing.py
More file actions
172 lines (149 loc) · 4.55 KB
/
03_document_processing.py
File metadata and controls
172 lines (149 loc) · 4.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
"""Document Processing Workflow - Parallel execution (fan-out/fan-in) example.
This example demonstrates:
- Declarative activity definition with parallel execution
- Multiple dependencies for aggregation (fan-in)
- Referencing outputs from multiple upstream activities
- Echo activity to simulate processing
- Email notification with aggregated results
Workflow DAG:
fetch_doc1 ──→ process_doc1 ──┐
fetch_doc2 ──→ process_doc2 ──┼──→ aggregate_results ──→ store_summary
fetch_doc3 ──→ process_doc3 ──┘
"""
from textwrap import dedent
from kruxiaflow import Activity, Workflow, workflow
# === PARALLEL FETCH (Fan-Out) ===
# These three activities have no dependencies and execute in parallel
# Fetch document 1. Declares no depends_on, so it is scheduled immediately
# and runs in parallel with the other two fetch activities (fan-out).
fetch_doc1 = Activity(
    key="fetch_doc1",
    worker="std",
    activity_name="http_request",
    parameters=dict(
        method="GET",
        url="https://httpbin.org/get?doc=1&title=Introduction",
    ),
    # A bare string here is auto-converted to an ActivityOutputDefinition.
    outputs=["response"],
)
# Fetch document 2. Declares no depends_on, so it is scheduled immediately
# and runs in parallel with the other two fetch activities (fan-out).
fetch_doc2 = Activity(
    key="fetch_doc2",
    worker="std",
    activity_name="http_request",
    parameters=dict(
        method="GET",
        url="https://httpbin.org/get?doc=2&title=Architecture",
    ),
    # A bare string here is auto-converted to an ActivityOutputDefinition.
    outputs=["response"],
)
# Fetch document 3. Declares no depends_on, so it is scheduled immediately
# and runs in parallel with the other two fetch activities (fan-out).
fetch_doc3 = Activity(
    key="fetch_doc3",
    worker="std",
    activity_name="http_request",
    parameters=dict(
        method="GET",
        url="https://httpbin.org/get?doc=3&title=Conclusion",
    ),
    # A bare string here is auto-converted to an ActivityOutputDefinition.
    outputs=["response"],
)
# === PARALLEL PROCESS (Fan-Out) ===
# Each process activity depends on its corresponding fetch activity.
# Echo passes through the fetched data, simulating a processing step.
# Processing step for document 1. The echo activity mirrors its parameters
# back as outputs, standing in for a real transformation. Subscripting the
# upstream activity (e.g. fetch_doc1["response.body.url"]) builds a dotted
# reference to that activity's output, resolved at run time.
process_doc1 = Activity(
    key="process_doc1",
    worker="std",
    activity_name="echo",
    parameters=dict(
        doc=1,
        title=fetch_doc1["response.body.args.title"],
        url=fetch_doc1["response.body.url"],
    ),
    depends_on=["fetch_doc1"],
)
# Processing step for document 2. The echo activity mirrors its parameters
# back as outputs, standing in for a real transformation. Subscripting the
# upstream activity builds a dotted output reference, resolved at run time.
process_doc2 = Activity(
    key="process_doc2",
    worker="std",
    activity_name="echo",
    parameters=dict(
        doc=2,
        title=fetch_doc2["response.body.args.title"],
        url=fetch_doc2["response.body.url"],
    ),
    depends_on=["fetch_doc2"],
)
# Processing step for document 3. The echo activity mirrors its parameters
# back as outputs, standing in for a real transformation. Subscripting the
# upstream activity builds a dotted output reference, resolved at run time.
process_doc3 = Activity(
    key="process_doc3",
    worker="std",
    activity_name="echo",
    parameters=dict(
        doc=3,
        title=fetch_doc3["response.body.args.title"],
        url=fetch_doc3["response.body.url"],
    ),
    depends_on=["fetch_doc3"],
)
# === FAN-IN AGGREGATION ===
# This activity waits for ALL three process activities to complete
# Fan-in: depends_on lists all three process activities, so this activity is
# not scheduled until every one of them has completed. It collects the echoed
# titles from each branch into a single set of parameters.
aggregate_results = Activity(
    key="aggregate_results",
    worker="std",
    activity_name="echo",
    parameters=dict(
        workflow_id=workflow.id,
        doc1=process_doc1["echo.title"],
        doc2=process_doc2["echo.title"],
        doc3=process_doc3["echo.title"],
        documents_processed=3,
    ),
    depends_on=["process_doc1", "process_doc2", "process_doc3"],
)
# === FINAL NOTIFICATION ===
# Send the summary via Mailpit
store_summary = Activity(
key="store_summary",
worker="std",
activity_name="http_request",
parameters={
"method": "POST",
"url": "http://mailpit:8025/api/v1/send",
"headers": {"Content-Type": "application/json"},
"body": {
"From": {
"Name": "Kruxia Flow",
"Email": "workflow@kruxiaflow.local",
},
"To": [
{
"Name": "Document Consumer",
"Email": "docs@example.com",
}
],
"Subject": f"Document Processing Summary - {workflow.id}",
"Text": dedent(f"""\
Documents Processed: {aggregate_results["echo.documents_processed"]}
Doc 1: {aggregate_results["echo.doc1"]}
Doc 2: {aggregate_results["echo.doc2"]}
Doc 3: {aggregate_results["echo.doc3"]}
"""),
},
},
depends_on=["aggregate_results"],
)
# Build the workflow
# Assemble the workflow from all eight activities. Execution order is driven
# by each activity's depends_on edges, not by position in this list.
document_workflow = Workflow(
    name="process_documents",
    activities=[
        # Parallel fetch (fan-out: no dependencies)
        fetch_doc1,
        fetch_doc2,
        fetch_doc3,
        # Parallel process (one per fetched document)
        process_doc1,
        process_doc2,
        process_doc3,
        # Fan-in aggregation, then the email notification
        aggregate_results,
        store_summary,
    ],
)
if __name__ == "__main__":
    # Print the compiled workflow (per the example's intent, its string form
    # is the compiled YAML) so the DAG can be verified by eye.
    print(document_workflow)