Skip to content

Commit 941a52e

Browse files
committed
feat(sync): Add metadata logging for sync operations
1 parent 3d478a3 commit 941a52e

File tree

2 files changed

+152
-11
lines changed

2 files changed

+152
-11
lines changed
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import json
2+
import time
3+
from datetime import datetime
4+
from pathlib import Path
5+
from typing import Dict, List, Optional
6+
import logging
7+
8+
logger = logging.getLogger(__name__)
9+
10+
class MetadataHandler:
11+
"""Handles metadata storage and retrieval for sync operations.
12+
13+
This class manages a JSON file that stores the last 5 sync operations
14+
and maintains a record of the most recent operation with detailed information.
15+
"""
16+
17+
def __init__(self, base_dir: Path, max_history: int = 5):
18+
"""Initialize the metadata handler.
19+
20+
Args:
21+
base_dir: Base directory where metadata will be stored
22+
max_history: Maximum number of operations to keep in history
23+
"""
24+
self.base_dir = base_dir
25+
self.metadata_file = base_dir / ".sync_metadata.json"
26+
self.max_history = max_history
27+
self._ensure_metadata_file()
28+
29+
def _ensure_metadata_file(self) -> None:
30+
"""Ensure the metadata file exists with proper structure."""
31+
if not self.metadata_file.exists():
32+
initial_data = {
33+
"last_operation": None,
34+
"history": []
35+
}
36+
self._write_metadata(initial_data)
37+
38+
def _read_metadata(self) -> Dict:
39+
"""Read the current metadata from file."""
40+
try:
41+
with open(self.metadata_file, 'r') as f:
42+
return json.load(f)
43+
except Exception as e:
44+
logger.error(f"Error reading metadata file: {e}")
45+
return {"last_operation": None, "history": []}
46+
47+
def _write_metadata(self, data: Dict) -> None:
48+
"""Write metadata to file."""
49+
try:
50+
self.metadata_file.parent.mkdir(parents=True, exist_ok=True)
51+
with open(self.metadata_file, 'w') as f:
52+
json.dump(data, f, indent=2)
53+
except Exception as e:
54+
logger.error(f"Error writing metadata file: {e}")
55+
56+
def log_operation(
57+
self,
58+
operation_type: str,
59+
path: str,
60+
environment: Optional[str] = None,
61+
successful_files: Optional[List[str]] = None,
62+
failed_files: Optional[List[str]] = None,
63+
error: Optional[str] = None,
64+
start_time: Optional[float] = None
65+
) -> None:
66+
"""Log a sync operation.
67+
68+
Args:
69+
operation_type: Type of operation (e.g., "pull", "push")
70+
path: The path that was synced
71+
environment: Optional environment name
72+
successful_files: List of successfully processed files
73+
failed_files: List of files that failed to process
74+
error: Any error message if the operation failed
75+
start_time: Optional timestamp when the operation started (from time.time())
76+
"""
77+
current_time = datetime.now().isoformat()
78+
duration_ms = int((time.time() - (start_time or time.time())) * 1000) if start_time else 0
79+
80+
operation_data = {
81+
"timestamp": current_time,
82+
"operation_type": operation_type,
83+
"path": path,
84+
"environment": environment,
85+
"successful_files": successful_files or [],
86+
"failed_files": failed_files or [],
87+
"error": error,
88+
"duration_ms": duration_ms
89+
}
90+
91+
metadata = self._read_metadata()
92+
93+
# Update last operation
94+
metadata["last_operation"] = operation_data
95+
96+
# Update history
97+
metadata["history"].insert(0, operation_data)
98+
metadata["history"] = metadata["history"][:self.max_history]
99+
100+
self._write_metadata(metadata)
101+
102+
def get_last_operation(self) -> Optional[Dict]:
103+
"""Get the most recent operation details."""
104+
metadata = self._read_metadata()
105+
return metadata.get("last_operation")
106+
107+
def get_history(self) -> List[Dict]:
108+
"""Get the operation history."""
109+
metadata = self._read_metadata()
110+
return metadata.get("history", [])

src/humanloop/sync/sync_client.py

Lines changed: 42 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
from typing import List, TYPE_CHECKING, Optional
44
from functools import lru_cache
55
from humanloop.types import FileType
6+
from .metadata_handler import MetadataHandler
7+
import time
68

79
if TYPE_CHECKING:
810
from humanloop.base_client import BaseHumanloop
@@ -49,6 +51,8 @@ def __init__(
4951
self._cache_size = cache_size
5052
# Create a new cached version of get_file_content with the specified cache size
5153
self.get_file_content = lru_cache(maxsize=cache_size)(self._get_file_content_impl)
54+
# Initialize metadata handler
55+
self.metadata = MetadataHandler(self.base_dir)
5256

5357
def _get_file_content_impl(self, path: str, file_type: FileType) -> Optional[str]:
5458
"""Implementation of get_file_content without the cache decorator.
@@ -227,10 +231,6 @@ def _pull_directory(self,
227231
logger.warning(f"Skipping unsupported file type: {file.type}")
228232
continue
229233

230-
if not file.path.startswith(path):
231-
# Filter by path
232-
continue
233-
234234
# Skip if no content
235235
if not getattr(file, "content", None):
236236
logger.warning(f"No content found for {file.type} {getattr(file, 'id', '<unknown>')}")
@@ -256,11 +256,12 @@ def _pull_directory(self,
256256

257257
return successful_files
258258

259-
def pull(self, path: str, environment: str | None = None) -> List[str]:
259+
def pull(self, path: str | None = None, environment: str | None = None) -> List[str]:
260260
"""Pull files from Humanloop to local filesystem.
261261
262262
If the path ends with .prompt or .agent, pulls that specific file.
263263
Otherwise, pulls all files under the specified directory path.
264+
If no path is provided, pulls all files from the root.
264265
265266
Args:
266267
path: The path to pull from (either a specific file or directory)
@@ -269,9 +270,39 @@ def pull(self, path: str, environment: str | None = None) -> List[str]:
269270
Returns:
270271
List of successfully processed file paths
271272
"""
272-
normalized_path = self._normalize_path(path)
273-
if self.is_file(path):
274-
self._pull_file(normalized_path, environment)
275-
return [path]
276-
else:
277-
return self._pull_directory(normalized_path, environment)
273+
start_time = time.time()
274+
try:
275+
if path is None:
276+
successful_files = self._pull_directory(None, environment)
277+
failed_files = [] # Failed files are already logged in _pull_directory
278+
else:
279+
normalized_path = self._normalize_path(path)
280+
if self.is_file(path):
281+
self._pull_file(normalized_path, environment)
282+
successful_files = [path]
283+
failed_files = []
284+
else:
285+
successful_files = self._pull_directory(normalized_path, environment)
286+
failed_files = [] # Failed files are already logged in _pull_directory
287+
288+
# Log the successful operation
289+
self.metadata.log_operation(
290+
operation_type="pull",
291+
path=path or "", # Use empty string if path is None
292+
environment=environment,
293+
successful_files=successful_files,
294+
failed_files=failed_files,
295+
start_time=start_time
296+
)
297+
298+
return successful_files
299+
except Exception as e:
300+
# Log the failed operation
301+
self.metadata.log_operation(
302+
operation_type="pull",
303+
path=path or "", # Use empty string if path is None
304+
environment=environment,
305+
error=str(e),
306+
start_time=start_time
307+
)
308+
raise

0 commit comments

Comments
 (0)