Skip to main content
AI agents are autonomous programs that independently monitor portfolios, track transactions, and provide insights without human intervention. This guide shows you how to build production-ready agents using the Octav API.
What is an AI Agent? - An autonomous program that makes decisions and takes actions based on blockchain data without constant human oversight

Agent Types You Can Build

Portfolio Monitor

Track portfolio changes and send alerts
  • Balance threshold notifications
  • Significant movement alerts
  • Daily/weekly summaries
  • Performance tracking

Transaction Watcher

Monitor blockchain activity
  • New transaction alerts
  • Large transfer detection
  • DeFi interaction tracking
  • Gas fee optimization

Yield Optimizer

Analyze DeFi positions
  • Track yield rates
  • Compare protocols
  • Alert on better opportunities
  • Risk assessment

Tax Reporter

Automated tax tracking
  • Transaction categorization
  • Income/expense calculation
  • Quarterly reports
  • Export generation

Basic Portfolio Monitor Agent

Here’s a complete, production-ready portfolio monitoring agent:
import requests
import time
from datetime import datetime
from typing import Dict, List, Optional
import os

class PortfolioMonitorAgent:
    """Autonomous agent that monitors crypto portfolios using Octav API.

    The agent polls the portfolio endpoint and can raise three kinds of
    alerts: net worth below a threshold, a large change since the previous
    check, and a once-per-day summary. All alerts go through ``send_alert``,
    which prints to stdout until a real notification channel is wired in.
    """

    def __init__(self, api_key: str, check_interval: int = 300):
        """Store credentials and polling settings.

        Args:
            api_key: Octav API key, sent as a Bearer token on every request.
            check_interval: Seconds to sleep between monitoring cycles.
        """
        self.api_key = api_key
        self.base_url = 'https://api.octav.fi'
        self.check_interval = check_interval  # seconds
        self.headers = {'Authorization': f'Bearer {api_key}'}
        # Net worth seen at the previous change check, keyed by address.
        self.last_values: Dict[str, float] = {}
        # When the last daily summary was sent (or when monitoring started),
        # keyed by address; used to send exactly one summary per calendar day.
        self._last_summary_at: Dict[str, datetime] = {}

    def get_portfolio(self, address: str) -> Optional[Dict]:
        """Fetch current portfolio data.

        Returns:
            The first element of the JSON response, or ``None`` when the
            request fails or the payload cannot be read that way.
        """
        try:
            response = requests.get(
                f'{self.base_url}/v1/portfolio?addresses={address}',
                headers=self.headers,
                timeout=30
            )
            response.raise_for_status()
            return response.json()[0]
        except requests.exceptions.RequestException as e:
            print(f"Error fetching portfolio: {e}")
        except (ValueError, LookupError, TypeError) as e:
            # Invalid JSON, an empty list, or a non-list payload: report it
            # like any other fetch failure instead of crashing the caller.
            print(f"Error fetching portfolio: unexpected response ({e!r})")
        return None

    def check_balance_threshold(
        self,
        address: str,
        min_balance: float = 1000,
        portfolio: Optional[Dict] = None,
    ):
        """Alert if portfolio falls below threshold.

        Args:
            address: Wallet address being checked.
            min_balance: Net worth below which an alert is sent.
            portfolio: Already-fetched portfolio data; fetched when omitted.
        """
        if portfolio is None:
            portfolio = self.get_portfolio(address)
        if not portfolio:
            return

        current_value = float(portfolio['networth'])

        if current_value < min_balance:
            self.send_alert(
                f"⚠️ Low Balance Alert\n"
                f"Address: {address[:10]}...\n"
                f"Current: ${current_value:,.2f}\n"
                f"Threshold: ${min_balance:,.2f}"
            )

    def check_significant_change(
        self,
        address: str,
        threshold_pct: float = 5.0,
        portfolio: Optional[Dict] = None,
    ):
        """Alert on significant portfolio value changes.

        Compares the current net worth with the value recorded by the
        previous call for the same address, then records the current value.

        Args:
            address: Wallet address being checked.
            threshold_pct: Absolute percentage change that triggers an alert.
            portfolio: Already-fetched portfolio data; fetched when omitted.
        """
        if portfolio is None:
            portfolio = self.get_portfolio(address)
        if not portfolio:
            return

        current_value = float(portfolio['networth'])
        last_value = self.last_values.get(address)

        # Skipped when there is no baseline yet, and also when the baseline
        # is zero, because a percentage change from zero is undefined.
        if last_value:
            change = current_value - last_value
            change_pct = (change / last_value) * 100

            if abs(change_pct) >= threshold_pct:
                direction = "📈" if change > 0 else "📉"
                self.send_alert(
                    f"{direction} Significant Change Detected\n"
                    f"Address: {address[:10]}...\n"
                    f"Change: {change_pct:+.2f}%\n"
                    f"From: ${last_value:,.2f}\n"
                    f"To: ${current_value:,.2f}"
                )

        self.last_values[address] = current_value

    def generate_daily_summary(self, address: str, portfolio: Optional[Dict] = None):
        """Generate and send daily portfolio summary.

        Args:
            address: Wallet address being summarised.
            portfolio: Already-fetched portfolio data; fetched when omitted.
        """
        if portfolio is None:
            portfolio = self.get_portfolio(address)
        if not portfolio:
            return

        # Per-chain and per-protocol value lines.
        chain_lines = [
            f"  {chain['name']}: ${float(chain['value']):,.2f}"
            for chain in portfolio['chains'].values()
        ]
        protocol_lines = [
            f"  {protocol['name']}: ${float(protocol['value']):,.2f}"
            for protocol in portfolio['assetByProtocols'].values()
        ]

        summary = "\n".join([
            "📊 Daily Portfolio Summary",
            f"Address: {address[:10]}...",
            f"Total Value: ${float(portfolio['networth']):,.2f}",
            "",
            "Chains:",
            "\n".join(chain_lines),
            "",
            "Protocols:",
            "\n".join(protocol_lines),
            "",
            f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        ])

        self.send_alert(summary)

    def _summary_due(self, address: str, now: datetime) -> bool:
        """Return True when no summary has been sent for ``address`` on ``now``'s date."""
        last_sent = self._last_summary_at.get(address)
        return last_sent is None or last_sent.date() < now.date()

    def _check_address(self, address: str, config: Dict):
        """Run every configured check for one address using a single fetch."""
        now = datetime.now()
        summary_due = bool(config.get('daily_summary')) and self._summary_due(address, now)

        if not (config.get('check_threshold') or config.get('check_changes') or summary_due):
            return

        portfolio = self.get_portfolio(address)
        if not portfolio:
            # Nothing is marked as sent, so a due summary is retried next cycle.
            return

        if config.get('check_threshold'):
            self.check_balance_threshold(
                address,
                config.get('min_balance', 1000),
                portfolio=portfolio
            )

        if config.get('check_changes'):
            self.check_significant_change(
                address,
                config.get('change_threshold', 5.0),
                portfolio=portfolio
            )

        if summary_due:
            self.generate_daily_summary(address, portfolio=portfolio)
            self._last_summary_at[address] = now

    def monitor(self, addresses: List[str], config: Dict):
        """Main monitoring loop.

        Runs until interrupted. Recognised ``config`` keys:
        ``check_threshold``, ``min_balance``, ``check_changes``,
        ``change_threshold`` and ``daily_summary``. The daily summary is
        sent on the first cycle of each new calendar day, so it is neither
        skipped nor repeated whatever ``check_interval`` is.
        """
        print(f"Starting portfolio monitor for {len(addresses)} addresses")
        print(f"Check interval: {self.check_interval}s")

        # Treat the start day as already summarised: the first summary goes
        # out after the next midnight, not on every (re)start.
        started_at = datetime.now()
        for address in addresses:
            self._last_summary_at.setdefault(address, started_at)

        while True:
            try:
                for address in addresses:
                    self._check_address(address, config)

                    # Respect rate limits
                    time.sleep(2)

                time.sleep(self.check_interval)

            except KeyboardInterrupt:
                print("\nMonitoring stopped by user")
                break
            except Exception as e:
                # Top-level boundary: report and keep the agent alive.
                print(f"Error in monitoring loop: {e}")
                time.sleep(60)  # Wait before retrying

    def send_alert(self, message: str):
        """Send alert notification (implement your preferred method)"""
        print(f"\n{'='*50}")
        print(message)
        print(f"{'='*50}\n")

        # TODO: Implement your notification method:
        # - Email via SendGrid/Mailgun
        # - SMS via Twilio
        # - Slack/Discord webhook
        # - Telegram bot
        # - Push notification service

