mirror of https://github.com/ghostfolio/ghostfolio
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
641 lines
22 KiB
641 lines
22 KiB
"""
|
|
Property Tracker Tool — AgentForge integration
|
|
===============================================
|
|
Feature flag: set ENABLE_REAL_ESTATE=true in .env to activate.
|
|
(Shares the same flag as the real estate market data tool.)
|
|
|
|
Allows users to track real estate properties they own alongside
|
|
their financial portfolio. Equity is computed as:
|
|
equity = current_value - mortgage_balance
|
|
|
|
Seven capabilities:
|
|
1. add_property(...) — record a property you own
|
|
2. get_properties() — show all active properties with equity
|
|
3. list_properties() — alias for get_properties()
|
|
4. update_property(...) — update current value, mortgage, or rent
|
|
5. remove_property(id) — soft-delete (set is_active = 0)
|
|
6. get_real_estate_equity() — total equity across all properties
|
|
7. get_total_net_worth(...) — portfolio + real estate combined
|
|
|
|
Storage: SQLite at agent/data/properties.db
|
|
(override path with PROPERTIES_DB_PATH env var — used in tests for :memory:)
|
|
|
|
All functions return the standard tool result envelope:
|
|
{tool_name, success, tool_result_id, timestamp, result} — on success
|
|
{tool_name, success, tool_result_id, error: {code, message}} — on failure
|
|
"""
|
|
|
|
import os
|
|
import sqlite3
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Feature flag (shared with real_estate.py)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def is_property_tracking_enabled() -> bool:
|
|
"""Returns True only when ENABLE_REAL_ESTATE=true in environment."""
|
|
return os.getenv("ENABLE_REAL_ESTATE", "false").strip().lower() == "true"
|
|
|
|
|
|
_FEATURE_DISABLED_RESPONSE = {
|
|
"tool_name": "property_tracker",
|
|
"success": False,
|
|
"tool_result_id": "property_tracker_disabled",
|
|
"error": {
|
|
"code": "PROPERTY_TRACKER_FEATURE_DISABLED",
|
|
"message": (
|
|
"The Property Tracker feature is not currently enabled. "
|
|
"Set ENABLE_REAL_ESTATE=true in your environment to activate it."
|
|
),
|
|
},
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SQLite connection helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_SCHEMA_SQL = """
|
|
CREATE TABLE IF NOT EXISTS properties (
|
|
id TEXT PRIMARY KEY,
|
|
address TEXT NOT NULL,
|
|
property_type TEXT DEFAULT 'Single Family',
|
|
purchase_price REAL NOT NULL,
|
|
purchase_date TEXT,
|
|
current_value REAL NOT NULL,
|
|
mortgage_balance REAL DEFAULT 0,
|
|
monthly_rent REAL DEFAULT 0,
|
|
county_key TEXT DEFAULT 'austin',
|
|
is_active INTEGER DEFAULT 1,
|
|
created_at TEXT,
|
|
updated_at TEXT
|
|
)
|
|
"""
|
|
|
|
# Module-level cached connection for :memory: databases.
|
|
# SQLite :memory: creates a fresh DB per connection — we must reuse the same one.
|
|
_MEMORY_CONN: Optional[sqlite3.Connection] = None
|
|
|
|
|
|
def _db_path() -> str:
|
|
"""Returns the SQLite database path (configurable via PROPERTIES_DB_PATH)."""
|
|
env_path = os.getenv("PROPERTIES_DB_PATH")
|
|
if env_path:
|
|
return env_path
|
|
tools_dir = os.path.dirname(os.path.abspath(__file__))
|
|
agent_dir = os.path.dirname(tools_dir)
|
|
data_dir = os.path.join(agent_dir, "data")
|
|
os.makedirs(data_dir, exist_ok=True)
|
|
return os.path.join(data_dir, "properties.db")
|
|
|
|
|
|
def _get_conn() -> sqlite3.Connection:
|
|
"""
|
|
Returns a SQLite connection with the schema initialized.
|
|
For :memory: databases, returns the same connection every time so
|
|
data persists across calls within a session / test run.
|
|
"""
|
|
global _MEMORY_CONN
|
|
path = _db_path()
|
|
|
|
if path == ":memory:":
|
|
if _MEMORY_CONN is None:
|
|
_MEMORY_CONN = sqlite3.connect(":memory:", check_same_thread=False)
|
|
_MEMORY_CONN.row_factory = sqlite3.Row
|
|
_MEMORY_CONN.execute(_SCHEMA_SQL)
|
|
_MEMORY_CONN.commit()
|
|
return _MEMORY_CONN
|
|
|
|
conn = sqlite3.connect(path, check_same_thread=False)
|
|
conn.row_factory = sqlite3.Row
|
|
conn.execute("PRAGMA journal_mode=WAL")
|
|
conn.execute(_SCHEMA_SQL)
|
|
conn.commit()
|
|
return conn
|
|
|
|
|
|
def _close_conn(conn: sqlite3.Connection) -> None:
|
|
"""Closes file-based connections; leaves :memory: connection open."""
|
|
if _db_path() != ":memory:":
|
|
conn.close()
|
|
|
|
|
|
def _row_to_dict(row: sqlite3.Row) -> dict:
|
|
"""Converts a sqlite3.Row to a plain dict with computed equity/appreciation fields."""
|
|
d = dict(row)
|
|
current_value = d.get("current_value", 0) or 0
|
|
mortgage_balance = d.get("mortgage_balance", 0) or 0
|
|
purchase_price = d.get("purchase_price", 0) or 0
|
|
|
|
equity = round(current_value - mortgage_balance, 2)
|
|
equity_pct = round((equity / current_value * 100), 2) if current_value > 0 else 0.0
|
|
appreciation = round(current_value - purchase_price, 2)
|
|
appreciation_pct = (
|
|
round((appreciation / purchase_price * 100), 2) if purchase_price > 0 else 0.0
|
|
)
|
|
|
|
d["equity"] = equity
|
|
d["equity_pct"] = equity_pct
|
|
d["appreciation"] = appreciation
|
|
d["appreciation_pct"] = appreciation_pct
|
|
# Backward-compat alias: existing tests check for "added_at"
|
|
d["added_at"] = d.get("created_at")
|
|
return d
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def property_store_clear() -> None:
|
|
"""
|
|
Wipes ALL property records. Used in tests to reset state between cases.
|
|
For :memory: databases, deletes all rows from the shared connection.
|
|
"""
|
|
global _MEMORY_CONN
|
|
path = _db_path()
|
|
try:
|
|
if path == ":memory:":
|
|
if _MEMORY_CONN is not None:
|
|
_MEMORY_CONN.execute("DELETE FROM properties")
|
|
_MEMORY_CONN.commit()
|
|
else:
|
|
conn = _get_conn()
|
|
conn.execute("DELETE FROM properties")
|
|
conn.commit()
|
|
_close_conn(conn)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public tool functions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def add_property(
|
|
address: str,
|
|
purchase_price: float,
|
|
current_value: Optional[float] = None,
|
|
mortgage_balance: float = 0.0,
|
|
monthly_rent: float = 0.0,
|
|
county_key: str = "austin",
|
|
property_type: str = "Single Family",
|
|
purchase_date: Optional[str] = None,
|
|
) -> dict:
|
|
"""
|
|
Records a property in the SQLite store.
|
|
|
|
Args:
|
|
address: Full street address.
|
|
purchase_price: Original purchase price in USD.
|
|
current_value: Current estimated market value. Defaults to purchase_price.
|
|
mortgage_balance: Outstanding mortgage balance. Defaults to 0.
|
|
monthly_rent: Monthly rental income if a rental property. Defaults to 0.
|
|
county_key: ACTRIS area key (e.g. "austin", "travis_county").
|
|
property_type: "Single Family", "Condo", "Townhouse", etc.
|
|
purchase_date: Optional ISO date string (YYYY-MM-DD).
|
|
"""
|
|
if not is_property_tracking_enabled():
|
|
return _FEATURE_DISABLED_RESPONSE
|
|
|
|
tool_result_id = f"prop_add_{int(datetime.utcnow().timestamp())}"
|
|
|
|
if not address or not address.strip():
|
|
return {
|
|
"tool_name": "property_tracker",
|
|
"success": False,
|
|
"tool_result_id": tool_result_id,
|
|
"error": {
|
|
"code": "PROPERTY_TRACKER_INVALID_INPUT",
|
|
"message": "address is required and cannot be empty.",
|
|
},
|
|
}
|
|
if purchase_price <= 0:
|
|
return {
|
|
"tool_name": "property_tracker",
|
|
"success": False,
|
|
"tool_result_id": tool_result_id,
|
|
"error": {
|
|
"code": "PROPERTY_TRACKER_INVALID_INPUT",
|
|
"message": "purchase_price must be greater than zero.",
|
|
},
|
|
}
|
|
|
|
effective_value = current_value if current_value is not None else purchase_price
|
|
prop_id = f"prop_{uuid.uuid4().hex[:8]}"
|
|
now = datetime.utcnow().isoformat()
|
|
|
|
try:
|
|
conn = _get_conn()
|
|
conn.execute(
|
|
"""INSERT INTO properties
|
|
(id, address, property_type, purchase_price, purchase_date,
|
|
current_value, mortgage_balance, monthly_rent, county_key,
|
|
is_active, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?)""",
|
|
(
|
|
prop_id, address.strip(), property_type, purchase_price,
|
|
purchase_date, effective_value, mortgage_balance, monthly_rent,
|
|
county_key, now, now,
|
|
),
|
|
)
|
|
conn.commit()
|
|
row = conn.execute(
|
|
"SELECT * FROM properties WHERE id = ?", (prop_id,)
|
|
).fetchone()
|
|
_close_conn(conn)
|
|
record = _row_to_dict(row)
|
|
except Exception as exc:
|
|
return {
|
|
"tool_name": "property_tracker",
|
|
"success": False,
|
|
"tool_result_id": tool_result_id,
|
|
"error": {"code": "PROPERTY_TRACKER_DB_ERROR", "message": str(exc)},
|
|
}
|
|
|
|
equity = record["equity"]
|
|
equity_pct = record["equity_pct"]
|
|
|
|
return {
|
|
"tool_name": "property_tracker",
|
|
"success": True,
|
|
"tool_result_id": tool_result_id,
|
|
"timestamp": now,
|
|
"result": {
|
|
"status": "added",
|
|
"property": record,
|
|
"message": (
|
|
f"Property recorded: {address.strip()}. "
|
|
f"Current equity: ${equity:,.0f} "
|
|
f"({equity_pct:.1f}% of ${effective_value:,.0f} value)."
|
|
),
|
|
},
|
|
}
|
|
|
|
|
|
async def get_properties() -> dict:
|
|
"""
|
|
Returns all active properties with per-property equity and portfolio totals.
|
|
Primary read function. list_properties() is kept as an alias.
|
|
"""
|
|
if not is_property_tracking_enabled():
|
|
return _FEATURE_DISABLED_RESPONSE
|
|
|
|
tool_result_id = f"prop_list_{int(datetime.utcnow().timestamp())}"
|
|
|
|
try:
|
|
conn = _get_conn()
|
|
rows = conn.execute(
|
|
"SELECT * FROM properties WHERE is_active = 1 ORDER BY created_at"
|
|
).fetchall()
|
|
_close_conn(conn)
|
|
properties = [_row_to_dict(row) for row in rows]
|
|
except Exception as exc:
|
|
return {
|
|
"tool_name": "property_tracker",
|
|
"success": False,
|
|
"tool_result_id": tool_result_id,
|
|
"error": {"code": "PROPERTY_TRACKER_DB_ERROR", "message": str(exc)},
|
|
}
|
|
|
|
if not properties:
|
|
return {
|
|
"tool_name": "property_tracker",
|
|
"success": True,
|
|
"tool_result_id": tool_result_id,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"result": {
|
|
"properties": [],
|
|
"summary": {
|
|
"property_count": 0,
|
|
"total_purchase_price": 0,
|
|
"total_current_value": 0,
|
|
"total_mortgage_balance": 0,
|
|
"total_equity": 0,
|
|
"total_equity_pct": 0.0,
|
|
"total_monthly_rent": 0,
|
|
},
|
|
"message": (
|
|
"No properties tracked yet. "
|
|
"Add a property with: \"Add my property at [address], "
|
|
"purchased for $X, worth $Y, mortgage $Z.\""
|
|
),
|
|
},
|
|
}
|
|
|
|
total_purchase = sum(p["purchase_price"] for p in properties)
|
|
total_value = sum(p["current_value"] for p in properties)
|
|
total_mortgage = sum(p["mortgage_balance"] for p in properties)
|
|
total_equity = round(total_value - total_mortgage, 2)
|
|
total_equity_pct = (
|
|
round((total_equity / total_value * 100), 2) if total_value > 0 else 0.0
|
|
)
|
|
total_rent = sum(p.get("monthly_rent", 0) or 0 for p in properties)
|
|
|
|
return {
|
|
"tool_name": "property_tracker",
|
|
"success": True,
|
|
"tool_result_id": tool_result_id,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"result": {
|
|
"properties": properties,
|
|
"summary": {
|
|
"property_count": len(properties),
|
|
"total_purchase_price": total_purchase,
|
|
"total_current_value": total_value,
|
|
"total_mortgage_balance": total_mortgage,
|
|
"total_equity": total_equity,
|
|
"total_equity_pct": total_equity_pct,
|
|
"total_monthly_rent": total_rent,
|
|
},
|
|
},
|
|
}
|
|
|
|
|
|
async def list_properties() -> dict:
|
|
"""Alias for get_properties() — kept for backward compatibility."""
|
|
return await get_properties()
|
|
|
|
|
|
async def update_property(
|
|
property_id: str,
|
|
current_value: Optional[float] = None,
|
|
mortgage_balance: Optional[float] = None,
|
|
monthly_rent: Optional[float] = None,
|
|
) -> dict:
|
|
"""
|
|
Updates a tracked property's current value, mortgage balance, or monthly rent.
|
|
|
|
Args:
|
|
property_id: ID of the property to update (e.g. 'prop_a1b2c3d4').
|
|
current_value: New current market value in USD.
|
|
mortgage_balance: Updated outstanding mortgage balance in USD.
|
|
monthly_rent: Updated monthly rental income in USD.
|
|
|
|
Returns:
|
|
Updated property record with recalculated equity.
|
|
"""
|
|
if not is_property_tracking_enabled():
|
|
return _FEATURE_DISABLED_RESPONSE
|
|
|
|
tool_result_id = f"prop_update_{int(datetime.utcnow().timestamp())}"
|
|
prop_id = property_id.strip()
|
|
|
|
try:
|
|
conn = _get_conn()
|
|
row = conn.execute(
|
|
"SELECT * FROM properties WHERE id = ? AND is_active = 1", (prop_id,)
|
|
).fetchone()
|
|
|
|
if row is None:
|
|
_close_conn(conn)
|
|
return {
|
|
"tool_name": "property_tracker",
|
|
"success": False,
|
|
"tool_result_id": tool_result_id,
|
|
"error": {
|
|
"code": "PROPERTY_TRACKER_NOT_FOUND",
|
|
"message": (
|
|
f"Property '{property_id}' not found. "
|
|
"Use get_properties() to see valid IDs."
|
|
),
|
|
},
|
|
}
|
|
|
|
updates = []
|
|
params = []
|
|
if current_value is not None:
|
|
updates.append("current_value = ?")
|
|
params.append(current_value)
|
|
if mortgage_balance is not None:
|
|
updates.append("mortgage_balance = ?")
|
|
params.append(mortgage_balance)
|
|
if monthly_rent is not None:
|
|
updates.append("monthly_rent = ?")
|
|
params.append(monthly_rent)
|
|
|
|
if not updates:
|
|
_close_conn(conn)
|
|
return {
|
|
"tool_name": "property_tracker",
|
|
"success": False,
|
|
"tool_result_id": tool_result_id,
|
|
"error": {
|
|
"code": "PROPERTY_TRACKER_INVALID_INPUT",
|
|
"message": (
|
|
"At least one of current_value, mortgage_balance, "
|
|
"or monthly_rent must be provided."
|
|
),
|
|
},
|
|
}
|
|
|
|
now = datetime.utcnow().isoformat()
|
|
updates.append("updated_at = ?")
|
|
params.append(now)
|
|
params.append(prop_id)
|
|
|
|
conn.execute(
|
|
f"UPDATE properties SET {', '.join(updates)} WHERE id = ?",
|
|
params,
|
|
)
|
|
conn.commit()
|
|
|
|
updated_row = conn.execute(
|
|
"SELECT * FROM properties WHERE id = ?", (prop_id,)
|
|
).fetchone()
|
|
_close_conn(conn)
|
|
record = _row_to_dict(updated_row)
|
|
except Exception as exc:
|
|
return {
|
|
"tool_name": "property_tracker",
|
|
"success": False,
|
|
"tool_result_id": tool_result_id,
|
|
"error": {"code": "PROPERTY_TRACKER_DB_ERROR", "message": str(exc)},
|
|
}
|
|
|
|
return {
|
|
"tool_name": "property_tracker",
|
|
"success": True,
|
|
"tool_result_id": tool_result_id,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"result": {
|
|
"status": "updated",
|
|
"property": record,
|
|
"message": (
|
|
f"Property updated: {record['address']}. "
|
|
f"New equity: ${record['equity']:,.0f}."
|
|
),
|
|
},
|
|
}
|
|
|
|
|
|
async def remove_property(property_id: str) -> dict:
|
|
"""
|
|
Soft-deletes a property by setting is_active = 0.
|
|
Data is preserved for audit purposes.
|
|
"""
|
|
if not is_property_tracking_enabled():
|
|
return _FEATURE_DISABLED_RESPONSE
|
|
|
|
tool_result_id = f"prop_remove_{int(datetime.utcnow().timestamp())}"
|
|
prop_id = property_id.strip().lower()
|
|
|
|
try:
|
|
conn = _get_conn()
|
|
row = conn.execute(
|
|
"SELECT * FROM properties WHERE id = ? AND is_active = 1", (prop_id,)
|
|
).fetchone()
|
|
|
|
if row is None:
|
|
_close_conn(conn)
|
|
return {
|
|
"tool_name": "property_tracker",
|
|
"success": False,
|
|
"tool_result_id": tool_result_id,
|
|
"error": {
|
|
"code": "PROPERTY_TRACKER_NOT_FOUND",
|
|
"message": (
|
|
f"Property '{property_id}' not found. "
|
|
"Use get_properties() to see valid IDs."
|
|
),
|
|
},
|
|
}
|
|
|
|
address = row["address"]
|
|
conn.execute(
|
|
"UPDATE properties SET is_active = 0, updated_at = ? WHERE id = ?",
|
|
(datetime.utcnow().isoformat(), prop_id),
|
|
)
|
|
conn.commit()
|
|
_close_conn(conn)
|
|
except Exception as exc:
|
|
return {
|
|
"tool_name": "property_tracker",
|
|
"success": False,
|
|
"tool_result_id": tool_result_id,
|
|
"error": {"code": "PROPERTY_TRACKER_DB_ERROR", "message": str(exc)},
|
|
}
|
|
|
|
return {
|
|
"tool_name": "property_tracker",
|
|
"success": True,
|
|
"tool_result_id": tool_result_id,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"result": {
|
|
"status": "removed",
|
|
"property_id": prop_id,
|
|
"address": address,
|
|
"message": f"Property removed: {address}.",
|
|
},
|
|
}
|
|
|
|
|
|
async def get_real_estate_equity() -> dict:
|
|
"""
|
|
Returns total real estate equity across all tracked active properties.
|
|
Designed to be combined with portfolio_analysis for net worth calculation.
|
|
"""
|
|
if not is_property_tracking_enabled():
|
|
return _FEATURE_DISABLED_RESPONSE
|
|
|
|
tool_result_id = f"prop_equity_{int(datetime.utcnow().timestamp())}"
|
|
|
|
try:
|
|
conn = _get_conn()
|
|
rows = conn.execute(
|
|
"SELECT current_value, mortgage_balance "
|
|
"FROM properties WHERE is_active = 1"
|
|
).fetchall()
|
|
_close_conn(conn)
|
|
except Exception as exc:
|
|
return {
|
|
"tool_name": "property_tracker",
|
|
"success": False,
|
|
"tool_result_id": tool_result_id,
|
|
"error": {"code": "PROPERTY_TRACKER_DB_ERROR", "message": str(exc)},
|
|
}
|
|
|
|
total_value = sum(r["current_value"] for r in rows)
|
|
total_mortgage = sum(r["mortgage_balance"] for r in rows)
|
|
total_equity = round(total_value - total_mortgage, 2)
|
|
|
|
return {
|
|
"tool_name": "property_tracker",
|
|
"success": True,
|
|
"tool_result_id": tool_result_id,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"result": {
|
|
"property_count": len(rows),
|
|
"total_real_estate_value": total_value,
|
|
"total_mortgage_balance": total_mortgage,
|
|
"total_real_estate_equity": total_equity,
|
|
},
|
|
}
|
|
|
|
|
|
async def get_total_net_worth(portfolio_value: float) -> dict:
|
|
"""
|
|
Combines live investment portfolio value with real estate equity
|
|
for a unified net worth view.
|
|
|
|
Args:
|
|
portfolio_value: Total liquid investment portfolio value in USD
|
|
(from portfolio_analysis tool result).
|
|
|
|
Returns:
|
|
Dict with investment_portfolio, real_estate_equity, total_net_worth,
|
|
properties list, and plain-English summary.
|
|
"""
|
|
if not is_property_tracking_enabled():
|
|
return _FEATURE_DISABLED_RESPONSE
|
|
|
|
tool_result_id = f"prop_networth_{int(datetime.utcnow().timestamp())}"
|
|
|
|
try:
|
|
conn = _get_conn()
|
|
rows = conn.execute(
|
|
"SELECT * FROM properties WHERE is_active = 1 ORDER BY created_at"
|
|
).fetchall()
|
|
_close_conn(conn)
|
|
properties = [_row_to_dict(row) for row in rows]
|
|
except Exception as exc:
|
|
return {
|
|
"tool_name": "property_tracker",
|
|
"success": False,
|
|
"tool_result_id": tool_result_id,
|
|
"error": {"code": "PROPERTY_TRACKER_DB_ERROR", "message": str(exc)},
|
|
}
|
|
|
|
total_value = sum(p["current_value"] for p in properties)
|
|
total_mortgage = sum(p["mortgage_balance"] for p in properties)
|
|
real_estate_equity = round(total_value - total_mortgage, 2)
|
|
total_net_worth = round(portfolio_value + real_estate_equity, 2)
|
|
|
|
if properties:
|
|
summary = (
|
|
f"Total net worth ${total_net_worth:,.0f} across investments "
|
|
f"(${portfolio_value:,.0f}) and real estate equity "
|
|
f"(${real_estate_equity:,.0f})."
|
|
)
|
|
else:
|
|
summary = (
|
|
f"Investment portfolio: ${portfolio_value:,.0f}. "
|
|
"No properties tracked yet. Add properties to include real estate equity."
|
|
)
|
|
|
|
return {
|
|
"tool_name": "property_tracker",
|
|
"success": True,
|
|
"tool_result_id": tool_result_id,
|
|
"timestamp": datetime.utcnow().isoformat(),
|
|
"result": {
|
|
"investment_portfolio": portfolio_value,
|
|
"real_estate_equity": real_estate_equity,
|
|
"total_net_worth": total_net_worth,
|
|
"properties": properties,
|
|
"summary": summary,
|
|
},
|
|
}
|
|
|