From 77fb68a5e320f6695baa170ef4d05ed65c62e042 Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Mon, 21 Apr 2025 21:54:30 +0200 Subject: [PATCH] Fix --- .../schedule_sync/activities/__init__.py | 4 +- .../schedule_sync/activities/sync.py | 16 ++-- .../schedule_sync/workflows/sync.py | 12 +-- src/applications/temporal_worker/__main__.py | 9 +- src/applications/temporal_worker/queues.py | 1 + .../twitch_webhook/activities/message_proc.py | 11 +++ .../activities/on_state_change.py | 39 ++++++++ .../activities/redemption_reward.py | 8 ++ .../activities/state_checker.py | 29 ++++++ src/applications/twitch_webhook/tasks.py | 96 ------------------- .../twitch_webhook/workflows/checker.py | 28 ++++++ 11 files changed, 134 insertions(+), 119 deletions(-) create mode 100644 src/applications/temporal_worker/queues.py create mode 100644 src/applications/twitch_webhook/activities/message_proc.py create mode 100644 src/applications/twitch_webhook/activities/on_state_change.py create mode 100644 src/applications/twitch_webhook/activities/redemption_reward.py create mode 100644 src/applications/twitch_webhook/activities/state_checker.py delete mode 100644 src/applications/twitch_webhook/tasks.py create mode 100644 src/applications/twitch_webhook/workflows/checker.py diff --git a/src/applications/schedule_sync/activities/__init__.py b/src/applications/schedule_sync/activities/__init__.py index e8df4a7..f8fa121 100644 --- a/src/applications/schedule_sync/activities/__init__.py +++ b/src/applications/schedule_sync/activities/__init__.py @@ -1,6 +1,6 @@ -from .sync import ScheduleSyncActivity +from .sync import syncronize __all__ = [ - "ScheduleSyncActivity", + "syncronize", ] diff --git a/src/applications/schedule_sync/activities/sync.py b/src/applications/schedule_sync/activities/sync.py index 2f03bac..b41a7dd 100644 --- a/src/applications/schedule_sync/activities/sync.py +++ b/src/applications/schedule_sync/activities/sync.py @@ -1,16 +1,14 @@ from temporalio import activity from applications.common.repositories.streamers import StreamerConfigRepository -from applications.schedule_sync.synchronizer import syncronize +from applications.schedule_sync.synchronizer import syncronize as syncronize_internal -class ScheduleSyncActivity: - @classmethod - @activity.defn - async def syncronize(cls, twitch_id: int): - streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id) +@activity.defn +async def syncronize(twitch_id: int): + streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id) - if streamer.integrations.discord is None: - return + if streamer.integrations.discord is None: + return - await syncronize(streamer.twitch, streamer.integrations.discord.guild_id) + await syncronize_internal(streamer.twitch, streamer.integrations.discord.guild_id) diff --git a/src/applications/schedule_sync/workflows/sync.py b/src/applications/schedule_sync/workflows/sync.py index 8516729..e3d1ee3 100644 --- a/src/applications/schedule_sync/workflows/sync.py +++ b/src/applications/schedule_sync/workflows/sync.py @@ -4,10 +4,8 @@ from temporalio import workflow from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec from applications.common.repositories.streamers import StreamerConfigRepository -from applications.schedule_sync.activities import ScheduleSyncActivity - - -TASK_QUEUE = "main" +from applications.schedule_sync.activities import syncronize +from applications.temporal_worker.queues import MAIN_QUEUE @workflow.defn @@ -19,7 +17,7 @@ class ScheduleSyncWorkflow: action=ScheduleActionStartWorkflow( cls.run, id="ScheduleSyncWorkflow", - task_queue=TASK_QUEUE, + task_queue=MAIN_QUEUE, ), spec=ScheduleSpec( intervals=[ScheduleIntervalSpec(every=timedelta(minutes=5))] @@ -35,7 +33,7 @@ class ScheduleSyncWorkflow: if streamer.integrations.discord is None: continue - await workflow.execute_activity_method( - ScheduleSyncActivity.syncronize, + await workflow.execute_activity( + syncronize, streamer.twitch.id ) diff --git a/src/applications/temporal_worker/__main__.py b/src/applications/temporal_worker/__main__.py index 534c1dd..b12e6d0 100644 --- a/src/applications/temporal_worker/__main__.py +++ b/src/applications/temporal_worker/__main__.py @@ -3,11 +3,10 @@ from asyncio import run from temporalio.client import Client, ScheduleAlreadyRunningError from temporalio.worker import Worker, UnsandboxedWorkflowRunner -from applications.schedule_sync.activities import ScheduleSyncActivity +from applications.schedule_sync import activities as schedule_sync_activities from applications.schedule_sync.workflows import ScheduleSyncWorkflow - -TASK_QUEUE = "main" +from .queues import MAIN_QUEUE async def main(): @@ -21,12 +20,12 @@ async def main(): worker: Worker = Worker( client, - task_queue=TASK_QUEUE, + task_queue=MAIN_QUEUE, workflows=[ ScheduleSyncWorkflow ], activities=[ - ScheduleSyncActivity.syncronize + schedule_sync_activities.syncronize, ], workflow_runner=UnsandboxedWorkflowRunner(), ) diff --git a/src/applications/temporal_worker/queues.py b/src/applications/temporal_worker/queues.py new file mode 100644 index 0000000..38fcd8f --- /dev/null +++ b/src/applications/temporal_worker/queues.py @@ -0,0 +1 @@ +MAIN_QUEUE = "main" diff --git a/src/applications/twitch_webhook/activities/message_proc.py b/src/applications/twitch_webhook/activities/message_proc.py new file mode 100644 index 0000000..b267b40 --- /dev/null +++ b/src/applications/twitch_webhook/activities/message_proc.py @@ -0,0 +1,11 @@ +from temporalio import activity + +from applications.twitch_webhook.messages_proc import MessageEvent, MessagesProc + + +@activity.defn +async def on_message_activity( + received_as: str, + event: MessageEvent +): + await MessagesProc.on_message(received_as, event) diff --git a/src/applications/twitch_webhook/activities/on_state_change.py b/src/applications/twitch_webhook/activities/on_state_change.py new file mode 100644 index 0000000..8b230d1 --- /dev/null +++ b/src/applications/twitch_webhook/activities/on_state_change.py @@ -0,0 +1,39 @@ +from temporalio import activity + +from applications.twitch_webhook.state import State, EventType +from applications.twitch_webhook.watcher import StateWatcher + + +@activity.defn +async def on_stream_state_change_activity( + streamer_id: int, + event_type: EventType, + new_state: State | None = None +): + await StateWatcher.on_stream_state_change( + streamer_id, + event_type, + new_state, + ) + + +# @activity.defn +# async def on_stream_state_change_with_check( +# event: UpdateEvent, +# event_type: EventType +# ): +# twitch = await authorize(event.broadcaster_user_login) + +# stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id])) +# if stream is None: +# return + +# await on_stream_state_change.kiq( +# int(event.broadcaster_user_id), +# event_type, +# State( +# title=event.title, +# category=event.category_name, +# last_live_at=datetime.now(timezone.utc) +# ) +# ) diff --git a/src/applications/twitch_webhook/activities/redemption_reward.py b/src/applications/twitch_webhook/activities/redemption_reward.py new file mode 100644 index 0000000..d5851a4 --- /dev/null +++ b/src/applications/twitch_webhook/activities/redemption_reward.py @@ -0,0 +1,8 @@ +from temporalio import activity + +from applications.twitch_webhook.reward_redemption import RewardRedemption, on_redemption_reward_add + + +@activity.defn +async def on_redemption_reward_add_activity(event: RewardRedemption): + await on_redemption_reward_add(event) diff --git a/src/applications/twitch_webhook/activities/state_checker.py b/src/applications/twitch_webhook/activities/state_checker.py new file mode 100644 index 0000000..3a014fa --- /dev/null +++ b/src/applications/twitch_webhook/activities/state_checker.py @@ -0,0 +1,29 @@ +from datetime import datetime, timezone + +from temporalio import activity + +from applications.common.repositories.streamers import StreamerConfigRepository +from applications.twitch_webhook.twitch.authorize import authorize +from applications.twitch_webhook.state import State, EventType +from applications.twitch_webhook.watcher import StateWatcher + + +@activity.defn +async def check_streams_states(): + streamers = await StreamerConfigRepository.all() + streamers_ids = [str(streamer.twitch.id) for streamer in streamers] + + twitch = await authorize("kurbezz") + + async for stream in twitch.get_streams(user_id=streamers_ids): + state = State( + title=stream.title, + category=stream.game_name, + last_live_at=datetime.now(timezone.utc) + ) + + await StateWatcher.on_stream_state_change( + int(stream.user_id), + EventType.UNKNOWN, + state + ) diff --git a/src/applications/twitch_webhook/tasks.py b/src/applications/twitch_webhook/tasks.py deleted file mode 100644 index 147b6dc..0000000 --- a/src/applications/twitch_webhook/tasks.py +++ /dev/null @@ -1,96 +0,0 @@ -from datetime import datetime, timezone - -from twitchAPI.helper import first - -from core.broker import broker -from applications.common.repositories.streamers import StreamerConfigRepository - -from .state import State, UpdateEvent, EventType -from .watcher import StateWatcher -from .messages_proc import MessageEvent, MessagesProc -from .twitch.authorize import authorize -from .reward_redemption import RewardRedemption, on_redemption_reward_add - - -@broker.task( - "stream_notifications.twitch.on_stream_state_change_with_check", - retry_on_error=True -) -async def on_stream_state_change_with_check( - event: UpdateEvent, - event_type: EventType -): - twitch = await authorize(event.broadcaster_user_login) - - stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id])) - if stream is None: - return - - await on_stream_state_change.kiq( - int(event.broadcaster_user_id), - event_type, - State( - title=event.title, - category=event.category_name, - last_live_at=datetime.now(timezone.utc) - ) - ) - - -@broker.task( - "stream_notifications.twitch.on_stream_state_change", - retry_on_error=True -) -async def on_stream_state_change( - streamer_id: int, - event_type: EventType, - new_state: State | None = None -): - await StateWatcher.on_stream_state_change( - streamer_id, - event_type, - new_state, - ) - - -@broker.task( - "stream_notifications.check_streams_states", - schedule=[{"cron": "*/2 * * * *"}] -) -async def check_streams_states(): - streamers = await StreamerConfigRepository.all() - streamers_ids = [str(streamer.twitch.id) for streamer in streamers] - - twitch = await authorize("kurbezz") - - async for stream in twitch.get_streams(user_id=streamers_ids): - state = State( - title=stream.title, - category=stream.game_name, - last_live_at=datetime.now(timezone.utc) - ) - - await StateWatcher.on_stream_state_change( - int(stream.user_id), - EventType.UNKNOWN, - state - ) - - -@broker.task( - "stream_notifications.on_message", - retry_on_error=True -) -async def on_message( - received_as: str, - event: MessageEvent -): - await MessagesProc.on_message(received_as, event) - - -@broker.task( - "stream_notifications.on_redemption_reward_add", - retry_on_error=True -) -async def on_redemption_reward_add_task(event: RewardRedemption): - await on_redemption_reward_add(event) diff --git a/src/applications/twitch_webhook/workflows/checker.py b/src/applications/twitch_webhook/workflows/checker.py new file mode 100644 index 0000000..aa91971 --- /dev/null +++ b/src/applications/twitch_webhook/workflows/checker.py @@ -0,0 +1,28 @@ +from datetime import timedelta + +from temporalio import workflow +from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec + +from applications.temporal_worker.queues import MAIN_QUEUE + + +workflow.defn() +class StreamsCheckWorkflow: + @classmethod + def get_schedules(cls) -> dict[str, Schedule]: + return { + "check": Schedule( + action=ScheduleActionStartWorkflow( + cls.run, + id="StreamsCheckWorkflow", + task_queue=MAIN_QUEUE, + ), + spec=ScheduleSpec( + intervals=[ScheduleIntervalSpec(every=timedelta(minutes=2))] + ) + ) + } + + @workflow.run + async def run(self): + pass