Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 37 additions & 31 deletions backend/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
except Exception as _e:
raise RuntimeError("google-genai is required. Install with: pip install google-genai") from _e

import logging

logger = logging.getLogger("agents")

def _use_vertex() -> bool:
"""
Expand Down Expand Up @@ -330,22 +333,22 @@ class AgentState(TypedDict):
class KSSearchAgent:
async def run(self, query: str, keywords: List[str], want: int = 45) -> dict:
try:
print(" -> Using parallel enrichment in KS search")
logger.info("Using parallel enrichment in KS search")
general = await general_search_async(query, top_k=min(want, 50), enrich_details=True)
general = general.get("combined_results", [])
except Exception as e:
print(f"Async general search error, falling back to sync: {e}")
logger.error("Async general search error, falling back to sync: %s", e)
try:
general = general_search(query, top_k=min(want, 50), enrich_details=True).get("combined_results", [])
except Exception as e2:
print(f"Sync general search error: {e2}")
logger.error("Sync general search error: %s", e2)
general = []
try:
print(f" -> Running fuzzy search with keywords: {keywords}")
logger.info("Running fuzzy search with keywords: %s", keywords)
fuzzy = global_fuzzy_keyword_search(keywords, top_k=min(want, 50))
print(f" -> Fuzzy search returned {len(fuzzy)} results")
logger.info("Fuzzy search returned %d results", len(fuzzy))
except Exception as e:
print(f"Fuzzy config search error: {e}")
logger.error("Fuzzy config search error: %s", e)
fuzzy = []
return {"combined_results": (general + fuzzy)[: max(want, 15)]}

Expand All @@ -368,25 +371,25 @@ async def run(self, query: str, want: int, context: Optional[Dict] = None) -> Li
)
return [item.__dict__ if hasattr(item, "__dict__") else item for item in results]
except Exception as e:
print(f"Vector search error: {e}")
logger.error("Vector search error: %s", e)
return []


async def extract_keywords_and_rewrite(state: AgentState) -> AgentState:
print("--- Node: Keywords, Rewrite, Intents ---")
logger.info("--- Node: Keywords, Rewrite, Intents ---")
# Detect intents on the raw input first
intents0 = await call_gemini_detect_intents(state["query"], state.get("history", []))
if intents0 == [QueryIntent.GREETING.value]:
print("Pure greeting detected; skipping search.")
logger.info("Pure greeting detected; skipping search.")
return {**state, "effective_query": state["query"], "keywords": [], "intents": intents0}

effective = await call_gemini_rewrite_with_history(state["query"], state.get("history", []))
keywords = await call_gemini_for_keywords(effective)
# Re-evaluate intents after rewrite (usually drops greeting if mixed)
intents = await call_gemini_detect_intents(effective, state.get("history", []))
print(f" -> Effective query: {effective}")
print(f" -> Keywords: {keywords}")
print(f" -> Intents: {intents}")
logger.info("Effective query: %s", effective)
logger.info("Keywords: %s", keywords)
logger.info("Intents: %s", intents)
return {**state, "effective_query": effective, "keywords": keywords, "intents": intents}


Expand All @@ -400,9 +403,9 @@ def get_vector_agent():
return _global_vector_agent

async def execute_search(state: AgentState) -> Dict[str, Any]:
print("--- Node: Search Execution ---")
logger.info("--- Node: Search Execution ---")
if set(state.get("intents", [])) == {QueryIntent.GREETING.value}:
print("Pure greeting; skipping search.")
logger.info("Pure greeting; skipping search.")
return {"ks_results": [], "vector_results": []}
want_pool = 60 # collect enough for several pages (15 per page)

Expand All @@ -421,12 +424,12 @@ async def execute_search(state: AgentState) -> Dict[str, Any]:
ks_results_data, vec_results = await asyncio.gather(ks_task, vec_task)
all_ks_results = ks_results_data.get("combined_results", [])

print(f"Search completed: KS results={len(all_ks_results)}, Vector results={len(vec_results)}")
logger.info("Search completed: KS results=%d, Vector results=%d", len(all_ks_results), len(vec_results))
return {"ks_results": all_ks_results, "vector_results": vec_results}


def fuse_results(state: AgentState) -> AgentState:
print("--- Node: Result Fusion ---")
logger.info("--- Node: Result Fusion ---")
ks_results = state.get("ks_results", [])
vector_results = state.get("vector_results", [])
combined: Dict[str, dict] = {}
Expand All @@ -442,13 +445,18 @@ def fuse_results(state: AgentState) -> AgentState:
else:
combined[doc_id] = {**res, "final_score": res.get("_score", 0) * 0.4}
all_sorted = sorted(combined.values(), key=lambda x: x.get("final_score", 0), reverse=True)
print(f"Results summary: KS={len(ks_results)}, Vector={len(vector_results)}, Combined={len(all_sorted)}")
logger.info(
"Results summary: KS=%d, Vector=%d, Combined=%d",
len(ks_results),
len(vector_results),
len(all_sorted),
)
page_size = 15
return {**state, "all_results": all_sorted, "final_results": all_sorted[:page_size]}


async def generate_final_response(state: AgentState) -> AgentState:
print("--- Node: Response Generation ---")
logger.info("--- Node: Response Generation ---")
intents = state.get("intents", [QueryIntent.DATA_DISCOVERY.value])
if set(intents) == {QueryIntent.GREETING.value}:
response = (
Expand All @@ -469,15 +477,15 @@ async def generate_final_response(state: AgentState) -> AgentState:

start_number = state.get("__start_number__", 1)
prev_text = state.get("__previous_text__", "")
print(f"Generating response for {len(raw_results)} final results, start={start_number}, intents={intents}")

try:
response = await call_gemini_for_final_synthesis(
state["effective_query"], raw_results, intents, start_number=start_number, previous_text=prev_text
)
except Exception:
response = "Unable to process your request. Please try again."

logger.info(
"Generating response for %d final results, start=%d, intents=%s",
len(raw_results),
start_number,
intents,
)
response = await call_gemini_for_final_synthesis(
state["effective_query"], raw_results, intents, start_number=start_number, previous_text=prev_text
)
return {**state, "final_response": response}


Expand Down Expand Up @@ -575,7 +583,5 @@ async def handle_chat(self, session_id: str, query: str, reset: bool = False) ->
self.chat_history[session_id] = self.chat_history[session_id][-20:]
return response_text
except Exception as e:
print(f"Error in handle_chat: {e}")
import traceback
traceback.print_exc()
return f"Error: {e}"
logger.error("Error in handle_chat: %s", e)
logger.exception("Exception occurred in handle_chat")