병렬 노드 실행
복습
모듈 3에서는 human-in-the-loop에 대해 깊이 있게 다루며, 3가지 일반적인 사용 사례를 보여주었습니다:
(1) Approval – 에이전트를 중단하고 상태를 사용자에게 표시하여 사용자가 행동을 승인하도록 할 수 있습니다.
(2) Debugging – 그래프를 되감아 문제를 재현하거나 회피할 수 있습니다.
(3) Editing – 상태를 수정할 수 있습니다.
목표
이 모듈은 모듈 2에서 논의된 memory 개념과 함께 human-in-the-loop을 기반으로 합니다.
multi-agent 워크플로우를 탐구하고, 이번 강좌의 모든 모듈을 연결하는 멀티 에이전트 연구 보조자를 구축할 것입니다.
이 멀티 에이전트 연구 보조자를 만들기 위해 먼저 LangGraph 제어 가능성 주제 몇 가지를 논의합니다.
먼저 parallelization부터 시작하겠습니다.
팬아웃(Fan out) 및 팬인(Fan in)
각 단계에서 상태를 덮어쓰는 간단한 선형 그래프를 만들어 봅시다.
%%capture --no-stderr
%pip install -U langgraph tavily-python wikipedia langchain_openai langchain_community langgraph_sdkfrom dotenv import load_dotenv
load_dotenv("../.env", override=True)import os
import getpass
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("OPENAI_API_KEY")from IPython.display import Image, display
from typing import Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
state: str
class ReturnNodeValue:
def __init__(self, node_secret: str):
self._value = node_secret
def __call__(self, state: State) -> Any:
print(f"Adding {self._value} to {state['state']}")
return {"state": [self._value]}
# Add nodes
builder = StateGraph(State)
# Initialize each node with node_secret
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("b", "c")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
예상대로 상태를 덮어씁니다.
graph.invoke({"state": []})Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm B"]
Adding I'm D to ["I'm C"]
{'state': ["I'm D"]}
이제 b와 c를 병렬로 실행해 봅시다.
그 다음 d를 실행합니다.
a에서 b와 c로 팬‑아웃하고, 이후 d로 팬‑인하면 쉽게 할 수 있습니다.
각 단계가 끝날 때 상태 업데이트가 적용됩니다.
실행해 보겠습니다.
builder = StateGraph(State)
# Initialize each node with node_secret
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
오류가 발생했습니다!
이는 b와 c가 같은 단계에서 동일한 상태 키/채널에 동시에 쓰고 있기 때문입니다.
from langgraph.errors import InvalidUpdateError
try:
graph.invoke({"state": []})
except InvalidUpdateError as e:
print(f"An error occurred: {e}")Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
An error occurred: At key 'state': Can receive only one value per step. Use an Annotated key to handle multiple values.
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/INVALID_CONCURRENT_GRAPH_UPDATE
fan out을 사용할 때, 단계들이 동일한 채널 / 키에 기록한다면 reducer를 사용하고 있는지 확인해야 합니다.
Module 2에서 언급했듯이, operator.add는 파이썬 내장 operator 모듈의 함수입니다.
operator.add를 리스트에 적용하면 리스트 연결을 수행합니다.
import operator
from typing import Annotated
class State(TypedDict):
state: Annotated[list, operator.add]
# Add nodes
builder = StateGraph(State)
# Initialize each node with node_secret
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
graph.invoke({"state": []})Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm D to ["I'm A", "I'm B", "I'm C"]
{'state': ["I'm A", "I'm B", "I'm C", "I'm D"]}
이제 우리는 b와 c가 병렬로 수행한 업데이트에 대해 상태에 추가한다는 것을 알 수 있다.
노드가 완료될 때까지 기다리기
이제 한 병렬 경로가 다른 경로보다 단계가 더 많은 경우를 고려해 보겠습니다.
builder = StateGraph(State)
# Initialize each node with node_secret
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2")
builder.add_edge(["b2", "c"], "d")
builder.add_edge("d", END)
graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
이 경우 b, b2, c는 모두 같은 단계에 속합니다. 그래프는 이들 모두가 완료될 때까지 기다렸다가 단계 d로 진행합니다.
graph.invoke({"state": []})Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]
Adding I'm D to ["I'm A", "I'm B", "I'm C", "I'm B2"]
{'state': ["I'm A", "I'm B", "I'm C", "I'm B2", "I'm D"]}
상태 업데이트 순서 설정
하지만 각 단계 내에서는 상태 업데이트 순서를 구체적으로 제어할 수 없습니다!
간단히 말하면, 이는 그래프 토폴로지를 기반으로 LangGraph가 결정하는 결정론적 순서이며 우리가 제어할 수 없습니다.
위 예시에서는 c가 b2보다 먼저 추가되는 것을 볼 수 있습니다.
하지만 사용자 정의 리듀서를 사용하여 예를 들어 상태 업데이트를 정렬하는 등 순서를 커스터마이즈할 수 있습니다.
def sorting_reducer(left, right):
"""Combines and sorts the values in a list"""
if not isinstance(left, list):
left = [left]
if not isinstance(right, list):
right = [right]
return sorted(left + right, reverse=False)
class State(TypedDict):
# sorting_reducer는 상태의 값들을 정렬합니다
state: Annotated[list, sorting_reducer]
# Add nodes
builder = StateGraph(State)
# Initialize each node with node_secret
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2")
builder.add_edge(["b2", "c"], "d")
builder.add_edge("d", END)
graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
graph.invoke({"state": []})Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]
Adding I'm D to ["I'm A", "I'm B", "I'm B2", "I'm C"]
{'state': ["I'm A", "I'm B", "I'm B2", "I'm C", "I'm D"]}
이제 리듀서는 업데이트된 상태 값을 정렬합니다!
sorting_reducer 예제는 모든 값을 전역적으로 정렬합니다. 다음과 같이 구현할 수도 있습니다:
- 병렬 단계에서 출력값을 상태의 별도 필드에 기록하기
- 병렬 단계 이후에 “sink” 노드를 사용해 해당 출력값들을 결합하고 정렬하기
- 결합이 끝난 뒤 임시 필드를 비우기
자세한 내용은 문서를 참고하세요.
LLM 작업하기
이제 현실적인 예시를 추가해 보겠습니다!
두 개의 외부 소스(위키피디아와 웹 검색)에서 컨텍스트를 수집하고, LLM이 질문에 답하도록 합니다.
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o", temperature=0)class State(TypedDict):
question: str
answer: str
context: Annotated[list, operator.add]다양한 웹 검색 도구를 시도해 볼 수 있습니다. Tavily는 고려해 볼 만한 좋은 옵션 중 하나이며, TAVILY_API_KEY가 설정되어 있는지 확인하십시오.
import os
import getpass
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("TAVILY_API_KEY")from langchain_core.messages import HumanMessage, SystemMessage
from langchain_community.document_loaders import WikipediaLoader
from langchain_community.tools import TavilySearchResults
def search_web(state):
"""Retrieve docs from web search"""
# Search
tavily_search = TavilySearchResults(max_results=3)
search_docs = tavily_search.invoke(state["question"])
# Format
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document href="{doc["url"]}">\n{doc["content"]}\n</Document>'
for doc in search_docs
]
)
return {"context": [formatted_search_docs]}
def search_wikipedia(state):
"""Retrieve docs from wikipedia"""
# Search
search_docs = WikipediaLoader(query=state["question"], load_max_docs=2).load()
# Format
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}">\n{doc.page_content}\n</Document>'
for doc in search_docs
]
)
return {"context": [formatted_search_docs]}
def generate_answer(state):
"""Node to answer a question"""
# Get state
context = state["context"]
question = state["question"]
# Template
answer_template = """Answer the question {question} using this context: {context}"""
answer_instructions = answer_template.format(question=question, context=context)
# Answer
answer = llm.invoke(
[SystemMessage(content=answer_instructions)]
+ [HumanMessage(content=f"Answer the question.")]
)
# Append it to state
return {"answer": answer}
# Add nodes
builder = StateGraph(State)
# Initialize each node with node_secret
builder.add_node("search_web", search_web)
builder.add_node("search_wikipedia", search_wikipedia)
builder.add_node("generate_answer", generate_answer)
# Flow
builder.add_edge(START, "search_wikipedia")
builder.add_edge(START, "search_web")
builder.add_edge("search_wikipedia", "generate_answer")
builder.add_edge("search_web", "generate_answer")
builder.add_edge("generate_answer", END)
graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))
result = graph.invoke({"question": "How were Nvidia's Q2 2024 earnings"})
result["answer"].content/var/folders/jz/2tg__xkj7fq6_dzmcc85dlvh0000gn/T/ipykernel_5990/3921850799.py:11: LangChainDeprecationWarning: The class `TavilySearchResults` was deprecated in LangChain 0.3.25 and will be removed in 1.0. An updated version of the class exists in the :class:`~langchain-tavily package and should be used instead. To use it run `pip install -U :class:`~langchain-tavily` and import as `from :class:`~langchain_tavily import TavilySearch``.
tavily_search = TavilySearchResults(max_results=3)
"Nvidia's Q2 2024 earnings were strong, showcasing record revenue and a robust performance in its data center division. The company reported revenue of $30.0 billion, which was up 15% from the previous quarter and up 122% from a year ago. GAAP earnings per diluted share were $0.67, up 12% from the previous quarter and up 168% from a year ago, while non-GAAP earnings per diluted share were $0.68, up 11% from the previous quarter and up 152% from a year ago. Despite a slight dip in gross margin from 78% in Q1 to 75% in Q2, Nvidia remains a dominant force in the AI chip sector. Additionally, Nvidia returned $15.4 billion to shareholders through share repurchases and cash dividends and approved an additional $50.0 billion in share repurchase authorization."
LangGraph API와 함께 사용하기
이 모듈의 /studio 디렉토리에서 터미널에 다음 명령어를 실행하세요:
langgraph dev다음과 같은 출력이 표시됩니다:
- 🚀 API: http://127.0.0.1:2024
- 🎨 Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024
- 📚 API Docs: http://127.0.0.1:2024/docs브라우저를 열고 Studio UI로 이동하세요: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024
LangGraph API는 그래프 상태 편집을 지원합니다.
from langgraph_sdk import get_client
client = get_client(url="http://127.0.0.1:2024")thread = await client.threads.create()
input_question = {"question": "How were Nvidia Q2 2024 earnings?"}
async for event in client.runs.stream(
thread["thread_id"],
assistant_id="parallelization",
input=input_question,
stream_mode="values",
):
# Check if answer has been added to state
if event.data is not None:
answer = event.data.get("answer", None)
if answer:
print(answer["content"])Nvidia's Q2 2024 earnings were strong, showcasing record revenue and a robust performance in its data center division. The company reported revenue of $30.0 billion, which was up 15% from the previous quarter and up 122% from a year ago. GAAP earnings per diluted share were $0.67, up 12% from the previous quarter and up 168% from a year ago. Non-GAAP earnings per diluted share were $0.68, up 11% from the previous quarter and up 152% from a year ago. However, the gross margin saw a decline from 78% in Q1 to 75% in Q2, which could indicate higher production costs or more aggressive pricing strategies. Despite this, Nvidia remains a dominant force in the AI chip sector.