Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions src/XTMF2/BuiltInTypes/RunStatusStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
Copyright 2026 University of Toronto

This file is part of XTMF2.

XTMF2 is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

XTMF2 is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with XTMF2. If not, see <http://www.gnu.org/licenses/>.
*/
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace XTMF2;

public sealed class RunStatusStream : WriteStream
{
private XTMFRuntime _runtime;

internal RunStatusStream(XTMFRuntime runtime)
{
_runtime = runtime;
}

public override bool CanRead => false;

public override bool CanSeek => false;

public override bool CanWrite => true;

private long _length = 0;

public override long Length => _length;

public override long Position { get => _length; set => throw new InvalidOperationException("Unable to set the position of a RunStatusStream."); }

private readonly Lock _sync = new();

public override void Flush()
{
// Do nothing, we're always flushed.
}

public override int Read(byte[] buffer, int offset, int count)
{
throw new InvalidOperationException("Unable to read from a RunStatusStream.");
}

public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
{
throw new InvalidOperationException("Unable to read from a RunStatusStream.");
}

public override long Seek(long offset, SeekOrigin origin)
{
throw new InvalidOperationException("Unable to seek in a RunStatusStream.");
}

public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
{
lock (_sync)
{
var task = Task.Run(() =>
{
_runtime.RunBus?.SendStatusMessage(Encoding.UTF8.GetString(buffer, offset, count));
});
return TaskToAsyncResult.Begin(task, null, null);
}
}

public override void SetLength(long value)
{
throw new InvalidOperationException("Unable to set the length of a RunStatusStream");
}

public override void Write(byte[] buffer, int offset, int count)
{
Write(buffer.AsSpan(offset, count));
}

public override void Write(ReadOnlySpan<byte> buffer)
{
var message = Encoding.UTF8.GetString(buffer);
lock (_sync)
{
_runtime.RunBus?.SendStatusMessage(message);
}
}

public override ValueTask DisposeAsync()
{
return ValueTask.CompletedTask;
}

public override void Close()
{
Dispose(true);
}

protected override void Dispose(bool disposing)
{

}

}

11 changes: 10 additions & 1 deletion src/XTMF2/BuiltInTypes/WriteStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ You should have received a copy of the GNU General Public License

namespace XTMF2
{
public sealed class WriteStream : Stream
public class WriteStream : Stream
{
private readonly Stream BaseStream;

Expand All @@ -34,6 +34,15 @@ internal WriteStream(Stream baseStream)
}
}

/// <summary>
/// Initializes a new instance of the <see cref="WriteStream"/> class.
/// Call this when writing to something that doesn't have a backing stream.
/// </summary>
internal WriteStream()
{
BaseStream = Stream.Null;
}

public override bool CanRead => false;

public override bool CanSeek => BaseStream.CanSeek;
Expand Down
15 changes: 7 additions & 8 deletions src/XTMF2/Bus/RunBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private enum Out
/// <summary>
/// This must be obtained before sending any data to the host
/// </summary>
private readonly object _writeLock = new object();
private readonly Lock _writeLock = new();

private void Write(Action<BinaryWriter> writeWith)
{
Expand All @@ -82,8 +82,6 @@ private void Write(Action<BinaryWriter> writeWith)
}
}



/// <summary>
/// Signal to the host that the run failed in the validation step.
/// </summary>
Expand Down Expand Up @@ -161,18 +159,18 @@ public void ProcessRequests()
using var reader = new BinaryReader(_toClient, Encoding.UTF8, false);
while (!_Exit)
{
switch ((In)reader.ReadInt32())
var commandNumber = (In)reader.ReadInt32();
switch (commandNumber)
{
case In.RunModelSystem:
{

_id = reader.ReadString();
var cwd = reader.ReadString();
var start = reader.ReadString();
var msSize = (int)reader.ReadInt64();
using var mem = CreateMemoryStreamLoadingFrom(reader.BaseStream, msSize);
var run = new Run(_id, mem.ToArray(), start, _runtime, cwd);
Task.Run(() =>
Task.Factory.StartNew(() =>
{
try
{
Expand All @@ -199,7 +197,7 @@ public void ProcessRequests()
Console.WriteLine(e.Message + "\r\n" + e.StackTrace);
}
Environment.Exit(0);
});
}, TaskCreationOptions.LongRunning);
}
break;
case In.KillRun:
Expand All @@ -208,7 +206,7 @@ public void ProcessRequests()
break;
// failsafe
default:
return;
throw new InvalidOperationException($"Received an invalid command from the host! #{(int)commandNumber}");
}
Interlocked.MemoryBarrier();
}
Expand All @@ -219,6 +217,7 @@ public void ProcessRequests()

void Dispose(bool disposing)
{
_Exit = true;
if (!disposedValue)
{
if (disposing)
Expand Down
44 changes: 34 additions & 10 deletions src/XTMF2/Bus/RunContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,24 +89,26 @@ public static bool CreateRunContext(XTMFRuntime runtime, string id, byte[] model
return true;
}

private Stream? CreateRunBusLocal(RunServerBus clientBus)
private (RunBus runBus, Task readerTask) CreateRunBusLocal(RunServerBus clientBus)
{
var pipeName = Guid.NewGuid().ToString();
string? error = null;
Stream? clientToRunStream = null;
RunBus? runBus = null;
Task? readerTask = null;
CreateStreams.CreateNewNamedPipeHost(pipeName, out clientToRunStream, out error, () =>
{
clientBus.StartProcessingRequestFromRun(ID, clientToRunStream!);
// Start the reader task. It takes ownership of clientToRunStream and disposes it
// when it exits (leaveOpen=false in the BinaryReader inside StartProcessingRequestFromRun).
readerTask = clientBus.StartProcessingRequestFromRun(ID, clientToRunStream!);
if (CreateStreams.CreateNamedPipeClient(pipeName, out var runToClientStream, out error))
{
Task.Factory.StartNew(() =>
{
using var rb = new RunBus(ID, runToClientStream!, true, _runtime);
rb.ProcessRequests();
}, TaskCreationOptions.LongRunning);
// Construct RunBus synchronously so that _runtime.RunBus is set before
// CreateRunBusLocal returns and Run.StartRun() is called.
runBus = new RunBus(ID, runToClientStream!, true, _runtime);
}
});
return clientToRunStream;
return (runBus!, readerTask!);
}

private (Stream clientToRunStream, Process runProcess) CreateRunBusRemote(RunServerBus clientBus)
Expand Down Expand Up @@ -164,8 +166,30 @@ public void RunInNewProcess(RunServerBus client)

public void RunInCurrentProcess(RunServerBus client)
{
using var stream = CreateRunBusLocal(client);
new Run(ID, _modelSystem, StartToExecute, _runtime, _currentWorkingDirectory).StartRun();
(RunBus runBus, Task readerTask) = CreateRunBusLocal(client);
using (runBus)
{
var error = new Run(ID, _modelSystem, StartToExecute, _runtime, _currentWorkingDirectory).StartRun();
// Send the terminal message through RunBus so the reader task can forward it to
// RunServerBus (and on to HostBus) and then exit cleanly.
switch (error?.Type)
{
case RunErrorType.Validation:
case RunErrorType.RuntimeValidation:
runBus.ModelRunFailedValidation(error.Message);
break;
case RunErrorType.Runtime:
runBus.ModelRunFailed(error.Message, error.StackTrace);
break;
default:
runBus.ModelRunComplete();
break;
}
}
// Wait for the reader task to finish draining and forwarding all messages
// (including the terminal one above) before returning. This ensures no messages
// are lost due to early stream disposal.
readerTask.Wait(5000);
}
}
}
11 changes: 6 additions & 5 deletions src/XTMF2/Bus/RunServerBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private enum Out
/// <summary>
/// This must be obtained before sending any data to the host
/// </summary>
private readonly object _writeLock = new object();
private readonly Lock _writeLock = new();

private void Write(Action<BinaryWriter> writeWith)
{
Expand All @@ -128,11 +128,12 @@ private void Write(Action<BinaryWriter> writeWith)
}
}

internal void StartProcessingRequestFromRun(string id, Stream clientToRunStream)
internal Task StartProcessingRequestFromRun(string id, Stream clientToRunStream)
{
Task.Factory.StartNew(() =>
return Task.Factory.StartNew(() =>
{
var reader = new BinaryReader(clientToRunStream, Encoding.UTF8, true);
// leaveOpen=false so the stream is disposed when the reader exits.
using var reader = new BinaryReader(clientToRunStream, Encoding.UTF8, false);
try
{
while (true)
Expand All @@ -156,7 +157,7 @@ internal void StartProcessingRequestFromRun(string id, Stream clientToRunStream)
return;
case Out.ProgressUpdate:
SendProgressUpdate(reader.ReadString(), reader.ReadSingle());
return;
break;
default:
return;
}
Expand Down
19 changes: 15 additions & 4 deletions src/XTMF2/RuntimeModules/Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ You should have received a copy of the GNU General Public License
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Unicode;
using System.Threading;

namespace XTMF2.RuntimeModules
{
Expand All @@ -30,10 +32,12 @@ public sealed class Log : BaseAction<string>, IDisposable
[SubModule(Required = true, Name = "LogStream", Description = "The stream to save the log to.", Index = 0)]
public IFunction<WriteStream>? LogStream;

private readonly object _writeLock = new object();
private readonly Lock _writeLock = new();

private StreamWriter? _writer;

private bool _alwaysFlush = false;

public override void Invoke(string message)
{
lock (_writeLock)
Expand All @@ -42,22 +46,29 @@ public override void Invoke(string message)
{
if (LogStream?.Invoke() is WriteStream writeStream)
{
_writer = new StreamWriter(writeStream, Encoding.Unicode, 0x4000, false);
// Check to see if we need to always flush the stream.
_alwaysFlush = writeStream is RunStatusStream;
var encoding = _alwaysFlush ? new UTF8Encoding(false, true) : Encoding.UTF8;
_writer = new StreamWriter(writeStream, encoding, 0x4000, false);
}
else
{
throw new XTMFRuntimeException(this, "Unable to create a write stream to store the log into!");
}
}
// don't block while writing
_writer.WriteLineAsync(TimeStampMessage(message));
_writer.Write(TimeStampMessage(message));
if (_alwaysFlush)
{
_writer.Flush();
}
}
}

private static string TimeStampMessage(string message)
{
var now = DateTime.Now;
return $"[{now.Hour}:{now.Minute}:{now.Second}] {message}";
return $"[{now.Hour:D2}:{now.Minute:D2}:{now.Second:D2}] {message}";
}

private void Dispose(bool managed)
Expand Down
Loading
Loading