LangGraph Platform 배포에 연결하기
배포 생성 (복습)
방금 모듈 5의 task_maistro 앱에 대한 배포를 생성했습니다.
- LangGraph CLI를 사용하여
task_maistro그래프가 포함된 LangGraph 서버용 Docker 이미지를 빌드했습니다. - 제공된
docker-compose.yml파일을 사용하여 정의된 서비스에 따라 세 개의 개별 컨테이너를 생성했습니다:langgraph-redis: 공식 Redis 이미지를 사용하는 새 컨테이너를 생성합니다.langgraph-postgres: 공식 Postgres 이미지를 사용하는 새 컨테이너를 생성합니다.langgraph-api: 미리 빌드한task_maistroDocker 이미지를 사용하는 새 컨테이너를 생성합니다.
$ cd module-6/deployment
$ docker compose up실행이 완료되면, 다음 주소를 통해 배포된 서비스에 접근할 수 있습니다:
- API: http://localhost:8123
- 문서(Docs): http://localhost:8123/docs
- LangGraph Studio: https://smith.langchain.com/studio/?baseUrl=http://127.0.0.1:8123
API 사용하기
LangGraph 서버는 배포된 에이전트와 상호작용하기 위한 다양한 API 엔드포인트를 제공합니다.
이 엔드포인트들은 일반적인 에이전트의 몇 가지 요구사항에 따라 그룹화할 수 있습니다:
- 실행(Runs): 단일 원자적(atomic) 에이전트 실행
- 스레드(Threads): 다중 턴(multi-turn) 상호작용 또는 사용자 개입(human-in-the-loop)
- 저장소(Store): 장기 기억(long-term memory)
API 문서에서 직접 요청을 테스트해 볼 수 있습니다.
SDK
LangGraph SDK (Python 및 JS)는 위에서 소개한 LangGraph 서버 API와 상호작용하기 위한 개발자 친화적인 인터페이스를 제공합니다.
capture --no-stderr
%pip install -U langchain_openai langgraph langchain_corefrom langgraph.pregel.remote import RemoteGraph
from langchain_core.messages import convert_to_messages
from langchain_core.messages import HumanMessage, SystemMessage
# 원격 그래프를 통해 연결
url = "http://localhost:8123"
graph_name = "task_maistro"
remote_graph = RemoteGraph(graph_name, url=url)실행 (Runs)
“실행(run)“은 그래프의 단일 실행을 의미합니다. 클라이언트가 요청을 보낼 때마다 다음 과정이 일어납니다:
- HTTP 워커(worker)가 고유한 실행 ID(run ID)를 생성합니다.
- 이 실행과 그 결과는 PostgreSQL에 저장됩니다.
- 이 실행 기록을 쿼리하여 다음을 수행할 수 있습니다:
- 상태 확인
- 결과 가져오기
- 실행 기록 추적
다양한 유형의 실행에 대한 전체 How To 가이드는 여기에서 확인할 수 있습니다.
이제 실행(run)을 통해 할 수 있는 몇 가지 흥미로운 작업들을 살펴보겠습니다.
백그라운드 실행 (Background Runs)
LangGraph 서버는 두 가지 유형의 실행을 지원합니다:
- Fire and forget (실행 후 망각) - 백그라운드에서 실행을 시작하고, 완료될 때까지 기다리지 않습니다.
- 응답 대기 (블로킹 또는 폴링) - 실행을 시작하고 그 출력을 기다리거나 스트리밍합니다.
백그라운드 실행과 폴링은 오래 실행되는 에이전트와 작업할 때 매우 유용합니다.
이것이 어떻게 작동하는지 알아보겠습니다.
# 새 스레드 생성
thread = await client.threads.create()
thread{'thread_id': '7f71c0dd-768b-4e53-8349-42bdd10e7caf',
'created_at': '2024-11-14T19:36:08.459457+00:00',
'updated_at': '2024-11-14T19:36:08.459457+00:00',
'metadata': {},
'status': 'idle',
'config': {},
'values': None}
# 스레드에서 실행 중인 작업이 있는지 확인
thread = await client.threads.create()
runs = await client.runs.list(thread["thread_id"])
print(runs)[]
# 몇 가지 To-Do(할 일)를 생성하여 특정 user_id에 저장합니다.
user_input = "다음 주 말까지 홍콩 여행 예약을 마치는 To-Do를 추가해줘. 그리고 추수감사절 계획에 대해 부모님께 다시 전화드리는 To-Do도 추가해줘."
config = {"configurable": {"user_id": "Test"}}
graph_name = "task_maistro"
run = await client.runs.create(
thread["thread_id"],
graph_name,
input={
"messages": [HumanMessage(content=user_input)],
},
config=config,
)# 새로운 스레드와 실행(run)을 시작합니다.
thread = await client.threads.create()
user_input = "모든 To-Do(할 일) 항목들을 요약해줘."
config = {"configurable": {"user_id": "Test"}}
graph_name = "task_maistro"
run = await client.runs.create(
thread["thread_id"],
graph_name,
input={
"messages": [HumanMessage(content=user_input)],
},
config=config,
)# 실행 상태 확인
print(await client.runs.get(thread["thread_id"], run["run_id"])){'run_id': '1efa2c00-63e4-6f4a-9c5b-ca3f5f9bff07', 'thread_id': '641c195a-9e31-4250-a729-6b742c089df8', 'assistant_id': 'ea4ebafa-a81d-5063-a5fa-67c755d98a21', 'created_at': '2024-11-14T19:38:29.394777+00:00', 'updated_at': '2024-11-14T19:38:29.394777+00:00', 'metadata': {}, 'status': 'pending', 'kwargs': {'input': {'messages': [{'id': None, 'name': None, 'type': 'human', 'content': 'Give me a summary of all ToDos.', 'example': False, 'additional_kwargs': {}, 'response_metadata': {}}]}, 'config': {'metadata': {'created_by': 'system'}, 'configurable': {'run_id': '1efa2c00-63e4-6f4a-9c5b-ca3f5f9bff07', 'user_id': 'Test', 'graph_id': 'task_maistro', 'thread_id': '641c195a-9e31-4250-a729-6b742c089df8', 'assistant_id': 'ea4ebafa-a81d-5063-a5fa-67c755d98a21'}}, 'webhook': None, 'subgraphs': False, 'temporary': False, 'stream_mode': ['values'], 'feedback_keys': None, 'interrupt_after': None, 'interrupt_before': None}, 'multitask_strategy': 'reject'}
실행이 아직 진행 중이므로 'status': 'pending' 상태인 것을 확인할 수 있습니다.
만약 실행이 완료될 때까지 기다려서, 이를 블로킹(blocking) 실행으로 만들고 싶다면 어떻게 해야 할까요?
client.runs.join을 사용하면 실행이 완료될 때까지 기다릴 수 있습니다.
이렇게 하면 해당 스레드에서 현재 실행이 완료될 때까지 새로운 실행이 시작되지 않도록 보장할 수 있습니다.
# 실행이 완료될 때까지 기다리십시오
await client.runs.join(thread["thread_id"], run["run_id"])
print(await client.runs.get(thread["thread_id"], run["run_id"])){'run_id': '1efa2c00-63e4-6f4a-9c5b-ca3f5f9bff07', 'thread_id': '641c195a-9e31-4250-a729-6b742c089df8', 'assistant_id': 'ea4ebafa-a81d-5063-a5fa-67c755d98a21', 'created_at': '2024-11-14T19:38:29.394777+00:00', 'updated_at': '2024-11-14T19:38:29.394777+00:00', 'metadata': {}, 'status': 'success', 'kwargs': {'input': {'messages': [{'id': None, 'name': None, 'type': 'human', 'content': 'Give me a summary of all ToDos.', 'example': False, 'additional_kwargs': {}, 'response_metadata': {}}]}, 'config': {'metadata': {'created_by': 'system'}, 'configurable': {'run_id': '1efa2c00-63e4-6f4a-9c5b-ca3f5f9bff07', 'user_id': 'Test', 'graph_id': 'task_maistro', 'thread_id': '641c195a-9e31-4250-a729-6b742c089df8', 'assistant_id': 'ea4ebafa-a81d-5063-a5fa-67c755d98a21'}}, 'webhook': None, 'subgraphs': False, 'temporary': False, 'stream_mode': ['values'], 'feedback_keys': None, 'interrupt_after': None, 'interrupt_before': None}, 'multitask_strategy': 'reject'}
이제 실행이 완료되었으므로 'status': 'success' 상태가 되었습니다.
스트리밍 실행 (Streaming Runs)
클라이언트가 스트리밍 요청을 보낼 때마다 다음 과정이 일어납니다:
- HTTP 워커(worker)가 고유한 실행 ID(run ID)를 생성합니다.
- 큐(Queue) 워커가 해당 실행에 대한 작업을 시작합니다.
- 실행 중에 큐 워커는 Redis로 업데이트를 발행(publish)합니다.
- HTTP 워커는 해당 실행에 대한 Redis의 업데이트를 구독(subscribe)하고, 이를 클라이언트에게 반환합니다.
이러한 방식으로 스트리밍이 가능해집니다!
이전 모듈들에서 스트리밍에 대해 다루었지만, 여기서는 그중 한 가지 방법인 토큰 스트리밍(streaming tokens) 에 초점을 맞춰보겠습니다.
클라이언트에게 토큰을 스트리밍으로 반환하는 것은, 완료되기까지 시간이 걸릴 수 있는 운영 환경의 에이전트와 작업할 때 특히 유용합니다.
stream_mode="messages-tuple"을 사용하여 토큰을 스트리밍할 수 있습니다.
user_input = "어떤 To-Do(할 일)에 가장 먼저 집중해야 할까?"
# stream_mode="messages-tuple"을 사용하여 응답을 스트리밍합니다.
async for chunk in client.runs.stream(
thread["thread_id"],
graph_name,
input={"messages": [HumanMessage(content=user_input)]},
config=config,
stream_mode="messages-tuple",
):
# 이벤트가 'messages'인 경우, 데이터 조각(chunk)의 내용을 실시간으로 출력합니다.
if chunk.event == "messages":
print(
"".join(
data_item["content"]
for data_item in chunk.data
if "content" in data_item
),
end="",
flush=True,
)You might want to focus on "Call parents back about Thanksgiving plans" first. It has a shorter estimated time to complete (15 minutes) and doesn't have a specific deadline, so it could be a quick task to check off your list. Once that's done, you can dedicate more time to "Finish booking travel to Hong Kong," which is more time-consuming and has a deadline.
스레드 (Threads)
실행(run)이 그래프의 단일 실행인 반면, 스레드는 다중 턴(multi-turn) 상호작용을 지원합니다.
클라이언트가 thread_id를 사용하여 그래프를 실행하면, 서버는 실행 중의 모든 체크포인트(단계)를 해당 스레드의 정보로 Postgres 데이터베이스에 저장합니다.
서버를 통해 생성된 스레드의 상태를 확인할 수 있습니다.
스레드 상태 확인
또한, 특정 스레드에 저장된 상태 체크포인트에 쉽게 접근할 수 있습니다.
thread_state = await client.threads.get_state(thread["thread_id"])
for m in convert_to_messages(thread_state["values"]["messages"]):
m.pretty_print()================================[1m Human Message [0m=================================
Give me a summary of all ToDos.
==================================[1m Ai Message [0m==================================
Here's a summary of your current ToDo list:
1. **Task:** Finish booking travel to Hong Kong
- **Status:** Not started
- **Deadline:** November 22, 2024
- **Solutions:**
- Check flight prices on Skyscanner
- Book hotel through Booking.com
- Arrange airport transfer
- **Estimated Time to Complete:** 120 minutes
2. **Task:** Call parents back about Thanksgiving plans
- **Status:** Not started
- **Deadline:** None
- **Solutions:**
- Check calendar for availability
- Discuss travel arrangements
- Confirm dinner plans
- **Estimated Time to Complete:** 15 minutes
Let me know if there's anything else you'd like to do with your ToDo list!
================================[1m Human Message [0m=================================
What ToDo should I focus on first.
==================================[1m Ai Message [0m==================================
You might want to focus on "Call parents back about Thanksgiving plans" first. It has a shorter estimated time to complete (15 minutes) and doesn't have a specific deadline, so it could be a quick task to check off your list. Once that's done, you can dedicate more time to "Finish booking travel to Hong Kong," which is more time-consuming and has a deadline.
스레드 복사
또한 기존 스레드를 복사 (즉, “포크(fork)“)할 수도 있습니다.
이렇게 하면 기존 스레드의 기록은 그대로 유지하면서, 원본 스레드에는 영향을 주지 않는 독립적인 실행을 생성할 수 있습니다.
# 스레드 복사하기
copied_thread = await client.threads.copy(thread["thread_id"])# 복사된 스레드의 상태를 확인하십시오
copied_thread_state = await client.threads.get_state(copied_thread["thread_id"])
for m in convert_to_messages(copied_thread_state["values"]["messages"]):
m.pretty_print()================================[1m Human Message [0m=================================
Give me a summary of all ToDos.
==================================[1m Ai Message [0m==================================
Here's a summary of your current ToDo list:
1. **Task:** Finish booking travel to Hong Kong
- **Status:** Not started
- **Deadline:** November 22, 2024
- **Solutions:**
- Check flight prices on Skyscanner
- Book hotel through Booking.com
- Arrange airport transfer
- **Estimated Time to Complete:** 120 minutes
2. **Task:** Call parents back about Thanksgiving plans
- **Status:** Not started
- **Deadline:** None
- **Solutions:**
- Check calendar for availability
- Discuss travel arrangements
- Confirm dinner plans
- **Estimated Time to Complete:** 15 minutes
Let me know if there's anything else you'd like to do with your ToDo list!
================================[1m Human Message [0m=================================
What ToDo should I focus on first.
==================================[1m Ai Message [0m==================================
You might want to focus on "Call parents back about Thanksgiving plans" first. It has a shorter estimated time to complete (15 minutes) and doesn't have a specific deadline, so it could be a quick task to check off your list. Once that's done, you can dedicate more time to "Finish booking travel to Hong Kong," which is more time-consuming and has a deadline.
휴먼-인-더-루프 (Human in the loop)
모듈 3에서 휴먼-인-더-루프에 대해 다루었으며, 서버는 우리가 논의했던 모든 관련 기능을 지원합니다.
예를 들어, 이전의 어떤 체크포인트에서든 그래프 실행을 검색하고, 편집하며, 이어서 계속 진행할 수 있습니다.
# 스레드의 기록을 가져오기
states = await client.threads.get_history(thread["thread_id"])
# 포크할 상태 업데이트를 선택하세요
to_fork = states[-2]
to_fork["values"]{'messages': [{'content': 'Give me a summary of all ToDos.',
'additional_kwargs': {'example': False,
'additional_kwargs': {},
'response_metadata': {}},
'response_metadata': {},
'type': 'human',
'name': None,
'id': '3680da45-e3a5-4a47-b5b1-4fd4d3e8baf9',
'example': False}]}
to_fork["values"]["messages"][0]["id"]'3680da45-e3a5-4a47-b5b1-4fd4d3e8baf9'
to_fork["next"]['task_mAIstro']
to_fork["checkpoint_id"]'1efa2c00-6609-67ff-8000-491b1dcf8129'
이제 상태(state)를 수정해 보겠습니다. messages에 적용된 리듀서(reducer)가 어떻게 작동하는지 기억해 보세요:
- 메시지 ID를 제공하지 않으면, 메시지는 (기존 목록에) 추가(append)됩니다.
- 상태에 추가하는 대신, 메시지 ID를 제공하여 기존 메시지를 덮어쓸 수 있습니다!
forked_input = {
"messages": HumanMessage(
content="다음 주 내에 해야 할 모든 To-Do(할 일)들을 요약해줘.",
# 기존 메시지를 덮어쓰기 위해 ID를 지정합니다.
id=to_fork["values"]["messages"][0]["id"],
)
}
# 상태를 업데이트하여 스레드에 새로운 체크포인트를 생성합니다.
forked_config = await client.threads.update_state(
thread["thread_id"], forked_input, checkpoint_id=to_fork["checkpoint_id"]
)# 스레드에 있는 새로운 체크포인트부터 그래프를 실행합니다.
async for chunk in client.runs.stream(
thread["thread_id"],
graph_name,
input=None, # 입력은 이미 체크포인트의 상태에 포함되어 있습니다.
config=config,
checkpoint_id=forked_config["checkpoint_id"],
stream_mode="messages-tuple",
):
# 이벤트가 'messages'인 경우, 데이터 조각(chunk)의 내용을 실시간으로 출력합니다.
if chunk.event == "messages":
print(
"".join(
data_item["content"]
for data_item in chunk.data
if "content" in data_item
),
end="",
flush=True,
)Here's a summary of your ToDos that need to be done in the next week:
1. **Finish booking travel to Hong Kong**
- **Status:** Not started
- **Deadline:** November 22, 2024
- **Solutions:**
- Check flight prices on Skyscanner
- Book hotel through Booking.com
- Arrange airport transfer
- **Estimated Time to Complete:** 120 minutes
It looks like this task is due soon, so you might want to prioritize it. Let me know if there's anything else you need help with!
스레드 간 메모리 (Across-thread memory)
모듈 5에서 LangGraph 메모리 스토어(store)를 사용하여 여러 스레드에 걸쳐 정보를 저장하는 방법에 대해 다루었습니다.
우리가 배포한 task_maistro 그래프는 스토어(store)를 사용하여, user_id를 네임스페이스(namespace)로 지정해 To-Do(할 일)와 같은 정보를 저장합니다.
우리의 배포 환경에는 Postgres 데이터베이스가 포함되어 있으며, 이 데이터베이스가 이러한 장기(스레드 간) 메모리를 저장합니다.
LangGraph SDK를 사용하면 우리의 배포 환경에서 스토어(store)와 상호작용하기 위한 여러 메서드를 사용할 수 있습니다.
항목 검색하기
task_maistro 그래프는 스토어(store)를 사용하여 To-Do 항목들을 저장하며, 이때 기본적으로 (todo, todo_category, user_id) 튜플로 네임스페이스를 지정합니다.
todo_category는 (deployment/configuration.py 파일에서 확인할 수 있듯이) 기본적으로 general로 설정됩니다.
모든 To-Do 항목을 검색하려면 이 튜플을 제공하기만 하면 됩니다.
items = await client.store.search_items(
("todo", "general", "Test"),
limit=5,
offset=0,
)
items["items"][{'value': {'task': 'Finish booking travel to Hong Kong',
'status': 'not started',
'deadline': '2024-11-22T23:59:59',
'solutions': ['Check flight prices on Skyscanner',
'Book hotel through Booking.com',
'Arrange airport transfer'],
'time_to_complete': 120},
'key': '18524803-c182-49de-9b10-08ccb0a06843',
'namespace': ['todo', 'general', 'Test'],
'created_at': '2024-11-14T19:37:41.664827+00:00',
'updated_at': '2024-11-14T19:37:41.664827+00:00'},
{'value': {'task': 'Call parents back about Thanksgiving plans',
'status': 'not started',
'deadline': None,
'solutions': ['Check calendar for availability',
'Discuss travel arrangements',
'Confirm dinner plans'],
'time_to_complete': 15},
'key': '375d9596-edf8-4de2-985b-bacdc623d6ef',
'namespace': ['todo', 'general', 'Test'],
'created_at': '2024-11-14T19:37:41.664827+00:00',
'updated_at': '2024-11-14T19:37:41.664827+00:00'}]
항목 추가하기
우리가 만든 그래프에서는 put을 호출하여 스토어(store)에 항목을 추가합니다.
만약 그래프 외부에서 스토어에 직접 항목을 추가하고 싶다면, SDK의 put 메서드를 사용할 수 있습니다.
from uuid import uuid4
await client.store.put_item(
("testing", "Test"),
key=str(uuid4()),
value={"todo": "SDK 테스트 put_item"},
)items = await client.store.search_items(
("testing", "Test"),
limit=5,
offset=0,
)
items["items"][{'value': {'todo': 'Test SDK put_item'},
'key': '3de441ba-8c79-4beb-8f52-00e4dcba68d4',
'namespace': ['testing', 'Test'],
'created_at': '2024-11-14T19:56:30.452808+00:00',
'updated_at': '2024-11-14T19:56:30.452808+00:00'},
{'value': {'todo': 'Test SDK put_item'},
'key': '09b9a869-4406-47c5-a635-4716bd79a8b3',
'namespace': ['testing', 'Test'],
'created_at': '2024-11-14T19:53:24.812558+00:00',
'updated_at': '2024-11-14T19:53:24.812558+00:00'}]
항목 삭제하기
SDK를 사용하여 키(key)를 이용해 스토어(store)에서 항목을 삭제할 수 있습니다.
[item["key"] for item in items["items"]]['3de441ba-8c79-4beb-8f52-00e4dcba68d4',
'09b9a869-4406-47c5-a635-4716bd79a8b3']
await client.store.delete_item(
("testing", "Test"),
key="3de441ba-8c79-4beb-8f52-00e4dcba68d4",
)items = await client.store.search_items(
("testing", "Test"),
limit=5,
offset=0,
)
items["items"][{'value': {'todo': 'Test SDK put_item'},
'key': '09b9a869-4406-47c5-a635-4716bd79a8b3',
'namespace': ['testing', 'Test'],
'created_at': '2024-11-14T19:53:24.812558+00:00',
'updated_at': '2024-11-14T19:53:24.812558+00:00'}]