개요
MCP (Model Context Protocol) Server의 성능을 최적화하여 응답 속도를 개선하고 토큰 비용을 절감하는 방법에 대한 연구 기록입니다.
Ch 03. MCP Server 성능 최적화
01. MCP 성능 저하의 주범: Latency와 LLM에 맞지 않는 Response Context(Token)
문제 정의 및 영향
Latency 문제의 근본 원인
MCP 서버는 전통적인 웹 서비스와 근본적으로 다른 성능 특성을 가집니다. 사용자가 클릭 후 대기하는 패턴과 달리, AI 모델은 대화 한 번에 수십 개의 요청을 병렬로 발생시키며, 순차적인 체인을 만들어 각 응답이 다음 요청에 의존하게 됩니다.
실제 측정 데이터:
- 네트워크 지연 (Network Latency): 100-300ms
- AI 인프라와 MCP 서버 간 물리적 거리
- Anthropic의 인프라는 북미 기반 (US-East 배치 시 최적)
- 유럽/아시아 배치 시 100-300ms 추가 지연
- 데이터베이스 쿼리 지연: 50ms ~ 수 초
- 최적화되지 않은 쿼리
- 커넥션 풀 부재
- 인덱스 미사용
- 직렬 요청 체인의 증폭 효과:
- 단일 요청 200ms × 5회 = 1초
- 병렬화 시 68% 감소 가능 (실제 이커머스 사례)
Token 사용량의 심각한 영향
리서치 결과, 일반적인 Claude Code 세션에서:
- MCP 도구 정의가 24%의 컨텍스트 윈도우 소비 (200k 중 47.9k 토큰)
- 시스템 도구가 추가로 9% 소비 (17.3k 토큰)
- 실제 대화 시작 전 59%의 컨텍스트 소비
- 대화와 추론에 사용 가능한 공간은 82k 토큰에 불과
비용 영향:
- 최적화 전: 3개 쿼리에 $0.42
- 최적화 후: 3개 쿼리에 $0.01 이상
- 93-98% 비용 절감 (실제 테마파크 API MCP 사례)
확장성 한계:
- 100개 이상의 도구를 가진 에이전트는 심각한 정확도 저하
- “hello” 같은 단순 프롬프트가 46,000+ 토큰 소비
- 에이전트는 실질적으로 2-3개의 MCP 서버만 연결 가능
구체적인 측정 방법
토큰 카운팅 구현:
import tiktoken
def count_tokens(text: str) -> int:
"""토큰 수를 정확히 계산"""
encoding = tiktoken.get_encoding("cl100k_base")
return len(encoding.encode(text))
# API 응답 측정
import json
from fastmcp import FastMCP
mcp = FastMCP("Token Measurement")
@mcp.tool()
def get_accounts(budget_id: str) -> dict:
"""계정 목록 반환 (토큰 측정 포함)"""
# 원시 API 응답
raw_response = api.get_accounts(budget_id)
raw_json = json.dumps(raw_response, indent=2)
raw_tokens = count_tokens(raw_json)
# 필터링된 응답
filtered_response = [
{
"id": account.id,
"name": account.name,
"type": account.type,
"balance": account.balance / 1000,
"on_budget": account.on_budget,
"closed": account.closed
}
for account in raw_response.accounts
if not account.deleted
]
filtered_json = json.dumps(filtered_response, indent=2)
filtered_tokens = count_tokens(filtered_json)
# 감소율 계산
reduction = ((raw_tokens - filtered_tokens) / raw_tokens) * 100
print(f"원시: {raw_tokens} 토큰")
print(f"필터링: {filtered_tokens} 토큰")
print(f"감소율: {reduction:.1f}%")
return filtered_responseLatency 측정 구현:
import time
from contextlib import asynccontextmanager
from prometheus_client import Histogram, Counter
# Prometheus 메트릭 정의
request_latency = Histogram(
'mcp_request_duration_seconds',
'MCP 요청 처리 시간',
['tool_name', 'cache_hit']
)
cache_hits = Counter(
'mcp_cache_hits_total',
'캐시 히트 수',
['cache_level']
)
@mcp.tool()
async def measured_operation(data: str) -> dict:
"""지연 시간 측정이 포함된 작업"""
start_time = time.time()
try:
result = await perform_operation(data)
# 메트릭 기록
duration = time.time() - start_time
request_latency.labels(
tool_name='measured_operation',
cache_hit='false'
).observe(duration)
return result
except Exception as e:
duration = time.time() - start_time
print(f"오류 발생 (소요 시간: {duration:.3f}s): {str(e)}")
raise전체 워크플로우 측정:
def benchmark_workflow():
"""완전한 워크플로우의 토큰 사용량 측정"""
start = time.time()
# 단계 1: 계정 조회
accounts = get_accounts("budget_123")
accounts_tokens = count_tokens(json.dumps(accounts))
# 단계 2: 카테고리 조회
categories = get_categories("budget_123")
categories_tokens = count_tokens(json.dumps(categories))
# 단계 3: 요약 생성
summary = get_summary("budget_123", "2024-01")
summary_tokens = count_tokens(json.dumps(summary))
# 총 측정
total_tokens = accounts_tokens + categories_tokens + summary_tokens
total_time = time.time() - start
print(f"\n=== 워크플로우 벤치마크 ===")
print(f"계정: {accounts_tokens} 토큰")
print(f"카테고리: {categories_tokens} 토큰")
print(f"요약: {summary_tokens} 토큰")
print(f"총 토큰: {total_tokens}")
print(f"총 시간: {total_time:.2f}초")
print(f"평균 응답 시간: {total_time/3:.2f}초")실무 적용 팁
- 베이스라인 먼저 측정: 최적화 전에 현재 성능을 정확히 측정
- 핵심 워크플로우 식별: 20%의 도구가 80%의 요청을 처리하는 패턴 파악
- 토큰 비용 추적: GPT-4o 기준 출력 0.01/1K 토큰
- 지속적 모니터링: Prometheus + Grafana로 실시간 대시보드 구축
02. MCP 성능 향상: Latency 줄이기
2.1 JSON 응답 최적화
문제: 불필요한 필드가 페이로드와 토큰을 증가시킵니다.
해결 방법: 필수 필드만 반환하도록 응답 필터링
Before (최적화 전):
from fastmcp import FastMCP
mcp = FastMCP("Unoptimized Server")
@mcp.tool()
def get_user(user_id: str) -> dict:
"""사용자 정보 조회 - 최적화 전"""
user = db.query(
"SELECT * FROM users WHERE id = ?",
[user_id]
)
# 모든 18개 필드 반환
return dict(user)
# 결과:
# {
# "id": 12345,
# "first_name": "John",
# "middle_name": null,
# "last_name": "Doe",
# "email": "[email protected]",
# "phone": "+1234567890",
# "alternate_phone": null,
# "address_line_1": "123 Main St",
# "address_line_2": "Apt 4B",
# "city": "Springfield",
# "state": "IL",
# "zip": "62701",
# "country": "USA",
# "created_at": "2024-01-15T10:30:00Z",
# "updated_at": "2024-06-20T14:22:00Z",
# "last_login": "2024-11-08T09:15:00Z",
# "login_count": 142,
# "is_deleted": false
# }
# 토큰 수: ~350 토큰After (최적화 후):
@mcp.tool()
def get_user_optimized(user_id: str) -> dict:
"""사용자 정보 조회 - 최적화 후"""
user = db.query(
"""SELECT id, first_name, last_name, email
FROM users WHERE id = ? AND is_deleted = false""",
[user_id]
)
# 필수 필드만 반환
return {
"id": user.id,
"name": f"{user.first_name} {user.last_name}",
"email": user.email
}
# 결과:
# {
# "id": 12345,
# "name": "John Doe",
# "email": "[email protected]"
# }
# 토큰 수: ~70 토큰
# 감소율: 80%성능 개선: 60-80% 페이로드 감소
2.2 데이터베이스 커넥션 풀링
문제: 매 요청마다 새로운 데이터베이스 연결 생성은 큰 오버헤드
해결 방법: 커넥션 풀 사용
Before:
import sqlite3
@mcp.tool()
async def query_database_slow(query: str) -> list:
"""느린 데이터베이스 쿼리 - 매번 새 연결"""
# 매 요청마다 새 연결 생성 (50-100ms 오버헤드)
conn = sqlite3.connect('database.db')
cursor = conn.cursor()
cursor.execute(query)
results = cursor.fetchall()
conn.close()
return results
# 평균 응답 시간: 650ms
# 변동성: 매우 높음 (500-800ms)After:
import asyncpg
# 애플리케이션 시작 시 한 번 생성
db_pool = None
async def init_db():
"""데이터베이스 풀 초기화"""
global db_pool
db_pool = await asyncpg.create_pool(
dsn='postgresql://user:pass@localhost/db',
min_size=10, # 최소 연결 수
max_size=50, # 최대 연결 수
max_queries=50000,
max_inactive_connection_lifetime=300
)
@mcp.tool()
async def query_database_fast(query: str) -> list:
"""빠른 데이터베이스 쿼리 - 커넥션 풀 사용"""
# 풀에서 연결 재사용 (~1-2ms 오버헤드)
async with db_pool.acquire() as conn:
results = await conn.fetch(query)
return [dict(row) for row in results]
# 평균 응답 시간: 50ms (13배 개선)
# 변동성: 매우 낮음 (45-55ms)
# 데이터베이스 부하: 70% 감소성능 개선:
- 13배 성능 향상 (650ms → 50ms)
- 안정적이고 예측 가능한 응답 시간
- 70% 데이터베이스 부하 감소
2.3 멀티레벨 캐싱 전략
문제: 반복적인 데이터 요청이 서버와 DB에 부하를 줌
해결 방법: L1(메모리) + L2(Redis) + L3(DB) 캐시 계층
구현:
from functools import lru_cache
from typing import Any, Dict
import redis
import time
import json
# Redis 클라이언트 초기화
redis_client = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True
)
class MultiLevelCache:
"""3단계 캐시 관리자"""
def __init__(self):
# L1: 인메모리 캐시 (가장 빠름, 작은 용량)
self.l1_cache: Dict[str, tuple[Any, float]] = {}
self.cache_stats = {
"l1_hits": 0,
"l2_hits": 0,
"l3_hits": 0,
"misses": 0
}
async def get(self, key: str, ttl: int = 300) -> Any:
"""캐시에서 값 조회 (L1 → L2 → L3)"""
# L1 체크: 인메모리 (~2.3ms)
if key in self.l1_cache:
value, expiry = self.l1_cache[key]
if time.time() < expiry:
self.cache_stats["l1_hits"] += 1
return value
else:
del self.l1_cache[key]
# L2 체크: Redis (~8.7ms)
redis_value = redis_client.get(key)
if redis_value:
self.cache_stats["l2_hits"] += 1
value = json.loads(redis_value)
# L1에도 저장
self.l1_cache[key] = (value, time.time() + ttl)
return value
# 캐시 미스
self.cache_stats["misses"] += 1
return None
async def set(self, key: str, value: Any, ttl: int = 300):
"""캐시에 값 저장 (L1 + L2)"""
expiry = time.time() + ttl
# L1에 저장
self.l1_cache[key] = (value, expiry)
# L2에 저장
redis_client.setex(key, ttl, json.dumps(value))
def get_stats(self) -> dict:
"""캐시 통계 반환"""
total = sum(self.cache_stats.values())
if total == 0:
return self.cache_stats
return {
**self.cache_stats,
"hit_rate": (
self.cache_stats["l1_hits"] +
self.cache_stats["l2_hits"] +
self.cache_stats["l3_hits"]
) / total * 100
}
# 캐시 인스턴스 생성
cache = MultiLevelCache()
@mcp.tool()
async def get_stock_price(symbol: str) -> dict:
"""주식 가격 조회 (5분 캐시)"""
cache_key = f"stock:{symbol}"
# 캐시 확인
cached = await cache.get(cache_key, ttl=300)
if cached:
print(f"캐시 히트: {symbol}")
return {**cached, "cached": True}
# API 호출 (캐시 미스)
print(f"API 호출: {symbol}")
result = await fetch_stock_api(symbol)
# 캐시에 저장
await cache.set(cache_key, result, ttl=300)
return {**result, "cached": False}
@mcp.tool()
async def get_cache_stats() -> dict:
"""캐시 통계 조회"""
return cache.get_stats()Before/After 비교:
| 시나리오 | Before | After | 개선율 |
|---|---|---|---|
| 콜드 데이터 (첫 요청) | 120ms | 120ms | - |
| L1 히트 (빈번한 접근) | 120ms | 2.3ms | 98% |
| L2 히트 (공유 캐시) | 120ms | 8.7ms | 93% |
| 데이터베이스 부하 | 100% | 30% | 70% 감소 |
TypeScript 버전:
import NodeCache from 'node-cache';
import Redis from 'ioredis';
// L1: 인메모리 캐시 (5분 TTL)
const memCache = new NodeCache({ stdTTL: 300, checkperiod: 60 });
// L2: Redis
const redis = new Redis();
async function getCached<T>(
key: string,
fetcher: () => Promise<T>,
ttl: number = 300
): Promise<T> {
// L1 체크
const cached = memCache.get<T>(key);
if (cached) {
console.log(`L1 히트: ${key}`);
return cached;
}
// L2 체크
const redisData = await redis.get(key);
if (redisData) {
console.log(`L2 히트: ${key}`);
const parsed = JSON.parse(redisData) as T;
memCache.set(key, parsed);
return parsed;
}
// 데이터 패치 및 캐시
console.log(`캐시 미스: ${key}`);
const data = await fetcher();
await redis.setex(key, ttl, JSON.stringify(data));
memCache.set(key, data);
return data;
}
// 사용 예시
server.registerTool(
'get-user-data',
{
inputSchema: { userId: z.string() }
},
async ({ userId }) => {
const userData = await getCached(
`user:${userId}`,
async () => {
const result = await dbPool.query(
'SELECT * FROM users WHERE id = $1',
[userId]
);
return result.rows[0];
},
300
);
return {
content: [{ type: 'text', text: JSON.stringify(userData) }],
structuredContent: userData
};
}
);2.4 병렬 처리 최적화
문제: 순차적 실행이 지연 시간을 누적시킴
해결 방법: 독립적인 작업을 병렬로 실행
Before (순차 실행):
@mcp.tool()
async def fetch_dashboard_data_slow(user_id: str) -> dict:
"""대시보드 데이터 조회 - 순차 실행"""
# 각 요청이 이전 요청 완료를 기다림
user = await fetch_user(user_id) # 200ms
orders = await fetch_orders(user_id) # 150ms
notifications = await fetch_notifications(user_id) # 100ms
recommendations = await fetch_recommendations(user_id) # 180ms
return {
"user": user,
"orders": orders,
"notifications": notifications,
"recommendations": recommendations
}
# 총 소요 시간: 200 + 150 + 100 + 180 = 630msAfter (병렬 실행):
import asyncio
@mcp.tool()
async def fetch_dashboard_data_fast(user_id: str) -> dict:
"""대시보드 데이터 조회 - 병렬 실행"""
# 모든 요청을 동시에 시작
user, orders, notifications, recommendations = await asyncio.gather(
fetch_user(user_id),
fetch_orders(user_id),
fetch_notifications(user_id),
fetch_recommendations(user_id),
return_exceptions=True # 하나의 실패가 전체를 막지 않음
)
return {
"user": user if not isinstance(user, Exception) else None,
"orders": orders if not isinstance(orders, Exception) else [],
"notifications": notifications if not isinstance(notifications, Exception) else [],
"recommendations": recommendations if not isinstance(recommendations, Exception) else []
}
# 총 소요 시간: max(200, 150, 100, 180) = 200ms
# 개선: 68% 지연 시간 감소실제 사례 (이커머스):
- Before: 800ms (순차)
- After: 250ms (병렬)
- 개선: 68% 감소
- 비즈니스 영향: 사용자 참여도 15% 증가
TypeScript 버전:
server.registerTool(
'fetch-dashboard',
{ inputSchema: { userId: z.string() } },
async ({ userId }) => {
// Promise.all로 병렬 실행
const [user, orders, notifications, recommendations] = await Promise.all([
fetchUser(userId),
fetchOrders(userId),
fetchNotifications(userId),
fetchRecommendations(userId)
].map(p => p.catch(e => ({ error: e.message }))));
return {
content: [{ type: 'text', text: JSON.stringify({
user, orders, notifications, recommendations
}) }]
};
}
);2.5 HTTP 커넥션 풀링
Python:
import aiohttp
# 글로벌 세션 (애플리케이션 수명 동안 재사용)
http_session = None
async def get_http_session():
"""HTTP 세션 싱글톤"""
global http_session
if not http_session:
connector = aiohttp.TCPConnector(
limit=100, # 총 연결 수 제한
limit_per_host=30, # 호스트당 연결 수 제한
ttl_dns_cache=300 # DNS 캐시 5분
)
http_session = aiohttp.ClientSession(connector=connector)
return http_session
@mcp.tool()
async def fetch_api_data(endpoint: str) -> dict:
"""외부 API 호출 - 커넥션 풀 사용"""
session = await get_http_session()
async with session.get(
endpoint,
timeout=aiohttp.ClientTimeout(total=10)
) as response:
return await response.json()성능 개선:
- DNS 조회 제거
- TCP 핸드셰이크 제거
- TLS 협상 제거
- 결과: 반복 요청 시 50-100ms 절감
실무 적용 팁
- 점진적 최적화: 가장 큰 영향을 주는 것부터 시작 (커넥션 풀링 → 캐싱 → 병렬화)
- 캐시 TTL 전략:
- 실시간 데이터: 5-30초
- 사용자 프로필: 5-15분
- 히스토리 데이터: 시간-일
- 모니터링 필수: Prometheus로 각 최적화의 영향 측정
- 에러 처리: 병렬 실행 시
return_exceptions=True사용으로 부분 실패 허용
03. MCP 성능 향상: Response Context(Token) 최적화와 Conversation Hop 줄이기
3.1 필드 필터링 (핵심 기법)
원리: API가 반환하는 모든 필드가 아닌, LLM이 실제로 필요로 하는 필드만 반환
Before (YNAB 예시):
@mcp.tool()
def get_accounts_unfiltered(budget_id: str) -> list:
"""계정 조회 - 모든 필드 반환"""
response = ynab_api.get_accounts(budget_id)
# 18개 필드 모두 반환
return [dict(account) for account in response.data.accounts]
# 47개 계정, 18개 필드
# 토큰 수: 9,960After:
@mcp.tool()
def get_accounts_filtered(budget_id: str) -> list:
"""계정 조회 - 필수 필드만 반환"""
response = ynab_api.get_accounts(budget_id)
return [
{
"id": account.id,
"name": account.name,
"type": account.type,
"balance": account.balance / 1000, # 밀리단위 → 달러
"on_budget": account.on_budget,
"closed": account.closed
}
for account in response.data.accounts
if not account.deleted # 삭제된 항목 필터링
]
# 47개 계정, 6개 필드
# 토큰 수: 3,451
# 감소: 65.4%가이드라인:
- ✅ 사용자 질문에 답하는 데 필요한 필드만
- ✅ 삭제/아카이브된 항목은 소스에서 필터링
- ✅ 내부 참조 ID, 디버그 메타데이터 제거
- ✅ 옵션 파라미터로 상세 정보 제공
3.2 사전 집계 (Pre-Aggregation)
원리: 원시 데이터를 반환하는 대신 서버 측에서 요약 계산
Before (대량 트랜잭션):
@mcp.tool()
def get_spending_raw(category_id: str, start_date: str, end_date: str) -> list:
"""지출 내역 조회 - 원시 데이터"""
transactions = api.get_transactions(category_id, start_date, end_date)
# 1,704개 트랜잭션 반환
return [dict(t) for t in transactions]
# 6개월 데이터
# 토큰 수: 4,890
# 1년 데이터: 746,800 토큰 (컨텍스트 윈도우 초과!)After (사전 집계):
from collections import defaultdict
from datetime import datetime
@mcp.tool()
def get_spending_summary(category_id: str, start_date: str, end_date: str) -> dict:
"""지출 내역 조회 - 사전 집계"""
transactions = api.get_transactions(category_id, start_date, end_date)
# 서버에서 계산
total_spent = sum(t.amount for t in transactions)
monthly_totals = defaultdict(float)
for t in transactions:
month_key = t.date[:7] # YYYY-MM
monthly_totals[month_key] += t.amount
return {
"category_id": category_id,
"date_range": {"start": start_date, "end": end_date},
"total_spent": total_spent,
"transaction_count": len(transactions),
"average_per_month": total_spent / len(monthly_totals) if monthly_totals else 0,
"monthly_breakdown": [
{"month": month, "spent": amount}
for month, amount in sorted(monthly_totals.items())
]
}
# 6개월 데이터
# 토큰 수: 262
# 감소: 94.6%
# 1년 데이터도 동일하게 262 토큰
# 감소: 99.96%성능 비교:
| 데이터 기간 | 원시 데이터 | 사전 집계 | 감소율 |
|---|---|---|---|
| 6개월 (1,704 txn) | 4,890 토큰 | 262 토큰 | 94.6% |
| 1년 (3,456 txn) | 746,800 토큰 | 262 토큰 | 99.96% |
3.3 동적 도구 로딩
문제: 모든 도구를 미리 로드하면 컨텍스트 소비
해결 방법: 대화 컨텍스트에 따라 필요한 도구만 로드
Before:
# 시작 시 모든 114개 도구 로드
mcp = FastMCP("All Tools Server")
# GitHub 도구 50개
@mcp.tool()
def github_create_issue(...): pass
@mcp.tool()
def github_list_repos(...): pass
# ... 48개 더
# Notion 도구 34개
@mcp.tool()
def notion_create_page(...): pass
@mcp.tool()
def notion_query_db(...): pass
# ... 32개 더
# Grafana 도구 30개
@mcp.tool()
def grafana_create_dashboard(...): pass
# ... 29개 더
# "hello" 입력 시 소비: 46,000 토큰
# "List 10 GitHub issues" 실행: 102,000 토큰 (대부분 불필요)After (도구 그룹화):
# 서버를 도메인별로 분리
github_mcp = FastMCP("GitHub Server")
notion_mcp = FastMCP("Notion Server")
grafana_mcp = FastMCP("Grafana Server")
# Claude Desktop 설정
# claude_desktop_config.json
{
"mcpServers": {
"github": {
"command": "python",
"args": ["github_server.py"]
},
"notion": {
"command": "python",
"args": ["notion_server.py"]
},
"grafana": {
"command": "python",
"args": ["grafana_server.py"]
}
}
}
# 사용자가 "List GitHub issues"라고 하면
# GitHub 서버만 활성화 → ~8,000 토큰만 사용
# 감소: 92%MCP Optimizer 패턴:
# RooCode/ToolHive 방식
class LazyToolLoader:
"""요청 시 도구 스펙 로드"""
def __init__(self):
self.loaded_servers = set()
async def request_tools(self, server_name: str):
"""특정 MCP 서버의 도구만 로드"""
if server_name not in self.loaded_servers:
# 도구 스펙 가져오기
specs = await fetch_tool_specs(server_name)
self.loaded_servers.add(server_name)
return specs
return []
# 초기 프롬프트
"""
사용 가능한 MCP 서버: github, notion, grafana
MCP 도구를 사용하려면 다음과 같이 요청하세요:
'Provide tool specifications for: <server-name>'
"""
# 모델이 GitHub 도구가 필요하면:
# "Provide tool specifications for: github"
# → GitHub 도구만 로드, 8,000 토큰3.4 Conversation Hop 줄이기
문제: 여러 번의 왕복 통신이 지연과 토큰 증가
Pattern 1: 복합 도구
Before (3 hops):
# Hop 1: 목적지 조회
@mcp.tool()
def get_destinations() -> list:
"""모든 리조트/파크 반환"""
return api.get_all_destinations() # 거대한 JSON
# Hop 2: 파크 ID로 상세 조회
@mcp.tool()
def get_park_by_id(park_id: str) -> dict:
return api.get_park(park_id)
# Hop 3: 일정 조회
@mcp.tool()
def get_park_schedule(park_id: str, date: str) -> dict:
return api.get_schedule(park_id, date)
# 사용자: "What time does Epcot open tomorrow?"
# Agent:
# 1. get_destinations() → 93,457 토큰
# 2. get_park_by_id("epcot") → 찾아야 함
# 3. get_park_schedule("epcot", "2024-11-09") → 또 찾아야 함
# 총: 3 hops, 130,000+ 토큰After (1 hop):
@mcp.tool()
def get_park_schedule_by_name(park_name: str, date: str) -> dict:
"""파크 이름으로 직접 일정 조회 (1-hop)"""
# 내부적으로:
# 1. 파크 이름으로 ID 찾기
park_id = find_park_id_by_name(park_name)
if not park_id:
return {"error": f"Park '{park_name}' not found"}
# 2. 일정 가져오기
schedule = api.get_schedule(park_id, date)
# 3. 해당 날짜로 필터링
day_schedule = [
s for s in schedule
if s['date'] == date
]
# 4. 필수 정보만 반환
return {
"park": park_name,
"date": date,
"opening_time": day_schedule[0]['open'] if day_schedule else None,
"closing_time": day_schedule[0]['close'] if day_schedule else None
}
# 사용자: "What time does Epcot open tomorrow?"
# Agent: get_park_schedule_by_name("Epcot", "2024-11-09")
# 총: 1 hop, 1,880 토큰
# 감소: 98%Pattern 2: MCP Prompts (워크플로우 체인)
@mcp.prompt()
async def analyze_overspending() -> str:
"""복잡한 워크플로우를 단일 프롬프트로 패키징"""
# 내부적으로 여러 도구 호출
budget = await get_current_month_budget()
spending = await get_spending_by_category()
# 분석 수행
variances = calculate_variances(budget, spending)
overspent = identify_overspent_categories(variances)
# 결과를 구조화된 프롬프트로 반환
return f"""
당신은 재무 분석가입니다. 다음 예산 초과 상황을 분석하세요:
예산: {json.dumps(budget, indent=2)}
실제 지출: {json.dumps(spending, indent=2)}
초과 카테고리: {json.dumps(overspent, indent=2)}
1. 가장 심각한 초과 항목 식별
2. 가능한 원인 설명
3. 개선 제안
"""
# 사용자: "Am I overspending this month?"
# Agent: 단일 프롬프트 호출 → 모든 분석 완료
# 3-4 hops → 1 hopPattern 3: 배치 실행 (MCP BatchIt)
@mcp.tool()
def batch_file_operations(operations: list) -> list:
"""여러 파일 작업을 한 번에 실행"""
# operations: [
# {"tool": "create_directory", "arguments": {"path": "/data"}},
# {"tool": "write_file", "arguments": {"path": "/data/report.txt", "content": "..."}},
# {"tool": "read_file", "arguments": {"path": "/config.json"}}
# ]
results = []
for op in operations:
try:
result = execute_operation(op['tool'], op['arguments'])
results.append({"success": True, "result": result})
except Exception as e:
results.append({"success": False, "error": str(e)})
if op.get('stop_on_error'):
break
return results
# Before: 3개 파일 작업 = 3 hops
# After: 배치 실행 = 1 hop3.5 점진적 공개 (Progressive Disclosure)
원리: 메타데이터를 먼저 로드하고, 필요 시 상세 정보 요청
Pattern 1: 요약 + 캐시 ID
@mcp.tool()
def list_documents() -> dict:
"""문서 목록 - 요약만 반환"""
docs = fetch_all_documents()
# 전체 데이터를 캐시에 저장
cache_id = store_in_cache(docs)
# 요약만 반환
return {
"summary": [
{
"id": doc.id,
"title": doc.title,
"type": doc.type,
"size": len(doc.content)
}
for doc in docs
],
"cache_id": cache_id,
"message": "Use get_document_details(cache_id) for full content"
}
@mcp.tool()
def get_document_details(cache_id: str) -> dict:
"""캐시된 전체 문서 반환"""
return retrieve_from_cache(cache_id)
# 목록 조회: 500 토큰 (96% 절감)
# 필요 시에만 전체 내용 로드Pattern 2: 파일시스템 기반 발견
# 구조:
# bigquery-skill/
# ├── SKILL.md (개요 - 항상 로드)
# └── reference/
# ├── finance.md (필요 시 로드)
# ├── sales.md
# └── product.md
@mcp.resource("skill://bigquery/overview")
def get_skill_overview() -> str:
"""스킬 개요 (핵심 정보만)"""
return read_file("SKILL.md") # 2,000 토큰
# 사용자: "Show me revenue data"
# Claude:
# 1. SKILL.md 읽기 (finance.md 참조 발견)
# 2. reference/finance.md 읽기
# 3. 다른 파일은 로드하지 않음 (0 토큰)
# 효과: 10-15개 참조 파일 중 1-2개만 로드
# 토큰 절감: 85-95%3.6 Complete Before/After 예시
사례: 테마파크 API MCP
Before (최적화 전):
@mcp.tool()
def get_destinations_naive() -> dict:
"""모든 목적지 반환 - 최적화 전"""
# 리조트 중심 구조
return api.get_destinations()
# {
# "destinations": [
# {
# "id": "wdw",
# "name": "Walt Disney World Resort",
# "parks": [
# {"id": "mk", "name": "Magic Kingdom", "schedule": [...]},
# {"id": "epcot", "name": "Epcot", "schedule": [...]},
# ...
# ],
# "hotels": [...],
# "dining": [...]
# },
# ...
# ]
# }
@mcp.tool()
def get_schedule_naive(park_id: str) -> dict:
return api.get_schedule(park_id)
# Query: "What time does Epcot open tomorrow?"
# Execution:
# 1. get_destinations_naive() → 모든 리조트/파크/호텔/식당
# 2. Agent가 JSON에서 Epcot ID 찾기
# 3. get_schedule_naive("epcot") → 1년치 일정
# 4. Agent가 내일 날짜 필터링
# 토큰 사용:
# Query 1: 93,457 토큰
# Query 2: 40,612 토큰
# Query 3: 23,206 토큰
# 비용: $0.42 (3 queries)After (최적화 후):
# 1. JSON 구조 반전: 파크 중심
# 2. 파크 이름으로 필터링
# 3. 특정 날짜로 필터링
@mcp.tool()
def get_park_hours(park_name: str, date: str) -> dict:
"""특정 파크의 특정 날짜 운영 시간"""
# 파크 이름으로 직접 검색
park = find_park_by_name(park_name)
if not park:
return {
"error": f"Park '{park_name}' not found",
"available_parks": ["Magic Kingdom", "Epcot", "Hollywood Studios", "Animal Kingdom"]
}
# 해당 날짜 일정만 조회
schedule = get_schedule_for_date(park.id, date)
return {
"park": park.name,
"date": date,
"opening_time": schedule.open,
"closing_time": schedule.close,
"special_hours": schedule.special_hours if hasattr(schedule, 'special_hours') else None
}
# Query: "What time does Epcot open tomorrow?"
# Execution:
# 1. get_park_hours("Epcot", "2024-11-09") → 직접 답변
# 토큰 사용:
# Query 1: 1,880 토큰 (98.0% 감소)
# Query 2: 1,857 토큰 (95.4% 감소)
# Query 3: 1,594 토큰 (93.1% 감소)
# 비용: $0.01 (3 queries) → 97.6% 절감최적화 체크리스트:
# ✅ 최적화 평가 함수
def evaluate_tool_efficiency(tool_name: str, response: dict) -> dict:
"""도구 효율성 평가"""
response_json = json.dumps(response)
token_count = count_tokens(response_json)
# 평가 기준
evaluation = {
"tool": tool_name,
"tokens": token_count,
"checks": {
"필드_필터링": check_unnecessary_fields(response),
"삭제된_항목_제거": check_deleted_items(response),
"적절한_TTL": check_cache_ttl(tool_name),
"사전_집계": check_aggregation(response),
"1hop_달성": check_single_hop(tool_name)
}
}
# 점수 계산
passed = sum(1 for v in evaluation["checks"].values() if v)
evaluation["score"] = passed / len(evaluation["checks"])
# 권장사항
if evaluation["score"] < 0.8:
evaluation["recommendations"] = generate_recommendations(evaluation["checks"])
return evaluation실무 적용 팁
- 토큰 측정 자동화: 모든 도구에 토큰 카운팅 래퍼 추가
- 베이스라인 설정: 최적화 전 토큰 사용량 기록
- 점진적 개선:
- 1주차: 필드 필터링 (30-65% 절감)
- 2주차: 사전 집계 (90-99% 절감, 대량 데이터)
- 3주차: 복합 도구 + conversation hop 감소
- 4주차: 동적 로딩 + 점진적 공개
- 우선순위: 20%의 도구가 80%의 토큰을 소비 → 이들부터 최적화
- 모니터링: Grafana 대시보드로 도구별 토큰 사용량 추적
- A/B 테스팅: 최적화 전후 사용자 경험 비교
종합 성과 요약
| 최적화 기법 | 구현 복잡도 | 토큰 절감 | Latency 개선 | ROI |
|---|---|---|---|---|
| 필드 필터링 | 낮음 | 30-65% | 10-20% | 매우 높음 |
| 커넥션 풀링 | 낮음 | - | 13배 | 매우 높음 |
| 멀티레벨 캐싱 | 중간 | 40% | 93-98% | 높음 |
| 병렬 처리 | 중간 | - | 68% | 높음 |
| 사전 집계 | 중간 | 90-99% | 20-30% | 매우 높음 |
| 복합 도구 | 중간 | 50-80% | 66% | 높음 |
| 동적 도구 로딩 | 높음 | 70-90% | - | 중간 |
| 점진적 공개 | 높음 | 85-95% | - | 중간 |