#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Flask API Server for Kleinanzeigen Scraper Author: Hendrik Schutter Date: 2025/11/24 """ from flask import Flask, request, jsonify from flask_cors import CORS from bs4 import BeautifulSoup from datetime import datetime import urllib3 import random import requests import time import json import os import uuid app = Flask(__name__) CORS(app) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # ZIP code cache file CACHE_FILE = "zip_cache.json" zip_cache = {} # Active scrape sessions scrape_sessions = {} SESSION_TIMEOUT = 300 # seconds def cleanup_old_sessions(): current_time = time.time() sessions_to_remove = [] for session_id, session in scrape_sessions.items(): if current_time - session.get("created_at", current_time) > SESSION_TIMEOUT: sessions_to_remove.append(session_id) for session_id in sessions_to_remove: del scrape_sessions[session_id] print(f"Cleaned up old session: {session_id}") return len(sessions_to_remove) def get_random_user_agent(): """Generate random user agent string""" uastrings = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", ] return random.choice(uastrings) def make_soup(url): """Fetch URL and return BeautifulSoup object""" user_agent = {"user-agent": get_random_user_agent()} http = urllib3.PoolManager(10, headers=user_agent) r = http.request("GET", url) return BeautifulSoup(r.data, "lxml") def geocode_zip(zip_code): """Geocode ZIP code using Nominatim API with caching""" zip_code = str(zip_code) # Check cache first if zip_code in zip_cache: return zip_cache[zip_code] # Call Nominatim API url = "https://nominatim.openstreetmap.org/search" params = { "postalcode": zip_code, "country": "Germany", "format": "json", "limit": 1, } try: response = requests.get( url, params=params, headers={"user-agent": get_random_user_agent()} ) data = response.json() if data: coords = {"lat": float(data[0]["lat"]), "lon": float(data[0]["lon"])} zip_cache[zip_code] = coords # Save cache with open(CACHE_FILE, "w", encoding="utf-8") as f: json.dump(zip_cache, f, ensure_ascii=False, indent=2) time.sleep(1) # Respect API rate limits return coords except Exception as e: print(f"Geocoding error for {zip_code}: {e}") return None def search_listings(search_term, max_pages, min_price, max_price): """Search for listings on kleinanzeigen.de - returns only URLs""" base_url = "https://www.kleinanzeigen.de" found_listings = set() for page_counter in range(1, max_pages + 1): listing_url = ( base_url + "/s-anbieter:privat/anzeige:angebote/preis:" + str(min_price) + ":" + str(max_price) + "/seite:" + str(page_counter) + "/" + search_term.replace(" ", "-") + "/k0" ) print(f"Scraping page {page_counter}: {listing_url}") try: soup = make_soup(listing_url) results = soup.find_all("li", class_="ad-listitem fully-clickable-card") if len(results) <= 0: break for result in results: try: listing_url = result.a["href"] found_listings.add(base_url + listing_url) except (AttributeError, KeyError): pass except Exception as e: print(f"Error scraping page {page_counter}: {e}") break return list(found_listings) def scrape_listing(url): """Scrape individual listing details""" try: soup = make_soup(url) title = soup.find("h1", class_="boxedarticle--title") if not title: return None title = title.text.strip() price_elem = soup.find("h2", class_="boxedarticle--price") price = 0 if price_elem: price_text = price_elem.text.strip().split(" ")[0] try: price = int(price_text.replace(".", "").replace(",", "")) except: price = 0 flexlist = soup.find("ul", class_="flexlist text-light-800") listing_id = 0 if flexlist: flex_items = flexlist.find_all("li", recursive=False) if len(flex_items) > 1: try: listing_id = int(flex_items[1].text.strip()) except: pass locality = soup.find("span", id="viewad-locality") zip_code = None address = "" if locality: locality_text = locality.text.strip() parts = locality_text.split(" ", 1) if parts: zip_code = parts[0] if len(parts) > 1: address = parts[1] date_added = None details_divs = soup.find_all("div", class_="boxedarticle--details--full") if len(details_divs) > 1: date_span = details_divs[1].find("span") if date_span: try: date_added = datetime.strptime(date_span.text, "%d.%m.%Y") except: pass first_image = None img_elem = soup.find("div", class_="galleryimage-element current") if img_elem: img = img_elem.find("img") if img and img.get("src"): first_image = img["src"] if not zip_code: return None listing = { "title": title, "price": price, "id": listing_id, "zip_code": zip_code, "address": address, "date_added": date_added.isoformat() if date_added else None, "image": first_image, "url": url, } # Add coordinates coords = geocode_zip(zip_code) if coords and isinstance(coords, dict): listing["lat"] = coords.get("lat") listing["lon"] = coords.get("lon") return listing except Exception as e: print(f"Error scraping listing {url}: {e}") return None @app.route("/api/search", methods=["POST"]) def api_search(): """API endpoint for searching listings - returns only count and URLs""" data = request.json # Cleanup old sessions before creating new one cleanup_old_sessions() search_term = data.get("search_term", "") max_pages = data.get("max_pages", 1) min_price = data.get("min_price", 0) max_price = data.get("max_price", 10000) if not search_term: return jsonify({"error": "Search term is required"}), 400 try: # Search for listing URLs only listing_urls = search_listings(search_term, max_pages, min_price, max_price) # Create session ID session_id = str(uuid.uuid4()) # Store session with creation timestamp scrape_sessions[session_id] = { "urls": listing_urls, "total": len(listing_urls), "scraped": 0, "listings": [], "cancelled": False, "created_at": time.time(), } return jsonify({"session_id": session_id, "total": len(listing_urls)}) except Exception as e: return jsonify({"error": str(e)}), 500 @app.route("/api/scrape/", methods=["GET"]) def api_scrape(session_id): """API endpoint for scraping next listing in session""" # Cleanup old sessions on each request cleanup_old_sessions() if session_id not in scrape_sessions: return jsonify({"error": "Invalid session ID"}), 404 session = scrape_sessions[session_id] if session["cancelled"]: return jsonify({"cancelled": True}), 200 if session["scraped"] >= session["total"]: return jsonify({"complete": True, "listing": None}) # Scrape next listing url = session["urls"][session["scraped"]] listing = scrape_listing(url) if listing: session["listings"].append(listing) session["scraped"] += 1 return jsonify( { "complete": session["scraped"] >= session["total"], "listing": listing, "progress": {"current": session["scraped"], "total": session["total"]}, } ) @app.route("/api/scrape//cancel", methods=["POST"]) def api_cancel_scrape(session_id): """API endpoint to cancel scraping session""" cleanup_old_sessions() if session_id not in scrape_sessions: return jsonify({"error": "Invalid session ID"}), 404 scrape_sessions[session_id]["cancelled"] = True return jsonify( { "cancelled": True, "listings": scrape_sessions[session_id]["listings"], "total_scraped": len(scrape_sessions[session_id]["listings"]), } ) @app.route("/api/scrape//results", methods=["GET"]) def api_get_results(session_id): """API endpoint to get all scraped results""" cleanup_old_sessions() if session_id not in scrape_sessions: return jsonify({"error": "Invalid session ID"}), 404 session = scrape_sessions[session_id] return jsonify( { "listings": session["listings"], "total": len(session["listings"]), "progress": {"current": session["scraped"], "total": session["total"]}, } ) @app.route("/api/health", methods=["GET"]) def health(): """Health check endpoint""" cleanup_old_sessions() return jsonify( { "status": "ok", "cache_size": len(zip_cache), "active_sessions": len(scrape_sessions), } ) if __name__ == "__main__": print("Starting Kleinanzeigen Scraper API Server...") # Load cache on startup if os.path.exists(CACHE_FILE): with open(CACHE_FILE, "r", encoding="utf-8") as f: zip_cache = json.load(f) print(f"Loaded {len(zip_cache)} ZIP codes from cache") print("ZIP code cache loaded with", len(zip_cache), "entries") app.run(debug=True, host="0.0.0.0", port=5000)