 log = logging.getLogger("{}.{}".format(__name__, "base"))


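+# (label, seconds) pairs; one aggregation job per account is created for each interval below: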
+NETFLOW_AGGREGATION_INTERVALS = [
+    ('1min', 60),
+    ('15min', 15 * 60),
+    ('1h', 3600),
+    ('4h', 4 * 3600),
+    ('24h', 24 * 3600),
+]
+
+
 def _get_last_used_seq(job_id):
     with get_db_cursor() as c:
         c.execute(f'SELECT j.last_used_seq, r.ts FROM {DB_PREFIX}bot_jobs j, {DB_PREFIX}records r WHERE j.job_id = %s AND j.last_used_seq = r.seq;', (job_id,))
@@ -48,36 +57,30 @@ def _save_current_max_seq(job_id, seq):
4857 c .execute (f"INSERT INTO { DB_PREFIX } bot_jobs (job_id, last_used_seq) VALUES (%s, %s) ON CONFLICT (job_id) DO UPDATE SET last_used_seq = %s;" , (job_id , seq , seq ))
4958
5059
51- # def get_entities():
52- # requests.get()
53-
 class NetFlowBot(Collector):

     def jobs(self):
+        # first, merge entity infos together so that entities from the same account are grouped:
+        accounts_infos = defaultdict(list)
         for entity_info in self.fetch_job_configs('netflow'):
-            # log.error(f'{repr(entity_info)}')
-            entity_id = entity_info['entity_id']
-            entity_ip = entity_info['details']['ipv4']
-            account_id = entity_info['account_id']
-            for sensor_info in entity_info["sensors"]:
-                sensor_id = sensor_info["sensor_id"]
-                interval = sensor_info["sensor_details"]["aggregation_interval_s"]
-                interval_label = sensor_info["sensor_details"]["interval_label"]
-
-                job_id = f'aggr/{interval_label}/{entity_id}/{sensor_id}'
+            accounts_infos[entity_info["account_id"]].append(entity_info)
+
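+        # create one job per account for each aggregation interval: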
+        for account_id, entities_infos in accounts_infos.items():
+            for interval_label, interval in NETFLOW_AGGREGATION_INTERVALS:
+                job_id = f'aggr/{interval_label}/{account_id}'
                 job_params = {
                     "job_id": job_id,
                     "interval_label": interval_label,
                     "account_id": account_id,
-                    "entity_id": entity_id,
-                    "entity_ip": entity_ip,
+                    "entities_infos": entities_infos,
                     "backend_url": self.backend_url,
                     "bot_token": self.bot_token,
                 }
-                yield job_id, [interval], NetFlowBot.perform_entity_aggr_job, job_params
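+                # assumption: the Collector framework runs the given callback at each listed interval, passing job_params: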
+                yield job_id, [interval], NetFlowBot.perform_account_aggr_job, job_params
+

     @staticmethod
-    def perform_entity_aggr_job(*args, **job_params):
+    def perform_account_aggr_job(*args, **job_params):
         # \d netflow_flows
         # Column         | Type     | Description
         # ---------------+----------+-------------
@@ -95,27 +98,45 @@ def perform_entity_aggr_job(*args, **job_params):

         job_id = job_params["job_id"]
         interval_label = job_params["interval_label"]
-
         account_id = job_params["account_id"]
-        entity_id = job_params["entity_id"]
-        entity_ip = job_params["entity_ip"]
+        entities_infos = job_params["entities_infos"]

         last_used_seq, last_used_ts = _get_last_used_seq(job_id)
         max_seq, max_ts = _get_current_max_seq()
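+        # seq is (presumably) a monotonically increasing record counter; matching timestamps mean no new records arrived: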
         if max_seq is None or last_used_ts == max_ts:
             log.info(f"No netflow data found for job {job_id}, skipping.")
             return
-
         _save_current_max_seq(job_id, max_seq)
-
         if last_used_seq is None:
             log.info(f"Counter was not yet initialized for job {job_id}, skipping.")
             return
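+        # seconds covered by the new records - used below to convert byte sums into bytes/s: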
+        time_between = float(max_ts - last_used_ts)

         values = []
-        time_between = float(max_ts - last_used_ts)
-        values.extend(NetFlowBot.get_traffic_for_entity(interval_label, last_used_seq, max_seq, time_between, DIRECTION_EGRESS, entity_id, entity_ip))
-        values.extend(NetFlowBot.get_traffic_for_entity(interval_label, last_used_seq, max_seq, time_between, DIRECTION_INGRESS, entity_id, entity_ip))
+        sum_traffic_egress = 0
+        sum_traffic_ingress = 0
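+        # collect per-entity values while accumulating account-wide totals for both directions: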
+        for entity_info in entities_infos:
+            entity_id = entity_info["entity_id"]
+            entity_ip = entity_info["details"]["ipv4"]
+            v, s = NetFlowBot.get_traffic_for_entity(interval_label, last_used_seq, max_seq, time_between, DIRECTION_EGRESS, entity_id, entity_ip)
+            values.extend(v)
+            sum_traffic_egress += s
+            v, s = NetFlowBot.get_traffic_for_entity(interval_label, last_used_seq, max_seq, time_between, DIRECTION_INGRESS, entity_id, entity_ip)
+            values.extend(v)
+            sum_traffic_ingress += s
+
+        # add two values with the cumulative sums for the whole account:
+        output_path = NetFlowBot.construct_output_path_prefix(interval_label, DIRECTION_EGRESS, entity_id=None, interface=None)
+        values.append({
+            'p': output_path,
+            'v': sum_traffic_egress / time_between,
+        })
+        output_path = NetFlowBot.construct_output_path_prefix(interval_label, DIRECTION_INGRESS, entity_id=None, interface=None)
+        values.append({
+            'p': output_path,
+            'v': sum_traffic_ingress / time_between,
+        })
+

         if not values:
             log.warning("No values found to be sent to Grafolean")
@@ -212,32 +233,7 @@ def get_traffic_for_entity(interval_label, last_seq, max_seq, time_between, dire
             'p': output_path,
             'v': sum_traffic / time_between,
         })
-        return values
-
-
-    # @staticmethod
-    # def get_traffic_all_entities(interval_label, last_seq, max_seq, direction):
-    #     output_path = NetFlowBot.construct_output_path_prefix(interval_label, direction, entity_id=None, interface=None)
-    #     with get_db_cursor() as c:
-    #         c.execute(f"""
-    #             SELECT
-    #                 sum(f.in_bytes)
-    #             FROM
-    #                 {DB_PREFIX}records "r",
-    #                 {DB_PREFIX}flows "f"
-    #             WHERE
-    #                 r.seq > %s AND
-    #                 r.ts <= %s AND
-    #                 r.seq = f.record AND
-    #                 f.direction = %s
-    #             """, (last_seq, max_seq, direction))
-    #     values = []
-    #     traffic_bytes, = c.fetchone()
-    #     values.append({
-    #         'p': output_path,
-    #         'v': traffic_bytes,  # Bps
-    #     })
-    #     return values
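+        # also return the entity's total traffic so the caller can aggregate account-wide sums: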
+        return values, sum_traffic


     # @staticmethod