Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 140 additions & 13 deletions vector/vbuild/bytes.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,171 @@
package vbuild

import (
"bytes"
"math"
"slices"

"github.com/brimdata/super"
"github.com/brimdata/super/vector"
)

// bytesBuilder is the Builder returned by newBytesBuilder. Its Write and Build
// methods forward to the current writer.
type bytesBuilder struct {
// NOTE(review): typ and table look like leftovers from the version of this
// type that built its table directly; the writer-based Write and Build
// methods do not read them. Confirm they can be dropped.
typ super.Type
table vector.BytesTable
// writer is the active writer; Write replaces it with whatever the writer's
// own Write call returns.
writer genericWriter
}

// newBytesBuilder returns a Builder for values of type typ. The builder
// starts out with a bytesConstWriter as its writer.
func newBytesBuilder(typ super.Type) Builder {
	return &bytesBuilder{
		writer: &bytesConstWriter{typ: typ},
	}
}

// Write hands vec to the current writer and adopts whichever writer that call
// returns as the one to use for the next Write or Build.
func (s *bytesBuilder) Write(vec vector.Any) {
	next := s.writer.Write(vec)
	s.writer = next
}

// Build returns the vector produced by the current writer. The context
// argument is not used; the writer is always invoked with nil.
func (s *bytesBuilder) Build(*super.Context) vector.Any {
	out := s.writer.Build(nil)
	return out
}

// bytesConstWriter accumulates Const vectors that all carry the same value
// and builds them into a single Const vector.
type bytesConstWriter struct {
// typ is passed to newBytesOrStringVector when the result is built.
typ super.Type
// val is a private copy of the value shared by the accepted vectors.
val []byte
// len is the summed length of the Const vectors accepted so far.
len uint32
}

// Write accepts vec if it is a Const vector whose value equals the value
// already held, adding its length to the running total and returning b.
//
// Any other input ends the constant run: a bytesDictWriter is created, the
// run accumulated so far (if non-empty) is written into it as a Const vector,
// vec is written after it, and the resulting writer is returned.
func (b *bytesConstWriter) Write(vec vector.Any) genericWriter {
	if c, ok := vec.(*vector.Const); ok {
		v := bytesTableOf(c.Any).Bytes(0)
		// Use the element count, not val == nil, to decide whether a value
		// has been captured: an empty value may itself be a nil slice, in
		// which case a nil check would let a later, different value replace
		// it and be merged into the same run.
		if b.len == 0 {
			b.val = slices.Clone(v)
		}
		if bytes.Equal(b.val, v) {
			b.len += c.Len()
			return b
		}
	}
	next := genericWriter(&bytesDictWriter{
		typ:  b.typ,
		dict: make(map[string]byte),
	})
	if b.len > 0 {
		// Replay the accumulated run so no previously written values are lost.
		next = next.Write(b.Build(nil))
	}
	return next.Write(vec)
}

// Build returns a Const vector of length b.len whose single underlying value
// is b.val, typed according to b.typ. The context argument is not used.
func (b *bytesConstWriter) Build(*super.Context) vector.Any {
	single := vector.NewBytesTableEmpty(0)
	single.Append(b.val)
	inner := newBytesOrStringVector(b.typ, single)
	return vector.NewConst(inner, b.len)
}

// bytesDictWriter accumulates Const and Dict vectors into one dictionary
// whose slots are single bytes.
type bytesDictWriter struct {
// typ is passed to newBytesOrStringVector when the result is built.
typ super.Type
// dict maps each distinct value, keyed as a string, to its slot.
dict map[string]byte
// counts[slot] is the running total of the counts passed to writeEntry for
// that slot.
counts []uint32
// index holds one slot per written element, in write order.
index []byte
}

