What is an AI Agent? - An autonomous program that makes decisions and takes actions based on blockchain data without constant human oversight
Agent Types You Can Build
Portfolio Monitor
Track portfolio changes and send alerts
- Balance threshold notifications
- Significant movement alerts
- Daily/weekly summaries
- Performance tracking
Transaction Watcher
Monitor blockchain activity
- New transaction alerts
- Large transfer detection
- DeFi interaction tracking
- Gas fee optimization
Yield Optimizer
Analyze DeFi positions
- Track yield rates
- Compare protocols
- Alert on better opportunities
- Risk assessment
Tax Reporter
Automated tax tracking
- Transaction categorization
- Income/expense calculation
- Quarterly reports
- Export generation
Basic Portfolio Monitor Agent
Here’s a complete portfolio monitoring agent (alert delivery is left as a stub for you to implement):
import requests
import time
from datetime import datetime
from typing import Dict, List, Optional
import os
class PortfolioMonitorAgent:
    """Autonomous agent that monitors crypto portfolios using Octav API.

    The agent polls the portfolio endpoint for each address and can:
    alert when net worth drops below a threshold, alert on large relative
    changes between two consecutive polls, and send one summary per day.

    State is kept in memory only, so change detection and the daily-summary
    bookkeeping start from scratch whenever the process restarts.
    """

    def __init__(self, api_key: str, check_interval: int = 300):
        """Create an agent.

        Args:
            api_key: Octav API key, sent as a Bearer token.
            check_interval: Seconds to sleep between monitoring cycles.
        """
        self.api_key = api_key
        self.base_url = 'https://api.octav.fi'
        self.check_interval = check_interval  # seconds
        self.headers = {'Authorization': f'Bearer {api_key}'}
        # Last observed net worth per address (baseline for change alerts).
        self.last_values: Dict[str, float] = {}
        # Calendar date on which the daily-summary check last ran, per address.
        self._summary_dates = {}

    def get_portfolio(self, address: str) -> Optional[Dict]:
        """Fetch current portfolio data.

        Returns:
            The first element of the JSON list returned by the API, or
            ``None`` when the request fails or the response is empty or
            not a list.
        """
        try:
            response = requests.get(
                f'{self.base_url}/v1/portfolio',
                params={'addresses': address},  # let requests do the URL encoding
                headers=self.headers,
                timeout=30,
            )
            response.raise_for_status()
            payload = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            print(f"Error fetching portfolio: {e}")
            return None

        # An empty or non-list body used to surface as an uncaught
        # IndexError/TypeError; treat it like any other failed fetch.
        if not isinstance(payload, list) or not payload:
            print(f"Error fetching portfolio: empty or unexpected response for {address[:10]}...")
            return None
        return payload[0]

    def check_balance_threshold(self, address: str, min_balance: float = 1000):
        """Alert if portfolio falls below threshold.

        Args:
            address: Wallet address to check.
            min_balance: Net worth below which an alert is sent.
        """
        portfolio = self.get_portfolio(address)
        if not portfolio:
            return

        current_value = float(portfolio['networth'])
        if current_value >= min_balance:
            return

        self.send_alert(
            f"⚠️ Low Balance Alert\n"
            f"Address: {address[:10]}...\n"
            f"Current: ${current_value:,.2f}\n"
            f"Threshold: ${min_balance:,.2f}"
        )

    def check_significant_change(self, address: str, threshold_pct: float = 5.0):
        """Alert on significant portfolio value changes.

        The first successful call for an address only records a baseline;
        later calls compare against the value seen on the previous call.

        Args:
            address: Wallet address to check.
            threshold_pct: Minimum absolute change, in percent, that alerts.
        """
        portfolio = self.get_portfolio(address)
        if not portfolio:
            return

        current_value = float(portfolio['networth'])
        last_value = self.last_values.get(address)
        self.last_values[address] = current_value

        # No baseline yet, or a zero baseline for which a percentage change
        # is undefined: nothing to compare against.
        if last_value is None or last_value == 0:
            return

        change = current_value - last_value
        change_pct = (change / last_value) * 100
        if abs(change_pct) < threshold_pct:
            return

        direction = "📈" if change > 0 else "📉"
        self.send_alert(
            f"{direction} Significant Change Detected\n"
            f"Address: {address[:10]}...\n"
            f"Change: {change_pct:+.2f}%\n"
            f"From: ${last_value:,.2f}\n"
            f"To: ${current_value:,.2f}"
        )

    def generate_daily_summary(self, address: str):
        """Generate and send daily portfolio summary.

        The summary lists total net worth plus the per-chain and
        per-protocol values found under the ``chains`` and
        ``assetByProtocols`` keys of the portfolio.
        """
        portfolio = self.get_portfolio(address)
        if not portfolio:
            return

        # Chain distribution (missing sections are rendered as empty).
        chains_summary = [
            f" {chain['name']}: ${float(chain['value']):,.2f}"
            for chain in (portfolio.get('chains') or {}).values()
        ]
        # Protocol distribution
        protocols_summary = [
            f" {protocol['name']}: ${float(protocol['value']):,.2f}"
            for protocol in (portfolio.get('assetByProtocols') or {}).values()
        ]

        summary = "\n".join([
            "📊 Daily Portfolio Summary",
            f"Address: {address[:10]}...",
            f"Total Value: ${float(portfolio['networth']):,.2f}",
            "Chains:",
            "\n".join(chains_summary),
            "Protocols:",
            "\n".join(protocols_summary),
            f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        ])
        self.send_alert(summary.strip())

    def _summary_due(self, address: str, now: datetime) -> bool:
        """Return True when the daily summary should be sent for *address*.

        A summary is due on the first check after the calendar date rolls
        over. A fixed "hour 0, minute < 5" window is unreliable: a cycle
        longer than five minutes can skip it entirely and a shorter one
        hits it several times.
        """
        today = now.date()
        previous = self._summary_dates.get(address)
        self._summary_dates[address] = today
        if previous is None:
            # First observation: still honour an agent started right after
            # midnight, otherwise wait for the next date change.
            return now.hour == 0 and now.minute < 5
        return today != previous

    def _run_checks(self, address: str, config: Dict):
        """Run every check enabled in *config* for a single address."""
        if config.get('check_threshold'):
            self.check_balance_threshold(
                address,
                config.get('min_balance', 1000)
            )
        if config.get('check_changes'):
            self.check_significant_change(
                address,
                config.get('change_threshold', 5.0)
            )
        if config.get('daily_summary') and self._summary_due(address, datetime.now()):
            self.generate_daily_summary(address)

    def monitor(self, addresses: List[str], config: Dict):
        """Main monitoring loop.

        Runs until interrupted with Ctrl-C.

        Args:
            addresses: Wallet addresses to poll each cycle.
            config: Options read with ``config.get``: ``check_threshold``,
                ``min_balance``, ``check_changes``, ``change_threshold``
                and ``daily_summary``.
        """
        print(f"Starting portfolio monitor for {len(addresses)} addresses")
        print(f"Check interval: {self.check_interval}s")

        # The KeyboardInterrupt handler wraps the whole loop so Ctrl-C is
        # handled cleanly even while sleeping after an error.
        try:
            while True:
                for address in addresses:
                    try:
                        self._run_checks(address, config)
                    except Exception as e:
                        # Isolate failures: one bad address must not stop
                        # the remaining addresses from being checked.
                        print(f"Error in monitoring loop: {e}")
                    # Respect rate limits
                    time.sleep(2)
                time.sleep(self.check_interval)
        except KeyboardInterrupt:
            print("\nMonitoring stopped by user")

    def send_alert(self, message: str):
        """Send alert notification (implement your preferred method)."""
        print(f"\n{'='*50}")
        print(message)
        print(f"{'='*50}\n")
        # TODO: Implement your notification method:
        # - Email via SendGrid/Mailgun
        # - SMS via Twilio
        # - Slack/Discord webhook
        # - Telegram bot
        # - Push notification service
