From e7e6382a107d5fe8a6f3850542c4d20571783881 Mon Sep 17 00:00:00 2001 From: MrAkells Date: Thu, 17 Apr 2025 13:56:40 +0300 Subject: [PATCH] first commit --- .gitignore | 4 + README.md | 0 categories.json | 7 + euro_scraper.py | 512 ++++++++++++++++++++ feed_generator.py | 220 +++++++++ requirements.txt | 5 + templates/index.html | 1097 ++++++++++++++++++++++++++++++++++++++++++ templates/login.html | 106 ++++ translator.py | 44 ++ web_interface.py | 613 +++++++++++++++++++++++ 10 files changed, 2608 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 categories.json create mode 100644 euro_scraper.py create mode 100644 feed_generator.py create mode 100644 requirements.txt create mode 100644 templates/index.html create mode 100644 templates/login.html create mode 100644 translator.py create mode 100644 web_interface.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..45bc509 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +venv/ +images/ +output/ +__pycache__/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/categories.json b/categories.json new file mode 100644 index 0000000..d80e027 --- /dev/null +++ b/categories.json @@ -0,0 +1,7 @@ +[ + { + "id": "1", + "name": "Роботи-пилососи", + "portal_id": "63023" + } +] \ No newline at end of file diff --git a/euro_scraper.py b/euro_scraper.py new file mode 100644 index 0000000..669438b --- /dev/null +++ b/euro_scraper.py @@ -0,0 +1,512 @@ +import requests +from requests.adapters import HTTPAdapter +from requests.packages.urllib3.util.retry import Retry +import json +import logging +from typing import List, Dict, Optional, Any +from bs4 import BeautifulSoup +import re +import os +from pathlib import Path +import hashlib +from urllib.parse import urlparse +import time +import random +from selenium import webdriver +from selenium.webdriver.chrome.options import Options +import base64 +import urllib.request +from PIL import Image +import io + + +# Конфигурация +CONFIG = { + "BASE_URL": "https://www.euro.com.pl/rest/api/products/search", + "HEADERS": { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", + "Accept": "application/json, text/plain, */*", + "Accept-Language": "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7", + "Origin": "https://www.euro.com.pl", + "Referer": "https://www.euro.com.pl/", + "Sec-Fetch-Dest": "empty", + "Sec-Fetch-Mode": "cors", + "Sec-Fetch-Site": "same-origin", + }, + "CONNECT_TIMEOUT": 10, + "READ_TIMEOUT": 30, + "ITEMS_PER_PAGE": 24, + "IMAGE_DIRS": { + "products": "images/products", + "descriptions": "images/descriptions", + }, + "IMAGE_RETRY_COUNT": 3, + "IMAGE_RETRY_DELAY": 2, # секунды между попытками +} + + +# Настройка логирования +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s: %(message)s" +) +logger = logging.getLogger(__name__) + + +def create_retry_session(retries=3, backoff_factor=0.3): + """Создание сессии с повторными запросами""" + retry_strategy = Retry( + total=retries, + backoff_factor=backoff_factor, + status_forcelist=[429, 500, 502, 503, 504], + allowed_methods=["GET", "POST"], + ) + adapter = HTTPAdapter(max_retries=retry_strategy) + session = requests.Session() + session.mount("https://", adapter) + # Устанавливаем таймауты по умолчанию для сессии + session.timeout = (CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"]) + return session + + +def ensure_directory(directory: str) -> bool: + """Создает директорию, если она не существует""" + try: + path = Path(directory) + path.mkdir(parents=True, exist_ok=True) + return path.exists() and path.is_dir() + except Exception as e: + logger.error(f"Ошибка создания директории {directory}: {str(e)}") + return False + + +def get_file_extension(url: str) -> str: + """Получает расширение файла из URL""" + parsed = urlparse(url) + path = parsed.path + return os.path.splitext(path)[1].lower() or ".jpg" + + +def setup_selenium(): + """Настройка Selenium WebDriver""" + chrome_options = Options() + chrome_options.add_argument("--headless") + chrome_options.add_argument("--no-sandbox") + chrome_options.add_argument("--disable-dev-shm-usage") + + # Добавляем те же заголовки, что использовали ранее + chrome_options.add_argument( + "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36" + ) + chrome_options.add_argument("accept-language=pl-PL,pl;q=0.9,en-US;q=0.8,en;q=0.7") + + return webdriver.Chrome(options=chrome_options) + + +def download_image_selenium( + driver: webdriver.Chrome, url: str, save_dir: str, prefix: str = "" +) -> Optional[str]: + """ + Скачивает изображение используя Selenium + """ + logger.info(f"Начинаем скачивание изображения через Selenium: {url}") + logger.info(f"Директория для сохранения: {save_dir}") + + if not ensure_directory(save_dir): + logger.error(f"Не удалось создать или получить доступ к директории {save_dir}") + return None + + try: + if not url or url.isspace(): + return None + + url_hash = hashlib.md5(url.encode()).hexdigest() + extension = get_file_extension(url) + filename = ( + f"{prefix}_{url_hash}{extension}" if prefix else f"{url_hash}{extension}" + ) + filepath = os.path.join(save_dir, filename) + + logger.info(f"Сгенерирован путь для сохранения: {filepath}") + + if os.path.exists(filepath) and os.path.getsize(filepath) > 0: + logger.info(f"Файл уже существует: {filepath}") + return filepath + + # Получаем изображение через Selenium + driver.get(url) + + # Получаем изображение как base64 + img_base64 = driver.execute_async_script( + """ + var url = arguments[0]; + var callback = arguments[1]; + var xhr = new XMLHttpRequest(); + xhr.responseType = 'blob'; + xhr.onload = function() { + var reader = new FileReader(); + reader.onloadend = function() { + callback(reader.result); + }; + reader.readAsDataURL(xhr.response); + }; + xhr.open('GET', url); + xhr.send(); + """, + url, + ) + + if img_base64.startswith("data:image"): + # Убираем префикс data URL + img_base64 = img_base64.split(",")[1] + + # Декодируем base64 в бинарные данные + img_data = base64.b64decode(img_base64) + + # Сохраняем изображение + with open(filepath, "wb") as f: + f.write(img_data) + + file_size = os.path.getsize(filepath) + logger.info(f"Размер сохраненного файла: {file_size} байт") + + if file_size == 0: + raise ValueError("Сохранен пустой файл") + + logger.info(f"Успешно скачано и сохранено изображение: {filepath}") + return filepath + else: + raise ValueError("Не удалось получить изображение как base64") + + except Exception as e: + logger.error( + f"Ошибка при скачивании изображения через Selenium {url}: {str(e)}" + ) + if os.path.exists(filepath): + try: + os.remove(filepath) + except: + pass + return None + + +def download_image( + session: requests.Session, url: str, save_dir: str, prefix: str = "" +) -> Optional[str]: + """ + Скачивает изображение и сохраняет его с повторными попытками + """ + logger.info(f"Начинаем скачивание изображения {url}") + logger.info(f"Директория для сохранения: {save_dir}") + + if not ensure_directory(save_dir): + logger.error(f"Не удалось создать или получить доступ к директории {save_dir}") + return None + + try: + if not url or url.isspace(): + return None + + url_hash = hashlib.md5(url.encode()).hexdigest() + extension = get_file_extension(url) + filename = ( + f"{prefix}_{url_hash}{extension}" if prefix else f"{url_hash}{extension}" + ) + filepath = os.path.join(save_dir, filename) + + logger.info(f"Сгенерирован путь для сохранения: {filepath}") + + if os.path.exists(filepath) and os.path.getsize(filepath) > 0: + logger.info(f"Файл уже существует: {filepath}") + return filepath + + # Улучшенные заголовки + image_headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", + "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8", + "Accept-Language": "pl-PL,pl;q=0.9,en-US;q=0.8,en;q=0.7", + "Accept-Encoding": "gzip, deflate, br", + "Referer": "https://www.euro.com.pl/", + "Connection": "keep-alive", + "Sec-Fetch-Dest": "image", + "Sec-Fetch-Mode": "no-cors", + "Sec-Fetch-Site": "cross-site", + "Pragma": "no-cache", + "Cache-Control": "no-cache", + } + + for attempt in range(CONFIG["IMAGE_RETRY_COUNT"]): + try: + # Случайная задержка перед запросом + time.sleep(random.uniform(1, 3)) + + logger.info(f"Попытка {attempt + 1} скачать изображение") + + # Используем stream для скачивания + with session.get( + url, + headers=image_headers, + timeout=(CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"]), + stream=True, + ) as response: + response.raise_for_status() + + logger.info(f"Получен ответ. Статус: {response.status_code}") + logger.info(f"Content-Type: {response.headers.get('Content-Type')}") + + # Записываем файл порциями + with open(filepath, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + + file_size = os.path.getsize(filepath) + logger.info(f"Размер сохраненного файла: {file_size} байт") + + if file_size == 0: + raise ValueError("Сохранен пустой файл") + + logger.info(f"Успешно скачано и сохранено изображение: {filepath}") + return filepath + + except Exception as e: + logger.warning( + f"Попытка {attempt + 1} скачать {url} не удалась: {str(e)}" + ) + if attempt < CONFIG["IMAGE_RETRY_COUNT"] - 1: + # Увеличенная случайная задержка между попытками + time.sleep(random.uniform(3, 7)) + else: + logger.error(f"Все попытки скачать изображение {url} не удались") + if os.path.exists(filepath): + try: + os.remove(filepath) + except: + pass + return None + + except Exception as e: + logger.error(f"Критическая ошибка при скачивании изображения {url}: {str(e)}") + return None + + +def get_product_description(session: requests.Session, plu: str) -> Optional[str]: + """Получает описание товара по его PLU""" + url = f"https://www.euro.com.pl/rest/api/products/{plu}/promo-pages" + + try: + logger.info(f"Запрос описания: {url}") + response = session.get( + url, + headers=CONFIG["HEADERS"], + timeout=(CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"]), + ) + response.raise_for_status() + + data = response.json() + description = data.get("marketingDescription") + + logger.info("Описание получено" if description else "Описание отсутствует") + return description + + except requests.RequestException as e: + logger.error(f"Ошибка получения описания (PLU: {plu}): {e}") + return None + + +def extract_product_info( + product: Dict, session: requests.Session, driver: webdriver.Chrome +) -> Dict[str, Any]: + """Извлечение структурированной информации о товаре""" + promotional_price = product["prices"]["promotionalPrice"] + identifiers = product.get("identifiers", {}) + delivery = product.get("deliveryAvailability") or {} + shop_delivery = (delivery.get("shopDeliveryAvailability") or {}).get("code") + home_delivery = (delivery.get("homeDeliveryAvailability") or {}).get("code") + + ensure_directory(CONFIG["IMAGE_DIRS"]["products"]) + + product_images = [] + for img in product["images"]: + if img["type"] == "BIG_PHOTO": + image_path = download_image_selenium( + driver, + img["url"], + CONFIG["IMAGE_DIRS"]["products"], + f"product_{identifiers.get('plu', 'unknown')}", + ) + if image_path: + product_images.append({"url": img["url"], "local_path": image_path}) + + return { + "plu": identifiers.get("plu"), + "name": product["name"], + "url": f"https://www.euro.com.pl/{identifiers.get('productGroupLinkName', '')}/{identifiers.get('productLinkName', '')}", + "prices": { + "mainPrice": product["prices"]["mainPrice"], + "promotionalPrice": ( + promotional_price["price"] if promotional_price else None + ), + }, + "attributes": [ + {"name": attr["name"], "value": [v["name"] for v in attr["value"]]} + for base_attr in product["baseAttributes"] + for attr in base_attr["attributes"] + ], + "images": product_images, + "availability": { + "shop": shop_delivery, + "home": home_delivery, + }, + "in_stock": shop_delivery in ("FOR_TOMORROW", "IMMEDIATE") or home_delivery in ("FOR_TOMORROW", "IMMEDIATE"), + } + + +def clean_description( + html_description: str, session: requests.Session, driver: webdriver.Chrome, plu: str +) -> List[Dict[str, str]]: + """Очищает HTML описание и структурирует его""" + if not html_description: + return [] + + # Создаем директорию для изображений описаний + ensure_directory(CONFIG["IMAGE_DIRS"]["descriptions"]) + + html_description = re.sub(r"]*>", "", html_description) + soup = BeautifulSoup(html_description, "html.parser") + sections = soup.find_all("div", class_="section") + cleaned_sections = [] + + for idx, section in enumerate(sections): + header = section.find(["h2"]) + paragraph = section.find("p") + image = section.find("img", class_="lazy") + image_url = ( + image.get("data-original") if image and image.get("data-original") else "" + ) + + if image_url.startswith("//"): + image_url = f"https:{image_url}" + + local_image_path = None + if image_url: + local_image_path = download_image_selenium( + driver, + image_url, + CONFIG["IMAGE_DIRS"]["descriptions"], + f"desc_{plu}_section_{idx}", + ) + + cleaned_section = { + "title": header.text.strip() if header else "", + "text": paragraph.text.strip() if paragraph else "", + "image": {"url": image_url, "local_path": local_image_path}, + } + + cleaned_sections.append(cleaned_section) + + return cleaned_sections + + +def fetch_products( + category: str, + session: requests.Session, + driver: webdriver.Chrome, + status: dict = None, +) -> List[Dict]: + """Получение всех товаров из категории""" + all_products = [] + start_from = 0 + total_products = None + current_product = 0 + + try: + params = { + "startFrom": 0, + "numberOfItems": CONFIG["ITEMS_PER_PAGE"], + "category": category, + "developSearchMode": "false", + } + + response = session.get( + CONFIG["BASE_URL"], + params=params, + headers=CONFIG["HEADERS"], + timeout=(CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"]), + ) + response.raise_for_status() + + data = response.json() + total_products = int(data.get("productsCount", 0)) + + if status is not None: + status["total_items"] = total_products + + products = data.get("results", []) + + while True: + if not products: + break + + for product in products: + filtered_product = extract_product_info(product, session, driver) + if filtered_product["plu"]: + description = get_product_description( + session, filtered_product["plu"] + ) + filtered_product["description"] = clean_description( + description, + session, + driver, # Передаем driver в clean_description + filtered_product["plu"], + ) + all_products.append(filtered_product) + + current_product += 1 + if status is not None: + status["processed_items"] = current_product + + if current_product >= total_products: + break + + start_from += len(products) + params.update({"startFrom": start_from}) + + response = session.get( + CONFIG["BASE_URL"], + params=params, + headers=CONFIG["HEADERS"], + timeout=(CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"]), + ) + response.raise_for_status() + + data = response.json() + products = data.get("results", []) + + return all_products + + except Exception as e: + logger.error(f"Error during parsing: {str(e)}") + if status is not None: + status["error"] = str(e) + raise + + +def main(): + """Основная функция парсинга""" + category = "odkurzacze-automatyczne" + + with create_retry_session() as session: + driver = setup_selenium() + try: + products = fetch_products(category, session, driver) + finally: + driver.quit() + + if products: + output_file = os.path.join("output", f"{category}_products.json") + logger.info(f"Всего получено товаров: {len(products)}") + logger.info(f"Информация сохранена в {output_file}") + + +if __name__ == "__main__": + main() diff --git a/feed_generator.py b/feed_generator.py new file mode 100644 index 0000000..7a975d3 --- /dev/null +++ b/feed_generator.py @@ -0,0 +1,220 @@ +import json +import xml.etree.ElementTree as ET +from typing import List, Dict +from datetime import datetime +from urllib.parse import urljoin + + +class RobotVacuumYMLGenerator: + def __init__( + self, + shop_name: str = "Euro Electronics", + base_url: str = "https://mario.mrakells.pp.ua", + use_original_urls: bool = False, + ): + """ + Initialize YML feed generator + + :param shop_name: Name of the shop + :param base_url: Base URL for image hosting + :param use_original_urls: If True, use original image URLs instead of local ones + """ + self.root = ET.Element( + "yml_catalog", {"date": datetime.now().strftime("%Y-%m-%d %H:%M")} + ) + self.shop = ET.SubElement(self.root, "shop") + ET.SubElement(self.shop, "name").text = shop_name + self.base_url = base_url + self.use_original_urls = use_original_urls + + self.categories = ET.SubElement(self.shop, "categories") + self.offers = ET.SubElement(self.shop, "offers") + + def add_category(self, category_id: str, category_name: str, parent_id: str = None): + """ + Add category to YML feed + + :param category_id: Category ID + :param category_name: Category name + :param parent_id: Parent category ID (optional) + """ + attrs = {"id": category_id} + if parent_id: + attrs["parentId"] = parent_id + + category = ET.SubElement(self.categories, "category", attrs) + category.text = category_name + + def get_image_url(self, local_path: str) -> str: + """ + Convert local path to full URL, normalizing path separators + + :param local_path: Local path to image file + :return: Full URL with normalized path separators + """ + if not local_path: + return None + + # Normalize path separators to forward slashes + normalized_path = local_path.replace("\\", "/") + return urljoin(self.base_url, normalized_path) + + def process_attributes(self, attributes: List[Dict]) -> List[Dict]: + """ + Convert attributes to param format for YML + + :param attributes: List of attribute dictionaries + :return: List of param dictionaries + """ + params = [] + for attr in attributes: + value = attr["value"] + + # Handle single or multiple values + if isinstance(value, list): + value = " | ".join(str(v) for v in value) + + params.append({"name": attr["name"], "value": value}) + return params + + def clean_product_name(self, name: str) -> str: + """ + Очищает название продукта, удаляя кириллические слова после латинских символов + + :param name: Исходное название продукта + :return: Очищенное название + """ + # Разбиваем строку на слова + words = name.split() + cleaned_words = [] + last_latin_index = -1 + + # Проходим по словам и ищем последнее слово с латиницей + for i, word in enumerate(words): + # Проверяем, содержит ли слово латинские символы + if any(ord("a") <= ord(c.lower()) <= ord("z") for c in word): + last_latin_index = i + + # Если нашли латинские символы, берём все слова до следующего после последнего латинского + if last_latin_index != -1: + cleaned_words = words[: last_latin_index + 1] + else: + cleaned_words = words + + return " ".join(cleaned_words) + + def add_offer(self, product: Dict): + """ + Add a robot vacuum cleaner offer to the YML feed + + :param product: Product dictionary from JSON + """ + + in_stock = product.get('in_stock', False) + + offer = ET.SubElement(self.offers, 'offer', { + 'id': str(product['plu']), + 'available': 'true' if in_stock else 'false', + 'in_stock': 'true' if in_stock else 'false' + }) + + # Clean product name before adding to feed + cleaned_name = self.clean_product_name(product["name"]) + ET.SubElement(offer, "name").text = cleaned_name + + # Add vendorCode using plu + ET.SubElement(offer, "vendorCode").text = str(product["plu"]) + + ET.SubElement(offer, "price").text = str(product["prices"]["mainPrice"]) + ET.SubElement(offer, "currencyId").text = "PLN" + ET.SubElement(offer, "categoryId").text = str( + product["local_category_id"] + ) # якщо у тебе є локальна категорія + ET.SubElement(offer, "portal_category_id").text = str( + product["portal_category_id"] + ) # ОБОВ'ЯЗКОВО + + # Description with images + if "description" in product: + description_html = "
" + for desc in product["description"]: + description_html += f"