# Usage Example
if __name__ == "__main__":
    # Fail fast with a clear message: os.getenv returns None when the
    # variable is unset, which would otherwise be sent as "Bearer None".
    api_key = os.getenv('OCTAV_API_KEY')
    if not api_key:
        raise SystemExit("OCTAV_API_KEY environment variable is not set")

    agent = PortfolioMonitorAgent(
        api_key=api_key,
        check_interval=300  # Check every 5 minutes
    )

    # Configure monitoring
    config = {
        'check_threshold': True,
        'min_balance': 5000,
        'check_changes': True,
        'change_threshold': 3.0,  # Alert on 3%+ changes
        'daily_summary': True
    }

    # Start monitoring
    addresses = [
        '0x123...',  # Your addresses
        '0x456...',
    ]

    agent.monitor(addresses, config)

Transaction Monitoring Agent

Monitor and categorize new transactions:
class TransactionMonitorAgent:
    """Monitor new transactions and categorize them.

    Transactions are fetched per address, de-duplicated by hash across
    calls, and turned into alerts by ``process_transaction``.
    """

    def __init__(self, api_key: str, large_tx_threshold: float = 1000):
        """Store credentials and alert settings.

        Args:
            api_key: Octav API key, sent as a Bearer token.
            large_tx_threshold: Summed asset value above which a
                transaction triggers the "large transaction" alert.
        """
        self.api_key = api_key
        self.base_url = 'https://api.octav.fi'
        self.headers = {'Authorization': f'Bearer {api_key}'}
        self.large_tx_threshold = large_tx_threshold
        # Hashes already processed, so each transaction alerts only once.
        self.seen_hashes = set()

    def get_transactions(self, address: str, limit: int = 50) -> List[Dict]:
        """Fetch recent transactions, newest first.

        Returns:
            The parsed JSON response, or an empty list if the request fails.
        """
        try:
            response = requests.get(
                f'{self.base_url}/v1/transactions?addresses={address}&limit={limit}&sort=DESC',
                headers=self.headers,
                timeout=30  # never hang the agent on a stalled connection
            )
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            print(f"Error fetching transactions: {e}")
            return []

    def check_new_transactions(self, address: str):
        """Check for and process new transactions.

        NOTE: on the first call every fetched transaction is unseen, so
        existing history is processed (and may alert) once.
        """
        for tx in self.get_transactions(address):
            tx_hash = tx['hash']
            # Checked per item so a hash repeated within one batch is
            # processed only once.
            if tx_hash in self.seen_hashes:
                continue
            self.process_transaction(tx, address)
            self.seen_hashes.add(tx_hash)

    def process_transaction(self, tx: Dict, address: str):
        """Process and categorize a transaction.

        Sends a "large transaction" alert when the summed asset value
        exceeds ``large_tx_threshold``, plus a type-specific alert for
        outgoing (TRANSFEROUT, SWAP) and reward (CLAIM, AIRDROP) types.
        """
        tx_type = tx['txType']
        # Missing or null values count as 0 instead of raising.
        value = sum(
            float(asset.get('value') or 0)
            for asset in (tx.get('assets') or [])
        )

        # Alert on large transactions
        if value > self.large_tx_threshold:
            self.send_alert(
                f"💰 Large Transaction Detected\n"
                f"Type: {tx_type}\n"
                f"Value: ${value:,.2f}\n"
                f"Hash: {tx['hash'][:16]}...\n"
                f"Chain: {tx['chainKey']}"
            )

        # Alert on specific transaction types
        if tx_type in ['TRANSFEROUT', 'SWAP']:
            self.send_alert(
                f"📤 Outgoing Transaction\n"
                f"Type: {tx_type}\n"
                f"Value: ${value:,.2f}"
            )

        elif tx_type in ['CLAIM', 'AIRDROP']:
            self.send_alert(
                f"🎁 Reward Claimed\n"
                f"Type: {tx_type}\n"
                f"Value: ${value:,.2f}"
            )

    def send_alert(self, message: str):
        """Send alert notification (prints; replace with your own channel)."""
        print(f"\n{'='*50}")
        print(message)
        print(f"{'='*50}\n")

