You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

313 lines
13 KiB

import asyncio
import re
import httpx
import os
import time
from datetime import datetime
_UUID_RE = re.compile(
r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
re.IGNORECASE,
)
# In-memory price cache: {symbol: {"data": {...}, "expires_at": float}}
_price_cache: dict[str, dict] = {}
_CACHE_TTL_SECONDS = 1800
def _merge_holding(existing: dict, new: dict) -> None:
"""Add `new` holding's numeric fields into `existing` in-place."""
existing_qty = existing.get("quantity", 0)
new_qty = new.get("quantity", 0)
total_qty = existing_qty + new_qty
if total_qty > 0 and existing.get("averagePrice") and new.get("averagePrice"):
existing["averagePrice"] = (
(existing.get("averagePrice", 0) * existing_qty)
+ (new.get("averagePrice", 0) * new_qty)
) / total_qty
existing["quantity"] = total_qty
existing["investment"] = existing.get("investment", 0) + new.get("investment", 0)
existing["valueInBaseCurrency"] = (
existing.get("valueInBaseCurrency", 0) + new.get("valueInBaseCurrency", 0)
)
existing["grossPerformance"] = (
existing.get("grossPerformance", 0) + new.get("grossPerformance", 0)
)
existing["allocationInPercentage"] = (
existing.get("allocationInPercentage", 0) + new.get("allocationInPercentage", 0)
)
def consolidate_holdings(holdings: list) -> list:
"""
Merge holdings into one entry per real ticker symbol.
Ghostfolio uses UUID strings as `symbol` for MANUAL-datasource activities
(e.g. symbol='00fda606-...' name='AAPL') instead of the real ticker.
Strategy:
1. First pass: index real-ticker entries (non-UUID symbol) by symbol.
2. Second pass: for UUID-symbol entries, look up a matching real-ticker
entry by name and merge into it; if no match, use the name as symbol.
Also handles any remaining duplicate real-ticker rows by summing them.
"""
consolidated: dict[str, dict] = {}
# Pass 1 — real tickers (non-UUID symbols)
for h in holdings:
symbol = h.get("symbol", "")
if _UUID_RE.match(symbol):
continue
if symbol not in consolidated:
consolidated[symbol] = h.copy()
else:
_merge_holding(consolidated[symbol], h)
# Pass 2 — UUID-symbol entries: merge by matching name to a real ticker
for h in holdings:
symbol = h.get("symbol", "")
if not _UUID_RE.match(symbol):
continue
name = (h.get("name") or "").strip().upper()
# Try to find a real-ticker entry with the same name
matched_key = None
for key, existing in consolidated.items():
if (existing.get("name") or "").strip().upper() == name or key.upper() == name:
matched_key = key
break
if matched_key:
_merge_holding(consolidated[matched_key], h)
else:
# No matching real ticker — promote name as the symbol key
if name not in consolidated:
consolidated[name] = h.copy()
consolidated[name]["symbol"] = name
else:
_merge_holding(consolidated[name], h)
return list(consolidated.values())
# In-memory portfolio result cache with 60-second TTL.
# Keyed by token so each user gets their own cached result.
_portfolio_cache: dict[str, dict] = {}
_PORTFOLIO_CACHE_TTL = 60
# Cash/currency symbols — worth 1.0 in their own currency. Skip Yahoo Finance
# to avoid forex rates inflating values (e.g. 10M USD * 52.85 = 528M).
CASH_SYMBOLS = frozenset({
"USD", "EUR", "GBP", "JPY", "CHF",
"AUD", "CAD", "HKD", "SGD", "NZD",
"USDC", "USDT", "DAI",
})
async def _fetch_prices(client: httpx.AsyncClient, symbol: str) -> dict:
"""
Fetches current price and YTD start price (Jan 2, 2026) from Yahoo Finance.
Cash/currency symbols return 1.0 (no Yahoo call) to avoid forex rate inflation.
Caches results for _CACHE_TTL_SECONDS to avoid rate limiting during eval runs.
Returns dict with 'current' and 'ytd_start' prices (both may be None on failure).
"""
if symbol and symbol.upper() in CASH_SYMBOLS:
return {"current": 1.0, "ytd_start": 1.0}
cached = _price_cache.get(symbol)
if cached and cached["expires_at"] > time.time():
return cached["data"]
result = {"current": None, "ytd_start": None}
try:
resp = await client.get(
f"https://query1.finance.yahoo.com/v8/finance/chart/{symbol}",
params={"interval": "1d", "range": "1y"},
headers={"User-Agent": "Mozilla/5.0"},
timeout=8.0,
)
if resp.status_code != 200:
return result
data = resp.json()
chart_result = data.get("chart", {}).get("result", [{}])[0]
meta = chart_result.get("meta", {})
timestamps = chart_result.get("timestamp", [])
closes = chart_result.get("indicators", {}).get("quote", [{}])[0].get("close", [])
result["current"] = float(meta.get("regularMarketPrice") or meta.get("previousClose") or 0) or None
# Find the first trading day of 2026 (Jan 2, 2026 = 1735776000 unix)
ytd_start_ts = 1735776000 # Jan 2, 2026 00:00 UTC
ytd_price = None
for ts, close in zip(timestamps, closes):
if ts >= ytd_start_ts and close:
ytd_price = float(close)
break
result["ytd_start"] = ytd_price
except Exception:
pass
_price_cache[symbol] = {"data": result, "expires_at": time.time() + _CACHE_TTL_SECONDS}
return result
async def portfolio_analysis(date_range: str = "max", token: str = None) -> dict:
"""
Fetches portfolio holdings from Ghostfolio and computes real performance
by fetching current prices directly from Yahoo Finance.
Ghostfolio's own performance endpoint returns zeros locally due to
Yahoo Finance feed errors — this tool works around that.
Results are cached for 60 seconds per token to avoid redundant API calls
within multi-step conversations.
"""
base_url = os.getenv("GHOSTFOLIO_BASE_URL", "http://localhost:3333")
token = token or os.getenv("GHOSTFOLIO_BEARER_TOKEN", "")
tool_result_id = f"portfolio_{int(datetime.utcnow().timestamp())}"
# Return cached result if fresh enough
cache_key = token or "__default__"
cached = _portfolio_cache.get(cache_key)
if cached and (time.time() - cached["timestamp"]) < _PORTFOLIO_CACHE_TTL:
result = dict(cached["data"])
result["from_cache"] = True
result["tool_result_id"] = tool_result_id # fresh ID for citation tracking
return result
try:
async with httpx.AsyncClient(timeout=10.0) as client:
headers = {"Authorization": f"Bearer {token}"}
holdings_resp = await client.get(
f"{base_url}/api/v1/portfolio/holdings",
headers=headers,
)
holdings_resp.raise_for_status()
raw = holdings_resp.json()
# Holdings is a list directly
raw_list = raw if isinstance(raw, list) else raw.get("holdings", [])
# Merge duplicate symbol lots (e.g. 3 AAPL buys → 1 AAPL row)
holdings_list = consolidate_holdings(raw_list)
enriched_holdings = []
total_cost_basis = 0.0
total_current_value = 0.0
prices_fetched = 0
ytd_cost_basis = 0.0
ytd_current_value = 0.0
# Fetch all prices in parallel
symbols = [h.get("symbol", "") for h in holdings_list]
price_results = await asyncio.gather(
*[_fetch_prices(client, sym) for sym in symbols],
return_exceptions=True,
)
for h, prices_or_exc in zip(holdings_list, price_results):
symbol = h.get("symbol", "")
quantity = h.get("quantity", 0)
# `investment` = original money paid (cost basis); `valueInBaseCurrency` = current market value
cost_basis = h.get("investment") or h.get("valueInBaseCurrency", 0)
allocation_pct = round(h.get("allocationInPercentage", 0) * 100, 2)
prices = prices_or_exc if isinstance(prices_or_exc, dict) else {"current": None, "ytd_start": None}
current_price = prices["current"]
ytd_start_price = prices["ytd_start"]
if current_price is not None:
current_value = round(quantity * current_price, 2)
gain_usd = round(current_value - cost_basis, 2)
gain_pct = round((gain_usd / cost_basis * 100), 2) if cost_basis > 0 else 0.0
prices_fetched += 1
else:
current_value = cost_basis
gain_usd = 0.0
gain_pct = 0.0
# YTD: compare Jan 2 2026 value to today
if ytd_start_price and current_price:
ytd_start_value = round(quantity * ytd_start_price, 2)
ytd_gain_usd = round(current_value - ytd_start_value, 2)
ytd_gain_pct = round(ytd_gain_usd / ytd_start_value * 100, 2) if ytd_start_value else 0.0
ytd_cost_basis += ytd_start_value
ytd_current_value += current_value
else:
ytd_gain_usd = None
ytd_gain_pct = None
total_cost_basis += cost_basis
total_current_value += current_value
enriched_holdings.append({
"symbol": symbol,
"name": h.get("name", symbol),
"quantity": quantity,
"cost_basis_usd": cost_basis,
"current_price_usd": current_price,
"ytd_start_price_usd": ytd_start_price,
"current_value_usd": current_value,
"gain_usd": gain_usd,
"gain_pct": gain_pct,
"ytd_gain_usd": ytd_gain_usd,
"ytd_gain_pct": ytd_gain_pct,
"allocation_pct": allocation_pct,
"currency": h.get("currency", "USD"),
"asset_class": h.get("assetClass", ""),
})
total_gain_usd = round(total_current_value - total_cost_basis, 2)
total_gain_pct = (
round(total_gain_usd / total_cost_basis * 100, 2)
if total_cost_basis > 0 else 0.0
)
ytd_total_gain_usd = round(ytd_current_value - ytd_cost_basis, 2) if ytd_cost_basis else None
ytd_total_gain_pct = (
round(ytd_total_gain_usd / ytd_cost_basis * 100, 2)
if ytd_cost_basis and ytd_total_gain_usd is not None else None
)
# Sort holdings by current value descending
enriched_holdings.sort(key=lambda x: x["current_value_usd"], reverse=True)
result = {
"tool_name": "portfolio_analysis",
"success": True,
"tool_result_id": tool_result_id,
"timestamp": datetime.utcnow().isoformat(),
"endpoint": "/api/v1/portfolio/holdings + Yahoo Finance (live prices)",
"result": {
"summary": {
"total_cost_basis_usd": round(total_cost_basis, 2),
"total_current_value_usd": round(total_current_value, 2),
"total_gain_usd": total_gain_usd,
"total_gain_pct": total_gain_pct,
"ytd_gain_usd": ytd_total_gain_usd,
"ytd_gain_pct": ytd_total_gain_pct,
"holdings_count": len(enriched_holdings),
"live_prices_fetched": prices_fetched,
"date_range": date_range,
"note": (
"Performance uses live Yahoo Finance prices. "
"YTD = Jan 2 2026 to today. "
"Total return = purchase date to today."
),
},
"holdings": enriched_holdings,
},
}
_portfolio_cache[cache_key] = {"data": result, "timestamp": time.time()}
return result
except httpx.TimeoutException:
return {
"tool_name": "portfolio_analysis",
"success": False,
"tool_result_id": tool_result_id,
"error": "TIMEOUT",
"message": "Portfolio API timed out. Try again shortly.",
}
except Exception as e:
return {
"tool_name": "portfolio_analysis",
"success": False,
"tool_result_id": tool_result_id,
"error": "API_ERROR",
"message": f"Failed to fetch portfolio data: {str(e)}",
}