Skip to content

Commit 0e5dfa7

Browse files
authored
Merge pull request #8 from viamus/feature/parallel-log-improvements
Replace parallel logs with live-updating table
2 parents 32ccb41 + e4075d1 commit 0e5dfa7

5 files changed

Lines changed: 398 additions & 201 deletions

File tree

CodeGenesis.Engine/Pipeline/PipelineExecutor.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ public async Task<bool> RunAsync(
2727
StepResult result;
2828
try
2929
{
30-
// Foreach renders its own sequential progress interleaved with sub-steps;
31-
// wrapping it in a spinner would swallow that output.
32-
// Parallel/ParallelForeach suppress sub-step rendering, so the spinner
33-
// runs at the bottom while branch/item completions appear above it.
34-
// ApprovalStep requires interactive Console input — no spinner.
35-
if (step is ForeachStep or ApprovalStep)
30+
// Foreach renders its own sequential progress interleaved with sub-steps.
31+
// Parallel/ParallelForeach use their own Live table display.
32+
// ApprovalStep requires interactive Console input.
33+
// None of these can be wrapped in a spinner (Spectre.Console
34+
// does not allow concurrent interactive displays).
35+
if (step is ForeachStep or ParallelStep or ParallelForeachStep or ApprovalStep)
3636
result = await step.ExecuteAsync(context, ct);
3737
else
3838
result = await renderer.RunWithSpinner(

CodeGenesis.Engine/Steps/ParallelForeachStep.cs

Lines changed: 77 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ public async Task<StepResult> ExecuteAsync(PipelineContext context, Cancellation
3333
var collectionRaw = resolveTemplate(config.Collection, allVars);
3434
var items = CollectionParser.Parse(collectionRaw);
3535

36-
renderer.RenderParallelForeachStart(config.ItemVar, items.Count, config.MaxConcurrency);
37-
3836
if (items.Count == 0)
3937
{
4038
sw.Stop();
@@ -46,104 +44,107 @@ public async Task<StepResult> ExecuteAsync(PipelineContext context, Cancellation
4644
};
4745
}
4846

47+
var concurrencyInfo = config.MaxConcurrency.HasValue
48+
? $"max {config.MaxConcurrency}"
49+
: "unlimited";
50+
var detail = $"{config.ItemVar} {items.Count} item(s) concurrency: {concurrencyInfo}";
51+
4952
var maxConcurrency = config.MaxConcurrency ?? int.MaxValue;
5053
using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
5154
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
5255

5356
var iterationResults = new (bool Success, PipelineContext Context, string Item, int Index)[items.Count];
54-
var tasks = new Task[items.Count];
5557

56-
for (var i = 0; i < items.Count; i++)
58+
await renderer.RunParallelWithLiveTable(items, "parallel_foreach", detail, async liveTable =>
5759
{
58-
var index = i;
59-
var item = items[i];
60+
var tasks = new Task[items.Count];
6061

61-
tasks[i] = Task.Run(async () =>
62+
for (var i = 0; i < items.Count; i++)
6263
{
63-
await semaphore.WaitAsync(linkedCts.Token);
64-
try
65-
{
66-
// Suppress all sub-step rendering; only item-level messages show
67-
renderer.PushScope();
68-
renderer.SuppressRendering();
64+
var index = i;
65+
var item = items[i];
6966

70-
// Skip null or empty items
71-
if (string.IsNullOrWhiteSpace(item))
67+
tasks[i] = Task.Run(async () =>
68+
{
69+
await semaphore.WaitAsync(linkedCts.Token);
70+
try
7271
{
73-
renderer.ResumeRendering();
74-
renderer.RenderParallelForeachItemStart("(empty)", index, items.Count);
72+
// Suppress all sub-step rendering inside parallel
73+
renderer.PushScope();
7574
renderer.SuppressRendering();
76-
var emptyCtx = new PipelineContext
75+
76+
// Skip null or empty items
77+
if (string.IsNullOrWhiteSpace(item))
78+
{
79+
var emptyCtx = new PipelineContext
80+
{
81+
TaskDescription = context.TaskDescription,
82+
WorkingDirectory = context.WorkingDirectory
83+
};
84+
liveTable.MarkStarted(index);
85+
liveTable.MarkComplete(index, true, TimeSpan.Zero, 0, 0);
86+
iterationResults[index] = (true, emptyCtx, item, index);
87+
return;
88+
}
89+
90+
liveTable.MarkStarted(index);
91+
92+
// Create isolated context for this iteration
93+
var iterationContext = new PipelineContext
7794
{
7895
TaskDescription = context.TaskDescription,
79-
WorkingDirectory = context.WorkingDirectory
96+
WorkingDirectory = context.WorkingDirectory,
97+
StatusUpdate = msg => liveTable.UpdateActivity(index, msg)
8098
};
81-
iterationResults[index] = (true, emptyCtx, item, index);
82-
return;
83-
}
8499

85-
// Temporarily resume to render our own item-level message
86-
renderer.ResumeRendering();
87-
renderer.RenderParallelForeachItemStart(item, index, items.Count);
88-
renderer.SuppressRendering();
100+
// Copy parent step outputs so sub-steps can read them
101+
foreach (var (key, value) in context.StepOutputs)
102+
iterationContext.StepOutputs[key] = value;
89103

90-
// Create isolated context for this iteration
91-
var iterationContext = new PipelineContext
92-
{
93-
TaskDescription = context.TaskDescription,
94-
WorkingDirectory = context.WorkingDirectory,
95-
StatusUpdate = msg => renderer.RenderThinking(item, msg)
96-
};
97-
98-
// Copy parent step outputs so sub-steps can read them
99-
foreach (var (key, value) in context.StepOutputs)
100-
iterationContext.StepOutputs[key] = value;
101-
102-
// Build loop variables for this iteration
103-
var iterVars = new Dictionary<string, string>(allVars)
104-
{
105-
["loop.item"] = item,
106-
["loop.index"] = index.ToString(),
107-
[config.ItemVar] = item
108-
};
104+
// Build loop variables for this iteration
105+
var iterVars = new Dictionary<string, string>(allVars)
106+
{
107+
["loop.item"] = item,
108+
["loop.index"] = index.ToString(),
109+
[config.ItemVar] = item
110+
};
109111

110-
// Clone sub-steps for this iteration so parallel threads don't share mutable state
111-
var clonedSubSteps = CloneSubSteps(subSteps, iterVars);
112+
// Clone sub-steps for this iteration so parallel threads don't share mutable state
113+
var clonedSubSteps = CloneSubSteps(subSteps, iterVars);
112114

113-
var iterSw = Stopwatch.StartNew();
115+
var iterSw = Stopwatch.StartNew();
114116

115-
var success = await executor.RunAsync(clonedSubSteps, iterationContext, linkedCts.Token,
116-
onBeforeStep: step => ResolveBeforeStep(step, iterVars, iterationContext));
117+
var success = await executor.RunAsync(clonedSubSteps, iterationContext, linkedCts.Token,
118+
onBeforeStep: step => ResolveBeforeStep(step, iterVars, iterationContext));
117119

118-
iterSw.Stop();
120+
iterSw.Stop();
119121

120-
iterationResults[index] = (success, iterationContext, item, index);
122+
iterationResults[index] = (success, iterationContext, item, index);
121123

122-
if (!success && config.FailFast)
123-
await linkedCts.CancelAsync();
124+
if (!success && config.FailFast)
125+
await linkedCts.CancelAsync();
124126

125-
// Resume rendering for our own completion message
126-
renderer.ResumeRendering();
127-
var iterTokens = iterationContext.TotalInputTokens + iterationContext.TotalOutputTokens;
128-
renderer.RenderParallelForeachItemComplete(item, index, items.Count, success, iterSw.Elapsed, iterTokens, iterationContext.TotalCostUsd);
129-
}
130-
finally
131-
{
132-
renderer.ResumeRendering();
133-
renderer.PopScope();
134-
semaphore.Release();
135-
}
136-
}, linkedCts.Token);
137-
}
127+
var iterTokens = iterationContext.TotalInputTokens + iterationContext.TotalOutputTokens;
128+
liveTable.MarkComplete(index, success, iterSw.Elapsed, iterTokens, iterationContext.TotalCostUsd);
129+
}
130+
finally
131+
{
132+
renderer.ResumeRendering();
133+
renderer.PopScope();
134+
semaphore.Release();
135+
}
136+
}, linkedCts.Token);
137+
}
138138

139-
try
140-
{
141-
await Task.WhenAll(tasks);
142-
}
143-
catch (OperationCanceledException) when (config.FailFast)
144-
{
145-
// Expected when fail_fast cancels siblings
146-
}
139+
try
140+
{
141+
await Task.WhenAll(tasks);
142+
}
143+
catch (OperationCanceledException) when (config.FailFast)
144+
{
145+
// Expected when fail_fast cancels siblings
146+
}
147+
});
147148

148149
sw.Stop();
149150

@@ -191,7 +192,7 @@ public async Task<StepResult> ExecuteAsync(PipelineContext context, Cancellation
191192

192193
var succeeded = iterationResults.Count(r => r.Success);
193194
var failed = items.Count - succeeded;
194-
renderer.RenderParallelForeachEnd(items.Count, succeeded, failed);
195+
renderer.RenderParallelSummary(items.Count, succeeded, failed);
195196

196197
return new StepResult
197198
{

0 commit comments

Comments
 (0)