Skip to content

Commit 7efdf7f

Browse files
Merge pull request #27 from actionforge/disable_concurrency
Add _disable_concurrency to serialize concurrent node execution
2 parents 922f11a + d48e921 commit 7efdf7f

9 files changed

Lines changed: 312 additions & 11 deletions

.github/workflows/graphs/build-test-publish.act

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6021,6 +6021,7 @@ nodes:
60216021
x: 13320
60226022
y: 620
60236023
inputs:
6024+
_disable_concurrency: true
60246025
dependency-snapshot: 'true'
60256026
- id: core-string-fmt-v1-lychee-coconut-gray
60266027
type: core/string-fmt@v1

core/base.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,9 @@ type NodeBaseInterface interface {
169169
IsExecutionNode() bool
170170
SetExecutionNode(execNode bool)
171171

172+
DisableConcurrency() bool
173+
SetDisableConcurrency(v bool)
174+
172175
// Returns the cache type where data is stored or should be stored to
173176
// By default this depends on if this is an execution node or not.
174177
GetCacheType() CacheType
@@ -183,9 +186,10 @@ type NodeBaseComponent struct {
183186
FullPath string // Full path of the node within the graph hierarchy
184187
CacheId string // Unique identifier for the cache
185188
NodeType string // Node type of the node (e.g. core/run@v1 or github.com/actions/checkout@v3)
186-
Graph *ActionGraph
187-
Parent NodeBaseInterface
188-
isExecutionNode bool
189+
Graph *ActionGraph
190+
Parent NodeBaseInterface
191+
isExecutionNode bool
192+
disableConcurrency bool
189193
}
190194

191195
func (n *NodeBaseComponent) GetCacheType() CacheType {
@@ -204,6 +208,14 @@ func (n *NodeBaseComponent) SetExecutionNode(execNode bool) {
204208
n.isExecutionNode = execNode
205209
}
206210

211+
func (n *NodeBaseComponent) DisableConcurrency() bool {
212+
return n.disableConcurrency
213+
}
214+
215+
func (n *NodeBaseComponent) SetDisableConcurrency(v bool) {
216+
n.disableConcurrency = v
217+
}
218+
207219
func (n *NodeBaseComponent) SetId(id string) {
208220
n.Id = id
209221
n.CacheId = fmt.Sprintf("%s:%s", n.Id, uuid.New().String())

core/context.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,12 @@ type ExecutionState struct {
169169
JobConclusion string `json:"jobConclusion"`
170170

171171
DebugCallback DebugCallback `json:"-"`
172+
173+
// PendingConcurrencyLocks tracks concurrency locks that are held during
174+
// ExecuteImpl. The key is node id → *sync.Mutex. Released when the
175+
// node calls Execute to dispatch downstream node, or as a fallback when
176+
// ExecuteImpl returns without any dispatching
177+
PendingConcurrencyLocks *sync.Map `json:"-"`
172178
}
173179

174180
type ExecutionStateOptions struct {
@@ -244,8 +250,9 @@ func (c *ExecutionState) PushNewExecutionState(parentNode NodeBaseInterface) *Ex
244250
PostSteps: c.PostSteps,
245251
JobConclusion: c.JobConclusion,
246252

247-
Visited: visited,
248-
DebugCallback: c.DebugCallback,
253+
Visited: visited,
254+
DebugCallback: c.DebugCallback,
255+
PendingConcurrencyLocks: &sync.Map{},
249256
}
250257

251258
return newEc

core/executions.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package core
22

3-
import "github.com/actionforge/actrun-cli/utils"
3+
import (
4+
"sync"
5+
6+
"github.com/actionforge/actrun-cli/utils"
7+
)
48

59
type ExecutionSource struct {
610
SrcNode HasExecutionInterface
@@ -30,6 +34,16 @@ func (n *Executions) Execute(outputPort OutputId, ec *ExecutionState, err error)
3034

3135
dest, hasDest := n.GetExecutionTarget(outputPort)
3236

37+
// Release any pending concurrency lock for the source node. When a node
38+
// calls Execute on its own Executions to dispatch downstream, it means
39+
// the nodes own ExecuteImpl work is done so we can release its lock
40+
// and let the next concurrent caller continue
41+
if hasDest && dest.SrcNode != nil {
42+
if lockVal, loaded := ec.PendingConcurrencyLocks.LoadAndDelete(dest.SrcNode.GetId()); loaded {
43+
lockVal.(*sync.Mutex).Unlock()
44+
}
45+
}
46+
3347
// Set the step conclusion for the SOURCE node based on which output port is being executed.
3448
// This enables ${{ steps.X.conclusion }} syntax similar to GitHub Actions.
3549
// The conclusion is set BEFORE downstream nodes execute so they can read it.
@@ -65,6 +79,26 @@ func (n *Executions) Execute(outputPort OutputId, ec *ExecutionState, err error)
6579
return nil
6680
}
6781

82+
// If the destination node has _disable_concurrency set, serialize execution
83+
// through a per-node-ID mutex to prevent concurrent ExecuteImpl calls.
84+
// The lock is stored as pending and released when the node calls Execute
85+
// to dispatch downstream (above), or as a fallback when ExecuteImpl
86+
// returns without dispatching (below).
87+
if dest.DstNode.DisableConcurrency() {
88+
dcNodeId := dest.DstNode.GetId()
89+
actual, _ := ec.Graph.ConcurrencyLocks.LoadOrStore(dcNodeId, &sync.Mutex{})
90+
mu := actual.(*sync.Mutex)
91+
mu.Lock()
92+
ec.PendingConcurrencyLocks.Store(dcNodeId, mu)
93+
// Fallback: release if ExecuteImpl returns without calling Execute
94+
// (end of chain, error, or panic).
95+
defer func() {
96+
if lockVal, loaded := ec.PendingConcurrencyLocks.LoadAndDelete(dcNodeId); loaded {
97+
lockVal.(*sync.Mutex).Unlock()
98+
}
99+
}()
100+
}
101+
68102
err = dest.DstNode.ExecuteImpl(ec, dest.Port, err)
69103
if err != nil {
70104
return err

core/graph.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ type ActionGraph struct {
4040
Outputs map[OutputId]OutputDefinition `yaml:"outputs" json:"outputs" bson:"outputs"`
4141

4242
Entry string
43+
44+
// ConcurrencyLocks maps node ID → *sync.Mutex. Used to serialize concurrent
45+
// calls to a node's ExecuteImpl when the node's _disable_concurrency input is true.
46+
ConcurrencyLocks *sync.Map `yaml:"-" json:"-"`
4347
}
4448

4549
func (ag *ActionGraph) AddNode(nodeId string, node NodeBaseInterface) {
@@ -78,7 +82,8 @@ func (ag *ActionGraph) GetEntry() (NodeEntryInterface, error) {
7882

7983
func NewActionGraph() ActionGraph {
8084
return ActionGraph{
81-
Nodes: make(map[string]NodeBaseInterface),
85+
Nodes: make(map[string]NodeBaseInterface),
86+
ConcurrencyLocks: &sync.Map{},
8287
}
8388
}
8489

@@ -245,10 +250,11 @@ func NewExecutionState(
245250
ctx, cancel := context.WithCancel(ctx)
246251

247252
return &ExecutionState{
248-
Graph: graph,
249-
Hierarchy: make([]NodeBaseInterface, 0),
250-
ContextStackLock: &sync.RWMutex{},
251-
OutputCacheLock: &sync.RWMutex{},
253+
Graph: graph,
254+
Hierarchy: make([]NodeBaseInterface, 0),
255+
ContextStackLock: &sync.RWMutex{},
256+
OutputCacheLock: &sync.RWMutex{},
257+
PendingConcurrencyLocks: &sync.Map{},
252258

253259
IsDebugSession: debugCb != nil,
254260
DebugCallback: debugCb,
@@ -815,6 +821,15 @@ func LoadInputValues(node NodeBaseInterface, nodeI map[string]any, validate bool
815821

816822
subInputs := map[string][]subInput{}
817823

824+
// _disable_concurrency is not a regular input, its stored directly on
825+
// the node instance so we pull it out before processing the rest.
826+
if v, ok := inputValues["_disable_concurrency"]; ok {
827+
if b, ok := v.(bool); ok && b {
828+
node.SetDisableConcurrency(true)
829+
}
830+
delete(inputValues, "_disable_concurrency")
831+
}
832+
818833
for portId, inputValue := range inputValues {
819834
groupInputId, portIndex, isIndexPort := IsValidIndexPortId(portId)
820835
if isIndexPort {
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
start-0
2+
end-0
3+
start-1
4+
end-1
5+
start-2
6+
end-2
7+
start-3
8+
end-3
9+
start-4
10+
end-4
11+
all done
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
entry: start
2+
nodes:
3+
- id: start
4+
type: core/start@v1
5+
position:
6+
x: 10
7+
y: 500
8+
- id: loop
9+
type: core/concurrent-for-loop@v1
10+
position:
11+
x: 290
12+
y: 400
13+
inputs:
14+
first_index: 0
15+
last_index: 4
16+
worker_count: 0
17+
- id: mul
18+
type: core/math-multiply@v1
19+
position:
20+
x: 600
21+
y: 200
22+
inputs:
23+
inputs[0]: null
24+
inputs[1]: 100
25+
- id: stagger
26+
type: core/sleep@v1
27+
position:
28+
x: 900
29+
y: 300
30+
inputs:
31+
unit: milliseconds
32+
- id: fmt
33+
type: core/string-fmt@v1
34+
position:
35+
x: 600
36+
y: 500
37+
inputs:
38+
substitutes[0]: null
39+
fmt: IDX=%v
40+
- id: env
41+
type: core/env-array@v1
42+
position:
43+
x: 1000
44+
y: 500
45+
inputs:
46+
env[0]: ''
47+
- id: run
48+
type: core/run@v1
49+
position:
50+
x: 1400
51+
y: 300
52+
inputs:
53+
_disable_concurrency: true
54+
shell: python
55+
script: |-
56+
import time, sys, os
57+
idx = os.environ["IDX"]
58+
sys.stdout.write("start-" + idx + "\n")
59+
sys.stdout.flush()
60+
time.sleep(0.5)
61+
sys.stdout.write("end-" + idx + "\n")
62+
sys.stdout.flush()
63+
- id: done
64+
type: core/run@v1
65+
position:
66+
x: 1400
67+
y: 800
68+
inputs:
69+
shell: python
70+
script: |-
71+
import sys
72+
sys.stdout.write("all done\n")
73+
connections:
74+
- src:
75+
node: loop
76+
port: index
77+
dst:
78+
node: mul
79+
port: inputs[0]
80+
- src:
81+
node: mul
82+
port: result
83+
dst:
84+
node: stagger
85+
port: duration
86+
- src:
87+
node: loop
88+
port: index
89+
dst:
90+
node: fmt
91+
port: substitutes[0]
92+
- src:
93+
node: fmt
94+
port: result
95+
dst:
96+
node: env
97+
port: env[0]
98+
- src:
99+
node: env
100+
port: env
101+
dst:
102+
node: run
103+
port: env
104+
executions:
105+
- src:
106+
node: start
107+
port: exec
108+
dst:
109+
node: loop
110+
port: exec
111+
- src:
112+
node: loop
113+
port: exec-body
114+
dst:
115+
node: stagger
116+
port: exec
117+
- src:
118+
node: stagger
119+
port: exec
120+
dst:
121+
node: run
122+
port: exec
123+
- src:
124+
node: loop
125+
port: exec-completed
126+
dst:
127+
node: done
128+
port: exec
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
echo "Test Concurrent For Loop with Disable Concurrency"
2+
3+
TEST_NAME=concurrent_for_disable_concurrency
4+
GRAPH_FILE="${ACT_GRAPH_FILES_DIR}${PATH_SEPARATOR}${TEST_NAME}.act"
5+
cp $GRAPH_FILE $TEST_NAME.act
6+
export ACT_GRAPH_FILE=$TEST_NAME.act
7+
export ACT_LOGLEVEL=normal
8+
9+
#! test actrun
10+

0 commit comments

Comments
 (0)