mirror of https://github.com/ghostfolio/ghostfolio
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
774 lines
29 KiB
774 lines
29 KiB
import json
|
|
import time
|
|
import os
|
|
import uuid
|
|
from datetime import datetime, timedelta
|
|
|
|
from fastapi import FastAPI, Response, Depends, HTTPException, status
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import RedirectResponse, StreamingResponse, HTMLResponse, JSONResponse
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from pydantic import BaseModel
|
|
from dotenv import load_dotenv
|
|
import httpx
|
|
from langchain_core.messages import HumanMessage, AIMessage
|
|
from jose import JWTError, jwt
|
|
|
|
load_dotenv()
|
|
# Load agent/.env so ANTHROPIC_API_KEY, GHOSTFOLIO_BEARER_TOKEN, etc. are available
|
|
load_dotenv(os.path.join(os.path.dirname(__file__), "agent", ".env"))
|
|
|
|
from graph import build_graph
|
|
from state import AgentState
|
|
|
|
# ── Auth configuration ──
|
|
# The agent issues its own short-lived JWT whose `sub` is the user's
|
|
# Ghostfolio bearer token. This way we never store credentials server-side;
|
|
# Ghostfolio is the identity provider.
|
|
_JWT_ALGORITHM = "HS256"
|
|
_JWT_EXPIRE_HOURS = 24
|
|
_http_bearer = HTTPBearer(auto_error=False)
|
|
|
|
|
|
def _get_jwt_secret() -> str:
|
|
secret = os.getenv("JWT_SECRET_KEY", "")
|
|
if not secret:
|
|
raise RuntimeError("JWT_SECRET_KEY env var is required")
|
|
return secret
|
|
|
|
|
|
def _create_access_token(subject: str) -> str:
|
|
expire = datetime.utcnow() + timedelta(hours=_JWT_EXPIRE_HOURS)
|
|
payload = {"sub": subject, "exp": expire}
|
|
return jwt.encode(payload, _get_jwt_secret(), algorithm=_JWT_ALGORITHM)
|
|
|
|
|
|
def _verify_jwt(token: str) -> str:
|
|
try:
|
|
payload = jwt.decode(token, _get_jwt_secret(), algorithms=[_JWT_ALGORITHM])
|
|
sub: str = payload.get("sub", "")
|
|
if not sub:
|
|
raise ValueError("missing sub")
|
|
return sub
|
|
except JWTError as exc:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid or expired token",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
) from exc
|
|
|
|
|
|
def require_auth(credentials: HTTPAuthorizationCredentials = Depends(_http_bearer)) -> str:
|
|
if credentials is None or not credentials.credentials:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Authentication required",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
return _verify_jwt(credentials.credentials)
|
|
|
|
|
|
app = FastAPI(
|
|
title="Ghostfolio AI Agent",
|
|
description="LangGraph-powered portfolio analysis agent on top of Ghostfolio",
|
|
version="1.0.0",
|
|
)
|
|
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"],
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
|
|
graph = build_graph()
|
|
|
|
feedback_log: list[dict] = []
|
|
cost_log: list[dict] = []
|
|
|
|
COST_PER_REQUEST_USD = (2000 * 0.000003) + (500 * 0.000015)
|
|
|
|
|
|
class ChatRequest(BaseModel):
|
|
query: str
|
|
history: list[dict] = []
|
|
# Clients must echo back pending_write from the previous response when
|
|
# the user is confirming (or cancelling) a write operation.
|
|
pending_write: dict | None = None
|
|
# Optional: the logged-in user's Ghostfolio bearer token.
|
|
# When provided, the agent uses THIS token for all API calls so it operates
|
|
# on the caller's own portfolio data instead of the shared env-var token.
|
|
bearer_token: str | None = None
|
|
|
|
|
|
class FeedbackRequest(BaseModel):
|
|
query: str
|
|
response: str
|
|
rating: int
|
|
comment: str = ""
|
|
|
|
|
|
@app.post("/chat")
|
|
async def chat(req: ChatRequest, gf_token: str = Depends(require_auth)):
|
|
start_time = time.time()
|
|
trace_id = str(uuid.uuid4())
|
|
|
|
# Build conversation history preserving both user AND assistant turns so
|
|
# Claude has full context for follow-up questions.
|
|
history_messages = []
|
|
for m in req.history:
|
|
role = m.get("role", "")
|
|
content = m.get("content", "")
|
|
if role == "user":
|
|
history_messages.append(HumanMessage(content=content))
|
|
elif role == "assistant":
|
|
history_messages.append(AIMessage(content=content))
|
|
|
|
initial_state: AgentState = {
|
|
"user_query": req.query,
|
|
"messages": history_messages,
|
|
"query_type": "",
|
|
"portfolio_snapshot": {},
|
|
"tool_results": [],
|
|
"pending_verifications": [],
|
|
"confidence_score": 1.0,
|
|
"verification_outcome": "pass",
|
|
"awaiting_confirmation": False,
|
|
"confirmation_payload": None,
|
|
# Carry forward any pending write payload the client echoed back
|
|
"pending_write": req.pending_write,
|
|
"bearer_token": gf_token,
|
|
"confirmation_message": None,
|
|
"missing_fields": [],
|
|
"final_response": None,
|
|
"citations": [],
|
|
"error": None,
|
|
}
|
|
|
|
result = await graph.ainvoke(initial_state)
|
|
|
|
elapsed = round(time.time() - start_time, 2)
|
|
latency_ms = int((time.time() - start_time) * 1000)
|
|
|
|
cost_log.append({
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"query": req.query[:80],
|
|
"estimated_cost_usd": round(COST_PER_REQUEST_USD, 5),
|
|
"latency_seconds": elapsed,
|
|
})
|
|
|
|
tools_used = [r["tool_name"] for r in result.get("tool_results", [])]
|
|
|
|
# Extract structured comparison card when compare_neighborhoods ran
|
|
comparison_card = None
|
|
for r in result.get("tool_results", []):
|
|
if (
|
|
r.get("tool_name") == "real_estate"
|
|
and r.get("success")
|
|
and isinstance(r.get("result"), dict)
|
|
and "location_a" in r["result"]
|
|
):
|
|
res = r["result"]
|
|
m = res["metrics"]
|
|
# Count advantages per city to form a verdict
|
|
advantages: dict[str, int] = {res["location_a"]: 0, res["location_b"]: 0}
|
|
for metric_data in m.values():
|
|
if isinstance(metric_data, dict):
|
|
for winner_key in ("more_affordable", "higher_yield", "more_walkable"):
|
|
winner_city = metric_data.get(winner_key)
|
|
if winner_city in advantages:
|
|
advantages[winner_city] += 1
|
|
winner = max(advantages, key=lambda c: advantages[c])
|
|
loser = [c for c in advantages if c != winner][0]
|
|
verdict = (
|
|
f"{winner} leads on affordability & yield "
|
|
f"({advantages[winner]} vs {advantages[loser]} metrics)."
|
|
)
|
|
comparison_card = {
|
|
"city_a": {
|
|
"name": res["location_a"],
|
|
"median_price": m["median_price"]["a"],
|
|
"price_per_sqft": m["price_per_sqft"]["a"],
|
|
"days_on_market": m["days_on_market"]["a"],
|
|
"walk_score": m["walk_score"]["a"],
|
|
"yoy_change": m["yoy_price_change_pct"]["a"],
|
|
"inventory": m["inventory"]["a"],
|
|
},
|
|
"city_b": {
|
|
"name": res["location_b"],
|
|
"median_price": m["median_price"]["b"],
|
|
"price_per_sqft": m["price_per_sqft"]["b"],
|
|
"days_on_market": m["days_on_market"]["b"],
|
|
"walk_score": m["walk_score"]["b"],
|
|
"yoy_change": m["yoy_price_change_pct"]["b"],
|
|
"inventory": m["inventory"]["b"],
|
|
},
|
|
"winners": {
|
|
"median_price": m["median_price"].get("more_affordable"),
|
|
"price_per_sqft": m["price_per_sqft"].get("more_affordable"),
|
|
"days_on_market": m["days_on_market"].get("less_competitive"),
|
|
"walk_score": m["walk_score"].get("more_walkable"),
|
|
},
|
|
"verdict": verdict,
|
|
}
|
|
break
|
|
|
|
# Extract portfolio allocation chart data when portfolio_analysis ran
|
|
chart_data = None
|
|
for r in result.get("tool_results", []):
|
|
if (
|
|
r.get("tool_name") == "portfolio_analysis"
|
|
and r.get("success")
|
|
and isinstance(r.get("result"), dict)
|
|
):
|
|
holdings = r["result"].get("holdings", [])
|
|
if holdings:
|
|
# Use top 6 holdings by allocation; group the rest as "Other"
|
|
sorted_h = sorted(holdings, key=lambda h: h.get("allocation_pct", 0), reverse=True)
|
|
top = sorted_h[:6]
|
|
other_alloc = sum(h.get("allocation_pct", 0) for h in sorted_h[6:])
|
|
labels = [h.get("symbol", "?") for h in top]
|
|
values = [round(h.get("allocation_pct", 0), 1) for h in top]
|
|
if other_alloc > 0.1:
|
|
labels.append("Other")
|
|
values.append(round(other_alloc, 1))
|
|
chart_data = {
|
|
"type": "allocation_pie",
|
|
"labels": labels,
|
|
"values": values,
|
|
}
|
|
break
|
|
|
|
confidence = result.get("confidence_score", 0.0)
|
|
|
|
return {
|
|
"response": result.get("final_response", "No response generated."),
|
|
"confidence_score": confidence,
|
|
"confidence": confidence,
|
|
"verification_outcome": result.get("verification_outcome", "unknown"),
|
|
"verified": confidence >= 0.80,
|
|
"awaiting_confirmation": result.get("awaiting_confirmation", False),
|
|
"pending_write": result.get("pending_write"),
|
|
"tools_used": tools_used,
|
|
"tool": tools_used[0] if tools_used else None,
|
|
"citations": result.get("citations", []),
|
|
"latency_seconds": elapsed,
|
|
"latency_ms": latency_ms,
|
|
"tokens": {
|
|
"estimated_input": 1200,
|
|
"estimated_output": 400,
|
|
"estimated_total": 1600,
|
|
"estimated_cost_usd": round(
|
|
(1200 * 0.000003) + (400 * 0.000015), 4
|
|
),
|
|
},
|
|
"trace_id": trace_id,
|
|
"comparison_card": comparison_card,
|
|
"chart_data": chart_data,
|
|
}
|
|
|
|
|
|
@app.post("/chat/stream")
|
|
async def chat_stream(req: ChatRequest, gf_token: str = Depends(require_auth)):
|
|
"""
|
|
Streaming variant of /chat — returns SSE (text/event-stream).
|
|
Runs the full graph, then streams the final response word by word so
|
|
the user sees output immediately rather than waiting for the full response.
|
|
"""
|
|
|
|
history_messages = []
|
|
for m in req.history:
|
|
role = m.get("role", "")
|
|
content = m.get("content", "")
|
|
if role == "user":
|
|
history_messages.append(HumanMessage(content=content))
|
|
elif role == "assistant":
|
|
history_messages.append(AIMessage(content=content))
|
|
|
|
initial_state: AgentState = {
|
|
"user_query": req.query,
|
|
"messages": history_messages,
|
|
"query_type": "",
|
|
"portfolio_snapshot": {},
|
|
"tool_results": [],
|
|
"pending_verifications": [],
|
|
"confidence_score": 1.0,
|
|
"verification_outcome": "pass",
|
|
"awaiting_confirmation": False,
|
|
"confirmation_payload": None,
|
|
"pending_write": req.pending_write,
|
|
"bearer_token": gf_token,
|
|
"confirmation_message": None,
|
|
"missing_fields": [],
|
|
"final_response": None,
|
|
"citations": [],
|
|
"error": None,
|
|
}
|
|
|
|
async def generate():
|
|
result = await graph.ainvoke(initial_state)
|
|
response_text = result.get("final_response", "No response generated.")
|
|
tools_used = [r["tool_name"] for r in result.get("tool_results", [])]
|
|
|
|
# Stream metadata first
|
|
meta = {
|
|
"type": "meta",
|
|
"confidence_score": result.get("confidence_score", 0.0),
|
|
"verification_outcome": result.get("verification_outcome", "unknown"),
|
|
"awaiting_confirmation": result.get("awaiting_confirmation", False),
|
|
"tools_used": tools_used,
|
|
"citations": result.get("citations", []),
|
|
}
|
|
yield f"data: {json.dumps(meta)}\n\n"
|
|
|
|
# Stream response word by word
|
|
words = response_text.split(" ")
|
|
for i, word in enumerate(words):
|
|
chunk = {"type": "token", "token": word + " ", "done": i == len(words) - 1}
|
|
yield f"data: {json.dumps(chunk)}\n\n"
|
|
|
|
return StreamingResponse(generate(), media_type="text/event-stream")
|
|
|
|
|
|
class SeedRequest(BaseModel):
|
|
bearer_token: str | None = None
|
|
|
|
|
|
@app.post("/seed")
|
|
async def seed_demo_portfolio(req: SeedRequest):
|
|
"""
|
|
Populate the caller's Ghostfolio account with a realistic demo portfolio
|
|
(18 transactions across AAPL, MSFT, NVDA, GOOGL, AMZN, VTI).
|
|
|
|
Called automatically by the Angular chat when a logged-in user has an
|
|
empty portfolio, so first-time Google OAuth users see real data
|
|
immediately after signing in.
|
|
"""
|
|
base_url = os.getenv("GHOSTFOLIO_BASE_URL", "http://localhost:3333")
|
|
token = req.bearer_token or os.getenv("GHOSTFOLIO_BEARER_TOKEN", "")
|
|
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
|
|
|
|
DEMO_ACTIVITIES = [
|
|
{"type": "BUY", "symbol": "AAPL", "quantity": 10, "unitPrice": 134.18, "date": "2021-03-15"},
|
|
{"type": "BUY", "symbol": "AAPL", "quantity": 5, "unitPrice": 148.56, "date": "2021-09-10"},
|
|
{"type": "DIVIDEND", "symbol": "AAPL", "quantity": 1, "unitPrice": 3.44, "date": "2022-02-04"},
|
|
{"type": "SELL", "symbol": "AAPL", "quantity": 5, "unitPrice": 183.12, "date": "2023-06-20"},
|
|
{"type": "DIVIDEND", "symbol": "AAPL", "quantity": 1, "unitPrice": 3.66, "date": "2023-08-04"},
|
|
{"type": "BUY", "symbol": "MSFT", "quantity": 8, "unitPrice": 242.15, "date": "2021-05-20"},
|
|
{"type": "BUY", "symbol": "MSFT", "quantity": 4, "unitPrice": 299.35, "date": "2022-01-18"},
|
|
{"type": "DIVIDEND", "symbol": "MSFT", "quantity": 1, "unitPrice": 9.68, "date": "2022-06-09"},
|
|
{"type": "DIVIDEND", "symbol": "MSFT", "quantity": 1, "unitPrice": 10.40, "date": "2023-06-08"},
|
|
{"type": "BUY", "symbol": "NVDA", "quantity": 6, "unitPrice": 143.25, "date": "2021-11-05"},
|
|
{"type": "BUY", "symbol": "NVDA", "quantity": 4, "unitPrice": 166.88, "date": "2022-07-12"},
|
|
{"type": "BUY", "symbol": "GOOGL", "quantity": 3, "unitPrice": 2718.96,"date": "2021-08-03"},
|
|
{"type": "BUY", "symbol": "GOOGL", "quantity": 5, "unitPrice": 102.30, "date": "2022-08-15"},
|
|
{"type": "BUY", "symbol": "AMZN", "quantity": 4, "unitPrice": 168.54, "date": "2023-02-08"},
|
|
{"type": "BUY", "symbol": "VTI", "quantity": 15, "unitPrice": 207.38, "date": "2021-04-06"},
|
|
{"type": "BUY", "symbol": "VTI", "quantity": 10, "unitPrice": 183.52, "date": "2022-10-14"},
|
|
{"type": "DIVIDEND", "symbol": "VTI", "quantity": 1, "unitPrice": 10.28, "date": "2022-12-27"},
|
|
{"type": "DIVIDEND", "symbol": "VTI", "quantity": 1, "unitPrice": 11.42, "date": "2023-12-27"},
|
|
]
|
|
|
|
async with httpx.AsyncClient(timeout=30.0) as client:
|
|
# Create a brokerage account for this user
|
|
acct_resp = await client.post(
|
|
f"{base_url}/api/v1/account",
|
|
headers=headers,
|
|
json={"balance": 0, "currency": "USD", "isExcluded": False, "name": "Demo Portfolio", "platformId": None},
|
|
)
|
|
if acct_resp.status_code not in (200, 201):
|
|
return {"success": False, "error": f"Could not create account: {acct_resp.text}"}
|
|
|
|
account_id = acct_resp.json().get("id")
|
|
|
|
# Try YAHOO data source first (gives live prices in the UI).
|
|
# Fall back to MANUAL per-activity if YAHOO validation fails.
|
|
imported = 0
|
|
for a in DEMO_ACTIVITIES:
|
|
for data_source in ("YAHOO", "MANUAL"):
|
|
activity_payload = {
|
|
"accountId": account_id,
|
|
"currency": "USD",
|
|
"dataSource": data_source,
|
|
"date": f"{a['date']}T00:00:00.000Z",
|
|
"fee": 0,
|
|
"quantity": a["quantity"],
|
|
"symbol": a["symbol"],
|
|
"type": a["type"],
|
|
"unitPrice": a["unitPrice"],
|
|
}
|
|
resp = await client.post(
|
|
f"{base_url}/api/v1/import",
|
|
headers=headers,
|
|
json={"activities": [activity_payload]},
|
|
)
|
|
if resp.status_code in (200, 201):
|
|
imported += 1
|
|
break # success — no need to try MANUAL fallback
|
|
|
|
return {
|
|
"success": True,
|
|
"message": f"Demo portfolio seeded with {imported} activities across AAPL, MSFT, NVDA, GOOGL, AMZN, VTI.",
|
|
"account_id": account_id,
|
|
"activities_imported": imported,
|
|
}
|
|
|
|
|
|
class LoginRequest(BaseModel):
|
|
email: str
|
|
password: str
|
|
|
|
|
|
@app.post("/auth/login")
|
|
async def auth_login(req: LoginRequest):
|
|
"""
|
|
Simple email/password auth for the agent.
|
|
Credentials are validated against ADMIN_USERNAME / ADMIN_PASSWORD env vars,
|
|
falling back to the built-in demo credentials (test@example.com / password).
|
|
All authenticated users share the GHOSTFOLIO_BEARER_TOKEN from the environment.
|
|
"""
|
|
admin_email = os.getenv("ADMIN_USERNAME", "test@example.com").strip().lower()
|
|
admin_password = os.getenv("ADMIN_PASSWORD", "password")
|
|
|
|
if req.email.strip().lower() != admin_email or req.password != admin_password:
|
|
return JSONResponse(
|
|
status_code=401,
|
|
content={"success": False, "message": "Invalid email or password."},
|
|
)
|
|
|
|
gf_token = os.getenv("GHOSTFOLIO_BEARER_TOKEN", "")
|
|
session_token = _create_access_token(subject=gf_token or "demo")
|
|
|
|
# Try to get a display name from Ghostfolio if a token is configured
|
|
display_name = admin_email.split("@")[0]
|
|
if gf_token:
|
|
base_url = os.getenv("GHOSTFOLIO_BASE_URL", "http://localhost:3333")
|
|
try:
|
|
async with httpx.AsyncClient(timeout=4.0) as client:
|
|
r = await client.get(
|
|
f"{base_url}/api/v1/user",
|
|
headers={"Authorization": f"Bearer {gf_token}"},
|
|
)
|
|
if r.status_code == 200:
|
|
data = r.json()
|
|
alias = data.get("settings", {}).get("alias") or ""
|
|
display_name = alias or display_name
|
|
except Exception:
|
|
pass
|
|
|
|
return {
|
|
"success": True,
|
|
"token": session_token,
|
|
"name": display_name,
|
|
"email": req.email.strip().lower(),
|
|
}
|
|
|
|
|
|
@app.get("/auth/auto")
|
|
async def auth_auto():
|
|
"""
|
|
No-login auth: returns the configured token and user info without credentials.
|
|
Enables the app to work without a login page.
|
|
"""
|
|
token = os.getenv("GHOSTFOLIO_BEARER_TOKEN", "")
|
|
base_url = os.getenv("GHOSTFOLIO_BASE_URL", "http://localhost:3333")
|
|
display_name = "Investor"
|
|
try:
|
|
async with httpx.AsyncClient(timeout=4.0) as client:
|
|
r = await client.get(
|
|
f"{base_url}/api/v1/user",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
)
|
|
if r.status_code == 200:
|
|
data = r.json()
|
|
alias = data.get("settings", {}).get("alias") or ""
|
|
display_name = alias or "Investor"
|
|
except Exception:
|
|
pass
|
|
return {"success": True, "token": token, "name": display_name}
|
|
|
|
|
|
@app.get("/login", include_in_schema=False)
|
|
async def login_page():
|
|
"""Redirect to chat — login is bypassed."""
|
|
return RedirectResponse(url="/", status_code=302)
|
|
|
|
|
|
@app.get("/me")
|
|
async def get_me():
|
|
"""Returns the Ghostfolio user profile for the configured bearer token."""
|
|
base_url = os.getenv("GHOSTFOLIO_BASE_URL", "http://localhost:3333")
|
|
token = os.getenv("GHOSTFOLIO_BEARER_TOKEN", "")
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=5.0) as client:
|
|
resp = await client.get(
|
|
f"{base_url}/api/v1/user",
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
)
|
|
if resp.status_code == 200:
|
|
data = resp.json()
|
|
alias = data.get("settings", {}).get("alias") or data.get("alias") or ""
|
|
email = data.get("email", "")
|
|
display = alias or (email.split("@")[0] if email else "")
|
|
return {
|
|
"success": True,
|
|
"id": data.get("id", ""),
|
|
"name": display or "Investor",
|
|
"email": email,
|
|
}
|
|
except Exception:
|
|
pass
|
|
|
|
# Fallback: decode JWT locally (no network)
|
|
try:
|
|
import base64 as _b64
|
|
padded = token.split(".")[1] + "=="
|
|
payload = json.loads(_b64.b64decode(padded).decode())
|
|
uid = payload.get("id", "")
|
|
initials = uid[:2].upper() if uid else "IN"
|
|
return {"success": True, "id": uid, "name": "Investor", "initials": initials, "email": ""}
|
|
except Exception:
|
|
pass
|
|
|
|
return {"success": False, "name": "Investor", "id": "", "email": ""}
|
|
|
|
|
|
# Node labels shown in the live thinking display
|
|
_NODE_LABELS = {
|
|
"classify": "Analyzing your question",
|
|
"tools": "Fetching portfolio data",
|
|
"write_prepare": "Preparing transaction",
|
|
"write_execute": "Recording transaction",
|
|
"verify": "Verifying data accuracy",
|
|
"format": "Composing response",
|
|
}
|
|
_OUR_NODES = set(_NODE_LABELS.keys())
|
|
|
|
|
|
@app.post("/chat/steps")
|
|
async def chat_steps(req: ChatRequest, gf_token: str = Depends(require_auth)):
|
|
"""
|
|
SSE endpoint that streams LangGraph node events in real time.
|
|
Clients receive step events as each graph node starts/ends,
|
|
then a meta event with final metadata, then token events for the response.
|
|
"""
|
|
start = time.time()
|
|
|
|
history_messages = []
|
|
for m in req.history:
|
|
role = m.get("role", "")
|
|
content = m.get("content", "")
|
|
if role == "user":
|
|
history_messages.append(HumanMessage(content=content))
|
|
elif role == "assistant":
|
|
history_messages.append(AIMessage(content=content))
|
|
|
|
initial_state: AgentState = {
|
|
"user_query": req.query,
|
|
"messages": history_messages,
|
|
"query_type": "",
|
|
"portfolio_snapshot": {},
|
|
"tool_results": [],
|
|
"pending_verifications": [],
|
|
"confidence_score": 1.0,
|
|
"verification_outcome": "pass",
|
|
"awaiting_confirmation": False,
|
|
"confirmation_payload": None,
|
|
"pending_write": req.pending_write,
|
|
"bearer_token": gf_token,
|
|
"confirmation_message": None,
|
|
"missing_fields": [],
|
|
"final_response": None,
|
|
"citations": [],
|
|
"error": None,
|
|
}
|
|
|
|
async def generate():
|
|
seen_nodes = set()
|
|
|
|
try:
|
|
async for event in graph.astream_events(initial_state, version="v2"):
|
|
etype = event.get("event", "")
|
|
ename = event.get("name", "")
|
|
|
|
if ename in _OUR_NODES:
|
|
if etype == "on_chain_start" and ename not in seen_nodes:
|
|
seen_nodes.add(ename)
|
|
payload = {
|
|
"type": "step",
|
|
"node": ename,
|
|
"label": _NODE_LABELS[ename],
|
|
"status": "running",
|
|
}
|
|
yield f"data: {json.dumps(payload)}\n\n"
|
|
|
|
elif etype == "on_chain_end":
|
|
output = event.get("data", {}).get("output", {})
|
|
step_payload: dict = {
|
|
"type": "step",
|
|
"node": ename,
|
|
"label": _NODE_LABELS[ename],
|
|
"status": "done",
|
|
}
|
|
if ename == "tools":
|
|
results = output.get("tool_results", [])
|
|
step_payload["tools"] = [r["tool_name"] for r in results]
|
|
if ename == "verify":
|
|
step_payload["confidence"] = output.get("confidence_score", 1.0)
|
|
step_payload["outcome"] = output.get("verification_outcome", "pass")
|
|
yield f"data: {json.dumps(step_payload)}\n\n"
|
|
|
|
elif ename == "LangGraph" and etype == "on_chain_end":
|
|
output = event.get("data", {}).get("output", {})
|
|
response_text = output.get("final_response", "No response generated.")
|
|
tool_results = output.get("tool_results", [])
|
|
elapsed = round(time.time() - start, 2)
|
|
|
|
cost_log.append({
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"query": req.query[:80],
|
|
"estimated_cost_usd": round(COST_PER_REQUEST_USD, 5),
|
|
"latency_seconds": elapsed,
|
|
})
|
|
|
|
meta = {
|
|
"type": "meta",
|
|
"confidence_score": output.get("confidence_score", 0.0),
|
|
"verification_outcome": output.get("verification_outcome", "unknown"),
|
|
"awaiting_confirmation": output.get("awaiting_confirmation", False),
|
|
"pending_write": output.get("pending_write"),
|
|
"tools_used": [r["tool_name"] for r in tool_results],
|
|
"citations": output.get("citations", []),
|
|
"latency_seconds": elapsed,
|
|
}
|
|
yield f"data: {json.dumps(meta)}\n\n"
|
|
|
|
words = response_text.split(" ")
|
|
for i, word in enumerate(words):
|
|
chunk = {
|
|
"type": "token",
|
|
"token": word + (" " if i < len(words) - 1 else ""),
|
|
"done": i == len(words) - 1,
|
|
}
|
|
yield f"data: {json.dumps(chunk)}\n\n"
|
|
|
|
yield f"data: {json.dumps({'type': 'done'})}\n\n"
|
|
|
|
except Exception as exc:
|
|
err_payload = {
|
|
"type": "error",
|
|
"message": f"Agent error: {str(exc)}",
|
|
}
|
|
yield f"data: {json.dumps(err_payload)}\n\n"
|
|
|
|
return StreamingResponse(generate(), media_type="text/event-stream")
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse, include_in_schema=False)
|
|
async def chat_ui():
|
|
with open(os.path.join(os.path.dirname(__file__), "chat_ui.html")) as f:
|
|
return f.read()
|
|
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
ghostfolio_ok = False
|
|
base_url = os.getenv("GHOSTFOLIO_BASE_URL", "http://localhost:3333")
|
|
|
|
try:
|
|
async with httpx.AsyncClient(timeout=3.0) as client:
|
|
resp = await client.get(f"{base_url}/api/v1/health")
|
|
ghostfolio_ok = resp.status_code == 200
|
|
except Exception:
|
|
ghostfolio_ok = False
|
|
|
|
return {
|
|
"status": "ok",
|
|
"ghostfolio_reachable": ghostfolio_ok,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"version": "2.1.0-complete-showcase",
|
|
"features": ["relocation_runway", "wealth_gap", "life_decision", "equity_unlock", "family_planner"],
|
|
}
|
|
|
|
|
|
@app.post("/feedback")
|
|
async def feedback(req: FeedbackRequest, _auth: str = Depends(require_auth)):
|
|
entry = {
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"query": req.query,
|
|
"response": req.response[:200],
|
|
"rating": req.rating,
|
|
"comment": req.comment,
|
|
}
|
|
feedback_log.append(entry)
|
|
return {"status": "recorded", "total_feedback": len(feedback_log)}
|
|
|
|
|
|
@app.get("/feedback/summary")
|
|
async def feedback_summary(_auth: str = Depends(require_auth)):
|
|
if not feedback_log:
|
|
return {
|
|
"total": 0,
|
|
"positive": 0,
|
|
"negative": 0,
|
|
"approval_rate": "N/A",
|
|
"message": "No feedback recorded yet.",
|
|
}
|
|
|
|
positive = sum(1 for f in feedback_log if f["rating"] > 0)
|
|
negative = len(feedback_log) - positive
|
|
approval_rate = f"{(positive / len(feedback_log) * 100):.0f}%"
|
|
|
|
return {
|
|
"total": len(feedback_log),
|
|
"positive": positive,
|
|
"negative": negative,
|
|
"approval_rate": approval_rate,
|
|
}
|
|
|
|
|
|
@app.get("/real-estate/log")
|
|
async def real_estate_log():
|
|
"""
|
|
Returns the in-memory real estate tool invocation log.
|
|
Only available when ENABLE_REAL_ESTATE=true.
|
|
Each entry: timestamp, function, query (truncated), duration_ms, success.
|
|
"""
|
|
from tools.real_estate import is_real_estate_enabled, get_invocation_log
|
|
|
|
if not is_real_estate_enabled():
|
|
return JSONResponse(
|
|
status_code=404,
|
|
content={"error": "Real estate feature is not enabled."},
|
|
)
|
|
|
|
log = get_invocation_log()
|
|
total = len(log)
|
|
successes = sum(1 for e in log if e["success"])
|
|
return {
|
|
"total_invocations": total,
|
|
"success_count": successes,
|
|
"failure_count": total - successes,
|
|
"entries": log[-50:], # last 50 only
|
|
}
|
|
|
|
|
|
@app.get("/costs")
|
|
async def costs(_auth: str = Depends(require_auth)):
|
|
total = sum(c["estimated_cost_usd"] for c in cost_log)
|
|
avg = total / max(len(cost_log), 1)
|
|
|
|
return {
|
|
"total_requests": len(cost_log),
|
|
"estimated_cost_usd": round(total, 4),
|
|
"avg_per_request": round(avg, 5),
|
|
"cost_assumptions": {
|
|
"model": "claude-sonnet-4-20250514",
|
|
"input_tokens_per_request": 2000,
|
|
"output_tokens_per_request": 500,
|
|
"input_price_per_million": 3.0,
|
|
"output_price_per_million": 15.0,
|
|
},
|
|
}
|
|
|