first commit

This commit is contained in:
2025-04-17 13:56:40 +03:00
commit e7e6382a10
10 changed files with 2608 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
venv/
images/
output/
__pycache__/

0
README.md Normal file
View File

7
categories.json Normal file
View File

@@ -0,0 +1,7 @@
[
{
"id": "1",
"name": "Роботи-пилососи",
"portal_id": "63023"
}
]

512
euro_scraper.py Normal file
View File

@@ -0,0 +1,512 @@
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import json
import logging
from typing import List, Dict, Optional, Any
from bs4 import BeautifulSoup
import re
import os
from pathlib import Path
import hashlib
from urllib.parse import urlparse
import time
import random
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import base64
import urllib.request
from PIL import Image
import io
# Конфигурация
CONFIG = {
"BASE_URL": "https://www.euro.com.pl/rest/api/products/search",
"HEADERS": {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7",
"Origin": "https://www.euro.com.pl",
"Referer": "https://www.euro.com.pl/",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
},
"CONNECT_TIMEOUT": 10,
"READ_TIMEOUT": 30,
"ITEMS_PER_PAGE": 24,
"IMAGE_DIRS": {
"products": "images/products",
"descriptions": "images/descriptions",
},
"IMAGE_RETRY_COUNT": 3,
"IMAGE_RETRY_DELAY": 2, # секунды между попытками
}
# Настройка логирования
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s: %(message)s"
)
logger = logging.getLogger(__name__)
def create_retry_session(retries=3, backoff_factor=0.3):
"""Создание сессии с повторными запросами"""
retry_strategy = Retry(
total=retries,
backoff_factor=backoff_factor,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET", "POST"],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session = requests.Session()
session.mount("https://", adapter)
# Устанавливаем таймауты по умолчанию для сессии
session.timeout = (CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"])
return session
def ensure_directory(directory: str) -> bool:
"""Создает директорию, если она не существует"""
try:
path = Path(directory)
path.mkdir(parents=True, exist_ok=True)
return path.exists() and path.is_dir()
except Exception as e:
logger.error(f"Ошибка создания директории {directory}: {str(e)}")
return False
def get_file_extension(url: str) -> str:
"""Получает расширение файла из URL"""
parsed = urlparse(url)
path = parsed.path
return os.path.splitext(path)[1].lower() or ".jpg"
def setup_selenium():
"""Настройка Selenium WebDriver"""
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
# Добавляем те же заголовки, что использовали ранее
chrome_options.add_argument(
"user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
)
chrome_options.add_argument("accept-language=pl-PL,pl;q=0.9,en-US;q=0.8,en;q=0.7")
return webdriver.Chrome(options=chrome_options)
def download_image_selenium(
driver: webdriver.Chrome, url: str, save_dir: str, prefix: str = ""
) -> Optional[str]:
"""
Скачивает изображение используя Selenium
"""
logger.info(f"Начинаем скачивание изображения через Selenium: {url}")
logger.info(f"Директория для сохранения: {save_dir}")
if not ensure_directory(save_dir):
logger.error(f"Не удалось создать или получить доступ к директории {save_dir}")
return None
try:
if not url or url.isspace():
return None
url_hash = hashlib.md5(url.encode()).hexdigest()
extension = get_file_extension(url)
filename = (
f"{prefix}_{url_hash}{extension}" if prefix else f"{url_hash}{extension}"
)
filepath = os.path.join(save_dir, filename)
logger.info(f"Сгенерирован путь для сохранения: {filepath}")
if os.path.exists(filepath) and os.path.getsize(filepath) > 0:
logger.info(f"Файл уже существует: {filepath}")
return filepath
# Получаем изображение через Selenium
driver.get(url)
# Получаем изображение как base64
img_base64 = driver.execute_async_script(
"""
var url = arguments[0];
var callback = arguments[1];
var xhr = new XMLHttpRequest();
xhr.responseType = 'blob';
xhr.onload = function() {
var reader = new FileReader();
reader.onloadend = function() {
callback(reader.result);
};
reader.readAsDataURL(xhr.response);
};
xhr.open('GET', url);
xhr.send();
""",
url,
)
if img_base64.startswith("data:image"):
# Убираем префикс data URL
img_base64 = img_base64.split(",")[1]
# Декодируем base64 в бинарные данные
img_data = base64.b64decode(img_base64)
# Сохраняем изображение
with open(filepath, "wb") as f:
f.write(img_data)
file_size = os.path.getsize(filepath)
logger.info(f"Размер сохраненного файла: {file_size} байт")
if file_size == 0:
raise ValueError("Сохранен пустой файл")
logger.info(f"Успешно скачано и сохранено изображение: {filepath}")
return filepath
else:
raise ValueError("Не удалось получить изображение как base64")
except Exception as e:
logger.error(
f"Ошибка при скачивании изображения через Selenium {url}: {str(e)}"
)
if os.path.exists(filepath):
try:
os.remove(filepath)
except:
pass
return None
def download_image(
session: requests.Session, url: str, save_dir: str, prefix: str = ""
) -> Optional[str]:
"""
Скачивает изображение и сохраняет его с повторными попытками
"""
logger.info(f"Начинаем скачивание изображения {url}")
logger.info(f"Директория для сохранения: {save_dir}")
if not ensure_directory(save_dir):
logger.error(f"Не удалось создать или получить доступ к директории {save_dir}")
return None
try:
if not url or url.isspace():
return None
url_hash = hashlib.md5(url.encode()).hexdigest()
extension = get_file_extension(url)
filename = (
f"{prefix}_{url_hash}{extension}" if prefix else f"{url_hash}{extension}"
)
filepath = os.path.join(save_dir, filename)
logger.info(f"Сгенерирован путь для сохранения: {filepath}")
if os.path.exists(filepath) and os.path.getsize(filepath) > 0:
logger.info(f"Файл уже существует: {filepath}")
return filepath
# Улучшенные заголовки
image_headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8",
"Accept-Language": "pl-PL,pl;q=0.9,en-US;q=0.8,en;q=0.7",
"Accept-Encoding": "gzip, deflate, br",
"Referer": "https://www.euro.com.pl/",
"Connection": "keep-alive",
"Sec-Fetch-Dest": "image",
"Sec-Fetch-Mode": "no-cors",
"Sec-Fetch-Site": "cross-site",
"Pragma": "no-cache",
"Cache-Control": "no-cache",
}
for attempt in range(CONFIG["IMAGE_RETRY_COUNT"]):
try:
# Случайная задержка перед запросом
time.sleep(random.uniform(1, 3))
logger.info(f"Попытка {attempt + 1} скачать изображение")
# Используем stream для скачивания
with session.get(
url,
headers=image_headers,
timeout=(CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"]),
stream=True,
) as response:
response.raise_for_status()
logger.info(f"Получен ответ. Статус: {response.status_code}")
logger.info(f"Content-Type: {response.headers.get('Content-Type')}")
# Записываем файл порциями
with open(filepath, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
file_size = os.path.getsize(filepath)
logger.info(f"Размер сохраненного файла: {file_size} байт")
if file_size == 0:
raise ValueError("Сохранен пустой файл")
logger.info(f"Успешно скачано и сохранено изображение: {filepath}")
return filepath
except Exception as e:
logger.warning(
f"Попытка {attempt + 1} скачать {url} не удалась: {str(e)}"
)
if attempt < CONFIG["IMAGE_RETRY_COUNT"] - 1:
# Увеличенная случайная задержка между попытками
time.sleep(random.uniform(3, 7))
else:
logger.error(f"Все попытки скачать изображение {url} не удались")
if os.path.exists(filepath):
try:
os.remove(filepath)
except:
pass
return None
except Exception as e:
logger.error(f"Критическая ошибка при скачивании изображения {url}: {str(e)}")
return None
def get_product_description(session: requests.Session, plu: str) -> Optional[str]:
"""Получает описание товара по его PLU"""
url = f"https://www.euro.com.pl/rest/api/products/{plu}/promo-pages"
try:
logger.info(f"Запрос описания: {url}")
response = session.get(
url,
headers=CONFIG["HEADERS"],
timeout=(CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"]),
)
response.raise_for_status()
data = response.json()
description = data.get("marketingDescription")
logger.info("Описание получено" if description else "Описание отсутствует")
return description
except requests.RequestException as e:
logger.error(f"Ошибка получения описания (PLU: {plu}): {e}")
return None
def extract_product_info(
product: Dict, session: requests.Session, driver: webdriver.Chrome
) -> Dict[str, Any]:
"""Извлечение структурированной информации о товаре"""
promotional_price = product["prices"]["promotionalPrice"]
identifiers = product.get("identifiers", {})
delivery = product.get("deliveryAvailability") or {}
shop_delivery = (delivery.get("shopDeliveryAvailability") or {}).get("code")
home_delivery = (delivery.get("homeDeliveryAvailability") or {}).get("code")
ensure_directory(CONFIG["IMAGE_DIRS"]["products"])
product_images = []
for img in product["images"]:
if img["type"] == "BIG_PHOTO":
image_path = download_image_selenium(
driver,
img["url"],
CONFIG["IMAGE_DIRS"]["products"],
f"product_{identifiers.get('plu', 'unknown')}",
)
if image_path:
product_images.append({"url": img["url"], "local_path": image_path})
return {
"plu": identifiers.get("plu"),
"name": product["name"],
"url": f"https://www.euro.com.pl/{identifiers.get('productGroupLinkName', '')}/{identifiers.get('productLinkName', '')}",
"prices": {
"mainPrice": product["prices"]["mainPrice"],
"promotionalPrice": (
promotional_price["price"] if promotional_price else None
),
},
"attributes": [
{"name": attr["name"], "value": [v["name"] for v in attr["value"]]}
for base_attr in product["baseAttributes"]
for attr in base_attr["attributes"]
],
"images": product_images,
"availability": {
"shop": shop_delivery,
"home": home_delivery,
},
"in_stock": shop_delivery in ("FOR_TOMORROW", "IMMEDIATE") or home_delivery in ("FOR_TOMORROW", "IMMEDIATE"),
}
def clean_description(
html_description: str, session: requests.Session, driver: webdriver.Chrome, plu: str
) -> List[Dict[str, str]]:
"""Очищает HTML описание и структурирует его"""
if not html_description:
return []
# Создаем директорию для изображений описаний
ensure_directory(CONFIG["IMAGE_DIRS"]["descriptions"])
html_description = re.sub(r"<link[^>]*>", "", html_description)
soup = BeautifulSoup(html_description, "html.parser")
sections = soup.find_all("div", class_="section")
cleaned_sections = []
for idx, section in enumerate(sections):
header = section.find(["h2"])
paragraph = section.find("p")
image = section.find("img", class_="lazy")
image_url = (
image.get("data-original") if image and image.get("data-original") else ""
)
if image_url.startswith("//"):
image_url = f"https:{image_url}"
local_image_path = None
if image_url:
local_image_path = download_image_selenium(
driver,
image_url,
CONFIG["IMAGE_DIRS"]["descriptions"],
f"desc_{plu}_section_{idx}",
)
cleaned_section = {
"title": header.text.strip() if header else "",
"text": paragraph.text.strip() if paragraph else "",
"image": {"url": image_url, "local_path": local_image_path},
}
cleaned_sections.append(cleaned_section)
return cleaned_sections
def fetch_products(
category: str,
session: requests.Session,
driver: webdriver.Chrome,
status: dict = None,
) -> List[Dict]:
"""Получение всех товаров из категории"""
all_products = []
start_from = 0
total_products = None
current_product = 0
try:
params = {
"startFrom": 0,
"numberOfItems": CONFIG["ITEMS_PER_PAGE"],
"category": category,
"developSearchMode": "false",
}
response = session.get(
CONFIG["BASE_URL"],
params=params,
headers=CONFIG["HEADERS"],
timeout=(CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"]),
)
response.raise_for_status()
data = response.json()
total_products = int(data.get("productsCount", 0))
if status is not None:
status["total_items"] = total_products
products = data.get("results", [])
while True:
if not products:
break
for product in products:
filtered_product = extract_product_info(product, session, driver)
if filtered_product["plu"]:
description = get_product_description(
session, filtered_product["plu"]
)
filtered_product["description"] = clean_description(
description,
session,
driver, # Передаем driver в clean_description
filtered_product["plu"],
)
all_products.append(filtered_product)
current_product += 1
if status is not None:
status["processed_items"] = current_product
if current_product >= total_products:
break
start_from += len(products)
params.update({"startFrom": start_from})
response = session.get(
CONFIG["BASE_URL"],
params=params,
headers=CONFIG["HEADERS"],
timeout=(CONFIG["CONNECT_TIMEOUT"], CONFIG["READ_TIMEOUT"]),
)
response.raise_for_status()
data = response.json()
products = data.get("results", [])
return all_products
except Exception as e:
logger.error(f"Error during parsing: {str(e)}")
if status is not None:
status["error"] = str(e)
raise
def main():
"""Основная функция парсинга"""
category = "odkurzacze-automatyczne"
with create_retry_session() as session:
driver = setup_selenium()
try:
products = fetch_products(category, session, driver)
finally:
driver.quit()
if products:
output_file = os.path.join("output", f"{category}_products.json")
logger.info(f"Всего получено товаров: {len(products)}")
logger.info(f"Информация сохранена в {output_file}")
if __name__ == "__main__":
main()

220
feed_generator.py Normal file
View File

@@ -0,0 +1,220 @@
import json
import xml.etree.ElementTree as ET
from typing import List, Dict
from datetime import datetime
from urllib.parse import urljoin
class RobotVacuumYMLGenerator:
def __init__(
self,
shop_name: str = "Euro Electronics",
base_url: str = "https://mario.mrakells.pp.ua",
use_original_urls: bool = False,
):
"""
Initialize YML feed generator
:param shop_name: Name of the shop
:param base_url: Base URL for image hosting
:param use_original_urls: If True, use original image URLs instead of local ones
"""
self.root = ET.Element(
"yml_catalog", {"date": datetime.now().strftime("%Y-%m-%d %H:%M")}
)
self.shop = ET.SubElement(self.root, "shop")
ET.SubElement(self.shop, "name").text = shop_name
self.base_url = base_url
self.use_original_urls = use_original_urls
self.categories = ET.SubElement(self.shop, "categories")
self.offers = ET.SubElement(self.shop, "offers")
def add_category(self, category_id: str, category_name: str, parent_id: str = None):
"""
Add category to YML feed
:param category_id: Category ID
:param category_name: Category name
:param parent_id: Parent category ID (optional)
"""
attrs = {"id": category_id}
if parent_id:
attrs["parentId"] = parent_id
category = ET.SubElement(self.categories, "category", attrs)
category.text = category_name
def get_image_url(self, local_path: str) -> str:
"""
Convert local path to full URL, normalizing path separators
:param local_path: Local path to image file
:return: Full URL with normalized path separators
"""
if not local_path:
return None
# Normalize path separators to forward slashes
normalized_path = local_path.replace("\\", "/")
return urljoin(self.base_url, normalized_path)
def process_attributes(self, attributes: List[Dict]) -> List[Dict]:
"""
Convert attributes to param format for YML
:param attributes: List of attribute dictionaries
:return: List of param dictionaries
"""
params = []
for attr in attributes:
value = attr["value"]
# Handle single or multiple values
if isinstance(value, list):
value = " | ".join(str(v) for v in value)
params.append({"name": attr["name"], "value": value})
return params
def clean_product_name(self, name: str) -> str:
"""
Очищает название продукта, удаляя кириллические слова после латинских символов
:param name: Исходное название продукта
:return: Очищенное название
"""
# Разбиваем строку на слова
words = name.split()
cleaned_words = []
last_latin_index = -1
# Проходим по словам и ищем последнее слово с латиницей
for i, word in enumerate(words):
# Проверяем, содержит ли слово латинские символы
if any(ord("a") <= ord(c.lower()) <= ord("z") for c in word):
last_latin_index = i
# Если нашли латинские символы, берём все слова до следующего после последнего латинского
if last_latin_index != -1:
cleaned_words = words[: last_latin_index + 1]
else:
cleaned_words = words
return " ".join(cleaned_words)
def add_offer(self, product: Dict):
"""
Add a robot vacuum cleaner offer to the YML feed
:param product: Product dictionary from JSON
"""
in_stock = product.get('in_stock', False)
offer = ET.SubElement(self.offers, 'offer', {
'id': str(product['plu']),
'available': 'true' if in_stock else 'false',
'in_stock': 'true' if in_stock else 'false'
})
# Clean product name before adding to feed
cleaned_name = self.clean_product_name(product["name"])
ET.SubElement(offer, "name").text = cleaned_name
# Add vendorCode using plu
ET.SubElement(offer, "vendorCode").text = str(product["plu"])
ET.SubElement(offer, "price").text = str(product["prices"]["mainPrice"])
ET.SubElement(offer, "currencyId").text = "PLN"
ET.SubElement(offer, "categoryId").text = str(
product["local_category_id"]
) # якщо у тебе є локальна категорія
ET.SubElement(offer, "portal_category_id").text = str(
product["portal_category_id"]
) # ОБОВ'ЯЗКОВО
# Description with images
if "description" in product:
description_html = "<div>"
for desc in product["description"]:
description_html += f"<h3>{desc['title']}</h3>"
description_html += f"<p>{desc['text']}</p>"
if desc["image"].get("local_path") and not self.use_original_urls:
img_url = self.get_image_url(desc["image"]["local_path"])
description_html += f'<img src="{img_url}" alt="{desc["title"]}"/>'
elif desc["image"].get("url") and self.use_original_urls:
img_url = desc["image"]["url"]
description_html += f'<img src="{img_url}" alt="{desc["title"]}"/>'
description_html += "</div>"
description_elem = ET.SubElement(offer, "description")
description_elem.text = description_html
# Product images
for img in product["images"][:10]:
if self.use_original_urls:
img_url = img["url"]
else:
if img.get("local_path"):
img_url = self.get_image_url(img["local_path"])
else:
continue
ET.SubElement(offer, "picture").text = img_url
# Attributes as params
params = self.process_attributes(product["attributes"])
for param in params:
param_elem = ET.SubElement(offer, "param", {"name": param["name"]})
param_elem.text = str(param["value"])
# URL
ET.SubElement(offer, "url").text = product["url"]
def generate_yml(self, products: List[Dict], output_yml: str) -> bool:
"""
Generate YML feed from products data
:param products: List of product dictionaries
:param output_yml: Path to output YML file
:return: True if successful, False otherwise
"""
try:
# Ensure a category exists
if not list(self.categories):
raise ValueError("No categories added to the YML feed.")
# Add offers for each product
for product in products:
self.add_offer(product)
# Write the XML tree
tree = ET.ElementTree(self.root)
tree.write(output_yml, encoding="UTF-8", xml_declaration=True)
print(f"YML feed generated: {output_yml}")
return True
except Exception as e:
print(f"Error generating YML feed: {str(e)}")
return False
def main():
"""
Example usage with command line arguments
"""
import sys
if len(sys.argv) < 2:
print("Usage: python feed_generator.py input.json [output.yml]")
sys.exit(1)
input_json = sys.argv[1]
output_yml = sys.argv[2] if len(sys.argv) > 2 else None
generator = RobotVacuumYMLGenerator()
generator.generate_yml(input_json, output_yml)
if __name__ == "__main__":
main()

5
requirements.txt Normal file
View File

@@ -0,0 +1,5 @@
deep-translator>=1.11.4
flask
flask-login
selenium
Pillow

1097
templates/index.html Normal file

File diff suppressed because it is too large Load Diff

106
templates/login.html Normal file
View File

@@ -0,0 +1,106 @@
<!DOCTYPE html>
<html>
<head>
<title>Вхід - Парсер mariotexno</title>
<style>
body {
margin: 0;
padding: 0;
font-family: Arial, sans-serif;
background-color: #1a1b26;
color: #ffffff;
}
.container {
width: 100%;
max-width: 400px;
margin: 100px auto;
padding: 20px;
}
.login-card {
background-color: #282a36;
border-radius: 8px;
padding: 20px;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
}
.login-header {
text-align: center;
margin-bottom: 30px;
}
.login-header h1 {
color: #ffffff;
font-size: 24px;
margin: 0;
}
.form-group {
margin-bottom: 20px;
}
.form-group label {
display: block;
margin-bottom: 5px;
color: #8be9fd;
}
.form-group input {
width: 100%;
padding: 10px;
border: 1px solid #44475a;
border-radius: 4px;
background-color: #1a1b26;
color: #ffffff;
box-sizing: border-box;
}
.form-group input:focus {
outline: none;
border-color: #6272a4;
}
.login-button {
width: 100%;
padding: 12px;
background-color: #7aa2f7;
border: none;
border-radius: 4px;
color: #ffffff;
font-size: 16px;
cursor: pointer;
transition: background-color 0.3s;
}
.login-button:hover {
background-color: #6b91e4;
}
.error-message {
background-color: #ff5555;
color: #ffffff;
padding: 10px;
border-radius: 4px;
margin-bottom: 20px;
text-align: center;
}
</style>
</head>
<body>
<div class="container">
<div class="login-card">
<div class="login-header">
<h1>Парсер mariotexno</h1>
</div>
{% if error %}
<div class="error-message">
{{ error }}
</div>
{% endif %}
<form method="POST">
<div class="form-group">
<label>Логін</label>
<input type="text" name="username" required>
</div>
<div class="form-group">
<label>Пароль</label>
<input type="password" name="password" required>
</div>
<button type="submit" class="login-button">Увійти</button>
</form>
</div>
</div>
</body>
</html>

44
translator.py Normal file
View File

@@ -0,0 +1,44 @@
from deep_translator import GoogleTranslator
from typing import Dict, Any, List
import time
class ProductTranslator:
def __init__(self):
self.translator = GoogleTranslator(source='pl', target='uk')
def translate_text(self, text: str) -> str:
"""Переводит текст с обработкой ошибок и задержкой"""
if not text or not isinstance(text, str):
return text
try:
translated = self.translator.translate(text)
time.sleep(0.5) # Задержка чтобы избежать блокировки
return translated
except Exception as e:
print(f"Ошибка перевода: {e}")
return text
def translate_list(self, items: List[str]) -> List[str]:
"""Переводит список строк"""
return [self.translate_text(item) for item in items]
def translate_product(self, product: Dict[str, Any]) -> Dict[str, Any]:
"""Переводит все текстовые поля продукта"""
translated = product.copy()
# Переводим название
translated['name'] = self.translate_text(product['name'])
# Переводим атрибуты
for attr in translated['attributes']:
attr['name'] = self.translate_text(attr['name'])
attr['value'] = self.translate_list(attr['value'])
# Переводим описание
if 'description' in translated:
for section in translated['description']:
section['title'] = self.translate_text(section['title'])
section['text'] = self.translate_text(section['text'])
return translated

613
web_interface.py Normal file
View File

@@ -0,0 +1,613 @@
from flask import Flask, render_template, request, jsonify, send_from_directory, redirect, url_for
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
import re
from euro_scraper import create_retry_session, fetch_products, setup_selenium
import os
import threading
from datetime import datetime
import json
from translator import ProductTranslator
from feed_generator import RobotVacuumYMLGenerator
from pathlib import Path
from werkzeug.exceptions import NotFound
from urllib.parse import urljoin
BASE_URL = "https://mario.mrakells.pp.ua"
# Добавляем в начало файла
login_manager = LoginManager()
login_manager.login_view = 'login'
class User(UserMixin):
def __init__(self, id, username, password_hash):
self.id = id
self.username = username
self.password_hash = password_hash
# Хранилище пользователей (в реальном приложении использовать базу данных)
users = {
'mario': User(1, 'mario', generate_password_hash('2htC9YlEMXAhNE'))
}
@login_manager.user_loader
def load_user(user_id):
for user in users.values():
if user.id == int(user_id):
return user
return None
app = Flask(__name__)
# Добавляем после создания app
login_manager.init_app(app)
app.config['SECRET_KEY'] = 'your-secret-key-here' # Замените на случайный ключ
# Глобальные настройки
app_settings = {"items_limit": -1} # Ограничение количества обрабатываемых товаров
# Глобальная переменная для хранения статуса перевода
translation_status = {
"is_running": False,
"total_items": 0,
"processed_items": 0,
"error": None,
}
# Добавить в начало файла
CATEGORIES_FILE = "categories.json"
# Создаем константы для путей
OUTPUT_DIR = Path("output")
TRANSLATED_DIR = OUTPUT_DIR / "translated"
YML_DIR = OUTPUT_DIR / "yml"
# Глобальное состояние парсинга
parsing_status = {
"is_running": False,
"total_items": 0,
"processed_items": 0,
"error": None,
}
def load_categories():
"""Загрузка категорий из файла"""
if os.path.exists(CATEGORIES_FILE):
with open(CATEGORIES_FILE, "r", encoding="utf-8") as f:
return json.load(f)
return []
def save_categories(categories):
"""Сохранение категорий в файл"""
with open(CATEGORIES_FILE, "w", encoding="utf-8") as f:
json.dump(categories, f, ensure_ascii=False, indent=2)
def extract_category(url: str) -> str:
"""Извлекает название категории из URL"""
# Пример URL: https://www.euro.com.pl/odkurzacze-automatyczne.bhtml
match = re.search(r"euro\.com\.pl/([^/]+)", url)
if match:
category = match.group(1).replace(".bhtml", "")
return category
return None
def start_parsing(category):
"""Запуск парсинга категории"""
global parsing_status
try:
parsing_status.update(
{"is_running": True, "total_items": 0, "processed_items": 0, "error": None}
)
# Создаем сессию и драйвер Selenium
session = create_retry_session()
driver = setup_selenium()
try:
# Парсим с использованием драйвера
products = fetch_products(category, session, driver, parsing_status)
# Сохраняем результаты
if products:
output_file = os.path.join("output", f"{category}_products.json")
with open(output_file, "w", encoding="utf-8") as f:
json.dump(products, f, ensure_ascii=False, indent=2)
finally:
# Обязательно закрываем драйвер
driver.quit()
except Exception as e:
parsing_status["error"] = str(e)
print(f"Error during parsing: {e}")
finally:
parsing_status["is_running"] = False
def get_file_info(filename, directory="output"):
"""Получение информации о файле"""
filepath = os.path.join(directory, filename)
stat = os.stat(filepath)
return {
"name": filename,
"modified": datetime.fromtimestamp(stat.st_mtime).strftime("%d.%m.%Y %H:%M:%S"),
"size": f"{stat.st_size / 1024:.1f} KB",
}
def get_oldest_parsed_file():
"""Повертає найстаріший _products.json файл"""
folder = "output"
files = [
f for f in os.listdir(folder)
if f.endswith("_products.json") and not f.endswith("_translated_products.json")
]
if not files:
return None
oldest_file = min(files, key=lambda f: os.path.getmtime(os.path.join(folder, f)))
category = oldest_file.replace("_products.json", "")
return category
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = users.get(username)
if user and check_password_hash(user.password_hash, password):
login_user(user)
return redirect(url_for('index'))
return render_template('login.html', error='Неверный логин или пароль')
return render_template('login.html')
@app.route('/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('login'))
@app.route("/")
@login_required
def index():
"""Главная страница"""
# Получаем спарсенные файлы
parsed_files = []
if os.path.exists("output"):
files = [f for f in os.listdir("output") if f.endswith("_products.json")]
parsed_files = [get_file_info(f, "output") for f in files]
# Получаем переведенные файлы
translated_files = []
if os.path.exists("output/translated"):
files = [
f
for f in os.listdir("output/translated")
if f.endswith("_translated_products.json")
]
translated_files = [get_file_info(f, "output/translated") for f in files]
# Получаем YML файлы
yml_files = []
if os.path.exists("output/yml"):
files = [f for f in os.listdir("output/yml") if f.endswith(".yml")]
yml_files = [get_file_info(f, "output/yml") for f in files]
# Загружаем категории
categories = load_categories()
return render_template(
"index.html",
status=parsing_status,
translation_status=translation_status,
parsed_files=parsed_files,
translated_files=translated_files,
yml_files=yml_files,
categories=categories,
app_settings=app_settings,
)
@app.route("/auto-refresh", methods=["POST"])
def auto_refresh():
"""Запускає парсинг найстарішої збереженої категорії"""
if parsing_status["is_running"]:
return jsonify({"error": "Парсинг уже запущено"})
category = get_oldest_parsed_file()
if not category:
return jsonify({"error": "Немає жодної категорії для оновлення"})
print(f"[AUTO REFRESH] Повторний парсинг для категорії: {category}")
thread = threading.Thread(target=start_parsing, args=(category,))
thread.start()
return jsonify({"success": True, "category": category})
@app.route("/parse", methods=["POST"])
@login_required
def parse():
"""Обработчик запуска парсинга"""
url = request.form.get("url")
if not url:
return jsonify({"error": "URL не указан"})
category = extract_category(url)
if not category:
return jsonify({"error": "Неверный формат URL"})
if parsing_status["is_running"]:
return jsonify({"error": "Парсинг уже запущен"})
# Запускаем парсинг в отдельном потоке
thread = threading.Thread(target=start_parsing, args=(category,))
thread.start()
return jsonify({"status": "ok"})
@app.route("/status")
def get_status():
"""Получение статуса парсинга"""
return jsonify(parsing_status)
@app.route("/download/<path:filename>")
def download_file(filename):
"""Скачивание файла с результатами"""
directory = request.args.get(
"directory", "output"
) # Получаем директорию из параметров запроса
if directory == "translated":
directory = "output/translated"
elif directory == "yml":
directory = "output/yml"
else:
directory = "output"
return send_from_directory(directory, filename, as_attachment=True)
@app.route("/delete/<path:filename>", methods=["POST"])
def delete_file(filename):
"""Удаление файла"""
try:
directory = request.args.get("directory", "output")
if directory == "translated":
file_path = os.path.join("output/translated", filename)
elif directory == "yml":
file_path = os.path.join("output/yml", filename)
else:
file_path = os.path.join("output", filename)
if os.path.exists(file_path):
os.remove(file_path)
return jsonify({"success": True})
else:
return jsonify({"error": "Файл не найден"}), 404
except Exception as e:
return jsonify({"error": str(e)}), 400
@app.route("/translate", methods=["POST"])
def translate():
"""Обработчик запуска перевода"""
if translation_status["is_running"]:
return jsonify({"error": "Перевод уже запущен"})
filename = request.form.get("filename")
if not filename:
return jsonify({"error": "Файл не выбран"})
# Запускаем перевод в отдельном потоке
thread = threading.Thread(target=start_translation, args=(filename,))
thread.start()
return jsonify({"status": "Перевод запущен"})
@app.route("/translation-status")
def get_translation_status():
"""Возвращает текущий статус перевода"""
return jsonify(translation_status)
def start_translation(filename: str):
"""Функция для запуска перевода в отдельнм потоке"""
global translation_status
translation_status["is_running"] = True
translation_status["processed_items"] = 0
translation_status["error"] = None
try:
os.makedirs("output/translated", exist_ok=True)
with open(os.path.join("output", filename), "r", encoding="utf-8") as f:
products = json.load(f)
# Ограничиваем количество товаров только если лимит больше 0
if app_settings["items_limit"] > 0:
products = products[: app_settings["items_limit"]]
translation_status["total_items"] = len(products)
# Создаем экземпляр переводчика
translator = ProductTranslator()
# Переводим товары
translated_products = []
for i, product in enumerate(products):
translated_product = translator.translate_product(product)
translated_products.append(translated_product)
translation_status["processed_items"] = i + 1
# Сохраняем переведенные данные в отдельную директорию
output_filename = filename.replace(
"_products.json", "_translated_products.json"
)
with open(
os.path.join("output/translated", output_filename), "w", encoding="utf-8"
) as f:
json.dump(translated_products, f, ensure_ascii=False, indent=2)
except Exception as e:
translation_status["error"] = str(e)
print(f"Ошибка перевода: {e}")
finally:
translation_status["is_running"] = False
@app.route("/update-settings", methods=["POST"])
def update_settings():
"""Обновление настроек приложения"""
try:
data = request.json
if "items_limit" in data:
items_limit = int(data["items_limit"])
if items_limit == -1 or items_limit >= 1:
app_settings["items_limit"] = items_limit
return jsonify({"success": True})
else:
return jsonify({"error": "Значение должно быть -1 или больше 0"})
except Exception as e:
return jsonify({"error": str(e)})
@app.route("/generate-yml", methods=["POST"])
def generate_yml():
"""Обработчик генерации YML файла"""
try:
data = request.get_json()
print(f"Received data: {data}")
filename = data.get("filename")
category_id = data.get("category_id")
if not filename or not category_id:
return jsonify({"error": "Не вказано файл або категорію"})
# Загружаем категории
categories = load_categories()
category = next((c for c in categories if str(c["id"]) == str(category_id)), None)
if not category:
return jsonify({"error": "Категорія не знайдена"})
portal_category_id = category.get("portal_id")
if not portal_category_id:
return jsonify({"error": "Категорія не має portal_id (ідентифікатор категорії Prom.ua)"})
os.makedirs("output/yml", exist_ok=True)
# Читаем JSON файл с переведенными товарами
input_path = os.path.join("output/translated", filename)
if not os.path.exists(input_path):
return jsonify({"error": "Файл з товарами не знайдено"})
with open(input_path, "r", encoding="utf-8") as f:
products = json.load(f)
# Присваиваем portal_category_id всем товарам
for product in products:
product["portal_category_id"] = portal_category_id
product["local_category_id"] = category["id"]
# Создаем генератор YML с указанием базового URL
generator = RobotVacuumYMLGenerator(base_url=BASE_URL)
generator.add_category(str(category["id"]), category["name"])
# Генерируем имя выходного файла
output_filename = filename.replace("_translated_products.json", ".yml")
output_path = os.path.join("output/yml", output_filename)
# Генерируем YML файл
result = generator.generate_yml(products, output_path)
if result:
return jsonify({"success": True})
else:
return jsonify({"error": "Помилка при генерації YML файлу"})
except Exception as e:
print(f"Error generating YML: {str(e)}")
return jsonify({"error": str(e)})
@app.route("/add-category", methods=["POST"])
def add_category():
"""Додавання нової категорії (локальної + portal_id)"""
try:
data = request.json
categories = load_categories()
# Перевірка обов'язкових полів
if "id" not in data or "name" not in data:
return jsonify({"error": "Обов'язкові поля: id, name"})
# Перевірка унікальності ID
if any(str(c["id"]) == str(data["id"]) for c in categories):
return jsonify({"error": "Категорія з таким ID вже існує"})
# Додаємо категорію з optional portal_id
new_category = {
"id": data["id"],
"name": data["name"],
}
if "portal_id" in data:
new_category["portal_id"] = data["portal_id"]
categories.append(new_category)
save_categories(categories)
return jsonify({"success": True})
except Exception as e:
return jsonify({"error": str(e)})
@app.route("/delete-category", methods=["POST"])
def delete_category():
"""Удаление категории"""
try:
data = request.json
categories = load_categories()
categories = [c for c in categories if c["id"] != data["id"]]
save_categories(categories)
return jsonify({"success": True})
except Exception as e:
return jsonify({"error": str(e)})
@app.route("/get-yml-files")
def get_yml_files():
"""Получение списка YML файлов"""
yml_files = []
if os.path.exists("output/yml"):
files = [f for f in os.listdir("output/yml") if f.endswith(".yml")]
yml_files = [get_file_info(f, "output/yml") for f in files]
return jsonify(yml_files)
@app.route("/get-translated-files")
def get_translated_files():
"""Получение списка переведенных файлов"""
translated_files = []
if os.path.exists("output/translated"):
files = [
f
for f in os.listdir("output/translated")
if f.endswith("_translated_products.json")
]
translated_files = [get_file_info(f, "output/translated") for f in files]
return jsonify(translated_files)
@app.route("/get-parsed-files")
def get_parsed_files():
"""Получение списа спарсенных файлов"""
parsed_files = []
if os.path.exists("output"):
files = [
f
for f in os.listdir("output")
if f.endswith("_products.json")
and not f.endswith("_translated_products.json")
]
parsed_files = [get_file_info(f, "output") for f in files]
return jsonify(parsed_files)
@app.errorhandler(404)
def not_found_error(error):
return jsonify({"error": "Файл не найден"}), 404
@app.errorhandler(500)
def internal_error(error):
return jsonify({"error": "Внутренняя ошибка сервера"}), 500
@app.route("/get-files/<file_type>")
def get_files(file_type):
"""Получение списка файлов"""
files = []
if file_type == "parsed":
directory = "output"
pattern = lambda f: f.endswith("_products.json") and not f.endswith(
"_translated_products.json"
)
elif file_type == "translated":
directory = "output/translated"
pattern = lambda f: f.endswith("_translated_products.json")
elif file_type == "yml":
directory = "output/yml"
pattern = lambda f: f.endswith(".yml")
else:
return jsonify([])
if os.path.exists(directory):
files = [f for f in os.listdir(directory) if pattern(f)]
files = [get_file_info(f, directory) for f in files]
return jsonify(files)
# Добавляем роуты для отдачи изображений
@app.route("/images/products/<path:filename>")
def serve_product_image(filename):
"""Отдача изображений товаров"""
return send_from_directory("images/products", filename)
@app.route("/images/descriptions/<path:filename>")
def serve_description_image(filename):
"""Отдача изображений описаний"""
return send_from_directory("images/descriptions", filename)
# Добавляем функцию для получения полного URL изображения
def get_image_url(local_path: str) -> str:
"""Преобразует локальный путь в полный URL"""
if not local_path:
return None
return urljoin(BASE_URL, local_path)
def get_file_info(filename, directory):
"""Получение информации о файле"""
path = os.path.join(directory, filename)
stat = os.stat(path)
return {
"name": filename,
"modified": datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S"),
"size": f"{stat.st_size / 1024:.1f} KB",
}
if __name__ == "__main__":
# Создаем необходимые директории
for directory in ["output", "output/translated", "output/yml"]:
os.makedirs(directory, exist_ok=True)
# Создаем файл категорий, если его нет
if not os.path.exists(CATEGORIES_FILE):
save_categories([])
app.run(host='0.0.0.0', port=5000, debug=True)