import requests from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry import json import logging from typing import List, Dict, Optional, Any from bs4 import BeautifulSoup import re import os from pathlib import Path import hashlib from urllib.parse import urlparse import time import random from selenium import webdriver from selenium.webdriver.chrome.options import Options import base64 import urllib.request from PIL import Image import io # Конфигурация CONFIG = { "BASE_URL": "https://www.euro.com.pl/rest/api/products/search", "HEADERS": { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", "Accept": "application/json, text/plain, */*", "Accept-Language": "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7", "Origin": "https://www.euro.com.pl", "Referer": "https://www.euro.com.pl/", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", }, "CONNECT_TIMEOUT": 10, "READ_TIMEOUT": 30, "ITEMS_PER_PAGE": 24, "IMAGE_DIRS": { "products": "images/products", "descriptions": "images/descriptions", }, "IMAGE_RETRY_COUNT": 3, "IMAGE_RETRY_DELAY": 2, # секунды между попытками } # Настройка логирования logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s: %(message)s" ) logger = logging.getLogger(__name__) def create_retry_session(retries=3, backoff_factor=0.3): """Создание сессии с повторными запросами""" retry_strategy = Retry( total=retries, backoff_factor=backoff_factor, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["GET", "POST"], ) adapter = HTTPAdapter(max_retries=retry_strategy) session = requests.Session() session.mount("https://", adapter) # Устанавливаем таймауты по умолчанию для сессии session.timeout = (CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"]) return session def ensure_directory(directory: str) -> bool: """Создает директорию, если она не существует""" try: path = Path(directory) path.mkdir(parents=True, exist_ok=True) return path.exists() and path.is_dir() except Exception as e: logger.error(f"Ошибка создания директории {directory}: {str(e)}") return False def get_file_extension(url: str) -> str: """Получает расширение файла из URL""" parsed = urlparse(url) path = parsed.path return os.path.splitext(path)[1].lower() or ".jpg" def setup_selenium(): """Настройка Selenium WebDriver""" chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") # Добавляем те же заголовки, что использовали ранее chrome_options.add_argument( "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36" ) chrome_options.add_argument("accept-language=pl-PL,pl;q=0.9,en-US;q=0.8,en;q=0.7") return webdriver.Chrome(options=chrome_options) def download_image_selenium( driver: webdriver.Chrome, url: str, save_dir: str, prefix: str = "" ) -> Optional[str]: """ Скачивает изображение используя Selenium """ logger.info(f"Начинаем скачивание изображения через Selenium: {url}") logger.info(f"Директория для сохранения: {save_dir}") if not ensure_directory(save_dir): logger.error(f"Не удалось создать или получить доступ к директории {save_dir}") return None try: if not url or url.isspace(): return None url_hash = hashlib.md5(url.encode()).hexdigest() extension = get_file_extension(url) filename = ( f"{prefix}_{url_hash}{extension}" if prefix else f"{url_hash}{extension}" ) filepath = os.path.join(save_dir, filename) logger.info(f"Сгенерирован путь для сохранения: {filepath}") if os.path.exists(filepath) and os.path.getsize(filepath) > 0: logger.info(f"Файл уже существует: {filepath}") return filepath # Получаем изображение через Selenium driver.get(url) # Получаем изображение как base64 img_base64 = driver.execute_async_script( """ var url = arguments[0]; var callback = arguments[1]; var xhr = new XMLHttpRequest(); xhr.responseType = 'blob'; xhr.onload = function() { var reader = new FileReader(); reader.onloadend = function() { callback(reader.result); }; reader.readAsDataURL(xhr.response); }; xhr.open('GET', url); xhr.send(); """, url, ) if img_base64.startswith("data:image"): # Убираем префикс data URL img_base64 = img_base64.split(",")[1] # Декодируем base64 в бинарные данные img_data = base64.b64decode(img_base64) # Сохраняем изображение with open(filepath, "wb") as f: f.write(img_data) file_size = os.path.getsize(filepath) logger.info(f"Размер сохраненного файла: {file_size} байт") if file_size == 0: raise ValueError("Сохранен пустой файл") logger.info(f"Успешно скачано и сохранено изображение: {filepath}") return filepath else: raise ValueError("Не удалось получить изображение как base64") except Exception as e: logger.error( f"Ошибка при скачивании изображения через Selenium {url}: {str(e)}" ) if os.path.exists(filepath): try: os.remove(filepath) except: pass return None def download_image( session: requests.Session, url: str, save_dir: str, prefix: str = "" ) -> Optional[str]: """ Скачивает изображение и сохраняет его с повторными попытками """ logger.info(f"Начинаем скачивание изображения {url}") logger.info(f"Директория для сохранения: {save_dir}") if not ensure_directory(save_dir): logger.error(f"Не удалось создать или получить доступ к директории {save_dir}") return None try: if not url or url.isspace(): return None url_hash = hashlib.md5(url.encode()).hexdigest() extension = get_file_extension(url) filename = ( f"{prefix}_{url_hash}{extension}" if prefix else f"{url_hash}{extension}" ) filepath = os.path.join(save_dir, filename) logger.info(f"Сгенерирован путь для сохранения: {filepath}") if os.path.exists(filepath) and os.path.getsize(filepath) > 0: logger.info(f"Файл уже существует: {filepath}") return filepath # Улучшенные заголовки image_headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8", "Accept-Language": "pl-PL,pl;q=0.9,en-US;q=0.8,en;q=0.7", "Accept-Encoding": "gzip, deflate, br", "Referer": "https://www.euro.com.pl/", "Connection": "keep-alive", "Sec-Fetch-Dest": "image", "Sec-Fetch-Mode": "no-cors", "Sec-Fetch-Site": "cross-site", "Pragma": "no-cache", "Cache-Control": "no-cache", } for attempt in range(CONFIG["IMAGE_RETRY_COUNT"]): try: # Случайная задержка перед запросом time.sleep(random.uniform(1, 3)) logger.info(f"Попытка {attempt + 1} скачать изображение") # Используем stream для скачивания with session.get( url, headers=image_headers, timeout=(CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"]), stream=True, ) as response: response.raise_for_status() logger.info(f"Получен ответ. Статус: {response.status_code}") logger.info(f"Content-Type: {response.headers.get('Content-Type')}") # Записываем файл порциями with open(filepath, "wb") as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) file_size = os.path.getsize(filepath) logger.info(f"Размер сохраненного файла: {file_size} байт") if file_size == 0: raise ValueError("Сохранен пустой файл") logger.info(f"Успешно скачано и сохранено изображение: {filepath}") return filepath except Exception as e: logger.warning( f"Попытка {attempt + 1} скачать {url} не удалась: {str(e)}" ) if attempt < CONFIG["IMAGE_RETRY_COUNT"] - 1: # Увеличенная случайная задержка между попытками time.sleep(random.uniform(3, 7)) else: logger.error(f"Все попытки скачать изображение {url} не удались") if os.path.exists(filepath): try: os.remove(filepath) except: pass return None except Exception as e: logger.error(f"Критическая ошибка при скачивании изображения {url}: {str(e)}") return None def get_product_description(session: requests.Session, plu: str) -> Optional[str]: """Получает описание товара по его PLU""" url = f"https://www.euro.com.pl/rest/api/products/{plu}/promo-pages" try: logger.info(f"Запрос описания: {url}") response = session.get( url, headers=CONFIG["HEADERS"], timeout=(CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"]), ) response.raise_for_status() data = response.json() description = data.get("marketingDescription") logger.info("Описание получено" if description else "Описание отсутствует") return description except requests.RequestException as e: logger.error(f"Ошибка получения описания (PLU: {plu}): {e}") return None def extract_product_info( product: Dict, session: requests.Session, driver: webdriver.Chrome ) -> Dict[str, Any]: """Извлечение структурированной информации о товаре""" promotional_price = product["prices"]["promotionalPrice"] identifiers = product.get("identifiers", {}) delivery = product.get("deliveryAvailability") or {} shop_delivery = (delivery.get("shopDeliveryAvailability") or {}).get("code") home_delivery = (delivery.get("homeDeliveryAvailability") or {}).get("code") ensure_directory(CONFIG["IMAGE_DIRS"]["products"]) product_images = [] for img in product["images"]: if img["type"] == "BIG_PHOTO": image_path = download_image_selenium( driver, img["url"], CONFIG["IMAGE_DIRS"]["products"], f"product_{identifiers.get('plu', 'unknown')}", ) if image_path: product_images.append({"url": img["url"], "local_path": image_path}) return { "plu": identifiers.get("plu"), "name": product["name"], "url": f"https://www.euro.com.pl/{identifiers.get('productGroupLinkName', '')}/{identifiers.get('productLinkName', '')}", "prices": { "mainPrice": product["prices"]["mainPrice"], "promotionalPrice": ( promotional_price["price"] if promotional_price else None ), }, "attributes": [ {"name": attr["name"], "value": [v["name"] for v in attr["value"]]} for base_attr in product["baseAttributes"] for attr in base_attr["attributes"] ], "images": product_images, "availability": { "shop": shop_delivery, "home": home_delivery, }, "in_stock": shop_delivery in ("FOR_TOMORROW", "IMMEDIATE") or home_delivery in ("FOR_TOMORROW", "IMMEDIATE"), } def clean_description( html_description: str, session: requests.Session, driver: webdriver.Chrome, plu: str ) -> List[Dict[str, str]]: """Очищает HTML описание и структурирует его""" if not html_description: return [] # Создаем директорию для изображений описаний ensure_directory(CONFIG["IMAGE_DIRS"]["descriptions"]) html_description = re.sub(r"]*>", "", html_description) soup = BeautifulSoup(html_description, "html.parser") sections = soup.find_all("div", class_="section") cleaned_sections = [] for idx, section in enumerate(sections): header = section.find(["h2"]) paragraph = section.find("p") image = section.find("img", class_="lazy") image_url = ( image.get("data-original") if image and image.get("data-original") else "" ) if image_url.startswith("//"): image_url = f"https:{image_url}" local_image_path = None if image_url: local_image_path = download_image_selenium( driver, image_url, CONFIG["IMAGE_DIRS"]["descriptions"], f"desc_{plu}_section_{idx}", ) cleaned_section = { "title": header.text.strip() if header else "", "text": paragraph.text.strip() if paragraph else "", "image": {"url": image_url, "local_path": local_image_path}, } cleaned_sections.append(cleaned_section) return cleaned_sections def fetch_products( category: str, session: requests.Session, driver: webdriver.Chrome, status: dict = None, ) -> List[Dict]: """Получение всех товаров из категории""" all_products = [] start_from = 0 total_products = None current_product = 0 try: params = { "startFrom": 0, "numberOfItems": CONFIG["ITEMS_PER_PAGE"], "category": category, "developSearchMode": "false", } response = session.get( CONFIG["BASE_URL"], params=params, headers=CONFIG["HEADERS"], timeout=(CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"]), ) response.raise_for_status() data = response.json() total_products = int(data.get("productsCount", 0)) if status is not None: status["total_items"] = total_products products = data.get("results", []) while True: if not products: break for product in products: filtered_product = extract_product_info(product, session, driver) if filtered_product["plu"]: description = get_product_description( session, filtered_product["plu"] ) filtered_product["description"] = clean_description( description, session, driver, # Передаем driver в clean_description filtered_product["plu"], ) all_products.append(filtered_product) current_product += 1 if status is not None: status["processed_items"] = current_product if current_product >= total_products: break start_from += len(products) params.update({"startFrom": start_from}) response = session.get( CONFIG["BASE_URL"], params=params, headers=CONFIG["HEADERS"], timeout=(CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"]), ) response.raise_for_status() data = response.json() products = data.get("results", []) return all_products except Exception as e: logger.error(f"Error during parsing: {str(e)}") if status is not None: status["error"] = str(e) raise def main(): """Основная функция парсинга""" category = "odkurzacze-automatyczne" with create_retry_session() as session: driver = setup_selenium() try: products = fetch_products(category, session, driver) finally: driver.quit() if products: output_file = os.path.join("output", f"{category}_products.json") logger.info(f"Всего получено товаров: {len(products)}") logger.info(f"Информация сохранена в {output_file}") if __name__ == "__main__": main()