-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathradio_protocol.py
More file actions
2276 lines (1725 loc) · 66.7 KB
/
radio_protocol.py
File metadata and controls
2276 lines (1725 loc) · 66.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
Radio Protocol Classes for Interlocutor
"""
import logging
import random
import socket
import struct
import threading
import time
from enum import Enum
from typing import Dict, List, Tuple, Union
# debug configuration
class DebugConfig:
    """Process-wide switches that decide how chatty console output is.

    The two flags live on the class itself, so every caller shares the
    same setting:

    * ``VERBOSE`` -- when true, ``debug_print`` messages are shown.
    * ``QUIET``   -- when true, ``user_print`` messages are hidden.
    """

    VERBOSE = False
    QUIET = False

    @classmethod
    def set_mode(cls, verbose=False, quiet=False):
        """Store both flags and configure the root logger accordingly.

        ``verbose`` wins over ``quiet`` when both are set.
        """
        cls.VERBOSE = verbose
        cls.QUIET = quiet

        # Choose the log level and line prefix for the requested mode.
        if verbose:
            level, fmt = logging.DEBUG, '🐛 %(message)s'
        elif quiet:
            level, fmt = logging.WARNING, '⚠️ %(message)s'
        else:
            level, fmt = logging.INFO, 'ℹ️ %(message)s'
        logging.basicConfig(level=level, format=fmt)

    @classmethod
    def debug_print(cls, message, force=False):
        """Print ``message`` only in verbose mode, or always when ``force`` is set."""
        if not (cls.VERBOSE or force):
            return
        print(message)

    @classmethod
    def user_print(cls, message):
        """Print a user-facing ``message`` unless quiet mode is active."""
        if cls.QUIET:
            return
        print(message)

    @classmethod
    def system_print(cls, message):
        """Print an important system ``message`` regardless of mode."""
        print(message)
def encode_callsign(callsign: str) -> int:
    """
    Pack a callsign into a base-40 integer that fits in 48 bits (6 bytes).

    Accepted characters are uppercase A-Z (digits 1-26), 0-9 (27-36),
    '-' (37), '/' (38) and '.' (39). The first character of the callsign
    becomes the least-significant base-40 digit. An empty string encodes
    to 0.

    :param callsign: The callsign to encode (must already be uppercase).
    :return: The encoded callsign as an integer in the range 0..2**48 - 1.
    :raises ValueError: On an unsupported character, or when the result
        does not fit in 6 bytes.
    """
    alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-/."
    digit_of = {symbol: index for index, symbol in enumerate(alphabet, start=1)}

    value = 0
    # Walk from the last character to the first so the first character ends
    # up in the lowest-order digit.
    for ch in reversed(callsign):
        digit = digit_of.get(ch)
        if digit is None:
            raise ValueError(f"Invalid character '{ch}' in callsign.")
        value = value * 40 + digit

    if value > 0xFFFFFFFFFFFF:
        raise ValueError("Encoded callsign exceeds maximum length of 6 bytes.")
    return value
def decode_callsign(encoded: int) -> str:
    """
    Turn a base-40 encoded callsign integer back into its string form.

    Base-40 digits are read from least to most significant; the lowest
    digit is the first character of the callsign. Digit values map as
    1-26 -> A-Z, 27-36 -> 0-9, 37 -> '-', 38 -> '/', 39 -> '.'.
    Zero (and any non-positive value) decodes to the empty string.

    :param encoded: The encoded callsign as an integer.
    :return: The decoded callsign string.
    :raises ValueError: If any base-40 digit is 0, which has no character.
    """
    symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-/."

    characters = []
    remaining = encoded
    while remaining > 0:
        remaining, digit = divmod(remaining, 40)
        if digit == 0:
            # Digit 0 is unused by the encoder, so the value is corrupt.
            raise ValueError(f"Invalid encoded value: {digit}")
        characters.append(symbols[digit - 1])
    return "".join(characters)
class MessageType(Enum):
    """Kinds of protocol message, each tagged with a numeric priority.

    Every member's value is a ``(priority, label)`` tuple, which the enum
    machinery unpacks into ``__init__``. VOICE carries the smallest
    priority number and DATA the largest.
    """

    VOICE = (1, "VOICE")
    CONTROL = (2, "CONTROL")
    TEXT = (3, "TEXT")
    DATA = (4, "DATA")

    def __init__(self, rank, label):
        # Expose the tuple parts as named attributes on each member.
        self.priority = rank
        self.message_name = label
class StationIdentifier:
    """Domain model for flexible station identification using base-40 encoding.

    Attributes:
        callsign: Normalised (upper-cased, stripped) callsign string.
        encoded_value: The callsign packed by ``encode_callsign`` into an
            integer that fits in 6 bytes.
    """

    def __init__(self, callsign):
        """Initialize with a flexible callsign (no SSID in base-40 encoding).

        Raises:
            ValueError: If the callsign is empty, contains unsupported
                characters, or is too long for the 6-byte encoding.
        """
        self.callsign = self._validate_callsign(callsign)
        self.encoded_value = encode_callsign(self.callsign)

    def _validate_callsign(self, callsign):
        """Normalise ``callsign`` and check that it can be base-40 encoded.

        Returns the upper-cased, whitespace-stripped callsign.

        Raises:
            ValueError: On empty/whitespace-only input, invalid characters,
                or a callsign that does not fit in 6 bytes.
        """
        if not callsign:
            raise ValueError("Callsign cannot be empty")

        callsign_upper = callsign.upper().strip()
        # Re-check after stripping: a whitespace-only callsign would otherwise
        # pass validation as "" and silently encode to 0.
        if not callsign_upper:
            raise ValueError("Callsign cannot be empty")

        # Check for valid base-40 characters
        valid_chars = set("ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-/.")
        # Sorted so the error message is deterministic.
        invalid_chars = sorted(set(callsign_upper) - valid_chars)
        if invalid_chars:
            raise ValueError(f"Invalid characters in callsign: {', '.join(invalid_chars)}")

        # Test encoding to ensure it fits in 6 bytes; encode_callsign raises
        # ValueError itself when the result exceeds 48 bits.
        try:
            encode_callsign(callsign_upper)
        except ValueError as e:
            raise ValueError(f"Callsign encoding failed: {e}") from e

        return callsign_upper

    def to_bytes(self):
        """Convert station ID to 6-byte representation for protocol (big-endian)."""
        return self.encoded_value.to_bytes(6, byteorder='big')

    def __str__(self):
        return self.callsign

    def __repr__(self):
        return f"{type(self).__name__}({self.callsign!r})"

    @classmethod
    def from_bytes(cls, station_bytes):
        """Create StationIdentifier from 6-byte (big-endian) representation.

        Raises:
            ValueError: If ``station_bytes`` is not exactly 6 bytes long, or
                the bytes do not decode to a valid callsign.
        """
        if len(station_bytes) != 6:
            raise ValueError("Station ID must be exactly 6 bytes")

        encoded_value = int.from_bytes(station_bytes, byteorder='big')

        # Decoding and re-validation can both fail with ValueError; report
        # either as a decode failure while keeping the original cause.
        try:
            callsign = decode_callsign(encoded_value)
            return cls(callsign)
        except ValueError as e:
            raise ValueError(f"Failed to decode station ID: {e}") from e

    @classmethod
    def from_encoded(cls, encoded_value):
        """Create StationIdentifier from an already encoded integer.

        The decoded callsign is trusted as-is: ``__init__`` (and therefore
        validation) is deliberately bypassed.

        Raises:
            ValueError: If ``decode_callsign`` rejects the value.
        """
        callsign = decode_callsign(encoded_value)
        instance = cls.__new__(cls)  # Create without calling __init__
        instance.callsign = callsign
        instance.encoded_value = encoded_value
        return instance
class COBSEncoder:
    """
    COBS (Consistent Overhead Byte Stuffing) codec for Opulent Voice Protocol.

    COBS rewrites a byte string so that it contains no 0x00 bytes, which
    frees 0x00 to act as an unambiguous frame separator. Every encoded
    block starts with a code byte telling how many bytes follow before the
    next (removed) zero.

    MAX_BLOCK_SIZE is the largest run of non-zero bytes a single code byte
    can describe. Runs of that length or more are emitted as full blocks
    with code 0xFF, which carry no implied zero.
    """

    MAX_BLOCK_SIZE = 254

    @staticmethod
    def encode(data: bytes) -> bytes:
        """Encode ``data`` with COBS and append the 0x00 separator byte.

        Empty input encodes to ``b'\\x01\\x00'``. A zero-free run whose
        length is an exact multiple of MAX_BLOCK_SIZE is followed by an
        extra ``0x01`` code byte.
        """
        full = COBSEncoder.MAX_BLOCK_SIZE
        out = bytearray()

        # Splitting on zero yields exactly the zero-free runs COBS has to
        # describe; k zeros give k + 1 runs (possibly empty ones).
        for run in data.split(b'\x00'):
            offset = 0
            # Peel off full-size blocks; 0xFF means "254 bytes, no zero".
            while len(run) - offset >= full:
                out.append(full + 1)
                out += run[offset:offset + full]
                offset += full
            # The remainder (possibly empty) is closed by an ordinary code.
            tail = run[offset:]
            out.append(len(tail) + 1)
            out += tail

        out.append(0)  # COBS separator byte
        return bytes(out)

    @staticmethod
    def decode(encoded_data: bytes) -> bytes:
        """Decode a COBS frame that ends with the 0x00 separator.

        Raises:
            ValueError: If the input is empty, lacks the trailing
                separator, contains a zero before the separator, or a
                code byte points past the end of the data.
        """
        if not encoded_data or encoded_data[-1] != 0:
            raise ValueError("COBS data must end with zero byte")

        body = encoded_data[:-1]  # everything except the separator
        if 0 in body:
            raise ValueError("Unexpected zero byte in COBS data")

        out = bytearray()
        total = len(body)
        cursor = 0
        while cursor < total:
            code = body[cursor]
            block_end = cursor + code  # code counts itself plus its data
            if block_end > total:
                raise ValueError("COBS block extends beyond data")

            out += body[cursor + 1:block_end]
            cursor = block_end

            # A code below 0xFF stands for a removed zero, except for the
            # final block, whose zero is only the implied terminator.
            if code != 0xFF and cursor < total:
                out.append(0)

        return bytes(out)
class COBSFrameBoundaryManager:
"""
Domain model for managing frame boundaries in Opulent Voice Protocol
"""
def __init__(self):
self.stats = {
'frames_encoded': 0,
'frames_decoded': 0,
'encoding_errors': 0,
'decoding_errors': 0,
'total_overhead_bytes': 0
}
def encode_frame(self, ip_frame_data: bytes) -> bytes:
"""Encode IP frame with COBS for boundary management"""
try:
# Apply COBS encoding
encoded_frame = COBSEncoder.encode(ip_frame_data)
# Update statistics
self.stats['frames_encoded'] += 1
overhead = len(encoded_frame) - len(ip_frame_data)
self.stats['total_overhead_bytes'] += overhead
return encoded_frame
except Exception as e:
self.stats['encoding_errors'] += 1
raise ValueError(f"COBS encoding failed: {e}")
# 2. In COBSFrameBoundaryManager class, replace the decode_frame method:
def decode_frame(self, encoded_data: bytes) -> Tuple[bytes, int]:
"""Decode COBS frame and return original IP data - FIXED FOR 1-BYTE LOSS"""
try:
# Add terminator for decoding if needed
if encoded_data.endswith(b'\x00'):
cobs_data_with_terminator = encoded_data
else:
cobs_data_with_terminator = encoded_data + b'\x00'
# Decode the COBS data
decoded_frame = COBSEncoder.decode(cobs_data_with_terminator)
# Only show debug info in verbose mode
DebugConfig.debug_print(f"🔍 COBS decode: {len(encoded_data)}B → {len(decoded_frame)}B")
# Only show size mismatches (potential issues)
if len(decoded_frame) != 120:
DebugConfig.debug_print(f"⚠️ Unexpected frame size: {len(decoded_frame)}B (expected 120B)")
self.stats['frames_decoded'] += 1
return decoded_frame, len(cobs_data_with_terminator)
except Exception as e:
# Always show decode failures (they're important)
print(f"❌ COBS decode failed: {len(encoded_data)}B frame - {e}")
self.stats['decoding_errors'] += 1
raise ValueError(f"COBS decoding failed: {e}")
def get_stats(self) -> dict:
"""Get encoding statistics"""
stats = self.stats.copy()
if stats['frames_encoded'] > 0:
stats['avg_overhead_per_frame'] = stats['total_overhead_bytes'] / stats['frames_encoded']
else:
stats['avg_overhead_per_frame'] = 0
return stats
class SimpleFrameSplitter:
    """
    Splits COBS-encoded data into fixed-size Opulent Voice frame payloads.

    Every payload returned is exactly ``payload_size`` bytes; short data is
    padded with 0x00. With the default 134-byte frame the payload is
    122 bytes (134 minus the 12-byte header).
    """

    # Bytes of each Opulent Voice frame taken up by its header.
    HEADER_SIZE = 12

    def __init__(self, opulent_voice_frame_size: int = 134):
        """
        opulent_voice_frame_size: Total size of each Opulent Voice frame
            (including the 12-byte header).

        Raises:
            ValueError: If the frame size leaves no room for payload.
        """
        if opulent_voice_frame_size <= self.HEADER_SIZE:
            # A zero or negative payload size would make splitting
            # impossible (range() step of 0) or silently drop all data.
            raise ValueError(
                f"Frame size must exceed the {self.HEADER_SIZE}-byte header, "
                f"got {opulent_voice_frame_size}"
            )

        self.opulent_voice_frame_size = opulent_voice_frame_size
        self.payload_size = opulent_voice_frame_size - self.HEADER_SIZE  # 134 - 12 = 122 bytes

        print(f"📏 FIXED Frame splitter: {self.opulent_voice_frame_size}B total, {self.payload_size}B payload")
        print(f"📏 Audio frame budget: IP(120B) + COBS(2B) = {self.payload_size}B ✅")

        self.stats = {
            'single_frame_messages': 0,
            'multi_frame_messages': 0,
            'total_frames_created': 0,
            'audio_frames_split': 0,  # Should always be 0!
            'text_frames_created': 0,
            'control_frames_created': 0
        }

    def _pad(self, chunk: bytes) -> bytes:
        """Right-pad ``chunk`` with 0x00 up to ``payload_size`` bytes."""
        return chunk + b'\x00' * (self.payload_size - len(chunk))

    def split_cobs_frame(self, cobs_encoded_data: bytes, frame_type: str = "unknown") -> List[bytes]:
        """
        Split COBS-encoded data into one or more padded frame payloads.

        cobs_encoded_data: The COBS-encoded message to carry.
        frame_type: Label used for statistics and diagnostics; "audio",
            "text" and "control" are recognised, anything else is only
            counted in the totals.

        Returns: List of payloads, each exactly ``payload_size`` bytes.
        Audio that needs more than one frame is reported loudly and
        counted in ``audio_frames_split``, but is still split.
        """
        size = self.payload_size

        if len(cobs_encoded_data) <= size:
            # Fits in one frame (this includes empty input).
            frames = [self._pad(cobs_encoded_data)]
            self.stats['single_frame_messages'] += 1
        else:
            # Multi-frame - this should NOT happen for audio!
            if frame_type == "audio":
                self.stats['audio_frames_split'] += 1
                print(f"🚨 CRITICAL ERROR: Audio frame split!")
                print(f"🚨 {len(cobs_encoded_data)}B COBS > {self.payload_size}B limit")
                print(f"🚨 This violates Opulent Voice Protocol timing requirements!")

            self.stats['multi_frame_messages'] += 1
            frames = [
                self._pad(cobs_encoded_data[start:start + size])
                for start in range(0, len(cobs_encoded_data), size)
            ]
            print(f"📦 {frame_type}: {len(cobs_encoded_data)}B COBS → {len(frames)} frames")

        self.stats['total_frames_created'] += len(frames)

        # Per-type counters cover single-frame messages too; previously the
        # single-frame path returned before reaching them.
        if frame_type == "text":
            self.stats['text_frames_created'] += len(frames)
        elif frame_type == "control":
            self.stats['control_frames_created'] += len(frames)

        return frames

    def get_stats(self):
        """Return a copy of the counters plus derived values.

        Adds ``avg_frames_per_message`` (0 when nothing was split yet) and
        ``audio_split_rate`` (split audio messages per created frame, as a
        percentage).
        """
        stats = self.stats.copy()

        messages = stats['single_frame_messages'] + stats['multi_frame_messages']
        if stats['total_frames_created'] > 0 and messages > 0:
            stats['avg_frames_per_message'] = stats['total_frames_created'] / messages
        else:
            stats['avg_frames_per_message'] = 0

        stats['audio_split_rate'] = (stats['audio_frames_split'] / max(1, stats['total_frames_created'])) * 100

        return stats
class SimpleFrameReassembler:
    """
    Simple frame reassembler - concatenates frame payloads until a COBS
    delimiter (0x00) is found. No fragmentation headers to worry about.

    ``self.buffer`` holds the bytes of a message whose delimiter has not
    arrived yet; it never contains a delimiter itself.
    """

    def __init__(self):
        self.buffer = bytearray()
        self.stats = {
            'frames_received': 0,
            'messages_completed': 0,
            'bytes_buffered': 0
        }

    def add_frame_payload(self, frame_payload: bytes) -> List[bytes]:
        """Feed one frame payload and return the messages it completes.

        The payload is cut at each 0x00 delimiter (delimiters are not
        included in the results). Bytes before the first delimiter finish
        whatever was buffered from earlier frames; bytes after the last
        delimiter are buffered for the next call. Empty spans between
        delimiters are padding and are skipped, including an empty span at
        the very start of a frame when nothing is buffered.

        Only ``frame_payload`` is scanned; the buffer is known to be free
        of delimiters.

        Returns: List of completed COBS-encoded messages (possibly empty).
        """
        self.stats['frames_received'] += 1

        first_delimiter = frame_payload.find(0)
        if first_delimiter == -1:
            # No delimiter anywhere: the whole payload continues the
            # pending message.
            self.buffer.extend(frame_payload)
            self.stats['bytes_buffered'] = len(self.buffer)
            return []

        completed = []

        # The first delimiter closes the pending message, using up any
        # existing contents of the buffer.
        head = bytes(self.buffer) + bytes(frame_payload[:first_delimiter])
        self.buffer.clear()
        if head:
            completed.append(head)

        # Walk the rest of the payload by index to avoid needless copies.
        start = first_delimiter + 1
        payload_len = len(frame_payload)
        while start < payload_len:
            end = frame_payload.find(0, start)
            if end == -1:
                # No closing delimiter: keep the tail for the next frame.
                self.buffer.extend(frame_payload[start:])
                break
            if end > start:
                # A message wholly contained within this payload.
                completed.append(bytes(frame_payload[start:end]))
            # else: an extra delimiter of padding, not a packet.
            start = end + 1

        self.stats['messages_completed'] += len(completed)
        self.stats['bytes_buffered'] = len(self.buffer)
        return completed

    def add_frame_payload_proposed(self, frame_payload: bytes) -> List[bytes]:
        """Earlier draft of ``add_frame_payload``; kept as an alias.

        The draft returned ``None`` and left stale data in the buffer when
        a payload ended exactly on a delimiter or padding. It now simply
        delegates to ``add_frame_payload``.
        """
        return self.add_frame_payload(frame_payload)

    def add_frame_payload_replaced(self, frame_payload: bytes) -> List[bytes]:
        """
        Add a 122-byte frame payload and return complete COBS frames if ready.

        Older implementation that appends to the buffer first and then
        scans the whole buffer.

        frame_payload: 122-byte payload from Opulent Voice frame (header removed)
        Returns: List of completed COBS-encoded frames; empty when the
            payload has the wrong size or no frame is complete yet.
        """
        # We will build a list of zero or more reassembled COBS frames
        reassembled_frames = []

        if len(frame_payload) != 122:
            print(f"⚠ Expected 122-byte payload, got {len(frame_payload)}B")
            return reassembled_frames

        self.buffer.extend(frame_payload)
        self.stats['frames_received'] += 1

        while self.buffer:
            delimiter_pos = self.buffer.find(0)
            if delimiter_pos == -1:
                # Incomplete message: stop scanning and wait for more data
                # (without this the loop would never terminate).
                break

            if delimiter_pos > 0:
                # Found a non-empty complete COBS frame (delimiter excluded)
                reassembled_frames.append(bytes(self.buffer[:delimiter_pos]))
                self.stats['messages_completed'] += 1

            # Drop the processed bytes together with their delimiter; a
            # delimiter at position 0 is padding and is just skipped.
            del self.buffer[:delimiter_pos + 1]

        self.stats['bytes_buffered'] = len(self.buffer)

        if len(reassembled_frames) == 0:
            print("📝 No complete COBS frames yet, buffering payload")
        else:
            print(f"✅ Reassembled {len(reassembled_frames)} complete COBS frames")
            for frame in reassembled_frames:
                print(f"✅ Reassembled complete COBS frame: {len(frame)}B")

        return reassembled_frames

    def get_stats(self):
        """Get a copy of the reassembly statistics."""
        return self.stats.copy()
class RTPHeader:
    """
    RTP header implementation for Opulent Voice Protocol.

    Builds and parses the fixed 12-byte RTP header (no CSRC list, no
    extension). Each call to ``create_header`` consumes one sequence
    number.
    """
    VERSION = 2
    PT_OPUS = 96  # in the range 96 to 127
    HEADER_SIZE = 12

    # Opulent Voice Protocol Constants
    OPULENT_VOICE_FRAME_DURATION_MS = 40
    OPULENT_VOICE_SAMPLE_RATE = 48000
    OPULENT_VOICE_OPUS_PAYLOAD_SIZE = 80
    OPULENT_VOICE_SAMPLES_PER_FRAME = 1920

    def __init__(self, payload_type=PT_OPUS, ssrc=None):
        """
        payload_type: 7-bit RTP payload type placed in every header.
        ssrc: Synchronization source identifier; a random non-zero value
            is generated when omitted (or 0).
        """
        # The SSRC identifies the source of a stream of RTP packets.
        # The marker bit is set at the beginning of a "talkspurt".
        self.version = self.VERSION
        self.padding = 0
        self.extension = 0
        self.csrc_count = 0
        self.marker = 0
        self.payload_type = payload_type

        self.sequence_number = random.randint(0, 65535)
        # Number of times sequence_number has wrapped past 65535; lets the
        # timestamp keep increasing across a wrap.
        self._sequence_cycles = 0
        self.ssrc = ssrc or self._generate_ssrc()

        self.timestamp_base = int(time.time() * self.OPULENT_VOICE_SAMPLE_RATE) % (2**32)
        self.samples_per_frame = self.OPULENT_VOICE_SAMPLES_PER_FRAME

    def _generate_ssrc(self):
        """Return a random non-zero 32-bit SSRC."""
        return random.randint(1, 2**32 - 1)

    def create_header(self, is_first_packet=False, custom_timestamp=None):
        """Build the next 12-byte RTP header and advance the sequence number.

        is_first_packet: Sets the marker bit (start of a talk spurt).
        custom_timestamp: Timestamp to use verbatim; when omitted the
            timestamp advances by ``samples_per_frame`` per header.

        Returns: 12 bytes in network byte order.
        """
        marker = 1 if is_first_packet else 0

        if custom_timestamp is not None:
            timestamp = custom_timestamp
        else:
            # Use the wrap-extended sequence number so the timestamp does
            # not jump backwards when the 16-bit sequence number rolls over.
            extended_sequence = self._sequence_cycles * 65536 + self.sequence_number
            timestamp = (self.timestamp_base + extended_sequence * self.samples_per_frame) % (2**32)

        first_word = (
            (self.version << 30) |
            (self.padding << 29) |
            (self.extension << 28) |
            (self.csrc_count << 24) |
            (marker << 23) |
            (self.payload_type << 16) |
            self.sequence_number
        )

        header = struct.pack('!I I I',
                             first_word,
                             timestamp,
                             self.ssrc)

        # 16-bit counter: wraps from 65535 to 0 (modulo 2**16, not 65535).
        next_sequence = self.sequence_number + 1
        if next_sequence > 0xFFFF:
            next_sequence = 0
            self._sequence_cycles += 1
        self.sequence_number = next_sequence

        return header

    def parse_header(self, header_bytes):
        """Parse the first 12 bytes of ``header_bytes`` into a field dict.

        ``header_size`` in the result accounts for any CSRC entries
        announced by the header (4 bytes each).

        Raises:
            ValueError: If fewer than 12 bytes are supplied.
        """
        if len(header_bytes) < self.HEADER_SIZE:
            raise ValueError(f"RTP Header too short: {len(header_bytes)} bytes")

        first_word, timestamp, ssrc = struct.unpack('!I I I', header_bytes[:12])

        version = (first_word >> 30) & 0x3
        padding = (first_word >> 29) & 0x1
        extension = (first_word >> 28) & 0x1
        csrc_count = (first_word >> 24) & 0xF
        marker = (first_word >> 23) & 0x1
        payload_type = (first_word >> 16) & 0x7F
        sequence_number = first_word & 0xFFFF

        return {
            'version': version,
            'padding': padding,
            'extension': extension,
            'csrc_count': csrc_count,
            'marker': marker,
            'payload_type': payload_type,
            'sequence_number': sequence_number,
            'timestamp': timestamp,
            'ssrc': ssrc,
            'header_size': self.HEADER_SIZE + (csrc_count * 4)
        }

    def get_stats(self):
        """Return the current SSRC, next sequence number and frame settings."""
        return {
            'ssrc': self.ssrc,
            'current_sequence': self.sequence_number,
            'payload_type': self.payload_type,
            'samples_per_frame': self.samples_per_frame
        }
class RTPAudioFrameBuilder:
    """
    Pairs each Opus payload with an RTP header for Opulent Voice transmission.

    The builder owns one RTPHeader, whose SSRC is derived from the station
    identifier's string form, and remembers whether the next frame opens a
    talk spurt (marker bit).
    """

    def __init__(self, station_identifier, payload_type=RTPHeader.PT_OPUS):
        self.station_id = station_identifier

        # Fold the callsign hash into 32 bits; 0 is replaced by 1.
        # NOTE(review): Python salts str hashes per process unless
        # PYTHONHASHSEED is fixed, so this value presumably differs between
        # runs - confirm that is acceptable.
        derived_ssrc = hash(str(station_identifier)) % (2**32) or 1

        self.rtp_header = RTPHeader(payload_type=payload_type, ssrc=derived_ssrc)
        self.is_talk_spurt_start = True
        self.expected_opus_size = RTPHeader.OPULENT_VOICE_OPUS_PAYLOAD_SIZE

    def create_rtp_audio_frame(self, opus_packet, is_start_of_transmission=False):
        """Return ``RTP header + opus_packet`` as one frame.

        The marker bit is set when the caller flags the start of a
        transmission or when a new talk spurt is pending; the pending flag
        is cleared by this call.

        Raises:
            ValueError: If ``opus_packet`` is not exactly the expected size.
            RuntimeError: If the assembled frame has an unexpected length.
        """
        packet_len = len(opus_packet)
        if packet_len != self.expected_opus_size:
            raise ValueError(
                f"Opulent Voice Protocol violation: OPUS packet must be "
                f"{self.expected_opus_size} bytes, but we got {packet_len} bytes."
            )

        starts_spurt = is_start_of_transmission or self.is_talk_spurt_start
        self.is_talk_spurt_start = False

        frame = self.rtp_header.create_header(is_first_packet=starts_spurt) + opus_packet

        # Sanity check on the combined size.
        wanted = RTPHeader.HEADER_SIZE + self.expected_opus_size
        frame_len = len(frame)
        if frame_len != wanted:
            raise RuntimeError(
                f"RTP frame size error: expected {wanted} bytes, "
                f"created {frame_len} bytes"
            )
        return frame

    def validate_opus_packet(self, opus_packet):
        """Tell whether ``opus_packet`` has exactly the expected size."""
        return self.expected_opus_size == len(opus_packet)

    def start_new_talk_spurt(self):
        """Arrange for the next frame to carry the marker bit."""
        self.is_talk_spurt_start = True

    def end_talk_spurt(self):
        """Hook for the end of a talk spurt; currently does nothing."""
        pass

    def get_rtp_stats(self):
        """Return the RTP header stats extended with frame-level figures."""
        opus_size = self.expected_opus_size
        frame_ms = RTPHeader.OPULENT_VOICE_FRAME_DURATION_MS

        stats = self.rtp_header.get_stats()
        stats['frame_duration_ms'] = frame_ms
        stats['opus_payload_size'] = opus_size
        stats['expected_frame_rate'] = 1000 / frame_ms
        stats['total_rtp_frame_size'] = RTPHeader.HEADER_SIZE + opus_size
        return stats
class UDPHeader:
"""
UDP Header implementation following RFC 768
UDP Header Format (8 bytes):
0 7 8 15 16 23 24 31
+--------+--------+--------+--------+
| Source | Destination |
| Port | Port |
+--------+--------+--------+--------+
| | |
| Length | Checksum |
+--------+--------+--------+--------+
"""
HEADER_SIZE = 8
def __init__(self, source_port=None, dest_port=57372):
"""
Initialize UDP header builder
source_port: Source port (auto-assigned if None)
dest_port: Destination port
"""
self.source_port = source_port or self._get_ephemeral_port()
self.dest_port = dest_port
def _get_ephemeral_port(self):
"""Get an ephemeral port number (49152-65535 range)"""
return random.randint(49152, 65535)
def create_header(self, payload_data, calculate_checksum=True, source_ip=None, dest_ip=None):
"""
Create UDP header for given payload
payload_data: The data to be wrapped in UDP
calculate_checksum: Whether to calculate checksum (can be disabled for speed)
source_ip: Source IP address
dest_ip: Destination IP address
return: 8-byte UDP header
"""
# UDP length includes header + payload
udp_length = self.HEADER_SIZE + len(payload_data)
if udp_length > 65535:
raise ValueError(f"UDP packet way too big: {udp_length} bytes")
# Calculate checksum if requested
if calculate_checksum and source_ip and dest_ip:
checksum = self._calculate_checksum(payload_data, udp_length, source_ip, dest_ip)
elif calculate_checksum:
checksum = self._simple_checksum(payload_data, udp_length) #fallback
else:
checksum = 0 # Checksum optional in IPv4
try:
# Pack UDP header
header = struct.pack('!HHHH',
self.source_port,
self.dest_port,
udp_length,
checksum)
return header
except:
print(f"✗ Struct error when trying to pack UDP Header.")
return None
def _calculate_checksum(self, payload_data, udp_length, source_ip, dest_ip):
"""
Calculate UDP checksum with proper pseudo-header (RFC 768)
payload_data: UDP payload
udp_length: UDP header + payload length
source_ip: Source IP address (string format)
dest_ip: Destination IP address (string format)
return: 16-bit checksum
"""
# Convert IP addresses to network byte order integers using socket.inet_aton
try:
source_addr = struct.unpack("!I", socket.inet_aton(source_ip))[0]
dest_addr = struct.unpack("!I", socket.inet_aton(dest_ip))[0]
except socket.error:
# Fallback to simple checksum if IP conversion fails
return self._simple_checksum(payload_data, udp_length)
# Create proper 12-byte UDP pseudo-header per RFC 768
# Format: Source IP (4) + Dest IP (4) + Zero (1) + Protocol (1) + UDP Length (2)
pseudo_header = struct.pack('!IIBBH',
source_addr, # Source IP (4 bytes)
dest_addr, # Dest IP (4 bytes)
0, # Zero byte (1 byte)
17, # Protocol = UDP (1 byte)
udp_length # UDP Length (2 bytes)
)
# Create UDP header with zero checksum for calculation
udp_header = struct.pack('!HHHH',
self.source_port,
self.dest_port,
udp_length,
0 # Zero checksum for calculation
)
# Combine pseudo-header + UDP header + payload
checksum_data = pseudo_header + udp_header + payload_data
# Pad to even length if necessary
if len(checksum_data) % 2:
checksum_data += b'\x00'
# Calculate 16-bit ones complement checksum
checksum = 0
for i in range(0, len(checksum_data), 2):
word = (checksum_data[i] << 8) + checksum_data[i + 1]
checksum += word
# Handle carries immediately to prevent overflow
while checksum > 0xFFFF:
checksum = (checksum & 0xFFFF) + (checksum >> 16)
# Take one's complement
checksum = (~checksum) & 0xFFFF
# UDP checksum of 0 is invalid, use 0xFFFF instead
if checksum == 0:
checksum = 0xFFFF
return checksum
def _simple_checksum(self, payload_data, udp_length):
"""
Simplified checksum when no IP addresses are available
Note: This is not RFC-compliant but better than nothing
"""
# Create simplified pseudo UDP packet for checksum calculation
pseudo_header = struct.pack('!HHHH',
self.source_port,
self.dest_port,
udp_length,
0 # Checksum field zero for calculation
)
# Combine header and payload for checksum
checksum_data = pseudo_header + payload_data
# Pad to even length
if len(checksum_data) % 2:
checksum_data += b'\x00'
# Calculate 16-bit checksum
checksum = 0
for i in range(0, len(checksum_data), 2):
word = (checksum_data[i] << 8) + checksum_data[i + 1]
checksum += word
while checksum > 0xFFFF:
checksum = (checksum & 0xFFFF) + (checksum >> 16)