This commit is contained in:
2025-04-22 13:51:21 +02:00
parent 6ea36f165e
commit 03039bbd09
3 changed files with 24 additions and 20 deletions

View File

@@ -1,6 +1,7 @@
from .sync import syncronize from .sync import syncronize, syncronize_all
__all__ = [ __all__ = [
"syncronize", "syncronize",
"syncronize_all",
] ]

View File

@@ -6,13 +6,24 @@ from applications.schedule_sync.synchronizer import syncronize as syncronize_int
@activity.defn @activity.defn
async def syncronize(twitch_id: int): async def syncronize(twitch_id: int):
try:
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id) streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
if streamer.integrations.discord is None: if streamer.integrations.discord is None:
return return
try:
await syncronize_internal(streamer.twitch, streamer.integrations.discord.guild_id) await syncronize_internal(streamer.twitch, streamer.integrations.discord.guild_id)
except Exception as e: except Exception as e:
activity.logger.error(f"Error during synchronization: {e}") activity.logger.error(f"Error during synchronization: {e}")
raise e raise e
@activity.defn
async def syncronize_all():
streamers = await StreamerConfigRepository().all()
for streamer in streamers:
if streamer.integrations.discord is None:
continue
await syncronize(streamer.twitch.id)

View File

@@ -3,8 +3,7 @@ from datetime import timedelta
from temporalio import workflow from temporalio import workflow
from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec from temporalio.client import Schedule, ScheduleActionStartWorkflow, ScheduleSpec, ScheduleIntervalSpec
from applications.common.repositories.streamers import StreamerConfigRepository from applications.schedule_sync.activities import syncronize_all
from applications.schedule_sync.activities import syncronize
from applications.temporal_worker.queues import MAIN_QUEUE from applications.temporal_worker.queues import MAIN_QUEUE
@@ -28,15 +27,8 @@ class ScheduleSyncWorkflow:
@workflow.run @workflow.run
async def run(self): async def run(self):
streamers = await StreamerConfigRepository().all() await workflow.execute_activity(
syncronize_all,
for streamer in streamers:
if streamer.integrations.discord is None:
continue
await workflow.start_activity(
syncronize,
streamer.twitch.id,
task_queue=MAIN_QUEUE, task_queue=MAIN_QUEUE,
schedule_to_close_timeout=timedelta(minutes=5), schedule_to_close_timeout=timedelta(minutes=5),
) )