개요

MCP (Model Context Protocol) Server의 성능을 최적화하여 응답 속도를 개선하고 토큰 비용을 절감하는 방법에 대한 연구 기록입니다.

Ch 03. MCP Server 성능 최적화

01. MCP 성능 저하의 주범: Latency와 LLM에 맞지 않는 Response Context(Token)

문제 정의 및 영향

Latency 문제의 근본 원인

MCP 서버는 전통적인 웹 서비스와 근본적으로 다른 성능 특성을 가집니다. 사용자가 클릭 후 대기하는 패턴과 달리, AI 모델은 대화 한 번에 수십 개의 요청을 병렬로 발생시키며, 순차적인 체인을 만들어 각 응답이 다음 요청에 의존하게 됩니다.

실제 측정 데이터:

  • 네트워크 지연 (Network Latency): 100-300ms
    • AI 인프라와 MCP 서버 간 물리적 거리
    • Anthropic의 인프라는 북미 기반 (US-East 배치 시 최적)
    • 유럽/아시아 배치 시 100-300ms 추가 지연
  • 데이터베이스 쿼리 지연: 50ms ~ 수 초
    • 최적화되지 않은 쿼리
    • 커넥션 풀 부재
    • 인덱스 미사용
  • 직렬 요청 체인의 증폭 효과:
    • 단일 요청 200ms × 5회 = 1초
    • 병렬화 시 68% 감소 가능 (실제 이커머스 사례)

Token 사용량의 심각한 영향

리서치 결과, 일반적인 Claude Code 세션에서:

  • MCP 도구 정의가 24%의 컨텍스트 윈도우 소비 (200k 중 47.9k 토큰)
  • 시스템 도구가 추가로 9% 소비 (17.3k 토큰)
  • 실제 대화 시작 전 59%의 컨텍스트 소비
  • 대화와 추론에 사용 가능한 공간은 82k 토큰에 불과

비용 영향:

  • 최적화 전: 3개 쿼리에 $0.42
  • 최적화 후: 3개 쿼리에 $0.01 이상
  • 93-98% 비용 절감 (실제 테마파크 API MCP 사례)

확장성 한계:

  • 100개 이상의 도구를 가진 에이전트는 심각한 정확도 저하
  • “hello” 같은 단순 프롬프트가 46,000+ 토큰 소비
  • 에이전트는 실질적으로 2-3개의 MCP 서버만 연결 가능

구체적인 측정 방법

토큰 카운팅 구현:

import tiktoken
 
def count_tokens(text: str) -> int:
    """토큰 수를 정확히 계산"""
    encoding = tiktoken.get_encoding("cl100k_base")
    return len(encoding.encode(text))
 
# API 응답 측정
import json
from fastmcp import FastMCP
 
mcp = FastMCP("Token Measurement")
 
@mcp.tool()
def get_accounts(budget_id: str) -> dict:
    """계정 목록 반환 (토큰 측정 포함)"""
    # 원시 API 응답
    raw_response = api.get_accounts(budget_id)
    raw_json = json.dumps(raw_response, indent=2)
    raw_tokens = count_tokens(raw_json)
    
    # 필터링된 응답
    filtered_response = [
        {
            "id": account.id,
            "name": account.name,
            "type": account.type,
            "balance": account.balance / 1000,
            "on_budget": account.on_budget,
            "closed": account.closed
        }
        for account in raw_response.accounts 
        if not account.deleted
    ]
    filtered_json = json.dumps(filtered_response, indent=2)
    filtered_tokens = count_tokens(filtered_json)
    
    # 감소율 계산
    reduction = ((raw_tokens - filtered_tokens) / raw_tokens) * 100
    
    print(f"원시: {raw_tokens} 토큰")
    print(f"필터링: {filtered_tokens} 토큰")
    print(f"감소율: {reduction:.1f}%")
    
    return filtered_response

Latency 측정 구현:

import time
from contextlib import asynccontextmanager
from prometheus_client import Histogram, Counter
 
# Prometheus 메트릭 정의
request_latency = Histogram(
    'mcp_request_duration_seconds',
    'MCP 요청 처리 시간',
    ['tool_name', 'cache_hit']
)
 
cache_hits = Counter(
    'mcp_cache_hits_total',
    '캐시 히트 수',
    ['cache_level']
)
 
@mcp.tool()
async def measured_operation(data: str) -> dict:
    """지연 시간 측정이 포함된 작업"""
    start_time = time.time()
    
    try:
        result = await perform_operation(data)
        
        # 메트릭 기록
        duration = time.time() - start_time
        request_latency.labels(
            tool_name='measured_operation',
            cache_hit='false'
        ).observe(duration)
        
        return result
    except Exception as e:
        duration = time.time() - start_time
        print(f"오류 발생 (소요 시간: {duration:.3f}s): {str(e)}")
        raise

전체 워크플로우 측정:

def benchmark_workflow():
    """완전한 워크플로우의 토큰 사용량 측정"""
    start = time.time()
    
    # 단계 1: 계정 조회
    accounts = get_accounts("budget_123")
    accounts_tokens = count_tokens(json.dumps(accounts))
    
    # 단계 2: 카테고리 조회
    categories = get_categories("budget_123")
    categories_tokens = count_tokens(json.dumps(categories))
    
    # 단계 3: 요약 생성
    summary = get_summary("budget_123", "2024-01")
    summary_tokens = count_tokens(json.dumps(summary))
    
    # 총 측정
    total_tokens = accounts_tokens + categories_tokens + summary_tokens
    total_time = time.time() - start
    
    print(f"\n=== 워크플로우 벤치마크 ===")
    print(f"계정: {accounts_tokens} 토큰")
    print(f"카테고리: {categories_tokens} 토큰")
    print(f"요약: {summary_tokens} 토큰")
    print(f"총 토큰: {total_tokens}")
    print(f"총 시간: {total_time:.2f}초")
    print(f"평균 응답 시간: {total_time/3:.2f}초")

실무 적용 팁

  1. 베이스라인 먼저 측정: 최적화 전에 현재 성능을 정확히 측정
  2. 핵심 워크플로우 식별: 20%의 도구가 80%의 요청을 처리하는 패턴 파악
  3. 토큰 비용 추적: GPT-4o 기준 출력 0.01/1K 토큰
  4. 지속적 모니터링: Prometheus + Grafana로 실시간 대시보드 구축

02. MCP 성능 향상: Latency 줄이기

2.1 JSON 응답 최적화

문제: 불필요한 필드가 페이로드와 토큰을 증가시킵니다.

해결 방법: 필수 필드만 반환하도록 응답 필터링

Before (최적화 전):

from fastmcp import FastMCP
 
mcp = FastMCP("Unoptimized Server")
 
@mcp.tool()
def get_user(user_id: str) -> dict:
    """사용자 정보 조회 - 최적화 전"""
    user = db.query(
        "SELECT * FROM users WHERE id = ?", 
        [user_id]
    )
    # 모든 18개 필드 반환
    return dict(user)
 
# 결과:
# {
#   "id": 12345,
#   "first_name": "John",
#   "middle_name": null,
#   "last_name": "Doe",
#   "email": "[email protected]",
#   "phone": "+1234567890",
#   "alternate_phone": null,
#   "address_line_1": "123 Main St",
#   "address_line_2": "Apt 4B",
#   "city": "Springfield",
#   "state": "IL",
#   "zip": "62701",
#   "country": "USA",
#   "created_at": "2024-01-15T10:30:00Z",
#   "updated_at": "2024-06-20T14:22:00Z",
#   "last_login": "2024-11-08T09:15:00Z",
#   "login_count": 142,
#   "is_deleted": false
# }
# 토큰 수: ~350 토큰

After (최적화 후):

@mcp.tool()
def get_user_optimized(user_id: str) -> dict:
    """사용자 정보 조회 - 최적화 후"""
    user = db.query(
        """SELECT id, first_name, last_name, email 
           FROM users WHERE id = ? AND is_deleted = false""", 
        [user_id]
    )
    # 필수 필드만 반환
    return {
        "id": user.id,
        "name": f"{user.first_name} {user.last_name}",
        "email": user.email
    }
 
# 결과:
# {
#   "id": 12345,
#   "name": "John Doe",
#   "email": "[email protected]"
# }
# 토큰 수: ~70 토큰
# 감소율: 80%

성능 개선: 60-80% 페이로드 감소

2.2 데이터베이스 커넥션 풀링

문제: 매 요청마다 새로운 데이터베이스 연결 생성은 큰 오버헤드

해결 방법: 커넥션 풀 사용

Before:

import sqlite3
 
@mcp.tool()
async def query_database_slow(query: str) -> list:
    """느린 데이터베이스 쿼리 - 매번 새 연결"""
    # 매 요청마다 새 연결 생성 (50-100ms 오버헤드)
    conn = sqlite3.connect('database.db')
    cursor = conn.cursor()
    cursor.execute(query)
    results = cursor.fetchall()
    conn.close()
    return results
 
