New structure

This commit is contained in:
2025-04-21 13:50:51 +02:00
parent 1eba79cc5a
commit abe0cbb173
45 changed files with 10 additions and 50 deletions

View File

@@ -0,0 +1,12 @@
from enum import StrEnum
from pydantic import BaseModel
class OAuthProvider(StrEnum):
TWITCH = "twitch"
class OAuthData(BaseModel):
id: str
email: str | None

View File

@@ -0,0 +1,41 @@
from pydantic import BaseModel
class TwitchConfig(BaseModel):
id: int
name: str
class NotificationsConfig(BaseModel):
start_stream: str
change_category: str | None = None
redemption_reward: str | None = None
class GamesListConfig(BaseModel):
channel_id: int
message_id: int
class DiscordConfig(BaseModel):
guild_id: int
notifications_channel_id: int
games_list: GamesListConfig | None = None
roles: dict[str, int] | None = None
class TelegramConfig(BaseModel):
notifications_channel_id: int
class IntegrationsConfig(BaseModel):
discord: DiscordConfig | None = None
telegram: TelegramConfig | None = None
class StreamerConfig(BaseModel):
twitch: TwitchConfig
notifications: NotificationsConfig
integrations: IntegrationsConfig
chatbot_in_chats: list[int] | None = None

View File

@@ -0,0 +1,17 @@
from pydantic import BaseModel
from domain.auth import OAuthProvider, OAuthData
class User(BaseModel):
id: str
oauths: dict[OAuthProvider, OAuthData]
is_admin: bool
class CreateUser(BaseModel):
oauths: dict[OAuthProvider, OAuthData]
is_admin: bool = False

View File

@@ -0,0 +1,18 @@
import abc
from contextlib import asynccontextmanager
from core.mongo import mongo_manager
class BaseRepository(abc.ABC):
COLLECTION_NAME: str
@classmethod
@asynccontextmanager
async def connect(cls):
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[cls.COLLECTION_NAME]
yield collection

View File

@@ -0,0 +1,45 @@
from domain.streamers import StreamerConfig
from .base import BaseRepository
class StreamerConfigRepository(BaseRepository):
COLLECTION_NAME = "streamers"
@classmethod
async def get_by_twitch_id(cls, twitch_id: int) -> StreamerConfig:
async with cls.connect() as collection:
doc = await collection.find_one({"twitch.id": twitch_id})
if doc is None:
raise ValueError(f"Streamer with twitch id {twitch_id} not found")
return StreamerConfig(**doc)
@classmethod
async def find_one(
cls,
integration_discord_guild_id: int | None = None,
integration_discord_games_list_channel_id: int | None = None,
) -> StreamerConfig | None:
filters = {}
if integration_discord_guild_id is not None:
filters["integrations.discord.guild_id"] = integration_discord_guild_id
if integration_discord_games_list_channel_id is not None:
filters[
"integrations.discord.games_list.channel_id"
] = integration_discord_games_list_channel_id
async with cls.connect() as collection:
doc = await collection.find_one(filters)
if doc is None:
return None
return StreamerConfig(**doc)
@classmethod
async def all(cls) -> list[StreamerConfig]:
async with cls.connect() as collection:
cursor = await collection.find()
return [StreamerConfig(**doc) async for doc in cursor]

View File

@@ -0,0 +1,44 @@
from domain.users import CreateUser, User
from .base import BaseRepository
class UserRepository(BaseRepository):
COLLECTION_NAME = "users"
@classmethod
async def get(cls, user_id: str) -> User:
async with cls.connect() as collection:
user = await collection.find_one({"_id": user_id})
return User(
id=str(user["_id"]),
oauths=user["oauths"],
is_admin=user["is_admin"],
)
@classmethod
async def get_or_create_user(cls, new_user: CreateUser) -> User:
filter_data = {}
for provider, data in new_user.oauths.items():
filter_data[f"oauths.{provider}.id"] = data.id
async with cls.connect() as collection:
await collection.update_one(
filter_data,
{
"$setOnInsert": {
**new_user.model_dump(),
}
},
upsert=True,
)
user = await collection.find_one(filter_data)
return User(
id=str(user["_id"]),
oauths=user["oauths"],
is_admin=user["is_admin"],
)

View File

@@ -0,0 +1,7 @@
from .discord import start_discord_sevice
start = start_discord_sevice
__all__ = ["start"]

View File

@@ -0,0 +1,248 @@
import logging
import discord
from discord.abc import Messageable
from discord import Object
from discord import app_commands
from modules.games_list.games_list import GameList, GameItem
from core.config import config
from repositories.streamers import StreamerConfigRepository
logger = logging.getLogger(__name__)
async def get_game_list_channel_to_message_map() -> dict[int, int]:
result = {}
streamers = await StreamerConfigRepository.all()
for streamer in streamers:
if (integration := streamer.integrations.discord) is None:
continue
if (games_list := integration.games_list) is None:
continue
if games_list.channel_id is None or games_list.message_id is None:
continue
result[games_list.channel_id] = games_list.message_id
return result
class DiscordClient(discord.Client):
def __init__(self) -> None:
intents = discord.Intents.default()
intents.message_content = True
super().__init__(intents=intents)
self.tree = app_commands.CommandTree(self)
async def setup_hook(self):
streamers = await StreamerConfigRepository.all()
for streamer in streamers:
if (integration := streamer.integrations.discord) is None:
continue
if integration.games_list is None:
continue
self.tree.copy_global_to(guild=Object(id=integration.guild_id))
await self.tree.sync(guild=Object(id=integration.guild_id))
async def on_ready(self):
await self.change_presence(
activity=discord.Game(config.DISCORD_BOT_ACTIVITY),
status=discord.Status.online,
)
client = DiscordClient()
@client.tree.command(description="Добавление игры")
@app_commands.describe(
category="Раздел",
customer="Кто заказал",
game="Игра",
date="Дата заказа"
)
@app_commands.choices(
category=[
app_commands.Choice(name="Заказ за баллы", value="points"),
app_commands.Choice(name="Проплачены", value="paids"),
app_commands.Choice(name="Подарки", value="gifts"),
],
)
async def add(
interaction: discord.Interaction,
category: str,
customer: str,
game: str,
date: str | None = None
):
if not isinstance(interaction.channel, Messageable):
await interaction.response.send_message(
"Interation not allowed in this channel!", ephemeral=True
)
return
streamer = await StreamerConfigRepository.find_one(
integration_discord_guild_id=interaction.guild_id,
integration_discord_games_list_channel_id=interaction.channel_id
)
if streamer is None:
await interaction.response.send_message(
"Interation not allowed in this channel!", ephemeral=True
)
return
if streamer.integrations.discord is None or streamer.integrations.discord.games_list is None:
await interaction.response.send_message(
"Need setup!", ephemeral=True
)
return
game_list = await GameList.get(streamer.twitch.id)
if game_list is None:
await interaction.response.send_message(
"Game list not found!", ephemeral=True
)
return
game_list.add_game(category, GameItem(name=game, customer=customer, date=date))
game_list_message = await interaction.channel.fetch_message(
streamer.integrations.discord.games_list.message_id
)
await game_list_message.edit(content=str(game_list))
await game_list.save()
await interaction.response.send_message("Игра добавлена!", ephemeral=True)
async def game_list_autocomplete(
interaction: discord.Interaction,
current: str,
) -> list[app_commands.Choice[str]]:
streamer = await StreamerConfigRepository.find_one(
integration_discord_guild_id=interaction.guild_id,
integration_discord_games_list_channel_id=interaction.channel_id
)
if streamer is None:
return []
game_list = await GameList.get(streamer.twitch.id)
if game_list is None:
return []
return game_list.get_choices(current)
@client.tree.command(description="Удаление игры")
@app_commands.describe(game="Игра")
@app_commands.autocomplete(game=game_list_autocomplete)
async def delete(interaction: discord.Interaction, game: str):
if not isinstance(interaction.channel, Messageable):
await interaction.response.send_message(
"Interation not allowed in this channel!", ephemeral=True
)
return
streamer = await StreamerConfigRepository.find_one(
integration_discord_guild_id=interaction.guild_id,
integration_discord_games_list_channel_id=interaction.channel_id
)
if streamer is None:
await interaction.response.send_message(
"Interation not allowed in this channel!", ephemeral=True
)
return
if streamer.integrations.discord is None or streamer.integrations.discord.games_list is None:
await interaction.response.send_message(
"Need setup!", ephemeral=True
)
return
game_list = await GameList.get(streamer.twitch.id)
if game_list is None:
await interaction.response.send_message(
"Game list not found!", ephemeral=True
)
return
game_list.delete_game(game)
game_list_message = await interaction.channel.fetch_message(
streamer.integrations.discord.games_list.message_id
)
await game_list_message.edit(content=str(game_list))
await game_list.save()
await interaction.response.send_message("Игра удалена!", ephemeral=True)
@client.tree.command(description="Замена игры")
@app_commands.describe(
game="Старая игра",
new="Новая игра"
)
@app_commands.autocomplete(game=game_list_autocomplete)
async def replace(interaction: discord.Interaction, game: str, new: str):
if not isinstance(interaction.channel, Messageable):
await interaction.response.send_message(
"Interation not allowed in this channel!", ephemeral=True
)
return
streamer = await StreamerConfigRepository.find_one(
integration_discord_guild_id=interaction.guild_id,
integration_discord_games_list_channel_id=interaction.channel_id
)
if streamer is None:
await interaction.response.send_message(
"Interation not allowed in this channel!", ephemeral=True
)
return
if streamer.integrations.discord is None or streamer.integrations.discord.games_list is None:
await interaction.response.send_message(
"Need setup!", ephemeral=True
)
return
game_list = await GameList.get(streamer.twitch.id)
if game_list is None:
await interaction.response.send_message(
"Game list not found!", ephemeral=True
)
return
game_list.replace_game(game, new)
game_list_message = await interaction.channel.fetch_message(
streamer.integrations.discord.games_list.message_id
)
await game_list_message.edit(content=str(game_list))
await game_list.save()
await interaction.response.send_message("Игра заменена!", ephemeral=True)
async def start_discord_sevice():
logger.info("Starting Discord service...")
await client.start(config.DISCORD_BOT_TOKEN)

View File

@@ -0,0 +1,115 @@
from typing import Self
from datetime import datetime
from discord import app_commands
from pydantic import BaseModel
from core.mongo import mongo_manager
class GameItem(BaseModel):
name: str
customer: str
date: str | None
def __str__(self) -> str:
if self.date is not None:
return f"* {self.name} ({self.customer}) | {self.date}"
else:
return f"* {self.name} ({self.customer})"
class Category(BaseModel):
name: str
games: list[GameItem]
class GameList:
COLLECTION_NAME = "games_list_data"
CATEGORY_MAP = {
"points": "Заказанные игры (за 12к)",
"paids": "Проплачены 🤑 ",
"gifts": "Подарки",
}
def __init__(self, twitch_id: int, data: list[Category]):
self.twitch_id = twitch_id
self.data = data
@classmethod
async def get(cls, twitch_id: int) -> Self | None:
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[cls.COLLECTION_NAME]
doc = await collection.find_one({"twitch_id": twitch_id})
if doc is None:
return None
return cls(
twitch_id,
[
Category(**category)
for category in doc["data"]
]
)
async def save(self):
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[self.COLLECTION_NAME]
await collection.replace_one(
{"twitch_id": self.twitch_id},
{
"twitch_id": self.twitch_id,
"data": [category.model_dump() for category in self.data]
},
upsert=True
)
def add_game(self, category: str, game_item: GameItem):
_category = self.CATEGORY_MAP.get(category)
if game_item.date is None:
game_item.date = datetime.now().strftime("%d.%m.%Y")
for category_item in self.data:
if category_item.name == _category:
category_item.games.append(game_item)
def replace_game(self, game_name: str, new_game_name: str):
for category in self.data:
for game in category.games:
if game.name.startswith(game_name):
game.name = new_game_name
def delete_game(self, game_name: str):
for category in self.data:
for game in category.games:
if game.name.startswith(game_name):
category.games.remove(game)
def get_choices(self, query: str) -> list[app_commands.Choice[str]]:
choices = []
for category in self.data:
for game in category.games:
if query.lower() in game.name.lower():
choices.append(app_commands.Choice(name=game.name, value=game.name))
return choices[:25]
def __str__(self) -> str:
result = ""
for category in self.data:
result += f"{category.name}:\n"
for game in category.games:
result += f"{game}\n"
result += "\n\n"
return result

View File

@@ -0,0 +1,62 @@
from datetime import datetime
import logging
from .discord_events import DiscordEvent, CreateDiscordEvent, RecurrenceRule
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def is_repeated(start: datetime, target: datetime, rule: RecurrenceRule) -> bool:
start_utc = start.astimezone(datetime.now().astimezone().tzinfo)
target_utc = target.astimezone(datetime.now().astimezone().tzinfo)
return start_utc.time() == target_utc.time() and target.weekday() in rule.by_weekday
def compare(create_event: CreateDiscordEvent, event: DiscordEvent) -> bool:
if create_event.name != event.name:
logger.debug(f"Name is different: {create_event.name} != {event.name}")
return False
if create_event.description != event.description:
logger.debug(f"Description is different: {create_event.description} != {event.description}")
return False
if create_event.recurrence_rule is not None:
if event.recurrence_rule is None:
logger.debug(f"Recurrence rule is different: {create_event.recurrence_rule} != {event.recurrence_rule}")
return False
ce_rr = create_event.recurrence_rule
e_rr = event.recurrence_rule
if ce_rr.by_weekday != e_rr.by_weekday:
logger.debug(f"Recurrence rule is different: {ce_rr.by_weekday} != {e_rr.by_weekday}")
return False
if ce_rr.interval != e_rr.interval:
logger.debug(f"Recurrence rule is different: {ce_rr.interval} != {e_rr.interval}")
return False
if ce_rr.frequency != e_rr.frequency:
logger.debug(f"Recurrence rule is different: {ce_rr.frequency} != {e_rr.frequency}")
return False
if not is_repeated(ce_rr.start, e_rr.start, ce_rr):
logger.debug(f"Recurrence rule is different: {ce_rr.start} != {e_rr.start}")
return False
else:
if event.recurrence_rule is not None:
logger.debug(f"Recurrence rule is different: {create_event.recurrence_rule} != {event.recurrence_rule}")
return False
if create_event.scheduled_start_time != event.scheduled_start_time:
if create_event.recurrence_rule is None or not is_repeated(create_event.scheduled_start_time, event.scheduled_start_time, create_event.recurrence_rule):
logger.debug(f"Scheduled start time is different: {create_event.scheduled_start_time} != {event.scheduled_start_time}")
return False
if create_event.scheduled_end_time != event.scheduled_end_time:
if create_event.recurrence_rule is None or not is_repeated(create_event.scheduled_end_time, event.scheduled_end_time, create_event.recurrence_rule):
logger.debug(f"Scheduled end time is different: {create_event.scheduled_end_time} != {event.scheduled_end_time}")
return False
return True

View File

