import logging
from collections import defaultdict
from typing import Callable

import socketio

from ._api import BotAPI
from .exceptions import BotAPIError  # noqa: F401  (not used in this module; import retained)

logger = logging.getLogger(__name__)

# Events that the server broadcasts and that user code can subscribe to.
# They carry no state this class mirrors, so they are forwarded untouched.
_PASSTHROUGH_EVENTS = (
    "chatMsg",
    "pm",
    "errorMsg",
    "kick",
    "announcement",
    "clearchat",
    "updateEmote",
    "removeEmote",
)


class Bot:
    """
    Base class for a veretube sync bot.

    Connects via socket.io (token auth) and exposes the full REST API as
    convenience methods. Internal state (users, playlist, now_playing,
    channel_opts) is kept up to date automatically from socket events.

    Usage::

        bot = Bot(
            token="cbt_...",
            channel="mychannel",
            socket_url="http://localhost:1337",
            api_url="http://localhost:8080/api/v1",
        )

        @bot.on("chatMsg")
        def on_chat(data):
            if data["msg"] == "!hello":
                bot.send_message("Hello!")

        bot.run()  # blocks until disconnected
    """

    def __init__(
        self,
        token: str,
        channel: str,
        socket_url: str,
        api_url: str,
        reconnection: bool = False,
        reconnection_delay: int = 3,
    ):
        """
        Build the REST client and the socket.io client; does not connect.

        token is the bot token and must start with ``cbt_``. channel is the
        channel name joined after a successful login. socket_url is the
        socket.io endpoint and api_url the REST base URL. reconnection and
        reconnection_delay are passed straight to ``socketio.Client``.

        Raises ValueError if the token does not start with ``cbt_``.
        """
        if not token.startswith("cbt_"):
            raise ValueError("token must start with 'cbt_'")

        self.token = token
        self.channel = channel
        self.socket_url = socket_url
        self.api = BotAPI(api_url, channel, token)

        # In-memory channel state, kept up to date by socket events
        self.users: list[dict] = []
        self.now_playing: dict | None = None
        self.playlist: list[dict] = []
        self.channel_opts: dict = {}

        # event name -> user handlers, in registration order
        self._handlers: dict[str, list[Callable]] = defaultdict(list)

        self._sio = socketio.Client(
            reconnection=reconnection,
            reconnection_delay=reconnection_delay,
            logger=False,
            engineio_logger=False,
        )
        self._wire_sio()

    # ── Internal event wiring ─────────────────────────────────────────────────

    def _wire_sio(self):
        """
        Register the internal socket.io handlers.

        Each handler first updates the mirrored state (where the event
        carries any) and then forwards the payload to user handlers through
        ``_fire``, so user code always observes the already-updated state.
        """
        sio = self._sio

        @sio.on("connect")
        def _connect():
            logger.info("connected to %s", self.socket_url)
            self._fire("connect", None)

        @sio.on("disconnect")
        def _disconnect(*args):
            # The handler accepts zero or one positional argument; the first
            # one, when present, is treated as the disconnect reason.
            reason = args[0] if args else None
            # Drop live channel state so nothing stale is served while
            # offline. channel_opts keeps its last received value until the
            # next "channelOpts" event overwrites it.
            self.users = []
            self.playlist = []
            self.now_playing = None
            logger.info("disconnected: %s", reason)
            self._fire("disconnect", reason)

        @sio.on("connect_error")
        def _connect_error(err):
            logger.error("connection error: %s", err)
            self._fire("connect_error", err)

        @sio.on("login")
        def _login(data):
            if data.get("success"):
                logger.info("authenticated as %s", data.get("name"))
                # Joining is only attempted once authentication succeeded.
                sio.emit("joinChannel", {"name": self.channel})
            else:
                logger.error("authentication failed: %s", data.get("error", "unknown"))
            self._fire("login", data)

        @sio.on("userlist")
        def _userlist(users):
            # Copy so later in-place edits never touch the event payload.
            self.users = list(users)
            logger.debug("userlist: %d users", len(users))
            self._fire("userlist", users)

        @sio.on("addUser")
        def _add_user(user):
            # Replace any existing entry with the same name, then append.
            self.users = [u for u in self.users if u["name"] != user["name"]]
            self.users.append(user)
            self._fire("addUser", user)

        @sio.on("userLeave")
        def _user_leave(data):
            self.users = [u for u in self.users if u["name"] != data["name"]]
            self._fire("userLeave", data)

        @sio.on("setUserMeta")
        def _user_meta(data):
            for user in self.users:
                if user["name"] == data["name"]:
                    # Merge into the existing meta rather than replacing it.
                    user.setdefault("meta", {}).update(data.get("meta", {}))
                    break
            self._fire("setUserMeta", data)

        @sio.on("setUserRank")
        def _user_rank(data):
            for user in self.users:
                if user["name"] == data["name"]:
                    user["rank"] = data["rank"]
                    break
            self._fire("setUserRank", data)

        @sio.on("changeMedia")
        def _change_media(data):
            self.now_playing = data
            logger.debug("now playing: %s", data.get("title"))
            self._fire("changeMedia", data)

        @sio.on("playlist")
        def _playlist(items):
            self.playlist = list(items)
            self._fire("playlist", items)

        @sio.on("queue")
        def _queue(data):
            item = data.get("item")
            if item:
                # Honour the requested position instead of always appending,
                # so the mirror matches the server order for "next" queues.
                self._insert_playlist_item(item, data.get("after"))
            self._fire("queue", data)

        @sio.on("delete")
        def _delete(data):
            uid = data.get("uid")
            self.playlist = [i for i in self.playlist if i.get("uid") != uid]
            self._fire("delete", data)

        @sio.on("channelOpts")
        def _channel_opts(opts):
            self.channel_opts = opts
            self._fire("channelOpts", opts)

        # Events with no state to update — just fan out to user handlers.
        for event in _PASSTHROUGH_EVENTS:
            sio.on(event, self._make_passthrough(event))

    def _make_passthrough(self, event: str) -> Callable:
        """
        Build a socket handler that forwards ``event`` to user handlers.

        A factory is used so each handler closes over its own event name
        rather than the loop variable of the caller.
        """

        def _handler(data=None):
            # data defaults to None so events emitted without a payload work.
            self._fire(event, data)

        return _handler

    def _insert_playlist_item(self, item: dict, after=None):
        """
        Insert a queued item into the in-memory playlist.

        ``after == "prepend"`` puts the item first; when ``after`` equals the
        uid of an existing item, the new item goes right behind it. A missing
        ``after``, ``"append"``, or an unknown uid appends to the end, which
        is what the previous implementation always did.

        NOTE(review): the ``after`` values ("prepend" / uid of the preceding
        item) follow the CyTube ``queue`` event format this protocol appears
        to be derived from — confirm against the veretube server.
        """
        if after == "prepend":
            self.playlist.insert(0, item)
            return
        if after is not None and after != "append":
            for pos, existing in enumerate(self.playlist):
                if existing.get("uid") == after:
                    self.playlist.insert(pos + 1, item)
                    return
        self.playlist.append(item)

    def _fire(self, event: str, data):
        """
        Call every user handler registered for ``event`` with ``data``.

        Handlers run in registration order. An exception raised by one
        handler is logged and does not prevent the remaining ones from
        running.
        """
        # .get() avoids creating an empty defaultdict entry for every event
        # name that fires without subscribers; tuple() snapshots the list so
        # a handler that registers another handler does not alter this pass.
        for handler in tuple(self._handlers.get(event, ())):
            try:
                handler(data)
            except Exception:
                logger.exception("unhandled exception in %s handler", event)

    # ── Event registration ────────────────────────────────────────────────────

    def on(self, event: str) -> Callable:
        """
        Register a handler for a socket event. Can be used as a decorator::

            @bot.on("chatMsg")
            def handle_chat(data):
                print(data["msg"])

        Or called directly::

            bot.on("chatMsg")(my_handler)

        Multiple handlers for the same event are all called in registration
        order. Exceptions in handlers are caught and logged so one bad
        handler doesn't kill the others.
        """

        def decorator(fn: Callable) -> Callable:
            self._handlers[event].append(fn)
            # Return fn unchanged so the decorated name stays callable.
            return fn

        return decorator

    # ── Connection ────────────────────────────────────────────────────────────

    def connect(self):
        """Open the socket.io connection. Returns immediately after connecting."""
        self._sio.connect(self.socket_url, auth={"token": self.token})

    def wait(self):
        """Block until the connection closes. Call after connect()."""
        self._sio.wait()

    def run(self):
        """Connect and block until disconnected. Equivalent to connect() + wait()."""
        self.connect()
        self.wait()

    def disconnect(self):
        """Close the connection."""
        self._sio.disconnect()

    # ── Chat ──────────────────────────────────────────────────────────────────

    def send_message(self, msg: str, to: str | None = None):
        """Send a chat message, optionally prefixed with 'to: '."""
        if to:
            msg = f"{to}: {msg}"
        self._sio.emit("chatMsg", {"msg": msg, "meta": {}})

    def send_action(self, text: str):
        """Send a /me action message."""
        self._sio.emit("chatMsg", {"msg": f"/me {text}", "meta": {}})

    def send_pm(self, to: str, msg: str):
        """Send a private message."""
        self._sio.emit("pm", {"to": to, "msg": msg, "meta": {}})

    # ── Playlist ──────────────────────────────────────────────────────────────

    def queue(self, id: str, type: str, pos: str = "end"):
        """
        Queue media. type is the source: 'yt', 'sc', 'tw', 'fi', etc.
        pos is 'end' or 'next'. Requires rank >= MOD.
        """
        self.api.add_to_playlist(id, type, pos)

    def delete_item(self, uid: int):
        """Remove a playlist item by uid. Requires rank >= ADMIN."""
        self.api.delete_playlist_item(uid)

    def skip_to(self, uid: int):
        """Jump to a specific playlist item by uid. Requires rank >= MOD."""
        self.api.skip_to(uid)

    def skip(self):
        """
        Skip to the next playlist item. Requires rank >= MOD.
        Fetches the current playlist state via REST to find the next uid.

        Does nothing when there is no current item or the current item is
        the last one in the playlist.
        """
        data = self.api.get_playlist()
        items = data.get("items", [])
        idx = data.get("currentIndex", -1)
        # Only skip when a following item actually exists.
        if 0 <= idx < len(items) - 1:
            self.api.skip_to(items[idx + 1]["uid"])

    def shuffle_playlist(self):
        """Shuffle the playlist. Requires rank >= ADMIN."""
        self.api.shuffle_playlist()

    def clear_playlist(self):
        """Clear the entire playlist. Requires rank >= ADMIN."""
        self.api.clear_playlist()

    # ── Emotes ────────────────────────────────────────────────────────────────

    def get_emotes(self) -> list:
        """Return the full emote list. Works even when channel is offline."""
        return self.api.get_emotes()

    def add_emote(self, name: str, image: str):
        """Add an emote. Requires rank >= OWNER."""
        self.api.add_emote(name, image)

    def update_emote(self, name: str, image: str | None = None, new_name: str | None = None):
        """Update an emote's image or rename it. Requires rank >= OWNER."""
        self.api.update_emote(name, image=image, new_name=new_name)

    def delete_emote(self, name: str):
        """Delete an emote. Requires rank >= OWNER."""
        self.api.delete_emote(name)

    # ── Moderation ────────────────────────────────────────────────────────────

    def kick(self, name: str, reason: str = "Kicked by bot"):
        """Kick a user. Requires rank >= MOD."""
        self.api.kick_user(name, reason)

    def ban(self, name: str, reason: str = "Banned by bot"):
        """Ban a user. Requires rank >= ADMIN."""
        self.api.ban_user(name, reason)

    def unban(self, name: str):
        """Remove a ban. Requires rank >= ADMIN."""
        self.api.unban_user(name)

    def set_rank(self, name: str, rank: int):
        """Change a user's channel rank. Requires rank >= OWNER."""
        self.api.set_user_rank(name, rank)

    # ── Settings ──────────────────────────────────────────────────────────────

    def get_settings(self) -> dict:
        """Get channel settings. Requires rank >= OWNER and channel active."""
        return self.api.get_settings()

    def update_settings(self, **kwargs):
        """
        Update channel settings. Pass any setting key as a keyword argument.
        Requires rank >= OWNER and channel active.

        Example::

            bot.update_settings(pagetitle="Now Playing: Chill", allow_voteskip=False)
        """
        self.api.update_settings(**kwargs)

    # ── Helpers ───────────────────────────────────────────────────────────────

    def get_user(self, name: str) -> dict | None:
        """Look up a user in the current in-memory userlist by name."""
        return next((u for u in self.users if u["name"] == name), None)