-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstream_app.py
More file actions
36 lines (29 loc) · 1.05 KB
/
stream_app.py
File metadata and controls
36 lines (29 loc) · 1.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import os
from pyspark.sql import SparkSession
from pyot import connector, aggregate
# Read Azure Event Hubs settings from environment variables.
# Validate them before starting Spark so a misconfiguration fails fast,
# instead of surfacing later as an obscure error from inside the connector.
EVENTHUB_CONNECTION_STRING = os.getenv('EVENTHUB_CONNECTION_STRING')
if not EVENTHUB_CONNECTION_STRING:
    raise SystemExit("EVENTHUB_CONNECTION_STRING environment variable is not set")

# Fall back to the Event Hubs default consumer group when the variable is
# unset or empty (an empty string is not a usable consumer-group name).
EVENTHUB_CONSUMER_GROUP = os.getenv('EVENTHUB_CONSUMER_GROUP') or "$Default"

# Initialize the Spark session. For local runs outside a cluster, add
# .master('local') to the builder chain.
spark = SparkSession.builder.appName('StructredStreamingApp').getOrCreate()

# Connect to Event Hubs and obtain a streaming DataFrame.
raw_streaming_df = connector.create_eventhub_streamdf(
    spark,
    EVENTHUB_CONNECTION_STRING,
    EVENTHUB_CONSUMER_GROUP,
)

# NOTE(review): 30 and 60 are presumably window/slide or watermark durations
# (seconds?) -- confirm against pyot.aggregate.create_averaged_df.
aggregated_df = aggregate.create_averaged_df(raw_streaming_df, 30, 60)

# Write the aggregated rows to the console until the query ends or the user
# interrupts it. `stream` starts as None so cleanup is safe even if the
# interrupt arrives before the query has started.
stream = None
try:
    stream = (
        aggregated_df
        .writeStream
        .outputMode('append')
        .format('console')
        .start()
    )
    stream.awaitTermination()
except KeyboardInterrupt:
    print("Streaming is terminated by user input")
finally:
    # Stop the query and the session on every exit path (normal end,
    # Ctrl-C, or a query failure re-raised by awaitTermination) so the
    # driver releases its resources.
    if stream is not None:
        stream.stop()
    spark.stop()