@@ -0,0 +1,173 @@
from typing import Self
from datetime import datetime, timedelta
import logging
from httpx import AsyncClient
from pydantic import BaseModel, field_serializer, SerializationInfo
from core.config import config
from .twitch_events import TwitchEvent
logger = logging.getLogger(__name__)
class RecurrenceRule(BaseModel):
start: datetime
by_weekday: list[int]
interval: int
frequency: int
@field_serializer("start", when_used="always")
def serialize_datetime(self, value: datetime, info: SerializationInfo) -> str:
return value.isoformat()
def next_date(self, start: datetime) -> datetime:
next_date = start
while True:
next_date += timedelta(days=1)
if next_date <= datetime.now(start.tzinfo):
continue
if next_date.weekday() in self.by_weekday:
return next_date
class DiscordEvent(BaseModel):
id: str
name: str
description: str
scheduled_start_time: datetime
scheduled_end_time: datetime
recurrence_rule: RecurrenceRule | None
creator_id: str
async def get_discord_events(guild_id: int) -> list[DiscordEvent]:
async with AsyncClient() as client:
response = await client.get(
f"https://discord.com/api/v10/guilds/{guild_id}/scheduled-events",
headers={"Authorization": f"Bot {config.DISCORD_BOT_TOKEN}"}
)
response.raise_for_status()
events = [DiscordEvent(**event) for event in response.json()]
return [event for event in events if event.creator_id == config.DISCORD_BOT_ID]
async def delete_discord_event(guild_id: int, event_id: str):
async with AsyncClient() as client:
response = await client.delete(
f"https://discord.com/api/v10/guilds/{guild_id}/scheduled-events/{event_id}",
headers={"Authorization": f"Bot {config.DISCORD_BOT_TOKEN}"}
)
response.raise_for_status()
return response.json()
class EntityMetadata(BaseModel):
location: str
class CreateDiscordEvent(BaseModel):
name: str
description: str
privacy_level: int
entity_type: int
entity_metadata: EntityMetadata
scheduled_start_time: datetime
scheduled_end_time: datetime
recurrence_rule: RecurrenceRule | None
@field_serializer("scheduled_start_time", "scheduled_end_time", when_used="always")
def serialize_datetime(self, value: datetime, info: SerializationInfo) -> str:
return value.isoformat()
@classmethod
def parse_from_twitch_event(cls, event: TwitchEvent, channel_name: str) -> Self:
if event.categories:
name = f"{event.name} | {event.categories}"
else:
name = event.name
if event.repeat_rule:
recurrence_rule = RecurrenceRule(
start=event.start_at,
by_weekday=[event.repeat_rule.weekday.get_number()],
interval=1,
frequency=2
)
else:
recurrence_rule = None
return cls(
name=name,
description=f"{event.description or ''}\n\n\n\n#{event.uid}",
privacy_level=2,
entity_type=3,
entity_metadata=EntityMetadata(location=f"https://twitch.tv/{channel_name}"),
scheduled_start_time=event.start_at,
scheduled_end_time=event.end_at,
recurrence_rule=recurrence_rule
)
async def create_discord_event(guild_id: int, event: CreateDiscordEvent):
async with AsyncClient() as client:
response = await client.post(
f"https://discord.com/api/v10/guilds/{guild_id}/scheduled-events",
json=event.model_dump(),
headers={
"Authorization": f"Bot {config.DISCORD_BOT_TOKEN}",
"Content-Type": "application/json"
}
)
if response.status_code == 400:
raise ValueError({
"status_code": response.status_code,
"response": response.json(),
"event": event.model_dump()
})
return response.json()
class UpdateDiscordEvent(BaseModel):
name: str
description: str
scheduled_start_time: datetime
scheduled_end_time: datetime
recurrence_rule: RecurrenceRule | None
@field_serializer("scheduled_start_time", "scheduled_end_time", when_used="always")
def serialize_datetime(self, value: datetime, info: SerializationInfo) -> str:
return value.isoformat()
async def edit_discord_event(guild_id: int, event_id: str, event: UpdateDiscordEvent):
async with AsyncClient() as client:
response = await client.patch(
f"https://discord.com/api/v10/guilds/{guild_id}/scheduled-events/{event_id}",
json=event.model_dump(),
headers={
"Authorization": f"Bot {config.DISCORD_BOT_TOKEN}",
"Content-Type": "application/json"
}
)
if response.status_code == 400:
raise ValueError({
"status_code": response.status_code,
"response": response.json(),
"event": event.model_dump()
})
return response.json()

View File

@@ -0,0 +1,107 @@
import logging
from datetime import datetime
from domain.streamers import TwitchConfig
from .twitch_events import get_twitch_events, TwitchEvent
from .discord_events import (
get_discord_events, DiscordEvent,
delete_discord_event,
create_discord_event, CreateDiscordEvent,
edit_discord_event, UpdateDiscordEvent
)
from .comparators import compare
logger = logging.getLogger(__name__)
async def add_events(
guild_id: int,
twitch_channel_name: str,
twitch_events: list[tuple[str, TwitchEvent]],
discord_events: list[tuple[str, DiscordEvent]]
):
discord_events_ids = [event[0] for event in discord_events]
for (uid, event) in twitch_events:
if uid in discord_events_ids:
continue
if event.start_at <= datetime.now(event.start_at.tzinfo) and event.repeat_rule is None:
continue
create_event = CreateDiscordEvent.parse_from_twitch_event(event, twitch_channel_name)
if create_event.recurrence_rule is not None:
duration = create_event.scheduled_end_time - create_event.scheduled_start_time
while create_event.scheduled_start_time <= datetime.now(create_event.scheduled_start_time.tzinfo):
create_event.scheduled_start_time = create_event.recurrence_rule.next_date(create_event.scheduled_start_time)
create_event.scheduled_end_time = create_event.scheduled_start_time + duration
await create_discord_event(guild_id, create_event)
async def remove_events(
guild_id: int,
twith_events: list[tuple[str, TwitchEvent]],
discord_events: list[tuple[str, DiscordEvent]]
):
twith_events_ids = [event[0] for event in twith_events]
for (uid, event) in discord_events:
if uid not in twith_events_ids:
await delete_discord_event(guild_id, uid)
async def edit_events(
guild_id: int,
twitch_channel_name: str,
twith_events: list[tuple[str, TwitchEvent]],
discord_events: list[tuple[str, DiscordEvent]]
):
for (uid, twitch_event) in twith_events:
for (discord_id, discord_event) in discord_events:
if uid != discord_id:
continue
create_event = CreateDiscordEvent.parse_from_twitch_event(twitch_event, twitch_channel_name)
if compare(create_event, discord_event):
continue
update_event = UpdateDiscordEvent(
name=create_event.name,
description=create_event.description,
scheduled_start_time=create_event.scheduled_start_time,
scheduled_end_time=create_event.scheduled_end_time,
recurrence_rule=create_event.recurrence_rule
)
if update_event.recurrence_rule is not None:
duration = update_event.scheduled_end_time - update_event.scheduled_start_time
update_event.scheduled_start_time = update_event.recurrence_rule.next_date(update_event.scheduled_start_time)
update_event.scheduled_end_time = update_event.scheduled_start_time + duration
update_event.recurrence_rule.start = update_event.scheduled_start_time
await edit_discord_event(guild_id, discord_event.id, update_event)
async def syncronize(twitch: TwitchConfig, discord_guild_id: int):
logger.info(f"Syncronizing events for {twitch.name}")
twitch_events = await get_twitch_events(str(twitch.id))
discord_events = await get_discord_events(discord_guild_id)
twitch_events_with_id = [(event.uid, event) for event in twitch_events]
discord_events_with_id = [
(event.description.rsplit("#")[1], event)
for event in discord_events
]
await add_events(discord_guild_id, twitch.name, twitch_events_with_id, discord_events_with_id)
await remove_events(discord_guild_id, twitch_events_with_id, discord_events_with_id)
await edit_events(discord_guild_id, twitch.name, twitch_events_with_id, discord_events_with_id)

View File

@@ -0,0 +1,24 @@
from core.broker import broker
from repositories.streamers import StreamerConfigRepository
from .synchronizer import syncronize
@broker.task("scheduler_sync.syncronize_task")
async def syncronize_task(twitch_id: int):
streamer = await StreamerConfigRepository.get_by_twitch_id(twitch_id)
if streamer.integrations.discord is None:
return
await syncronize(streamer.twitch, streamer.integrations.discord.guild_id)
@broker.task("scheduler_sync.syncronize_all_task", schedule=[{"cron": "*/5 * * * *"}])
async def syncronize_all_task():
streamers = await StreamerConfigRepository().all()
for streamer in streamers:
if streamer.integrations.discord is None:
continue
await syncronize_task.kiq(streamer.twitch.id)

View File

