Refactoring jobs
@@ -8,12 +8,15 @@ from meilisearch import Client

 from core.config import env_config


+thread_pool = concurrent.futures.ThreadPoolExecutor()


 def get_meilisearch_client() -> Client:
     return Client(url=env_config.MEILI_HOST, api_key=env_config.MEILI_MASTER_KEY)


-async def get_postgres_pool() -> asyncpg.Pool:
-    return await asyncpg.create_pool(  # type: ignore
+async def get_postgres_connection() -> asyncpg.Connection:
+    return await asyncpg.connect(
         database=env_config.POSTGRES_DB_NAME,
         host=env_config.POSTGRES_HOST,
         port=env_config.POSTGRES_PORT,
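
Note on the hunk above: the per-job connection pool is replaced by a single asyncpg connection, and one module-level ThreadPoolExecutor is created up front and reused by every job. A minimal sketch of how the two access styles differ in use is below; the connection parameters and table are placeholders, not the project's real env_config values.

import asyncio
import asyncpg

async def demo() -> None:
    # Old style: a pool hands out connections on demand and is closed as a whole.
    pool = await asyncpg.create_pool(database="testdb", host="localhost", port=5432)
    total = await pool.fetchval("SELECT count(*) FROM books;")
    await pool.close()

    # New style: one dedicated connection per job, closed explicitly by the caller.
    conn = await asyncpg.connect(database="testdb", host="localhost", port=5432)
    rows = await conn.fetch("SELECT id, title FROM books LIMIT 10;")
    await conn.close()

    print(total, len(rows))

asyncio.run(demo())
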
@@ -38,26 +41,21 @@ async def update_books(ctx) -> bool:
     meili = get_meilisearch_client()
     index = meili.index("books")

-    postgres_pool = await get_postgres_pool()
-
-    with concurrent.futures.ThreadPoolExecutor() as pool:
-        count = await postgres_pool.fetchval(
-            "SELECT count(*) FROM books WHERE is_deleted = 'f';"
-        )
-
-        for offset in range(0, count, 4096):
-            rows = await postgres_pool.fetch(
-                "SELECT id, title, lang FROM books WHERE is_deleted = 'f' "
-                f"ORDER BY id LIMIT 4096 OFFSET {offset}"
-            )
-
-            documents = [dict(row) for row in rows]
-
-            await loop.run_in_executor(pool, index.add_documents, documents)
+    postgres = await get_postgres_connection()
+    cursor = await postgres.cursor(
+        "SELECT id, title, lang FROM books WHERE is_deleted = 'f';"
+    )
+
+    while rows := await cursor.fetch(1024):
+        await loop.run_in_executor(
+            thread_pool, index.add_documents, [dict(row) for row in rows]
+        )

     index.update_searchable_attributes(["title"])
     index.update_filterable_attributes(["lang"])

+    await postgres.close()
+
     return True
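
The update_books hunk swaps count-then-paginate (LIMIT/OFFSET) for a server-side cursor that streams rows in fixed batches. A sketch of that pattern follows, assuming the same books schema; stream_books is a hypothetical helper, and it wraps the loop in a transaction because asyncpg server-side cursors can only be used inside an open transaction (the hunk above does not show one).

import asyncio
import asyncpg

async def stream_books(conn: asyncpg.Connection, index) -> None:
    loop = asyncio.get_running_loop()
    # asyncpg cursors are server-side and only valid inside a transaction.
    async with conn.transaction():
        cursor = await conn.cursor(
            "SELECT id, title, lang FROM books WHERE is_deleted = 'f';"
        )
        # Pull rows in fixed-size batches instead of paginating with LIMIT/OFFSET.
        while rows := await cursor.fetch(1024):
            # add_documents is a blocking HTTP call, so run it in a thread
            # to keep the event loop responsive.
            await loop.run_in_executor(
                None, index.add_documents, [dict(row) for row in rows]
            )
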
@@ -67,42 +65,39 @@ async def update_authors(ctx) -> bool:
     meili = get_meilisearch_client()
     index = meili.index("authors")

-    postgres_pool = await get_postgres_pool()
-
-    with concurrent.futures.ThreadPoolExecutor() as pool:
-        count = await postgres_pool.fetchval("SELECT count(*) FROM authors;")
-
-        for offset in range(0, count, 4096):
-            rows = await postgres_pool.fetch(
-                "SELECT id, first_name, last_name, middle_name, "
-                " array("
-                " SELECT DISTINCT lang FROM book_authors "
-                " LEFT JOIN books ON book = books.id "
-                " WHERE authors.id = book_authors.author "
-                " AND books.is_deleted = 'f' "
-                " ) as author_langs, "
-                " array("
-                " SELECT DISTINCT lang FROM translations "
-                " LEFT JOIN books ON book = books.id "
-                " WHERE authors.id = translations.author "
-                " AND books.is_deleted = 'f' "
-                " ) as translator_langs, "
-                " (SELECT count(books.id) FROM book_authors "
-                " LEFT JOIN books ON book = books.id "
-                " WHERE authors.id = book_authors.author "
-                " AND books.is_deleted = 'f') as books_count "
-                "FROM authors "
-                f"ORDER BY id LIMIT 4096 OFFSET {offset}"
-            )
-
-            documents = [dict(row) for row in rows]
-
-            await loop.run_in_executor(pool, index.add_documents, documents)
+    postgres = await get_postgres_connection()
+    cursor = await postgres.cursor(
+        "SELECT id, first_name, last_name, middle_name, "
+        " array("
+        " SELECT DISTINCT lang FROM book_authors "
+        " LEFT JOIN books ON book = books.id "
+        " WHERE authors.id = book_authors.author "
+        " AND books.is_deleted = 'f' "
+        " ) as author_langs, "
+        " array("
+        " SELECT DISTINCT lang FROM translations "
+        " LEFT JOIN books ON book = books.id "
+        " WHERE authors.id = translations.author "
+        " AND books.is_deleted = 'f' "
+        " ) as translator_langs, "
+        " (SELECT count(books.id) FROM book_authors "
+        " LEFT JOIN books ON book = books.id "
+        " WHERE authors.id = book_authors.author "
+        " AND books.is_deleted = 'f') as books_count "
+        "FROM authors;"
+    )
+
+    while rows := await cursor.fetch(1024):
+        await loop.run_in_executor(
+            thread_pool, index.add_documents, [dict(row) for row in rows]
+        )

     index.update_searchable_attributes(["first_name", "last_name", "middle_name"])
     index.update_filterable_attributes(["author_langs", "translator_langs"])
     index.update_ranking_rules([*DEFAULT_RANKING_RULES, "books_count:desc"])

+    await postgres.close()
+
     return True
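
The authors hunk repeats the same cursor-plus-executor loop as the books one; only the SQL and the index settings differ. If the duplication ever becomes a maintenance issue, the shared loop could be factored out along these lines. This is a hypothetical refactor, not part of this commit; reindex and its parameters are made-up names.

import asyncio

async def reindex(postgres, meili, index_name: str, query: str, batch: int = 1024) -> None:
    # Shared skeleton of update_books / update_authors / update_sequences:
    # stream rows from Postgres and push them to a Meilisearch index in batches.
    loop = asyncio.get_running_loop()
    index = meili.index(index_name)
    async with postgres.transaction():
        cursor = await postgres.cursor(query)
        while rows := await cursor.fetch(batch):
            await loop.run_in_executor(
                None, index.add_documents, [dict(row) for row in rows]
            )
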
@@ -112,36 +107,33 @@ async def update_sequences(ctx) -> bool:
     meili = get_meilisearch_client()
     index = meili.index("sequences")

-    postgres_pool = await get_postgres_pool()
-
-    with concurrent.futures.ThreadPoolExecutor() as pool:
-        count = await postgres_pool.fetchval("SELECT count(*) FROM sequences;")
-
-        for offset in range(0, count, 4096):
-            rows = await postgres_pool.fetch(
-                "SELECT id, name, "
-                " array("
-                " SELECT DISTINCT lang FROM book_sequences "
-                " LEFT JOIN books ON book = books.id "
-                " WHERE sequences.id = book_sequences.sequence "
-                " AND books.is_deleted = 'f' "
-                " ) as langs, "
-                " (SELECT count(books.id) FROM book_sequences "
-                " LEFT JOIN books ON book = books.id "
-                " WHERE sequences.id = book_sequences.sequence "
-                " AND books.is_deleted = 'f') as books_count "
-                "FROM sequences "
-                f"ORDER BY id LIMIT 4096 OFFSET {offset}"
-            )
-
-            documents = [dict(row) for row in rows]
-
-            await loop.run_in_executor(pool, index.add_documents, documents)
+    postgres = await get_postgres_connection()
+    cursor = await postgres.cursor(
+        "SELECT id, name, "
+        " array("
+        " SELECT DISTINCT lang FROM book_sequences "
+        " LEFT JOIN books ON book = books.id "
+        " WHERE sequences.id = book_sequences.sequence "
+        " AND books.is_deleted = 'f' "
+        " ) as langs, "
+        " (SELECT count(books.id) FROM book_sequences "
+        " LEFT JOIN books ON book = books.id "
+        " WHERE sequences.id = book_sequences.sequence "
+        " AND books.is_deleted = 'f') as books_count "
+        "FROM sequences;"
+    )
+
+    while rows := await cursor.fetch(1024):
+        await loop.run_in_executor(
+            thread_pool, index.add_documents, [dict(row) for row in rows]
+        )

     index.update_searchable_attributes(["name"])
     index.update_filterable_attributes(["langs"])
     index.update_ranking_rules([*DEFAULT_RANKING_RULES, "books_count:desc"])

+    await postgres.close()
+
     return True
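
Each job finishes by marking attributes searchable and filterable and by appending "books_count:desc" to the ranking rules. For reference, this is roughly how the rebuilt sequences index would be queried once "langs" is filterable; the host, API key, and query text are placeholders, not values from this repo.

from meilisearch import Client

client = Client(url="http://localhost:7700", api_key="masterKey")  # placeholder credentials

# Filter on the langs attribute made filterable above; within the default
# relevancy buckets, results are additionally ordered by books_count desc.
results = client.index("sequences").search("discworld", {"filter": "langs = en", "limit": 5})
for hit in results["hits"]:
    print(hit["id"], hit["name"], hit["books_count"])
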
@@ -12,6 +12,6 @@ class WorkerSettings:
     functions = [update, update_books, update_authors, update_sequences]
     on_startup = startup
     redis_settings = get_redis_settings()
-    max_jobs = 1
+    max_jobs = 3
     job_timeout = 15 * 60
     cron_jobs = [cron(update, hour={6}, minute=0)]
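
The only functional change in WorkerSettings is max_jobs going from 1 to 3, so the per-index jobs no longer have to queue behind each other, each still bounded by the 15-minute job_timeout. The body of the update job is not part of this diff; the sketch below is only a guess at how it might fan out to the three jobs (the job names come from the functions list above, everything else is assumed).

from arq import create_pool
from arq.connections import RedisSettings

async def update(ctx) -> bool:
    # Hypothetical fan-out: enqueue the three index jobs so the worker,
    # now allowed max_jobs = 3, can process them concurrently.
    redis = await create_pool(RedisSettings())
    for job_name in ("update_books", "update_authors", "update_sequences"):
        await redis.enqueue_job(job_name)
    return True
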