Compare commits

...

9 Commits

Author SHA1 Message Date
19e2207bc2 Fix 2025-03-18 19:29:08 +01:00
c511192f83 Fix 2025-03-18 19:23:54 +01:00
8140648034 Fix 2025-03-18 19:03:16 +01:00
d1f0681dba Fix 2025-03-18 19:02:57 +01:00
4d7ec071cb Fix 2025-03-18 18:58:46 +01:00
f46f2aee3f Fix 2025-03-18 18:56:27 +01:00
b5b3397bff clean subs before start 2025-03-18 18:10:05 +01:00
dac0ddb884 Update 2025-03-18 17:57:22 +01:00
08704c6529 Add reward redemption 2025-03-18 17:51:59 +01:00
8 changed files with 250 additions and 129 deletions

View File

@@ -6,7 +6,7 @@ from twitchAPI.object.eventsub import ChannelChatMessageEvent
from httpx import AsyncClient
from core.config import config
from .twitch.authorize import authorize
from .twitch.authorize import authorize, Twitch
logger = logging.getLogger(__name__)
@@ -139,9 +139,8 @@ async def get_completion(messages: list[dict]) -> str:
class MessagesProc:
IGNORED_USER_LOGINS = [
FULL_IGNORED_USER_LOGINS = [
"jeetbot",
"kurbezz",
]
MESSAGE_LIMIT = 1000
@@ -172,9 +171,7 @@ class MessagesProc:
return [m for m in cls.MESSAGE_HISTORY if m["id"] == message_id]
@classmethod
async def on_message(cls, event: MessageEvent):
logging.info(f"Received message: {event}")
async def _update_history(cls, event: MessageEvent):
cls.update_message_history(
id=event.message_id,
text=event.message.text,
@@ -182,11 +179,8 @@ class MessagesProc:
thread_id=event.reply.thread_message_id if event.reply is not None else None
)
if event.chatter_user_name == "pahangor":
return
twitch = await authorize()
@classmethod
async def _goida(cls, twitch: Twitch, event: MessageEvent):
if "гойда" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
@@ -195,7 +189,11 @@ class MessagesProc:
reply_parent_message_id=event.message_id
)
if "lasqexx" in event.chatter_user_login:
@classmethod
async def _lasqexx(cls, twitch: Twitch, event: MessageEvent):
if "lasqexx" not in event.chatter_user_login:
return
if "здароу" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
@@ -203,6 +201,7 @@ class MessagesProc:
"Здароу, давай иди уже",
reply_parent_message_id=event.message_id
)
return
if "сосал?" in event.message.text.lower():
await twitch.send_chat_message(
@@ -211,6 +210,7 @@ class MessagesProc:
"А ты? Иди уже",
reply_parent_message_id=event.message_id
)
return
if "лан я пошёл" in event.message.text.lower():
await twitch.send_chat_message(
@@ -219,8 +219,13 @@ class MessagesProc:
"да да, иди уже",
reply_parent_message_id=event.message_id
)
return
@classmethod
async def _ask_ai(cls, twitch: Twitch, event: MessageEvent):
if not event.message.text.lower().startswith("!ai"):
return
if event.message.text.lower().startswith("!ai"):
try:
messages = cls.get_message_history_with_thread(
event.message_id,
@@ -255,7 +260,9 @@ class MessagesProc:
reply_parent_message_id=event.message_id
)
if event.chatter_user_login in cls.IGNORED_USER_LOGINS:
@classmethod
async def _kurbezz(cls, twitch: Twitch, event: MessageEvent):
if event.chatter_user_login == "kurbezz":
return
if ("kurbezz" in event.message.text.lower() or \
@@ -295,3 +302,31 @@ class MessagesProc:
"Пошел нахуй!",
reply_parent_message_id=event.message_id
)
@classmethod
async def _on_custom_reward(cls, twitch: Twitch, event: MessageEvent):
pass
# if event.channel_points_custom_reward_id:
# await twitch.send_chat_message(
# event.broadcaster_user_id,
# config.TWITCH_ADMIN_USER_ID,
# "Спасибо за поддержку!",
# reply_parent_message_id=event.message_id
# )
@classmethod
async def on_message(cls, event: MessageEvent):
if event.chatter_user_name in cls.FULL_IGNORED_USER_LOGINS:
return
logging.info(f"Received message: {event}")
await cls._update_history(event)
twitch = await authorize(event.broadcaster_user_login)
await cls._goida(twitch, event)
await cls._lasqexx(twitch, event)
await cls._ask_ai(twitch, event)
await cls._kurbezz(twitch, event)
await cls._on_custom_reward(twitch, event)

