#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from flask import Flask, request, jsonify
from flask_cors import CORS
import time
import json
import os
import sys
import uuid
import threading
import random

from kleinanzeigen_scrape import KleinanzeigenScraper

app = Flask(__name__)
CORS(app)

# Load configuration
CONFIG_FILE = "config.json"
config = {}
if os.path.exists(CONFIG_FILE):
    with open(CONFIG_FILE, "r", encoding="utf-8") as f:
        config = json.load(f)
else:
    print(f"ERROR: {CONFIG_FILE} not found!")
    sys.exit(1)
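# Illustrative config.json layout. Only the keys read in this module are shown
# and the example values are placeholders, not project defaults; the
# KleinanzeigenScraper constructor receives the full config and may expect
# additional sections.
#
# {
#     "scraping": {
#         "listings_per_page": 25,
#         "max_workers": 4,
#         "min_workers": 2,
#         "rate_limit_delay": 1.0,
#         "session_timeout": 3600
#     },
#     "server": {
#         "host": "127.0.0.1",
#         "port": 5000,
#         "debug": false
#     }
# }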
# Configuration values
LISTINGS_PER_PAGE = config["scraping"]["listings_per_page"]
MAX_WORKERS = config["scraping"]["max_workers"]
MIN_WORKERS = config["scraping"]["min_workers"]
RATE_LIMIT_DELAY = config["scraping"]["rate_limit_delay"]
SERVER_HOST = config["server"]["host"]
SERVER_PORT = config["server"]["port"]
SERVER_DEBUG = config["server"]["debug"]

# Global state
scrape_sessions = {}
app_start_time = time.time()

# Metrics
metrics = {
    "search_requests": 0,
    "scrape_requests": 0,
    "kleinanzeigen_response_codes": {},
    "nominatim_response_codes": {},
}

# Initialize scraper
scraper = KleinanzeigenScraper(config, metrics)


def cleanup_old_sessions():
    """Remove sessions older than the configured session timeout."""
    session_timeout = config["scraping"]["session_timeout"]
    current_time = time.time()
    sessions_to_remove = []

    for session_id, session in scrape_sessions.items():
        if current_time - session.get("created_at", current_time) > session_timeout:
            sessions_to_remove.append(session_id)

    for session_id in sessions_to_remove:
        del scrape_sessions[session_id]
        print(f"Cleaned up old session: {session_id}")

    return len(sessions_to_remove)


def scrape_listing_wrapper(session_id, url, results, index):
    """Wrapper for scraping a single listing in a worker thread."""
    session = scrape_sessions.get(session_id)
    if not session:
        return
    listing = scraper.scrape_listing(url)
    results[index] = listing


def prefetch_listings_thread(session_id):
    """Background thread to prefetch all listings with parallel workers."""
    session = scrape_sessions.get(session_id)
    if not session:
        return

    urls = session["urls"]
    # Pick a worker count between MIN_WORKERS and MAX_WORKERS (inclusive)
    workers = random.randint(MIN_WORKERS, MAX_WORKERS)
    print(f"Starting prefetch for session {session_id} with {workers} parallel workers")

    for i in range(0, len(urls), workers):
        # Check if session was cancelled or deleted
        if (
            session_id not in scrape_sessions
            or scrape_sessions[session_id]["cancelled"]
        ):
            print(f"Prefetch stopped for session {session_id}")
            return

        # Process batch of URLs in parallel
        batch = urls[i : i + workers]
        threads = []
        results = [None] * len(batch)

        for j, url in enumerate(batch):
            thread = threading.Thread(
                target=scrape_listing_wrapper,
                args=(session_id, url, results, j),
                daemon=True,
            )
            thread.start()
            threads.append(thread)

        # Wait for all threads in this batch to complete
        for thread in threads:
            thread.join()

        # Add results to session
        for listing in results:
            if listing:
                session["listings"].append(listing)
        session["scraped"] += len(batch)

        # Rate limiting between batches
        time.sleep(RATE_LIMIT_DELAY)

    print(f"Prefetch complete for session {session_id}")


@app.route("/api/search", methods=["POST"])
def api_search():
    """API endpoint for searching listings - returns count and starts prefetch"""
    data = request.json
    metrics["search_requests"] += 1

    # Cleanup old sessions before creating new one
    cleanup_old_sessions()

    search_term = data.get("search_term", "")
    num_listings = data.get("num_listings", 25)
    min_price = data.get("min_price", 0)
    max_price = data.get("max_price", 1000000000)

    if not search_term:
        return jsonify({"error": "Search term is required"}), 400

    # Calculate pages needed
    max_pages = max(1, (num_listings + LISTINGS_PER_PAGE - 1) // LISTINGS_PER_PAGE)

    try:
        # Search for listing URLs only
        listing_urls = scraper.search_listings(
            search_term, max_pages, min_price, max_price
        )

        # Limit to requested number
        listing_urls = listing_urls[:num_listings]

        # Create session ID
        session_id = str(uuid.uuid4())

        # Store session with creation timestamp
        scrape_sessions[session_id] = {
            "urls": listing_urls,
            "total": len(listing_urls),
            "scraped": 0,
            "listings": [],
            "cancelled": False,
            "created_at": time.time(),
        }

        # Start prefetch in background thread
        prefetch_thread = threading.Thread(
            target=prefetch_listings_thread, args=(session_id,), daemon=True
        )
        prefetch_thread.start()

        return jsonify({"session_id": session_id, "total": len(listing_urls)})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/scrape/<session_id>", methods=["GET"])
def api_scrape(session_id):
    """API endpoint to get next scraped listing from session"""
    cleanup_old_sessions()

    if session_id not in scrape_sessions:
        return jsonify({"error": "Invalid session ID"}), 404

    session = scrape_sessions[session_id]

    if session["cancelled"]:
        return jsonify({"cancelled": True}), 200

    # Wait briefly if no listings are ready yet
    wait_count = 0
    while (
        len(session["listings"]) == 0
        and session["scraped"] < session["total"]
        and wait_count < 10
    ):
        time.sleep(0.1)
        wait_count += 1

    if len(session["listings"]) > 0:
        listing = session["listings"].pop(0)
    else:
        listing = None

    is_complete = (
        session["scraped"] >= session["total"] and len(session["listings"]) == 0
    )

    return jsonify(
        {
            "complete": is_complete,
            "listing": listing,
            "progress": {"current": session["scraped"], "total": session["total"]},
        }
    )


@app.route("/api/scrape/<session_id>/cancel", methods=["POST"])
def api_cancel_scrape(session_id):
    """API endpoint to cancel scraping session and delete cached listings"""
    cleanup_old_sessions()

    if session_id not in scrape_sessions:
        return jsonify({"error": "Invalid session ID"}), 404

    # Delete session completely (including cached listings)
    del scrape_sessions[session_id]

    return jsonify({"cancelled": True, "message": "Session deleted"})


@app.route("/api/health", methods=["GET"])
def health():
    """Health check endpoint"""
    return jsonify({"status": "ok"})
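# Illustrative client flow (sketch only; HOST and PORT come from config.json,
# and the payload values below are placeholders):
#
#   1. Start a search and its background prefetch session:
#      curl -X POST http://HOST:PORT/api/search \
#           -H "Content-Type: application/json" \
#           -d '{"search_term": "fahrrad", "num_listings": 25, "min_price": 0, "max_price": 500}'
#      -> {"session_id": "<uuid>", "total": 25}
#
#   2. Poll for listings until "complete" is true:
#      curl http://HOST:PORT/api/scrape/<session_id>
#      -> {"complete": false, "listing": {...}, "progress": {"current": 4, "total": 25}}
#
#   3. Optionally cancel and drop the cached session:
#      curl -X POST http://HOST:PORT/api/scrape/<session_id>/cancel
#      -> {"cancelled": true, "message": "Session deleted"}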
@app.route("/api/metrics", methods=["GET"])
def api_metrics():
    """Prometheus-style metrics endpoint"""
    uptime = time.time() - app_start_time

    # Build Prometheus text format
    lines = []

    # Search requests
    lines.append("# HELP search_requests_total Total number of search requests")
    lines.append("# TYPE search_requests_total counter")
    lines.append(f"search_requests_total {metrics['search_requests']}")
    lines.append("")

    # Scrape requests
    lines.append("# HELP scrape_requests_total Total number of scrape requests")
    lines.append("# TYPE scrape_requests_total counter")
    lines.append(f"scrape_requests_total {metrics['scrape_requests']}")
    lines.append("")

    # Uptime
    lines.append("# HELP uptime_seconds Application uptime in seconds")
    lines.append("# TYPE uptime_seconds gauge")
    lines.append(f"uptime_seconds {uptime}")
    lines.append("")

    # Active sessions
    lines.append("# HELP active_sessions Number of active scraping sessions")
    lines.append("# TYPE active_sessions gauge")
    lines.append(f"active_sessions {len(scrape_sessions)}")
    lines.append("")

    # Cache size (HELP/TYPE name must match the sample line below)
    lines.append("# HELP zip_code_cache_size Number of cached ZIP codes")
    lines.append("# TYPE zip_code_cache_size gauge")
    lines.append(f"zip_code_cache_size {len(scraper.zip_cache)}")
    lines.append("")

    # Kleinanzeigen response codes
    lines.append(
        "# HELP kleinanzeigen_http_responses_total HTTP responses from kleinanzeigen.de"
    )
    lines.append("# TYPE kleinanzeigen_http_responses_total counter")
    for code, count in metrics["kleinanzeigen_response_codes"].items():
        lines.append(f'kleinanzeigen_http_responses_total{{code="{code}"}} {count}')
    lines.append("")

    # Nominatim response codes
    lines.append(
        "# HELP nominatim_http_responses_total HTTP responses from Nominatim API"
    )
    lines.append("# TYPE nominatim_http_responses_total counter")
    for code, count in metrics["nominatim_response_codes"].items():
        lines.append(f'nominatim_http_responses_total{{code="{code}"}} {count}')
    lines.append("")

    return (
        "\n".join(lines),
        200,
        {"Content-Type": "text/plain; version=0.0.4; charset=utf-8"},
    )


if __name__ == "__main__":
    print("Starting Kleinanzeigen Scraper API Server...")
    print(f"ZIP code cache loaded with {len(scraper.zip_cache)} entries")
    app.run(debug=SERVER_DEBUG, host=SERVER_HOST, port=SERVER_PORT, threaded=True)
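# Example /api/metrics output (shape only; the values depend on runtime state
# and the labelled response-code series only appear once codes were observed):
#
#   # HELP search_requests_total Total number of search requests
#   # TYPE search_requests_total counter
#   search_requests_total 3
#
#   # HELP active_sessions Number of active scraping sessions
#   # TYPE active_sessions gauge
#   active_sessions 1
#
#   kleinanzeigen_http_responses_total{code="200"} 42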