import json
import logging
import os
import time
import traceback
import uuid
from datetime import datetime, timedelta
from typing import Optional

import httpx
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, HTTPException, Response, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from langchain_core.messages import AIMessage, HumanMessage
from pydantic import BaseModel

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
logger = logging.getLogger("ghostfolio_agent")

# Load a .env from the working directory (python-dotenv's default search),
# then agent/.env so ANTHROPIC_API_KEY, GHOSTFOLIO_BEARER_TOKEN, etc. are
# available. load_dotenv does not override variables that are already set.
load_dotenv()
load_dotenv(os.path.join(os.path.dirname(__file__), "agent", ".env"))

# Local modules are imported only after the .env files are loaded, in case
# they read configuration at import time (presumably graph does — confirm).
from graph import build_graph  # noqa: E402
from state import AgentState  # noqa: E402

# ── Auth configuration ──
# The agent issues its own short-lived JWT whose `sub` is the user's
# Ghostfolio bearer token. This way we never store credentials server-side;
# Ghostfolio is the identity provider.
_JWT_ALGORITHM = "HS256"
_JWT_EXPIRE_HOURS = 24
# auto_error=False: a missing/malformed Authorization header yields
# credentials=None, so require_auth can answer with its own 401 response.
_http_bearer = HTTPBearer(auto_error=False)


def _get_jwt_secret() -> str:
    """Return the HMAC secret used to sign and verify agent JWTs.

    Raises:
        RuntimeError: if ``JWT_SECRET_KEY`` is unset or empty.
    """
    secret = os.getenv("JWT_SECRET_KEY", "")
    if not secret:
        raise RuntimeError("JWT_SECRET_KEY env var is required")
    return secret


def _create_access_token(subject: str) -> str:
    """Issue an agent JWT whose ``sub`` claim is *subject*.

    The token expires ``_JWT_EXPIRE_HOURS`` hours after it is issued.
    """
    expire = datetime.utcnow() + timedelta(hours=_JWT_EXPIRE_HOURS)
    payload = {"sub": subject, "exp": expire}
    return jwt.encode(payload, _get_jwt_secret(), algorithm=_JWT_ALGORITHM)


