Adım 2: Implementation + Monitoring
Asistanı kur. Anthropic SDK + Prometheus + cost tracking. Production-grade code.
Şükrü Yusuf KAYA
16 min read
Advanced — Adım 2: Implementation
Plan tamamlandı. Şimdi kod.
Project Yapısı#
text
final-project/
├── docs/                      # 200K dokümantasyon (markdown)
│   ├── getting-started.md
│   ├── api-reference.md
│   └── ...
├── src/
│   ├── assistant.py           # Ana asistan logic
│   ├── caching.py             # Cache_control logic
│   ├── tools.py               # 8 tool tanımı
│   ├── pii_redaction.py       # KVKK uyumu
│   ├── monitoring.py          # Prometheus metrics
│   └── main.py                # FastAPI server
├── tests/
│   ├── test_cache_hit_rate.py
│   └── test_pii_redaction.py
├── docker-compose.yml         # Prometheus + Grafana
└── README.md

Proje yapısı
Çekirdek Kod#
python
# src/assistant.py
"""Documentation Q&A assistant built on the Anthropic Messages API.

Uses layered prompt-caching breakpoints (docs 1h, system prompt 5m,
tools 1h, conversation tail 5m) and reports per-request telemetry.
"""
import time
from typing import Dict, List, Optional

import anthropic

from monitoring import metrics
from pii_redaction import redact_pii
from tools import TOOLS


class DocsAssistant:
    """Answers user questions against a large, cached documentation corpus."""

    # Combined documentation (~200K tokens); populated once in __init__.
    DOCS: str

    SYSTEM = """Sen yazılım dokümantasyon asistanısın.
Sorgulara dokümantasyondaki bilgiyi referans alarak cevap ver.
Her cevabın sonunda kaynak doc'u belirt (örn. [Source: api-reference.md])."""

    def __init__(self) -> None:
        self.client = anthropic.Anthropic()
        self.DOCS = self._load_docs()

    def _load_docs(self) -> str:
        """Read the pre-combined documentation file into one string."""
        # Explicit encoding: the default is platform-dependent and would
        # mis-decode UTF-8 Turkish docs on some systems.
        with open("docs/combined.md", encoding="utf-8") as f:
            return f.read()

    def query(
        self,
        user_query: str,
        conversation: Optional[List[Dict]] = None,  # was annotated List[Dict] with None default
    ) -> Dict:
        """Answer *user_query*, optionally continuing *conversation*.

        Args:
            user_query: Raw user question; PII is redacted before sending.
            conversation: Optional prior Anthropic message dicts. The last
                message receives a 5-minute cache breakpoint.

        Returns:
            Dict with ``answer`` (str), ``cache_hit_rate`` (0.0-1.0) and
            ``latency_seconds`` (float).
        """
        # 1. PII redaction (KVKK compliance) before anything leaves the process.
        clean_query = redact_pii(user_query)

        # 2. System blocks with cache breakpoints: the huge, stable docs get
        #    a 1h TTL; the small system prompt gets 5m.
        system_blocks = [
            {
                "type": "text",
                "text": self.DOCS,
                "cache_control": {"type": "ephemeral", "ttl": "1h"},
            },
            {
                "type": "text",
                "text": self.SYSTEM,
                "cache_control": {"type": "ephemeral", "ttl": "5m"},
            },
        ]
        # Only the last tool carries cache_control: a breakpoint caches the
        # entire prefix up to and including that block.
        tools_with_cache = [
            *TOOLS[:-1],
            {**TOOLS[-1], "cache_control": {"type": "ephemeral", "ttl": "1h"}},
        ]

        messages: List[Dict] = []
        if conversation:
            last = conversation[-1]
            messages.extend(conversation[:-1])
            # NOTE(review): assumes last["content"] is a plain string —
            # confirm callers never pass block-style content here.
            messages.append({
                **last,
                "content": [{
                    "type": "text",
                    "text": last["content"],
                    "cache_control": {"type": "ephemeral", "ttl": "5m"},
                }],
            })
        messages.append({"role": "user", "content": clean_query})

        # 3. API call + latency measurement.
        start = time.perf_counter()
        response = self.client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            system=system_blocks,
            tools=tools_with_cache,
            messages=messages,
        )
        latency = time.perf_counter() - start

        # 4. Telemetry.
        metrics.record(response.usage, latency)

        cache_read = response.usage.cache_read_input_tokens or 0
        cache_write = response.usage.cache_creation_input_tokens or 0
        return {
            "answer": response.content[0].text,
            # Fraction of cacheable tokens served from cache; max(1, ...)
            # guards the division when both counters are zero.
            "cache_hit_rate": cache_read / max(1, cache_read + cache_write),
            "latency_seconds": latency,
        }