Adding Notification Channels

Implement real notifications using popular services:
  • Email (SendGrid)
  • Telegram
  • Discord
  • Slack
import os
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail

def send_email_alert(self, message: str):
    """Send email notification via SendGrid.

    The sender and recipient are read from the ALERT_FROM_EMAIL and
    ALERT_TO_EMAIL environment variables; the defaults are placeholders
    that must be replaced with real addresses. Failures are reported on
    stdout and never raised, so alerting stays best-effort.
    """
    email_message = Mail(
        from_email=os.getenv('ALERT_FROM_EMAIL', 'alerts@example.com'),
        to_emails=os.getenv('ALERT_TO_EMAIL', 'you@example.com'),
        subject='Portfolio Alert',
        plain_text_content=message
    )

    try:
        sg = SendGridAPIClient(os.getenv('SENDGRID_API_KEY'))
        response = sg.send(email_message)
        print(f"Email sent: {response.status_code}")
    except Exception as e:
        print(f"Error sending email: {e}")

Deployment Options

AWS Lambda, Google Cloud Functions, Vercel Functions — perfect for scheduled monitoring:
# handler.py - AWS Lambda example
import json
from portfolio_monitor import PortfolioMonitorAgent

def lambda_handler(event, context):
    """Run one monitoring pass over ``event['addresses']``.

    Returns a 200 response once every address has been checked.

    Raises:
        RuntimeError: if OCTAV_API_KEY is not set in the environment.
    """
    # Imported here because handler.py's module imports cover only json
    # and the agent class.
    import os

    addresses = event.get('addresses', [])
    done = {
        'statusCode': 200,
        'body': json.dumps('Monitoring complete')
    }
    if not addresses:
        # Nothing to monitor: skip building an agent.
        return done

    api_key = os.getenv('OCTAV_API_KEY')
    if not api_key:
        raise RuntimeError("OCTAV_API_KEY environment variable is not set")

    agent = PortfolioMonitorAgent(api_key)

    for address in addresses:
        agent.check_balance_threshold(address, 1000)
        # NOTE: the agent is created per invocation, so it holds no previous
        # value here; change alerts need the last value persisted between
        # invocations.
        agent.check_significant_change(address, 5.0)

    return done
