Financial Analysis AI Agent
Part 1: Core Systems and Architecture
System Overview
The Financial Analysis Agent is an intelligent system designed to analyze financial information using multiple specialized agents that communicate with microservice tools (uAgents) through Agentverse. The system combines SEC filing analysis with real-time market data to provide comprehensive financial insights.
Architecture Diagram
![tech-architecture](s3://crabby-images/1f96b/1f96b1a1db23bfd2c8cba4d395ba846e96bb1809)
Implementation Details
Core Components
Research Team State
class ResearchTeamState(TypedDict):
    """State shared by the supervisor and the specialist agents."""

    messages: List[BaseMessage]  # Conversation history
    team_members: List[str]  # Active team members
    next: str  # Next agent to act
    information_needed: List[str]  # Required information
    reasoning: str  # Decision reasoning
Specialized Agents
1. SEC Analysis Agent
- Purpose: Analyzes SEC filings and financial documents
- Key Features:
- Communicates with RAG Tool uAgent through Agentverse
- Focuses on historical financial data
- Processes regulatory filings
- Implementation:
class SECAnalyst:
    """Specialist that forwards SEC-filing questions to the RAG Tool uAgent."""

    # Default address of the RAG Tool uAgent.
    DEFAULT_RAG_TOOL_ADDRESS = (
        "agent1qdaeq9k7ty2xjp5ylpex0ezxzlg30cc8n3lpvrgh4sqjm863hm0vusghkzu"
    )

    def __init__(self, rag_tool_address=None):
        """Remember which RAG Tool uAgent to talk to.

        Args:
            rag_tool_address: Optional address override; when omitted the
                default address above is used.
        """
        self.rag_tool_address = rag_tool_address or self.DEFAULT_RAG_TOOL_ADDRESS

    async def analyze_filings(self, query):
        """Send ``query`` to the RAG Tool uAgent.

        Returns:
            ``{"status": "filing_analysis_requested"}`` once the message has
            been handed to ``send_message_to_agent``, or ``{"error": <text>}``
            if that call raised.
        """
        try:
            send_message_to_agent(
                financial_identity,
                self.rag_tool_address,
                {"message": query},
            )
            return {"status": "filing_analysis_requested"}
        except Exception as e:
            logger.error(f"Error in filing analysis: {e}")
            return {"error": str(e)}
2. Search Agent
- Purpose: Gathers real-time market information
- Key Features:
- Communicates with Search Tool uAgent through Agentverse
- Focuses on current market data
- Retrieves analyst opinions
- Implementation:
class SearchAgent:
    """Specialist that forwards market questions to the Search Tool uAgent."""

    # Default address of the Search Tool uAgent.
    DEFAULT_SEARCH_TOOL_ADDRESS = (
        "agent1qgyqf7l0djgvnd37f4fwvrv8xu7he2eatht328u05uy3vc9j33p7qe2dxtr"
    )

    def __init__(self, search_tool_address=None):
        """Remember which Search Tool uAgent to talk to.

        Args:
            search_tool_address: Optional address override; when omitted the
                default address above is used.
        """
        self.search_tool_address = (
            search_tool_address or self.DEFAULT_SEARCH_TOOL_ADDRESS
        )

    async def analyze_market(self, query):
        """Send ``query`` to the Search Tool uAgent.

        Returns:
            ``{"status": "market_analysis_requested"}`` once the message has
            been handed to ``send_message_to_agent``, or ``{"error": <text>}``
            if that call raised.
        """
        try:
            send_message_to_agent(
                financial_identity,
                self.search_tool_address,
                {"message": query},
            )
            return {"status": "market_analysis_requested"}
        except Exception as e:
            logger.error(f"Error in market analysis: {e}")
            return {"error": str(e)}
3. Supervisor Agent
- Purpose: Coordinates analysis flow
- Key Features:
- Routes queries to appropriate specialists
- Aggregates responses
- Manages analysis flow
- Implementation managed through research graph
Message Models
# Shared between AI Agents and uAgents
class Request(Model):
    """Message carrying the query text sent to a tool uAgent."""

    message: str


class Response(Model):
    """Message carrying the text a tool uAgent sends back."""

    response: str
Communication Flow
1. Query Flow
- User submits query
- Supervisor analyzes and routes
- Specialist agents communicate with uAgents
- Results aggregated and returned
2. Message Protocol
# Sending messages to uAgents: the three arguments are passed positionally,
# in this order.
send_message_to_agent(
    sender_identity, # Financial agent identity
    tool_address, # uAgent address
    payload, # Message payload
)
# Receiving messages (webhook)
@app.route('/webhook', methods=['POST'])
def webhook():
    """Receive a message posted by a tool uAgent.

    Returns:
        JSON ``{"status": "success"}`` when the request body was decoded and
        parsed, or JSON ``{"error": <text>}`` with HTTP 500 when that failed.
    """
    try:
        data = request.get_data().decode("utf-8")
        # The parsed message is not used in this minimal handler; the call is
        # kept so that a malformed body is reported as an error.
        parse_message_from_agent(data)
        return jsonify({"status": "success"})
    except Exception as e:
        logger.error(f"Webhook error: {e}")
        return jsonify({"error": str(e)}), 500
Part 2: Tool uAgents Implementation
Search Tool uAgent
Basic Implementation
import os

from tavily import Client
from uagents import Agent, Context, Model
from uagents.setup import fund_agent_if_low
# Message Models
class Request(Model):
message: str
class Response(Model):
response: str
# Agent Setup
# Port and seed are read from SEARCH_TOOL_PORT / SEARCH_TOOL_SEED; the fallbacks
# are the values that were previously hard-coded here.
# NOTE(review): the agent address is presumably derived from the seed, so any
# tool address hard-coded by callers must match the seed in use — confirm.
SEARCH_TOOL_PORT = int(os.getenv("SEARCH_TOOL_PORT", "8000"))
search_agent = Agent(
    name="Market Search Tool",
    port=SEARCH_TOOL_PORT,
    seed=os.getenv("SEARCH_TOOL_SEED", "SEARCH_TOOL_SEED"),
    endpoint=[f"http://localhost:{SEARCH_TOOL_PORT}/submit"],
)
# Startup Handler
@search_agent.on_event("startup")
async def startup_handler(ctx: Context):
    """Fund the agent's wallet if it is low and log the agent address."""
    # fund_agent_if_low is a regular (non-async) function that takes the wallet
    # address, so it is called directly instead of awaited with ctx.ledger.
    fund_agent_if_low(search_agent.wallet.address())
    ctx.logger.info(f"Search Tool Agent started at address: {ctx.address}")
# Message Handler
@search_agent.on_message(model=Request)
async def handle_search(ctx: Context, sender: str, msg: Request):
    """Handle market research requests.

    Runs a Tavily search for ``msg.message`` and sends the stringified result
    back to ``sender``. Any failure is logged and reported to ``sender`` as an
    error ``Response`` instead of being raised.
    """
    try:
        api_key = os.getenv("TAVILY_API_KEY")
        if not api_key:
            # Report a clear cause instead of whatever the client does with a
            # missing key; the except branch below sends it to the sender.
            raise EnvironmentError("TAVILY_API_KEY is not set")

        # Initialize Tavily client and perform search
        tavily_client = Client(api_key=api_key)
        search_result = tavily_client.search(msg.message)

        # Send response
        await ctx.send(sender, Response(response=str(search_result)))
    except Exception as e:
        ctx.logger.error(f"Search error: {e}")
        await ctx.send(
            sender,
            Response(response=f"Error performing search: {str(e)}"),
        )
# Run Agent
if __name__ == "__main__":
    # Start the search tool agent when this file is executed as a script.
    search_agent.run()
RAG Tool uAgent
Basic Implementation
from uagents import Agent, Context, Model
from uagents.setup import fund_agent_if_low
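# NOTE(review): a stray "g" stood here, apparently a truncated import line.
# DocumentLoader and create_rag_chain (used below) must be imported here,
# presumably from the rag_service module — confirm.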
# Message Models
class Request(Model):
    """Message carrying the query text sent to this tool uAgent."""

    message: str


class Response(Model):
    """Message carrying the text this tool uAgent sends back."""

    response: str
# Agent Setup
import os  # needed here for the environment lookups below

# Port and seed are read from RAG_TOOL_PORT / RAG_TOOL_SEED; the fallbacks are
# the values that were previously hard-coded here.
# NOTE(review): the agent address is presumably derived from the seed, so any
# tool address hard-coded by callers must match the seed in use — confirm.
RAG_TOOL_PORT = int(os.getenv("RAG_TOOL_PORT", "8001"))
rag_agent = Agent(
    name="RAG Analysis Tool",
    port=RAG_TOOL_PORT,
    seed=os.getenv("RAG_TOOL_SEED", "RAG_TOOL_SEED"),
    endpoint=[f"http://localhost:{RAG_TOOL_PORT}/submit"],
)
# Startup Handler
@rag_agent.on_event("startup")
async def startup_handler(ctx: Context):
    """Fund the agent's wallet if it is low and log the agent address."""
    # fund_agent_if_low is a regular (non-async) function that takes the wallet
    # address, so it is called directly instead of awaited with ctx.ledger.
    fund_agent_if_low(rag_agent.wallet.address())
    ctx.logger.info(f"RAG Tool Agent started at address: {ctx.address}")
# Message Handler
_rag_chain = None  # Built on the first request, then reused.


def _get_rag_chain():
    """Return the shared RAG chain, building it from the PDF on first use."""
    global _rag_chain
    if _rag_chain is None:
        # NOTE(review): assumes the chain can be reused across requests (no
        # per-request state) — confirm against create_rag_chain.
        loader = DocumentLoader("data/raw/apple_10k.pdf")
        _rag_chain = create_rag_chain(loader)
    return _rag_chain


@rag_agent.on_message(model=Request)
async def handle_analysis(ctx: Context, sender: str, msg: Request):
    """Handle document analysis requests.

    Invokes the RAG chain with ``msg.message`` and sends the stringified result
    back to ``sender``. Any failure, including a failure to build the chain,
    is logged and reported to ``sender`` as an error ``Response``.
    """
    try:
        # Perform analysis; the chain is created once instead of reloading the
        # document for every incoming message.
        analysis_result = _get_rag_chain().invoke(msg.message)

        # Send response
        await ctx.send(sender, Response(response=str(analysis_result)))
    except Exception as e:
        ctx.logger.error(f"RAG analysis error: {e}")
        await ctx.send(
            sender,
            Response(response=f"Error performing analysis: {str(e)}"),
        )
# Run Agent
if __name__ == "__main__":
    # Start the RAG tool agent when this file is executed as a script.
    rag_agent.run()
Tool Agent Setup and Configuration
Environment Setup
# Required Environment Variables
TAVILY_API_KEY=your_tavily_api_key
SEARCH_TOOL_SEED=your_search_tool_seed
RAG_TOOL_SEED=your_rag_tool_seed
Tool Deployment Script
# tools_setup.py
import os
from dotenv import load_dotenv
def setup_tools():
    """Setup and validate tool environments.

    Calls ``load_dotenv()`` and then checks that every required environment
    variable is set to a non-empty value.

    Raises:
        EnvironmentError: If one or more required variables are missing; the
            message names all of them, not just the first.
    """
    load_dotenv()

    # Validate required environment variables
    required_vars = [
        "TAVILY_API_KEY",
        "SEARCH_TOOL_SEED",
        "RAG_TOOL_SEED",
    ]
    missing = [var for var in required_vars if not os.getenv(var)]
    if missing:
        raise EnvironmentError(
            f"Missing required environment variable(s): {', '.join(missing)}"
        )
    # Additional setup if needed
if __name__ == "__main__":
    # Validate the environment when this file is executed as a script.
    setup_tools()
Error Handling and Logging
Error Response Format
async def send_error_response(ctx: Context, sender: str, error: Exception):
    """Standardized error response format.

    Sends ``sender`` a ``Response`` whose text is the string form of a dict
    with ``status``, ``error_type``, ``message`` and ``timestamp`` keys.
    """
    from datetime import datetime, timezone

    # Use ctx.current_timestamp when the context provides it; otherwise fall
    # back to the current UTC time so that reporting an error cannot itself
    # fail with AttributeError.
    timestamp = getattr(ctx, "current_timestamp", None)
    if timestamp is None:
        timestamp = datetime.now(timezone.utc).isoformat()

    error_response = {
        "status": "error",
        "error_type": error.__class__.__name__,
        "message": str(error),
        "timestamp": timestamp,
    }
    # NOTE(review): str() of a dict yields a Python repr, not JSON — confirm
    # that receivers expect this format.
    await ctx.send(sender, Response(response=str(error_response)))
Logging Configuration
import logging
def setup_tool_logging(tool_name: str):
    """Configure logging for tool agents.

    Calls ``logging.basicConfig`` with INFO level, a file handler writing to
    ``<tool_name lower-cased>.log`` and a stream handler. Every record is
    tagged with ``tool_name``.

    Args:
        tool_name: Label embedded in each log line; also used for the log
            file name.
    """
    # tool_name is spliced into a %-style format string, so a literal "%" in
    # the name has to be doubled or record formatting breaks.
    label = tool_name.replace("%", "%%")
    logging.basicConfig(
        level=logging.INFO,
        format=f"%(asctime)s - {label} - %(levelname)s - %(message)s",
        handlers=[
            logging.FileHandler(f"{tool_name.lower()}.log", encoding="utf-8"),
            logging.StreamHandler(),
        ],
    )
Part 3: Helper Functions Implementation
State Management System
What is State?
In our system, state is like the brain's working memory. It keeps track of:
- The entire conversation history
- Which agent is currently working
- What information we still need
- Why we're making certain decisions
Think of it like a group project manager that:
- Remembers what everyone has said
- Knows who should work next
- Keeps track of what information is still missing
- Understands why certain team members should handle specific tasks
State Structure Explained
class ResearchTeamState(TypedDict):
    """State shared by the supervisor and the specialist agents."""

    messages: List[BaseMessage]  # Conversation transcript, in order
    team_members: List[str]  # Available specialist agents
    next: str  # Agent that should act next, or "FINISH"
    information_needed: List[str]  # Information still missing
    reasoning: str  # Why the next agent was chosen
Each component serves a specific purpose:
1. messages:
- Works like a conversation transcript
- Stores every interaction in order
- Helps agents understand context
- Example: If user asks about "these numbers", agents can look back to see what numbers were discussed
2. team_members:
- Like a team roster
- Lists available specialized agents (Search, SECAnalyst)
- Helps supervisor know who can be assigned tasks
- Example: ["Search", "SECAnalyst"]
3. next:
- Like a "passing the baton" system
- Tells us which agent should act next
- Can also signal when we're done ("FINISH")
- Example: If we need market data, next = "Search"
4. information_needed:
- Like a shopping list of missing information
- Helps agents know what to look for
- Guides the conversation towards completeness
- Example: ["current stock price", "last quarter revenue"]
5. reasoning:
- Like notes explaining decisions
- Helps understand why certain agents were chosen
- Useful for debugging and improvement
- Example: "Choosing SECAnalyst because we need historical financial data"
Helper Functions Explained
A. agent_node Helper
This is like a universal translator and task manager for agents. It:
1. Prepares Information:
- Takes the current state
- Adds any needed information to the query
- Makes sure agent has context
2. Manages Communication:
- Standardizes how agents talk to each other.
- Maintains consistent message format.
- Keeps conversation history organized
def agent_node(state, agent, name):
    """Helper function to create agent nodes.

    Runs ``agent`` on the state and wraps its ``"output"`` in a
    ``HumanMessage`` attributed to ``name``. When the state lists outstanding
    ``information_needed``, the agent sees the latest message prefixed with
    that list.

    Args:
        state: Current team state (a mapping with a ``messages`` list).
        agent: Object whose ``invoke(state)`` returns a mapping with an
            ``"output"`` key.
        name: Name recorded on the returned message.

    Returns:
        ``{"messages": [HumanMessage(...)]}`` holding the agent's reply.

    Raises:
        Exception: Whatever the agent raised, after it has been logged.
    """
    try:
        agent_input = state
        needed = state.get("information_needed")
        history = state.get("messages")
        # Add information needed to the query if available
        if needed and history:
            needed_text = ", ".join(needed)
            query_text = history[-1].content
            augmented = HumanMessage(
                content=f"Information needed:\n{needed_text}\nQuery: {query_text}"
            )
            # Give the agent a copy so the shared conversation history keeps
            # its original last message instead of being rewritten in place.
            agent_input = {**state, "messages": [*history[:-1], augmented]}

        # Process through agent
        result = agent.invoke(agent_input)

        # Format response
        return {
            "messages": [HumanMessage(content=result["output"], name=name)]
        }
    except Exception as e:
        logger.error(f"Error in agent node {name}: {e}")
        raise
B. create_agent Helper
This is like a specialized agent creator that:
1. Sets Up Agent Capabilities:
- Gives agents their special tools
- Defines their area of expertise
- Sets their behavioral guidelines
2. Configures Communication Style:
- Sets up how agent should communicate with tools
- Defines interaction patterns with uAgents
- Establishes message formats
def create_agent(
    llm: ChatOpenAI,
    tools: list,
    system_prompt: str,
) -> AgentExecutor:
    """Create a function-calling agent wrapped in an ``AgentExecutor``.

    Args:
        llm: Chat model the agent runs on.
        tools: Tools the agent may call.
        system_prompt: Role-specific instructions; shared team guidance is
            appended to it.

    Returns:
        An executor for the new agent. The prompt keeps ``{team_members}`` as a
        template variable, so it has to be supplied when the executor runs.

    Raises:
        Exception: Any error raised while building the agent, after logging.
    """
    try:
        # Enhance system prompt with team context. The leading newline keeps
        # the guidance separate from the caller's prompt text.
        system_prompt += (
            "\nWork autonomously according to your specialty, using the tools available to you.\n"
            "Do not ask for clarification.\n"
            "Your other team members will collaborate with you with their own specialties.\n"
            "You are chosen for a reason! You are one of the following team members: {team_members}."
        )

        # Create prompt template
        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])

        # Create and return agent
        agent = create_openai_functions_agent(llm, tools, prompt)
        executor = AgentExecutor(agent=agent, tools=tools)
        return executor
    except Exception as e:
        logger.error(f"Error creating agent: {e}")
        raise
C. create_team_supervisor Helper
This helper creates the supervisor agent that coordinates the team:
def create_team_supervisor(
    llm: ChatOpenAI,
    system_prompt: str,
    members: List[str]
) -> Callable:
    """Create an LLM-based supervisor with enhanced reasoning.

    Args:
        llm: Chat model; ``bind_functions`` is called on it.
        system_prompt: Supervisor instructions; ``{team_members}`` and
            ``{options}`` are filled in from ``members``.
        members: Names of the agents the supervisor can route to.

    Returns:
        The pipeline ``prompt | llm bound to the "route" function | JSON
        function-output parser``. The ``route`` schema requires ``next``,
        ``reasoning`` and ``information_needed``.
    """
    # Define possible routing options
    options = ["FINISH"] + members

    # Define routing function schema
    function_def = {
        "name": "route",
        "description": "Select the next role based on query analysis.",
        "parameters": {
            "title": "routeSchema",
            "type": "object",
            "properties": {
                "next": {
                    "title": "Next",
                    "anyOf": [{"enum": options}],
                },
                "reasoning": {
                    "title": "Reasoning",
                    "type": "string",
                    # NOTE(review): the original text was cut off after
                    # "should"; the ending is reconstructed — confirm wording.
                    "description": "Explanation for why this agent should act next",
                },
                "information_needed": {
                    "title": "Information Needed",
                    "type": "array",
                    "items": {"type": "string"},
                    # NOTE(review): the original text was cut off after
                    # "from"; the ending is reconstructed — confirm wording.
                    "description": "List of specific information needed from this agent",
                },
            },
            "required": ["next", "reasoning", "information_needed"],
        },
    }

    # Create prompt template
    prompt = ChatPromptTemplate.from_messages([
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="messages"),
        (
            "system",
            "Given the conversation above, who should act next? Consider:\n"
            "1. What information do we have?\n"
            "2. What's still missing?\n"
            "3. Which agent can best provide the missing information?\n"
            "Select from: {options}",
        ),
    ]).partial(options=str(options), team_members=", ".join(members))

    # Create and return supervisor
    return (
        prompt
        | llm.bind_functions(functions=[function_def], function_call="route")
        | JsonOutputFunctionsParser()
    )
Why these helpers are important:
- Standardize agent creation and communication
- Ensure consistent behavior across the system
- Make adding new agents and modifying behavior easier
- Maintain clean separation of concerns
- Enable structured team coordination
These helper functions form the backbone of our agent system, enabling:
- Standardized agent creation
- Consistent communication patterns
- Proper state management
- Team coordination
- Easy system expansion
Research Graph System
The research graph is like a traffic control system that:
1. Manages Flow:
- Directs queries to right agents
- Handles responses
- Coordinates between agents
2. Controls Transitions:
- Decides when to switch agents
- Determines when task is complete
- Manages information flow
Example Flow:
User Query → Supervisor → Search Agent → Supervisor → SEC Agent → Supervisor → Response
Why it matters:
- Ensures orderly process
- Prevents chaos in communication
- Maintains system organization
Practical Example
Let's say user asks: "How is Apple's revenue trending?"
1. State Tracks:
- The original question
- That Search agent checked market data
- That SEC agent checked financial filings
- What each agent found
- Which information is still needed
2. Helpers Manage:
- Getting market data from Search agent
- Getting filing data from SEC agent
- Combining information
- Formatting response
3. Graph Controls:
- First getting market trends
- Then getting SEC filing data
- Finally combining for complete answer
This system ensures:
- No information is lost
- All necessary sources are checked
- Response is complete and accurate
Part 4: System Integration and Setup
System Components Integration
A. Project Structure
src/
├── agents/                  # AI Agent Components
│   ├── __init__.py
│   ├── search_agent.py      # Search specialist using Search uAgent
│   ├── sec_agent.py         # SEC specialist using RAG uAgent
│   └── supervisor.py        # Team coordinator
├── agentverse/
│   ├── __init__.py
│   └── register.py          # Financial Agent registration with Agentverse
├── graph/
│   ├── __init__.py
│   └── state.py             # Research team state management
├── tool_agents/
│   ├── __init__.py
│   ├── search_tool/         # uAgent Tools (New)
│   │   ├── __init__.py
│   │   └── agent.py         # Search Tool uAgent
│   └── rag_tool/
│       ├── __init__.py
│       ├── agent.py         # RAG Tool uAgent
│       └── rag_service.py
└── utils/
    ├── __init__.py
    └── helpers.py           # Helper functions for agents
B. Environment Configuration
# .env file
# AI Agent Configuration
FINANCIAL_AGENT_KEY="your_financial_agent_seed"
AGENTVERSE_API_KEY="your_agentverse_api_key"
# Tool uAgents Configuration
SEARCH_TOOL_SEED="your_search_tool_seed"
RAG_TOOL_SEED="your_rag_tool_seed"
# External Services
TAVILY_API_KEY="your_tavily_api_key"
# Server Configuration
AI_AGENT_PORT=5008
SEARCH_TOOL_PORT=8000
RAG_TOOL_PORT=8001
Component Startup Sequence
A. Tool uAgents Startup
# tools/startup.py
from search_tool.agent import search_agent
from rag_tool.agent import rag_agent
import asyncio
async def start_tools():
    """Start all tool uAgents.

    Wraps the result of each agent's ``run()`` in an asyncio task, prints both
    agent addresses and waits for the two tasks. Any error is printed and
    re-raised.
    """
    try:
        # NOTE(review): asyncio.create_task() requires a coroutine, so this
        # presumes Agent.run() returns one. If run() is a blocking call in the
        # installed uAgents version, the second agent never starts — confirm,
        # and consider running both agents through uagents.Bureau instead.

        # Start search tool
        search_task = asyncio.create_task(search_agent.run())
        print(f"Search Tool started at address: {search_agent.address}")

        # Start RAG tool
        rag_task = asyncio.create_task(rag_agent.run())
        print(f"RAG Tool started at address: {rag_agent.address}")

        # Wait for both agents
        await asyncio.gather(search_task, rag_task)
    except Exception as e:
        print(f"Error starting tools: {e}")
        raise
if __name__ == "__main__":
    # Run the tool startup coroutine when this file is executed as a script.
    asyncio.run(start_tools())
B. Financial Analysis Agent Startup
# ai_agent/main.py
from dotenv import load_dotenv
from specialist_agents import SearchAgent, SECAnalyst
from utils.helpers import init_agent
def start_financial_agent():
    """Initialize and start the Financial Analysis Agent.

    Loads environment variables, calls ``init_agent()`` and then serves ``app``
    on all interfaces, using the port from ``AI_AGENT_PORT`` (default 5008).
    Any error is printed and re-raised.
    """
    import os  # this snippet uses os.getenv but does not import os itself

    try:
        # Load environment variables
        load_dotenv()

        # Initialize agent
        init_agent()

        # Start Flask server
        # NOTE(review): ``app`` is neither defined nor imported in this
        # snippet — confirm where the Flask application object comes from.
        app.run(
            host="0.0.0.0",
            port=int(os.getenv("AI_AGENT_PORT", "5008")),
        )
    except Exception as e:
        print(f"Error starting Financial Agent: {e}")
        raise
if __name__ == "__main__":
    # Start the Financial Analysis Agent when executed as a script.
    start_financial_agent()
Communication Setup
A. Tool Address Configuration
# ai_agent/config/addresses.py
from typing import NamedTuple
class ToolAddresses(NamedTuple):
    """Addresses of the tool uAgents, with the known addresses as defaults.

    Read the values from an instance (``ToolAddresses().search_tool``). On the
    class itself the field names are descriptors, not the address strings.
    """

    # Parentheses keep each long default on its own line; a bare line break
    # after "=" is a syntax error.
    search_tool: str = (
        "agent1qgyqf7l0djgvnd37f4fwvrv8xu7he2eatht328u05uy3vc9j33p7qe2dxtr"
    )
    rag_tool: str = (
        "agent1qdaeq9k7ty2xjp5ylpex0ezxzlg30cc8n3lpvrgh4sqjm863hm0vusghkzu"
    )
# Usage in specialist agents
from config.addresses import ToolAddresses
class SearchAgent:
    """Search specialist that takes its tool address from ``ToolAddresses``."""

    def __init__(self):
        # Instantiate first: on the NamedTuple class itself ``search_tool`` is
        # a field descriptor, not the default address string.
        self.tool_address = ToolAddresses().search_tool
B. Message Handling Setup
# ai_agent/utils/messaging.py
# Parenthesised so the import can span several lines; a trailing comma followed
# by a bare line break is a syntax error.
from fetchai.communication import (
    parse_message_from_agent,
    send_message_to_agent,
)
async def send_to_tool(identity, tool_address: str, message: str):
    """Send message to tool uAgent.

    Args:
        identity: Sender identity passed through to ``send_message_to_agent``.
        tool_address: Address of the tool uAgent to contact.
        message: Text placed under the ``"message"`` key of the payload.

    Returns:
        ``{"status": "message_sent"}`` on success, or ``{"error": <text>}`` if
        sending raised. The exception is logged, not re-raised.
    """
    try:
        send_message_to_agent(
            identity,
            tool_address,
            {"message": message},
        )
        return {"status": "message_sent"}
    except Exception as e:
        logger.error(f"Error sending message: {e}")
        return {"error": str(e)}
# Webhook handler
@app.route('/webhook', methods=['POST'])
def webhook():
    """Handle incoming messages from tools.

    Decodes the request body, parses it with ``parse_message_from_agent`` and
    passes the message payload to ``process_tool_response``.

    Returns:
        JSON ``{"status": "success"}`` on success, or JSON
        ``{"error": <text>}`` with HTTP 500 if any step raised.
    """
    try:
        data = request.get_data().decode("utf-8")
        message = parse_message_from_agent(data)

        # Process response
        process_tool_response(message.payload)
        return jsonify({"status": "success"})
    except Exception as e:
        logger.error(f"Webhook error: {e}")
        return jsonify({"error": str(e)}), 500