Skip to content
6 changes: 6 additions & 0 deletions redbox/redbox/api/format.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ def format_mcp_tool_response(tool_response, creator_type: ChunkCreatorType) -> t
deep_links = []
match result_type:
case "nullable":
if isinstance(result, str):
return result, metadata

deep_links = [(result.get("url"), result)]
case "paged":
deep_links = [(p.get("url"), p) for p in result.get("items", [])]
Expand Down Expand Up @@ -99,4 +102,7 @@ def format_mcp_tool_response(tool_response, creator_type: ChunkCreatorType) -> t
else:
response.append(json.dumps(item))

if not response:
return ("No results found.", metadata)

return ("\n\n".join(response), metadata)
50 changes: 31 additions & 19 deletions redbox/redbox/graph/nodes/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,37 +699,49 @@ def local_loop_condition():
result = "Tool error: no results received."

elif has_loop and len(ai_msg.tool_calls) > 0: # if loop, we need to transform results
result = result[-1].content # this is a tuple
# format of result: (result, success, is_intermediate_step)
log.warning("my-overall-result")
log.warning(result)
result_content = result[0]
success = result[1]
is_intermediate_step = eval(result[2])
collated_result = ""
feedback_reasons = []
for i, r in enumerate(result):
current_result = r.content # this is a tuple
# format of result: (result, success, is_intermediate_step)
log.warning("my-overall-result")
log.warning(current_result)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe unneeded as warning but unsure and super minor

Suggested change
log.warning("my-overall-result")
log.warning(current_result)
log.debug("my-overall-result")
log.debug(current_result)

result_content = current_result[0]
success = current_result[1]
is_intermediate_step = eval(current_result[2])

if len(current_result) > 3:
reason = current_result[3]
feedback_reasons.append(f"Failure reason: {reason}.\n\n{result_content}")
else:
collated_result += f"<tool_result_{i}>{result_content}</tool_result_{i}>"

if success == "fail":
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this not bool? Should it be? Unsure if there is a non pass/fail hence why String

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah this is supposed to be string - this follows the same logic as build_agent_with_loop for datahub agent

think the initial reason it was set to string so could be put in AIMessage (which is what run_tools_parallel returns), however, in future want to refactor run_tools_parallel so can leverage more structured return types so dont need these poorly typed strings

# pass error back if any
additional_variables.update({"previous_tool_error": result_content})
else:
# if success tool invocation, and intermediate steps then pass info back
if is_intermediate_step:
additional_variables.update(
{"previous_tool_error": "", "previous_tool_results": all_results}
)

if len(result) > 3:
reason = result[3]
if feedback_reasons:
combined_feedback = "\n\n".join(feedback_reasons)
return {
"agents_results": {
task.id: AIMessage(
content=f"<{agent_name}_Result>Ask user for feedback based on failure reason. Failure reason: {reason}.\n\n{result_content}</{agent_name}_Result>",
content=f"<{agent_name}_Result>Ask user for feedback based on failure reason. {combined_feedback}\n\n{collated_result}</{agent_name}_Result>",
kwargs={
"reason": reason,
"reason": combined_feedback,
},
)
},
"tasks_evaluator": task.task + "\n" + task.expected_output,
"agent_plans": state.agent_plans.update_task_status(task.id, TaskStatus.REQUIRES_USER_FEEDBACK),
}

if success == "fail":
# pass error back if any
additional_variables.update({"previous_tool_error": result_content})
else:
# if success tool invocation, and intermediate steps then pass info back
if is_intermediate_step:
additional_variables.update({"previous_tool_error": "", "previous_tool_results": all_results})
result = result_content
result = collated_result

if isinstance(result, str):
log.warning(f"{log_stub} Using raw string result.")
Expand Down
32 changes: 17 additions & 15 deletions redbox/redbox/graph/nodes/sends.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,24 +127,23 @@ def wrap_async_tool(tool, tool_name):
"""

def wrapper(args):
# Create a new event loop for this thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# get mcp tool url
mcp_url = tool.metadata["url"]
creator_type = tool.metadata["creator_type"]
sso_access_token = tool.metadata["sso_access_token"].get()

try:
sso_access_token = tool.metadata["sso_access_token"].get()
except Exception as e:
log.error(f"wrap_async_tool - Failed to retrieve sso_access_token: {e}")
raise

if not sso_access_token:
log.error("wrap_async_tool - MCP sso_access_token is None")

headers = _get_mcp_headers(sso_access_token)

try:
# Define the async operation
async def run_tool():
# tool need to be executed within the connection context manager
async def run_tool():
try:
async with streamablehttp_client(mcp_url, headers=headers or None) as (
read,
write,
Expand All @@ -168,7 +167,7 @@ async def run_tool():
raise ValueError(f"tool with name '{tool_name}' not found")

# remove intermediate step argument if it is not required by tool
if "is_intermediate_step" not in selected_tool.args_schema["required"] and args.get(
if "is_intermediate_step" not in selected_tool.args_schema.get("required", []) and args.get(
"is_intermediate_step"
):
args.pop("is_intermediate_step")
Expand All @@ -193,12 +192,15 @@ async def run_tool():
f"wrap_async_tool - Returning raw MCP tool response for creator_type='{creator_type}'"
)
return result
except Exception as e:
log.error(f"wrap_async_tool - Failed to connect to MCP server at '{mcp_url}': {e}")
raise

# Run the async function and return its result
return loop.run_until_complete(run_tool())
finally:
# Clean up resources
loop.close()
try:
return asyncio.run(run_tool())
except Exception as e:
log.error(f"wrap_async_tool - Unhandled error running tool '{tool_name}': {e}", exc_info=True)
raise

return wrapper

Expand Down
Loading