# 평균 응답 시간: 650ms
# 변동성: 매우 높음 (500-800ms)

After:

import asyncpg
 
# 애플리케이션 시작 시 한 번 생성
db_pool = None
 
async def init_db():
    """데이터베이스 풀 초기화"""
    global db_pool
    db_pool = await asyncpg.create_pool(
        dsn='postgresql://user:pass@localhost/db',
        min_size=10,      # 최소 연결 수
        max_size=50,      # 최대 연결 수
        max_queries=50000,
        max_inactive_connection_lifetime=300
    )
 
@mcp.tool()
async def query_database_fast(query: str) -> list:
    """빠른 데이터베이스 쿼리 - 커넥션 풀 사용"""
    # 풀에서 연결 재사용 (~1-2ms 오버헤드)
    async with db_pool.acquire() as conn:
        results = await conn.fetch(query)
        return [dict(row) for row in results]
 
# 평균 응답 시간: 50ms (13배 개선)
# 변동성: 매우 낮음 (45-55ms)
# 데이터베이스 부하: 70% 감소

성능 개선:

  • 13배 성능 향상 (650ms → 50ms)
  • 안정적이고 예측 가능한 응답 시간
  • 70% 데이터베이스 부하 감소

2.3 멀티레벨 캐싱 전략

문제: 반복적인 데이터 요청이 서버와 DB에 부하를 줌

해결 방법: L1(메모리) + L2(Redis) + L3(DB) 캐시 계층

구현:

from functools import lru_cache
from typing import Any, Dict
import redis
import time
import json
 
# Redis 클라이언트 초기화
redis_client = redis.Redis(
    host='localhost', 
    port=6379, 
    db=0,
    decode_responses=True
)
 
class MultiLevelCache:
    """3단계 캐시 관리자"""
    
    def __init__(self):
        # L1: 인메모리 캐시 (가장 빠름, 작은 용량)
        self.l1_cache: Dict[str, tuple[Any, float]] = {}
        self.cache_stats = {
            "l1_hits": 0,
            "l2_hits": 0,
            "l3_hits": 0,
            "misses": 0
        }
    
    async def get(self, key: str, ttl: int = 300) -> Any:
        """캐시에서 값 조회 (L1 → L2 → L3)"""
        
        # L1 체크: 인메모리 (~2.3ms)
        if key in self.l1_cache:
            value, expiry = self.l1_cache[key]
            if time.time() < expiry:
                self.cache_stats["l1_hits"] += 1
                return value
            else:
                del self.l1_cache[key]
        
        # L2 체크: Redis (~8.7ms)
        redis_value = redis_client.get(key)
        if redis_value:
            self.cache_stats["l2_hits"] += 1
            value = json.loads(redis_value)
            # L1에도 저장
            self.l1_cache[key] = (value, time.time() + ttl)
            return value
        
        # 캐시 미스
        self.cache_stats["misses"] += 1
        return None
    
    async def set(self, key: str, value: Any, ttl: int = 300):
        """캐시에 값 저장 (L1 + L2)"""
        expiry = time.time() + ttl
        
        # L1에 저장
        self.l1_cache[key] = (value, expiry)
        
        # L2에 저장
        redis_client.setex(key, ttl, json.dumps(value))
    
    def get_stats(self) -> dict:
        """캐시 통계 반환"""
        total = sum(self.cache_stats.values())
        if total == 0:
            return self.cache_stats
        
        return {
            **self.cache_stats,
            "hit_rate": (
                self.cache_stats["l1_hits"] + 
                self.cache_stats["l2_hits"] + 
                self.cache_stats["l3_hits"]
            ) / total * 100
        }
 
# 캐시 인스턴스 생성
cache = MultiLevelCache()
 
@mcp.tool()
async def get_stock_price(symbol: str) -> dict:
    """주식 가격 조회 (5분 캐시)"""
    cache_key = f"stock:{symbol}"
    
    # 캐시 확인
    cached = await cache.get(cache_key, ttl=300)
    if cached:
        print(f"캐시 히트: {symbol}")
        return {**cached, "cached": True}
    
    # API 호출 (캐시 미스)
    print(f"API 호출: {symbol}")
    result = await fetch_stock_api(symbol)
    
    # 캐시에 저장
    await cache.set(cache_key, result, ttl=300)
    
    return {**result, "cached": False}
 
@mcp.tool()
async def get_cache_stats() -> dict:
    """캐시 통계 조회"""
    return cache.get_stats()

Before/After 비교:

시나리오BeforeAfter개선율
콜드 데이터 (첫 요청)120ms120ms-
L1 히트 (빈번한 접근)120ms2.3ms98%
L2 히트 (공유 캐시)120ms8.7ms93%
데이터베이스 부하100%30%70% 감소

TypeScript 버전:

import NodeCache from 'node-cache';
import Redis from 'ioredis';
 
// L1: 인메모리 캐시 (5분 TTL)
const memCache = new NodeCache({ stdTTL: 300, checkperiod: 60 });
 
// L2: Redis
const redis = new Redis();
 
async function getCached<T>(
  key: string,
  fetcher: () => Promise<T>,
  ttl: number = 300
): Promise<T> {
  // L1 체크
  const cached = memCache.get<T>(key);
  if (cached) {
    console.log(`L1 히트: ${key}`);
    return cached;
  }
  
  // L2 체크
  const redisData = await redis.get(key);
  if (redisData) {
    console.log(`L2 히트: ${key}`);
    const parsed = JSON.parse(redisData) as T;
    memCache.set(key, parsed);
    return parsed;
  }
  
  // 데이터 패치 및 캐시
  console.log(`캐시 미스: ${key}`);
  const data = await fetcher();
  await redis.setex(key, ttl, JSON.stringify(data));
  memCache.set(key, data);
  return data;
}
 
// 사용 예시
server.registerTool(
  'get-user-data',
  {
    inputSchema: { userId: z.string() }
  },
  async ({ userId }) => {
    const userData = await getCached(
      `user:${userId}`,
      async () => {
        const result = await dbPool.query(
          'SELECT * FROM users WHERE id = $1',
          [userId]
        );
        return result.rows[0];
      },
      300
    );
    
    return {
      content: [{ type: 'text', text: JSON.stringify(userData) }],
      structuredContent: userData
    };
  }
);

2.4 병렬 처리 최적화

문제: 순차적 실행이 지연 시간을 누적시킴

해결 방법: 독립적인 작업을 병렬로 실행

Before (순차 실행):

@mcp.tool()
async def fetch_dashboard_data_slow(user_id: str) -> dict:
    """대시보드 데이터 조회 - 순차 실행"""
    # 각 요청이 이전 요청 완료를 기다림
    user = await fetch_user(user_id)           # 200ms
    orders = await fetch_orders(user_id)        # 150ms
    notifications = await fetch_notifications(user_id)  # 100ms
    recommendations = await fetch_recommendations(user_id)  # 180ms
    
    return {
        "user": user,
        "orders": orders,
        "notifications": notifications,
        "recommendations": recommendations
    }
 
# 총 소요 시간: 200 + 150 + 100 + 180 = 630ms

After (병렬 실행):

import asyncio
 
@mcp.tool()
async def fetch_dashboard_data_fast(user_id: str) -> dict:
    """대시보드 데이터 조회 - 병렬 실행"""
    # 모든 요청을 동시에 시작
    user, orders, notifications, recommendations = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_notifications(user_id),
        fetch_recommendations(user_id),
        return_exceptions=True  # 하나의 실패가 전체를 막지 않음
    )
    
    return {
        "user": user if not isinstance(user, Exception) else None,
        "orders": orders if not isinstance(orders, Exception) else [],
        "notifications": notifications if not isinstance(notifications, Exception) else [],
        "recommendations": recommendations if not isinstance(recommendations, Exception) else []
    }
 
# 총 소요 시간: max(200, 150, 100, 180) = 200ms
# 개선: 68% 지연 시간 감소

실제 사례 (이커머스):

  • Before: 800ms (순차)
  • After: 250ms (병렬)
  • 개선: 68% 감소
  • 비즈니스 영향: 사용자 참여도 15% 증가

TypeScript 버전:

server.registerTool(
  'fetch-dashboard',
  { inputSchema: { userId: z.string() } },
  async ({ userId }) => {
    // Promise.all로 병렬 실행
    const [user, orders, notifications, recommendations] = await Promise.all([
      fetchUser(userId),
      fetchOrders(userId),
      fetchNotifications(userId),
      fetchRecommendations(userId)
    ].map(p => p.catch(e => ({ error: e.message }))));
    
    return {
      content: [{ type: 'text', text: JSON.stringify({
        user, orders, notifications, recommendations
      }) }]
    };
  }
);

2.5 HTTP 커넥션 풀링

Python:

import aiohttp
 
# 글로벌 세션 (애플리케이션 수명 동안 재사용)
http_session = None
 