Schedule with CloudWatch Events (every 5 minutes)
Run agent as a container
FROM python:3.11-slim

# Write print()/logging output straight to the container logs instead of
# holding it in Python's stdout buffer.
ENV PYTHONUNBUFFERED=1

WORKDIR /app

# Dependencies are installed before the code is copied so this layer stays
# cached until requirements.txt changes; --no-cache-dir keeps pip's download
# cache out of the image.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY agent.py .

CMD ["python", "agent.py"]
Deploy to:
  • AWS ECS/Fargate
  • Google Cloud Run
  • Digital Ocean App Platform
  • Fly.io
Traditional server deployment — use a systemd service:
# /etc/systemd/system/portfolio-monitor.service
[Unit]
Description=Portfolio Monitor Agent
# network.target does not wait for connectivity; network-online.target does.
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
User=ubuntu
WorkingDirectory=/home/ubuntu/portfolio-monitor
ExecStart=/usr/bin/python3 agent.py
Restart=always
RestartSec=10
# Unit files are usually world-readable. For a real key, prefer
# EnvironmentFile= pointing at a root-owned file with mode 0600.
Environment="OCTAV_API_KEY=your_key_here"
# Flush print() output to the journal immediately.
Environment="PYTHONUNBUFFERED=1"

[Install]
WantedBy=multi-user.target
Start with: systemctl start portfolio-monitor