@@ -0,0 +1,81 @@
from typing import Optional
from datetime import datetime
from enum import StrEnum
import icalendar
from httpx import AsyncClient
from pydantic import BaseModel
class Weekday(StrEnum):
Mon = "MO"
Tue = "TU"
Wed = "WE"
Thu = "TH"
Fri = "FR"
Sat = "SA"
Sun = "SU"
def get_number(self) -> int:
return {
Weekday.Mon: 0,
Weekday.Tue: 1,
Weekday.Wed: 2,
Weekday.Thu: 3,
Weekday.Fri: 4,
Weekday.Sat: 5,
Weekday.Sun: 6
}[self]
class WeeklyRepeatRule(BaseModel):
weekday: Weekday
class TwitchEvent(BaseModel):
uid: str
start_at: datetime
end_at: datetime
name: str
description: Optional[str]
categories: Optional[str]
repeat_rule: Optional[WeeklyRepeatRule]
async def get_twitch_events(twitch_channel_id: str) -> list[TwitchEvent]:
async with AsyncClient() as client:
response = await client.get(
f"https://api.twitch.tv/helix/schedule/icalendar?broadcaster_id={twitch_channel_id}"
)
events: list[TwitchEvent] = []
calendar = icalendar.Calendar.from_ical(response.text)
for raw_event in calendar.walk("VEVENT"):
event = TwitchEvent(
uid=raw_event.get("UID"),
start_at=raw_event.get("DTSTART").dt,
end_at=raw_event.get("DTEND").dt,
name=raw_event.get("SUMMARY"),
description=raw_event.get("DESCRIPTION"),
categories=raw_event.get("CATEGORIES").cats[0],
repeat_rule=None
)
if raw_event.get("RRULE"):
if raw_event.get("RRULE")["FREQ"][0] == "WEEKLY":
value = raw_event.get("RRULE")["BYDAY"][0]
event.repeat_rule = WeeklyRepeatRule(weekday=Weekday(value))
else:
raise ValueError("Invalid repeat rule")
if (
event.start_at > datetime.now(event.start_at.tzinfo)
or event.end_at > datetime.now(event.end_at.tzinfo)
or event.repeat_rule is not None
):
events.append(event)
return events

View File

@@ -0,0 +1,7 @@
from .twitch.webhook import start_twitch_service
start = start_twitch_service
__all__ = ["start"]

View File

@@ -0,0 +1,320 @@
from enum import StrEnum
import logging
from pydantic import BaseModel
from twitchAPI.object.eventsub import ChannelChatMessageEvent
from httpx import AsyncClient
from core.config import config
from .twitch.authorize import authorize, Twitch
logger = logging.getLogger(__name__)
class ChatMessage(BaseModel):
text: str
class ChatMessageReplyMetadata(BaseModel):
parent_message_id: str
parent_message_body: str
parent_user_id: str
parent_user_name: str
parent_user_login: str
thread_message_id: str
thread_user_id: str
thread_user_name: str
thread_user_login: str
class MessageType(StrEnum):
TEXT = "text"
CHANNEL_POINTS_HIGHLIGHTED = "channel_points_highlighted"
CHANNEL_POINTS_SUB_ONLY = "channel_points_sub_only"
USER_INTRO = "user_intro"
class MessageEvent(BaseModel):
broadcaster_user_id: str
broadcaster_user_name: str
broadcaster_user_login: str
chatter_user_id: str
chatter_user_name: str
chatter_user_login: str
message_id: str
message: ChatMessage
message_type: MessageType
color: str
reply: ChatMessageReplyMetadata | None
channel_points_custom_reward_id: str | None
@classmethod
def from_twitch_event(cls, event: ChannelChatMessageEvent):
return cls(
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_name=event.event.broadcaster_user_name,
broadcaster_user_login=event.event.broadcaster_user_login,
chatter_user_id=event.event.chatter_user_id,
chatter_user_name=event.event.chatter_user_name,
chatter_user_login=event.event.chatter_user_login,
message_id=event.event.message_id,
message=ChatMessage(text=event.event.message.text),
message_type=MessageType(event.event.message_type),
color=event.event.color,
reply=ChatMessageReplyMetadata(
parent_message_id=event.event.reply.parent_message_id,
parent_message_body=event.event.reply.parent_message_body,
parent_user_id=event.event.reply.parent_user_id,
parent_user_name=event.event.reply.parent_user_name,
parent_user_login=event.event.reply.parent_user_login,
thread_message_id=event.event.reply.thread_message_id,
thread_user_id=event.event.reply.thread_user_id,
thread_user_name=event.event.reply.thread_user_name,
thread_user_login=event.event.reply.thread_user_login
) if event.event.reply else None,
channel_points_custom_reward_id=event.event.channel_points_custom_reward_id
)
async def get_completion(messages: list[dict]) -> str:
logger.info(f"Getting completion for message: {messages}")
data_messages = [
{
"role": "system",
"content": (
"Don't use markdown! "
"Don't use blocked words on Twitch! "
"Make answers short and clear! "
"You are not language model! "
"You are not a bot! "
"You are human! "
"Your name is kurbezz or Bulat! "
"Don't start answer with @kurbezz!"
)
},
*(
{
"role": "assistant" if message["user"] == "kurbezz" else "user",
"content": message["text"]
}
for message in messages
),
]
async with AsyncClient() as client:
response = await client.post(
"https://openrouter.ai/api/v1/chat/completions",
headers={
"Authorization": f"Bearer {config.OPENAI_API_KEY}",
"content-type": "application/json"
},
json={
"model": "deepseek/deepseek-chat-v3-0324:free",
"messages": data_messages
}
)
data = response.json()
logger.info(f"Got completion: {data}")
return data["choices"][0]["message"]["content"]
class MessagesProc:
FULL_IGNORED_USER_LOGINS = [
"jeetbot",
]
MESSAGE_LIMIT = 1000
MESSAGE_HISTORY = []
@classmethod
def update_message_history(cls, id: str, text: str, user: str, thread_id: str | None = None):
cls.MESSAGE_HISTORY.append({
"id": id,
"text": text,
"user": user,
"thread_id": thread_id
})
if len(cls.MESSAGE_HISTORY) > cls.MESSAGE_LIMIT:
cls.MESSAGE_HISTORY = cls.MESSAGE_HISTORY[-cls.MESSAGE_LIMIT:]
@classmethod
def get_message_history_with_thread(cls, message_id: str, thread_id: str | None = None) -> list[dict]:
logger.info(f"HISTORY: {cls.MESSAGE_HISTORY}")
if thread_id is not None:
return (
[m for m in cls.MESSAGE_HISTORY if m["id"] == thread_id]
+ [m for m in cls.MESSAGE_HISTORY if m["thread_id"] == thread_id]
)
return [m for m in cls.MESSAGE_HISTORY if m["id"] == message_id]
@classmethod
async def _update_history(cls, event: MessageEvent):
cls.update_message_history(
id=event.message_id,
text=event.message.text,
user=event.chatter_user_login,
thread_id=event.reply.thread_message_id if event.reply is not None else None
)
@classmethod
async def _goida(cls, twitch: Twitch, event: MessageEvent):
if "гойда" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"ГООООООООООООООООООООООООООООООООООООООООООООООЙДА!",
reply_parent_message_id=event.message_id
)
@classmethod
async def _lasqexx(cls, twitch: Twitch, event: MessageEvent):
if "lasqexx" not in event.chatter_user_login:
return
if "здароу" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"Здароу, давай иди уже",
reply_parent_message_id=event.message_id
)
return
if "сосал?" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"А ты? Иди уже",
reply_parent_message_id=event.message_id
)
return
if "лан я пошёл" in event.message.text.lower():
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"да да, иди уже",
reply_parent_message_id=event.message_id
)
return
@classmethod
async def _ask_ai(cls, twitch: Twitch, event: MessageEvent):
if not event.message.text.lower().startswith("!ai"):
return
try:
messages = cls.get_message_history_with_thread(
event.message_id,
thread_id=event.reply.thread_message_id if event.reply is not None else None
)
completion = await get_completion(messages)
max_length = 255
completion_parts = [completion[i:i + max_length] for i in range(0, len(completion), max_length)]
for part in completion_parts:
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
part,
reply_parent_message_id=event.message_id
)
cls.update_message_history(
id="ai",
text=part,
user="kurbezz",
thread_id=event.message_id
)
except Exception as e:
logger.error("Failed to get completion: {}", e, exc_info=True)
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"Ошибка!",
reply_parent_message_id=event.message_id
)
@classmethod
async def _kurbezz(cls, twitch: Twitch, event: MessageEvent):
if event.chatter_user_login.lower() in ["kurbezz", "hafmc"]:
return
if ("kurbezz" in event.message.text.lower() or \
"курбез" in event.message.text.lower() or \
"булат" in event.message.text.lower()):
try:
messages = cls.get_message_history_with_thread(
event.message_id,
thread_id=event.reply.thread_message_id if event.reply is not None else None
)
completion = await get_completion(messages)
max_length = 255
completion_parts = [completion[i:i + max_length] for i in range(0, len(completion), max_length)]
for part in completion_parts:
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
part,
reply_parent_message_id=event.message_id
)
cls.update_message_history(
id="ai",
text=part,
user="kurbezz",
thread_id=event.message_id
)
except Exception as e:
logger.error(f"Failed to get completion: {e}")
await twitch.send_chat_message(
event.broadcaster_user_id,
config.TWITCH_ADMIN_USER_ID,
"Пошел нахуй!",
reply_parent_message_id=event.message_id
)
@classmethod
async def on_message(cls, received_as: str, event: MessageEvent):
if event.chatter_user_name in cls.FULL_IGNORED_USER_LOGINS:
return
logging.info(f"Received message: {event}")
await cls._update_history(event)
twitch = await authorize(received_as)
await cls._goida(twitch, event)
await cls._lasqexx(twitch, event)
await cls._ask_ai(twitch, event)
await cls._kurbezz(twitch, event)

