Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions compiler/semantic/ztests/const-from-err.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ spq: |
values x

error: |
file does not exist at line 1, column 41:
const x = [values {x:1},2 | put y[(from noa.json | bar+1)]:=1]
~~~~~~~~
cannot read data in constant expression at line 1, column 41:
const x = [values {x:1},2 | put y[(from noa.json | bar+1)]:=1]
~~~~~~~~
no such field "bar" at line 1, column 52:
const x = [values {x:1},2 | put y[(from noa.json | bar+1)]:=1]
~~~
60 changes: 60 additions & 0 deletions pkg/storage/internal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package storage

import (
"context"
"io"
"io/fs"
)

type InternalEngine struct {
files map[string]io.Reader
}

func NewInternalEngine() *InternalEngine {
return &InternalEngine{map[string]io.Reader{}}
}

func (i *InternalEngine) AddReader(uri string, r io.Reader) {
i.files[uri] = r
}

func (i *InternalEngine) Get(_ context.Context, u *URI) (Reader, error) {
v, ok := i.files[u.String()]
if !ok {
return nil, fs.ErrNotExist
}
return &notSupportedReaderAt{io.NopCloser(v)}, nil
}

func (*InternalEngine) Put(context.Context, *URI) (io.WriteCloser, error) {
return nil, ErrNotSupported
}

func (*InternalEngine) PutIfNotExists(context.Context, *URI, []byte) error {
return ErrNotSupported

}
func (*InternalEngine) Delete(context.Context, *URI) error {
return ErrNotSupported
}

func (*InternalEngine) DeleteByPrefix(context.Context, *URI) error {
return ErrNotSupported
}

func (i *InternalEngine) Exists(_ context.Context, u *URI) (bool, error) {
_, ok := i.files[u.String()]
return ok, nil
}

func (i *InternalEngine) Size(_ context.Context, u *URI) (int64, error) {
_, ok := i.files[u.String()]
if !ok {
return 0, fs.ErrNotExist
}
return 0, nil
}

func (*InternalEngine) List(context.Context, *URI) ([]Info, error) {
return nil, ErrNotSupported
}
2 changes: 1 addition & 1 deletion sio/fjsonio/ztests/parser-error.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ input: |
{"x":1}

error: |
line 2: invalid JSON value
stdio:stdin: line 2: invalid JSON value

---

Expand Down
44 changes: 17 additions & 27 deletions ztest/ztest.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ import (
"github.com/brimdata/super/cli/outputflags"
"github.com/brimdata/super/compiler"
"github.com/brimdata/super/compiler/parser"
"github.com/brimdata/super/pkg/storage"
"github.com/brimdata/super/runtime"
"github.com/brimdata/super/runtime/exec"
"github.com/brimdata/super/sio"
Expand Down Expand Up @@ -488,30 +489,33 @@ func runInternal(ctx context.Context, query string, input *string, outputFlags,
if err != nil {
return "", err
}
sctx := super.NewContext()
var readers []sio.Reader
if input != nil {
zrc, err := newInputReader(sctx, *input, inputFlags)
if err != nil {
return "", err
}
defer zrc.Close()
readers = []sio.Reader{zrc}
}
var fs flag.FlagSet
var inflags inputflags.Flags
var outflags outputflags.Flags
inflags.SetFlags(&fs, true)
outflags.SetFlags(&fs)
if err := fs.Parse(outputFlags); err != nil {
if err := fs.Parse(append(inputFlags, outputFlags...)); err != nil {
return "", err
}
if err := inflags.Init(); err != nil {
return "", err
}
if err := outflags.Init(); err != nil {
return "", err
}
env := exec.NewEnvironment(nil, nil)
eng := storage.NewInternalEngine()
if input != nil {
ast.PrependFileScan([]string{"stdio:stdin"})
eng.AddReader("stdio:stdin", strings.NewReader(*input))
}
env := exec.NewEnvironment(eng, nil)
env.Dynamic = inflags.Dynamic
env.ReaderOpts = inflags.ReaderOpts
env.SampleSize = inflags.SampleSize
if vector {
env.Runtime = exec.RuntimeVAM
}
q, err := runtime.CompileQuery(ctx, sctx, compiler.NewCompilerWithEnv(env), ast, readers)
q, err := runtime.CompileQuery(ctx, super.NewContext(), compiler.NewCompilerWithEnv(env), ast, nil)
if err != nil {
return "", err
}
Expand All @@ -527,17 +531,3 @@ func runInternal(ctx context.Context, query string, input *string, outputFlags,
}
return outbuf.String(), err
}

func newInputReader(sctx *super.Context, input string, flags []string) (sio.ReadCloser, error) {
var inflags inputflags.Flags
var fs flag.FlagSet
inflags.SetFlags(&fs, true)
if err := fs.Parse(flags); err != nil {
return nil, err
}
r, err := anyio.GzipReader(strings.NewReader(input))
if err != nil {
return nil, err
}
return anyio.NewReader(sctx, r, inflags.ReaderOpts)
}
Loading