From 3f46d0ef0d0dded2f3eaa9c3b4e47fad8efd484c Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Thu, 28 Apr 2022 12:54:40 +0300 Subject: [PATCH] Add webhooks --- src/app/services/updaters/fl_updater.py | 125 ++++++++++++++++------- src/app/services/updaters/utils/tasks.py | 16 +-- src/core/config.py | 2 + src/core/setup_arq.py | 2 +- 4 files changed, 101 insertions(+), 44 deletions(-) diff --git a/src/app/services/updaters/fl_updater.py b/src/app/services/updaters/fl_updater.py index 5698cfa..1b4383f 100644 --- a/src/app/services/updaters/fl_updater.py +++ b/src/app/services/updaters/fl_updater.py @@ -7,6 +7,7 @@ import aiomysql from arq.connections import ArqRedis from arq.worker import Retry import asyncpg +import httpx from app.services.updaters.utils.cmd import run_cmd from app.services.updaters.utils.tasks import is_jobs_complete @@ -50,6 +51,8 @@ class JobId(Enum): update_genres = "update_fl_genres" update_books_genres = "update_fl_books_genres" + webhook = "fl_webhook" + async def import_fl_dump(ctx: dict, filename: str, *args, **kwargs): await run_cmd( @@ -101,10 +104,12 @@ async def get_source(postgres: asyncpg.Connection) -> int: async def update_fl_authors(ctx: dict, *args, prefix: Optional[str] = None, **kwargs): arq_pool: ArqRedis = ctx["arq_pool"] - if not await is_jobs_complete( + is_deps_complete, not_complete_count = await is_jobs_complete( arq_pool, [JobId.import_libavtorname.value], prefix=prefix - ): - raise Retry(defer=60) + ) + + if not is_deps_complete: + raise Retry(defer=60 * not_complete_count) postgres, mysql = await get_db_cons() @@ -156,10 +161,12 @@ async def update_fl_authors(ctx: dict, *args, prefix: Optional[str] = None, **kw async def update_fl_books(ctx: dict, *args, prefix: Optional[str] = None, **kwargs): arq_pool: ArqRedis = ctx["arq_pool"] - if not await is_jobs_complete( + is_deps_complete, not_complete_count = await is_jobs_complete( arq_pool, [JobId.import_libbook.value], prefix=prefix - ): - raise Retry(defer=60) + ) + + if not is_deps_complete: + raise Retry(defer=60 * not_complete_count) postgres, mysql = await get_db_cons() @@ -224,7 +231,7 @@ async def update_fl_books_authors( ): arq_pool: ArqRedis = ctx["arq_pool"] - if not await is_jobs_complete( + is_deps_complete, not_complete_count = await is_jobs_complete( arq_pool, [ JobId.import_libavtor.value, @@ -232,8 +239,10 @@ async def update_fl_books_authors( JobId.update_books.value, ], prefix=prefix, - ): - raise Retry(defer=60) + ) + + if not is_deps_complete: + raise Retry(defer=60 * not_complete_count) postgres, mysql = await get_db_cons() @@ -277,7 +286,7 @@ async def update_fl_translations( ): arq_pool: ArqRedis = ctx["arq_pool"] - if not await is_jobs_complete( + is_deps_complete, not_complete_count = await is_jobs_complete( arq_pool, [ JobId.import_libtranslator.value, @@ -285,8 +294,10 @@ async def update_fl_translations( JobId.update_books.value, ], prefix=prefix, - ): - raise Retry(defer=60) + ) + + if not is_deps_complete: + raise Retry(defer=60 * not_complete_count) postgres, mysql = await get_db_cons() @@ -333,14 +344,16 @@ async def update_fl_translations( async def update_fl_sequences(ctx: dict, *args, prefix: Optional[str] = None, **kwargs): arq_pool: ArqRedis = ctx["arq_pool"] - if not await is_jobs_complete( + is_deps_complete, not_complete_count = await is_jobs_complete( arq_pool, [ JobId.import_libseqname.value, ], prefix=prefix, - ): - raise Retry(defer=60) + ) + + if not is_deps_complete: + raise Retry(defer=60 * not_complete_count) postgres, mysql = await get_db_cons() @@ -386,7 +399,7 @@ async def update_fl_sequences_info( ): arq_pool: ArqRedis = ctx["arq_pool"] - if not await is_jobs_complete( + is_deps_complete, not_complete_count = await is_jobs_complete( arq_pool, [ JobId.import_libseq.value, @@ -394,8 +407,10 @@ async def update_fl_sequences_info( JobId.update_books.value, ], prefix=prefix, - ): - raise Retry(defer=60) + ) + + if not is_deps_complete: + raise Retry(defer=60 * not_complete_count) postgres, mysql = await get_db_cons() @@ -445,15 +460,17 @@ async def update_fl_book_annotations( ): arq_pool: ArqRedis = ctx["arq_pool"] - if not await is_jobs_complete( + is_deps_complete, not_complete_count = await is_jobs_complete( arq_pool, [ JobId.import_lib_b_annotations.value, JobId.update_books.value, ], prefix=prefix, - ): - raise Retry(defer=60) + ) + + if not is_deps_complete: + raise Retry(defer=60 * not_complete_count) postgres, mysql = await get_db_cons() @@ -507,15 +524,17 @@ async def update_fl_book_annotations_pic( ): arq_pool: ArqRedis = ctx["arq_pool"] - if not await is_jobs_complete( + is_deps_complete, not_complete_count = await is_jobs_complete( arq_pool, [ JobId.import_lib_b_annotations_pics.value, JobId.update_book_annotations.value, ], prefix=prefix, - ): - raise Retry(defer=60) + ) + + if not is_deps_complete: + raise Retry(defer=60 * not_complete_count) postgres, mysql = await get_db_cons() @@ -545,15 +564,17 @@ async def update_fl_author_annotations( ): arq_pool: ArqRedis = ctx["arq_pool"] - if not await is_jobs_complete( + is_deps_complete, not_complete_count = await is_jobs_complete( arq_pool, [ JobId.import_lib_a_annotations.value, JobId.update_authors.value, ], prefix=prefix, - ): - raise Retry(defer=60) + ) + + if not is_deps_complete: + raise Retry(defer=60 * not_complete_count) postgres, mysql = await get_db_cons() @@ -640,14 +661,16 @@ async def update_fl_author_annotations_pics( async def update_fl_genres(ctx: dict, *args, prefix: Optional[str] = None, **kwargs): arq_pool: ArqRedis = ctx["arq_pool"] - if not await is_jobs_complete( + is_deps_complete, not_complete_count = await is_jobs_complete( arq_pool, [ JobId.import_libgenrelist.value, ], prefix=prefix, - ): - raise Retry(defer=60) + ) + + if not is_deps_complete: + raise Retry(defer=60 * not_complete_count) postgres, mysql = await get_db_cons() @@ -692,7 +715,7 @@ async def update_fl_books_genres( ): arq_pool: ArqRedis = ctx["arq_pool"] - if not await is_jobs_complete( + is_deps_complete, not_complete_count = await is_jobs_complete( arq_pool, [ JobId.import_libgenre.value, @@ -700,8 +723,10 @@ async def update_fl_books_genres( JobId.update_genres.value, ], prefix=prefix, - ): - raise Retry(defer=60) + ) + + if not is_deps_complete: + raise Retry(defer=60 * not_complete_count) postgres, mysql = await get_db_cons() @@ -740,9 +765,36 @@ async def update_fl_books_genres( mysql.close() -async def run_fl_update(ctx: dict, *args, **kwargs) -> bool: - prefix = str(int(time.time()) // (5 * 60)) +async def update_fl_webhook( + ctx: dict, + *args, + prefix: Optional[str] = None, + **kwargs, +): + arq_pool: ArqRedis = ctx["arq_pool"] + is_deps_complete, not_complete_count = await is_jobs_complete( + arq_pool, [e.value for e in JobId if e != JobId.webhook], prefix=prefix + ) + + if not is_deps_complete: + raise Retry(defer=60 * not_complete_count) + + all_success = True + + for webhook in env_config.WEBHOOKS: + async with httpx.AsyncClient() as client: + response: httpx.Response = await getattr(client, webhook.method)( + webhook.url, headers=webhook.headers + ) + + if response.status_code != 200: + all_success = False + + return all_success + + +async def run_fl_update(ctx: dict, *args, **kwargs) -> bool: IMPORTS = { JobId.import_libbook: "lib.libbook.sql", JobId.import_libavtor: "lib.libavtor.sql", @@ -771,9 +823,11 @@ async def run_fl_update(ctx: dict, *args, **kwargs) -> bool: JobId.update_sequences, JobId.update_sequences_info, JobId.update_genres, + JobId.webhook, ) arq_pool: ArqRedis = ctx["arq_pool"] + prefix = str(int(time.time()) // (5 * 60)) for job_id, filename in IMPORTS.items(): await arq_pool.enqueue_job( @@ -803,4 +857,5 @@ __tasks__ = [ update_fl_author_annotations_pics, update_fl_genres, update_fl_books_genres, + update_fl_webhook, ] diff --git a/src/app/services/updaters/utils/tasks.py b/src/app/services/updaters/utils/tasks.py index 69bdc5c..94321d8 100644 --- a/src/app/services/updaters/utils/tasks.py +++ b/src/app/services/updaters/utils/tasks.py @@ -6,24 +6,24 @@ from arq.jobs import Job, JobStatus async def is_jobs_complete( arq_pool: ArqRedis, job_ids: list[str], prefix: Optional[str] = None -) -> Optional[bool]: - job_statuses = set() +) -> tuple[bool, int]: + job_statuses = [] + for job_id in job_ids: _job_id = f"{prefix}_{job_id}" if prefix else job_id status = await Job( _job_id, arq_pool, arq_pool.default_queue_name, arq_pool.job_deserializer ).status() - job_statuses.add(status.value) - - if JobStatus.not_found.value in job_statuses: - return False + job_statuses.append(status.value) + not_complete_count = 0 for status in ( + JobStatus.not_found.value, JobStatus.deferred.value, JobStatus.in_progress.value, JobStatus.queued.value, ): if status in job_statuses: - return False + not_complete_count += 1 - return True + return not_complete_count == 0, not_complete_count diff --git a/src/core/config.py b/src/core/config.py index bc9ce0b..73a58ab 100644 --- a/src/core/config.py +++ b/src/core/config.py @@ -32,5 +32,7 @@ class EnvConfig(BaseSettings): SENTRY_DSN: str + WEBHOOKS: list[WebhookConfig] + env_config = EnvConfig() diff --git a/src/core/setup_arq.py b/src/core/setup_arq.py index 8f304f0..5b7134a 100644 --- a/src/core/setup_arq.py +++ b/src/core/setup_arq.py @@ -26,4 +26,4 @@ class WorkerSettings: max_jobs = 2 max_tries = 10 job_timeout = 5 * 60 - cron_jobs = [cron(run_fl_update, hour={3}, minute=0)] + cron_jobs = [cron(run_fl_update, hour={5}, minute=0)]