async def get_http_session():
    """HTTP 세션 싱글톤"""
    global http_session
    if not http_session:
        connector = aiohttp.TCPConnector(
            limit=100,           # 총 연결 수 제한
            limit_per_host=30,   # 호스트당 연결 수 제한
            ttl_dns_cache=300    # DNS 캐시 5분
        )
        http_session = aiohttp.ClientSession(connector=connector)
    return http_session
 
@mcp.tool()
async def fetch_api_data(endpoint: str) -> dict:
    """외부 API 호출 - 커넥션 풀 사용"""
    session = await get_http_session()
    
    async with session.get(
        endpoint,
        timeout=aiohttp.ClientTimeout(total=10)
    ) as response:
        return await response.json()

성능 개선:

  • DNS 조회 제거
  • TCP 핸드셰이크 제거
  • TLS 협상 제거
  • 결과: 반복 요청 시 50-100ms 절감

실무 적용 팁

  1. 점진적 최적화: 가장 큰 영향을 주는 것부터 시작 (커넥션 풀링 → 캐싱 → 병렬화)
  2. 캐시 TTL 전략:
    • 실시간 데이터: 5-30초
    • 사용자 프로필: 5-15분
    • 히스토리 데이터: 시간-일
  3. 모니터링 필수: Prometheus로 각 최적화의 영향 측정
  4. 에러 처리: 병렬 실행 시 return_exceptions=True 사용으로 부분 실패 허용

03. MCP 성능 향상: Response Context(Token) 최적화와 Conversation Hop 줄이기

3.1 필드 필터링 (핵심 기법)

원리: API가 반환하는 모든 필드가 아닌, LLM이 실제로 필요로 하는 필드만 반환

Before (YNAB 예시):

@mcp.tool()
def get_accounts_unfiltered(budget_id: str) -> list:
    """계정 조회 - 모든 필드 반환"""
    response = ynab_api.get_accounts(budget_id)
    # 18개 필드 모두 반환
    return [dict(account) for account in response.data.accounts]
 
# 47개 계정, 18개 필드
# 토큰 수: 9,960

After:

@mcp.tool()
def get_accounts_filtered(budget_id: str) -> list:
    """계정 조회 - 필수 필드만 반환"""
    response = ynab_api.get_accounts(budget_id)
    
    return [
        {
            "id": account.id,
            "name": account.name,
            "type": account.type,
            "balance": account.balance / 1000,  # 밀리단위 → 달러
            "on_budget": account.on_budget,
            "closed": account.closed
        }
        for account in response.data.accounts
        if not account.deleted  # 삭제된 항목 필터링
    ]
 
# 47개 계정, 6개 필드
# 토큰 수: 3,451
# 감소: 65.4%

가이드라인:

  • ✅ 사용자 질문에 답하는 데 필요한 필드만
  • ✅ 삭제/아카이브된 항목은 소스에서 필터링
  • ✅ 내부 참조 ID, 디버그 메타데이터 제거
  • ✅ 옵션 파라미터로 상세 정보 제공

3.2 사전 집계 (Pre-Aggregation)

원리: 원시 데이터를 반환하는 대신 서버 측에서 요약 계산

Before (대량 트랜잭션):

@mcp.tool()
def get_spending_raw(category_id: str, start_date: str, end_date: str) -> list:
    """지출 내역 조회 - 원시 데이터"""
    transactions = api.get_transactions(category_id, start_date, end_date)
    # 1,704개 트랜잭션 반환
    return [dict(t) for t in transactions]
 
# 6개월 데이터
# 토큰 수: 4,890
# 1년 데이터: 746,800 토큰 (컨텍스트 윈도우 초과!)

After (사전 집계):

from collections import defaultdict
from datetime import datetime
 
@mcp.tool()
def get_spending_summary(category_id: str, start_date: str, end_date: str) -> dict:
    """지출 내역 조회 - 사전 집계"""
    transactions = api.get_transactions(category_id, start_date, end_date)
    
    # 서버에서 계산
    total_spent = sum(t.amount for t in transactions)
    
    monthly_totals = defaultdict(float)
    for t in transactions:
        month_key = t.date[:7]  # YYYY-MM
        monthly_totals[month_key] += t.amount
    
    return {
        "category_id": category_id,
        "date_range": {"start": start_date, "end": end_date},
        "total_spent": total_spent,
        "transaction_count": len(transactions),
        "average_per_month": total_spent / len(monthly_totals) if monthly_totals else 0,
        "monthly_breakdown": [
            {"month": month, "spent": amount}
            for month, amount in sorted(monthly_totals.items())
        ]
    }
 
# 6개월 데이터
# 토큰 수: 262
# 감소: 94.6%
 
