Open in Colab Open in LangChain Academy

동적 중단점(Dynamic breakpoints)

복습

우리는 Human-in-the-Loop의 동기에 대해 논의했습니다:

(1) 승인 - 에이전트를 중단하고, 사용자에게 상태를 표시하며, 사용자가 작업을 승인할 수 있도록 할 수 있습니다

(2) 디버깅 - 그래프를 되감아 문제를 재현하거나 회피할 수 있습니다

(3) 편집 - 상태를 수정할 수 있습니다

특정 단계에서 그래프를 중지하는 일반적인 방법으로 중단점을 다루었으며, 이는 승인과 같은 사용 사례를 가능하게 합니다

또한 그래프 상태를 편집하고 사람의 피드백을 도입하는 방법도 보여주었습니다.

목표

중단점은 그래프 컴파일 중에 특정 노드에 개발자가 설정합니다.

하지만 때로는 그래프가 동적으로 스스로 중단하도록 하는 것이 유용합니다!

이것은 내부 중단점이며, NodeInterrupt를 사용하여 구현할 수 있습니다.

이는 몇 가지 구체적인 이점이 있습니다:

(1) 조건부로 수행할 수 있습니다(개발자가 정의한 로직에 따라 노드 내부에서).

(2) 중단된 이유를 사용자에게 전달할 수 있습니다(NodeInterrupt에 원하는 것을 전달하여).

입력 길이에 따라 NodeInterrupt가 발생하는 그래프를 만들어봅시다.

%%capture --no-stderr
%pip install --quiet -U langgraph langchain_openai langgraph_sdk
from IPython.display import Image, display
 
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.errors import NodeInterrupt
from langgraph.graph import START, END, StateGraph
 
 
class State(TypedDict):
    input: str
 
 
def step_1(state: State) -> State:
    print("---Step 1---")
    return state
 
 
def step_2(state: State) -> State:
    # Let's optionally raise a NodeInterrupt if the length of the input is longer than 5 characters
    if len(state["input"]) > 5:
        raise NodeInterrupt(f"5글자보다 긴 입력을 받았습니다: {state['input']}")
 
    print("---Step 2---")
    return state
 
 
def step_3(state: State) -> State:
    print("---Step 3---")
    return state
 
 
builder = StateGraph(State)
builder.add_node("step_1", step_1)
builder.add_node("step_2", step_2)
builder.add_node("step_3", step_3)
builder.add_edge(START, "step_1")
builder.add_edge("step_1", "step_2")
builder.add_edge("step_2", "step_3")
builder.add_edge("step_3", END)
 
# Set up memory
memory = MemorySaver()
 
# Compile the graph with memory
graph = builder.compile(checkpointer=memory)
 
# View
display(Image(graph.get_graph().draw_mermaid_png()))

jpeg

입력 길이가 5글자보다 긴 그래프를 실행해 보자.

initial_input = {"input": "hello world"}
thread_config = {"configurable": {"thread_id": "1"}}
 
# 첫 번째 중단이 발생할 때까지 그래프를 실행하십시오
for event in graph.stream(initial_input, thread_config, stream_mode="values"):
    print(event)
{'input': 'hello world'}
---Step 1---
{'input': 'hello world'}

이 시점에서 그래프 상태를 검사하면, 다음에 실행할 노드(step_2)가 설정된 것을 볼 수 있습니다.

state = graph.get_state(thread_config)
print(state.next)
('step_2',)

Interrupt가 상태에 기록된 것을 볼 수 있습니다.

print(state.tasks)
(PregelTask(id='6eb3910d-e231-5ba2-b25e-28ad575690bd', name='step_2', error=None, interrupts=(Interrupt(value='Received input that is longer than 5 characters: hello world', when='during'),), state=None),)

중단점에서 그래프를 재개하려고 시도할 수 있습니다.

하지만 이것은 단순히 동일한 노드를 다시 실행합니다!

상태가 변경되지 않으면 여기에 갇히게 됩니다.

for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)
{'input': 'hello world'}
state = graph.get_state(thread_config)
print(state.next)
('step_2',)

이제 상태를 업데이트할 수 있습니다.

graph.update_state(
    thread_config,
    {"input": "hi"},
)
{'configurable': {'thread_id': '1',
  'checkpoint_ns': '',
  'checkpoint_id': '1ef6a434-06cf-6f1e-8002-0ea6dc69e075'}}
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)
{'input': 'hi'}
---Step 2---
{'input': 'hi'}
---Step 3---
{'input': 'hi'}

LangGraph API 사용법

로컬 개발 서버를 시작하려면 이 모듈의 /studio 디렉토리에서 터미널에 다음 명령어를 실행하세요:

langgraph dev

다음과 같은 출력을 볼 수 있습니다:

- 🚀 API: http://127.0.0.1:2024
- 🎨 Studio UI: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024
- 📚 API Docs: http://127.0.0.1:2024/docs

브라우저를 열고 Studio UI로 이동하세요: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:2024.

SDK를 통해 연결합니다.

from langgraph_sdk import get_client
 
# This is the URL of the local development server
URL = "http://127.0.0.1:2024"
client = get_client(url=URL)
 
# Search all hosted graphs
assistants = await client.assistants.search()
thread = await client.threads.create()
input_dict = {"input": "hello world"}
 
async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id="dynamic_breakpoints",
    input=input_dict,
    stream_mode="values",
):
    print(f"Receiving new event of type: {chunk.event}...")
    print(chunk.data)
    print("\n\n")
Receiving new event of type: metadata...
{'run_id': '1ef6a43a-1b04-64d0-9a79-1caff72c8a89'}



Receiving new event of type: values...
{'input': 'hello world'}



Receiving new event of type: values...
{'input': 'hello world'}


current_state = await client.threads.get_state(thread["thread_id"])
current_state["next"]
['step_2']
await client.threads.update_state(thread["thread_id"], {"input": "hi!"})
{'configurable': {'thread_id': 'ea8c2912-987e-49d9-b890-6e81d46065f9',
  'checkpoint_ns': '',
  'checkpoint_id': '1ef6a43a-64b2-6e85-8002-3cf4f2873968'},
 'checkpoint_id': '1ef6a43a-64b2-6e85-8002-3cf4f2873968'}
async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id="dynamic_breakpoints",
    input=None,
    stream_mode="values",
):
    print(f"Receiving new event of type: {chunk.event}...")
    print(chunk.data)
    print("\n\n")
Receiving new event of type: metadata...
{'run_id': '1ef64c33-fb34-6eaf-8b59-1d85c5b8acc9'}



Receiving new event of type: values...
{'input': 'hi!'}



Receiving new event of type: values...
{'input': 'hi!'}


current_state = await client.threads.get_state(thread["thread_id"])
current_state
{'values': {'input': 'hi!'},
 'next': ['step_2'],
 'tasks': [{'id': '858e41b2-6501-585c-9bca-55c1e729ef91',
   'name': 'step_2',
   'error': None,
   'interrupts': [],
   'state': None}],
 'metadata': {'step': 2,
  'source': 'update',
  'writes': {'step_1': {'input': 'hi!'}},
  'parents': {},
  'graph_id': 'dynamic_breakpoints'},
 'created_at': '2024-09-03T22:27:05.707260+00:00',
 'checkpoint_id': '1ef6a43a-64b2-6e85-8002-3cf4f2873968',
 'parent_checkpoint_id': '1ef6a43a-1cb8-6c3d-8001-7b11d0d34f00'}