Compare commits

...

58 Commits

Author SHA1 Message Date
402042080e Fix 2025-02-19 00:43:38 +01:00
f6c7128cdc Fix 2025-02-19 00:39:14 +01:00
67a18ba3dd Update 2025-02-19 00:30:56 +01:00
b0f668e97f Add debug 2025-02-19 00:26:18 +01:00
4de3336230 Fix 2025-02-19 00:20:34 +01:00
4aff92a68d Fix 2025-02-19 00:15:36 +01:00
44441b615c Update 2025-02-19 00:14:36 +01:00
4d19ae568f Fix 2025-02-19 00:09:09 +01:00
ed3fdbcc05 Fix 2025-02-18 23:57:14 +01:00
bf43ffa8a3 Fix 2025-02-18 23:52:36 +01:00
a6a10d9110 Update 2025-02-18 23:49:16 +01:00
e86fa6635a Fix 2025-02-18 23:43:58 +01:00
60a3d903ae Update 2025-02-18 23:43:29 +01:00
aec3939816 Update 2025-02-18 23:42:57 +01:00
f9c1f7e77b Fix 2025-02-18 23:30:26 +01:00
8c1e6adc32 Fix 2025-02-18 23:21:01 +01:00
dd6be7abde Fix 2025-02-18 23:18:56 +01:00
462bd0d7dd Ignore pahangor 2025-02-18 23:16:41 +01:00
3eb49f8f4e Update 2025-02-18 23:09:32 +01:00
1f94675639 Update 2025-02-18 23:03:16 +01:00
e33d53d554 Update 2025-02-18 23:01:00 +01:00
ca63648374 Update 2025-02-18 22:49:57 +01:00
b96f651b40 Fix 2025-02-18 22:40:43 +01:00
f998a61e54 Fix 2025-02-18 22:39:51 +01:00
469275540f Fix 2025-02-18 22:34:35 +01:00
027cddd886 Fix 2025-02-18 22:33:26 +01:00
ca7f382d11 Update 2025-02-18 22:18:54 +01:00
1ea24db686 Update 2025-02-18 22:18:14 +01:00
ae5efdae73 Fix 2025-02-18 22:17:47 +01:00
5679405c7e Fix 2025-02-18 22:12:41 +01:00
99be4cbab2 Fix 2025-02-18 22:11:35 +01:00
a1d7833c1d Fix 2025-02-18 22:10:08 +01:00
ef9c88b86b Update 2025-02-18 22:09:05 +01:00
c21909138a Fix 2025-02-18 22:02:58 +01:00
d53892f9ea Fix 2025-02-18 22:01:17 +01:00
62956dc5f0 Use gemini 2025-02-18 21:58:15 +01:00
8ccd3debce Fix 2025-02-18 21:47:11 +01:00
c536bd45d7 Fix 2025-02-18 21:43:32 +01:00
fe5a39a40c Fix 2025-02-18 21:41:54 +01:00
9fc794d3ed Fix 2025-02-18 21:36:57 +01:00
20ba243272 Fix 2025-02-18 21:35:59 +01:00
5d33bd7ea9 Fix 2025-02-18 21:32:27 +01:00
c85d86ec92 Fix 2025-02-18 21:31:22 +01:00
8b58ac480a Fix 2025-02-18 21:24:23 +01:00
41ffb15e15 Update 2025-02-18 21:22:46 +01:00
20b34a0e69 Fix 2025-02-18 20:54:14 +01:00
4eac5ecd11 Fix 2025-02-18 20:51:11 +01:00
113b84c837 Update 2025-02-18 20:48:57 +01:00
4ff7d1a1d7 Fix 2025-02-18 20:45:24 +01:00
f8f923bfb4 Update 2025-02-18 20:40:03 +01:00
2e41a08bd9 Update 2025-02-18 20:33:04 +01:00
2ef0cc06c2 Fix 2025-02-18 20:25:39 +01:00
1c57392a44 Add debug 2025-02-18 20:20:39 +01:00
11d1142346 Fix channel_points_custom_reward_id 2025-02-18 20:15:40 +01:00
bb2ed88736 Update 2025-02-18 20:11:49 +01:00
3cd6b9ebe3 Test message processing 2025-02-18 20:05:51 +01:00
cc45a437d9 Add debug 2025-02-18 19:26:51 +01:00
c2c8889590 Add listen_channel_chat_message 2025-02-18 19:01:28 +01:00
7 changed files with 1318 additions and 871 deletions

1862
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -15,11 +15,11 @@ pydantic-settings = "^2.7.1"
httpx = "^0.28.1"
icalendar = "^6.1.0"
pytz = "^2024.2"
mongojet = "^0.2.5"
taskiq = "^0.11.10"
mongojet = "^0.2.7"
taskiq = "^0.11.11"
taskiq-redis = "^1.0.2"
redis = {extras = ["hiredis"], version = "^5.2.1"}
fastapi = "^0.115.6"
fastapi = "^0.115.8"
authx = "^1.4.1"
httpx-oauth = "^0.16.1"
uvicorn = {extras = ["standard"], version = "^0.34.0"}

