diff --git a/src/core/taskiq_middlewares.py b/src/core/taskiq_middlewares.py new file mode 100644 index 0000000..a24738d --- /dev/null +++ b/src/core/taskiq_middlewares.py @@ -0,0 +1,29 @@ +from typing import Any + +from taskiq import SimpleRetryMiddleware +from taskiq.message import TaskiqMessage +from taskiq.result import TaskiqResult + + +class FastAPIREtryMiddleware(SimpleRetryMiddleware): + @staticmethod + async def _is_need_to_remove(to_remove: list[Any], value: Any) -> bool: + return type(value) in to_remove + + async def on_error( + self, message: TaskiqMessage, result: TaskiqResult[Any], exception: Exception + ) -> None: + types_to_remove = list(self.broker.custom_dependency_context.keys()) + + message.args = [ + arg + for arg in message.args + if not self._is_need_to_remove(types_to_remove, arg) + ] + message.kwargs = { + key: value + for key, value in message.kwargs.items() + if not self._is_need_to_remove(types_to_remove, value) + } + + return await super().on_error(message, result, exception) diff --git a/src/core/taskiq_worker.py b/src/core/taskiq_worker.py index acab39f..95f9055 100644 --- a/src/core/taskiq_worker.py +++ b/src/core/taskiq_worker.py @@ -1,8 +1,8 @@ -from taskiq import SimpleRetryMiddleware import taskiq_fastapi from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend from core.config import REDIS_URL +from core.taskiq_middlewares import FastAPIREtryMiddleware broker = ( @@ -10,7 +10,7 @@ broker = ( .with_result_backend( RedisAsyncResultBackend(redis_url=REDIS_URL, result_ex_time=5 * 60) ) - .with_middlewares(SimpleRetryMiddleware()) + .with_middlewares(FastAPIREtryMiddleware()) )