Fix socket namespace readiness and add AsyncBot API
- force explicit '/' namespace connect with readiness checks - add guarded emits with disconnect reason diagnostics - add configurable socket transports (e.g. websocket-only) - introduce AsyncBot based on socketio.AsyncClient - export AsyncBot and document async quick-start/troubleshooting - bump package version for release
This commit is contained in:
parent
f6fc9ee7a2
commit
12d01f9d6d
5 changed files with 325 additions and 10 deletions
30
README.md
30
README.md
|
|
@ -42,6 +42,33 @@ def on_media(data):
|
||||||
bot.run() # connect and block until disconnected
|
bot.run() # connect and block until disconnected
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Async quick start
|
||||||
|
|
||||||
|
```python
|
||||||
|
import asyncio
|
||||||
|
from veretube_bot import AsyncBot
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
bot = AsyncBot(
|
||||||
|
token="cbt_...",
|
||||||
|
channel="mychannel",
|
||||||
|
socket_url="http://your-server:1337",
|
||||||
|
api_url="http://your-server:8080/api/v1",
|
||||||
|
transports=["websocket"], # optional
|
||||||
|
)
|
||||||
|
|
||||||
|
@bot.on("login")
|
||||||
|
async def on_login(data):
|
||||||
|
if data.get("success"):
|
||||||
|
await bot.send_message("hello from async bot")
|
||||||
|
|
||||||
|
await bot.connect(timeout=10)
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
await bot.disconnect()
|
||||||
|
|
||||||
|
asyncio.run(main())
|
||||||
|
```
|
||||||
|
|
||||||
## Connection
|
## Connection
|
||||||
|
|
||||||
```python
|
```python
|
||||||
|
|
@ -50,6 +77,7 @@ Bot(
|
||||||
channel, # str — channel name (lowercase, no '#')
|
channel, # str — channel name (lowercase, no '#')
|
||||||
socket_url, # str — socket.io server URL (uses io.port, not the HTTP port)
|
socket_url, # str — socket.io server URL (uses io.port, not the HTTP port)
|
||||||
api_url, # str — REST API base URL, e.g. http://host:8080/api/v1
|
api_url, # str — REST API base URL, e.g. http://host:8080/api/v1
|
||||||
|
transports=None, # e.g. ["websocket"] to bypass polling/upgrade issues
|
||||||
reconnection=False, # set True to reconnect automatically on drop
|
reconnection=False, # set True to reconnect automatically on drop
|
||||||
reconnection_delay=3, # seconds between reconnect attempts
|
reconnection_delay=3, # seconds between reconnect attempts
|
||||||
)
|
)
|
||||||
|
|
@ -58,7 +86,7 @@ Bot(
|
||||||
| Method | Description |
|
| Method | Description |
|
||||||
|--------|-------------|
|
|--------|-------------|
|
||||||
| `bot.run()` | Connect and block until disconnected |
|
| `bot.run()` | Connect and block until disconnected |
|
||||||
| `bot.connect()` | Open the connection (returns immediately) |
|
| `bot.connect()` | Open the connection and wait until `/` namespace is ready |
|
||||||
| `bot.wait()` | Block until disconnected (call after `connect()`) |
|
| `bot.wait()` | Block until disconnected (call after `connect()`) |
|
||||||
| `bot.disconnect()` | Close the connection |
|
| `bot.disconnect()` | Close the connection |
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "veretube-bot"
|
name = "veretube-bot"
|
||||||
version = "0.1.0"
|
version = "0.1.3"
|
||||||
description = "Python bot library for veretube sync channels"
|
description = "Python bot library for veretube sync channels"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
from .bot import Bot
|
from .bot import Bot
|
||||||
|
from .async_bot import AsyncBot
|
||||||
from .exceptions import BotAPIError
|
from .exceptions import BotAPIError
|
||||||
from .rank import Rank
|
from .rank import Rank
|
||||||
|
|
||||||
__all__ = ["Bot", "BotAPIError", "Rank"]
|
__all__ = ["Bot", "AsyncBot", "BotAPIError", "Rank"]
|
||||||
|
|
|
||||||
248
veretube_bot/async_bot.py
Normal file
248
veretube_bot/async_bot.py
Normal file
|
|
@ -0,0 +1,248 @@
|
||||||
|
import asyncio
|
||||||
|
import inspect
|
||||||
|
import logging
|
||||||
|
from collections import defaultdict
|
||||||
|
from typing import Callable
|
||||||
|
|
||||||
|
import socketio
|
||||||
|
|
||||||
|
from ._api import BotAPI
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_PASSTHROUGH_EVENTS = (
|
||||||
|
"chatMsg",
|
||||||
|
"pm",
|
||||||
|
"errorMsg",
|
||||||
|
"kick",
|
||||||
|
"announcement",
|
||||||
|
"clearchat",
|
||||||
|
"updateEmote",
|
||||||
|
"removeEmote",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncBot:
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
token: str,
|
||||||
|
channel: str,
|
||||||
|
socket_url: str,
|
||||||
|
api_url: str,
|
||||||
|
transports: list[str] | None = None,
|
||||||
|
reconnection: bool = False,
|
||||||
|
reconnection_delay: int = 3,
|
||||||
|
):
|
||||||
|
if not token.startswith("cbt_"):
|
||||||
|
raise ValueError("token must start with 'cbt_'")
|
||||||
|
|
||||||
|
self.token = token
|
||||||
|
self.channel = channel
|
||||||
|
self.socket_url = socket_url
|
||||||
|
self.transports = transports
|
||||||
|
|
||||||
|
self.api = BotAPI(api_url, channel, token)
|
||||||
|
self.users: list[dict] = []
|
||||||
|
self.now_playing: dict | None = None
|
||||||
|
self.playlist: list[dict] = []
|
||||||
|
self.channel_opts: dict = {}
|
||||||
|
self.last_disconnect_reason = None
|
||||||
|
|
||||||
|
self._handlers: dict[str, list[Callable]] = defaultdict(list)
|
||||||
|
self._sio = socketio.AsyncClient(
|
||||||
|
reconnection=reconnection,
|
||||||
|
reconnection_delay=reconnection_delay,
|
||||||
|
logger=False,
|
||||||
|
engineio_logger=False,
|
||||||
|
)
|
||||||
|
self._wire_sio()
|
||||||
|
|
||||||
|
def _wire_sio(self):
|
||||||
|
sio = self._sio
|
||||||
|
|
||||||
|
@sio.on("connect")
|
||||||
|
async def _connect():
|
||||||
|
logger.info("connected to %s", self.socket_url)
|
||||||
|
await self._fire("connect", None)
|
||||||
|
|
||||||
|
@sio.on("disconnect")
|
||||||
|
async def _disconnect(*args):
|
||||||
|
reason = args[0] if args else None
|
||||||
|
self.last_disconnect_reason = reason
|
||||||
|
self.users = []
|
||||||
|
self.playlist = []
|
||||||
|
logger.info("disconnected: %s", reason)
|
||||||
|
await self._fire("disconnect", reason)
|
||||||
|
|
||||||
|
@sio.on("connect_error")
|
||||||
|
async def _connect_error(err):
|
||||||
|
logger.error("connection error: %s", err)
|
||||||
|
await self._fire("connect_error", err)
|
||||||
|
|
||||||
|
@sio.on("login")
|
||||||
|
async def _login(data):
|
||||||
|
if data.get("success"):
|
||||||
|
logger.info("authenticated as %s", data.get("name"))
|
||||||
|
try:
|
||||||
|
await sio.emit("joinChannel", {"name": self.channel}, namespace="/")
|
||||||
|
except Exception:
|
||||||
|
logger.exception("failed to emit joinChannel on '/' namespace")
|
||||||
|
else:
|
||||||
|
logger.error("authentication failed: %s", data.get("error", "unknown"))
|
||||||
|
await self._fire("login", data)
|
||||||
|
|
||||||
|
@sio.on("userlist")
|
||||||
|
async def _userlist(users):
|
||||||
|
self.users = list(users)
|
||||||
|
await self._fire("userlist", users)
|
||||||
|
|
||||||
|
@sio.on("addUser")
|
||||||
|
async def _add_user(user):
|
||||||
|
self.users = [u for u in self.users if u["name"] != user["name"]]
|
||||||
|
self.users.append(user)
|
||||||
|
await self._fire("addUser", user)
|
||||||
|
|
||||||
|
@sio.on("userLeave")
|
||||||
|
async def _user_leave(data):
|
||||||
|
self.users = [u for u in self.users if u["name"] != data["name"]]
|
||||||
|
await self._fire("userLeave", data)
|
||||||
|
|
||||||
|
@sio.on("setUserMeta")
|
||||||
|
async def _user_meta(data):
|
||||||
|
for user in self.users:
|
||||||
|
if user["name"] == data["name"]:
|
||||||
|
user.setdefault("meta", {}).update(data.get("meta", {}))
|
||||||
|
break
|
||||||
|
await self._fire("setUserMeta", data)
|
||||||
|
|
||||||
|
@sio.on("setUserRank")
|
||||||
|
async def _user_rank(data):
|
||||||
|
for user in self.users:
|
||||||
|
if user["name"] == data["name"]:
|
||||||
|
user["rank"] = data["rank"]
|
||||||
|
break
|
||||||
|
await self._fire("setUserRank", data)
|
||||||
|
|
||||||
|
@sio.on("changeMedia")
|
||||||
|
async def _change_media(data):
|
||||||
|
self.now_playing = data
|
||||||
|
await self._fire("changeMedia", data)
|
||||||
|
|
||||||
|
@sio.on("playlist")
|
||||||
|
async def _playlist(items):
|
||||||
|
self.playlist = list(items)
|
||||||
|
await self._fire("playlist", items)
|
||||||
|
|
||||||
|
@sio.on("queue")
|
||||||
|
async def _queue(data):
|
||||||
|
item = data.get("item")
|
||||||
|
if item:
|
||||||
|
self.playlist.append(item)
|
||||||
|
await self._fire("queue", data)
|
||||||
|
|
||||||
|
@sio.on("delete")
|
||||||
|
async def _delete(data):
|
||||||
|
uid = data.get("uid")
|
||||||
|
self.playlist = [i for i in self.playlist if i.get("uid") != uid]
|
||||||
|
await self._fire("delete", data)
|
||||||
|
|
||||||
|
@sio.on("channelOpts")
|
||||||
|
async def _channel_opts(opts):
|
||||||
|
self.channel_opts = opts
|
||||||
|
await self._fire("channelOpts", opts)
|
||||||
|
|
||||||
|
for _event in _PASSTHROUGH_EVENTS:
|
||||||
|
def _make(ev: str):
|
||||||
|
@sio.on(ev)
|
||||||
|
async def _handler(data=None):
|
||||||
|
await self._fire(ev, data)
|
||||||
|
_make(_event)
|
||||||
|
|
||||||
|
async def _fire(self, event: str, data):
|
||||||
|
for handler in self._handlers[event]:
|
||||||
|
try:
|
||||||
|
result = handler(data)
|
||||||
|
if inspect.isawaitable(result):
|
||||||
|
await result
|
||||||
|
except Exception:
|
||||||
|
logger.exception("unhandled exception in %s handler", event)
|
||||||
|
|
||||||
|
def on(self, event: str) -> Callable:
|
||||||
|
def decorator(fn: Callable) -> Callable:
|
||||||
|
self._handlers[event].append(fn)
|
||||||
|
return fn
|
||||||
|
return decorator
|
||||||
|
|
||||||
|
async def _wait_for_namespace(self, namespace: str = "/", timeout: float = 10.0):
|
||||||
|
deadline = asyncio.get_running_loop().time() + timeout
|
||||||
|
while namespace not in self._sio.namespaces:
|
||||||
|
if asyncio.get_running_loop().time() >= deadline:
|
||||||
|
raise TimeoutError(
|
||||||
|
f"namespace {namespace!r} did not connect within {timeout:.1f}s"
|
||||||
|
)
|
||||||
|
await self._sio.sleep(0.05)
|
||||||
|
|
||||||
|
async def _emit_guarded(self, event: str, data: dict, namespace: str = "/"):
|
||||||
|
if not self._sio.connected:
|
||||||
|
raise RuntimeError(
|
||||||
|
f"socket is not connected; last_disconnect_reason={self.last_disconnect_reason!r}"
|
||||||
|
)
|
||||||
|
if namespace not in self._sio.namespaces:
|
||||||
|
raise RuntimeError(
|
||||||
|
f"namespace {namespace!r} is not connected; call connect() and wait for readiness"
|
||||||
|
)
|
||||||
|
await self._sio.emit(event, data, namespace=namespace)
|
||||||
|
|
||||||
|
async def connect(self, timeout: float = 10.0):
|
||||||
|
await self._sio.connect(
|
||||||
|
self.socket_url,
|
||||||
|
auth={"token": self.token},
|
||||||
|
transports=self.transports,
|
||||||
|
namespaces=["/"],
|
||||||
|
wait=True,
|
||||||
|
wait_timeout=timeout,
|
||||||
|
)
|
||||||
|
await self._wait_for_namespace("/", timeout=timeout)
|
||||||
|
|
||||||
|
async def wait(self):
|
||||||
|
await self._sio.wait()
|
||||||
|
|
||||||
|
async def run(self, timeout: float = 10.0):
|
||||||
|
await self.connect(timeout=timeout)
|
||||||
|
await self.wait()
|
||||||
|
|
||||||
|
async def disconnect(self):
|
||||||
|
await self._sio.disconnect()
|
||||||
|
|
||||||
|
async def send_message(self, msg: str, to: str | None = None):
|
||||||
|
if to:
|
||||||
|
msg = f"{to}: {msg}"
|
||||||
|
await self._emit_guarded("chatMsg", {"msg": msg, "meta": {}})
|
||||||
|
|
||||||
|
async def send_action(self, text: str):
|
||||||
|
await self._emit_guarded("chatMsg", {"msg": f"/me {text}", "meta": {}})
|
||||||
|
|
||||||
|
async def send_pm(self, to: str, msg: str):
|
||||||
|
await self._emit_guarded("pm", {"to": to, "msg": msg, "meta": {}})
|
||||||
|
|
||||||
|
async def queue(self, id: str, type: str, pos: str = "end"):
|
||||||
|
await asyncio.to_thread(self.api.add_to_playlist, id, type, pos)
|
||||||
|
|
||||||
|
async def delete_item(self, uid: int):
|
||||||
|
await asyncio.to_thread(self.api.delete_playlist_item, uid)
|
||||||
|
|
||||||
|
async def skip_to(self, uid: int):
|
||||||
|
await asyncio.to_thread(self.api.skip_to, uid)
|
||||||
|
|
||||||
|
async def skip(self):
|
||||||
|
data = await asyncio.to_thread(self.api.get_playlist)
|
||||||
|
items = data.get("items", [])
|
||||||
|
idx = data.get("currentIndex", -1)
|
||||||
|
if 0 <= idx < len(items) - 1:
|
||||||
|
await asyncio.to_thread(self.api.skip_to, items[idx + 1]["uid"])
|
||||||
|
|
||||||
|
async def shuffle_playlist(self):
|
||||||
|
await asyncio.to_thread(self.api.shuffle_playlist)
|
||||||
|
|
||||||
|
async def clear_playlist(self):
|
||||||
|
await asyncio.to_thread(self.api.clear_playlist)
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
import logging
|
import logging
|
||||||
|
import time
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
|
|
||||||
|
|
@ -53,6 +54,7 @@ class Bot:
|
||||||
channel: str,
|
channel: str,
|
||||||
socket_url: str,
|
socket_url: str,
|
||||||
api_url: str,
|
api_url: str,
|
||||||
|
transports: list[str] | None = None,
|
||||||
reconnection: bool = False,
|
reconnection: bool = False,
|
||||||
reconnection_delay: int = 3,
|
reconnection_delay: int = 3,
|
||||||
):
|
):
|
||||||
|
|
@ -62,6 +64,7 @@ class Bot:
|
||||||
self.token = token
|
self.token = token
|
||||||
self.channel = channel
|
self.channel = channel
|
||||||
self.socket_url = socket_url
|
self.socket_url = socket_url
|
||||||
|
self.transports = transports
|
||||||
|
|
||||||
self.api = BotAPI(api_url, channel, token)
|
self.api = BotAPI(api_url, channel, token)
|
||||||
|
|
||||||
|
|
@ -70,6 +73,7 @@ class Bot:
|
||||||
self.now_playing: dict | None = None
|
self.now_playing: dict | None = None
|
||||||
self.playlist: list[dict] = []
|
self.playlist: list[dict] = []
|
||||||
self.channel_opts: dict = {}
|
self.channel_opts: dict = {}
|
||||||
|
self.last_disconnect_reason = None
|
||||||
|
|
||||||
self._handlers: dict[str, list[Callable]] = defaultdict(list)
|
self._handlers: dict[str, list[Callable]] = defaultdict(list)
|
||||||
|
|
||||||
|
|
@ -94,6 +98,7 @@ class Bot:
|
||||||
@sio.on("disconnect")
|
@sio.on("disconnect")
|
||||||
def _disconnect(*args):
|
def _disconnect(*args):
|
||||||
reason = args[0] if args else None
|
reason = args[0] if args else None
|
||||||
|
self.last_disconnect_reason = reason
|
||||||
self.users = []
|
self.users = []
|
||||||
self.playlist = []
|
self.playlist = []
|
||||||
logger.info("disconnected: %s", reason)
|
logger.info("disconnected: %s", reason)
|
||||||
|
|
@ -108,7 +113,10 @@ class Bot:
|
||||||
def _login(data):
|
def _login(data):
|
||||||
if data.get("success"):
|
if data.get("success"):
|
||||||
logger.info("authenticated as %s", data.get("name"))
|
logger.info("authenticated as %s", data.get("name"))
|
||||||
sio.emit("joinChannel", {"name": self.channel})
|
try:
|
||||||
|
sio.emit("joinChannel", {"name": self.channel}, namespace="/")
|
||||||
|
except Exception:
|
||||||
|
logger.exception("failed to emit joinChannel on '/' namespace")
|
||||||
else:
|
else:
|
||||||
logger.error("authentication failed: %s", data.get("error", "unknown"))
|
logger.error("authentication failed: %s", data.get("error", "unknown"))
|
||||||
self._fire("login", data)
|
self._fire("login", data)
|
||||||
|
|
@ -217,9 +225,39 @@ class Bot:
|
||||||
|
|
||||||
# ── Connection ────────────────────────────────────────────────────────────
|
# ── Connection ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
def connect(self):
|
def _wait_for_namespace(self, namespace: str = "/", timeout: float = 10.0):
|
||||||
"""Open the socket.io connection. Returns immediately after connecting."""
|
"""Wait until a namespace is connected or raise TimeoutError."""
|
||||||
self._sio.connect(self.socket_url, auth={"token": self.token})
|
deadline = time.monotonic() + timeout
|
||||||
|
while namespace not in self._sio.namespaces:
|
||||||
|
if time.monotonic() >= deadline:
|
||||||
|
raise TimeoutError(
|
||||||
|
f"namespace {namespace!r} did not connect within {timeout:.1f}s"
|
||||||
|
)
|
||||||
|
self._sio.sleep(0.05)
|
||||||
|
|
||||||
|
def _emit_guarded(self, event: str, data: dict, namespace: str = "/"):
|
||||||
|
"""Emit only when the underlying socket and namespace are connected."""
|
||||||
|
if not self._sio.connected:
|
||||||
|
raise RuntimeError(
|
||||||
|
f"socket is not connected; last_disconnect_reason={self.last_disconnect_reason!r}"
|
||||||
|
)
|
||||||
|
if namespace not in self._sio.namespaces:
|
||||||
|
raise RuntimeError(
|
||||||
|
f"namespace {namespace!r} is not connected; call connect() and wait for readiness"
|
||||||
|
)
|
||||||
|
self._sio.emit(event, data, namespace=namespace)
|
||||||
|
|
||||||
|
def connect(self, timeout: float = 10.0):
|
||||||
|
"""Open the socket.io connection and wait until the default namespace is ready."""
|
||||||
|
self._sio.connect(
|
||||||
|
self.socket_url,
|
||||||
|
auth={"token": self.token},
|
||||||
|
transports=self.transports,
|
||||||
|
namespaces=["/"],
|
||||||
|
wait=True,
|
||||||
|
wait_timeout=timeout,
|
||||||
|
)
|
||||||
|
self._wait_for_namespace("/", timeout=timeout)
|
||||||
|
|
||||||
def wait(self):
|
def wait(self):
|
||||||
"""Block until the connection closes. Call after connect()."""
|
"""Block until the connection closes. Call after connect()."""
|
||||||
|
|
@ -240,15 +278,15 @@ class Bot:
|
||||||
"""Send a chat message, optionally prefixed with 'to: '."""
|
"""Send a chat message, optionally prefixed with 'to: '."""
|
||||||
if to:
|
if to:
|
||||||
msg = f"{to}: {msg}"
|
msg = f"{to}: {msg}"
|
||||||
self._sio.emit("chatMsg", {"msg": msg, "meta": {}})
|
self._emit_guarded("chatMsg", {"msg": msg, "meta": {}})
|
||||||
|
|
||||||
def send_action(self, text: str):
|
def send_action(self, text: str):
|
||||||
"""Send a /me action message."""
|
"""Send a /me action message."""
|
||||||
self._sio.emit("chatMsg", {"msg": f"/me {text}", "meta": {}})
|
self._emit_guarded("chatMsg", {"msg": f"/me {text}", "meta": {}})
|
||||||
|
|
||||||
def send_pm(self, to: str, msg: str):
|
def send_pm(self, to: str, msg: str):
|
||||||
"""Send a private message."""
|
"""Send a private message."""
|
||||||
self._sio.emit("pm", {"to": to, "msg": msg, "meta": {}})
|
self._emit_guarded("pm", {"to": to, "msg": msg, "meta": {}})
|
||||||
|
|
||||||
# ── Playlist ──────────────────────────────────────────────────────────────
|
# ── Playlist ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue