diff --git a/src/XTMF2/BuiltInTypes/RunStatusStream.cs b/src/XTMF2/BuiltInTypes/RunStatusStream.cs new file mode 100644 index 0000000..91b412c --- /dev/null +++ b/src/XTMF2/BuiltInTypes/RunStatusStream.cs @@ -0,0 +1,117 @@ +/* + Copyright 2026 University of Toronto + + This file is part of XTMF2. + + XTMF2 is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + XTMF2 is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with XTMF2. If not, see . +*/ +using System; +using System.IO; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace XTMF2; + +public sealed class RunStatusStream : WriteStream +{ + private XTMFRuntime _runtime; + + internal RunStatusStream(XTMFRuntime runtime) + { + _runtime = runtime; + } + + public override bool CanRead => false; + + public override bool CanSeek => false; + + public override bool CanWrite => true; + + private long _length = 0; + + public override long Length => _length; + + public override long Position { get => _length; set => throw new InvalidOperationException("Unable to set the position of a RunStatusStream."); } + + private readonly Lock _sync = new(); + + public override void Flush() + { + // Do nothing, we're always flushed. + } + + public override int Read(byte[] buffer, int offset, int count) + { + throw new InvalidOperationException("Unable to read from a RunStatusStream."); + } + + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + { + throw new InvalidOperationException("Unable to read from a RunStatusStream."); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new InvalidOperationException("Unable to seek in a RunStatusStream."); + } + + public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) + { + lock (_sync) + { + var task = Task.Run(() => + { + _runtime.RunBus?.SendStatusMessage(Encoding.UTF8.GetString(buffer, offset, count)); + }); + return TaskToAsyncResult.Begin(task, null, null); + } + } + + public override void SetLength(long value) + { + throw new InvalidOperationException("Unable to set the length of a RunStatusStream"); + } + + public override void Write(byte[] buffer, int offset, int count) + { + Write(buffer.AsSpan(offset, count)); + } + + public override void Write(ReadOnlySpan buffer) + { + var message = Encoding.UTF8.GetString(buffer); + lock (_sync) + { + _runtime.RunBus?.SendStatusMessage(message); + } + } + + public override ValueTask DisposeAsync() + { + return ValueTask.CompletedTask; + } + + public override void Close() + { + Dispose(true); + } + + protected override void Dispose(bool disposing) + { + + } + +} + diff --git a/src/XTMF2/BuiltInTypes/WriteStream.cs b/src/XTMF2/BuiltInTypes/WriteStream.cs index baf6e4d..31e4d41 100644 --- a/src/XTMF2/BuiltInTypes/WriteStream.cs +++ b/src/XTMF2/BuiltInTypes/WriteStream.cs @@ -21,7 +21,7 @@ You should have received a copy of the GNU General Public License namespace XTMF2 { - public sealed class WriteStream : Stream + public class WriteStream : Stream { private readonly Stream BaseStream; @@ -34,6 +34,15 @@ internal WriteStream(Stream baseStream) } } + /// + /// Initializes a new instance of the class. + /// Call this when writing to something that doesn't have a backing stream. + /// + internal WriteStream() + { + BaseStream = Stream.Null; + } + public override bool CanRead => false; public override bool CanSeek => BaseStream.CanSeek; diff --git a/src/XTMF2/Bus/RunBus.cs b/src/XTMF2/Bus/RunBus.cs index ad6b9fe..13289ae 100644 --- a/src/XTMF2/Bus/RunBus.cs +++ b/src/XTMF2/Bus/RunBus.cs @@ -71,7 +71,7 @@ private enum Out /// /// This must be obtained before sending any data to the host /// - private readonly object _writeLock = new object(); + private readonly Lock _writeLock = new(); private void Write(Action writeWith) { @@ -82,8 +82,6 @@ private void Write(Action writeWith) } } - - /// /// Signal to the host that the run failed in the validation step. /// @@ -161,18 +159,18 @@ public void ProcessRequests() using var reader = new BinaryReader(_toClient, Encoding.UTF8, false); while (!_Exit) { - switch ((In)reader.ReadInt32()) + var commandNumber = (In)reader.ReadInt32(); + switch (commandNumber) { case In.RunModelSystem: { - _id = reader.ReadString(); var cwd = reader.ReadString(); var start = reader.ReadString(); var msSize = (int)reader.ReadInt64(); using var mem = CreateMemoryStreamLoadingFrom(reader.BaseStream, msSize); var run = new Run(_id, mem.ToArray(), start, _runtime, cwd); - Task.Run(() => + Task.Factory.StartNew(() => { try { @@ -199,7 +197,7 @@ public void ProcessRequests() Console.WriteLine(e.Message + "\r\n" + e.StackTrace); } Environment.Exit(0); - }); + }, TaskCreationOptions.LongRunning); } break; case In.KillRun: @@ -208,7 +206,7 @@ public void ProcessRequests() break; // failsafe default: - return; + throw new InvalidOperationException($"Received an invalid command from the host! #{(int)commandNumber}"); } Interlocked.MemoryBarrier(); } @@ -219,6 +217,7 @@ public void ProcessRequests() void Dispose(bool disposing) { + _Exit = true; if (!disposedValue) { if (disposing) diff --git a/src/XTMF2/Bus/RunContext.cs b/src/XTMF2/Bus/RunContext.cs index a5275eb..a8725cb 100644 --- a/src/XTMF2/Bus/RunContext.cs +++ b/src/XTMF2/Bus/RunContext.cs @@ -89,24 +89,26 @@ public static bool CreateRunContext(XTMFRuntime runtime, string id, byte[] model return true; } - private Stream? CreateRunBusLocal(RunServerBus clientBus) + private (RunBus runBus, Task readerTask) CreateRunBusLocal(RunServerBus clientBus) { var pipeName = Guid.NewGuid().ToString(); string? error = null; Stream? clientToRunStream = null; + RunBus? runBus = null; + Task? readerTask = null; CreateStreams.CreateNewNamedPipeHost(pipeName, out clientToRunStream, out error, () => { - clientBus.StartProcessingRequestFromRun(ID, clientToRunStream!); + // Start the reader task. It takes ownership of clientToRunStream and disposes it + // when it exits (leaveOpen=false in the BinaryReader inside StartProcessingRequestFromRun). + readerTask = clientBus.StartProcessingRequestFromRun(ID, clientToRunStream!); if (CreateStreams.CreateNamedPipeClient(pipeName, out var runToClientStream, out error)) { - Task.Factory.StartNew(() => - { - using var rb = new RunBus(ID, runToClientStream!, true, _runtime); - rb.ProcessRequests(); - }, TaskCreationOptions.LongRunning); + // Construct RunBus synchronously so that _runtime.RunBus is set before + // CreateRunBusLocal returns and Run.StartRun() is called. + runBus = new RunBus(ID, runToClientStream!, true, _runtime); } }); - return clientToRunStream; + return (runBus!, readerTask!); } private (Stream clientToRunStream, Process runProcess) CreateRunBusRemote(RunServerBus clientBus) @@ -164,8 +166,30 @@ public void RunInNewProcess(RunServerBus client) public void RunInCurrentProcess(RunServerBus client) { - using var stream = CreateRunBusLocal(client); - new Run(ID, _modelSystem, StartToExecute, _runtime, _currentWorkingDirectory).StartRun(); + (RunBus runBus, Task readerTask) = CreateRunBusLocal(client); + using (runBus) + { + var error = new Run(ID, _modelSystem, StartToExecute, _runtime, _currentWorkingDirectory).StartRun(); + // Send the terminal message through RunBus so the reader task can forward it to + // RunServerBus (and on to HostBus) and then exit cleanly. + switch (error?.Type) + { + case RunErrorType.Validation: + case RunErrorType.RuntimeValidation: + runBus.ModelRunFailedValidation(error.Message); + break; + case RunErrorType.Runtime: + runBus.ModelRunFailed(error.Message, error.StackTrace); + break; + default: + runBus.ModelRunComplete(); + break; + } + } + // Wait for the reader task to finish draining and forwarding all messages + // (including the terminal one above) before returning. This ensures no messages + // are lost due to early stream disposal. + readerTask.Wait(5000); } } } diff --git a/src/XTMF2/Bus/RunServerBus.cs b/src/XTMF2/Bus/RunServerBus.cs index 6c4996a..e3d1f93 100644 --- a/src/XTMF2/Bus/RunServerBus.cs +++ b/src/XTMF2/Bus/RunServerBus.cs @@ -117,7 +117,7 @@ private enum Out /// /// This must be obtained before sending any data to the host /// - private readonly object _writeLock = new object(); + private readonly Lock _writeLock = new(); private void Write(Action writeWith) { @@ -128,11 +128,12 @@ private void Write(Action writeWith) } } - internal void StartProcessingRequestFromRun(string id, Stream clientToRunStream) + internal Task StartProcessingRequestFromRun(string id, Stream clientToRunStream) { - Task.Factory.StartNew(() => + return Task.Factory.StartNew(() => { - var reader = new BinaryReader(clientToRunStream, Encoding.UTF8, true); + // leaveOpen=false so the stream is disposed when the reader exits. + using var reader = new BinaryReader(clientToRunStream, Encoding.UTF8, false); try { while (true) @@ -156,7 +157,7 @@ internal void StartProcessingRequestFromRun(string id, Stream clientToRunStream) return; case Out.ProgressUpdate: SendProgressUpdate(reader.ReadString(), reader.ReadSingle()); - return; + break; default: return; } diff --git a/src/XTMF2/RuntimeModules/Log.cs b/src/XTMF2/RuntimeModules/Log.cs index e338c97..74ae219 100644 --- a/src/XTMF2/RuntimeModules/Log.cs +++ b/src/XTMF2/RuntimeModules/Log.cs @@ -20,6 +20,8 @@ You should have received a copy of the GNU General Public License using System.Collections.Generic; using System.IO; using System.Text; +using System.Text.Unicode; +using System.Threading; namespace XTMF2.RuntimeModules { @@ -30,10 +32,12 @@ public sealed class Log : BaseAction, IDisposable [SubModule(Required = true, Name = "LogStream", Description = "The stream to save the log to.", Index = 0)] public IFunction? LogStream; - private readonly object _writeLock = new object(); + private readonly Lock _writeLock = new(); private StreamWriter? _writer; + private bool _alwaysFlush = false; + public override void Invoke(string message) { lock (_writeLock) @@ -42,7 +46,10 @@ public override void Invoke(string message) { if (LogStream?.Invoke() is WriteStream writeStream) { - _writer = new StreamWriter(writeStream, Encoding.Unicode, 0x4000, false); + // Check to see if we need to always flush the stream. + _alwaysFlush = writeStream is RunStatusStream; + var encoding = _alwaysFlush ? new UTF8Encoding(false, true) : Encoding.UTF8; + _writer = new StreamWriter(writeStream, encoding, 0x4000, false); } else { @@ -50,14 +57,18 @@ public override void Invoke(string message) } } // don't block while writing - _writer.WriteLineAsync(TimeStampMessage(message)); + _writer.Write(TimeStampMessage(message)); + if (_alwaysFlush) + { + _writer.Flush(); + } } } private static string TimeStampMessage(string message) { var now = DateTime.Now; - return $"[{now.Hour}:{now.Minute}:{now.Second}] {message}"; + return $"[{now.Hour:D2}:{now.Minute:D2}:{now.Second:D2}] {message}"; } private void Dispose(bool managed) diff --git a/src/XTMF2/RuntimeModules/OpenWriteStreamToRunStatus.cs b/src/XTMF2/RuntimeModules/OpenWriteStreamToRunStatus.cs new file mode 100644 index 0000000..1755e73 --- /dev/null +++ b/src/XTMF2/RuntimeModules/OpenWriteStreamToRunStatus.cs @@ -0,0 +1,38 @@ +/* + Copyright 2026 University of Toronto + + This file is part of XTMF2. + + XTMF2 is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + XTMF2 is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with XTMF2. If not, see . +*/ + +namespace XTMF2.RuntimeModules; + +[Module(Name ="OpenWriteStreamToRunStatus", + Description = "Opens a stream that can be written to in order to send status messages back to the client. This works with Logs.", + DocumentationLink = "https://tmg.utoronto.ca/doc/2.0")] +public sealed class OpenWriteStreamToRunStatus : BaseFunction +{ + private readonly XTMFRuntime _runtime; + + public OpenWriteStreamToRunStatus(XTMFRuntime runtime) + { + _runtime = runtime; + } + + public override WriteStream Invoke() + { + return new RunStatusStream(_runtime); + } +}