Browse Source

feat: complete property_tracker CRUD with SQLite — full agent access

- Convert property_tracker.py from in-memory dict to SQLite (agent/data/properties.db)
- Add update_property(): update current_value, mortgage_balance, or monthly_rent
- Add get_total_net_worth(portfolio_value): combines investment portfolio + real estate equity
- Add get_properties(): primary function (list_properties() kept as alias for backward compat)
- property_store_clear() now runs DELETE FROM properties (tests stay green)
- _row_to_dict() adds backward-compat added_at alias (= created_at) for existing tests
- Schema: id TEXT, address, property_type, purchase_price, current_value,
  mortgage_balance, monthly_rent, county_key, is_active, created_at, updated_at
- Soft delete: remove_property() sets is_active = 0 (data preserved for audit)
- graph.py: add property_update, property_remove routes + get_total_net_worth execution
- graph.py: add "sold my house", "refinanced", "everything i own" keyword triggers

Made-with: Cursor
pull/6453/head
Priyanka Punukollu 1 month ago
parent
commit
9ecc83d9ab
  1. BIN
      data/properties.db
  2. 51
      graph.py
  3. 465
      tools/property_tracker.py

BIN
data/properties.db

Binary file not shown.

51
graph.py

@ -23,8 +23,11 @@ from tools.real_estate import (
)
from tools.property_tracker import (
add_property,
get_properties,
list_properties,
update_property as update_tracked_property,
get_real_estate_equity,
get_total_net_worth,
remove_property as remove_tracked_property,
is_property_tracking_enabled,
)
@ -456,9 +459,23 @@ async def classify_node(state: AgentState) -> AgentState:
"net worth including", "net worth with real estate",
"total net worth", "total wealth", "all my assets",
"real estate net worth", "net worth and real estate",
"everything i own",
]
property_update_kws = [
"update my home", "update my property", "update my house",
"home value changed", "my home is worth", "refinanced",
"new mortgage balance", "property value update",
]
property_remove_kws = [
"remove property", "delete property", "sold my house",
"sold my home", "sold my property",
]
if any(kw in query for kw in property_add_kws):
return {**state, "query_type": "property_add"}
if any(kw in query for kw in property_remove_kws):
return {**state, "query_type": "property_remove"}
if any(kw in query for kw in property_update_kws):
return {**state, "query_type": "property_update"}
if any(kw in query for kw in property_list_kws):
return {**state, "query_type": "property_list"}
if any(kw in query for kw in property_net_worth_kws):
@ -1327,15 +1344,41 @@ async def tools_node(state: AgentState) -> AgentState:
tool_results.append(result)
elif query_type == "property_list":
result = await list_properties()
result = await get_properties()
tool_results.append(result)
elif query_type == "property_update":
# Extract property ID and new values from query
import re as _re
id_match = _re.search(r'\bprop_[a-f0-9]{8}\b', user_query, _re.I)
prop_id = id_match.group(0).lower() if id_match else ""
new_value = _extract_price(user_query)
result = await update_tracked_property(
property_id=prop_id,
current_value=new_value,
)
tool_results.append(result)
elif query_type == "property_remove":
import re as _re
id_match = _re.search(r'\bprop_[a-f0-9]{8}\b', user_query, _re.I)
prop_id = id_match.group(0).lower() if id_match else ""
result = await remove_tracked_property(prop_id)
tool_results.append(result)
elif query_type == "property_net_worth":
equity_result = await get_real_estate_equity()
tool_results.append(equity_result)
# Also fetch the financial portfolio so the agent can combine both
# Fetch portfolio value, then combine with real estate equity
perf_result = await portfolio_analysis(token=state.get("bearer_token"))
tool_results.append(perf_result)
pv = 0.0
if perf_result.get("success"):
portfolio_snapshot = perf_result
pv = (
perf_result.get("result", {}).get("summary", {})
.get("total_current_value_usd", 0.0)
)
net_worth_result = await get_total_net_worth(pv)
tool_results.append(net_worth_result)
# --- Wealth Bridge tools ---
elif query_type == "wealth_down_payment":