View File

@@ -0,0 +1,138 @@
import logging
from httpx import AsyncClient
from core.config import config
from domain.streamers import StreamerConfig
from .state import State
from .sent_notifications import SentNotification, SentNotificationType, SentResult
logger = logging.getLogger(__name__)
async def notify_telegram(msg: str, chat_id: str) -> SentResult:
async with AsyncClient() as client:
try:
result = await client.post(
f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/sendMessage",
json={
"chat_id": chat_id,
"text": msg,
}
)
result.raise_for_status()
except Exception as e:
logger.error("Failed to notify telegram", exc_info=e)
return SentResult(success=False, message_id=None)
if result.json()["ok"] is False:
return SentResult(success=False, message_id=None)
return SentResult(success=True, message_id=str(result.json()["result"]["message_id"]))
async def delete_telegram_message(chat_id: int, message_id: int):
async with AsyncClient() as client:
try:
result = await client.post(
f"https://api.telegram.org/bot{config.TELEGRAM_BOT_TOKEN}/deleteMessage",
json={
"chat_id": chat_id,
"message_id": message_id
}
)
result.raise_for_status()
except Exception as e:
logger.error("Failed to delete telegram message", exc_info=e)
return False
return True
async def notify_discord(msg: str, channel_id: str) -> SentResult:
async with AsyncClient() as client:
try:
result = await client.post(
f"https://discord.com/api/v10/channels/{channel_id}/messages",
headers={
"Authorization": f"Bot {config.DISCORD_BOT_TOKEN}"
},
json={
"content": msg,
}
)
result.raise_for_status()
except Exception as e:
logger.error("Failed to notify discord", exc_info=e)
return SentResult(success=False, message_id=None)
return SentResult(success=True, message_id=result.json()["id"])
def get_role_id(streamer_config: StreamerConfig, category: str) -> int | None:
discord_integration = streamer_config.integrations.discord
if discord_integration is None:
return None
roles= discord_integration.roles
if roles is None:
return None
return roles.get(category)
async def notify(notification_type: SentNotificationType, streamer_config: StreamerConfig, current_state: State) -> dict[str, SentResult]:
result: dict[str, SentResult] = {}
if notification_type == SentNotificationType.START_STREAM:
message_template = streamer_config.notifications.start_stream
else:
message_template = streamer_config.notifications.change_category
if message_template is None:
return result
integrations = streamer_config.integrations
if (telegram := integrations.telegram) is not None:
if telegram.notifications_channel_id is not None:
msg = message_template.format(
title=current_state.title,
category=current_state.category,
role=""
)
result["telegram"] = await notify_telegram(msg, str(telegram.notifications_channel_id))
if (discord := integrations.discord) is not None:
if discord.notifications_channel_id is not None:
# TODO: Get roles from discord api
role_id = get_role_id(streamer_config, current_state.category)
if role_id is not None:
role = f"<@&{role_id}>"
else:
role = ""
msg = message_template.format(
title=current_state.title,
category=current_state.category,
role=role
)
result["discord"] = await notify_discord(msg, str(discord.notifications_channel_id))
return result
async def delete_penultimate_notification(streamer_config: StreamerConfig, sent_notification: SentNotification):
telegram_config = streamer_config.integrations.telegram
telegram_data = sent_notification.sent_result.get("telegram")
if telegram_data and telegram_data.message_id and telegram_config:
await delete_telegram_message(telegram_config.notifications_channel_id, int(telegram_data.message_id))

View File

@@ -0,0 +1,52 @@
import logging
from pydantic import BaseModel
from twitchAPI.object.eventsub import ChannelPointsCustomRewardRedemptionAddEvent
from repositories.streamers import StreamerConfigRepository
from .twitch.authorize import authorize
logger = logging.getLogger(__name__)
class RewardRedemption(BaseModel):
broadcaster_user_id: str
broadcaster_user_login: str
user_name: str
reward_title: str
user_input: str
@classmethod
def from_twitch_event(cls, event: ChannelPointsCustomRewardRedemptionAddEvent):
return cls(
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_login=event.event.broadcaster_user_login,
user_name=event.event.user_name,
reward_title=event.event.reward.title,
user_input=event.event.user_input or "",
)
async def on_redemption_reward_add(reward: RewardRedemption):
logger.info(f"{reward.user_name} just redeemed {reward.reward_title}!")
twitch = await authorize(reward.broadcaster_user_login)
streamer = await StreamerConfigRepository.get_by_twitch_id(int(reward.broadcaster_user_id))
if streamer.notifications.redemption_reward is None:
return
message = streamer.notifications.redemption_reward.format(
user=reward.user_name,
reward_title=reward.reward_title,
reward_promt=f" ({reward.user_input})" if reward.user_input else ""
)
await twitch.send_chat_message(
reward.broadcaster_user_id,
reward.broadcaster_user_id,
message
)

View File

@@ -0,0 +1,87 @@
from enum import StrEnum
from datetime import datetime, timezone
from pydantic import BaseModel
from core.mongo import mongo_manager
from .state import State
class SentNotificationType(StrEnum):
START_STREAM = "start_stream"
CHANGE_CATEGORY = "change_category"
class SentResult(BaseModel):
success: bool
message_id: str | None
class SentNotification(BaseModel):
notification_type: SentNotificationType
twitch_id: int
state: State
sent_result: dict[str, SentResult]
sent_at: datetime
class SentNotificationRepository:
COLLECTION_NAME = "sent_notifications"
@classmethod
async def add(
cls,
twitch_id: int,
notification_type: SentNotificationType,
state: State,
sent_result: dict[str, SentResult],
):
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[cls.COLLECTION_NAME]
await collection.insert_one(
SentNotification(
notification_type=notification_type,
twitch_id=twitch_id,
state=state,
sent_at=datetime.now(timezone.utc),
sent_result=sent_result,
).model_dump()
)
@classmethod
async def get_penultimate_for_streamer(
cls, twitch_id: int
) -> SentNotification | None:
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[cls.COLLECTION_NAME]
doc = await collection.find_one(
{"twitch_id": twitch_id},
sort={"sent_at": -1},
skip=1,
)
if doc is None:
return None
return SentNotification(**doc)
@classmethod
async def get_last_for_streamer(
cls, twitch_id: int
) -> SentNotification | None:
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[cls.COLLECTION_NAME]
doc = await collection.find_one(
{"twitch_id": twitch_id},
sort={"sent_at": -1},
)
if doc is None:
return None
return SentNotification(**doc)

View File

@@ -0,0 +1,60 @@
from datetime import datetime
from enum import StrEnum
from pydantic import BaseModel
from core.mongo import mongo_manager
class State(BaseModel):
title: str
category: str
last_live_at: datetime
def __eq__(self, value: object) -> bool:
if not isinstance(value, State):
return False
return self.title == value.title and self.category == value.category
class UpdateEvent(BaseModel):
broadcaster_user_id: str
broadcaster_user_login: str
title: str
category_name: str
class EventType(StrEnum):
STREAM_ONLINE = "stream.online"
CHANNEL_UPDATE = "channel.update"
UNKNOWN = "unknown"
class StateManager:
COLLECTION_NAME = "stream_twitch_state"
@classmethod
async def get(cls, twitch_id: int) -> State | None:
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[cls.COLLECTION_NAME]
state = await collection.find_one({"twitch_id": twitch_id})
if state is None:
return None
return State(**state)
@classmethod
async def update(cls, twitch_id: int, state: State):
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[cls.COLLECTION_NAME]
await collection.update_one(
{"twitch_id": twitch_id},
{"$set": state.model_dump()},
upsert=True
)