def _unauthorized(detail: str) -> HTTPException:
    """Build the 401 error used for every authentication failure."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def _verify_jwt(token: str) -> str:
    """Validate an agent JWT and return its ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded/verified (bad
            signature, expired, malformed) or carries no ``sub`` claim.
    """
    try:
        payload = jwt.decode(token, _get_jwt_secret(), algorithms=[_JWT_ALGORITHM])
    except JWTError as exc:
        raise _unauthorized("Invalid or expired token") from exc
    sub: str = payload.get("sub", "")
    if not sub:
        # A correctly signed token without a subject is still unusable; it must
        # be rejected as a 401, not escape as an unhandled error (500).
        raise _unauthorized("Invalid or expired token")
    return sub


def require_auth(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(_http_bearer),
) -> str:
    """FastAPI dependency returning the caller's Ghostfolio bearer token.

    The value is the ``sub`` of the caller's agent JWT, i.e. the subject
    passed to ``_create_access_token`` at login.

    Raises:
        HTTPException: 401 when no bearer credentials are sent or the token
            fails verification.
    """
    if credentials is None or not credentials.credentials:
        raise _unauthorized("Authentication required")
    return _verify_jwt(credentials.credentials)


app = FastAPI(
    title="Ghostfolio AI Agent",
    description="LangGraph-powered portfolio analysis agent on top of Ghostfolio",
    version="1.0.0",
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

graph = build_graph()

# In-memory observability stores held in module globals (lost on restart).
feedback_log: list[dict] = []
cost_log: list[dict] = []
error_log: list[dict] = []  # last 50 errors for metrics

# Flat per-request cost estimate (no real token accounting is done):
# assumed token counts priced at USD per million tokens.
_EST_INPUT_TOKENS_PER_REQUEST = 2000
_EST_OUTPUT_TOKENS_PER_REQUEST = 500
_INPUT_PRICE_PER_MILLION = 3.0
_OUTPUT_PRICE_PER_MILLION = 15.0
COST_PER_REQUEST_USD = (
    _EST_INPUT_TOKENS_PER_REQUEST * _INPUT_PRICE_PER_MILLION
    + _EST_OUTPUT_TOKENS_PER_REQUEST * _OUTPUT_PRICE_PER_MILLION
) / 1_000_000

_NO_RESPONSE = "No response generated."


def log_error(error: Exception, context: dict) -> dict:
    """Capture *error* with *context* for debugging.

    Call this from inside an ``except`` block: the traceback is taken from the
    exception currently being handled. ``error_log`` keeps only the 50 most
    recent records.

    Returns:
        The stored error record.
    """
    error_record = {
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "error_type": type(error).__name__,
        "error_message": str(error),
        "traceback": traceback.format_exc(),
        "context": context,
    }
    error_log.append(error_record)
    if len(error_log) > 50:
        error_log.pop(0)
    logger.error(
        "%s: %s | context=%s",
        error_record["error_type"],
        error_record["error_message"],
        context,
    )
    return error_record


class ChatRequest(BaseModel):
    query: Optional[str] = None
    message: Optional[str] = None
    session_id: Optional[str] = None
    history: list[dict] = []
    # Clients must echo back pending_write from the previous response when
    # the user is confirming (or cancelling) a write operation.
    pending_write: dict | None = None
    # Accepted for backward compatibility but NOT used by the chat endpoints:
    # they take the Ghostfolio token from the verified agent JWT (require_auth).
    bearer_token: str | None = None

    def get_text(self) -> str:
        """Accept either field name for backwards compatibility."""
        return (self.query or self.message or "").strip()


class FeedbackRequest(BaseModel):
    query: str
    response: str
    rating: int
    comment: str = ""


def _history_to_messages(history: list[dict]) -> list:
    """Convert client chat history into LangChain messages.

    Both user AND assistant turns are kept so the model has full context for
    follow-up questions; entries with any other role are dropped.
    """
    messages = []
    for turn in history:
        role = turn.get("role", "")
        content = turn.get("content", "")
        if role == "user":
            messages.append(HumanMessage(content=content))
        elif role == "assistant":
            messages.append(AIMessage(content=content))
    return messages


def _build_initial_state(req: ChatRequest, user_text: str, gf_token: str) -> AgentState:
    """Build the graph's starting state for one chat turn.

    Args:
        req: The incoming request (history and any echoed pending_write).
        user_text: The normalised user message (``req.get_text()``).
        gf_token: Ghostfolio bearer token recovered from the agent JWT.
    """
    return {
        "user_query": user_text,
        "messages": _history_to_messages(req.history),
        "query_type": "",
        "portfolio_snapshot": {},
        "tool_results": [],
        "pending_verifications": [],
        "confidence_score": 1.0,
        "verification_outcome": "pass",
        "awaiting_confirmation": False,
        "confirmation_payload": None,
        # Carry forward any pending write payload the client echoed back
        "pending_write": req.pending_write,
        "bearer_token": gf_token,
        "confirmation_message": None,
        "missing_fields": [],
        "final_response": None,
        "citations": [],
        "error": None,
    }


def _zero_token_usage() -> dict:
    """Token-usage block reported when the graph did not run."""
    return {
        "estimated_input": 0,
        "estimated_output": 0,
        "estimated_total": 0,
        "estimated_cost_usd": 0.0,
    }


def _estimated_token_usage() -> dict:
    """Fixed per-response token estimate returned by /chat.

    NOTE(review): these figures (1200 in / 400 out) differ from the 2000 / 500
    assumptions behind COST_PER_REQUEST_USD and /costs — confirm which is
    intended.
    """
    return {
        "estimated_input": 1200,
        "estimated_output": 400,
        "estimated_total": 1600,
        "estimated_cost_usd": round((1200 * 0.000003) + (400 * 0.000015), 4),
    }


def _record_cost(user_text: str, latency_seconds: float) -> None:
    """Append one successful request to ``cost_log`` (feeds /costs and /metrics)."""
    cost_log.append({
        "timestamp": datetime.utcnow().isoformat(),
        "query": user_text[:80],
        "estimated_cost_usd": round(COST_PER_REQUEST_USD, 5),
        "latency_seconds": latency_seconds,
    })


def _sse(payload: dict) -> str:
    """Format *payload* as one Server-Sent Events ``data:`` frame."""
    return f"data: {json.dumps(payload)}\n\n"


def _token_events(text: str):
    """Yield SSE token frames for *text*, one per space-separated word.

    Every word except the last carries a trailing space; the last frame has
    ``done=True``.
    """
    words = text.split(" ")
    last = len(words) - 1
    for i, word in enumerate(words):
        yield _sse({
            "type": "token",
            "token": word if i == last else word + " ",
            "done": i == last,
        })


def _extract_comparison_card(tool_results: list[dict]) -> dict | None:
    """Build a side-by-side city card from the first successful comparison.

    A comparison is a ``real_estate`` tool result whose ``result`` dict has a
    ``location_a`` key. Returns None when no such result exists.
    """
    for r in tool_results:
        res = r.get("result")
        if not (
            r.get("tool_name") == "real_estate"
            and r.get("success")
            and isinstance(res, dict)
            and "location_a" in res
        ):
            continue

        m = res["metrics"]
        city_a, city_b = res["location_a"], res["location_b"]

        # Count advantages per city to form a verdict
        advantages: dict[str, int] = {city_a: 0, city_b: 0}
        for metric_data in m.values():
            if isinstance(metric_data, dict):
                for winner_key in ("more_affordable", "higher_yield", "more_walkable"):
                    winner_city = metric_data.get(winner_key)
                    if winner_city in advantages:
                        advantages[winner_city] += 1

        winner = max(advantages, key=advantages.get)
        # Empty when both locations share a name (single dict key).
        runners_up = [c for c in advantages if c != winner]
        runner_up_score = advantages[runners_up[0]] if runners_up else 0
        if runners_up and advantages[winner] == runner_up_score:
            verdict = (
                f"{city_a} and {city_b} are evenly matched "
                f"({runner_up_score} vs {runner_up_score} metrics)."
            )
        else:
            verdict = (
                f"{winner} leads on affordability & yield "
                f"({advantages[winner]} vs {runner_up_score} metrics)."
            )

        def _city(name: str, side: str) -> dict:
            # side is "a" or "b", the per-city key inside each metric dict.
            return {
                "name": name,
                "median_price": m["median_price"][side],
                "price_per_sqft": m["price_per_sqft"][side],
                "days_on_market": m["days_on_market"][side],
                "walk_score": m["walk_score"][side],
                "yoy_change": m["yoy_price_change_pct"][side],
                "inventory": m["inventory"][side],
            }

        return {
            "city_a": _city(city_a, "a"),
            "city_b": _city(city_b, "b"),
            "winners": {
                "median_price": m["median_price"].get("more_affordable"),
                "price_per_sqft": m["price_per_sqft"].get("more_affordable"),
                "days_on_market": m["days_on_market"].get("less_competitive"),
                "walk_score": m["walk_score"].get("more_walkable"),
            },
            "verdict": verdict,
        }
    return None


def _extract_chart_data(tool_results: list[dict]) -> dict | None:
    """Build allocation pie data from the first portfolio_analysis holdings.

    The top 6 holdings by ``allocation_pct`` are kept; the remainder is
    grouped into "Other" when it exceeds 0.1. Returns None when no successful
    portfolio_analysis result with holdings exists.
    """

    def _alloc(holding: dict) -> float:
        # Treat a missing or null allocation as 0 so sorting/summing never fails.
        return holding.get("allocation_pct") or 0

    for r in tool_results:
        if not (
            r.get("tool_name") == "portfolio_analysis"
            and r.get("success")
            and isinstance(r.get("result"), dict)
        ):
            continue
        holdings = r["result"].get("holdings", [])
        if not holdings:
            continue
        ranked = sorted(holdings, key=_alloc, reverse=True)
        top = ranked[:6]
        other_alloc = sum(_alloc(h) for h in ranked[6:])
        labels = [h.get("symbol", "?") for h in top]
        values = [round(_alloc(h), 1) for h in top]
        if other_alloc > 0.1:
            labels.append("Other")
            values.append(round(other_alloc, 1))
        return {"type": "allocation_pie", "labels": labels, "values": values}
    return None


@app.post("/chat")
async def chat(req: ChatRequest, gf_token: str = Depends(require_auth)):
    """Run the agent graph once and return the complete response as JSON."""
    start_time = time.time()
    trace_id = str(uuid.uuid4())
    user_text = req.get_text()

    if not user_text:
        return {
            "error": "No message provided",
            "response": "Please send a message.",
            "confidence": 0.0,
            "verified": False,
            "latency_ms": int((time.time() - start_time) * 1000),
            "trace_id": trace_id,
            "tokens": _zero_token_usage(),
        }

    try:
        result = await graph.ainvoke(_build_initial_state(req, user_text, gf_token))
    except Exception as e:  # top-level boundary: report instead of a bare 500
        log_error(
            e,
            {
                "message": user_text[:200],
                "session_id": req.session_id,
                "query_type": "unknown",
            },
        )
        return {
            "response": "I encountered an error processing your request. Please try again.",
            "error": str(e),
            "confidence": 0.0,
            "verified": False,
            "latency_ms": int((time.time() - start_time) * 1000),
            "trace_id": trace_id,
            "tokens": _zero_token_usage(),
        }

    duration = time.time() - start_time
    elapsed = round(duration, 2)
    latency_ms = int(duration * 1000)
    _record_cost(user_text, elapsed)

    tool_results = result.get("tool_results", [])
    tools_used = [r["tool_name"] for r in tool_results]
    confidence = result.get("confidence_score", 0.0)

    return {
        # final_response is always present in state (initialised to None), so a
        # .get() default would never apply; fall back explicitly.
        "response": result.get("final_response") or _NO_RESPONSE,
        "confidence_score": confidence,
        "confidence": confidence,
        "verification_outcome": result.get("verification_outcome", "unknown"),
        "verified": confidence >= 0.80,
        "awaiting_confirmation": result.get("awaiting_confirmation", False),
        "pending_write": result.get("pending_write"),
        "tools_used": tools_used,
        "tool": tools_used[0] if tools_used else None,
        "citations": result.get("citations", []),
        "latency_seconds": elapsed,
        "latency_ms": latency_ms,
        "tokens": _estimated_token_usage(),
        "trace_id": trace_id,
        "comparison_card": _extract_comparison_card(tool_results),
        "chart_data": _extract_chart_data(tool_results),
    }


@app.post("/chat/stream")
async def chat_stream(req: ChatRequest, gf_token: str = Depends(require_auth)):
    """
    Streaming variant of /chat — returns SSE (text/event-stream).

    Runs the full graph, then emits one ``meta`` event followed by the final
    response word by word, so the user sees output immediately rather than
    waiting for the full response to render. On failure a single ``error``
    event is emitted instead.
    """
    start_time = time.time()
    user_text = req.get_text()
    initial_state = _build_initial_state(req, user_text, gf_token)

    async def generate():
        try:
            result = await graph.ainvoke(initial_state)
        except Exception as exc:  # the stream has started; report in-band
            log_error(
                exc,
                {
                    "message": user_text[:200],
                    "session_id": req.session_id,
                    "query_type": "unknown",
                },
            )
            yield _sse({"type": "error", "message": f"Agent error: {exc}"})
            return

        _record_cost(user_text, round(time.time() - start_time, 2))
        tool_results = result.get("tool_results", [])

        # Stream metadata first (including pending_write, which the client
        # must echo back to confirm a write).
        yield _sse({
            "type": "meta",
            "confidence_score": result.get("confidence_score", 0.0),
            "verification_outcome": result.get("verification_outcome", "unknown"),
            "awaiting_confirmation": result.get("awaiting_confirmation", False),
            "pending_write": result.get("pending_write"),
            "tools_used": [r["tool_name"] for r in tool_results],
            "citations": result.get("citations", []),
        })

        for chunk in _token_events(result.get("final_response") or _NO_RESPONSE):
            yield chunk

    return StreamingResponse(generate(), media_type="text/event-stream")


class SeedRequest(BaseModel):
    bearer_token: str | None = None


# Demo activities imported by /seed (one import request per activity).
_DEMO_ACTIVITIES: tuple[dict, ...] = (
    {"type": "BUY", "symbol": "AAPL", "quantity": 10, "unitPrice": 134.18, "date": "2021-03-15"},
    {"type": "BUY", "symbol": "AAPL", "quantity": 5, "unitPrice": 148.56, "date": "2021-09-10"},
    {"type": "DIVIDEND", "symbol": "AAPL", "quantity": 1, "unitPrice": 3.44, "date": "2022-02-04"},
    {"type": "SELL", "symbol": "AAPL", "quantity": 5, "unitPrice": 183.12, "date": "2023-06-20"},
    {"type": "DIVIDEND", "symbol": "AAPL", "quantity": 1, "unitPrice": 3.66, "date": "2023-08-04"},
    {"type": "BUY", "symbol": "MSFT", "quantity": 8, "unitPrice": 242.15, "date": "2021-05-20"},
    {"type": "BUY", "symbol": "MSFT", "quantity": 4, "unitPrice": 299.35, "date": "2022-01-18"},
    {"type": "DIVIDEND", "symbol": "MSFT", "quantity": 1, "unitPrice": 9.68, "date": "2022-06-09"},
    {"type": "DIVIDEND", "symbol": "MSFT", "quantity": 1, "unitPrice": 10.40, "date": "2023-06-08"},
    {"type": "BUY", "symbol": "NVDA", "quantity": 6, "unitPrice": 143.25, "date": "2021-11-05"},
    {"type": "BUY", "symbol": "NVDA", "quantity": 4, "unitPrice": 166.88, "date": "2022-07-12"},
    {"type": "BUY", "symbol": "GOOGL", "quantity": 3, "unitPrice": 2718.96, "date": "2021-08-03"},
    {"type": "BUY", "symbol": "GOOGL", "quantity": 5, "unitPrice": 102.30, "date": "2022-08-15"},
    {"type": "BUY", "symbol": "AMZN", "quantity": 4, "unitPrice": 168.54, "date": "2023-02-08"},
    {"type": "BUY", "symbol": "VTI", "quantity": 15, "unitPrice": 207.38, "date": "2021-04-06"},
    {"type": "BUY", "symbol": "VTI", "quantity": 10, "unitPrice": 183.52, "date": "2022-10-14"},
    {"type": "DIVIDEND", "symbol": "VTI", "quantity": 1, "unitPrice": 10.28, "date": "2022-12-27"},
    {"type": "DIVIDEND", "symbol": "VTI", "quantity": 1, "unitPrice": 11.42, "date": "2023-12-27"},
)


def _ghostfolio_base_url() -> str:
    """Base URL of the Ghostfolio API (``GHOSTFOLIO_BASE_URL`` env var)."""
    return os.getenv("GHOSTFOLIO_BASE_URL", "http://localhost:3333")


def _env_ghostfolio_token() -> str:
    """Shared Ghostfolio bearer token from the environment ("" if unset)."""
    return os.getenv("GHOSTFOLIO_BEARER_TOKEN", "")


async def _import_demo_activity(
    client: httpx.AsyncClient,
    base_url: str,
    headers: dict,
    account_id: str,
    activity: dict,
) -> bool:
    """Import one demo activity into *account_id*; return True on success.

    The YAHOO data source is tried first (gives live prices in the UI); MANUAL
    is used as a fallback if Ghostfolio rejects the YAHOO variant.
    """
    for data_source in ("YAHOO", "MANUAL"):
        activity_payload = {
            "accountId": account_id,
            "currency": "USD",
            "dataSource": data_source,
            "date": f"{activity['date']}T00:00:00.000Z",
            "fee": 0,
            "quantity": activity["quantity"],
            "symbol": activity["symbol"],
            "type": activity["type"],
            "unitPrice": activity["unitPrice"],
        }
        resp = await client.post(
            f"{base_url}/api/v1/import",
            headers=headers,
            json={"activities": [activity_payload]},
        )
        if resp.status_code in (200, 201):
            return True
    return False


@app.post("/seed")
async def seed_demo_portfolio(req: SeedRequest):
    """
    Populate the caller's Ghostfolio account with a realistic demo portfolio
    (18 transactions across AAPL, MSFT, NVDA, GOOGL, AMZN, VTI).

    Called automatically by the Angular chat when a logged-in user has an empty
    portfolio, so first-time Google OAuth users see real data immediately after
    signing in.

    NOTE(review): this endpoint is unauthenticated and falls back to the shared
    GHOSTFOLIO_BEARER_TOKEN when no token is supplied, so any caller can seed
    the shared account. Consider requiring an explicit token.
    """
    base_url = _ghostfolio_base_url()
    token = req.bearer_token or _env_ghostfolio_token()
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

    imported = 0
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            # Create a brokerage account for this user
            acct_resp = await client.post(
                f"{base_url}/api/v1/account",
                headers=headers,
                json={"balance": 0, "currency": "USD", "isExcluded": False,
                      "name": "Demo Portfolio", "platformId": None},
            )
            if acct_resp.status_code not in (200, 201):
                return {"success": False, "error": f"Could not create account: {acct_resp.text}"}
            account_id = acct_resp.json().get("id")
            if not account_id:
                return {"success": False, "error": "Could not create account: response did not include an id"}

            for activity in _DEMO_ACTIVITIES:
                if await _import_demo_activity(client, base_url, headers, account_id, activity):
                    imported += 1
    except httpx.HTTPError as exc:
        logger.warning("Demo seeding failed after %d activities: %s", imported, exc)
        return {
            "success": False,
            "error": f"Ghostfolio request failed: {exc}",
            "activities_imported": imported,
        }

    return {
        "success": True,
        "message": f"Demo portfolio seeded with {imported} activities across AAPL, MSFT, NVDA, GOOGL, AMZN, VTI.",
        "account_id": account_id,
        "activities_imported": imported,
    }


async def _fetch_ghostfolio_alias(gf_token: str, timeout: float) -> str:
    """Best-effort lookup of the Ghostfolio user's alias ("" if unavailable).

    Any network or payload problem is logged at debug level and swallowed:
    the alias is purely cosmetic and must never block authentication.
    """
    if not gf_token:
        return ""
    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            r = await client.get(
                f"{_ghostfolio_base_url()}/api/v1/user",
                headers={"Authorization": f"Bearer {gf_token}"},
            )
        if r.status_code == 200:
            return r.json().get("settings", {}).get("alias") or ""
    except Exception:
        logger.debug("Ghostfolio alias lookup failed", exc_info=True)
    return ""


class LoginRequest(BaseModel):
    email: str
    password: str


@app.post("/auth/login")
async def auth_login(req: LoginRequest):
    """
    Simple email/password auth for the agent.

    Credentials are validated against ADMIN_USERNAME / ADMIN_PASSWORD env vars,
    falling back to the built-in demo credentials (test@example.com / password).
    All authenticated users share the GHOSTFOLIO_BEARER_TOKEN from the
    environment, which becomes the ``sub`` of the returned agent JWT.
    """
    import hmac  # constant-time comparison for credentials

    admin_email = os.getenv("ADMIN_USERNAME", "test@example.com").strip().lower()
    admin_password = os.getenv("ADMIN_PASSWORD", "password")
    submitted_email = req.email.strip().lower()

    # Compare as bytes: hmac.compare_digest rejects non-ASCII str arguments.
    # Both checks always run so timing does not reveal which one failed.
    email_ok = hmac.compare_digest(submitted_email.encode(), admin_email.encode())
    password_ok = hmac.compare_digest(req.password.encode(), admin_password.encode())
    if not (email_ok and password_ok):
        return JSONResponse(
            status_code=401,
            content={"success": False, "message": "Invalid email or password."},
        )

    gf_token = _env_ghostfolio_token()
    session_token = _create_access_token(subject=gf_token or "demo")

    # Try to get a display name from Ghostfolio if a token is configured
    alias = await _fetch_ghostfolio_alias(gf_token, timeout=4.0)
    display_name = alias or admin_email.split("@")[0]

    return {
        "success": True,
        "token": session_token,
        "name": display_name,
        "email": submitted_email,
    }


@app.get("/auth/auto")
async def auth_auto():
    """
    No-login auth: issue an agent session token for the configured Ghostfolio
    account without credentials. Enables the app to work without a login page.

    The returned ``token`` is an agent JWT (as from /auth/login), so it is
    accepted by ``require_auth``; the raw Ghostfolio bearer token is never sent
    to the client.
    """
    gf_token = _env_ghostfolio_token()
    alias = await _fetch_ghostfolio_alias(gf_token, timeout=4.0)
    return {
        "success": True,
        "token": _create_access_token(subject=gf_token or "demo"),
        "name": alias or "Investor",
    }


@app.get("/login", include_in_schema=False)
async def login_page():
    """Redirect to chat — login is bypassed."""
    return RedirectResponse(url="/", status_code=302)


@app.get("/me")
async def get_me():
    """Return the Ghostfolio user profile for the configured bearer token.

    Ghostfolio's ``/api/v1/user`` is tried first. If that fails, the ``id``
    claim is read from the token itself WITHOUT signature verification (used
    for display only).
    """
    token = _env_ghostfolio_token()
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            resp = await client.get(
                f"{_ghostfolio_base_url()}/api/v1/user",
                headers={"Authorization": f"Bearer {token}"},
            )
        if resp.status_code == 200:
            data = resp.json()
            alias = data.get("settings", {}).get("alias") or data.get("alias") or ""
            email = data.get("email", "")
            display = alias or (email.split("@")[0] if email else "")
            return {
                "success": True,
                "id": data.get("id", ""),
                "name": display or "Investor",
                "email": email,
            }
    except Exception:
        # Best effort: any network/payload problem falls through to local decode.
        logger.debug("Ghostfolio /user lookup failed", exc_info=True)

    # Fallback: decode the JWT payload locally (no network). JWTs use
    # base64url without padding, which jose handles correctly.
    try:
        claims = jwt.get_unverified_claims(token)
    except JWTError:
        claims = None
    uid = (claims.get("id") or "") if claims is not None else None
    if isinstance(uid, str):
        initials = uid[:2].upper() if uid else "IN"
        return {"success": True, "id": uid, "name": "Investor", "initials": initials, "email": ""}

    return {"success": False, "name": "Investor", "id": "", "email": ""}


# Node labels shown in the live thinking display
_NODE_LABELS = {
    "classify": "Analyzing your question",
    "tools": "Fetching portfolio data",
    "write_prepare": "Preparing transaction",
    "write_execute": "Recording transaction",
    "verify": "Verifying data accuracy",
    "format": "Composing response",
}

_OUR_NODES = set(_NODE_LABELS.keys())


def _event_output(event: dict) -> dict:
    """Return an astream_events event's ``data.output`` if it is a dict, else {}."""
    output = (event.get("data") or {}).get("output")
    return output if isinstance(output, dict) else {}


