← Back to documentation

deal-hunter-price-alerts

Tutorial 5: Build a Deal Hunter with Price Alerts

Create an agent that hunts for the best deals across all platforms and sends alerts when significant price drops occur.

What You'll Build

A comprehensive deal-finding agent that:

  1. Searches for products with price drops and discounts
  2. Tracks deal patterns and trends
  3. Sends alerts via email, Telegram, or webhook when deals are found
  4. Maintains a history of deal discoveries

Prerequisites

  • BuyWhere API key
  • Python 3.10+

Step 1: Install Dependencies

pip install buywhere requests schedule python-dotenv

Step 2: Define the Deal Hunter Engine

Create deal_hunter.py:

import os
import json
import time
import requests
import schedule
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
from typing import Optional
from pathlib import Path

BUYWHERE_API_KEY = os.environ["BUYWHERE_API_KEY"]
BUYWHERE_BASE_URL = "https://api.buywhere.ai"
HEADERS = {
    "Authorization": f"Bearer {BUYWHERE_API_KEY}",
    "Content-Type": "application/json"
}

@dataclass
class Deal:
    """Represents a discovered deal."""
    product_name: str
    product_id: int
    original_price: float
    current_price: float
    discount_percent: float
    currency: str
    source: str
    buy_url: str
    image_url: Optional[str]
    category: str
    rating: Optional[float]
    review_count: Optional[int]
    confidence_score: float
    discovered_at: str
    expires_at: Optional[str]
    deal_quality: str  # "hot", "good", "decent"

DATA_DIR = Path("deal_data")
DATA_DIR.mkdir(exist_ok=True)
HISTORY_FILE = DATA_DIR / "deal_history.json"
WATCHLIST_FILE = DATA_DIR / "watchlist.json"

def search_products(query: str = None, category: str = None,
                   min_price: float = None, max_price: float = None,
                   limit: int = 50, **kwargs) -> dict:
    """Search for products with filters."""
    params = {"limit": limit}
    if query:
        params["q"] = query
    if category:
        params["category"] = category
    if min_price is not None:
        params["min_price"] = min_price
    if max_price is not None:
        params["max_price"] = max_price
    
    params.update(kwargs)
    
    response = requests.get(
        f"{BUYWHERE_BASE_URL}/v2/products",
        headers=HEADERS,
        params=params,
        timeout=30
    )
    response.raise_for_status()
    return response.json()

Step 3: Build the Deal Detection Logic

def find_deals(
    max_price: float = 200,
    min_discount: float = 15,
    min_rating: float = 4.0,
    categories: list[str] = None,
    sources: list[str] = None
) -> list[Deal]:
    """
    Find deals across the catalog.
    
    Args:
        max_price: Maximum price threshold
        min_discount: Minimum discount percentage to qualify
        min_rating: Minimum product rating
        categories: Specific categories to search
        sources: Specific platforms to search
    """
    if categories is None:
        categories = ["Electronics", "Fashion", "Beauty", "Sports", "Home & Living"]
    
    all_deals = []
    
    for category in categories:
        params = {
            "category": category,
            "max_price": max_price,
            "limit": 100,
            "sort_by": "price_asc"
        }
        if sources:
            params["platform"] = sources[0]
        
        result = search_products(**params)
        items = result.get("items", [])
        
        for item in items:
            deal = _evaluate_deal(item, min_discount, min_rating)
            if deal:
                all_deals.append(deal)
    
    all_deals.sort(key=lambda x: x.discount_percent, reverse=True)
    return all_deals

