diff --git a/agents/matmaster_agent/callback.py b/agents/matmaster_agent/callback.py index bdece768..842dbb91 100644 --- a/agents/matmaster_agent/callback.py +++ b/agents/matmaster_agent/callback.py @@ -17,7 +17,13 @@ from agents.matmaster_agent.model import UserContent from agents.matmaster_agent.prompt import get_user_content_lang from agents.matmaster_agent.services.quota import check_quota_service, use_quota_service -from agents.matmaster_agent.state import ERROR_DETAIL, ERROR_OCCURRED, PLAN, UPLOAD_FILE +from agents.matmaster_agent.state import ( + ERROR_DETAIL, + ERROR_OCCURRED, + HISTORY_STEPS, + PLAN, + UPLOAD_FILE, +) from agents.matmaster_agent.utils.helper_func import get_user_id logger = logging.getLogger(__name__) @@ -131,6 +137,10 @@ async def matmaster_prepare_state( callback_context.state['separate_card_info'] = callback_context.state.get( 'separate_card_info', '' ) + # 历史的所有执行过程 + callback_context.state[HISTORY_STEPS] = callback_context.state.get( + HISTORY_STEPS, [] + ) async def matmaster_set_lang( diff --git a/agents/matmaster_agent/constant.py b/agents/matmaster_agent/constant.py index bd46e4d5..6e81f121 100644 --- a/agents/matmaster_agent/constant.py +++ b/agents/matmaster_agent/constant.py @@ -16,6 +16,7 @@ MATMASTER_AGENT_NAME = 'matmaster_agent' ModelRole = 'model' +UserRole = 'user' Transfer2Agent = 'transfer_to_agent' diff --git a/agents/matmaster_agent/core_agents/base_agents/mcp_agent.py b/agents/matmaster_agent/core_agents/base_agents/mcp_agent.py index 6c828932..7b825c00 100644 --- a/agents/matmaster_agent/core_agents/base_agents/mcp_agent.py +++ b/agents/matmaster_agent/core_agents/base_agents/mcp_agent.py @@ -50,7 +50,7 @@ store_tool_result_in_memory, ) from agents.matmaster_agent.model import CostFuncType -from agents.matmaster_agent.state import PLAN +from agents.matmaster_agent.state import CURRENT_STEP, CURRENT_STEP_RESULT from agents.matmaster_agent.style import tool_response_failed_card from agents.matmaster_agent.utils.event_utils import ( all_text_event, @@ -240,8 +240,10 @@ async def _run_events(self, ctx: InvocationContext) -> AsyncGenerator[Event, Non raise parsed_tool_result = await parse_result(ctx, dict_result) - logger.info( - f'{ctx.session.id} parsed_tool_result = {parsed_tool_result}' + post_execution_step = copy.deepcopy(ctx.session.state[CURRENT_STEP]) + post_execution_step[CURRENT_STEP_RESULT] = parsed_tool_result + yield update_state_event( + ctx, state_delta={CURRENT_STEP: post_execution_step} ) for _frontend_render_event in frontend_render_event( ctx, @@ -263,9 +265,7 @@ async def _run_events(self, ctx: InvocationContext) -> AsyncGenerator[Event, Non not event.partial and event.content.parts[0].text == 'All Function Calls Are Occurred Before, Continue' - and ctx.session.state[PLAN]['steps'][ - ctx.session.state['plan_index'] - ]['status'] + and ctx.session.state[CURRENT_STEP]['status'] == PlanStepStatusEnum.PROCESS ): for _info_event in all_text_event( diff --git a/agents/matmaster_agent/core_agents/base_agents/schema_agent.py b/agents/matmaster_agent/core_agents/base_agents/schema_agent.py index 07089e4f..24c80fdd 100644 --- a/agents/matmaster_agent/core_agents/base_agents/schema_agent.py +++ b/agents/matmaster_agent/core_agents/base_agents/schema_agent.py @@ -13,6 +13,7 @@ from agents.matmaster_agent.core_agents.comp_agents.dntransfer_climit_agent import ( CombinedDisallowTransferAndContentLimitMixin, ) +from agents.matmaster_agent.logger import PrefixFilter from agents.matmaster_agent.utils.event_utils import ( context_function_event, is_function_call, @@ -22,6 +23,8 @@ from agents.matmaster_agent.utils.helper_func import extract_json_from_string logger = logging.getLogger(__name__) +logger.addFilter(PrefixFilter(MATMASTER_AGENT_NAME)) +logger.setLevel(logging.INFO) class SchemaAgent(ErrorHandleLlmAgent): @@ -41,7 +44,7 @@ async def _run_events(self, ctx: InvocationContext) -> AsyncGenerator[Event, Non r',(\s*[}\]])', r'\1', repaired_raw_text ) # 移除尾随逗号 logger.info( - f'[{MATMASTER_AGENT_NAME}]:[{ctx.session.id}] repaired_raw_text = {repaired_raw_text}' + f'{ctx.session.id} {self.name} repaired_raw_text = {repaired_raw_text}' ) schema_info: dict = json.loads(repaired_raw_text) diff --git a/agents/matmaster_agent/core_agents/comp_agents/recommend_summary_agent/agent.py b/agents/matmaster_agent/core_agents/comp_agents/recommend_summary_agent/agent.py index 8099f740..75edae66 100644 --- a/agents/matmaster_agent/core_agents/comp_agents/recommend_summary_agent/agent.py +++ b/agents/matmaster_agent/core_agents/comp_agents/recommend_summary_agent/agent.py @@ -55,7 +55,6 @@ ) from agents.matmaster_agent.flow_agents.model import PlanStepStatusEnum from agents.matmaster_agent.llm_config import MatMasterLlmConfig -from agents.matmaster_agent.locales import i18n from agents.matmaster_agent.logger import PrefixFilter from agents.matmaster_agent.model import ToolCallInfoSchema from agents.matmaster_agent.prompt import ( @@ -63,7 +62,11 @@ GLOBAL_SCHEMA_INSTRUCTION, get_vocabulary_enforce_prompt, ) -from agents.matmaster_agent.state import RECOMMEND_PARAMS, STEP_DESCRIPTION +from agents.matmaster_agent.state import ( + CURRENT_STEP, + CURRENT_STEP_DESCRIPTION, + RECOMMEND_PARAMS, +) from agents.matmaster_agent.sub_agents.tools import ALL_TOOLS from agents.matmaster_agent.utils.event_utils import ( context_function_event, @@ -178,9 +181,7 @@ def summary_agent(self) -> DisallowTransferAndContentLimitLlmAgent: @override async def _run_events(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]: # 根据计划来 - current_step = ctx.session.state['plan']['steps'][ - ctx.session.state['plan_index'] - ] + current_step = ctx.session.state[CURRENT_STEP] current_step_tool_name = current_step['tool_name'] # 连接 tool-server,获取doc和函数声明 @@ -214,7 +215,7 @@ async def _run_events(self, ctx: InvocationContext) -> AsyncGenerator[Event, Non ) self.tool_call_info_agent.instruction = gen_tool_call_info_instruction( - user_prompt=current_step[STEP_DESCRIPTION], + user_prompt=current_step[CURRENT_STEP_DESCRIPTION], agent_prompt=self.instruction, tool_doc=tool_doc, tool_schema=tool_schema, @@ -318,7 +319,7 @@ async def _run_events(self, ctx: InvocationContext) -> AsyncGenerator[Event, Non step_title = ctx.session.state.get('step_title', {}).get( 'title', - f"{i18n.t(ctx.session.state['separate_card_info'])} {ctx.session.state['plan_index'] + 1}: {current_step_tool_name}", + current_step_tool_name, ) for matmaster_flow_event in context_function_event( ctx, diff --git a/agents/matmaster_agent/core_agents/public_agents/job_agents/agent.py b/agents/matmaster_agent/core_agents/public_agents/job_agents/agent.py index beaac29c..c43e3d7c 100644 --- a/agents/matmaster_agent/core_agents/public_agents/job_agents/agent.py +++ b/agents/matmaster_agent/core_agents/public_agents/job_agents/agent.py @@ -25,8 +25,8 @@ SubmitRenderAgent, ) from agents.matmaster_agent.flow_agents.model import PlanStepStatusEnum -from agents.matmaster_agent.locales import i18n from agents.matmaster_agent.logger import PrefixFilter +from agents.matmaster_agent.state import CURRENT_STEP from agents.matmaster_agent.utils.event_utils import ( all_text_event, context_function_event, @@ -132,9 +132,7 @@ async def _run_events(self, ctx: InvocationContext) -> AsyncGenerator[Event, Non async for result_event in self.result_agent.run_async(ctx): yield result_event - current_step = ctx.session.state['plan']['steps'][ - ctx.session.state['plan_index'] - ] + current_step = ctx.session.state[CURRENT_STEP] current_step_tool_name = current_step['tool_name'] current_step_status = current_step['status'] if current_step_status in [ @@ -144,7 +142,7 @@ async def _run_events(self, ctx: InvocationContext) -> AsyncGenerator[Event, Non # Only Query Job Result step_title = ctx.session.state.get('step_title', {}).get( 'title', - f"{i18n.t(ctx.session.state['separate_card_info'])} {ctx.session.state['plan_index'] + 1}: {current_step_tool_name}", + current_step_tool_name, ) for matmaster_flow_event in context_function_event( ctx, diff --git a/agents/matmaster_agent/core_agents/public_agents/job_agents/result_core_agent/agent.py b/agents/matmaster_agent/core_agents/public_agents/job_agents/result_core_agent/agent.py index a8e43cd6..ebc73570 100644 --- a/agents/matmaster_agent/core_agents/public_agents/job_agents/result_core_agent/agent.py +++ b/agents/matmaster_agent/core_agents/public_agents/job_agents/result_core_agent/agent.py @@ -25,12 +25,14 @@ from agents.matmaster_agent.core_agents.public_agents.job_agents.result_core_agent.prompt import ( ResultCoreAgentDescription, ) +from agents.matmaster_agent.flow_agents.step_utils import get_current_step from agents.matmaster_agent.logger import PrefixFilter from agents.matmaster_agent.services.job import ( get_job_detail, parse_and_prepare_err, parse_and_prepare_results, ) +from agents.matmaster_agent.state import CURRENT_STEP, CURRENT_STEP_STATUS from agents.matmaster_agent.utils.event_utils import ( all_text_event, context_function_event, @@ -115,14 +117,8 @@ async def _run_events(self, ctx: InvocationContext) -> AsyncGenerator[Event, Non if status != 'Running': # 更新状态 plan_status = 'success' if status == 'Finished' else 'failed' - update_plan = copy.deepcopy(ctx.session.state['plan']) - logger.info( - f'{ctx.session.id} plan_index = {ctx.session.state['plan_index']}' - ) - update_plan['steps'][ctx.session.state['plan_index']][ - 'status' - ] = plan_status - + post_execution_step = copy.deepcopy(get_current_step(ctx)) + post_execution_step[CURRENT_STEP_STATUS] = plan_status update_long_running_jobs = copy.deepcopy( ctx.session.state['long_running_jobs'] ) @@ -131,7 +127,7 @@ async def _run_events(self, ctx: InvocationContext) -> AsyncGenerator[Event, Non ctx, state_delta={ 'long_running_jobs': update_long_running_jobs, - 'plan': update_plan, + CURRENT_STEP: post_execution_step, }, ) diff --git a/agents/matmaster_agent/core_agents/public_agents/job_agents/submit_core_agent/agent.py b/agents/matmaster_agent/core_agents/public_agents/job_agents/submit_core_agent/agent.py index 4888d1a0..d5291ce2 100644 --- a/agents/matmaster_agent/core_agents/public_agents/job_agents/submit_core_agent/agent.py +++ b/agents/matmaster_agent/core_agents/public_agents/job_agents/submit_core_agent/agent.py @@ -15,10 +15,11 @@ DisallowTransferAndContentLimitMCPAgent, ) from agents.matmaster_agent.flow_agents.model import PlanStepStatusEnum +from agents.matmaster_agent.flow_agents.step_utils import get_current_step from agents.matmaster_agent.locales import i18n from agents.matmaster_agent.logger import PrefixFilter from agents.matmaster_agent.model import BohrJobInfo, DFlowJobInfo -from agents.matmaster_agent.state import PLAN +from agents.matmaster_agent.state import CURRENT_STEP, CURRENT_STEP_STATUS from agents.matmaster_agent.style import tool_response_failed_card from agents.matmaster_agent.utils.event_utils import ( all_text_event, @@ -99,11 +100,13 @@ async def _run_events(self, ctx: InvocationContext) -> AsyncGenerator[Event, Non yield tool_response_failed_event # 更新 plan 为失败 - update_plan = copy.deepcopy(ctx.session.state['plan']) - update_plan['steps'][ctx.session.state['plan_index']][ - 'status' - ] = 'failed' - yield update_state_event(ctx, state_delta={'plan': update_plan}) + post_execution_step = copy.deepcopy(get_current_step(ctx)) + post_execution_step[CURRENT_STEP_STATUS] = ( + PlanStepStatusEnum.FAILED + ) + yield update_state_event( + ctx, state_delta={CURRENT_STEP: post_execution_step} + ) raise RuntimeError('Tool Execution Failed') dict_result = load_tool_response(first_part) @@ -189,12 +192,14 @@ async def _run_events(self, ctx: InvocationContext) -> AsyncGenerator[Event, Non yield tool_response_failed_event # 更新 plan 为失败 - update_plan = copy.deepcopy(ctx.session.state['plan']) - update_plan['steps'][ctx.session.state['plan_index']][ - 'status' - ] = 'failed' + post_execution_step = copy.deepcopy( + get_current_step(ctx) + ) + post_execution_step[CURRENT_STEP_STATUS] = ( + PlanStepStatusEnum.FAILED + ) yield update_state_event( - ctx, state_delta={'plan': update_plan} + ctx, state_delta={CURRENT_STEP: post_execution_step} ) raise RuntimeError('Tool Execution Failed') @@ -263,19 +268,17 @@ async def _run_events(self, ctx: InvocationContext) -> AsyncGenerator[Event, Non not event.partial and event.content.parts[0].text == 'All Function Calls Are Occurred Before, Continue' - and ctx.session.state[PLAN]['steps'][ - ctx.session.state['plan_index'] - ]['status'] + and ctx.session.state[CURRENT_STEP]['status'] == PlanStepStatusEnum.PROCESS ): for _info_event in all_text_event( ctx, self.name, '工具参数无变化,本次跳过执行', ModelRole ): yield _info_event - update_plan = copy.deepcopy(ctx.session.state['plan']) - update_plan['steps'][ctx.session.state['plan_index']][ - 'status' - ] = PlanStepStatusEnum.FAILED - yield update_state_event(ctx, state_delta={'plan': update_plan}) + post_execution_step = copy.deepcopy(ctx.session.state[CURRENT_STEP]) + post_execution_step['status'] = PlanStepStatusEnum.FAILED + yield update_state_event( + ctx, state_delta={CURRENT_STEP: post_execution_step} + ) else: yield event diff --git a/agents/matmaster_agent/flow_agents/agent.py b/agents/matmaster_agent/flow_agents/agent.py index 06369b6e..d8a7dfe0 100644 --- a/agents/matmaster_agent/flow_agents/agent.py +++ b/agents/matmaster_agent/flow_agents/agent.py @@ -14,7 +14,6 @@ ) from agents.matmaster_agent.constant import ( CURRENT_ENV, - FRONTEND_STATE_KEY, MATMASTER_AGENT_NAME, ModelRole, ) @@ -27,8 +26,17 @@ from agents.matmaster_agent.core_agents.comp_agents.dntransfer_climit_agent import ( DisallowTransferAndContentLimitLlmAgent, ) -from agents.matmaster_agent.core_agents.public_agents.job_agents.agent import ( - BaseAsyncJobAgent, +from agents.matmaster_agent.flow_agents.all_finished_agent.callback import ( + only_select_user_request, +) +from agents.matmaster_agent.flow_agents.all_finished_agent.constant import ( + ALL_FINISHED_AGENT, +) +from agents.matmaster_agent.flow_agents.all_finished_agent.prompt import ( + create_all_finished_instruction, +) +from agents.matmaster_agent.flow_agents.all_finished_agent.schema import ( + AllFinishedSchema, ) from agents.matmaster_agent.flow_agents.analysis_agent.prompt import ( get_analysis_instruction, @@ -40,7 +48,6 @@ ) from agents.matmaster_agent.flow_agents.constant import ( MATMASTER_FLOW, - MATMASTER_FLOW_PLANS, MATMASTER_GENERATE_NPS, MATMASTER_INTENT_UI, MATMASTER_THINKING_UI, @@ -71,7 +78,7 @@ get_plan_make_instruction, ) from agents.matmaster_agent.flow_agents.plan_make_agent.schema import ( - create_dynamic_multi_plans_schema, + create_dynamic_step_schema, ) from agents.matmaster_agent.flow_agents.report_agent.prompt import ( get_report_instruction, @@ -79,7 +86,6 @@ from agents.matmaster_agent.flow_agents.scene_agent.constant import SCENE_AGENT from agents.matmaster_agent.flow_agents.scene_agent.prompt import SCENE_INSTRUCTION from agents.matmaster_agent.flow_agents.scene_agent.schema import SceneSchema -from agents.matmaster_agent.flow_agents.schema import FlowStatusEnum from agents.matmaster_agent.flow_agents.step_title_agent.callback import ( filter_llm_contents, ) @@ -87,8 +93,12 @@ STEP_TITLE_INSTRUCTION, ) from agents.matmaster_agent.flow_agents.step_title_agent.schema import StepTitleSchema +from agents.matmaster_agent.flow_agents.step_utils import ( + get_current_step, + is_job_submitted_step, +) from agents.matmaster_agent.flow_agents.step_validation_agent.prompt import ( - STEP_VALIDATION_INSTRUCTION, + create_step_validation_instruction, ) from agents.matmaster_agent.flow_agents.step_validation_agent.schema import ( StepValidationSchema, @@ -99,9 +109,8 @@ ) from agents.matmaster_agent.flow_agents.thinking_agent.constant import THINKING_AGENT from agents.matmaster_agent.flow_agents.utils import ( - check_plan, + find_alternative_tool, get_tools_list, - is_plan_confirmed, scenes_contain_query_job_status, should_bypass_confirmation, ) @@ -131,10 +140,12 @@ from agents.matmaster_agent.services.questions import get_random_questions from agents.matmaster_agent.services.session_files import get_session_files from agents.matmaster_agent.state import ( - BIZ, + CURRENT_STEP, + CURRENT_STEP_TOOL_NAME, EXPAND, + FINISHED_STATE, + HISTORY_STEPS, MULTI_PLANS, - PLAN, UPLOAD_FILE, ) from agents.matmaster_agent.sub_agents.mapping import ( @@ -207,7 +218,7 @@ def after_init(self): name=PLAN_MAKE_AGENT, model=MatMasterLlmConfig.tool_schema_model, description='根据用户的问题依据现有工具执行计划,如果没有工具可用,告知用户,不要自己制造工具或幻想', - state_key=MULTI_PLANS, + state_key=CURRENT_STEP, global_instruction=GLOBAL_INSTRUCTION, before_model_callback=filter_plan_make_llm_contents, ) @@ -224,6 +235,15 @@ def after_init(self): self._execution_agent = None + self._all_finished_agent = DisallowTransferAndContentLimitSchemaAgent( + name=ALL_FINISHED_AGENT, + model=MatMasterLlmConfig.tool_schema_model, + description='检查用户的目标是否完成', + output_schema=AllFinishedSchema, + state_key=FINISHED_STATE, + before_model_callback=only_select_user_request, + ) + self._analysis_agent = DisallowTransferAndContentLimitLlmAgent( name='execution_summary_agent', model=MatMasterLlmConfig.default_litellm_model, @@ -248,6 +268,7 @@ def after_init(self): self.scene_agent, self.thinking_agent, self.plan_make_agent, + self.all_finished_agent, self.analysis_agent, self.report_agent, ] @@ -309,14 +330,27 @@ def analysis_agent(self) -> LlmAgent: def report_agent(self) -> LlmAgent: return self._report_agent + @computed_field + @property + def all_finished_agent(self) -> DisallowTransferAndContentLimitSchemaAgent: + return self._all_finished_agent + def _build_execution_agent_for_plan( self, ctx: InvocationContext ) -> MatMasterSupervisorAgent: + current_step = get_current_step(ctx) + current_step_tool_name = current_step.get(CURRENT_STEP_TOOL_NAME) + belonging_agent = ALL_TOOLS.get(current_step_tool_name, {}).get( + 'belonging_agent' + ) + step_validation_agent = DisallowTransferAndContentLimitSchemaAgent( name='step_validation_agent', model=MatMasterLlmConfig.tool_schema_model, description='校验步骤执行结果是否合理', - instruction=STEP_VALIDATION_INSTRUCTION, + instruction=create_step_validation_instruction( + find_alternative_tool(current_step_tool_name) + ), output_schema=StepValidationSchema, state_key='step_validation', after_model_callback=MatMasterLlmConfig.opik_tracer.after_model_callback, @@ -332,28 +366,16 @@ def _build_execution_agent_for_plan( before_model_callback=filter_llm_contents, after_model_callback=MatMasterLlmConfig.opik_tracer.after_model_callback, ) - plan_steps = ctx.session.state.get('plan', {}).get('steps', []) - agent_names = [] - for step in plan_steps: - tool_name = step.get('tool_name') - if not tool_name: - continue - belonging_agent = ALL_TOOLS.get(tool_name, {}).get('belonging_agent') - if belonging_agent and belonging_agent not in agent_names: - agent_names.append(belonging_agent) - - sub_agents = [ - AGENT_CLASS_MAPPING[agent_name](MatMasterLlmConfig) - for agent_name in agent_names - if agent_name in AGENT_CLASS_MAPPING - ] - execution_agent = MatMasterSupervisorAgent( name='execution_agent', model=MatMasterLlmConfig.default_litellm_model, description='根据 materials_plan 返回的计划进行总结', instruction='', - sub_agents=sub_agents + [step_title_agent] + [step_validation_agent], + sub_agents=[ + AGENT_CLASS_MAPPING[belonging_agent](MatMasterLlmConfig), + step_title_agent, + step_validation_agent, + ], ) track_adk_agent_recursive(execution_agent, MatMasterLlmConfig.opik_tracer) return execution_agent @@ -485,7 +507,7 @@ async def _run_scene_agent( logger.info(f'{ctx.session.id} scenes = {scenes}') yield update_state_event(ctx, state_delta={'scenes': copy.deepcopy(scenes)}) - async def _run_plan_make_agent( + async def _run_step_make_agent( self, ctx: InvocationContext, UPDATE_USER_CONTENT, @@ -493,12 +515,6 @@ async def _run_plan_make_agent( *, skip_thinking: bool = False, ) -> AsyncGenerator[Event, None]: - # 制定计划 - if check_plan(ctx) == FlowStatusEnum.FAILED: - plan_title = i18n.t('RePlanMake') - else: - plan_title = i18n.t('PlanMake') - scenes = ctx.session.state['scenes'] available_tools = get_tools_list(ctx, scenes) if not available_tools: @@ -641,9 +657,7 @@ async def _run_plan_make_agent( thinking_context=thinking_text, session_file_summary=session_file_summary, ) - self.plan_make_agent.output_schema = create_dynamic_multi_plans_schema( - available_tools - ) + self.plan_make_agent.output_schema = create_dynamic_step_schema(available_tools) async for plan_event in self.plan_make_agent.run_async(ctx): yield plan_event @@ -687,77 +701,147 @@ async def _run_plan_make_agent( else: logger.debug('%s memory_writer output 0 insights', ctx.session.id) - # 总结计划 + # 检查是否应该跳过用户确认步骤 + if should_bypass_confirmation(ctx): + logger.info( + f'{ctx.session.id} {ctx.session.state[CURRENT_STEP]['tool_name']} bypass confirmation' + ) + # 自动设置计划确认状态 + yield update_state_event( + ctx, + state_delta={ + 'plan_confirm': { + 'flag': True, + 'selected_plan_id': 0, + 'reason': 'Auto confirmed for single bypass tool', + } + }, + ) + + async def _run_plan_execute_agent( + self, ctx: InvocationContext + ) -> AsyncGenerator[Event, None]: + # 执行计划 + self._execution_agent = self._build_execution_agent_for_plan(ctx) + if self._execution_agent: + # 使用 name 属性来检查,避免 Pydantic __eq__ 的循环引用问题 + if not any( + agent.name == self._execution_agent.name for agent in self.sub_agents + ): + self.sub_agents.append(self._execution_agent) + async for execution_event in self._execution_agent.run_async(ctx): + yield execution_event + + async def _run_summary_agent( + self, ctx: InvocationContext + ) -> AsyncGenerator[Event, None]: + # 渲染总结 yield update_state_event( ctx, state_delta={ 'matmaster_flow_active': { - 'title': plan_title, - 'font_color': '#30B37F', - 'bg_color': '#EFF8F5', - 'border_color': '#B2E0CE', + 'title': i18n.t('PlanSummary'), + 'font_color': '#9479F7', + 'bg_color': '#F5F3FF', + 'border_color': '#CFC3FC', } }, ) for matmaster_flow_event in context_function_event( ctx, self.name, - MATMASTER_FLOW, + 'matmaster_flow', None, ModelRole, { 'matmaster_flow_args': json.dumps( { - 'title': plan_title, + 'title': i18n.t('PlanSummary'), 'status': 'start', - 'font_color': '#30B37F', - 'bg_color': '#EFF8F5', - 'border_color': '#B2E0CE', + 'font_color': '#9479F7', + 'bg_color': '#F5F3FF', + 'border_color': '#CFC3FC', } ) }, ): yield matmaster_flow_event + self._analysis_agent.instruction = get_analysis_instruction( + ctx.session.state['plan'] + ) + analysis_text = '' + async for analysis_event in self.analysis_agent.run_async(ctx): + if (cur := is_text(analysis_event)) and not analysis_event.partial: + analysis_text += cur + yield analysis_event + if analysis_text.strip(): + await memory_write( + session_id=ctx.session.id, + text=f"Plan execution summary: {analysis_text.strip()}", + metadata={'source': 'execution_summary'}, + ) + logger.info( + '%s wrote execution summary to memory (%d chars)', + ctx.session.id, + len(analysis_text), + ) + self._report_agent.instruction = get_report_instruction( + ctx.session.state.get('plan', {}) + ) - plan_make_count = len(ctx.session.state[MULTI_PLANS]['plans']) - plan_info = ctx.session.state[MULTI_PLANS] - intro = plan_info['intro'] - plans = plan_info['plans'] - overall = plan_info['overall'] - - for matmaster_flow_plans_event in context_function_event( - ctx, - self.name, - MATMASTER_FLOW_PLANS, - None, - ModelRole, - { - 'plans_result': json.dumps( - { - 'invocation_id': ctx.invocation_id, - 'intro': intro, - 'plans': plans, - 'overall': overall, - } - ) - }, - ): - yield matmaster_flow_plans_event + # Collect report Markdown + report_markdown = '' + async for report_event in self.report_agent.run_async(ctx): + if (cur_text := is_text(report_event)) and not report_event.partial: + report_markdown += cur_text + + if report_markdown.strip(): + excerpt = report_markdown.strip()[:5000] + await memory_write( + session_id=ctx.session.id, + text=f"Plan execution report (excerpt): {excerpt}", + metadata={'source': 'execution_report'}, + ) + logger.info( + '%s wrote report excerpt to memory (%d chars)', + ctx.session.id, + len(excerpt), + ) + # matmaster_report_md.md + upload_result = await upload_report_md_to_oss( + ReportUploadParams( + report_markdown=report_markdown, + session_id=ctx.session.id, + invocation_id=ctx.invocation_id, + ) + ) + if upload_result: + for report_file_event in context_function_event( + ctx, + self.name, + 'matmaster_report_md', + None, + ModelRole, + { + 'url': upload_result.oss_url, + }, + ): + yield report_file_event for matmaster_flow_event in context_function_event( ctx, self.name, - MATMASTER_FLOW, + 'matmaster_flow', None, ModelRole, { 'matmaster_flow_args': json.dumps( { - 'title': plan_title, + 'title': i18n.t('PlanSummary'), 'status': 'end', - 'font_color': '#30B37F', - 'bg_color': '#EFF8F5', - 'border_color': '#B2E0CE', + 'font_color': '#9479F7', + 'bg_color': '#F5F3FF', + 'border_color': '#CFC3FC', } ) }, @@ -765,205 +849,6 @@ async def _run_plan_make_agent( yield matmaster_flow_event yield update_state_event(ctx, state_delta={'matmaster_flow_active': None}) - # 更新计划为可执行的计划 - update_multi_plans = copy.deepcopy(ctx.session.state[MULTI_PLANS]) - for update_plan in update_multi_plans['plans']: - origin_steps = update_plan['steps'] - actual_steps = [] - for step in origin_steps: - if step.get('tool_name'): - actual_steps.append(step) - else: - break - update_plan['steps'] = actual_steps - yield update_state_event(ctx, state_delta={'multi_plans': update_multi_plans}) - - # 检查是否应该跳过用户确认步骤 - if plan_make_count == 1 and should_bypass_confirmation(ctx): - # 自动设置计划确认状态 - yield update_state_event( - ctx, - state_delta={ - 'plan_confirm': { - 'flag': True, - 'selected_plan_id': 0, - 'reason': 'Auto confirmed for single bypass tool', - } - }, - ) - - async def _run_plan_execute_and_summary_agent( - self, ctx: InvocationContext - ) -> AsyncGenerator[Event, None]: - # 重置 scenes - yield update_state_event(ctx, state_delta={'scenes': []}) - # 执行计划 - if ctx.session.state['plan']['feasibility'] in ['full', 'part']: - self._execution_agent = self._build_execution_agent_for_plan(ctx) - if self._execution_agent: - # 使用 name 属性来检查,避免 Pydantic __eq__ 的循环引用问题 - if not any( - agent.name == self._execution_agent.name - for agent in self.sub_agents - ): - self.sub_agents.append(self._execution_agent) - async for execution_event in self._execution_agent.run_async(ctx): - yield execution_event - - # 全部执行完毕,总结执行情况 - if ( - check_plan(ctx) in [FlowStatusEnum.COMPLETE, FlowStatusEnum.FAILED] - or ctx.session.state['plan']['feasibility'] == 'null' - ): - # Skip summary for single-tool plans - plan_steps = ctx.session.state['plan'].get('steps', []) - tool_count = sum(1 for step in plan_steps if step.get('tool_name')) - - is_async_agent = issubclass( - AGENT_CLASS_MAPPING[ - ALL_TOOLS[plan_steps[0]['tool_name']]['belonging_agent'] - ], - BaseAsyncJobAgent, - ) - logger.info(f'is_async_agent = {is_async_agent}, tool_count = {tool_count}') - - # 渲染总结 - if tool_count > 1 or is_async_agent: - yield update_state_event( - ctx, - state_delta={ - 'matmaster_flow_active': { - 'title': i18n.t('PlanSummary'), - 'font_color': '#9479F7', - 'bg_color': '#F5F3FF', - 'border_color': '#CFC3FC', - } - }, - ) - for matmaster_flow_event in context_function_event( - ctx, - self.name, - 'matmaster_flow', - None, - ModelRole, - { - 'matmaster_flow_args': json.dumps( - { - 'title': i18n.t('PlanSummary'), - 'status': 'start', - 'font_color': '#9479F7', - 'bg_color': '#F5F3FF', - 'border_color': '#CFC3FC', - } - ) - }, - ): - yield matmaster_flow_event - self._analysis_agent.instruction = get_analysis_instruction( - ctx.session.state['plan'] - ) - analysis_text = '' - async for analysis_event in self.analysis_agent.run_async(ctx): - if (cur := is_text(analysis_event)) and not analysis_event.partial: - analysis_text += cur - yield analysis_event - if analysis_text.strip(): - await memory_write( - session_id=ctx.session.id, - text=f"Plan execution summary: {analysis_text.strip()}", - metadata={'source': 'execution_summary'}, - ) - logger.info( - '%s wrote execution summary to memory (%d chars)', - ctx.session.id, - len(analysis_text), - ) - self._report_agent.instruction = get_report_instruction( - ctx.session.state.get('plan', {}) - ) - - # Collect report Markdown - report_markdown = '' - async for report_event in self.report_agent.run_async(ctx): - if (cur_text := is_text(report_event)) and not report_event.partial: - report_markdown += cur_text - - if report_markdown.strip(): - excerpt = report_markdown.strip()[:5000] - await memory_write( - session_id=ctx.session.id, - text=f"Plan execution report (excerpt): {excerpt}", - metadata={'source': 'execution_report'}, - ) - logger.info( - '%s wrote report excerpt to memory (%d chars)', - ctx.session.id, - len(excerpt), - ) - - # matmaster_report_md.md - upload_result = await upload_report_md_to_oss( - ReportUploadParams( - report_markdown=report_markdown, - session_id=ctx.session.id, - invocation_id=ctx.invocation_id, - ) - ) - if upload_result: - for report_file_event in context_function_event( - ctx, - self.name, - 'matmaster_report_md', - None, - ModelRole, - { - 'url': upload_result.oss_url, - }, - ): - yield report_file_event - for matmaster_flow_event in context_function_event( - ctx, - self.name, - 'matmaster_flow', - None, - ModelRole, - { - 'matmaster_flow_args': json.dumps( - { - 'title': i18n.t('PlanSummary'), - 'status': 'end', - 'font_color': '#9479F7', - 'bg_color': '#F5F3FF', - 'border_color': '#CFC3FC', - } - ) - }, - ): - yield matmaster_flow_event - yield update_state_event( - ctx, state_delta={'matmaster_flow_active': None} - ) - - # 渲染追问组件 - follow_up_list = await get_random_questions(i18n=i18n) - for generate_follow_up_event in context_function_event( - ctx, - self.name, - 'matmaster_generate_follow_up', - {}, - ModelRole, - { - 'follow_up_result': json.dumps( - { - 'invocation_id': ctx.invocation_id, - 'title': i18n.t('MoreQuestions'), - 'list': follow_up_list, - } - ) - }, - ): - yield generate_follow_up_event - async def _run_research_flow( self, ctx: InvocationContext ) -> AsyncGenerator[Event, None]: @@ -993,63 +878,63 @@ async def _run_research_flow( ): yield _scene_event - # 清空 Plan 和 MULTI_PLANS - if check_plan(ctx) == FlowStatusEnum.COMPLETE: - yield update_state_event(ctx, state_delta={PLAN: {}, MULTI_PLANS: {}}) - - # 制定计划(1. 无计划;2. 计划已完成;3. 计划失败;4. 用户未确认计划) - # 仅查询任务状态时跳过 thinking(查任务状态不 thinking) - skip_thinking = scenes_contain_query_job_status(ctx) - if check_plan(ctx) in [ - FlowStatusEnum.NO_PLAN, - FlowStatusEnum.COMPLETE, - FlowStatusEnum.FAILED, - ] or not is_plan_confirmed(ctx): - async for _plan_make_event in self._run_plan_make_agent( - ctx, - UPDATE_USER_CONTENT, - TOOLCHAIN_EXAMPLES_PROMPT, - skip_thinking=skip_thinking, - ): - yield _plan_make_event - - # 从 MultiPlans 中选择某个计划 - logger.info(f'{ctx.session.id} check_plan = {check_plan(ctx)}') - if is_plan_confirmed(ctx) and check_plan(ctx) in [FlowStatusEnum.NEW_PLAN]: - selected_plan_id = ctx.session.state[FRONTEND_STATE_KEY][BIZ].get( - 'selected_plan_id', 0 - ) - logger.info( - f'{ctx.session.id} biz_state = {ctx.session.state[FRONTEND_STATE_KEY][BIZ]} ' - f'selected_plan_id = {selected_plan_id}' - ) - plans = ctx.session.state.get(MULTI_PLANS, {}).get('plans', []) - if ( - selected_plan_id is None - or not isinstance(selected_plan_id, int) - or selected_plan_id < 0 - or selected_plan_id >= len(plans) - ): - logger.warning( - f'{ctx.session.id} invalid selected_plan_id={selected_plan_id}, ' - f'fallback to 0' + execution_count = 0 + while True: + if not is_job_submitted_step(ctx): + skip_thinking = scenes_contain_query_job_status(ctx) or execution_count + async for _step_make_event in self._run_step_make_agent( + ctx, + UPDATE_USER_CONTENT, + TOOLCHAIN_EXAMPLES_PROMPT, + skip_thinking=skip_thinking, + ): + yield _step_make_event + + async for _plan_execute_event in self._run_plan_execute_agent(ctx): + yield _plan_execute_event + execution_count += 1 + + # 检查是否为等待异步任务执行完成的阶段 + if not is_job_submitted_step(ctx): + # 回顾历史执行 + user_request = ctx.user_content.parts[0].text + history_steps = ctx.session.state[HISTORY_STEPS] + session_files = await get_session_files(ctx.session.id) + self.all_finished_agent.instruction = create_all_finished_instruction( + user_request, history_steps, session_files ) - selected_plan_id = 0 - if not plans: - logger.warning(f'{ctx.session.id} empty multi_plans, skip plan select') - return - selected_plan = plans[selected_plan_id] - yield update_state_event(ctx, state_delta={PLAN: selected_plan}) - logger.info( - f'{ctx.session.id} Reset Plan, plan = {ctx.session.state[PLAN]}' - ) + async for _all_finished_event in self.all_finished_agent.run_async(ctx): + yield _all_finished_event + + if ctx.session.state[FINISHED_STATE]['finished']: + break + else: + break + + if not is_job_submitted_step(ctx): + # 总结计划 + async for _plan_summary_event in self._run_summary_agent(ctx): + yield _plan_summary_event - # 计划未确认,暂停往下执行 - if is_plan_confirmed(ctx): - async for ( - _plan_execute_and_summary_event - ) in self._run_plan_execute_and_summary_agent(ctx): - yield _plan_execute_and_summary_event + # 渲染追问组件 + follow_up_list = await get_random_questions(i18n=i18n) + for generate_follow_up_event in context_function_event( + ctx, + self.name, + 'matmaster_generate_follow_up', + {}, + ModelRole, + { + 'follow_up_result': json.dumps( + { + 'invocation_id': ctx.invocation_id, + 'title': i18n.t('MoreQuestions'), + 'list': follow_up_list, + } + ) + }, + ): + yield generate_follow_up_event async def _run_async_impl( self, ctx: InvocationContext diff --git a/agents/matmaster_agent/flow_agents/all_finished_agent/__init__.py b/agents/matmaster_agent/flow_agents/all_finished_agent/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/agents/matmaster_agent/flow_agents/all_finished_agent/callback.py b/agents/matmaster_agent/flow_agents/all_finished_agent/callback.py new file mode 100644 index 00000000..65812188 --- /dev/null +++ b/agents/matmaster_agent/flow_agents/all_finished_agent/callback.py @@ -0,0 +1,24 @@ +import logging +from typing import Optional + +from google.adk.agents.callback_context import CallbackContext +from google.adk.models import LlmRequest, LlmResponse +from google.genai.types import Content, Part + +from agents.matmaster_agent.constant import MATMASTER_AGENT_NAME, UserRole +from agents.matmaster_agent.logger import PrefixFilter + +logger = logging.getLogger(__name__) +logger.addFilter(PrefixFilter(MATMASTER_AGENT_NAME)) +logger.setLevel(logging.INFO) + + +async def only_select_user_request( + callback_context: CallbackContext, llm_request: LlmRequest +) -> Optional[LlmResponse]: + llm_request.contents = [ + Content( + role=UserRole, + parts=[Part(text=callback_context.user_content.parts[0].text)], + ) + ] diff --git a/agents/matmaster_agent/flow_agents/all_finished_agent/constant.py b/agents/matmaster_agent/flow_agents/all_finished_agent/constant.py new file mode 100644 index 00000000..c2359707 --- /dev/null +++ b/agents/matmaster_agent/flow_agents/all_finished_agent/constant.py @@ -0,0 +1 @@ +ALL_FINISHED_AGENT = 'all_finished_agent' diff --git a/agents/matmaster_agent/flow_agents/all_finished_agent/prompt.py b/agents/matmaster_agent/flow_agents/all_finished_agent/prompt.py new file mode 100644 index 00000000..c56e44fb --- /dev/null +++ b/agents/matmaster_agent/flow_agents/all_finished_agent/prompt.py @@ -0,0 +1,90 @@ +import json + + +def create_all_finished_instruction(user_request, history_steps, session_files): + """ + Build an instruction prompt for an agent that decides whether the user's overall goal + has been completed up to the current point in the tool-call history. + The agent must output a structured JSON: + {"finished": bool, "reason": str} + """ + history_text = json.dumps(history_steps, ensure_ascii=False, indent=2) + session_files_text = json.dumps(session_files, ensure_ascii=False, indent=2) + return f""" +You are a "Goal Completion Judge" agent. Decide whether the user's overall final objective +has been completed *as of now*, based ONLY on history_steps and session_files. +Key principle: "finished" indicates whether the session should STOP now. +- If the goal is completed: finished=true. +- If the goal is NOT completed but still achievable with further actions: finished=false. +- If the goal is NOT completed AND is blocked/unachievable given the evidence: finished=true (Termination/Unachievable), and the reason MUST explicitly say it is not completed but cannot be completed. + +IMPORTANT: The user's goal may be "content in chat" (e.g., a tutorial/summary), not necessarily a file. +Only require session_files evidence when the user explicitly asked for a file or a file is clearly the expected deliverable. + +IMPORTANT: If user_request asks for multiple items (A and B / compare X vs Y / generate N variants), finished=true ONLY when ALL are done. + +IMPORTANT: Treat explicit numeric/parameter constraints (layers, vacuum thickness, slab orientation/cut, supercell expansion like 5×5×1, etc.) as mandatory. finished=true ONLY if history_steps explicitly confirms EACH constraint was applied. + +IMPORTANT (NEW, HIGH PRIORITY): history_steps[*].suggestion is PRIMARY evidence for whether the task is still achievable. +- If ANY actionable suggestion exists (even if earlier), and it has NOT been explicitly attempted and exhausted in later history_steps, you MUST set finished=false (unless the goal is already completed). +- Actionable suggestions include: retrying with modified parameters, switching tools/providers, requesting missing inputs, rerunning with fixes, alternative workflows, etc. +- You MUST NOT output finished=true (Termination/Unachievable) when there exists any untried actionable suggestion. +- Only consider Termination/Unachievable when (a) NOT completed, AND (b) all actionable suggestions have been tried (and are evidenced as tried) with continued failure, AND (c) no remaining viable next action is suggested anywhere in history_steps. + +CRITICAL: Do NOT treat "suggestion was not acted upon" as evidence of unachievability. +If there exists any actionable history_steps[*].suggestion that has not been tried, the task is still achievable => finished=false. + +# Input +user_request: +{user_request} +history_steps (JSON): +{history_text} +session_files (JSON): +{session_files_text} + +# Decision Rules (must follow) +1) Judge ONLY the user's final goal completion / stop condition, not whether all intermediate steps ran. + +2) Deliverable type: + - If a file artifact is required (PDF/DOCX/ZIP/code project/structure file, etc.), you MUST verify an appropriate OSS link exists in session_files; otherwise finished=false (unless Termination/Unachievable applies). + - If in-chat content is required, verify the complete requested content already exists in history_steps outputs; otherwise finished=false (unless Termination/Unachievable applies). + +3) If any critical step is failed/missing/running OR outputs are insufficient to prove completion, set finished=false (unless Termination/Unachievable applies). + +4) Insufficient evidence => finished=false and state exactly what is missing (unless Termination/Unachievable applies). + +5) Contradictions: prefer later entries; if still unclear => finished=false and explain contradiction (unless Termination/Unachievable applies). + +6) Do NOT assume results not explicitly supported by history_steps/session_files. + +6.1) For explicit parameter constraints, if ANY constraint is not explicitly evidenced, finished=false (unless Termination/Unachievable applies). + +7) Suggestion-first achievability check (MUST APPLY BEFORE declaring finished=true for Termination/Unachievable): + - Scan ALL history_steps for actionable suggestions. + - If any actionable suggestion is not explicitly shown as attempted and exhausted, output finished=false. + +8) Termination/Unachievable (STOP even though not done): + You may output finished=true for Termination/Unachievable ONLY if: + - The goal is NOT completed, AND + - history_steps provide concrete evidence that no viable next action exists, AND + - EVERY actionable history_steps[*].suggestion has been explicitly tried in later history_steps and still failed, leaving no remaining options. + If ANY unresolved suggestion proposes a viable next action (e.g., change parameters, switch provider/tool, request missing info), + you MUST output finished=false (the session should continue), unless the goal is already completed. + + If you output finished=true (Termination/Unachievable), the reason MUST include: + - "NOT completed" and + - "cannot be completed / unachievable" and + - the blocking evidence (specific failed steps / missing inputs). + + You MUST NOT output finished=true (Termination/Unachievable) when the only blocking evidence is that a tool failed once and the agent has not yet tried actionable suggestions (e.g., switching provider/tool, changing parameters). In that case, output finished=false. + +# Output Format +Output ONLY ONE JSON object exactly: +{{ + "finished": true|false, + "reason": "Brief, specific English explanation citing concrete evidence from history_steps and/or session_files. If using Termination/Unachievable, explicitly state: NOT completed but cannot be completed, and cite the blocking evidence." +}} + +# Output Constraints +- Output ONLY valid JSON (no Markdown / code fences / extra text). +""".strip() diff --git a/agents/matmaster_agent/flow_agents/all_finished_agent/schema.py b/agents/matmaster_agent/flow_agents/all_finished_agent/schema.py new file mode 100644 index 00000000..f078278e --- /dev/null +++ b/agents/matmaster_agent/flow_agents/all_finished_agent/schema.py @@ -0,0 +1,6 @@ +from pydantic import BaseModel + + +class AllFinishedSchema(BaseModel): + finished: bool + reason: str diff --git a/agents/matmaster_agent/flow_agents/execution_agent/agent.py b/agents/matmaster_agent/flow_agents/execution_agent/agent.py index 5285d035..e02eb48b 100644 --- a/agents/matmaster_agent/flow_agents/execution_agent/agent.py +++ b/agents/matmaster_agent/flow_agents/execution_agent/agent.py @@ -14,29 +14,35 @@ DisallowTransferAndContentLimitLlmAgent, ) from agents.matmaster_agent.flow_agents.constant import ( - EXECUTION_TYPE_LABEL_CHANGE_TOOL, - EXECUTION_TYPE_LABEL_RETRY, MATMASTER_SUPERVISOR_AGENT, ) -from agents.matmaster_agent.flow_agents.execution_agent.utils import ( - should_exit_retryLoop, -) from agents.matmaster_agent.flow_agents.model import PlanStepStatusEnum +from agents.matmaster_agent.flow_agents.step_utils import ( + get_current_step, + get_current_step_validation, + is_job_submitted_step, +) from agents.matmaster_agent.flow_agents.step_validation_agent.prompt import ( - STEP_VALIDATION_INSTRUCTION, + create_step_validation_instruction, ) from agents.matmaster_agent.flow_agents.style import separate_card from agents.matmaster_agent.flow_agents.utils import ( check_plan, find_alternative_tool, get_agent_for_tool, - has_self_check, ) from agents.matmaster_agent.llm_config import MatMasterLlmConfig -from agents.matmaster_agent.locales import i18n from agents.matmaster_agent.logger import PrefixFilter from agents.matmaster_agent.prompt import MatMasterCheckTransferPrompt -from agents.matmaster_agent.state import PLAN, STEP_DESCRIPTION, StepKey +from agents.matmaster_agent.state import ( + CURRENT_STEP, + CURRENT_STEP_DESCRIPTION, + CURRENT_STEP_STATUS, + CURRENT_STEP_TOOL_NAME, + HISTORY_STEPS, + PLAN, + StepKey, +) from agents.matmaster_agent.sub_agents.mapping import ( MatMasterSubAgentsEnum, ) @@ -91,17 +97,16 @@ async def _update_retry_count( yield update_state_event(ctx, state_delta={PLAN: update_plan}) async def _construct_function_call_ctx( - self, ctx: InvocationContext, index + self, ctx: InvocationContext ) -> AsyncGenerator[Event, None]: - update_plan = copy.deepcopy(ctx.session.state['plan']) - current_tool_name = update_plan['steps'][index]['tool_name'] - current_tool_description = update_plan['steps'][index][STEP_DESCRIPTION] - update_plan['steps'][index]['status'] = PlanStepStatusEnum.PROCESS + current_step = copy.deepcopy(ctx.session.state[CURRENT_STEP]) + current_step_tool_name = current_step['tool_name'] + current_step_tool_description = current_step[CURRENT_STEP_DESCRIPTION] + current_step['status'] = PlanStepStatusEnum.PROCESS yield update_state_event( ctx, state_delta={ - 'plan': update_plan, - 'plan_index': index, + CURRENT_STEP: current_step, }, ) for materials_plan_function_call_event in context_function_event( @@ -109,25 +114,19 @@ async def _construct_function_call_ctx( self.name, 'materials_plan_function_call', { - 'msg': f'According to the plan, I will call the `{current_tool_name}`: {current_tool_description}' + 'msg': f'According to the plan, I will call the `{current_step_tool_name}`: {current_step_tool_description}' }, ModelRole, ): yield materials_plan_function_call_event async def _core_execution_agent( - self, ctx: InvocationContext, index + self, ctx: InvocationContext ) -> AsyncGenerator[Event, None]: logger.info( f'{ctx.session.id} Before Run: plan_index = {ctx.session.state["plan_index"]}, plan = {ctx.session.state['plan']}' ) - if ctx.session.state[PLAN]['steps'][index]['retry_count']: - separate_card_info = 'ReExecuteStep' - retry_info = f'({ctx.session.state[PLAN]['steps'][index]['retry_count']}/{MAX_TOOL_RETRIES})' - else: - separate_card_info = 'Step' - retry_info = '' - + separate_card_info = 'Step' yield update_state_event( ctx, state_delta={ @@ -138,29 +137,12 @@ async def _core_execution_agent( # 引导标题 async for title_event in self.title_agent.run_async(ctx): yield title_event - if ctx.session.state[PLAN]['steps'][index]['retry_count']: - step_title = ( - i18n.t(separate_card_info) - + f'{retry_info}' - + ': ' - + ctx.session.state.get('step_title', {}).get('title', '') - ) - else: - step_title = ctx.session.state.get('step_title', {}).get('title', '') - if ( - ctx.session.state[PLAN]['steps'][index]['status'] - == PlanStepStatusEnum.SUBMITTED - ): - step_title = '获取任务结果' + step_title = ctx.session.state.get('step_title', {}).get('title', '') - # 展示文案:更换工具 / 重试工具 / 空(正常执行),直接传给前端 - if ctx.session.state.pop('matmaster_flow_switched_tool', None): - execution_type_label = EXECUTION_TYPE_LABEL_CHANGE_TOOL - elif ctx.session.state[PLAN]['steps'][index]['retry_count']: - execution_type_label = EXECUTION_TYPE_LABEL_RETRY - else: - execution_type_label = '' + if ctx.session.state[CURRENT_STEP]['status'] == PlanStepStatusEnum.SUBMITTED: + step_title = '获取任务结果' + execution_type_label = '' yield update_state_event( ctx, state_delta={ @@ -194,10 +176,10 @@ async def _core_execution_agent( yield matmaster_flow_event # 核心执行工具(更换工具时新工具所属 Agent 可能不在 sub_agents,需动态获取) - current_tool_name = ctx.session.state[PLAN]['steps'][index]['tool_name'] - target_agent = get_agent_for_tool(current_tool_name, self.sub_agents) + current_step_tool_name = ctx.session.state[CURRENT_STEP]['tool_name'] + target_agent = get_agent_for_tool(current_step_tool_name, self.sub_agents) logger.info( - f'{ctx.session.id} tool_name = {current_tool_name}, target_agent = {target_agent.name}' + f'{ctx.session.id} tool_name = {current_step_tool_name}, target_agent = {target_agent.name}' ) async for event in target_agent.run_async(ctx): yield event @@ -206,11 +188,11 @@ async def _core_execution_agent( ) async def _tool_result_validation( - self, ctx: InvocationContext, index + self, ctx: InvocationContext ) -> AsyncGenerator[Event, None]: - current_tool_name = ctx.session.state[PLAN]['steps'][index]['tool_name'] - current_tool_description = ctx.session.state[PLAN]['steps'][index][ - STEP_DESCRIPTION + current_step_tool_name = ctx.session.state[CURRENT_STEP][CURRENT_STEP_TOOL_NAME] + current_step_tool_description = ctx.session.state[CURRENT_STEP][ + CURRENT_STEP_DESCRIPTION ] user_text = ( ctx.user_content.parts[0].text @@ -218,16 +200,21 @@ async def _tool_result_validation( else '' ) user_text = sanitize_braces(user_text) - current_tool_description = sanitize_braces(current_tool_description or '') + current_step_tool_description = sanitize_braces( + current_step_tool_description or '' + ) lines = ( f"用户原始请求: {user_text}", - f"当前步骤描述: {current_tool_description}", - f"工具名称: {current_tool_name}", + f"当前步骤描述: {current_step_tool_description}", + f"工具名称: {current_step_tool_name}", '请根据以上信息判断,工具的参数配置及对应的执行结果是否严格满足用户原始需求。', ) validation_instruction = '\n'.join(lines) self.validation_agent.instruction = ( - STEP_VALIDATION_INSTRUCTION + validation_instruction + create_step_validation_instruction( + find_alternative_tool(current_step_tool_name) + ) + + validation_instruction ) async for validation_event in self.validation_agent.run_async(ctx): @@ -266,9 +253,11 @@ async def _prepare_retry_fake_success( update_plan = copy.deepcopy(ctx.session.state['plan']) update_plan['steps'][index]['status'] = PlanStepStatusEnum.PROCESS update_plan['steps'][index]['validation_failure_reason'] = validation_reason - original_description = ctx.session.state[PLAN]['steps'][index][STEP_DESCRIPTION] + original_description = ctx.session.state[PLAN]['steps'][index][ + CURRENT_STEP_DESCRIPTION + ] update_plan['steps'][index][ - STEP_DESCRIPTION + CURRENT_STEP_DESCRIPTION ] = f"{original_description}\n\n注意:上次执行因以下原因校验失败,请改进:{validation_reason}" yield update_state_event(ctx, state_delta={'plan': update_plan}) @@ -312,11 +301,11 @@ async def _prepare_retry_other_tool( update_plan['steps'][index]['tool_name'] = next_tool update_plan['steps'][index]['status'] = PlanStepStatusEnum.PROCESS original_description = ctx.session.state[PLAN]['steps'][index][ - STEP_DESCRIPTION + CURRENT_STEP_DESCRIPTION ].split('\n\n注意:')[ 0 ] # 移除之前的失败原因 - update_plan['steps'][index][STEP_DESCRIPTION] = original_description + update_plan['steps'][index][CURRENT_STEP_DESCRIPTION] = original_description yield update_state_event( ctx, state_delta={ @@ -327,140 +316,30 @@ async def _prepare_retry_other_tool( @override async def _run_events(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]: - plan = ctx.session.state['plan'] - logger.info(f'{ctx.session.id} plan = {plan}') - - for index, initial_step in enumerate(plan['steps']): - initial_current_tool_name = initial_step['tool_name'] - tried_tools = [initial_current_tool_name] - alternatives = find_alternative_tool(initial_current_tool_name) - - tool_attempt_success = False - while not tool_attempt_success: - if ( - ctx.session.state[PLAN]['steps'][index]['status'] - == PlanStepStatusEnum.SUCCESS - ): - tool_attempt_success = True - break - else: - # 初始化 retry_count - async for _update_retry_event in self._update_retry_count( - ctx, index, 0 - ): - yield _update_retry_event - - # 同一工具重试 - while ( - ctx.session.state[PLAN]['steps'][index]['retry_count'] - <= MAX_TOOL_RETRIES - ): - # 制造工具调用上下文,已提交的任务跳过该步骤 - if ( - ctx.session.state[PLAN]['steps'][index]['status'] - != PlanStepStatusEnum.SUBMITTED - ): - async for ( - _construct_function_call_event - ) in self._construct_function_call_ctx(ctx, index): - yield _construct_function_call_event - - # 核心工具调用 - async for _core_execution_event in self._core_execution_agent( - ctx, index - ): - yield _core_execution_event - - current_steps = ctx.session.state['plan']['steps'] - # 工具调用结果返回【成功】 - if current_steps[index]['status'] == PlanStepStatusEnum.SUCCESS: - # 对成功的工具调用结果进行校验 - if has_self_check( - ctx.session.state[PLAN]['steps'][index]['tool_name'] - ): - # 校验工具结果 - async for ( - _tool_result_validation_event - ) in self._tool_result_validation(ctx, index): - yield _tool_result_validation_event - - validation_result = ctx.session.state.get( - 'step_validation', {} - ) - is_valid = validation_result.get('is_valid', True) - validation_reason = validation_result.get('reason', '') - - # “假成功”结果,计划重试 - if (not is_valid) and ctx.session.state[PLAN]['steps'][ - index - ]['retry_count'] < MAX_TOOL_RETRIES: - async for ( - _prepare_retry_fake_success_event - ) in self._prepare_retry_fake_success( - ctx, index, validation_reason - ): - yield _prepare_retry_fake_success_event - else: - # 校验成功,步骤完成 - tool_attempt_success = True - break - else: - # 无需校验,步骤完成 - tool_attempt_success = True - break - # 工具调用失败,且符合重试条件 - elif ( - current_steps[index]['status'] == PlanStepStatusEnum.FAILED - and ctx.session.state[PLAN]['steps'][index]['retry_count'] - < MAX_TOOL_RETRIES - ): - # 对于某些错误,重试没有必要,直接退出 - if should_exit_retryLoop(ctx): - break - - validation_result = ctx.session.state.get( - 'step_validation', {} - ) - validation_reason = validation_result.get('reason', '') - async for ( - _prepare_retry_failed_result_event - ) in self._prepare_retry_failed_result( - ctx, index, validation_reason - ): - yield _prepare_retry_failed_result_event - # 异步任务,直接退出当前函数 - elif ( - current_steps[index]['status'] - == PlanStepStatusEnum.SUBMITTED - ): - return - else: - # 其他状态(SUBMITTED等),退出循环 - break - - # 更换其他工具重试 - if ( - not tool_attempt_success - and ctx.session.state['plan']['steps'][index]['status'] - != PlanStepStatusEnum.SUBMITTED - ): - available_alts = [ - alt for alt in alternatives if alt not in tried_tools - ] - if available_alts: - # 尝试替换工具 - next_tool = available_alts[0] - tried_tools.append(next_tool) - async for ( - _prepare_retry_other_tool_event - ) in self._prepare_retry_other_tool(ctx, index, next_tool): - yield _prepare_retry_other_tool_event - else: - logger.warning( - f'{ctx.session.id} No more alternative tools for step {index + 1}' - ) - break # 退出tool while + # 制造工具调用上下文,已提交的任务跳过该步骤 + if not is_job_submitted_step(ctx): + async for ( + _construct_function_call_event + ) in self._construct_function_call_ctx(ctx): + yield _construct_function_call_event + + # 核心工具调用 + async for _core_execution_event in self._core_execution_agent(ctx): + yield _core_execution_event + + post_execution_step = get_current_step(ctx) + if post_execution_step[CURRENT_STEP_STATUS] != PlanStepStatusEnum.SUBMITTED: + # 校验工具结果 + async for _tool_result_validation_event in self._tool_result_validation( + ctx + ): + yield _tool_result_validation_event + # 异步任务,直接退出当前函数 + else: + return - # 最终仍然没有成功,中止计划 - if not tool_attempt_success: - break + update_history_steps = copy.deepcopy(ctx.session.state[HISTORY_STEPS]) + update_history_steps.append( + {**post_execution_step, **get_current_step_validation(ctx)} + ) + yield update_state_event(ctx, state_delta={HISTORY_STEPS: update_history_steps}) diff --git a/agents/matmaster_agent/flow_agents/plan_make_agent/agent.py b/agents/matmaster_agent/flow_agents/plan_make_agent/agent.py index 638b9d5b..a5ae24b2 100644 --- a/agents/matmaster_agent/flow_agents/plan_make_agent/agent.py +++ b/agents/matmaster_agent/flow_agents/plan_make_agent/agent.py @@ -9,7 +9,7 @@ DisallowTransferAndContentLimitSchemaAgent, ) from agents.matmaster_agent.logger import PrefixFilter -from agents.matmaster_agent.state import MULTI_PLANS +from agents.matmaster_agent.state import CURRENT_STEP from agents.matmaster_agent.utils.event_utils import update_state_event logger = logging.getLogger(__name__) @@ -24,44 +24,22 @@ async def _run_events(self, ctx: InvocationContext) -> AsyncGenerator[Event, Non async for event in super()._run_events(ctx): yield event - if ctx.session.state.get(MULTI_PLANS): + if ctx.session.state.get(CURRENT_STEP): logger.info( - f'{ctx.session.id} multi_plans = {ctx.session.state[MULTI_PLANS]}' + f'{ctx.session.id} NEXT_STEP = {ctx.session.state[CURRENT_STEP]}' ) break else: logger.error(f'{ctx.session.id} Multi Plans Generate Error, Retry') - if not ctx.session.state.get(MULTI_PLANS): + if not ctx.session.state.get(CURRENT_STEP): raise RuntimeError( f'{ctx.session.id} After Retry, Multi Plans Generate Still Error!!' ) - # 计算 feasibility - update_multi_plans = ctx.session.state['multi_plans'] - for update_plan in update_multi_plans['plans']: - update_plan['feasibility'] = 'null' - total_steps = len(update_plan.get('steps', [])) - exist_step = 0 - update_plan_steps = [] - for step in update_plan.get('steps', []): - if not step['tool_name']: - step['tool_name'] = 'llm_tool' - update_plan_steps.append(step) - update_plan['steps'] = update_plan_steps + # 处理无工具的情况 + update_current_step = ctx.session.state[CURRENT_STEP] + if not update_current_step['tool_name']: + update_current_step['tool_name'] = 'llm_tool' - for index, step in enumerate(update_plan['steps']): - if index == 0 and not step['tool_name']: - break - if step['tool_name']: - exist_step += 1 - else: - break - if not exist_step: - pass - elif exist_step != total_steps: - update_plan['feasibility'] = 'part' - else: - update_plan['feasibility'] = 'full' - - yield update_state_event(ctx, state_delta={'multi_plans': update_multi_plans}) + yield update_state_event(ctx, state_delta={CURRENT_STEP: update_current_step}) diff --git a/agents/matmaster_agent/flow_agents/plan_make_agent/prompt.py b/agents/matmaster_agent/flow_agents/plan_make_agent/prompt.py index f1c09a89..509c87d6 100644 --- a/agents/matmaster_agent/flow_agents/plan_make_agent/prompt.py +++ b/agents/matmaster_agent/flow_agents/plan_make_agent/prompt.py @@ -13,27 +13,16 @@ def get_static_plan_system_block(available_tools_with_info: str) -> str: ### OUTPUT LANGUAGE: All natural-language fields in the output MUST be written in {{target_language}}. -This includes (but is not limited to): "intro", each plan's "plan_description", each step's "step_description", each step's "feasibility", and "overall". +This includes (but is not limited to): each step's "step_description", and each step's "feasibility". Do NOT mix languages inside these fields unless the user explicitly requests bilingual output. -### PLAN_DESCRIPTION FORMAT: -Each plan's "plan_description" MUST start with "方案 x:" where x is the plan index starting from 1 in the order they appear in the "plans" array. -Example (in {{target_language}}): -- "方案 1:……" -- "方案 2:……" -Constraints: -- The prefix must be exactly "方案 x:" (Arabic numeral + full-width Chinese colon). -- Do NOT add any content before this prefix. - ### STEP_DESCRIPTION FORMAT: Each step's "step_description" MUST strictly follow this format: -- Start with an Arabic numeral index beginning at 1, incrementing by 1 within EACH plan (1, 2, 3, ...). -- Immediately after the number, use an English period "." (e.g., "1."). -- Then use the phrasing: "使用<工具名>工具进行<工作内容>". -- If "tool_name" is null, the phrasing MUST be: "使用llm_tool工具进行<工作内容>" (still must follow numbering). +- Use the phrasing: "使用<工具名>工具进行<工作内容>". +- If "tool_name" is null, the phrasing MUST be: "使用llm_tool工具进行<工作内容>". Examples (in {{target_language}}): -- "1. 使用ToolA工具进行读取用户提供的结构并执行能量计算" -- "2. 使用llm_tool工具进行总结结果并生成报告" +- "使用ToolA工具进行读取用户提供的结构并执行能量计算" +- "使用llm_tool工具进行总结结果并生成报告" Constraints: - Do NOT add extra prefixes/suffixes outside this template. @@ -43,29 +32,16 @@ def get_static_plan_system_block(available_tools_with_info: str) -> str: ### RE-PLANNING LOGIC: If the input contains errors from previous steps, analyze the failure and adjust the current plan (e.g., fix parameters or change tools) to resolve the issue. Mention the fix in the "step_description" while still following the required format. Do not ask the user whether to fix—output the adjusted plan directly. Do not end intro/overall with a question. -### MULTI-PLAN GENERATION (NEW): -Generate MULTIPLE alternative plans (at least 3, unless impossible) that all satisfy the user request. -Each plan MUST use a DIFFERENT tool orchestration strategy (i.e., different tool choices and/or different step ordering). -If there is only one feasible orchestration due to tool constraints, still output multiple plans and clearly explain in each plan's "feasibility" why divergence is not possible. - -Return a JSON structure with the following format: +### OUTPUT FORMAT (UPDATED): +Return ONLY ONE JSON object representing EXACTLY ONE step (not an array; no extra keys; no surrounding text). +Do NOT output "intro", "plans", "plan_description", or "overall". +Do NOT output multiple alternative plans. +Always output exactly this schema: {{ - "intro": , // MUST be in {{target_language}} - "plans": [ - {{ - "plan_id": , - "plan_description": , // MUST be in {{target_language}} and start with "方案 x:" - "steps": [ - {{ - "tool_name": , // Name of the tool to use (exact match from available list). Use null if no suitable tool exists - "step_description": , // MUST be in {{target_language}} and follow STEP_DESCRIPTION FORMAT - "feasibility": , // MUST be in {{target_language}} - "status": "plan" // Always return "plan" - }} - ] - }} - ], - "overall": // MUST be in {{target_language}} + "tool_name": , // Name of the tool to use (exact match from available list). Use null if no suitable tool exists + "step_description": , // MUST be in {{target_language}} and follow STEP_DESCRIPTION FORMAT + "feasibility": , // MUST be in {{target_language}} + "status": "plan" // Always return "plan" }} CRITICAL GUIDELINES: @@ -76,8 +52,6 @@ def get_static_plan_system_block(available_tools_with_info: str) -> str: 5. Use null for tool_name only when no appropriate tool exists in the available tools list 6. Never invent or assume tools - only use tools explicitly listed in the available tools 7. Match tools precisely to requirements - if functionality doesn't align exactly, use null -8. Ensure each plan's steps array represents a complete execution sequence for the request -9. Across different plans, avoid producing identical step lists; vary tooling and/or ordering whenever feasible. EXECUTION PRINCIPLES: - Make sure that the previous steps can provide the input information required for the current step, such as the file URL @@ -85,19 +59,19 @@ def get_static_plan_system_block(available_tools_with_info: str) -> str: - **File URLs should be treated as direct inputs to processing tools - no separate download, parsing, or preparation steps** - **Assume processing tools can handle URLs directly and include all necessary preprocessing capabilities** - **Skip any intermediate file preparation steps - go directly to the core processing task** -- **For multiple structures: Always use one step per structure per operation type (generation → structure1, generation → structure2; retrieval → structure1, retrieval → structure2; etc.)** -- **Maintain strict sequential processing: complete all operations for one structure before moving to the next, or group by operation type across all structures** +- **For multiple structures: Always use one step per structure per operation type** - Prioritize accuracy over assumptions - Maintain logical flow in step sequencing - Ensure step_descriptions clearly communicate purpose - Validate tool compatibility before assignment -### SELF-CHECK (NEW, MUST FOLLOW BEFORE OUTPUT): +### SELF-CHECK (UPDATED, MUST FOLLOW BEFORE OUTPUT): Before returning the final JSON, verify: -- "intro" and "overall" exist and are fully in {{target_language}}. -- Every "plan_description" starts with "方案 x:" where x increments from 1 in the order of the "plans" array. -- Every "step_description" starts with "1." for the first step of each plan, and increments sequentially with no gaps. -- Every "step_description" contains "使用" + (exact tool name or "llm_tool") + "工具进行". +- Output is a SINGLE JSON object (no surrounding text/markdown). +- No keys other than: tool_name, step_description, feasibility, status. +- "status" is exactly "plan". +- "step_description" does NOT need to start with a number. +- "step_description" contains "使用" + (exact tool name or "llm_tool") + "工具进行". - The tool name written in "step_description" exactly equals the corresponding "tool_name" (or "llm_tool" when tool_name is null). - All natural-language fields are fully in {{target_language}}. """ diff --git a/agents/matmaster_agent/flow_agents/plan_make_agent/schema.py b/agents/matmaster_agent/flow_agents/plan_make_agent/schema.py index f7b8bf5e..0ad2c71a 100644 --- a/agents/matmaster_agent/flow_agents/plan_make_agent/schema.py +++ b/agents/matmaster_agent/flow_agents/plan_make_agent/schema.py @@ -1,11 +1,11 @@ -from typing import List, Literal, Optional +from typing import Literal, Optional from pydantic import BaseModel, create_model from agents.matmaster_agent.flow_agents.model import PlanStepStatusEnum -def create_dynamic_multi_plans_schema(available_tools: list): +def create_dynamic_step_schema(available_tools: list): # 动态创建 PlanStepSchema DynamicPlanStepSchema = create_model( 'DynamicPlanStepSchema', @@ -19,22 +19,4 @@ def create_dynamic_multi_plans_schema(available_tools: list): __base__=BaseModel, ) - # 动态创建 PlanSchema - DynamicPlanSchema = create_model( - 'DynamicPlanSchema', - plan_id=(str, ...), - plan_description=(str, ...), - steps=(List[DynamicPlanStepSchema], ...), - __base__=BaseModel, - ) - - # 动态创建 MultiPlansSchema - DynamicMultiPlansSchema = create_model( - 'DynamicMultiPlansSchema', - intro=(str, ...), - plans=(List[DynamicPlanSchema], ...), - overall=(str, ...), - __base__=BaseModel, - ) - - return DynamicMultiPlansSchema + return DynamicPlanStepSchema diff --git a/agents/matmaster_agent/flow_agents/step_utils.py b/agents/matmaster_agent/flow_agents/step_utils.py new file mode 100644 index 00000000..b2f30e40 --- /dev/null +++ b/agents/matmaster_agent/flow_agents/step_utils.py @@ -0,0 +1,22 @@ +from google.adk.agents import InvocationContext + +from agents.matmaster_agent.flow_agents.model import PlanStepStatusEnum +from agents.matmaster_agent.state import ( + CURRENT_STEP, + CURRENT_STEP_STATUS, + CURRENT_STEP_VALIDATION, +) + + +def get_current_step(ctx: InvocationContext): + return ctx.session.state.get(CURRENT_STEP, {}) + + +def get_current_step_validation(ctx: InvocationContext): + return ctx.session.state.get(CURRENT_STEP_VALIDATION, {}) + + +def is_job_submitted_step(ctx: InvocationContext) -> bool: + return ( + get_current_step(ctx).get(CURRENT_STEP_STATUS) == PlanStepStatusEnum.SUBMITTED + ) diff --git a/agents/matmaster_agent/flow_agents/step_validation_agent/prompt.py b/agents/matmaster_agent/flow_agents/step_validation_agent/prompt.py index 47a93c81..3dd3ccb6 100644 --- a/agents/matmaster_agent/flow_agents/step_validation_agent/prompt.py +++ b/agents/matmaster_agent/flow_agents/step_validation_agent/prompt.py @@ -1,4 +1,8 @@ -STEP_VALIDATION_INSTRUCTION = """ +from typing import List + + +def create_step_validation_instruction(alternative_tools: List[str]): + return f""" You are a validation agent responsible for checking if the execution result of a step matches the user's requirements and basic chemical/materials science knowledge. Your task is to analyze: @@ -9,6 +13,9 @@ Based on this analysis, determine if the result is reasonable and matches expectations. +Backup tools you may suggest using if the result is invalid or uncertain: +{alternative_tools} + # Validation Criteria: 1. **Relevance**: Does the result address the step's intended purpose? 2. **Accuracy**: Is the result consistent with basic chemical/materials science knowledge? @@ -17,11 +24,12 @@ # Output Format: You must respond with a JSON object containing: -{ +{{ "is_valid": boolean, // true if result matches requirements and knowledge, false otherwise "reason": "string", // brief explanation of validation result - "confidence": "high|medium|low" // confidence level in the validation -} + "confidence": "high|medium|low", // confidence level in the validation + "suggestion": "string" // actionable suggestion; if invalid/uncertain, suggest fixes or using one of the backup tools above +}} # Important Rules: - If the result contains obvious errors (wrong chemical formulas, impossible physical properties, etc.), mark as invalid diff --git a/agents/matmaster_agent/flow_agents/step_validation_agent/schema.py b/agents/matmaster_agent/flow_agents/step_validation_agent/schema.py index 20cd2b32..a96c4685 100644 --- a/agents/matmaster_agent/flow_agents/step_validation_agent/schema.py +++ b/agents/matmaster_agent/flow_agents/step_validation_agent/schema.py @@ -5,3 +5,4 @@ class StepValidationSchema(BaseModel): is_valid: bool reason: str confidence: str # "high", "medium", "low" + suggestion: str diff --git a/agents/matmaster_agent/flow_agents/utils.py b/agents/matmaster_agent/flow_agents/utils.py index dd97c500..2d8bca1f 100644 --- a/agents/matmaster_agent/flow_agents/utils.py +++ b/agents/matmaster_agent/flow_agents/utils.py @@ -11,6 +11,7 @@ from agents.matmaster_agent.llm_config import MatMasterLlmConfig from agents.matmaster_agent.state import ( BIZ, + CURRENT_STEP, MULTI_PLANS, PLAN, PLAN_CONFIRM, @@ -119,30 +120,14 @@ def check_plan(ctx: InvocationContext): def should_bypass_confirmation(ctx: InvocationContext) -> bool: """Determine whether to skip plan confirmation based on the tools in the plan.""" - plan_steps = ctx.session.state['plan'].get('steps', []) - tool_count = len( - plan_steps - ) # plan steps are `actual_steps` validated by `tool_name` before appended - - # Check if there is exactly one tool in the plan - if tool_count == 1: - # Find the first (and only) tool name - first_tool_name = plan_steps[0].get('tool_name', '') - - # Check if this tool has bypass_confirmation set to True - if ALL_TOOLS.get(first_tool_name, {}).get('bypass_confirmation') is True: - return True - - # TODO: Add more logic here for handling multiple tools in the plan - elif tool_count == 2: - first_tool_name = plan_steps[0].get('tool_name', '') - second_tool_name = plan_steps[1].get('tool_name', '') - - if ( - first_tool_name == 'web-search' - and second_tool_name == 'extract_info_from_webpage' - ): - return True + current_step = ctx.session.state[CURRENT_STEP] + + # Find the first (and only) tool name + first_tool_name = current_step.get('tool_name', '') + + # Check if this tool has bypass_confirmation set to True + if ALL_TOOLS.get(first_tool_name, {}).get('bypass_confirmation') is True: + return True return False diff --git a/agents/matmaster_agent/memory/inject_memory_callback.py b/agents/matmaster_agent/memory/inject_memory_callback.py index 62d98470..b4dedf35 100644 --- a/agents/matmaster_agent/memory/inject_memory_callback.py +++ b/agents/matmaster_agent/memory/inject_memory_callback.py @@ -16,7 +16,7 @@ from agents.matmaster_agent.constant import MATMASTER_AGENT_NAME from agents.matmaster_agent.logger import PrefixFilter from agents.matmaster_agent.services.memory import format_short_term_memory -from agents.matmaster_agent.state import PLAN, STEP_DESCRIPTION +from agents.matmaster_agent.state import CURRENT_STEP, CURRENT_STEP_DESCRIPTION logger = logging.getLogger(__name__) logger.addFilter(PrefixFilter(MATMASTER_AGENT_NAME)) @@ -30,15 +30,10 @@ def _query_from_request_and_state( # Prefer current step description (tool step) so retrieval is relevant to the tool being filled state = getattr(callback_context, 'state', None) if state: - plan = state.get(PLAN) or {} - steps = plan.get('steps', []) - plan_index = state.get('plan_index', 0) - if 0 <= plan_index < len(steps): - desc = steps[plan_index].get(STEP_DESCRIPTION) or steps[plan_index].get( - 'step_description', '' - ) - if desc: - return desc.strip() + current_step = state.get(CURRENT_STEP, {}) + desc = current_step.get(CURRENT_STEP_DESCRIPTION, '') + if desc: + return desc.strip() # Fallback: last text from contents if llm_request.contents: for content in reversed(llm_request.contents): diff --git a/agents/matmaster_agent/services/job.py b/agents/matmaster_agent/services/job.py index ad9a2684..ba286716 100644 --- a/agents/matmaster_agent/services/job.py +++ b/agents/matmaster_agent/services/job.py @@ -362,8 +362,8 @@ async def parse_and_prepare_err( if __name__ == '__main__': print( asyncio.run( - parse_and_prepare_err( - job_id='2e42dde813fa452f99aabde95ccbb7cd', + check_job_create_service( + project_id=os.getenv('MATERIALS_PROJECT_ID'), access_key=os.getenv('MATERIALS_ACCESS_KEY'), ) ) diff --git a/agents/matmaster_agent/state.py b/agents/matmaster_agent/state.py index 7d241be5..4fa3fcc3 100644 --- a/agents/matmaster_agent/state.py +++ b/agents/matmaster_agent/state.py @@ -13,6 +13,16 @@ MULTI_PLANS = 'multi_plans' PLAN_CONFIRM = 'plan_confirm' +CURRENT_STEP = 'current_step' +CURRENT_STEP_STATUS = 'status' +CURRENT_STEP_TOOL_NAME = 'tool_name' +CURRENT_STEP_DESCRIPTION = 'step_description' +CURRENT_STEP_RESULT = 'step_result' +CURRENT_STEP_VALIDATION = 'step_validation' + +HISTORY_STEPS = 'history_steps' +FINISHED_STATE = 'finished_state' + # Other Key STEP_DESCRIPTION = 'step_description' diff --git a/agents/matmaster_agent/sub_agents/built_in_agent/llm_tool_agent/agent.py b/agents/matmaster_agent/sub_agents/built_in_agent/llm_tool_agent/agent.py index 54e99590..a3ccac4c 100644 --- a/agents/matmaster_agent/sub_agents/built_in_agent/llm_tool_agent/agent.py +++ b/agents/matmaster_agent/sub_agents/built_in_agent/llm_tool_agent/agent.py @@ -9,8 +9,13 @@ DisallowTransferAndContentLimitLlmAgent, ) from agents.matmaster_agent.flow_agents.model import PlanStepStatusEnum +from agents.matmaster_agent.flow_agents.step_utils import get_current_step from agents.matmaster_agent.llm_config import LLMConfig -from agents.matmaster_agent.locales import i18n +from agents.matmaster_agent.state import ( + CURRENT_STEP, + CURRENT_STEP_STATUS, + CURRENT_STEP_TOOL_NAME, +) from agents.matmaster_agent.sub_agents.built_in_agent.llm_tool_agent.constant import ( TOOL_AGENT_NAME, ) @@ -33,19 +38,13 @@ async def _run_events(self, ctx: InvocationContext) -> AsyncGenerator[Event, Non async for event in super()._run_events(ctx): yield event - update_plan = copy.deepcopy(ctx.session.state['plan']) - update_plan['steps'][ctx.session.state['plan_index']][ - 'status' - ] = PlanStepStatusEnum.SUCCESS - yield update_state_event(ctx, state_delta={'plan': update_plan}) - - current_step = ctx.session.state['plan']['steps'][ - ctx.session.state['plan_index'] - ] - current_step_tool_name = current_step['tool_name'] + post_execution_step = copy.deepcopy(get_current_step(ctx)) + post_execution_step[CURRENT_STEP_STATUS] = PlanStepStatusEnum.SUCCESS + yield update_state_event(ctx, state_delta={CURRENT_STEP: post_execution_step}) + current_step_tool_name = post_execution_step[CURRENT_STEP_TOOL_NAME] step_title = ctx.session.state.get('step_title', {}).get( 'title', - f"{i18n.t(ctx.session.state['separate_card_info'])} {ctx.session.state['plan_index'] + 1}: {current_step_tool_name}", + current_step_tool_name, ) for matmaster_flow_event in context_function_event( ctx, diff --git a/agents/matmaster_agent/utils/event_utils.py b/agents/matmaster_agent/utils/event_utils.py index 680a3977..b6c7996d 100644 --- a/agents/matmaster_agent/utils/event_utils.py +++ b/agents/matmaster_agent/utils/event_utils.py @@ -27,13 +27,20 @@ ContentLimitLlmAgent, ) from agents.matmaster_agent.flow_agents.model import PlanStepStatusEnum +from agents.matmaster_agent.flow_agents.step_utils import get_current_step from agents.matmaster_agent.flow_agents.style import separate_card from agents.matmaster_agent.llm_config import DEFAULT_MODEL, MatMasterLlmConfig from agents.matmaster_agent.locales import i18n from agents.matmaster_agent.model import RenderTypeEnum from agents.matmaster_agent.prompt import GLOBAL_INSTRUCTION from agents.matmaster_agent.services.session_files import insert_session_files -from agents.matmaster_agent.state import ERROR_DETAIL, PLAN, UPLOAD_FILE +from agents.matmaster_agent.state import ( + CURRENT_STEP, + CURRENT_STEP_STATUS, + ERROR_DETAIL, + PLAN, + UPLOAD_FILE, +) from agents.matmaster_agent.style import ( no_found_structure_card, photon_consume_free_card, @@ -474,11 +481,9 @@ def handle_tool_error(ctx, author, error_message, error_type): ) # 更新 plan 状态为失败 - update_plan = copy.deepcopy(ctx.session.state['plan']) - update_plan['steps'][ctx.session.state['plan_index']][ - 'status' - ] = PlanStepStatusEnum.FAILED - yield update_state_event(ctx, state_delta={'plan': update_plan}) + post_execution_step = copy.deepcopy(get_current_step(ctx)) + post_execution_step[CURRENT_STEP_STATUS] = PlanStepStatusEnum.FAILED + yield update_state_event(ctx, state_delta={CURRENT_STEP: post_execution_step}) # 抛出相应的异常 raise RuntimeError(f'Tool Execution Error: {error_type}') @@ -507,13 +512,13 @@ async def display_failed_result_or_consume( yield event else: # 更新 plan 为成功 - update_plan = copy.deepcopy(ctx.session.state['plan']) + post_execution_step = copy.deepcopy(ctx.session.state[CURRENT_STEP]) if not dict_result.get('job_id'): status = PlanStepStatusEnum.SUCCESS # real-time else: status = PlanStepStatusEnum.SUBMITTED # job-type - update_plan['steps'][ctx.session.state['plan_index']]['status'] = status - yield update_state_event(ctx, state_delta={'plan': update_plan}) + post_execution_step['status'] = status + yield update_state_event(ctx, state_delta={CURRENT_STEP: post_execution_step}) if USE_PHOTON: async for consume_event in photon_consume_event(ctx, event, author): diff --git a/agents/matmaster_agent/utils/helper_func.py b/agents/matmaster_agent/utils/helper_func.py index 914dd63b..f4dfc984 100644 --- a/agents/matmaster_agent/utils/helper_func.py +++ b/agents/matmaster_agent/utils/helper_func.py @@ -17,6 +17,7 @@ from agents.matmaster_agent.constant import FRONTEND_STATE_KEY, MATMASTER_AGENT_NAME from agents.matmaster_agent.flow_agents.model import PlanStepStatusEnum from agents.matmaster_agent.logger import PrefixFilter +from agents.matmaster_agent.state import CURRENT_STEP logger = logging.getLogger(__name__) logger.addFilter(PrefixFilter(MATMASTER_AGENT_NAME)) @@ -184,7 +185,7 @@ def get_new_function_call_indices( def get_current_step_function_call(current_function_calls, ctx): - current_step = ctx.state['plan']['steps'][ctx.state['plan_index']] + current_step = ctx.state[CURRENT_STEP] current_step_tool_name, current_step_satus = ( current_step['tool_name'], current_step['status'], @@ -210,9 +211,7 @@ def manual_build_current_function_call(callback_context: CallbackContext): logger.warning( f'{callback_context.session.id} current_function_calls empty, manually build one' ) - current_step = callback_context.state['plan']['steps'][ - callback_context.state['plan_index'] - ] + current_step = callback_context.state[CURRENT_STEP] function_call_id = f"added_{str(uuid.uuid4()).replace('-', '')[:24]}" current_function_calls = [ {