Rate Limiting Guide for AI Agents

This guide covers rate limit handling, backoff strategies, batch endpoints, caching, and webhook alternatives for AI agents consuming the BuyWhere Product Catalog API at high volume.

Rate Limit Tiers

Tier         Requests/Minute   Requests/Day
Free/Basic   100               10,000
Standard     500               50,000
Premium      1,000             100,000

Rate Limit Headers

Every API response includes headers tracking your current limit window:

Header                  Description
X-RateLimit-Limit       Maximum requests per minute
X-RateLimit-Remaining   Remaining requests in current window
X-RateLimit-Reset       Unix timestamp when the limit resets
Retry-After             Seconds to wait (present on 429 responses only)

Example headers:

X-RateLimit-Limit: 500
X-RateLimit-Remaining: 487
X-RateLimit-Reset: 1743782400
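
You can honor these headers proactively and slow down before ever receiving a 429. A minimal sketch (assuming a dict-like `headers` attribute on the response, as the requests library provides):

```python
import time

def respect_rate_headers(response, min_remaining=5):
    """Sleep until the window resets when few requests remain."""
    remaining = int(response.headers.get("X-RateLimit-Remaining", "1"))
    reset = int(response.headers.get("X-RateLimit-Reset", "0"))
    if remaining <= min_remaining:
        # Sleep until the reset timestamp (clamped at zero)
        time.sleep(max(0, reset - time.time()))
```

Call it after each request; it is a no-op while plenty of quota remains.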

Handling 429 Responses

When you exceed the rate limit, the API returns HTTP 429 Too Many Requests:

{
  "detail": "Rate limit exceeded. Retry after 30 seconds."
}

Exponential Backoff with Jitter

Never use a fixed retry delay. Always use exponential backoff with jitter:

import time
import random
import requests

def fetch_with_backoff(url, headers, max_retries=5, base_delay=1.0):
    for attempt in range(max_retries):
        response = requests.get(url, headers=headers)

        if response.status_code != 429:
            return response

        if attempt == max_retries - 1:
            raise Exception("Max retries exceeded")

        # Full jitter: sleep a random duration up to the exponential cap
        cap = base_delay * (2 ** attempt)
        sleep_time = random.uniform(0, cap)

        # Never retry sooner than the server's Retry-After hint
        retry_after = response.headers.get("Retry-After")
        if retry_after:
            sleep_time = max(sleep_time, int(retry_after))

        print(f"Rate limited. Retrying in {sleep_time:.1f}s (attempt {attempt + 1}/{max_retries})")
        time.sleep(sleep_time)

Async Retry with Rate Limit Awareness

For high-throughput agent workloads:

import asyncio
import time

import aiohttp

class RateLimitedClient:
    def __init__(self, api_key, requests_per_minute=500):
        self.api_key = api_key
        self.requests_per_minute = requests_per_minute
        self.request_times = []
        # Cap concurrency at a fraction of the per-minute budget
        self.semaphore = asyncio.Semaphore(requests_per_minute // 10)

    async def get(self, session, url):
        async with self.semaphore:
            await self._throttle()

            headers = {"Authorization": f"Bearer {self.api_key}"}
            async with session.get(url, headers=headers) as response:
                if response.status == 429:
                    retry_after = int(response.headers.get("Retry-After", 30))
                    await asyncio.sleep(retry_after)
                    return await self.get(session, url)

                # Read the body before the connection is released
                return await response.json()

    async def _throttle(self):
        # Sliding one-minute window of request timestamps
        now = time.time()
        self.request_times = [t for t in self.request_times if now - t < 60]

        if len(self.request_times) >= self.requests_per_minute:
            sleep_time = 60 - (now - self.request_times[0])
            await asyncio.sleep(sleep_time)

        self.request_times.append(time.time())

Batch Endpoints for Efficiency

Reduce request count by using batch endpoints. Prefer querying multiple products or offers in a single call rather than individual requests.

Batch Product Fetch

Instead of N requests for N products:

# Bad: N requests
for product_id in product_ids:
    response = client.get_product(product_id)

# Good: 1 batch request
response = client.get_products_batch(product_ids)

Bulk Offer Retrieval

For retrieving offers across multiple products:

# Fetch offers for multiple products in one call
offers = client.get_offers_batch(product_ids=product_ids, limit=50)

Ingestion Batch Limits

For scraping pipelines, the ingestion endpoints allow:

  • 100 requests/minute per API key
  • 1,000 products per batch

# Optimal batch size for ingestion
batch = [scrape_product(url) for url in urls[:1000]]
client.ingest_batch(batch)
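
If a scrape run produces more than 1,000 products, split it into limit-sized batches before ingesting. A simple chunking helper (a sketch; `ingest_batch` is the endpoint wrapper used above):

```python
def chunked(items, size=1000):
    """Yield successive slices of at most `size` items (the batch limit)."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

# Usage: for batch in chunked(scraped_products): client.ingest_batch(batch)
```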

Caching Recommendations

Cache aggressively to reduce API calls and improve response latency.

Cache Product Data

Product catalog data changes infrequently. Cache with TTL based on freshness needs:

import hashlib
import json

import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def get_cached_product(client, product_id, ttl_seconds=3600):
    cache_key = f"product:{product_id}"
    
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)
    
    product = client.get_product(product_id)
    redis_client.setex(cache_key, ttl_seconds, json.dumps(product))
    return product

ETag and If-None-Match

Use ETags for conditional requests to avoid transferring unchanged data:

def fetch_with_etag(session, url, headers):
    response = session.get(url, headers=headers)
    
    if response.status_code == 200:
        etag = response.headers.get("ETag")
        if etag:
            # Store etag alongside cached data
            cache_key = hashlib.md5(url.encode()).hexdigest()
            redis_client.setex(f"etag:{cache_key}", 3600, etag)
    
    return response

def fetch_if_modified(session, url, headers):
    cache_key = hashlib.md5(url.encode()).hexdigest()
    etag = redis_client.get(f"etag:{cache_key}")
    
    if etag:
        headers["If-None-Match"] = etag.decode()
    
    response = session.get(url, headers=headers)
    
    if response.status_code == 304:
        # Not modified: serve the previously cached copy
        # (get_cached() is your own cache lookup, not shown here)
        return get_cached(url)
    
    return response

Cache TTL Guidelines

Data Type             Suggested TTL   Notes
Product details       1 hour          Stable data, low churn
Offers/availability   5-15 min        Changes frequently
Search results        5 min           Freshness important
Merchant info         24 hours        Rarely changes
Categories            24 hours        Stable hierarchy
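
These guidelines can live in one place as a small TTL map so every cache write uses a consistent value. A sketch (the keys and the fallback default here are illustrative, not part of the API):

```python
# Suggested TTLs in seconds, mirroring the guidelines above
CACHE_TTLS = {
    "product": 3600,    # 1 hour
    "offers": 600,      # 10 min (middle of the 5-15 min range)
    "search": 300,      # 5 min
    "merchant": 86400,  # 24 hours
    "category": 86400,  # 24 hours
}

def ttl_for(data_type, default=300):
    """Look up the TTL for a data type, falling back to a short default."""
    return CACHE_TTLS.get(data_type, default)
```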

Webhook Alternatives for Real-Time Updates

For high-volume agents that need real-time product updates, webhooks are more efficient than polling.

Webhook Subscription

Subscribe to product update events:

# Register webhook endpoint
client.webhooks.register(
    event_types=["product.updated", "product.stock_changed", "product.price_changed"],
    url="https://your-agent.example.com/webhooks/buywhere",
    secret="your-webhook-secret"
)

Webhook Handler

from flask import Flask, request, jsonify
import hmac
import hashlib

app = Flask(__name__)

@app.route("/webhooks/buywhere", methods=["POST"])
def handle_webhook():
    signature = request.headers.get("X-BuyWhere-Signature")
    payload = request.get_data()
    
    expected = hmac.new(
        "your-webhook-secret".encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    
    if not signature or not hmac.compare_digest(signature, expected):
        return jsonify({"error": "Invalid signature"}), 401
    
    event = request.json
    event_type = event.get("type")
    product_id = event.get("product_id")
    
    if event_type == "product.price_changed":
        # Invalidate cache
        redis_client.delete(f"product:{product_id}")
    
    return jsonify({"status": "received"}), 200

Polling Fallback for Agents Without Webhook Support

If webhooks aren't available, use smart polling instead of fixed-interval polling:

import time
from datetime import datetime

def smart_poll(client, product_ids, on_change_callback):
    previous_state = {}
    
    while True:
        current_state = client.get_products_batch(product_ids)
        
        for product in current_state["products"]:
            pid = product["id"]
            if pid in previous_state:
                if previous_state[pid] != product:
                    on_change_callback(pid, previous_state[pid], product)
        
        previous_state = {p["id"]: p for p in current_state["products"]}
        
        # Adaptive interval: poll more frequently during active hours
        hour = datetime.now().hour
        interval = 300 if 9 <= hour <= 22 else 900  # 5 min vs 15 min
        time.sleep(interval)

Complete Agent Request Pipeline

import time
import random

import requests

class BuyWhereAgentClient:
    def __init__(self, api_key, tier="standard"):
        self.api_key = api_key
        self.limits = {"free": 100, "standard": 500, "premium": 1000}
        self.rpm = self.limits.get(tier, 500)
        self.base_url = "https://api.buywhere.ai/v1"
    
    def _headers(self):
        return {"Authorization": f"Bearer {self.api_key}"}
    
    def _throttle(self):
        # Client-side rate limiting to avoid 429s
        now = time.time()
        if not hasattr(self, "_request_log"):
            self._request_log = []
        self._request_log = [t for t in self._request_log if now - t < 60]
        
        if len(self._request_log) >= self.rpm:
            sleep_time = 60 - (now - self._request_log[0])
            time.sleep(sleep_time)
        
        self._request_log.append(time.time())
    
    def _backoff(self, attempt, retry_after=None):
        base = 1.0
        delay = base * (2 ** attempt) + random.uniform(0, 1)
        if retry_after:
            delay = max(delay, retry_after)
        return delay
    
    def request(self, method, endpoint, retries=5):
        url = f"{self.base_url}{endpoint}"
        
        for attempt in range(retries):
            self._throttle()
            response = requests.request(method, url, headers=self._headers())
            
            if response.status_code != 429:
                response.raise_for_status()
                return response.json()
            
            if attempt == retries - 1:
                raise Exception(f"Failed after {retries} retries")
            
            retry_after = response.headers.get("Retry-After")
            sleep_time = self._backoff(attempt, int(retry_after) if retry_after else None)
            time.sleep(sleep_time)
    
    def get_product(self, product_id):
        return self.request("GET", f"/products/{product_id}")
    
    def get_products_batch(self, product_ids):
        ids_param = ",".join(product_ids)
        return self.request("GET", f"/products/batch?ids={ids_param}")
    
    def search(self, query, limit=20):
        # URL-encode the query so spaces and special characters survive
        return self.request("GET", f"/search?q={requests.utils.quote(query)}&limit={limit}")

Summary Checklist

  • Implement exponential backoff with jitter (never fixed delays)
  • Read and respect X-RateLimit-* headers proactively
  • Use batch endpoints instead of N individual requests
  • Cache product data with appropriate TTLs
  • Use ETags for conditional requests on cached resources
  • Prefer webhooks over polling for real-time updates
  • If polling, use adaptive intervals based on time of day
  • Handle 429 responses gracefully without crashing
  • Log rate limit events to monitor your usage patterns
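
The last checklist item can be a one-line hook on every response. A sketch using the standard logging module (the logger name is illustrative; adapt it to your project):

```python
import logging

logger = logging.getLogger("buywhere.ratelimit")

def log_rate_limit(response):
    """Record remaining quota after each request for later analysis."""
    remaining = response.headers.get("X-RateLimit-Remaining")
    limit = response.headers.get("X-RateLimit-Limit")
    if remaining is not None:
        logger.info("rate limit: %s/%s requests remaining", remaining, limit)
```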