# 1년 데이터도 동일하게 262 토큰
# 감소: 99.96%

성능 비교:

데이터 기간원시 데이터사전 집계감소율
6개월 (1,704 txn)4,890 토큰262 토큰94.6%
1년 (3,456 txn)746,800 토큰262 토큰99.96%

3.3 동적 도구 로딩

문제: 모든 도구를 미리 로드하면 컨텍스트 소비

해결 방법: 대화 컨텍스트에 따라 필요한 도구만 로드

Before:

# 시작 시 모든 114개 도구 로드
mcp = FastMCP("All Tools Server")
 
# GitHub 도구 50개
@mcp.tool()
def github_create_issue(...): pass
@mcp.tool()
def github_list_repos(...): pass
# ... 48개 더
 
# Notion 도구 34개
@mcp.tool()
def notion_create_page(...): pass
@mcp.tool()
def notion_query_db(...): pass
# ... 32개 더
 
# Grafana 도구 30개
@mcp.tool()
def grafana_create_dashboard(...): pass
# ... 29개 더
 
# "hello" 입력 시 소비: 46,000 토큰
# "List 10 GitHub issues" 실행: 102,000 토큰 (대부분 불필요)

After (도구 그룹화):

# 서버를 도메인별로 분리
github_mcp = FastMCP("GitHub Server")
notion_mcp = FastMCP("Notion Server")
grafana_mcp = FastMCP("Grafana Server")
 
# Claude Desktop 설정
# claude_desktop_config.json
{
  "mcpServers": {
    "github": {
      "command": "python",
      "args": ["github_server.py"]
    },
    "notion": {
      "command": "python",
      "args": ["notion_server.py"]
    },
    "grafana": {
      "command": "python",
      "args": ["grafana_server.py"]
    }
  }
}
 
# 사용자가 "List GitHub issues"라고 하면
# GitHub 서버만 활성화 → ~8,000 토큰만 사용
# 감소: 92%

MCP Optimizer 패턴:

# RooCode/ToolHive 방식
class LazyToolLoader:
    """요청 시 도구 스펙 로드"""
    
    def __init__(self):
        self.loaded_servers = set()
    
    async def request_tools(self, server_name: str):
        """특정 MCP 서버의 도구만 로드"""
        if server_name not in self.loaded_servers:
            # 도구 스펙 가져오기
            specs = await fetch_tool_specs(server_name)
            self.loaded_servers.add(server_name)
            return specs
        return []
 
# 초기 프롬프트
"""
사용 가능한 MCP 서버: github, notion, grafana
 
MCP 도구를 사용하려면 다음과 같이 요청하세요:
'Provide tool specifications for: <server-name>'
"""
 
# 모델이 GitHub 도구가 필요하면:
# "Provide tool specifications for: github"
# → GitHub 도구만 로드, 8,000 토큰

3.4 Conversation Hop 줄이기

문제: 여러 번의 왕복 통신이 지연과 토큰 증가

Pattern 1: 복합 도구

Before (3 hops):

# Hop 1: 목적지 조회
@mcp.tool()
def get_destinations() -> list:
    """모든 리조트/파크 반환"""
    return api.get_all_destinations()  # 거대한 JSON
 
# Hop 2: 파크 ID로 상세 조회
@mcp.tool()
def get_park_by_id(park_id: str) -> dict:
    return api.get_park(park_id)
 
# Hop 3: 일정 조회
@mcp.tool()
def get_park_schedule(park_id: str, date: str) -> dict:
    return api.get_schedule(park_id, date)
 
# 사용자: "What time does Epcot open tomorrow?"
# Agent:
# 1. get_destinations() → 93,457 토큰
# 2. get_park_by_id("epcot") → 찾아야 함
# 3. get_park_schedule("epcot", "2024-11-09") → 또 찾아야 함
# 총: 3 hops, 130,000+ 토큰

After (1 hop):

@mcp.tool()
def get_park_schedule_by_name(park_name: str, date: str) -> dict:
    """파크 이름으로 직접 일정 조회 (1-hop)"""
    # 내부적으로:
    # 1. 파크 이름으로 ID 찾기
    park_id = find_park_id_by_name(park_name)
    if not park_id:
        return {"error": f"Park '{park_name}' not found"}
    
    # 2. 일정 가져오기
    schedule = api.get_schedule(park_id, date)
    
    # 3. 해당 날짜로 필터링
    day_schedule = [
        s for s in schedule 
        if s['date'] == date
    ]
    
    # 4. 필수 정보만 반환
    return {
        "park": park_name,
        "date": date,
        "opening_time": day_schedule[0]['open'] if day_schedule else None,
        "closing_time": day_schedule[0]['close'] if day_schedule else None
    }
 
# 사용자: "What time does Epcot open tomorrow?"
# Agent: get_park_schedule_by_name("Epcot", "2024-11-09")
# 총: 1 hop, 1,880 토큰
# 감소: 98%

Pattern 2: MCP Prompts (워크플로우 체인)

@mcp.prompt()
async def analyze_overspending() -> str:
    """복잡한 워크플로우를 단일 프롬프트로 패키징"""
    # 내부적으로 여러 도구 호출
    budget = await get_current_month_budget()
    spending = await get_spending_by_category()
    
    # 분석 수행
    variances = calculate_variances(budget, spending)
    overspent = identify_overspent_categories(variances)
    
    # 결과를 구조화된 프롬프트로 반환
    return f"""
    당신은 재무 분석가입니다. 다음 예산 초과 상황을 분석하세요:
    
    예산: {json.dumps(budget, indent=2)}
    실제 지출: {json.dumps(spending, indent=2)}
    초과 카테고리: {json.dumps(overspent, indent=2)}
    
    1. 가장 심각한 초과 항목 식별
    2. 가능한 원인 설명
    3. 개선 제안
    """
 
# 사용자: "Am I overspending this month?"
# Agent: 단일 프롬프트 호출 → 모든 분석 완료
# 3-4 hops → 1 hop

Pattern 3: 배치 실행 (MCP BatchIt)

@mcp.tool()
def batch_file_operations(operations: list) -> list:
    """여러 파일 작업을 한 번에 실행"""
    # operations: [
    #   {"tool": "create_directory", "arguments": {"path": "/data"}},
    #   {"tool": "write_file", "arguments": {"path": "/data/report.txt", "content": "..."}},
    #   {"tool": "read_file", "arguments": {"path": "/config.json"}}
    # ]
    
    results = []
    for op in operations:
        try:
            result = execute_operation(op['tool'], op['arguments'])
            results.append({"success": True, "result": result})
        except Exception as e:
            results.append({"success": False, "error": str(e)})
            if op.get('stop_on_error'):
                break
    
    return results
 
# Before: 3개 파일 작업 = 3 hops
# After: 배치 실행 = 1 hop

3.5 점진적 공개 (Progressive Disclosure)

원리: 메타데이터를 먼저 로드하고, 필요 시 상세 정보 요청

Pattern 1: 요약 + 캐시 ID

@mcp.tool()
def list_documents() -> dict:
    """문서 목록 - 요약만 반환"""
    docs = fetch_all_documents()
    
    # 전체 데이터를 캐시에 저장
    cache_id = store_in_cache(docs)
    
    # 요약만 반환
    return {
        "summary": [
            {
                "id": doc.id,
                "title": doc.title,
                "type": doc.type,
                "size": len(doc.content)
            }
            for doc in docs
        ],
        "cache_id": cache_id,
        "message": "Use get_document_details(cache_id) for full content"
    }
 
@mcp.tool()
def get_document_details(cache_id: str) -> dict:
    """캐시된 전체 문서 반환"""
    return retrieve_from_cache(cache_id)
 
# 목록 조회: 500 토큰 (96% 절감)
# 필요 시에만 전체 내용 로드

Pattern 2: 파일시스템 기반 발견

# 구조:
# bigquery-skill/
# ├── SKILL.md (개요 - 항상 로드)
# └── reference/
#     ├── finance.md (필요 시 로드)
#     ├── sales.md
#     └── product.md
 
@mcp.resource("skill://bigquery/overview")
def get_skill_overview() -> str:
    """스킬 개요 (핵심 정보만)"""
    return read_file("SKILL.md")  # 2,000 토큰
 
# 사용자: "Show me revenue data"
# Claude:
# 1. SKILL.md 읽기 (finance.md 참조 발견)
# 2. reference/finance.md 읽기
# 3. 다른 파일은 로드하지 않음 (0 토큰)
 
# 효과: 10-15개 참조 파일 중 1-2개만 로드
# 토큰 절감: 85-95%

3.6 Complete Before/After 예시

사례: 테마파크 API MCP

Before (최적화 전):

