This commit is contained in:
2025-04-21 21:54:30 +02:00
parent fb19f7a125
commit 77fb68a5e3
11 changed files with 134 additions and 119 deletions

View File

@@ -1,6 +1,6 @@
from .sync import ScheduleSyncActivity
from .sync import syncronize
__all__ = [
"ScheduleSyncActivity",
"syncronize",
]

View File

@@ -1,16 +1,14 @@
from temporalio import activity
from applications.common.repositories.streamers import StreamerConfigRepository
from applications.schedule_sync.synchronizer import syncronize
from applications.schedule_sync.synchronizer import syncronize as syncronize_internal
class ScheduleSyncActivity:
@classmethod
@activity.defn
async def syncronize(cls, twitch_id: int):
@activity.defn
async def syncronize(twitch_id: int):
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
if streamer.integrations.discord is None:
return
await syncronize(streamer.twitch, streamer.integrations.discord.guild_id)
await syncronize_internal(streamer.twitch, streamer.integrations.discord.guild_id)

View File

@@ -4,10 +4,8 @@ from temporalio import workflow
from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec
from applications.common.repositories.streamers import StreamerConfigRepository
from applications.schedule_sync.activities import ScheduleSyncActivity
TASK_QUEUE = "main"
from applications.schedule_sync.activities import syncronize
from applications.temporal_worker.queues import MAIN_QUEUE
@workflow.defn
@@ -19,7 +17,7 @@ class ScheduleSyncWorkflow:
action=ScheduleActionStartWorkflow(
cls.run,
id="ScheduleSyncWorkflow",
task_queue=TASK_QUEUE,
task_queue=MAIN_QUEUE,
),
spec=ScheduleSpec(
intervals=[ScheduleIntervalSpec(every=timedelta(minutes=5))]
@@ -35,7 +33,7 @@ class ScheduleSyncWorkflow:
if streamer.integrations.discord is None:
continue
await workflow.execute_activity_method(
ScheduleSyncActivity.syncronize,
await workflow.execute_activity(
syncronize,
streamer.twitch.id
)

View File

@@ -3,11 +3,10 @@ from asyncio import run
from temporalio.client import Client, ScheduleAlreadyRunningError
from temporalio.worker import Worker, UnsandboxedWorkflowRunner
from applications.schedule_sync.activities import ScheduleSyncActivity
from applications.schedule_sync import activities as schedule_sync_activities
from applications.schedule_sync.workflows import ScheduleSyncWorkflow
TASK_QUEUE = "main"
from .queues import MAIN_QUEUE
async def main():
@@ -21,12 +20,12 @@ async def main():
worker: Worker = Worker(
client,
task_queue=TASK_QUEUE,
task_queue=MAIN_QUEUE,
workflows=[
ScheduleSyncWorkflow
],
activities=[
ScheduleSyncActivity.syncronize
schedule_sync_activities.syncronize,
],
workflow_runner=UnsandboxedWorkflowRunner(),
)

View File

@@ -0,0 +1 @@
MAIN_QUEUE = "main"

View File

@@ -0,0 +1,11 @@
from temporalio import activity
from applications.twitch_webhook.messages_proc import MessageEvent, MessagesProc
@activity.defn
async def on_message_activity(
received_as: str,
event: MessageEvent
):
await MessagesProc.on_message(received_as, event)

View File

@@ -0,0 +1,39 @@
from temporalio import activity
from applications.twitch_webhook.state import State, EventType
from applications.twitch_webhook.watcher import StateWatcher
@activity.defn
async def on_stream_state_change_activity(
streamer_id: int,
event_type: EventType,
new_state: State | None = None
):
await StateWatcher.on_stream_state_change(
streamer_id,
event_type,
new_state,
)
# @activity.defn
# async def on_stream_state_change_with_check(
# event: UpdateEvent,
# event_type: EventType
# ):
# twitch = await authorize(event.broadcaster_user_login)
# stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id]))
# if stream is None:
# return
# await on_stream_state_change.kiq(
# int(event.broadcaster_user_id),
# event_type,
# State(
# title=event.title,
# category=event.category_name,
# last_live_at=datetime.now(timezone.utc)
# )
# )

View File

@@ -0,0 +1,8 @@
from temporalio import activity
from applications.twitch_webhook.reward_redemption import RewardRedemption, on_redemption_reward_add
@activity.defn
async def on_redemption_reward_add_activity(event: RewardRedemption):
await on_redemption_reward_add(event)

View File

@@ -0,0 +1,29 @@
from datetime import datetime, timezone
from temporalio import activity
from applications.common.repositories.streamers import StreamerConfigRepository
from applications.twitch_webhook.twitch.authorize import authorize
from applications.twitch_webhook.state import State, EventType
from applications.twitch_webhook.watcher import StateWatcher
@activity.defn
async def check_streams_states():
streamers = await StreamerConfigRepository.all()
streamers_ids = [str(streamer.twitch.id) for streamer in streamers]
twitch = await authorize("kurbezz")
async for stream in twitch.get_streams(user_id=streamers_ids):
state = State(
title=stream.title,
category=stream.game_name,
last_live_at=datetime.now(timezone.utc)
)
await StateWatcher.on_stream_state_change(
int(stream.user_id),
EventType.UNKNOWN,
state
)

View File

@@ -1,96 +0,0 @@
from datetime import datetime, timezone
from twitchAPI.helper import first
from core.broker import broker
from applications.common.repositories.streamers import StreamerConfigRepository
from .state import State, UpdateEvent, EventType
from .watcher import StateWatcher
from .messages_proc import MessageEvent, MessagesProc
from .twitch.authorize import authorize
from .reward_redemption import RewardRedemption, on_redemption_reward_add
@broker.task(
"stream_notifications.twitch.on_stream_state_change_with_check",
retry_on_error=True
)
async def on_stream_state_change_with_check(
event: UpdateEvent,
event_type: EventType
):
twitch = await authorize(event.broadcaster_user_login)
stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id]))
if stream is None:
return
await on_stream_state_change.kiq(
int(event.broadcaster_user_id),
event_type,
State(
title=event.title,
category=event.category_name,
last_live_at=datetime.now(timezone.utc)
)
)
@broker.task(
"stream_notifications.twitch.on_stream_state_change",
retry_on_error=True
)
async def on_stream_state_change(
streamer_id: int,
event_type: EventType,
new_state: State | None = None
):
await StateWatcher.on_stream_state_change(
streamer_id,
event_type,
new_state,
)
@broker.task(
"stream_notifications.check_streams_states",
schedule=[{"cron": "*/2 * * * *"}]
)
async def check_streams_states():
streamers = await StreamerConfigRepository.all()
streamers_ids = [str(streamer.twitch.id) for streamer in streamers]
twitch = await authorize("kurbezz")
async for stream in twitch.get_streams(user_id=streamers_ids):
state = State(
title=stream.title,
category=stream.game_name,
last_live_at=datetime.now(timezone.utc)
)
await StateWatcher.on_stream_state_change(
int(stream.user_id),
EventType.UNKNOWN,
state
)
@broker.task(
"stream_notifications.on_message",
retry_on_error=True
)
async def on_message(
received_as: str,
event: MessageEvent
):
await MessagesProc.on_message(received_as, event)
@broker.task(
"stream_notifications.on_redemption_reward_add",
retry_on_error=True
)
async def on_redemption_reward_add_task(event: RewardRedemption):
await on_redemption_reward_add(event)

View File

@@ -0,0 +1,28 @@
from datetime import timedelta
from temporalio import workflow
from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec
from applications.temporal_worker.queues import MAIN_QUEUE
workflow.defn()
class StreamsCheckWorkflow:
@classmethod
def get_schedules(cls) -> dict[str, Schedule]:
return {
"check": Schedule(
action=ScheduleActionStartWorkflow(
cls.run,
id="StreamsCheckWorkflow",
task_queue=MAIN_QUEUE,
),
spec=ScheduleSpec(
intervals=[ScheduleIntervalSpec(every=timedelta(minutes=2))]
)
)
}
@workflow.run
async def run(self):
pass