949 lines
32 KiB
Python
949 lines
32 KiB
Python
from flask import (
|
||
Flask,
|
||
render_template,
|
||
request,
|
||
jsonify,
|
||
send_from_directory,
|
||
redirect,
|
||
url_for,
|
||
)
|
||
from flask_login import (
|
||
LoginManager,
|
||
UserMixin,
|
||
login_user,
|
||
login_required,
|
||
logout_user,
|
||
current_user,
|
||
)
|
||
from werkzeug.security import generate_password_hash, check_password_hash
|
||
import re
|
||
from euro_scraper import create_retry_session, fetch_products, setup_selenium
|
||
import os
|
||
import threading
|
||
from datetime import datetime
|
||
import json
|
||
from translator import ProductTranslator
|
||
from feed_generator import RobotVacuumYMLGenerator
|
||
from pathlib import Path
|
||
from werkzeug.exceptions import NotFound
|
||
from urllib.parse import urljoin
|
||
from apscheduler.schedulers.background import BackgroundScheduler
|
||
import time
|
||
from config import BASE_URL
|
||
from utils import notify_telegram
|
||
import threading
|
||
from translation_queue import add_to_queue, translate_from_queue
|
||
|
||
# Добавляем в начало файла
|
||
login_manager = LoginManager()
|
||
login_manager.login_view = "login"
|
||
|
||
|
||
class User(UserMixin):
|
||
def __init__(self, id, username, password_hash):
|
||
self.id = id
|
||
self.username = username
|
||
self.password_hash = password_hash
|
||
|
||
|
||
# Хранилище пользователей (в реальном приложении использовать базу данных)
|
||
users = {"mario": User(1, "mario", generate_password_hash("2htC9YlEMXAhNE"))}
|
||
|
||
|
||
@login_manager.user_loader
|
||
def load_user(user_id):
|
||
for user in users.values():
|
||
if user.id == int(user_id):
|
||
return user
|
||
return None
|
||
|
||
|
||
app = Flask(__name__)
|
||
|
||
# Добавляем после создания app
|
||
login_manager.init_app(app)
|
||
app.config["SECRET_KEY"] = "your-secret-key-here" # Замените на случайный ключ
|
||
|
||
# Глобальные настройки
|
||
app_settings = {"items_limit": -1} # Ограничение количества обрабатываемых товаров
|
||
|
||
# Глобальная переменная для хранения статуса перевода
|
||
translation_status = {
|
||
"is_running": False,
|
||
"total_items": 0,
|
||
"processed_items": 0,
|
||
"error": None,
|
||
}
|
||
|
||
# Добавить в начало файла
|
||
CATEGORIES_FILE = "categories.json"
|
||
|
||
# Создаем константы для путей
|
||
OUTPUT_DIR = Path("output")
|
||
TRANSLATED_DIR = OUTPUT_DIR / "translated"
|
||
YML_DIR = OUTPUT_DIR / "yml"
|
||
|
||
# Глобальное состояние парсинга
|
||
parsing_status = {
|
||
"is_running": False,
|
||
"total_items": 0,
|
||
"processed_items": 0,
|
||
"error": None,
|
||
}
|
||
active_translations = set()
|
||
|
||
|
||
def load_categories():
|
||
"""Загрузка категорий из файла"""
|
||
if os.path.exists(CATEGORIES_FILE):
|
||
with open(CATEGORIES_FILE, "r", encoding="utf-8") as f:
|
||
return json.load(f)
|
||
return []
|
||
|
||
|
||
def save_categories(categories):
|
||
"""Сохранение категорий в файл"""
|
||
with open(CATEGORIES_FILE, "w", encoding="utf-8") as f:
|
||
json.dump(categories, f, ensure_ascii=False, indent=2)
|
||
|
||
|
||
def extract_category(url: str) -> str:
|
||
"""Извлекает название категории из URL"""
|
||
# Пример URL: https://www.euro.com.pl/odkurzacze-automatyczne.bhtml
|
||
match = re.search(r"euro\.com\.pl/([^/]+)", url)
|
||
if match:
|
||
category = match.group(1).replace(".bhtml", "")
|
||
return category
|
||
return None
|
||
|
||
# def start_parsing(category):
|
||
"""Запуск парсинга категории"""
|
||
global parsing_status
|
||
|
||
try:
|
||
parsing_status.update(
|
||
{"is_running": True, "total_items": 0, "processed_items": 0, "error": None}
|
||
)
|
||
|
||
# Создаем сессию и драйвер Selenium
|
||
session = create_retry_session()
|
||
driver = setup_selenium()
|
||
|
||
try:
|
||
# Парсим с использованием драйвера
|
||
products = fetch_products(category, session, driver, parsing_status)
|
||
|
||
# Сохраняем результаты
|
||
if products:
|
||
output_file = os.path.join("output", f"{category}_products.json")
|
||
with open(output_file, "w", encoding="utf-8") as f:
|
||
json.dump(products, f, ensure_ascii=False, indent=2)
|
||
|
||
finally:
|
||
# Обязательно закрываем драйвер
|
||
driver.quit()
|
||
|
||
except Exception as e:
|
||
parsing_status["error"] = str(e)
|
||
print(f"Error during parsing: {e}")
|
||
finally:
|
||
parsing_status["is_running"] = False
|
||
|
||
|
||
def start_parsing_with_status(category_url: str):
|
||
"""Універсальний запуск парсингу з оновленням статусу"""
|
||
category = extract_category(category_url)
|
||
|
||
parsing_status.update(
|
||
{
|
||
"is_running": True,
|
||
"total_items": 0,
|
||
"processed_items": 0,
|
||
"error": None,
|
||
"current_category": category,
|
||
}
|
||
)
|
||
|
||
try:
|
||
session = create_retry_session()
|
||
driver = setup_selenium()
|
||
|
||
try:
|
||
products = fetch_products(category, session, driver, parsing_status)
|
||
|
||
if products:
|
||
output_file = os.path.join("output", f"{category}_products.json")
|
||
with open(output_file, "w", encoding="utf-8") as f:
|
||
json.dump(products, f, ensure_ascii=False, indent=2)
|
||
|
||
# ⏩ Після успішного збереження — запустити переклад
|
||
threading.Thread(
|
||
target=start_translation, args=(f"{category}_products.json",)
|
||
).start()
|
||
|
||
finally:
|
||
driver.quit()
|
||
|
||
except Exception as e:
|
||
parsing_status["error"] = str(e)
|
||
print(f"❌ Error during parsing {category}: {e}")
|
||
|
||
finally:
|
||
parsing_status["is_running"] = False
|
||
|
||
|
||
def get_file_info(filename, directory="output"):
|
||
"""Получение информации о файле"""
|
||
filepath = os.path.join(directory, filename)
|
||
stat = os.stat(filepath)
|
||
return {
|
||
"name": filename,
|
||
"modified": datetime.fromtimestamp(stat.st_mtime).strftime("%d.%m.%Y %H:%M:%S"),
|
||
"size": f"{stat.st_size / 1024:.1f} KB",
|
||
}
|
||
|
||
|
||
def get_oldest_parsed_file():
|
||
"""Повертає найстаріший _products.json файл"""
|
||
folder = "output"
|
||
files = [
|
||
f
|
||
for f in os.listdir(folder)
|
||
if f.endswith("_products.json") and not f.endswith("_translated_products.json")
|
||
]
|
||
if not files:
|
||
return None
|
||
|
||
oldest_file = min(files, key=lambda f: os.path.getmtime(os.path.join(folder, f)))
|
||
category = oldest_file.replace("_products.json", "")
|
||
return category
|
||
|
||
|
||
@app.route("/login", methods=["GET", "POST"])
|
||
def login():
|
||
if request.method == "POST":
|
||
username = request.form.get("username")
|
||
password = request.form.get("password")
|
||
|
||
user = users.get(username)
|
||
if user and check_password_hash(user.password_hash, password):
|
||
login_user(user)
|
||
return redirect(url_for("index"))
|
||
|
||
return render_template("login.html", error="Неверный логин или пароль")
|
||
|
||
return render_template("login.html")
|
||
|
||
|
||
@app.route("/logout")
|
||
@login_required
|
||
def logout():
|
||
logout_user()
|
||
return redirect(url_for("login"))
|
||
|
||
|
||
@app.route("/")
|
||
@login_required
|
||
def index():
|
||
"""Главная страница"""
|
||
# Получаем спарсенные файлы
|
||
parsed_files = []
|
||
if os.path.exists("output"):
|
||
files = [f for f in os.listdir("output") if f.endswith("_products.json")]
|
||
parsed_files = [get_file_info(f, "output") for f in files]
|
||
|
||
# Получаем переведенные файлы
|
||
translated_files = []
|
||
if os.path.exists("output/translated"):
|
||
files = [
|
||
f
|
||
for f in os.listdir("output/translated")
|
||
if f.endswith("_translated_products.json")
|
||
]
|
||
translated_files = [get_file_info(f, "output/translated") for f in files]
|
||
|
||
# Получаем YML файлы
|
||
yml_files = []
|
||
if os.path.exists("output/yml"):
|
||
files = [f for f in os.listdir("output/yml") if f.endswith(".yml")]
|
||
yml_files = [get_file_info(f, "output/yml") for f in files]
|
||
|
||
feed_file_info = None
|
||
feed_path = os.path.join("output/yml", "feed.yml")
|
||
if os.path.exists(feed_path):
|
||
feed_file_info = get_file_info("feed.yml", "output/yml")
|
||
|
||
# Загружаем категории
|
||
categories = load_categories()
|
||
|
||
return render_template(
|
||
"index.html",
|
||
status=parsing_status,
|
||
translation_status=translation_status,
|
||
parsed_files=parsed_files,
|
||
translated_files=translated_files,
|
||
yml_files=yml_files,
|
||
categories=categories,
|
||
app_settings=app_settings,
|
||
feed_file_info=feed_file_info,
|
||
)
|
||
|
||
|
||
@app.route("/manual-refresh-all", methods=["POST"])
|
||
def manual_refresh_all():
|
||
threading.Thread(target=refresh_all_categories_daily).start()
|
||
return jsonify({"message": "Оновлення всіх категорій запущено вручну"})
|
||
|
||
|
||
@app.route("/auto-refresh", methods=["POST"])
|
||
def auto_refresh():
|
||
try:
|
||
if parsing_status["is_running"]:
|
||
return jsonify({"error": "Парсинг вже запущений"})
|
||
|
||
categories = load_categories()
|
||
|
||
valid_categories = [c for c in categories if "url" in c and c["url"]]
|
||
if not valid_categories:
|
||
return jsonify({"error": "Немає категорій з URL"})
|
||
|
||
oldest = None
|
||
oldest_mtime = float("inf")
|
||
|
||
for cat in valid_categories:
|
||
category_url = cat["url"]
|
||
category_key = extract_category(category_url)
|
||
filename = f"{category_key}_products.json"
|
||
path = os.path.join("output", filename)
|
||
|
||
# ❗ Якщо файл не існує — одразу обираємо цю категорію
|
||
if not os.path.exists(path):
|
||
oldest = cat
|
||
break
|
||
|
||
mtime = os.path.getmtime(path)
|
||
if mtime < oldest_mtime:
|
||
oldest_mtime = mtime
|
||
oldest = cat
|
||
|
||
if not oldest:
|
||
return jsonify({"error": "Не вдалося знайти категорію для оновлення"})
|
||
|
||
# category = extract_category(oldest["url"])
|
||
thread = threading.Thread(
|
||
target=start_parsing_with_status, args=(oldest["url"],)
|
||
)
|
||
thread.start()
|
||
|
||
return jsonify({"success": True, "category": oldest["name"]})
|
||
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)})
|
||
|
||
|
||
@app.route("/parse", methods=["POST"])
|
||
@login_required
|
||
def parse():
|
||
"""Обработчик запуска парсинга"""
|
||
url = request.form.get("url")
|
||
if not url:
|
||
return jsonify({"error": "URL не указан"})
|
||
|
||
category = extract_category(url)
|
||
if not category:
|
||
return jsonify({"error": "Неверный формат URL"})
|
||
|
||
if parsing_status["is_running"]:
|
||
return jsonify({"error": "Парсинг уже запущен"})
|
||
|
||
# Запускаем парсинг в отдельном потоке
|
||
thread = threading.Thread(target=start_parsing_with_status, args=(url,))
|
||
thread.start()
|
||
|
||
return jsonify({"status": "ok"})
|
||
|
||
|
||
@app.route("/status")
|
||
def get_status():
|
||
# Повертаємо додатково обчислений прогрес
|
||
progress = 0
|
||
if parsing_status["total_items"]:
|
||
progress = round(
|
||
(parsing_status["processed_items"] / parsing_status["total_items"]) * 100
|
||
)
|
||
|
||
return jsonify(
|
||
{
|
||
**parsing_status,
|
||
"current_product": parsing_status["processed_items"],
|
||
"total_products": parsing_status["total_items"],
|
||
"progress": progress,
|
||
}
|
||
)
|
||
|
||
|
||
@app.route("/download/<path:filename>")
|
||
def download_file(filename):
|
||
"""Скачивание файла с результатами"""
|
||
directory = request.args.get(
|
||
"directory", "output"
|
||
) # Получаем директорию из параметров запроса
|
||
|
||
if directory == "translated":
|
||
directory = "output/translated"
|
||
elif directory == "yml":
|
||
directory = "output/yml"
|
||
else:
|
||
directory = "output"
|
||
|
||
return send_from_directory(directory, filename, as_attachment=True)
|
||
|
||
|
||
@app.route("/delete/<path:filename>", methods=["POST"])
|
||
def delete_file(filename):
|
||
"""Удаление файла"""
|
||
try:
|
||
directory = request.args.get("directory", "output")
|
||
|
||
if directory == "translated":
|
||
file_path = os.path.join("output/translated", filename)
|
||
elif directory == "yml":
|
||
file_path = os.path.join("output/yml", filename)
|
||
else:
|
||
file_path = os.path.join("output", filename)
|
||
|
||
if os.path.exists(file_path):
|
||
os.remove(file_path)
|
||
return jsonify({"success": True})
|
||
else:
|
||
return jsonify({"error": "Файл не найден"}), 404
|
||
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)}), 400
|
||
|
||
|
||
@app.route("/translate", methods=["POST"])
|
||
def translate():
|
||
"""Обработчик запуска перевода"""
|
||
if translation_status["is_running"]:
|
||
return jsonify({"error": "Перевод уже запущен"})
|
||
|
||
filename = request.form.get("filename")
|
||
if not filename:
|
||
return jsonify({"error": "Файл не выбран"})
|
||
|
||
# Запускаем перевод в отдельном потоке
|
||
thread = threading.Thread(target=start_translation, args=(filename,))
|
||
thread.start()
|
||
|
||
return jsonify({"status": "Перевод запущен"})
|
||
|
||
|
||
@app.route("/translation-status")
|
||
def get_translation_status():
|
||
"""Возвращает текущий статус перевода"""
|
||
return jsonify(translation_status)
|
||
|
||
|
||
def start_translation(filename: str):
|
||
global translation_status
|
||
|
||
print(f"[TRANSLATE] Починаємо переклад: {filename}")
|
||
|
||
if filename in active_translations:
|
||
print(f"[SKIP] Переклад вже йде: {filename}")
|
||
return
|
||
|
||
active_translations.add(filename)
|
||
|
||
translation_status["is_running"] = True
|
||
translation_status["processed_items"] = 0
|
||
translation_status["error"] = None
|
||
|
||
notify_telegram(f"[🚀] Початок перекладу: {filename}")
|
||
|
||
try:
|
||
os.makedirs("output/translated", exist_ok=True)
|
||
|
||
with open(os.path.join("output", filename), "r", encoding="utf-8") as f:
|
||
products = json.load(f)
|
||
|
||
# Витягуємо category_id з назви файлу
|
||
category_id = filename.split("_")[0]
|
||
categories = load_categories()
|
||
category = next((c for c in categories if str(c["id"]) == category_id), None)
|
||
|
||
# Якщо категорія знайдена — вставляємо ID в продукти
|
||
if category and "portal_id" in category:
|
||
for product in products:
|
||
product["portal_category_id"] = category["portal_id"]
|
||
product["local_category_id"] = category["id"]
|
||
|
||
translation_status["total_items"] = len(products)
|
||
|
||
# Создаем экземпляр переводчика
|
||
translator = ProductTranslator()
|
||
|
||
# Переводим товары
|
||
translated_products = []
|
||
for i, product in enumerate(products):
|
||
translated_product = translator.translate_product(product)
|
||
if translated_product is not None: # ✅ фільтрація
|
||
translated_products.append(translated_product)
|
||
translation_status["processed_items"] = i + 1
|
||
|
||
# Сохраняем переведенные данные в отдельную директорию
|
||
if translated_products:
|
||
output_filename = filename.replace(
|
||
"_products.json", "_translated_products.json"
|
||
)
|
||
with open(
|
||
os.path.join("output/translated", output_filename),
|
||
"w",
|
||
encoding="utf-8",
|
||
) as f:
|
||
json.dump(translated_products, f, ensure_ascii=False, indent=2)
|
||
print(f"[OK] Збережено переклад: {output_filename}")
|
||
notify_telegram(
|
||
f"[✅] Переклад завершено: {filename} ({len(translated_products)} товарів)"
|
||
)
|
||
else:
|
||
print(f"[SKIP] Жодного перекладеного товару: {filename}. Файл не створено.")
|
||
notify_telegram(f"[⚠️] Жодного товару не перекладено: {filename}")
|
||
|
||
except Exception as e:
|
||
translation_status["error"] = str(e)
|
||
notify_telegram(f"[❌] Помилка перекладу: {filename}\n{e}")
|
||
add_to_queue(filename)
|
||
|
||
raise e
|
||
|
||
finally:
|
||
translation_status["is_running"] = False
|
||
active_translations.discard(filename)
|
||
|
||
|
||
def refresh_all_categories_daily():
|
||
"""Оновлює всі категорії один раз (але не частіше ніж раз на добу кожна)"""
|
||
if parsing_status["is_running"]:
|
||
print("[SKIP] Парсинг уже запущено. Пропускаємо автоматичне оновлення.")
|
||
return
|
||
|
||
print("[START] Щоденне оновлення категорій...")
|
||
categories = load_categories()
|
||
|
||
for category in categories:
|
||
category_name = category.get("name")
|
||
category_url = category.get("url")
|
||
|
||
if not category_url:
|
||
print(f"[SKIP] Категорія '{category_name}' не має URL")
|
||
continue
|
||
|
||
category_key = extract_category(category_url)
|
||
filename = f"{category_key}_products.json"
|
||
filepath = os.path.join("output", filename)
|
||
|
||
# ⏱ Перевіряємо, коли файл востаннє оновлювався
|
||
if os.path.exists(filepath):
|
||
modified_time = os.path.getmtime(filepath)
|
||
age_seconds = time.time() - modified_time
|
||
if age_seconds < 86400:
|
||
print(f"[SKIP] {category_name} — оновлено менше доби тому")
|
||
continue
|
||
|
||
print(f"[PARSE] {category_name}")
|
||
try:
|
||
parsing_status["is_running"] = True
|
||
start_parsing_with_status(category_url)
|
||
except Exception as e:
|
||
print(f"[ERROR] Не вдалося оновити {category_name}: {e}")
|
||
finally:
|
||
parsing_status["is_running"] = False
|
||
time.sleep(5)
|
||
|
||
generate_full_yml()
|
||
print("[DONE] Автоматичне оновлення завершено")
|
||
|
||
|
||
def translate_all_parsed_once():
|
||
"""Розумне оновлення перекладу: неперекладені → найстаріші перекладені"""
|
||
print("[START] Розумне оновлення перекладу...")
|
||
|
||
parsed_folder = "output"
|
||
translated_folder = "output/translated"
|
||
|
||
parsed_files = {
|
||
f: os.path.getmtime(os.path.join(parsed_folder, f))
|
||
for f in os.listdir(parsed_folder)
|
||
if f.endswith("_products.json") and not f.endswith("_translated_products.json")
|
||
}
|
||
|
||
translated_files = {
|
||
f.replace("_translated_products.json", "_products.json"): os.path.getmtime(
|
||
os.path.join(translated_folder, f)
|
||
)
|
||
for f in os.listdir(translated_folder)
|
||
if f.endswith("_translated_products.json")
|
||
}
|
||
|
||
# Спочатку неперекладені
|
||
untranslated = [f for f in parsed_files if f not in translated_files]
|
||
# Потім найстаріші з перекладених
|
||
outdated_translations = sorted(
|
||
(f for f in parsed_files if f in translated_files),
|
||
key=lambda f: translated_files[f],
|
||
)
|
||
|
||
to_translate = untranslated + outdated_translations
|
||
|
||
for filename in to_translate:
|
||
try:
|
||
print(f"[TRANSLATE] {filename}")
|
||
start_translation(filename) # ⬅️ БЕЗ ПОТОКУ
|
||
time.sleep(1) # щоб трохи розвантажити
|
||
except Exception as e:
|
||
error_text = str(e).lower()
|
||
if (
|
||
"too many requests" in error_text
|
||
or "you are allowed to make" in error_text
|
||
or "5 requests per second" in error_text
|
||
):
|
||
notify_telegram(
|
||
f"[🛑] Переклад зупинено після помилки ліміту на файлі: {filename}.\n"
|
||
f"Повтор заплановано через 30 хвилин."
|
||
)
|
||
add_to_queue(filename)
|
||
threading.Timer(1800, translate_from_queue).start()
|
||
|
||
break
|
||
else:
|
||
notify_telegram(f"[⚠️] Помилка перекладу {filename}:\n{str(e)}")
|
||
|
||
print("[DONE] translate_all_parsed_once завершено.")
|
||
|
||
|
||
@app.route("/translate-from-queue", methods=["POST"])
|
||
def run_translate_from_queue():
|
||
threading.Thread(target=translate_from_queue, args=(start_translation,)).start()
|
||
return jsonify({"status": "Запущено переклад з черги"})
|
||
|
||
|
||
@app.route("/manual-translate-all", methods=["POST"])
|
||
@login_required
|
||
def manual_translate_all():
|
||
threading.Thread(target=translate_all_parsed_once).start()
|
||
return jsonify({"message": "Запущено переклад усіх категорій"})
|
||
|
||
|
||
@app.route("/generate-full-yml", methods=["POST"])
|
||
def generate_full_yml():
|
||
try:
|
||
translated_folder = "output/translated"
|
||
categories = load_categories()
|
||
all_products = []
|
||
|
||
for file in os.listdir(translated_folder):
|
||
if file.endswith("_translated_products.json"):
|
||
with open(
|
||
os.path.join(translated_folder, file), "r", encoding="utf-8"
|
||
) as f:
|
||
products = json.load(f)
|
||
|
||
if not products:
|
||
continue
|
||
|
||
# Витягуємо slug із назви файлу
|
||
slug = file.replace("_translated_products.json", "")
|
||
|
||
# Шукаємо відповідну категорію за slug у URL
|
||
category_match = next((c for c in categories if slug in c["url"]), None)
|
||
|
||
if not category_match:
|
||
print(f"[SKIP] {file} — не знайдено категорію для slug: {slug}")
|
||
continue
|
||
|
||
for product in products:
|
||
product["portal_category_id"] = category_match["portal_id"]
|
||
product["local_category_id"] = category_match["id"]
|
||
|
||
print(
|
||
f"[OK] {file} — додано категорії {category_match['id']}, {category_match['portal_id']}"
|
||
)
|
||
all_products.extend(products)
|
||
|
||
if not all_products:
|
||
return jsonify({"error": "Не знайдено товарів з валідними категоріями"})
|
||
|
||
generator = RobotVacuumYMLGenerator(
|
||
base_url=BASE_URL, categories_data=categories
|
||
)
|
||
|
||
# Додаємо тільки ті категорії, що реально використовуються
|
||
added_categories = set()
|
||
for product in all_products:
|
||
cid = str(product.get("local_category_id"))
|
||
if cid and cid not in added_categories:
|
||
match = next((c for c in categories if str(c["id"]) == cid), None)
|
||
if match:
|
||
generator.add_category(cid, match["name"])
|
||
added_categories.add(cid)
|
||
|
||
output_path = "output/yml/feed.yml"
|
||
os.makedirs("output/yml", exist_ok=True)
|
||
success = generator.generate_yml(all_products, output_path)
|
||
|
||
if success:
|
||
return jsonify({"success": True})
|
||
else:
|
||
return jsonify({"error": "Помилка при генерації повного YML"})
|
||
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)})
|
||
|
||
|
||
@app.route("/yml/feed.yml")
|
||
def serve_feed():
|
||
return send_from_directory("output/yml", "feed.yml", mimetype="application/xml")
|
||
|
||
|
||
@app.route("/generate-yml", methods=["POST"])
|
||
def generate_yml():
|
||
"""Обработчик генерации YML файла"""
|
||
try:
|
||
data = request.get_json()
|
||
print(f"Received data: {data}")
|
||
|
||
filename = data.get("filename")
|
||
category_id = data.get("category_id")
|
||
|
||
if not filename or not category_id:
|
||
return jsonify({"error": "Не вказано файл або категорію"})
|
||
|
||
# Загружаем категории
|
||
categories = load_categories()
|
||
category = next(
|
||
(c for c in categories if str(c["id"]) == str(category_id)), None
|
||
)
|
||
|
||
if not category:
|
||
return jsonify({"error": "Категорія не знайдена"})
|
||
|
||
portal_category_id = category.get("portal_id")
|
||
if not portal_category_id:
|
||
return jsonify(
|
||
{
|
||
"error": "Категорія не має portal_id (ідентифікатор категорії Prom.ua)"
|
||
}
|
||
)
|
||
|
||
os.makedirs("output/yml", exist_ok=True)
|
||
|
||
# Читаем JSON файл с переведенными товарами
|
||
input_path = os.path.join("output/translated", filename)
|
||
if not os.path.exists(input_path):
|
||
return jsonify({"error": "Файл з товарами не знайдено"})
|
||
|
||
with open(input_path, "r", encoding="utf-8") as f:
|
||
products = json.load(f)
|
||
|
||
# Присваиваем portal_category_id всем товарам
|
||
for product in products:
|
||
product["portal_category_id"] = portal_category_id
|
||
product["local_category_id"] = category["id"]
|
||
|
||
# Создаем генератор YML с указанием базового URL
|
||
categories = load_categories()
|
||
generator = RobotVacuumYMLGenerator(
|
||
base_url=BASE_URL, categories_data=categories
|
||
)
|
||
generator.add_category(str(category["id"]), category["name"])
|
||
|
||
# Генерируем имя выходного файла
|
||
output_filename = filename.replace("_translated_products.json", ".yml")
|
||
output_path = os.path.join("output/yml", output_filename)
|
||
|
||
# Генерируем YML файл
|
||
result = generator.generate_yml(products, output_path)
|
||
|
||
if result:
|
||
return jsonify({"success": True})
|
||
else:
|
||
return jsonify({"error": "Помилка при генерації YML файлу"})
|
||
|
||
except Exception as e:
|
||
print(f"Error generating YML: {str(e)}")
|
||
return jsonify({"error": str(e)})
|
||
|
||
|
||
@app.route("/add-category", methods=["POST"])
|
||
def add_category():
|
||
"""Додавання нової категорії (локальної + portal_id)"""
|
||
try:
|
||
data = request.json
|
||
categories = load_categories()
|
||
|
||
# Перевірка наявності name
|
||
if "name" not in data:
|
||
return jsonify({"error": "Поле name є обов'язковим"})
|
||
|
||
# Генерація нового ID (максимальний + 1 або 1)
|
||
if categories:
|
||
new_id = max(int(c["id"]) for c in categories) + 1
|
||
else:
|
||
new_id = 1
|
||
|
||
new_category = {"id": str(new_id), "name": data["name"]}
|
||
|
||
if "portal_id" in data:
|
||
new_category["portal_id"] = data["portal_id"]
|
||
|
||
if "url" in data:
|
||
new_category["url"] = data["url"]
|
||
|
||
categories.append(new_category)
|
||
save_categories(categories)
|
||
|
||
return jsonify({"success": True})
|
||
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)})
|
||
|
||
|
||
@app.route("/delete-category", methods=["POST"])
|
||
def delete_category():
|
||
"""Видалення категорії"""
|
||
try:
|
||
data = request.json
|
||
target_id = str(data["id"])
|
||
|
||
categories = load_categories()
|
||
categories = [c for c in categories if str(c["id"]) != target_id]
|
||
save_categories(categories)
|
||
return jsonify({"success": True})
|
||
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)})
|
||
|
||
|
||
@app.route("/get-yml-files")
|
||
def get_yml_files():
|
||
"""Получение списка YML файлов"""
|
||
yml_files = []
|
||
if os.path.exists("output/yml"):
|
||
files = [f for f in os.listdir("output/yml") if f.endswith(".yml")]
|
||
yml_files = [get_file_info(f, "output/yml") for f in files]
|
||
return jsonify(yml_files)
|
||
|
||
|
||
@app.route("/get-translated-files")
|
||
def get_translated_files():
|
||
"""Получение списка переведенных файлов"""
|
||
translated_files = []
|
||
if os.path.exists("output/translated"):
|
||
files = [
|
||
f
|
||
for f in os.listdir("output/translated")
|
||
if f.endswith("_translated_products.json")
|
||
]
|
||
translated_files = [get_file_info(f, "output/translated") for f in files]
|
||
return jsonify(translated_files)
|
||
|
||
|
||
@app.route("/get-parsed-files")
|
||
def get_parsed_files():
|
||
"""Получение списа спарсенных файлов"""
|
||
parsed_files = []
|
||
if os.path.exists("output"):
|
||
files = [
|
||
f
|
||
for f in os.listdir("output")
|
||
if f.endswith("_products.json")
|
||
and not f.endswith("_translated_products.json")
|
||
]
|
||
parsed_files = [get_file_info(f, "output") for f in files]
|
||
return jsonify(parsed_files)
|
||
|
||
|
||
@app.errorhandler(404)
|
||
def not_found_error(error):
|
||
return jsonify({"error": "Файл не найден"}), 404
|
||
|
||
|
||
@app.errorhandler(500)
|
||
def internal_error(error):
|
||
return jsonify({"error": "Внутренняя ошибка сервера"}), 500
|
||
|
||
|
||
@app.route("/get-files/<file_type>")
|
||
def get_files(file_type):
|
||
"""Получение списка файлов"""
|
||
files = []
|
||
|
||
if file_type == "parsed":
|
||
directory = "output"
|
||
pattern = lambda f: f.endswith("_products.json") and not f.endswith(
|
||
"_translated_products.json"
|
||
)
|
||
elif file_type == "translated":
|
||
directory = "output/translated"
|
||
pattern = lambda f: f.endswith("_translated_products.json")
|
||
elif file_type == "yml":
|
||
directory = "output/yml"
|
||
pattern = lambda f: f.endswith(".yml")
|
||
else:
|
||
return jsonify([])
|
||
|
||
if os.path.exists(directory):
|
||
files = [f for f in os.listdir(directory) if pattern(f)]
|
||
files = [get_file_info(f, directory) for f in files]
|
||
|
||
return jsonify(files)
|
||
|
||
|
||
# Добавляем роуты для отдачи изображений
|
||
@app.route("/images/products/<path:filename>")
|
||
def serve_product_image(filename):
|
||
"""Отдача изображений товаров"""
|
||
return send_from_directory("images/products", filename)
|
||
|
||
|
||
@app.route("/images/descriptions/<path:filename>")
|
||
def serve_description_image(filename):
|
||
"""Отдача изображений описаний"""
|
||
return send_from_directory("images/descriptions", filename)
|
||
|
||
|
||
# Добавляем функцию для получения полного URL изображения
|
||
def get_image_url(local_path: str) -> str:
|
||
"""Преобразует локальный путь в полный URL"""
|
||
if not local_path:
|
||
return None
|
||
return urljoin(BASE_URL, local_path)
|
||
|
||
|
||
def get_file_info(filename, directory):
|
||
"""Получение информации о файле"""
|
||
path = os.path.join(directory, filename)
|
||
stat = os.stat(path)
|
||
return {
|
||
"name": filename,
|
||
"modified": datetime.fromtimestamp(stat.st_mtime).strftime("%Y-%m-%d %H:%M:%S"),
|
||
"size": f"{stat.st_size / 1024:.1f} KB",
|
||
}
|
||
|
||
|
||
if __name__ == "__main__":
|
||
for directory in ["output", "output/translated", "output/yml"]:
|
||
os.makedirs(directory, exist_ok=True)
|
||
|
||
if not os.path.exists(CATEGORIES_FILE):
|
||
save_categories([])
|
||
|
||
# Запуск планувальника
|
||
scheduler = BackgroundScheduler()
|
||
scheduler.add_job(refresh_all_categories_daily, "interval", days=1)
|
||
scheduler.start()
|
||
|
||
app.run(host="0.0.0.0", port=5000, debug=True)
|