View File

@@ -33,6 +33,8 @@ class Config(BaseModel):
SECRET_KEY: str
OPENAI_API_KEY: str
def get_config() -> Config:
settings = Settings() # type: ignore

View File

@@ -0,0 +1,247 @@
from enum import StrEnum
import logging
from pydantic import BaseModel
from twitchAPI.object.eventsub import ChannelChatMessageEvent
from httpx import AsyncClient
from core.config import config
from .twitch.authorize import authorize
logger = logging.getLogger(__name__)
class ChatMessage(BaseModel):
text: str
class ChatMessageReplyMetadata(BaseModel):
parent_message_id: str
parent_message_body: str
parent_user_id: str
parent_user_name: str
parent_user_login: str
thread_message_id: str
thread_user_id: str
thread_user_name: str
thread_user_login: str
class MessageType(StrEnum):
TEXT = "text"
CHANNEL_POINTS_HIGHLIGHTED = "channel_points_highlighted"
CHANNEL_POINTS_SUB_ONLY = "channel_points_sub_only"
USER_INTRO = "user_intro"
class MessageEvent(BaseModel):
broadcaster_user_id: str
broadcaster_user_name: str
broadcaster_user_login: str
chatter_user_id: str
chatter_user_name: str
chatter_user_login: str
message_id: str
message: ChatMessage
message_type: MessageType
color: str
reply: ChatMessageReplyMetadata | None
channel_points_custom_reward_id: str | None
@classmethod
def from_twitch_event(cls, event: ChannelChatMessageEvent):
return cls(
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_name=event.event.broadcaster_user_name,
broadcaster_user_login=event.event.broadcaster_user_login,
chatter_user_id=event.event.chatter_user_id,
chatter_user_name=event.event.chatter_user_name,
chatter_user_login=event.event.chatter_user_login,
message_id=event.event.message_id,
message=ChatMessage(text=event.event.message.text),
message_type=MessageType(event.event.message_type),
color=event.event.color,
reply=ChatMessageReplyMetadata(
parent_message_id=event.event.reply.parent_message_id,
parent_message_body=event.event.reply.parent_message_body,
parent_user_id=event.event.reply.parent_user_id,
parent_user_name=event.event.reply.parent_user_name,
parent_user_login=event.event.reply.parent_user_login,
thread_message_id=event.event.reply.thread_message_id,
thread_user_id=event.event.reply.thread_user_id,
thread_user_name=event.event.reply.thread_user_name,
thread_user_login=event.event.reply.thread_user_login
) if event.event.reply else None,
channel_points_custom_reward_id=event.event.channel_points_custom_reward_id
)
async def get_completion(messages: list[dict]) -> str:
logger.info(f"Getting completion for message: {messages}")
data_messages = [
*(
{
"role": "assistant" if message["user"] == "kurbezz" else "user",
"content": message["text"]
}
for message in messages
),
{
"role": "system",
"content": "Don't use markdown! Don't use blocked words on Twitch! Make answers short and clear!"
}
]
async with AsyncClient() as client:
response = await client.post(
"https://openrouter.ai/api/v1/chat/completions",
headers={
"Authorization": f"Bearer {config.OPENAI_API_KEY}",
"content-type": "application/json"
},
json={
"model": "google/gemini-2.0-flash-thinking-exp:free",
"messages": data_messages
}
)
data = response.json()
logger.info(f"Got completion: {data}")
return data["choices"][0]["message"]["content"]
class MessagesProc:
IGNORED_USER_LOGINS = [
"jeetbot",
"kurbezz",
]
MESSAGE_LIMIT = 1000
MESSAGE_HISTORY = []
@classmethod
def update_message_history(cls, id: str, text: str, user: str, thread_id: str | None = None):
cls.MESSAGE_HISTORY.append({
"id": id,
"text": text,
"user": user,
"thread_id": thread_id
})
if len(cls.MESSAGE_HISTORY) > cls.MESSAGE_LIMIT:
cls.MESSAGE_HISTORY = cls.MESSAGE_HISTORY[-cls.MESSAGE_LIMIT:]
@classmethod
def get_message_history_with_thread(cls, message_id: str, thread_id: str | None = None) -> list[dict]:
logger.info(f"HISTORY: {cls.MESSAGE_HISTORY}")
return [m for m in cls.MESSAGE_HISTORY if m["thread_id"] == thread_id] + \
[m for m in cls.MESSAGE_HISTORY if m["id"] == message_id]
@classmethod
async def on_message(cls, event: MessageEvent):
logging.info(f"Received message: {event}")
cls.update_message_history(
id=event.message_id,
text=event.message.text,
user=event.chatter_user_login,
thread_id=event.reply.thread_message_id if event.reply is not None else None
)
if event.chatter_user_name == "pahangor":
return
twitch = await authorize()
if "гойда" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"ГООООООООООООООООООООООООООООООООООООООООООООООЙДА!",
reply_parent_message_id=event.message_id
)
if "lasqexx" in event.chatter_user_login:
pass # Todo: Здароу
if event.message.text.lower().startswith("!ai"):
try:
messages = cls.get_message_history_with_thread(
event.message_id,
thread_id=event.reply.thread_message_id if event.reply is not None else None
)
completion = await get_completion(messages)
max_length = 255
completion_parts = [completion[i:i + max_length] for i in range(0, len(completion), max_length)]
for part in completion_parts:
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
part,
reply_parent_message_id=event.message_id
)
except Exception as e:
logger.error("Failed to get completion: {}", e, exc_info=True)
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"Ошибка!",
reply_parent_message_id=event.message_id
)
if event.chatter_user_login in cls.IGNORED_USER_LOGINS:
return
if ("kurbezz" in event.message.text.lower() or \
"курбез" in event.message.text.lower() or \
"булат" in event.message.text.lower()):
try:
messages = cls.get_message_history_with_thread(
event.message_id,
thread_id=event.reply.thread_message_id if event.reply is not None else None
)
completion = await get_completion(messages)
max_length = 255
completion_parts = [completion[i:i + max_length] for i in range(0, len(completion), max_length)]
for part in completion_parts:
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
part,
reply_parent_message_id=event.message_id
)
except Exception as e:
logger.error(f"Failed to get completion: {e}")
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"Пошел нахуй!",
reply_parent_message_id=event.message_id
)

View File

@@ -7,6 +7,7 @@ from repositories.streamers import StreamerConfigRepository
from .state import State, UpdateEvent, EventType
from .watcher import StateWatcher
from .messages_proc import MessageEvent, MessagesProc
from .twitch.authorize import authorize
@@ -73,3 +74,11 @@ async def check_streams_states():
EventType.UNKNOWN,
state
)
@broker.task(
"stream_notifications.on_message",
retry_on_error=True
)
async def on_message(event: MessageEvent):
await MessagesProc.on_message(event)

View File

@@ -8,7 +8,12 @@ from .token_storage import TokenStorage
SCOPES = [
AuthScope.CHAT_READ,
AuthScope.CHAT_EDIT,
AuthScope.CHANNEL_BOT,
AuthScope.USER_BOT,
AuthScope.USER_READ_CHAT,
AuthScope.USER_WRITE_CHAT,
]

View File

@@ -4,13 +4,14 @@ from typing import NoReturn, Literal
from twitchAPI.eventsub.webhook import EventSubWebhook
from twitchAPI.twitch import Twitch
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent
from twitchAPI.oauth import validate_token
from core.config import config
from repositories.streamers import StreamerConfigRepository, StreamerConfig
from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check
from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check, on_message
from modules.stream_notifications.state import UpdateEvent, EventType
from modules.stream_notifications.messages_proc import MessageEvent
from .authorize import authorize
@@ -41,20 +42,34 @@ class TwitchService:
EventType.STREAM_ONLINE,
)
async def on_message(self, event: ChannelChatMessageEvent):
await on_message.kiq(
MessageEvent.from_twitch_event(event)
)
async def subscribe_with_retry(
self,
method: Literal["listen_channel_update_v2"] | Literal["listen_stream_online"],
method: Literal["listen_channel_update_v2"]
| Literal["listen_stream_online"]
| Literal["listen_channel_chat_message"],
eventsub: EventSubWebhook,
streamer: StreamerConfig,
retry: int = 10
):
try:
if method == "listen_channel_update_v2":
match method:
case "listen_channel_update_v2":
await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update)
elif method == "listen_stream_online":
case "listen_stream_online":
await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online)
else:
case "listen_channel_chat_message":
await eventsub.listen_channel_chat_message(
str(streamer.twitch.id),
str(config.TWITCH_ADMIN_USER_ID),
self.on_message
)
case _:
raise ValueError("Unknown method")
return
@@ -62,11 +77,14 @@ class TwitchService:
if retry <= 0:
raise e
if method == "listen_channel_update_v2":
match method:
case "listen_channel_update_v2":
sub_type = "channel.update"
elif method == "listen_stream_online":
case "listen_stream_online":
sub_type = "stream.online"
else:
case "listen_channel_chat_message":
sub_type = "channel.chat.message"
case _:
raise ValueError("Unknown method")
subs = await self.twitch.get_eventsub_subscriptions(
@@ -75,7 +93,10 @@ class TwitchService:
for sub in subs.data:
if sub.type == sub_type:
try:
await self.twitch.delete_eventsub_subscription(sub.id)
except Exception as e:
logger.error(f"Failed to delete subscription {sub.id}", exc_info=e)
await sleep(1)
await self.subscribe_with_retry(method, eventsub, streamer, retry - 1)
@@ -84,7 +105,8 @@ class TwitchService:
logger.info(f"Subscribe to events for {streamer.twitch.name}")
await gather(
self.subscribe_with_retry("listen_channel_update_v2", eventsub, streamer),
self.subscribe_with_retry("listen_stream_online", eventsub, streamer)
self.subscribe_with_retry("listen_stream_online", eventsub, streamer),
self.subscribe_with_retry("listen_channel_chat_message", eventsub, streamer),
)
logger.info(f"Subscribe to events for {streamer.twitch.name} done")