From 8c6512fcc191304a10720b69d7cdb6bd37d13144 Mon Sep 17 00:00:00 2001 From: Noah Treuhaft Date: Mon, 18 May 2026 19:46:06 -0400 Subject: [PATCH] ztest: use storage.InternalEngine for SPQ-style tests When ztest executes a SPQ-style test, it provides the test's input to the runtime by passing an sio.Reader to runtime.CompileQuery, prompting the compiler to prepend an ast.DefaultScan to the test's AST to signal to the runtime that the flowgraph should read from the sio.Reader. If the same test is executed via the super command, the input will be read via an ast.FileScan, and this different mechanism can produce differently structured vectors. To eliminate that difference, add pkg/storage.InternalEngine and use it in ztest.runInternal to allow SPQ-style tests to read their input from an ast.FileScan rather than an ast.DefaultScan. --- compiler/semantic/ztests/const-from-err.yaml | 6 ++ pkg/storage/internal.go | 60 ++++++++++++++++++++ sio/fjsonio/ztests/parser-error.yaml | 2 +- ztest/ztest.go | 44 ++++++-------- 4 files changed, 84 insertions(+), 28 deletions(-) create mode 100644 pkg/storage/internal.go diff --git a/compiler/semantic/ztests/const-from-err.yaml b/compiler/semantic/ztests/const-from-err.yaml index d930eb2a18..85d596c09d 100644 --- a/compiler/semantic/ztests/const-from-err.yaml +++ b/compiler/semantic/ztests/const-from-err.yaml @@ -3,6 +3,12 @@ spq: | values x error: | + file does not exist at line 1, column 41: + const x = [values {x:1},2 | put y[(from noa.json | bar+1)]:=1] + ~~~~~~~~ cannot read data in constant expression at line 1, column 41: const x = [values {x:1},2 | put y[(from noa.json | bar+1)]:=1] ~~~~~~~~ + no such field "bar" at line 1, column 52: + const x = [values {x:1},2 | put y[(from noa.json | bar+1)]:=1] + ~~~ diff --git a/pkg/storage/internal.go b/pkg/storage/internal.go new file mode 100644 index 0000000000..2f8bfcdf5a --- /dev/null +++ b/pkg/storage/internal.go @@ -0,0 +1,60 @@ +package storage + +import ( + "context" + "io" + "io/fs" +) + +type InternalEngine struct { + files map[string]io.Reader +} + +func NewInternalEngine() *InternalEngine { + return &InternalEngine{map[string]io.Reader{}} +} + +func (i *InternalEngine) AddReader(uri string, r io.Reader) { + i.files[uri] = r +} + +func (i *InternalEngine) Get(_ context.Context, u *URI) (Reader, error) { + v, ok := i.files[u.String()] + if !ok { + return nil, fs.ErrNotExist + } + return ¬SupportedReaderAt{io.NopCloser(v)}, nil +} + +func (*InternalEngine) Put(context.Context, *URI) (io.WriteCloser, error) { + return nil, ErrNotSupported +} + +func (*InternalEngine) PutIfNotExists(context.Context, *URI, []byte) error { + return ErrNotSupported + +} +func (*InternalEngine) Delete(context.Context, *URI) error { + return ErrNotSupported +} + +func (*InternalEngine) DeleteByPrefix(context.Context, *URI) error { + return ErrNotSupported +} + +func (i *InternalEngine) Exists(_ context.Context, u *URI) (bool, error) { + _, ok := i.files[u.String()] + return ok, nil +} + +func (i *InternalEngine) Size(_ context.Context, u *URI) (int64, error) { + _, ok := i.files[u.String()] + if !ok { + return 0, fs.ErrNotExist + } + return 0, nil +} + +func (*InternalEngine) List(context.Context, *URI) ([]Info, error) { + return nil, ErrNotSupported +} diff --git a/sio/fjsonio/ztests/parser-error.yaml b/sio/fjsonio/ztests/parser-error.yaml index 17bab10a16..e29e4db399 100644 --- a/sio/fjsonio/ztests/parser-error.yaml +++ b/sio/fjsonio/ztests/parser-error.yaml @@ -9,7 +9,7 @@ input: | {"x":1} error: | - line 2: invalid JSON value + stdio:stdin: line 2: invalid JSON value --- diff --git a/ztest/ztest.go b/ztest/ztest.go index f09fb5a5bd..0fb63cd85d 100644 --- a/ztest/ztest.go +++ b/ztest/ztest.go @@ -137,6 +137,7 @@ import ( "github.com/brimdata/super/cli/outputflags" "github.com/brimdata/super/compiler" "github.com/brimdata/super/compiler/parser" + "github.com/brimdata/super/pkg/storage" "github.com/brimdata/super/runtime" "github.com/brimdata/super/runtime/exec" "github.com/brimdata/super/sio" @@ -488,30 +489,33 @@ func runInternal(ctx context.Context, query string, input *string, outputFlags, if err != nil { return "", err } - sctx := super.NewContext() - var readers []sio.Reader - if input != nil { - zrc, err := newInputReader(sctx, *input, inputFlags) - if err != nil { - return "", err - } - defer zrc.Close() - readers = []sio.Reader{zrc} - } var fs flag.FlagSet + var inflags inputflags.Flags var outflags outputflags.Flags + inflags.SetFlags(&fs, true) outflags.SetFlags(&fs) - if err := fs.Parse(outputFlags); err != nil { + if err := fs.Parse(append(inputFlags, outputFlags...)); err != nil { + return "", err + } + if err := inflags.Init(); err != nil { return "", err } if err := outflags.Init(); err != nil { return "", err } - env := exec.NewEnvironment(nil, nil) + eng := storage.NewInternalEngine() + if input != nil { + ast.PrependFileScan([]string{"stdio:stdin"}) + eng.AddReader("stdio:stdin", strings.NewReader(*input)) + } + env := exec.NewEnvironment(eng, nil) + env.Dynamic = inflags.Dynamic + env.ReaderOpts = inflags.ReaderOpts + env.SampleSize = inflags.SampleSize if vector { env.Runtime = exec.RuntimeVAM } - q, err := runtime.CompileQuery(ctx, sctx, compiler.NewCompilerWithEnv(env), ast, readers) + q, err := runtime.CompileQuery(ctx, super.NewContext(), compiler.NewCompilerWithEnv(env), ast, nil) if err != nil { return "", err } @@ -527,17 +531,3 @@ func runInternal(ctx context.Context, query string, input *string, outputFlags, } return outbuf.String(), err } - -func newInputReader(sctx *super.Context, input string, flags []string) (sio.ReadCloser, error) { - var inflags inputflags.Flags - var fs flag.FlagSet - inflags.SetFlags(&fs, true) - if err := fs.Parse(flags); err != nil { - return nil, err - } - r, err := anyio.GzipReader(strings.NewReader(input)) - if err != nil { - return nil, err - } - return anyio.NewReader(sctx, r, inflags.ReaderOpts) -}