-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer.py
More file actions
60 lines (54 loc) · 2.18 KB
/
consumer.py
File metadata and controls
60 lines (54 loc) · 2.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from kafka import KafkaConsumer
import json
from db_connection import insert_transaction_to_db
from time import sleep
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from datetime import datetime
def _deserialize_json(raw_value):
    """Decode a UTF-8 encoded message payload and parse it as JSON."""
    return json.loads(raw_value.decode('utf-8'))


# Module-level Kafka consumer for the 'transactions' topic on the local
# broker; every message value is run through _deserialize_json before use.
consumer = KafkaConsumer(
    'transactions',
    group_id='activity-group',
    auto_offset_reset='earliest',
    bootstrap_servers='localhost:9092',
    value_deserializer=_deserialize_json,
)
def real_time_analysis(transaction, threshold=30_000):
    """Flag a transaction as suspicious when its amount exceeds a threshold.

    Args:
        transaction: Mutable mapping that must contain 'amount'. The keys
            'transaction_id' and 'timestamp' are read only when the
            transaction is flagged.
        threshold: Amount above which (strictly greater than) a transaction
            is reported. Defaults to 30_000, the previously hard-coded limit.

    Returns:
        The same ``transaction`` object, mutated in place so that its
        'suspicious' key is 1 when flagged and 0 otherwise.

    Raises:
        KeyError: If a key listed above is missing from ``transaction``.
    """
    amount = transaction['amount']
    is_suspicious = amount > threshold

    if is_suspicious:
        # The message starts with a newline so each alert is visually separated
        # from whatever was printed before it.
        print(
            f"\nSuspicious activity in transaction {transaction['transaction_id']}"
            f"\nwith amount {amount}"
            f"\nrecorded in {transaction['timestamp']}"
        )

    transaction['suspicious'] = 1 if is_suspicious else 0
    return transaction
def plot_template():
    """Apply the chart's static decoration to the current axes.

    Sets the title, both axis labels, the x tick-label rotation and the grid.
    It is meant to be re-applied whenever the axes have been cleared.
    """
    axes = plt.gca()
    axes.set_title('Real-Time Transaction Volume')
    axes.set_xlabel('Timestamp')
    axes.set_ylabel('Amount ($)')
    # A slight tilt keeps neighbouring x tick labels readable.
    axes.tick_params(axis='x', rotation=10)
    axes.grid(True)
# Apply the static chart decoration to the current axes as soon as the module
# is loaded.
plot_template()

# Shared series that update_plot appends to: x-values (timestamps) and
# y-values (amounts).
timestamps, amounts = [], []
def plot_recent(timestamps, amounts, recents_):
    """Return the trailing window of both series and reset the current axes.

    Args:
        timestamps: Sequence of x-values collected so far.
        amounts: Sequence of y-values collected so far.
        recents_: Number of trailing entries to keep from each sequence.

    Returns:
        A ``(recent_stamps, recent_amounts)`` tuple. A sequence holding fewer
        than ``recents_`` entries is returned as-is (the very same object);
        otherwise a slice starting ``recents_`` entries before its end is
        returned.

    Side effects:
        Clears the current axes with ``plt.cla()`` and then re-applies
        ``plot_template()``, since clearing removes the decoration.
    """
    def _tail(series):
        size = len(series)
        if size < recents_:
            return series
        return series[size - recents_:]

    window = _tail(timestamps), _tail(amounts)

    plt.cla()
    plot_template()
    return window
def update_plot(frame):
    """Consume one transaction, persist it, and redraw the rolling chart.

    Intended as the per-frame callback of the animation: each call pulls the
    next record from the module-level ``consumer``, appends its timestamp and
    amount to the module-level series, runs ``real_time_analysis`` on it,
    stores the result with ``insert_transaction_to_db`` and plots the most
    recent points.

    Args:
        frame: Value supplied by the animation driver; not used here.
    """
    window = 10  # number of most recent points kept and drawn

    # Pull the next record from the consumer iterator.
    transaction = next(consumer).value

    timestamps.append(transaction['timestamp'])
    amounts.append(transaction['amount'])
    # Only the last `window` points are ever drawn, so discard older history in
    # place; otherwise these module-level lists grow without bound for as long
    # as the consumer keeps running.
    del timestamps[:-window]
    del amounts[:-window]

    # Clears the axes and returns the points to draw.
    recent_stamps, recent_amounts = plot_recent(timestamps, amounts, window)

    processed_transaction = real_time_analysis(transaction)
    insert_transaction_to_db(processed_transaction)
    sleep(1)  # To avoid overloading the system with rapid inserts

    plt.plot(recent_stamps, recent_amounts, marker='o', linestyle='-', color='b')
if __name__ == "__main__":
    # Drive update_plot from the current figure's animation timer; the
    # animation object stays bound to `ani` while the window is shown.
    figure = plt.gcf()
    ani = FuncAnimation(
        figure,
        update_plot,
        interval=100,
        frames=None,
        cache_frame_data=False,
    )
    plt.tight_layout()
    plt.show()