diff --git a/src/app/services/updaters/fl_updater.py b/src/app/services/updaters/fl_updater.py index b8e942f..4315bff 100644 --- a/src/app/services/updaters/fl_updater.py +++ b/src/app/services/updaters/fl_updater.py @@ -1,5 +1,4 @@ import asyncio -import platform from typing import Optional from aiologger import Logger @@ -54,40 +53,15 @@ class FlUpdater(BaseUpdater): sequences_updated_event: asyncio.Event genres_updated_event: asyncio.Event - platform: str logger: Logger - async def log(self, message): - if "windows" in self.platform.lower(): - print(message) - else: - await self.logger.info(message) - - async def _drop_tables(self): - async with self.mysql_pool.acquire() as conn: - async with conn.cursor() as cursor: - cursor.execute( - "DROP TABLE IF EXISTS libaannotations;" - "DROP TABLE IF EXISTS libapics;" - "DROP TABLE IF EXISTS libavtor;" - "DROP TABLE IF EXISTS libavtorname;" - "DROP TABLE IF EXISTS libbannotations;" - "DROP TABLE IF EXISTS libbook;" - "DROP TABLE IF EXISTS libbpics;" - "DROP TABLE IF EXISTS libgenre;" - "DROP TABLE IF EXISTS libgenrelist;" - "DROP TABLE IF EXISTS libseq;" - "DROP TABLE IF EXISTS libseqname;" - "DROP TABLE IF EXISTS libtranslator;" - ) - async def _import_dump(self, filename: str): - result = await run( + await run( f"wget -O - {env_config.FL_BASE_URL}/sql/{filename}.gz | gunzip | " f"mysql -h {env_config.MYSQL_HOST} -u {env_config.MYSQL_USER} " f'-p"{env_config.MYSQL_PASSWORD}" {env_config.MYSQL_DB_NAME}' ) - await self.log(f"Imported {filename}: {result}.") + await self.logger.info(f"Imported {filename}") async def _prepare(self): posgres_pool = await asyncpg.create_pool( @@ -112,7 +86,7 @@ class FlUpdater(BaseUpdater): self.postgres_pool = posgres_pool async def _set_source(self): - await self.log("Set source...") + await self.logger.info("Set source...") source_row = await self.postgres_pool.fetchrow( "SELECT id FROM sources WHERE name = 'flibusta';" @@ -129,7 +103,7 @@ class FlUpdater(BaseUpdater): self.SOURCE = source_row["id"] - await self.log("Source has set!") + await self.logger.info("Source has set!") async def _update_authors(self): def prepare_author(row: list): @@ -141,26 +115,50 @@ class FlUpdater(BaseUpdater): remove_wrong_ch(row[3]), ] - await self.log("Update authors...") + await self.logger.info("Update authors...") + + await self.postgres_pool.execute( + """ + CREATE OR REPLACE FUNCTION update_author( + source_ smallint, remote_id_ int, first_name_ varchar, last_name_ varchar, middle_name_ varchar + ) RETURNS void AS $$ + BEGIN + IF EXISTS (SELECT * FROM authors WHERE source = source_ AND remote_id = remote_id_) THEN + UPDATE authors SET first_name = first_name_, last_name = last_name_, middle_name = middle_name_ + WHERE source = source_ AND remote_id = remote_id_; + RETURN; + END IF; + + INSERT INTO authors (source, remote_id, first_name, last_name, middle_name) + VALUES (source_, remote_id_, first_name_, last_name_, middle_name_); + END; + $$ LANGUAGE plpgsql; + """ + ) async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: - await cursor.execute( - "SELECT AvtorId, FirstName, LastName, MiddleName FROM libavtorname;" - ) + await cursor.execute("SELECT COUNT(*) FROM libavtorname;") + + (rows_count,) = await cursor.fetchone() + + for offset in range(0, rows_count, 4096): + await cursor.execute( + "SELECT AvtorId, FirstName, LastName, MiddleName FROM libavtorname LIMIT 4096 OFFSET {offset};".format( + offset=offset + ) + ) + + rows = await cursor.fetchall() - while rows := await cursor.fetchmany(32): await self.postgres_pool.executemany( - "INSERT INTO authors (source, remote_id, first_name, last_name, middle_name) " - "VALUES ($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar)) " - "ON CONFLICT (source, remote_id) " - "DO UPDATE SET first_name = EXCLUDED.first_name, last_name = EXCLUDED.last_name, middle_name = EXCLUDED.middle_name;", + "SELECT update_author($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar));", [prepare_author(row) for row in rows], ) self.authors_updated_event.set() - await self.log("Authors updated!") + await self.logger.info("Authors updated!") async def _update_books(self): replace_dict = {"ru-": "ru", "ru~": "ru"} @@ -181,76 +179,153 @@ class FlUpdater(BaseUpdater): row[5] == "1", ] - await self.log("Update books...") + await self.logger.info("Update books...") + + await self.postgres_pool.execute( + """ + CREATE OR REPLACE FUNCTION update_book( + source_ smallint, remote_id_ int, title_ varchar, lang_ varchar, + file_type_ varchar, uploaded_ date, is_deleted_ boolean + ) RETURNS void AS $$ + BEGIN + IF EXISTS (SELECT * FROM books WHERE source = source_ AND remote_id = remote_id_) THEN + UPDATE books SET title = title_, lang = lang_, file_type = file_type_, + uploaded = uploaded_, is_deleted = is_deleted + WHERE source = source_ AND remote_id = remote_id_; + RETURN; + END IF; + + INSERT INTO books (source, remote_id, title, lang, file_type, uploaded, is_deleted) + VALUES (source_, remote_id_, title_, lang_, file_type_, uploaded_, is_deleted_); + END; + $$ LANGUAGE plpgsql; + """ + ) async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: - await cursor.execute( - "SELECT BookId, Title, Lang, FileType, Time, Deleted FROM libbook;" - ) + await cursor.execute("SELECT COUNT(*) FROM libbook;") + + (rows_count,) = await cursor.fetchone() + + for offset in range(0, rows_count, 4096): + await cursor.execute( + "SELECT BookId, Title, Lang, FileType, Time, Deleted FROM libbook LIMIT 4096 OFFSET {offset};".format( + offset=offset + ) + ) + + rows = await cursor.fetchall() - while rows := await cursor.fetchmany(32): await self.postgres_pool.executemany( - "INSERT INTO books (source, remote_id, title, lang, file_type, uploaded, is_deleted) " - "VALUES ($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar), $6, $7) " - "ON CONFLICT (source, remote_id) " - "DO UPDATE SET title = EXCLUDED.title, lang = EXCLUDED.lang, file_type = EXCLUDED.file_type, " - "uploaded = EXCLUDED.uploaded, is_deleted = EXCLUDED.is_deleted;", + "SELECT update_book($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar), $6, $7);", [prepare_book(row) for row in rows], ) self.books_updated_event.set() - await self.log("Books updated!") + await self.logger.info("Books updated!") async def _update_books_authors(self): await self.books_updated_event.wait() await self.authors_updated_event.wait() - await self.log("Update books_authors...") + await self.logger.info("Update books_authors...") + + await self.postgres_pool.execute( + """ + CREATE OR REPLACE FUNCTION update_book_author(source_ smallint, book_ integer, author_ integer) RETURNS void AS $$ + DECLARE + book_id integer := -1; + author_id integer := -1; + BEGIN + SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_; + SELECT id INTO author_id FROM authors WHERE source = source_ AND remote_id = author_; + + IF EXISTS (SELECT * FROM book_authors WHERE book = book_id AND author = author_id) THEN + RETURN; + END IF; + + INSERT INTO book_authors (book, author) VALUES (book_id, author_id); + END; + $$ LANGUAGE plpgsql; + """ + ) async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: - await cursor.execute("SELECT BookId, AvtorId FROM libavtor;") + await cursor.execute("SELECT COUNT(*) FROM libavtorname;") + + (rows_count,) = await cursor.fetchone() + + for offset in range(0, rows_count, 4096): + await cursor.execute( + "SELECT BookId, AvtorId FROM libavtor LIMIT 4096 OFFSET {offset};".format( + offset=offset + ) + ) + + rows = await cursor.fetchall() - while rows := await cursor.fetchmany(32): await self.postgres_pool.executemany( - "INSERT INTO book_authors (book, author) " - "SELECT " - "(SELECT id FROM books WHERE source = $1 AND remote_id = $2), " - "(SELECT id FROM authors WHERE source = $1 AND remote_id = $3) " - "ON CONFLICT (book, author) DO NOTHING;", + "SELECT update_book_author($1, $2, $3);", [(self.SOURCE, *row) for row in rows], ) - await self.log("Books_authors updated!") + await self.logger.info("Books_authors updated!") async def _update_translations(self): await self.books_updated_event.wait() await self.authors_updated_event.wait() - await self.log("Update translations...") + await self.logger.info("Update translations...") + + await self.postgres_pool.execute( + """ + CREATE OR REPLACE FUNCTION update_translation(source_ smallint, book_ integer, author_ integer, position_ smallint) RETURNS void AS $$ + DECLARE + book_id integer := -1; + author_id integer := -1; + BEGIN + SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_; + SELECT id INTO author_id FROM authors WHERE source = source_ AND remote_id = author_; + + IF EXISTS (SELECT * FROM translations WHERE book = book_id AND author = author_id) THEN + UPDATE translations SET position = position_ + WHERE book = book_id AND author = author_id; + RETURN; + END IF; + + INSERT INTO translations (book, author, position) VALUES (book_id, author_id, position_); + END; + $$ LANGUAGE plpgsql; + """ + ) async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute( - "SELECT BookId, TranslatorId, Pos FROM libtranslator " + "SELECT COUNT(*) FROM libtranslator " "WHERE BookId IN (SELECT BookId FROM libbook);" ) - while rows := await cursor.fetchmany(32): + (rows_count,) = await cursor.fetchone() + + for offset in range(0, rows_count, 4096): + await cursor.execute( + "SELECT BookId, TranslatorId, Pos FROM libtranslator " + "WHERE BookId IN (SELECT BookId FROM libbook) " + "LIMIT 4096 OFFSET {offset};".format(offset=offset) + ) + + rows = await cursor.fetchall() + await self.postgres_pool.executemany( - "INSERT INTO translations (book, author, position) " - "SELECT " - "(SELECT id FROM books WHERE source = $1 AND remote_id = $2), " - "(SELECT id FROM authors WHERE source = $1 AND remote_id = $3), " - "$4 " - "ON CONFLICT (book, author) " - "DO UPDATE SET position = EXCLUDED.position;", + "SELECT update_translation($1, $2, $3, $4)", [(self.SOURCE, *row) for row in rows], ) - await self.log("Translations updated!") + await self.logger.info("Translations updated!") async def _update_sequences(self): def prepare_sequence(row: list): @@ -260,89 +335,172 @@ class FlUpdater(BaseUpdater): remove_wrong_ch(row[1]), ] - await self.log("Update sequences...") + await self.logger.info("Update sequences...") + + await self.postgres_pool.execute( + """ + CREATE OR REPLACE FUNCTION update_sequences(source_ smallint, remote_id_ int, name_ varchar) RETURNS void AS $$ + BEGIN + IF EXISTS (SELECT * FROM sequences WHERE source = source_ AND remote_id = remote_id_) THEN + UPDATE sequences SET name = name_ WHERE source = source_ AND remote_id = remote_id_; + RETURN; + END IF; + + INSERT INTO sequences (source, remote_id, name) VALUES (source_, remote_id_, name_); + END; + $$ LANGUAGE plpgsql; + """ + ) async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: - await cursor.execute("SELECT SeqId, SeqName FROM libseqname;") + await cursor.execute("SELECT COUNT(*) FROM libseqname;") + + (rows_count,) = await cursor.fetchone() + + for offset in range(0, rows_count, 4096): + await cursor.execute( + "SELECT SeqId, SeqName FROM libseqname LIMIT 4096 OFFSET {offset};".format( + offset=offset + ) + ) + + rows = await cursor.fetchall() - while rows := await cursor.fetchmany(32): await self.postgres_pool.executemany( - "INSERT INTO sequences (source, remote_id, name) " - "VALUES ($1, $2, cast($3 as varchar)) " - "ON CONFLICT (source, remote_id) " - "DO UPDATE SET name = EXCLUDED.name;", + "SELECT update_sequences($1, $2, cast($3 as varchar));", [prepare_sequence(row) for row in rows], ) self.sequences_updated_event.set() - await self.log("Sequences updated!") + await self.logger.info("Sequences updated!") async def _update_sequences_info(self): await self.sequences_updated_event.wait() await self.books_updated_event.wait() - await self.log("Update book_sequences...") + await self.logger.info("Update book_sequences...") + + await self.postgres_pool.execute( + """ + CREATE OR REPLACE FUNCTION update_book_sequence(source_ smallint, book_ integer, sequence_ integer, position_ smallint) RETURNS void AS $$ + DECLARE + book_id integer := -1; + sequence_id integer := -1; + BEGIN + SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_; + SELECT id INTO sequence_id FROM sequences WHERE source = source_ AND remote_id = sequence_; + + IF EXISTS (SELECT * FROM book_sequences WHERE book = book_id AND sequence = sequence_id) THEN + UPDATE book_sequences SET position = position_ WHERE book = book_id AND sequence = sequence_id; + RETURN; + END IF; + + INSERT INTO book_sequences (book, sequence, position) VALUES (book_id, sequence_id, position_); + END; + $$ LANGUAGE plpgsql; + """ + ) async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute( - "SELECT BookId, SeqId, level FROM libseq " + "SELECT COUNT(*) FROM libseq " "WHERE " "BookId IN (SELECT BookId FROM libbook) AND " "SeqId IN (SELECT SeqId FROM libseqname);" ) - while rows := await cursor.fetchmany(32): + (rows_count,) = await cursor.fetchone() + + for offset in range(0, rows_count, 4096): + await cursor.execute( + "SELECT BookId, SeqId, level FROM libseq " + "WHERE " + "BookId IN (SELECT BookId FROM libbook) AND " + "SeqId IN (SELECT SeqId FROM libseqname) " + "LIMIT 4096 OFFSET {offset};".format(offset=offset) + ) + + rows = await cursor.fetchall() + await self.postgres_pool.executemany( - "INSERT INTO book_sequences (book, sequence, position) " - "SELECT " - "(SELECT id FROM books WHERE source = $1 AND remote_id = $2), " - "(SELECT id FROM sequences WHERE source = $1 AND remote_id = $3), " - "$4 " - "ON CONFLICT (book, sequence) " - "DO UPDATE SET position = EXCLUDED.position;", + "SELECT update_book_sequence($1, $2, $3, $4);", [[self.SOURCE, *row] for row in rows], ) - await self.log("Book_sequences updated!") + await self.logger.info("Book_sequences updated!") async def _update_book_annotations(self): await self.books_updated_event.wait() - await self.log("Update book_annotations...") + await self.logger.info("Update book_annotations...") + + await self.postgres_pool.execute( + """ + CREATE OR REPLACE FUNCTION update_book_annotation(source_ smallint, book_ integer, title_ varchar, text_ text) RETURNS void AS $$ + DECLARE + book_id integer := -1; + BEGIN + SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_; + + IF EXISTS (SELECT * FROM book_annotations WHERE book = book_id) THEN + UPDATE book_annotations SET title = title_, text = text_ WHERE book = book_id; + RETURN; + END IF; + + INSERT INTO book_annotations (book, title, text) VALUES (book_id, title_, text_); + END; + $$ LANGUAGE plpgsql; + """ + ) async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute( - "SELECT BookId, Title, Body FROM libbannotations " + "SELECT COUNT(*) FROM libbannotations " "WHERE BookId IN (SELECT BookId FROM libbook);" ) - while rows := await cursor.fetchmany(32): + (rows_count,) = await cursor.fetchone() + + for offset in range(0, rows_count, 4096): + await cursor.execute( + "SELECT BookId, Title, Body FROM libbannotations " + "WHERE BookId IN (SELECT BookId FROM libbook) " + "LIMIT 4096 OFFSET {offset};".format(offset=offset) + ) + + rows = await cursor.fetchall() + await self.postgres_pool.executemany( - "INSERT INTO book_annotations (book, title, text) " - "SELECT " - "(SELECT id FROM books WHERE source = $1 AND remote_id = $2), " - "$3, $4 " - "ON CONFLICT (book) " - "DO UPDATE SET title = EXCLUDED.title, text = EXCLUDED.text;", + "SELECT update_book_annotation($1, $2, cast($3 as varchar), cast($4 as text));", [[self.SOURCE, *row] for row in rows], ) - await self.log("Book_annotations updated!") + await self.logger.info("Book_annotations updated!") await self._update_book_annotations_pic() async def _update_book_annotations_pic(self): - await self.log("Update book_annotations_pic...") + await self.logger.info("Update book_annotations_pic...") async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: - await cursor.execute("SELECT BookId, File FROM libbpics;") + await cursor.execute("SELECT COUNT(*) FROM libbpics;") + + (rows_count,) = await cursor.fetchone() + + for offset in range(0, rows_count, 4096): + await cursor.execute( + "SELECT BookId, File FROM libbpics LIMIT 4096 OFFSET {offset};".format( + offset=offset + ) + ) + + rows = await cursor.fetchall() - while rows := await cursor.fetchmany(32): await self.postgres_pool.executemany( "UPDATE book_annotations " "SET file = cast($3 as varchar) " @@ -351,42 +509,74 @@ class FlUpdater(BaseUpdater): [[self.SOURCE, *row] for row in rows], ) - await self.log("Book_annotation_pics updated!") + await self.logger.info("Book_annotation_pics updated!") async def _update_author_annotations(self): await self.authors_updated_event.wait() - await self.log("Update author_annotations...") + await self.logger.info("Update author_annotations...") + + await self.postgres_pool.execute( + """ + CREATE OR REPLACE FUNCTION update_author_annotation(source_ smallint, author_ integer, title_ varchar, text_ text) RETURNS void AS $$ + DECLARE + author_id integer := -1; + BEGIN + SELECT id INTO author_id FROM authors WHERE source = source_ AND remote_id = author_; + + IF EXISTS (SELECT * FROM author_annotations WHERE author = author_id) THEN + UPDATE author_annotations SET title = title_, text = text_ WHERE author = author_id; + RETURN; + END IF; + + INSERT INTO author_annotations (author, title, text) VALUES (author_id, title_, text_); + END; + $$ LANGUAGE plpgsql; + """ + ) async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: - await cursor.execute( - "SELECT AvtorId, Title, Body FROM libaannotations;" - ) + await cursor.execute("SELECT COUNT(*) FROM libaannotations;") + + (rows_count,) = await cursor.fetchone() + + for offset in range(0, rows_count, 4096): + await cursor.execute( + "SELECT AvtorId, Title, Body FROM libaannotations LIMIT 4096 OFFSET {offset};".format( + offset=offset + ) + ) + + rows = await cursor.fetchall() - while rows := await cursor.fetchmany(32): await self.postgres_pool.executemany( - "INSERT INTO author_annotations (author, title, text) " - "SELECT " - "(SELECT id FROM authors WHERE source = $1 AND remote_id = $2), " - "$3, $4 " - "ON CONFLICT (author) " - "DO UPDATE SET title = EXCLUDED.title, text = EXCLUDED.text;", + "SELECT update_author_annotation($1, $2, cast($3 as varchar), cast($4 as text));", [[self.SOURCE, *row] for row in rows], ) - await self.log("Author_annotation_updated!") + await self.logger.info("Author_annotation_updated!") await self._update_author_annotations_pics() async def _update_author_annotations_pics(self): - await self.log("Update author_annotations_pic...") + await self.logger.info("Update author_annotations_pic...") async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: - await cursor.execute("SELECT AvtorId, File FROM libapics;") + await cursor.execute("SELECT COUNT(*) FROM libapics;") + + (rows_count,) = await cursor.fetchone() + + for offset in range(0, rows_count, 4096): + await cursor.execute( + "SELECT AvtorId, File FROM libapics LIMIT 4096 OFFSET {offset};".format( + offset=offset + ) + ) + + rows = await cursor.fetchall() - while rows := await cursor.fetchmany(32): await self.postgres_pool.executemany( "UPDATE author_annotations " "SET file = cast($3 as varchar) " @@ -395,58 +585,104 @@ class FlUpdater(BaseUpdater): [[self.SOURCE, *row] for row in rows], ) - await self.log("Author_annotatioins_pic updated!") + await self.logger.info("Author_annotatioins_pic updated!") async def _update_genres(self): - await self.log("Update genres...") + await self.logger.info("Update genres...") + + await self.postgres_pool.execute( + """ + CREATE OR REPLACE FUNCTION update_genre( + source_ smallint, remote_id_ int, code_ varchar, description_ varchar, meta_ varchar + ) RETURNS void AS $$ + BEGIN + IF EXISTS (SELECT * FROM genres WHERE source = source_ AND remote_id = remote_id_) THEN + UPDATE genres SET code = code_, description = description_, meta = meta_ + WHERE source = source_ AND remote_id = remote_id_; + RETURN; + END IF; + + INSERT INTO authors (source, remote_id, code, description, meta) + VALUES (source_, remote_id_, code_, description_, meta_); + END; + $$ LANGUAGE plpgsql; + """ + ) async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: - await cursor.execute( - "SELECT GenreId, GenreCode, GenreDesc, GenreMeta FROM libgenrelist;" - ) + await cursor.execute("SELECT COUNT(*) FROM libgenrelist;") - while rows := await cursor.fetchmany(32): + (rows_count,) = await cursor.fetchone() + + for offset in range(0, rows_count, 4096): + await cursor.execute( + "SELECT GenreId, GenreCode, GenreDesc, GenreMeta FROM libgenrelist LIMIT 4096 OFFSET {offset};".format( + offset=offset + ) + ) + + rows = await cursor.fetchall() await self.postgres_pool.executemany( - "INSERT INTO genres (source, remote_id, code, description, meta) " - "VALUES ($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar)) " - "ON CONFLICT (source, remote_id) " - "DO UPDATE SET code = EXCLUDED.code, description = EXCLUDED.description, meta = EXCLUDED.meta;", + "SELECT update_genre($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar));", [[self.SOURCE, *row] for row in rows], ) - await self.log("Genres updated!") + await self.logger.info("Genres updated!") async def _update_books_genres(self): await self.books_updated_event.wait() await self.genres_updated_event.wait() - await self.log("Update book_genres...") + await self.logger.info("Update book_genres...") + + await self.postgres_pool.execute( + """ + CREATE OR REPLACE FUNCTION update_book_sequence(source_ smallint, book_ integer, genre_ integer) RETURNS void AS $$ + DECLARE + book_id integer := -1; + genre_id integer := -1; + BEGIN + SELECT id INTO book_id FROM books WHERE source = source_ AND remote_id = book_; + SELECT id INTO genre_id FROM genres WHERE source = source_ AND remote_id = genre_; + + IF EXISTS (SELECT * FROM book_genres WHERE book = book_id AND genre = genre_id) THEN + RETURN; + END IF; + + INSERT INTO book_genres (book, genre) VALUES (book_id, genre_id); + END; + $$ LANGUAGE plpgsql; + """ + ) async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cursor: - await cursor.execute("SELECT BookId, GenreId FROM libgenre;") + await cursor.execute("SELECT COUNT(*) FROM libgenre;") + + (rows_count,) = await cursor.fetchone() + + for offset in range(0, rows_count, 4096): + await cursor.execute( + "SELECT BookId, GenreId FROM libgenre LIMIT 4096 OFFSET {offset};".format( + offset=offset + ) + ) + + rows = await cursor.fetchall() - while rows := await cursor.fetchmany(32): await self.postgres_pool.executemany( - "INSERT INTO book_genres (book, genre) " - "SELECT " - "(SELECT id FROM books WHERE source = $1 AND remote_id = $2), " - "(SELECT id FROM genres WHERE source = $1 AND remote_id = $3) " - "ON CONFLICT (book, author) DO NOTHING;", + "SELECT update_book_sequence($1, $2, $3);", [(self.SOURCE, *row) for row in rows], ) - await self.log("Book_genres updated!") + await self.logger.info("Book_genres updated!") async def _update(self) -> bool: - self.platform = platform.platform() self.logger = Logger.with_default_handlers() await self._prepare() - await self._drop_tables() - await asyncio.gather(*[self._import_dump(filename) for filename in self.FILES]) await self._set_source() diff --git a/src/app/services/webhook.py b/src/app/services/webhook.py index 22149ac..1ef53d6 100644 --- a/src/app/services/webhook.py +++ b/src/app/services/webhook.py @@ -6,6 +6,8 @@ from core.config import env_config, WebhookConfig class WebhookSender: @classmethod async def _make_request(cls, webhook: WebhookConfig): + print(f"Make request to {webhook.url}") + async with httpx.AsyncClient() as client: request_maker = getattr(client, webhook.method) await request_maker(webhook.url, headers=webhook.headers) diff --git a/src/app/views.py b/src/app/views.py index 555d65d..484b8a1 100644 --- a/src/app/views.py +++ b/src/app/views.py @@ -13,4 +13,4 @@ async def update(updater: UpdaterTypes, background_tasks: BackgroundTasks): background_tasks.add_task(updater_.update) - return True + return "Ok!"