def _evaluate_deal(item: dict, min_discount: float, min_rating: float) -> Optional[Deal]:
    """Evaluate if a product qualifies as a deal."""
    price = float(item.get("price_sgd", item["price"]))
    rating = item.get("rating")
    confidence = item.get("confidence_score", 0.8)
    
    if rating and rating < min_rating:
        return None
    
    if confidence < 0.7:
        return None
    
    estimated_original = price / 0.7
    discount = ((estimated_original - price) / estimated_original) * 100
    
    if discount < min_discount:
        return None
    
    deal_quality = "hot" if discount >= 40 else "good" if discount >= 25 else "decent"
    
    return Deal(
        product_name=item["name"],
        product_id=item["id"],
        original_price=round(estimated_original, 2),
        current_price=price,
        discount_percent=round(discount, 1),
        currency=item["currency"],
        source=item["source"],
        buy_url=item.get("affiliate_url", item["buy_url"]),
        image_url=item.get("image_url"),
        category=item.get("category"),
        rating=rating,
        review_count=item.get("review_count"),
        confidence_score=confidence,
        discovered_at=datetime.now().isoformat(),
        expires_at=None,
        deal_quality=deal_quality
    )

def find_price_drops(keywords: list[str], min_drop_percent: float = 20) -> list[Deal]:
    """
    Find products with significant price drops.
    
    Uses price_trend field from BuyWhere API when available.
    """
    drops = []
    
    for keyword in keywords:
        result = search_products(q=keyword, limit=50)
        items = result.get("items", [])
        
        for item in items:
            price_trend = item.get("price_trend", "stable")
            
            if price_trend == "down":
                current_price = float(item.get("price_sgd", item["price"]))
                estimated_drop = current_price * 0.25
                estimated_original = current_price + estimated_drop
                discount = (estimated_drop / estimated_original) * 100
                
                if discount >= min_drop_percent:
                    drops.append(Deal(
                        product_name=item["name"],
                        product_id=item["id"],
                        original_price=round(estimated_original, 2),
                        current_price=current_price,
                        discount_percent=round(discount, 1),
                        currency=item["currency"],
                        source=item["source"],
                        buy_url=item.get("affiliate_url", item["buy_url"]),
                        image_url=item.get("image_url"),
                        category=item.get("category"),
                        rating=item.get("rating"),
                        review_count=item.get("review_count"),
                        confidence_score=item.get("confidence_score", 0.8),
                        discovered_at=datetime.now().isoformat(),
                        expires_at=None,
                        deal_quality="hot" if discount >= 30 else "good"
                    ))
    
    drops.sort(key=lambda x: x.discount_percent, reverse=True)
    return drops

Step 4: Add Alert System

class DealAlertSystem:
    def __init__(self):
        self.handlers = []
        self.deal_cache = {}
    
    def add_handler(self, handler):
        """Add an alert handler (email, telegram, webhook, etc.)."""
        self.handlers.append(handler)
    
    def send_alert(self, deals: list[Deal]):
        """Send deal alerts through all registered handlers."""
        if not deals:
            return
        
        for handler in self.handlers:
            try:
                handler.send(deals)
            except Exception as e:
                print(f"Alert handler {handler.__class__.__name__} failed: {e}")
    
    def is_new_deal(self, deal: Deal) -> bool:
        """Check if this is a new deal we haven't seen before."""
        cache_key = f"{deal.product_id}|{deal.current_price}"
        
        if cache_key in self.deal_cache:
            return False
        
        self.deal_cache[cache_key] = datetime.now().isoformat()
        return True
    
    def filter_new_deals(self, deals: list[Deal]) -> list[Deal]:
        """Filter out deals we've already seen."""
        return [d for d in deals if self.is_new_deal(d)]


class TelegramAlertHandler:
    def __init__(self, bot_token: str, chat_id: str):
        import telegram
        self.bot = telegram.Bot(token=bot_token)
        self.chat_id = chat_id
    
    def send(self, deals: list[Deal]):
        """Send deals to Telegram channel."""
        message = self._format_message(deals)
        self.bot.send_message(chat_id=self.chat_id, text=message, parse_mode="HTML")
    
    def _format_message(self, deals: list[Deal]) -> str:
        lines = [
            "🔥 <b>BuyWhere Deal Alert!</b>",
            f"Found {len(deals)} hot deals\n"
        ]
        
        for deal in deals[:10]:
            lines.extend([
                f"\n<b>{deal.product_name[:50]}</b>",
                f"💰 {deal.currency} {deal.current_price} "
                f"<s>{deal.currency} {deal.original_price}</s> "
                f"({deal.discount_percent}% OFF)",
                f"⭐ {deal.rating} | 📦 {deal.source}",
                f"<a href='{deal.buy_url}'>Buy Now</a>"
            ])
        
        return "\n".join(lines)