View File

@@ -0,0 +1,96 @@
from datetime import datetime, timezone
from twitchAPI.helper import first
from core.broker import broker
from repositories.streamers import StreamerConfigRepository
from .state import State, UpdateEvent, EventType
from .watcher import StateWatcher
from .messages_proc import MessageEvent, MessagesProc
from .twitch.authorize import authorize
from .reward_redemption import RewardRedemption, on_redemption_reward_add
@broker.task(
"stream_notifications.twitch.on_stream_state_change_with_check",
retry_on_error=True
)
async def on_stream_state_change_with_check(
event: UpdateEvent,
event_type: EventType
):
twitch = await authorize(event.broadcaster_user_login)
stream = await first(twitch.get_streams(user_id=[event.broadcaster_user_id]))
if stream is None:
return
await on_stream_state_change.kiq(
int(event.broadcaster_user_id),
event_type,
State(
title=event.title,
category=event.category_name,
last_live_at=datetime.now(timezone.utc)
)
)
@broker.task(
"stream_notifications.twitch.on_stream_state_change",
retry_on_error=True
)
async def on_stream_state_change(
streamer_id: int,
event_type: EventType,
new_state: State | None = None
):
await StateWatcher.on_stream_state_change(
streamer_id,
event_type,
new_state,
)
@broker.task(
"stream_notifications.check_streams_states",
schedule=[{"cron": "*/2 * * * *"}]
)
async def check_streams_states():
streamers = await StreamerConfigRepository.all()
streamers_ids = [str(streamer.twitch.id) for streamer in streamers]
twitch = await authorize("kurbezz")
async for stream in twitch.get_streams(user_id=streamers_ids):
state = State(
title=stream.title,
category=stream.game_name,
last_live_at=datetime.now(timezone.utc)
)
await StateWatcher.on_stream_state_change(
int(stream.user_id),
EventType.UNKNOWN,
state
)
@broker.task(
"stream_notifications.on_message",
retry_on_error=True
)
async def on_message(
received_as: str,
event: MessageEvent
):
await MessagesProc.on_message(received_as, event)
@broker.task(
"stream_notifications.on_redemption_reward_add",
retry_on_error=True
)
async def on_redemption_reward_add_task(event: RewardRedemption):
await on_redemption_reward_add(event)

View File

@@ -0,0 +1,40 @@
from twitchAPI.twitch import Twitch
from twitchAPI.type import AuthScope
from core.config import config
from .token_storage import TokenStorage
SCOPES = [
AuthScope.CHAT_READ,
AuthScope.CHANNEL_BOT,
AuthScope.USER_BOT,
AuthScope.USER_READ_CHAT,
AuthScope.USER_WRITE_CHAT,
AuthScope.CHANNEL_READ_REDEMPTIONS,
]
async def authorize(user: str, auto_refresh_auth: bool = False) -> Twitch:
twitch = Twitch(
config.TWITCH_CLIENT_ID,
config.TWITCH_CLIENT_SECRET
)
twitch.user_auth_refresh_callback = lambda a, r: TokenStorage.save(user, a, r)
twitch.auto_refresh_auth = auto_refresh_auth
token, refresh_token = await TokenStorage.get(user)
await twitch.set_user_authentication(
token,
SCOPES,
refresh_token=refresh_token if auto_refresh_auth else None
)
await twitch.authenticate_app(SCOPES)
return twitch

View File

@@ -0,0 +1,37 @@
import logging
from core.mongo import mongo_manager
logger = logging.getLogger(__name__)
class TokenStorage:
COLLECTION_NAME = "secrets"
TYPE = "twitch_token"
@staticmethod
async def save(user: str, acceess_token: str, refresh_token: str):
data = {"access_token": acceess_token, "refresh_token": refresh_token}
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[TokenStorage.COLLECTION_NAME]
await collection.update_one(
{"type": TokenStorage.TYPE, "twitch_login": user},
{"$set": data},
upsert=True
)
@staticmethod
async def get(user: str) -> tuple[str, str]:
async with mongo_manager.connect() as client:
db = client.get_default_database()
collection = db[TokenStorage.COLLECTION_NAME]
data = await collection.find_one({"type": TokenStorage.TYPE, "twitch_login": user})
if data is None:
raise RuntimeError(f"Token for user {user} not found")
return data["access_token"], data["refresh_token"]

View File

@@ -0,0 +1,202 @@
from asyncio import sleep, gather, wait, FIRST_COMPLETED, create_task
import logging
from typing import NoReturn, Literal
from twitchAPI.eventsub.websocket import EventSubWebsocket
from twitchAPI.twitch import Twitch
from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent, ChannelChatMessageEvent, ChannelPointsCustomRewardRedemptionAddEvent
from twitchAPI.oauth import validate_token
from repositories.streamers import StreamerConfigRepository, StreamerConfig
from modules.stream_notifications.tasks import on_stream_state_change, on_stream_state_change_with_check, on_message, on_redemption_reward_add_task
from modules.stream_notifications.state import UpdateEvent, EventType
from modules.stream_notifications.messages_proc import MessageEvent
from modules.stream_notifications.reward_redemption import RewardRedemption
from .authorize import authorize
logger = logging.getLogger(__name__)
class TwitchService:
ONLINE_NOTIFICATION_DELAY = 15 * 60
def __init__(self, twitch: Twitch, streamer: StreamerConfig):
self.twitch = twitch
self.streamer = streamer
self.failed = False
async def on_channel_update(self, event: ChannelUpdateEvent):
await on_stream_state_change_with_check.kiq(
UpdateEvent(
broadcaster_user_id=event.event.broadcaster_user_id,
broadcaster_user_login=event.event.broadcaster_user_login,
title=event.event.title,
category_name=event.event.category_name
),
EventType.CHANNEL_UPDATE,
)
async def on_stream_online(self, event: StreamOnlineEvent):
await on_stream_state_change.kiq(
int(event.event.broadcaster_user_id),
EventType.STREAM_ONLINE,
)
async def on_channel_points_custom_reward_redemption_add(
self,
event: ChannelPointsCustomRewardRedemptionAddEvent
):
await on_redemption_reward_add_task(
RewardRedemption.from_twitch_event(event)
)
async def on_message(self, event: ChannelChatMessageEvent):
await on_message.kiq(
self.streamer.twitch.name,
MessageEvent.from_twitch_event(event)
)
async def _clean_subs(self, method: str, streamer: StreamerConfig):
match method:
case "listen_channel_update_v2":
sub_type = "channel.update"
case "listen_stream_online":
sub_type = "stream.online"
case "listen_channel_chat_message":
sub_type = "channel.chat.message"
case "listen_channel_points_custom_reward_redemption_add":
sub_type = "channel.channel_points_custom_reward_redemption.add"
case _:
raise ValueError("Unknown method")
subs = await self.twitch.get_eventsub_subscriptions(
user_id=str(streamer.twitch.id)
)
for sub in subs.data:
if sub.type == sub_type:
try:
await self.twitch.delete_eventsub_subscription(sub.id)
except Exception as e:
logger.error(f"Failed to delete subscription {sub.id}", exc_info=e)
async def subscribe_with_retry(
self,
method: Literal["listen_channel_update_v2"]
| Literal["listen_stream_online"]
| Literal["listen_channel_chat_message"]
| Literal["listen_channel_points_custom_reward_redemption_add"],
eventsub: EventSubWebsocket,
streamer: StreamerConfig,
retry: int = 10
):
await self._clean_subs(method, streamer)
try:
match method:
case "listen_channel_update_v2":
await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update)
case "listen_stream_online":
await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online)
case "listen_channel_points_custom_reward_redemption_add":
await eventsub.listen_channel_points_custom_reward_redemption_add(
str(streamer.twitch.id),
self.on_channel_points_custom_reward_redemption_add
)
case "listen_channel_chat_message":
chatbot_in_chats = streamer.chatbot_in_chats or []
for chat_id in chatbot_in_chats:
await eventsub.listen_channel_chat_message(
str(chat_id),
str(streamer.twitch.id),
self.on_message
)
case _:
raise ValueError("Unknown method")
return
except Exception as e:
if retry <= 0:
raise e
await sleep(1)
await self.subscribe_with_retry(method, eventsub, streamer, retry - 1)
async def subscribe_to_streamer(self, eventsub: EventSubWebsocket, streamer: StreamerConfig):
logger.info(f"Subscribe to events for {streamer.twitch.name}")
await gather(
self.subscribe_with_retry("listen_channel_update_v2", eventsub, streamer),
self.subscribe_with_retry("listen_stream_online", eventsub, streamer),
self.subscribe_with_retry("listen_channel_points_custom_reward_redemption_add", eventsub, streamer),
self.subscribe_with_retry("listen_channel_chat_message", eventsub, streamer),
)
logger.info(f"Subscribe to events for {streamer.twitch.name} done")
async def _check_token(self):
assert self.twitch._user_auth_token is not None
while True:
for _ in range(60):
if self.failed:
return
await sleep(1)
logger.info("Check token...")
val_result = await validate_token(
self.twitch._user_auth_token,
auth_base_url=self.twitch.auth_base_url
)
if val_result.get('status', 200) != 200:
await self.twitch.refresh_used_token()
logger.info("Token refreshed")
async def run(self) -> NoReturn:
eventsub = EventSubWebsocket(twitch=self.twitch)
try:
eventsub.start()
logger.info("Subscribe to events...")
await self.subscribe_to_streamer(eventsub, self.streamer)
logger.info("Twitch service started")
await self._check_token()
finally:
logger.info("Twitch service stopping...")
await eventsub.stop()
@classmethod
async def _start_for_streamer(cls, streamer: StreamerConfig):
try:
twith = await authorize(streamer.twitch.name, auto_refresh_auth=True)
await cls(twith, streamer).run()
except Exception as e:
logger.error("Twitch service failed", exc_info=e)
@classmethod
async def start(cls):
logger.info("Starting Twitch service...")
streamers = await StreamerConfigRepository.all()
await wait(
[
create_task(cls._start_for_streamer(streamer))
for streamer in streamers
],
return_when=FIRST_COMPLETED
)
await gather(
*[cls._start_for_streamer(streamer) for streamer in streamers]
)
logger.info("Twitch service stopped")
async def start_twitch_service() -> NoReturn:
await TwitchService.start()

