Compare commits

...

9 Commits

Author SHA1 Message Date
19e2207bc2 Fix 2025-03-18 19:29:08 +01:00
c511192f83 Fix 2025-03-18 19:23:54 +01:00
8140648034 Fix 2025-03-18 19:03:16 +01:00
d1f0681dba Fix 2025-03-18 19:02:57 +01:00
4d7ec071cb Fix 2025-03-18 18:58:46 +01:00
f46f2aee3f Fix 2025-03-18 18:56:27 +01:00
b5b3397bff clean subs before start 2025-03-18 18:10:05 +01:00
dac0ddb884 Update 2025-03-18 17:57:22 +01:00
08704c6529 Add reward redemption 2025-03-18 17:51:59 +01:00
8 changed files with 250 additions and 129 deletions

View File

@@ -6,7 +6,7 @@ from twitchAPI.object.eventsub import ChannelChatMessageEvent
from httpx import AsyncClient from httpx import AsyncClient
from core.config import config from core.config import config
from .twitch.authorize import authorize from .twitch.authorize import authorize, Twitch
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -139,9 +139,8 @@ async def get_completion(messages: list[dict]) -> str:
class MessagesProc: class MessagesProc:
IGNORED_USER_LOGINS = [ FULL_IGNORED_USER_LOGINS = [
"jeetbot", "jeetbot",
"kurbezz",
] ]
MESSAGE_LIMIT = 1000 MESSAGE_LIMIT = 1000
@@ -172,9 +171,7 @@ class MessagesProc:
return [m for m in cls.MESSAGE_HISTORY if m["id"] == message_id] return [m for m in cls.MESSAGE_HISTORY if m["id"] == message_id]
@classmethod @classmethod
async def on_message(cls, event: MessageEvent): async def _update_history(cls, event: MessageEvent):
logging.info(f"Received message: {event}")
cls.update_message_history( cls.update_message_history(
id=event.message_id, id=event.message_id,
text=event.message.text, text=event.message.text,
@@ -182,11 +179,8 @@ class MessagesProc:
thread_id=event.reply.thread_message_id if event.reply is not None else None thread_id=event.reply.thread_message_id if event.reply is not None else None
) )
if event.chatter_user_name == "pahangor": @classmethod
return async def _goida(cls, twitch: Twitch, event: MessageEvent):
twitch = await authorize()
if "гойда" in event.message.text.lower(): if "гойда" in event.message.text.lower():
await twitch.send_chat_message( await twitch.send_chat_message(
event.broadcaster_user_id, event.broadcaster_user_id,
@@ -195,67 +189,80 @@ class MessagesProc:
reply_parent_message_id=event.message_id reply_parent_message_id=event.message_id
) )
if "lasqexx" in event.chatter_user_login: @classmethod
if "здароу" in event.message.text.lower(): async def _lasqexx(cls, twitch: Twitch, event: MessageEvent):
if "lasqexx" not in event.chatter_user_login:
return
if "здароу" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"Здароу, давай иди уже",
reply_parent_message_id=event.message_id
)
return
if "сосал?" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"А ты? Иди уже",
reply_parent_message_id=event.message_id
)
return
if "лан я пошёл" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"да да, иди уже",
reply_parent_message_id=event.message_id
)
return
@classmethod
async def _ask_ai(cls, twitch: Twitch, event: MessageEvent):
if not event.message.text.lower().startswith("!ai"):
return
try:
messages = cls.get_message_history_with_thread(
event.message_id,
thread_id=event.reply.thread_message_id if event.reply is not None else None
)
completion = await get_completion(messages)
max_length = 255
completion_parts = [completion[i:i + max_length] for i in range(0, len(completion), max_length)]
for part in completion_parts:
await twitch.send_chat_message( await twitch.send_chat_message(
event.broadcaster_user_id, event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID, config.TWITCH_ADMIN_USER_ID,
"Здароу, давай иди уже", part,
reply_parent_message_id=event.message_id reply_parent_message_id=event.message_id
) )
if "сосал?" in event.message.text.lower(): cls.update_message_history(
await twitch.send_chat_message( id="ai",
event.broadcaster_user_id, text=part,
config.TWITCH_ADMIN_USER_ID, user="kurbezz",
"А ты? Иди уже", thread_id=event.message_id
reply_parent_message_id=event.message_id
) )
except Exception as e:
logger.error("Failed to get completion: {}", e, exc_info=True)
if "лан я пошёл" in event.message.text.lower(): await twitch.send_chat_message(
await twitch.send_chat_message( event.broadcaster_user_id,
event.broadcaster_user_id, config.TWITCH_ADMIN_USER_ID,
config.TWITCH_ADMIN_USER_ID, "Ошибка!",
"да да, иди уже", reply_parent_message_id=event.message_id
reply_parent_message_id=event.message_id )
)
if event.message.text.lower().startswith("!ai"): @classmethod
try: async def _kurbezz(cls, twitch: Twitch, event: MessageEvent):
messages = cls.get_message_history_with_thread( if event.chatter_user_login == "kurbezz":
event.message_id,
thread_id=event.reply.thread_message_id if event.reply is not None else None
)
completion = await get_completion(messages)
max_length = 255
completion_parts = [completion[i:i + max_length] for i in range(0, len(completion), max_length)]
for part in completion_parts:
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
part,
reply_parent_message_id=event.message_id
)
cls.update_message_history(
id="ai",
text=part,
user="kurbezz",
thread_id=event.message_id
)
except Exception as e:
logger.error("Failed to get completion: {}", e, exc_info=True)
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"Ошибка!",
reply_parent_message_id=event.message_id
)
if event.chatter_user_login in cls.IGNORED_USER_LOGINS:
return return
if ("kurbezz" in event.message.text.lower() or \ if ("kurbezz" in event.message.text.lower() or \
@@ -295,3 +302,31 @@ class MessagesProc:
"Пошел нахуй!", "Пошел нахуй!",
reply_parent_message_id=event.message_id reply_parent_message_id=event.message_id
) )
@classmethod
async def _on_custom_reward(cls, twitch: Twitch, event: MessageEvent):
pass
# if event.channel_points_custom_reward_id:
# await twitch.send_chat_message(
# event.broadcaster_user_id,
# config.TWITCH_ADMIN_USER_ID,
# "Спасибо за поддержку!",
# reply_parent_message_id=event.message_id
# )
@classmethod
async def on_message(cls, event: MessageEvent):
if event.chatter_user_name in cls.FULL_IGNORED_USER_LOGINS:
return
logging.info(f"Received message: {event}")
await cls._update_history(event)
twitch = await authorize(event.broadcaster_user_login)
await cls._goida(twitch, event)
await cls._lasqexx(twitch, event)
await cls._ask_ai(twitch, event)
await cls._kurbezz(twitch, event)
await cls._on_custom_reward(twitch, event)

View File

@@ -0,0 +1,40 @@
import logging
from pydantic import BaseModel
from twitchAPI.object.eventsub import ChannelPointsCustomRewardRedemptionAddEvent
from .twitch.authorize import authorize
logger = logging.getLogger(__name__)
class RewardRedemption(BaseModel):
broadcaster_user_id: str
broadcaster_user_login: str
user_name: str
reward_title: str
reward_prompt: str
@classmethod
def from_twitch_event(cls, event: ChannelPointsCustomRewardRedemptionAddEvent):
return cls(
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_login=event.event.broadcaster_user_login,
user_name=event.event.user_name,
reward_title=event.event.reward.title,
reward_prompt=event.event.reward.prompt or "",
)
async def on_redemption_reward_add(reward: RewardRedemption):
logger.info(f"{reward.user_name} just redeemed {reward.reward_title}!")
twitch = await authorize(reward.broadcaster_user_login)
await twitch.send_chat_message(
reward.broadcaster_user_id,
reward.broadcaster_user_id,
f"🎉 {reward.user_name} just redeemed {reward.reward_title}! 🎉"
)

View File

@@ -21,6 +21,7 @@ class State(BaseModel):
class UpdateEvent(BaseModel): class UpdateEvent(BaseModel):
broadcaster_user_id: str broadcaster_user_id: str
broadcaster_user_login: str
title: str title: str
category_name: str category_name: str

View File

