forked from cradlepoint/sdk-samples
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcp.py
More file actions
9084 lines (7735 loc) · 358 KB
/
cp.py
File metadata and controls
9084 lines (7735 loc) · 358 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
NCOS communication module for SDK applications.
This module provides a comprehensive interface for communicating with NCOS (Network
Control Operating System) routers. It includes classes and functions for:
- Direct communication with router configuration store
- Event-driven programming with config store events
- Status monitoring and retrieval
- Device control and management
- Network configuration and monitoring
- GPS and location services
- Certificate management
- Firewall and security management
The module supports both local execution on NCOS devices and remote execution
from development machines using HTTP API calls.
Copyright (c) 2025 Ericsson Enterprise Wireless Solutions <www.cradlepoint.com>.
All rights reserved.
This file contains confidential information of Ericsson Enterprise Wireless Solutions
and your use of this file is subject to the Ericsson Enterprise Wireless Solutions
Software License Agreement distributed with this file. Unauthorized reproduction
or distribution of this file is subject to civil and criminal penalties.
"""
import json
import os
import re
import select
import socket
import threading
import logging.handlers
import signal
import sys
import time
import configparser
import urllib.request
import urllib.parse
import traceback
import requests
import base64
import datetime
import random
import string
import http.server
import socketserver
import mimetypes
from typing import Any, Dict, List, Optional, Tuple, Union, Callable, Literal
from http import HTTPStatus
from datetime import datetime, timedelta
from enum import Enum
class SdkCSException(Exception):
    """Error type for failed SDK / config store communication.

    Raised when errors occur during communication with the NCOS
    configuration store or when an SDK operation fails.
    """
class CSClient(object):
    """NCOS SDK mechanism for communication between apps and the router tree/config store.

    The CSClient class provides the primary interface for communicating with NCOS routers.
    Instances of this class communicate with the router using either socket connections
    (for local execution) or HTTP method calls (for remote execution).

    Apps running locally on the router use a Unix domain socket to send commands from
    the app to the router tree and to receive data (JSON) from the router tree.

    Apps running remotely use the requests library to send HTTP method calls to the
    router and to receive data from the router tree. This allows developers to use
    an IDE to run and debug the application on a computer, though with limitations
    regarding device hardware access (e.g., serial, USB, etc.).

    The class is a per-class singleton: constructing it again returns the same
    object, and Python then runs ``__init__`` on that object again.

    Attributes:
        app_name (str): The name of the application using this client.
        ncos (bool): Whether the client is running on an NCOS device.
        logger (logging.Logger): Logger instance for the application, or None
            when logging has not been configured.
    """

    # Framing of config store socket responses: header lines, a blank line, then the body.
    END_OF_HEADER = b"\r\n\r\n"
    STATUS_HEADER_RE = re.compile(rb"status: \w*")
    CONTENT_LENGTH_HEADER_RE = re.compile(rb"content-length: \w*")
    MAX_PACKET_SIZE = 8192
    RECV_TIMEOUT = 2.0
    # One instance per class (subclasses get their own entry, keyed by the class object).
    _instances = {}

    @classmethod
    def is_initialized(cls) -> bool:
        """Check if the singleton instance has been created.

        Returns:
            bool: True if the singleton instance exists, False otherwise.
        """
        return cls in cls._instances

    def __new__(cls, *na: Any, **kwna: Any) -> 'CSClient':
        """Create or return the singleton instance with subclassing support.

        Args:
            *na: Variable length argument list (not used).
            **kwna: Arbitrary keyword arguments (not used).

        Returns:
            CSClient: The singleton instance of the class.
        """
        if not cls.is_initialized():
            cls._instances[cls] = super().__new__(cls)
        return cls._instances[cls]

    def __init__(self, app_name: str, init: bool = False, enable_logging: bool = False, ncos: bool = False) -> None:
        """Initialize the CSClient instance.

        Args:
            app_name (str): The name of the application using this client.
            init (bool): Flag to perform full initialization. If False, only
                the basic attributes are (re)set and no logging is configured.
            enable_logging (bool): Whether to enable logging. Defaults to False.
            ncos (bool): Whether running on NCOS. Defaults to False.
        """
        # Always set basic attributes to prevent AttributeError
        self.app_name = app_name
        self.enable_logging = enable_logging
        self.ncos = ncos

        # Cache device access credentials to avoid reading config file on every API call
        self._cached_device_ip = None
        self._cached_username = None
        self._cached_password = None
        self._cached_auth = None

        # Initialize logger
        self.logger = None

        # Only perform full initialization if requested
        if not init:
            return

        # NOTE(review): the nesting of the last two statements of this block was
        # reconstructed from a copy of the file without indentation - confirm.
        if self.ncos and self.enable_logging:
            handlers = [logging.StreamHandler(),
                        logging.handlers.SysLogHandler(address='/dev/log')]
            logging.basicConfig(level=logging.DEBUG,
                                format='%(asctime)s %(name)s: %(message)s',
                                datefmt='%b %d %H:%M:%S',
                                handlers=handlers)
            self.logger = logging.getLogger(app_name)
            # Disable urllib3 connection pool logging to reduce noise
            logging.getLogger('urllib3.connectionpool').setLevel(logging.WARNING)

    @staticmethod
    def _response_data(result: Any) -> Any:
        """Return the 'data' member of a dispatch result.

        ``_dispatch`` yields None or an empty string when the socket exchange
        failed; in that case there is no data and None is returned.
        """
        if isinstance(result, dict):
            return result.get('data')
        return None

    def get(self, base: str, query: str = '', tree: int = 0) -> Optional[Dict[str, Any]]:
        """Construct and send a GET request to retrieve specified data from a device.

        The behavior of this method is contextual:
            - If the app is installed on (and executed from) a device, it directly
              queries the router tree to retrieve the specified data.
            - If the app is running remotely from a computer, it calls the HTTP GET
              method to retrieve the specified data.

        Args:
            base (str): String representing a path to a resource on a router tree
                (e.g., '/config/system/logging/level').
            query (str): Optional query string for the request. Defaults to empty string.
            tree (int): Optional tree identifier. Only sent on the local (socket)
                path. Defaults to 0.

        Returns:
            The 'data' member of the response, or None if the request fails.
        """
        if self.ncos:
            cmd = "get\n{}\n{}\n{}\n".format(base, query, tree)
            return self._response_data(self._dispatch(cmd))

        # Running in a computer so use http to send the get to the device.
        device_ip = self._get_cached_credentials()[0]
        device_api = 'http://{}/api/{}/{}'.format(device_ip, base, query)
        try:
            response = requests.get(device_api, auth=self._get_cached_auth())
        except (requests.exceptions.Timeout,
                requests.exceptions.ConnectionError):
            self.log("Timeout: device at {} did not respond.".format(device_ip))
            return None
        return json.loads(response.text).get('data')

    def decrypt(self, base: str, query: str = '', tree: int = 0) -> Optional[Dict[str, Any]]:
        """Construct and send a decrypt/GET request to retrieve encrypted data from a device.

        Only available when the app runs on the device; when running remotely
        a message is logged and None is returned.

        Args:
            base (str): String representing a path to a resource on a router tree
                (e.g., '/config/system/logging/level').
            query (str): Optional query string for the request. Defaults to empty string.
            tree (int): Optional tree identifier. Defaults to 0.

        Returns:
            The 'data' member of the response, or None if the request fails
            or if running remotely.
        """
        if self.ncos:
            cmd = "decrypt\n{}\n{}\n{}\n".format(base, query, tree)
            return self._response_data(self._dispatch(cmd))

        # Running in a computer: there is no remote equivalent of decrypt.
        self.log('Decrypt is only available when running the app in NCOS.')
        return None

    def put(self, base: str, value: Any = '', query: str = '', tree: int = 0) -> Optional[Dict[str, Any]]:
        """Construct and send a PUT request to update or add specified data to the device router tree.

        The behavior of this method is contextual:
            - If the app is installed on (and executed from) a device, it directly
              updates or adds the specified data to the router tree.
            - If the app is running remotely from a computer, it calls the HTTP PUT
              method to update or add the specified data.

        Args:
            base (str): String representing a path to a resource on a router tree
                (e.g., '/config/system/logging/level').
            value (Any): The value to set at the specified path. Will be JSON serialized.
                Defaults to empty string.
            query (str): Optional query string for the request. Defaults to empty string.
            tree (int): Optional tree identifier. Only sent on the local (socket)
                path. Defaults to 0.

        Returns:
            dict or None: A dictionary containing the response data
                (e.g., {"success": True, "data": {}}), or None if the
                request fails.
        """
        value = json.dumps(value)
        if self.ncos:
            cmd = "put\n{}\n{}\n{}\n{}\n".format(base, query, tree, value)
            return self._dispatch(cmd)

        # Running in a computer so use http to send the put to the device.
        device_ip = self._get_cached_credentials()[0]
        device_api = 'http://{}/api/{}/{}'.format(device_ip, base, query)
        try:
            response = requests.put(device_api,
                                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                                    auth=self._get_cached_auth(),
                                    data={"data": '{}'.format(value)})
        except (requests.exceptions.Timeout,
                requests.exceptions.ConnectionError):
            self.log("Timeout: device at {} did not respond.".format(device_ip))
            return None
        return json.loads(response.text)

    def post(self, base: str, value: Any = '', query: str = '') -> Optional[Dict[str, Any]]:
        """Construct and send a POST request to update or add specified data to the device router tree.

        The behavior of this method is contextual:
            - If the app is installed on (and executed from) a device, it directly
              updates or adds the specified data to the router tree.
            - If the app is running remotely from a computer, it calls the HTTP POST
              method to update or add the specified data.

        Args:
            base (str): String representing a path to a resource on a router tree
                (e.g., '/config/system/logging/level').
            value (Any): The value to set at the specified path. Will be JSON serialized.
                Defaults to empty string.
            query (str): Optional query string for the request. Defaults to empty string.

        Returns:
            dict or None: A dictionary containing the response data
                (e.g., {"success": True, "data": {}}), or None if the
                request fails.
        """
        value = json.dumps(value)
        if self.ncos:
            cmd = f"post\n{base}\n{query}\n{value}\n"
            return self._dispatch(cmd)

        # Running in a computer so use http to send the post to the device.
        device_ip = self._get_cached_credentials()[0]
        device_api = 'http://{}/api/{}/{}'.format(device_ip, base, query)
        try:
            response = requests.post(device_api,
                                     headers={"Content-Type": "application/x-www-form-urlencoded"},
                                     auth=self._get_cached_auth(),
                                     data={"data": '{}'.format(value)})
        except (requests.exceptions.Timeout,
                requests.exceptions.ConnectionError):
            self.log("Timeout: device at {} did not respond.".format(device_ip))
            return None
        return json.loads(response.text)

    def patch(self, value: List[Any]) -> Optional[Dict[str, Any]]:
        """Construct and send a PATCH request to update or add specified data to the device router tree.

        The behavior of this method is contextual:
            - If the app is installed on (and executed from) a device, it directly
              updates or adds the specified data to the router tree.
            - If the app is running remotely from a computer, it calls the HTTP PATCH
              method to update or add the specified data.

        Args:
            value (List[Any]): List containing dict of add/changes, and list of removals:
                [{add}, [remove]].

        Returns:
            dict or None: A dictionary containing the response data
                (e.g., {"success": True, "data": {}}), or None if the
                request fails.
        """
        if self.ncos:
            # The local command wants the additions rooted at "config";
            # wrap them when the caller passed the subtree only.
            if value[0].get("config"):
                adds = value[0]
            else:
                adds = {"config": value[0]}
            adds = json.dumps(adds)
            removals = json.dumps(value[1])
            cmd = f"patch\n{adds}\n{removals}\n"
            return self._dispatch(cmd)

        # Running in a computer so use http to send the patch to the device.
        device_ip = self._get_cached_credentials()[0]
        device_api = 'http://{}/api/'.format(device_ip)
        try:
            response = requests.patch(device_api,
                                      headers={"Content-Type": "application/x-www-form-urlencoded"},
                                      auth=self._get_cached_auth(),
                                      data={"data": '{}'.format(json.dumps(value))})
        except (requests.exceptions.Timeout,
                requests.exceptions.ConnectionError):
            self.log("Timeout: device at {} did not respond.".format(device_ip))
            return None
        return json.loads(response.text)

    def delete(self, base: str, query: str = '') -> Optional[Dict[str, Any]]:
        """Construct and send a DELETE request to delete specified data from the device router tree.

        The behavior of this method is contextual:
            - If the app is installed on (and executed from) a device, it directly
              deletes the specified data from the router tree.
            - If the app is running remotely from a computer, it calls the HTTP DELETE
              method to delete the specified data.

        Args:
            base (str): String representing a path to a resource on a router tree
                (e.g., '/config/system/logging/level').
            query (str): Optional query string for the request. Defaults to empty string.

        Returns:
            dict or None: A dictionary containing the response data
                (e.g., {"success": True, "data": {}}), or None if the
                request fails.
        """
        if self.ncos:
            cmd = "delete\n{}\n{}\n".format(base, query)
            return self._dispatch(cmd)

        # Running in a computer so use http to send the delete to the device.
        device_ip = self._get_cached_credentials()[0]
        device_api = 'http://{}/api/{}/{}'.format(device_ip, base, query)
        try:
            response = requests.delete(device_api,
                                       headers={"Content-Type": "application/x-www-form-urlencoded"},
                                       auth=self._get_cached_auth(),
                                       data={"data": '{}'.format(base)})
        except (requests.exceptions.Timeout,
                requests.exceptions.ConnectionError):
            self.log("Timeout: device at {} did not respond.".format(device_ip))
            return None
        return json.loads(response.text)

    def alert(self, value: str = '') -> Optional[Dict[str, Any]]:
        """Construct and send a custom alert to NCM for the device.

        Apps calling this method must be running on the target device to send the alert.
        If invoked while running on a computer, then only a log is output.

        Args:
            value (str): String to be displayed for the alert. Defaults to empty string.

        Returns:
            The result of the dispatched alert command when running on the
            device; None when running remotely (the alert is only logged).
        """
        if self.ncos:
            cmd = "alert\n{}\n{}\n".format(self.app_name, value)
            return self._dispatch(cmd)

        # Running in a computer and can't actually send the alert.
        self.log('Alert is only available when running the app in NCOS.')
        self.log('Alert Text: {}'.format(value))
        return None

    def log(self, value: str = '') -> None:
        """Write an INFO log entry.

        The entry goes to this client's logger when logging is enabled and a
        logger has been configured; otherwise to /dev/stdout when running on
        NCOS, or to ``print`` when running on a computer.

        Args:
            value (Union[str, Dict[str, Any]]): Text (or object) to log.
                Defaults to empty string.

        Returns:
            None: This method does not return a value.
        """
        # Use this instance's own state; the logger only exists after a full
        # initialization with logging enabled on NCOS.
        if self.enable_logging and self.logger is not None:
            self.logger.info(value)
        elif self.ncos:
            # Running in container so write to stdout
            with open('/dev/stdout', 'w') as logfile:
                logfile.write(f'{value}\n')
        else:
            # Running in a computer so just use print for the log.
            print(value)

    def _get_cached_credentials(self) -> Tuple[str, str, str]:
        """Get cached device credentials, loading them if not already cached.

        Returns:
            Tuple[str, str, str]: A tuple containing (device_ip, username, password)
        """
        if self._cached_device_ip is None:
            (self._cached_device_ip,
             self._cached_username,
             self._cached_password) = self._get_device_access_info()
        return self._cached_device_ip, self._cached_username, self._cached_password

    def _get_cached_auth(self) -> Any:
        """Get cached authentication object, creating it if not already cached.

        Returns:
            requests.auth.HTTPBasicAuth or requests.auth.HTTPDigestAuth: The appropriate
                authentication object based on the NCOS version.
        """
        if self._cached_auth is None:
            device_ip, username, password = self._get_cached_credentials()
            self._cached_auth = self._get_auth(device_ip, username, password)
        return self._cached_auth

    def _get_auth(self, device_ip: str, username: str, password: str) -> Any:
        """Return the proper HTTP Auth for the NCOS version.

        This is only needed when the app is running on a computer.
        Digest Auth is used for NCOS 6.4 and below while Basic Auth is
        used for NCOS 6.5 and up. The choice is made by probing the device
        with Basic Auth: an HTTP 200 selects Basic, anything else Digest.

        Args:
            device_ip (str): IP address of the target device.
            username (str): Username for authentication.
            password (str): Password for authentication.

        Returns:
            requests.auth.HTTPBasicAuth or requests.auth.HTTPDigestAuth: The appropriate
                authentication object based on the NCOS version.
        """
        device_api = 'http://{}/api/status/product_info'.format(device_ip)
        try:
            response = requests.get(device_api, auth=requests.auth.HTTPBasicAuth(username, password))
            use_basic = response.status_code == HTTPStatus.OK
        except Exception:
            # Best-effort probe: any failure falls back to Digest Auth.
            # (Not a bare except, so Ctrl-C / SystemExit still propagate.)
            use_basic = False

        if use_basic:
            return requests.auth.HTTPBasicAuth(username, password)
        return requests.auth.HTTPDigestAuth(username, password)

    @staticmethod
    def _get_device_access_info() -> Tuple[str, str, str]:
        """Return device access info from the sdk_settings.ini file.

        This should only be called when running on a computer. The file is
        looked up in the parent of the current working directory first, then
        in the current working directory.

        Returns:
            Tuple[str, str, str]: A tuple containing (device_ip, username, password)
                from the sdk_settings.ini file. Missing values are returned
                as empty strings.
        """
        # `log` below is the module-level function, not self.log (this is a
        # staticmethod) - presumably defined elsewhere in this file.
        try:
            device_ip = ''
            device_username = ''
            device_password = ''

            # NOTE(review): a Linux development machine also skips the settings
            # file and gets empty credentials - confirm this is intended.
            if 'linux' not in sys.platform:
                # Try parent directory first, then fallback to current directory
                parent_settings_file = os.path.join(os.path.dirname(os.getcwd()), 'sdk_settings.ini')
                current_settings_file = os.path.join(os.getcwd(), 'sdk_settings.ini')

                # Check which file exists
                if os.path.exists(parent_settings_file):
                    settings_file = parent_settings_file
                elif os.path.exists(current_settings_file):
                    settings_file = current_settings_file
                else:
                    settings_file = parent_settings_file  # Use parent as default for error messages

                config = configparser.ConfigParser()
                config.read(settings_file)

                # Keys in sdk_settings.ini
                sdk_key = 'sdk'
                ip_key = 'dev_client_ip'
                username_key = 'dev_client_username'
                password_key = 'dev_client_password'

                if sdk_key in config:
                    if ip_key in config[sdk_key]:
                        device_ip = config[sdk_key][ip_key]
                    else:
                        log('ERROR 1: The {} key does not exist in {}'.format(ip_key, settings_file))

                    if username_key in config[sdk_key]:
                        device_username = config[sdk_key][username_key]
                    else:
                        log('ERROR 2: The {} key does not exist in {}'.format(username_key, settings_file))

                    if password_key in config[sdk_key]:
                        device_password = config[sdk_key][password_key]
                    else:
                        log('ERROR 3: The {} key does not exist in {}'.format(password_key, settings_file))
                else:
                    log('ERROR 4: The {} section does not exist in {}'.format(sdk_key, settings_file))

            return device_ip, device_username, device_password
        except Exception as e:
            log(f"Error getting device access info: {e}")
            return '', '', ''

    def _safe_dispatch(self, cmd: str) -> Optional[Dict[str, Any]]:
        """Send the command over the config store socket and return the response.

        Args:
            cmd (str): The command string to send to the router.

        Returns:
            The value returned by ``_receive`` (a dict with 'status' and 'data'
            keys on success), or None if connecting or sending failed.
        """
        try:
            with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
                sock.connect('/var/tmp/cs.sock')
                sock.sendall(bytes(cmd, 'ascii'))
                return self._receive(sock)
        except Exception as e:
            self.log(f"Error in safe dispatch: {e}")
            return None

    def _dispatch(self, cmd: str) -> Any:
        """Dispatch a command to the router, never raising on failure.

        Args:
            cmd (str): The command string to send to the router.

        Returns:
            The response dictionary from the router; None or an empty string
            when the exchange failed.
        """
        try:
            return self._safe_dispatch(cmd)
        except Exception as err:
            # ignore the command error, continue on to next command
            self.log("dispatch failed with exception={} err={}".format(type(err), str(err)))
            return ""

    def _safe_receive(self, sock: socket.socket) -> Dict[str, Any]:
        """Read one framed response (headers, blank line, body) from a socket.

        Args:
            sock (socket.socket): The socket to receive data from.

        Returns:
            dict: A dictionary containing the response with 'status' and 'data' keys.
                'data' is the JSON-decoded body, or the stripped body text when
                it is not valid JSON. A timeout while waiting for the headers
                yields {"status": "timeout", "data": None}.

        Raises:
            ConnectionError: If the peer closes the connection before the
                header block is complete.
        """
        sock.settimeout(self.RECV_TIMEOUT)
        data = b""
        eoh = -1
        while eoh < 0:
            # In the event that the config store times out in returning data, lib returns
            # an empty result. Then again, if the config store hangs for 2+ seconds,
            # the app's behavior is the least of our worries.
            try:
                buf = sock.recv(self.MAX_PACKET_SIZE)
            except socket.timeout:
                return {"status": "timeout", "data": None}
            if len(buf) == 0:
                break
            data += buf
            eoh = data.find(self.END_OF_HEADER)

        if eoh < 0:
            # Without the header terminator the offsets below are meaningless.
            raise ConnectionError("connection closed before end of response header")

        # Slice off the literal "status: " (8 bytes) / "content-length: " (16 bytes) prefixes.
        status_hdr = self.STATUS_HEADER_RE.search(data).group(0)[8:]
        content_len = self.CONTENT_LENGTH_HEADER_RE.search(data).group(0)[16:]
        remaining = int(content_len) - (len(data) - eoh - len(self.END_OF_HEADER))

        # body sent from csevent_xxx.sock will have id, action, path, & cfg
        while remaining > 0:
            # The timeout set above still applies: a stalled peer raises
            # socket.timeout here instead of returning the timeout result.
            buf = sock.recv(self.MAX_PACKET_SIZE)
            if len(buf) == 0:
                break
            data += buf
            remaining -= len(buf)

        # The slice starts at the blank line; json.loads / strip ignore that whitespace.
        body = data[eoh:].decode()
        try:
            result = json.loads(body)
        except json.JSONDecodeError:
            # config store receiver doesn't give back
            # proper json for 'put' ops, body
            # contains verbose error message
            # so putting the error msg in result
            result = body.strip()
        return {"status": status_hdr.decode(), "data": result}

    def _receive(self, sock: socket.socket) -> Any:
        """Receive a response from a socket, logging instead of raising on error.

        Args:
            sock (socket.socket): The socket to receive data from.

        Returns:
            The dictionary produced by ``_safe_receive``, or an empty string
            if receiving or parsing failed.
        """
        try:
            return self._safe_receive(sock)
        except Exception as err:
            # ignore the command error, continue on to next command
            self.log("_receive failed with exception={} err={}".format(type(err), str(err)))
            return ""
class EventingCSClient(CSClient):
"""Event-driven CSClient for handling config store events.
The EventingCSClient extends CSClient to provide event-driven programming
capabilities. It allows applications to register callbacks that are triggered
when specific config store events occur (e.g., when values are set or retrieved).
This class manages a background thread that listens for config store events
and invokes registered callbacks when events occur.
Attributes:
running (bool): Whether the event handling loop is currently running.
registry (dict): Dictionary mapping event IDs to callback information.
eids (int): Counter for generating unique event IDs.
on (method): Alias for the register method.
un (method): Alias for the unregister method.
"""
running = False
registry = {}
eids = 1
def __init__(self, app_name: str, init: bool = True, enable_logging: bool = False, ncos: bool = False) -> None:
    """Set up the base client and the short ``on`` / ``un`` aliases.

    Args:
        app_name (str): The name of the application using this client.
        init (bool): Flag to perform full initialization. Defaults to True.
        enable_logging (bool): Whether to enable logging. Defaults to False.
        ncos (bool): Whether running on NCOS. Defaults to False.
    """
    super().__init__(app_name, init=init, enable_logging=enable_logging, ncos=ncos)
    # `on` is shorthand for register, `un` for unregister.
    self.on, self.un = self.register, self.unregister
def start(self) -> None:
    """Start the event handling loop in a separate thread.

    Creates the Unix domain socket '/var/tmp/csevent_<pid>.sock' (removing a
    stale file of that name first), puts it in non-blocking listening mode
    and starts a thread running ``_handle_events``. Does nothing except log
    a message when the loop is already running.

    Any failure is logged, ``running`` is reset to False and a socket that
    was already created is closed again.
    """
    sock = None
    try:
        if self.running:
            self.log(f"Eventing Config Store {self.pid} already running")
            return
        self.running = True
        self.pid = os.getpid()
        self.f = '/var/tmp/csevent_%d.sock' % self.pid
        try:
            os.unlink(self.f)
        except FileNotFoundError:
            pass
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.event_sock = sock
        sock.bind(self.f)
        sock.listen()  # backlog is optional. already set on value found in /proc
        sock.setblocking(False)
        self.eloop = threading.Thread(target=self._handle_events)
        self.eloop.start()
    except Exception as e:
        self.log(f"Error starting event handling loop: {e}")
        self.running = False
        if sock is not None:
            # Do not leak the descriptor of a half-initialised event socket.
            sock.close()
def stop(self) -> None:
    """Shut the event handling loop down.

    When the loop is running: unregisters every registered callback, closes
    the event socket, removes the socket file and clears ``running`` (which
    lets the loop in ``_handle_events`` end). Errors are logged and
    ``running`` is cleared anyway. Does nothing when the loop is not running.
    """
    try:
        if self.running:
            self.log("Stopping")
            # Iterate over a snapshot: unregister() removes entries as it goes.
            for eid in list(self.registry):
                self.unregister(eid)
            self.event_sock.close()
            os.unlink(self.f)
            self.running = False
    except Exception as e:
        self.log(f"Error stopping event handling loop: {e}")
        self.running = False
def _handle_events(self) -> None:
"""The main event loop for handling config store events.
This method runs in a separate thread and continuously polls for
incoming config store events. When events are received, it invokes
the appropriate registered callbacks.
"""
poller = select.poll()
poller.register(self.event_sock,
select.POLLIN | select.POLLERR | select.POLLHUP) # I don't unregsiter this in cleaning up!
while self.running:
try:
events = poller.poll(1000)
for f, ev in events:
if ev & (select.POLLERR | select.POLLHUP):
self.log("Hangup/error received. Stopping")
self.stop() # TODO: restart w/ cached registrations. Will no longer be an error case
if ev & select.POLLIN:
conn, addr = self.event_sock.accept()
result = self._receive(conn)
eid = int(result['data']['id'])
try:
cb = self.registry[eid]['cb']
args = self.registry[eid]['args']
try:
# PUTting just a string to config store results in a json encoded string returned.
# e.g. set /config/system/logging/level "debug", result['data']['cfg'] is '"debug"'
cfg = json.loads(result['data']['cfg'])
except TypeError as e:
# Non-string path
cfg = result['data']['cfg']
try:
cb_return = cb(result['data']['path'], cfg, args)
except:
if traceback:
traceback.print_exc()
self.log(f"Exception during callback for {str(self.registry[eid])}")
if result['data']['action'] == 'get': # We've something to send back.
# config_store_receiver expects json
cb_return = json.JSONEncoder().encode(cb_return)
conn.sendall(
cb_return.encode()) # No dispatch. Config store receiver will put to config store.
except (NameError, ValueError) as e:
self.log(f"Could not find register data for eid {eid}")
except OSError as e:
self.log(f"OSError: {e}")
raise
def register(self, action: str = 'set', path: str = '', callback: Callable = None, *args: Any) -> Dict[str, Any]:
    """Register a callback for a config store event.

    Starts the event loop first if it is not running yet, stores the
    callback under the next free event id and sends the matching
    "register" command to the config store.

    Args:
        action (str): The action to listen for (e.g., 'set', 'get'). Defaults to 'set'.
        path (str): The config store path to monitor. Defaults to empty string.
        callback (callable): The function to call when the event occurs. Defaults to None.
        *args: Additional arguments to pass to the callback.

    Returns:
        dict: The result of the registration command, or an empty dict if
            registering raised an error (which is logged).
    """
    try:
        if not self.running:
            self.start()

        # what about multiple registration?
        eid = self.eids
        self.eids = eid + 1
        self.registry[eid] = dict(cb=callback, action=action, path=path, args=args)
        return self._dispatch(f"register\n{self.pid}\n{eid}\n{action}\n{path}\n")
    except Exception as e:
        self.log(f"Error registering callback for {path}: {e}")
        return {}
def unregister(self, eid: int = 0) -> Dict[str, Any]:
    """Unregister a callback by its event ID.

    The registry entry is always removed; the "unregister" command is only
    sent to the config store while the event loop is running.

    Args:
        eid (int): The event ID the callback was registered under. Defaults to 0.

    Returns:
        The result of the unregistration command, or an empty string when
        the id is unknown or no command was sent.
    """
    if eid not in self.registry:
        return ""

    entry = self.registry[eid]
    outcome = ""
    if self.running:
        outcome = self._dispatch(
            "unregister\n{}\n{}\n{}\n{}\n".format(self.pid, eid, entry['action'], entry['path']))
    del self.registry[eid]
    return outcome
# ============================================================================
# GRANULAR STATUS GET METHODS
# ============================================================================
def get_gps_status(self) -> Dict[str, Any]:
    """Get GPS status and return detailed information with decimal coordinates.

    Reads ``status/gps`` and summarises its ``fix`` entry. Every key listed
    below is always present in the returned dict, including when no GPS data
    or no fix is available (in which case the defaults are returned).

    Returns:
        Dict[str, Any]: GPS status information including:
            - gps_lock (bool): ``fix["lock"]``; False by default
            - satellites (int): ``fix["satellites"]``; 0 by default
            - location (dict): latitude/longitude strings in
              degrees/minutes/seconds format, or None
            - latitude (float): latitude converted by ``dec``, or None
            - longitude (float): longitude converted by ``dec``, or None
            - altitude: ``fix["altitude_meters"]``, or None
            - speed: ``fix["ground_speed_knots"]``, or None
            - heading: ``fix["heading"]``, or None
            - accuracy: ``fix["accuracy"]``, or None
            - last_fix_age: ``fix["age"]``, or None
        None is returned if an unexpected error occurs (the error is logged).
    """
    try:
        # Build the defaults first so that every return path exposes the same keys.
        analysis = {
            "gps_lock": False,
            "satellites": 0,
            "location": None,
            "latitude": None,
            "longitude": None,
            "altitude": None,
            "speed": None,
            "heading": None,
            "accuracy": None,
            "last_fix_age": None
        }

        gps_data = self.get('status/gps')
        if not gps_data:
            return analysis

        fix = gps_data.get("fix", {})
        if not fix:
            return analysis

        analysis.update({
            "gps_lock": fix.get("lock", False),
            "satellites": fix.get("satellites", 0),
            "altitude": fix.get("altitude_meters"),
            "speed": fix.get("ground_speed_knots"),
            "heading": fix.get("heading"),
            "accuracy": fix.get("accuracy"),
            "last_fix_age": fix.get("age")
        })

        latitude = fix.get("latitude")
        longitude = fix.get("longitude")
        if latitude and longitude:
            try:
                lat_deg = latitude['degree']
                lat_min = latitude['minute']
                lat_sec = latitude['second']
                long_deg = longitude['degree']
                long_min = longitude['minute']
                long_sec = longitude['second']
            except (KeyError, TypeError) as coord_error:
                # A malformed coordinate entry must not discard the rest of
                # the fix data; leave location/latitude/longitude as None.
                self.log(f"Error converting coordinates to decimal: {coord_error}")
            else:
                analysis["location"] = {
                    "latitude": f"{lat_deg}°{lat_min}'{lat_sec}\"",
                    "longitude": f"{long_deg}°{long_min}'{long_sec}\""
                }
                # Add decimal coordinates to the root level. Both values are
                # converted before either is stored so they are set together.
                try:
                    decimal_lat = dec(lat_deg, lat_min, lat_sec)
                    decimal_long = dec(long_deg, long_min, long_sec)
                    analysis["latitude"] = decimal_lat
                    analysis["longitude"] = decimal_long
                except Exception as coord_error:
                    self.log(f"Error converting coordinates to decimal: {coord_error}")
        return analysis
    except Exception as e:
        self.log(f"Error analyzing GPS status: {e}")
        return None
def get_lat_long(self, max_retries: int = 5, retry_delay: float = 0.1) -> Tuple[Optional[float], Optional[float]]:
    """Return latitude and longitude as floats.

    Reads ``status/gps/fix``, re-reading up to ``max_retries`` times while
    the result is empty, then converts the degree/minute/second fields with
    ``dec`` and rounds both values to six decimal places.

    Args:
        max_retries (int): Maximum number of retries to get GPS fix. Defaults to 5.
        retry_delay (float): Delay between retries in seconds. Defaults to 0.1.

    Returns:
        Tuple[float, float] or Tuple[None, None]: A tuple containing (latitude, longitude)
            in decimal degrees, or (None, None) if no fix is available or the
            fix cannot be converted.
    """
    try:
        fix = self.get('status/gps/fix')
        retries = 0
        while not fix and retries < max_retries:
            time.sleep(retry_delay)
            fix = self.get('status/gps/fix')
            retries += 1
        if not fix:
            return None, None

        try:
            latitude = fix['latitude']
            longitude = fix['longitude']
            lat = dec(latitude['degree'], latitude['minute'], latitude['second'])
            long = dec(longitude['degree'], longitude['minute'], longitude['second'])
            if lat is None or long is None:
                return None, None
            # Round-trip through a formatted string to keep six decimal places.
            lat = float(f"{float(lat):.6f}")
            long = float(f"{float(long):.6f}")
            return lat, long
        except Exception:
            # Incomplete or malformed fix: deliberately quiet best effort.
            # Catch Exception (not a bare except) so KeyboardInterrupt and
            # SystemExit still propagate.
            return None, None
    except Exception as e:
        self.log(f"Error getting latitude and longitude: {e}")
        return None, None
def get_system_status(self) -> Dict[str, Any]:
"""Get system status and return detailed information.
Returns:
Dict[str, Any]: System status information including:
- uptime (int): System uptime in seconds
- temperature (float): System temperature
- cpu_usage (float): CPU usage percentage
- memory (dict): Memory usage statistics including:
- total_bytes (int): Total memory in bytes
- used_bytes (int): Used memory in bytes
- free_bytes (int): Free memory in bytes
- percentage_used (float): Memory usage percentage
- disk (dict): Disk usage statistics including:
- total_bytes (int): Total disk space in bytes
- used_bytes (int): Used disk space in bytes
- free_bytes (int): Free disk space in bytes
- percentage_used (float): Disk usage percentage
- services_running (int): Number of running services
- services_disabled (int): Number of disabled services
- internal_apps_running (int): Number of running internal applications
- external_apps_running (int): Number of running external applications
"""
try:
system_data = self.get('status/system')
if not system_data:
return {}
# Get memory data
memory_data = system_data.get("memory", {})
mem_total = float(memory_data.get("memtotal", 0))
mem_available = float(memory_data.get("memavailable", 0))
mem_used = mem_total - mem_available
mem_percentage = round((mem_used / mem_total * 100) if mem_total > 0 else 0, 1)
# Get disk usage data
disk_data = self.get('status/mount/disk_usage/')
disk_total = 0
disk_free = 0
disk_used = 0
disk_percentage = 0
if disk_data:
disk_total = float(disk_data.get("total_bytes", 0))
disk_free = float(disk_data.get("free_bytes", 0))
disk_used = disk_total - disk_free
disk_percentage = round((disk_used / disk_total * 100) if disk_total > 0 else 0, 1)
analysis = {
"uptime": system_data.get("uptime"),