From b67d00bcd76970eff5d4cabac6b85519d5cace18 Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Tue, 22 Apr 2025 19:02:16 +0200 Subject: [PATCH] Fix --- src/applications/temporal_worker/__main__.py | 1 + .../twitch_webhook/activities/__init__.py | 3 +- .../activities/on_state_change.py | 37 ++++++++++++++++++- .../workflows/on_channel_update.py | 30 ++++----------- 4 files changed, 47 insertions(+), 24 deletions(-) diff --git a/src/applications/temporal_worker/__main__.py b/src/applications/temporal_worker/__main__.py index 1e421d6..72c3b2b 100644 --- a/src/applications/temporal_worker/__main__.py +++ b/src/applications/temporal_worker/__main__.py @@ -47,6 +47,7 @@ async def main(): twitch_activities.on_stream_state_change_activity, twitch_activities.check_streams_states, twitch_activities.on_redemption_reward_add_activity, + twitch_activities.on_stream_state_change_activity, ], workflow_runner=UnsandboxedWorkflowRunner() ) diff --git a/src/applications/twitch_webhook/activities/__init__.py b/src/applications/twitch_webhook/activities/__init__.py index 072eec7..aee8127 100644 --- a/src/applications/twitch_webhook/activities/__init__.py +++ b/src/applications/twitch_webhook/activities/__init__.py @@ -1,5 +1,5 @@ from .message_proc import on_message_activity -from .on_state_change import on_stream_state_change_activity +from .on_state_change import on_stream_state_change_activity, on_channel_update_activity from .redemption_reward import on_redemption_reward_add_activity from .state_checker import check_streams_states @@ -9,4 +9,5 @@ __all__ = [ "on_stream_state_change_activity", "check_streams_states", "on_redemption_reward_add_activity", + "on_channel_update_activity", ] diff --git a/src/applications/twitch_webhook/activities/on_state_change.py b/src/applications/twitch_webhook/activities/on_state_change.py index 45f8702..6c225b5 100644 --- a/src/applications/twitch_webhook/activities/on_state_change.py +++ b/src/applications/twitch_webhook/activities/on_state_change.py @@ -1,9 +1,14 @@ +from datetime import datetime, timezone + from temporalio import activity from pydantic import BaseModel -from applications.twitch_webhook.state import State, EventType +from twitchAPI.helper import first + +from applications.twitch_webhook.state import State, EventType, UpdateEvent from applications.twitch_webhook.watcher import StateWatcher +from applications.twitch_webhook.twitch.authorize import authorize class OnStreamStateChangeActivity(BaseModel): @@ -21,3 +26,33 @@ async def on_stream_state_change_activity( data.event_type, data.new_state, ) + + +class OnChannelUpdateActivity(BaseModel): + event: UpdateEvent + event_type: EventType + + +@activity.defn +async def on_channel_update_activity( + data: OnChannelUpdateActivity +): + twitch = await authorize(data.event.broadcaster_user_login) + + stream = await first(twitch.get_streams( + user_id=[data.event.broadcaster_user_id]) + ) + if stream is None: + return + + await on_stream_state_change_activity( + OnStreamStateChangeActivity( + streamer_id=int(data.event.broadcaster_user_id), + event_type=data.event_type, + new_state=State( + title=data.event.title, + category=data.event.category_name, + last_live_at=datetime.now(timezone.utc) + ), + ) + ) diff --git a/src/applications/twitch_webhook/workflows/on_channel_update.py b/src/applications/twitch_webhook/workflows/on_channel_update.py index 7190f5f..e91001b 100644 --- a/src/applications/twitch_webhook/workflows/on_channel_update.py +++ b/src/applications/twitch_webhook/workflows/on_channel_update.py @@ -1,13 +1,10 @@ -from datetime import datetime, timezone, timedelta +from datetime import timedelta from temporalio import workflow -from twitchAPI.helper import first - -from applications.twitch_webhook.state import UpdateEvent, EventType, State -from applications.twitch_webhook.twitch.authorize import authorize -from applications.twitch_webhook.activities.on_state_change import on_stream_state_change_activity, OnStreamStateChangeActivity from applications.temporal_worker.queues import MAIN_QUEUE +from applications.twitch_webhook.activities.on_state_change import OnChannelUpdateActivity, on_channel_update_activity +from applications.twitch_webhook.state import UpdateEvent, EventType @workflow.defn @@ -18,23 +15,12 @@ class OnChannelUpdateWorkflow: event: UpdateEvent, event_type: EventType, ): - twitch = await authorize(event.broadcaster_user_login) - - stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id])) - if stream is None: - return - await workflow.start_activity( - on_stream_state_change_activity, - OnStreamStateChangeActivity( - int(event.broadcaster_user_id), - event_type, - State( - title=event.title, - category=event.category_name, - last_live_at=datetime.now(timezone.utc) - ), + on_channel_update_activity, + OnChannelUpdateActivity( + event=event, + event_type=event_type, ), task_queue=MAIN_QUEUE, - schedule_to_close_timeout=timedelta(minutes=1) + start_to_close_timeout=timedelta(minutes=1), )