Files
mario_scraper/web_interface.py

949 lines
32 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import (
Flask,
render_template,
request,
jsonify,
send_from_directory,
redirect,
url_for,
)
from flask_login import (
LoginManager,
UserMixin,
login_user,
login_required,
logout_user,
current_user,
)
from werkzeug.security import generate_password_hash, check_password_hash
import re
from euro_scraper import create_retry_session, fetch_products, setup_selenium
import os
import threading
from datetime import datetime
import json
from translator import ProductTranslator
from feed_generator import RobotVacuumYMLGenerator
from pathlib import Path
from werkzeug.exceptions import NotFound
from urllib.parse import urljoin
from apscheduler.schedulers.background import BackgroundScheduler
import time
from config import BASE_URL
from utils import notify_telegram
import threading
from translation_queue import add_to_queue, translate_from_queue
# Добавляем в начало файла
login_manager = LoginManager()
login_manager.login_view = "login"
class User(UserMixin):
def __init__(self, id, username, password_hash):
self.id = id
self.username = username
self.password_hash = password_hash
# Хранилище пользователей (в реальном приложении использовать базу данных)
users = {"mario": User(1, "mario", generate_password_hash("2htC9YlEMXAhNE"))}
@login_manager.user_loader
def load_user(user_id):
for user in users.values():
if user.id == int(user_id):
return user
return None
app = Flask(__name__)
# Добавляем после создания app
login_manager.init_app(app)
app.config["SECRET_KEY"] = "your-secret-key-here" # Замените на случайный ключ
# Глобальные настройки
app_settings = {"items_limit": -1} # Ограничение количества обрабатываемых товаров
# Глобальная переменная для хранения статуса перевода
translation_status = {
"is_running": False,
"total_items": 0,
"processed_items": 0,
"error": None,
}
# Добавить в начало файла
CATEGORIES_FILE = "categories.json"
# Создаем константы для путей
OUTPUT_DIR = Path("output")
TRANSLATED_DIR = OUTPUT_DIR / "translated"
YML_DIR = OUTPUT_DIR / "yml"
# Глобальное состояние парсинга
parsing_status = {
"is_running": False,
"total_items": 0,
"processed_items": 0,
"error": None,
}
active_translations = set()
def load_categories():
"""Загрузка категорий из файла"""
if os.path.exists(CATEGORIES_FILE):
with open(CATEGORIES_FILE, "r", encoding="utf-8") as f:
return json.load(f)
return []
def save_categories(categories):
"""Сохранение категорий в файл"""
with open(CATEGORIES_FILE, "w", encoding="utf-8") as f:
json.dump(categories, f, ensure_ascii=False, indent=2)
def extract_category(url: str) -> str:
"""Извлекает название категории из URL"""
# Пример URL: https://www.euro.com.pl/odkurzacze-automatyczne.bhtml
match = re.search(r"euro\.com\.pl/([^/]+)", url)
if match:
category = match.group(1).replace(".bhtml", "")
return category
return None
# def start_parsing(category):
"""Запуск парсинга категории"""
global parsing_status
try:
parsing_status.update(
{"is_running": True, "total_items": 0, "processed_items": 0, "error": None}
)
# Создаем сессию и драйвер Selenium
session = create_retry_session()
driver = setup_selenium()
try:
# Парсим с использованием драйвера
products = fetch_products(category, session, driver, parsing_status)
# Сохраняем результаты
if products:
output_file = os.path.join("output", f"{category}_products.json")
with open(output_file, "w", encoding="utf-8") as f:
json.dump(products, f, ensure_ascii=False, indent=2)
finally:
# Обязательно закрываем драйвер
driver.quit()
except Exception as e:
parsing_status["error"] = str(e)
print(f"Error during parsing: {e}")
finally:
parsing_status["is_running"] = False
def start_parsing_with_status(category_url: str):
"""Універсальний запуск парсингу з оновленням статусу"""
category = extract_category(category_url)
parsing_status.update(
{
"is_running": True,
"total_items": 0,
"processed_items": 0,
"error": None,
"current_category": category,
}
)
try:
session = create_retry_session()
driver = setup_selenium()
try:
products = fetch_products(category, session, driver, parsing_status)
if products:
output_file = os.path.join("output", f"{category}_products.json")
with open(output_file, "w", encoding="utf-8") as f:
json.dump(products, f, ensure_ascii=False, indent=2)
# ⏩ Після успішного збереження — запустити переклад
threading.Thread(
target=start_translation, args=(f"{category}_products.json",)
).start()
finally:
driver.quit()
except Exception as e:
parsing_status["error"] = str(e)
print(f"❌ Error during parsing {category}: {e}")
finally:
parsing_status["is_running"] = False
def get_file_info(filename, directory="output"):
"""Получение информации о файле"""
filepath = os.path.join(directory, filename)
stat = os.stat(filepath)
return {
"name": filename,
"modified": datetime.fromtimestamp(stat.st_mtime).strftime("%d.%m.%Y %H:%M:%S"),
"size": f"{stat.st_size / 1024:.1f} KB",
}
def get_oldest_parsed_file():
"""Повертає найстаріший _products.json файл"""
folder = "output"
files = [
f
for f in os.listdir(folder)
if f.endswith("_products.json") and not f.endswith("_translated_products.json")
]
if not files:
return None
oldest_file = min(files, key=lambda f: os.path.getmtime(os.path.join(folder, f)))
category = oldest_file.replace("_products.json", "")
return category
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
username = request.form.get("username")
password = request.form.get("password")
user = users.get(username)
if user and check_password_hash(user.password_hash, password):
login_user(user)
return redirect(url_for("index"))
return render_template("login.html", error="Неверный логин или пароль")
return render_template("login.html")
@app.route("/logout")
@login_required
def logout():
logout_user()
return redirect(url_for("login"))
@app.route("/")
@login_required
def index():
"""Главная страница"""
# Получаем спарсенные файлы
parsed_files = []
if os.path.exists("output"):
files = [f for f in os.listdir("output") if f.endswith("_products.json")]
parsed_files = [get_file_info(f, "output") for f in files]
# Получаем переведенные файлы
translated_files = []
if os.path.exists("output/translated"):
files = [
f
for f in os.listdir("output/translated")
if f.endswith("_translated_products.json")
]
translated_files = [get_file_info(f, "output/translated") for f in files]
# Получаем YML файлы
yml_files = []
if os.path.exists("output/yml"):
files = [f for f in os.listdir("output/yml") if f.endswith(".yml")]
yml_files = [get_file_info(f, "output/yml") for f in files]
feed_file_info = None
feed_path = os.path.join("output/yml", "feed.yml")
if os.path.exists(feed_path):
feed_file_info = get_file_info("feed.yml", "output/yml")
# Загружаем категории
categories = load_categories()
return render_template(
"index.html",
status=parsing_status,
translation_status=translation_status,
parsed_files=parsed_files,
translated_files=translated_files,
yml_files=yml_files,
categories=categories,
app_settings=app_settings,
feed_file_info=feed_file_info,
)
@app.route("/manual-refresh-all", methods=["POST"])
def manual_refresh_all():
threading.Thread(target=refresh_all_categories_daily).start()
return jsonify({"message": "Оновлення всіх категорій запущено вручну"})
@app.route("/auto-refresh", methods=["POST"])
def auto_refresh():
try:
if parsing_status["is_running"]:
return jsonify({"error": "Парсинг вже запущений"})
categories = load_categories()
valid_categories = [c for c in categories if "url" in c and c["url"]]
if not valid_categories:
return jsonify({"error": "Немає категорій з URL"})
oldest = None
oldest_mtime = float("inf")
for cat in valid_categories:
category_url = cat["url"]
category_key = extract_category(category_url)
filename = f"{category_key}_products.json"
path = os.path.join("output", filename)
# ❗ Якщо файл не існує — одразу обираємо цю категорію
if not os.path.exists(path):
oldest = cat
break
mtime = os.path.getmtime(path)
if mtime < oldest_mtime:
oldest_mtime = mtime
oldest = cat
if not oldest:
return jsonify({"error": "Не вдалося знайти категорію для оновлення"})
# category = extract_category(oldest["url"])
thread = threading.Thread(
target=start_parsing_with_status, args=(oldest["url"],)
)
thread.start()
return jsonify({"success": True, "category": oldest["name"]})
except Exception as e:
return jsonify({"error": str(e)})
@app.route("/parse", methods=["POST"])
@login_required
def parse():
"""Обработчик запуска парсинга"""
url = request.form.get("url")
if not url:
return jsonify({"error": "URL не указан"})
category = extract_category(url)
if not category:
return jsonify({"error": "Неверный формат URL"})
if parsing_status["is_running"]:
return jsonify({"error": "Парсинг уже запущен"})
# Запускаем парсинг в отдельном потоке
thread = threading.Thread(target=start_parsing_with_status, args=(url,))
thread.start()
return jsonify({"status": "ok"})
@app.route("/status")
def get_status():
# Повертаємо додатково обчислений прогрес
progress = 0
if parsing_status["total_items"]:
progress = round(
(parsing_status["processed_items"] / parsing_status["total_items"]) * 100
)
return jsonify(
{
**parsing_status,
"current_product": parsing_status["processed_items"],
"total_products": parsing_status["total_items"],
"progress": progress,
}
)
@app.route("/download/<path:filename>")
def download_file(filename):
"""Скачивание файла с результатами"""
directory = request.args.get(
"directory", "output"
) # Получаем директорию из параметров запроса
if directory == "translated":
directory = "output/translated"
elif directory == "yml":
directory = "output/yml"
else:
directory = "output"
return send_from_directory(directory, filename, as_attachment=True)
@app.route("/delete/<path:filename>", methods=["POST"])
def delete_file(filename):
"""Удаление файла"""
try:
directory = request.args.get("directory", "output")
if directory == "translated":
file_path = os.path.join("output/translated", filename)
elif directory == "yml":
file_path = os.path.join("output/yml", filename)
else:
file_path = os.path.join("output", filename)
if os.path.exists(file_path):
os.remove(file_path)
return jsonify({"success": True})
else:
return jsonify({"error": "Файл не найден"}), 404
except Exception as e:
return jsonify({"error": str(e)}), 400
@app.route("/translate", methods=["POST"])
def translate():
"""Обработчик запуска перевода"""
if translation_status["is_running"]:
return jsonify({"error": "Перевод уже запущен"})
filename = request.form.get("filename")
if not filename:
return jsonify({"error": "Файл не выбран"})
# Запускаем перевод в отдельном потоке
thread = threading.Thread(target=start_translation, args=(filename,))
thread.start()
return jsonify({"status": "Перевод запущен"})
@app.route("/translation-status")
def get_translation_status():
"""Возвращает текущий статус перевода"""
return jsonify(translation_status)
def start_translation(filename: str):
global translation_status
print(f"[TRANSLATE] Починаємо переклад: {filename}")
if filename in active_translations:
print(f"[SKIP] Переклад вже йде: {filename}")
return
active_translations.add(filename)
translation_status["is_running"] = True
translation_status["processed_items"] = 0
translation_status["error"] = None
notify_telegram(f"[🚀] Початок перекладу: {filename}")
try:
os.makedirs("output/translated", exist_ok=True)
with open(os.path.join("output", filename), "r", encoding="utf-8") as f:
products = json.load(f)
# Витягуємо category_id з назви файлу
category_id = filename.split("_")[0]
categories = load_categories()
category = next((c for c in categories if str(c["id"]) == category_id), None)
# Якщо категорія знайдена — вставляємо ID в продукти
if category and "portal_id" in category:
for product in products:
product["portal_category_id"] = category["portal_id"]
product["local_category_id"] = category["id"]
translation_status["total_items"] = len(products)
# Создаем экземпляр переводчика
translator = ProductTranslator()
# Переводим товары
translated_products = []
for i, product in enumerate(products):
translated_product = translator.translate_product(product)
if translated_product is not None: # ✅ фільтрація
translated_products.append(translated_product)
translation_status["processed_items"] = i + 1
# Сохраняем переведенные данные в отдельную директорию
if translated_products:
output_filename = filename.replace(
"_products.json", "_translated_products.json"
)
with open(
os.path.join("output/translated", output_filename),
"w",
encoding="utf-8",
) as f:
json.dump(translated_products, f, ensure_ascii=False, indent=2)
print(f"[OK] Збережено переклад: {output_filename}")
notify_telegram(
f"[✅] Переклад завершено: {filename} ({len(translated_products)} товарів)"
)
else:
print(f"[SKIP] Жодного перекладеного товару: {filename}. Файл не створено.")
notify_telegram(f"[⚠️] Жодного товару не перекладено: {filename}")
except Exception as e:
translation_status["error"] = str(e)
notify_telegram(f"[❌] Помилка перекладу: {filename}\n{e}")
add_to_queue(filename)
raise e
finally:
translation_status["is_running"] = False
active_translations.discard(filename)
def refresh_all_categories_daily():
"""Оновлює всі категорії один раз (але не частіше ніж раз на добу кожна)"""
if parsing_status["is_running"]:
print("[SKIP] Парсинг уже запущено. Пропускаємо автоматичне оновлення.")
return
print("[START] Щоденне оновлення категорій...")
categories = load_categories()
for category in categories:
category_name = category.get("name")
category_url = category.get("url")
if not category_url:
print(f"[SKIP] Категорія '{category_name}' не має URL")
continue
category_key = extract_category(category_url)
filename = f"{category_key}_products.json"
filepath = os.path.join("output", filename)
# ⏱ Перевіряємо, коли файл востаннє оновлювався
if os.path.exists(filepath):
modified_time = os.path.getmtime(filepath)
age_seconds = time.time() - modified_time
if age_seconds < 86400:
print(f"[SKIP] {category_name} — оновлено менше доби тому")
continue
print(f"[PARSE] {category_name}")
try:
parsing_status["is_running"] = True
start_parsing_with_status(category_url)
except Exception as e:
print(f"[ERROR] Не вдалося оновити {category_name}: {e}")
finally:
parsing_status["is_running"] = False
time.sleep(5)
generate_full_yml()
print("[DONE] Автоматичне оновлення завершено")
def translate_all_parsed_once():
"""Розумне оновлення перекладу: неперекладені → найстаріші перекладені"""
print("[START] Розумне оновлення перекладу...")
parsed_folder = "output"
translated_folder = "output/translated"
parsed_files = {
f: os.path.getmtime(os.path.join(parsed_folder, f))
for f in os.listdir(parsed_folder)
if f.endswith("_products.json") and not f.endswith("_translated_products.json")
}
translated_files = {
f.replace("_translated_products.json", "_products.json"): os.path.getmtime(
os.path.join(translated_folder, f)
)
for f in os.listdir(translated_folder)
if f.endswith("_translated_products.json")
}
# Спочатку неперекладені
untranslated = [f for f in parsed_files if f not in translated_files]
# Потім найстаріші з перекладених
outdated_translations = sorted(
(f for f in parsed_files if f in translated_files),
key=lambda f: translated_files[f],
)
to_translate = untranslated + outdated_translations
for filename in to_translate:
try:
print(f"[TRANSLATE] {filename}")
start_translation(filename) # ⬅️ БЕЗ ПОТОКУ
time.sleep(1) # щоб трохи розвантажити
except Exception as e:
error_text = str(e).lower()
if (
"too many requests" in error_text
or "you are allowed to make" in error_text
or "5 requests per second" in error_text
):
notify_telegram(
f"[🛑] Переклад зупинено після помилки ліміту на файлі: {filename}.\n"
f"Повтор заплановано через 30 хвилин."
)
add_to_queue(filename)
threading.Timer(1800, translate_from_queue).start()
break
else:
notify_telegram(f"[⚠️] Помилка перекладу {filename}:\n{str(e)}")
print("[DONE] translate_all_parsed_once завершено.")
@app.route("/translate-from-queue", methods=["POST"])
def run_translate_from_queue():
threading.Thread(target=translate_from_queue, args=(start_translation,)).start()
return jsonify({"status": "Запущено переклад з черги"})
@app.route("/manual-translate-all", methods=["POST"])
@login_required
def manual_translate_all():
threading.Thread(target=translate_all_parsed_once).start()
return jsonify({"message": "Запущено переклад усіх категорій"})
@app.route("/generate-full-yml", methods=["POST"])
def generate_full_yml():
try:
translated_folder = "output/translated"
categories = load_categories()
all_products = []
for file in os.listdir(translated_folder):
if file.endswith("_translated_products.json"):
with open(
os.path.join(translated_folder, file), "r", encoding="utf-8"
) as f:
products = json.load(f)
if not products:
continue
# Витягуємо slug із назви файлу
slug = file.replace("_translated_products.json", "")
# Шукаємо відповідну категорію за slug у URL
category_match = next((c for c in categories if slug in c["url"]), None)
if not category_match:
print(f"[SKIP] {file} — не знайдено категорію для slug: {slug}")
continue
for product in products:
product["portal_category_id"] = category_match["portal_id"]
product["local_category_id"] = category_match["id"]
print(
f"[OK] {file} — додано категорії {category_match['id']}, {category_match['portal_id']}"
)
all_products.extend(products)
if not all_products:
return jsonify({"error": "Не знайдено товарів з валідними категоріями"})
generator = RobotVacuumYMLGenerator(
base_url=BASE_URL, categories_data=categories
)
# Додаємо тільки ті категорії, що реально використовуються
added_categories = set()
for product in all_products:
cid = str(product.get("local_category_id"))
if cid and cid not in added_categories:
match = next((c for c in categories if str(c["id"]) == cid), None)
if match:
generator.add_category(cid, match["name"])
added_categories.add(cid)
output_path = "output/yml/feed.yml"
os.makedirs("output/yml", exist_ok=True)
success = generator.generate_yml(all_products, output_path)
if success:
return jsonify({"success": True})
else:
return jsonify({"error": "Помилка при генерації повного YML"})
except Exception as e:
return jsonify({"error": str(e)})
@app.route("/yml/feed.yml")
def serve_feed():
return send_from_directory("output/yml", "feed.yml", mimetype="application/xml")
@app.route("/generate-yml", methods=["POST"])
def generate_yml():
"""Обработчик генерации YML файла"""
try:
data = request.get_json()
print(f"Received data: {data}")
filename = data.get("filename")
category_id = data.get("category_id")
if not filename or not category_id:
return jsonify({"error": "Не вказано файл або категорію"})
# Загружаем категории
categories = load_categories()
category = next(
(c for c in categories if str(c["id"]) == str(category_id)), None
)
if not category:
return jsonify({"error": "Категорія не знайдена"})
portal_category_id = category.get("portal_id")
if not portal_category_id:
return jsonify(
{
"error": "Категорія не має portal_id (ідентифікатор категорії Prom.ua)"
}
)
os.makedirs("output/yml", exist_ok=True)
# Читаем JSON файл с переведенными товарами
input_path = os.path.join("output/translated", filename)
if not os.path.exists(input_path):
return jsonify({"error": "Файл з товарами не знайдено"})
with open(input_path, "r", encoding="utf-8") as f:
products = json.load(f)
# Присваиваем portal_category_id всем товарам
for product in products:
product["portal_category_id"] = portal_category_id
product["local_category_id"] = category["id"]
# Создаем генератор YML с указанием базового URL
categories = load_categories()
generator = RobotVacuumYMLGenerator(
base_url=BASE_URL, categories_data=categories
)
generator.add_category(str(category["id"]), category["name"])
# Генерируем имя выходного файла
output_filename = filename.replace("_translated_products.json", ".yml")
output_path = os.path.join("output/yml", output_filename)
# Генерируем YML файл
result = generator.generate_yml(products, output_path)
if result:
return jsonify({"success": True})
else:
return jsonify({"error": "Помилка при генерації YML файлу"})
except Exception as e:
print(f"Error generating YML: {str(e)}")
return jsonify({"error": str(e)})
@app.route("/add-category", methods=["POST"])
def add_category():
"""Додавання нової категорії (локальної + portal_id)"""
try:
data = request.json
categories = load_categories()
# Перевірка наявності name
if "name" not in data:
return jsonify({"error": "Поле name є обов'язковим"})
# Генерація нового ID (максимальний + 1 або 1)
if categories:
new_id = max(int(c["id"]) for c in categories) + 1
else:
new_id = 1
new_category = {"id": str(new_id), "name": data["name"]}
if "portal_id" in data:
new_category["portal_id"] = data["portal_id"]
if "url" in data:
new_category["url"] = data["url"]
categories.append(new_category)
save_categories(categories)
return jsonify({"success": True})
except Exception as e:
return jsonify({"error": str(e)})
@app.route("/delete-category", methods=["POST"])
def delete_category():
"""Видалення категорії"""
try:
data = request.json
target_id = str(data["id"])
categories = load_categories()
categories = [c for c in categories if str(c["id"]) != target_id]
save_categories(categories)
return jsonify({"success": True})
except Exception as e:
return jsonify({"error": str(e)})
@app.route("/get-yml-files")
def get_yml_files():
"""Получение списка YML файлов"""
yml_files = []
if os.path.exists("output/yml"):
files = [f for f in os.listdir("output/yml") if f.endswith(".yml")]
yml_files = [get_file_info(f, "output/yml") for f in files]
return jsonify(yml_files)
@app.route("/get-translated-files")
def get_translated_files():
"""Получение списка переведенных файлов"""
translated_files = []
if os.path.exists("output/translated"):
files = [
f
for f in os.listdir("output/translated")
if f.endswith("_translated_products.json")
]
translated_files = [get_file_info(f, "output/translated") for f in files]
return jsonify(translated_files)
@app.route("/get-parsed-files")
def get_parsed_files():
"""Получение списа спарсенных файлов"""
parsed_files = []
if os.path.exists("output"):
files = [
f
for f in os.listdir("output")
if f.endswith("_products.json")
and not f.endswith("_translated_products.json")
]
parsed_files = [get_file_info(f, "output") for f in files]
return jsonify(parsed_files)
@app.errorhandler(404)
def not_found_error(error):
return jsonify({"error": "Файл не найден"}), 404
@app.errorhandler(500)
def internal_error(error):
return jsonify({"error": "Внутренняя ошибка сервера"}), 500
@app.route("/get-files/<file_type>")
def get_files(file_type):
"""Получение списка файлов"""
files = []
if file_type == "parsed":
directory = "output"
pattern = lambda f: f.endswith("_products.json") and not f.endswith(
"_translated_products.json"
)
elif file_type == "translated":
directory = "output/translated"
pattern = lambda f: f.endswith("_translated_products.json")
elif file_type == "yml":
directory = "output/yml"
pattern = lambda f: f.endswith(".yml")
else:
return jsonify([])
if os.path.exists(directory):
files = [f for f in os.listdir(directory) if pattern(f)]
files = [get_file_info(f, directory) for f in files]
return jsonify(files)
# Добавляем роуты для отдачи изображений
@app.route("/images/products/<path:filename>")
def serve_product_image(filename):
"""Отдача изображений товаров"""
return send_from_directory("images/products", filename)
@app.route("/images/descriptions/<path:filename>")
def serve_description_image(filename):
"""Отдача изображений описаний"""
return send_from_directory("images/descriptions", filename)
# Добавляем функцию для получения полного URL изображения
def get_image_url(local_path: str) -> str:
"""Преобразует локальный путь в полный URL"""
if not local_path:
return None
return urljoin(BASE_URL, local_path)
def get_file_info(filename, directory):
"""Получение информации о файле"""
path = os.path.join(directory, filename)
stat = os.stat(path)
return {
"name": filename,
"modified": datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S"),
"size": f"{stat.st_size / 1024:.1f} KB",
}
if __name__ == "__main__":
for directory in ["output", "output/translated", "output/yml"]:
os.makedirs(directory, exist_ok=True)
if not os.path.exists(CATEGORIES_FILE):
save_categories([])
# Запуск планувальника
scheduler = BackgroundScheduler()
scheduler.add_job(refresh_all_categories_daily, "interval", days=1)
scheduler.start()
app.run(host="0.0.0.0", port=5000, debug=True)