@@ -9,6 +9,7 @@ from .state import State, UpdateEvent, EventType
from .watcher import StateWatcher from .watcher import StateWatcher
from .messages_proc import MessageEvent, MessagesProc from .messages_proc import MessageEvent, MessagesProc
from .twitch.authorize import authorize from .twitch.authorize import authorize
from .reward_redemption import RewardRedemption, on_redemption_reward_add
@broker.task( @broker.task(
@@ -19,7 +20,7 @@ async def on_stream_state_change_with_check(
event: UpdateEvent, event: UpdateEvent,
event_type: EventType event_type: EventType
): ):
twitch = await authorize() twitch = await authorize(event.broadcaster_user_login)
stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id])) stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id]))
if stream is None: if stream is None:
@@ -60,7 +61,7 @@ async def check_streams_states():
streamers = await StreamerConfigRepository.all() streamers = await StreamerConfigRepository.all()
streamers_ids = [str(streamer.twitch.id) for streamer in streamers] streamers_ids = [str(streamer.twitch.id) for streamer in streamers]
twitch = await authorize() twitch = await authorize("kurbezz")
async for stream in twitch.get_streams(user_id=streamers_ids): async for stream in twitch.get_streams(user_id=streamers_ids):
state = State( state = State(
@@ -82,3 +83,11 @@ async def check_streams_states():
) )
async def on_message(event: MessageEvent): async def on_message(event: MessageEvent):
await MessagesProc.on_message(event) await MessagesProc.on_message(event)
@broker.task(
"stream_notifications.on_redemption_reward_add",
retry_on_error=True
)
async def on_redemption_reward_add_task(event: RewardRedemption):
await on_redemption_reward_add(event)

View File

