import { open } from 'sqlite'; import sqlite3 from 'sqlite3'; import fetch from 'node-fetch'; import { createGunzip } from 'zlib'; import { createInterface } from 'readline'; const DB_FILE = './kinopoisk_main.db'; const DATA_URL = 'https://huggingface.co/datasets/opex792/kinopoisk/resolve/main/consolidated/kinopoisk.jsonl.gz?download=true'; let db; /** * Инициализирует базу данных. Схема остается прежней. */ export async function initializeDatabase() { db = await open({ filename: DB_FILE, driver: sqlite3.Database }); console.log('Подключение к SQLite (Main API) установлено.'); await db.exec('PRAGMA journal_mode = WAL;'); await db.exec('PRAGMA synchronous = NORMAL;'); await db.exec(` CREATE TABLE IF NOT EXISTS movies ( id INTEGER PRIMARY KEY, year INTEGER, type TEXT, rating_kp REAL, rating_imdb REAL, age_rating INTEGER, movie_length INTEGER, is_series BOOLEAN, data TEXT ); CREATE TABLE IF NOT EXISTS genres (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE); CREATE TABLE IF NOT EXISTS countries (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE); CREATE TABLE IF NOT EXISTS movie_genres (movie_id INTEGER, genre_id INTEGER, FOREIGN KEY(movie_id) REFERENCES movies(id), FOREIGN KEY(genre_id) REFERENCES genres(id), PRIMARY KEY (movie_id, genre_id)); CREATE TABLE IF NOT EXISTS movie_countries (movie_id INTEGER, country_id INTEGER, FOREIGN KEY(movie_id) REFERENCES movies(id), FOREIGN KEY(country_id) REFERENCES countries(id), PRIMARY KEY (movie_id, country_id)); CREATE INDEX IF NOT EXISTS idx_year ON movies(year); CREATE INDEX IF NOT EXISTS idx_rating_kp ON movies(rating_kp); CREATE INDEX IF NOT EXISTS idx_type ON movies(type); `); const count = await db.get('SELECT COUNT(id) as count FROM movies'); if (count.count === 0) { console.log('База данных пуста. Запускаю начальную загрузку...'); await refreshData(); } else { console.log(`В базе уже есть ${count.count} записей.`); } return db; } /** * Переработанная функция загрузки данных, использующая потоковую обработку * для предотвращения переполнения памяти. */ export async function refreshData() { console.log('Начинается высокопроизводительная перезагрузка данных...'); try { // --- ШАГ 1: Сканирование файла для сбора метаданных (жанры, страны) --- console.log('Шаг 1: Сканирование файла для сбора жанров и стран...'); const response1 = await fetch(DATA_URL); if (!response1.ok) throw new Error(`Ошибка загрузки на первом проходе: ${response1.statusText}`); const gunzip1 = createGunzip(); response1.body.pipe(gunzip1); const rl1 = createInterface({ input: gunzip1, crlfDelay: Infinity }); const allGenres = new Set(); const allCountries = new Set(); for await (const line of rl1) { if (line.trim()) { // Мы не храним весь объект, только извлекаем нужные данные const movie = JSON.parse(line); movie.genres?.forEach(g => allGenres.add(g.name)); movie.countries?.forEach(c => allCountries.add(c.name)); } } console.log(`Шаг 1 завершен. Найдено ${allGenres.size} уникальных жанров, ${allCountries.size} уникальных стран.`); // --- Начало транзакции для массовой вставки --- await db.exec('BEGIN TRANSACTION;'); // Очищаем таблицы перед новой загрузкой await db.exec('DELETE FROM movie_genres;'); await db.exec('DELETE FROM movie_countries;'); await db.exec('DELETE FROM genres;'); await db.exec('DELETE FROM countries;'); await db.exec('DELETE FROM movies;'); // --- ШАГ 2: Пакетная вставка жанров и стран --- console.log('Шаг 2: Пакетная запись жанров и стран...'); const genreStmt = await db.prepare('INSERT INTO genres (name) VALUES (?)'); for (const name of allGenres) { await genreStmt.run(name); } await genreStmt.finalize(); const countryStmt = await db.prepare('INSERT INTO countries (name) VALUES (?)'); for (const name of allCountries) { await countryStmt.run(name); } await countryStmt.finalize(); console.log('Шаг 2 завершен.'); // --- ШАГ 3: Кэширование ID жанров и стран для быстрой связи --- console.log('Шаг 3: Кэширование ID в память...'); const genreCache = new Map(); const countryCache = new Map(); const genresFromDb = await db.all('SELECT id, name FROM genres'); genresFromDb.forEach(g => genreCache.set(g.name, g.id)); const countriesFromDb = await db.all('SELECT id, name FROM countries'); countriesFromDb.forEach(c => countryCache.set(c.name, c.id)); console.log('Шаг 3 завершен.'); // --- ШАГ 4: Потоковая вставка фильмов и их связей (второй проход) --- console.log('Шаг 4: Потоковая запись фильмов и их связей...'); const response2 = await fetch(DATA_URL); if (!response2.ok) throw new Error(`Ошибка загрузки на втором проходе: ${response2.statusText}`); const gunzip2 = createGunzip(); response2.body.pipe(gunzip2); const rl2 = createInterface({ input: gunzip2, crlfDelay: Infinity }); const movieStmt = await db.prepare('INSERT INTO movies (id, year, type, rating_kp, rating_imdb, age_rating, movie_length, is_series, data) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)'); const movieGenreStmt = await db.prepare('INSERT OR IGNORE INTO movie_genres (movie_id, genre_id) VALUES (?, ?)'); const movieCountryStmt = await db.prepare('INSERT OR IGNORE INTO movie_countries (movie_id, country_id) VALUES (?, ?)'); let processedCount = 0; for await (const line of rl2) { if (line.trim()) { const movie = JSON.parse(line); if (!movie.id) continue; await movieStmt.run(movie.id, movie.year, movie.type, movie.rating?.kp, movie.rating?.imdb, movie.ageRating, movie.movieLength, movie.isSeries, JSON.stringify(movie)); if (movie.genres) { for (const genre of movie.genres) { const genreId = genreCache.get(genre.name); if (genreId) await movieGenreStmt.run(movie.id, genreId); } } if (movie.countries) { for (const country of movie.countries) { const countryId = countryCache.get(country.name); if (countryId) await movieCountryStmt.run(movie.id, countryId); } } processedCount++; if (processedCount % 10000 === 0) { console.log(`Обработано ${processedCount} фильмов...`); } } } await movieStmt.finalize(); await movieGenreStmt.finalize(); await movieCountryStmt.finalize(); await db.exec('COMMIT;'); console.log(`Шаг 4 завершен. Перезагрузка данных успешно окончена. Всего обработано ${processedCount} фильмов.`); } catch (error) { console.error('Критическая ошибка во время обновления данных:', error); try { await db.exec('ROLLBACK;'); console.log('Транзакция была отменена.'); } catch (rollbackError) { console.error('Ошибка при откате транзакции:', rollbackError); } } } // --- Функции для API остаются без изменений --- export async function getMovieById(id) { const row = await db.get('SELECT data FROM movies WHERE id = ?', id); return row ? JSON.parse(row.data) : null; } export async function getRandomMovie() { const row = await db.get('SELECT data FROM movies ORDER BY RANDOM() LIMIT 1'); return row ? JSON.parse(row.data) : null; } export async function getMovies(options) { // Эта функция остается без изменений, так как она уже была достаточно оптимизирована для чтения const { filters, pagination, sorting } = options; let whereClauses = []; let params = []; let joins = new Set(); for (const key in filters) { let value = filters[key]; if (!value) continue; if (['year', 'rating.kp', 'rating.imdb', 'ageRating', 'movieLength'].includes(key)) { const dbKey = key.replace('.', '_'); const [min, max] = value.toString().split('-').map(parseFloat); if (!isNaN(min)) { whereClauses.push(`m.${dbKey} >= ?`); params.push(min); } if (max && !isNaN(max)) { whereClauses.push(`m.${dbKey} <= ?`); params.push(max); } } else if (['type', 'isSeries'].includes(key)) { const dbKey = key.replace('.', '_'); const values = Array.isArray(value) ? value : [value]; whereClauses.push(`m.${dbKey} IN (${values.map(() => '?').join(',')})`); params.push(...values); } else if (key === 'genres.name' || key === 'countries.name') { const isGenre = key === 'genres.name'; const linkTable = isGenre ? 'movie_genres' : 'movie_countries'; const mainTable = isGenre ? 'genres' : 'countries'; const linkAlias = isGenre ? 'mg' : 'c'; // Уникальные алиасы const mainAlias = isGenre ? 'g' : 'co'; const values = Array.isArray(value) ? value : [value]; let includeClauses = []; let includeParams = []; for (const v of values) { if (v.startsWith('+')) { const name = v.substring(1); whereClauses.push(`EXISTS (SELECT 1 FROM movie_genres mg_in JOIN genres g_in ON mg_in.genre_id = g_in.id WHERE mg_in.movie_id = m.id AND g_in.name = ?)`); params.push(name); } else if (v.startsWith('!')) { const name = v.substring(1); whereClauses.push(`NOT EXISTS (SELECT 1 FROM movie_genres mg_ex JOIN genres g_ex ON mg_ex.genre_id = g_ex.id WHERE mg_ex.movie_id = m.id AND g_ex.name = ?)`); params.push(name); } else { includeClauses.push('?'); includeParams.push(v); } } if(includeClauses.length > 0){ whereClauses.push(`EXISTS (SELECT 1 FROM movie_genres mg_inc JOIN genres g_inc ON mg_inc.genre_id = g_inc.id WHERE mg_inc.movie_id = m.id AND g_inc.name IN (${includeClauses.join(',')}))`); params.push(...includeParams); } } } const whereString = whereClauses.length > 0 ? `WHERE ${whereClauses.join(' AND ')}` : ''; const joinString = Array.from(joins).join(' '); const totalResult = await db.get(`SELECT COUNT(DISTINCT m.id) as count FROM movies m ${joinString} ${whereString}`, params); const total = totalResult.count; let orderString = 'ORDER BY m.id'; if (sorting && sorting.field && sorting.type) { const sortField = sorting.field.replace('.', '_'); const sortType = sorting.type === '-1' ? 'DESC' : 'ASC'; if (['id', 'year', 'rating_kp', 'rating_imdb'].includes(sortField)) { orderString = `ORDER BY m.${sortField} ${sortType}`; } } const limit = pagination.limit || 10; const offset = ((pagination.page || 1) - 1) * limit; const query = `SELECT DISTINCT m.data FROM movies m ${joinString} ${whereString} ${orderString} LIMIT ? OFFSET ?`; const results = await db.all(query, [...params, limit, offset]); return { docs: results.map(r => JSON.parse(r.data)), total }; }