Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/kaos/src/kaos/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ async def readtext(
errors: Literal["strict", "ignore", "replace"] = "strict",
) -> str:
local_path = path.unsafe_to_local_path() if isinstance(path, KaosPath) else Path(path)
async with aiofiles.open(local_path, encoding=encoding, errors=errors) as f:
async with aiofiles.open(local_path, encoding=encoding, errors=errors, newline="") as f:
return await f.read()

async def readlines(
Expand All @@ -132,7 +132,7 @@ async def readlines(
errors: Literal["strict", "ignore", "replace"] = "strict",
) -> AsyncGenerator[str]:
local_path = path.unsafe_to_local_path() if isinstance(path, KaosPath) else Path(path)
async with aiofiles.open(local_path, encoding=encoding, errors=errors) as f:
async with aiofiles.open(local_path, encoding=encoding, errors=errors, newline="") as f:
async for line in f:
yield line

Expand Down
15 changes: 10 additions & 5 deletions packages/kaos/src/kaos/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,9 @@ async def readtext(
encoding: str = "utf-8",
errors: Literal["strict", "ignore", "replace"] = "strict",
) -> str:
async with self._sftp.open(str(path), "r", encoding=encoding, errors=errors) as f:
return await f.read()
# Read raw bytes to avoid any newline translation by the remote SFTP server.
raw = await self.readbytes(path)
return raw.decode(encoding, errors=errors)

async def readlines(
self,
Expand All @@ -240,7 +241,7 @@ async def readlines(
) -> AsyncGenerator[str]:
# NOTE: readlines is not supported by SFTPClientFile
text = await self.readtext(path, encoding=encoding, errors=errors)
for line in text.splitlines():
for line in text.splitlines(keepends=True):
yield line

async def writebytes(self, path: StrOrKaosPath, data: bytes) -> int:
Expand All @@ -256,8 +257,12 @@ async def writetext(
encoding: str = "utf-8",
errors: Literal["strict", "ignore", "replace"] = "strict",
) -> int:
async with self._sftp.open(str(path), mode, encoding=encoding, errors=errors) as f:
return await f.write(data)
# Write raw bytes to avoid any newline translation by the remote SFTP server.
bytes_mode = "ab" if mode == "a" else "wb"
encoded = data.encode(encoding, errors=errors)
async with self._sftp.open(str(path), bytes_mode) as f:
await f.write(encoded)
return len(data)

async def mkdir(
self,
Expand Down
35 changes: 35 additions & 0 deletions packages/kaos/tests/test_local_kaos.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,41 @@ async def test_writetext_preserves_crlf_line_endings(local_kaos: LocalKaos):
assert raw == b"hello\r\nworld\r\n", f"Expected CRLF preserved, got {raw!r}"


async def test_readtext_preserves_lf_line_endings(local_kaos: LocalKaos):
"""readtext should not convert LF to CRLF."""
tmp_path = local_kaos.getcwd()
file_path = tmp_path / "lf_read.txt"

await local_kaos.writebytes(file_path, b"hello\nworld\n")

text = await local_kaos.readtext(file_path)
assert "\r" not in text, f"Expected no \\r in read text, got {text!r}"
assert text == "hello\nworld\n", f"Expected LF line endings, got {text!r}"


async def test_readtext_preserves_crlf_line_endings(local_kaos: LocalKaos):
"""readtext should preserve CRLF line endings."""
tmp_path = local_kaos.getcwd()
file_path = tmp_path / "crlf_read.txt"

await local_kaos.writebytes(file_path, b"hello\r\nworld\r\n")

text = await local_kaos.readtext(file_path)
assert "\r\n" in text, f"Expected CRLF in read text, got {text!r}"
assert text == "hello\r\nworld\r\n", f"Expected CRLF preserved, got {text!r}"


async def test_readlines_preserves_crlf_line_endings(local_kaos: LocalKaos):
"""readlines should preserve CRLF line endings in each yielded line."""
tmp_path = local_kaos.getcwd()
file_path = tmp_path / "crlf_lines.txt"

await local_kaos.writebytes(file_path, b"line1\r\nline2\r\n")

lines = [line async for line in local_kaos.readlines(file_path)]
assert lines == ["line1\r\n", "line2\r\n"], f"Expected CRLF preserved in lines, got {lines!r}"


async def test_mkdir_with_parents(local_kaos: LocalKaos):
tmp_path = local_kaos.getcwd()
nested_dir = tmp_path / "a" / "b" / "c"
Expand Down
15 changes: 15 additions & 0 deletions packages/kaos/tests/test_ssh_kaos.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,21 @@ async def test_kaospath_roundtrip(bind_current_kaos: SSHKaos, remote_base: str):
assert str(KaosPath.cwd()) == remote_base


async def test_writetext_returns_char_count_not_byte_count(
ssh_kaos: SSHKaos, remote_base: str
):
"""writetext() must return character count, not byte count, for CJK text."""
file_path = os.path.join(remote_base, "cjk.txt")
cjk_text = "你好世界"
written = await ssh_kaos.writetext(file_path, cjk_text)
assert written == len(cjk_text), (
f"Expected {len(cjk_text)} characters, got {written}"
)

read_back = await ssh_kaos.readtext(file_path)
assert read_back == cjk_text


async def test_iterdir_lists_child_entries(ssh_kaos: SSHKaos, remote_base: str):
await ssh_kaos.writetext(os.path.join(remote_base, "file1.txt"), "1")
await ssh_kaos.writetext(os.path.join(remote_base, "file2.log"), "2")
Expand Down
35 changes: 26 additions & 9 deletions src/kimi_cli/tools/file/replace.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,23 +128,40 @@ async def __call__(self, params: Params) -> ToolReturnValue:
brief="Invalid path",
)

# Read the file content
content = await p.read_text(errors="replace")

original_content = content
# Read raw bytes to preserve original line endings
raw_bytes = await p.read_bytes()
original_content = raw_bytes.decode("utf-8", errors="replace")

# Detect dominant line ending style
if b"\r\n" in raw_bytes:
eol_style = "\r\n"
elif b"\r" in raw_bytes:
eol_style = "\r"
else:
eol_style = "\n"

# Normalize to \n for model matching (model-generated old/new use \n)
normalized_content = original_content.replace("\r\n", "\n").replace("\r", "\n")

content = normalized_content
edits = [params.edit] if isinstance(params.edit, Edit) else params.edit

# Apply all edits
for edit in edits:
content = self._apply_edit(content, edit)

# Check if any changes were made
if content == original_content:
if content == normalized_content:
return ToolError(
message="No replacements were made. The old string was not found in the file.",
brief="No replacements made",
)

# Restore original line ending style before writing
if eol_style != "\n":
# Normalize to LF first to avoid double-converting existing CRLF in edit.new
content = content.replace("\r\n", "\n").replace("\r", "\n").replace("\n", eol_style)

diff_blocks: list[DisplayBlock] = await build_diff_blocks(
str(p), original_content, content
)
Expand All @@ -166,16 +183,16 @@ async def __call__(self, params: Params) -> ToolReturnValue:
if not result:
return result.rejection_error()

# Write the modified content back to the file
await p.write_text(content, errors="replace")
# Write the modified content back to the file preserving original line endings
await p.write_bytes(content.encode("utf-8"))

# Count changes for success message
total_replacements = 0
for edit in edits:
if edit.replace_all:
total_replacements += original_content.count(edit.old)
total_replacements += normalized_content.count(edit.old)
else:
total_replacements += 1 if edit.old in original_content else 0
total_replacements += 1 if edit.old in normalized_content else 0

return ToolReturnValue(
is_error=False,
Expand Down
28 changes: 22 additions & 6 deletions src/kimi_cli/tools/file/write.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,23 @@ async def __call__(self, params: Params) -> ToolReturnValue:

file_existed = await p.exists()
old_text = None
original_bytes = b""
if file_existed:
old_text = await p.read_text(errors="replace")
original_bytes = await p.read_bytes()
old_text = original_bytes.decode("utf-8", errors="replace")

# Detect dominant line ending style from existing file
eol_style = "\n"
if file_existed:
if b"\r\n" in original_bytes:
eol_style = "\r\n"
elif b"\r" in original_bytes:
eol_style = "\r"

new_text = params.content
if params.mode == "append":
new_text = (old_text or "") + params.content

new_text = (
params.content if params.mode == "overwrite" else (old_text or "") + params.content
)
diff_blocks: list[DisplayBlock] = await build_diff_blocks(
str(p),
old_text or "",
Expand All @@ -152,10 +163,15 @@ async def __call__(self, params: Params) -> ToolReturnValue:
if not result:
return result.rejection_error()

# Write content to file
# Write content to file preserving original line endings when overwriting
match params.mode:
case "overwrite":
await p.write_text(params.content)
if eol_style != "\n":
# Normalize to LF first to avoid double-converting existing CRLF
content_to_write = params.content.replace("\r\n", "\n").replace("\r", "\n").replace("\n", eol_style)
else:
content_to_write = params.content
await p.write_bytes(content_to_write.encode("utf-8"))
case "append":
await p.append_text(params.content)
Comment thread
Sisyphbaous-DT-Project marked this conversation as resolved.

Expand Down
90 changes: 90 additions & 0 deletions tests/tools/test_str_replace_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,93 @@ async def test_replace_empty_strings(
assert not result.is_error
assert "successfully edited" in result.message
assert await file_path.read_text() == "Hello !"


async def test_replace_preserves_crlf_line_endings(
str_replace_file_tool: StrReplaceFile, temp_work_dir: KaosPath
):
"""StrReplaceFile should preserve CRLF line endings in the file."""
file_path = temp_work_dir / "crlf.txt"
original_bytes = b"line1\r\nline2\r\nline3\r\n"
await file_path.write_bytes(original_bytes)

result = await str_replace_file_tool(
Params(
path=str(file_path),
edit=Edit(old="line2", new="modified_line2"),
)
)

assert not result.is_error
assert "successfully edited" in result.message

# Verify bytes on disk are still CRLF
raw = await file_path.read_bytes()
expected = b"line1\r\nmodified_line2\r\nline3\r\n"
assert raw == expected, f"Expected CRLF preserved, got {raw!r}"


async def test_replace_preserves_lf_line_endings(
str_replace_file_tool: StrReplaceFile, temp_work_dir: KaosPath
):
"""StrReplaceFile should preserve LF line endings in the file."""
file_path = temp_work_dir / "lf.txt"
original_bytes = b"line1\nline2\nline3\n"
await file_path.write_bytes(original_bytes)

result = await str_replace_file_tool(
Params(
path=str(file_path),
edit=Edit(old="line2", new="modified_line2"),
)
)

assert not result.is_error
assert "successfully edited" in result.message

raw = await file_path.read_bytes()
expected = b"line1\nmodified_line2\nline3\n"
assert raw == expected, f"Expected LF preserved, got {raw!r}"


async def test_replace_multiline_preserves_crlf(
str_replace_file_tool: StrReplaceFile, temp_work_dir: KaosPath
):
"""StrReplaceFile should preserve CRLF when replacing multi-line blocks."""
file_path = temp_work_dir / "crlf_multi.txt"
original_bytes = b"header\r\nline1\r\nline2\r\nfooter\r\n"
await file_path.write_bytes(original_bytes)

result = await str_replace_file_tool(
Params(
path=str(file_path),
edit=Edit(old="line1\nline2", new="new1\nnew2"),
)
)

assert not result.is_error
raw = await file_path.read_bytes()
expected = b"header\r\nnew1\r\nnew2\r\nfooter\r\n"
assert raw == expected, f"Expected CRLF preserved, got {raw!r}"


async def test_replace_with_crlf_in_new_no_double_conversion(
str_replace_file_tool: StrReplaceFile, temp_work_dir: KaosPath
):
"""StrReplaceFile should not double-convert CRLF that already exists in edit.new."""
file_path = temp_work_dir / "crlf_in_new.txt"
original_bytes = b"header\r\nline1\r\nfooter\r\n"
await file_path.write_bytes(original_bytes)

# edit.new contains CRLF (model may pass CRLF content)
result = await str_replace_file_tool(
Params(
path=str(file_path),
edit=Edit(old="line1", new="new1\r\nnew2"),
)
)

assert not result.is_error
raw = await file_path.read_bytes()
expected = b"header\r\nnew1\r\nnew2\r\nfooter\r\n"
assert raw == expected, f"Expected CRLF in new preserved without double conversion, got {raw!r}"
68 changes: 68 additions & 0 deletions tests/tools/test_write_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,71 @@ async def test_write_large_content(write_file_tool: WriteFile, temp_work_dir: Ka
assert not result.is_error
assert await file_path.exists()
assert await file_path.read_text() == content


async def test_overwrite_preserves_crlf_line_endings(
write_file_tool: WriteFile, temp_work_dir: KaosPath
):
"""WriteFile overwrite should preserve CRLF line endings from existing file."""
file_path = temp_work_dir / "crlf_overwrite.txt"
original_bytes = b"old_line1\r\nold_line2\r\n"
await file_path.write_bytes(original_bytes)

new_content = "new_line1\nnew_line2\n"
result = await write_file_tool(Params(path=str(file_path), content=new_content))

assert not result.is_error
assert "successfully overwritten" in result.message

raw = await file_path.read_bytes()
expected = b"new_line1\r\nnew_line2\r\n"
assert raw == expected, f"Expected CRLF preserved, got {raw!r}"


async def test_overwrite_preserves_lf_line_endings(
write_file_tool: WriteFile, temp_work_dir: KaosPath
):
"""WriteFile overwrite should preserve LF line endings from existing file."""
file_path = temp_work_dir / "lf_overwrite.txt"
original_bytes = b"old_line1\nold_line2\n"
await file_path.write_bytes(original_bytes)

new_content = "new_line1\nnew_line2\n"
result = await write_file_tool(Params(path=str(file_path), content=new_content))

assert not result.is_error
raw = await file_path.read_bytes()
expected = b"new_line1\nnew_line2\n"
assert raw == expected, f"Expected LF preserved, got {raw!r}"


async def test_new_file_write_uses_lf(
write_file_tool: WriteFile, temp_work_dir: KaosPath
):
"""Writing a new file should use the content's line endings as-is (LF by default)."""
file_path = temp_work_dir / "new_lf.txt"
content = "line1\nline2\n"

result = await write_file_tool(Params(path=str(file_path), content=content))

assert not result.is_error
raw = await file_path.read_bytes()
assert raw == b"line1\nline2\n", f"Expected LF for new file, got {raw!r}"


async def test_overwrite_crlf_input_no_double_conversion(
write_file_tool: WriteFile, temp_work_dir: KaosPath
):
"""WriteFile overwrite should not double-convert existing CRLF in params.content."""
file_path = temp_work_dir / "crlf_overwrite_input.txt"
original_bytes = b"old1\r\nold2\r\n"
await file_path.write_bytes(original_bytes)

# params.content already contains CRLF (e.g., from model)
new_content = "new1\r\nnew2\r\n"
result = await write_file_tool(Params(path=str(file_path), content=new_content))

assert not result.is_error
raw = await file_path.read_bytes()
expected = b"new1\r\nnew2\r\n"
assert raw == expected, f"Expected preserved CRLF without double conversion, got {raw!r}"
Loading