From f46f2aee3f143cbb83cb662d274799e1273180c4 Mon Sep 17 00:00:00 2001 From: Bulat Kurbanov Date: Tue, 18 Mar 2025 18:56:27 +0100 Subject: [PATCH] Fix --- .../stream_notifications/messages_proc.py | 2 +- .../stream_notifications/reward_redemption.py | 7 +-- src/modules/stream_notifications/state.py | 1 + src/modules/stream_notifications/tasks.py | 4 +- .../stream_notifications/twitch/authorize.py | 6 +-- .../twitch/token_storage.py | 10 ++--- .../stream_notifications/twitch/webhook.py | 43 +++++++++---------- src/modules/stream_notifications/watcher.py | 2 +- 8 files changed, 38 insertions(+), 37 deletions(-) diff --git a/src/modules/stream_notifications/messages_proc.py b/src/modules/stream_notifications/messages_proc.py index c5f11a5..e2b8f56 100644 --- a/src/modules/stream_notifications/messages_proc.py +++ b/src/modules/stream_notifications/messages_proc.py @@ -323,7 +323,7 @@ class MessagesProc: await cls._update_history(event) - twitch = await authorize() + twitch = await authorize(event.broadcaster_user_login) await cls._goida(twitch, event) await cls._lasqexx(twitch, event) diff --git a/src/modules/stream_notifications/reward_redemption.py b/src/modules/stream_notifications/reward_redemption.py index 6beffbf..6105117 100644 --- a/src/modules/stream_notifications/reward_redemption.py +++ b/src/modules/stream_notifications/reward_redemption.py @@ -4,7 +4,6 @@ from pydantic import BaseModel from twitchAPI.object.eventsub import ChannelPointsCustomRewardRedemptionAddEvent -from core.config import config from .twitch.authorize import authorize @@ -13,6 +12,7 @@ logger = logging.getLogger(__name__) class RewardRedemption(BaseModel): broadcaster_user_id: str + broadcaster_user_login: str user_name: str reward_title: str reward_prompt: str @@ -21,6 +21,7 @@ class RewardRedemption(BaseModel): def from_twitch_event(cls, event: ChannelPointsCustomRewardRedemptionAddEvent): return cls( broadcaster_user_id=event.event.broadcaster_user_id, + broadcaster_user_login=event.event.broadcaster_user_login, user_name=event.event.user_name, reward_title=event.event.reward.title, reward_prompt=event.event.reward.prompt or "", @@ -30,10 +31,10 @@ class RewardRedemption(BaseModel): async def on_redemption_reward_add(reward: RewardRedemption): logger.info(f"{reward.user_name} just redeemed {reward.reward_title}!") - twitch = await authorize() + twitch = await authorize(reward.broadcaster_user_login) await twitch.send_chat_message( reward.broadcaster_user_id, - config.TWITCH_ADMIN_USER_ID, + reward.broadcaster_user_id, f"🎉 {reward.user_name} just redeemed {reward.reward_title}! 🎉" ) diff --git a/src/modules/stream_notifications/state.py b/src/modules/stream_notifications/state.py index b8ed394..e740fc6 100644 --- a/src/modules/stream_notifications/state.py +++ b/src/modules/stream_notifications/state.py @@ -21,6 +21,7 @@ class State(BaseModel): class UpdateEvent(BaseModel): broadcaster_user_id: str + broadcaster_user_login: str title: str category_name: str diff --git a/src/modules/stream_notifications/tasks.py b/src/modules/stream_notifications/tasks.py index c3d0753..13744bd 100644 --- a/src/modules/stream_notifications/tasks.py +++ b/src/modules/stream_notifications/tasks.py @@ -20,7 +20,7 @@ async def on_stream_state_change_with_check( event: UpdateEvent, event_type: EventType ): - twitch = await authorize() + twitch = await authorize(event.broadcaster_user_login) stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id])) if stream is None: @@ -61,7 +61,7 @@ async def check_streams_states(): streamers = await StreamerConfigRepository.all() streamers_ids = [str(streamer.twitch.id) for streamer in streamers] - twitch = await authorize() + twitch = await authorize("kurbezz") async for stream in twitch.get_streams(user_id=streamers_ids): state = State( diff --git a/src/modules/stream_notifications/twitch/authorize.py b/src/modules/stream_notifications/twitch/authorize.py index 791b831..b16ae6a 100644 --- a/src/modules/stream_notifications/twitch/authorize.py +++ b/src/modules/stream_notifications/twitch/authorize.py @@ -19,16 +19,16 @@ SCOPES = [ ] -async def authorize(auto_refresh_auth: bool = False) -> Twitch: +async def authorize(user: str, auto_refresh_auth: bool = False) -> Twitch: twitch = Twitch( config.TWITCH_CLIENT_ID, config.TWITCH_CLIENT_SECRET ) - twitch.user_auth_refresh_callback = TokenStorage.save + twitch.user_auth_refresh_callback = lambda a, r: TokenStorage.save(user, a, r) twitch.auto_refresh_auth = auto_refresh_auth - token, refresh_token = await TokenStorage.get() + token, refresh_token = await TokenStorage.get(user) await twitch.set_user_authentication( token, SCOPES, diff --git a/src/modules/stream_notifications/twitch/token_storage.py b/src/modules/stream_notifications/twitch/token_storage.py index 13cd018..95a2bc6 100644 --- a/src/modules/stream_notifications/twitch/token_storage.py +++ b/src/modules/stream_notifications/twitch/token_storage.py @@ -3,10 +3,10 @@ from core.mongo import mongo_manager class TokenStorage: COLLECTION_NAME = "secrets" - OBJECT_ID = "twitch_tokens" + TYPE = "twitch_token" @staticmethod - async def save(acceess_token: str, refresh_token: str): + async def save(user: str, acceess_token: str, refresh_token: str): data = {"access_token": acceess_token, "refresh_token": refresh_token} async with mongo_manager.connect() as client: @@ -14,17 +14,17 @@ class TokenStorage: collection = db[TokenStorage.COLLECTION_NAME] await collection.update_one( - {"_id": TokenStorage.OBJECT_ID}, + {"type": TokenStorage.TYPE, "user": user}, {"$set": data}, upsert=True ) @staticmethod - async def get() -> tuple[str, str]: + async def get(user: str) -> tuple[str, str]: async with mongo_manager.connect() as client: db = client.get_default_database() collection = db[TokenStorage.COLLECTION_NAME] - data = await collection.find_one({"_id": TokenStorage.OBJECT_ID}) + data = await collection.find_one({"type": TokenStorage.TYPE, "user": user}) return data["access_token"], data["refresh_token"] diff --git a/src/modules/stream_notifications/twitch/webhook.py b/src/modules/stream_notifications/twitch/webhook.py index 4ae2bf0..444c24e 100644 --- a/src/modules/stream_notifications/twitch/webhook.py +++ b/src/modules/stream_notifications/twitch/webhook.py @@ -2,7 +2,7 @@ from asyncio import sleep, gather import logging from typing import NoReturn, Literal -from twitchAPI.eventsub.webhook import EventSubWebhook +from twitchAPI.eventsub.websocket import EventSubWebsocket from twitchAPI.twitch import Twitch from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent, ChannelPointsCustomRewardRedemptionAddEvent from twitchAPI.oauth import validate_token @@ -22,8 +22,9 @@ logger = logging.getLogger(__name__) class TwitchService: ONLINE_NOTIFICATION_DELAY = 15 * 60 - def __init__(self, twitch: Twitch): + def __init__(self, twitch: Twitch, streamer: StreamerConfig): self.twitch = twitch + self.streamer = streamer self.failed = False @@ -31,6 +32,7 @@ class TwitchService: await on_stream_state_change_with_check.kiq( UpdateEvent( broadcaster_user_id=event.event.broadcaster_user_id, + broadcaster_user_login=event.event.broadcaster_user_login, title=event.event.title, category_name=event.event.category_name ), @@ -86,7 +88,7 @@ class TwitchService: | Literal["listen_stream_online"] | Literal["listen_channel_chat_message"] | Literal["listen_channel_points_custom_reward_redemption_add"], - eventsub: EventSubWebhook, + eventsub: EventSubWebsocket, streamer: StreamerConfig, retry: int = 10 ): @@ -120,7 +122,7 @@ class TwitchService: await sleep(1) await self.subscribe_with_retry(method, eventsub, streamer, retry - 1) - async def subscribe_to_streamer(self, eventsub: EventSubWebhook, streamer: StreamerConfig): + async def subscribe_to_streamer(self, eventsub: EventSubWebsocket, streamer: StreamerConfig): logger.info(f"Subscribe to events for {streamer.twitch.name}") await gather( self.subscribe_with_retry("listen_channel_update_v2", eventsub, streamer), @@ -150,24 +152,13 @@ class TwitchService: logger.info("Token refreshed") async def run(self) -> NoReturn: - eventsub = EventSubWebhook( - callback_url=config.TWITCH_CALLBACK_URL, - port=config.TWITCH_CALLBACK_PORT, - twitch=self.twitch, - message_deduplication_history_length=50 - ) - eventsub.wait_for_subscription_confirm_timeout = 60 - eventsub.unsubscribe_on_stop = False - - streamers = await StreamerConfigRepository.all() + eventsub = EventSubWebsocket(twitch=self.twitch) try: eventsub.start() logger.info("Subscribe to events...") - await gather( - *[self.subscribe_to_streamer(eventsub, streamer) for streamer in streamers] - ) + await self.subscribe_to_streamer(eventsub, self.streamer) logger.info("Twitch service started") await self._check_token() @@ -175,15 +166,23 @@ class TwitchService: logger.info("Twitch service stopping...") await eventsub.stop() + @classmethod + async def _start_for_streamer(cls, streamer: StreamerConfig): + try: + twith = await authorize(streamer.twitch.name, auto_refresh_auth=True) + await cls(twith, streamer).run() + except Exception as e: + logger.error("Twitch service failed", exc_info=e) + @classmethod async def start(cls): logger.info("Starting Twitch service...") - try: - twith = await authorize(auto_refresh_auth=True) - await cls(twith).run() - except Exception as e: - logger.error("Twitch service failed", exc_info=e) + streamers = await StreamerConfigRepository.all() + + await gather( + *[cls._start_for_streamer(streamer) for streamer in streamers] + ) logger.info("Twitch service stopped") diff --git a/src/modules/stream_notifications/watcher.py b/src/modules/stream_notifications/watcher.py index e665467..eed5d38 100644 --- a/src/modules/stream_notifications/watcher.py +++ b/src/modules/stream_notifications/watcher.py @@ -16,7 +16,7 @@ class StateWatcher: @classmethod async def get_twitch_state(cls, streamer_id: int) -> State | None: - twitch = await authorize() + twitch = await authorize("kurbezz") stream = await first( twitch.get_streams(user_id=[str(streamer_id)])