View File

@@ -0,0 +1,126 @@
from datetime import datetime, timezone, timedelta
from twitchAPI.helper import first
from core.redis import redis_manager
from repositories.streamers import StreamerConfigRepository
from .state import State, StateManager, EventType
from .sent_notifications import SentNotificationRepository, SentNotificationType
from .notification import delete_penultimate_notification, notify
from .twitch.authorize import authorize
class StateWatcher:
START_STREAM_THRESHOLD = timedelta(minutes=15)
@classmethod
async def get_twitch_state(cls, streamer_id: int) -> State | None:
twitch = await authorize("kurbezz")
stream = await first(
twitch.get_streams(user_id=[str(streamer_id)])
)
if stream is None:
return None
return State(
title=stream.title,
category=stream.game_name,
last_live_at=datetime.now(timezone.utc)
)
@classmethod
async def notify_and_save(
cls,
streamer_id: int,
sent_notification_type: SentNotificationType,
state: State
):
streamer = await StreamerConfigRepository.get_by_twitch_id(streamer_id)
sent_result = await notify(sent_notification_type, streamer, state)
await SentNotificationRepository.add(
streamer.twitch.id,
sent_notification_type,
state,
sent_result=sent_result
)
@classmethod
async def remove_previous_notifications(cls, streamer_id: int):
streamer = await StreamerConfigRepository.get_by_twitch_id(streamer_id)
penultimate_notification = await SentNotificationRepository.get_penultimate_for_streamer(streamer_id)
if penultimate_notification is None:
return
await delete_penultimate_notification(streamer, penultimate_notification)
@classmethod
async def notify_start_stream(
cls,
streamer_id: int,
state: State
):
await cls.notify_and_save(streamer_id, SentNotificationType.START_STREAM, state)
await cls.remove_previous_notifications(streamer_id)
@classmethod
async def notify_change_category(
cls,
streamer_id: int,
state: State
):
await cls.notify_and_save(streamer_id, SentNotificationType.CHANGE_CATEGORY, state)
await cls.remove_previous_notifications(streamer_id)
@classmethod
async def _on_stream_state_change(
cls,
streamer_id: int,
event_type: EventType,
new_state: State | None = None
):
if new_state is not None:
current_state = new_state
else:
current_state = await cls.get_twitch_state(streamer_id)
if current_state is None:
return
last_state = await StateManager.get(streamer_id)
if last_state is None:
await cls.notify_start_stream(streamer_id, current_state)
await StateManager.update(streamer_id, current_state)
return
if (
event_type == EventType.STREAM_ONLINE and
datetime.now(timezone.utc) - last_state.last_live_at >= cls.START_STREAM_THRESHOLD
):
await cls.notify_start_stream(streamer_id, current_state)
await StateManager.update(streamer_id, current_state)
return
if last_state != current_state:
await cls.notify_change_category(streamer_id, current_state)
await StateManager.update(streamer_id, current_state)
return
await StateManager.update(streamer_id, current_state)
@classmethod
async def on_stream_state_change(
cls,
streamer_id: int,
event_type: EventType,
new_state: State | None = None
):
async with redis_manager.connect() as redis:
async with redis.lock(f"on_stream_state_change:{streamer_id}"):
await cls._on_stream_state_change(streamer_id, event_type, new_state)

View File

View File

@@ -0,0 +1,40 @@
from fastapi import FastAPI
from core.mongo import mongo_manager
from core.redis import redis_manager
from core.broker import broker
from .auth.authx import auth
from .views import routes
from .utils.static import SPAStaticFiles
def get_app() -> FastAPI:
app = FastAPI()
auth.handle_errors(app)
for route in routes:
app.include_router(route)
app.mount(
"/",
SPAStaticFiles(
directory="modules/web_app/frontend",
html=True
),
name="frontend"
)
@app.on_event("startup")
async def startup_event():
await mongo_manager.init()
await redis_manager.init()
if not broker.is_worker_process:
await broker.startup()
return app
app = get_app()

View File

@@ -0,0 +1,12 @@
from authx import AuthX, AuthXConfig
from core.config import config
config = AuthXConfig(
JWT_ALGORITHM = "HS256",
JWT_SECRET_KEY = config.SECRET_KEY,
JWT_TOKEN_LOCATION = ["headers"],
)
auth = AuthX(config=config)

View File

@@ -0,0 +1,29 @@
.flex__container__center {
display: flex;
justify-content: center;
align-items: center;
height: 100%;
}
a.authorize__twitch_btn {
display: inline-block;
background-color: #6441a5;
color: #fff;
padding: 10px 20px;
border-radius: 5px;
text-decoration: none;
font-size: 1.5rem;
}
.settings__container {
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
height: 100%;
}
.settings__header {
display: flex;
justify-content: end;
}

View File

@@ -0,0 +1,21 @@
<html>
<head>
<title>Web App</title>
<link rel="stylesheet" href="./index.css">
</head>
<body>
<script type="importmap">
{
"imports": {
"vue": "https://unpkg.com/vue@3/dist/vue.esm-browser.prod.js",
"vue-router": "https://unpkg.com/vue-router@4.5.0/dist/vue-router.esm-browser.prod.js",
"jwt-decode": "https://www.unpkg.com/jwt-decode@4.0.0/build/esm/index.js"
}
}
</script>
<div id="app"></div>
<script type="module" src="/index.js"></script>
</body>
</html>

View File

