diff --git a/photomap/backend/embeddings.py b/photomap/backend/embeddings.py index cab98bb..3b09bb4 100644 --- a/photomap/backend/embeddings.py +++ b/photomap/backend/embeddings.py @@ -19,7 +19,7 @@ from concurrent.futures import ThreadPoolExecutor from datetime import datetime from pathlib import Path -from typing import Any, ClassVar +from typing import Any, ClassVar, NamedTuple import networkx as nx import numpy as np @@ -306,6 +306,21 @@ class IndexResult(BaseModel): embedding_dim: int = 512 +class _ExistingIndex(NamedTuple): + """Snapshot of the arrays loaded from a saved ``.npz`` index. + + Used by both ``update_index`` and ``update_index_async`` so the load + + encoder-spec mismatch check lives in one place. + """ + + embeddings: np.ndarray + filenames: np.ndarray + modification_times: np.ndarray + metadata: np.ndarray + model_id: str + embedding_dim: int + + class Embeddings(BaseModel): """ A class to handle image embeddings using CLIP. @@ -899,6 +914,87 @@ def traversal_callback(count, message): progress_tracker.set_error(album_key, str(e)) raise + def _load_existing_index_arrays(self) -> _ExistingIndex: + """Read the saved ``.npz`` arrays and verify the stored encoder spec. + + Raises :class:`EmbeddingCacheMismatch` when the on-disk encoder model + differs from this instance's ``encoder_spec`` — mixing them silently + would produce nonsense similarity scores. 
+ """ + data = np.load(self.embeddings_path, allow_pickle=True) + existing_embeddings = data["embeddings"] + existing_filenames = data["filenames"] + existing_modtimes = data["modification_times"] + existing_metadatas = data["metadata"] + existing_model_id = ( + str(data["model_id"]) if "model_id" in data.files else LEGACY_ENCODER_SPEC + ) + existing_dim = ( + int(data["embedding_dim"]) + if "embedding_dim" in data.files + else (int(existing_embeddings.shape[1]) if existing_embeddings.size else 512) + ) + if existing_model_id != self.encoder_spec: + raise EmbeddingCacheMismatch( + existing_model_id, self.encoder_spec, str(self.embeddings_path) + ) + return _ExistingIndex( + embeddings=existing_embeddings, + filenames=existing_filenames, + modification_times=existing_modtimes, + metadata=existing_metadatas, + model_id=existing_model_id, + embedding_dim=existing_dim, + ) + + def _finalize_index_update( + self, + filtered_existing: IndexResult, + new_result: IndexResult, + missing_image_count: int, + *, + on_save_start: Callable[[], None] | None = None, + ) -> tuple[IndexResult, bool]: + """Decide between save-the-combined-batch and noop, return the result. + + When there are no new files and no removed files this is a true noop: + ``_save_embeddings`` is *not* called and the caller is expected to + keep using the existing on-disk index. The second tuple element + (``did_rebuild``) lets the caller know whether the UMAP should be + treated as freshly invalidated. + + ``on_save_start`` (when supplied) fires immediately before the + combine+save runs, so the async caller can flip its progress tracker + to "Saving updated index" only when there's actually going to be a + save — without the hook the noop path would briefly show a stale + "Saving" message. + + ``umap_embeddings`` on the returned ``IndexResult`` is left ``None`` + because attaching UMAP differs by call path (sync vs async-thread). 
+ """ + new_files_indexed = new_result.embeddings.shape[0] + old_files_removed = missing_image_count + + if new_files_indexed == 0 and old_files_removed == 0: + return ( + IndexResult( + embeddings=filtered_existing.embeddings, + filenames=filtered_existing.filenames, + modification_times=filtered_existing.modification_times, + metadata=filtered_existing.metadata, + bad_files=new_result.bad_files, + model_id=filtered_existing.model_id, + embedding_dim=filtered_existing.embedding_dim, + ), + False, + ) + + if on_save_start is not None: + on_save_start() + combined = self._combine_index_results(filtered_existing, new_result) + self._save_embeddings(combined) + return combined, True + def update_index( self, image_paths_or_dir: list[Path] | Path, @@ -911,99 +1007,64 @@ def update_index( ), f"Embeddings file {self.embeddings_path} does not exist. Please create an index first." try: - # Load existing data - data = np.load(self.embeddings_path, allow_pickle=True) - existing_embeddings = data["embeddings"] - existing_filenames = data["filenames"] - existing_modtimes = data["modification_times"] - existing_metadatas = data["metadata"] - existing_model_id = ( - str(data["model_id"]) if "model_id" in data.files else LEGACY_ENCODER_SPEC - ) - existing_dim = ( - int(data["embedding_dim"]) - if "embedding_dim" in data.files - else (int(existing_embeddings.shape[1]) if existing_embeddings.size else 512) - ) - - # Refuse to mix encoders within a single index file. 
- if existing_model_id != self.encoder_spec: - raise EmbeddingCacheMismatch( - existing_model_id, self.encoder_spec, str(self.embeddings_path) - ) + existing = self._load_existing_index_arrays() - # Identify new and missing images logger.info(f"Scanning for new images in {image_paths_or_dir}...") new_image_paths, missing_image_paths = self._get_new_and_missing_images( image_paths_or_dir, - existing_filenames, + existing.filenames, ) - - # Filter out missing images filtered_existing = self._filter_missing_images( missing_image_paths, - existing_embeddings, - existing_filenames, - existing_modtimes, - existing_metadatas, - model_id=existing_model_id, - embedding_dim=existing_dim, + existing.embeddings, + existing.filenames, + existing.modification_times, + existing.metadata, + model_id=existing.model_id, + embedding_dim=existing.embedding_dim, ) if len(filtered_existing.filenames) == 0 and len(new_image_paths) == 0: logger.warning( "No images found in album directory(ies). Exiting update." ) - return + return None - # Update progress tracker with actual count total_new_images = len(new_image_paths) logger.info( - f"Found {total_new_images} new images to index, {len(missing_image_paths)} missing. Beginning indexing..." + f"Found {total_new_images} new images to index, " + f"{len(missing_image_paths)} missing. Beginning indexing..." 
) - # Process new images new_result = self._process_images_batch( list(new_image_paths), batch_size=batch_size, num_workers=num_workers ) - new_files_indexed = new_result.embeddings.shape[0] - old_files_removed = len(missing_image_paths) logger.info( - f"New files indexed: {new_files_indexed}, Old files removed: {old_files_removed}" + f"New files indexed: {new_result.embeddings.shape[0]}, " + f"Old files removed: {len(missing_image_paths)}" ) - # If no new embeddings were created, return existing data - if new_files_indexed == 0 and old_files_removed == 0: + result, did_rebuild = self._finalize_index_update( + filtered_existing, new_result, len(missing_image_paths) + ) + if not did_rebuild: logger.info( "No new images needed to be indexed. Will not regenerate umap" ) - return IndexResult( - embeddings=filtered_existing.embeddings, - filenames=filtered_existing.filenames, - modification_times=filtered_existing.modification_times, - metadata=filtered_existing.metadata, - umap_embeddings=self.umap_embeddings, - bad_files=new_result.bad_files, - model_id=filtered_existing.model_id, - embedding_dim=filtered_existing.embedding_dim, - ) - - # Final progress update - logger.info("Indexing completed successfully. Saving updated index...") - - # Combine and save - combined_result = self._combine_index_results(filtered_existing, new_result) - self._save_embeddings(combined_result) + else: + logger.info("Indexing completed successfully. Saving updated index...") - # Rebuild the umap index - combined_result.umap_embeddings = self.umap_embeddings - assert new_result.umap_embeddings is not None - logger.info( - f"UMAP index created with shape: {new_result.umap_embeddings.shape}" - ) + # Attach UMAP: when ``did_rebuild`` is True the saved npz invalidates + # the umap.npz cache and the property rebuilds; otherwise the property + # just loads the existing UMAP from disk. 
+ result.umap_embeddings = self.umap_embeddings + if did_rebuild and result.umap_embeddings is not None: + logger.info( + f"UMAP index created with shape: {result.umap_embeddings.shape}" + ) - return combined_result + return result except Exception as e: logger.error(f"Failed to update index: {e}") @@ -1022,65 +1083,41 @@ async def update_index_async( ), f"Embeddings file {self.embeddings_path} does not exist. Please create an index first." try: - # Load existing data - data = np.load(self.embeddings_path, allow_pickle=True) - existing_embeddings = data["embeddings"] - existing_filenames = data["filenames"] - existing_modtimes = data["modification_times"] - existing_metadatas = data["metadata"] - existing_model_id = ( - str(data["model_id"]) if "model_id" in data.files else LEGACY_ENCODER_SPEC - ) - existing_dim = ( - int(data["embedding_dim"]) - if "embedding_dim" in data.files - else (int(existing_embeddings.shape[1]) if existing_embeddings.size else 512) - ) + existing = self._load_existing_index_arrays() - if existing_model_id != self.encoder_spec: - raise EmbeddingCacheMismatch( - existing_model_id, self.encoder_spec, str(self.embeddings_path) - ) - - # Start scanning phase progress_tracker.start_operation(album_key, 0, "scanning") - # Create progress callback for file traversal def traversal_callback(count, message): - # Update the total as we discover more files + # Update the total as we discover more files. 
progress_tracker.update_total_images(album_key, max(count, 0)) progress_tracker.update_progress(album_key, count, message) - # Identify new and missing images with progress feedback new_image_paths, missing_image_paths = await asyncio.to_thread( self._get_new_and_missing_images, image_paths_or_dir, - existing_filenames, + existing.filenames, progress_callback=traversal_callback, ) - # Filter out missing images filtered_existing = self._filter_missing_images( missing_image_paths, - existing_embeddings, - existing_filenames, - existing_modtimes, - existing_metadatas, - model_id=existing_model_id, - embedding_dim=existing_dim, + existing.embeddings, + existing.filenames, + existing.modification_times, + existing.metadata, + model_id=existing.model_id, + embedding_dim=existing.embedding_dim, ) if len(filtered_existing.filenames) == 0 and len(new_image_paths) == 0: progress_tracker.set_error( album_key, "No images found in album directory(ies)" ) - return + return None - # Update progress tracker with actual count total_new_images = len(new_image_paths) progress_tracker.start_operation(album_key, total_new_images, "indexing") - # Process new images new_result = await self._process_images_batch_async( list(new_image_paths), album_key, @@ -1088,53 +1125,47 @@ def traversal_callback(count, message): num_workers=num_workers, ) - new_files_indexed = new_result.embeddings.shape[0] - old_files_removed = len(missing_image_paths) logger.info( - f"New files indexed: {new_files_indexed}, Old files removed: {old_files_removed}" + f"New files indexed: {new_result.embeddings.shape[0]}, " + f"Old files removed: {len(missing_image_paths)}" ) - # If no new embeddings were created, return existing data - if new_files_indexed == 0 and old_files_removed == 0: + # The save (when needed) is still a blocking np.savez, so push the + # combine/save through ``to_thread`` to keep the event loop free. 
+ # The "Saving updated index" progress message only fires if a save + # is actually going to happen — see ``on_save_start`` below. + def _on_save_start() -> None: + progress_tracker.update_progress( + album_key, total_new_images, "Saving updated index" + ) + + result, did_rebuild = await asyncio.to_thread( + self._finalize_index_update, + filtered_existing, + new_result, + len(missing_image_paths), + on_save_start=_on_save_start, + ) + if not did_rebuild: logger.info( "No new images needed to be indexed. Will not regenerate umap" ) progress_tracker.complete_operation( album_key, "No new images needed to be indexed" ) - return IndexResult( - embeddings=filtered_existing.embeddings, - filenames=filtered_existing.filenames, - modification_times=filtered_existing.modification_times, - metadata=filtered_existing.metadata, - umap_embeddings=self.umap_embeddings, - bad_files=new_result.bad_files, - model_id=filtered_existing.model_id, - embedding_dim=filtered_existing.embedding_dim, - ) - - # Final progress update - progress_tracker.update_progress( - album_key, total_new_images, "Saving updated index" - ) + else: + progress_tracker.start_operation(album_key, total_new_images, "mapping") - # Combine and save - combined_result = self._combine_index_results(filtered_existing, new_result) - self._save_embeddings(combined_result) + # UMAP rebuild (when needed) is the slow step; keep it off-thread. 
+ result.umap_embeddings = await asyncio.to_thread(lambda: self.umap_embeddings) - # Rebuild the umap index - progress_tracker.start_operation(album_key, total_new_images, "mapping") - new_result.umap_embeddings = await asyncio.to_thread( - lambda: self.umap_embeddings - ) - - # Mark as completed - progress_tracker.complete_operation( - album_key, - f"Successfully indexed {len(new_result.embeddings)} new images", - ) + if did_rebuild: + progress_tracker.complete_operation( + album_key, + f"Successfully indexed {len(new_result.embeddings)} new images", + ) - return combined_result + return result except Exception as e: progress_tracker.set_error(album_key, str(e))