Monitoring#
python
# src/monitoring.py
"""Prometheus exporter for LLM token usage, latency, cost and cache hit rate."""
import threading

from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Claude Sonnet pricing, USD per million tokens. Named constants instead of
# magic numbers inside the cost formula; update here when pricing changes.
PRICE_INPUT_PER_MTOK = 3.0
PRICE_CACHE_WRITE_PER_MTOK = 3.75
PRICE_CACHE_READ_PER_MTOK = 0.30
PRICE_OUTPUT_PER_MTOK = 15.0


class Metrics:
    """Aggregates per-request usage into Prometheus counters and gauges."""

    def __init__(self) -> None:
        self.cache_writes = Counter('llm_cache_writes_total', 'Cache write tokens')
        self.cache_reads = Counter('llm_cache_reads_total', 'Cache read tokens')
        self.input_tokens = Counter('llm_input_tokens_total', 'Fresh input')
        self.output_tokens = Counter('llm_output_tokens_total', 'Output')
        self.latency = Histogram('llm_latency_seconds', 'Request latency',
                                 buckets=(0.5, 1, 2, 3, 5, 10, 30))
        self.cost = Counter('llm_cost_usd_total', 'Cumulative cost in USD')
        self.hit_rate_gauge = Gauge('llm_hit_rate', 'Current hit rate (gauge, EMA)')
        # Exponential moving average of the per-request hit rate; the lock
        # protects the read-modify-write across concurrent requests.
        self._ema_hit = 0.95
        self._ema_alpha = 0.1
        self._lock = threading.Lock()

    def record(self, usage, latency: float) -> None:
        """Fold one response's ``usage`` object and wall-clock latency in.

        Args:
            usage: Anthropic usage object with ``input_tokens``,
                ``output_tokens``, ``cache_creation_input_tokens`` and
                ``cache_read_input_tokens`` (the cache fields may be None).
            latency: Request duration in seconds.
        """
        cw = usage.cache_creation_input_tokens or 0
        cr = usage.cache_read_input_tokens or 0
        self.cache_writes.inc(cw)
        self.cache_reads.inc(cr)
        self.input_tokens.inc(usage.input_tokens)
        self.output_tokens.inc(usage.output_tokens)
        self.latency.observe(latency)

        # Cost in USD for this single request.
        cost = (
            usage.input_tokens / 1e6 * PRICE_INPUT_PER_MTOK
            + cw / 1e6 * PRICE_CACHE_WRITE_PER_MTOK
            + cr / 1e6 * PRICE_CACHE_READ_PER_MTOK
            + usage.output_tokens / 1e6 * PRICE_OUTPUT_PER_MTOK
        )
        self.cost.inc(cost)

        # EMA hit rate; max(1, ...) guards the zero-token division.
        instant_hit = cr / max(1, cr + cw)
        with self._lock:
            self._ema_hit = self._ema_alpha * instant_hit + (1 - self._ema_alpha) * self._ema_hit
            self.hit_rate_gauge.set(self._ema_hit)


# Module-level singleton shared by all callers.
metrics = Metrics()

# Expose the /metrics endpoint (module import side effect, by design).
start_http_server(9100)
CI: Hit Rate Test#
python
# tests/test_cache_hit_rate.py
"""Regression test: cache hit rate must stay above 90% across repeated queries."""
from src.assistant import DocsAssistant


def test_cache_hit_rate_above_90():
    """Run 50 sample queries; the average hit rate after the first must be >= 90%."""
    assistant = DocsAssistant()
    queries = [f"Test query {i}: nasıl yaparım?" for i in range(50)]
    hit_rates = [assistant.query(q)["cache_hit_rate"] for q in queries]
    # The first request is always a cache miss (it writes the cache);
    # only the remaining requests are expected to hit.
    later_hits = hit_rates[1:]
    avg = sum(later_hits) / len(later_hits)
    assert avg >= 0.90, f"Hit rate {avg:.2%} < %90 hedef"
Shift-Left Quality
Bu test'i CI'a ekle (`pytest tests/test_cache_hit_rate.py`). Her PR otomatik koşar. Regression olursa merge blocked.
✓ Pekiştir#
Bir Sonraki Derste#
Adım 3: Cost report + post-mortem.
Yorumlar & Soru-Cevap
(0) Yorum yazmak için giriş yap.
Yorumlar yükleniyor...
Related Content
1. Temeller — Context Penceresi Ekonomisi
Bu Eğitim Hakkında ve Prompt Caching Neden Önemli?
Start Learning1. Temeller — Context Penceresi Ekonomisi
Token Ekonomisi 101: Input vs Output Cost Asimetrisi
Start Learning1. Temeller — Context Penceresi Ekonomisi