@mcp.tool()
def get_destinations_naive() -> dict:
    """모든 목적지 반환 - 최적화 전"""
    # 리조트 중심 구조
    return api.get_destinations()
    # {
    #   "destinations": [
    #     {
    #       "id": "wdw",
    #       "name": "Walt Disney World Resort",
    #       "parks": [
    #         {"id": "mk", "name": "Magic Kingdom", "schedule": [...]},
    #         {"id": "epcot", "name": "Epcot", "schedule": [...]},
    #         ...
    #       ],
    #       "hotels": [...],
    #       "dining": [...]
    #     },
    #     ...
    #   ]
    # }
 
@mcp.tool()
def get_schedule_naive(park_id: str) -> dict:
    return api.get_schedule(park_id)
 
# Query: "What time does Epcot open tomorrow?"
# Execution:
# 1. get_destinations_naive() → 모든 리조트/파크/호텔/식당
# 2. Agent가 JSON에서 Epcot ID 찾기
# 3. get_schedule_naive("epcot") → 1년치 일정
# 4. Agent가 내일 날짜 필터링
 
# 토큰 사용:
# Query 1: 93,457 토큰
# Query 2: 40,612 토큰
# Query 3: 23,206 토큰
# 비용: $0.42 (3 queries)

After (최적화 후):

# 1. JSON 구조 반전: 파크 중심
# 2. 파크 이름으로 필터링
# 3. 특정 날짜로 필터링
 
@mcp.tool()
def get_park_hours(park_name: str, date: str) -> dict:
    """특정 파크의 특정 날짜 운영 시간"""
    # 파크 이름으로 직접 검색
    park = find_park_by_name(park_name)
    if not park:
        return {
            "error": f"Park '{park_name}' not found",
            "available_parks": ["Magic Kingdom", "Epcot", "Hollywood Studios", "Animal Kingdom"]
        }
    
    # 해당 날짜 일정만 조회
    schedule = get_schedule_for_date(park.id, date)
    
    return {
        "park": park.name,
        "date": date,
        "opening_time": schedule.open,
        "closing_time": schedule.close,
        "special_hours": schedule.special_hours if hasattr(schedule, 'special_hours') else None
    }
 
# Query: "What time does Epcot open tomorrow?"
# Execution:
# 1. get_park_hours("Epcot", "2024-11-09") → 직접 답변
 
# 토큰 사용:
# Query 1: 1,880 토큰 (98.0% 감소)
# Query 2: 1,857 토큰 (95.4% 감소)
# Query 3: 1,594 토큰 (93.1% 감소)
# 비용: $0.01 (3 queries) → 97.6% 절감

최적화 체크리스트:

# ✅ 최적화 평가 함수
def evaluate_tool_efficiency(tool_name: str, response: dict) -> dict:
    """도구 효율성 평가"""
    response_json = json.dumps(response)
    token_count = count_tokens(response_json)
    
    # 평가 기준
    evaluation = {
        "tool": tool_name,
        "tokens": token_count,
        "checks": {
            "필드_필터링": check_unnecessary_fields(response),
            "삭제된_항목_제거": check_deleted_items(response),
            "적절한_TTL": check_cache_ttl(tool_name),
            "사전_집계": check_aggregation(response),
            "1hop_달성": check_single_hop(tool_name)
        }
    }
    
    # 점수 계산
    passed = sum(1 for v in evaluation["checks"].values() if v)
    evaluation["score"] = passed / len(evaluation["checks"])
    
    # 권장사항
    if evaluation["score"] < 0.8:
        evaluation["recommendations"] = generate_recommendations(evaluation["checks"])
    
    return evaluation

실무 적용 팁

  1. 토큰 측정 자동화: 모든 도구에 토큰 카운팅 래퍼 추가
  2. 베이스라인 설정: 최적화 전 토큰 사용량 기록
  3. 점진적 개선:
    • 1주차: 필드 필터링 (30-65% 절감)
    • 2주차: 사전 집계 (90-99% 절감, 대량 데이터)
    • 3주차: 복합 도구 + conversation hop 감소
    • 4주차: 동적 로딩 + 점진적 공개
  4. 우선순위: 20%의 도구가 80%의 토큰을 소비 → 이들부터 최적화
  5. 모니터링: Grafana 대시보드로 도구별 토큰 사용량 추적
  6. A/B 테스팅: 최적화 전후 사용자 경험 비교

종합 성과 요약

최적화 기법구현 복잡도토큰 절감Latency 개선ROI
필드 필터링낮음30-65%10-20%매우 높음
커넥션 풀링낮음-13배매우 높음
멀티레벨 캐싱중간40%93-98%높음
병렬 처리중간-68%높음
사전 집계중간90-99%20-30%매우 높음
복합 도구중간50-80%66%높음
동적 도구 로딩높음70-90%-중간
점진적 공개높음85-95%-중간