Update job result serialization

This commit is contained in:
2023-03-28 14:22:07 +02:00
parent 17d1c06861
commit bb945649b5

View File

@@ -1,6 +1,7 @@
import asyncio
from typing import Any
from arq.worker import JobExecutionFailed
import msgpack
from app.services.cache_updater import (
@@ -30,12 +31,16 @@ async def shutdown(ctx):
def default(obj: Any):
if isinstance(obj, asyncio.TimeoutError):
return msgpack.ExtType(0, "")
elif isinstance(obj, JobExecutionFailed):
return msgpack.ExtType(1, obj.args[0].encode())
raise TypeError("Unknown type: %r" % (obj,))
def ext_hook(code: int, data: str):
def ext_hook(code: int, data: bytes):
if code == 0:
return asyncio.TimeoutError()
elif code == 1:
return JobExecutionFailed((data.decode()))
return msgpack.ExtType(code, data)
@@ -53,7 +58,7 @@ class WorkerSettings:
on_shutdown = shutdown
redis_settings = get_redis_settings()
max_jobs = 2
max_tries = 1
max_tries = 2
job_timeout = 10 * 60
expires_extra_ms = 7 * 24 * 60 * 1000
job_serializer = job_serializer