#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Flask API Server for Kleinanzeigen Scraper
Author: Hendrik Schutter
Date: 2025/11/24
"""
from flask import Flask, request, jsonify
from flask_cors import CORS
from bs4 import BeautifulSoup
from datetime import datetime
import urllib3
import random
import requests
import time
import json
import os
import uuid
import threading

app = Flask(__name__)
CORS(app)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Configuration
CACHE_FILE = "zip_cache.json"
SESSION_TIMEOUT = 300  # seconds
LISTINGS_PER_PAGE = 25

# Global state
zip_cache = {}
scrape_sessions = {}
app_start_time = time.time()

# Metrics
metrics = {
    "search_requests": 0,
    "scrape_requests": 0,
    "kleinanzeigen_response_codes": {},
    "nominatim_response_codes": {},
}


def cleanup_old_sessions():
    """Remove sessions older than SESSION_TIMEOUT"""
    current_time = time.time()
    sessions_to_remove = []
    for session_id, session in scrape_sessions.items():
        if current_time - session.get("created_at", current_time) > SESSION_TIMEOUT:
            sessions_to_remove.append(session_id)
    for session_id in sessions_to_remove:
        del scrape_sessions[session_id]
        print(f"Cleaned up old session: {session_id}")
    return len(sessions_to_remove)


def get_random_user_agent():
    """Generate random user agent string"""
    uastrings = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    ]
    return random.choice(uastrings)


def make_soup(url):
    """Fetch URL and return BeautifulSoup object"""
    user_agent = {"user-agent": get_random_user_agent()}
    http = urllib3.PoolManager(10, headers=user_agent)
    try:
        r = http.request("GET", url)
        # Track response code
        status_code = str(r.status)
        if "kleinanzeigen.de" in url:
            metrics["kleinanzeigen_response_codes"][status_code] = (
                metrics["kleinanzeigen_response_codes"].get(status_code, 0) + 1
            )
        return BeautifulSoup(r.data, "lxml")
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        if "kleinanzeigen.de" in url:
            metrics["kleinanzeigen_response_codes"]["error"] = (
                metrics["kleinanzeigen_response_codes"].get("error", 0) + 1
            )
        raise


def geocode_zip(zip_code):
    """Geocode ZIP code using Nominatim API with caching"""
    zip_code = str(zip_code)

    # Check cache first
    if zip_code in zip_cache:
        return zip_cache[zip_code]

    # Call Nominatim API
    url = "https://nominatim.openstreetmap.org/search"
    params = {
        "postalcode": zip_code,
        "country": "Germany",
        "format": "json",
        "limit": 1,
    }
    try:
        response = requests.get(
            url, params=params, headers={"user-agent": get_random_user_agent()}
        )
        # Track response code
        status_code = str(response.status_code)
        metrics["nominatim_response_codes"][status_code] = (
            metrics["nominatim_response_codes"].get(status_code, 0) + 1
        )
        data = response.json()
        if data:
            coords = {"lat": float(data[0]["lat"]), "lon": float(data[0]["lon"])}
            zip_cache[zip_code] = coords
            # Save cache
            with open(CACHE_FILE, "w", encoding="utf-8") as f:
                json.dump(zip_cache, f, ensure_ascii=False, indent=2)
            time.sleep(1)  # Respect API rate limits
            return coords
    except Exception as e:
        print(f"Geocoding error for {zip_code}: {e}")
        metrics["nominatim_response_codes"]["error"] = (
            metrics["nominatim_response_codes"].get("error", 0) + 1
        )
    return None
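
# Note on the cache file: zip_cache is persisted to CACHE_FILE as a plain JSON
# object mapping each ZIP code to its coordinates, e.g. (illustrative values only):
#   {"89073": {"lat": 48.4, "lon": 9.99}}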
metrics["nominatim_response_codes"]["error"] = ( metrics["nominatim_response_codes"].get("error", 0) + 1 ) return None def search_listings(search_term, max_pages, min_price, max_price): """Search for listings on kleinanzeigen.de - returns only URLs""" base_url = "https://www.kleinanzeigen.de" found_listings = set() for page_counter in range(1, max_pages + 1): listing_url = ( base_url + "/s-anbieter:privat/anzeige:angebote/preis:" + str(min_price) + ":" + str(max_price) + "/seite:" + str(page_counter) + "/" + search_term.replace(" ", "-") + "/k0" ) print(f"Scraping page {page_counter}: {listing_url}") try: soup = make_soup(listing_url) results = soup.find_all("li", class_="ad-listitem fully-clickable-card") if len(results) <= 0: break for result in results: try: listing_href = result.a["href"] found_listings.add(base_url + listing_href) except (AttributeError, KeyError): pass except Exception as e: print(f"Error scraping page {page_counter}: {e}") break return list(found_listings) def scrape_listing(url): """Scrape individual listing details""" try: soup = make_soup(url) metrics["scrape_requests"] += 1 title = soup.find("h1", class_="boxedarticle--title") if not title: return None title = title.text.strip() price_elem = soup.find("h2", class_="boxedarticle--price") price = 0 if price_elem: price_text = price_elem.text.strip().split(" ")[0] try: price = int(price_text.replace(".", "").replace(",", "")) except: price = 0 flexlist = soup.find("ul", class_="flexlist text-light-800") listing_id = 0 if flexlist: flex_items = flexlist.find_all("li", recursive=False) if len(flex_items) > 1: try: listing_id = int(flex_items[1].text.strip()) except: pass locality = soup.find("span", id="viewad-locality") zip_code = None address = "" if locality: locality_text = locality.text.strip() parts = locality_text.split(" ", 1) if parts: zip_code = parts[0] if len(parts) > 1: address = parts[1] date_added = None details_divs = soup.find_all("div", class_="boxedarticle--details--full") if len(details_divs) > 1: date_span = details_divs[1].find("span") if date_span: try: date_added = datetime.strptime(date_span.text, "%d.%m.%Y") except: pass first_image = None img_elem = soup.find("div", class_="galleryimage-element current") if img_elem: img = img_elem.find("img") if img and img.get("src"): first_image = img["src"] if not zip_code: return None listing = { "title": title, "price": price, "id": listing_id, "zip_code": zip_code, "address": address, "date_added": date_added.isoformat() if date_added else None, "image": first_image, "url": url, } # Add coordinates coords = geocode_zip(zip_code) if coords and isinstance(coords, dict): listing["lat"] = coords.get("lat") listing["lon"] = coords.get("lon") return listing except Exception as e: print(f"Error scraping listing {url}: {e}") return None def prefetch_listings_thread(session_id): """Background thread to prefetch all listings""" session = scrape_sessions.get(session_id) if not session: return print(f"Starting prefetch for session {session_id}") for i, url in enumerate(session["urls"]): # Check if session was cancelled or deleted if ( session_id not in scrape_sessions or scrape_sessions[session_id]["cancelled"] ): print(f"Prefetch stopped for session {session_id}") return listing = scrape_listing(url) if listing: session["listings"].append(listing) session["scraped"] += 1 time.sleep(0.3) # Rate limiting print( f"Prefetch complete for session {session_id}: {len(session['listings'])} listings" ) @app.route("/api/search", methods=["POST"]) def api_search(): """API 

@app.route("/api/search", methods=["POST"])
def api_search():
    """API endpoint for searching listings - returns count and starts prefetch"""
    data = request.json
    metrics["search_requests"] += 1

    # Clean up old sessions before creating a new one
    cleanup_old_sessions()

    search_term = data.get("search_term", "")
    num_listings = data.get("num_listings", 25)
    min_price = data.get("min_price", 0)
    max_price = data.get("max_price", 1000000000)

    if not search_term:
        return jsonify({"error": "Search term is required"}), 400

    # Calculate pages needed
    max_pages = max(1, (num_listings + LISTINGS_PER_PAGE - 1) // LISTINGS_PER_PAGE)

    try:
        # Search for listing URLs only
        listing_urls = search_listings(search_term, max_pages, min_price, max_price)

        # Limit to requested number
        listing_urls = listing_urls[:num_listings]

        # Create session ID
        session_id = str(uuid.uuid4())

        # Store session with creation timestamp
        scrape_sessions[session_id] = {
            "urls": listing_urls,
            "total": len(listing_urls),
            "scraped": 0,
            "listings": [],
            "cancelled": False,
            "created_at": time.time(),
        }

        # Start prefetch in background thread
        prefetch_thread = threading.Thread(
            target=prefetch_listings_thread, args=(session_id,), daemon=True
        )
        prefetch_thread.start()

        return jsonify({"session_id": session_id, "total": len(listing_urls)})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/scrape/<session_id>", methods=["GET"])
def api_scrape(session_id):
    """API endpoint to get the next scraped listing from a session"""
    cleanup_old_sessions()

    if session_id not in scrape_sessions:
        return jsonify({"error": "Invalid session ID"}), 404

    session = scrape_sessions[session_id]

    if session["cancelled"]:
        return jsonify({"cancelled": True}), 200

    # Wait briefly if no listings are ready yet
    wait_count = 0
    while (
        len(session["listings"]) == 0
        and session["scraped"] < session["total"]
        and wait_count < 10
    ):
        time.sleep(0.1)
        wait_count += 1

    if len(session["listings"]) > 0:
        listing = session["listings"].pop(0)
    else:
        listing = None

    is_complete = (
        session["scraped"] >= session["total"] and len(session["listings"]) == 0
    )

    return jsonify(
        {
            "complete": is_complete,
            "listing": listing,
            "progress": {"current": session["scraped"], "total": session["total"]},
        }
    )


@app.route("/api/scrape/<session_id>/cancel", methods=["POST"])
def api_cancel_scrape(session_id):
    """API endpoint to cancel a scraping session and delete cached listings"""
    cleanup_old_sessions()

    if session_id not in scrape_sessions:
        return jsonify({"error": "Invalid session ID"}), 404

    # Delete session completely (including cached listings)
    del scrape_sessions[session_id]

    return jsonify({"cancelled": True, "message": "Session deleted"})


@app.route("/api/health", methods=["GET"])
def health():
    """Health check endpoint"""
    return jsonify({"status": "ok"})


@app.route("/api/metrics", methods=["GET"])
def api_metrics():
    """Prometheus-style metrics endpoint"""
    cleanup_old_sessions()
    uptime = time.time() - app_start_time
    return jsonify(
        {
            "search_requests_total": metrics["search_requests"],
            "scrape_requests_total": metrics["scrape_requests"],
            "uptime_seconds": uptime,
            "kleinanzeigen_response_codes": metrics["kleinanzeigen_response_codes"],
            "nominatim_response_codes": metrics["nominatim_response_codes"],
            "active_sessions": len(scrape_sessions),
            "cache_size": len(zip_cache),
        }
    )


if __name__ == "__main__":
    print("Starting Kleinanzeigen Scraper API Server...")
    # Load cache on startup
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            zip_cache = json.load(f)
        print(f"Loaded {len(zip_cache)} ZIP codes from cache")
    app.run(debug=True, host="0.0.0.0", port=5000, threaded=True)
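
# Example client flow (sketch only; the base URL assumes the default app.run()
# settings above, and the search parameters are placeholder values):
#
#   import requests, time
#
#   resp = requests.post(
#       "http://localhost:5000/api/search",
#       json={"search_term": "fahrrad", "num_listings": 10, "min_price": 0, "max_price": 500},
#   )
#   session_id = resp.json()["session_id"]
#
#   while True:
#       data = requests.get(f"http://localhost:5000/api/scrape/{session_id}").json()
#       if data.get("listing"):
#           print(data["listing"]["title"], data["listing"]["price"])
#       if data.get("complete"):
#           break
#       time.sleep(0.5)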