def _step_done_payload(node: str, output: dict) -> dict:
    """Build the ``step``/``done`` SSE payload for a finished graph node."""
    payload: dict = {
        "type": "step",
        "node": node,
        "label": _NODE_LABELS[node],
        "status": "done",
    }
    if node == "tools":
        payload["tools"] = [r["tool_name"] for r in output.get("tool_results", [])]
    if node == "verify":
        payload["confidence"] = output.get("confidence_score", 1.0)
        payload["outcome"] = output.get("verification_outcome", "pass")
    return payload


@app.post("/chat/steps")
async def chat_steps(req: ChatRequest, gf_token: str = Depends(require_auth)):
    """
    SSE endpoint that streams LangGraph node events in real time.

    Clients receive step events as each graph node starts/ends, then a meta
    event with final metadata, then token events for the response, then a
    ``done`` event. Failures produce a single ``error`` event.
    """
    start = time.time()
    user_text = req.get_text()
    initial_state = _build_initial_state(req, user_text, gf_token)

    async def generate():
        # A node may run more than once; only its first start is announced.
        seen_nodes: set[str] = set()
        try:
            async for event in graph.astream_events(initial_state, version="v2"):
                etype = event.get("event", "")
                ename = event.get("name", "")

                if ename in _OUR_NODES:
                    if etype == "on_chain_start" and ename not in seen_nodes:
                        seen_nodes.add(ename)
                        yield _sse({
                            "type": "step",
                            "node": ename,
                            "label": _NODE_LABELS[ename],
                            "status": "running",
                        })
                    elif etype == "on_chain_end":
                        yield _sse(_step_done_payload(ename, _event_output(event)))

                # The graph-level end event carries the final state.
                elif ename == "LangGraph" and etype == "on_chain_end":
                    output = _event_output(event)
                    tool_results = output.get("tool_results", [])
                    elapsed = round(time.time() - start, 2)
                    _record_cost(user_text, elapsed)

                    yield _sse({
                        "type": "meta",
                        "confidence_score": output.get("confidence_score", 0.0),
                        "verification_outcome": output.get("verification_outcome", "unknown"),
                        "awaiting_confirmation": output.get("awaiting_confirmation", False),
                        "pending_write": output.get("pending_write"),
                        "tools_used": [r["tool_name"] for r in tool_results],
                        "citations": output.get("citations", []),
                        "latency_seconds": elapsed,
                    })

                    for chunk in _token_events(output.get("final_response") or _NO_RESPONSE):
                        yield chunk
                    yield _sse({"type": "done"})

        except Exception as exc:  # the stream has started; report in-band
            log_error(
                exc,
                {
                    "message": user_text[:200],
                    "session_id": req.session_id,
                    "query_type": "unknown",
                },
            )
            yield _sse({"type": "error", "message": f"Agent error: {str(exc)}"})

    return StreamingResponse(generate(), media_type="text/event-stream")


