diff --git a/compiler/optimizer/parallelize.go b/compiler/optimizer/parallelize.go index 866dca8dd..a7c3507ce 100644 --- a/compiler/optimizer/parallelize.go +++ b/compiler/optimizer/parallelize.go @@ -1,6 +1,8 @@ package optimizer import ( + "reflect" + "github.com/brimdata/super/compiler/dag" "github.com/brimdata/super/order" ) @@ -299,7 +301,12 @@ func (o *Optimizer) concurrentPath(seq dag.Seq, sortKeys order.SortKeys) (length // upstream sort is the same as the Load destination sort we // request a merge and set the Load operator to do a sorted write. return k, nil, false, nil - case *dag.ForkOp, *dag.ScatterOp, *dag.HeadOp, *dag.TailOp, *dag.UniqOp, *dag.FuseOp, + case *dag.SwitchOp: + if hasAggregate(op) { + return 0, nil, false, nil + } + return k, sortExprsForSortKeys(sortKeys), true, nil + case *dag.ForkOp, *dag.HeadOp, *dag.ScatterOp, *dag.TailOp, *dag.UniqOp, *dag.FuseOp, *dag.HashJoinOp, *dag.InferOp, *dag.JoinOp, *dag.OutputOp: return k, sortExprsForSortKeys(sortKeys), true, nil default: @@ -316,6 +323,15 @@ func (o *Optimizer) concurrentPath(seq dag.Seq, sortKeys order.SortKeys) (length return len(seq), sortExprsForSortKeys(sortKeys), true, nil } +func hasAggregate(op dag.Op) bool { + var found bool + walkT(reflect.ValueOf(op), func(a *dag.AggregateOp) *dag.AggregateOp { + found = true + return a + }) + return found +} + func sortExprsForSortKeys(keys order.SortKeys) []dag.SortExpr { var exprs []dag.SortExpr for _, k := range keys { diff --git a/compiler/ztests/par-switch.yaml b/compiler/ztests/par-switch.yaml new file mode 100644 index 000000000..9952d70a1 --- /dev/null +++ b/compiler/ztests/par-switch.yaml @@ -0,0 +1,16 @@ +# A switch operator containing an aggregate operator cannot be parallelized. +script: | + super compile -vam -C -P 2 'from /dev/null | switch default ( aggregate count() ) | sort count' + +outputs: + - name: stdout + data: | + file /dev/null unordered + | switch + case true ( + aggregate + count:=count() + | values count + ) + | sort count asc nulls last + | output main