mirror of
https://github.com/kurbezz/discord-bot.git
synced 2025-12-06 15:15:37 +01:00
Compare commits
9 Commits
74acf4596a
...
19e2207bc2
| Author | SHA1 | Date | |
|---|---|---|---|
| 19e2207bc2 | |||
| c511192f83 | |||
| 8140648034 | |||
| d1f0681dba | |||
| 4d7ec071cb | |||
| f46f2aee3f | |||
| b5b3397bff | |||
| dac0ddb884 | |||
| 08704c6529 |
@@ -6,7 +6,7 @@ from twitchAPI.object.eventsub import ChannelChatMessageEvent
|
|||||||
from httpx import AsyncClient
|
from httpx import AsyncClient
|
||||||
|
|
||||||
from core.config import config
|
from core.config import config
|
||||||
from .twitch.authorize import authorize
|
from .twitch.authorize import authorize, Twitch
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -139,9 +139,8 @@ async def get_completion(messages: list[dict]) -> str:
|
|||||||
|
|
||||||
|
|
||||||
class MessagesProc:
|
class MessagesProc:
|
||||||
IGNORED_USER_LOGINS = [
|
FULL_IGNORED_USER_LOGINS = [
|
||||||
"jeetbot",
|
"jeetbot",
|
||||||
"kurbezz",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
MESSAGE_LIMIT = 1000
|
MESSAGE_LIMIT = 1000
|
||||||
@@ -172,9 +171,7 @@ class MessagesProc:
|
|||||||
return [m for m in cls.MESSAGE_HISTORY if m["id"] == message_id]
|
return [m for m in cls.MESSAGE_HISTORY if m["id"] == message_id]
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def on_message(cls, event: MessageEvent):
|
async def _update_history(cls, event: MessageEvent):
|
||||||
logging.info(f"Received message: {event}")
|
|
||||||
|
|
||||||
cls.update_message_history(
|
cls.update_message_history(
|
||||||
id=event.message_id,
|
id=event.message_id,
|
||||||
text=event.message.text,
|
text=event.message.text,
|
||||||
@@ -182,11 +179,8 @@ class MessagesProc:
|
|||||||
thread_id=event.reply.thread_message_id if event.reply is not None else None
|
thread_id=event.reply.thread_message_id if event.reply is not None else None
|
||||||
)
|
)
|
||||||
|
|
||||||
if event.chatter_user_name == "pahangor":
|
@classmethod
|
||||||
return
|
async def _goida(cls, twitch: Twitch, event: MessageEvent):
|
||||||
|
|
||||||
twitch = await authorize()
|
|
||||||
|
|
||||||
if "гойда" in event.message.text.lower():
|
if "гойда" in event.message.text.lower():
|
||||||
await twitch.send_chat_message(
|
await twitch.send_chat_message(
|
||||||
event.broadcaster_user_id,
|
event.broadcaster_user_id,
|
||||||
@@ -195,7 +189,11 @@ class MessagesProc:
|
|||||||
reply_parent_message_id=event.message_id
|
reply_parent_message_id=event.message_id
|
||||||
)
|
)
|
||||||
|
|
||||||
if "lasqexx" in event.chatter_user_login:
|
@classmethod
|
||||||
|
async def _lasqexx(cls, twitch: Twitch, event: MessageEvent):
|
||||||
|
if "lasqexx" not in event.chatter_user_login:
|
||||||
|
return
|
||||||
|
|
||||||
if "здароу" in event.message.text.lower():
|
if "здароу" in event.message.text.lower():
|
||||||
await twitch.send_chat_message(
|
await twitch.send_chat_message(
|
||||||
event.broadcaster_user_id,
|
event.broadcaster_user_id,
|
||||||
@@ -203,6 +201,7 @@ class MessagesProc:
|
|||||||
"Здароу, давай иди уже",
|
"Здароу, давай иди уже",
|
||||||
reply_parent_message_id=event.message_id
|
reply_parent_message_id=event.message_id
|
||||||
)
|
)
|
||||||
|
return
|
||||||
|
|
||||||
if "сосал?" in event.message.text.lower():
|
if "сосал?" in event.message.text.lower():
|
||||||
await twitch.send_chat_message(
|
await twitch.send_chat_message(
|
||||||
@@ -211,6 +210,7 @@ class MessagesProc:
|
|||||||
"А ты? Иди уже",
|
"А ты? Иди уже",
|
||||||
reply_parent_message_id=event.message_id
|
reply_parent_message_id=event.message_id
|
||||||
)
|
)
|
||||||
|
return
|
||||||
|
|
||||||
if "лан я пошёл" in event.message.text.lower():
|
if "лан я пошёл" in event.message.text.lower():
|
||||||
await twitch.send_chat_message(
|
await twitch.send_chat_message(
|
||||||
@@ -219,8 +219,13 @@ class MessagesProc:
|
|||||||
"да да, иди уже",
|
"да да, иди уже",
|
||||||
reply_parent_message_id=event.message_id
|
reply_parent_message_id=event.message_id
|
||||||
)
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def _ask_ai(cls, twitch: Twitch, event: MessageEvent):
|
||||||
|
if not event.message.text.lower().startswith("!ai"):
|
||||||
|
return
|
||||||
|
|
||||||
if event.message.text.lower().startswith("!ai"):
|
|
||||||
try:
|
try:
|
||||||
messages = cls.get_message_history_with_thread(
|
messages = cls.get_message_history_with_thread(
|
||||||
event.message_id,
|
event.message_id,
|
||||||
@@ -255,7 +260,9 @@ class MessagesProc:
|
|||||||
reply_parent_message_id=event.message_id
|
reply_parent_message_id=event.message_id
|
||||||
)
|
)
|
||||||
|
|
||||||
if event.chatter_user_login in cls.IGNORED_USER_LOGINS:
|
@classmethod
|
||||||
|
async def _kurbezz(cls, twitch: Twitch, event: MessageEvent):
|
||||||
|
if event.chatter_user_login == "kurbezz":
|
||||||
return
|
return
|
||||||
|
|
||||||
if ("kurbezz" in event.message.text.lower() or \
|
if ("kurbezz" in event.message.text.lower() or \
|
||||||
@@ -295,3 +302,31 @@ class MessagesProc:
|
|||||||
"Пошел нахуй!",
|
"Пошел нахуй!",
|
||||||
reply_parent_message_id=event.message_id
|
reply_parent_message_id=event.message_id
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def _on_custom_reward(cls, twitch: Twitch, event: MessageEvent):
|
||||||
|
pass
|
||||||
|
# if event.channel_points_custom_reward_id:
|
||||||
|
# await twitch.send_chat_message(
|
||||||
|
# event.broadcaster_user_id,
|
||||||
|
# config.TWITCH_ADMIN_USER_ID,
|
||||||
|
# "Спасибо за поддержку!",
|
||||||
|
# reply_parent_message_id=event.message_id
|
||||||
|
# )
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def on_message(cls, event: MessageEvent):
|
||||||
|
if event.chatter_user_name in cls.FULL_IGNORED_USER_LOGINS:
|
||||||
|
return
|
||||||
|
|
||||||
|
logging.info(f"Received message: {event}")
|
||||||
|
|
||||||
|
await cls._update_history(event)
|
||||||
|
|
||||||
|
twitch = await authorize(event.broadcaster_user_login)
|
||||||
|
|
||||||
|
await cls._goida(twitch, event)
|
||||||
|
await cls._lasqexx(twitch, event)
|
||||||
|
await cls._ask_ai(twitch, event)
|
||||||
|
await cls._kurbezz(twitch, event)
|
||||||
|
await cls._on_custom_reward(twitch, event)
|
||||||
|
|||||||
40
src/modules/stream_notifications/reward_redemption.py
Normal file
40
src/modules/stream_notifications/reward_redemption.py
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from twitchAPI.object.eventsub import ChannelPointsCustomRewardRedemptionAddEvent
|
||||||
|
|
||||||
|
from .twitch.authorize import authorize
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class RewardRedemption(BaseModel):
|
||||||
|
broadcaster_user_id: str
|
||||||
|
broadcaster_user_login: str
|
||||||
|
user_name: str
|
||||||
|
reward_title: str
|
||||||
|
reward_prompt: str
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_twitch_event(cls, event: ChannelPointsCustomRewardRedemptionAddEvent):
|
||||||
|
return cls(
|
||||||
|
broadcaster_user_id=event.event.broadcaster_user_id,
|
||||||
|
broadcaster_user_login=event.event.broadcaster_user_login,
|
||||||
|
user_name=event.event.user_name,
|
||||||
|
reward_title=event.event.reward.title,
|
||||||
|
reward_prompt=event.event.reward.prompt or "",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def on_redemption_reward_add(reward: RewardRedemption):
|
||||||
|
logger.info(f"{reward.user_name} just redeemed {reward.reward_title}!")
|
||||||
|
|
||||||
|
twitch = await authorize(reward.broadcaster_user_login)
|
||||||
|
|
||||||
|
await twitch.send_chat_message(
|
||||||
|
reward.broadcaster_user_id,
|
||||||
|
reward.broadcaster_user_id,
|
||||||
|
f"🎉 {reward.user_name} just redeemed {reward.reward_title}! 🎉"
|
||||||
|
)
|
||||||
@@ -21,6 +21,7 @@ class State(BaseModel):
|
|||||||
|
|
||||||
class UpdateEvent(BaseModel):
|
class UpdateEvent(BaseModel):
|
||||||
broadcaster_user_id: str
|
broadcaster_user_id: str
|
||||||
|
broadcaster_user_login: str
|
||||||
title: str
|
title: str
|
||||||
category_name: str
|
category_name: str
|
||||||
|
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ from .state import State, UpdateEvent, EventType
|
|||||||
from .watcher import StateWatcher
|
from .watcher import StateWatcher
|
||||||
from .messages_proc import MessageEvent, MessagesProc
|
from .messages_proc import MessageEvent, MessagesProc
|
||||||
from .twitch.authorize import authorize
|
from .twitch.authorize import authorize
|
||||||
|
from .reward_redemption import RewardRedemption, on_redemption_reward_add
|
||||||
|
|
||||||
|
|
||||||
@broker.task(
|
@broker.task(
|
||||||
@@ -19,7 +20,7 @@ async def on_stream_state_change_with_check(
|
|||||||
event: UpdateEvent,
|
event: UpdateEvent,
|
||||||
event_type: EventType
|
event_type: EventType
|
||||||
):
|
):
|
||||||
twitch = await authorize()
|
twitch = await authorize(event.broadcaster_user_login)
|
||||||
|
|
||||||
stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id]))
|
stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id]))
|
||||||
if stream is None:
|
if stream is None:
|
||||||
@@ -60,7 +61,7 @@ async def check_streams_states():
|
|||||||
streamers = await StreamerConfigRepository.all()
|
streamers = await StreamerConfigRepository.all()
|
||||||
streamers_ids = [str(streamer.twitch.id) for streamer in streamers]
|
streamers_ids = [str(streamer.twitch.id) for streamer in streamers]
|
||||||
|
|
||||||
twitch = await authorize()
|
twitch = await authorize("kurbezz")
|
||||||
|
|
||||||
async for stream in twitch.get_streams(user_id=streamers_ids):
|
async for stream in twitch.get_streams(user_id=streamers_ids):
|
||||||
state = State(
|
state = State(
|
||||||
@@ -82,3 +83,11 @@ async def check_streams_states():
|
|||||||
)
|
)
|
||||||
async def on_message(event: MessageEvent):
|
async def on_message(event: MessageEvent):
|
||||||
await MessagesProc.on_message(event)
|
await MessagesProc.on_message(event)
|
||||||
|
|
||||||
|
|
||||||
|
@broker.task(
|
||||||
|
"stream_notifications.on_redemption_reward_add",
|
||||||
|
retry_on_error=True
|
||||||
|
)
|
||||||
|
async def on_redemption_reward_add_task(event: RewardRedemption):
|
||||||
|
await on_redemption_reward_add(event)
|
||||||
|
|||||||
@@ -14,19 +14,21 @@ SCOPES = [
|
|||||||
AuthScope.USER_BOT,
|
AuthScope.USER_BOT,
|
||||||
AuthScope.USER_READ_CHAT,
|
AuthScope.USER_READ_CHAT,
|
||||||
AuthScope.USER_WRITE_CHAT,
|
AuthScope.USER_WRITE_CHAT,
|
||||||
|
|
||||||
|
AuthScope.CHANNEL_READ_REDEMPTIONS,
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
async def authorize(auto_refresh_auth: bool = False) -> Twitch:
|
async def authorize(user: str, auto_refresh_auth: bool = False) -> Twitch:
|
||||||
twitch = Twitch(
|
twitch = Twitch(
|
||||||
config.TWITCH_CLIENT_ID,
|
config.TWITCH_CLIENT_ID,
|
||||||
config.TWITCH_CLIENT_SECRET
|
config.TWITCH_CLIENT_SECRET
|
||||||
)
|
)
|
||||||
|
|
||||||
twitch.user_auth_refresh_callback = TokenStorage.save
|
twitch.user_auth_refresh_callback = lambda a, r: TokenStorage.save(user, a, r)
|
||||||
twitch.auto_refresh_auth = auto_refresh_auth
|
twitch.auto_refresh_auth = auto_refresh_auth
|
||||||
|
|
||||||
token, refresh_token = await TokenStorage.get()
|
token, refresh_token = await TokenStorage.get(user)
|
||||||
await twitch.set_user_authentication(
|
await twitch.set_user_authentication(
|
||||||
token,
|
token,
|
||||||
SCOPES,
|
SCOPES,
|
||||||
|
|||||||
@@ -1,12 +1,16 @@
|
|||||||
|
import logging
|
||||||
|
|
||||||
from core.mongo import mongo_manager
|
from core.mongo import mongo_manager
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class TokenStorage:
|
class TokenStorage:
|
||||||
COLLECTION_NAME = "secrets"
|
COLLECTION_NAME = "secrets"
|
||||||
OBJECT_ID = "twitch_tokens"
|
TYPE = "twitch_token"
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def save(acceess_token: str, refresh_token: str):
|
async def save(user: str, acceess_token: str, refresh_token: str):
|
||||||
data = {"access_token": acceess_token, "refresh_token": refresh_token}
|
data = {"access_token": acceess_token, "refresh_token": refresh_token}
|
||||||
|
|
||||||
async with mongo_manager.connect() as client:
|
async with mongo_manager.connect() as client:
|
||||||
@@ -14,17 +18,20 @@ class TokenStorage:
|
|||||||
collection = db[TokenStorage.COLLECTION_NAME]
|
collection = db[TokenStorage.COLLECTION_NAME]
|
||||||
|
|
||||||
await collection.update_one(
|
await collection.update_one(
|
||||||
{"_id": TokenStorage.OBJECT_ID},
|
{"type": TokenStorage.TYPE, "twitch_login": user},
|
||||||
{"$set": data},
|
{"$set": data},
|
||||||
upsert=True
|
upsert=True
|
||||||
)
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def get() -> tuple[str, str]:
|
async def get(user: str) -> tuple[str, str]:
|
||||||
async with mongo_manager.connect() as client:
|
async with mongo_manager.connect() as client:
|
||||||
db = client.get_default_database()
|
db = client.get_default_database()
|
||||||
collection = db[TokenStorage.COLLECTION_NAME]
|
collection = db[TokenStorage.COLLECTION_NAME]
|
||||||
|
|
||||||
data = await collection.find_one({"_id": TokenStorage.OBJECT_ID})
|
data = await collection.find_one({"type": TokenStorage.TYPE, "twitch_login": user})
|
||||||
|
|
||||||
|
if data is None:
|
||||||
|
raise RuntimeError(f"Token for user {user} not found")
|
||||||
|
|
||||||
return data["access_token"], data["refresh_token"]
|
return data["access_token"], data["refresh_token"]
|
||||||
|
|||||||
@@ -1,17 +1,18 @@
|
|||||||
from asyncio import sleep, gather
|
from asyncio import sleep, gather, wait, FIRST_COMPLETED, create_task
|
||||||
import logging
|
import logging
|
||||||
from typing import NoReturn, Literal
|
from typing import NoReturn, Literal
|
||||||
|
|
||||||
from twitchAPI.eventsub.webhook import EventSubWebhook
|
from twitchAPI.eventsub.websocket import EventSubWebsocket
|
||||||
from twitchAPI.twitch import Twitch
|
from twitchAPI.twitch import Twitch
|
||||||
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent
|
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent, ChannelPointsCustomRewardRedemptionAddEvent
|
||||||
from twitchAPI.oauth import validate_token
|
from twitchAPI.oauth import validate_token
|
||||||
|
|
||||||
from core.config import config
|
from core.config import config
|
||||||
from repositories.streamers import StreamerConfigRepository, StreamerConfig
|
from repositories.streamers import StreamerConfigRepository, StreamerConfig
|
||||||
from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check, on_message
|
from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check, on_message, on_redemption_reward_add_task
|
||||||
from modules.stream_notifications.state import UpdateEvent, EventType
|
from modules.stream_notifications.state import UpdateEvent, EventType
|
||||||
from modules.stream_notifications.messages_proc import MessageEvent
|
from modules.stream_notifications.messages_proc import MessageEvent
|
||||||
|
from modules.stream_notifications.reward_redemption import RewardRedemption
|
||||||
from .authorize import authorize
|
from .authorize import authorize
|
||||||
|
|
||||||
|
|
||||||
@@ -21,8 +22,9 @@ logger = logging.getLogger(__name__)
|
|||||||
class TwitchService:
|
class TwitchService:
|
||||||
ONLINE_NOTIFICATION_DELAY = 15 * 60
|
ONLINE_NOTIFICATION_DELAY = 15 * 60
|
||||||
|
|
||||||
def __init__(self, twitch: Twitch):
|
def __init__(self, twitch: Twitch, streamer: StreamerConfig):
|
||||||
self.twitch = twitch
|
self.twitch = twitch
|
||||||
|
self.streamer = streamer
|
||||||
|
|
||||||
self.failed = False
|
self.failed = False
|
||||||
|
|
||||||
@@ -30,6 +32,7 @@ class TwitchService:
|
|||||||
await on_stream_state_change_with_check.kiq(
|
await on_stream_state_change_with_check.kiq(
|
||||||
UpdateEvent(
|
UpdateEvent(
|
||||||
broadcaster_user_id=event.event.broadcaster_user_id,
|
broadcaster_user_id=event.event.broadcaster_user_id,
|
||||||
|
broadcaster_user_login=event.event.broadcaster_user_login,
|
||||||
title=event.event.title,
|
title=event.event.title,
|
||||||
category_name=event.event.category_name
|
category_name=event.event.category_name
|
||||||
),
|
),
|
||||||
@@ -42,41 +45,20 @@ class TwitchService:
|
|||||||
EventType.STREAM_ONLINE,
|
EventType.STREAM_ONLINE,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def on_channel_points_custom_reward_redemption_add(
|
||||||
|
self,
|
||||||
|
event: ChannelPointsCustomRewardRedemptionAddEvent
|
||||||
|
):
|
||||||
|
await on_redemption_reward_add_task(
|
||||||
|
RewardRedemption.from_twitch_event(event)
|
||||||
|
)
|
||||||
|
|
||||||
async def on_message(self, event: ChannelChatMessageEvent):
|
async def on_message(self, event: ChannelChatMessageEvent):
|
||||||
await on_message.kiq(
|
await on_message.kiq(
|
||||||
MessageEvent.from_twitch_event(event)
|
MessageEvent.from_twitch_event(event)
|
||||||
)
|
)
|
||||||
|
|
||||||
async def subscribe_with_retry(
|
async def _clean_subs(self, method: str, streamer: StreamerConfig):
|
||||||
self,
|
|
||||||
method: Literal["listen_channel_update_v2"]
|
|
||||||
| Literal["listen_stream_online"]
|
|
||||||
| Literal["listen_channel_chat_message"],
|
|
||||||
eventsub: EventSubWebhook,
|
|
||||||
streamer: StreamerConfig,
|
|
||||||
retry: int = 10
|
|
||||||
):
|
|
||||||
|
|
||||||
try:
|
|
||||||
match method:
|
|
||||||
case "listen_channel_update_v2":
|
|
||||||
await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update)
|
|
||||||
case "listen_stream_online":
|
|
||||||
await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online)
|
|
||||||
case "listen_channel_chat_message":
|
|
||||||
await eventsub.listen_channel_chat_message(
|
|
||||||
str(streamer.twitch.id),
|
|
||||||
str(config.TWITCH_ADMIN_USER_ID),
|
|
||||||
self.on_message
|
|
||||||
)
|
|
||||||
case _:
|
|
||||||
raise ValueError("Unknown method")
|
|
||||||
|
|
||||||
return
|
|
||||||
except Exception as e:
|
|
||||||
if retry <= 0:
|
|
||||||
raise e
|
|
||||||
|
|
||||||
match method:
|
match method:
|
||||||
case "listen_channel_update_v2":
|
case "listen_channel_update_v2":
|
||||||
sub_type = "channel.update"
|
sub_type = "channel.update"
|
||||||
@@ -84,6 +66,8 @@ class TwitchService:
|
|||||||
sub_type = "stream.online"
|
sub_type = "stream.online"
|
||||||
case "listen_channel_chat_message":
|
case "listen_channel_chat_message":
|
||||||
sub_type = "channel.chat.message"
|
sub_type = "channel.chat.message"
|
||||||
|
case "listen_channel_points_custom_reward_redemption_add":
|
||||||
|
sub_type = "channel.channel_points_custom_reward_redemption.add"
|
||||||
case _:
|
case _:
|
||||||
raise ValueError("Unknown method")
|
raise ValueError("Unknown method")
|
||||||
|
|
||||||
@@ -98,15 +82,53 @@ class TwitchService:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to delete subscription {sub.id}", exc_info=e)
|
logger.error(f"Failed to delete subscription {sub.id}", exc_info=e)
|
||||||
|
|
||||||
|
async def subscribe_with_retry(
|
||||||
|
self,
|
||||||
|
method: Literal["listen_channel_update_v2"]
|
||||||
|
| Literal["listen_stream_online"]
|
||||||
|
| Literal["listen_channel_chat_message"]
|
||||||
|
| Literal["listen_channel_points_custom_reward_redemption_add"],
|
||||||
|
eventsub: EventSubWebsocket,
|
||||||
|
streamer: StreamerConfig,
|
||||||
|
retry: int = 10
|
||||||
|
):
|
||||||
|
await self._clean_subs(method, streamer)
|
||||||
|
|
||||||
|
try:
|
||||||
|
match method:
|
||||||
|
case "listen_channel_update_v2":
|
||||||
|
await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update)
|
||||||
|
case "listen_stream_online":
|
||||||
|
await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online)
|
||||||
|
case "listen_channel_chat_message":
|
||||||
|
await eventsub.listen_channel_chat_message(
|
||||||
|
str(streamer.twitch.id),
|
||||||
|
str(config.TWITCH_ADMIN_USER_ID),
|
||||||
|
self.on_message
|
||||||
|
)
|
||||||
|
case "listen_channel_points_custom_reward_redemption_add":
|
||||||
|
await eventsub.listen_channel_points_custom_reward_redemption_add(
|
||||||
|
str(streamer.twitch.id),
|
||||||
|
self.on_channel_points_custom_reward_redemption_add
|
||||||
|
)
|
||||||
|
case _:
|
||||||
|
raise ValueError("Unknown method")
|
||||||
|
|
||||||
|
return
|
||||||
|
except Exception as e:
|
||||||
|
if retry <= 0:
|
||||||
|
raise e
|
||||||
|
|
||||||
await sleep(1)
|
await sleep(1)
|
||||||
await self.subscribe_with_retry(method, eventsub, streamer, retry - 1)
|
await self.subscribe_with_retry(method, eventsub, streamer, retry - 1)
|
||||||
|
|
||||||
async def subscribe_to_streamer(self, eventsub: EventSubWebhook, streamer: StreamerConfig):
|
async def subscribe_to_streamer(self, eventsub: EventSubWebsocket, streamer: StreamerConfig):
|
||||||
logger.info(f"Subscribe to events for {streamer.twitch.name}")
|
logger.info(f"Subscribe to events for {streamer.twitch.name}")
|
||||||
await gather(
|
await gather(
|
||||||
self.subscribe_with_retry("listen_channel_update_v2", eventsub, streamer),
|
self.subscribe_with_retry("listen_channel_update_v2", eventsub, streamer),
|
||||||
self.subscribe_with_retry("listen_stream_online", eventsub, streamer),
|
self.subscribe_with_retry("listen_stream_online", eventsub, streamer),
|
||||||
self.subscribe_with_retry("listen_channel_chat_message", eventsub, streamer),
|
# self.subscribe_with_retry("listen_channel_chat_message", eventsub, streamer),
|
||||||
|
self.subscribe_with_retry("listen_channel_points_custom_reward_redemption_add", eventsub, streamer)
|
||||||
)
|
)
|
||||||
logger.info(f"Subscribe to events for {streamer.twitch.name} done")
|
logger.info(f"Subscribe to events for {streamer.twitch.name} done")
|
||||||
|
|
||||||
@@ -130,24 +152,13 @@ class TwitchService:
|
|||||||
logger.info("Token refreshed")
|
logger.info("Token refreshed")
|
||||||
|
|
||||||
async def run(self) -> NoReturn:
|
async def run(self) -> NoReturn:
|
||||||
eventsub = EventSubWebhook(
|
eventsub = EventSubWebsocket(twitch=self.twitch)
|
||||||
callback_url=config.TWITCH_CALLBACK_URL,
|
|
||||||
port=config.TWITCH_CALLBACK_PORT,
|
|
||||||
twitch=self.twitch,
|
|
||||||
message_deduplication_history_length=50
|
|
||||||
)
|
|
||||||
eventsub.wait_for_subscription_confirm_timeout = 60
|
|
||||||
eventsub.unsubscribe_on_stop = False
|
|
||||||
|
|
||||||
streamers = await StreamerConfigRepository.all()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
eventsub.start()
|
eventsub.start()
|
||||||
|
|
||||||
logger.info("Subscribe to events...")
|
logger.info("Subscribe to events...")
|
||||||
await gather(
|
await self.subscribe_to_streamer(eventsub, self.streamer)
|
||||||
*[self.subscribe_to_streamer(eventsub, streamer) for streamer in streamers]
|
|
||||||
)
|
|
||||||
logger.info("Twitch service started")
|
logger.info("Twitch service started")
|
||||||
|
|
||||||
await self._check_token()
|
await self._check_token()
|
||||||
@@ -155,15 +166,31 @@ class TwitchService:
|
|||||||
logger.info("Twitch service stopping...")
|
logger.info("Twitch service stopping...")
|
||||||
await eventsub.stop()
|
await eventsub.stop()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def _start_for_streamer(cls, streamer: StreamerConfig):
|
||||||
|
try:
|
||||||
|
twith = await authorize(streamer.twitch.name, auto_refresh_auth=True)
|
||||||
|
await cls(twith, streamer).run()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Twitch service failed", exc_info=e)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def start(cls):
|
async def start(cls):
|
||||||
logger.info("Starting Twitch service...")
|
logger.info("Starting Twitch service...")
|
||||||
|
|
||||||
try:
|
streamers = await StreamerConfigRepository.all()
|
||||||
twith = await authorize(auto_refresh_auth=True)
|
|
||||||
await cls(twith).run()
|
await wait(
|
||||||
except Exception as e:
|
[
|
||||||
logger.error("Twitch service failed", exc_info=e)
|
create_task(cls._start_for_streamer(streamer))
|
||||||
|
for streamer in streamers
|
||||||
|
],
|
||||||
|
return_when=FIRST_COMPLETED
|
||||||
|
)
|
||||||
|
|
||||||
|
await gather(
|
||||||
|
*[cls._start_for_streamer(streamer) for streamer in streamers]
|
||||||
|
)
|
||||||
|
|
||||||
logger.info("Twitch service stopped")
|
logger.info("Twitch service stopped")
|
||||||
|
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ class StateWatcher:
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def get_twitch_state(cls, streamer_id: int) -> State | None:
|
async def get_twitch_state(cls, streamer_id: int) -> State | None:
|
||||||
twitch = await authorize()
|
twitch = await authorize("kurbezz")
|
||||||
|
|
||||||
stream = await first(
|
stream = await first(
|
||||||
twitch.get_streams(user_id=[str(streamer_id)])
|
twitch.get_streams(user_id=[str(streamer_id)])
|
||||||
|
|||||||
Reference in New Issue
Block a user