From 04533ab75908669c460f080010dd11476274bbed Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Tue, 28 Mar 2023 15:39:18 +0200 Subject: [PATCH] Fix job serializer/deserializer --- poetry.lock | 21 +++++++++++++---- pyproject.toml | 2 +- src/app/services/cache_updater.py | 14 +++++++---- src/app/views.py | 1 + src/core/arq_pool.py | 32 +++++++++++++++++++++++-- src/core/setup_arq.py | 39 ++++++------------------------- 6 files changed, 65 insertions(+), 44 deletions(-) diff --git a/poetry.lock b/poetry.lock index 5a7294f..4868c21 100644 --- a/poetry.lock +++ b/poetry.lock @@ -61,6 +61,18 @@ typing-extensions = ">=4.1.0" [package.extras] watch = ["watchfiles (>=0.16)"] +[[package]] +name = "async-timeout" +version = "4.0.2" +description = "Timeout context manager for asyncio programs" +category = "main" +optional = false +python-versions = ">=3.6" +files = [ + {file = "async-timeout-4.0.2.tar.gz", hash = "sha256:2163e1640ddb52b7a8c80d0a67a08587e5d245cc9c553a74a847056bc2976b15"}, + {file = "async_timeout-4.0.2-py3-none-any.whl", hash = "sha256:8ca1e4fcf50d07413d66d1a5e416e42cfdf5851c981d679a09851a6853383b3c"}, +] + [[package]] name = "asyncpg" version = "0.27.0" @@ -1083,17 +1095,18 @@ files = [ [[package]] name = "redis" -version = "4.5.3" +version = "4.5.1" description = "Python client for Redis database and key-value store" category = "main" optional = false python-versions = ">=3.7" files = [ - {file = "redis-4.5.3-py3-none-any.whl", hash = "sha256:7df17a0a2b72a4c8895b462dd07616c51b1dcb48fdd7ecb7b6f4bf39ecb2e94e"}, - {file = "redis-4.5.3.tar.gz", hash = "sha256:56732e156fe31801c4f43396bd3ca0c2a7f6f83d7936798531b9848d103381aa"}, + {file = "redis-4.5.1-py3-none-any.whl", hash = "sha256:5deb072d26e67d2be1712603bfb7947ec3431fb0eec9c578994052e33035af6d"}, + {file = "redis-4.5.1.tar.gz", hash = "sha256:1eec3741cda408d3a5f84b78d089c8b8d895f21b3b050988351e925faf202864"}, ] [package.dependencies] +async-timeout = ">=4.0.2" hiredis = {version = ">=1.0.0", optional = true, markers = "extra == \"hiredis\""} [package.extras] @@ -1515,4 +1528,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "0eb78a3efe04a0f5b41e9bd97d82df9785441a0b5a30c9f98fb55229b64494b8" +content-hash = "0a86fd39c7fc7e173a3bde8b93e30bb806c22b90531079a0378c0e487bd204ea" diff --git a/pyproject.toml b/pyproject.toml index 8e8d7fd..0a36c29 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ orjson = "^3.8.8" sentry-sdk = "^1.17.0" ormar = {extras = ["postgresql"], version = "^0.12.1"} pydantic = "^1.10.4" -redis = {extras = ["hiredis"], version = "^4.5.3"} +redis = {version = "4.5.1", extras = ["hiredis"]} msgpack = "^1.0.5" [tool.poetry.group.dev.dependencies] diff --git a/src/app/services/cache_updater.py b/src/app/services/cache_updater.py index be20e46..338c3f3 100644 --- a/src/app/services/cache_updater.py +++ b/src/app/services/cache_updater.py @@ -30,8 +30,8 @@ class FileTypeNotAllowed(Exception): super().__init__(message) -async def check_books_page(ctx, page_number: int) -> None: - arq_pool: ArqRedis = ctx["arc_pool"] +async def check_books_page(ctx: dict, page_number: int) -> None: + arq_pool: ArqRedis = ctx["arq_pool"] page = await get_books(page_number, PAGE_SIZE) @@ -55,8 +55,8 @@ async def check_books_page(ctx, page_number: int) -> None: ) -async def check_books(ctx: dict, *args, **kwargs) -> None: # NOSONAR - arq_pool: ArqRedis = ctx["arc_pool"] +async def check_books(ctx: dict, *args, **kwargs) -> bool: # NOSONAR + arq_pool: ArqRedis = ctx["arq_pool"] last_book_id = await get_last_book_id() @@ -66,6 +66,8 @@ async def check_books(ctx: dict, *args, **kwargs) -> None: # NOSONAR page_number, ) + return True + async def cache_file(book: Book, file_type: str) -> Optional[CachedFile]: if await CachedFile.objects.filter( @@ -128,7 +130,9 @@ async def cache_file_by_book_id( if file_type not in book.available_types: return None - lock = r_client.lock(f"{book_id}_{file_type}", blocking_timeout=5) + lock = r_client.lock( + f"{book_id}_{file_type}", blocking_timeout=5, thread_local=False + ) try: try: diff --git a/src/app/views.py b/src/app/views.py index db8cc87..fbf7f47 100644 --- a/src/app/views.py +++ b/src/app/views.py @@ -119,6 +119,7 @@ async def create_or_update_cached_file(data: CreateCachedFile): @router.post("/update_cache") async def update_cache(request: Request): arq_pool: ArqRedis = request.app.state.arq_pool + await arq_pool.enqueue_job("check_books") return "Ok!" diff --git a/src/core/arq_pool.py b/src/core/arq_pool.py index ded0f4b..13422d5 100644 --- a/src/core/arq_pool.py +++ b/src/core/arq_pool.py @@ -1,9 +1,37 @@ +import asyncio +from typing import Any + from arq.connections import ArqRedis, RedisSettings, create_pool +from arq.worker import JobExecutionFailed import msgpack from core.config import env_config +def default(obj: Any): + if isinstance(obj, asyncio.TimeoutError): + return msgpack.ExtType(0, "") + elif isinstance(obj, JobExecutionFailed): + return msgpack.ExtType(1, obj.args[0].encode()) + raise TypeError("Unknown type: %r" % (obj,)) + + +def ext_hook(code: int, data: bytes): + if code == 0: + return asyncio.TimeoutError() + elif code == 1: + return JobExecutionFailed((data.decode())) + return msgpack.ExtType(code, data) + + +def job_serializer(d): + return msgpack.packb(d, default=default, use_bin_type=True) # noqa: E731 + + +def job_deserializer(b): + return msgpack.unpackb(b, ext_hook=ext_hook, raw=False) # noqa: E731 + + def get_redis_settings() -> RedisSettings: return RedisSettings( host=env_config.REDIS_HOST, @@ -15,6 +43,6 @@ def get_redis_settings() -> RedisSettings: async def get_arq_pool() -> ArqRedis: return await create_pool( get_redis_settings(), - job_serializer=msgpack.packb, # type: ignore - job_deserializer=lambda b: msgpack.unpackb(b, raw=False), # noqa: E731 + job_serializer=job_serializer, # type: ignore + job_deserializer=job_deserializer, # noqa: E731 ) diff --git a/src/core/setup_arq.py b/src/core/setup_arq.py index c9620de..bfc6012 100644 --- a/src/core/setup_arq.py +++ b/src/core/setup_arq.py @@ -1,15 +1,14 @@ -import asyncio -from typing import Any - -from arq.worker import JobExecutionFailed -import msgpack - from app.services.cache_updater import ( cache_file_by_book_id, check_books, check_books_page, ) -from core.arq_pool import get_arq_pool, get_redis_settings +from core.arq_pool import ( + get_arq_pool, + get_redis_settings, + job_deserializer, + job_serializer, +) from core.db import database from core.redis_client import get_client import core.sentry # noqa: F401 @@ -19,7 +18,7 @@ async def startup(ctx): if not database.is_connected: await database.connect() - ctx["arc_pool"] = await get_arq_pool() + ctx["arq_pool"] = await get_arq_pool() ctx["redis"] = get_client() @@ -28,30 +27,6 @@ async def shutdown(ctx): await database.disconnect() -def default(obj: Any): - if isinstance(obj, asyncio.TimeoutError): - return msgpack.ExtType(0, "") - elif isinstance(obj, JobExecutionFailed): - return msgpack.ExtType(1, obj.args[0].encode()) - raise TypeError("Unknown type: %r" % (obj,)) - - -def ext_hook(code: int, data: bytes): - if code == 0: - return asyncio.TimeoutError() - elif code == 1: - return JobExecutionFailed((data.decode())) - return msgpack.ExtType(code, data) - - -def job_serializer(d): - return msgpack.packb(d, default=default, use_bin_type=True) # noqa: E731 - - -def job_deserializer(b): - return msgpack.unpackb(b, ext_hook=ext_hook, raw=False) # noqa: E731 - - class WorkerSettings: functions = [check_books, check_books_page, cache_file_by_book_id] on_startup = startup