From cb0df2231f620eb5dadc99e03e4d3effeeda31c8 Mon Sep 17 00:00:00 2001 From: mishaschwartz <4380924+mishaschwartz@users.noreply.github.com> Date: Tue, 7 Oct 2025 10:47:54 -0400 Subject: [PATCH 1/3] support reading log files from pipe-like files --- .pre-commit-config.yaml | 2 +- README.md | 5 +++-- log_parser/log_parser.py | 4 +++- test/test_log_parser.py | 23 +++++++++++++++++++++++ 4 files changed, 30 insertions(+), 4 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b8be77d..4a2f583 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,7 +1,7 @@ repos: - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. - rev: v0.7.1 + rev: v0.13.3 hooks: # Run the linter. - id: ruff diff --git a/README.md b/README.md index 4e986c8..2415068 100644 --- a/README.md +++ b/README.md @@ -73,7 +73,7 @@ asyncio.run(my_coroutine()) `track_file` has the following parameters: -- `log_file`: path to a log file to track +- `log_file`: path to a log file to track. This can be a regular file, stream (e.g. stdout/stderr), or named FIFO pipe. - `line_parsers` a list of [line parsers](#what-is-a-line-parser) - `poll_delay`: (see [parameters](#track-parameters) for `track` and `track_async`) - `tail`: (see [parameters](#track-parameters) for `track` and `track_async`) @@ -237,7 +237,8 @@ these scenarios in the following ways: #### How do I track logs that haven't been written to a file? -Log Parser currently only supports reading logs from files. We'd like to support other log sources in the future though. +Log Parser currently only supports reading logs from files and named pipes (such as the stdout and stderr streams). +We'd like to support other log sources in the future though. Feel free to [contribute](#development) to this project if you want to speed up support for this feature. ## Extensions diff --git a/log_parser/log_parser.py b/log_parser/log_parser.py index b408a0d..1b9c616 100644 --- a/log_parser/log_parser.py +++ b/log_parser/log_parser.py @@ -40,6 +40,8 @@ async def _check_file_state(log_io: AsyncFile[str]) -> _FileState: return _FileState.DELETED file_stat = os.stat(log_io.fileno()) if same_name_file_stat == file_stat: + if not log_io.seekable(): + return _FileState.NOCHANGE if await log_io.tell() > file_stat.st_size: return _FileState.TRUNCATED else: @@ -79,7 +81,7 @@ async def track_file( log_io = await open_file(log_file) logger.debug(f"file '{log_file}' opened for reading") try: - if tail: + if tail and log_io.seekable(): await log_io.seek(0, os.SEEK_END) while True: file_state = await _check_file_state(log_io) diff --git a/test/test_log_parser.py b/test/test_log_parser.py index 3706a82..145f8eb 100644 --- a/test/test_log_parser.py +++ b/test/test_log_parser.py @@ -2,6 +2,7 @@ import asyncio import inspect import itertools +import os from contextlib import asynccontextmanager import anyio @@ -28,6 +29,12 @@ async def tmp_log(self, tmp_path): await log_file.touch() return log_file + @pytest.fixture + async def tmp_log_pipe(self, tmp_path): + log_file = anyio.Path(tmp_path / "test.log") + os.mkfifo(log_file) + return log_file + @staticmethod async def delayed_write(file, text, delay, mode="w"): await asyncio.sleep(delay) @@ -162,6 +169,22 @@ async def test_file_contains_line_mixed_line_parsers(self, tmp_log): ) assert output == text * 2 + async def test_pipe_contains_line(self, tmp_log_pipe): + text = ["test 123"] + output = [] + async with silent_timeout(1.1), asyncio.TaskGroup() as tg: + tg.create_task(log_parser.track_file(str(tmp_log_pipe), [self.basic_line_parser(output)])) + tg.create_task(self.delayed_write(tmp_log_pipe, "\n".join(text), 0.1)) + assert output == text + + async def test_pipe_contains_lines(self, tmp_log_pipe): + text = ["test 123", "test 456"] + output = [] + async with silent_timeout(1.1), asyncio.TaskGroup() as tg: + tg.create_task(log_parser.track_file(str(tmp_log_pipe), [self.basic_line_parser(output)])) + tg.create_task(self.delayed_write(tmp_log_pipe, "\n".join(text), 0.1)) + assert output == text + class _TrackTests(abc.ABC): @abc.abstractmethod From ec9363d3b252673ce858915d54963e8f6e864c95 Mon Sep 17 00:00:00 2001 From: mishaschwartz <4380924+mishaschwartz@users.noreply.github.com> Date: Tue, 7 Oct 2025 10:55:30 -0400 Subject: [PATCH 2/3] undo pre-commit changes (do in other PR) --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 4a2f583..b8be77d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,7 +1,7 @@ repos: - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. - rev: v0.13.3 + rev: v0.7.1 hooks: # Run the linter. - id: ruff From bbb1b0bca7244c69b17e308dbf43a42573c94076 Mon Sep 17 00:00:00 2001 From: mishaschwartz <4380924+mishaschwartz@users.noreply.github.com> Date: Tue, 11 Nov 2025 10:59:16 -0500 Subject: [PATCH 3/3] paramaterize test --- test/test_log_parser.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/test/test_log_parser.py b/test/test_log_parser.py index 145f8eb..6d432ce 100644 --- a/test/test_log_parser.py +++ b/test/test_log_parser.py @@ -169,16 +169,8 @@ async def test_file_contains_line_mixed_line_parsers(self, tmp_log): ) assert output == text * 2 - async def test_pipe_contains_line(self, tmp_log_pipe): - text = ["test 123"] - output = [] - async with silent_timeout(1.1), asyncio.TaskGroup() as tg: - tg.create_task(log_parser.track_file(str(tmp_log_pipe), [self.basic_line_parser(output)])) - tg.create_task(self.delayed_write(tmp_log_pipe, "\n".join(text), 0.1)) - assert output == text - - async def test_pipe_contains_lines(self, tmp_log_pipe): - text = ["test 123", "test 456"] + @pytest.mark.parametrize("text", [["test 123"], ["test 123", "test 456"]]) + async def test_pipe_contains_lines(self, tmp_log_pipe, text): output = [] async with silent_timeout(1.1), asyncio.TaskGroup() as tg: tg.create_task(log_parser.track_file(str(tmp_log_pipe), [self.basic_line_parser(output)]))