22from __future__ import annotations
33from io import BytesIO
44from math import ceil
5- from typing import Any , Union , IO
5+ from typing import Dict , Any , List , Tuple , IO
66# Third party modules
77# Local modules
88# Program
@@ -41,6 +41,30 @@ class MaxOPProtection(FSPackerError): pass
4141
4242class OutOfData (FSPackerError ): pass
4343
44+ def packMessage (message :bytes ) -> bytes :
45+ d :int = len (message )
46+ if d < 0xFD :
47+ return d .to_bytes (1 , VAR_ENDIANNESS ) + message
48+ elif d <= 0xFFFF :
49+ return b"\xFD " + d .to_bytes (2 , VAR_ENDIANNESS ) + message
50+ elif d <= 0xFFFFFF :
51+ return b"\xFE " + d .to_bytes (3 , VAR_ENDIANNESS ) + message
52+ elif d <= 0xFFFFFFFF :
53+ return b"\xFF " + d .to_bytes (4 , VAR_ENDIANNESS ) + message
54+ raise RuntimeError ("Too big data to pack" )
55+
56+ def unpackMessage (buffer :bytes ) -> Tuple [int , int ]:
57+ d :int = buffer [0 ]
58+ if d < 0xFD :
59+ return 1 , d
60+ elif d == 0xFD :
61+ return 3 , int .from_bytes (buffer [1 :3 ], VAR_ENDIANNESS )
62+ elif d == 0xFE :
63+ return 4 , int .from_bytes (buffer [1 :4 ], VAR_ENDIANNESS )
64+ elif d == 0xFF :
65+ return 5 , int .from_bytes (buffer [1 :5 ], VAR_ENDIANNESS )
66+ return 0 , 0
67+
4468def _create_vint (d :int ) -> bytes :
4569 if d < 0xEC :
4670 return d .to_bytes (1 , VAR_ENDIANNESS )
@@ -52,18 +76,18 @@ def _create_vint(d:int) -> bytes:
5276 return b"\xEE " + d .to_bytes (8 , VAR_ENDIANNESS )
5377
5478class FSPacker :
79+ def __init__ (self ) -> None :
80+ self .dictCounter :int = 0
81+ self .dictByKey :Dict [Any , int ] = {}
82+ self .dictBuffer :BytesIO = BytesIO ()
83+ self .opBuffer :BytesIO = BytesIO ()
5584 @classmethod
5685 def dump (self , data :Any , fp :IO [bytes ]) -> None :
5786 fp .write ( self ()._parse (data ) )
5887 return
5988 @classmethod
6089 def dumps (self , data :Any ) -> bytes :
6190 return self ()._parse (data )
62- def __init__ (self ):
63- self .dictCounter :int = 0
64- self .dictByKey :dict = {}
65- self .dictBuffer :BytesIO = BytesIO ()
66- self .opBuffer :BytesIO = BytesIO ()
6791 def _parse (self , data :Any ) -> bytes :
6892 self ._dump (data )
6993 return VAR_VERSION + _create_vint (len (self .dictByKey )) + self .dictBuffer .getbuffer () + self .opBuffer .getbuffer ()
@@ -149,15 +173,15 @@ def _register(self, k:Any) -> int:
149173 self .dictCounter += 1
150174 return self .dictByKey [k ]
151175
152- class FSUnPacker :
176+ class FSUnpacker :
153177 @classmethod
154178 def load (self , data :IO [bytes ], maxDictSize :int = 0 , maxOPSize :int = 0 ) -> Any :
155179 return self (BytesIO (data .read ()), maxDictSize , maxOPSize )._parse ()
156180 @classmethod
157181 def loads (self , data :bytes , maxDictSize :int = 0 , maxOPSize :int = 0 ) -> Any :
158182 return self (BytesIO (data ), maxDictSize , maxOPSize )._parse ()
159183 def __init__ (self , buffer :BytesIO , maxDictSize :int = 0 , maxOPSize :int = 0 ):
160- self .dict :list = []
184+ self .dict :List [ Any ] = []
161185 self .buffer :BytesIO = buffer
162186 self .maxDictSize :int = maxDictSize
163187 self .maxOPSize :int = maxOPSize
@@ -246,5 +270,5 @@ def _loads(self) -> Any:
246270
247271dump = FSPacker .dump
248272dumps = FSPacker .dumps
249- load = FSUnPacker .load
250- loads = FSUnPacker .loads
273+ load = FSUnpacker .load
274+ loads = FSUnpacker .loads
0 commit comments