// Write merges vec into the dictionary when vec is a Const or Dict vector and
// every value it carries can be given a slot, returning b in that case.
//
// For any other vector type, or when writeEntry reports that no slot is
// available, a bytesFlatWriter is created, the dictionary accumulated so far
// (if it has any elements) is written into it, vec is written after it, and
// the resulting writer is returned.
func (b *bytesDictWriter) Write(vec vector.Any) genericWriter {
	switch vec := vec.(type) {
	case *vector.Const:
		t := bytesTableOf(vec.Any)
		if slot, ok := b.writeEntry(t.Bytes(0), vec.Len()); ok {
			b.index = slices.Grow(b.index, int(vec.Len()))
			for range vec.Len() {
				b.index = append(b.index, slot)
			}
			return b
		}
	case *vector.Dict:
		t := bytesTableOf(vec.Any)
		// remap translates a slot of the incoming dictionary to the slot the
		// same value occupies in b.
		remap := make([]byte, t.Len())
		// Start from true so that a dictionary with no entries is accepted
		// as-is instead of forcing a switch to the flat writer.
		ok := true
		for i := range t.Len() {
			if remap[i], ok = b.writeEntry(t.Bytes(i), vec.Counts[i]); !ok {
				break
			}
		}
		if ok {
			b.index = slices.Grow(b.index, len(vec.Index))
			for _, idx := range vec.Index {
				b.index = append(b.index, remap[idx])
			}
			return b
		}
	}
	next := genericWriter(&bytesFlatWriter{
		typ:   b.typ,
		table: vector.NewBytesTableEmpty(0),
	})
	if len(b.index) > 0 {
		// Replay what has been accumulated so previously written values are
		// carried over to the flat writer.
		next = next.Write(b.Build(nil))
	}
	return next.Write(vec)
}

func (b *bytesDictWriter) writeEntry(val []byte, count uint32) (byte, bool) {
slot, ok := b.dict[string(val)]
if !ok {
if len(b.counts) > math.MaxUint8 {
Comment thread
mattnibs marked this conversation as resolved.
return 0, false
}
slot = byte(len(b.counts))
b.dict[string(val)] = slot
b.counts = append(b.counts, 0)
}
b.counts[slot] += count
return slot, true
}

// Build returns a Dict vector over the accumulated values, index and counts.
// The context argument is not used.
func (b *bytesDictWriter) Build(*super.Context) vector.Any {
	// Invert the value-to-slot map so the values can be appended in slot
	// order; map iteration order does not matter because each value is
	// stored at its own slot position.
	bySlot := make([][]byte, len(b.counts))
	for key, slot := range b.dict {
		bySlot[slot] = []byte(key)
	}
	values := vector.NewBytesTableEmpty(0)
	for i := range bySlot {
		values.Append(bySlot[i])
	}
	inner := newBytesOrStringVector(b.typ, values)
	return vector.NewDict(inner, b.index, b.counts)
}

// bytesFlatWriter appends every written value to a single bytes table.
type bytesFlatWriter struct {
// typ is passed to newBytesOrStringVector when the result is built.
typ super.Type
// table receives one appended entry per written element.
table vector.BytesTable
}

// Write appends every element of vec to the table, one entry per element,
// and returns b. View, Const, Dict, String and Bytes vectors are supported;
// any other vector type panics with the vector as the panic value.
func (b *bytesFlatWriter) Write(vec vector.Any) genericWriter {
	switch vec := vec.(type) {
	case *vector.View:
		table := bytesTableOf(vec.Any)
		for _, slot := range vec.Index {
			b.table.Append(table.Bytes(slot))
		}
	case *vector.Const:
		// Named val rather than bytes so the local does not shadow the
		// imported bytes package.
		val := bytesTableOf(vec.Any).Bytes(0)
		for range vec.Len() {
			b.table.Append(val)
		}
	case *vector.Dict:
		table := bytesTableOf(vec.Any)
		for _, slot := range vec.Index {
			b.table.Append(table.Bytes(uint32(slot)))
		}
	case *vector.String, *vector.Bytes:
		table := bytesTableOf(vec)
		for i := range vec.Len() {
			b.table.Append(table.Bytes(i))
		}
	default:
		panic(vec)
	}
	return b
}

// Build wraps the accumulated table in a vector chosen by b.typ. The context
// argument is not used.
func (b *bytesFlatWriter) Build(*super.Context) vector.Any {
	typ, table := b.typ, b.table
	return newBytesOrStringVector(typ, table)
}

func bytesTableOf(vec vector.Any) vector.BytesTable {
Expand All @@ -54,13 +181,13 @@ func bytesTableOf(vec vector.Any) vector.BytesTable {
}
}

func (s *bytesBuilder) Build(*super.Context) vector.Any {
switch s.typ.ID() {
func newBytesOrStringVector(typ super.Type, table vector.BytesTable) vector.Any {
switch typ.ID() {
case super.IDString:
return vector.NewString(s.table)
return vector.NewString(table)
case super.IDBytes:
return vector.NewBytes(s.table)
return vector.NewBytes(table)
default:
panic(s.typ)
panic(typ)
}
}
Loading