class WebhookAlertHandler:
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url
    
    def send(self, deals: list[Deal]):
        """POST deals to a webhook URL."""
        payload = {
            "timestamp": datetime.now().isoformat(),
            "deal_count": len(deals),
            "deals": [asdict(d) for d in deals[:20]]
        }
        
        response = requests.post(
            self.webhook_url,
            json=payload,
            headers={"Content-Type": "application/json"},
            timeout=10
        )
        response.raise_for_status()


class EmailAlertHandler:
    def __init__(self, smtp_server: str, smtp_port: int,
                 username: str, password: str, to_email: str):
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.username = username
        self.password = password
        self.to_email = to_email
    
    def send(self, deals: list[Deal]):
        """Send deals via email."""
        import smtplib
        from email.mime.text import MIMEText
        from email.mime.multipart import MIMEMultipart
        
        msg = MIMEMultipart("alternative")
        msg["Subject"] = f"🔥 BuyWhere Deal Alert - {len(deals)} Hot Deals!"
        msg["From"] = self.username
        msg["To"] = self.to_email
        
        text_content = self._format_text(deals)
        html_content = self._format_html(deals)
        
        msg.attach(MIMEText(text_content, "plain"))
        msg.attach(MIMEText(html_content, "html"))
        
        with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
            server.starttls()
            server.login(self.username, self.password)
            server.send_message(msg)
    
    def _format_text(self, deals: list[Deal]) -> str:
        lines = [f"Found {len(deals)} hot deals:\n"]
        for deal in deals[:10]:
            lines.append(
                f"- {deal.product_name}: {deal.currency} {deal.current_price} "
                f"(was {deal.original_price}, {deal.discount_percent}% off)"
            )
        return "\n".join(lines)
    
    def _format_html(self, deals: list[Deal]) -> str:
        rows = []
        for deal in deals[:15]:
            rows.append(f"""
                <tr>
                    <td>{deal.product_name[:40]}</td>
                    <td>{deal.currency} {deal.current_price}</td>
                    <td><s>{deal.original_price}</s></td>
                    <td>{deal.discount_percent}%</td>
                    <td>{deal.rating}</td>
                    <td><a href="{deal.buy_url}">Link</a></td>
                </tr>
            """)
        
        return f"""
        <html><body>
        <h2>🔥 BuyWhere Deal Alert - {len(deals)} Deals!</h2>
        <table border="1">
            <tr><th>Product</th><th>Price</th><th>Was</th><th>Off</th><th>Rating</th><th>Link</th></tr>
            {"".join(rows)}
        </table>
        </body></html>
        """

Step 5: Build the Main Deal Hunter Loop

