Tutorial 5: Build a Deal Hunter with Price Alerts
Create an agent that hunts for the best deals across all platforms and sends alerts when significant price drops occur.
What You'll Build
A comprehensive deal-finding agent that:
- Searches for products with price drops and discounts
- Tracks deal patterns and trends
- Sends alerts via email, Telegram, or webhook when deals are found
- Maintains a history of deal discoveries
Prerequisites
- BuyWhere API key
- Python 3.10+
Step 1: Install Dependencies
pip install buywhere requests schedule python-dotenv
Step 2: Define the Deal Hunter Engine
Create deal_hunter.py:
import os
import json
import time
import requests
import schedule
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
from typing import Optional
from pathlib import Path
# --- BuyWhere API configuration ---------------------------------------------
# Root URL that every endpoint path is appended to.
BUYWHERE_BASE_URL = "https://api.buywhere.ai"

# Read at import time: a missing BUYWHERE_API_KEY raises KeyError before any
# other code in this module runs.
BUYWHERE_API_KEY = os.environ["BUYWHERE_API_KEY"]

# Headers attached to every request made through search_products().
HEADERS = {
    "Authorization": f"Bearer {BUYWHERE_API_KEY}",
    "Content-Type": "application/json",
}
@dataclass
class Deal:
    """A single discovered deal, ready to be alerted on or persisted.

    Instances are plain data; ``dataclasses.asdict`` is used elsewhere in this
    file to serialise them for the history file and webhook payloads.
    """

    product_name: str
    product_id: int
    original_price: float  # reference ("was") price the discount is measured from
    current_price: float
    discount_percent: float  # percentage in the 0-100 range
    currency: str
    source: str
    buy_url: str
    image_url: Optional[str]
    # Optional: the builders fill this from item.get("category"), which yields
    # None when the API item carries no category.
    category: Optional[str]
    rating: Optional[float]
    review_count: Optional[int]
    confidence_score: float
    discovered_at: str  # ISO-8601 timestamp (datetime.isoformat())
    expires_at: Optional[str]
    deal_quality: str  # "hot", "good", "decent"
# Persistent state lives in ./deal_data, relative to the working directory.
# The directory is created at import time if it is not there yet.
DATA_DIR = Path("deal_data")
DATA_DIR.mkdir(exist_ok=True)

