from dotenv import load_dotenv
load_dotenv(".env", override=True)
%load_ext autoreload
%autoreload 2
Deep Agents for Research
Overview
Now we can put together everything we have learned:
- We'll use TODOs to track tasks.
- We'll use files to store raw tool-call results.
- We'll delegate research work to sub-agents for context isolation.
Search Tool
We'll build a search tool that offloads raw content to files and returns only a summary to the agent. This is a common pattern for long-running agent trajectories, as we saw with Manus!
Core Components
- Search execution (run_tavily_search): Performs the actual web search via the Tavily API, with configurable parameters for result count and topic filtering.
- Content summarization (summarize_webpage_content): Uses a lightweight model (GPT-4o-mini) to generate structured summaries of webpage content, producing both a descriptive filename and a summary of key learnings.
- Result processing (process_search_results): Fetches full webpage content over HTTP, converts the HTML to Markdown with markdownify, then generates a summary for each result.
- Context offloading (the tavily_search tool): The main tool, which:
  - Executes the search and processes the results
  - Saves full content to files in the agent state (context offloading)
  - Returns only minimal summaries to the agent (avoiding context pollution)
  - Uses LangGraph's Command to update both files and messages
- Strategic thinking (think_tool): Provides a structured reflection mechanism that lets the agent analyze findings, assess gaps, and plan the next steps of the research workflow.
This architecture addresses token efficiency by storing detailed search results in files while keeping the agent's working context minimal and focused.
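Before the full module, here is the core of the offloading pattern in isolation. This is a minimal sketch: the fetch_page tool, its fixed filename, and the placeholder content are hypothetical, and the real implementation below adds search, summarization, and filename de-duplication.

from typing import Annotated

from langchain_core.messages import ToolMessage
from langchain_core.tools import InjectedToolCallId, tool
from langgraph.prebuilt import InjectedState
from langgraph.types import Command

@tool
def fetch_page(
    url: str,
    state: Annotated[dict, InjectedState],
    tool_call_id: Annotated[str, InjectedToolCallId],
) -> Command:
    """Fetch a page, offload the full text to a file, return only a stub."""
    full_text = f"...full content of {url}..."  # placeholder for a real fetch
    files = state.get("files", {})
    files["page.md"] = full_text  # offload: the raw content lives in a file
    return Command(
        update={
            "files": files,
            # the agent's context only ever sees this short message
            "messages": [
                ToolMessage(f"Saved {url} to page.md", tool_call_id=tool_call_id)
            ],
        }
    )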
%%writefile ./src/deep_agents_from_scratch/research_tools.py
"""연구 도구.
이 모듈은 웹 검색 기능 및 콘텐츠 요약 도구를 포함한
연구 에이전트를 위한 검색 및 콘텐츠 처리 유틸리티를 제공합니다.
"""
import base64
import os
import uuid
from datetime import datetime
from typing import Annotated, Literal

import httpx
from langchain.chat_models import init_chat_model
from langchain_core.messages import HumanMessage, ToolMessage
from langchain_core.tools import InjectedToolArg, InjectedToolCallId, tool
from langgraph.prebuilt import InjectedState
from langgraph.types import Command
from markdownify import markdownify
from pydantic import BaseModel, Field
from tavily import TavilyClient
from .prompts import SUMMARIZE_WEB_SEARCH
from .state import DeepAgentState
# Summarization model
summarization_model = init_chat_model(model="openai:gpt-4o-mini")
tavily_client = TavilyClient()
class Summary(BaseModel):
    """Schema for webpage content summaries."""

    filename: str = Field(description="Name of the file to save.")
    summary: str = Field(description="Key learnings from the webpage.")

def get_today_str() -> str:
    """Return the current date in a human-readable format."""
    # Note: the %-d directive (day without zero padding) works in Linux/macOS
    # strftime but not on Windows, which uses %#d instead.
    return datetime.now().strftime("%a %b %-d, %Y")
def run_tavily_search(
search_query: str,
max_results: int = 1,
topic: Literal["general", "news", "finance"] = "general",
include_raw_content: bool = True,
) -> dict:
"""단일 쿼리에 대해 Tavily API를 사용하여 검색을 수행합니다.
Args:
search_query: 실행할 검색 쿼리
max_results: 쿼리당 최대 결과 수
topic: 검색 결과에 대한 주제 필터
include_raw_content: 원본 웹페이지 콘텐츠 포함 여부
Returns:
검색 결과 딕셔너리
"""
result = tavily_client.search(
search_query,
max_results=max_results,
include_raw_content=include_raw_content,
topic=topic,
)
return result

def summarize_webpage_content(webpage_content: str) -> Summary:
    """Summarize webpage content using the configured summarization model.

    Args:
        webpage_content: Raw webpage content to summarize

    Returns:
        Summary object containing a filename and a summary
    """
    try:
        # Set up the structured-output model for summarization
        structured_model = summarization_model.with_structured_output(Summary)

        # Generate the summary
        summary_and_filename = structured_model.invoke(
            [
                HumanMessage(
                    content=SUMMARIZE_WEB_SEARCH.format(
                        webpage_content=webpage_content, date=get_today_str()
                    )
                )
            ]
        )

        return summary_and_filename
    except Exception:
        # On failure, return a default Summary object
        return Summary(
            filename="search_result.md",
            summary=webpage_content[:1000] + "..."
            if len(webpage_content) > 1000
            else webpage_content,
        )

def process_search_results(results: dict) -> list[dict]:
    """Process search results, summarizing content where available.

    Args:
        results: Tavily search results dictionary

    Returns:
        List of processed results with summaries
    """
    processed_results = []

    # Client for HTTP requests
    HTTPX_CLIENT = httpx.Client()

    for result in results.get("results", []):
        # Fetch the full page for this result
        url = result["url"]
        try:
            response = HTTPX_CLIENT.get(url, timeout=10.0)
        except httpx.HTTPError:
            # Network failures (timeouts, DNS errors) must not crash the
            # agent; fall back to Tavily's content below.
            response = None

        if response is not None and response.status_code == 200:
            # Convert the HTML to Markdown and summarize it
            raw_content = markdownify(response.text)
            summary_obj = summarize_webpage_content(raw_content)
        else:
            # Use Tavily's generated summary instead
            raw_content = result.get("raw_content", "")
            summary_obj = Summary(
                filename="URL_error.md",
                summary=result.get(
                    "content", "Unable to read the URL; try a different search."
                ),
            )

        # De-duplicate file names with a short random suffix
        uid = (
            base64.urlsafe_b64encode(uuid.uuid4().bytes)
            .rstrip(b"=")
            .decode("ascii")[:8]
        )
        name, ext = os.path.splitext(summary_obj.filename)
        summary_obj.filename = f"{name}_{uid}{ext}"

        processed_results.append(
            {
                "url": result["url"],
                "title": result["title"],
                "summary": summary_obj.summary,
                "filename": summary_obj.filename,
                "raw_content": raw_content,
            }
        )

    return processed_results
@tool(parse_docstring=True)
def tavily_search(
query: str,
state: Annotated[DeepAgentState, InjectedState],
tool_call_id: Annotated[str, InjectedToolCallId],
max_results: Annotated[int, InjectedToolArg] = 1,
topic: Annotated[
Literal["general", "news", "finance"], InjectedToolArg
] = "general",
) -> Command:
"""웹을 검색하고 상세한 결과를 파일에 저장하며 최소한의 컨텍스트만 반환합니다.
웹 검색을 수행하고 전체 콘텐츠를 파일에 저장하여 컨텍스트를 오프로드합니다.
에이전트가 다음 단계를 결정하는 데 도움이 되는 필수 정보만 반환합니다.
Args:
query: 실행할 검색 쿼리
state: 파일 저장을 위한 주입된 에이전트 상태
tool_call_id: 주입된 도구 호출 식별자
max_results: 반환할 최대 결과 수 (기본값: 1)
topic: 주제 필터 - 'general', 'news', 또는 'finance' (기본값: 'general')
Returns:
전체 결과를 파일에 저장하고 최소한의 요약을 제공하는 Command
"""
    # Execute the search
    search_results = run_tavily_search(
        query,
        max_results=max_results,
        topic=topic,
        include_raw_content=True,
    )

    # Process and summarize the results
    processed_results = process_search_results(search_results)

    # Save each result to a file and prepare summaries
    files = state.get("files", {})
    saved_files = []
    summaries = []

    for result in processed_results:
        # Use the AI-generated filename from the summary
        filename = result["filename"]

        # Create the file content with full details
        file_content = f"""# Search Result: {result["title"]}

**URL:** {result["url"]}
**Query:** {query}
**Date:** {get_today_str()}

## Summary
{result["summary"]}

## Raw Content
{result["raw_content"] if result["raw_content"] else "No raw content available"}
"""

        files[filename] = file_content
        saved_files.append(filename)
        summaries.append(f"- {filename}: {result['summary']}...")

    # Create a minimal summary for the tool message - focused on what was collected
    summary_text = f"""🔍 Found {len(processed_results)} result(s) for '{query}':

{chr(10).join(summaries)}

Files: {", ".join(saved_files)}
💡 Use read_file() to access full details when needed."""

    return Command(
        update={
            "files": files,
            "messages": [ToolMessage(summary_text, tool_call_id=tool_call_id)],
        }
    )
@tool(parse_docstring=True)
def think_tool(reflection: str) -> str:
    """Tool for strategic reflection on research progress and decision-making.

    Use this tool after each search to analyze results and plan next steps
    systematically. This creates a deliberate pause in the research workflow
    for higher-quality decision-making.

    When to use:
    - After receiving search results: What key information did I find?
    - Before deciding next steps: Do I have enough to answer comprehensively?
    - When assessing research gaps: What specific information is still missing?
    - Before concluding research: Can I provide a complete answer now?
    - On question complexity: Have I reached the search budget limit?

    Reflections should address:
    1. Analysis of current findings - What concrete information have I gathered?
    2. Gap assessment - What crucial information is still missing?
    3. Quality evaluation - Do I have enough evidence/examples for a good answer?
    4. Strategic decision - Should I keep searching or provide my answer?

    Args:
        reflection: Detailed reflection on research progress, findings, gaps, and next steps

    Returns:
        Confirmation that the reflection was recorded for decision-making
    """
    return f"Reflection recorded: {reflection}"
Deep Agents
Now we can apply everything we have learned so far:
- We'll give the researcher think_tool and the search tool (tavily_search) defined above.
- We'll give the parent agent the file tools, think_tool, and the task tool.
from datetime import datetime
from src.deep_agents_from_scratch.file_tools import ls, read_file, write_file
from src.deep_agents_from_scratch.prompts import (
RESEARCHER_INSTRUCTIONS,
SUBAGENT_USAGE_INSTRUCTIONS,
)
from src.deep_agents_from_scratch.research_tools import (
get_today_str,
tavily_search,
think_tool,
)
from src.deep_agents_from_scratch.state import DeepAgentState
from src.deep_agents_from_scratch.task_tool import _create_task_tool
from src.deep_agents_from_scratch.todo_tools import read_todos, write_todos
from utils import show_prompt
from langchain.chat_models import init_chat_model
# Initialize the model for the agents
model = init_chat_model(model="anthropic:claude-sonnet-4-20250514", temperature=0.0)
# Limits
max_concurrent_research_units = 3
max_researcher_iterations = 3
# Tools
sub_agent_tools = [tavily_search, think_tool]
built_in_tools = [ls, read_file, write_file, write_todos, read_todos, think_tool]
# Create the research sub-agent
research_sub_agent = {
    "name": "research-agent",
    "description": "Delegate research to a sub-agent researcher. Give this researcher one topic at a time.",
    "prompt": RESEARCHER_INSTRUCTIONS.format(date=get_today_str()),
    "tools": ["tavily_search", "think_tool"],
}
# Create the task tool for delegating work to sub-agents
task_tool = _create_task_tool(
sub_agent_tools, [research_sub_agent], model, DeepAgentState
)
delegation_tools = [task_tool]
all_tools = (
sub_agent_tools + built_in_tools + delegation_tools
)  # the main agent can also search directly for simple cases
# Build the prompt
SUBAGENT_INSTRUCTIONS = SUBAGENT_USAGE_INSTRUCTIONS.format(
max_concurrent_research_units=max_concurrent_research_units,
max_researcher_iterations=max_researcher_iterations,
date=datetime.now().strftime("%a %b %-d, %Y"),
)
show_prompt(RESEARCHER_INSTRUCTIONS)

You are a research assistant conducting research on the user's input topic. For context, today's date is {date}.

<Task>
Your job is to use tools to gather information about the user's input topic.
You can use any of the tools provided to you to find resources that can help answer the research question.
You can call these tools in series or in parallel, your research is conducted in a tool-calling loop.
</Task>

<Available Tools>
You have access to two main tools:
1. **tavily_search**: For conducting web searches to gather information
2. **think_tool**: For reflection and strategic planning during research

**CRITICAL: Use think_tool after each search to reflect on results and plan next steps**
</Available Tools>

<Instructions>
Think like a human researcher with limited time. Follow these steps:

1. **Read the question carefully** - What specific information does the user need?
2. **Start with broader searches** - Use broad, comprehensive queries first
3. **After each search, pause and assess** - Do I have enough to answer? What's still missing?
4. **Execute narrower searches as you gather information** - Fill in the gaps
5. **Stop when you can answer confidently** - Don't keep searching for perfection
</Instructions>

<Hard Limits>
**Tool Call Budgets** (Prevent excessive searching):
- **Simple queries**: Use 1-2 search tool calls maximum
- **Normal queries**: Use 2-3 search tool calls maximum
- **Very Complex queries**: Use up to 5 search tool calls maximum
- **Always stop**: After 5 search tool calls if you cannot find the right sources

**Stop Immediately When**:
- You can answer the user's question comprehensively
- You have 3+ relevant examples/sources for the question
- Your last 2 searches returned similar information
</Hard Limits>

<Show Your Thinking>
After each search tool call, use think_tool to analyze the results:
- What key information did I find?
- What's missing?
- Do I have enough to answer the question comprehensively?
- Should I search more or provide my answer?
</Show Your Thinking>
from src.deep_agents_from_scratch.prompts import (
FILE_USAGE_INSTRUCTIONS,
RESEARCHER_INSTRUCTIONS,
SUBAGENT_USAGE_INSTRUCTIONS,
TODO_USAGE_INSTRUCTIONS,
)
INSTRUCTIONS = (
"# TODO MANAGEMENT\n"
+ TODO_USAGE_INSTRUCTIONS
+ "\n\n"
+ "=" * 80
+ "\n\n"
+ "# FILE SYSTEM USAGE\n"
+ FILE_USAGE_INSTRUCTIONS
+ "\n\n"
+ "=" * 80
+ "\n\n"
+ "# SUB-AGENT DELEGATION\n"
+ SUBAGENT_INSTRUCTIONS
)
show_prompt(INSTRUCTIONS)

# TODO MANAGEMENT
Based upon the user's request:
1. Use the write_todos tool to create TODOs at the start of a user request, per the tool description.
2. After you accomplish a TODO, use the read_todos to read the TODOs in order to remind yourself of the plan.
3. Reflect on what you've done and the TODO.
4. Mark your task as completed, and proceed to the next TODO.
5. Continue this process until you have completed all TODOs.

IMPORTANT: Always create a research plan of TODOs and conduct research following the above guidelines for ANY user request.
IMPORTANT: Aim to batch research tasks into a *single TODO* in order to minimize the number of TODOs you have to keep track of.

================================================================================

# FILE SYSTEM USAGE
You have access to a virtual file system to help you retain and save context.

## Workflow Process
1. **Orient**: Use ls() to see existing files before starting work
2. **Save**: Use write_file() to store the user's request so that we can keep it for later
3. **Research**: Proceed with research. The search tool will write files.
4. **Read**: Once you are satisfied with the collected sources, read the files and use them to answer the user's question directly.

================================================================================

# SUB-AGENT DELEGATION
You can delegate tasks to sub-agents.

<Task>
Your role is to coordinate research by delegating specific research tasks to sub-agents.
</Task>

<Available Tools>
1. **task(description, subagent_type)**: Delegate research tasks to specialized sub-agents
   - description: Clear, specific research question or task
   - subagent_type: Type of agent to use (e.g., "research-agent")
2. **think_tool(reflection)**: Reflect on the results of each delegated task and plan next steps.
   - reflection: Your detailed reflection on the results of the task and next steps.

**PARALLEL RESEARCH**: When you identify multiple independent research directions, make multiple **task** tool calls in a single response to enable parallel execution. Use at most 3 parallel agents per iteration.
</Available Tools>

<Hard Limits>
**Task Delegation Budgets** (Prevent excessive delegation):
- **Bias towards focused research** - Use single agent for simple questions, multiple only when clearly beneficial or when you have multiple independent research directions based on the user's request.
- **Stop when adequate** - Don't over-research; stop when you have sufficient information
- **Limit iterations** - Stop after 3 task delegations if you haven't found adequate sources
</Hard Limits>

<Scaling Rules>
**Simple fact-finding, lists, and rankings** can use a single sub-agent:
- *Example*: "List the top 10 coffee shops in San Francisco" → Use 1 sub-agent, store in `findings_coffee_shops.md`

**Comparisons** can use a sub-agent for each element of the comparison:
- *Example*: "Compare OpenAI vs. Anthropic vs. DeepMind approaches to AI safety" → Use 3 sub-agents
- Store findings in separate files: `findings_openai_safety.md`, `findings_anthropic_safety.md`, `findings_deepmind_safety.md`

**Multi-faceted research** can use parallel agents for different aspects:
- *Example*: "Research renewable energy: costs, environmental impact, and adoption rates" → Use 3 sub-agents
- Organize findings by aspect in separate files

**Important Reminders:**
- Each **task** call creates a dedicated research agent with isolated context
- Sub-agents can't see each other's work - provide complete standalone instructions
- Use clear, specific language - avoid acronyms or abbreviations in task descriptions
</Scaling Rules>
from IPython.display import Image
from langchain.agents import create_agent
# Create agent
agent = create_agent(
model,
all_tools,
system_prompt=INSTRUCTIONS,
state_schema=DeepAgentState,
)
# Show the agent
display(Image(agent.get_graph(xray=True).draw_mermaid_png()))
from utils import format_messages
result = agent.invoke(
{
"messages": [
{
"role": "user",
"content": "Model Context Protocol (MCP)에 대한 개요를 제시해주세요.",
}
],
}
)
format_messages(result["messages"])
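For long-running research requests, you may prefer to surface progress as it happens rather than blocking on a single invoke() call. A sketch using LangGraph's "updates" stream mode, which yields per-node state updates as each node finishes:

for chunk in agent.stream(
    {
        "messages": [
            {
                "role": "user",
                "content": "Give me an overview of the Model Context Protocol (MCP).",
            }
        ]
    },
    stream_mode="updates",
):
    # each chunk maps a node name to the state update it produced
    for node_name, update in chunk.items():
        print(f"--- update from: {node_name} ---")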