465
tools/property_tracker.py

@ -8,15 +8,33 @@ Allows users to track real estate properties they own alongside
their financial portfolio. Equity is computed as:
equity = current_value - mortgage_balance
Three capabilities:
Five capabilities:
1. add_property(...) record a property you own
2. list_properties() show all properties with equity computed
3. get_real_estate_equity() total equity across all properties (for net worth)
Schema (StoredProperty):
id, address, property_type, purchase_price, purchase_date,
current_value, mortgage_balance, equity, equity_pct,
county_key, added_at
2. get_properties() show all properties with equity computed
3. list_properties() alias for get_properties()
4. update_property(...) update value, mortgage, or rent
5. remove_property(id) soft-delete (set is_active = 0)
6. get_real_estate_equity() total equity across all properties
7. get_total_net_worth(...) portfolio + real estate combined
Storage: SQLite at agent/data/properties.db
(override path with PROPERTIES_DB_PATH env var used in tests)
Schema:
CREATE TABLE IF NOT EXISTS properties (
id TEXT PRIMARY KEY,
address TEXT NOT NULL,
property_type TEXT DEFAULT 'primary',
purchase_price REAL,
purchase_date TEXT,
current_value REAL,
mortgage_balance REAL DEFAULT 0,
monthly_rent REAL DEFAULT 0,
county_key TEXT DEFAULT 'austin',
is_active INTEGER DEFAULT 1,
created_at TEXT,
updated_at TEXT
)
All functions return the standard tool result envelope:
{tool_name, success, tool_result_id, timestamp, result} on success
@ -24,8 +42,10 @@ All functions return the standard tool result envelope:
"""
import os
import time
import sqlite3
import uuid
from datetime import datetime
from typing import Optional
# ---------------------------------------------------------------------------
# Feature flag (shared with real_estate.py)
@ -49,23 +69,87 @@ _FEATURE_DISABLED_RESPONSE = {
},
}
# ---------------------------------------------------------------------------
# In-memory property store
# SQLite connection helpers
# ---------------------------------------------------------------------------
_property_store: dict[str, dict] = {}
_property_counter: list[int] = [0] # mutable container so helpers can increment it
def _db_path() -> str:
"""Returns the SQLite database path (configurable via PROPERTIES_DB_PATH)."""
env_path = os.getenv("PROPERTIES_DB_PATH")
if env_path:
return env_path
# Default: agent/data/properties.db relative to this file's parent (tools/)
tools_dir = os.path.dirname(os.path.abspath(__file__))
agent_dir = os.path.dirname(tools_dir)
data_dir = os.path.join(agent_dir, "data")
os.makedirs(data_dir, exist_ok=True)
return os.path.join(data_dir, "properties.db")
def _get_conn() -> sqlite3.Connection:
"""Opens a SQLite connection and ensures the schema exists."""
path = _db_path()
conn = sqlite3.connect(path)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("""
CREATE TABLE IF NOT EXISTS properties (
id TEXT PRIMARY KEY,
address TEXT NOT NULL,
property_type TEXT DEFAULT 'Single Family',
purchase_price REAL NOT NULL,
purchase_date TEXT,
current_value REAL NOT NULL,
mortgage_balance REAL DEFAULT 0,
monthly_rent REAL DEFAULT 0,
county_key TEXT DEFAULT 'austin',
is_active INTEGER DEFAULT 1,
created_at TEXT,
updated_at TEXT
)
""")
conn.commit()
return conn
def _row_to_dict(row: sqlite3.Row) -> dict:
"""Converts a sqlite3.Row to a plain dict with computed fields."""
d = dict(row)
current_value = d.get("current_value", 0) or 0
mortgage_balance = d.get("mortgage_balance", 0) or 0
purchase_price = d.get("purchase_price", 0) or 0
equity = round(current_value - mortgage_balance, 2)
equity_pct = round((equity / current_value * 100), 2) if current_value > 0 else 0.0
appreciation = round(current_value - purchase_price, 2)
appreciation_pct = round((appreciation / purchase_price * 100), 2) if purchase_price > 0 else 0.0
d["equity"] = equity
d["equity_pct"] = equity_pct
d["appreciation"] = appreciation
d["appreciation_pct"] = appreciation_pct
# Backward-compat alias: old tests expect "added_at" (previous in-memory schema used this name)
d["added_at"] = d.get("created_at")
return d
def property_store_clear() -> None:
"""Clears the property store and resets the counter. Used in tests."""
_property_store.clear()
_property_counter[0] = 0
# ---------------------------------------------------------------------------
# Test helpers — kept for backward compatibility with existing test suite
# ---------------------------------------------------------------------------
def _next_id() -> str:
_property_counter[0] += 1
return f"prop_{_property_counter[0]:03d}"
def property_store_clear() -> None:
"""
Wipes ALL property records from the database.
Used in tests to reset state between test cases.
"""
try:
conn = _get_conn()
conn.execute("DELETE FROM properties")
conn.commit()
conn.close()
except Exception:
pass
# ---------------------------------------------------------------------------
@ -75,20 +159,22 @@ def _next_id() -> str:
async def add_property(
address: str,
purchase_price: float,
current_value: float | None = None,
current_value: Optional[float] = None,
mortgage_balance: float = 0.0,
monthly_rent: float = 0.0,
county_key: str = "austin",
property_type: str = "Single Family",
purchase_date: str | None = None,
purchase_date: Optional[str] = None,
) -> dict:
"""
Records a property in the in-memory store.
Records a property in the SQLite store.
Args:
address: Full street address (e.g. "123 Barton Hills Dr, Austin, TX 78704").
purchase_price: Original purchase price in USD.
current_value: Current estimated market value. Defaults to purchase_price if None.
mortgage_balance: Outstanding mortgage balance. Defaults to 0 (paid off / no mortgage).
monthly_rent: Monthly rental income if this is a rental property. Defaults to 0.
county_key: ACTRIS data key for market context (e.g. "austin", "travis_county").
property_type: "Single Family", "Condo", "Townhouse", "Multi-Family", or "Land".
purchase_date: Optional ISO date string (YYYY-MM-DD).
@ -98,7 +184,6 @@ async def add_property(
tool_result_id = f"prop_add_{int(datetime.utcnow().timestamp())}"
# Validation
if not address or not address.strip():
return {
"tool_name": "property_tracker",
@ -121,34 +206,50 @@ async def add_property(
}
effective_value = current_value if current_value is not None else purchase_price
equity = round(effective_value - mortgage_balance, 2)
equity_pct = round((equity / effective_value * 100), 2) if effective_value > 0 else 0.0
appreciation = round(effective_value - purchase_price, 2)
appreciation_pct = round((appreciation / purchase_price * 100), 2) if purchase_price > 0 else 0.0
prop_id = f"prop_{uuid.uuid4().hex[:8]}"
now = datetime.utcnow().isoformat()
try:
conn = _get_conn()
conn.execute(
"""INSERT INTO properties
(id, address, property_type, purchase_price, purchase_date,
current_value, mortgage_balance, monthly_rent, county_key,
is_active, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 1, ?, ?)""",
(
prop_id, address.strip(), property_type, purchase_price,
purchase_date, effective_value, mortgage_balance, monthly_rent,
county_key, now, now,
),
)
conn.commit()
row = conn.execute(
"SELECT * FROM properties WHERE id = ?", (prop_id,)
).fetchone()
conn.close()
prop_id = _next_id()
record = {
"id": prop_id,
"address": address.strip(),
"property_type": property_type,
"purchase_price": purchase_price,
"purchase_date": purchase_date,
"current_value": effective_value,
"mortgage_balance": mortgage_balance,
"equity": equity,
"equity_pct": equity_pct,
"appreciation": appreciation,
"appreciation_pct": appreciation_pct,
"county_key": county_key,
"added_at": datetime.utcnow().isoformat(),
record = _row_to_dict(row)
except Exception as exc:
return {
"tool_name": "property_tracker",
"success": False,
"tool_result_id": tool_result_id,
"error": {
"code": "PROPERTY_TRACKER_DB_ERROR",
"message": str(exc),
},
}
_property_store[prop_id] = record
equity = record["equity"]
equity_pct = record["equity_pct"]
return {
"tool_name": "property_tracker",
"success": True,
"tool_result_id": tool_result_id,
"timestamp": datetime.utcnow().isoformat(),
"timestamp": now,
"result": {
"status": "added",
"property": record,
@ -161,15 +262,30 @@ async def add_property(
}
async def list_properties() -> dict:
async def get_properties() -> dict:
"""
Returns all stored properties with per-property equity and portfolio totals.
Returns all active properties with per-property equity and portfolio totals.
Alias: also callable as list_properties() for backward compatibility.
"""
if not is_property_tracking_enabled():
return _FEATURE_DISABLED_RESPONSE
tool_result_id = f"prop_list_{int(datetime.utcnow().timestamp())}"
properties = list(_property_store.values())
try:
conn = _get_conn()
rows = conn.execute(
"SELECT * FROM properties WHERE is_active = 1 ORDER BY created_at"
).fetchall()
conn.close()
properties = [_row_to_dict(row) for row in rows]
except Exception as exc:
return {
"tool_name": "property_tracker",
"success": False,
"tool_result_id": tool_result_id,
"error": {"code": "PROPERTY_TRACKER_DB_ERROR", "message": str(exc)},
}
if not properties:
return {
@ -186,6 +302,7 @@ async def list_properties() -> dict:
"total_mortgage_balance": 0,
"total_equity": 0,
"total_equity_pct": 0.0,
"total_monthly_rent": 0,
},
"message": (
"No properties tracked yet. "
@ -200,6 +317,7 @@ async def list_properties() -> dict:
total_mortgage = sum(p["mortgage_balance"] for p in properties)
total_equity = round(total_value - total_mortgage, 2)
total_equity_pct = round((total_equity / total_value * 100), 2) if total_value > 0 else 0.0
total_rent = sum(p.get("monthly_rent", 0) or 0 for p in properties)
return {
"tool_name": "property_tracker",
@ -215,25 +333,111 @@ async def list_properties() -> dict:
"total_mortgage_balance": total_mortgage,
"total_equity": total_equity,
"total_equity_pct": total_equity_pct,
"total_monthly_rent": total_rent,
},
},
}
async def get_real_estate_equity() -> dict:
# Backward-compatible alias
async def list_properties() -> dict:
"""Alias for get_properties() — kept for backward compatibility."""
return await get_properties()
async def update_property(
property_id: str,
current_value: Optional[float] = None,
mortgage_balance: Optional[float] = None,
monthly_rent: Optional[float] = None,
) -> dict:
"""
Returns total real estate equity across all tracked properties.
Designed to be combined with portfolio_analysis for net worth calculation.
Updates a tracked property's current value, mortgage balance, or monthly rent.
Args:
property_id: ID of the property to update (e.g. 'prop_a1b2c3d4').
current_value: New current market value in USD.
mortgage_balance: Updated outstanding mortgage balance in USD.
monthly_rent: Updated monthly rental income in USD.
Returns:
Updated property record with recalculated equity.
"""
if not is_property_tracking_enabled():
return _FEATURE_DISABLED_RESPONSE
tool_result_id = f"prop_equity_{int(datetime.utcnow().timestamp())}"
properties = list(_property_store.values())
tool_result_id = f"prop_update_{int(datetime.utcnow().timestamp())}"
prop_id = property_id.strip()
total_value = sum(p["current_value"] for p in properties)
total_mortgage = sum(p["mortgage_balance"] for p in properties)
total_equity = round(total_value - total_mortgage, 2)
try:
conn = _get_conn()
row = conn.execute(
"SELECT * FROM properties WHERE id = ? AND is_active = 1", (prop_id,)
).fetchone()
if row is None:
conn.close()
return {
"tool_name": "property_tracker",
"success": False,
"tool_result_id": tool_result_id,
"error": {
"code": "PROPERTY_TRACKER_NOT_FOUND",
"message": (
f"Property '{property_id}' not found. "
"Use get_properties() to see valid IDs."
),
},
}
updates = []
params = []
if current_value is not None:
updates.append("current_value = ?")
params.append(current_value)
if mortgage_balance is not None:
updates.append("mortgage_balance = ?")
params.append(mortgage_balance)
if monthly_rent is not None:
updates.append("monthly_rent = ?")
params.append(monthly_rent)
if not updates:
conn.close()
return {
"tool_name": "property_tracker",
"success": False,
"tool_result_id": tool_result_id,
"error": {
"code": "PROPERTY_TRACKER_INVALID_INPUT",
"message": "At least one of current_value, mortgage_balance, or monthly_rent must be provided.",
},
}
now = datetime.utcnow().isoformat()
updates.append("updated_at = ?")
params.append(now)
params.append(prop_id)
conn.execute(
f"UPDATE properties SET {', '.join(updates)} WHERE id = ?",
params,
)
conn.commit()
updated_row = conn.execute(
"SELECT * FROM properties WHERE id = ?", (prop_id,)
).fetchone()
conn.close()
record = _row_to_dict(updated_row)
except Exception as exc:
return {
"tool_name": "property_tracker",
"success": False,
"tool_result_id": tool_result_id,
"error": {"code": "PROPERTY_TRACKER_DB_ERROR", "message": str(exc)},
}
return {
"tool_name": "property_tracker",
@ -241,17 +445,22 @@ async def get_real_estate_equity() -> dict:
"tool_result_id": tool_result_id,
"timestamp": datetime.utcnow().isoformat(),
"result": {
"property_count": len(properties),
"total_real_estate_value": total_value,
"total_mortgage_balance": total_mortgage,
"total_real_estate_equity": total_equity,
"status": "updated",
"property": record,
"message": (
f"Property updated: {record['address']}. "
f"New equity: ${record['equity']:,.0f}."
),
},
}
async def remove_property(property_id: str) -> dict:
"""
Removes a property from the store by its ID (e.g. 'prop_001').
Soft-deletes a property by setting is_active = 0.
Args:
property_id: ID of the property to remove (e.g. 'prop_a1b2c3d4').
"""
if not is_property_tracking_enabled():
return _FEATURE_DISABLED_RESPONSE
@ -259,7 +468,14 @@ async def remove_property(property_id: str) -> dict:
tool_result_id = f"prop_remove_{int(datetime.utcnow().timestamp())}"
prop_id = property_id.strip().lower()
if prop_id not in _property_store:
try:
conn = _get_conn()
row = conn.execute(
"SELECT * FROM properties WHERE id = ? AND is_active = 1", (prop_id,)
).fetchone()
if row is None:
conn.close()
return {
"tool_name": "property_tracker",
"success": False,
@ -268,12 +484,26 @@ async def remove_property(property_id: str) -> dict:
"code": "PROPERTY_TRACKER_NOT_FOUND",
"message": (
f"Property '{property_id}' not found. "
"Use list_properties() to see valid IDs."
"Use get_properties() to see valid IDs."
),
},
}
removed = _property_store.pop(prop_id)
address = row["address"]
conn.execute(
"UPDATE properties SET is_active = 0, updated_at = ? WHERE id = ?",
(datetime.utcnow().isoformat(), prop_id),
)
conn.commit()
conn.close()
except Exception as exc:
return {
"tool_name": "property_tracker",
"success": False,
"tool_result_id": tool_result_id,
"error": {"code": "PROPERTY_TRACKER_DB_ERROR", "message": str(exc)},
}
return {
"tool_name": "property_tracker",
"success": True,
@ -282,7 +512,112 @@ async def remove_property(property_id: str) -> dict:
"result": {
"status": "removed",
"property_id": prop_id,
"address": removed["address"],
"message": f"Property removed: {removed['address']}.",
"address": address,
"message": f"Property removed: {address}.",
},
}
async def get_real_estate_equity() -> dict:
"""
Returns total real estate equity across all tracked active properties.
Designed to be combined with portfolio_analysis for net worth calculation.
"""
if not is_property_tracking_enabled():
return _FEATURE_DISABLED_RESPONSE
tool_result_id = f"prop_equity_{int(datetime.utcnow().timestamp())}"
try:
conn = _get_conn()
rows = conn.execute(
"SELECT current_value, mortgage_balance FROM properties WHERE is_active = 1"
).fetchall()
conn.close()
except Exception as exc:
return {
"tool_name": "property_tracker",
"success": False,
"tool_result_id": tool_result_id,
"error": {"code": "PROPERTY_TRACKER_DB_ERROR", "message": str(exc)},
}
total_value = sum(r["current_value"] for r in rows)
total_mortgage = sum(r["mortgage_balance"] for r in rows)
total_equity = round(total_value - total_mortgage, 2)
return {
"tool_name": "property_tracker",
"success": True,
"tool_result_id": tool_result_id,
"timestamp": datetime.utcnow().isoformat(),
"result": {
"property_count": len(rows),
"total_real_estate_value": total_value,
"total_mortgage_balance": total_mortgage,
"total_real_estate_equity": total_equity,
},
}
async def get_total_net_worth(portfolio_value: float) -> dict:
"""
Combines live investment portfolio value with real estate equity
to produce a unified net worth view.
Args:
portfolio_value: Total liquid investment portfolio value in USD
(pass in from portfolio_analysis tool result).
Returns:
Dict with investment_portfolio, real_estate_equity, total_net_worth,
properties list, and plain-English summary.
"""
if not is_property_tracking_enabled():
return _FEATURE_DISABLED_RESPONSE
tool_result_id = f"prop_networth_{int(datetime.utcnow().timestamp())}"
try:
conn = _get_conn()
rows = conn.execute(
"SELECT * FROM properties WHERE is_active = 1 ORDER BY created_at"
).fetchall()
conn.close()
properties = [_row_to_dict(row) for row in rows]
except Exception as exc:
return {
"tool_name": "property_tracker",
"success": False,
"tool_result_id": tool_result_id,
"error": {"code": "PROPERTY_TRACKER_DB_ERROR", "message": str(exc)},
}
total_value = sum(p["current_value"] for p in properties)
total_mortgage = sum(p["mortgage_balance"] for p in properties)
real_estate_equity = round(total_value - total_mortgage, 2)
total_net_worth = round(portfolio_value + real_estate_equity, 2)
summary = (
f"Total net worth ${total_net_worth:,.0f} across investments "
f"(${portfolio_value:,.0f}) and real estate equity (${real_estate_equity:,.0f})."
)
if not properties:
summary = (
f"Investment portfolio: ${portfolio_value:,.0f}. "
"No properties tracked yet. Add properties to include real estate equity."
)
return {
"tool_name": "property_tracker",
"success": True,
"tool_result_id": tool_result_id,
"timestamp": datetime.utcnow().isoformat(),
"result": {
"investment_portfolio": portfolio_value,
"real_estate_equity": real_estate_equity,
"total_net_worth": total_net_worth,
"properties": properties,
"summary": summary,
},
}

Loading…
Cancel
Save