From 2f1e2c0ffadedfac95ddf74e88a24985a72a1e7c Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Tue, 19 Nov 2024 14:14:50 +0100 Subject: [PATCH] Add on_stream_state_change retries --- src/core/broker.py | 4 ++++ src/modules/stream_notifications/tasks.py | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/core/broker.py b/src/core/broker.py index 58ee1e7..39b084f 100644 --- a/src/core/broker.py +++ b/src/core/broker.py @@ -1,4 +1,5 @@ from taskiq import TaskiqScheduler +from taskiq.middlewares.retry_middleware import SimpleRetryMiddleware from taskiq.schedule_sources import LabelScheduleSource from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend @@ -6,6 +7,9 @@ from core.config import config broker = ListQueueBroker(url=config.REDIS_URI) \ + .with_middlewares( + SimpleRetryMiddleware(default_retry_count=5) + ) \ .with_result_backend(RedisAsyncResultBackend( redis_url=config.REDIS_URI, result_ex_time=60 * 60 * 24 * 7, diff --git a/src/modules/stream_notifications/tasks.py b/src/modules/stream_notifications/tasks.py index eed416a..f716098 100644 --- a/src/modules/stream_notifications/tasks.py +++ b/src/modules/stream_notifications/tasks.py @@ -8,7 +8,10 @@ from .watcher import StateWatcher from .twitch.authorize import authorize -@broker.task("stream_notifications.twitch.on_stream_state_change") +@broker.task( + "stream_notifications.twitch.on_stream_state_change", + retry_on_error=True +) async def on_stream_state_change( streamer_id: int, new_state: State | None = None ):