# Usage Example
if __name__ == "__main__":
    # Fail fast: without a key every request would be sent with the header
    # "Authorization: Bearer None" and only fail later, inside the loop.
    api_key = os.getenv('OCTAV_API_KEY')
    if not api_key:
        raise SystemExit("OCTAV_API_KEY environment variable is not set")

    agent = PortfolioMonitorAgent(
        api_key=api_key,
        check_interval=300  # Check every 5 minutes
    )

    # Configure monitoring
    config = {
        'check_threshold': True,
        'min_balance': 5000,
        'check_changes': True,
        'change_threshold': 3.0,  # Alert on 3%+ changes
        'daily_summary': True
    }

    # Start monitoring
    addresses = [
        '0x123...',  # Your addresses
        '0x456...',
    ]
    agent.monitor(addresses, config)
Transaction Monitoring Agent
Monitor and categorize new transactions:
class TransactionMonitorAgent:
    """Monitor new transactions and categorize them.

    Transactions already handled are remembered by hash in memory. The set
    starts empty, so the first poll treats every fetched transaction as new.
    """

    def __init__(self, api_key: str):
        """Create an agent that authenticates with *api_key* (Bearer token)."""
        self.api_key = api_key
        self.base_url = 'https://api.octav.fi'
        self.headers = {'Authorization': f'Bearer {api_key}'}
        # Hashes of transactions that were already processed.
        self.seen_hashes = set()

    def get_transactions(self, address: str, limit: int = 50) -> List[Dict]:
        """Fetch recent transactions, newest first.

        Returns:
            The decoded JSON list, or an empty list when the request fails
            or the response is not a list.
        """
        try:
            response = requests.get(
                f'{self.base_url}/v1/transactions',
                params={'addresses': address, 'limit': limit, 'sort': 'DESC'},
                headers=self.headers,
                timeout=30,  # never block the polling loop indefinitely
            )
            response.raise_for_status()
            payload = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            print(f"Error fetching transactions: {e}")
            return []
        # NOTE(review): assumes the endpoint returns a JSON list of
        # transaction objects - confirm against the API reference.
        return payload if isinstance(payload, list) else []

    def check_new_transactions(self, address: str):
        """Check for and process new transactions."""
        for tx in self.get_transactions(address):
            tx_hash = tx['hash']
            # Checked per item so a hash repeated inside one batch is only
            # processed once.
            if tx_hash in self.seen_hashes:
                continue
            self.process_transaction(tx, address)
            # Marked only after successful processing so a failure is
            # retried on the next poll.
            self.seen_hashes.add(tx_hash)

    def process_transaction(self, tx: Dict, address: str):
        """Process and categorize a transaction.

        Sends an alert for transactions whose summed asset value exceeds
        1000, and a second alert for outgoing (TRANSFEROUT/SWAP) or reward
        (CLAIM/AIRDROP) transaction types.
        """
        tx_type = tx['txType']
        # A missing or null asset value counts as 0.
        value = sum(float(asset.get('value') or 0) for asset in tx.get('assets') or [])

        # Alert on large transactions
        if value > 1000:
            self.send_alert(
                f"💰 Large Transaction Detected\n"
                f"Type: {tx_type}\n"
                f"Value: ${value:,.2f}\n"
                f"Hash: {tx['hash'][:16]}...\n"
                f"Chain: {tx['chainKey']}"
            )

        # Alert on specific transaction types
        if tx_type in ('TRANSFEROUT', 'SWAP'):
            self.send_alert(
                f"📤 Outgoing Transaction\n"
                f"Type: {tx_type}\n"
                f"Value: ${value:,.2f}"
            )
        elif tx_type in ('CLAIM', 'AIRDROP'):
            self.send_alert(
                f"🎁 Reward Claimed\n"
                f"Type: {tx_type}\n"
                f"Value: ${value:,.2f}"
            )

    def send_alert(self, message: str):
        """Send alert notification (prints to stdout; replace with a real channel).

        process_transaction() calls this method, so the class must provide
        it; without it every alert raised AttributeError.
        """
        print(f"\n{'='*50}")
        print(message)
        print(f"{'='*50}\n")
Adding Notification Channels
Implement real notifications using popular services:
- Email (SendGrid)
- Telegram
- Discord
- Slack
Copy
import os
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail
def send_email_alert(self, message: str):
    """Send email notification via SendGrid.

    Configuration is read from environment variables:

    - ``SENDGRID_API_KEY``: SendGrid API key.
    - ``ALERT_FROM_EMAIL``: sender address.
    - ``ALERT_TO_EMAIL``: recipient address.

    Nothing is sent, and an error is printed, when any of them is missing.
    Delivery failures are printed rather than raised so that a broken
    notification channel does not stop the agent.
    """
    settings = {
        name: os.getenv(name)
        for name in ('SENDGRID_API_KEY', 'ALERT_FROM_EMAIL', 'ALERT_TO_EMAIL')
    }
    missing = [name for name, value in settings.items() if not value]
    if missing:
        print(f"Error sending email: missing environment variable(s): {', '.join(missing)}")
        return

    email_message = Mail(
        from_email=settings['ALERT_FROM_EMAIL'],
        to_emails=settings['ALERT_TO_EMAIL'],
        subject='Portfolio Alert',
        plain_text_content=message
    )
    try:
        sg = SendGridAPIClient(settings['SENDGRID_API_KEY'])
        response = sg.send(email_message)
        print(f"Email sent: {response.status_code}")
    except Exception as e:
        # Best effort by design: report and carry on.
        print(f"Error sending email: {e}")
