From de17f877a9067269d353e1dc64d35fd0c621c29d Mon Sep 17 00:00:00 2001 From: Noah Treuhaft Date: Fri, 15 May 2026 14:08:52 -0400 Subject: [PATCH] disable parallelization of switch containing aggregate The optimizer parallelizes the following program, causing incorrect output. $ echo '1 2 3' | GOMAXPROCS=2 super \ -vam -c 'switch default ( count() ) | sort count' - 0 3 Fix this by disabling parallelization of switch operators when any path contains an aggregate operator. --- compiler/optimizer/parallelize.go | 18 +++++++++++++++++- compiler/ztests/par-switch.yaml | 16 ++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) create mode 100644 compiler/ztests/par-switch.yaml diff --git a/compiler/optimizer/parallelize.go b/compiler/optimizer/parallelize.go index 866dca8dda..a7c3507cea 100644 --- a/compiler/optimizer/parallelize.go +++ b/compiler/optimizer/parallelize.go @@ -1,6 +1,8 @@ package optimizer import ( + "reflect" + "github.com/brimdata/super/compiler/dag" "github.com/brimdata/super/order" ) @@ -299,7 +301,12 @@ func (o *Optimizer) concurrentPath(seq dag.Seq, sortKeys order.SortKeys) (length // upstream sort is the same as the Load destination sort we // request a merge and set the Load operator to do a sorted write. 
return k, nil, false, nil - case *dag.ForkOp, *dag.ScatterOp, *dag.HeadOp, *dag.TailOp, *dag.UniqOp, *dag.FuseOp, + case *dag.SwitchOp: + if hasAggregate(op) { + return 0, nil, false, nil + } + return k, sortExprsForSortKeys(sortKeys), true, nil + case *dag.ForkOp, *dag.HeadOp, *dag.ScatterOp, *dag.TailOp, *dag.UniqOp, *dag.FuseOp, *dag.HashJoinOp, *dag.InferOp, *dag.JoinOp, *dag.OutputOp: return k, sortExprsForSortKeys(sortKeys), true, nil default: @@ -316,6 +323,15 @@ func (o *Optimizer) concurrentPath(seq dag.Seq, sortKeys order.SortKeys) (length return len(seq), sortExprsForSortKeys(sortKeys), true, nil } +func hasAggregate(op dag.Op) bool { + var found bool + walkT(reflect.ValueOf(op), func(a *dag.AggregateOp) *dag.AggregateOp { + found = true + return a + }) + return found +} + func sortExprsForSortKeys(keys order.SortKeys) []dag.SortExpr { var exprs []dag.SortExpr for _, k := range keys { diff --git a/compiler/ztests/par-switch.yaml b/compiler/ztests/par-switch.yaml new file mode 100644 index 0000000000..9952d70a15 --- /dev/null +++ b/compiler/ztests/par-switch.yaml @@ -0,0 +1,16 @@ +# A switch operator containing an aggregate operator cannot be parallelized. +script: | + super compile -vam -C -P 2 'from /dev/null | switch default ( aggregate count() ) | sort count' + +outputs: + - name: stdout + data: | + file /dev/null unordered + | switch + case true ( + aggregate + count:=count() + | values count + ) + | sort count asc nulls last + | output main