View File

@@ -0,0 +1,40 @@
import logging
from pydantic import BaseModel
from twitchAPI.object.eventsub import ChannelPointsCustomRewardRedemptionAddEvent
from .twitch.authorize import authorize
logger = logging.getLogger(__name__)
class RewardRedemption(BaseModel):
broadcaster_user_id: str
broadcaster_user_login: str
user_name: str
reward_title: str
reward_prompt: str
@classmethod
def from_twitch_event(cls, event: ChannelPointsCustomRewardRedemptionAddEvent):
return cls(
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_login=event.event.broadcaster_user_login,
user_name=event.event.user_name,
reward_title=event.event.reward.title,
reward_prompt=event.event.reward.prompt or "",
)
async def on_redemption_reward_add(reward: RewardRedemption):
logger.info(f"{reward.user_name} just redeemed {reward.reward_title}!")
twitch = await authorize(reward.broadcaster_user_login)
await twitch.send_chat_message(
reward.broadcaster_user_id,
reward.broadcaster_user_id,
f"🎉 {reward.user_name} just redeemed {reward.reward_title}! 🎉"
)

View File

@@ -21,6 +21,7 @@ class State(BaseModel):
class UpdateEvent(BaseModel):
broadcaster_user_id: str
broadcaster_user_login: str
title: str
category_name: str

View File

@@ -9,6 +9,7 @@ from .state import State, UpdateEvent, EventType
from .watcher import StateWatcher
from .messages_proc import MessageEvent, MessagesProc
from .twitch.authorize import authorize
from .reward_redemption import RewardRedemption, on_redemption_reward_add
@broker.task(
@@ -19,7 +20,7 @@ async def on_stream_state_change_with_check(
event: UpdateEvent,
event_type: EventType
):
twitch = await authorize()
twitch = await authorize(event.broadcaster_user_login)
stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id]))
if stream is None:
@@ -60,7 +61,7 @@ async def check_streams_states():
streamers = await StreamerConfigRepository.all()
streamers_ids = [str(streamer.twitch.id) for streamer in streamers]
twitch = await authorize()
twitch = await authorize("kurbezz")
async for stream in twitch.get_streams(user_id=streamers_ids):
state = State(
@@ -82,3 +83,11 @@ async def check_streams_states():
)
async def on_message(event: MessageEvent):
await MessagesProc.on_message(event)
@broker.task(
"stream_notifications.on_redemption_reward_add",
retry_on_error=True
)
async def on_redemption_reward_add_task(event: RewardRedemption):
await on_redemption_reward_add(event)

View File

