Tutorial 2: Set Up a Price Tracker Agent
Build an agent that monitors product prices on a schedule and notifies you when a price falls to or below your target.
What You'll Build
A Python agent that:
- Tracks products across Shopee, Lazada, Carousell, and other platforms
- Checks prices on a schedule
- Sends a notification when the lowest price found is at or below your target
Prerequisites
- BuyWhere API key
- Python 3.10+
- (Optional) Telegram bot for notifications
Step 1: Install Dependencies
pip install buywhere requests schedule python-dotenv
Step 2: Define Your Tracking Logic
Create a file named price_tracker.py with the following code:
import os
import time
import requests
import schedule
from dotenv import load_dotenv
from datetime import datetime
# Load variables from a local .env file (if one exists) into the environment
# before the API key is read below.
load_dotenv()
# Required setting: indexing os.environ raises KeyError at import time when
# BUYWHERE_API_KEY is not set.
BUYWHERE_API_KEY = os.environ["BUYWHERE_API_KEY"]
# Root URL that request paths are appended to.
BUYWHERE_BASE_URL = "https://api.buywhere.ai"
# Headers passed to requests.get() by search_product(); the API key is sent as
# a Bearer token.
HEADERS = {
"Authorization": f"Bearer {BUYWHERE_API_KEY}",
"Content-Type": "application/json"
}
def search_product(query: str, limit: int = 5,
                   platform: str | None = None) -> dict:
    """Search the BuyWhere product endpoint.

    Args:
        query: Product name to search for (sent as the ``q`` parameter).
        limit: Maximum number of results to request.
        platform: Optional platform to restrict the search to. Only sent
            when given, so existing two-argument callers are unaffected.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the API responds with an error status.
    """
    params = {"q": query, "limit": limit}
    if platform:
        # NOTE(review): the query-parameter name "platform" should be
        # confirmed against the BuyWhere API reference.
        params["platform"] = platform
    response = requests.get(
        f"{BUYWHERE_BASE_URL}/v2/products",
        headers=HEADERS,
        params=params,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()


def _price_of(item: dict) -> float:
    """Return the price used for comparisons: ``price_sgd`` if present, else ``price``."""
    return float(item.get("price_sgd", item["price"]))


def track_price(product_query: str, target_price: float | None = None,
                platform: str | None = None) -> dict:
    """Look up a product and report how its price compares to a target.

    Args:
        product_query: Product name to search for.
        target_price: Your target price (optional). ``None`` disables the
            target check; ``0`` is treated as a real target.
        platform: Specific platform to track (optional); forwarded to
            ``search_product``.

    Returns:
        A dict whose ``status`` key is one of:

        * ``"not_found"`` - the search returned no items.
        * ``"target_reached"`` - at least one item is priced at or below
          ``target_price``; describes the cheapest such item, including
          ``url`` and ``savings``.
        * ``"tracking"`` - no item meets the target (or no target was
          given); describes the cheapest item and the five cheapest
          options.

    Raises:
        requests.HTTPError: Propagated from ``search_product``.
    """
    results = search_product(product_query, limit=20, platform=platform)
    items = results.get("items")
    if not items:
        return {"status": "not_found", "query": product_query}

    if target_price is not None:
        below_target = [p for p in items if _price_of(p) <= target_price]
        if below_target:
            best = min(below_target, key=_price_of)
            return {
                "status": "target_reached",
                "product": best["name"],
                # Raw listing price as returned by the API; comparisons and
                # savings use price_sgd when the item provides it.
                "current_price": best["price"],
                "currency": best["currency"],
                "target_price": target_price,
                "source": best["source"],
                "url": best.get("affiliate_url", best["buy_url"]),
                "savings": target_price - _price_of(best),
            }

    lowest = min(items, key=_price_of)
    current_lowest = _price_of(lowest)
    return {
        "status": "tracking",
        "product": lowest["name"],
        "current_lowest": current_lowest,
        "currency": lowest["currency"],
        "target_price": target_price,
        # Reached means the price is at or below the target. On this path no
        # item met the target, so this evaluates to False; the key is kept so
        # the result shape stays the same for callers.
        "target_reached": (
            target_price is not None and current_lowest <= target_price
        ),
        "source": lowest["source"],
        "all_options": [
            {
                "name": p["name"],
                "price": p["price"],
                "source": p["source"],
                "rating": p.get("rating"),
            }
            for p in sorted(items, key=_price_of)[:5]
        ],
    }
Step 3: Add Scheduled Checking
def job():
    """Run one price check for every tracked product.

    Each result is printed with a timestamp, and ``send_notification`` is
    called for results whose status is ``"target_reached"``. A failed request
    for one product is reported and skipped so that the remaining products
    are still checked and the scheduler loop keeps running.
    """
    tracked_products = [
        {"query": "Sony WH-1000XM5", "target": 350.0},
        {"query": "Nintendo Switch OLED", "target": 380.0},
        {"query": "Apple AirPods Pro 2", "target": 250.0},
    ]
    for item in tracked_products:
        try:
            result = track_price(item["query"], item["target"])
        except requests.RequestException as exc:
            # Timeouts, connection errors and HTTP error statuses are
            # transient; letting them escape would propagate through
            # schedule.run_pending() and terminate the tracker.
            print(
                f"[{datetime.now().isoformat()}] "
                f"Price check failed for {item['query']!r}: {exc}"
            )
            continue
        print(f"[{datetime.now().isoformat()}] {result}")
        if result["status"] == "target_reached":
            send_notification(result)
def send_notification(result: dict):
    """Print a price alert for a result whose target price was reached.

    Args:
        result: A dict containing ``product``, ``currency``,
            ``current_price``, ``target_price``, ``savings``, ``source`` and
            ``url``. ``target_price`` and ``savings`` must be numeric because
            both are rendered with two decimal places.
    """
    currency = result["currency"]
    message = (
        "🎉 Price Alert!\n\n"
        f"{result['product']}\n"
        f"Current: {currency} {result['current_price']}\n"
        # Two decimals, matching how savings is rendered on the next line.
        f"Target: {currency} {result['target_price']:.2f}\n"
        f"Savings: {currency} {result['savings']:.2f}\n"
        f"Source: {result['source']}\n\n"
        f"Buy: {result['url']}"
    )
    print(f"NOTIFICATION: {message}")
# Register job() with the scheduler so it becomes due every 6 hours.
schedule.every(6).hours.do(job)

if __name__ == "__main__":
    print("Price tracker started. Checking every 6 hours.")
    # First check happens right away instead of waiting for the first
    # scheduled run.
    job()
    # Wake up once a minute and run whatever the scheduler reports as due.
    while True:
        schedule.run_pending()
        time.sleep(60)
Step 4: Run the Tracker
export BUYWHERE_API_KEY="bw_live_your_key_here"
python price_tracker.py
Output:
Price tracker started. Checking every 6 hours.
[2026-04-16T10:00:00] {'status': 'target_reached', 'product': 'Sony WH-1000XM5 Wireless Headphones', 'current_price': '342.00', 'currency': 'SGD', 'target_price': 350.0, 'source': 'shopee_sg', ...}
NOTIFICATION: 🎉 Price Alert!
Sony WH-1000XM5 Wireless Headphones
Current: SGD 342.00
Target: SGD 350.00
Savings: SGD 8.00
Source: shopee_sg
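Step 5: Add Telegram Notifications (Optional)
Install the Telegram client library with `pip install python-telegram-bot`, set TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID in your environment (or .env file), add the code below to price_tracker.py, and call send_telegram_notification(result) from job() in place of (or alongside) send_notification(result):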
import telegram
# Both values are optional at import time (None when unset); they are only
# used when send_telegram_notification() is called.
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID")


def send_telegram_notification(result: dict):
    """Send a price alert to the configured Telegram chat.

    The message uses Telegram's HTML parse mode with every dynamic value
    HTML-escaped. Legacy Markdown cannot be used safely here: an unmatched
    ``_`` or ``*`` in a value (for example the source ``shopee_sg``) makes
    Telegram reject the whole message as unparseable.

    Args:
        result: A dict containing ``product``, ``currency``,
            ``current_price``, ``target_price``, ``savings``, ``source`` and
            ``url``.
    """
    import asyncio
    import html
    import inspect

    def esc(value) -> str:
        # html.escape also escapes quotes, so the result is safe inside the
        # href attribute below.
        return html.escape(str(value))

    currency = esc(result["currency"])
    message = (
        "🎉 <b>Price Alert!</b>\n\n"
        f"<b>{esc(result['product'])}</b>\n"
        f"Current: {currency} {esc(result['current_price'])}\n"
        f"Target: {currency} {esc(result['target_price'])}\n"
        f"Savings: {currency} {result['savings']:.2f}\n"
        f"Source: {esc(result['source'])}\n\n"
        f'<a href="{esc(result["url"])}">Buy Now</a>'
    )

    bot = telegram.Bot(token=TELEGRAM_BOT_TOKEN)
    pending = bot.send_message(
        chat_id=TELEGRAM_CHAT_ID, text=message, parse_mode="HTML"
    )
    # python-telegram-bot v20+ returns a coroutine that sends nothing until it
    # is awaited; older releases send synchronously and return the Message.
    if inspect.isawaitable(pending):
        asyncio.run(pending)
Advanced: Multi-Platform Price History
Track price changes over time:
from collections import defaultdict
import json
class PriceHistory:
    """JSON-file-backed log of observed prices per product and source.

    ``self.history`` maps ``"<product_name>|<source>"`` to a list of
    ``{"timestamp": <ISO-8601 string>, "price": <price>}`` entries. The whole
    mapping is rewritten to ``storage_file`` after every ``record`` call.
    """

    def __init__(self, storage_file: str = "price_history.json"):
        """Load existing history from ``storage_file``, or start empty."""
        self.storage_file = storage_file
        self.history = self._load()

    def _load(self) -> dict:
        """Return the stored history, or ``{}`` if the file does not exist."""
        try:
            with open(self.storage_file, "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    def _save(self):
        """Write the full history to ``storage_file`` as indented JSON."""
        with open(self.storage_file, "w", encoding="utf-8") as f:
            json.dump(self.history, f, indent=2)

    def record(self, product_name: str, price: float, source: str):
        """Append a price observation stamped with the current time and save."""
        key = f"{product_name}|{source}"
        self.history.setdefault(key, []).append({
            "timestamp": datetime.now().isoformat(),
            "price": price,
        })
        self._save()

    def get_trend(self, product_name: str, source: str = None) -> dict:
        """Summarise how a product's recorded price has moved over time.

        Args:
            product_name: Exact product name as passed to ``record``.
            source: Restrict to one source; when omitted, observations from
                every source of this exact product are combined.

        Returns:
            ``{"trend": "unknown", "data_points": 0}`` when nothing is
            recorded. Otherwise a dict with ``trend`` (``"down"`` below -5%,
            ``"up"`` above +5%, else ``"stable"``), ``change_percent``,
            ``lowest``, ``highest``, ``current`` and ``data_points``.
            ``change_percent`` is ``None`` when the earliest price is 0,
            because a percentage change from zero is undefined.
        """
        if source:
            entries = list(self.history.get(f"{product_name}|{source}", []))
        else:
            # Match on "<name>|" rather than the bare name so that
            # "Widget Case|shop" is not counted as history for "Widget".
            prefix = f"{product_name}|"
            entries = [
                entry
                for key, series in self.history.items()
                if key.startswith(prefix)
                for entry in series
            ]

        if not entries:
            return {"trend": "unknown", "data_points": 0}

        # Order chronologically so first/last really are oldest/newest when
        # several sources are merged. ISO-8601 strings from datetime.isoformat()
        # sort in time order; the sort is stable, so entries lacking a
        # timestamp keep their stored order.
        entries.sort(key=lambda entry: entry.get("timestamp", ""))
        prices = [entry["price"] for entry in entries]

        first_price = prices[0]
        last_price = prices[-1]
        if first_price:
            change_pct = ((last_price - first_price) / first_price) * 100
            trend = (
                "down" if change_pct < -5
                else "up" if change_pct > 5
                else "stable"
            )
            change_percent = round(change_pct, 2)
        else:
            # Zero baseline: avoid dividing by zero and fall back to the
            # direction of the change.
            change_percent = None
            trend = (
                "up" if last_price > first_price
                else "down" if last_price < first_price
                else "stable"
            )

        return {
            "trend": trend,
            "change_percent": change_percent,
            "lowest": min(prices),
            "highest": max(prices),
            "current": last_price,
            "data_points": len(prices),
        }