Creating and Registering LangGraph based Financial Analysis Agent
This documentation explains how to build LangGraph agents that perform comprehensive financial analysis, register them on Fetch.ai's Agentverse, and enable collaboration between specialized agents for dynamic data analysis. Below, you'll see the main.py script containing the workflow logic run_financial_analysis_workflow
, as well as specialized agent implementations for financial analysis.
Overview
Purpose
This project showcases:
- Creating LangGraph agents for complex tasks (e.g., analyzing SEC filings, market research, and financial metrics).
- Integrating with Fetch.ai's Agentverse, enabling seamless communication and data exchange.
- Using autonomous agents to streamline financial analysis while combining multiple data sources and expert analysis.
Prerequisites
Environment Setup
To begin creating and registering LangGraph agents, ensure you have the following:
- Python 3.10+ or higher
- A virtual environment (recommended)
- Fetch.ai SDK for agent creation and registration
- LangGraph Framework for defining states and workflows
- LangChain for agent tools and chains
Key Components
- SEC Analysis Agent: A specialized agent that processes SEC filings and extracts financial metrics
- Search Agent: Gathers real-time market data and analyst opinions
- Supervisor Agent: Coordinates analysis flow and combines insights
- Agent Collaboration: Multiple agents work together via LangGraph state management
Architecture Diagram