@@ -0,0 +1,159 @@
import { createApp, ref, onMounted } from 'vue';
import { createRouter, createWebHistory, RouterView, useRouter } from 'vue-router';
import { jwtDecode } from "jwt-decode";
class TokenManager {
static TOKEN_KEY = "token";
static getToken() {
return localStorage.getItem(this.TOKEN_KEY);
}
static getAndValidate() {
const token = this.getToken();
if (token === null) {
return null;
}
let decoded;
try {
decoded = jwtDecode(token);
} catch (e) {
return null;
}
if (decoded.exp < Date.now() / 1000) {
this.removeToken();
return null;
}
return token;
}
static setToken(token) {
localStorage.setItem(this.TOKEN_KEY, token);
}
static removeToken() {
localStorage.removeItem(this.TOKEN_KEY);
}
}
const Authorize = {
setup() {
const loginLink = ref(null);
onMounted(() => {
fetch('/api/auth/get_authorization_url/twitch/')
.then(response => response.json())
.then(data => {
loginLink.value = data.authorization_url;
})
});
return {
loginLink
}
},
template: `
<div class="flex__container__center">
<a v-if="loginLink" :href="loginLink" class="authorize__twitch_btn">Login with Twitch</a>
<div v-else>Loading...</div>
</div>
`
}
const Settings = {
setup() {
const router = useRouter();
const logout = () => {
TokenManager.removeToken();
router.push('/');
}
return {
logout,
};
},
template: `
<div class="settings__container">
<div class="settings__header">
<button @click="logout">Logout</button>
</div>
</div>
`
}
const Main = {
components: {
Authorize,
Settings
},
setup() {
const authorized = TokenManager.getAndValidate() !== null;
return {
authorized
};
},
template: `
<div>
<Settings v-if="authorized" />
<Authorize v-else />
</div>
`
};
const AuthCallbackTwitch = {
setup() {
const router = useRouter();
onMounted(() => {
fetch('/api/auth/callback/twitch/' + window.location.search)
.then(response => response.json())
.then(data => {
localStorage.setItem(TOKEN_KEY, data.token);
router.push('/');
});
});
},
template: `
<div class="flex__container__center">
<div>Loading...</div>
</div>
`
};
const router = createRouter({
history: createWebHistory(),
routes: [
{ path: '', component: Main },
{ path: '/auth/callback/twitch/', component: AuthCallbackTwitch },
]
});
const App = {
components: {
RouterView,
},
template: `
<RouterView />
`,
};
createApp(App)
.use(router)
.mount('#app');

View File

@@ -0,0 +1,9 @@
from pydantic import BaseModel
class GetAuthorizationUrlResponse(BaseModel):
authorization_url: str
class CallbackResponse(BaseModel):
token: str

View File

@@ -0,0 +1,10 @@
from pydantic import BaseModel
class TwitchSerializer(BaseModel):
id: int
name: str
class StreamerSerializer(BaseModel):
twitch: TwitchSerializer

View File

@@ -0,0 +1,18 @@
from core.config import config
from domain.auth import OAuthProvider
from .providers import get_client
REDIRECT_URI_TEMPLATE = f"https://{config.WEB_APP_HOST}/" + "auth/callback/{service}/"
async def get_authorization_url(provider: OAuthProvider) -> str:
client = get_client(provider)
return await client.get_authorization_url(
redirect_uri=REDIRECT_URI_TEMPLATE.format(
service=provider.value
),
)

View File

@@ -0,0 +1,16 @@
from domain.auth import OAuthProvider
from .providers import get_client
from .authorization_url_getter import REDIRECT_URI_TEMPLATE
async def process_callback(provider: OAuthProvider, code: str) -> tuple[str, str | None]:
client = get_client(provider)
token = await client.get_access_token(
code,
redirect_uri=REDIRECT_URI_TEMPLATE.format(service=provider.value),
)
user_data = await client.get_id_email(token["access_token"])
return user_data

View File

@@ -0,0 +1,8 @@
from .twitch import twitch_oauth_client
from .getter import get_client
__all__ = [
"twitch_oauth_client",
"get_client"
]

View File

@@ -0,0 +1,11 @@
from httpx_oauth.oauth2 import OAuth2
from domain.auth import OAuthProvider
from .twitch import twitch_oauth_client
def get_client(provider: OAuthProvider) -> OAuth2:
if provider == OAuthProvider.TWITCH:
return twitch_oauth_client
else:
raise NotImplementedError("Provider is not implemented")

View File

@@ -0,0 +1,34 @@
from twitchAPI.twitch import Twitch, AuthScope
from twitchAPI.helper import first
from httpx_oauth.oauth2 import OAuth2
from core.config import config
class TwithOAuth2(OAuth2):
async def get_id_email(self, token: str):
twitch_client = Twitch(config.TWITCH_CLIENT_ID, config.TWITCH_CLIENT_SECRET)
twitch_client.auto_refresh_auth = False
await twitch_client.set_user_authentication(
token,
[AuthScope.USER_READ_EMAIL],
validate=True
)
me = await first(twitch_client.get_users())
if me is None:
raise Exception("Failed to get user data")
return me.id, me.email
twitch_oauth_client = TwithOAuth2(
config.TWITCH_CLIENT_ID,
config.TWITCH_CLIENT_SECRET,
"https://id.twitch.tv/oauth2/authorize",
"https://id.twitch.tv/oauth2/token",
base_scopes=[AuthScope.USER_READ_EMAIL.value],
)

View File

@@ -0,0 +1,15 @@
from fastapi.staticfiles import StaticFiles
from starlette.responses import Response
from starlette.exceptions import HTTPException
class SPAStaticFiles(StaticFiles):
async def get_response(self, path: str, scope) -> Response:
try:
return await super().get_response(path, scope)
except HTTPException:
if path.startswith("/api"):
raise
return await super().get_response("index.html", scope)

View File

@@ -0,0 +1,13 @@
from fastapi import APIRouter
from .auth import auth_router
from .streamer import streamer_router
routes: list[APIRouter] = [
auth_router,
streamer_router,
]
__all__ = ["routes"]

View File

@@ -0,0 +1,38 @@
from fastapi import APIRouter
from domain.auth import OAuthProvider, OAuthData
from domain.users import CreateUser
from modules.web_app.services.oauth.process_callback import process_callback
from modules.web_app.services.oauth.authorization_url_getter import get_authorization_url as gen_auth_link
from modules.web_app.serializers.auth import GetAuthorizationUrlResponse, CallbackResponse
from modules.web_app.auth.authx import auth
from repositories.users import UserRepository
auth_router = APIRouter(prefix="/api/auth", tags=["auth"])
@auth_router.get("/get_authorization_url/{provider}/")
async def get_authorization_url(provider: OAuthProvider) -> GetAuthorizationUrlResponse:
link = await gen_auth_link(provider)
return GetAuthorizationUrlResponse(authorization_url=link)
@auth_router.get("/callback/{provider}/")
async def callback(provider: OAuthProvider, code: str) -> CallbackResponse:
user_data = await process_callback(provider, code)
user = await UserRepository.get_or_create_user(
CreateUser(
oauths={provider: OAuthData(id=user_data[0], email=user_data[1])},
is_admin=False,
)
)
token = auth.create_access_token(
uid=user.id,
is_admin=user.is_admin
)
return CallbackResponse(token=token)

View File

@@ -0,0 +1,62 @@
from fastapi import APIRouter, Depends
from authx import RequestToken
from modules.web_app.auth.authx import auth
from modules.web_app.serializers.streamer import StreamerSerializer, TwitchSerializer
from repositories.streamers import StreamerConfigRepository
from repositories.users import UserRepository
from domain.auth import OAuthProvider
streamer_router = APIRouter(prefix="/api/streamers")
@streamer_router.get("/")
async def get_streamers(
token: RequestToken = Depends(RequestToken)
) -> list[StreamerSerializer]:
payload = auth.verify_token(token)
u_id = payload.sub
is_admin: bool = getattr(payload, "is_admin", False)
if is_admin:
streamers = await StreamerConfigRepository.all()
else:
user = await UserRepository.get(u_id)
twith_oauth = user.oauths.get(OAuthProvider.TWITCH)
if not twith_oauth:
return []
streamers = [await StreamerConfigRepository.get_by_twitch_id(
int(twith_oauth.id)
)]
return [StreamerSerializer(**streamer.model_dump()) for streamer in streamers]
@streamer_router.get("/me/")
async def get_me(
token: RequestToken = Depends(RequestToken)
) -> StreamerSerializer:
payload = auth.verify_token(token)
u_id = payload.sub
user = await UserRepository.get(u_id)
twith_oauth = user.oauths.get(OAuthProvider.TWITCH)
if not twith_oauth:
raise Exception("Twitch account not linked")
streamer = await StreamerConfigRepository.get_by_twitch_id(
int(twith_oauth.id)
)
return StreamerSerializer(
twitch=TwitchSerializer(
id=streamer.twitch.id,
name=streamer.twitch.name
)
)