Copy
import requests
def send_telegram_alert(self, message: str):
    """Send notification via Telegram bot.

    Reads ``TELEGRAM_BOT_TOKEN`` and ``TELEGRAM_CHAT_ID`` from the
    environment. Nothing is sent, and an error is printed, when either is
    missing. Delivery failures are printed rather than raised.
    """
    bot_token = os.getenv('TELEGRAM_BOT_TOKEN')
    chat_id = os.getenv('TELEGRAM_CHAT_ID')
    if not bot_token or not chat_id:
        print("Error sending Telegram message: TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID must be set")
        return

    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    try:
        # NOTE(review): with parse_mode 'Markdown', unbalanced '_' or '*'
        # in the text may be rejected by Telegram - confirm, and escape or
        # drop parse_mode if alerts can contain such characters.
        response = requests.post(
            url,
            json={
                'chat_id': chat_id,
                'text': message,
                'parse_mode': 'Markdown'
            },
            timeout=30,
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # The exception text contains the request URL, which embeds the bot
        # token; report only the error type and HTTP status.
        status = getattr(getattr(e, 'response', None), 'status_code', None)
        print(f"Error sending Telegram message: {type(e).__name__} (HTTP status: {status})")
Copy
import requests
def send_discord_alert(self, message: str):
    """Send notification via Discord webhook.

    Reads the webhook URL from ``DISCORD_WEBHOOK_URL``. Nothing is sent,
    and an error is printed, when it is not set. Delivery failures are
    printed rather than raised.
    """
    webhook_url = os.getenv('DISCORD_WEBHOOK_URL')
    if not webhook_url:
        print("Error sending Discord message: DISCORD_WEBHOOK_URL must be set")
        return

    try:
        response = requests.post(
            webhook_url,
            json={'content': message},
            timeout=30,
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # The webhook URL is itself a credential and appears in the
        # exception text; report only the error type and HTTP status.
        status = getattr(getattr(e, 'response', None), 'status_code', None)
        print(f"Error sending Discord message: {type(e).__name__} (HTTP status: {status})")
Copy
import requests
def send_slack_alert(self, message: str):
    """Send notification via Slack webhook.

    Reads the webhook URL from ``SLACK_WEBHOOK_URL``. Nothing is sent, and
    an error is printed, when it is not set. Delivery failures are printed
    rather than raised.
    """
    webhook_url = os.getenv('SLACK_WEBHOOK_URL')
    if not webhook_url:
        print("Error sending Slack message: SLACK_WEBHOOK_URL must be set")
        return

    try:
        response = requests.post(
            webhook_url,
            json={'text': message},
            timeout=30,
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # The webhook URL is itself a credential and appears in the
        # exception text; report only the error type and HTTP status.
        status = getattr(getattr(e, 'response', None), 'status_code', None)
        print(f"Error sending Slack message: {type(e).__name__} (HTTP status: {status})")
Deployment Options
Cloud Functions (Serverless)
Cloud Functions (Serverless)
AWS Lambda, Google Cloud Functions, Vercel Functions. Perfect for scheduled monitoring. On AWS, schedule the handler below with CloudWatch Events (every 5 minutes):
Copy
# handler.py - AWS Lambda example
import json
from portfolio_monitor import PortfolioMonitorAgent
# Agent shared by all invocations served from the same warm execution
# environment. PortfolioMonitorAgent keeps its change-detection baseline in
# memory, so a fresh agent per invocation could never detect a change.
_AGENT = None


def lambda_handler(event, context):
    """AWS Lambda entry point: run the balance and change checks once.

    Args:
        event: Invocation payload; ``event['addresses']`` is the list of
            wallet addresses to check (missing or null means none).
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` 200 and a JSON-encoded message body.

    The in-memory baseline is lost on every cold start; persist the last
    values externally if change alerts must survive cold starts.
    """
    import os  # handler.py does not import os at module level

    global _AGENT

    addresses = event.get('addresses') or []
    if addresses:
        if _AGENT is None:
            _AGENT = PortfolioMonitorAgent(os.getenv('OCTAV_API_KEY'))
        for address in addresses:
            _AGENT.check_balance_threshold(address, 1000)
            _AGENT.check_significant_change(address, 5.0)

    return {
        'statusCode': 200,
        'body': json.dumps('Monitoring complete')
    }
Docker Container
Docker Container
Run the agent as a container using the Dockerfile below. Deploy to any of the platforms listed after it:
Copy
FROM python:3.11-slim
# The agent reports through print(); disable stdout buffering so its output
# reaches the container log immediately.
ENV PYTHONUNBUFFERED=1
WORKDIR /app
# Install dependencies before copying the code so this layer stays cached
# while agent.py changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY agent.py .
CMD ["python", "agent.py"]
- AWS ECS/Fargate
- Google Cloud Run
- Digital Ocean App Platform
- Fly.io
VPS (Always Running)
VPS (Always Running)
Traditional server deployment. Use a systemd service (unit file below), then start it with the command that follows the unit file:
Copy
# /etc/systemd/system/portfolio-monitor.service
[Unit]
Description=Portfolio Monitor Agent
# The agent calls a remote API as soon as it starts; network.target does not
# guarantee connectivity, network-online.target does.
Wants=network-online.target
After=network-online.target
[Service]
Type=simple
User=ubuntu
WorkingDirectory=/home/ubuntu/portfolio-monitor
ExecStart=/usr/bin/python3 agent.py
Restart=always
RestartSec=10
# Unbuffered stdout so the agent's print() output reaches the journal at once.
Environment="PYTHONUNBUFFERED=1"
# Replace with your real key. Unit files are usually world-readable, so
# consider an EnvironmentFile= with restricted permissions instead.
Environment="OCTAV_API_KEY=your_key_here"
[Install]
WantedBy=multi-user.target
systemctl start portfolio-monitor
Best Practices
Rate Limiting
Rate Limiting
Respect Octav’s 360 requests/minute limit:
Copy
from time import sleep
from collections import deque
from datetime import datetime, timedelta
class RateLimiter:
    """Sliding-window rate limiter.

    Allows at most ``max_requests`` calls to :meth:`wait_if_needed` within
    any ``window`` seconds; further calls block until a slot frees up.
    """

    def __init__(self, max_requests=360, window=60):
        """Create a limiter.

        Args:
            max_requests: Maximum number of requests per window (>= 1).
            window: Window length in seconds.

        Raises:
            ValueError: If ``max_requests`` is smaller than 1.
        """
        if max_requests < 1:
            raise ValueError("max_requests must be at least 1")
        self.max_requests = max_requests
        self.window = window
        # Monotonic-clock timestamps of the requests inside the window,
        # oldest first.
        self.requests = deque()

    def wait_if_needed(self):
        """Block until a request may be made, then record it."""
        # The monotonic clock is immune to DST and wall-clock adjustments,
        # which would otherwise distort the window.
        from time import monotonic

        while True:
            now = monotonic()
            cutoff = now - self.window

            # Remove old requests
            while self.requests and self.requests[0] <= cutoff:
                self.requests.popleft()

            # Check limit
            if len(self.requests) < self.max_requests:
                break

            # Wait for the oldest request to leave the window, then
            # re-evaluate with a fresh timestamp.
            sleep(max(self.requests[0] + self.window - now, 0))

        # `now` was read after any waiting, so the recorded time is the
        # actual request time rather than the moment the call started.
        self.requests.append(now)
Error Handling
Error Handling
Implement robust error handling:
Copy
import time
from requests.exceptions import RequestException
def fetch_with_retry(self, url, max_retries=3):
    """Fetch with exponential backoff.

    Retries connection problems, timeouts, HTTP 429 and HTTP 5xx, waiting
    1s, 2s, 4s, ... between attempts. Other HTTP errors (for example 401
    or 404) are raised immediately because repeating the same request
    cannot succeed.

    Args:
        url: URL to GET, using ``self.headers``.
        max_retries: Total number of attempts (>= 1).

    Returns:
        The decoded JSON body of the first successful response.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        RequestException: The last error once all attempts are used up, or
            the first non-retryable error.
    """
    if max_retries < 1:
        # Previously the loop body never ran and None was returned as if it
        # were data.
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=self.headers, timeout=30)
            response.raise_for_status()
            return response.json()
        except RequestException as e:
            # No status means no HTTP response was received at all.
            status = getattr(getattr(e, 'response', None), 'status_code', None)
            retryable = status is None or status == 429 or status >= 500
            if not retryable or attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt
            print(f"Retry {attempt + 1}/{max_retries} in {wait_time}s")
            time.sleep(wait_time)
Credit Management
Credit Management
Monitor API credit usage:
Copy
def check_credits(self):
    """Check remaining credits before heavy operations.

    Sends a low-credit alert through ``self.send_alert`` when fewer than
    100 credits remain.

    Returns:
        The decoded JSON body of the credits endpoint.

    Raises:
        requests.exceptions.RequestException: If the request fails or the
            API answers with an HTTP error status.
    """
    response = requests.get(
        f'{self.base_url}/v1/credits',
        headers=self.headers,
        timeout=30,
    )
    # Without this an error body would be compared against the threshold.
    response.raise_for_status()

    # NOTE(review): assumes the endpoint returns a bare JSON number -
    # confirm against the API reference.
    remaining = response.json()
    if remaining < 100:
        self.send_alert(f"⚠️ Low API credits: {remaining} remaining")
    return remaining
Logging
Logging
Implement comprehensive logging:
Copy
import logging
# Log to both a file and the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8: the agents' alert messages contain emoji, which a
        # platform-default encoding may not be able to write.
        logging.FileHandler('agent.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
# %-style arguments are only formatted if the record is actually emitted.
logger.info("Monitoring %d addresses", len(addresses))