Skip to content

Adım 2: Implementation + Monitoring

Asistanı kur. Anthropic SDK + Prometheus + cost tracking. Production-grade code.

Şükrü Yusuf KAYA
16 min read
Advanced

Adım 2: Implementation

Plan tamamlandı. Şimdi kod.

Project Yapısı#

text
final-project/
├── docs/ # 200K dokümantasyon (markdown)
│ ├── getting-started.md
│ ├── api-reference.md
│ └── ...
├── src/
│ ├── assistant.py # Ana asistan logic
│ ├── caching.py # Cache_control logic
│ ├── tools.py # 8 tool tanımı
│ ├── pii_redaction.py # KVKK uyumu
│ ├── monitoring.py # Prometheus metrics
│ └── main.py # FastAPI server
├── tests/
│ ├── test_cache_hit_rate.py
│ └── test_pii_redaction.py
├── docker-compose.yml # Prometheus + Grafana
└── README.md
Proje yapısı

Çekirdek Kod#

python
# src/assistant.py
import time
from typing import Dict, List, Optional

import anthropic

from monitoring import metrics
from pii_redaction import redact_pii
from tools import TOOLS
 
class DocsAssistant:
    """Documentation Q&A assistant backed by the Anthropic Messages API.

    The full documentation corpus (~200K tokens) is placed in the system
    prompt behind a long-TTL cache breakpoint so that repeated queries
    only pay for the small, fresh suffix of the prompt.
    """

    # Combined documentation corpus, loaded once in __init__.
    DOCS: str
    SYSTEM = """Sen yazılım dokümantasyon asistanısın.
Sorgulara dokümantasyondaki bilgiyi referans alarak cevap ver.
Her cevabın sonunda kaynak doc'u belirt (örn. [Source: api-reference.md])."""

    def __init__(self):
        self.client = anthropic.Anthropic()
        self.DOCS = self._load_docs()

    def _load_docs(self) -> str:
        """Read the pre-combined documentation bundle from disk."""
        # Explicit encoding: the docs contain non-ASCII (Turkish) text, so
        # don't depend on the platform default encoding.
        with open("docs/combined.md", encoding="utf-8") as f:
            return f.read()

    @staticmethod
    def _cache_hit_rate(cache_read: int, cache_created: int) -> float:
        """Fraction of cached prefix tokens that were served from cache.

        Returns 0.0 when there was no cache activity at all (both counts
        zero); the max(1, ...) guard avoids division by zero.
        """
        return cache_read / max(1, cache_read + cache_created)

    def query(
        self,
        user_query: str,
        conversation: Optional[List[Dict]] = None,
    ) -> Dict:
        """Answer *user_query* against the cached documentation corpus.

        Args:
            user_query: Raw user question; PII is redacted before it is
                sent to the API (KVKK compliance).
            conversation: Optional prior turns as Anthropic message dicts.

        Returns:
            Dict with ``answer`` (str), ``cache_hit_rate`` (float in
            [0, 1]) and ``latency_seconds`` (float).
        """
        # 1. Strip PII before anything leaves the process.
        clean_query = redact_pii(user_query)

        # 2. System blocks with cache breakpoints: the large, stable docs
        # blob gets the long TTL; the (smaller, more likely to change)
        # instructions get the short TTL.
        # NOTE(review): the "1h" TTL requires the extended-cache-TTL beta
        # header on the client — confirm the Anthropic() client is
        # configured for it, otherwise the API rejects the request.
        system_blocks = [
            {
                "type": "text",
                "text": self.DOCS,
                "cache_control": {"type": "ephemeral", "ttl": "1h"},
            },
            {
                "type": "text",
                "text": self.SYSTEM,
                "cache_control": {"type": "ephemeral", "ttl": "5m"},
            },
        ]

        # Marking the LAST tool caches the entire tool-definition prefix.
        tools_with_cache = [
            *TOOLS[:-1],
            {**TOOLS[-1], "cache_control": {"type": "ephemeral", "ttl": "1h"}},
        ]

        # Re-wrap the most recent prior turn with a breakpoint so the
        # conversation prefix is cached incrementally.
        messages = []
        if conversation:
            last = conversation[-1]
            messages.extend(conversation[:-1])
            messages.append({
                **last,
                "content": [{
                    "type": "text",
                    "text": last["content"],
                    "cache_control": {"type": "ephemeral", "ttl": "5m"},
                }]
            })
        messages.append({"role": "user", "content": clean_query})

        # 3. API call + latency measurement.
        start = time.perf_counter()
        response = self.client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            system=system_blocks,
            tools=tools_with_cache,
            messages=messages,
        )
        latency = time.perf_counter() - start

        # 4. Export usage + latency to Prometheus.
        metrics.record(response.usage, latency)

        # The response may contain non-text blocks (e.g. tool_use, since
        # tools are passed); pick the first text block instead of blindly
        # indexing content[0].
        answer = next(
            (block.text for block in response.content if block.type == "text"),
            "",
        )

        usage = response.usage
        return {
            "answer": answer,
            "cache_hit_rate": self._cache_hit_rate(
                usage.cache_read_input_tokens or 0,
                usage.cache_creation_input_tokens or 0,
            ),
            "latency_seconds": latency,
        }
