# Rate Limiting Guide for AI Agents
This guide covers rate limit handling, backoff strategies, batch endpoints, caching, and webhook alternatives for AI agents consuming the BuyWhere Product Catalog API at high volume.
## Rate Limit Tiers
| Tier | Requests/Minute | Requests/Day |
|---|---|---|
| Free/Basic | 100 | 10,000 |
| Standard | 500 | 50,000 |
| Premium | 1,000 | 100,000 |
## Rate Limit Headers
Every API response includes headers tracking your current limit window:
| Header | Description |
|---|---|
| `X-RateLimit-Limit` | Maximum requests per minute |
| `X-RateLimit-Remaining` | Remaining requests in current window |
| `X-RateLimit-Reset` | Unix timestamp when the limit resets |
| `Retry-After` | Seconds to wait (present on 429 responses only) |
Example headers:

```
X-RateLimit-Limit: 500
X-RateLimit-Remaining: 487
X-RateLimit-Reset: 1743782400
```
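A minimal sketch of using these headers proactively: check `X-RateLimit-Remaining` before it hits zero and compute how long to pause until `X-RateLimit-Reset`. The threshold of 5 remaining requests is an arbitrary safety margin, not an API requirement.

```python
import time

def seconds_until_reset(headers, now=None, threshold=5):
    """Return how long to pause when the remaining budget is nearly spent."""
    now = time.time() if now is None else now
    remaining = int(headers.get("X-RateLimit-Remaining", "1"))
    reset = int(headers.get("X-RateLimit-Reset", "0"))
    if remaining <= threshold and reset > now:
        return reset - now  # sleep this long before the next request
    return 0.0
```

Calling this after every response, and sleeping for whatever it returns, keeps you from ever hitting a 429 in the first place.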
## Handling 429 Responses

When you exceed the rate limit, the API returns HTTP 429 Too Many Requests:

```json
{
  "detail": "Rate limit exceeded. Retry after 30 seconds."
}
```
### Exponential Backoff with Jitter

Never use a fixed retry delay. Always use exponential backoff with jitter, which spreads retries from many clients across the window instead of synchronizing them:

```python
import random
import time

import requests

def fetch_with_backoff(url, headers, max_retries=5, base_delay=1.0):
    for attempt in range(max_retries):
        response = requests.get(url, headers=headers)
        if response.status_code != 429:
            return response
        if attempt == max_retries - 1:
            raise Exception("Max retries exceeded")
        # Exponential backoff with full jitter: sleep a random amount
        # between 0 and the exponentially growing cap
        delay = base_delay * (2 ** attempt)
        sleep_time = random.uniform(0, delay)
        # Never retry sooner than the server's Retry-After hint
        retry_after = response.headers.get("Retry-After")
        if retry_after:
            sleep_time = max(sleep_time, int(retry_after))
        print(f"Rate limited. Retrying in {sleep_time:.1f}s (attempt {attempt + 1}/{max_retries})")
        time.sleep(sleep_time)
```
### Async Retry with Rate Limit Awareness

For high-throughput agent workloads:

```python
import asyncio
import time

import aiohttp

class RateLimitedClient:
    def __init__(self, api_key, requests_per_minute=500):
        self.api_key = api_key
        self.requests_per_minute = requests_per_minute
        self.request_times = []
        # Cap concurrent requests well below the per-minute limit
        self.semaphore = asyncio.Semaphore(requests_per_minute // 10)

    async def get(self, session, url):
        async with self.semaphore:
            while True:
                await self._throttle()
                headers = {"Authorization": f"Bearer {self.api_key}"}
                async with session.get(url, headers=headers) as response:
                    if response.status == 429:
                        retry_after = int(response.headers.get("Retry-After", 30))
                        await asyncio.sleep(retry_after)
                        continue
                    # Read the body before the connection is released
                    return await response.json()

    async def _throttle(self):
        # Sliding 60-second window: drop timestamps older than a minute
        now = time.time()
        self.request_times = [t for t in self.request_times if now - t < 60]
        if len(self.request_times) >= self.requests_per_minute:
            sleep_time = 60 - (now - self.request_times[0])
            await asyncio.sleep(sleep_time)
        self.request_times.append(time.time())
```
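The sliding-window accounting in `_throttle` can be shown standalone (a sketch for illustration, not part of the API client): timestamps older than 60 seconds are dropped, and a new request fits only while the window holds fewer entries than the per-minute limit.

```python
def window_has_capacity(request_times, now, limit, window=60.0):
    """Drop timestamps outside the window; report whether one more request fits."""
    recent = [t for t in request_times if now - t < window]
    return len(recent) < limit, recent
```

Keeping the pruned list around (rather than counting in place) means each call does O(n) work on at most one window's worth of timestamps.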
## Batch Endpoints for Efficiency
Reduce request count by using batch endpoints. Prefer querying multiple products or offers in a single call rather than individual requests.
### Batch Product Fetch

Instead of N requests for N products:

```python
# Bad: N requests
for product_id in product_ids:
    response = client.get_product(product_id)

# Good: 1 batch request
response = client.get_products_batch(product_ids)
```
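If you have more IDs than a single batch call accepts, slice the list into chunks first. The 100-IDs-per-call cap below is an assumption for illustration; check the batch endpoint's documented limit for your tier.

```python
def chunked(ids, size=100):
    # size=100 is an assumed per-call cap, not a documented limit
    return [ids[i:i + size] for i in range(0, len(ids), size)]

# Usage (client and product_ids as in the example above):
# for chunk in chunked(product_ids):
#     response = client.get_products_batch(chunk)
```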
### Bulk Offer Retrieval

For retrieving offers across multiple products:

```python
# Fetch offers for multiple products in one call
offers = client.get_offers_batch(product_ids=product_ids, limit=50)
```
### Ingestion Batch Limits

For scraping pipelines, the ingestion endpoints allow:

- 100 requests/minute per API key
- 1000 products per batch

```python
# Optimal batch size for ingestion
batch = [scrape_product(url) for url in urls[:1000]]
client.ingest_batch(batch)
```
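Note that `urls[:1000]` only ingests the first 1000 items. To ingest everything while respecting the 1000-products-per-batch cap, loop over slices; a sketch (assuming `client.ingest_batch` as above):

```python
def ingest_all(client, products, batch_size=1000):
    # batch_size matches the documented 1000-products-per-batch limit
    for i in range(0, len(products), batch_size):
        client.ingest_batch(products[i:i + batch_size])
```

Pair this with client-side throttling (as shown earlier) to also stay under the 100 ingestion requests/minute limit.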
## Caching Recommendations
Cache aggressively to reduce API calls and improve response latency.
### Cache Product Data

Product catalog data changes infrequently. Cache with a TTL based on your freshness needs:

```python
import hashlib
import json

import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def get_cached_product(client, product_id, ttl_seconds=3600):
    cache_key = f"product:{product_id}"
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)
    product = client.get_product(product_id)
    redis_client.setex(cache_key, ttl_seconds, json.dumps(product))
    return product
```
### ETag and If-None-Match

Use ETags for conditional requests to avoid transferring unchanged data:

```python
def fetch_with_etag(session, url, headers):
    response = session.get(url, headers=headers)
    if response.status_code == 200:
        etag = response.headers.get("ETag")
        if etag:
            # Store the ETag alongside the cached data
            cache_key = hashlib.md5(url.encode()).hexdigest()
            redis_client.setex(f"etag:{cache_key}", 3600, etag)
    return response

def fetch_if_modified(session, url, headers):
    cache_key = hashlib.md5(url.encode()).hexdigest()
    etag = redis_client.get(f"etag:{cache_key}")
    if etag:
        headers["If-None-Match"] = etag.decode()
    response = session.get(url, headers=headers)
    if response.status_code == 304:
        # Not modified: serve the locally cached copy
        # (get_cached is your own cache lookup, e.g. the Redis read above)
        return get_cached(url)
    return response
```
### Cache TTL Guidelines
| Data Type | Suggested TTL | Notes |
|---|---|---|
| Product details | 1 hour | Stable data, low churn |
| Offers/availability | 5-15 min | Changes frequently |
| Search results | 5 min | Freshness important |
| Merchant info | 24 hours | Rarely changes |
| Categories | 24 hours | Stable hierarchy |
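The table above can be encoded directly as a lookup used when writing cache entries. This is a sketch: the data-type keys are illustrative names, and the offer TTL uses the midpoint of the 5-15 minute range.

```python
# TTLs mirror the Cache TTL Guidelines table
TTL_SECONDS = {
    "product": 3600,        # Product details: 1 hour
    "offer": 10 * 60,       # Offers/availability: 5-15 min (midpoint)
    "search": 5 * 60,       # Search results: 5 min
    "merchant": 24 * 3600,  # Merchant info: 24 hours
    "category": 24 * 3600,  # Categories: 24 hours
}

def ttl_for(data_type, default=300):
    """Look up a TTL, falling back to a conservative 5 minutes."""
    return TTL_SECONDS.get(data_type, default)
```

Passing `ttl_for("product")` as the `ttl_seconds` argument of a cache write keeps freshness policy in one place.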
## Webhook Alternatives for Real-Time Updates
For high-volume agents that need real-time product updates, webhooks are more efficient than polling.
### Webhook Subscription

Subscribe to product update events:

```python
# Register a webhook endpoint
client.webhooks.register(
    event_types=["product.updated", "product.stock_changed", "product.price_changed"],
    url="https://your-agent.example.com/webhooks/buywhere",
    secret="your-webhook-secret"
)
```
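To test your endpoint locally, you can compute the same signature the API sends. This sketch mirrors the verification in the handler below: a hex HMAC-SHA256 digest of the raw request body, keyed by the webhook secret.

```python
import hashlib
import hmac

def sign_payload(secret: str, payload: bytes) -> str:
    # Hex-encoded HMAC-SHA256 of the raw request body
    return hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()

# Send the result in the X-BuyWhere-Signature header of a test POST
```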
### Webhook Handler

```python
import hashlib
import hmac

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/webhooks/buywhere", methods=["POST"])
def handle_webhook():
    signature = request.headers.get("X-BuyWhere-Signature")
    payload = request.get_data()
    expected = hmac.new(
        "your-webhook-secret".encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    # Reject missing or mismatched signatures; compare_digest avoids
    # leaking timing information
    if not signature or not hmac.compare_digest(signature, expected):
        return jsonify({"error": "Invalid signature"}), 401
    event = request.json
    event_type = event.get("type")
    product_id = event.get("product_id")
    if event_type == "product.price_changed":
        # Invalidate the cached product so the next read refetches it
        # (redis_client as defined in the caching section)
        redis_client.delete(f"product:{product_id}")
    return jsonify({"status": "received"}), 200
```
### Polling Fallback for Agents Without Webhook Support

If webhooks aren't available, use smart polling instead of fixed-interval polling:

```python
import time
from datetime import datetime

def smart_poll(client, product_ids, on_change_callback):
    previous_state = {}
    while True:
        current_state = client.get_products_batch(product_ids)
        for product in current_state["products"]:
            pid = product["id"]
            if pid in previous_state and previous_state[pid] != product:
                on_change_callback(pid, previous_state[pid], product)
        previous_state = {p["id"]: p for p in current_state["products"]}
        # Adaptive interval: poll more frequently during active hours
        hour = datetime.now().hour
        interval = 300 if 9 <= hour <= 22 else 900  # 5 min vs 15 min
        time.sleep(interval)
```
## Complete Agent Request Pipeline

```python
import random
import time

import requests

class BuyWhereAgentClient:
    def __init__(self, api_key, tier="standard"):
        self.api_key = api_key
        self.limits = {"free": 100, "standard": 500, "premium": 1000}
        self.rpm = self.limits.get(tier, 500)
        self.base_url = "https://api.buywhere.ai/v1"
        self._request_log = []

    def _headers(self):
        return {"Authorization": f"Bearer {self.api_key}"}

    def _throttle(self):
        # Client-side rate limiting to avoid 429s: sliding 60-second window
        now = time.time()
        self._request_log = [t for t in self._request_log if now - t < 60]
        if len(self._request_log) >= self.rpm:
            sleep_time = 60 - (now - self._request_log[0])
            time.sleep(sleep_time)
        self._request_log.append(time.time())

    def _backoff(self, attempt, retry_after=None):
        # Exponential backoff with jitter, floored at the server's hint
        base = 1.0
        delay = base * (2 ** attempt) + random.uniform(0, 1)
        if retry_after:
            delay = max(delay, retry_after)
        return delay

    def request(self, method, endpoint, retries=5, **kwargs):
        url = f"{self.base_url}{endpoint}"
        for attempt in range(retries):
            self._throttle()
            response = requests.request(method, url, headers=self._headers(), **kwargs)
            if response.status_code != 429:
                response.raise_for_status()
                return response.json()
            if attempt == retries - 1:
                raise Exception(f"Failed after {retries} retries")
            retry_after = response.headers.get("Retry-After")
            sleep_time = self._backoff(attempt, int(retry_after) if retry_after else None)
            time.sleep(sleep_time)

    def get_product(self, product_id):
        return self.request("GET", f"/products/{product_id}")

    def get_products_batch(self, product_ids):
        return self.request("GET", "/products/batch",
                            params={"ids": ",".join(product_ids)})

    def search(self, query, limit=20):
        # Let requests URL-encode the query instead of interpolating it
        return self.request("GET", "/search", params={"q": query, "limit": limit})
```
## Summary Checklist

- Implement exponential backoff with jitter (never fixed delays)
- Read and respect `X-RateLimit-*` headers proactively
- Use batch endpoints instead of N individual requests
- Cache product data with appropriate TTLs
- Use ETags for conditional requests on cached resources
- Prefer webhooks over polling for real-time updates
- If polling, use adaptive intervals based on time of day
- Handle 429 responses gracefully without crashing
- Log rate limit events to monitor your usage patterns