Best Practices

Respect Octav’s 360 requests/minute limit:
from time import sleep
from collections import deque
from datetime import datetime, timedelta

class RateLimiter:
    """Sliding-window limiter: at most ``max_requests`` calls per ``window`` seconds.

    Call ``wait_if_needed()`` immediately before each request; it blocks
    until the request fits in the window and then records it.
    """

    def __init__(self, max_requests=360, window=60):
        """
        Args:
            max_requests: Maximum number of requests allowed per window.
            window: Window length in seconds.

        Raises:
            ValueError: if ``max_requests`` is less than 1.
        """
        if max_requests < 1:
            raise ValueError("max_requests must be at least 1")
        self.max_requests = max_requests
        self.window = window
        # Timestamps of the requests still inside the window, oldest first.
        self.requests = deque()

    def _evict_expired(self, now):
        """Drop timestamps that are at least ``window`` seconds older than ``now``."""
        cutoff = now - timedelta(seconds=self.window)
        while self.requests and self.requests[0] <= cutoff:
            self.requests.popleft()

    def wait_if_needed(self):
        """Block until another request is allowed, then record it."""
        now = datetime.now()
        self._evict_expired(now)

        # Loop rather than sleep once, in case the sleep returns early.
        while len(self.requests) >= self.max_requests:
            oldest_expires = self.requests[0] + timedelta(seconds=self.window)
            sleep_time = (oldest_expires - now).total_seconds()
            if sleep_time > 0:
                sleep(sleep_time)
            # Re-read the clock: the request happens after the sleep, so the
            # pre-sleep timestamp would under-count the current window.
            now = datetime.now()
            self._evict_expired(now)

        self.requests.append(now)
Implement robust error handling:
import time
from requests.exceptions import RequestException

def fetch_with_retry(self, url, max_retries=3):
    """Fetch JSON with exponential backoff.

    Retries connection-level failures and HTTP 408, 429 and 5xx responses,
    waiting 1s, 2s, 4s, ... between attempts. Other 4xx responses are
    raised immediately because repeating the same request cannot fix them.

    Args:
        url: Full URL to GET.
        max_retries: Total number of attempts (must be at least 1).

    Returns:
        The parsed JSON body.

    Raises:
        ValueError: if ``max_retries`` is less than 1.
        RequestException: the last error once attempts are exhausted, or
            the first non-retryable one.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=self.headers, timeout=30)
            response.raise_for_status()
            return response.json()
        except RequestException as e:
            # No response attached means a connection-level failure.
            status = getattr(getattr(e, 'response', None), 'status_code', None)
            retryable = status is None or status in (408, 429) or status >= 500
            if not retryable or attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt
            print(f"Retry {attempt + 1}/{max_retries} in {wait_time}s")
            time.sleep(wait_time)
Monitor API credit usage:
def check_credits(self):
    """Check remaining credits before heavy operations.

    Sends a low-credit alert when fewer than 100 credits remain.

    Returns:
        The parsed JSON body of the credits endpoint.

    Raises:
        requests.exceptions.RequestException: on network failure, timeout,
            or a non-2xx response.
    """
    response = requests.get(
        f'{self.base_url}/v1/credits',
        headers=self.headers,
        timeout=30  # never hang the agent on a stalled connection
    )
    # Surface HTTP errors instead of comparing an error body with a number.
    response.raise_for_status()
    remaining = response.json()

    # NOTE(review): assumes the endpoint returns a bare number — confirm
    # against the API reference.
    if remaining < 100:
        self.send_alert(f"⚠️ Low API credits: {remaining} remaining")

    return remaining
Implement comprehensive logging:
import logging

# Log to both a file and the console with timestamped records.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8 so non-ASCII log text (e.g. emoji in alert messages)
        # is written correctly whatever the platform default encoding is.
        logging.FileHandler('agent.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)
# Arguments are passed lazily: the message is formatted only if the record
# is actually emitted.
logger.info("Monitoring %d addresses", len(addresses))

Next Steps