{desc['title']}

" + description_html += f"

{desc['text']}

" + if desc["image"].get("local_path") and not self.use_original_urls: + img_url = self.get_image_url(desc["image"]["local_path"]) + description_html += f'{desc[' + elif desc["image"].get("url") and self.use_original_urls: + img_url = desc["image"]["url"] + description_html += f'{desc[' + description_html += "
" + + description_elem = ET.SubElement(offer, "description") + description_elem.text = description_html + + # Product images + for img in product["images"][:10]: + if self.use_original_urls: + img_url = img["url"] + else: + if img.get("local_path"): + img_url = self.get_image_url(img["local_path"]) + else: + continue + ET.SubElement(offer, "picture").text = img_url + + # Attributes as params + params = self.process_attributes(product["attributes"]) + for param in params: + param_elem = ET.SubElement(offer, "param", {"name": param["name"]}) + param_elem.text = str(param["value"]) + + # URL + ET.SubElement(offer, "url").text = product["url"] + + def generate_yml(self, products: List[Dict], output_yml: str) -> bool: + """ + Generate YML feed from products data + + :param products: List of product dictionaries + :param output_yml: Path to output YML file + :return: True if successful, False otherwise + """ + try: + # Ensure a category exists + if not list(self.categories): + raise ValueError("No categories added to the YML feed.") + + # Add offers for each product + for product in products: + self.add_offer(product) + + # Write the XML tree + tree = ET.ElementTree(self.root) + tree.write(output_yml, encoding="UTF-8", xml_declaration=True) + print(f"YML feed generated: {output_yml}") + return True + + except Exception as e: + print(f"Error generating YML feed: {str(e)}") + return False + + +def main(): + """ + Example usage with command line arguments + """ + import sys + + if len(sys.argv) < 2: + print("Usage: python feed_generator.py input.json [output.yml]") + sys.exit(1) + + input_json = sys.argv[1] + output_yml = sys.argv[2] if len(sys.argv) > 2 else None + + generator = RobotVacuumYMLGenerator() + generator.generate_yml(input_json, output_yml) + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..6016229 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +deep-translator>=1.11.4 +flask +flask-login +selenium +Pillow \ No newline at end of file diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..51ac82f --- /dev/null +++ b/templates/index.html @@ -0,0 +1,1097 @@ + + + + + Парсер mariotexno + + + + +

Парсер euro.com.pl

+ +
+
+ + + + +
+ + +
+ +
+ + + + +
+
+ + +
+ + + +
+ {% if status.is_running %} +

Парсинг запущено для категорії: {{ status.current_category }}

+ {% else %} +

Парсер не активний

+ {% endif %} +
+ +
+

Збережені результати:

+
    + {% for file in parsed_files %} +
  • +
    + + {{ file.name }} + + {{ file.modified }} + {{ file.size }} +
    + +
  • + {% endfor %} +
+
+
+ + +
+
+ + +
+ + + +
+ {% if translation_status.is_running %} +

Переклад в процесі...
+ Оброблено: {{ translation_status.processed_items }} з {{ translation_status.total_items }}

+ {% else %} +

Переклад не активний

+ {% endif %} +
+ +
+

Перекладені файли:

+
    + {% for file in translated_files %} +
  • +
    + + {{ file.name }} + + {{ file.modified }} + {{ file.size }} +
    + +
  • + {% endfor %} +
+
+
+ + +
+
+
+ + +
+ +
+ + +
+ + +
+ +
+

Генератор не активний

+
+ +
+

Згенеровані YML файли:

+
    + {% for file in yml_files %} +
  • +
    + + {{ file.name }} + + {{ file.modified }} + {{ file.size }} +
    + +
  • + {% endfor %} +
+
+ +
+ + +
+
+
+
+ + +
+
+ + +
+
+ + +
+ +
+ + +
+

Збережені категорії:

+
    + {% for category in categories %} +
  • + {{ category.id }} - {{ category.name }} {% if category.portal_id %} (portal_id: {{ category.portal_id }}){% endif %} + +
  • + {% endfor %} +
+
+
+
+
+ + + + + + + diff --git a/templates/login.html b/templates/login.html new file mode 100644 index 0000000..539f746 --- /dev/null +++ b/templates/login.html @@ -0,0 +1,106 @@ + + + + Вхід - Парсер mariotexno + + + +
+ +
+ + \ No newline at end of file diff --git a/translator.py b/translator.py new file mode 100644 index 0000000..8d64db6 --- /dev/null +++ b/translator.py @@ -0,0 +1,44 @@ +from deep_translator import GoogleTranslator +from typing import Dict, Any, List +import time + +class ProductTranslator: + def __init__(self): + self.translator = GoogleTranslator(source='pl', target='uk') + + def translate_text(self, text: str) -> str: + """Переводит текст с обработкой ошибок и задержкой""" + if not text or not isinstance(text, str): + return text + + try: + translated = self.translator.translate(text) + time.sleep(0.5) # Задержка чтобы избежать блокировки + return translated + except Exception as e: + print(f"Ошибка перевода: {e}") + return text + + def translate_list(self, items: List[str]) -> List[str]: + """Переводит список строк""" + return [self.translate_text(item) for item in items] + + def translate_product(self, product: Dict[str, Any]) -> Dict[str, Any]: + """Переводит все текстовые поля продукта""" + translated = product.copy() + + # Переводим название + translated['name'] = self.translate_text(product['name']) + + # Переводим атрибуты + for attr in translated['attributes']: + attr['name'] = self.translate_text(attr['name']) + attr['value'] = self.translate_list(attr['value']) + + # Переводим описание + if 'description' in translated: + for section in translated['description']: + section['title'] = self.translate_text(section['title']) + section['text'] = self.translate_text(section['text']) + + return translated \ No newline at end of file diff --git a/web_interface.py b/web_interface.py new file mode 100644 index 0000000..27a2735 --- /dev/null +++ b/web_interface.py @@ -0,0 +1,613 @@ +from flask import Flask, render_template, request, jsonify, send_from_directory, redirect, url_for +from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user +from werkzeug.security import generate_password_hash, check_password_hash +import re +from euro_scraper import create_retry_session, fetch_products, setup_selenium +import os +import threading +from datetime import datetime +import json +from translator import ProductTranslator +from feed_generator import RobotVacuumYMLGenerator +from pathlib import Path +from werkzeug.exceptions import NotFound +from urllib.parse import urljoin + +BASE_URL = "https://mario.mrakells.pp.ua" + +# Добавляем в начало файла +login_manager = LoginManager() +login_manager.login_view = 'login' + +class User(UserMixin): + def __init__(self, id, username, password_hash): + self.id = id + self.username = username + self.password_hash = password_hash + +# Хранилище пользователей (в реальном приложении использовать базу данных) +users = { + 'mario': User(1, 'mario', generate_password_hash('2htC9YlEMXAhNE')) +} + +@login_manager.user_loader +def load_user(user_id): + for user in users.values(): + if user.id == int(user_id): + return user + return None + +app = Flask(__name__) + +# Добавляем после создания app +login_manager.init_app(app) +app.config['SECRET_KEY'] = 'your-secret-key-here' # Замените на случайный ключ + +# Глобальные настройки +app_settings = {"items_limit": -1} # Ограничение количества обрабатываемых товаров + +# Глобальная переменная для хранения статуса перевода +translation_status = { + "is_running": False, + "total_items": 0, + "processed_items": 0, + "error": None, +} + +# Добавить в начало файла +CATEGORIES_FILE = "categories.json" + +# Создаем константы для путей +OUTPUT_DIR = Path("output") +TRANSLATED_DIR = OUTPUT_DIR / "translated" +YML_DIR = OUTPUT_DIR / "yml" + +# Глобальное состояние парсинга +parsing_status = { + "is_running": False, + "total_items": 0, + "processed_items": 0, + "error": None, +} + + +def load_categories(): + """Загрузка категорий из файла""" + if os.path.exists(CATEGORIES_FILE): + with open(CATEGORIES_FILE, "r", encoding="utf-8") as f: + return json.load(f) + return [] + + +def save_categories(categories): + """Сохранение категорий в файл""" + with open(CATEGORIES_FILE, "w", encoding="utf-8") as f: + json.dump(categories, f, ensure_ascii=False, indent=2) + + +def extract_category(url: str) -> str: + """Извлекает название категории из URL""" + # Пример URL: https://www.euro.com.pl/odkurzacze-automatyczne.bhtml + match = re.search(r"euro\.com\.pl/([^/]+)", url) + if match: + category = match.group(1).replace(".bhtml", "") + return category + return None + + +def start_parsing(category): + """Запуск парсинга категории""" + global parsing_status + + try: + parsing_status.update( + {"is_running": True, "total_items": 0, "processed_items": 0, "error": None} + ) + + # Создаем сессию и драйвер Selenium + session = create_retry_session() + driver = setup_selenium() + + try: + # Парсим с использованием драйвера + products = fetch_products(category, session, driver, parsing_status) + + # Сохраняем результаты + if products: + output_file = os.path.join("output", f"{category}_products.json") + with open(output_file, "w", encoding="utf-8") as f: + json.dump(products, f, ensure_ascii=False, indent=2) + + finally: + # Обязательно закрываем драйвер + driver.quit() + + except Exception as e: + parsing_status["error"] = str(e) + print(f"Error during parsing: {e}") + finally: + parsing_status["is_running"] = False + + +def get_file_info(filename, directory="output"): + """Получение информации о файле""" + filepath = os.path.join(directory, filename) + stat = os.stat(filepath) + return { + "name": filename, + "modified": datetime.fromtimestamp(stat.st_mtime).strftime("%d.%m.%Y %H:%M:%S"), + "size": f"{stat.st_size / 1024:.1f} KB", + } + +def get_oldest_parsed_file(): + """Повертає найстаріший _products.json файл""" + folder = "output" + files = [ + f for f in os.listdir(folder) + if f.endswith("_products.json") and not f.endswith("_translated_products.json") + ] + if not files: + return None + + oldest_file = min(files, key=lambda f: os.path.getmtime(os.path.join(folder, f))) + category = oldest_file.replace("_products.json", "") + return category + + +@app.route('/login', methods=['GET', 'POST']) +def login(): + if request.method == 'POST': + username = request.form.get('username') + password = request.form.get('password') + + user = users.get(username) + if user and check_password_hash(user.password_hash, password): + login_user(user) + return redirect(url_for('index')) + + return render_template('login.html', error='Неверный логин или пароль') + + return render_template('login.html') + +@app.route('/logout') +@login_required +def logout(): + logout_user() + return redirect(url_for('login')) + +@app.route("/") +@login_required +def index(): + """Главная страница""" + # Получаем спарсенные файлы + parsed_files = [] + if os.path.exists("output"): + files = [f for f in os.listdir("output") if f.endswith("_products.json")] + parsed_files = [get_file_info(f, "output") for f in files] + + # Получаем переведенные файлы + translated_files = [] + if os.path.exists("output/translated"): + files = [ + f + for f in os.listdir("output/translated") + if f.endswith("_translated_products.json") + ] + translated_files = [get_file_info(f, "output/translated") for f in files] + + # Получаем YML файлы + yml_files = [] + if os.path.exists("output/yml"): + files = [f for f in os.listdir("output/yml") if f.endswith(".yml")] + yml_files = [get_file_info(f, "output/yml") for f in files] + + # Загружаем категории + categories = load_categories() + + return render_template( + "index.html", + status=parsing_status, + translation_status=translation_status, + parsed_files=parsed_files, + translated_files=translated_files, + yml_files=yml_files, + categories=categories, + app_settings=app_settings, + ) + + +@app.route("/auto-refresh", methods=["POST"]) +def auto_refresh(): + """Запускає парсинг найстарішої збереженої категорії""" + if parsing_status["is_running"]: + return jsonify({"error": "Парсинг уже запущено"}) + + category = get_oldest_parsed_file() + if not category: + return jsonify({"error": "Немає жодної категорії для оновлення"}) + + print(f"[AUTO REFRESH] Повторний парсинг для категорії: {category}") + + thread = threading.Thread(target=start_parsing, args=(category,)) + thread.start() + + return jsonify({"success": True, "category": category}) + + +@app.route("/parse", methods=["POST"]) +@login_required +def parse(): + """Обработчик запуска парсинга""" + url = request.form.get("url") + if not url: + return jsonify({"error": "URL не указан"}) + + category = extract_category(url) + if not category: + return jsonify({"error": "Неверный формат URL"}) + + if parsing_status["is_running"]: + return jsonify({"error": "Парсинг уже запущен"}) + + # Запускаем парсинг в отдельном потоке + thread = threading.Thread(target=start_parsing, args=(category,)) + thread.start() + + return jsonify({"status": "ok"}) + + +@app.route("/status") +def get_status(): + """Получение статуса парсинга""" + return jsonify(parsing_status) + + +@app.route("/download/") +def download_file(filename): + """Скачивание файла с результатами""" + directory = request.args.get( + "directory", "output" + ) # Получаем директорию из параметров запроса + + if directory == "translated": + directory = "output/translated" + elif directory == "yml": + directory = "output/yml" + else: + directory = "output" + + return send_from_directory(directory, filename, as_attachment=True) + + +@app.route("/delete/", methods=["POST"]) +def delete_file(filename): + """Удаление файла""" + try: + directory = request.args.get("directory", "output") + + if directory == "translated": + file_path = os.path.join("output/translated", filename) + elif directory == "yml": + file_path = os.path.join("output/yml", filename) + else: + file_path = os.path.join("output", filename) + + if os.path.exists(file_path): + os.remove(file_path) + return jsonify({"success": True}) + else: + return jsonify({"error": "Файл не найден"}), 404 + + except Exception as e: + return jsonify({"error": str(e)}), 400 + + +@app.route("/translate", methods=["POST"]) +def translate(): + """Обработчик запуска перевода""" + if translation_status["is_running"]: + return jsonify({"error": "Перевод уже запущен"}) + + filename = request.form.get("filename") + if not filename: + return jsonify({"error": "Файл не выбран"}) + + # Запускаем перевод в отдельном потоке + thread = threading.Thread(target=start_translation, args=(filename,)) + thread.start() + + return jsonify({"status": "Перевод запущен"}) + + +@app.route("/translation-status") +def get_translation_status(): + """Возвращает текущий статус перевода""" + return jsonify(translation_status) + + +def start_translation(filename: str): + """Функция для запуска перевода в отдельнм потоке""" + global translation_status + + translation_status["is_running"] = True + translation_status["processed_items"] = 0 + translation_status["error"] = None + + try: + os.makedirs("output/translated", exist_ok=True) + + with open(os.path.join("output", filename), "r", encoding="utf-8") as f: + products = json.load(f) + + # Ограничиваем количество товаров только если лимит больше 0 + if app_settings["items_limit"] > 0: + products = products[: app_settings["items_limit"]] + translation_status["total_items"] = len(products) + + # Создаем экземпляр переводчика + translator = ProductTranslator() + + # Переводим товары + translated_products = [] + for i, product in enumerate(products): + translated_product = translator.translate_product(product) + translated_products.append(translated_product) + translation_status["processed_items"] = i + 1 + + # Сохраняем переведенные данные в отдельную директорию + output_filename = filename.replace( + "_products.json", "_translated_products.json" + ) + with open( + os.path.join("output/translated", output_filename), "w", encoding="utf-8" + ) as f: + json.dump(translated_products, f, ensure_ascii=False, indent=2) + + except Exception as e: + translation_status["error"] = str(e) + print(f"Ошибка перевода: {e}") + finally: + translation_status["is_running"] = False + + +@app.route("/update-settings", methods=["POST"]) +def update_settings(): + """Обновление настроек приложения""" + try: + data = request.json + if "items_limit" in data: + items_limit = int(data["items_limit"]) + if items_limit == -1 or items_limit >= 1: + app_settings["items_limit"] = items_limit + return jsonify({"success": True}) + else: + return jsonify({"error": "Значение должно быть -1 или больше 0"}) + except Exception as e: + return jsonify({"error": str(e)}) + + +@app.route("/generate-yml", methods=["POST"]) +def generate_yml(): + """Обработчик генерации YML файла""" + try: + data = request.get_json() + print(f"Received data: {data}") + + filename = data.get("filename") + category_id = data.get("category_id") + + if not filename or not category_id: + return jsonify({"error": "Не вказано файл або категорію"}) + + # Загружаем категории + categories = load_categories() + category = next((c for c in categories if str(c["id"]) == str(category_id)), None) + + if not category: + return jsonify({"error": "Категорія не знайдена"}) + + portal_category_id = category.get("portal_id") + if not portal_category_id: + return jsonify({"error": "Категорія не має portal_id (ідентифікатор категорії Prom.ua)"}) + + os.makedirs("output/yml", exist_ok=True) + + # Читаем JSON файл с переведенными товарами + input_path = os.path.join("output/translated", filename) + if not os.path.exists(input_path): + return jsonify({"error": "Файл з товарами не знайдено"}) + + with open(input_path, "r", encoding="utf-8") as f: + products = json.load(f) + + # Присваиваем portal_category_id всем товарам + for product in products: + product["portal_category_id"] = portal_category_id + product["local_category_id"] = category["id"] + + # Создаем генератор YML с указанием базового URL + generator = RobotVacuumYMLGenerator(base_url=BASE_URL) + generator.add_category(str(category["id"]), category["name"]) + + # Генерируем имя выходного файла + output_filename = filename.replace("_translated_products.json", ".yml") + output_path = os.path.join("output/yml", output_filename) + + # Генерируем YML файл + result = generator.generate_yml(products, output_path) + + if result: + return jsonify({"success": True}) + else: + return jsonify({"error": "Помилка при генерації YML файлу"}) + + except Exception as e: + print(f"Error generating YML: {str(e)}") + return jsonify({"error": str(e)}) + + +@app.route("/add-category", methods=["POST"]) +def add_category(): + """Додавання нової категорії (локальної + portal_id)""" + try: + data = request.json + categories = load_categories() + + # Перевірка обов'язкових полів + if "id" not in data or "name" not in data: + return jsonify({"error": "Обов'язкові поля: id, name"}) + + # Перевірка унікальності ID + if any(str(c["id"]) == str(data["id"]) for c in categories): + return jsonify({"error": "Категорія з таким ID вже існує"}) + + # Додаємо категорію з optional portal_id + new_category = { + "id": data["id"], + "name": data["name"], + } + + if "portal_id" in data: + new_category["portal_id"] = data["portal_id"] + + categories.append(new_category) + save_categories(categories) + + return jsonify({"success": True}) + + except Exception as e: + return jsonify({"error": str(e)}) + + + +@app.route("/delete-category", methods=["POST"]) +def delete_category(): + """Удаление категории""" + try: + data = request.json + categories = load_categories() + categories = [c for c in categories if c["id"] != data["id"]] + save_categories(categories) + return jsonify({"success": True}) + + except Exception as e: + return jsonify({"error": str(e)}) + + +@app.route("/get-yml-files") +def get_yml_files(): + """Получение списка YML файлов""" + yml_files = [] + if os.path.exists("output/yml"): + files = [f for f in os.listdir("output/yml") if f.endswith(".yml")] + yml_files = [get_file_info(f, "output/yml") for f in files] + return jsonify(yml_files) + + +@app.route("/get-translated-files") +def get_translated_files(): + """Получение списка переведенных файлов""" + translated_files = [] + if os.path.exists("output/translated"): + files = [ + f + for f in os.listdir("output/translated") + if f.endswith("_translated_products.json") + ] + translated_files = [get_file_info(f, "output/translated") for f in files] + return jsonify(translated_files) + + +@app.route("/get-parsed-files") +def get_parsed_files(): + """Получение списа спарсенных файлов""" + parsed_files = [] + if os.path.exists("output"): + files = [ + f + for f in os.listdir("output") + if f.endswith("_products.json") + and not f.endswith("_translated_products.json") + ] + parsed_files = [get_file_info(f, "output") for f in files] + return jsonify(parsed_files) + + +@app.errorhandler(404) +def not_found_error(error): + return jsonify({"error": "Файл не найден"}), 404 + + +@app.errorhandler(500) +def internal_error(error): + return jsonify({"error": "Внутренняя ошибка сервера"}), 500 + + +@app.route("/get-files/") +def get_files(file_type): + """Получение списка файлов""" + files = [] + + if file_type == "parsed": + directory = "output" + pattern = lambda f: f.endswith("_products.json") and not f.endswith( + "_translated_products.json" + ) + elif file_type == "translated": + directory = "output/translated" + pattern = lambda f: f.endswith("_translated_products.json") + elif file_type == "yml": + directory = "output/yml" + pattern = lambda f: f.endswith(".yml") + else: + return jsonify([]) + + if os.path.exists(directory): + files = [f for f in os.listdir(directory) if pattern(f)] + files = [get_file_info(f, directory) for f in files] + + return jsonify(files) + + +# Добавляем роуты для отдачи изображений +@app.route("/images/products/") +def serve_product_image(filename): + """Отдача изображений товаров""" + return send_from_directory("images/products", filename) + + +@app.route("/images/descriptions/") +def serve_description_image(filename): + """Отдача изображений описаний""" + return send_from_directory("images/descriptions", filename) + + +# Добавляем функцию для получения полного URL изображения +def get_image_url(local_path: str) -> str: + """Преобразует локальный путь в полный URL""" + if not local_path: + return None + return urljoin(BASE_URL, local_path) + + +def get_file_info(filename, directory): + """Получение информации о файле""" + path = os.path.join(directory, filename) + stat = os.stat(path) + return { + "name": filename, + "modified": datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S"), + "size": f"{stat.st_size / 1024:.1f} KB", + } + + +if __name__ == "__main__": + # Создаем необходимые директории + for directory in ["output", "output/translated", "output/yml"]: + os.makedirs(directory, exist_ok=True) + + # Создаем файл категорий, если его нет + if not os.path.exists(CATEGORIES_FILE): + save_categories([]) + + app.run(host='0.0.0.0', port=5000, debug=True)