Fix fl update

This commit is contained in:
2022-01-08 00:46:21 +03:00
parent 6ec5fede0f
commit cdd4582b39
3 changed files with 381 additions and 143 deletions

View File

@@ -1,5 +1,4 @@
import asyncio import asyncio
import platform
from typing import Optional from typing import Optional
from aiologger import Logger from aiologger import Logger
@@ -54,40 +53,15 @@ class FlUpdater(BaseUpdater):
sequences_updated_event: asyncio.Event sequences_updated_event: asyncio.Event
genres_updated_event: asyncio.Event genres_updated_event: asyncio.Event
platform: str
logger: Logger logger: Logger
async def log(self, message):
if "windows" in self.platform.lower():
print(message)
else:
await self.logger.info(message)
async def _drop_tables(self):
async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cursor:
cursor.execute(
"DROP TABLE IF EXISTS libaannotations;"
"DROP TABLE IF EXISTS libapics;"
"DROP TABLE IF EXISTS libavtor;"
"DROP TABLE IF EXISTS libavtorname;"
"DROP TABLE IF EXISTS libbannotations;"
"DROP TABLE IF EXISTS libbook;"
"DROP TABLE IF EXISTS libbpics;"
"DROP TABLE IF EXISTS libgenre;"
"DROP TABLE IF EXISTS libgenrelist;"
"DROP TABLE IF EXISTS libseq;"
"DROP TABLE IF EXISTS libseqname;"
"DROP TABLE IF EXISTS libtranslator;"
)
async def _import_dump(self, filename: str): async def _import_dump(self, filename: str):
result = await run( await run(
f"wget -O - {env_config.FL_BASE_URL}/sql/{filename}.gz | gunzip | " f"wget -O - {env_config.FL_BASE_URL}/sql/{filename}.gz | gunzip | "
f"mysql -h {env_config.MYSQL_HOST} -u {env_config.MYSQL_USER} " f"mysql -h {env_config.MYSQL_HOST} -u {env_config.MYSQL_USER} "
f'-p"{env_config.MYSQL_PASSWORD}" {env_config.MYSQL_DB_NAME}' f'-p"{env_config.MYSQL_PASSWORD}" {env_config.MYSQL_DB_NAME}'
) )
await self.log(f"Imported {filename}: {result}.") await self.logger.info(f"Imported {filename}")
async def _prepare(self): async def _prepare(self):
posgres_pool = await asyncpg.create_pool( posgres_pool = await asyncpg.create_pool(
@@ -112,7 +86,7 @@ class FlUpdater(BaseUpdater):
self.postgres_pool = posgres_pool self.postgres_pool = posgres_pool
async def _set_source(self): async def _set_source(self):
await self.log("Set source...") await self.logger.info("Set source...")
source_row = await self.postgres_pool.fetchrow( source_row = await self.postgres_pool.fetchrow(
"SELECT id FROM sources WHERE name = 'flibusta';" "SELECT id FROM sources WHERE name = 'flibusta';"
@@ -129,7 +103,7 @@ class FlUpdater(BaseUpdater):
self.SOURCE = source_row["id"] self.SOURCE = source_row["id"]
await self.log("Source has set!") await self.logger.info("Source has set!")
async def _update_authors(self): async def _update_authors(self):
def prepare_author(row: list): def prepare_author(row: list):
@@ -141,26 +115,50 @@ class FlUpdater(BaseUpdater):
remove_wrong_ch(row[3]), remove_wrong_ch(row[3]),
] ]
await self.log("Update authors...") await self.logger.info("Update authors...")
await self.postgres_pool.execute(
"""
-- Upsert one author keyed by (source, remote_id).
-- Inserts the author when no row with that key exists; otherwise overwrites
-- the three name columns of the matching row(s).
CREATE OR REPLACE FUNCTION update_author(
    source_ smallint, remote_id_ int, first_name_ varchar, last_name_ varchar, middle_name_ varchar
) RETURNS void AS $$
BEGIN
    IF NOT EXISTS (SELECT 1 FROM authors WHERE source = source_ AND remote_id = remote_id_) THEN
        -- Unknown author: create it.
        INSERT INTO authors (source, remote_id, first_name, last_name, middle_name)
        VALUES (source_, remote_id_, first_name_, last_name_, middle_name_);
    ELSE
        -- Known author: refresh the name fields.
        UPDATE authors
        SET first_name = first_name_, last_name = last_name_, middle_name = middle_name_
        WHERE source = source_ AND remote_id = remote_id_;
    END IF;
END;
$$ LANGUAGE plpgsql;
"""
)
async with self.mysql_pool.acquire() as conn: async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cursor: async with conn.cursor() as cursor:
await cursor.execute("SELECT COUNT(*) FROM libavtorname;")
(rows_count,) = await cursor.fetchone()
for offset in range(0, rows_count, 4096):
await cursor.execute( await cursor.execute(
"SELECT AvtorId, FirstName, LastName, MiddleName FROM libavtorname;" "SELECT AvtorId, FirstName, LastName, MiddleName FROM libavtorname LIMIT 4096 OFFSET {offset};".format(
offset=offset
)
) )
while rows := await cursor.fetchmany(32): rows = await cursor.fetchall()
await self.postgres_pool.executemany( await self.postgres_pool.executemany(
"INSERT INTO authors (source, remote_id, first_name, last_name, middle_name) " "SELECT update_author($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar));",
"VALUES ($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar)) "
"ON CONFLICT (source, remote_id) "
"DO UPDATE SET first_name = EXCLUDED.first_name, last_name = EXCLUDED.last_name, middle_name = EXCLUDED.middle_name;",
[prepare_author(row) for row in rows], [prepare_author(row) for row in rows],
) )
self.authors_updated_event.set() self.authors_updated_event.set()
await self.log("Authors updated!") await self.logger.info("Authors updated!")
async def _update_books(self): async def _update_books(self):
replace_dict = {"ru-": "ru", "ru~": "ru"} replace_dict = {"ru-": "ru", "ru~": "ru"}
@@ -181,76 +179,153 @@ class FlUpdater(BaseUpdater):
row[5] == "1", row[5] == "1",
] ]
await self.log("Update books...") await self.logger.info("Update books...")
await self.postgres_pool.execute(
"""
-- Upsert one book keyed by (source, remote_id).
-- Updates title, lang, file_type, uploaded and is_deleted of an existing row,
-- or inserts a new row when the key is not present yet.
CREATE OR REPLACE FUNCTION update_book(
    source_ smallint, remote_id_ int, title_ varchar, lang_ varchar,
    file_type_ varchar, uploaded_ date, is_deleted_ boolean
) RETURNS void AS $$
BEGIN
    IF EXISTS (SELECT * FROM books WHERE source = source_ AND remote_id = remote_id_) THEN
        -- is_deleted must come from the is_deleted_ parameter: the bare name
        -- is_deleted resolves to the column itself and would leave the flag unchanged.
        UPDATE books SET title = title_, lang = lang_, file_type = file_type_,
                         uploaded = uploaded_, is_deleted = is_deleted_
        WHERE source = source_ AND remote_id = remote_id_;
        RETURN;
    END IF;
    INSERT INTO books (source, remote_id, title, lang, file_type, uploaded, is_deleted)
    VALUES (source_, remote_id_, title_, lang_, file_type_, uploaded_, is_deleted_);
END;
$$ LANGUAGE plpgsql;
"""
)
async with self.mysql_pool.acquire() as conn: async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cursor: async with conn.cursor() as cursor:
await cursor.execute("SELECT COUNT(*) FROM libbook;")
(rows_count,) = await cursor.fetchone()
for offset in range(0, rows_count, 4096):
await cursor.execute( await cursor.execute(
"SELECT BookId, Title, Lang, FileType, Time, Deleted FROM libbook;" "SELECT BookId, Title, Lang, FileType, Time, Deleted FROM libbook LIMIT 4096 OFFSET {offset};".format(
offset=offset
)
) )
while rows := await cursor.fetchmany(32): rows = await cursor.fetchall()
await self.postgres_pool.executemany( await self.postgres_pool.executemany(
"INSERT INTO books (source, remote_id, title, lang, file_type, uploaded, is_deleted) " "SELECT update_book($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar), $6, $7);",
"VALUES ($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar), $6, $7) "
"ON CONFLICT (source, remote_id) "
"DO UPDATE SET title = EXCLUDED.title, lang = EXCLUDED.lang, file_type = EXCLUDED.file_type, "
"uploaded = EXCLUDED.uploaded, is_deleted = EXCLUDED.is_deleted;",
[prepare_book(row) for row in rows], [prepare_book(row) for row in rows],
) )
self.books_updated_event.set() self.books_updated_event.set()
await self.log("Books updated!") await self.logger.info("Books updated!")
async def _update_books_authors(self): async def _update_books_authors(self):
await self.books_updated_event.wait() await self.books_updated_event.wait()
await self.authors_updated_event.wait() await self.authors_updated_event.wait()
await self.log("Update books_authors...") await self.logger.info("Update books_authors...")
await self.postgres_pool.execute(
"""
-- Link a book to an author, both given by their remote ids within a source.
-- Does nothing when the (book, author) pair is already present in book_authors.
CREATE OR REPLACE FUNCTION update_book_author(source_ smallint, book_ integer, author_ integer) RETURNS void AS $$
DECLARE
    v_book integer := -1;
    v_author integer := -1;
BEGIN
    -- Translate remote ids into local primary keys.
    -- NOTE(review): when no row matches, SELECT INTO assigns NULL (the -1 initial
    -- value is overwritten), so the INSERT below receives NULL; confirm that
    -- book_authors rejects or tolerates that as intended.
    SELECT b.id INTO v_book FROM books AS b WHERE b.source = source_ AND b.remote_id = book_;
    SELECT a.id INTO v_author FROM authors AS a WHERE a.source = source_ AND a.remote_id = author_;

    IF NOT EXISTS (SELECT 1 FROM book_authors WHERE book = v_book AND author = v_author) THEN
        INSERT INTO book_authors (book, author) VALUES (v_book, v_author);
    END IF;
END;
$$ LANGUAGE plpgsql;
"""
)
async with self.mysql_pool.acquire() as conn: async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cursor: async with conn.cursor() as cursor:
await cursor.execute("SELECT BookId, AvtorId FROM libavtor;") await cursor.execute("SELECT COUNT(*) FROM libavtorname;")
(rows_count,) = await cursor.fetchone()
for offset in range(0, rows_count, 4096):
await cursor.execute(
"SELECT BookId, AvtorId FROM libavtor LIMIT 4096 OFFSET {offset};".format(
offset=offset
)
)
rows = await cursor.fetchall()
while rows := await cursor.fetchmany(32):
await self.postgres_pool.executemany( await self.postgres_pool.executemany(
"INSERT INTO book_authors (book, author) " "SELECT update_book_author($1, $2, $3);",
"SELECT "
"(SELECT id FROM books WHERE source = $1 AND remote_id = $2), "
"(SELECT id FROM authors WHERE source = $1 AND remote_id = $3) "
"ON CONFLICT (book, author) DO NOTHING;",
[(self.SOURCE, *row) for row in rows], [(self.SOURCE, *row) for row in rows],
) )
await self.log("Books_authors updated!") await self.logger.info("Books_authors updated!")
async def _update_translations(self): async def _update_translations(self):
await self.books_updated_event.wait() await self.books_updated_event.wait()
await self.authors_updated_event.wait() await self.authors_updated_event.wait()
await self.log("Update translations...") await self.logger.info("Update translations...")
await self.postgres_pool.execute(
"""
-- Upsert a translation record: (book, translator) pair with its position.
-- book_ and author_ are remote ids within source_; they are resolved to local ids first.
CREATE OR REPLACE FUNCTION update_translation(source_ smallint, book_ integer, author_ integer, position_ smallint) RETURNS void AS $$
DECLARE
    v_book integer := -1;
    v_author integer := -1;
BEGIN
    -- NOTE(review): SELECT INTO assigns NULL when no row matches, replacing the
    -- -1 initial value; the INSERT branch then receives NULL ids.
    SELECT b.id INTO v_book FROM books AS b WHERE b.source = source_ AND b.remote_id = book_;
    SELECT a.id INTO v_author FROM authors AS a WHERE a.source = source_ AND a.remote_id = author_;

    IF NOT EXISTS (SELECT 1 FROM translations WHERE book = v_book AND author = v_author) THEN
        -- New pair: store it with the given position.
        INSERT INTO translations (book, author, position) VALUES (v_book, v_author, position_);
    ELSE
        -- Existing pair: only the position can change.
        UPDATE translations SET position = position_
        WHERE book = v_book AND author = v_author;
    END IF;
END;
$$ LANGUAGE plpgsql;
"""
)
async with self.mysql_pool.acquire() as conn: async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cursor: async with conn.cursor() as cursor:
await cursor.execute( await cursor.execute(
"SELECT BookId, TranslatorId, Pos FROM libtranslator " "SELECT COUNT(*) FROM libtranslator "
"WHERE BookId IN (SELECT BookId FROM libbook);" "WHERE BookId IN (SELECT BookId FROM libbook);"
) )
while rows := await cursor.fetchmany(32): (rows_count,) = await cursor.fetchone()
for offset in range(0, rows_count, 4096):
await cursor.execute(
"SELECT BookId, TranslatorId, Pos FROM libtranslator "
"WHERE BookId IN (SELECT BookId FROM libbook) "
"LIMIT 4096 OFFSET {offset};".format(offset=offset)
)
rows = await cursor.fetchall()
await self.postgres_pool.executemany( await self.postgres_pool.executemany(
"INSERT INTO translations (book, author, position) " "SELECT update_translation($1, $2, $3, $4)",
"SELECT "
"(SELECT id FROM books WHERE source = $1 AND remote_id = $2), "
"(SELECT id FROM authors WHERE source = $1 AND remote_id = $3), "
"$4 "
"ON CONFLICT (book, author) "
"DO UPDATE SET position = EXCLUDED.position;",
[(self.SOURCE, *row) for row in rows], [(self.SOURCE, *row) for row in rows],
) )
await self.log("Translations updated!") await self.logger.info("Translations updated!")
async def _update_sequences(self): async def _update_sequences(self):
def prepare_sequence(row: list): def prepare_sequence(row: list):
@@ -260,89 +335,172 @@ class FlUpdater(BaseUpdater):
remove_wrong_ch(row[1]), remove_wrong_ch(row[1]),
] ]
await self.log("Update sequences...") await self.logger.info("Update sequences...")
await self.postgres_pool.execute(
"""
-- Upsert one sequence (book series) keyed by (source, remote_id).
-- Inserts it when the key is unknown; otherwise overwrites its name.
CREATE OR REPLACE FUNCTION update_sequences(source_ smallint, remote_id_ int, name_ varchar) RETURNS void AS $$
BEGIN
    IF NOT EXISTS (SELECT 1 FROM sequences WHERE source = source_ AND remote_id = remote_id_) THEN
        INSERT INTO sequences (source, remote_id, name) VALUES (source_, remote_id_, name_);
    ELSE
        UPDATE sequences SET name = name_
        WHERE source = source_ AND remote_id = remote_id_;
    END IF;
END;
$$ LANGUAGE plpgsql;
"""
)
async with self.mysql_pool.acquire() as conn: async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cursor: async with conn.cursor() as cursor:
await cursor.execute("SELECT SeqId, SeqName FROM libseqname;") await cursor.execute("SELECT COUNT(*) FROM libseqname;")
(rows_count,) = await cursor.fetchone()
for offset in range(0, rows_count, 4096):
await cursor.execute(
"SELECT SeqId, SeqName FROM libseqname LIMIT 4096 OFFSET {offset};".format(
offset=offset
)
)
rows = await cursor.fetchall()
while rows := await cursor.fetchmany(32):
await self.postgres_pool.executemany( await self.postgres_pool.executemany(
"INSERT INTO sequences (source, remote_id, name) " "SELECT update_sequences($1, $2, cast($3 as varchar));",
"VALUES ($1, $2, cast($3 as varchar)) "
"ON CONFLICT (source, remote_id) "
"DO UPDATE SET name = EXCLUDED.name;",
[prepare_sequence(row) for row in rows], [prepare_sequence(row) for row in rows],
) )
self.sequences_updated_event.set() self.sequences_updated_event.set()
await self.log("Sequences updated!") await self.logger.info("Sequences updated!")
async def _update_sequences_info(self): async def _update_sequences_info(self):
await self.sequences_updated_event.wait() await self.sequences_updated_event.wait()
await self.books_updated_event.wait() await self.books_updated_event.wait()
await self.log("Update book_sequences...") await self.logger.info("Update book_sequences...")
await self.postgres_pool.execute(
"""
-- Upsert the membership of a book in a sequence together with its position.
-- book_ and sequence_ are remote ids within source_; they are resolved to local ids first.
CREATE OR REPLACE FUNCTION update_book_sequence(source_ smallint, book_ integer, sequence_ integer, position_ smallint) RETURNS void AS $$
DECLARE
    v_book integer := -1;
    v_sequence integer := -1;
BEGIN
    -- NOTE(review): SELECT INTO assigns NULL when no row matches, replacing the
    -- -1 initial value; the INSERT branch then receives NULL ids.
    SELECT b.id INTO v_book FROM books AS b WHERE b.source = source_ AND b.remote_id = book_;
    SELECT s.id INTO v_sequence FROM sequences AS s WHERE s.source = source_ AND s.remote_id = sequence_;

    IF NOT EXISTS (SELECT 1 FROM book_sequences WHERE book = v_book AND sequence = v_sequence) THEN
        -- New membership: store it with the given position.
        INSERT INTO book_sequences (book, sequence, position) VALUES (v_book, v_sequence, position_);
    ELSE
        -- Existing membership: only the position can change.
        UPDATE book_sequences SET position = position_
        WHERE book = v_book AND sequence = v_sequence;
    END IF;
END;
$$ LANGUAGE plpgsql;
"""
)
async with self.mysql_pool.acquire() as conn: async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cursor: async with conn.cursor() as cursor:
await cursor.execute( await cursor.execute(
"SELECT BookId, SeqId, level FROM libseq " "SELECT COUNT(*) FROM libseq "
"WHERE " "WHERE "
"BookId IN (SELECT BookId FROM libbook) AND " "BookId IN (SELECT BookId FROM libbook) AND "
"SeqId IN (SELECT SeqId FROM libseqname);" "SeqId IN (SELECT SeqId FROM libseqname);"
) )
while rows := await cursor.fetchmany(32): (rows_count,) = await cursor.fetchone()
for offset in range(0, rows_count, 4096):
await cursor.execute(
"SELECT BookId, SeqId, level FROM libseq "
"WHERE "
"BookId IN (SELECT BookId FROM libbook) AND "
"SeqId IN (SELECT SeqId FROM libseqname) "
"LIMIT 4096 OFFSET {offset};".format(offset=offset)
)
rows = await cursor.fetchall()
await self.postgres_pool.executemany( await self.postgres_pool.executemany(
"INSERT INTO book_sequences (book, sequence, position) " "SELECT update_book_sequence($1, $2, $3, $4);",
"SELECT "
"(SELECT id FROM books WHERE source = $1 AND remote_id = $2), "
"(SELECT id FROM sequences WHERE source = $1 AND remote_id = $3), "
"$4 "
"ON CONFLICT (book, sequence) "
"DO UPDATE SET position = EXCLUDED.position;",
[[self.SOURCE, *row] for row in rows], [[self.SOURCE, *row] for row in rows],
) )
await self.log("Book_sequences updated!") await self.logger.info("Book_sequences updated!")
async def _update_book_annotations(self): async def _update_book_annotations(self):
await self.books_updated_event.wait() await self.books_updated_event.wait()
await self.log("Update book_annotations...") await self.logger.info("Update book_annotations...")
await self.postgres_pool.execute(
"""
-- Upsert the annotation (title and text) of a book.
-- book_ is the remote id within source_; it is resolved to the local book id first.
CREATE OR REPLACE FUNCTION update_book_annotation(source_ smallint, book_ integer, title_ varchar, text_ text) RETURNS void AS $$
DECLARE
    v_book integer := -1;
BEGIN
    -- NOTE(review): SELECT INTO assigns NULL when no book matches, replacing the
    -- -1 initial value; the INSERT branch then receives a NULL book id.
    SELECT b.id INTO v_book FROM books AS b WHERE b.source = source_ AND b.remote_id = book_;

    IF NOT EXISTS (SELECT 1 FROM book_annotations WHERE book = v_book) THEN
        INSERT INTO book_annotations (book, title, text) VALUES (v_book, title_, text_);
    ELSE
        UPDATE book_annotations SET title = title_, text = text_
        WHERE book = v_book;
    END IF;
END;
$$ LANGUAGE plpgsql;
"""
)
async with self.mysql_pool.acquire() as conn: async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cursor: async with conn.cursor() as cursor:
await cursor.execute( await cursor.execute(
"SELECT BookId, Title, Body FROM libbannotations " "SELECT COUNT(*) FROM libbannotations "
"WHERE BookId IN (SELECT BookId FROM libbook);" "WHERE BookId IN (SELECT BookId FROM libbook);"
) )
while rows := await cursor.fetchmany(32): (rows_count,) = await cursor.fetchone()
for offset in range(0, rows_count, 4096):
await cursor.execute(
"SELECT BookId, Title, Body FROM libbannotations "
"WHERE BookId IN (SELECT BookId FROM libbook) "
"LIMIT 4096 OFFSET {offset};".format(offset=offset)
)
rows = await cursor.fetchall()
await self.postgres_pool.executemany( await self.postgres_pool.executemany(
"INSERT INTO book_annotations (book, title, text) " "SELECT update_book_annotation($1, $2, cast($3 as varchar), cast($4 as text));",
"SELECT "
"(SELECT id FROM books WHERE source = $1 AND remote_id = $2), "
"$3, $4 "
"ON CONFLICT (book) "
"DO UPDATE SET title = EXCLUDED.title, text = EXCLUDED.text;",
[[self.SOURCE, *row] for row in rows], [[self.SOURCE, *row] for row in rows],
) )
await self.log("Book_annotations updated!") await self.logger.info("Book_annotations updated!")
await self._update_book_annotations_pic() await self._update_book_annotations_pic()
async def _update_book_annotations_pic(self): async def _update_book_annotations_pic(self):
await self.log("Update book_annotations_pic...") await self.logger.info("Update book_annotations_pic...")
async with self.mysql_pool.acquire() as conn: async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cursor: async with conn.cursor() as cursor:
await cursor.execute("SELECT BookId, File FROM libbpics;") await cursor.execute("SELECT COUNT(*) FROM libbpics;")
(rows_count,) = await cursor.fetchone()
for offset in range(0, rows_count, 4096):
await cursor.execute(
"SELECT BookId, File FROM libbpics LIMIT 4096 OFFSET {offset};".format(
offset=offset
)
)
rows = await cursor.fetchall()
while rows := await cursor.fetchmany(32):
await self.postgres_pool.executemany( await self.postgres_pool.executemany(
"UPDATE book_annotations " "UPDATE book_annotations "
"SET file = cast($3 as varchar) " "SET file = cast($3 as varchar) "
@@ -351,42 +509,74 @@ class FlUpdater(BaseUpdater):
[[self.SOURCE, *row] for row in rows], [[self.SOURCE, *row] for row in rows],
) )
await self.log("Book_annotation_pics updated!") await self.logger.info("Book_annotation_pics updated!")
async def _update_author_annotations(self): async def _update_author_annotations(self):
await self.authors_updated_event.wait() await self.authors_updated_event.wait()
await self.log("Update author_annotations...") await self.logger.info("Update author_annotations...")
await self.postgres_pool.execute(
"""
-- Upsert the annotation (title and text) of an author.
-- author_ is the remote id within source_; it is resolved to the local author id first.
CREATE OR REPLACE FUNCTION update_author_annotation(source_ smallint, author_ integer, title_ varchar, text_ text) RETURNS void AS $$
DECLARE
    v_author integer := -1;
BEGIN
    -- NOTE(review): SELECT INTO assigns NULL when no author matches, replacing the
    -- -1 initial value; the INSERT branch then receives a NULL author id.
    SELECT a.id INTO v_author FROM authors AS a WHERE a.source = source_ AND a.remote_id = author_;

    IF NOT EXISTS (SELECT 1 FROM author_annotations WHERE author = v_author) THEN
        INSERT INTO author_annotations (author, title, text) VALUES (v_author, title_, text_);
    ELSE
        UPDATE author_annotations SET title = title_, text = text_
        WHERE author = v_author;
    END IF;
END;
$$ LANGUAGE plpgsql;
"""
)
async with self.mysql_pool.acquire() as conn: async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cursor: async with conn.cursor() as cursor:
await cursor.execute("SELECT COUNT(*) FROM libaannotations;")
(rows_count,) = await cursor.fetchone()
for offset in range(0, rows_count, 4096):
await cursor.execute( await cursor.execute(
"SELECT AvtorId, Title, Body FROM libaannotations;" "SELECT AvtorId, Title, Body FROM libaannotations LIMIT 4096 OFFSET {offset};".format(
offset=offset
)
) )
while rows := await cursor.fetchmany(32): rows = await cursor.fetchall()
await self.postgres_pool.executemany( await self.postgres_pool.executemany(
"INSERT INTO author_annotations (author, title, text) " "SELECT update_author_annotation($1, $2, cast($3 as varchar), cast($4 as text));",
"SELECT "
"(SELECT id FROM authors WHERE source = $1 AND remote_id = $2), "
"$3, $4 "
"ON CONFLICT (author) "
"DO UPDATE SET title = EXCLUDED.title, text = EXCLUDED.text;",
[[self.SOURCE, *row] for row in rows], [[self.SOURCE, *row] for row in rows],
) )
await self.log("Author_annotation_updated!") await self.logger.info("Author_annotation_updated!")
await self._update_author_annotations_pics() await self._update_author_annotations_pics()
async def _update_author_annotations_pics(self): async def _update_author_annotations_pics(self):
await self.log("Update author_annotations_pic...") await self.logger.info("Update author_annotations_pic...")
async with self.mysql_pool.acquire() as conn: async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cursor: async with conn.cursor() as cursor:
await cursor.execute("SELECT AvtorId, File FROM libapics;") await cursor.execute("SELECT COUNT(*) FROM libapics;")
(rows_count,) = await cursor.fetchone()
for offset in range(0, rows_count, 4096):
await cursor.execute(
"SELECT AvtorId, File FROM libapics LIMIT 4096 OFFSET {offset};".format(
offset=offset
)
)
rows = await cursor.fetchall()
while rows := await cursor.fetchmany(32):
await self.postgres_pool.executemany( await self.postgres_pool.executemany(
"UPDATE author_annotations " "UPDATE author_annotations "
"SET file = cast($3 as varchar) " "SET file = cast($3 as varchar) "
@@ -395,58 +585,104 @@ class FlUpdater(BaseUpdater):
[[self.SOURCE, *row] for row in rows], [[self.SOURCE, *row] for row in rows],
) )
await self.log("Author_annotatioins_pic updated!") await self.logger.info("Author_annotatioins_pic updated!")
async def _update_genres(self): async def _update_genres(self):
await self.log("Update genres...") await self.logger.info("Update genres...")
await self.postgres_pool.execute(
"""
-- Upsert one genre keyed by (source, remote_id).
-- Updates code, description and meta of an existing row, or inserts a new
-- genre when the key is not present yet.
CREATE OR REPLACE FUNCTION update_genre(
    source_ smallint, remote_id_ int, code_ varchar, description_ varchar, meta_ varchar
) RETURNS void AS $$
BEGIN
    IF EXISTS (SELECT * FROM genres WHERE source = source_ AND remote_id = remote_id_) THEN
        UPDATE genres SET code = code_, description = description_, meta = meta_
        WHERE source = source_ AND remote_id = remote_id_;
        RETURN;
    END IF;
    -- New genres belong in genres, the same table the EXISTS check and the
    -- UPDATE above operate on (not in authors).
    INSERT INTO genres (source, remote_id, code, description, meta)
    VALUES (source_, remote_id_, code_, description_, meta_);
END;
$$ LANGUAGE plpgsql;
"""
)
async with self.mysql_pool.acquire() as conn: async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cursor: async with conn.cursor() as cursor:
await cursor.execute("SELECT COUNT(*) FROM libgenrelist;")
(rows_count,) = await cursor.fetchone()
for offset in range(0, rows_count, 4096):
await cursor.execute( await cursor.execute(
"SELECT GenreId, GenreCode, GenreDesc, GenreMeta FROM libgenrelist;" "SELECT GenreId, GenreCode, GenreDesc, GenreMeta FROM libgenrelist LIMIT 4096 OFFSET {offset};".format(
offset=offset
)
) )
while rows := await cursor.fetchmany(32): rows = await cursor.fetchall()
await self.postgres_pool.executemany( await self.postgres_pool.executemany(
"INSERT INTO genres (source, remote_id, code, description, meta) " "SELECT update_genre($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar));",
"VALUES ($1, $2, cast($3 as varchar), cast($4 as varchar), cast($5 as varchar)) "
"ON CONFLICT (source, remote_id) "
"DO UPDATE SET code = EXCLUDED.code, description = EXCLUDED.description, meta = EXCLUDED.meta;",
[[self.SOURCE, *row] for row in rows], [[self.SOURCE, *row] for row in rows],
) )
await self.log("Genres updated!") await self.logger.info("Genres updated!")
async def _update_books_genres(self): async def _update_books_genres(self):
await self.books_updated_event.wait() await self.books_updated_event.wait()
await self.genres_updated_event.wait() await self.genres_updated_event.wait()
await self.log("Update book_genres...") await self.logger.info("Update book_genres...")
await self.postgres_pool.execute(
"""
-- Link a book to a genre, both given by their remote ids within a source.
-- Does nothing when the (book, genre) pair is already present in book_genres.
-- NOTE(review): despite its name this function maintains book_genres. With three
-- arguments it is a separate overload from the four-argument update_book_sequence;
-- a rename (together with its caller) would make the intent clearer.
CREATE OR REPLACE FUNCTION update_book_sequence(source_ smallint, book_ integer, genre_ integer) RETURNS void AS $$
DECLARE
    v_book integer := -1;
    v_genre integer := -1;
BEGIN
    -- NOTE(review): SELECT INTO assigns NULL when no row matches, replacing the
    -- -1 initial value; the INSERT below then receives NULL ids.
    SELECT b.id INTO v_book FROM books AS b WHERE b.source = source_ AND b.remote_id = book_;
    SELECT g.id INTO v_genre FROM genres AS g WHERE g.source = source_ AND g.remote_id = genre_;

    IF NOT EXISTS (SELECT 1 FROM book_genres WHERE book = v_book AND genre = v_genre) THEN
        INSERT INTO book_genres (book, genre) VALUES (v_book, v_genre);
    END IF;
END;
$$ LANGUAGE plpgsql;
"""
)
async with self.mysql_pool.acquire() as conn: async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cursor: async with conn.cursor() as cursor:
await cursor.execute("SELECT BookId, GenreId FROM libgenre;") await cursor.execute("SELECT COUNT(*) FROM libgenre;")
(rows_count,) = await cursor.fetchone()
for offset in range(0, rows_count, 4096):
await cursor.execute(
"SELECT BookId, GenreId FROM libgenre LIMIT 4096 OFFSET {offset};".format(
offset=offset
)
)
rows = await cursor.fetchall()
while rows := await cursor.fetchmany(32):
await self.postgres_pool.executemany( await self.postgres_pool.executemany(
"INSERT INTO book_genres (book, genre) " "SELECT update_book_sequence($1, $2, $3);",
"SELECT "
"(SELECT id FROM books WHERE source = $1 AND remote_id = $2), "
"(SELECT id FROM genres WHERE source = $1 AND remote_id = $3) "
"ON CONFLICT (book, author) DO NOTHING;",
[(self.SOURCE, *row) for row in rows], [(self.SOURCE, *row) for row in rows],
) )
await self.log("Book_genres updated!") await self.logger.info("Book_genres updated!")
async def _update(self) -> bool: async def _update(self) -> bool:
self.platform = platform.platform()
self.logger = Logger.with_default_handlers() self.logger = Logger.with_default_handlers()
await self._prepare() await self._prepare()
await self._drop_tables()
await asyncio.gather(*[self._import_dump(filename) for filename in self.FILES]) await asyncio.gather(*[self._import_dump(filename) for filename in self.FILES])
await self._set_source() await self._set_source()

View File

@@ -6,6 +6,8 @@ from core.config import env_config, WebhookConfig
class WebhookSender: class WebhookSender:
@classmethod @classmethod
async def _make_request(cls, webhook: WebhookConfig): async def _make_request(cls, webhook: WebhookConfig):
print(f"Make request to {webhook.url}")
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
request_maker = getattr(client, webhook.method) request_maker = getattr(client, webhook.method)
await request_maker(webhook.url, headers=webhook.headers) await request_maker(webhook.url, headers=webhook.headers)

View File

@@ -13,4 +13,4 @@ async def update(updater: UpdaterTypes, background_tasks: BackgroundTasks):
background_tasks.add_task(updater_.update) background_tasks.add_task(updater_.update)
return True return "Ok!"