7 Commits

Author SHA1 Message Date
eddad6454d Disable messages proc 2025-04-24 17:17:45 +02:00
8ac15a1687 Fix 2025-04-23 09:48:26 +02:00
36c542b822 Fix 2025-04-22 19:04:16 +02:00
b67d00bcd7 Fix 2025-04-22 19:02:16 +02:00
81a51a3d0d Fix 2025-04-22 18:48:47 +02:00
90756c884c Fix 2025-04-22 18:45:30 +02:00
df501c27d7 Fix 2025-04-22 18:30:14 +02:00
7 changed files with 51 additions and 26 deletions

View File

@@ -47,6 +47,7 @@ async def main():
twitch_activities.on_stream_state_change_activity, twitch_activities.on_stream_state_change_activity,
twitch_activities.check_streams_states, twitch_activities.check_streams_states,
twitch_activities.on_redemption_reward_add_activity, twitch_activities.on_redemption_reward_add_activity,
twitch_activities.on_channel_update_activity,
], ],
workflow_runner=UnsandboxedWorkflowRunner() workflow_runner=UnsandboxedWorkflowRunner()
) )

View File

@@ -1,5 +1,5 @@
from .message_proc import on_message_activity from .message_proc import on_message_activity
from .on_state_change import on_stream_state_change_activity from .on_state_change import on_stream_state_change_activity, on_channel_update_activity
from .redemption_reward import on_redemption_reward_add_activity from .redemption_reward import on_redemption_reward_add_activity
from .state_checker import check_streams_states from .state_checker import check_streams_states
@@ -9,4 +9,5 @@ __all__ = [
"on_stream_state_change_activity", "on_stream_state_change_activity",
"check_streams_states", "check_streams_states",
"on_redemption_reward_add_activity", "on_redemption_reward_add_activity",
"on_channel_update_activity",
] ]

View File

@@ -1,9 +1,14 @@
from datetime import datetime, timezone
from temporalio import activity from temporalio import activity
from pydantic import BaseModel from pydantic import BaseModel
from applications.twitch_webhook.state import State, EventType from twitchAPI.helper import first
from applications.twitch_webhook.state import State, EventType, UpdateEvent
from applications.twitch_webhook.watcher import StateWatcher from applications.twitch_webhook.watcher import StateWatcher
from applications.twitch_webhook.twitch.authorize import authorize
class OnStreamStateChangeActivity(BaseModel): class OnStreamStateChangeActivity(BaseModel):
@@ -21,3 +26,33 @@ async def on_stream_state_change_activity(
data.event_type, data.event_type,
data.new_state, data.new_state,
) )
class OnChannelUpdateActivity(BaseModel):
event: UpdateEvent
event_type: EventType
@activity.defn
async def on_channel_update_activity(
data: OnChannelUpdateActivity
):
twitch = await authorize(data.event.broadcaster_user_login)
stream = await first(twitch.get_streams(
user_id=[data.event.broadcaster_user_id])
)
if stream is None:
return
await on_stream_state_change_activity(
OnStreamStateChangeActivity(
streamer_id=int(data.event.broadcaster_user_id),
event_type=data.event_type,
new_state=State(
title=data.event.title,
category=data.event.category_name,
last_live_at=datetime.now(timezone.utc)
),
)
)

View File

@@ -308,6 +308,8 @@ class MessagesProc:
@classmethod @classmethod
async def on_message(cls, received_as: str, event: MessageEvent): async def on_message(cls, received_as: str, event: MessageEvent):
return
if event.chatter_user_name in cls.FULL_IGNORED_USER_LOGINS: if event.chatter_user_name in cls.FULL_IGNORED_USER_LOGINS:
return return

View File

@@ -1,13 +1,10 @@
from datetime import datetime, timezone, timedelta from datetime import timedelta
from temporalio import workflow from temporalio import workflow
from twitchAPI.helper import first
from applications.twitch_webhook.state import UpdateEvent, EventType, State
from applications.twitch_webhook.twitch.authorize import authorize
from applications.twitch_webhook.activities.on_state_change import on_stream_state_change_activity, OnStreamStateChangeActivity
from applications.temporal_worker.queues import MAIN_QUEUE from applications.temporal_worker.queues import MAIN_QUEUE
from applications.twitch_webhook.activities.on_state_change import OnChannelUpdateActivity, on_channel_update_activity
from applications.twitch_webhook.state import UpdateEvent, EventType
@workflow.defn @workflow.defn
@@ -18,23 +15,12 @@ class OnChannelUpdateWorkflow:
event: UpdateEvent, event: UpdateEvent,
event_type: EventType, event_type: EventType,
): ):
twitch = await authorize(event.broadcaster_user_login)
stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id]))
if stream is None:
return
await workflow.start_activity( await workflow.start_activity(
on_stream_state_change_activity, on_channel_update_activity,
OnStreamStateChangeActivity( OnChannelUpdateActivity(
int(event.broadcaster_user_id), event=event,
event_type, event_type=event_type,
State(
title=event.title,
category=event.category_name,
last_live_at=datetime.now(timezone.utc)
),
), ),
task_queue=MAIN_QUEUE, task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=1) start_to_close_timeout=timedelta(minutes=1),
) )

View File

@@ -15,5 +15,5 @@ class OnMessageWorkflow:
on_message_activity, on_message_activity,
message, message,
task_queue=MAIN_QUEUE, task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=1) schedule_to_close_timeout=timedelta(minutes=5)
) )

View File

@@ -10,7 +10,7 @@ from applications.temporal_worker.queues import MAIN_QUEUE
@workflow.defn @workflow.defn
class OnStreamOnlineWorkflow: class OnStreamOnlineWorkflow:
@workflow.run @workflow.run
async def run(self, broadcaster_user_id: str, event_type: EventType): async def run(self, broadcaster_user_id: str | int, event_type: EventType):
await workflow.start_activity( await workflow.start_activity(
on_stream_state_change_activity, on_stream_state_change_activity,
OnStreamStateChangeActivity( OnStreamStateChangeActivity(