@app.get("/", response_class=HTMLResponse, include_in_schema=False)
async def chat_ui():
    """Serve the bundled chat page (chat_ui.html next to this file)."""
    with open(os.path.join(os.path.dirname(__file__), "chat_ui.html"), encoding="utf-8") as f:
        return f.read()


@app.get("/health")
async def health():
    """Report liveness plus whether Ghostfolio's health endpoint answers 200."""
    ghostfolio_ok = False
    try:
        async with httpx.AsyncClient(timeout=3.0) as client:
            resp = await client.get(f"{_ghostfolio_base_url()}/api/v1/health")
            ghostfolio_ok = resp.status_code == 200
    except Exception:
        # Health checks must never fail because a dependency is down.
        ghostfolio_ok = False

    return {
        "status": "ok",
        "ghostfolio_reachable": ghostfolio_ok,
        "timestamp": datetime.utcnow().isoformat(),
        "version": "2.1.0-complete-showcase",
        "features": ["relocation_runway", "wealth_gap", "life_decision", "equity_unlock", "family_planner"],
    }


@app.post("/feedback")
async def feedback(req: FeedbackRequest, _auth: str = Depends(require_auth)):
    """Record one rating in the in-memory ``feedback_log``."""
    entry = {
        "timestamp": datetime.utcnow().isoformat(),
        "query": req.query,
        "response": req.response[:200],
        "rating": req.rating,
        "comment": req.comment,
    }
    feedback_log.append(entry)
    return {"status": "recorded", "total_feedback": len(feedback_log)}