Environment Variables
Create a .env
file with your API keys:
OPENAI_API_KEY=<your_openai_api_key>
TAVILY_API_KEY=<your_tavily_api_key>
AGENTVERSE_API_KEY=<your_agentverse_api_key>
Replace the placeholders with your actual keys:
Project Directory & Configuration Files
For a clean, maintainable setup, we recommend the following structure:
financial_analysis_agent/
├── .env
├── src/
│ ├── agents/
│ │ ├── __init__.py
│ │ ├── search_agent.py # Market research specialist
│ │ ├── sec_agent.py # SEC filings specialist
│ │ └── supervisor.py # Team coordinator
│ ├── tools/
│ │ ├── __init__.py
│ │ ├── search.py # Tavily search implementation
│ │ └── analysis.py # RAG implementation
│ ├── rag/
│ │ ├── __init__.py
│ │ ├── chain.py # RAG chain setup
│ │ └── loader.py # Document processing
│ ├── graph/
│ │ ├── __init__.py
│ │ └── state.py # State management
│ └── utils/
│ ├── __init__.py
│ └── helpers.py # Helper functions
├── main.py # Main workflow
├── register.py # Agentverse registration
├── requirements.txt
└── README.md
Core Implementation Files
1. requirements.txt
Purpose: Declares project dependencies for installing all necessary packages quickly with pip install -r requirements.txt
.
Create a file called requirements.txt
at the root of your project. Example contents:
langchain==0.1.0
langchain-core==0.1.10
langgraph==0.0.10
fetchai-sdk==0.16.3
flask==2.2.5
flask-cors==3.0.10
python-dotenv==1.0.0
tavily-python==0.2.6
qdrant-client==1.7.0
tiktoken==0.5.2
Install everything at once via:
pip install -r requirements.txt
2. State Management (src/graph/state.py)
Purpose: Defines the state structure and management for the research team's workflow.
Place state.py
in the src/graph
directory. It contains:
from typing import TypedDict, List, Annotated
from langchain_core.messages import BaseMessage
import operator
class ResearchTeamState(TypedDict):
"""State structure for research team coordination."""
messages: Annotated[List[BaseMessage], operator.add] # Conversation history
team_members: List[str] # Available agents
next: str # Next agent to act
information_needed: List[str] # Required information
reasoning: str # Decision reasoning
def create_initial_state(query: str) -> ResearchTeamState:
"""Create initial state from user query."""
return {
"messages": [HumanMessage(content=query)],
"team_members": ["Search", "SECAnalyst"],
"next": "",
"information_needed": [],
"reasoning": ""
}
def update_state(state: ResearchTeamState, agent_response: dict) -> ResearchTeamState:
"""Update state with agent response."""
new_state = state.copy()
new_state["messages"].extend(agent_response["messages"])
return new_state
3. Tools Implementation (src/tools)
Purpose: Implements specialized tools for market research and SEC filing analysis.
A. Search Tool (search.py)
from typing import Annotated
from langchain_core.tools import tool
from tavily import TavilyClient
import os
@tool
def tavily_search(query: str) -> str:
"""Search for real-time market information."""
try:
client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
result = client.search(query)
return str(result)
except Exception as e:
return f"Error performing search: {str(e)}"
B. Analysis Tool (analysis.py)
from typing import Annotated
from langchain_core.tools import tool
from ..rag.chain import create_rag_chain
@tool
def retrieve_information(query: Annotated[str, "query to analyze financial documents"]) -> str:
"""Analyze SEC filings using RAG."""
try:
rag_chain = create_rag_chain()
return rag_chain.invoke(query)
except Exception as e:
return f"Error analyzing documents: {str(e)}"
4. RAG Implementation (src/rag)
Purpose: Handles document processing and retrieval for SEC filing analysis.
A. Document Loader (loader.py)
class DocumentLoader:
def __init__(self, file_path: str):
self.file_path = file_path
@staticmethod
def tiktoken_len(text):
tokens = tiktoken.encoding_for_model("gpt-4").encode(text)
return len(tokens)
def load_and_split(self):
"""Load and chunk documents for processing."""
docs = PyMuPDFLoader(self.file_path).load()
splitter = RecursiveCharacterTextSplitter(
chunk_size=300,
chunk_overlap=0,
length_function=self.tiktoken_len
)
return splitter.split_documents(docs)
B. RAG Chain (chain.py)
def create_rag_chain(file_path: str = "data/raw/apple_10k.pdf"):
"""Create RAG chain for SEC filing analysis."""
# Initialize document processing
loader = DocumentLoader(file_path)
chunks = loader.load_and_split()
# Set up vector store
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Qdrant.from_documents(
chunks,
embeddings,
location=":memory:",
collection_name="sec_filings"
)
# Create retrieval chain
template = """Use the context to answer financial questions.
Context: {context}
Question: {question}
Answer with specific numbers and data when available."""
prompt = ChatPromptTemplate.from_template(template)
chain = (
{"context": vectorstore.as_retriever(), "question": RunnablePassthrough()}
| prompt
| ChatOpenAI(model="gpt-4-turbo-preview")
| StrOutputParser()
)
return chain
5. Helper Functions (src/utils/helpers.py)
Purpose: Provides utility functions for agent creation and node management.
def create_agent(llm: ChatOpenAI, tools: list, system_prompt: str) -> AgentExecutor:
"""Create a specialized agent with tools and prompt."""
try:
system_prompt += (
"\nWork autonomously using your tools."
" Do not ask for clarification."
" Your team members will help with their specialties."
)
prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
MessagesPlaceholder(variable_name="messages"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
])
agent = create_openai_functions_agent(llm, tools, prompt)
return AgentExecutor(agent=agent, tools=tools)
except Exception as e:
logger.error(f"Error creating agent: {e}")
raise
def agent_node(state, agent, name):
"""Create agent node for the graph."""
try:
if "information_needed" in state:
message_content = f"""Information needed:
{', '.join(state['information_needed'])}
Query: {state['messages'][-1].content}"""
state['messages'][-1] = HumanMessage(content=message_content)
result = agent.invoke(state)
return {
"messages": [
HumanMessage(content=result["output"], name=name)
]
}
except Exception as e:
logger.error(f"Error in agent node {name}: {e}")
raise
Financial Analysis Agent Implementation
A. SEC Analysis Agent (src/agents/sec_agent.py)
A specialized agent that processes SEC filings and financial documents. It uses RAG to analyze documents and extract relevant financial metrics.
# src/agents/sec_agent.py
from langchain.chat_models import ChatOpenAI
from ..tools.analysis import retrieve_information
def create_sec_agent(llm: ChatOpenAI):
"""Creates an agent specialized in SEC filings analysis."""
system_prompt = """You are a financial analyst specialized in SEC filings analysis.
After analyzing SEC filings:
1. If you need market context, clearly state what specific market data you need
2. If numbers need industry comparison, explicitly request competitor data
3. Always include specific numbers and trends from the filings
4. If you spot significant changes or unusual patterns, highlight them
Format your response as:
1. Data from SEC Filings: [your findings]
2. Additional Context Needed: [if any]
3. Analysis: [your insights]
"""
return create_agent(
llm=llm,
tools=[retrieve_information],
system_prompt=system_prompt
)
B. Search Agent (src/agents/search_agent.py)
Handles real-time market research and data gathering using external search tools.
# src/agents/search_agent.py
from langchain.chat_models import ChatOpenAI
from ..tools.search import tavily_search
def create_search_agent(llm: ChatOpenAI):
"""Creates a search agent specialized in market research."""
system_prompt = """You are a research assistant who can search for up-to-date
financial information using the tavily search engine.
When responding:
1. Always cite sources
2. Focus on recent market data and analyst reports
3. If SEC data is mentioned, compare it with current market views
4. Highlight any significant discrepancies with official filings
Format your response as:
1. Market Data: [your findings]
2. Analyst Views: [key opinions]
3. Relevance to SEC Data: [if applicable]
"""
return create_agent(
llm=llm,
tools=[tavily_search],
system_prompt=system_prompt
)
C. Supervisor Agent (src/agents/supervisor.py)
Coordinates between agents and manages the analysis workflow.
# src/agents/supervisor.py
def create_supervisor_agent(llm: ChatOpenAI):
"""Creates the supervisor agent for coordinating analysis."""
function_def = {
"name": "route",
"description": "Select the next role based on query analysis.",
"parameters": {
"title": "routeSchema",
"type": "object",
"properties": {
"next": {
"title": "Next",
"anyOf": [{"enum": ["Search", "SECAnalyst", "FINISH"]}],
},
"reasoning": {
"title": "Reasoning",
"type": "string",
"description": "Explanation for why this agent should act next"
},
"information_needed": {
"title": "Information Needed",
"type": "array",
"items": {"type": "string"},
"description": "List of specific information needed from this agent"
}
},
"required": ["next", "reasoning", "information_needed"],
},
}
prompt = ChatPromptTemplate.from_messages([
("system", """You are a financial research team supervisor.
Your role is to:
1. Analyze incoming queries
2. Determine what information is needed
3. Choose the appropriate agent for each task
4. Coordinate between agents
5. Ensure comprehensive analysis"""),
MessagesPlaceholder(variable_name="messages"),
("system", "Who should act next? Consider available information and agent specialties.")
])
return (
prompt
| llm.bind_functions(functions=[function_def], function_call="route")
| JsonOutputFunctionsParser()
)
Main Workflow Implementation (main.py)
The main script that initializes the LangGraph workflow and handles financial analysis requests.
import os
from dotenv import load_dotenv
from src.rag.chain import create_rag_chain
from src.graph.state import create_research_graph
def init_financial_system():
"""Initialize the RAG and research system."""
try:
# Create RAG chain for SEC document analysis
rag_chain = create_rag_chain("data/raw/apple_10k.pdf")
# Initialize research graph with RAG chain
chain = create_research_graph(rag_chain)
return chain
except Exception as e:
logger.error(f"Error initializing system: {e}")
raise
async def run_financial_analysis(query: str):
"""
Process financial analysis queries through the research graph.
Args:
query (str): The financial analysis query to process
Returns:
dict: Analysis results from multiple agents
"""
try:
# Initialize state with query
state = {
"messages": [HumanMessage(content=query)],
"team_members": ["Search", "SECAnalyst"],
"information_needed": [],
"reasoning": ""
}
# Process through research chain
result = research_chain.invoke(state)
return {
"status": "success",
"analysis": result.get("messages", [])
}
except Exception as e:
logger.error(f"Analysis error: {e}")
return {
"status": "error",
"message": str(e)
}
if __name__ == "__main__":
# Load environment variables
load_dotenv()
# Initialize the system
research_chain = init_financial_system()
This workflow:
- Initializes the research system with RAG capabilities
- Sets up the specialized agents and their tools
- Creates the research graph for coordinated analysis
- Processes queries through the team of agents
- Returns comprehensive financial analysis results
Agentverse Integration (register.py)
The Financial Analysis Agent registers itself with Agentverse and handles incoming analysis requests:
import os
import logging
from flask import Flask, request, jsonify
from fetchai.crypto import Identity
from fetchai.registration import register_with_agentverse
from fetchai.communication import parse_message_from_agent, send_message_to_agent
from main import init_financial_system
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize Flask app
app = Flask(__name__)
# Global variables
financial_identity = None
research_chain = None
def init_agent():
"""Initialize and register the agent with agentverse"""
global financial_identity, research_chain
try:
# Initialize the research chain
research_chain = init_financial_system()
# Initialize identity and register with Agentverse
financial_identity = Identity.from_seed("Financial Analysis Agent", 0)
# Register with detailed capabilities description
register_with_agentverse(
identity=financial_identity,
url="http://localhost:5008/webhook",
agentverse_token=os.getenv("AGENTVERSE_API_KEY"),
agent_title="Financial Analysis Agent",
readme = """
<description>A comprehensive financial analysis agent that combines
SEC filing analysis with real-time market data.</description>
<use_cases>
<use_case>Analyze company financial metrics from SEC filings</use_case>
<use_case>Research market trends and analyst opinions</use_case>
<use_case>Compare financial performance with competitors</use_case>
</use_cases>
<payload_requirements>
<payload>
<requirement>
<parameter>query</parameter>
<description>What would you like to know about the company's financials?</description>
</requirement>
</payload>
</payload_requirements>
"""
)
logger.info("Financial Analysis Agent registered successfully!")
except Exception as e:
logger.error(f"Error initializing agent: {e}")
raise
@app.route('/webhook', methods=['POST'])
async def webhook():
"""Handle incoming requests from other agents"""
try:
data = request.get_data().decode('utf-8')
message = parse_message_from_agent(data)
query = message.payload.get("request", "")
agent_address = message.sender
if not query:
return jsonify({"status": "error", "message": "No query provided"}), 400
# Process query using research chain
result = research_chain.invoke({
"messages": [HumanMessage(content=query)],
"team_members": ["Search", "SECAnalyst"]
})
# Format response
formatted_result = {
"analysis": [
{
"role": msg.type if hasattr(msg, 'type') else "message",
"content": msg.content,
"name": msg.name if hasattr(msg, 'name') else None
}
for msg in result.get('messages', [])
]
}
# Send response back through Agentverse
send_message_to_agent(
financial_identity,
agent_address,
{'analysis_result': formatted_result}
)
return jsonify({"status": "analysis_sent"})
except Exception as e:
logger.error(f"Error in webhook: {e}")
return jsonify({"status": "error", "message": str(e)}), 500
def run_agent():
"""Initialize and start the agent"""
try:
init_agent()
app.run(host="0.0.0.0", port=5008, debug=True)
except Exception as e:
logger.error(f"Error running agent: {e}")
raise
Step-by-Step Execution
-
Environment Setup
# Create and activate virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install requirements
pip install -r requirements.txt -
Configure Environment Variables Create
.env
file with required API keys:OPENAI_API_KEY=<your_openai_api_key>
TAVILY_API_KEY=<your_tavily_api_key>
AGENTVERSE_API_KEY=<your_agentverse_api_key> -
Start the Financial Analysis Agent
python register.py
This will:
- Initialize the RAG system
- Register the agent with Agentverse
- Start the Flask server on port 5008
-
Verify Agent Registration
- Check logs for successful registration message
- Verify agent appears in Agentverse registry
User Agent Script (user_agent.py)
The User Agent serves as a client. It registers itself with Agentverse, can search for existing agents, and forwards financial analysis queries to the Financial Analysis Agent. Afterwards, it retrieves the response and processes it.
Script Breakdown
Importing Required Libraries
The script imports libraries for Flask-based HTTP handling, Fetch.ai's identity and communication utilities, and environment variable management:
from flask import Flask, request, jsonify
from flask_cors import CORS
from fetchai.crypto import Identity
from fetchai.registration import register_with_agentverse
from fetchai.communication import parse_message_from_agent, send_message_to_agent
from fetchai import fetch
import logging
import os
from dotenv import load_dotenv
Setting Up Flask and Logging
Logging and Flask are initialised to handle web requests and log agent activities:
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
app = Flask(__name__)
CORS(app, resources={r"/api/*": {'origins': 'http://localhost:5174'}})
# Global variables for client identity and responses
primary_agent = None
Initialising the User Agent
The PrimaryAgent class handles initialization and registration with Fetch.ai's Agentverse:
class PrimaryAgent:
def __init__(self):
self.identity = None
self.latest_response = None
def initialize(self):
try:
self.identity = Identity.from_seed(os.getenv("PRIMARY_AGENT_KEY"), 0)
register_with_agentverse(
identity=self.identity,
url="http://localhost:5001/webhook",
agentverse_token=os.getenv("AGENTVERSE_API_KEY"),
agent_title="Financial Query Router",
readme="<description>Routes queries to Financial Analysis Agent</description>"
)
logger.info("Primary agent initialized successfully!")
except Exception as e:
logger.error(f"Initialization error: {e}")
raise
Searching for Agents
The /api/search-agents endpoint allows the User Agent to search for agents in Agentverse based on a query:
@app.route('/api/search-agents', methods=['GET'])
def search_agents():
"""Search for available agents based on the financial query"""
try:
query = request.args.get('query', '')
if not query:
return jsonify({"error": "Query parameter 'query' is required."}), 400
logger.info(f"Searching for agents with query: {query}")
available_ais = fetch.ai(query)
agents = available_ais.get('ais', [])
extracted_data = [
{
'name': agent.get('name'),
'address': agent.get('address')
}
for agent in agents
]
logger.info(f"Found {len(extracted_data)} agents matching the query")
return jsonify(extracted_data), 200
except Exception as e:
logger.error(f"Error finding agents: {e}")
return jsonify({"error": str(e)}), 500
Sending Data to Financial Analysis Agent
The /api/send-request endpoint forwards financial analysis queries to the selected agent:
@app.route('/api/send-request', methods=['POST'])
def send_request():
try:
data = request.json
payload = data.get('payload', {})
user_input = payload.get('request')
agent_address = data.get('agentAddress')
if not user_input:
return jsonify({"error": "No input provided"}), 400
send_message_to_agent(
primary_agent.identity,
agent_address,
{
"request": user_input
}
)
return jsonify({
"status": "request_sent",
"agent_address": agent_address,
"payload": payload
})
except Exception as e:
logger.error(f"Error processing request: {e}")
return jsonify({"error": str(e)}), 500
Retrieving the Agent's Response
The /api/get-response endpoint retrieves the analysis from the Financial Analysis Agent:
@app.route('/api/get-response', methods=['GET'])
def get_response():
try:
if primary_agent.latest_response:
response = primary_agent.latest_response
primary_agent.latest_response = None
return jsonify(response)
return jsonify({"status": "waiting"})
except Exception as e:
logger.error(f"Error getting response: {e}")
return jsonify({"error": str(e)}), 500
Webhook Endpoint
The webhook endpoint processes messages sent by the Financial Analysis Agent:
@app.route('/webhook', methods=['POST'])
def webhook():
try:
data = request.get_data().decode("utf-8")
message = parse_message_from_agent(data)
primary_agent.latest_response = message.payload
return jsonify({"status": "success"})
except Exception as e:
logger.error(f"Error in webhook: {e}")
return jsonify({"error": str(e)}), 500
Running the User Agent
The Flask server runs on port 5001 to handle requests:
if __name__ == "__main__":
load_dotenv()
primary_agent.initialize()
app.run(host="0.0.0.0", port=5001)
Interacting with the Agents
Search for Financial Analysis Agents
To list agents in the Agentverse:
curl -X GET "http://localhost:5001/api/search-agents?query=Analyze%20Apple's%20supply%20chain%20risks"
Sample Output
[
{
"name": "Financial Analysis Agent",
"address": "agent1qvkq8x25tvuzclq6v34skqryzsfmqdd6snv8apu3yutc4fylkm4u5e9rsxf"
},
{
"name": "Dashboard Analytics Frontend Client",
"address": "agent1qthka4n7q0m7zwegq0qg5p3aaw5x309e8swc2nttnf3pxd3tusdwzch6ncn"
}
]
Send Analysis Request
Send a financial analysis query:
curl -X POST "http://localhost:5001/api/send-request" \
-H "Content-Type: application/json" \
-d '{
"payload": {
"request": "What are Apple'\''s recent revenue trends and market performance?"
},
"agentAddress": "agent1qvkq8x25tvuzclq6v34skqryzsfmqdd6snv8apu3yutc4fylkm4u5e9rsxf"
}'
Sample Output
{
"status": "request_sent",
"agent_address": "agent1qvkq8x25tvuzclq6v34skqryzsfmqdd6snv8apu3yutc4fylkm4u5e9rsxf",
"payload": {
"request": "What are Apple's recent revenue trends and market performance?"
}
}
Retrieve the Analysis Response
Fetch the analysis response:
curl -X GET "http://localhost:5001/api/get-response"
Sample Output
{
"analysis_result": {
"analysis": [
{
"content": "Information needed:\n Historical revenue trends for Apple over the last few quarters\n \n Query: What are Apple's recent revenue trends and market performance?",
"name": null,
"role": "human"
}
]
}
}
You now have a LangGraph-based Financial Analysis Agent integrated with the Fetch.ai Agentverse, handling complex financial analysis tasks through a team of specialized agents (Search Agent and SEC Analyst). Feel free to adapt the LangGraph workflow, state management, and agent interactions to suit your own requirements—whether you're adding new analysis capabilities, expanding the agent team, or customizing the financial analysis patterns.