FET Image Agent Payment Protocol Example
This example showcases a paid image generation service where users pay with native FET tokens directly on the Fetch.ai blockchain. The agent verifies on-chain transactions before generating images using Gemini Imagen and delivers them through the chat protocol. This implementation demonstrates direct blockchain payment integration without third-party payment processors.
What it demonstrates
- On-chain FET payment verification using Fetch.ai ledger queries.
- Seller role implementation in the payment protocol workflow.
- Integration with Gemini Imagen for image generation (Google AI).
- Automatic image delivery as chat resources via tmpfiles.org.
- Robust error handling and one retry without re-payment on failure.
Project Structure
Key files used in this example:
agent.py– Agent setup; includes chat and payment protocolschat_proto.py– Chat protocol and message handlerspayment.py– Seller-side payment logic (request, verify FET payment, generate image)client.py– Gemini Imagen image generation and tmpfiles.org uploadshared.py– Shared utilities for creating chat messages
Core Dependencies
The payment protocol requires these imports:
from uagents_core.contrib.protocols.payment import (
Funds,
RequestPayment,
RejectPayment,
CommitPayment,
CancelPayment,
CompletePayment,
payment_protocol_spec,
)
Additionally, for blockchain verification:
from cosmpy.aerial.client import LedgerClient, NetworkConfig
On-Chain Payment Verification
Unlike token-based payment systems, this example performs direct blockchain verification by querying transaction events on the Fetch.ai network:
def verify_fet_payment_to_agent(
transaction_id: str,
expected_amount_fet: str,
sender_fet_address: str,
recipient_agent_wallet,
logger,
) -> bool:
try:
from cosmpy.aerial.client import LedgerClient, NetworkConfig
testnet = os.getenv("FET_USE_TESTNET", "true").lower() == "true"
network_config = (
NetworkConfig.fetchai_stable_testnet()
if testnet
else NetworkConfig.fetchai_mainnet()
)
ledger = LedgerClient(network_config)
expected_amount_micro = int(float(expected_amount_fet) * 10**18)
logger.info(
f"Verifying payment of {expected_amount_fet} FET from {sender_fet_address} "
f"to {recipient_agent_wallet.address()}"
)
tx_response = ledger.query_tx(transaction_id)
if not tx_response.is_successful():
logger.error(f"Transaction {transaction_id} was not successful")
return False
recipient_found = False
amount_found = False
sender_found = False
denom = "atestfet" if testnet else "afet"
expected_recipient = str(recipient_agent_wallet.address())
for event_type, event_attrs in tx_response.events.items():
if event_type == "transfer":
if event_attrs.get("recipient") == expected_recipient:
recipient_found = True
if event_attrs.get("sender") == sender_fet_address:
sender_found = True
amount_str = event_attrs.get("amount", "")
if amount_str and amount_str.endswith(denom):
try:
amount_value = int(amount_str.replace(denom, ""))
if amount_value >= expected_amount_micro:
amount_found = True
except Exception:
pass
if recipient_found and amount_found and sender_found:
logger.info(f"Payment verified: {transaction_id}")
return True
logger.error(
f"Payment verification failed - recipient: {recipient_found}, "
f"amount: {amount_found}, sender: {sender_found}"
)
return False
except Exception as e:
logger.error(f"FET payment verification failed: {e}")
return False
Payment Protocol and Image Generation
The payment protocol uses FET_FUNDS, set_agent_wallet, request_payment_from_user, and handles CommitPayment / RejectPayment. After a verified payment, generate_image_after_payment calls Gemini Imagen and process_image_result uploads to tmpfiles.org and sends the image as a chat resource:
"""Payment protocol for Gemini Imagen image generation agent."""
import base64
import os
import time
from datetime import datetime, timezone
from uuid import uuid4
from uagents import Context, Protocol
from uagents_core.contrib.protocols.payment import (
CancelPayment,
CommitPayment,
CompletePayment,
Funds,
RejectPayment,
RequestPayment,
payment_protocol_spec,
)
from uagents_core.contrib.protocols.chat import (
ChatMessage as AvChatMessage,
Resource,
ResourceContent,
TextContent,
)
from shared import create_text_chat
_agent_wallet = None
def set_agent_wallet(wallet):
global _agent_wallet
_agent_wallet = wallet
payment_proto = Protocol(spec=payment_protocol_spec, role="seller")
FET_FUNDS = Funds(currency="FET", amount="0.1", payment_method="fet_direct")
ACCEPTED_FUNDS = [FET_FUNDS]
# ... verify_fet_payment_to_agent(...) as above ...
async def request_payment_from_user(
ctx: Context, user_address: str, description: str | None = None
):
accepted_funds = [FET_FUNDS]
metadata = {}
fet_network = (
"stable-testnet"
if os.getenv("FET_USE_TESTNET", "true").lower() == "true"
else "mainnet"
)
if _agent_wallet:
metadata["provider_agent_wallet"] = str(_agent_wallet.address())
metadata["fet_network"] = fet_network
if description:
metadata["content"] = description
else:
metadata["content"] = (
"Please complete the payment to proceed. "
"After payment, I will generate one image from your prompt."
)
payment_request = RequestPayment(
accepted_funds=accepted_funds,
recipient=str(_agent_wallet.address()) if _agent_wallet else "unknown",
deadline_seconds=300,
reference=str(uuid4()),
description=description
or "Gemini Imagen: after payment, I will generate one image from your prompt",
metadata=metadata,
)
ctx.logger.info(f"Sending payment request to {user_address}")
await ctx.send(user_address, payment_request)
def _allow_retry(ctx: Context, sender: str, session_id: str) -> bool:
retry_key = f"{sender}:{session_id}:retry_count"
try:
current = int(ctx.storage.get(retry_key) or 0)
except Exception:
current = 0
if current >= 1:
return False
ctx.storage.set(retry_key, current + 1)
ctx.storage.set(f"{sender}:{session_id}:awaiting_prompt", True)
ctx.storage.set(f"{sender}:{session_id}:verified_payment", True)
return True
@payment_proto.on_message(CommitPayment)
async def handle_commit_payment(ctx: Context, sender: str, msg: CommitPayment):
ctx.logger.info(f"Received payment commitment from {sender}")
payment_verified = False
if msg.funds.payment_method == "fet_direct" and msg.funds.currency == "FET":
try:
buyer_fet_wallet = None
if isinstance(msg.metadata, dict):
buyer_fet_wallet = msg.metadata.get("buyer_fet_wallet") or msg.metadata.get("buyer_fet_address")
if not buyer_fet_wallet:
ctx.logger.error("Missing buyer_fet_wallet in CommitPayment.metadata")
else:
payment_verified = verify_fet_payment_to_agent(
transaction_id=msg.transaction_id,
expected_amount_fet=FET_FUNDS.amount,
sender_fet_address=buyer_fet_wallet,
recipient_agent_wallet=_agent_wallet,
logger=ctx.logger,
)
except Exception as e:
ctx.logger.error(f"FET verify error: {e}")
else:
ctx.logger.error(f"Unsupported payment method: {msg.funds.payment_method}")
if payment_verified:
ctx.logger.info(f"Payment verified successfully from {sender}")
await ctx.send(sender, CompletePayment(transaction_id=msg.transaction_id))
await generate_image_after_payment(ctx, sender)
else:
ctx.logger.error(f"Payment verification failed from {sender}")
await ctx.send(
sender,
CancelPayment(
transaction_id=msg.transaction_id,
reason="Payment verification failed",
),
)
@payment_proto.on_message(RejectPayment)
async def handle_reject_payment(ctx: Context, sender: str, msg: RejectPayment):
ctx.logger.info(f"Payment rejected by {sender}: {msg.reason}")
await ctx.send(
sender,
create_text_chat(
"Sorry, you denied the payment. Reply again and I'll send a new payment request."
),
)
async def generate_image_after_payment(ctx: Context, user_address: str):
from client import run_gemini_image_blocking
session_id = str(ctx.session)
prompt = ctx.storage.get(f"prompt:{user_address}:{session_id}") or ctx.storage.get("current_prompt")
if not prompt:
ctx.logger.error("No prompt found in storage")
await ctx.send(user_address, create_text_chat("Error: No prompt found"))
return
ctx.logger.info(f"Generating image for verified payment: {prompt}")
try:
result = run_gemini_image_blocking(prompt=prompt)
ctx.logger.info(
f"Generation result: status={result.get('status')}, has_image={bool(result.get('image_data'))}"
)
await process_image_result(ctx, user_address, result)
except Exception as e:
ctx.logger.error(f"Image generation error: {e}")
await ctx.send(user_address, create_text_chat(f"Error generating image: {e}"))
async def process_image_result(ctx: Context, sender: str, result: dict):
session_id = str(ctx.session)
if result.get("status") == "failed" or "error" in result:
err = result.get("error", "Unknown error")
await ctx.send(sender, create_text_chat(f"Error: {err}"))
if _allow_retry(ctx, sender, session_id):
await ctx.send(
sender,
create_text_chat(
"Generation failed, but your payment is valid. "
"Send your image prompt again — you won't be charged again."
),
)
return
image_bytes = result.get("image_data")
content_type = result.get("content_type", "image/png")
if not image_bytes:
await ctx.send(sender, create_text_chat("Image generated but could not retrieve bytes"))
if _allow_retry(ctx, sender, session_id):
await ctx.send(
sender,
create_text_chat(
"Delivery failed, but your payment is valid. Send your prompt again — no extra charge."
),
)
return
from client import upload_image_to_tmpfiles
image_url = await upload_image_to_tmpfiles(image_bytes, content_type)
if not image_url:
image_url = f"data:{content_type};base64,{base64.b64encode(image_bytes).decode()}"
elif image_url.startswith("http://"):
image_url = "https://" + image_url[7:]
try:
await ctx.send(
sender,
AvChatMessage(
timestamp=datetime.now(timezone.utc),
msg_id=uuid4(),
content=[
TextContent(type="text", text="Here is your generated image."),
ResourceContent(
type="resource",
resource_id=uuid4(),
resource=Resource(
uri=image_url,
metadata={"mime_type": content_type, "role": "image"},
),
),
],
),
)
ctx.storage.remove(f"{sender}:{session_id}:retry_count")
ctx.logger.info("Image sent successfully")
except Exception as e:
ctx.logger.error(f"Failed to send image: {e}")
if _allow_retry(ctx, sender, session_id):
await ctx.send(
sender,
create_text_chat(
"Could not send image, but your payment is valid. Send your prompt again — no extra charge."
),
)
else:
await ctx.send(
sender,
create_text_chat("Could not send image. Please try again or start a new session."),
)
Image Generation Service
The agent uses Gemini Imagen for image generation and tmpfiles.org for temporary hosting:
"""Gemini Imagen image generation client."""
from __future__ import annotations
import asyncio
import os
from io import BytesIO
from typing import Any, Optional
import aiohttp
from dotenv import load_dotenv
load_dotenv()
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
IMAGEN_MODEL = os.getenv("GEMINI_IMAGEN_MODEL", "imagen-4.0-fast-generate-001")
TMPFILES_API_URL = "https://tmpfiles.org/api/v1/upload"
async def upload_image_to_tmpfiles(image_data: bytes, content_type: str) -> Optional[str]:
"""Upload image to tmpfiles.org and return the public download URL (https)."""
ext = "png" if "png" in content_type else "jpg"
for attempt in range(3):
try:
form = aiohttp.FormData()
form.add_field("file", image_data, filename=f"image.{ext}", content_type=content_type)
async with aiohttp.ClientSession() as session:
async with session.post(
TMPFILES_API_URL, data=form, timeout=aiohttp.ClientTimeout(total=30)
) as resp:
if resp.status != 200:
continue
data = await resp.json()
if data.get("status") == "success" and data.get("data"):
url = (data["data"].get("url") or data.get("url") or "").strip()
if url and url.startswith("http"):
if url.startswith("http://"):
url = "https://" + url[7:]
if "tmpfiles.org/" in url and "/dl/" not in url:
url = url.replace("https://tmpfiles.org/", "https://tmpfiles.org/dl/", 1)
return url
except Exception:
if attempt < 2:
await asyncio.sleep(2 * (attempt + 1))
return None
def run_gemini_image_blocking(*, prompt: str) -> dict[str, Any] | None:
"""Generate one image from a text prompt using Gemini Imagen."""
if not GEMINI_API_KEY:
return {"error": "GEMINI_API_KEY is not set", "status": "failed"}
try:
from google import genai
from google.genai import types
except ImportError:
return {"error": "google-genai not installed. pip install google-genai", "status": "failed"}
try:
client = genai.Client(api_key=GEMINI_API_KEY)
response = client.models.generate_images(
model=IMAGEN_MODEL,
prompt=prompt.strip(),
config=types.GenerateImagesConfig(number_of_images=1, aspect_ratio="1:1"),
)
except Exception as e:
return {"error": str(e), "status": "failed"}
if not response.generated_images or len(response.generated_images) == 0:
return {"error": "No image generated", "status": "failed"}
gen = response.generated_images[0]
image_obj = getattr(gen, "image", None)
if image_obj is None:
return {"error": "Generated image has no image attribute", "status": "failed"}
image_data = None
content_type = "image/png"
img_bytes = getattr(image_obj, "image_bytes", None)
if img_bytes is not None:
image_data = img_bytes if isinstance(img_bytes, bytes) else bytes(img_bytes)
else:
try:
if hasattr(image_obj, "save"):
buf = BytesIO()
image_obj.save(buf, format="PNG")
image_data = buf.getvalue()
except Exception as e:
return {"error": f"Could not extract image bytes: {e}", "status": "failed"}
if not image_data:
return {"error": "Could not extract image bytes", "status": "failed"}
return {"image_data": image_data, "content_type": content_type, "status": "success"}
Chat Message Processing
The chat protocol handles incoming messages and coordinates payment flow:
from datetime import datetime, timezone
from uagents import Context, Protocol
from uagents_core.contrib.protocols.chat import (
ChatAcknowledgement,
ChatMessage,
TextContent,
chat_protocol_spec,
)
from payment import (
request_payment_from_user,
generate_image_after_payment,
)
from shared import create_text_chat
chat_proto = Protocol(spec=chat_protocol_spec)
@chat_proto.on_message(ChatMessage)
async def handle_message(ctx: Context, sender: str, msg: ChatMessage):
ctx.logger.info(f"Got a message from {sender}: {msg.content}")
await ctx.send(
sender,
ChatAcknowledgement(
timestamp=datetime.now(timezone.utc), acknowledged_msg_id=msg.msg_id
),
)
for item in msg.content:
if isinstance(item, TextContent):
text = item.text.strip()
session_id = str(ctx.session)
awaiting_key = f"{sender}:{session_id}:awaiting_prompt"
verified_key = f"{sender}:{session_id}:verified_payment"
if (ctx.storage.has(awaiting_key) or ctx.storage.get(awaiting_key)) and (
ctx.storage.has(verified_key) or ctx.storage.get(verified_key)
):
ctx.logger.info("Consuming prompt post-payment and generating one image")
ctx.storage.remove(awaiting_key)
ctx.storage.remove(verified_key)
ctx.storage.set(f"prompt:{sender}:{session_id}", text)
ctx.storage.set("requesting_user", sender)
await generate_image_after_payment(ctx, sender)
return
ctx.logger.info(f"Requesting payment from {sender} for image generation")
payment_description = "Please complete the payment to generate this image."
ctx.storage.set(f"prompt:{sender}:{session_id}", text)
ctx.storage.set("current_prompt", text)
ctx.storage.remove(f"{sender}:{session_id}:request_recorded")
await request_payment_from_user(ctx, sender, description=payment_description)
return
@chat_proto.on_message(ChatAcknowledgement)
async def handle_ack(ctx: Context, sender: str, msg: ChatAcknowledgement):
ctx.logger.info(f"Got an acknowledgement from {sender} for {msg.acknowledged_msg_id}")
Helper Functions
Shared utilities for creating chat messages:
"""
Shared utilities to avoid circular imports between chat_proto and payment_proto.
"""
from datetime import datetime, timezone
from uuid import uuid4
from uagents_core.contrib.protocols.chat import (
AgentContent,
ChatMessage,
TextContent,
)
def create_text_chat(text: str, end_session: bool = False) -> ChatMessage:
"""Create a text chat message."""
content: list[AgentContent] = [TextContent(type="text", text=text)]
if end_session:
from uagents_core.contrib.protocols.chat import EndSessionContent
content.append(EndSessionContent(type="end-session"))
return ChatMessage(
timestamp=datetime.now(timezone.utc),
msg_id=uuid4(),
content=content,
)
Agent Initialization
The main agent file sets up protocols and wallet using environment-based configuration:
import os
from dotenv import load_dotenv
from uagents import Agent, Context
load_dotenv()
from chat_proto import chat_proto
from payment import payment_proto, set_agent_wallet
agent = Agent(
name=os.getenv("AGENT_NAME", "Fet Example Agent"),
seed=os.getenv("AGENT_SEED_PHRASE", "gemini-imagen-agent"),
port=int(os.getenv("AGENT_PORT", "8000")),
mailbox=True,
)
agent.include(chat_proto, publish_manifest=True)
agent.include(payment_proto, publish_manifest=True)
set_agent_wallet(agent.wallet)
@agent.on_event("startup")
async def startup(ctx: Context):
ctx.logger.info(f"Agent started: {agent.wallet.address()}")
ctx.logger.info("=== Gemini Imagen Image Generation Agent ===")
ctx.logger.info("💰 Accepted: 0.1 FET (direct)")
ctx.logger.info("🎨 Images via Gemini Imagen (GEMINI_API_KEY)")
ctx.logger.info("📧 Chat to request image generation")
if __name__ == "__main__":
agent.run()
Requirements
Install dependencies from requirements.txt:
uagents==0.23.6
uagents-core==0.4.0
python-dotenv
google-genai
Pillow
Configuration
Copy .env.example to .env and set your credentials:
# Agent identity (optional; defaults shown)
AGENT_NAME=Fet Example Agent
AGENT_SEED_PHRASE=gemini-imagen-agent
AGENT_PORT=8000
# Gemini API for image generation (required)
GEMINI_API_KEY=your_gemini_api_key_here
# Optional: Gemini Imagen model name
# GEMINI_IMAGEN_MODEL=imagen-4.0-fast-generate-001
# Fetch.ai network (true = testnet, false = mainnet)
FET_USE_TESTNET=true
Create a .env file with required credentials:
# Copy from .env.example and fill in values
cp .env.example .env
# Edit .env and set GEMINI_API_KEY, etc.
Installation & Execution
Set up the environment and run the agent:
# Create virtual environment
python3 -m venv .venv && source .venv/bin/activate
# Install dependencies from requirements.txt
pip install -r requirements.txt
# Configure environment (copy .env.example to .env and set GEMINI_API_KEY, etc.)
cp .env.example .env
# Start the agent
python3 agent.py
Usage Flow
- User sends message: Submit an image prompt via chat
- Payment request: Agent responds with a payment request for 0.1 FET
- User pays: Buyer completes on-chain FET transfer
- Verification: Agent verifies transaction on Fetch.ai ledger
- Generation: Gemini Imagen creates the image (GEMINI_API_KEY)
- Delivery: Image uploaded to tmpfiles.org and sent as chat resource
Implementation Details
Payment Verification Setup
- Install blockchain library:
pip install cosmpy - Network configuration: Set
FET_USE_TESTNETin.env(true for testnet, false for mainnet) - Payment definition: In
payment.py, defineFET_FUNDS = Funds(currency="FET", amount="0.1", payment_method="fet_direct") - Buyer metadata: On
CommitPayment, buyer must providebuyer_fet_walletinmsg.metadata - On-chain check: Agent uses
LedgerClient.query_tx()to verify transaction events match expected transfer
Key Implementation Features
- Blockchain Integration: Direct on-chain verification without payment intermediaries
- Gemini Imagen: Image generation via Google AI (
GEMINI_API_KEY); tmpfiles.org for hosting - State Management: Session-based storage tracks payment status and prompts
- Error Recovery: Failed generations allow one retry without additional payment (
_allow_retry) - Resource Protocol: Images delivered as
ResourceContentwith "Here is your generated image." and tmpfiles.org URLs
The screenshots below show the Fet Example Agent flow: (1) payment request with Reject / Approve FET Payment, and (2) the generated image delivered after successful payment.
1. Payment request — Agent asks for 0.1 FET on Dorado; user can approve or reject.

2. Generated image — After payment, the agent returns the generated image (e.g. smartwatch on white background).

Getting FET Testnet Tokens
To test this agent on the Fetch.ai testnet, you'll need testnet FET tokens in your wallet. Here's how to get them:
-
Visit the Testnet Faucet: Go to the official Fetch.ai Testnet Faucet to request testnet tokens
-
Wait for Confirmation: Testnet tokens are typically distributed automatically after a short delay
For detailed instructions and a step-by-step visual guide, check out this blog post or watch the video below: