#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import json
import os
import random
import time
from datetime import datetime

import requests
import urllib3
from bs4 import BeautifulSoup

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
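
# The config dictionary is loaded from config.json (see main()). An illustrative
# sketch of the structure this module reads, inferred from the keys accessed
# below; the values are placeholders, not the project's actual settings:
#
#   {
#     "cache": {"zip_cache_file": "zip_cache.json"},
#     "user_agents": ["Mozilla/5.0 ..."],
#     "scraping": {"geocoding_delay": 1},
#     "apis": {
#       "kleinanzeigen": {"base_url": "https://www.kleinanzeigen.de"},
#       "nominatim": {
#         "url": "https://nominatim.openstreetmap.org/search",
#         "user_agent": "example-scraper/1.0"
#       }
#     }
#   }
#
# If a metrics dict is passed to KleinanzeigenScraper, it is expected to already
# contain the counters "kleinanzeigen_response_codes" and
# "nominatim_response_codes" (both dicts) and "scrape_requests" (an int);
# this module only updates them, it never creates them.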


class KleinanzeigenScraper:
    """Scrapes kleinanzeigen.de search results and listing pages."""

    def __init__(self, config, metrics=None):
        """
        Initialize the scraper with configuration

        Args:
            config: Configuration dictionary
            metrics: Optional metrics dictionary to track statistics
        """
        self.config = config
        self.metrics = metrics
        self.zip_cache = {}
        self.cache_file = config["cache"]["zip_cache_file"]

        # Load cache
        self._load_cache()

    def _load_cache(self):
        """Load ZIP code cache from file"""
        if os.path.exists(self.cache_file):
            with open(self.cache_file, "r", encoding="utf-8") as f:
                self.zip_cache = json.load(f)
            print(f"Loaded {len(self.zip_cache)} ZIP codes from cache")

    def _save_cache(self):
        """Save ZIP code cache to file"""
        with open(self.cache_file, "w", encoding="utf-8") as f:
            json.dump(self.zip_cache, f, ensure_ascii=False, indent=2)

    def get_random_user_agent(self):
        """Generate random user agent string"""
        return random.choice(self.config["user_agents"])

    def make_soup(self, url):
        """Fetch URL and return BeautifulSoup object"""
        user_agent = {"user-agent": self.get_random_user_agent()}
        http = urllib3.PoolManager(10, headers=user_agent)
        try:
            r = http.request("GET", url)

            # Track response code
            if self.metrics is not None:
                status_code = str(r.status)
                if self.config["apis"]["kleinanzeigen"]["base_url"] in url:
                    self.metrics["kleinanzeigen_response_codes"][status_code] = (
                        self.metrics["kleinanzeigen_response_codes"].get(status_code, 0)
                        + 1
                    )

            return BeautifulSoup(r.data, "lxml")
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            if self.metrics is not None:
                if self.config["apis"]["kleinanzeigen"]["base_url"] in url:
                    self.metrics["kleinanzeigen_response_codes"]["error"] = (
                        self.metrics["kleinanzeigen_response_codes"].get("error", 0) + 1
                    )
            raise

    def geocode_zip(self, zip_code):
        """Geocode a ZIP code via the Nominatim API, with local caching.

        Returns a dict {"lat": float, "lon": float}, or None on failure.
        """
        zip_code = str(zip_code)

        # Check cache first
        if zip_code in self.zip_cache:
            return self.zip_cache[zip_code]

        # Call Nominatim API
        url = self.config["apis"]["nominatim"]["url"]
        params = {
            "postalcode": zip_code,
            "country": "Germany",
            "format": "json",
            "limit": 1,
        }
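
        # The resulting request looks roughly like
        #   <nominatim url from config>?postalcode=<zip>&country=Germany&format=json&limit=1
        # (requests encodes the params; the host comes from config, so the exact
        # URL shown here is illustrative).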

        try:
            response = requests.get(
                url,
                params=params,
                headers={"user-agent": self.config["apis"]["nominatim"]["user_agent"]},
            )

            # Track response code
            if self.metrics is not None:
                status_code = str(response.status_code)
                self.metrics["nominatim_response_codes"][status_code] = (
                    self.metrics["nominatim_response_codes"].get(status_code, 0) + 1
                )

            data = response.json()

            if data:
                coords = {"lat": float(data[0]["lat"]), "lon": float(data[0]["lon"])}
                self.zip_cache[zip_code] = coords

                # Save cache
                self._save_cache()

                # Respect the configured delay between geocoding requests
                # (Nominatim's usage policy allows at most one request per second)
                time.sleep(self.config["scraping"]["geocoding_delay"])
                return coords
        except Exception as e:
            print(f"Geocoding error for {zip_code}: {e}")
            if self.metrics is not None:
                self.metrics["nominatim_response_codes"]["error"] = (
                    self.metrics["nominatim_response_codes"].get("error", 0) + 1
                )

        return None

    def search_listings(self, search_term, max_pages, min_price, max_price):
        """Search for listings on kleinanzeigen.de - returns only URLs"""
        base_url = self.config["apis"]["kleinanzeigen"]["base_url"]
        found_listings = set()

        for page_counter in range(1, max_pages + 1):
            listing_url = (
                f"{base_url}/s-anbieter:privat/anzeige:angebote"
                f"/preis:{min_price}:{max_price}"
                f"/seite:{page_counter}"
                f"/{search_term.replace(' ', '-')}/k0"
            )
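            # Example of a constructed URL (the path segments are German:
            # "anbieter:privat" = private sellers, "anzeige:angebote" = offers,
            # "preis" = price range, "seite" = page), e.g. for
            # search_term="Fahrrad", prices 100-500, page 1:
            #   <base_url>/s-anbieter:privat/anzeige:angebote/preis:100:500/seite:1/Fahrrad/k0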

            print(f"Scraping page {page_counter}: {listing_url}")

            try:
                soup = self.make_soup(listing_url)
                results = soup.find_all("li", class_="ad-listitem fully-clickable-card")

                # Stop paging once a page returns no result cards
                if len(results) <= 0:
                    break

                for result in results:
                    try:
                        listing_href = result.a["href"]
                        found_listings.add(base_url + listing_href)
                    except (AttributeError, KeyError):
                        # Skip cards without a usable link
                        pass
            except Exception as e:
                print(f"Error scraping page {page_counter}: {e}")
                break

        return list(found_listings)

    def scrape_listing(self, url):
        """Scrape individual listing details"""
        try:
            soup = self.make_soup(url)
            if self.metrics is not None:
                self.metrics["scrape_requests"] += 1

            title = soup.find("h1", class_="boxedarticle--title")
            if not title:
                return None
            title = title.text.strip()

            price_elem = soup.find("h2", class_="boxedarticle--price")
            price = 0
            if price_elem:
                price_text = price_elem.text.strip().split(" ")[0]
                try:
                    price = int(price_text.replace(".", "").replace(",", ""))
                except ValueError:
                    price = 0

            flexlist = soup.find("ul", class_="flexlist text-light-800")
            listing_id = 0
            if flexlist:
                flex_items = flexlist.find_all("li", recursive=False)
                if len(flex_items) > 1:
                    try:
                        listing_id = int(flex_items[1].text.strip())
                    except ValueError:
                        pass

            locality = soup.find("span", id="viewad-locality")
            zip_code = None
            address = ""
            if locality:
                locality_text = locality.text.strip()
                parts = locality_text.split(" ", 1)
                if parts:
                    zip_code = parts[0]
                if len(parts) > 1:
                    address = parts[1]

            date_added = None
            details_divs = soup.find_all("div", class_="boxedarticle--details--full")
            if len(details_divs) > 1:
                date_span = details_divs[1].find("span")
                if date_span:
                    try:
                        date_added = datetime.strptime(date_span.text, "%d.%m.%Y")
                    except ValueError:
                        pass

            first_image = None
            img_elem = soup.find("div", class_="galleryimage-element current")
            if img_elem:
                img = img_elem.find("img")
                if img and img.get("src"):
                    first_image = img["src"]

            if not zip_code:
                return None

            listing = {
                "title": title,
                "price": price,
                "id": listing_id,
                "zip_code": zip_code,
                "address": address,
                "date_added": date_added.isoformat() if date_added else None,
                "image": first_image,
                "url": url,
            }

            # Add coordinates
            coords = self.geocode_zip(zip_code)
            if coords and isinstance(coords, dict):
                listing["lat"] = coords.get("lat")
                listing["lon"] = coords.get("lon")

            return listing

        except Exception as e:
            print(f"Error scraping listing {url}: {e}")
            return None


def main():
    """Test the scraper functionality"""
    print("=== Kleinanzeigen Scraper Test ===\n")

    # Load config
    config_file = "config.json"
    if not os.path.exists(config_file):
        print(f"ERROR: {config_file} not found!")
        return

    with open(config_file, "r", encoding="utf-8") as f:
        config = json.load(f)

    # Initialize scraper
    scraper = KleinanzeigenScraper(config)

    # Test parameters
    search_term = "Fahrrad"
    max_pages = 1
    min_price = 100
    max_price = 500

    print(f"Searching for: {search_term}")
    print(f"Price range: {min_price}€ - {max_price}€")
    print(f"Max pages: {max_pages}\n")

    # Search for listings
    print("Step 1: Searching for listing URLs...")
    listing_urls = scraper.search_listings(search_term, max_pages, min_price, max_price)
    print(f"Found {len(listing_urls)} listings\n")

    if len(listing_urls) > 0:
        # Scrape first listing as test
        print("Step 2: Scraping first listing details...")
        test_url = listing_urls[0]
        print(f"URL: {test_url}")

        listing = scraper.scrape_listing(test_url)

        if listing:
            print("\nListing details:")
            print(f"  Title: {listing['title']}")
            print(f"  Price: €{listing['price']}")
            print(f"  ID: {listing['id']}")
            print(f"  Location: {listing['address']} ({listing['zip_code']})")
            print(f"  Date: {listing['date_added']}")
            print(f"  Coordinates: {listing.get('lat')}, {listing.get('lon')}")
            print(f"  Image: {listing['image']}")
        else:
            print("Failed to scrape listing")

    print("\n=== Test completed ===")
    print(f"ZIP cache size: {len(scraper.zip_cache)}")


if __name__ == "__main__":
    main()