From cf68c9557d387debb481430254e28a7f13c5ad7a Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Sat, 19 Mar 2022 00:59:06 +0300 Subject: [PATCH] Refactoring jobs --- src/app/services.py | 132 ++++++++++++++++++++---------------------- src/core/setup_arq.py | 2 +- 2 files changed, 63 insertions(+), 71 deletions(-) diff --git a/src/app/services.py b/src/app/services.py index 88952d9..dd1989a 100644 --- a/src/app/services.py +++ b/src/app/services.py @@ -8,12 +8,15 @@ from meilisearch import Client from core.config import env_config +thread_pool = concurrent.futures.ThreadPoolExecutor() + + def get_meilisearch_client() -> Client: return Client(url=env_config.MEILI_HOST, api_key=env_config.MEILI_MASTER_KEY) -async def get_postgres_pool() -> asyncpg.Pool: - return await asyncpg.create_pool( # type: ignore +async def get_postgres_connection() -> asyncpg.Connection: + return await asyncpg.connect( database=env_config.POSTGRES_DB_NAME, host=env_config.POSTGRES_HOST, port=env_config.POSTGRES_PORT, @@ -38,26 +41,21 @@ async def update_books(ctx) -> bool: meili = get_meilisearch_client() index = meili.index("books") - postgres_pool = await get_postgres_pool() + postgres = await get_postgres_connection() + cursor = await postgres.cursor( + "SELECT id, title, lang FROM books WHERE is_deleted = 'f';" + ) - with concurrent.futures.ThreadPoolExecutor() as pool: - count = await postgres_pool.fetchval( - "SELECT count(*) FROM books WHERE is_deleted = 'f';" + while rows := await cursor.fetch(1024): + await loop.run_in_executor( + thread_pool, index.add_documents, [dict(row) for row in rows] ) - for offset in range(0, count, 4096): - rows = await postgres_pool.fetch( - "SELECT id, title, lang FROM books WHERE is_deleted = 'f' " - f"ORDER BY id LIMIT 4096 OFFSET {offset}" - ) - - documents = [dict(row) for row in rows] - - await loop.run_in_executor(pool, index.add_documents, documents) - index.update_searchable_attributes(["title"]) index.update_filterable_attributes(["lang"]) + await postgres.close() + return True @@ -67,42 +65,39 @@ async def update_authors(ctx) -> bool: meili = get_meilisearch_client() index = meili.index("authors") - postgres_pool = await get_postgres_pool() + postgres = await get_postgres_connection() + cursor = await postgres.cursor( + "SELECT id, first_name, last_name, middle_name, " + " array(" + " SELECT DISTINCT lang FROM book_authors " + " LEFT JOIN books ON book = books.id " + " WHERE authors.id = book_authors.author " + " AND books.is_deleted = 'f' " + " ) as author_langs, " + " array(" + " SELECT DISTINCT lang FROM translations " + " LEFT JOIN books ON book = books.id " + " WHERE authors.id = translations.author " + " AND books.is_deleted = 'f' " + " ) as translator_langs, " + " (SELECT count(books.id) FROM book_authors " + " LEFT JOIN books ON book = books.id " + " WHERE authors.id = book_authors.author " + " AND books.is_deleted = 'f') as books_count " + "FROM authors;" + ) - with concurrent.futures.ThreadPoolExecutor() as pool: - count = await postgres_pool.fetchval("SELECT count(*) FROM authors;") - - for offset in range(0, count, 4096): - rows = await postgres_pool.fetch( - "SELECT id, first_name, last_name, middle_name, " - " array(" - " SELECT DISTINCT lang FROM book_authors " - " LEFT JOIN books ON book = books.id " - " WHERE authors.id = book_authors.author " - " AND books.is_deleted = 'f' " - " ) as author_langs, " - " array(" - " SELECT DISTINCT lang FROM translations " - " LEFT JOIN books ON book = books.id " - " WHERE authors.id = translations.author " - " AND books.is_deleted = 'f' " - " ) as translator_langs, " - " (SELECT count(books.id) FROM book_authors " - " LEFT JOIN books ON book = books.id " - " WHERE authors.id = book_authors.author " - " AND books.is_deleted = 'f') as books_count " - "FROM authors " - f"ORDER BY id LIMIT 4096 OFFSET {offset}" - ) - - documents = [dict(row) for row in rows] - - await loop.run_in_executor(pool, index.add_documents, documents) + while rows := await cursor.fetch(1024): + await loop.run_in_executor( + thread_pool, index.add_documents, [dict(row) for row in rows] + ) index.update_searchable_attributes(["first_name", "last_name", "middle_name"]) index.update_filterable_attributes(["author_langs", "translator_langs"]) index.update_ranking_rules([*DEFAULT_RANKING_RULES, "books_count:desc"]) + await postgres.close() + return True @@ -112,36 +107,33 @@ async def update_sequences(ctx) -> bool: meili = get_meilisearch_client() index = meili.index("sequences") - postgres_pool = await get_postgres_pool() + postgres = await get_postgres_connection() + cursor = await postgres.cursor( + "SELECT id, name, " + " array(" + " SELECT DISTINCT lang FROM book_sequences " + " LEFT JOIN books ON book = books.id " + " WHERE sequences.id = book_sequences.sequence " + " AND books.is_deleted = 'f' " + " ) as langs, " + " (SELECT count(books.id) FROM book_sequences " + " LEFT JOIN books ON book = books.id " + " WHERE sequences.id = book_sequences.sequence " + " AND books.is_deleted = 'f') as books_count " + "FROM sequences;" + ) - with concurrent.futures.ThreadPoolExecutor() as pool: - count = await postgres_pool.fetchval("SELECT count(*) FROM sequences;") - - for offset in range(0, count, 4096): - rows = await postgres_pool.fetch( - "SELECT id, name, " - " array(" - " SELECT DISTINCT lang FROM book_sequences " - " LEFT JOIN books ON book = books.id " - " WHERE sequences.id = book_sequences.sequence " - " AND books.is_deleted = 'f' " - " ) as langs, " - " (SELECT count(books.id) FROM book_sequences " - " LEFT JOIN books ON book = books.id " - " WHERE sequences.id = book_sequences.sequence " - " AND books.is_deleted = 'f') as books_count " - "FROM sequences " - f"ORDER BY id LIMIT 4096 OFFSET {offset}" - ) - - documents = [dict(row) for row in rows] - - await loop.run_in_executor(pool, index.add_documents, documents) + while rows := await cursor.fetch(1024): + await loop.run_in_executor( + thread_pool, index.add_documents, [dict(row) for row in rows] + ) index.update_searchable_attributes(["name"]) index.update_filterable_attributes(["langs"]) index.update_ranking_rules([*DEFAULT_RANKING_RULES, "books_count:desc"]) + await postgres.close() + return True diff --git a/src/core/setup_arq.py b/src/core/setup_arq.py index 58d7642..b17a230 100644 --- a/src/core/setup_arq.py +++ b/src/core/setup_arq.py @@ -12,6 +12,6 @@ class WorkerSettings: functions = [update, update_books, update_authors, update_sequences] on_startup = startup redis_settings = get_redis_settings() - max_jobs = 1 + max_jobs = 3 job_timeout = 15 * 60 cron_jobs = [cron(update, hour={6}, minute=0)]