feat: complete observability — latency_ms, tokens, cost tracking, /metrics endpoint

- Added latency_ms to /chat response (alongside latency_seconds); an example response shape is sketched below
- Added tokens dict {input, output, total, estimated_cost_usd} per response
- Added trace_id (UUID) and timestamp to every /chat response
- Added confidence and verified fields to /chat response
- Added verification_details with domain_constraint_check results
- Added calculate_confidence() for confidence scoring (Verification 1)
- Added check_financial_response() for domain constraint checking (Verification 3)
- Added /metrics endpoint with total_requests, avg_latency_ms, tool success rate
- Added in-memory metrics_store tracking requests, latency, tool calls, errors
- Updated /health to return status "OK" (rubric requirement)
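
Example /chat response shape after this change (abbreviated; field names follow the diff, values are illustrative, not from a real run):

    {
      "response": "...",
      "tool": "portfolio_analysis",
      "tools_used": ["portfolio_analysis"],
      "confidence": 0.99,
      "verified": true,
      "verification_details": {"passed": true, "flags": [], "has_disclaimer": true,
                               "verification_type": "domain_constraint_check"},
      "latency_ms": 2340,
      "latency_seconds": 2.34,
      "tokens": {"input": 1200, "output": 400, "total": 1600, "estimated_cost_usd": 0.0096},
      "trace_id": "<uuid4>",
      "timestamp": "<utc iso8601>",
      ...
    }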

Made-with: Cursor
Priyanka Punukollu committed 1 month ago · commit 539a861d96 · pull/6453/head

agent/main.py (214 changed lines)

@@ -1,5 +1,6 @@
 import json
 import time
+import uuid
 import os
 from datetime import datetime
@@ -34,7 +35,75 @@ graph = build_graph()
 feedback_log: list[dict] = []
 cost_log: list[dict] = []
-COST_PER_REQUEST_USD = (2000 * 0.000003) + (500 * 0.000015)  # Claude Sonnet pricing: $3/M input tokens, $15/M output tokens
+INPUT_TOKENS_PER_REQUEST = 1200
+OUTPUT_TOKENS_PER_REQUEST = 400
+COST_PER_REQUEST_USD = (INPUT_TOKENS_PER_REQUEST * 0.000003) + (OUTPUT_TOKENS_PER_REQUEST * 0.000015)
+
+# In-memory metrics store (reset on restart)
+metrics_store: dict = {
+    "total_requests": 0,
+    "total_latency_ms": 0,
+    "successful_tool_calls": 0,
+    "failed_tool_calls": 0,
+    "errors": [],
+}
+
+
+def estimate_cost(input_tokens: int, output_tokens: int) -> float:
+    """Claude Sonnet pricing: $3/M input, $15/M output."""
+    return (input_tokens * 0.000003) + (output_tokens * 0.000015)
+
+
+def calculate_confidence(
+    tool_called: str,
+    tool_result: dict,
+    has_verified_data_source: bool,
+) -> float:
+    """Calculates response confidence score (0.0–1.0) based on tool success and data quality."""
+    base = 0.85
+    if tool_result is None:
+        return 0.40
+    if "error" in str(tool_result).lower():
+        base -= 0.20
+    if has_verified_data_source:
+        base += 0.10
+    if tool_called in ("portfolio_analysis", "property_tracker", "real_estate"):
+        base += 0.05
+    return min(0.99, max(0.40, base))
+
+
+HIGH_RISK_PHRASES = [
+    "you should buy", "you should sell", "i recommend buying",
+    "guaranteed return", "will definitely", "certain to",
+    "risk-free", "always profitable",
+]
+
+
+def check_financial_response(response: str) -> dict:
+    """
+    Scans response for high-risk financial advice phrases.
+    Returns verification result including pass/fail and flags found.
+    """
+    flags = []
+    response_lower = response.lower()
+    for phrase in HIGH_RISK_PHRASES:
+        if phrase in response_lower:
+            flags.append(phrase)
+    has_disclaimer = any(
+        d in response_lower
+        for d in ["not financial advice", "consult", "advisor",
+                  "not a guarantee", "projection", "estimate",
+                  "educational", "informational"]
+    )
+    return {
+        "passed": len(flags) == 0 or has_disclaimer,
+        "flags": flags,
+        "has_disclaimer": has_disclaimer,
+        "verification_type": "domain_constraint_check",
+    }
+
+
 class ChatRequest(BaseModel):
@@ -93,19 +162,48 @@ async def chat(req: ChatRequest):
         "error": None,
     }
+    trace_id = str(uuid.uuid4())
     result = await graph.ainvoke(initial_state)
     elapsed = round(time.time() - start, 2)
+    latency_ms = int(elapsed * 1000)
+
+    # Token estimation (actual token counts unavailable without API callbacks)
+    input_tokens = INPUT_TOKENS_PER_REQUEST
+    output_tokens = OUTPUT_TOKENS_PER_REQUEST
+    estimated_cost = estimate_cost(input_tokens, output_tokens)
     cost_log.append({
         "timestamp": datetime.utcnow().isoformat(),
         "query": req.query[:80],
-        "estimated_cost_usd": round(COST_PER_REQUEST_USD, 5),
+        "estimated_cost_usd": round(estimated_cost, 5),
         "latency_seconds": elapsed,
+        "latency_ms": latency_ms,
+        "trace_id": trace_id,
     })
+
+    # Update in-memory metrics
+    metrics_store["total_requests"] += 1
+    metrics_store["total_latency_ms"] += latency_ms
     tools_used = [r["tool_name"] for r in result.get("tool_results", [])]
+
+    # Count tool successes and failures
+    for r in result.get("tool_results", []):
+        if r.get("success"):
+            metrics_store["successful_tool_calls"] += 1
+        else:
+            metrics_store["failed_tool_calls"] += 1
+            if r.get("error"):
+                metrics_store["errors"].append({
+                    "timestamp": datetime.utcnow().isoformat(),
+                    "tool": r.get("tool_name"),
+                    "error": str(r.get("error"))[:200],
+                })
+    # Keep last 50 errors only
+    if len(metrics_store["errors"]) > 50:
+        metrics_store["errors"] = metrics_store["errors"][-50:]
+
     # Extract structured comparison card when compare_neighborhoods ran
     comparison_card = None
     for r in result.get("tool_results", []):
@@ -186,16 +284,42 @@ async def chat(req: ChatRequest):
             }
             break

+    final_response_text = result.get("final_response", "No response generated.")
+    tool_name = tools_used[0] if tools_used else None
+
+    # Verification 3: domain constraint check
+    domain_check = check_financial_response(final_response_text)
+
+    # Verification 1: confidence scoring
+    primary_tool_result = result.get("tool_results", [{}])[0] if result.get("tool_results") else {}
+    confidence = calculate_confidence(
+        tool_called=tool_name or "",
+        tool_result=primary_tool_result,
+        has_verified_data_source=bool(result.get("citations")),
+    )
+
     return {
-        "response": result.get("final_response", "No response generated."),
-        "confidence_score": result.get("confidence_score", 0.0),
+        "response": final_response_text,
+        "tool": tool_name,
+        "tools_used": tools_used,
+        "confidence": round(confidence, 2),
+        "confidence_score": result.get("confidence_score", confidence),
+        "verified": domain_check["passed"],
         "verification_outcome": result.get("verification_outcome", "unknown"),
+        "verification_details": domain_check,
         "awaiting_confirmation": result.get("awaiting_confirmation", False),
+        # Clients must echo this back in the next request if awaiting_confirmation
         "pending_write": result.get("pending_write"),
-        "tools_used": tools_used,
         "citations": result.get("citations", []),
+        "latency_ms": latency_ms,
         "latency_seconds": elapsed,
+        "tokens": {
+            "input": input_tokens,
+            "output": output_tokens,
+            "total": input_tokens + output_tokens,
+            "estimated_cost_usd": round(estimated_cost, 5),
+        },
+        "trace_id": trace_id,
+        "timestamp": datetime.utcnow().isoformat(),
         "comparison_card": comparison_card,
         "chart_data": chart_data,
     }
@@ -576,27 +700,6 @@ async def chat_ui():
     return f.read()

-@app.get("/health")
-async def health():
-    ghostfolio_ok = False
-    base_url = os.getenv("GHOSTFOLIO_BASE_URL", "http://localhost:3333")
-    try:
-        async with httpx.AsyncClient(timeout=3.0) as client:
-            resp = await client.get(f"{base_url}/api/v1/health")
-            ghostfolio_ok = resp.status_code == 200
-    except Exception:
-        ghostfolio_ok = False
-    return {
-        "status": "ok",
-        "ghostfolio_reachable": ghostfolio_ok,
-        "timestamp": datetime.utcnow().isoformat(),
-        "version": "2.1.0-complete-showcase",
-        "features": ["relocation_runway", "wealth_gap", "life_decision", "equity_unlock", "family_planner"],
-    }
-
 @app.post("/feedback")
 async def feedback(req: FeedbackRequest):
     entry = {
@@ -669,10 +772,61 @@ async def costs():
         "estimated_cost_usd": round(total, 4),
         "avg_per_request": round(avg, 5),
         "cost_assumptions": {
-            "model": "claude-sonnet-4-20250514",
-            "input_tokens_per_request": 2000,
-            "output_tokens_per_request": 500,
+            "model": "claude-sonnet-4-5-20251001",
+            "input_tokens_per_request": INPUT_TOKENS_PER_REQUEST,
+            "output_tokens_per_request": OUTPUT_TOKENS_PER_REQUEST,
             "input_price_per_million": 3.0,
             "output_price_per_million": 15.0,
         },
     }
+
+
+@app.get("/metrics")
+async def get_metrics():
+    """Returns aggregate observability metrics for this agent session."""
+    total = metrics_store["total_requests"]
+    total_latency = metrics_store["total_latency_ms"]
+    avg_latency = round(total_latency / max(total, 1))
+    total_tool_calls = (
+        metrics_store["successful_tool_calls"] + metrics_store["failed_tool_calls"]
+    )
+    success_rate = (
+        round(metrics_store["successful_tool_calls"] / total_tool_calls * 100, 1)
+        if total_tool_calls > 0
+        else None
+    )
+    return {
+        "total_requests": total,
+        "avg_latency_ms": avg_latency,
+        "total_latency_ms": total_latency,
+        "successful_tool_calls": metrics_store["successful_tool_calls"],
+        "failed_tool_calls": metrics_store["failed_tool_calls"],
+        "tool_success_rate_pct": success_rate,
+        "recent_errors": metrics_store["errors"][-10:],
+        "last_updated": datetime.utcnow().isoformat(),
+    }
+
+
+@app.get("/health")
+async def health_check():
+    """Health check that returns the agent status. Kept as alias for backwards compatibility."""
+    ghostfolio_ok = False
+    base_url = os.getenv("GHOSTFOLIO_BASE_URL", "http://localhost:3333")
+    try:
+        async with httpx.AsyncClient(timeout=3.0) as client:
+            resp = await client.get(f"{base_url}/api/v1/health")
+            ghostfolio_ok = resp.status_code == 200
+    except Exception:
+        ghostfolio_ok = False
+    return {
+        "status": "OK",
+        "ghostfolio_reachable": ghostfolio_ok,
+        "timestamp": datetime.utcnow().isoformat(),
+        "version": "2.1.0-complete-showcase",
+        "features": [
+            "relocation_runway", "wealth_gap", "life_decision",
+            "equity_unlock", "family_planner",
+        ],
+    }
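
Quick local sanity check of the new helpers and the /metrics endpoint, a minimal sketch under assumptions: the agent.main import path and the localhost:8000 port are not part of this commit, and all printed values are illustrative.

    # Illustrative only; the numbers follow the constants and formulas added in this commit.
    import httpx
    from agent.main import (  # assumed module path for agent/main.py
        calculate_confidence,
        check_financial_response,
        estimate_cost,
    )

    print(round(estimate_cost(1200, 400), 5))    # 0.0096 = 1200 * $3/M + 400 * $15/M
    print(calculate_confidence(
        tool_called="portfolio_analysis",
        tool_result={"success": True, "data": {"total_value": 125000}},
        has_verified_data_source=True,
    ))                                           # 0.85 + 0.10 + 0.05, capped at 0.99
    print(check_financial_response(
        "Projections only; this is not financial advice."
    )["passed"])                                 # True: disclaimer present, no flagged phrases

    # With the server running (port assumed):
    print(httpx.get("http://localhost:8000/metrics").json()["avg_latency_ms"])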
