Add retry middleware

This commit is contained in:
2023-05-20 22:17:44 +02:00
parent db1104aead
commit 96b34863ca
2 changed files with 31 additions and 2 deletions

View File

@@ -0,0 +1,29 @@
from typing import Any
from taskiq import SimpleRetryMiddleware
from taskiq.message import TaskiqMessage
from taskiq.result import TaskiqResult
class FastAPIREtryMiddleware(SimpleRetryMiddleware):
@staticmethod
async def _is_need_to_remove(to_remove: list[Any], value: Any) -> bool:
return type(value) in to_remove
async def on_error(
self, message: TaskiqMessage, result: TaskiqResult[Any], exception: Exception
) -> None:
types_to_remove = list(self.broker.custom_dependency_context.keys())
message.args = [
arg
for arg in message.args
if not self._is_need_to_remove(types_to_remove, arg)
]
message.kwargs = {
key: value
for key, value in message.kwargs.items()
if not self._is_need_to_remove(types_to_remove, value)
}
return await super().on_error(message, result, exception)

View File

@@ -1,8 +1,8 @@
from taskiq import SimpleRetryMiddleware
import taskiq_fastapi
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
from core.config import REDIS_URL
from core.taskiq_middlewares import FastAPIREtryMiddleware
broker = (
@@ -10,7 +10,7 @@ broker = (
.with_result_backend(
RedisAsyncResultBackend(redis_url=REDIS_URL, result_ex_time=5 * 60)
)
.with_middlewares(SimpleRetryMiddleware())
.with_middlewares(FastAPIREtryMiddleware())
)