diff --git a/.cursor/rules/ai-agents-reference.mdc b/.cursor/rules/ai-agents-reference.mdc new file mode 100644 index 000000000..d5edefcda --- /dev/null +++ b/.cursor/rules/ai-agents-reference.mdc @@ -0,0 +1,163 @@ +--- +description: AI Agents architecture patterns, ReAct loops, tool design, evaluation, and production guardrails reference +globs: +alwaysApply: true +--- + +# AI Agents — Architecture & Production Reference + +## Core Terminology + +- **LLM Call**: Single request-response, no tools, no iteration +- **LLM + Tools**: LLM can call functions, still single-turn +- **Agentic System**: Multi-turn loop with Reasoning → Action → Observation cycle + +We are building an **Agentic System**, not just an LLM wrapper. + +## The ReAct Loop + +``` +THOUGHT ("What do I need?") → ACTION (Call tool) → OBSERVATION (Process result) → REPEAT or ANSWER +``` + +## When to Use Agentic Patterns + +**Use agents when:** +- Unknown information needs (can't predict data sources) +- Multi-step reasoning with dependencies +- Complex analysis requiring iteration +- Dynamic decision trees + +**Do NOT use agents when:** +- Deterministic workflows (use code / if-else) +- Batch processing (use pipelines) +- Simple classification (use single LLM call) +- Speed critical (<1s needed) + +**Key insight**: Start simple. Most problems don't need agents. Match complexity to uncertainty. + +## Complexity Spectrum (Cost & Latency) + +| Pattern | Cost | Latency | +|---|---|---| +| Single LLM | 1x | <1s | +| LLM + Tools | 2-3x | 1-3s | +| ReAct Agent | 5-10x | 5-15s | +| Planning | 10-20x | 15-30s | +| Multi-Agent | 20-50x | 30s+ | + +## Tool Design Principles + +Tools are the building blocks. Each tool must be: +- **Atomic**: One clear purpose, does ONE thing +- **Idempotent**: Safe to retry +- **Well-documented**: LLM reads the description to decide usage +- **Error-handled**: Returns structured errors (ToolResult with status, data, message) +- **Verified**: Check results before returning + +**Anti-patterns**: Too broad ("manage_patient"), missing error states, undocumented, side effects, unverified raw API data. + +## Production Guardrails (Non-Negotiable) + +All four must be implemented: + +1. **MAX_ITERATIONS (10-15)**: Prevents infinite loops and runaway costs +2. **TIMEOUT (30-45s)**: User experience limit / API gateway timeout +3. **COST_LIMIT ($1/query)**: Prevent bill explosions, alert on anomalies +4. **CIRCUIT_BREAKER**: Same action 3x → abort, log for debugging + +Without these: $10K bills from loops, 5min timeouts, hammered downstream services. + +## Verification Layer (3+ Required) + +| Type | Description | +|---|---| +| Fact Checking | Cross-reference claims against authoritative sources | +| Hallucination Detection | Flag unsupported claims, require source attribution | +| Confidence Scoring | Quantify certainty (0-1), surface low-confidence | +| Domain Constraints | Enforce business rules (dosage limits, trade limits) | +| Human-in-the-Loop | Escalation triggers for high-risk decisions | + +Implementation pattern: +``` +ToolResult(status, data, verification: VerificationResult(passed, confidence, warnings, errors, sources)) +``` + +## Evaluation Framework (50+ Test Cases Required) + +**What to measure**: Correctness, tool selection, tool execution, safety, consistency, edge cases, latency, cost. + +**Test case breakdown**: +- 20+ happy path scenarios with expected outcomes +- 10+ edge cases (missing data, boundary conditions) +- 10+ adversarial inputs (bypass attempts) +- 10+ multi-step reasoning scenarios + +Each test case includes: input query, expected tool calls, expected output, pass/fail criteria. + +**Targets**: Pass rate >80% (good), >90% (excellent). Run evals daily. + +## Performance Targets + +| Metric | Target | +|---|---| +| Single-tool latency | <5 seconds | +| Multi-step latency (3+ tools) | <15 seconds | +| Tool success rate | >95% | +| Eval pass rate | >80% | +| Hallucination rate | <5% | +| Verification accuracy | >90% | + +## Observability (Required) + +- **Trace Logging**: Full trace per request (input → reasoning → tool calls → output) +- **Latency Tracking**: Time breakdown for LLM calls, tool execution, total response +- **Error Tracking**: Capture and categorize failures with context +- **Token Usage**: Input/output tokens per request, cost tracking +- **Eval Results**: Historical scores, regression detection + +## BaseAgent Implementation Pattern + +```python +class BaseAgent: + def __init__(self, model, max_iterations=10, timeout_seconds=30.0, max_cost_usd=1.0): + # Register tools, set guardrails + + def run(self, task): + # ReAct loop: + # 1. LLM call with tool schemas + # 2. If no tool_calls → return final answer + # 3. Execute tool calls, append results + # 4. Check guardrails (iterations, timeout, cost, circuit breaker) + # 5. Repeat +``` + +## Cost Analysis (Required for Submission) + +Track during development: +- LLM API costs, total tokens (input/output), number of API calls, observability tool costs + +Project to production at: 100 / 1,000 / 10,000 / 100,000 users/month. + +## Build Strategy (Priority Order) + +1. Basic agent — single tool call working end-to-end +2. Tool expansion — add remaining tools, verify each works +3. Multi-step reasoning — agent chains tools appropriately +4. Observability — integrate tracing +5. Eval framework — build test suite, measure baseline +6. Verification layer — domain-specific checks +7. Iterate on evals — improve agent based on failures +8. Open source prep — package and document + +## Submission Deliverables + +- GitHub repo with setup guide, architecture overview, deployed link +- Demo video (3-5 min) +- Pre-Search document +- Agent architecture doc (1-2 pages) +- AI cost analysis (dev spend + projections) +- Eval dataset (50+ test cases with results) +- Open source contribution (package, PR, or public dataset) +- Deployed application (publicly accessible) +- Social post (X or LinkedIn, tag @GauntletAI) diff --git a/.cursor/rules/ghostfolio-project-spec.mdc b/.cursor/rules/ghostfolio-project-spec.mdc new file mode 100644 index 000000000..a22b9a903 --- /dev/null +++ b/.cursor/rules/ghostfolio-project-spec.mdc @@ -0,0 +1,122 @@ +--- +description: Ghostfolio CSV Import Auditor project specification and requirements +globs: +alwaysApply: true +--- + +# Ghostfolio CSV Import Auditor — Project Spec + +## What We're Building + +A **CSV Import Auditor Reasoning Agent** for the Ghostfolio finance platform that ensures safe, deterministic, and verifiable transaction imports. + +## Use Cases + +1. Parsing broker CSV exports +2. Mapping broker-specific fields to Ghostfolio schema +3. Validating transaction correctness +4. Detecting duplicates and conflicts +5. Generating a structured preview report +6. Safely committing transactions to the database + +## Architecture + +- **Backend**: TypeScript + NestJS (Ghostfolio native) +- **Database**: PostgreSQL (Railway) +- **Validation**: Zod schemas +- **Agent Orchestration**: Custom lightweight controller (no LangChain/LangGraph/CrewAI) +- **LLM**: Reasoning-capable model with tool/function calling +- **Observability**: Langfuse +- **Evaluation**: Deterministic + LLM-as-Judge (dual-layer) +- **CI**: Eval gating +- **Deployment**: Docker-compatible Ghostfolio backend + +## Agent Pipeline (6 Tools) + +``` +CSV → parseCSV → mapBrokerFields → validateTransactions → detectDuplicates → previewImportReport → commitImport +``` + +Each tool has: strict Zod input/output schemas, deterministic behavior, explicit error handling, no hidden side effects. All tools are pure except `commitImport()`. + +## Verification Requirements (Non-Negotiable) + +- Schema validation (required fields, types, formats) +- Accounting invariants (quantity >= 0, commission >= 0, price logic) +- Currency consistency +- Duplicate prevention (idempotency) +- Deterministic normalization +- Transactional database commits +- Before/after state signature validation +- No commit occurs unless ALL deterministic checks pass + +## Human-in-the-Loop + +- Always generate `previewImportReport()` before commit +- Require explicit user confirmation +- Never auto-commit without approval + +## LLM Usage (Cost-Efficient) + +- Parsing and validation are deterministic code (no LLM) +- LLM used ONLY for: broker column mapping suggestions + human-readable preview explanation +- Maximum 1-2 LLM calls per import +- No per-row inference + +## Performance Targets + +- Small CSV: < 2 seconds +- Large CSV: < 10-15 seconds +- Progress feedback required +- Dozens of concurrent imports supported + +## Evaluation Requirements + +### Layer 1: Deterministic (Primary) +- Schema correctness, numeric invariants, duplicate detection, idempotency, state signature comparison +- Minimum 50 test CSV cases, pass/fail enforced in CI + +### Layer 2: LLM-as-a-Judge (Secondary) +- Mapping explanation clarity (1-5), logical completeness (1-5), issue detection completeness (1-5), hallucination presence (0/1), overall quality (1-10) +- Judge does NOT control commit, only scores language outputs + +## Observability (Langfuse) + +Per import: one trace (`agent.import_audit`), one span per tool, latency per tool, token usage, `toolCountPlanned` vs `toolCountExecuted`, validation result, duplicate detection metrics, commit success/failure. + +## Security + +- CSV treated as data only +- Prompt injection prevention +- No arbitrary CSV content in prompts +- API keys secured server-side +- Audit logging enforced + +## Testing Strategy + +- Unit tests per tool +- Integration tests for full pipeline +- Adversarial malformed CSV tests +- Regression test suite in CI +- Deterministic eval dataset + +## Deadlines + +| Checkpoint | Deadline | Focus | +|---|---|---| +| Pre-Search | 2 hours after receiving | Architecture, Plan | +| MVP | Tuesday (24 hours) | Basic agent with tool use | +| Early Submission | Friday (4 days) | Eval framework + observability | +| Final | Sunday (7 days) | Production-ready + open source | + +## MVP Checklist + +- Agent responds to natural language queries in finance domain +- At least 3 functional tools the agent can invoke +- Tool calls execute successfully and return structured results +- Agent synthesizes tool results into coherent responses +- Conversation history maintained across turns +- Basic error handling (graceful failure, not crashes) +- At least one domain-specific verification check +- Simple evaluation: 5+ test cases with expected outcomes +- Deployed and publicly accessible diff --git a/.cursor/rules/product-strategy.mdc b/.cursor/rules/product-strategy.mdc new file mode 100644 index 000000000..a1e6933a3 --- /dev/null +++ b/.cursor/rules/product-strategy.mdc @@ -0,0 +1,152 @@ +--- +description: Product strategy, vision, and accountability for the Ghostfolio CSV Import Auditor reasoning agent +globs: +alwaysApply: true +--- + +# Ghostfolio CSV Import Auditor — Product Strategy + +## Product Vision + +For Ghostfolio users managing personal investment portfolios who need reliable, safe, and verifiable transaction imports, the CSV Import Auditor is a deterministic reasoning agent embedded in Ghostfolio that validates, audits, previews, and safely commits broker CSV imports. + +**Core Purpose**: Eliminate financial data corruption during portfolio imports. + +**Differentiation**: Unlike manual CSV imports or loosely validated parsers, this product guarantees schema correctness, duplicate prevention, accounting invariants, deterministic state verification, and transactional safety before any database mutation. + +## Insights + +### Market + +- Ghostfolio serves thousands of personal portfolio users +- CSV import reliability is a real friction point +- Imports mutate persistent financial state — high trust requirement +- Most portfolio tools provide basic parsers without deterministic validation, transactional state verification, or idempotency protection + +### Trends + +- Movement toward AI-assisted financial tooling +- Demand for explainability and auditability +- Increased distrust in opaque AI automation +- Shift toward deterministic + AI hybrid systems + +### Customer Personas + +**Primary — DIY Investor**: Imports broker exports, low tolerance for data corruption, wants preview + confirmation before commit. + +**Secondary — Power User**: Imports from multiple brokers, needs repeatable idempotent workflow. + +**Core need**: Correctness > speed. + +## Challenges + +### Technical + +- Broker CSV variability +- Deterministic schema validation +- Duplicate detection accuracy +- Safe transactional DB writes +- State mutation protection + +### Customer Pain Points + +- Fear of corrupting portfolio data +- Confusing CSV error messages +- Duplicate transactions after re-import +- Lack of preview transparency + +### GTM Risks + +- PR acceptance into Ghostfolio OSS repo +- Must integrate without architectural disruption +- Must demonstrate reliability via eval data + +### Compliance + +- Financial data integrity expectations +- Audit logging required +- Must prevent partial writes + +## Architecture & Approach + +### Pipeline (6 Tools) + +``` +parseCSV → mapBrokerFields → validateTransactions → detectDuplicates → previewImportReport → commitImport +``` + +- Custom deterministic orchestrator (no LangChain/LangGraph/CrewAI) +- Strict Zod schemas, deterministic outputs, explicit error handling +- Only `commitImport` mutates state + +### Verification Layer (Non-Negotiable) + +No commit without 100% deterministic pass on ALL checks: + +1. Schema validation (required fields, types, formats) +2. Numeric invariants (quantity >= 0, commission >= 0, price logic) +3. Currency consistency +4. Duplicate prevention (idempotency) +5. Deterministic state signature +6. Transactional DB boundary + +### Observability (Langfuse) + +Per import: one trace (`agent.import_audit`), one span per tool, `toolCountPlanned` vs `toolCountExecuted`, latency per tool, token usage, commit status, validation metrics. + +### Evaluation Strategy + +**Layer 1 — Deterministic (Primary)**: 50+ CSV test cases, schema + invariant enforcement, idempotency validation, CI gated. + +**Layer 2 — LLM-as-Judge (Secondary)**: Scores explanation clarity, logical completeness, issue detection quality, hallucination presence. Judge never controls commit. + +## Do's and Don'ts + +### Do + +- Deterministic before AI +- Verification before mutation +- Audit everything +- Eval-gated release + +### Don't + +- No auto-commit without user approval +- No per-row LLM inference +- No silent partial writes +- No hidden tool side effects + +## Accountability + +### North Star Metric + +**Zero corrupted imports.** + +### Success Metrics + +| Metric | Target | +|---|---| +| Tool success rate | >95% | +| Deterministic validation pass rate | 100% | +| Eval pass rate | >80% | +| Duplicate detection accuracy | >95% | +| Small CSV latency | <2s | +| Large CSV latency | <15s | +| Hallucination rate (preview) | <5% | + +### Operational Metrics + +- Import failure reasons categorized +- State signature verification rate +- Commit rollback occurrences +- Mapping confidence scores + +## Strategic Positioning + +This is NOT an AI CSV parser. This is: + +- A **deterministic financial safety layer** +- A **structured reasoning agent** +- An **audit-first import system** +- A **verification-gated mutation controller** +- An **eval-driven OSS contribution** diff --git a/.cursor/skills/frontend-design/SKILL.md b/.cursor/skills/frontend-design/SKILL.md new file mode 100644 index 000000000..0604785f0 --- /dev/null +++ b/.cursor/skills/frontend-design/SKILL.md @@ -0,0 +1,62 @@ +--- +name: frontend-design +description: Create distinctive, production-grade frontend interfaces with high design quality. Use when building web components, pages, dashboards, React components, HTML/CSS layouts, or when styling/beautifying any web UI. Generates creative, polished code that avoids generic AI aesthetics. +--- + +# Frontend Design + +Create distinctive, production-grade frontend interfaces that avoid generic "AI slop" aesthetics. Implement real working code with exceptional attention to aesthetic details and creative choices. + +## Design Thinking + +Before coding, understand the context and commit to a **bold** aesthetic direction: + +- **Purpose**: What problem does this interface solve? Who uses it? +- **Tone**: Pick an extreme: brutally minimal, maximalist chaos, retro-futuristic, organic/natural, luxury/refined, playful/toy-like, editorial/magazine, brutalist/raw, art deco/geometric, soft/pastel, industrial/utilitarian, etc. Use these for inspiration but design one true to the aesthetic direction. +- **Constraints**: Technical requirements (framework, performance, accessibility). +- **Differentiation**: What makes this unforgettable? What's the one thing someone will remember? + +**CRITICAL**: Choose a clear conceptual direction and execute it with precision. Bold maximalism and refined minimalism both work — the key is intentionality, not intensity. + +Then implement working code (HTML/CSS/JS, React, Vue, etc.) that is: + +- Production-grade and functional +- Visually striking and memorable +- Cohesive with a clear aesthetic point-of-view +- Meticulously refined in every detail + +## Frontend Aesthetics Guidelines + +### Typography + +Choose fonts that are beautiful, unique, and interesting. Avoid generic fonts like Arial and Inter; opt for distinctive choices that elevate aesthetics — unexpected, characterful font choices. Pair a distinctive display font with a refined body font. + +### Color & Theme + +Commit to a cohesive aesthetic. Use CSS variables for consistency. Dominant colors with sharp accents outperform timid, evenly-distributed palettes. + +### Motion + +Use animations for effects and micro-interactions. Prioritize CSS-only solutions for HTML. Use Motion library for React when available. Focus on high-impact moments: one well-orchestrated page load with staggered reveals (`animation-delay`) creates more delight than scattered micro-interactions. Use scroll-triggering and hover states that surprise. + +### Spatial Composition + +Unexpected layouts. Asymmetry. Overlap. Diagonal flow. Grid-breaking elements. Generous negative space OR controlled density. + +### Backgrounds & Visual Details + +Create atmosphere and depth rather than defaulting to solid colors. Add contextual effects and textures that match the overall aesthetic. Apply creative forms like gradient meshes, noise textures, geometric patterns, layered transparencies, dramatic shadows, decorative borders, custom cursors, and grain overlays. + +## Anti-Patterns (Never Do These) + +- Overused font families (Inter, Roboto, Arial, system fonts) +- Cliched color schemes (particularly purple gradients on white backgrounds) +- Predictable layouts and cookie-cutter component patterns +- Converging on common choices (e.g. Space Grotesk) across generations +- Generic design that lacks context-specific character + +## Key Principle + +Match implementation complexity to the aesthetic vision. Maximalist designs need elaborate code with extensive animations and effects. Minimalist or refined designs need restraint, precision, and careful attention to spacing, typography, and subtle details. Elegance comes from executing the vision well. + +Interpret creatively and make unexpected choices that feel genuinely designed for the context. No design should be the same. Vary between light and dark themes, different fonts, different aesthetics. diff --git a/.cursor/skills/langchain-architecture/SKILL.md b/.cursor/skills/langchain-architecture/SKILL.md new file mode 100644 index 000000000..ae08e0d91 --- /dev/null +++ b/.cursor/skills/langchain-architecture/SKILL.md @@ -0,0 +1,152 @@ +--- +name: langchain-architecture +description: Design LLM applications using LangChain 1.x and LangGraph for agents, memory, and tool integration. Use when building LangChain applications, implementing AI agents, creating complex LLM workflows, or working with LangGraph state management. +--- + +# LangChain & LangGraph Architecture + +Modern LangChain 1.x and LangGraph patterns for building production-grade LLM applications with agents, state management, memory, and tool integration. + +## When to Use This Skill + +- Building autonomous AI agents with tool access +- Implementing complex multi-step LLM workflows +- Managing conversation memory and state +- Integrating LLMs with external data sources and APIs +- Creating modular, reusable LLM application components +- Implementing document processing pipelines + +## Package Structure (LangChain 1.x) + +``` +langchain (1.2.x) # High-level orchestration +langchain-core (1.2.x) # Core abstractions (messages, prompts, tools) +langchain-community # Third-party integrations +langgraph # Agent orchestration and state management +langchain-openai # OpenAI integrations +langchain-anthropic # Anthropic/Claude integrations +langchain-voyageai # Voyage AI embeddings +langchain-pinecone # Pinecone vector store +``` + +## Core Concepts + +### LangGraph Agents + +LangGraph is the standard for building agents. Key features: + +- **StateGraph**: Explicit state management with typed state +- **Durable Execution**: Agents persist through failures +- **Human-in-the-Loop**: Inspect and modify state at any point +- **Memory**: Short-term and long-term memory across sessions +- **Checkpointing**: Save and resume agent state + +Agent patterns: **ReAct** (`create_react_agent`), **Plan-and-Execute**, **Multi-Agent** (supervisor routing), **Tool-Calling** (Pydantic schemas). + +### State Management + +LangGraph uses TypedDict for explicit state: + +```python +from typing import Annotated, TypedDict +from langgraph.graph import MessagesState + +class AgentState(MessagesState): + context: Annotated[list, "retrieved documents"] + +class CustomState(TypedDict): + messages: Annotated[list, "conversation history"] + context: Annotated[dict, "retrieved context"] + current_step: str + results: list +``` + +### Memory Systems + +- **MemorySaver**: In-memory checkpointer (development) +- **PostgresSaver**: Production checkpointer with PostgreSQL +- **VectorStore Memory**: Semantic similarity retrieval for long-term context +- Each `thread_id` maintains separate conversation state + +### Document Processing + +Components: **Document Loaders** (various sources), **Text Splitters** (intelligent chunking), **Vector Stores** (embeddings storage), **Retrievers** (relevant document fetching). + +### Callbacks & Tracing + +LangSmith is the standard for observability — request/response logging, token tracking, latency monitoring, error tracking, trace visualization. + +```python +import os +os.environ["LANGCHAIN_TRACING_V2"] = "true" +os.environ["LANGCHAIN_API_KEY"] = "your-api-key" +os.environ["LANGCHAIN_PROJECT"] = "my-project" +``` + +## Quick Start: ReAct Agent + +```python +from langgraph.prebuilt import create_react_agent +from langgraph.checkpoint.memory import MemorySaver +from langchain_anthropic import ChatAnthropic +from langchain_core.tools import tool + +llm = ChatAnthropic(model="claude-sonnet-4-6", temperature=0) + +@tool +def search_database(query: str) -> str: + """Search internal database for information.""" + return f"Results for: {query}" + +checkpointer = MemorySaver() +agent = create_react_agent(llm, [search_database], checkpointer=checkpointer) + +config = {"configurable": {"thread_id": "user-123"}} +result = await agent.ainvoke( + {"messages": [("user", "Search for Python tutorials")]}, + config=config +) +``` + +## Architecture Patterns + +Four key patterns (full code in [reference.md](reference.md)): + +1. **RAG with LangGraph**: StateGraph with retrieve → generate nodes, VoyageAI embeddings + Pinecone vector store +2. **Custom Agent with Structured Tools**: Pydantic schemas via `StructuredTool.from_function` +3. **Multi-Step Workflow**: StateGraph with conditional routing between extract → analyze → summarize nodes +4. **Multi-Agent Orchestration**: Supervisor pattern routing between specialized agents (researcher, writer, reviewer) + +## Performance Optimization + +- **Caching**: Redis-backed LLM cache via `langchain_community.cache.RedisCache` +- **Async Batch Processing**: `asyncio.gather` for parallel document processing +- **Connection Pooling**: Reuse Pinecone/database clients across requests +- **Streaming**: Use `astream_events` for real-time token streaming + +## Common Pitfalls + +1. **Using Deprecated APIs**: Use LangGraph for agents, not `initialize_agent` +2. **Memory Overflow**: Use checkpointers with TTL for long-running agents +3. **Poor Tool Descriptions**: Clear descriptions help LLM select correct tools +4. **Context Window Exceeded**: Use summarization or sliding window memory +5. **No Error Handling**: Wrap tool functions with try/except +6. **Blocking Operations**: Use async methods (`ainvoke`, `astream`) +7. **Missing Observability**: Always enable LangSmith tracing in production + +## Production Checklist + +- [ ] Use LangGraph StateGraph for agent orchestration +- [ ] Implement async patterns throughout (`ainvoke`, `astream`) +- [ ] Add production checkpointer (PostgreSQL, Redis) +- [ ] Enable LangSmith tracing +- [ ] Implement structured tools with Pydantic schemas +- [ ] Add timeout limits for agent execution +- [ ] Implement rate limiting +- [ ] Add comprehensive error handling +- [ ] Write integration tests for agent workflows + +## Additional Resources + +- For complete code examples of all patterns, see [reference.md](reference.md) +- [LangChain Docs](https://python.langchain.com/docs/) | [LangGraph Docs](https://langchain-ai.github.io/langgraph/) | [LangSmith](https://smith.langchain.com/) diff --git a/.cursor/skills/langchain-architecture/reference.md b/.cursor/skills/langchain-architecture/reference.md new file mode 100644 index 000000000..8826fecb9 --- /dev/null +++ b/.cursor/skills/langchain-architecture/reference.md @@ -0,0 +1,336 @@ +# LangChain Architecture — Full Code Examples + +## Pattern 1: RAG with LangGraph + +```python +from langgraph.graph import StateGraph, START, END +from langchain_anthropic import ChatAnthropic +from langchain_voyageai import VoyageAIEmbeddings +from langchain_pinecone import PineconeVectorStore +from langchain_core.documents import Document +from langchain_core.prompts import ChatPromptTemplate +from typing import TypedDict, Annotated + +class RAGState(TypedDict): + question: str + context: Annotated[list[Document], "retrieved documents"] + answer: str + +llm = ChatAnthropic(model="claude-sonnet-4-6") +embeddings = VoyageAIEmbeddings(model="voyage-3-large") +vectorstore = PineconeVectorStore(index_name="docs", embedding=embeddings) +retriever = vectorstore.as_retriever(search_kwargs={"k": 4}) + +async def retrieve(state: RAGState) -> RAGState: + docs = await retriever.ainvoke(state["question"]) + return {"context": docs} + +async def generate(state: RAGState) -> RAGState: + prompt = ChatPromptTemplate.from_template( + """Answer based on the context below. If you cannot answer, say so. + + Context: {context} + Question: {question} + Answer:""" + ) + context_text = "\n\n".join(doc.page_content for doc in state["context"]) + response = await llm.ainvoke( + prompt.format(context=context_text, question=state["question"]) + ) + return {"answer": response.content} + +builder = StateGraph(RAGState) +builder.add_node("retrieve", retrieve) +builder.add_node("generate", generate) +builder.add_edge(START, "retrieve") +builder.add_edge("retrieve", "generate") +builder.add_edge("generate", END) + +rag_chain = builder.compile() +result = await rag_chain.ainvoke({"question": "What is the main topic?"}) +``` + +## Pattern 2: Custom Agent with Structured Tools + +```python +from langchain_core.tools import StructuredTool +from pydantic import BaseModel, Field + +class SearchInput(BaseModel): + query: str = Field(description="Search query") + filters: dict = Field(default={}, description="Optional filters") + +class EmailInput(BaseModel): + recipient: str = Field(description="Email recipient") + subject: str = Field(description="Email subject") + content: str = Field(description="Email body") + +async def search_database(query: str, filters: dict = {}) -> str: + return f"Results for '{query}' with filters {filters}" + +async def send_email(recipient: str, subject: str, content: str) -> str: + return f"Email sent to {recipient}" + +tools = [ + StructuredTool.from_function( + coroutine=search_database, + name="search_database", + description="Search internal database", + args_schema=SearchInput + ), + StructuredTool.from_function( + coroutine=send_email, + name="send_email", + description="Send an email", + args_schema=EmailInput + ) +] + +agent = create_react_agent(llm, tools) +``` + +## Pattern 3: Multi-Step Workflow with StateGraph + +```python +from langgraph.graph import StateGraph, START, END +from typing import TypedDict, Literal + +class WorkflowState(TypedDict): + text: str + entities: list + analysis: str + summary: str + current_step: str + +async def extract_entities(state: WorkflowState) -> WorkflowState: + prompt = f"Extract key entities from: {state['text']}\n\nReturn as JSON list." + response = await llm.ainvoke(prompt) + return {"entities": response.content, "current_step": "analyze"} + +async def analyze_entities(state: WorkflowState) -> WorkflowState: + prompt = f"Analyze these entities: {state['entities']}\n\nProvide insights." + response = await llm.ainvoke(prompt) + return {"analysis": response.content, "current_step": "summarize"} + +async def generate_summary(state: WorkflowState) -> WorkflowState: + prompt = f"""Summarize: + Entities: {state['entities']} + Analysis: {state['analysis']} + Provide a concise summary.""" + response = await llm.ainvoke(prompt) + return {"summary": response.content, "current_step": "complete"} + +def route_step(state: WorkflowState) -> Literal["analyze", "summarize", "end"]: + step = state.get("current_step", "extract") + if step == "analyze": + return "analyze" + elif step == "summarize": + return "summarize" + return "end" + +builder = StateGraph(WorkflowState) +builder.add_node("extract", extract_entities) +builder.add_node("analyze", analyze_entities) +builder.add_node("summarize", generate_summary) + +builder.add_edge(START, "extract") +builder.add_conditional_edges("extract", route_step, { + "analyze": "analyze", "summarize": "summarize", "end": END +}) +builder.add_conditional_edges("analyze", route_step, { + "summarize": "summarize", "end": END +}) +builder.add_edge("summarize", END) + +workflow = builder.compile() +``` + +## Pattern 4: Multi-Agent Orchestration + +```python +from langgraph.graph import StateGraph, START, END +from langgraph.prebuilt import create_react_agent +from typing import Literal + +class MultiAgentState(TypedDict): + messages: list + next_agent: str + +researcher = create_react_agent(llm, research_tools) +writer = create_react_agent(llm, writing_tools) +reviewer = create_react_agent(llm, review_tools) + +async def supervisor(state: MultiAgentState) -> MultiAgentState: + prompt = f"""Based on the conversation, which agent should handle this? + Options: researcher, writer, reviewer, FINISH + Messages: {state['messages']} + Respond with just the agent name.""" + response = await llm.ainvoke(prompt) + return {"next_agent": response.content.strip().lower()} + +def route_to_agent(state: MultiAgentState) -> Literal["researcher", "writer", "reviewer", "end"]: + next_agent = state.get("next_agent", "").lower() + if next_agent == "finish": + return "end" + return next_agent if next_agent in ["researcher", "writer", "reviewer"] else "end" + +builder = StateGraph(MultiAgentState) +builder.add_node("supervisor", supervisor) +builder.add_node("researcher", researcher) +builder.add_node("writer", writer) +builder.add_node("reviewer", reviewer) + +builder.add_edge(START, "supervisor") +builder.add_conditional_edges("supervisor", route_to_agent, { + "researcher": "researcher", "writer": "writer", + "reviewer": "reviewer", "end": END +}) +for agent in ["researcher", "writer", "reviewer"]: + builder.add_edge(agent, "supervisor") + +multi_agent = builder.compile() +``` + +## Memory: Token-Based with LangGraph + +```python +from langgraph.checkpoint.memory import MemorySaver + +checkpointer = MemorySaver() +agent = create_react_agent(llm, tools, checkpointer=checkpointer) + +config = {"configurable": {"thread_id": "session-abc123"}} +result1 = await agent.ainvoke({"messages": [("user", "My name is Alice")]}, config) +result2 = await agent.ainvoke({"messages": [("user", "What's my name?")]}, config) +``` + +## Memory: Production PostgreSQL + +```python +from langgraph.checkpoint.postgres import PostgresSaver + +checkpointer = PostgresSaver.from_conn_string( + "postgresql://user:pass@localhost/langgraph" +) +agent = create_react_agent(llm, tools, checkpointer=checkpointer) +``` + +## Memory: Vector Store for Long-Term Context + +```python +from langchain_community.vectorstores import Chroma +from langchain_voyageai import VoyageAIEmbeddings + +embeddings = VoyageAIEmbeddings(model="voyage-3-large") +memory_store = Chroma( + collection_name="conversation_memory", + embedding_function=embeddings, + persist_directory="./memory_db" +) + +async def retrieve_relevant_memory(query: str, k: int = 5) -> list: + docs = await memory_store.asimilarity_search(query, k=k) + return [doc.page_content for doc in docs] + +async def store_memory(content: str, metadata: dict = {}): + await memory_store.aadd_texts([content], metadatas=[metadata]) +``` + +## Custom Callback Handler + +```python +from langchain_core.callbacks import BaseCallbackHandler +from typing import Any, Dict, List + +class CustomCallbackHandler(BaseCallbackHandler): + def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs) -> None: + print(f"LLM started with {len(prompts)} prompts") + + def on_llm_end(self, response, **kwargs) -> None: + print(f"LLM completed: {len(response.generations)} generations") + + def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs) -> None: + print(f"Tool started: {serialized.get('name')}") + + def on_tool_end(self, output: str, **kwargs) -> None: + print(f"Tool completed: {output[:100]}...") + +result = await agent.ainvoke( + {"messages": [("user", "query")]}, + config={"callbacks": [CustomCallbackHandler()]} +) +``` + +## Streaming Responses + +```python +llm = ChatAnthropic(model="claude-sonnet-4-6", streaming=True) + +async for chunk in llm.astream("Tell me a story"): + print(chunk.content, end="", flush=True) + +async for event in agent.astream_events( + {"messages": [("user", "Search and summarize")]}, + version="v2" +): + if event["event"] == "on_chat_model_stream": + print(event["data"]["chunk"].content, end="") + elif event["event"] == "on_tool_start": + print(f"\n[Using tool: {event['name']}]") +``` + +## Performance: Redis Caching + +```python +from langchain_community.cache import RedisCache +from langchain_core.globals import set_llm_cache +import redis + +redis_client = redis.Redis.from_url("redis://localhost:6379") +set_llm_cache(RedisCache(redis_client)) +``` + +## Performance: Async Batch Processing + +```python +import asyncio +from langchain_core.documents import Document + +async def process_documents(documents: list[Document]) -> list: + tasks = [process_single(doc) for doc in documents] + return await asyncio.gather(*tasks) + +async def process_single(doc: Document) -> dict: + chunks = text_splitter.split_documents([doc]) + embeddings = await embeddings_model.aembed_documents( + [c.page_content for c in chunks] + ) + return {"doc_id": doc.metadata.get("id"), "embeddings": embeddings} +``` + +## Testing Strategies + +```python +import pytest +from unittest.mock import AsyncMock, patch + +@pytest.mark.asyncio +async def test_agent_tool_selection(): + with patch.object(llm, 'ainvoke') as mock_llm: + mock_llm.return_value = AsyncMock(content="Using search_database") + result = await agent.ainvoke({ + "messages": [("user", "search for documents")] + }) + assert "search_database" in str(result) + +@pytest.mark.asyncio +async def test_memory_persistence(): + config = {"configurable": {"thread_id": "test-thread"}} + await agent.ainvoke( + {"messages": [("user", "Remember: the code is 12345")]}, config + ) + result = await agent.ainvoke( + {"messages": [("user", "What was the code?")]}, config + ) + assert "12345" in result["messages"][-1].content +```