@app.get("/feedback/summary")
async def feedback_summary(_auth: str = Depends(require_auth)):
    """Summarise recorded feedback; ratings > 0 count as positive, all others as negative."""
    if not feedback_log:
        return {
            "total": 0,
            "positive": 0,
            "negative": 0,
            "approval_rate": "N/A",
            "message": "No feedback recorded yet.",
        }

    positive = sum(1 for f in feedback_log if f["rating"] > 0)
    negative = len(feedback_log) - positive
    approval_rate = f"{(positive / len(feedback_log) * 100):.0f}%"

    return {
        "total": len(feedback_log),
        "positive": positive,
        "negative": negative,
        "approval_rate": approval_rate,
    }


@app.get("/real-estate/log")
async def real_estate_log():
    """
    Returns the in-memory real estate tool invocation log.
    Only available when ENABLE_REAL_ESTATE=true.
    Each entry: timestamp, function, query (truncated), duration_ms, success.
    """
    from tools.real_estate import is_real_estate_enabled, get_invocation_log

    if not is_real_estate_enabled():
        return JSONResponse(
            status_code=404,
            content={"error": "Real estate feature is not enabled."},
        )
    log = get_invocation_log()
    total = len(log)
    successes = sum(1 for e in log if e["success"])
    return {
        "total_invocations": total,
        "success_count": successes,
        "failure_count": total - successes,
        "entries": log[-50:],  # last 50 only
    }