# JSON files holding the discovered-deal history and the user's watchlist.
HISTORY_FILE = DATA_DIR.joinpath("deal_history.json")
WATCHLIST_FILE = DATA_DIR.joinpath("watchlist.json")
def search_products(query: str = None, category: str = None,
                    min_price: float = None, max_price: float = None,
                    limit: int = 50, **kwargs) -> dict:
    """Query the product catalogue and return the decoded JSON response.

    Args:
        query: Free-text search term, sent as ``q``; omitted when empty.
        category: Category filter; omitted when empty.
        min_price: Lower price bound; omitted only when ``None``.
        max_price: Upper price bound; omitted only when ``None``.
        limit: Maximum number of results requested (always sent).
        **kwargs: Extra query parameters forwarded verbatim. They are applied
            last, so they override the named filters on a key clash.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    # (parameter name, value, include?) -- text filters are dropped when falsy,
    # price bounds only when None so that 0 remains a usable bound.
    candidates = (
        ("q", query, bool(query)),
        ("category", category, bool(category)),
        ("min_price", min_price, min_price is not None),
        ("max_price", max_price, max_price is not None),
    )
    params = {"limit": limit}
    params.update({name: value for name, value, wanted in candidates if wanted})
    params.update(kwargs)

    url = f"{BUYWHERE_BASE_URL}/v2/products"
    response = requests.get(url, headers=HEADERS, params=params, timeout=30)
    response.raise_for_status()
    return response.json()
Step 3: Build the Deal Detection Logic
def find_deals(
    max_price: float = 200,
    min_discount: float = 15,
    min_rating: float = 4.0,
    categories: Optional[list[str]] = None,
    sources: Optional[list[str]] = None
) -> list[Deal]:
    """
    Find deals across the catalog.

    Runs one search per (category, platform) pair and keeps every item that
    ``_evaluate_deal`` accepts.

    Args:
        max_price: Maximum price threshold
        min_discount: Minimum discount percentage to qualify
        min_rating: Minimum product rating
        categories: Specific categories to search; a default set is used
            when omitted
        sources: Specific platforms to search; every listed platform is
            queried. When omitted or empty, no platform filter is sent.

    Returns:
        Qualifying deals sorted by discount, largest first. The same product
        may appear more than once if several searches return it.
    """
    if categories is None:
        categories = ["Electronics", "Fashion", "Beauty", "Sports", "Home & Living"]

    # A single None entry stands for "no platform filter", so the loop below
    # still issues exactly one request per category in that case.
    platforms = list(sources) if sources else [None]

    all_deals = []
    for category in categories:
        for platform in platforms:
            params = {
                "category": category,
                "max_price": max_price,
                "limit": 100,
                "sort_by": "price_asc",
            }
            if platform is not None:
                params["platform"] = platform

            result = search_products(**params)
            for item in result.get("items", []):
                deal = _evaluate_deal(item, min_discount, min_rating)
                if deal:
                    all_deals.append(deal)

    all_deals.sort(key=lambda x: x.discount_percent, reverse=True)
    return all_deals
def _evaluate_deal(item: dict, min_discount: float, min_rating: float) -> Optional[Deal]:
"""Evaluate if a product qualifies as a deal."""
price = float(item.get("price_sgd", item["price"]))
rating = item.get("rating")
confidence = item.get("confidence_score", 0.8)
if rating and rating < min_rating:
return None
if confidence < 0.7:
return None
estimated_original = price / 0.7
discount = ((estimated_original - price) / estimated_original) * 100
if discount < min_discount:
return None
deal_quality = "hot" if discount >= 40 else "good" if discount >= 25 else "decent"
return Deal(
product_name=item["name"],
product_id=item["id"],
original_price=round(estimated_original, 2),
current_price=price,
discount_percent=round(discount, 1),
currency=item["currency"],
source=item["source"],
buy_url=item.get("affiliate_url", item["buy_url"]),
image_url=item.get("image_url"),
category=item.get("category"),
rating=rating,
review_count=item.get("review_count"),
confidence_score=confidence,
discovered_at=datetime.now().isoformat(),
expires_at=None,
deal_quality=deal_quality
)
def find_price_drops(keywords: list[str], min_drop_percent: float = 20) -> list[Deal]:
    """
    Find products with significant price drops.

    Uses the ``price_trend`` field of each search result when available:
    only items whose trend is ``"down"`` are considered.

    Args:
        keywords: Search terms; one search request is made per keyword.
        min_drop_percent: Minimum drop percentage to qualify.

    Returns:
        Qualifying deals sorted by discount, largest first.

    Raises:
        KeyError: If a qualifying item lacks a required field.
    """
    drops = []
    for keyword in keywords:
        result = search_products(query=keyword, limit=50)
        for item in result.get("items", []):
            if item.get("price_trend", "stable") != "down":
                continue

            # Lazy fallback: dict.get(key, item["price"]) would raise
            # KeyError whenever "price" is absent, even with "price_sgd" set.
            raw_price = item.get("price_sgd")
            if raw_price is None:
                raw_price = item["price"]
            current_price = float(raw_price)
            if current_price <= 0:
                # Avoids a 0/0 division below; no meaningful drop exists.
                continue

            # NOTE: the size of the drop is not provided by the code's
            # inputs; it is estimated as 25% of the current price, which
            # makes the computed discount a constant 20%.
            estimated_drop = current_price * 0.25
            estimated_original = current_price + estimated_drop
            # Round before comparing so float noise (e.g. 19.999999999999996)
            # cannot drop an item at the default 20% threshold.
            discount = round((estimated_drop / estimated_original) * 100, 1)
            if discount < min_drop_percent:
                continue

            drops.append(Deal(
                product_name=item["name"],
                product_id=item["id"],
                original_price=round(estimated_original, 2),
                current_price=current_price,
                discount_percent=discount,
                currency=item["currency"],
                source=item["source"],
                buy_url=item.get("affiliate_url") or item["buy_url"],
                image_url=item.get("image_url"),
                category=item.get("category"),
                rating=item.get("rating"),
                review_count=item.get("review_count"),
                confidence_score=item.get("confidence_score", 0.8),
                discovered_at=datetime.now().isoformat(),
                expires_at=None,
                deal_quality="hot" if discount >= 30 else "good",
            ))

    drops.sort(key=lambda x: x.discount_percent, reverse=True)
    return drops
Step 4: Add Alert System
class DealAlertSystem:
    """Dispatches deal alerts to registered handlers and remembers seen deals.

    ``handlers`` holds objects exposing ``send(deals)``; ``deal_cache`` maps a
    "product id|price" key to the ISO timestamp of its first sighting. The
    cache is in memory only.
    """

    def __init__(self):
        self.handlers = []
        self.deal_cache = {}

    def add_handler(self, handler):
        """Register a handler (email, telegram, webhook, ...) for future alerts."""
        self.handlers.append(handler)

    def send_alert(self, deals: list[Deal]):
        """Pass ``deals`` to every handler; an empty list sends nothing.

        A failing handler is reported on stdout and does not prevent the
        remaining handlers from running.
        """
        if deals:
            for handler in self.handlers:
                try:
                    handler.send(deals)
                except Exception as e:
                    print(f"Alert handler {handler.__class__.__name__} failed: {e}")

    def is_new_deal(self, deal: Deal) -> bool:
        """Return True the first time a product/price pair is seen.

        Side effect: a first sighting is recorded, so a second call with the
        same product id and price returns False.
        """
        cache_key = f"{deal.product_id}|{deal.current_price}"
        first_sighting = cache_key not in self.deal_cache
        if first_sighting:
            self.deal_cache[cache_key] = datetime.now().isoformat()
        return first_sighting

    def filter_new_deals(self, deals: list[Deal]) -> list[Deal]:
        """Return the deals not seen before, in order, marking them as seen."""
        return list(filter(self.is_new_deal, deals))
class TelegramAlertHandler:
    """Alert handler that posts a deal summary to a Telegram chat.

    Calls the Telegram Bot API ``sendMessage`` method over HTTPS with
    ``requests`` (already used by this file) instead of a Telegram client
    library: no extra package is needed, and the call is a plain blocking
    request, so it cannot be silently skipped the way an un-awaited coroutine
    from an async client would be.
    """

    API_URL_TEMPLATE = "https://api.telegram.org/bot{token}/sendMessage"

    def __init__(self, bot_token: str, chat_id: str):
        self.bot_token = bot_token
        self.chat_id = chat_id

    def send(self, deals: "list[Deal]"):
        """Send deals to Telegram channel.

        Raises:
            requests.HTTPError: If the Bot API rejects the request.
        """
        message = self._format_message(deals)
        response = requests.post(
            self.API_URL_TEMPLATE.format(token=self.bot_token),
            json={
                "chat_id": self.chat_id,
                "text": message,
                "parse_mode": "HTML",
            },
            timeout=10,
        )
        response.raise_for_status()

    def _format_message(self, deals: "list[Deal]") -> str:
        """Build the HTML message body; at most the first 10 deals are listed."""
        from html import escape

        lines = [
            "🔥 <b>BuyWhere Deal Alert!</b>",
            f"Found {len(deals)} hot deals\n",
        ]
        for deal in deals[:10]:
            # Product data comes from an external catalogue: escape it so
            # characters such as "<" or "&" cannot break the HTML markup.
            # Truncate before escaping so an entity is never cut in half.
            name = escape(deal.product_name[:50])
            currency = escape(str(deal.currency))
            rating = deal.rating if deal.rating is not None else "N/A"
            lines.extend([
                f"\n<b>{name}</b>",
                f"💰 {currency} {deal.current_price} "
                f"<s>{currency} {deal.original_price}</s> "
                f"({deal.discount_percent}% OFF)",
                f"⭐ {rating} | 📦 {escape(str(deal.source))}",
                f"<a href='{escape(deal.buy_url, quote=True)}'>Buy Now</a>",
            ])
        return "\n".join(lines)
class WebhookAlertHandler:
    """Alert handler that delivers deals as a JSON document to a webhook."""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def send(self, deals: list[Deal]):
        """POST deals to a webhook URL.

        The body carries the send time, the total number of deals, and the
        first 20 deals serialised as dictionaries.

        Raises:
            requests.HTTPError: If the webhook answers with an error status.
        """
        sent_at = datetime.now().isoformat()
        body = dict(
            timestamp=sent_at,
            deal_count=len(deals),
            deals=list(map(asdict, deals[:20])),
        )
        reply = requests.post(
            self.webhook_url,
            json=body,
            headers={"Content-Type": "application/json"},
            timeout=10,
        )
        reply.raise_for_status()
class EmailAlertHandler:
    """Alert handler that e-mails a deal summary over SMTP with STARTTLS.

    The message is multipart/alternative with a plain-text and an HTML part.
    """

    def __init__(self, smtp_server: str, smtp_port: int,
                 username: str, password: str, to_email: str):
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.to_email = to_email

    def send(self, deals: "list[Deal]"):
        """Send deals via email.

        Raises:
            smtplib.SMTPException, OSError, ssl.SSLError: On connection,
                TLS or authentication failure.
        """
        import smtplib
        import ssl
        from email.mime.text import MIMEText
        from email.mime.multipart import MIMEMultipart

        msg = MIMEMultipart("alternative")
        msg["Subject"] = f"🔥 BuyWhere Deal Alert - {len(deals)} Hot Deals!"
        msg["From"] = self.username
        msg["To"] = self.to_email
        msg.attach(MIMEText(self._format_text(deals), "plain"))
        msg.attach(MIMEText(self._format_html(deals), "html"))

        # The timeout keeps an unreachable mail server from blocking the
        # caller indefinitely.
        with smtplib.SMTP(self.smtp_server, self.smtp_port, timeout=30) as server:
            # An explicit default context enables certificate and hostname
            # verification before the password is sent.
            server.starttls(context=ssl.create_default_context())
            server.login(self.username, self.password)
            server.send_message(msg)

    def _format_text(self, deals: "list[Deal]") -> str:
        """Build the plain-text part; at most the first 10 deals are listed."""
        lines = [f"Found {len(deals)} hot deals:\n"]
        lines.extend(
            f"- {deal.product_name}: {deal.currency} {deal.current_price} "
            f"(was {deal.original_price}, {deal.discount_percent}% off)"
            for deal in deals[:10]
        )
        return "\n".join(lines)

    def _format_html(self, deals: "list[Deal]") -> str:
        """Build the HTML part; at most the first 15 deals are listed."""
        from html import escape

        rows = []
        for deal in deals[:15]:
            # Product data is external input: escape it so it cannot inject
            # markup into the e-mail. Truncate first so no entity is split.
            name = escape(deal.product_name[:40])
            currency = escape(str(deal.currency))
            link = escape(deal.buy_url, quote=True)
            rating = deal.rating if deal.rating is not None else "N/A"
            rows.append(
                "<tr>"
                f"<td>{name}</td>"
                f"<td>{currency} {deal.current_price}</td>"
                f"<td><s>{deal.original_price}</s></td>"
                f"<td>{deal.discount_percent}%</td>"
                f"<td>{rating}</td>"
                f'<td><a href="{link}">Link</a></td>'
                "</tr>\n"
            )
        return (
            "<html><body>\n"
            f"<h2>🔥 BuyWhere Deal Alert - {len(deals)} Deals!</h2>\n"
            '<table border="1">\n'
            "<tr><th>Product</th><th>Price</th><th>Was</th><th>Off</th>"
            "<th>Rating</th><th>Link</th></tr>\n"
            f"{''.join(rows)}"
            "</table>\n"
            "</body></html>\n"
        )
Step 5: Build the Main Deal Hunter Loop
class DealHunter:
    """Coordinates watchlist searches, general deal discovery, history and alerts.

    State is persisted as JSON in ``WATCHLIST_FILE`` and ``HISTORY_FILE``.
    """

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.alert_system = DealAlertSystem()
        self.watchlist = self._load_watchlist()
        self.history = self._load_history()

    @staticmethod
    def _load_json_list(path) -> list[dict]:
        """Read a JSON list from ``path``; return [] if missing or unreadable."""
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return []
        except json.JSONDecodeError as e:
            # A write interrupted mid-dump leaves a truncated file; starting
            # empty is preferable to failing on every subsequent start-up.
            print(f"Ignoring unreadable state file {path}: {e}")
            return []
        return data if isinstance(data, list) else []

    def _load_watchlist(self) -> list[dict]:
        """Load saved watchlist."""
        return self._load_json_list(WATCHLIST_FILE)

    def _load_history(self) -> list[dict]:
        """Load deal history."""
        return self._load_json_list(HISTORY_FILE)

    def _write_history(self):
        """Persist the history, keeping only the most recent 1000 entries."""
        self.history = self.history[-1000:]
        with open(HISTORY_FILE, "w", encoding="utf-8") as f:
            json.dump(self.history, f, indent=2)

    def _save_history(self, deal: Deal):
        """Save discovered deal to history."""
        self.history.append(asdict(deal))
        self._write_history()

    def add_to_watchlist(self, keyword: str, target_price: float = None,
                         category: str = None):
        """Add an item to the watchlist, or update it if already present.

        An entry is identified by its (keyword, category) pair. Re-adding an
        existing pair replaces its target price and keeps the original
        ``added_at``, so calling this on every start-up does not pile up
        duplicates in the saved file.
        """
        entry = {
            "keyword": keyword,
            "target_price": target_price,
            "category": category,
            "added_at": datetime.now().isoformat(),
        }
        for index, existing in enumerate(self.watchlist):
            same_item = (existing.get("keyword") == keyword
                         and existing.get("category") == category)
            if same_item:
                entry["added_at"] = existing.get("added_at", entry["added_at"])
                self.watchlist[index] = entry
                break
        else:
            self.watchlist.append(entry)

        with open(WATCHLIST_FILE, "w", encoding="utf-8") as f:
            json.dump(self.watchlist, f, indent=2)

    def hunt_deals(self) -> list[Deal]:
        """Main deal hunting logic.

        Searches every watchlist entry, adds general catalogue deals, removes
        duplicate products, records the result in the history, and alerts on
        deals not seen before during this run of the program.

        Returns:
            Unique deals, "hot" ones first, then by discount (largest first).
        """
        print(f"[{datetime.now().isoformat()}] Starting deal hunt...")
        all_deals = []

        for item in self.watchlist:
            keyword = item["keyword"]
            try:
                # Honour the category stored with the watchlist entry.
                result = search_products(
                    query=keyword, category=item.get("category"), limit=50
                )
            except requests.RequestException as e:
                # One failing search must not discard the rest of the hunt.
                print(f"Search for {keyword!r} failed: {e}")
                continue

            target_price = item.get("target_price")
            for product in result.get("items", []):
                deal = _evaluate_deal(product, min_discount=15, min_rating=3.5)
                if deal is None:
                    continue
                # No (or zero) target price means "any price qualifies".
                if not target_price or deal.current_price <= target_price:
                    all_deals.append(deal)

        try:
            all_deals.extend(find_deals(max_price=150, min_discount=20))
        except requests.RequestException as e:
            print(f"General deal search failed: {e}")

        all_deals.sort(key=lambda x: (x.deal_quality == "hot", x.discount_percent),
                       reverse=True)

        # Keep the first (best-ranked) occurrence of each product.
        unique_deals = []
        seen_ids = set()
        for d in all_deals:
            if d.product_id not in seen_ids:
                seen_ids.add(d.product_id)
                unique_deals.append(d)

        # Write the history once per hunt rather than once per deal.
        if unique_deals:
            self.history.extend(asdict(d) for d in unique_deals)
            self._write_history()

        new_deals = self.alert_system.filter_new_deals(unique_deals)
        if new_deals:
            print(f"Found {len(new_deals)} new deals!")
            self.alert_system.send_alert(new_deals)
        return unique_deals

    @staticmethod
    def _print_section(title: str, deals: list) -> None:
        """Print a titled section showing at most the first five deals."""
        print(title)
        for d in deals[:5]:
            print(f" {d.product_name[:50]}")
            # Use the deal's own currency instead of assuming SGD.
            print(f" 💰 {d.currency} {d.current_price} "
                  f"(was {d.currency} {d.original_price})")
            print(f" ⭐ {d.rating} | 📦 {d.source} | {d.discount_percent}% off\n")

    def print_deals(self, deals: list[Deal]):
        """Print the "hot" and "good" deals to the console."""
        print(f"\n{'='*70}")
        print(f"🔥 DEAL HUNTER RESULTS - {datetime.now().strftime('%Y-%m-%d %H:%M')}")
        print(f"{'='*70}\n")

        hot = [d for d in deals if d.deal_quality == "hot"]
        good = [d for d in deals if d.deal_quality == "good"]
        if hot:
            self._print_section(f"🔥 HOT DEALS ({len(hot)} items, 30%+ off):\n", hot)
        if good:
            self._print_section(f"💪 GOOD DEALS ({len(good)} items, 15-30% off):\n", good)
Step 6: Add the Scheduler Entry Point
if __name__ == "__main__":
    hunter = DealHunter(api_key=os.environ["BUYWHERE_API_KEY"])

    hunter.add_to_watchlist("laptop", target_price=800)
    hunter.add_to_watchlist("headphones", target_price=200)
    hunter.add_to_watchlist("smartwatch", category="Electronics")

    # No alert handler is registered here, so deals are only printed. To get
    # notifications, call hunter.alert_system.add_handler(...) with one of the
    # handler classes defined above before the first job runs.

    print("Deal Hunter started!")
    print("Checking deals every 4 hours...\n")

    def job():
        """Run one hunt and print it; never raise into the scheduler loop."""
        try:
            deals = hunter.hunt_deals()
            hunter.print_deals(deals)
        except Exception as e:
            # Top-level boundary: an exception escaping from here would
            # propagate out of schedule.run_pending() and end the program.
            print(f"[{datetime.now().isoformat()}] Deal hunt failed: {e}")

    job()  # run once immediately, then on the schedule
    schedule.every(4).hours.do(job)

    try:
        while True:
            schedule.run_pending()
            time.sleep(60)
    except KeyboardInterrupt:
        print("\nDeal Hunter stopped.")
Step 7: Run the Deal Hunter
export BUYWHERE_API_KEY="bw_live_your_key_here"
python deal_hunter.py
Example output (illustrative; actual products, prices, and discounts depend on live API data):
Deal Hunter started!
Checking deals every 4 hours...
[2026-04-16T10:00:00] Starting deal hunt...
Found 8 new deals!
[2026-04-16T10:00:01] Sending alerts via 2 handlers
======================================================================
🔥 DEAL HUNTER RESULTS - 2026-04-16 10:00
======================================================================
🔥 HOT DEALS (3 items, 30%+ off):
Sony WH-1000XM5 Wireless Headphones
💰 SGD 299 (was SGD 449)
⭐ 4.8 | 📦 shopee_sg | 33.4% off
Apple AirPods Pro (2nd Gen)
💰 SGD 189 (was SGD 289)
⭐ 4.7 | 📦 lazada_sg | 34.6% off
💪 GOOD DEALS (5 items, 15-30% off):
Samsung Galaxy Buds2 Pro
💰 SGD 149 (was SGD 199)
⭐ 4.5 | 📦 shopee_sg | 25.1% off