@@ -2,9 +2,11 @@ package lids
22
33import (
44 "encoding/binary"
5+ "errors"
6+ "math"
57 "unsafe"
68
7- "github.com/ozontech/seq-db/consts "
9+ "github.com/ozontech/seq-db/config "
810 "github.com/ozontech/seq-db/packer"
911)
1012
@@ -23,40 +25,18 @@ func (b *Block) getLIDs(i int) []uint32 {
2325 return b .LIDs [b .Offsets [i ]:b .Offsets [i + 1 ]]
2426}
2527
26- func (b * Block ) Pack (dst []byte ) []byte {
28+ func (b * Block ) Pack (dst []byte , tmp [] uint32 ) []byte {
2729 // TODO store next flags into a single byte
2830 // write b.IsLastLID as a dedicated uint32 in the header of block
29- switch b .IsLastLID {
30- case true :
31+ if b .IsLastLID {
3132 dst = binary .LittleEndian .AppendUint32 (dst , 1 )
32- case false :
33+ } else {
3334 dst = binary .LittleEndian .AppendUint32 (dst , 0 )
3435 }
3536
36- fullBlock := len (b .LIDs ) == consts .LIDBlockCap
37- switch fullBlock {
38- case true :
39- dst = binary .LittleEndian .AppendUint32 (dst , 1 )
40- case false :
41- dst = binary .LittleEndian .AppendUint32 (dst , 0 )
42- }
37+ dst = packer .CompressDeltaBitpackUint32 (dst , b .Offsets , tmp )
38+ dst = packer .CompressDeltaBitpackUint32 (dst , b .LIDs , tmp )
4339
44- if len (b .LIDs ) == consts .LIDBlockCap {
45- offsetPacker := packer .NewBitpacker (dst , 128 )
46- offsetPacker .Append (b .Offsets )
47- dst = offsetPacker .Close ()
48- lidPacker := packer .NewBitpacker (dst , 128 )
49- dst = lidPacker .Append4kBlock (b .LIDs )
50- } else {
51- lidPacker := packer .NewBitpacker (dst , 128 )
52- sep := []uint32 {0 }
53- last := b .getCount () - 1
54- for i := 0 ; i <= last ; i ++ {
55- lidPacker .Append (b .getLIDs (i ))
56- lidPacker .Append (sep )
57- }
58- dst = lidPacker .Close ()
59- }
6040 return dst
6141}
6242
@@ -68,63 +48,75 @@ func (b *Block) GetSizeBytes() int {
6848 return blockSize + uint32Size * cap (b .LIDs ) + uint32Size * cap (b .Offsets )
6949}
7050
71- // TODO add support of the previous versions
72- func (b * Block ) Unpack (data []byte , buf * UnpackBuffer ) error {
73- unpacker := packer .NewBytesUnpacker (data )
51+ func (b * Block ) Unpack (data []byte , fracVer config.BinaryDataVersion , buf * UnpackBuffer ) error {
7452 buf .Reset ()
7553
54+ if fracVer >= config .BinaryDataV3 {
55+ return b .unpackBitpack (data , buf )
56+ }
57+
58+ return b .unpackVarint (data , buf )
59+ }
60+
61+ func (b * Block ) unpackBitpack (data []byte , buf * UnpackBuffer ) error {
7662 // read IsLastLID from a dedicated uint32
77- isLastLIDValue := unpacker .GetUint32 ()
78- switch isLastLIDValue {
79- case 1 :
80- b .IsLastLID = true
81- case 0 :
82- b .IsLastLID = false
63+ if len (data ) < 4 {
64+ return errors .New ("lids block decode error: truncated IsLastLID header" )
8365 }
66+ isLastLIDValue := binary .LittleEndian .Uint32 (data [:4 ])
67+ b .IsLastLID = isLastLIDValue == 1
68+ data = data [4 :]
69+
70+ var err error
71+ var values []uint32
8472
85- fullBlock := unpacker .GetUint32 ()
86- switch fullBlock {
87- case 1 :
88- // block has exactly consts.LIDBlockCap LIDs
89- decompressedChunk := buf .decompressed
90- compressedChunk := buf .compressed
91- offsetUnpacker := packer .NewBitpackUnpacker (unpacker , decompressedChunk , compressedChunk )
92- for {
93- offsetChunk , ok := offsetUnpacker .NextChunk ()
94- if ! ok {
95- break
96- }
97- b .Offsets = append (b .Offsets , offsetChunk ... )
73+ data , values , err = packer .DecompressDeltaBitpackUint32 (data , buf .compressed , buf .decompressed )
74+ if err != nil {
75+ return err
76+ }
77+ b .Offsets = append ([]uint32 {}, values ... )
78+
79+ data , values , err = packer .DecompressDeltaBitpackUint32 (data , buf .compressed , buf .decompressed )
80+ if err != nil {
81+ return err
82+ }
83+ b .LIDs = append ([]uint32 {}, values ... )
84+ return nil
85+ }
86+
87+ func (b * Block ) unpackVarint (data []byte , buf * UnpackBuffer ) error {
88+ var lid , offset uint32
89+
90+ b .IsLastLID = true
91+
92+ buf .offsets = append (buf .offsets , 0 ) // first offset is always zero
93+
94+ unpacker := packer .NewBytesUnpacker (data )
95+ for unpacker .Len () > 0 {
96+ delta , err := unpacker .GetVarint ()
97+ if err != nil {
98+ return err
9899 }
100+ lid += uint32 (delta )
99101
100- lidUnpacker := packer .NewBitpackUnpacker (unpacker , decompressedChunk , compressedChunk )
101- b .LIDs = lidUnpacker .AllocateAndRead4kChunk ()
102- case 0 :
103- decompressedChunk := buf .decompressed
104- compressedChunk := buf .compressed
105- buf .offsets = append (buf .offsets , 0 )
106-
107- bitpackUnpacker := packer .NewBitpackUnpacker (unpacker , decompressedChunk , compressedChunk )
108- pos := 0
109- for {
110- chunk , ok := bitpackUnpacker .NextChunk ()
111- if ! ok {
112- break
113- }
114-
115- for _ , lid := range chunk {
116- if pos > 0 && lid == 0 {
117- b .LIDs = append (b .LIDs , buf .lids ... )
118- buf .lids = buf .lids [:0 ]
119- buf .offsets = append (buf .offsets , uint32 (pos ))
120- } else {
121- buf .lids = append (buf .lids , lid )
122- pos ++
123- }
124- }
102+ if lid == math .MaxUint32 { // end of LIDs of current TID, see `Block.Pack()` method
103+ offset = uint32 (len (buf .lids ))
104+ buf .offsets = append (buf .offsets , offset )
105+ lid -= uint32 (delta )
106+ continue
125107 }
126108
127- b . Offsets = append ([] uint32 {}, buf .offsets ... )
109+ buf . lids = append (buf .lids , lid )
128110 }
111+
112+ if int (offset ) < len (buf .lids ) {
113+ b .IsLastLID = false
114+ buf .offsets = append (buf .offsets , uint32 (len (buf .lids )))
115+ }
116+
117+ // copy from buffer
118+ b .LIDs = append ([]uint32 {}, buf .lids ... )
119+ b .Offsets = append ([]uint32 {}, buf .offsets ... )
120+
129121 return nil
130122}
0 commit comments