#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Flask API Server for Kleinanzeigen Scraper

Exposes a small JSON API that searches kleinanzeigen.de for listings,
scrapes the individual listing pages in background threads and geocodes
their ZIP codes through the Nominatim API (with an on-disk cache).

Author: Hendrik Schutter
Date: 2025/11/24
"""

import json
import os
import random
import threading
import time
import uuid
from datetime import datetime

import requests
import urllib3
from bs4 import BeautifulSoup
from flask import Flask, request, jsonify
from flask_cors import CORS

app = Flask(__name__)
CORS(app)

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Configuration
CACHE_FILE = "zip_cache.json"
SESSION_TIMEOUT = 300  # seconds
LISTINGS_PER_PAGE = 25
HTTP_TIMEOUT = 10  # seconds; upper bound for a single outgoing HTTP request

# Global state
zip_cache = {}
scrape_sessions = {}
app_start_time = time.time()

# Serializes updates of zip_cache and writes of CACHE_FILE; geocode_zip() is
# called from several scraper threads at once.
_zip_cache_lock = threading.Lock()

# Metrics
metrics = {
    "search_requests": 0,
    "scrape_requests": 0,
    "kleinanzeigen_response_codes": {},
    "nominatim_response_codes": {},
}


def cleanup_old_sessions():
    """Remove sessions older than SESSION_TIMEOUT.

    Returns:
        Number of sessions that were removed by this call.
    """
    current_time = time.time()

    # Iterate over a snapshot: request and prefetch threads may add or remove
    # sessions concurrently, which would break iteration over the live dict.
    expired_ids = [
        session_id
        for session_id, session in list(scrape_sessions.items())
        if current_time - session.get("created_at", current_time) > SESSION_TIMEOUT
    ]

    removed = 0
    for session_id in expired_ids:
        # pop() instead of del: another thread may already have removed it.
        if scrape_sessions.pop(session_id, None) is not None:
            removed += 1
            print(f"Cleaned up old session: {session_id}")

    return removed


def get_random_user_agent():
    """Return a user agent string picked at random from a fixed list."""
    uastrings = [
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    ]
    return random.choice(uastrings)


def make_soup(url):
    """Fetch ``url`` and return the response body parsed as BeautifulSoup.

    For kleinanzeigen.de URLs the HTTP status code (or ``"error"`` when the
    request itself fails) is counted in ``metrics``.

    Raises:
        Exception: whatever the HTTP request or the parser raised; the error
            is logged and counted before being re-raised.
    """
    user_agent = {"user-agent": get_random_user_agent()}
    http = urllib3.PoolManager(10, headers=user_agent)
    is_kleinanzeigen = "kleinanzeigen.de" in url
    codes = metrics["kleinanzeigen_response_codes"]

    try:
        # Bounded timeout so a stalled connection cannot hang a worker forever.
        r = http.request("GET", url, timeout=HTTP_TIMEOUT)

        # Track response code
        if is_kleinanzeigen:
            status_code = str(r.status)
            codes[status_code] = codes.get(status_code, 0) + 1

        return BeautifulSoup(r.data, "lxml")
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        if is_kleinanzeigen:
            codes["error"] = codes.get("error", 0) + 1
        raise


def _save_zip_cache():
    """Write ``zip_cache`` to CACHE_FILE; the caller must hold the cache lock."""
    try:
        with open(CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(zip_cache, f, ensure_ascii=False, indent=2)
    except OSError as e:
        # A failed write only costs a future re-lookup; keep serving results.
        print(f"Could not write {CACHE_FILE}: {e}")


def load_zip_cache():
    """Load CACHE_FILE into ``zip_cache`` if the file exists.

    An unreadable or malformed cache file is reported and ignored so the
    server can still start with an empty cache.

    Returns:
        Number of ZIP codes held in the cache after loading.
    """
    if os.path.exists(CACHE_FILE):
        try:
            with open(CACHE_FILE, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:
            # json.JSONDecodeError is a ValueError subclass.
            print(f"Could not load {CACHE_FILE}: {e}")
        else:
            if isinstance(loaded, dict):
                with _zip_cache_lock:
                    zip_cache.update(loaded)
    return len(zip_cache)


def geocode_zip(zip_code):
    """Geocode a German ZIP code using the Nominatim API with caching.

    Args:
        zip_code: ZIP code; converted to ``str`` before use.

    Returns:
        ``{"lat": float, "lon": float}`` on success (from the cache or from
        Nominatim), or ``None`` when Nominatim returns no result or the
        lookup fails.
    """
    zip_code = str(zip_code)

    # Check cache first
    cached = zip_cache.get(zip_code)
    if cached is not None:
        return cached

    # Call Nominatim API
    url = "https://nominatim.openstreetmap.org/search"
    params = {
        "postalcode": zip_code,
        "country": "Germany",
        "format": "json",
        "limit": 1,
    }
    codes = metrics["nominatim_response_codes"]

    try:
        response = requests.get(
            url,
            params=params,
            headers={"user-agent": get_random_user_agent()},
            timeout=HTTP_TIMEOUT,
        )

        # Track response code
        status_code = str(response.status_code)
        codes[status_code] = codes.get(status_code, 0) + 1

        data = response.json()
        if data:
            coords = {"lat": float(data[0]["lat"]), "lon": float(data[0]["lon"])}

            # Update and persist under the lock: several scraper threads may
            # geocode at the same time and must not interleave file writes.
            with _zip_cache_lock:
                zip_cache[zip_code] = coords
                _save_zip_cache()

            time.sleep(1)  # Respect API rate limits
            return coords
    except Exception as e:
        print(f"Geocoding error for {zip_code}: {e}")
        codes["error"] = codes.get("error", 0) + 1

    return None


def search_listings(search_term, max_pages, min_price, max_price):
    """Search for listings on kleinanzeigen.de - returns only URLs.

    Result pages 1..max_pages are fetched in order; paging stops at the first
    page that has no results or cannot be fetched.

    Args:
        search_term: Free-text query; spaces are replaced by ``-`` in the URL.
        max_pages: Maximum number of result pages to fetch.
        min_price: Lower price bound inserted into the search URL.
        max_price: Upper price bound inserted into the search URL.

    Returns:
        List of unique absolute listing URLs (order is not defined).
    """
    base_url = "https://www.kleinanzeigen.de"
    found_listings = set()
    query = search_term.replace(" ", "-")

    for page_counter in range(1, max_pages + 1):
        listing_url = (
            base_url
            + "/s-anbieter:privat/anzeige:angebote/preis:"
            + str(min_price)
            + ":"
            + str(max_price)
            + "/seite:"
            + str(page_counter)
            + "/"
            + query
            + "/k0"
        )

        print(f"Scraping page {page_counter}: {listing_url}")

        try:
            soup = make_soup(listing_url)
            results = soup.find_all("li", class_="ad-listitem fully-clickable-card")

            if not results:
                break

            for result in results:
                try:
                    listing_href = result.a["href"]
                except (AttributeError, KeyError, TypeError):
                    # TypeError: the item has no <a> at all (result.a is None).
                    continue
                found_listings.add(base_url + listing_href)
        except Exception as e:
            print(f"Error scraping page {page_counter}: {e}")
            break

    return list(found_listings)


def scrape_listing(url):
    """Scrape the details of a single listing page.

    Args:
        url: Absolute URL of the listing.

    Returns:
        Dict with the keys ``title``, ``price``, ``id``, ``zip_code``,
        ``address``, ``date_added`` (ISO string or ``None``), ``image`` and
        ``url``, plus ``lat``/``lon`` when the ZIP code could be geocoded.
        ``None`` when the page has no title or ZIP code, or when scraping
        fails for any reason (the error is logged).
    """
    try:
        soup = make_soup(url)
        metrics["scrape_requests"] += 1

        title_elem = soup.find("h1", class_="boxedarticle--title")
        if not title_elem:
            return None
        title = title_elem.text.strip()

        # Price: first whitespace-separated token with separators removed;
        # anything that is not a plain number counts as 0.
        price = 0
        price_elem = soup.find("h2", class_="boxedarticle--price")
        if price_elem:
            price_text = price_elem.text.strip().split(" ")[0]
            try:
                price = int(price_text.replace(".", "").replace(",", ""))
            except ValueError:
                price = 0

        # Listing id: text of the second item of the info list, if numeric.
        listing_id = 0
        flexlist = soup.find("ul", class_="flexlist text-light-800")
        if flexlist:
            flex_items = flexlist.find_all("li", recursive=False)
            if len(flex_items) > 1:
                try:
                    listing_id = int(flex_items[1].text.strip())
                except ValueError:
                    pass

        # Locality text is split at the first space: "<zip> <rest>".
        zip_code = None
        address = ""
        locality = soup.find("span", id="viewad-locality")
        if locality:
            parts = locality.text.strip().split(" ", 1)
            zip_code = parts[0]
            if len(parts) > 1:
                address = parts[1]

        date_added = None
        details_divs = soup.find_all("div", class_="boxedarticle--details--full")
        if len(details_divs) > 1:
            date_span = details_divs[1].find("span")
            if date_span:
                try:
                    # strip(): whitespace around the date would make strptime fail.
                    date_added = datetime.strptime(date_span.text.strip(), "%d.%m.%Y")
                except ValueError:
                    pass

        first_image = None
        img_elem = soup.find("div", class_="galleryimage-element current")
        if img_elem:
            img = img_elem.find("img")
            if img and img.get("src"):
                first_image = img["src"]

        if not zip_code:
            return None

        listing = {
            "title": title,
            "price": price,
            "id": listing_id,
            "zip_code": zip_code,
            "address": address,
            "date_added": date_added.isoformat() if date_added else None,
            "image": first_image,
            "url": url,
        }

        # Add coordinates
        coords = geocode_zip(zip_code)
        if coords and isinstance(coords, dict):
            listing["lat"] = coords.get("lat")
            listing["lon"] = coords.get("lon")

        return listing

    except Exception as e:
        print(f"Error scraping listing {url}: {e}")
        return None


def scrape_listing_wrapper(session_id, url, results, index):
    """Thread target: scrape ``url`` and store the result in ``results[index]``.

    Does nothing when the session no longer exists, so ``results[index]``
    keeps its previous value in that case.
    """
    session = scrape_sessions.get(session_id)
    if not session:
        return

    results[index] = scrape_listing(url)


def prefetch_listings_thread(session_id):
    """Background thread to prefetch all listings with parallel workers.

    The session's URLs are processed in batches; each batch is scraped by one
    thread per URL. Successful results are appended to the session's
    ``listings`` and ``scraped`` is advanced by the batch size. The loop stops
    early once the session has been deleted or flagged as cancelled.
    """
    session = scrape_sessions.get(session_id)
    if not session:
        return

    urls = session["urls"]
    max_workers = random.randrange(2, 8)

    print(
        f"Starting prefetch for session {session_id} with {max_workers} parallel workers"
    )

    for i in range(0, len(urls), max_workers):
        # Check if session was cancelled or deleted. A single .get() avoids a
        # KeyError when the session is removed between the test and the lookup.
        current = scrape_sessions.get(session_id)
        if current is None or current["cancelled"]:
            print(f"Prefetch stopped for session {session_id}")
            return

        # Process batch of URLs in parallel
        batch = urls[i : i + max_workers]
        results = [None] * len(batch)
        threads = [
            threading.Thread(
                target=scrape_listing_wrapper,
                args=(session_id, url, results, j),
                daemon=True,
            )
            for j, url in enumerate(batch)
        ]
        for thread in threads:
            thread.start()

        # Wait for all threads in this batch to complete
        for thread in threads:
            thread.join()

        # Add results to session
        session["listings"].extend(listing for listing in results if listing)
        session["scraped"] += len(batch)

        # Rate limiting between batches
        time.sleep(0.5)

    print(f"Prefetch complete for session {session_id}")


@app.route("/api/search", methods=["POST"])
def api_search():
    """API endpoint for searching listings - returns count and starts prefetch.

    Expects a JSON object with ``search_term`` (required) and optionally
    ``num_listings`` (default 25), ``min_price`` (default 0) and ``max_price``
    (default 1000000000).

    Responds with ``{"session_id", "total"}`` on success, 400 for a missing
    search term or an invalid ``num_listings``, and 500 when the search fails.
    """
    # silent=True: a missing or non-JSON body is treated like an empty request
    # and rejected below with a 400 instead of crashing on ``None.get``.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    metrics["search_requests"] += 1

    # Cleanup old sessions before creating new one
    cleanup_old_sessions()

    search_term = data.get("search_term", "")
    min_price = data.get("min_price", 0)
    max_price = data.get("max_price", 1000000000)

    if not search_term:
        return jsonify({"error": "Search term is required"}), 400

    try:
        num_listings = int(data.get("num_listings", 25))
    except (TypeError, ValueError):
        num_listings = -1
    if num_listings < 0:
        return jsonify({"error": "num_listings must be a non-negative integer"}), 400

    # Calculate pages needed
    max_pages = max(1, (num_listings + LISTINGS_PER_PAGE - 1) // LISTINGS_PER_PAGE)

    try:
        # Search for listing URLs only
        listing_urls = search_listings(search_term, max_pages, min_price, max_price)

        # Limit to requested number
        listing_urls = listing_urls[:num_listings]

        # Create session ID
        session_id = str(uuid.uuid4())

        # Store session with creation timestamp
        scrape_sessions[session_id] = {
            "urls": listing_urls,
            "total": len(listing_urls),
            "scraped": 0,
            "listings": [],
            "cancelled": False,
            "created_at": time.time(),
        }

        # Start prefetch in background thread
        prefetch_thread = threading.Thread(
            target=prefetch_listings_thread, args=(session_id,), daemon=True
        )
        prefetch_thread.start()

        return jsonify({"session_id": session_id, "total": len(listing_urls)})

    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route("/api/scrape/<session_id>", methods=["GET"])
def api_scrape(session_id):
    """API endpoint to get next scraped listing from session.

    Waits up to roughly one second for a listing to become available, then
    returns ``{"complete", "listing", "progress"}``; ``listing`` is ``None``
    when nothing was ready. Unknown sessions get a 404.
    """
    cleanup_old_sessions()

    session = scrape_sessions.get(session_id)
    if session is None:
        return jsonify({"error": "Invalid session ID"}), 404

    if session["cancelled"]:
        return jsonify({"cancelled": True}), 200

    # Wait briefly if no listings are ready yet
    wait_count = 0
    while (
        not session["listings"]
        and session["scraped"] < session["total"]
        and wait_count < 10
    ):
        time.sleep(0.1)
        wait_count += 1

    try:
        listing = session["listings"].pop(0)
    except IndexError:
        # Nothing ready, or a parallel request took the last listing.
        listing = None

    is_complete = session["scraped"] >= session["total"] and not session["listings"]

    return jsonify(
        {
            "complete": is_complete,
            "listing": listing,
            "progress": {"current": session["scraped"], "total": session["total"]},
        }
    )


@app.route("/api/scrape/<session_id>/cancel", methods=["POST"])
def api_cancel_scrape(session_id):
    """API endpoint to cancel scraping session and delete cached listings.

    Removing the session also stops its prefetch thread, which checks for the
    session's existence before every batch. Unknown sessions get a 404.
    """
    cleanup_old_sessions()

    # Delete session completely (including cached listings)
    if scrape_sessions.pop(session_id, None) is None:
        return jsonify({"error": "Invalid session ID"}), 404

    return jsonify({"cancelled": True, "message": "Session deleted"})


@app.route("/api/health", methods=["GET"])
def health():
    """Health check endpoint"""
    return jsonify({"status": "ok"})


@app.route("/api/metrics", methods=["GET"])
def api_metrics():
    """Prometheus-style metrics endpoint.

    Returns the counters and gauges of this process in the Prometheus text
    exposition format.
    """
    cleanup_old_sessions()

    uptime = time.time() - app_start_time

    # Build Prometheus text format
    lines = [
        # Search requests
        "# HELP search_requests_total Total number of search requests",
        "# TYPE search_requests_total counter",
        f"search_requests_total {metrics['search_requests']}",
        "",
        # Scrape requests
        "# HELP scrape_requests_total Total number of scrape requests",
        "# TYPE scrape_requests_total counter",
        f"scrape_requests_total {metrics['scrape_requests']}",
        "",
        # Uptime
        "# HELP uptime_seconds Application uptime in seconds",
        "# TYPE uptime_seconds gauge",
        f"uptime_seconds {uptime}",
        "",
        # Active sessions
        "# HELP active_sessions Number of active scraping sessions",
        "# TYPE active_sessions gauge",
        f"active_sessions {len(scrape_sessions)}",
        "",
        # Cache size; HELP/TYPE must carry the same name as the sample line.
        "# HELP zip_code_cache_size Number of cached ZIP codes",
        "# TYPE zip_code_cache_size gauge",
        f"zip_code_cache_size {len(zip_cache)}",
        "",
    ]

    # Kleinanzeigen response codes
    lines.append(
        "# HELP kleinanzeigen_http_responses_total HTTP responses from kleinanzeigen.de"
    )
    lines.append("# TYPE kleinanzeigen_http_responses_total counter")
    for code, count in list(metrics["kleinanzeigen_response_codes"].items()):
        lines.append(f'kleinanzeigen_http_responses_total{{code="{code}"}} {count}')
    lines.append("")

    # Nominatim response codes
    lines.append(
        "# HELP nominatim_http_responses_total HTTP responses from Nominatim API"
    )
    lines.append("# TYPE nominatim_http_responses_total counter")
    for code, count in list(metrics["nominatim_response_codes"].items()):
        lines.append(f'nominatim_http_responses_total{{code="{code}"}} {count}')
    lines.append("")

    return (
        "\n".join(lines),
        200,
        {"Content-Type": "text/plain; version=0.0.4; charset=utf-8"},
    )


if __name__ == "__main__":
    print("Starting Kleinanzeigen Scraper API Server...")

    # Load cache on startup
    if os.path.exists(CACHE_FILE):
        load_zip_cache()
        print(f"Loaded {len(zip_cache)} ZIP codes from cache")

    # NOTE(review): debug=True together with host="0.0.0.0" exposes the
    # Werkzeug debugger on every interface - confirm this is never used
    # outside a trusted development network.
    app.run(debug=True, host="0.0.0.0", port=5000, threaded=True)