
from grafoleancollector import Collector, send_results_to_grafolean
from dbutils import db, DB_PREFIX
-from lookup import PROTOCOLS
+from lookup import PROTOCOLS, DIRECTION_INGRESS, DIRECTION_EGRESS

logging.basicConfig(format='%(asctime)s.%(msecs)03d | %(levelname)s | %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S', level=logging.DEBUG)
log = logging.getLogger("{}.{}".format(__name__, "base"))


+def _get_last_used_seq(job_id):
+    with db.cursor() as c:
+        c.execute(f'SELECT j.last_used_seq, r.ts FROM {DB_PREFIX}bot_jobs j, {DB_PREFIX}records r WHERE j.job_id = %s AND j.last_used_seq = r.seq;', (job_id,))
+        last_used_seq, ts = c.fetchone()
+        return last_used_seq, ts
+
+def _get_current_max_seq():
+    with db.cursor() as c:
+        c.execute(f"SELECT MAX(seq) FROM {DB_PREFIX}records;")
+        max_seq, = c.fetchone()
+        return max_seq
+
+def _save_current_max_seq(job_id, seq):
+    with db.cursor() as c:
+        c.execute(f"INSERT INTO {DB_PREFIX}bot_jobs (job_id, last_used_seq) VALUES (%s, %s) ON CONFLICT (job_id) DO UPDATE SET last_used_seq = %s;", (job_id, seq, seq))
+
+
+def get_entities():
+    requests.get()
+
class NetFlowBot(Collector):

    def jobs(self):
@@ -45,6 +65,7 @@ def jobs(self):
        intervals = [60]
        job_params = {
            "job_id": job_id,
+            "interval_slug": '1min',
            "entity_info": {
                "account_id": 129104112,
                "entity_id": 236477687,
@@ -58,104 +79,123 @@ def jobs(self):
        }
        yield job_id, intervals, NetFlowBot.perform_job, job_params

-        job_id = '1h'
-        intervals = [3600]
-        job_params = {
-            "job_id": job_id,
-            "entity_info": {
-                "account_id": 129104112,
-                "entity_id": 236477687,
-                "entity_type": "device",
-                "details": {
-                    "ipv4": "1.2.3.4"
-                },
-            },
-            "backend_url": self.backend_url,
-            "bot_token": self.bot_token,
-        }
-        yield job_id, intervals, NetFlowBot.perform_job, job_params
-
-        job_id = '24h'
-        intervals = [3600 * 24]
-        job_params = {
-            "job_id": job_id,
-            "entity_info": {
-                "account_id": 129104112,
-                "entity_id": 236477687,
-                "entity_type": "device",
-                "details": {
-                    "ipv4": "1.2.3.4"
-                },
-            },
-            "backend_url": self.backend_url,
-            "bot_token": self.bot_token,
-        }
-        yield job_id, intervals, NetFlowBot.perform_job, job_params
-
+        # job_id = '1h'
+        # intervals = [3600]
+        # job_params = {
+        #     "job_id": job_id,
+        #     "interval_slug": '1h',
+        #     "entity_info": {
+        #         "account_id": 129104112,
+        #         "entity_id": 236477687,
+        #         "entity_type": "device",
+        #         "details": {
+        #             "ipv4": "1.2.3.4"
+        #         },
+        #     },
+        #     "backend_url": self.backend_url,
+        #     "bot_token": self.bot_token,
+        # }
+        # yield job_id, intervals, NetFlowBot.perform_job, job_params
+
+        # job_id = '24h'
+        # intervals = [3600 * 24]
+        # job_params = {
+        #     "job_id": job_id,
+        #     "interval_slug": '24h',
+        #     "entity_info": {
+        #         "account_id": 129104112,
+        #         "entity_id": 236477687,
+        #         "entity_type": "device",
+        #         "details": {
+        #             "ipv4": "1.2.3.4"
+        #         },
+        #     },
+        #     "backend_url": self.backend_url,
+        #     "bot_token": self.bot_token,
+        # }
+        # yield job_id, intervals, NetFlowBot.perform_job, job_params


    # This method is called whenever the job needs to be done. It gets the parameters and performs fetching of data.
    @staticmethod
    def perform_job(*args, **job_params):
-        # {
-        #     "DST_AS": 0,
-        #     "SRC_AS": 0,
-        #     "IN_PKTS": 1,  # Incoming counter with length N x 8 bits for the number of packets associated with an IP Flow
-        #     "SRC_TOS": 0,
-        #     "DST_MASK": 0,
-        #     "IN_BYTES": 52,  # Incoming counter with length N x 8 bits for number of bytes associated with an IP Flow.
-        #     "PROTOCOL": 6,  # IP protocol
-        #     "SRC_MASK": 25,
-        #     "DIRECTION": 0,  # Flow direction: 0 - ingress flow, 1 - egress flow
-        #     "TCP_FLAGS": 20,
-        #     "INPUT_SNMP": 17,  # Input interface index
-        #     "L4_DST_PORT": 443,  # TCP/UDP destination port number
-        #     "L4_SRC_PORT": 36458,
-        #     "OUTPUT_SNMP": 3,  # Output interface index
-        #     "IPV4_DST_ADDR": "1.2.3.4",
-        #     "IPV4_NEXT_HOP": 1385497089,
-        #     "IPV4_SRC_ADDR": "4.3.2.1",
-        #     "LAST_SWITCHED": 2222830592,
-        #     "FIRST_SWITCHED": 2222830592,
-        #     "FLOW_SAMPLER_ID": 0,
-        #     "UNKNOWN_FIELD_TYPE": 0
-        # }
-        # https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html#wp9001622
+        # \d netflow_flows
+        #  Column        | Type     | Description
+        # ---------------+----------+-------------
+        #  record        | integer  | FK -> netflow_records.seq (PK)
+        #  in_bytes      | integer  | number of bytes associated with an IP Flow
+        #  protocol      | smallint | IP protocol (see lookup.py -> PROTOCOLS)
+        #  direction     | smallint | flow direction: 0 - ingress flow, 1 - egress flow
+        #  l4_dst_port   | integer  | destination port
+        #  l4_src_port   | integer  | source port
+        #  input_snmp    | smallint | input interface index
+        #  output_snmp   | smallint | output interface index
+        #  ipv4_src_addr | text     | source IP
+        #  ipv4_dst_addr | text     | destination IP
+        # ---------------+----------+-------------

        job_id = job_params["job_id"]
+        interval_slug = job_params["interval_slug"]
+
+        entity_id = entity_ip = interface_index = None
+        entity_info = job_params.get("entity_info", None)
+        if entity_info is not None:
+            entity_id = entity_info["entity_id"]
+            entity_ip = entity_info["details"]["ipv4"]
+            interface_index = entity_info.get("interface_index", None)
+
+
+        last_used_seq, last_used_ts = _get_last_used_seq(job_id)
+        max_seq = _get_current_max_seq()
+        _save_current_max_seq(job_id, max_seq)
+
        values = []
-        entity_info = job_params["entity_info"]
-        minute_ago = datetime.now() - timedelta(minutes=1)
-
-        if job_id == '1min':
-            output_path_prefix = f'entity.{entity_info["entity_id"]}.netflow.traffic_in'
-
-            two_minutes_ago = minute_ago - timedelta(minutes=1)
-
-            # Traffic in and out: (per interface)
-            values.extend(NetFlowBot.get_values_traffic_in(output_path_prefix, two_minutes_ago, minute_ago))
-            values.extend(NetFlowBot.get_values_traffic_out(output_path_prefix, two_minutes_ago, minute_ago))
-            # output_path_prefix = f'entity.{entity_info["entity_id"]}.netflow'
-            values.extend(NetFlowBot.get_top_N_IPs(output_path_prefix, two_minutes_ago, minute_ago, 18, is_direction_in=True))
-            values.extend(NetFlowBot.get_top_N_IPs(output_path_prefix, two_minutes_ago, minute_ago, 18, is_direction_in=False))
-            values.extend(NetFlowBot.get_top_N_protocols(output_path_prefix, two_minutes_ago, minute_ago, 18, is_direction_in=True))
-            values.extend(NetFlowBot.get_top_N_protocols(output_path_prefix, two_minutes_ago, minute_ago, 18, is_direction_in=False))
-
-        # every hour, collect stats for the whole hour:
-        elif job_id == '1h':
-            output_path_prefix_1hour = f'entity.{entity_info["entity_id"]}.netflow.traffic.in.1hour'
-            hour_ago = minute_ago - timedelta(hours=1)
-            values.extend(NetFlowBot.get_top_N_IPs(output_path_prefix_1hour, hour_ago, minute_ago, 18, is_direction_in=True))
-            values.extend(NetFlowBot.get_top_N_IPs(output_path_prefix_1hour, hour_ago, minute_ago, 18, is_direction_in=False))
-
-        # every 24h, also collect stats for the whole day:
-        elif job_id == '24h':
-            output_path_prefix_1day = f'entity.{entity_info["entity_id"]}.netflow.traffic.in.1day'
-            day_ago = minute_ago - timedelta(days=1)
-            values.extend(NetFlowBot.get_top_N_IPs(output_path_prefix_1day, day_ago, minute_ago, 18, is_direction_in=True))
-            values.extend(NetFlowBot.get_top_N_IPs(output_path_prefix_1day, day_ago, minute_ago, 18, is_direction_in=False))
-            values.extend(NetFlowBot.get_top_N_protocols(output_path_prefix_1day, day_ago, minute_ago, 18, is_direction_in=True))
-            values.extend(NetFlowBot.get_top_N_protocols(output_path_prefix_1day, day_ago, minute_ago, 18, is_direction_in=False))
+        for direction in [DIRECTION_EGRESS, DIRECTION_INGRESS]:
+            values.extend(NetFlowBot.get_traffic_all_entities(interval_slug, last_used_seq, max_seq, direction=direction))
+
+
+        # values.extend(NetFlowBot.get_traffic(interval_slug, last_used_seq, max_seq, direction=DIRECTION_EGRESS, entity=entity_id, entity_ip=entity_ip, interface=interface_index))
+        # values.extend(NetFlowBot.get_traffic(interval_slug, last_used_seq, max_seq, direction=DIRECTION_INGRESS, entity=entity_id, entity_ip=entity_ip, interface=interface_index))
+
+        # values.extend(NetFlowBot.get_top_protocols(interval_slug, last_used_seq, max_seq, direction=DIRECTION_EGRESS, entity=entity_id, entity_ip=entity_ip, interface=interface_index, n=15))
+        # values.extend(NetFlowBot.get_top_protocols(interval_slug, last_used_seq, max_seq, direction=DIRECTION_INGRESS, entity=entity_id, entity_ip=entity_ip, interface=interface_index, n=15))
+
+        # values.extend(NetFlowBot.get_top_IPs(interval_slug, last_used_seq, max_seq, direction=DIRECTION_EGRESS, entity=entity_id, entity_ip=entity_ip, interface=interface_index, n=15))
+        # values.extend(NetFlowBot.get_top_IPs(interval_slug, last_used_seq, max_seq, direction=DIRECTION_INGRESS, entity=entity_id, entity_ip=entity_ip, interface=interface_index, n=15))
+
+
+        # protocol_str = 'TCP'
+        # ipv4_dst_addr = '1.2.3.4'
+        # ipv4_src_addr = '4.3.2.1'
+        # # traffic on all devices, all interfaces, per ingress / egress:
+        # f'netflow.{interval_slug}.egress'
+        # f'netflow.{interval_slug}.ingress'
+        # # traffic on all devices, all interfaces, per ingress / egress, for top X protocols:
+        # f'netflow.{interval_slug}.egress.protocol.{protocol_str}'
+        # f'netflow.{interval_slug}.ingress.protocol.{protocol_str}'
+        # # traffic on all devices, all interfaces, per ingress / egress, for top X IPs:
+        # f'netflow.{interval_slug}.egress.ip.{ipv4_dst_addr}'
+        # f'netflow.{interval_slug}.ingress.ip.{ipv4_src_addr}'
+
+        # # traffic on all interfaces, per device, per ingress / egress:
+        # f'netflow.{interval_slug}.egress.entity.{entity_id}'
+        # f'netflow.{interval_slug}.ingress.entity.{entity_id}'
+        # # traffic on all interfaces, per device, per ingress / egress, for top X protocols:
+        # f'netflow.{interval_slug}.egress.entity.{entity_id}.protocol.{protocol_str}'
+        # f'netflow.{interval_slug}.ingress.entity.{entity_id}.protocol.{protocol_str}'
+        # # traffic on all interfaces, per device, per ingress / egress, for top X IPs:
+        # f'netflow.{interval_slug}.egress.entity.{entity_id}.ip.{ipv4_dst_addr}'
+        # f'netflow.{interval_slug}.ingress.entity.{entity_id}.ip.{ipv4_src_addr}'
+
+        # # traffic per interface, per device, per ingress / egress:
+        # f'netflow.{interval_slug}.egress.entity.{entity_id}.if.{output_snmp}'
+        # f'netflow.{interval_slug}.ingress.entity.{entity_id}.if.{input_snmp}'
+        # # traffic per interface, per device, per ingress / egress, for top X protocols:
+        # f'netflow.{interval_slug}.egress.entity.{entity_id}.if.{output_snmp}.protocol.{protocol_str}'
+        # f'netflow.{interval_slug}.ingress.entity.{entity_id}.if.{input_snmp}.protocol.{protocol_str}'
+        # # traffic per interface, per device, per ingress / egress, for top X IPs:
+        # f'netflow.{interval_slug}.egress.entity.{entity_id}.if.{output_snmp}.ip.{ipv4_dst_addr}'
+        # f'netflow.{interval_slug}.ingress.entity.{entity_id}.if.{input_snmp}.ip.{ipv4_src_addr}'

        if not values:
            log.warning("No values found to be sent to Grafolean")
@@ -170,7 +210,45 @@ def perform_job(*args, **job_params):
        )

    @staticmethod
-    def get_values_traffic_in(output_path_prefix, from_time, to_time):
+    def construct_output_path_prefix(interval_slug, direction, entity, interface):
+        prefix = f"netflow.{interval_slug}.{'ingress' if direction == DIRECTION_INGRESS else 'egress'}"
+        if entity is None:
+            return prefix
+        prefix = f'{prefix}.entity.{entity}'
+        if interface is None:
+            return prefix
+        prefix = f'{prefix}.if.{interface}'
+        return prefix
+
+
+    @staticmethod
+    def get_traffic_all_entities(interval_slug, last_seq, max_seq, direction):
+        output_path = NetFlowBot.construct_output_path_prefix(interval_slug, direction, entity=None, interface=None)
+        with db.cursor() as c:
+            c.execute(f"""
+                SELECT
+                    sum(f.in_bytes)
+                FROM
+                    {DB_PREFIX}records "r",
+                    {DB_PREFIX}flows "f"
+                WHERE
+                    r.seq > %s AND
+                    r.seq <= %s AND
+                    r.seq = f.record AND
+                    f.direction = %s
+            """, (last_seq, max_seq, direction))
+            values = []
+            traffic_bytes, = c.fetchone()
+            values.append({
+                'p': output_path,
+                'v': traffic_bytes,  # total bytes in this seq window
+            })
+            return values
+
+
+    @staticmethod
+    def get_traffic(interval_slug, last_seq, max_seq, direction, entity=None, interface=None):
+        output_path = NetFlowBot.construct_output_path_prefix(interval_slug, direction, entity, interface)
        with db.cursor() as c:
            # TODO: missing check for IP: r.client_ip = %s AND
            c.execute(f"""
@@ -197,6 +275,7 @@ def get_values_traffic_in(output_path_prefix, from_time, to_time):
                'v': traffic_bytes / 60.,  # Bps
            })
        return values
+        return []

    @staticmethod
    def get_values_traffic_out(output_path_prefix, from_time, to_time):
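
Note on units: the new get_traffic_all_entities sums f.in_bytes over the records between last_used_seq and max_seq, so the value it emits is a byte count for that window, not a rate (the old code divided by 60 to report Bps). The ts fetched by _get_last_used_seq (bound to last_used_ts but not yet used) could turn that sum into an average rate. A minimal sketch, assuming ts is stored as a Unix timestamp; bytes_to_bps is a hypothetical helper, not part of this commit:

import time

def bytes_to_bps(traffic_bytes, last_used_ts):
    # traffic_bytes: SUM(f.in_bytes) over the seq window; None when there were no flows
    # last_used_ts: Unix timestamp of the last record processed by the previous run (assumed)
    if traffic_bytes is None:
        return None
    elapsed_s = time.time() - last_used_ts
    if elapsed_s <= 0:
        return None
    return traffic_bytes / elapsed_s  # average bytes per second over the window

For the '1min' job this reports roughly what the old traffic_bytes / 60. path did, but it stays correct when the job interval drifts or records arrive in bursts.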