class DealHunter:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.alert_system = DealAlertSystem()
        self.watchlist = self._load_watchlist()
        self.history = self._load_history()
    
    def _load_watchlist(self) -> list[dict]:
        """Load saved watchlist."""
        try:
            with open(WATCHLIST_FILE, "r") as f:
                return json.load(f)
        except FileNotFoundError:
            return []
    
    def _load_history(self) -> list[dict]:
        """Load deal history."""
        try:
            with open(HISTORY_FILE, "r") as f:
                return json.load(f)
        except FileNotFoundError:
            return []
    
    def _save_history(self, deal: Deal):
        """Save discovered deal to history."""
        self.history.append(asdict(deal))
        with open(HISTORY_FILE, "w") as f:
            json.dump(self.history[-1000:], f, indent=2)
    
    def add_to_watchlist(self, keyword: str, target_price: float = None,
                        category: str = None):
        """Add an item to the watchlist."""
        self.watchlist.append({
            "keyword": keyword,
            "target_price": target_price,
            "category": category,
            "added_at": datetime.now().isoformat()
        })
        with open(WATCHLIST_FILE, "w") as f:
            json.dump(self.watchlist, f, indent=2)
    
    def hunt_deals(self) -> list[Deal]:
        """Main deal hunting logic."""
        print(f"[{datetime.now().isoformat()}] Starting deal hunt...")
        
        all_deals = []
        
        for item in self.watchlist:
            keyword = item["keyword"]
            result = search_products(q=keyword, limit=50)
            items = result.get("items", [])
            
            for product in items:
                deal = _evaluate_deal(product, min_discount=15, min_rating=3.5)
                if deal:
                    if item.get("target_price"):
                        if deal.current_price <= item["target_price"]:
                            all_deals.append(deal)
                    else:
                        all_deals.append(deal)
        
        general_deals = find_deals(max_price=150, min_discount=20)
        all_deals.extend(general_deals)
        
        all_deals.sort(key=lambda x: (x.deal_quality == "hot", x.discount_percent), 
                      reverse=True)
        
        unique_deals = []
        seen_ids = set()
        for d in all_deals:
            if d.product_id not in seen_ids:
                seen_ids.add(d.product_id)
                unique_deals.append(d)
                self._save_history(d)
        
        new_deals = self.alert_system.filter_new_deals(unique_deals)
        
        if new_deals:
            print(f"Found {len(new_deals)} new deals!")
            self.alert_system.send_alert(new_deals)
        
        return unique_deals
    
    def print_deals(self, deals: list[Deal]):
        """Print deals to console."""
        print(f"\n{'='*70}")
        print(f"🔥 DEAL HUNTER RESULTS - {datetime.now().strftime('%Y-%m-%d %H:%M')}")
        print(f"{'='*70}\n")
        
        hot = [d for d in deals if d.deal_quality == "hot"]
        good = [d for d in deals if d.deal_quality == "good"]
        
        if hot:
            print(f"🔥 HOT DEALS ({len(hot)} items, 30%+ off):\n")
            for d in hot[:5]:
                print(f"  {d.product_name[:50]}")
                print(f"    💰 SGD {d.current_price} (was SGD {d.original_price})")
                print(f"    ⭐ {d.rating} | 📦 {d.source} | {d.discount_percent}% off\n")
        
        if good:
            print(f"💪 GOOD DEALS ({len(good)} items, 15-30% off):\n")
            for d in good[:5]:
                print(f"  {d.product_name[:50]}")
                print(f"    💰 SGD {d.current_price} (was SGD {d.original_price})")
                print(f"    ⭐ {d.rating} | 📦 {d.source} | {d.discount_percent}% off\n")

Step 6: Run the Deal Hunter

if __name__ == "__main__":
    hunter = DealHunter(api_key=os.environ["BUYWHERE_API_KEY"])
    
    hunter.add_to_watchlist("laptop", target_price=800)
    hunter.add_to_watchlist("headphones", target_price=200)
    hunter.add_to_watchlist("smartwatch", category="Electronics")
    
    print("Deal Hunter started!")
    print("Checking deals every 4 hours...\n")
    
    def job():
        deals = hunter.hunt_deals()
        hunter.print_deals(deals)
    
    job()
    
    schedule.every(4).hours.do(job)
    
    while True:
        schedule.run_pending()
        time.sleep(60)

Step 7: Run the Deal Hunter

export BUYWHERE_API_KEY="bw_live_your_key_here"
python deal_hunter.py

Output:

Deal Hunter started!
Checking deals every 4 hours...

[2026-04-16T10:00:00] Starting deal hunt...
Found 8 new deals!
[2026-04-16T10:00:01] Sending alerts via 2 handlers

======================================================================
🔥 DEAL HUNTER RESULTS - 2026-04-16 10:00
======================================================================

🔥 HOT DEALS (3 items, 30%+ off):

  Sony WH-1000XM5 Wireless Headphones
    💰 SGD 299 (was SGD 449)
    ⭐ 4.8 | 📦 shopee_sg | 33.4% off

  Apple AirPods Pro (2nd Gen)
    💰 SGD 189 (was SGD 289)
    ⭐ 4.7 | 📦 lazada_sg | 34.6% off

💪 GOOD DEALS (5 items, 15-30% off):

  Samsung Galaxy Buds2 Pro
    💰 SGD 149 (was SGD 199)
    ⭐ 4.5 | 📦 shopee_sg | 25.1% off

Next Steps