@@ -14,19 +14,21 @@ SCOPES = [
AuthScope.USER_BOT,
AuthScope.USER_READ_CHAT,
AuthScope.USER_WRITE_CHAT,
AuthScope.CHANNEL_READ_REDEMPTIONS,
]
async def authorize(auto_refresh_auth: bool = False) -> Twitch:
async def authorize(user: str, auto_refresh_auth: bool = False) -> Twitch:
twitch = Twitch(
config.TWITCH_CLIENT_ID,
config.TWITCH_CLIENT_SECRET
)
twitch.user_auth_refresh_callback = TokenStorage.save
twitch.user_auth_refresh_callback = lambda a, r: TokenStorage.save(user, a, r)
twitch.auto_refresh_auth = auto_refresh_auth
token, refresh_token = await TokenStorage.get()
token, refresh_token = await TokenStorage.get(user)
await twitch.set_user_authentication(
token,
SCOPES,

View File

@@ -1,12 +1,16 @@
import logging
from core.mongo import mongo_manager
logger = logging.getLogger(__name__)
class TokenStorage:
COLLECTION_NAME = "secrets"
OBJECT_ID = "twitch_tokens"
TYPE = "twitch_token"
@staticmethod
async def save(acceess_token: str, refresh_token: str):
async def save(user: str, acceess_token: str, refresh_token: str):
data = {"access_token": acceess_token, "refresh_token": refresh_token}
async with mongo_manager.connect() as client:
@@ -14,17 +18,20 @@ class TokenStorage:
collection = db[TokenStorage.COLLECTION_NAME]
await collection.update_one(
{"_id": TokenStorage.OBJECT_ID},
{"type": TokenStorage.TYPE, "twitch_login": user},
{"$set": data},
upsert=True
)
@staticmethod
async def get() -> tuple[str, str]:
async def get(user: str) -> tuple[str, str]:
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[TokenStorage.COLLECTION_NAME]
data = await collection.find_one({"_id": TokenStorage.OBJECT_ID})
data = await collection.find_one({"type": TokenStorage.TYPE, "twitch_login": user})
if data is None:
raise RuntimeError(f"Token for user {user} not found")
return data["access_token"], data["refresh_token"]

View File

@@ -1,17 +1,18 @@
from asyncio import sleep, gather
from asyncio import sleep, gather, wait, FIRST_COMPLETED, create_task
import logging
from typing import NoReturn, Literal
from twitchAPI.eventsub.webhook import EventSubWebhook
from twitchAPI.eventsub.websocket import EventSubWebsocket
from twitchAPI.twitch import Twitch
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent, ChannelPointsCustomRewardRedemptionAddEvent
from twitchAPI.oauth import validate_token
from core.config import config
from repositories.streamers import StreamerConfigRepository, StreamerConfig
from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check, on_message
from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check, on_message, on_redemption_reward_add_task
from modules.stream_notifications.state import UpdateEvent, EventType
from modules.stream_notifications.messages_proc import MessageEvent
from modules.stream_notifications.reward_redemption import RewardRedemption
from .authorize import authorize
@@ -21,8 +22,9 @@ logger = logging.getLogger(__name__)
class TwitchService:
ONLINE_NOTIFICATION_DELAY = 15 * 60
def __init__(self, twitch: Twitch):
def __init__(self, twitch: Twitch, streamer: StreamerConfig):
self.twitch = twitch
self.streamer = streamer
self.failed = False
@@ -30,6 +32,7 @@ class TwitchService:
await on_stream_state_change_with_check.kiq(
UpdateEvent(
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_login=event.event.broadcaster_user_login,
title=event.event.title,
category_name=event.event.category_name
),
@@ -42,41 +45,20 @@ class TwitchService:
EventType.STREAM_ONLINE,
)
async def on_channel_points_custom_reward_redemption_add(
self,
event: ChannelPointsCustomRewardRedemptionAddEvent
):
await on_redemption_reward_add_task(
RewardRedemption.from_twitch_event(event)
)
async def on_message(self, event: ChannelChatMessageEvent):
await on_message.kiq(
MessageEvent.from_twitch_event(event)
)
async def subscribe_with_retry(
self,
method: Literal["listen_channel_update_v2"]
| Literal["listen_stream_online"]
| Literal["listen_channel_chat_message"],
eventsub: EventSubWebhook,
streamer: StreamerConfig,
retry: int = 10
):
try:
match method:
case "listen_channel_update_v2":
await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update)
case "listen_stream_online":
await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online)
case "listen_channel_chat_message":
await eventsub.listen_channel_chat_message(
str(streamer.twitch.id),
str(config.TWITCH_ADMIN_USER_ID),
self.on_message
)
case _:
raise ValueError("Unknown method")
return
except Exception as e:
if retry <= 0:
raise e
async def _clean_subs(self, method: str, streamer: StreamerConfig):
match method:
case "listen_channel_update_v2":
sub_type = "channel.update"
@@ -84,6 +66,8 @@ class TwitchService:
sub_type = "stream.online"
case "listen_channel_chat_message":
sub_type = "channel.chat.message"
case "listen_channel_points_custom_reward_redemption_add":
sub_type = "channel.channel_points_custom_reward_redemption.add"
case _:
raise ValueError("Unknown method")
@@ -98,15 +82,53 @@ class TwitchService:
except Exception as e:
logger.error(f"Failed to delete subscription {sub.id}", exc_info=e)
async def subscribe_with_retry(
self,
method: Literal["listen_channel_update_v2"]
| Literal["listen_stream_online"]
| Literal["listen_channel_chat_message"]
| Literal["listen_channel_points_custom_reward_redemption_add"],
eventsub: EventSubWebsocket,
streamer: StreamerConfig,
retry: int = 10
):
await self._clean_subs(method, streamer)
try:
match method:
case "listen_channel_update_v2":
await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update)
case "listen_stream_online":
await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online)
case "listen_channel_chat_message":
await eventsub.listen_channel_chat_message(
str(streamer.twitch.id),
str(config.TWITCH_ADMIN_USER_ID),
self.on_message
)
case "listen_channel_points_custom_reward_redemption_add":
await eventsub.listen_channel_points_custom_reward_redemption_add(
str(streamer.twitch.id),
self.on_channel_points_custom_reward_redemption_add
)
case _:
raise ValueError("Unknown method")
return
except Exception as e:
if retry <= 0:
raise e
await sleep(1)
await self.subscribe_with_retry(method, eventsub, streamer, retry - 1)
async def subscribe_to_streamer(self, eventsub: EventSubWebhook, streamer: StreamerConfig):
async def subscribe_to_streamer(self, eventsub: EventSubWebsocket, streamer: StreamerConfig):
logger.info(f"Subscribe to events for {streamer.twitch.name}")
await gather(
self.subscribe_with_retry("listen_channel_update_v2", eventsub, streamer),
self.subscribe_with_retry("listen_stream_online", eventsub, streamer),
self.subscribe_with_retry("listen_channel_chat_message", eventsub, streamer),
# self.subscribe_with_retry("listen_channel_chat_message", eventsub, streamer),
self.subscribe_with_retry("listen_channel_points_custom_reward_redemption_add", eventsub, streamer)
)
logger.info(f"Subscribe to events for {streamer.twitch.name} done")
@@ -130,24 +152,13 @@ class TwitchService:
logger.info("Token refreshed")
async def run(self) -> NoReturn:
eventsub = EventSubWebhook(
callback_url=config.TWITCH_CALLBACK_URL,
port=config.TWITCH_CALLBACK_PORT,
twitch=self.twitch,
message_deduplication_history_length=50
)
eventsub.wait_for_subscription_confirm_timeout = 60
eventsub.unsubscribe_on_stop = False
streamers = await StreamerConfigRepository.all()
eventsub = EventSubWebsocket(twitch=self.twitch)
try:
eventsub.start()
logger.info("Subscribe to events...")
await gather(
*[self.subscribe_to_streamer(eventsub, streamer) for streamer in streamers]
)
await self.subscribe_to_streamer(eventsub, self.streamer)
logger.info("Twitch service started")
await self._check_token()
@@ -155,15 +166,31 @@ class TwitchService:
logger.info("Twitch service stopping...")
await eventsub.stop()
@classmethod
async def _start_for_streamer(cls, streamer: StreamerConfig):
try:
twith = await authorize(streamer.twitch.name, auto_refresh_auth=True)
await cls(twith, streamer).run()
except Exception as e:
logger.error("Twitch service failed", exc_info=e)
@classmethod
async def start(cls):
logger.info("Starting Twitch service...")
try:
twith = await authorize(auto_refresh_auth=True)
await cls(twith).run()
except Exception as e:
logger.error("Twitch service failed", exc_info=e)
streamers = await StreamerConfigRepository.all()
await wait(
[
create_task(cls._start_for_streamer(streamer))
for streamer in streamers
],
return_when=FIRST_COMPLETED
)
await gather(
*[cls._start_for_streamer(streamer) for streamer in streamers]
)
logger.info("Twitch service stopped")

View File

@@ -16,7 +16,7 @@ class StateWatcher:
@classmethod
async def get_twitch_state(cls, streamer_id: int) -> State | None:
twitch = await authorize()
twitch = await authorize("kurbezz")
stream = await first(
twitch.get_streams(user_id=[str(streamer_id)])