@app.get("/costs")
async def costs(_auth: str = Depends(require_auth)):
    """Report the estimated spend recorded in ``cost_log``."""
    total = sum(c["estimated_cost_usd"] for c in cost_log)
    avg = total / max(len(cost_log), 1)

    return {
        "total_requests": len(cost_log),
        "estimated_cost_usd": round(total, 4),
        "avg_per_request": round(avg, 5),
        "cost_assumptions": {
            "model": "claude-sonnet-4-20250514",
            "input_tokens_per_request": _EST_INPUT_TOKENS_PER_REQUEST,
            "output_tokens_per_request": _EST_OUTPUT_TOKENS_PER_REQUEST,
            "input_price_per_million": _INPUT_PRICE_PER_MILLION,
            "output_price_per_million": _OUTPUT_PRICE_PER_MILLION,
        },
    }


@app.get("/metrics")
async def get_metrics():
    """Observability metrics including recent errors.

    ``avg_latency_ms`` averages over the most recent 100 recorded requests.
    """
    total_requests = len(cost_log)
    total_latency_ms = sum(c.get("latency_seconds", 0) * 1000 for c in cost_log[-100:])

    return {
        "total_requests": total_requests,
        "avg_latency_ms": round(
            total_latency_ms / max(min(total_requests, 100), 1)
        ) if total_requests > 0 else 0,
        "recent_errors": error_log[-5:],
        "error_count": len(error_log),
        "last_updated": datetime.utcnow().isoformat() + "Z",
    }