11# -*- coding: UTF-8 -*-
2+ import numpy as np
23from math import log2
4+ from typing import Optional
35
46
57__all__ = ["ensure_str" , "human_readable_size" , "ngrams_counts" , "ngrams_distribution" , "shannon_entropy" ]
68
def shannon_entropy(b):
    """ Compute the Shannon entropy (in bits per symbol) of a sized sequence of hashable symbols.

    :param b: sequence supporting len() and iteration (e.g. 'bytes' or 'str')
    :return:  entropy as a float ; 0.0 for an empty sequence or a sequence made of a single distinct symbol
    """
    from collections import Counter
    total = len(b)
    if total == 0:
        return 0.
    # a single pass builds the histogram, instead of rescanning the input once per distinct symbol
    entropy = -sum(p * log2(p) for p in (count / total for count in Counter(b).values()))
    # a constant sequence yields -0.0 ; normalize it to a plain 0.0
    return entropy or 0.
810
911
10- def ensure_str (s , encoding = ' utf-8' , errors = ' strict' ) :
12+ def ensure_str (s : str | bytes , encoding : str = " utf-8" , errors : str = " strict" ) -> str :
1113 """ Ensure that an input string is decoded. """
1214 if isinstance (s , bytes ):
1315 try :
@@ -19,7 +21,7 @@ def ensure_str(s, encoding='utf-8', errors='strict'):
1921 return s
2022
2123
22- def human_readable_size (size , precision = 0 ) :
24+ def human_readable_size (size : int , precision : int = 0 ) -> str :
2325 """ Display bytes' size in a human-readable format given a precision. """
2426 i , units = 0 , ["B" , "KB" , "MB" , "GB" , "TB" , "PB" , "EB" , "ZB" , "YB" ]
2527 while size >= 1024 and i < len (units )- 1 :
@@ -28,27 +30,45 @@ def human_readable_size(size, precision=0):
2830 return "%.*f%s" % (precision , size , units [i ])
2931
3032
31- def ngrams_counts (byte_obj , n = 1 , step = 1 ) :
33+ def ngrams_counts (byte_obj : bytes | object , n : int = 1 , step : int = 1 ) -> dict [ bytes , int ] :
3234 """ Output the Counter instance for an input byte sequence or byte object based on n-grams.
3335 If the input is a byte object, cache the result.
3436
3537 :param byte_obj: byte sequence ('bytes') or byte object with "bytes" and "size" attributes (i.e. pathlib2.Path)
3638 :param n: n determining the size of n-grams, defaults to 1
3739 :param step: step for sliding the n-grams
40+ :param start: number of bytes to start from
3841 """
39- from collections import Counter
40- if isinstance (byte_obj , (str , bytes )):
41- return Counter (byte_obj [i :i + n ] for i in range (0 , len (byte_obj )- n + 1 , step ))
42- elif hasattr (byte_obj , "bytes" ) and hasattr (byte_obj , "size" ):
43- if not hasattr (byte_obj , "_ngram_counts_cache" ):
44- byte_obj ._ngram_counts_cache = {}
45- if n not in byte_obj ._ngram_counts_cache .keys ():
46- byte_obj ._ngram_counts_cache [n ] = Counter (byte_obj .bytes [i :i + n ] for i in range (0 , byte_obj .size - n + 1 , step ))
47- return byte_obj ._ngram_counts_cache [n ]
42+ if n not in (1 , 2 , 3 ):
43+ raise ValueError ("n must be 1, 2, or 3" )
44+ if step <= 0 :
45+ raise ValueError ("step must be positive" )
46+ if isinstance (byte_obj , bytes ) or hasattr (byte_obj , "bytes" ):
47+ a = np .frombuffer (data := byte_obj if isinstance (byte_obj , bytes ) else byte_obj .bytes , dtype = np .uint8 )
48+ l = a .size
49+ if l < n :
50+ return {}
51+ if n == 1 :
52+ counts = {b .to_bytes (1 , "big" ): int (c ) for b , c in \
53+ enumerate (np .bincount (np .frombuffer (data , dtype = np .uint8 )))}
54+ else :
55+ end = (m := (l - n ) // step + 1 ) * step
56+ grams = np .stack ((a [0 :end :step ], a [1 :1 + end :step ]), axis = 1 ) if n == 2 else \
57+ np .stack ((a [0 :end :step ], a [1 :1 + end :step ], a [2 :2 + end :step ]), axis = 1 )
58+ counts = {bytes (row ): int (c ) for row , c in zip (* np .unique (grams , axis = 0 , return_counts = True ))}
59+ if isinstance (byte_obj , bytes ):
60+ return counts
61+ elif hasattr (byte_obj , "bytes" ):
62+ if not hasattr (byte_obj , "_ngram_counts_cache" ):
63+ byte_obj ._ngram_counts_cache = {}
64+ if n not in byte_obj ._ngram_counts_cache .keys ():
65+ byte_obj ._ngram_counts_cache [n ] = counts
66+ return byte_obj ._ngram_counts_cache [n ]
4867 raise TypeError ("Bad input type ; should be a byte sequence or object" )
4968
5069
def ngrams_distribution(byte_obj: bytes | object, n: int = 1, step: int = 1, n_most_common: Optional[int] = None,
                        n_exclude_top: int = 0, exclude: Optional[list] = None) -> list[tuple[bytes, int]]:
    """ Compute the n-grams distribution of an input byte sequence or byte object given exclusions.

    :param byte_obj:      input handed over as is to ngrams_counts
    :param n:             n determining the size of n-grams, defaults to 1
    :param step:          step for sliding the n-grams, defaults to 1
    :param n_most_common: maximum number of (n-gram, count) pairs to return ; None means no limit
    :param n_exclude_top: number of leading pairs to skip, applied after the n-grams from `exclude` were removed
    :param exclude:       n-grams to be removed from the distribution
    :return:              list of at most n_most_common (n-gram, count) pairs, sorted by decreasing count
    """
    counts = ngrams_counts(byte_obj, n, step)
    if n_most_common is None:
        limit = wanted = len(counts)
    else:
        wanted = n_most_common
        # take enough candidates so that dropping the excluded and skipped ones can still leave `wanted` pairs
        limit = n_most_common + n_exclude_top + len(exclude or [])
    ranked = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)[:limit]
    if exclude is not None:
        ranked = [(ngram, count) for ngram, count in ranked if ngram not in exclude]
    # `wanted` is used as is (not "or len(counts)") so that n_most_common=0 yields an empty list
    return ranked[n_exclude_top:n_exclude_top + wanted]
0 commit comments