From 12d01f9d6d9c016d0173b4261ffbfa1e91763046 Mon Sep 17 00:00:00 2001 From: Speng Reb Date: Wed, 20 May 2026 20:09:32 +0200 Subject: [PATCH] Fix socket namespace readiness and add AsyncBot API - force explicit '/' namespace connect with readiness checks - add guarded emits with disconnect reason diagnostics - add configurable socket transports (e.g. websocket-only) - introduce AsyncBot based on socketio.AsyncClient - export AsyncBot and document async quick-start/troubleshooting - bump package version for release --- README.md | 30 ++++- pyproject.toml | 2 +- veretube_bot/__init__.py | 3 +- veretube_bot/async_bot.py | 248 ++++++++++++++++++++++++++++++++++++++ veretube_bot/bot.py | 52 ++++++-- 5 files changed, 325 insertions(+), 10 deletions(-) create mode 100644 veretube_bot/async_bot.py diff --git a/README.md b/README.md index defd2c3..eecf325 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,33 @@ def on_media(data): bot.run() # connect and block until disconnected ``` +## Async quick start + +```python +import asyncio +from veretube_bot import AsyncBot + +async def main(): + bot = AsyncBot( + token="cbt_...", + channel="mychannel", + socket_url="http://your-server:1337", + api_url="http://your-server:8080/api/v1", + transports=["websocket"], # optional + ) + + @bot.on("login") + async def on_login(data): + if data.get("success"): + await bot.send_message("hello from async bot") + + await bot.connect(timeout=10) + await asyncio.sleep(1) + await bot.disconnect() + +asyncio.run(main()) +``` + ## Connection ```python @@ -50,6 +77,7 @@ Bot( channel, # str — channel name (lowercase, no '#') socket_url, # str — socket.io server URL (uses io.port, not the HTTP port) api_url, # str — REST API base URL, e.g. http://host:8080/api/v1 + transports=None, # e.g. ["websocket"] to bypass polling/upgrade issues reconnection=False, # set True to reconnect automatically on drop reconnection_delay=3, # seconds between reconnect attempts ) @@ -58,7 +86,7 @@ Bot( | Method | Description | |--------|-------------| | `bot.run()` | Connect and block until disconnected | -| `bot.connect()` | Open the connection (returns immediately) | +| `bot.connect()` | Open the connection and wait until `/` namespace is ready | | `bot.wait()` | Block until disconnected (call after `connect()`) | | `bot.disconnect()` | Close the connection | diff --git a/pyproject.toml b/pyproject.toml index 9517da3..648feed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "veretube-bot" -version = "0.1.0" +version = "0.1.3" description = "Python bot library for veretube sync channels" readme = "README.md" license = "MIT" diff --git a/veretube_bot/__init__.py b/veretube_bot/__init__.py index 4336d86..64c7f67 100644 --- a/veretube_bot/__init__.py +++ b/veretube_bot/__init__.py @@ -1,5 +1,6 @@ from .bot import Bot +from .async_bot import AsyncBot from .exceptions import BotAPIError from .rank import Rank -__all__ = ["Bot", "BotAPIError", "Rank"] +__all__ = ["Bot", "AsyncBot", "BotAPIError", "Rank"] diff --git a/veretube_bot/async_bot.py b/veretube_bot/async_bot.py new file mode 100644 index 0000000..fc26ba4 --- /dev/null +++ b/veretube_bot/async_bot.py @@ -0,0 +1,248 @@ +import asyncio +import inspect +import logging +from collections import defaultdict +from typing import Callable + +import socketio + +from ._api import BotAPI + +logger = logging.getLogger(__name__) + +_PASSTHROUGH_EVENTS = ( + "chatMsg", + "pm", + "errorMsg", + "kick", + "announcement", + "clearchat", + "updateEmote", + "removeEmote", +) + + +class AsyncBot: + def __init__( + self, + token: str, + channel: str, + socket_url: str, + api_url: str, + transports: list[str] | None = None, + reconnection: bool = False, + reconnection_delay: int = 3, + ): + if not token.startswith("cbt_"): + raise ValueError("token must start with 'cbt_'") + + self.token = token + self.channel = channel + self.socket_url = socket_url + self.transports = transports + + self.api = BotAPI(api_url, channel, token) + self.users: list[dict] = [] + self.now_playing: dict | None = None + self.playlist: list[dict] = [] + self.channel_opts: dict = {} + self.last_disconnect_reason = None + + self._handlers: dict[str, list[Callable]] = defaultdict(list) + self._sio = socketio.AsyncClient( + reconnection=reconnection, + reconnection_delay=reconnection_delay, + logger=False, + engineio_logger=False, + ) + self._wire_sio() + + def _wire_sio(self): + sio = self._sio + + @sio.on("connect") + async def _connect(): + logger.info("connected to %s", self.socket_url) + await self._fire("connect", None) + + @sio.on("disconnect") + async def _disconnect(*args): + reason = args[0] if args else None + self.last_disconnect_reason = reason + self.users = [] + self.playlist = [] + logger.info("disconnected: %s", reason) + await self._fire("disconnect", reason) + + @sio.on("connect_error") + async def _connect_error(err): + logger.error("connection error: %s", err) + await self._fire("connect_error", err) + + @sio.on("login") + async def _login(data): + if data.get("success"): + logger.info("authenticated as %s", data.get("name")) + try: + await sio.emit("joinChannel", {"name": self.channel}, namespace="/") + except Exception: + logger.exception("failed to emit joinChannel on '/' namespace") + else: + logger.error("authentication failed: %s", data.get("error", "unknown")) + await self._fire("login", data) + + @sio.on("userlist") + async def _userlist(users): + self.users = list(users) + await self._fire("userlist", users) + + @sio.on("addUser") + async def _add_user(user): + self.users = [u for u in self.users if u["name"] != user["name"]] + self.users.append(user) + await self._fire("addUser", user) + + @sio.on("userLeave") + async def _user_leave(data): + self.users = [u for u in self.users if u["name"] != data["name"]] + await self._fire("userLeave", data) + + @sio.on("setUserMeta") + async def _user_meta(data): + for user in self.users: + if user["name"] == data["name"]: + user.setdefault("meta", {}).update(data.get("meta", {})) + break + await self._fire("setUserMeta", data) + + @sio.on("setUserRank") + async def _user_rank(data): + for user in self.users: + if user["name"] == data["name"]: + user["rank"] = data["rank"] + break + await self._fire("setUserRank", data) + + @sio.on("changeMedia") + async def _change_media(data): + self.now_playing = data + await self._fire("changeMedia", data) + + @sio.on("playlist") + async def _playlist(items): + self.playlist = list(items) + await self._fire("playlist", items) + + @sio.on("queue") + async def _queue(data): + item = data.get("item") + if item: + self.playlist.append(item) + await self._fire("queue", data) + + @sio.on("delete") + async def _delete(data): + uid = data.get("uid") + self.playlist = [i for i in self.playlist if i.get("uid") != uid] + await self._fire("delete", data) + + @sio.on("channelOpts") + async def _channel_opts(opts): + self.channel_opts = opts + await self._fire("channelOpts", opts) + + for _event in _PASSTHROUGH_EVENTS: + def _make(ev: str): + @sio.on(ev) + async def _handler(data=None): + await self._fire(ev, data) + _make(_event) + + async def _fire(self, event: str, data): + for handler in self._handlers[event]: + try: + result = handler(data) + if inspect.isawaitable(result): + await result + except Exception: + logger.exception("unhandled exception in %s handler", event) + + def on(self, event: str) -> Callable: + def decorator(fn: Callable) -> Callable: + self._handlers[event].append(fn) + return fn + return decorator + + async def _wait_for_namespace(self, namespace: str = "/", timeout: float = 10.0): + deadline = asyncio.get_running_loop().time() + timeout + while namespace not in self._sio.namespaces: + if asyncio.get_running_loop().time() >= deadline: + raise TimeoutError( + f"namespace {namespace!r} did not connect within {timeout:.1f}s" + ) + await self._sio.sleep(0.05) + + async def _emit_guarded(self, event: str, data: dict, namespace: str = "/"): + if not self._sio.connected: + raise RuntimeError( + f"socket is not connected; last_disconnect_reason={self.last_disconnect_reason!r}" + ) + if namespace not in self._sio.namespaces: + raise RuntimeError( + f"namespace {namespace!r} is not connected; call connect() and wait for readiness" + ) + await self._sio.emit(event, data, namespace=namespace) + + async def connect(self, timeout: float = 10.0): + await self._sio.connect( + self.socket_url, + auth={"token": self.token}, + transports=self.transports, + namespaces=["/"], + wait=True, + wait_timeout=timeout, + ) + await self._wait_for_namespace("/", timeout=timeout) + + async def wait(self): + await self._sio.wait() + + async def run(self, timeout: float = 10.0): + await self.connect(timeout=timeout) + await self.wait() + + async def disconnect(self): + await self._sio.disconnect() + + async def send_message(self, msg: str, to: str | None = None): + if to: + msg = f"{to}: {msg}" + await self._emit_guarded("chatMsg", {"msg": msg, "meta": {}}) + + async def send_action(self, text: str): + await self._emit_guarded("chatMsg", {"msg": f"/me {text}", "meta": {}}) + + async def send_pm(self, to: str, msg: str): + await self._emit_guarded("pm", {"to": to, "msg": msg, "meta": {}}) + + async def queue(self, id: str, type: str, pos: str = "end"): + await asyncio.to_thread(self.api.add_to_playlist, id, type, pos) + + async def delete_item(self, uid: int): + await asyncio.to_thread(self.api.delete_playlist_item, uid) + + async def skip_to(self, uid: int): + await asyncio.to_thread(self.api.skip_to, uid) + + async def skip(self): + data = await asyncio.to_thread(self.api.get_playlist) + items = data.get("items", []) + idx = data.get("currentIndex", -1) + if 0 <= idx < len(items) - 1: + await asyncio.to_thread(self.api.skip_to, items[idx + 1]["uid"]) + + async def shuffle_playlist(self): + await asyncio.to_thread(self.api.shuffle_playlist) + + async def clear_playlist(self): + await asyncio.to_thread(self.api.clear_playlist) diff --git a/veretube_bot/bot.py b/veretube_bot/bot.py index c2b3cca..2737935 100644 --- a/veretube_bot/bot.py +++ b/veretube_bot/bot.py @@ -1,4 +1,5 @@ import logging +import time from collections import defaultdict from typing import Callable @@ -53,6 +54,7 @@ class Bot: channel: str, socket_url: str, api_url: str, + transports: list[str] | None = None, reconnection: bool = False, reconnection_delay: int = 3, ): @@ -62,6 +64,7 @@ class Bot: self.token = token self.channel = channel self.socket_url = socket_url + self.transports = transports self.api = BotAPI(api_url, channel, token) @@ -70,6 +73,7 @@ class Bot: self.now_playing: dict | None = None self.playlist: list[dict] = [] self.channel_opts: dict = {} + self.last_disconnect_reason = None self._handlers: dict[str, list[Callable]] = defaultdict(list) @@ -94,6 +98,7 @@ class Bot: @sio.on("disconnect") def _disconnect(*args): reason = args[0] if args else None + self.last_disconnect_reason = reason self.users = [] self.playlist = [] logger.info("disconnected: %s", reason) @@ -108,7 +113,10 @@ class Bot: def _login(data): if data.get("success"): logger.info("authenticated as %s", data.get("name")) - sio.emit("joinChannel", {"name": self.channel}) + try: + sio.emit("joinChannel", {"name": self.channel}, namespace="/") + except Exception: + logger.exception("failed to emit joinChannel on '/' namespace") else: logger.error("authentication failed: %s", data.get("error", "unknown")) self._fire("login", data) @@ -217,9 +225,39 @@ class Bot: # ── Connection ──────────────────────────────────────────────────────────── - def connect(self): - """Open the socket.io connection. Returns immediately after connecting.""" - self._sio.connect(self.socket_url, auth={"token": self.token}) + def _wait_for_namespace(self, namespace: str = "/", timeout: float = 10.0): + """Wait until a namespace is connected or raise TimeoutError.""" + deadline = time.monotonic() + timeout + while namespace not in self._sio.namespaces: + if time.monotonic() >= deadline: + raise TimeoutError( + f"namespace {namespace!r} did not connect within {timeout:.1f}s" + ) + self._sio.sleep(0.05) + + def _emit_guarded(self, event: str, data: dict, namespace: str = "/"): + """Emit only when the underlying socket and namespace are connected.""" + if not self._sio.connected: + raise RuntimeError( + f"socket is not connected; last_disconnect_reason={self.last_disconnect_reason!r}" + ) + if namespace not in self._sio.namespaces: + raise RuntimeError( + f"namespace {namespace!r} is not connected; call connect() and wait for readiness" + ) + self._sio.emit(event, data, namespace=namespace) + + def connect(self, timeout: float = 10.0): + """Open the socket.io connection and wait until the default namespace is ready.""" + self._sio.connect( + self.socket_url, + auth={"token": self.token}, + transports=self.transports, + namespaces=["/"], + wait=True, + wait_timeout=timeout, + ) + self._wait_for_namespace("/", timeout=timeout) def wait(self): """Block until the connection closes. Call after connect().""" @@ -240,15 +278,15 @@ class Bot: """Send a chat message, optionally prefixed with 'to: '.""" if to: msg = f"{to}: {msg}" - self._sio.emit("chatMsg", {"msg": msg, "meta": {}}) + self._emit_guarded("chatMsg", {"msg": msg, "meta": {}}) def send_action(self, text: str): """Send a /me action message.""" - self._sio.emit("chatMsg", {"msg": f"/me {text}", "meta": {}}) + self._emit_guarded("chatMsg", {"msg": f"/me {text}", "meta": {}}) def send_pm(self, to: str, msg: str): """Send a private message.""" - self._sio.emit("pm", {"to": to, "msg": msg, "meta": {}}) + self._emit_guarded("pm", {"to": to, "msg": msg, "meta": {}}) # ── Playlist ──────────────────────────────────────────────────────────────