import base64
import hashlib
import json
import logging
import os
import random
import re
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import requests
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from urllib3.util.retry import Retry


# Configuration
CONFIG = {
    "BASE_URL": "https://www.euro.com.pl/rest/api/products/search",
    "HEADERS": {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7",
        "Origin": "https://www.euro.com.pl",
        "Referer": "https://www.euro.com.pl/",
        "Sec-Fetch-Dest": "empty",
        "Sec-Fetch-Mode": "cors",
        "Sec-Fetch-Site": "same-origin",
    },
    "CONNECT_TIMEOUT": 10,
    "READ_TIMEOUT": 30,
    "ITEMS_PER_PAGE": 24,
    "IMAGE_DIRS": {
        "products": "images/products",
        "descriptions": "images/descriptions",
    },
    "IMAGE_RETRY_COUNT": 3,
    "IMAGE_RETRY_DELAY": 2,  # seconds between attempts
}

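# Note: requests accepts a (connect, read) tuple for its timeout
# parameter; CONNECT_TIMEOUT and READ_TIMEOUT above are combined into
# such a tuple on every request below.
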
# Logging setup
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s: %(message)s"
)
logger = logging.getLogger(__name__)


def create_retry_session(retries=3, backoff_factor=0.3):
    """Create a requests session that retries failed requests."""
    retry_strategy = Retry(
        total=retries,
        backoff_factor=backoff_factor,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET", "POST"],
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session = requests.Session()
    session.mount("https://", adapter)
    # requests has no session-level timeout; the (connect, read) timeouts
    # from CONFIG are passed explicitly on every request instead.
    return session


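# Illustrative usage sketch (not executed at import time):
#
#     session = create_retry_session(retries=3, backoff_factor=0.3)
#     resp = session.get("https://www.euro.com.pl/", timeout=(10, 30))
#
# With urllib3's Retry, the pause before a retry is roughly
# backoff_factor * 2**(retries_so_far - 1), so 0.3 gives pauses on the
# order of 0.3s, 0.6s, 1.2s (exact behavior varies by urllib3 version).
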
def ensure_directory(directory: str) -> bool:
    """Create the directory if it does not already exist."""
    try:
        path = Path(directory)
        path.mkdir(parents=True, exist_ok=True)
        return path.exists() and path.is_dir()
    except OSError as e:
        logger.error(f"Failed to create directory {directory}: {str(e)}")
        return False


def get_file_extension(url: str) -> str:
    """Return the file extension from a URL, defaulting to .jpg."""
    parsed = urlparse(url)
    path = parsed.path
    return os.path.splitext(path)[1].lower() or ".jpg"


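# Example: for a hypothetical URL like
# "https://cdn.example.com/img/photo.webp?w=640" this returns ".webp";
# the query string is ignored because urlparse keeps it out of .path,
# and extension-less paths fall back to ".jpg".
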
def setup_selenium():
    """Configure the Selenium WebDriver."""
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")

    # Use the same user agent as the plain-requests headers above
    chrome_options.add_argument(
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
    )
    chrome_options.add_argument("accept-language=pl-PL,pl;q=0.9,en-US;q=0.8,en;q=0.7")

    return webdriver.Chrome(options=chrome_options)


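# Typical lifecycle: create one driver per run and always quit it, e.g.
#
#     driver = setup_selenium()
#     try:
#         ...  # scraping work
#     finally:
#         driver.quit()
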
def download_image_selenium(
    driver: webdriver.Chrome, url: str, save_dir: str, prefix: str = ""
) -> Optional[str]:
    """
    Download an image using Selenium.
    """
    logger.info(f"Starting Selenium image download: {url}")
    logger.info(f"Target directory: {save_dir}")

    if not ensure_directory(save_dir):
        logger.error(f"Could not create or access directory {save_dir}")
        return None

    filepath = None
    try:
        if not url or url.isspace():
            return None

        url_hash = hashlib.md5(url.encode()).hexdigest()
        extension = get_file_extension(url)
        filename = (
            f"{prefix}_{url_hash}{extension}" if prefix else f"{url_hash}{extension}"
        )
        filepath = os.path.join(save_dir, filename)

        logger.info(f"Generated save path: {filepath}")

        if os.path.exists(filepath) and os.path.getsize(filepath) > 0:
            logger.info(f"File already exists: {filepath}")
            return filepath

        # Load the image URL in the browser
        driver.get(url)

        # Fetch the image as base64 inside the page. execute_async_script
        # injects the async callback as the last argument, so with one
        # argument passed, arguments[0] is the url and arguments[1] the
        # callback. The onerror handler invokes the callback with null so
        # a failed request does not hang until the script timeout.
        img_base64 = driver.execute_async_script(
            """
            var url = arguments[0];
            var callback = arguments[1];
            var xhr = new XMLHttpRequest();
            xhr.responseType = 'blob';
            xhr.onload = function() {
                var reader = new FileReader();
                reader.onloadend = function() {
                    callback(reader.result);
                };
                reader.readAsDataURL(xhr.response);
            };
            xhr.onerror = function() {
                callback(null);
            };
            xhr.open('GET', url);
            xhr.send();
            """,
            url,
        )

        if img_base64 and img_base64.startswith("data:image"):
            # Strip the data-URL prefix
            img_base64 = img_base64.split(",")[1]

            # Decode base64 into binary data
            img_data = base64.b64decode(img_base64)

            # Save the image
            with open(filepath, "wb") as f:
                f.write(img_data)

            file_size = os.path.getsize(filepath)
            logger.info(f"Saved file size: {file_size} bytes")

            if file_size == 0:
                raise ValueError("An empty file was saved")

            logger.info(f"Image downloaded and saved successfully: {filepath}")
            return filepath
        else:
            raise ValueError("Could not retrieve the image as base64")

    except Exception as e:
        logger.error(
            f"Error downloading image via Selenium {url}: {str(e)}"
        )
        # filepath is None if the failure happened before it was built
        if filepath and os.path.exists(filepath):
            try:
                os.remove(filepath)
            except OSError:
                pass
        return None


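# Why the in-browser XHR route: fetching the image from inside the page
# shares the browser's cookies and TLS fingerprint, which can get past
# hotlink protection that rejects plain HTTP clients. The requests-based
# download_image below is the cheaper path when the CDN does not object.
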
def download_image(
    session: requests.Session, url: str, save_dir: str, prefix: str = ""
) -> Optional[str]:
    """
    Download an image and save it, retrying on failure.
    """
    logger.info(f"Starting image download {url}")
    logger.info(f"Target directory: {save_dir}")

    if not ensure_directory(save_dir):
        logger.error(f"Could not create or access directory {save_dir}")
        return None

    try:
        if not url or url.isspace():
            return None

        url_hash = hashlib.md5(url.encode()).hexdigest()
        extension = get_file_extension(url)
        filename = (
            f"{prefix}_{url_hash}{extension}" if prefix else f"{url_hash}{extension}"
        )
        filepath = os.path.join(save_dir, filename)

        logger.info(f"Generated save path: {filepath}")

        if os.path.exists(filepath) and os.path.getsize(filepath) > 0:
            logger.info(f"File already exists: {filepath}")
            return filepath

        # Browser-like headers for image requests
        image_headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
            "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
            "Accept-Language": "pl-PL,pl;q=0.9,en-US;q=0.8,en;q=0.7",
            "Accept-Encoding": "gzip, deflate, br",
            "Referer": "https://www.euro.com.pl/",
            "Connection": "keep-alive",
            "Sec-Fetch-Dest": "image",
            "Sec-Fetch-Mode": "no-cors",
            "Sec-Fetch-Site": "cross-site",
            "Pragma": "no-cache",
            "Cache-Control": "no-cache",
        }

        for attempt in range(CONFIG["IMAGE_RETRY_COUNT"]):
            try:
                # Random delay before the request
                time.sleep(random.uniform(1, 3))

                logger.info(f"Attempt {attempt + 1} to download the image")

                # Stream the download
                with session.get(
                    url,
                    headers=image_headers,
                    timeout=(CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"]),
                    stream=True,
                ) as response:
                    response.raise_for_status()

                    logger.info(f"Response received. Status: {response.status_code}")
                    logger.info(f"Content-Type: {response.headers.get('Content-Type')}")

                    # Write the file in chunks
                    with open(filepath, "wb") as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)

                file_size = os.path.getsize(filepath)
                logger.info(f"Saved file size: {file_size} bytes")

                if file_size == 0:
                    raise ValueError("An empty file was saved")

                logger.info(f"Image downloaded and saved successfully: {filepath}")
                return filepath

            except Exception as e:
                logger.warning(
                    f"Attempt {attempt + 1} to download {url} failed: {str(e)}"
                )
                if attempt < CONFIG["IMAGE_RETRY_COUNT"] - 1:
                    # Longer random delay between attempts
                    time.sleep(random.uniform(3, 7))
                else:
                    logger.error(f"All attempts to download image {url} failed")
                    if os.path.exists(filepath):
                        try:
                            os.remove(filepath)
                        except OSError:
                            pass
                    return None

    except Exception as e:
        logger.error(f"Critical error while downloading image {url}: {str(e)}")
        return None


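# Illustrative usage (hypothetical URL):
#
#     session = create_retry_session()
#     path = download_image(
#         session,
#         "https://cdn.example.com/p/12345.jpg",
#         CONFIG["IMAGE_DIRS"]["products"],
#         prefix="product_12345",
#     )
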
def get_product_description(session: requests.Session, plu: str) -> Optional[str]:
    """Fetch the product description by its PLU."""
    url = f"https://www.euro.com.pl/rest/api/products/{plu}/promo-pages"

    try:
        logger.info(f"Requesting description: {url}")
        response = session.get(
            url,
            headers=CONFIG["HEADERS"],
            timeout=(CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"]),
        )
        response.raise_for_status()

        data = response.json()
        description = data.get("marketingDescription")

        logger.info("Description received" if description else "No description available")
        return description

    except requests.RequestException as e:
        logger.error(f"Failed to fetch description (PLU: {plu}): {e}")
        return None


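# The promo-pages payload is assumed to look roughly like
#     {"marketingDescription": "<div class=\"section\">...</div>", ...}
# and only the marketingDescription HTML blob is consumed downstream,
# where clean_description() splits it into per-section records.
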
def extract_product_info(
    product: Dict, session: requests.Session, driver: webdriver.Chrome
) -> Dict[str, Any]:
    """Extract structured product information."""
    promotional_price = product["prices"]["promotionalPrice"]
    identifiers = product.get("identifiers", {})
    delivery = product.get("deliveryAvailability") or {}
    shop_delivery = (delivery.get("shopDeliveryAvailability") or {}).get("code")
    home_delivery = (delivery.get("homeDeliveryAvailability") or {}).get("code")

    ensure_directory(CONFIG["IMAGE_DIRS"]["products"])

    product_images = []
    for img in product["images"]:
        if img["type"] == "BIG_PHOTO":
            image_path = download_image_selenium(
                driver,
                img["url"],
                CONFIG["IMAGE_DIRS"]["products"],
                f"product_{identifiers.get('plu', 'unknown')}",
            )
            if image_path:
                product_images.append({"url": img["url"], "local_path": image_path})

    return {
        "plu": identifiers.get("plu"),
        "name": product["name"],
        "url": f"https://www.euro.com.pl/{identifiers.get('productGroupLinkName', '')}/{identifiers.get('productLinkName', '')}",
        "prices": {
            "mainPrice": product["prices"]["mainPrice"],
            "promotionalPrice": (
                promotional_price["price"] if promotional_price else None
            ),
        },
        "attributes": [
            {"name": attr["name"], "value": [v["name"] for v in attr["value"]]}
            for base_attr in product["baseAttributes"]
            for attr in base_attr["attributes"]
        ],
        "images": product_images,
        "availability": {
            "shop": shop_delivery,
            "home": home_delivery,
        },
        # "in_stock": shop_delivery in ("FOR_TOMORROW", "IMMEDIATE") or home_delivery in ("FOR_TOMORROW", "IMMEDIATE"),
        "in_stock": all(
            code not in ("UNAVAILABLE", "TEMPORARILY_UNAVAILABLE", None)
            for code in [shop_delivery, home_delivery]
        ),
    }


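# in_stock treats a product as purchasable only when BOTH delivery codes
# are present and outside the unavailable set; the commented-out variant
# above instead accepts codes like "FOR_TOMORROW" or "IMMEDIATE" on
# either channel.
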
def clean_description(
    html_description: str, session: requests.Session, driver: webdriver.Chrome, plu: str
) -> List[Dict[str, str]]:
    """Clean the HTML description and structure it."""
    if not html_description:
        return []

    # Create the directory for description images
    ensure_directory(CONFIG["IMAGE_DIRS"]["descriptions"])

    html_description = re.sub(r"<link[^>]*>", "", html_description)
    soup = BeautifulSoup(html_description, "html.parser")
    sections = soup.find_all("div", class_="section")
    cleaned_sections = []

    for idx, section in enumerate(sections):
        header = section.find("h2")
        paragraph = section.find("p")
        image = section.find("img", class_="lazy")
        image_url = (
            image.get("data-original") if image and image.get("data-original") else ""
        )

        if image_url.startswith("//"):
            image_url = f"https:{image_url}"

        local_image_path = None
        if image_url:
            local_image_path = download_image_selenium(
                driver,
                image_url,
                CONFIG["IMAGE_DIRS"]["descriptions"],
                f"desc_{plu}_section_{idx}",
            )

        cleaned_section = {
            "title": header.text.strip() if header else "",
            "text": paragraph.text.strip() if paragraph else "",
            "image": {"url": image_url, "local_path": local_image_path},
        }

        cleaned_sections.append(cleaned_section)

    return cleaned_sections


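# Each cleaned section is shaped like:
#     {"title": "...", "text": "...",
#      "image": {"url": "https://...", "local_path": "images/descriptions/..."}}
# with local_path set to None when the image download fails.
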
def fetch_products(
    category: str,
    session: requests.Session,
    driver: webdriver.Chrome,
    status: Optional[dict] = None,
) -> List[Dict]:
    """Fetch all products in a category."""
    all_products = []
    start_from = 0
    total_products = None
    current_product = 0

    try:
        params = {
            "startFrom": 0,
            "numberOfItems": CONFIG["ITEMS_PER_PAGE"],
            "category": category,
            "developSearchMode": "false",
        }

        response = session.get(
            CONFIG["BASE_URL"],
            params=params,
            headers=CONFIG["HEADERS"],
            timeout=(CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"]),
        )
        response.raise_for_status()

        data = response.json()
        total_products = int(data.get("productsCount", 0))

        if status is not None:
            status["total_items"] = total_products

        products = data.get("results", [])

        while True:
            if not products:
                break

            for product in products:
                filtered_product = extract_product_info(product, session, driver)
                if filtered_product["plu"]:
                    description = get_product_description(
                        session, filtered_product["plu"]
                    )
                    filtered_product["description"] = clean_description(
                        description,
                        session,
                        driver,  # pass the driver into clean_description
                        filtered_product["plu"],
                    )
                if filtered_product["prices"]["mainPrice"] >= 300:
                    all_products.append(filtered_product)

                current_product += 1
                if status is not None:
                    status["processed_items"] = current_product

                if current_product >= total_products:
                    break

            # Stop once every advertised product has been processed,
            # without issuing one final empty-page request.
            if current_product >= total_products:
                break

            start_from += len(products)
            params.update({"startFrom": start_from})

            response = session.get(
                CONFIG["BASE_URL"],
                params=params,
                headers=CONFIG["HEADERS"],
                timeout=(CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"]),
            )
            response.raise_for_status()

            data = response.json()
            products = data.get("results", [])

        return all_products

    except Exception as e:
        logger.error(f"Error during parsing: {str(e)}")
        if status is not None:
            status["error"] = str(e)
        raise


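# Pagination note: the search endpoint pages via startFrom/numberOfItems;
# the loop advances startFrom by the size of each returned page and stops
# once productsCount items have been processed or a page comes back empty.
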
def main():
    """Main scraping entry point."""
    category = "odkurzacze-automatyczne"

    with create_retry_session() as session:
        driver = setup_selenium()
        try:
            products = fetch_products(category, session, driver)
        finally:
            driver.quit()

    if products:
        ensure_directory("output")
        output_file = os.path.join("output", f"{category}_products.json")
        # Persist the results as JSON
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(products, f, ensure_ascii=False, indent=2)
        logger.info(f"Total products fetched: {len(products)}")
        logger.info(f"Results saved to {output_file}")


if __name__ == "__main__":
    main()