Add webhooks

2022-04-28 12:54:40 +03:00
parent da9585badf
commit 3f46d0ef0d
4 changed files with 101 additions and 44 deletions

View File

@@ -7,6 +7,7 @@ import aiomysql
 from arq.connections import ArqRedis
 from arq.worker import Retry
 import asyncpg
+import httpx
 
 from app.services.updaters.utils.cmd import run_cmd
 from app.services.updaters.utils.tasks import is_jobs_complete
@@ -50,6 +51,8 @@ class JobId(Enum):
     update_genres = "update_fl_genres"
     update_books_genres = "update_fl_books_genres"
+    webhook = "fl_webhook"
 
 
 async def import_fl_dump(ctx: dict, filename: str, *args, **kwargs):
     await run_cmd(
@@ -101,10 +104,12 @@ async def get_source(postgres: asyncpg.Connection) -> int:
 async def update_fl_authors(ctx: dict, *args, prefix: Optional[str] = None, **kwargs):
     arq_pool: ArqRedis = ctx["arq_pool"]
 
-    if not await is_jobs_complete(
+    is_deps_complete, not_complete_count = await is_jobs_complete(
         arq_pool, [JobId.import_libavtorname.value], prefix=prefix
-    ):
-        raise Retry(defer=60)
+    )
+
+    if not is_deps_complete:
+        raise Retry(defer=60 * not_complete_count)
 
     postgres, mysql = await get_db_cons()
@@ -156,10 +161,12 @@ async def update_fl_authors(ctx: dict, *args, prefix: Optional[str] = None, **kw
 async def update_fl_books(ctx: dict, *args, prefix: Optional[str] = None, **kwargs):
     arq_pool: ArqRedis = ctx["arq_pool"]
 
-    if not await is_jobs_complete(
+    is_deps_complete, not_complete_count = await is_jobs_complete(
         arq_pool, [JobId.import_libbook.value], prefix=prefix
-    ):
-        raise Retry(defer=60)
+    )
+
+    if not is_deps_complete:
+        raise Retry(defer=60 * not_complete_count)
 
     postgres, mysql = await get_db_cons()
@@ -224,7 +231,7 @@ async def update_fl_books_authors(
 ):
     arq_pool: ArqRedis = ctx["arq_pool"]
 
-    if not await is_jobs_complete(
+    is_deps_complete, not_complete_count = await is_jobs_complete(
         arq_pool,
         [
             JobId.import_libavtor.value,
@@ -232,8 +239,10 @@ async def update_fl_books_authors(
             JobId.update_books.value,
         ],
         prefix=prefix,
-    ):
-        raise Retry(defer=60)
+    )
+
+    if not is_deps_complete:
+        raise Retry(defer=60 * not_complete_count)
 
     postgres, mysql = await get_db_cons()
@@ -277,7 +286,7 @@ async def update_fl_translations(
 ):
     arq_pool: ArqRedis = ctx["arq_pool"]
 
-    if not await is_jobs_complete(
+    is_deps_complete, not_complete_count = await is_jobs_complete(
         arq_pool,
         [
             JobId.import_libtranslator.value,
@@ -285,8 +294,10 @@ async def update_fl_translations(
             JobId.update_books.value,
         ],
         prefix=prefix,
-    ):
-        raise Retry(defer=60)
+    )
+
+    if not is_deps_complete:
+        raise Retry(defer=60 * not_complete_count)
 
     postgres, mysql = await get_db_cons()
@@ -333,14 +344,16 @@ async def update_fl_translations(
 async def update_fl_sequences(ctx: dict, *args, prefix: Optional[str] = None, **kwargs):
     arq_pool: ArqRedis = ctx["arq_pool"]
 
-    if not await is_jobs_complete(
+    is_deps_complete, not_complete_count = await is_jobs_complete(
         arq_pool,
         [
             JobId.import_libseqname.value,
         ],
         prefix=prefix,
-    ):
-        raise Retry(defer=60)
+    )
+
+    if not is_deps_complete:
+        raise Retry(defer=60 * not_complete_count)
 
     postgres, mysql = await get_db_cons()
@@ -386,7 +399,7 @@ async def update_fl_sequences_info(
 ):
     arq_pool: ArqRedis = ctx["arq_pool"]
 
-    if not await is_jobs_complete(
+    is_deps_complete, not_complete_count = await is_jobs_complete(
         arq_pool,
         [
             JobId.import_libseq.value,
@@ -394,8 +407,10 @@ async def update_fl_sequences_info(
             JobId.update_books.value,
         ],
         prefix=prefix,
-    ):
-        raise Retry(defer=60)
+    )
+
+    if not is_deps_complete:
+        raise Retry(defer=60 * not_complete_count)
 
     postgres, mysql = await get_db_cons()
@@ -445,15 +460,17 @@ async def update_fl_book_annotations(
 ):
     arq_pool: ArqRedis = ctx["arq_pool"]
 
-    if not await is_jobs_complete(
+    is_deps_complete, not_complete_count = await is_jobs_complete(
         arq_pool,
         [
             JobId.import_lib_b_annotations.value,
             JobId.update_books.value,
         ],
         prefix=prefix,
-    ):
-        raise Retry(defer=60)
+    )
+
+    if not is_deps_complete:
+        raise Retry(defer=60 * not_complete_count)
 
     postgres, mysql = await get_db_cons()
@@ -507,15 +524,17 @@ async def update_fl_book_annotations_pic(
 ):
     arq_pool: ArqRedis = ctx["arq_pool"]
 
-    if not await is_jobs_complete(
+    is_deps_complete, not_complete_count = await is_jobs_complete(
         arq_pool,
         [
             JobId.import_lib_b_annotations_pics.value,
             JobId.update_book_annotations.value,
         ],
         prefix=prefix,
-    ):
-        raise Retry(defer=60)
+    )
+
+    if not is_deps_complete:
+        raise Retry(defer=60 * not_complete_count)
 
     postgres, mysql = await get_db_cons()
@@ -545,15 +564,17 @@ async def update_fl_author_annotations(
 ):
     arq_pool: ArqRedis = ctx["arq_pool"]
 
-    if not await is_jobs_complete(
+    is_deps_complete, not_complete_count = await is_jobs_complete(
         arq_pool,
         [
             JobId.import_lib_a_annotations.value,
             JobId.update_authors.value,
         ],
         prefix=prefix,
-    ):
-        raise Retry(defer=60)
+    )
+
+    if not is_deps_complete:
+        raise Retry(defer=60 * not_complete_count)
 
     postgres, mysql = await get_db_cons()
@@ -640,14 +661,16 @@ async def update_fl_author_annotations_pics(
 async def update_fl_genres(ctx: dict, *args, prefix: Optional[str] = None, **kwargs):
     arq_pool: ArqRedis = ctx["arq_pool"]
 
-    if not await is_jobs_complete(
+    is_deps_complete, not_complete_count = await is_jobs_complete(
         arq_pool,
         [
             JobId.import_libgenrelist.value,
         ],
         prefix=prefix,
-    ):
-        raise Retry(defer=60)
+    )
+
+    if not is_deps_complete:
+        raise Retry(defer=60 * not_complete_count)
 
     postgres, mysql = await get_db_cons()
@@ -692,7 +715,7 @@ async def update_fl_books_genres(
 ):
     arq_pool: ArqRedis = ctx["arq_pool"]
 
-    if not await is_jobs_complete(
+    is_deps_complete, not_complete_count = await is_jobs_complete(
         arq_pool,
         [
             JobId.import_libgenre.value,
@@ -700,8 +723,10 @@ async def update_fl_books_genres(
             JobId.update_genres.value,
         ],
         prefix=prefix,
-    ):
-        raise Retry(defer=60)
+    )
+
+    if not is_deps_complete:
+        raise Retry(defer=60 * not_complete_count)
 
     postgres, mysql = await get_db_cons()
@@ -740,9 +765,36 @@ async def update_fl_books_genres(
     mysql.close()
 
 
-async def run_fl_update(ctx: dict, *args, **kwargs) -> bool:
-    prefix = str(int(time.time()) // (5 * 60))
+async def update_fl_webhook(
+    ctx: dict,
+    *args,
+    prefix: Optional[str] = None,
+    **kwargs,
+):
+    arq_pool: ArqRedis = ctx["arq_pool"]
+
+    is_deps_complete, not_complete_count = await is_jobs_complete(
+        arq_pool, [e.value for e in JobId if e != JobId.webhook], prefix=prefix
+    )
+
+    if not is_deps_complete:
+        raise Retry(defer=60 * not_complete_count)
+
+    all_success = True
+    for webhook in env_config.WEBHOOKS:
+        async with httpx.AsyncClient() as client:
+            response: httpx.Response = await getattr(client, webhook.method)(
+                webhook.url, headers=webhook.headers
+            )
+
+            if response.status_code != 200:
+                all_success = False
+
+    return all_success
+
+
+async def run_fl_update(ctx: dict, *args, **kwargs) -> bool:
     IMPORTS = {
         JobId.import_libbook: "lib.libbook.sql",
         JobId.import_libavtor: "lib.libavtor.sql",
@@ -771,9 +823,11 @@ async def run_fl_update(ctx: dict, *args, **kwargs) -> bool:
         JobId.update_sequences,
         JobId.update_sequences_info,
         JobId.update_genres,
+        JobId.webhook,
     )
 
     arq_pool: ArqRedis = ctx["arq_pool"]
+    prefix = str(int(time.time()) // (5 * 60))
 
     for job_id, filename in IMPORTS.items():
         await arq_pool.enqueue_job(
@@ -803,4 +857,5 @@ __tasks__ = [
     update_fl_author_annotations_pics,
     update_fl_genres,
     update_fl_books_genres,
+    update_fl_webhook,
 ]
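Note: update_fl_webhook runs only after every other JobId in the pipeline reports complete, then calls each entry in env_config.WEBHOOKS once and returns whether every response was HTTP 200. A rough standalone sketch of what a single configured entry translates to at runtime; only the attribute names (method, url, headers) come from the diff above, the concrete values are placeholders:

import asyncio

import httpx

# Hypothetical webhook entry; values are illustrative only.
webhook = {
    "method": "post",
    "url": "https://example.invalid/hooks/fl-updated",
    "headers": {"Authorization": "Bearer <token>"},
}


async def fire() -> bool:
    async with httpx.AsyncClient() as client:
        # Same dynamic dispatch as in the task: getattr(client, "post")(url, headers=...)
        response = await getattr(client, webhook["method"])(
            webhook["url"], headers=webhook["headers"]
        )
    # Anything other than HTTP 200 counts as a failure.
    return response.status_code == 200


print(asyncio.run(fire()))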

View File

@@ -6,24 +6,24 @@ from arq.jobs import Job, JobStatus
 async def is_jobs_complete(
     arq_pool: ArqRedis, job_ids: list[str], prefix: Optional[str] = None
-) -> Optional[bool]:
-    job_statuses = set()
+) -> tuple[bool, int]:
+    job_statuses = []
 
     for job_id in job_ids:
         _job_id = f"{prefix}_{job_id}" if prefix else job_id
         status = await Job(
             _job_id, arq_pool, arq_pool.default_queue_name, arq_pool.job_deserializer
         ).status()
-        job_statuses.add(status.value)
+        job_statuses.append(status.value)
 
-    if JobStatus.not_found.value in job_statuses:
-        return False
+    not_complete_count = 0
 
     for status in (
+        JobStatus.not_found.value,
         JobStatus.deferred.value,
         JobStatus.in_progress.value,
         JobStatus.queued.value,
     ):
         if status in job_statuses:
-            return False
+            not_complete_count += 1
 
-    return True
+    return not_complete_count == 0, not_complete_count
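The helper now returns a (complete, not_complete_count) pair instead of a bare bool. Note that the count is the number of distinct unfinished statuses (not_found, deferred, in_progress, queued) seen across the dependency jobs, not the number of unfinished jobs; callers scale the 60-second retry defer by it, as in the task hunks above. A small illustration of the new return value, assuming one complete, one queued and one in-progress dependency:

from arq.jobs import JobStatus

# Statuses three dependency jobs might report (illustrative values only).
job_statuses = [
    JobStatus.complete.value,
    JobStatus.queued.value,
    JobStatus.in_progress.value,
]

# Mirrors the counting in is_jobs_complete: how many of the "not finished"
# statuses occur at least once among the dependencies.
not_complete_count = sum(
    status in job_statuses
    for status in (
        JobStatus.not_found.value,
        JobStatus.deferred.value,
        JobStatus.in_progress.value,
        JobStatus.queued.value,
    )
)

print(not_complete_count == 0, not_complete_count)  # False 2
# A caller would then raise Retry(defer=60 * 2) and re-check two minutes later.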

View File

@@ -32,5 +32,7 @@ class EnvConfig(BaseSettings):
     SENTRY_DSN: str
 
+    WEBHOOKS: list[WebhookConfig]
 
 
 env_config = EnvConfig()
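WebhookConfig itself is not part of this diff; judging by the method, url and headers attributes accessed in update_fl_webhook, it could look roughly like the sketch below (an assumption, not the actual class). With pydantic's BaseSettings, a complex field such as WEBHOOKS can then be supplied as JSON in the environment.

from pydantic import BaseModel


class WebhookConfig(BaseModel):
    # Field names are inferred from how update_fl_webhook uses each entry;
    # the real definition may differ.
    method: str  # httpx.AsyncClient method name, e.g. "post" or "get"
    url: str
    headers: dict[str, str] = {}


# Example environment value (hypothetical):
# WEBHOOKS='[{"method": "post", "url": "https://example.invalid/hook", "headers": {}}]'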

View File

@@ -26,4 +26,4 @@ class WorkerSettings:
     max_jobs = 2
     max_tries = 10
     job_timeout = 5 * 60
-    cron_jobs = [cron(run_fl_update, hour={3}, minute=0)]
+    cron_jobs = [cron(run_fl_update, hour={5}, minute=0)]