@@ -14,19 +14,21 @@ SCOPES = [
AuthScope.USER_BOT, AuthScope.USER_BOT,
AuthScope.USER_READ_CHAT, AuthScope.USER_READ_CHAT,
AuthScope.USER_WRITE_CHAT, AuthScope.USER_WRITE_CHAT,
AuthScope.CHANNEL_READ_REDEMPTIONS,
] ]
async def authorize(auto_refresh_auth: bool = False) -> Twitch: async def authorize(user: str, auto_refresh_auth: bool = False) -> Twitch:
twitch = Twitch( twitch = Twitch(
config.TWITCH_CLIENT_ID, config.TWITCH_CLIENT_ID,
config.TWITCH_CLIENT_SECRET config.TWITCH_CLIENT_SECRET
) )
twitch.user_auth_refresh_callback = TokenStorage.save twitch.user_auth_refresh_callback = lambda a, r: TokenStorage.save(user, a, r)
twitch.auto_refresh_auth = auto_refresh_auth twitch.auto_refresh_auth = auto_refresh_auth
token, refresh_token = await TokenStorage.get() token, refresh_token = await TokenStorage.get(user)
await twitch.set_user_authentication( await twitch.set_user_authentication(
token, token,
SCOPES, SCOPES,

View File

@@ -1,12 +1,16 @@
import logging
from core.mongo import mongo_manager from core.mongo import mongo_manager
logger = logging.getLogger(__name__)
class TokenStorage: class TokenStorage:
COLLECTION_NAME = "secrets" COLLECTION_NAME = "secrets"
OBJECT_ID = "twitch_tokens" TYPE = "twitch_token"
@staticmethod @staticmethod
async def save(acceess_token: str, refresh_token: str): async def save(user: str, acceess_token: str, refresh_token: str):
data = {"access_token": acceess_token, "refresh_token": refresh_token} data = {"access_token": acceess_token, "refresh_token": refresh_token}
async with mongo_manager.connect() as client: async with mongo_manager.connect() as client:
@@ -14,17 +18,20 @@ class TokenStorage:
collection = db[TokenStorage.COLLECTION_NAME] collection = db[TokenStorage.COLLECTION_NAME]
await collection.update_one( await collection.update_one(
{"_id": TokenStorage.OBJECT_ID}, {"type": TokenStorage.TYPE, "twitch_login": user},
{"$set": data}, {"$set": data},
upsert=True upsert=True
) )
@staticmethod @staticmethod
async def get() -> tuple[str, str]: async def get(user: str) -> tuple[str, str]:
async with mongo_manager.connect() as client: async with mongo_manager.connect() as client:
db = client.get_default_database() db = client.get_default_database()
collection = db[TokenStorage.COLLECTION_NAME] collection = db[TokenStorage.COLLECTION_NAME]
data = await collection.find_one({"_id": TokenStorage.OBJECT_ID}) data = await collection.find_one({"type": TokenStorage.TYPE, "twitch_login": user})
if data is None:
raise RuntimeError(f"Token for user {user} not found")
return data["access_token"], data["refresh_token"] return data["access_token"], data["refresh_token"]

View File

@@ -1,17 +1,18 @@
from asyncio import sleep, gather from asyncio import sleep, gather, wait, FIRST_COMPLETED, create_task
import logging import logging
from typing import NoReturn, Literal from typing import NoReturn, Literal
from twitchAPI.eventsub.webhook import EventSubWebhook from twitchAPI.eventsub.websocket import EventSubWebsocket
from twitchAPI.twitch import Twitch from twitchAPI.twitch import Twitch
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent, ChannelPointsCustomRewardRedemptionAddEvent
from twitchAPI.oauth import validate_token from twitchAPI.oauth import validate_token
from core.config import config from core.config import config
from repositories.streamers import StreamerConfigRepository, StreamerConfig from repositories.streamers import StreamerConfigRepository, StreamerConfig
from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check, on_message from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check, on_message, on_redemption_reward_add_task
from modules.stream_notifications.state import UpdateEvent, EventType from modules.stream_notifications.state import UpdateEvent, EventType
from modules.stream_notifications.messages_proc import MessageEvent from modules.stream_notifications.messages_proc import MessageEvent
from modules.stream_notifications.reward_redemption import RewardRedemption
from .authorize import authorize from .authorize import authorize
@@ -21,8 +22,9 @@ logger = logging.getLogger(__name__)
class TwitchService: class TwitchService:
ONLINE_NOTIFICATION_DELAY = 15 * 60 ONLINE_NOTIFICATION_DELAY = 15 * 60
def __init__(self, twitch: Twitch): def __init__(self, twitch: Twitch, streamer: StreamerConfig):
self.twitch = twitch self.twitch = twitch
self.streamer = streamer
self.failed = False self.failed = False
@@ -30,6 +32,7 @@ class TwitchService:
await on_stream_state_change_with_check.kiq( await on_stream_state_change_with_check.kiq(
UpdateEvent( UpdateEvent(
broadcaster_user_id=event.event.broadcaster_user_id, broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_login=event.event.broadcaster_user_login,
title=event.event.title, title=event.event.title,
category_name=event.event.category_name category_name=event.event.category_name
), ),
@@ -42,41 +45,20 @@ class TwitchService:
EventType.STREAM_ONLINE, EventType.STREAM_ONLINE,
) )
async def on_channel_points_custom_reward_redemption_add(
self,
event: ChannelPointsCustomRewardRedemptionAddEvent
):
await on_redemption_reward_add_task(
RewardRedemption.from_twitch_event(event)
)
async def on_message(self, event: ChannelChatMessageEvent): async def on_message(self, event: ChannelChatMessageEvent):
await on_message.kiq( await on_message.kiq(
MessageEvent.from_twitch_event(event) MessageEvent.from_twitch_event(event)
) )
async def subscribe_with_retry( async def _clean_subs(self, method: str, streamer: StreamerConfig):
self,
method: Literal["listen_channel_update_v2"]
| Literal["listen_stream_online"]
| Literal["listen_channel_chat_message"],
eventsub: EventSubWebhook,
streamer: StreamerConfig,
retry: int = 10
):
try:
match method:
case "listen_channel_update_v2":
await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update)
case "listen_stream_online":
await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online)
case "listen_channel_chat_message":
await eventsub.listen_channel_chat_message(
str(streamer.twitch.id),
str(config.TWITCH_ADMIN_USER_ID),
self.on_message
)
case _:
raise ValueError("Unknown method")
return
except Exception as e:
if retry <= 0:
raise e
match method: match method:
case "listen_channel_update_v2": case "listen_channel_update_v2":
sub_type = "channel.update" sub_type = "channel.update"
@@ -84,6 +66,8 @@ class TwitchService:
sub_type = "stream.online" sub_type = "stream.online"
case "listen_channel_chat_message": case "listen_channel_chat_message":
sub_type = "channel.chat.message" sub_type = "channel.chat.message"
case "listen_channel_points_custom_reward_redemption_add":
sub_type = "channel.channel_points_custom_reward_redemption.add"
case _: case _:
raise ValueError("Unknown method") raise ValueError("Unknown method")
@@ -98,15 +82,53 @@ class TwitchService:
except Exception as e: except Exception as e:
logger.error(f"Failed to delete subscription {sub.id}", exc_info=e) logger.error(f"Failed to delete subscription {sub.id}", exc_info=e)
async def subscribe_with_retry(
self,
method: Literal["listen_channel_update_v2"]
| Literal["listen_stream_online"]
| Literal["listen_channel_chat_message"]
| Literal["listen_channel_points_custom_reward_redemption_add"],
eventsub: EventSubWebsocket,
streamer: StreamerConfig,
retry: int = 10
):
await self._clean_subs(method, streamer)
try:
match method:
case "listen_channel_update_v2":
await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update)
case "listen_stream_online":
await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online)
case "listen_channel_chat_message":
await eventsub.listen_channel_chat_message(
str(streamer.twitch.id),
str(config.TWITCH_ADMIN_USER_ID),
self.on_message
)
case "listen_channel_points_custom_reward_redemption_add":
await eventsub.listen_channel_points_custom_reward_redemption_add(
str(streamer.twitch.id),
self.on_channel_points_custom_reward_redemption_add
)
case _:
raise ValueError("Unknown method")
return
except Exception as e:
if retry <= 0:
raise e
await sleep(1) await sleep(1)
await self.subscribe_with_retry(method, eventsub, streamer, retry - 1) await self.subscribe_with_retry(method, eventsub, streamer, retry - 1)
async def subscribe_to_streamer(self, eventsub: EventSubWebhook, streamer: StreamerConfig): async def subscribe_to_streamer(self, eventsub: EventSubWebsocket, streamer: StreamerConfig):
logger.info(f"Subscribe to events for {streamer.twitch.name}") logger.info(f"Subscribe to events for {streamer.twitch.name}")
await gather( await gather(
self.subscribe_with_retry("listen_channel_update_v2", eventsub, streamer), self.subscribe_with_retry("listen_channel_update_v2", eventsub, streamer),
self.subscribe_with_retry("listen_stream_online", eventsub, streamer), self.subscribe_with_retry("listen_stream_online", eventsub, streamer),
self.subscribe_with_retry("listen_channel_chat_message", eventsub, streamer), # self.subscribe_with_retry("listen_channel_chat_message", eventsub, streamer),
self.subscribe_with_retry("listen_channel_points_custom_reward_redemption_add", eventsub, streamer)
) )
logger.info(f"Subscribe to events for {streamer.twitch.name} done") logger.info(f"Subscribe to events for {streamer.twitch.name} done")
@@ -130,24 +152,13 @@ class TwitchService:
logger.info("Token refreshed") logger.info("Token refreshed")
async def run(self) -> NoReturn: async def run(self) -> NoReturn:
eventsub = EventSubWebhook( eventsub = EventSubWebsocket(twitch=self.twitch)
callback_url=config.TWITCH_CALLBACK_URL,
port=config.TWITCH_CALLBACK_PORT,
twitch=self.twitch,
message_deduplication_history_length=50
)
eventsub.wait_for_subscription_confirm_timeout = 60
eventsub.unsubscribe_on_stop = False
streamers = await StreamerConfigRepository.all()
try: try:
eventsub.start() eventsub.start()
logger.info("Subscribe to events...") logger.info("Subscribe to events...")
await gather( await self.subscribe_to_streamer(eventsub, self.streamer)
*[self.subscribe_to_streamer(eventsub, streamer) for streamer in streamers]
)
logger.info("Twitch service started") logger.info("Twitch service started")
await self._check_token() await self._check_token()
@@ -155,15 +166,31 @@ class TwitchService:
logger.info("Twitch service stopping...") logger.info("Twitch service stopping...")
await eventsub.stop() await eventsub.stop()
@classmethod
async def _start_for_streamer(cls, streamer: StreamerConfig):
try:
twith = await authorize(streamer.twitch.name, auto_refresh_auth=True)
await cls(twith, streamer).run()
except Exception as e:
logger.error("Twitch service failed", exc_info=e)
@classmethod @classmethod
async def start(cls): async def start(cls):
logger.info("Starting Twitch service...") logger.info("Starting Twitch service...")
try: streamers = await StreamerConfigRepository.all()
twith = await authorize(auto_refresh_auth=True)
await cls(twith).run() await wait(
except Exception as e: [
logger.error("Twitch service failed", exc_info=e) create_task(cls._start_for_streamer(streamer))
for streamer in streamers
],
return_when=FIRST_COMPLETED
)
await gather(
*[cls._start_for_streamer(streamer) for streamer in streamers]
)
logger.info("Twitch service stopped") logger.info("Twitch service stopped")

View File

@@ -16,7 +16,7 @@ class StateWatcher:
@classmethod @classmethod
async def get_twitch_state(cls, streamer_id: int) -> State | None: async def get_twitch_state(cls, streamer_id: int) -> State | None:
twitch = await authorize() twitch = await authorize("kurbezz")
stream = await first( stream = await first(
twitch.get_streams(user_id=[str(streamer_id)]) twitch.get_streams(user_id=[str(streamer_id)])