← Back to documentation

price-tracker-agent

Tutorial 2: Set Up a Price Tracker Agent

Build an agent that monitors product prices and notifies you when prices drop or hit your target.

What You'll Build

A Python agent that:

  1. Tracks products across Shopee, Lazada, Carousell, and other platforms
  2. Checks prices on a schedule
  3. Sends notifications when prices drop or hit your target

Prerequisites

  • BuyWhere API key
  • Python 3.10+
  • (Optional) Telegram bot for notifications

Step 1: Install Dependencies

pip install buywhere requests schedule python-dotenv

Step 2: Define Your Tracking Logic

Create a price_tracker.py:

import os
import time
import requests
import schedule
from dotenv import load_dotenv
from datetime import datetime

load_dotenv()

BUYWHERE_API_KEY = os.environ["BUYWHERE_API_KEY"]
BUYWHERE_BASE_URL = "https://api.buywhere.ai"

HEADERS = {
    "Authorization": f"Bearer {BUYWHERE_API_KEY}",
    "Content-Type": "application/json"
}

def search_product(query: str, limit: int = 5) -> dict:
    """Search for a product."""
    response = requests.get(
        f"{BUYWHERE_BASE_URL}/v2/products",
        headers=HEADERS,
        params={"q": query, "limit": limit},
        timeout=30
    )
    response.raise_for_status()
    return response.json()

def track_price(product_query: str, target_price: float = None, 
                platform: str = None) -> dict:
    """
    Track a product and return price status.
    
    Args:
        product_query: Product name to search for
        target_price: Your target price (optional)
        platform: Specific platform to track (optional)
    """
    params = {"q": product_query, "limit": 20}
    if platform:
        params["platform"] = platform
    
    results = search_product(product_query, limit=20)
    
    if not results.get("items"):
        return {"status": "not_found", "query": product_query}
    
    items = results["items"]
    
    if target_price:
        below_target = [p for p in items 
                       if float(p.get("price_sgd", p["price"])) <= target_price]
        if below_target:
            best = min(below_target, 
                      key=lambda p: float(p.get("price_sgd", p["price"])))
            return {
                "status": "target_reached",
                "product": best["name"],
                "current_price": best["price"],
                "currency": best["currency"],
                "target_price": target_price,
                "source": best["source"],
                "url": best.get("affiliate_url", best["buy_url"]),
                "savings": target_price - float(best.get("price_sgd", best["price"]))
            }
    
    lowest = min(items, key=lambda p: float(p.get("price_sgd", p["price"])))
    current_lowest = float(lowest.get("price_sgd", lowest["price"]))
    
    return {
        "status": "tracking",
        "product": lowest["name"],
        "current_lowest": current_lowest,
        "currency": lowest["currency"],
        "target_price": target_price,
        "target_reached": target_price <= current_lowest if target_price else False,
        "source": lowest["source"],
        "all_options": [
            {
                "name": p["name"],
                "price": p["price"],
                "source": p["source"],
                "rating": p.get("rating")
            }
            for p in sorted(items, key=lambda x: float(x.get("price_sgd", x["price"])))[:5]
        ]
    }

Step 3: Add Scheduled Checking

def job():
    """Run price checks for tracked products."""
    tracked_products = [
        {"query": "Sony WH-1000XM5", "target": 350.0},
        {"query": "Nintendo Switch OLED", "target": 380.0},
        {"query": "Apple AirPods Pro 2", "target": 250.0},
    ]
    
    for item in tracked_products:
        result = track_price(item["query"], item["target"])
        print(f"[{datetime.now().isoformat()}] {result}")
        
        if result["status"] == "target_reached":
            send_notification(result)

def send_notification(result: dict):
    """Send notification when target price is reached."""
    message = (
        f"🎉 Price Alert!\n\n"
        f"{result['product']}\n"
        f"Current: {result['currency']} {result['current_price']}\n"
        f"Target: {result['currency']} {result['target_price']}\n"
        f"Savings: {result['currency']} {result['savings']:.2f}\n"
        f"Source: {result['source']}\n\n"
        f"Buy: {result['url']}"
    )
    print(f"NOTIFICATION: {message}")

# Schedule checks every 6 hours
schedule.every(6).hours.do(job)

if __name__ == "__main__":
    print("Price tracker started. Checking every 6 hours.")
    job()  # Run immediately
    while True:
        schedule.run_pending()
        time.sleep(60)

Step 4: Run the Tracker

export BUYWHERE_API_KEY="bw_live_your_key_here"
python price_tracker.py

Output:

Price tracker started. Checking every 6 hours.
[2026-04-16T10:00:00] {'status': 'target_reached', 'product': 'Sony WH-1000XM5 Wireless Headphones', 'current_price': '342.00', 'currency': 'SGD', 'target_price': 350.0, 'source': 'shopee_sg', ...}
NOTIFICATION: 🎉 Price Alert!
Sony WH-1000XM5 Wireless Headphones
Current: SGD 342.00
Target: SGD 350.00
Savings: SGD 8.00
Source: shopee_sg

Step 5: Add Telegram Notifications (Optional)

import telegram

TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID")

def send_telegram_notification(result: dict):
    """Send notification via Telegram."""
    bot = telegram.Bot(token=TELEGRAM_BOT_TOKEN)
    message = (
        f"🎉 *Price Alert!*\n\n"
        f"*{result['product']}*\n"
        f"Current: {result['currency']} {result['current_price']}\n"
        f"Target: {result['currency']} {result['target_price']}\n"
        f"Savings: {result['currency']} {result['savings']:.2f}\n"
        f"Source: {result['source']}\n\n"
        f"[Buy Now]({result['url']})"
    )
    bot.send_message(chat_id=TELEGRAM_CHAT_ID, text=message, parse_mode="Markdown")

Advanced: Multi-Platform Price History

Track price changes over time:

from collections import defaultdict
import json

class PriceHistory:
    def __init__(self, storage_file: str = "price_history.json"):
        self.storage_file = storage_file
        self.history = self._load()
    
    def _load(self) -> dict:
        try:
            with open(self.storage_file, "r") as f:
                return json.load(f)
        except FileNotFoundError:
            return {}
    
    def _save(self):
        with open(self.storage_file, "w") as f:
            json.dump(self.history, f, indent=2)
    
    def record(self, product_name: str, price: float, source: str):
        key = f"{product_name}|{source}"
        if key not in self.history:
            self.history[key] = []
        
        self.history[key].append({
            "timestamp": datetime.now().isoformat(),
            "price": price
        })
        self._save()
    
    def get_trend(self, product_name: str, source: str = None) -> dict:
        key_prefix = product_name
        if source:
            key = f"{product_name}|{source}"
            prices = [e["price"] for e in self.history.get(key, [])]
        else:
            prices = []
            for k, entries in self.history.items():
                if k.startswith(key_prefix):
                    prices.extend([e["price"] for e in entries])
        
        if not prices:
            return {"trend": "unknown", "data_points": 0}
        
        first_price = prices[0]
        last_price = prices[-1]
        change_pct = ((last_price - first_price) / first_price) * 100
        
        return {
            "trend": "down" if change_pct < -5 else "up" if change_pct > 5 else "stable",
            "change_percent": round(change_pct, 2),
            "lowest": min(prices),
            "highest": max(prices),
            "current": last_price,
            "data_points": len(prices)
        }

Next Steps