src/assistant.py — production-grade asistan

Monitoring#

python
# src/monitoring.py
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import threading
 
class Metrics:
    """Prometheus metrics for LLM token usage, latency, cost and hit rate."""

    # USD per 1M tokens (Claude Sonnet pricing). Hoisted to named class
    # constants so a price change is a one-line edit instead of a hunt
    # through arithmetic.
    PRICE_INPUT = 3.0
    PRICE_CACHE_WRITE = 3.75
    PRICE_CACHE_READ = 0.30
    PRICE_OUTPUT = 15.0

    def __init__(self):
        self.cache_writes = Counter('llm_cache_writes_total', 'Cache write tokens')
        self.cache_reads = Counter('llm_cache_reads_total', 'Cache read tokens')
        self.input_tokens = Counter('llm_input_tokens_total', 'Fresh input')
        self.output_tokens = Counter('llm_output_tokens_total', 'Output')
        self.latency = Histogram('llm_latency_seconds', 'Request latency',
                                 buckets=(0.5, 1, 2, 3, 5, 10, 30))
        self.cost = Counter('llm_cost_usd_total', 'Cumulative cost in USD')
        self.hit_rate_gauge = Gauge('llm_hit_rate', 'Current hit rate (gauge, EMA)')

        # Exponential moving average for hit rate. The optimistic 0.95
        # prior keeps the gauge from alerting during the first requests;
        # the lock protects the read-modify-write of the EMA (Prometheus
        # counters are already thread-safe on their own).
        self._ema_hit = 0.95
        self._ema_alpha = 0.1
        self._lock = threading.Lock()

    @classmethod
    def _cost_usd(cls, input_tokens: int, cache_write: int,
                  cache_read: int, output_tokens: int) -> float:
        """Return the USD cost of one request from its four token counts."""
        return (
            input_tokens / 1e6 * cls.PRICE_INPUT
            + cache_write / 1e6 * cls.PRICE_CACHE_WRITE
            + cache_read / 1e6 * cls.PRICE_CACHE_READ
            + output_tokens / 1e6 * cls.PRICE_OUTPUT
        )

    def record(self, usage, latency):
        """Record one API response.

        Args:
            usage: Anthropic usage object exposing ``input_tokens``,
                ``output_tokens``, ``cache_creation_input_tokens`` and
                ``cache_read_input_tokens`` (the cache fields may be None).
            latency: Wall-clock request latency in seconds.
        """
        cw = usage.cache_creation_input_tokens or 0
        cr = usage.cache_read_input_tokens or 0

        self.cache_writes.inc(cw)
        self.cache_reads.inc(cr)
        self.input_tokens.inc(usage.input_tokens)
        self.output_tokens.inc(usage.output_tokens)
        self.latency.observe(latency)

        self.cost.inc(self._cost_usd(
            usage.input_tokens, cw, cr, usage.output_tokens,
        ))

        # EMA-smoothed hit rate; max(1, ...) guards the no-cache case.
        instant_hit = cr / max(1, cr + cw)
        with self._lock:
            self._ema_hit = self._ema_alpha * instant_hit + (1 - self._ema_alpha) * self._ema_hit
            self.hit_rate_gauge.set(self._ema_hit)
 
# Module-level singleton shared by every importer of this module.
metrics = Metrics()

# NOTE(review): importing this module starts an HTTP /metrics endpoint on
# port 9100 as a side effect. Fine for a single-process deployment, but
# consider moving this into an explicit init function so test imports and
# multi-worker servers don't collide on the port — confirm with the team.
start_http_server(9100)
src/monitoring.py — Prometheus exporter

CI: Hit Rate Test#

python
# tests/test_cache_hit_rate.py
import pytest
from src.assistant import DocsAssistant
 
def test_cache_hit_rate_above_90():
    """Run 50 sample queries; the average cache hit rate after the first
    (necessarily cold) request must be at least 90%."""
    assistant = DocsAssistant()

    observed = [
        assistant.query(f"Test query {i}: nasıl yaparım?")["cache_hit_rate"]
        for i in range(50)
    ]

    # The very first request is always a cache miss (it writes the cache);
    # score only the requests after warm-up.
    warm = observed[1:]
    avg = sum(warm) / len(warm)
    assert avg >= 0.90, f"Hit rate {avg:.2%} < %90 hedef"
tests/test_cache_hit_rate.py — pre-merge regression test
Shift-Left Quality
Bu test'i CI'a ekle (
pytest tests/test_cache_hit_rate.py
). Her PR otomatik koşar. Regression olursa merge blocked.

✓ Pekiştir#

Bir Sonraki Derste#

Adım 3: Cost report + post-mortem.

Yorumlar & Soru-Cevap

(0)
Yorum yazmak için giriş yap.
Yorumlar yükleniyor...

Related Content