From b1943c2f4bf5cf50026c07c878b52f47b97f07ce Mon Sep 17 00:00:00 2001 From: scootz Date: Fri, 1 May 2026 14:18:16 +0100 Subject: [PATCH] Release v0.7.5 portal-only refactor --- .gitignore | 1 - Makefile | 30 +- README.md | 112 +- kwin_capture_screen.cpp | 334 ----- org.codex.reforger-kwin-capture.desktop | 9 - pyproject.toml | 4 +- reforger_queue/__init__.py | 1 + reforger_queue/cli.py | 234 ++++ reforger_queue/config.py | 19 + reforger_queue/magick.py | 83 ++ reforger_queue/ocr.py | 294 ++++ reforger_queue/portal.py | 75 + reforger_queue_read.py | 1671 +---------------------- uv.lock | 2 +- 14 files changed, 751 insertions(+), 2118 deletions(-) delete mode 100644 kwin_capture_screen.cpp delete mode 100644 org.codex.reforger-kwin-capture.desktop create mode 100644 reforger_queue/__init__.py create mode 100644 reforger_queue/cli.py create mode 100644 reforger_queue/config.py create mode 100644 reforger_queue/magick.py create mode 100644 reforger_queue/ocr.py create mode 100644 reforger_queue/portal.py diff --git a/.gitignore b/.gitignore index 20a8215..8103af1 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,5 @@ wheels/ # Virtual environments .venv -kwin_capture_screen portal_capture_frame *.moc diff --git a/Makefile b/Makefile index bd78e70..596e41b 100644 --- a/Makefile +++ b/Makefile @@ -1,26 +1,13 @@ CXX ?= g++ PKG_CONFIG ?= pkg-config -PREFIX ?= /usr/local -BIN_DIR ?= $(PREFIX)/bin -APPLICATIONS_DIR ?= $(PREFIX)/share/applications -SYSTEM_HELPER := $(BIN_DIR)/reforger-kwin-capture -DESKTOP_FILE := org.codex.reforger-kwin-capture.desktop - -QT_CFLAGS := $(shell $(PKG_CONFIG) --cflags Qt6Core Qt6DBus) -QT_LIBS := $(shell $(PKG_CONFIG) --libs Qt6Core Qt6DBus) -PORTAL_CFLAGS := $(shell $(PKG_CONFIG) --cflags Qt6Core Qt6DBus Qt6Gui gstreamer-1.0 gstreamer-app-1.0 gstreamer-video-1.0) -PORTAL_LIBS := $(shell $(PKG_CONFIG) --libs Qt6Core Qt6DBus Qt6Gui gstreamer-1.0 gstreamer-app-1.0 gstreamer-video-1.0) MOC ?= /usr/lib/qt6/moc -.PHONY: all clean install-kwin-auth uninstall-kwin-auth +PORTAL_CFLAGS := $(shell $(PKG_CONFIG) --cflags Qt6Core Qt6DBus Qt6Gui gstreamer-1.0 gstreamer-app-1.0 gstreamer-video-1.0) +PORTAL_LIBS := $(shell $(PKG_CONFIG) --libs Qt6Core Qt6DBus Qt6Gui gstreamer-1.0 gstreamer-app-1.0 gstreamer-video-1.0) -all: kwin_capture_screen portal_capture_frame +.PHONY: all clean -kwin_capture_screen: kwin_capture_screen.cpp kwin_capture_screen.moc - $(CXX) -std=c++17 -O2 -Wall -Wextra -fPIC $(QT_CFLAGS) $< -o $@ $(QT_LIBS) - -kwin_capture_screen.moc: kwin_capture_screen.cpp - $(MOC) $< -o $@ +all: portal_capture_frame portal_capture_frame: portal_capture_frame.cpp portal_capture_frame.moc $(CXX) -std=c++17 -O2 -Wall -Wextra -fPIC $(PORTAL_CFLAGS) $< -o $@ $(PORTAL_LIBS) @@ -28,12 +15,5 @@ portal_capture_frame: portal_capture_frame.cpp portal_capture_frame.moc portal_capture_frame.moc: portal_capture_frame.cpp $(MOC) $< -o $@ -install-kwin-auth: kwin_capture_screen $(DESKTOP_FILE) - install -Dm755 kwin_capture_screen "$(DESTDIR)$(SYSTEM_HELPER)" - install -Dm644 $(DESKTOP_FILE) "$(DESTDIR)$(APPLICATIONS_DIR)/$(DESKTOP_FILE)" - -uninstall-kwin-auth: - rm -f "$(DESTDIR)$(SYSTEM_HELPER)" "$(DESTDIR)$(APPLICATIONS_DIR)/$(DESKTOP_FILE)" - clean: - rm -f kwin_capture_screen kwin_capture_screen.moc portal_capture_frame portal_capture_frame.moc + rm -f portal_capture_frame portal_capture_frame.moc diff --git a/README.md b/README.md index ed70652..ea5d8e0 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ -# Anti Prestige Tool +# Anti Prestige Tool v0.7.5 -Queue-position reader for Arma Reforger on Linux. +Queue-position reader for Arma Reforger on Linux Wayland. -The tool captures the Reforger queue screen, crops the queue-number region, and reads the orange UI digits with bundled real-game digit templates. It does not use Tesseract, OpenCV, or Pillow. +The tool captures a user-selected Reforger window through xdg-desktop-portal/PipeWire, crops the queue-number region, and reads the orange UI digits with bundled real-game digit templates. Normal use does not require external OCR libraries, desktop-specific screenshot hooks, or a specific installed font. ## Install @@ -13,7 +13,7 @@ Install system dependencies: sudo pacman -S --needed base-devel uv qt6-base gstreamer gst-plugins-base gst-plugin-pipewire imagemagick xdg-desktop-portal ``` -You also need a portal backend for your desktop: +Install the portal backend for your desktop: ```text KDE: xdg-desktop-portal-kde @@ -21,17 +21,18 @@ GNOME: xdg-desktop-portal-gnome wlr: xdg-desktop-portal-wlr ``` -Build the local helpers: +Set up the Python environment and build the portal helper: ```bash +uv sync make ``` -No sudo install is required for the default Portal/PipeWire capture path. No specific font package is required for normal use. +No sudo install step is required. ## Use -Preferred command: +Run: ```bash uv run reforger_queue_read.py --portal-window --show-crop --debug @@ -43,13 +44,13 @@ On first run, your desktop should show a window-sharing picker. Select the Arma ~/.local/state/anti-prestige-tool/portal-window-restore-token ``` -Later runs reuse that token: +Later runs try to reuse that token: ```bash uv run reforger_queue_read.py --portal-window --show-crop --debug ``` -If the saved window selection is wrong or stale: +Force a fresh picker if the saved selection is wrong or stale: ```bash uv run reforger_queue_read.py --portal-window --portal-reselect --show-crop --debug @@ -61,85 +62,27 @@ Save the captured input image while debugging: uv run reforger_queue_read.py --portal-window --save-input /tmp/apt-portal-window.png --show-crop --debug ``` -The default matcher uses `templates/reforger_digits.json`. Font mode is only for debugging: +## Validation -```bash -uv run reforger_queue_read.py --portal-window --font Roboto-Condensed --show-crop --debug -``` - -## Optional KDE/KWin Mode - -KWin mode can automatically find the Arma window by KWin metadata and can rebind after the game restarts. It is KDE-specific and requires installing an authorized helper. - -Install the helper: - -```bash -sudo make install-kwin-auth -kbuildsycoca6 -``` - -Run: - -```bash -uv run reforger_queue_read.py --kwin-window auto --show-crop --debug -``` - -If auto matching is ambiguous, list windows and pass the PID or internal id: - -```bash -uv run reforger_queue_read.py --list-kwin-windows -uv run reforger_queue_read.py --kwin-window 53561 --show-crop --debug -uv run reforger_queue_read.py --kwin-window-id '{77a38955-a827-479c-971b-af8de226ac7b}' --show-crop --debug -``` - -The auth install places: - -```text -/usr/local/bin/reforger-kwin-capture -/usr/local/share/applications/org.codex.reforger-kwin-capture.desktop -``` - -The desktop file must be in a root-owned application directory because KWin does not trust user-writable desktop entries for restricted screenshot interfaces. - -## Other Inputs - -Read an existing screenshot: - -```bash -uv run reforger_queue_read.py --image /path/to/screenshot.png --show-crop --debug -``` - -Use Spectacle/KDE screen capture: - -```bash -uv run reforger_queue_read.py --capture --screen DP-3 --show-crop --debug -``` - -Use an OBS scene screenshot: - -```bash -uv run reforger_queue_read.py --obs-scene APT --show-crop --debug -``` - -Validate the regression set: +Run the regression set: ```bash uv run reforger_queue_read.py --dataset datasets/regression-test-set --expect-filenames ``` -Expected current result: +Expected result: ```text summary: 21 passed, 0 failed ``` -Validate the smaller included dataset: +Run the smaller smoke dataset: ```bash uv run reforger_queue_read.py --dataset datasets/scootz-dataset --show-crop ``` -Expected current output: +Expected output: ```text datasets/scootz-dataset/1920x1080.png crop=788,465,75,45 @@ -148,7 +91,26 @@ datasets/scootz-dataset/2160x1440.png crop=1050,620,100,60 datasets/scootz-dataset/2160x1440.png 24 ``` -## Notes +## Debug Options + +Read an existing screenshot: + +```bash +uv run reforger_queue_read.py --image /path/to/screenshot.png --show-crop --debug +``` + +The default matcher uses bundled templates from: + +```text +templates/reforger_digits.json +``` + +Font mode is only for debugging. Known-good overrides: + +```bash +uv run reforger_queue_read.py --portal-window --font Roboto-Condensed --show-crop --debug +uv run reforger_queue_read.py --portal-window --font Adwaita-Sans --show-crop --debug +``` Default crop reference: @@ -158,6 +120,4 @@ reference-crop: 1050,620,100,60 scale-mode: width ``` -Portal/PipeWire is easiest to install, but restore behavior depends on the desktop portal. If the selected window disappears after an Arma restart, the portal may show the picker again. - -KWin mode is the best KDE automation path, but it has the extra authorization install step. +Portal restore behavior depends on the desktop portal. If the selected window disappears after an Arma restart, the portal may show the picker again. diff --git a/kwin_capture_screen.cpp b/kwin_capture_screen.cpp deleted file mode 100644 index b2b3f3d..0000000 --- a/kwin_capture_screen.cpp +++ /dev/null @@ -1,334 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -class WindowReceiver : public QObject -{ - Q_OBJECT - Q_CLASSINFO("D-Bus Interface", "org.codex.ReforgerQueueKWinProbe") - -public: - QString json; - QString error; - bool done = false; - -public slots: - void WindowsJson(const QString &payload) - { - json = payload; - done = true; - QCoreApplication::quit(); - } - - void Error(const QString &message) - { - error = message; - done = true; - QCoreApplication::quit(); - } -}; - -QDBusConnection sessionBus() -{ - QDBusConnection bus = QDBusConnection::sessionBus(); - if (bus.isConnected()) { - return bus; - } - - QString runtimeDir = qEnvironmentVariable("XDG_RUNTIME_DIR"); - if (runtimeDir.isEmpty()) { - runtimeDir = QStringLiteral("/run/user/%1").arg(getuid()); - } - - return QDBusConnection::connectToBus( - QStringLiteral("unix:path=%1/bus").arg(runtimeDir), - QStringLiteral("reforger-queue-session")); -} - -QString jsString(const QString &value) -{ - QJsonArray array; - array.append(value); - QByteArray encoded = QJsonDocument(array).toJson(QJsonDocument::Compact); - encoded.chop(1); - encoded.remove(0, 1); - return QString::fromUtf8(encoded); -} - -int capture(const QString &method, const QString &target, const QString &outputPath, QTextStream &out, QTextStream &err) -{ - QFile file(outputPath); - if (!file.open(QIODevice::WriteOnly | QIODevice::Truncate)) { - err << "failed to open output file: " << file.errorString() << "\n"; - return 1; - } - - QDBusConnection bus = sessionBus(); - if (!bus.isConnected()) { - err << "failed to connect to the session D-Bus: " << bus.lastError().message() << "\n"; - return 1; - } - - QDBusInterface iface( - QStringLiteral("org.kde.KWin"), - QStringLiteral("/org/kde/KWin/ScreenShot2"), - QStringLiteral("org.kde.KWin.ScreenShot2"), - bus); - - if (!iface.isValid()) { - err << "failed to connect to KWin ScreenShot2: " - << iface.lastError().message() << "\n"; - return 1; - } - - QVariantMap options; - QDBusUnixFileDescriptor fd(file.handle()); - QDBusReply reply = iface.call(method, target, options, QVariant::fromValue(fd)); - - file.close(); - - if (!reply.isValid()) { - err << method << " failed for " << target << ": " - << reply.error().name() << ": " << reply.error().message() << "\n"; - return 1; - } - - QJsonObject resultJson; - const QVariantMap resultMap = reply.value(); - for (auto it = resultMap.constBegin(); it != resultMap.constEnd(); ++it) { - resultJson.insert(it.key(), QJsonValue::fromVariant(it.value())); - } - out << QJsonDocument(resultJson).toJson(QJsonDocument::Compact) << "\n"; - - return 0; -} - -QString windowProbeScript(const QString &service, const QString &path, const QString &interface) -{ - return QStringLiteral(R"JS( -function safeString(value) { - if (value === undefined || value === null) { - return ""; - } - return String(value); -} - -function safeNumber(value) { - var number = Number(value); - return isNaN(number) ? 0 : number; -} - -function safeBool(value) { - return Boolean(value); -} - -function prop(object, name, fallback) { - try { - if (object[name] === undefined || object[name] === null) { - return fallback; - } - return object[name]; - } catch (error) { - return fallback; - } -} - -function rect(value) { - if (!value) { - return {"x": 0, "y": 0, "width": 0, "height": 0}; - } - return { - "x": Math.round(safeNumber(prop(value, "x", 0))), - "y": Math.round(safeNumber(prop(value, "y", 0))), - "width": Math.round(safeNumber(prop(value, "width", 0))), - "height": Math.round(safeNumber(prop(value, "height", 0))) - }; -} - -function outputName(window) { - try { - if (window.output && window.output.name) { - return String(window.output.name); - } - } catch (error) { - } - return ""; -} - -function windowInfo(window) { - var geometry = rect(prop(window, "frameGeometry", prop(window, "bufferGeometry", null))); - return { - "internalId": safeString(prop(window, "internalId", "")), - "pid": safeNumber(prop(window, "pid", 0)), - "caption": safeString(prop(window, "caption", "")), - "resourceClass": safeString(prop(window, "resourceClass", "")), - "resourceName": safeString(prop(window, "resourceName", "")), - "windowRole": safeString(prop(window, "windowRole", "")), - "desktopFileName": safeString(prop(window, "desktopFileName", "")), - "output": outputName(window), - "frameGeometry": geometry, - "normalWindow": safeBool(prop(window, "normalWindow", false)), - "fullScreen": safeBool(prop(window, "fullScreen", false)), - "minimized": safeBool(prop(window, "minimized", false)), - "hidden": safeBool(prop(window, "hidden", false)), - "keepAbove": safeBool(prop(window, "keepAbove", false)), - "skipTaskbar": safeBool(prop(window, "skipTaskbar", false)) - }; -} - -try { - var windows = workspace.windowList().map(windowInfo); - callDBus(%1, %2, %3, "WindowsJson", JSON.stringify(windows)); -} catch (error) { - callDBus(%1, %2, %3, "Error", safeString(error && error.stack ? error.stack : error)); -} -)JS") - .arg(jsString(service), jsString(path), jsString(interface)); -} - -int listWindows(QCoreApplication &app, QTextStream &out, QTextStream &err) -{ - QDBusConnection bus = sessionBus(); - if (!bus.isConnected()) { - err << "failed to connect to the session D-Bus: " << bus.lastError().message() << "\n"; - return 1; - } - - const QString service = QStringLiteral("org.codex.ReforgerQueueKWinProbe%1").arg(QCoreApplication::applicationPid()); - const QString path = QStringLiteral("/Probe"); - const QString interface = QStringLiteral("org.codex.ReforgerQueueKWinProbe"); - - WindowReceiver receiver; - if (!bus.registerService(service)) { - err << "failed to register D-Bus service " << service << ": " << bus.lastError().message() << "\n"; - return 1; - } - if (!bus.registerObject(path, &receiver, QDBusConnection::ExportAllSlots)) { - err << "failed to register D-Bus object " << path << ": " << bus.lastError().message() << "\n"; - bus.unregisterService(service); - return 1; - } - - QTemporaryFile scriptFile; - scriptFile.setFileTemplate(QDir::tempPath() + QStringLiteral("/reforger-kwin-window-probe-XXXXXX.js")); - if (!scriptFile.open()) { - err << "failed to create temporary KWin script: " << scriptFile.errorString() << "\n"; - bus.unregisterObject(path); - bus.unregisterService(service); - return 1; - } - scriptFile.write(windowProbeScript(service, path, interface).toUtf8()); - scriptFile.close(); - - const QString pluginName = QStringLiteral("reforger-queue-window-probe-%1").arg(QCoreApplication::applicationPid()); - QDBusInterface scripting( - QStringLiteral("org.kde.KWin"), - QStringLiteral("/Scripting"), - QStringLiteral("org.kde.kwin.Scripting"), - bus); - - if (!scripting.isValid()) { - err << "failed to connect to KWin scripting: " << scripting.lastError().message() << "\n"; - bus.unregisterObject(path); - bus.unregisterService(service); - return 1; - } - - QDBusReply loadReply = scripting.call(QStringLiteral("loadScript"), scriptFile.fileName(), pluginName); - if (!loadReply.isValid() || loadReply.value() < 0) { - err << "failed to load KWin window probe script: " - << loadReply.error().name() << ": " << loadReply.error().message() << "\n"; - bus.unregisterObject(path); - bus.unregisterService(service); - return 1; - } - - const int scriptId = loadReply.value(); - QDBusInterface script( - QStringLiteral("org.kde.KWin"), - QStringLiteral("/Scripting/Script%1").arg(scriptId), - QStringLiteral("org.kde.kwin.Script"), - bus); - QDBusMessage runReply = script.call(QStringLiteral("run")); - if (runReply.type() == QDBusMessage::ErrorMessage) { - err << "failed to run KWin window probe script: " - << runReply.errorName() << ": " << runReply.errorMessage() << "\n"; - scripting.call(QStringLiteral("unloadScript"), pluginName); - bus.unregisterObject(path); - bus.unregisterService(service); - return 1; - } - - QTimer::singleShot(5000, &app, [&receiver]() { - if (!receiver.done) { - receiver.error = QStringLiteral("timed out waiting for KWin window probe results"); - receiver.done = true; - QCoreApplication::quit(); - } - }); - app.exec(); - - scripting.call(QStringLiteral("unloadScript"), pluginName); - bus.unregisterObject(path); - bus.unregisterService(service); - - if (!receiver.error.isEmpty()) { - err << receiver.error << "\n"; - return 1; - } - if (receiver.json.isEmpty()) { - err << "KWin window probe returned no data\n"; - return 1; - } - - out << receiver.json << "\n"; - return 0; -} - -int main(int argc, char *argv[]) -{ - QCoreApplication app(argc, argv); - QTextStream out(stdout); - QTextStream err(stderr); - - const QStringList args = QCoreApplication::arguments(); - - if (args.size() == 3 && !args.at(1).startsWith(QLatin1String("--"))) { - return capture(QStringLiteral("CaptureScreen"), args.at(1), args.at(2), out, err); - } - - if (args.size() == 2 && args.at(1) == QLatin1String("--list-windows")) { - return listWindows(app, out, err); - } - - if (args.size() == 4 && args.at(1) == QLatin1String("--capture-screen")) { - return capture(QStringLiteral("CaptureScreen"), args.at(2), args.at(3), out, err); - } - - if (args.size() == 4 && args.at(1) == QLatin1String("--capture-window")) { - return capture(QStringLiteral("CaptureWindow"), args.at(2), args.at(3), out, err); - } - - err << "usage:\n" - << " " << args.value(0) << " OUTPUT_NAME OUTPUT_FILE\n" - << " " << args.value(0) << " --capture-screen OUTPUT_NAME OUTPUT_FILE\n" - << " " << args.value(0) << " --capture-window INTERNAL_ID OUTPUT_FILE\n" - << " " << args.value(0) << " --list-windows\n"; - return 64; -} - -#include "kwin_capture_screen.moc" diff --git a/org.codex.reforger-kwin-capture.desktop b/org.codex.reforger-kwin-capture.desktop deleted file mode 100644 index 2afa608..0000000 --- a/org.codex.reforger-kwin-capture.desktop +++ /dev/null @@ -1,9 +0,0 @@ -[Desktop Entry] -Type=Application -Name=Reforger KWin Capture Helper -Comment=Authorized KWin screenshot helper for the Reforger queue reader -Exec=/usr/local/bin/reforger-kwin-capture -NoDisplay=true -Terminal=false -Categories=Utility; -X-KDE-DBUS-Restricted-Interfaces=org.kde.KWin.ScreenShot2 diff --git a/pyproject.toml b/pyproject.toml index 836bb01..5ece28f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "anti-prestige-tool" -version = "0.1.0" -description = "Add your description here" +version = "0.7.5" +description = "Arma Reforger queue-position reader for Linux Wayland" readme = "README.md" requires-python = ">=3.10" dependencies = [] diff --git a/reforger_queue/__init__.py b/reforger_queue/__init__.py new file mode 100644 index 0000000..b6c7ef8 --- /dev/null +++ b/reforger_queue/__init__.py @@ -0,0 +1 @@ +"""Queue reader internals for anti-prestige-tool.""" diff --git a/reforger_queue/cli.py b/reforger_queue/cli.py new file mode 100644 index 0000000..b036319 --- /dev/null +++ b/reforger_queue/cli.py @@ -0,0 +1,234 @@ +import argparse +import os +import tempfile + +from .config import ( + DEFAULT_POINTSIZE, + DEFAULT_PORTAL_RESTORE_TOKEN, + DEFAULT_REFERENCE_CROP, + DEFAULT_REFERENCE_SIZE, + DEFAULT_TEMPLATE_SET, +) +from .magick import run_bytes +from .ocr import ( + read_queue_number, + resolve_crop, + templates_from_dataset, + write_template_set, +) +from .portal import capture_portal_window + + +def parse_crop(value): + parts = value.replace(",", " ").split() + if len(parts) != 4: + raise argparse.ArgumentTypeError("crop must be four numbers: x,y,width,height") + try: + return tuple(int(part) for part in parts) + except ValueError as exc: + raise argparse.ArgumentTypeError("crop values must be integers") from exc + + +def parse_size(value): + parts = value.lower().replace(",", "x").split("x") + if len(parts) != 2: + raise argparse.ArgumentTypeError("size must be WIDTHxHEIGHT") + try: + return int(parts[0]), int(parts[1]) + except ValueError as exc: + raise argparse.ArgumentTypeError("size values must be integers") from exc + + +def image_paths(directory): + extensions = {".png", ".jpg", ".jpeg", ".webp", ".bmp"} + return [ + os.path.join(directory, name) + for name in sorted(os.listdir(directory)) + if os.path.splitext(name)[1].lower() in extensions + ] + + +def run_dataset(args): + paths = image_paths(args.dataset) + if not paths: + print("no dataset images found") + return 2 + + status = 0 + passed = 0 + failed = 0 + for path in paths: + crop = resolve_crop(path, args.crop, args.reference_crop, args.reference_size, args.scale_mode, args.cropped) + if args.show_crop and crop: + print(f"{path}\tcrop={crop[0]},{crop[1]},{crop[2]},{crop[3]}") + number, details = read_queue_number(path, crop, args.template_set, args.font, args.pointsize, args.cropped) + if not number: + status = 2 + failed += 1 + suffix = "" + if args.expect_filenames: + expected = os.path.splitext(os.path.basename(path))[0] + suffix = f"\texpected={expected}\tFAIL" + print(f"{path}\tno digits found{suffix}") + continue + + suffix = "" + if args.expect_filenames: + expected = os.path.splitext(os.path.basename(path))[0] + if number == expected: + passed += 1 + suffix = f"\texpected={expected}\tPASS" + else: + failed += 1 + status = 1 + suffix = f"\texpected={expected}\tFAIL" + + print(f"{path}\t{number}{suffix}") + if args.debug: + for digit, bbox, area, hamming, iou in details: + print(f" digit={digit} bbox={bbox} area={area} hamming={hamming:.3f} iou={iou:.3f}") + if args.expect_filenames: + print(f"summary: {passed} passed, {failed} failed") + return status + + +def build_template_set(args): + paths = image_paths(args.build_template_set) + if not paths: + print("no template source images found") + return 2 + templates = templates_from_dataset(paths, args.reference_crop, args.reference_size, args.scale_mode, args.cropped) + write_template_set(args.template_output, templates, paths) + print(args.template_output) + return 0 + + +def read_single_image(args): + crop = resolve_crop(args.image, args.crop, args.reference_crop, args.reference_size, args.scale_mode, args.cropped) + if args.show_crop and crop: + print(f"crop={crop[0]},{crop[1]},{crop[2]},{crop[3]}") + number, details = read_queue_number(args.image, crop, args.template_set, args.font, args.pointsize, args.cropped) + if not number: + print("no digits found") + return 2 + print(number) + if args.debug: + for digit, bbox, area, hamming, iou in details: + print(f"digit={digit} bbox={bbox} area={area} hamming={hamming:.3f} iou={iou:.3f}") + return 0 + + +def read_portal_window(args): + fd, image_path = tempfile.mkstemp(prefix="reforger-queue-", suffix=".png") + os.close(fd) + try: + portal_info = capture_portal_window( + image_path, + args.portal_restore_token, + args.portal_reselect, + args.portal_timeout, + ) + if args.save_input: + run_bytes(["magick", image_path, "+repage", args.save_input]) + crop = resolve_crop(image_path, args.crop, args.reference_crop, args.reference_size, args.scale_mode, args.cropped) + if args.show_crop and crop: + print(f"crop={crop[0]},{crop[1]},{crop[2]},{crop[3]}") + number, details = read_queue_number(image_path, crop, args.template_set, args.font, args.pointsize, args.cropped) + finally: + try: + os.unlink(image_path) + except OSError: + pass + + if not number: + print("no digits found") + return 2 + + print(number) + if args.debug: + print(f"portal-window node_id={portal_info.get('node_id', '')} streams={len(portal_info.get('streams', []))}") + if portal_info.get("restore_token") and args.portal_restore_token: + print(f"portal-restore-token {args.portal_restore_token}") + for digit, bbox, area, hamming, iou in details: + print(f"digit={digit} bbox={bbox} area={area} hamming={hamming:.3f} iou={iou:.3f}") + return 0 + + +def main(): + parser = argparse.ArgumentParser(description="Read Arma Reforger queue number from a fixed screen crop.") + parser.add_argument("--image", help="Read from an existing image.") + parser.add_argument("--dataset", help="Read every image in a dataset directory.") + parser.add_argument("--portal-window", action="store_true", help="Capture a user-selected Wayland window.") + parser.add_argument( + "--portal-restore-token", + default=DEFAULT_PORTAL_RESTORE_TOKEN, + help="Path used to store the portal restore token for later runs.", + ) + parser.add_argument( + "--portal-reselect", + action="store_true", + help="Ignore the saved portal restore token and show the portal picker again.", + ) + parser.add_argument( + "--portal-timeout", + type=float, + default=60.0, + help="Seconds to wait for portal selection and the first PipeWire frame.", + ) + parser.add_argument("--cropped", action="store_true", help="Treat input image(s) as already cropped to the queue number.") + parser.add_argument("--crop", type=parse_crop, help="Absolute crop as x,y,width,height.") + parser.add_argument( + "--reference-crop", + type=parse_crop, + default=DEFAULT_REFERENCE_CROP, + help="Reference crop as x,y,width,height, scaled to the input image size when --crop is omitted.", + ) + parser.add_argument( + "--reference-size", + type=parse_size, + default=DEFAULT_REFERENCE_SIZE, + help="Reference screen size for --reference-crop scaling.", + ) + parser.add_argument( + "--scale-mode", + choices=("width", "independent"), + default="width", + help="How to scale --reference-crop. Width mode preserves a 16:9 UI layout and is the default.", + ) + parser.add_argument( + "--template-set", + default=DEFAULT_TEMPLATE_SET, + help="Digit template JSON used by default recognition.", + ) + parser.add_argument("--font", help="Render synthetic digit templates from an ImageMagick font instead of using --template-set.") + parser.add_argument("--pointsize", type=int, default=DEFAULT_POINTSIZE, help="Point size used for --font templates.") + parser.add_argument("--debug", action="store_true", help="Print component and match details.") + parser.add_argument("--show-crop", action="store_true", help="Print the resolved crop used for each full-size image.") + parser.add_argument("--save-input", help="Save the live captured portal input image for debugging.") + parser.add_argument( + "--expect-filenames", + action="store_true", + help="In dataset mode, compare detected values to each image filename stem.", + ) + parser.add_argument("--build-template-set", metavar="DATASET", help=argparse.SUPPRESS) + parser.add_argument("--template-output", default=DEFAULT_TEMPLATE_SET, help=argparse.SUPPRESS) + args = parser.parse_args() + + if args.portal_reselect and not args.portal_window: + parser.error("--portal-reselect can only be used with --portal-window") + if args.save_input and not args.portal_window: + parser.error("--save-input can only be used with --portal-window") + if args.portal_timeout <= 0: + parser.error("--portal-timeout must be positive") + + selected_modes = [args.image, args.dataset, args.build_template_set, args.portal_window] + if sum(bool(value) for value in selected_modes) != 1: + parser.error("choose exactly one of --image, --dataset, --portal-window, or --build-template-set") + + if args.build_template_set: + return build_template_set(args) + if args.dataset: + return run_dataset(args) + if args.image: + return read_single_image(args) + return read_portal_window(args) diff --git a/reforger_queue/config.py b/reforger_queue/config.py new file mode 100644 index 0000000..57e6ee7 --- /dev/null +++ b/reforger_queue/config.py @@ -0,0 +1,19 @@ +import os + + +PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +DEFAULT_REFERENCE_SIZE = (2560, 1440) +DEFAULT_REFERENCE_CROP = (1050, 620, 100, 60) +DEFAULT_TEMPLATE_SET = os.path.join(PROJECT_DIR, "templates", "reforger_digits.json") +DEFAULT_POINTSIZE = 43 +DEFAULT_PORTAL_RESTORE_TOKEN = os.path.expanduser( + "~/.local/state/anti-prestige-tool/portal-window-restore-token" +) +AUTO_FONT_CANDIDATES = ( + "Roboto-Condensed", + "RobotoCondensed-Regular", + "Roboto-Condensed-Regular", + "Roboto", + "Adwaita-Sans", + "Adwaita-Sans-Bold", +) diff --git a/reforger_queue/magick.py b/reforger_queue/magick.py new file mode 100644 index 0000000..2175d82 --- /dev/null +++ b/reforger_queue/magick.py @@ -0,0 +1,83 @@ +import re +import subprocess +import sys + + +def run_bytes(args): + try: + return subprocess.run(args, check=True, stdout=subprocess.PIPE).stdout + except FileNotFoundError: + sys.exit(f"missing required command: {args[0]}") + except subprocess.CalledProcessError as exc: + sys.exit(f"command failed ({exc.returncode}): {' '.join(args)}") + + +def run_text(args): + return run_bytes(args).decode("utf-8", "replace") + + +def parse_ppm(data): + idx = 0 + + def next_token(): + nonlocal idx + while idx < len(data) and data[idx] in b" \t\r\n": + idx += 1 + if idx < len(data) and data[idx] == ord("#"): + while idx < len(data) and data[idx] not in b"\r\n": + idx += 1 + return next_token() + start = idx + while idx < len(data) and data[idx] not in b" \t\r\n": + idx += 1 + return data[start:idx] + + magic = next_token() + if magic != b"P6": + sys.exit("ImageMagick did not return binary PPM data") + + width = int(next_token()) + height = int(next_token()) + max_value = int(next_token()) + if max_value != 255: + sys.exit(f"unsupported PPM max value: {max_value}") + + while idx < len(data) and data[idx] in b" \t\r\n": + idx += 1 + + return width, height, data[idx:] + + +def ppm_from_magick(args): + return parse_ppm(run_bytes(["magick", *args, "-alpha", "off", "-depth", "8", "ppm:-"])) + + +def try_ppm_from_magick(args): + command = ["magick", *args, "-alpha", "off", "-depth", "8", "ppm:-"] + try: + result = subprocess.run(command, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + except FileNotFoundError: + sys.exit("missing required command: magick") + + if result.returncode != 0: + return None, result.stderr.decode("utf-8", "replace").strip() + + try: + return parse_ppm(result.stdout), "" + except Exception as exc: + return None, str(exc) + + +def magick_fonts(): + fonts = set() + for line in run_text(["magick", "-list", "font"]).splitlines(): + match = re.match(r"\s*Font:\s*(\S+)", line) + if match: + fonts.add(match.group(1)) + return fonts + + +def image_size(image_path): + output = run_bytes(["magick", "identify", "-format", "%w %h", image_path]).decode("ascii") + width, height = output.split() + return int(width), int(height) diff --git a/reforger_queue/ocr.py b/reforger_queue/ocr.py new file mode 100644 index 0000000..b1be842 --- /dev/null +++ b/reforger_queue/ocr.py @@ -0,0 +1,294 @@ +import json +import os +import sys + +from .config import AUTO_FONT_CANDIDATES +from .magick import image_size, magick_fonts, ppm_from_magick, try_ppm_from_magick + + +def orange_digit_mask(width, height, raw): + mask = [[False] * width for _ in range(height)] + for y in range(height): + row_offset = y * width * 3 + for x in range(width): + offset = row_offset + x * 3 + r, g, b = raw[offset], raw[offset + 1], raw[offset + 2] + mask[y][x] = r >= 115 and 55 <= g <= 205 and b <= 100 and r - g >= 20 + return mask + + +def white_mask(width, height, raw): + mask = [[False] * width for _ in range(height)] + for y in range(height): + row_offset = y * width * 3 + for x in range(width): + offset = row_offset + x * 3 + r, g, b = raw[offset], raw[offset + 1], raw[offset + 2] + mask[y][x] = r >= 120 and g >= 120 and b >= 120 + return mask + + +def connected_components(mask): + height = len(mask) + width = len(mask[0]) if height else 0 + seen = [[False] * width for _ in range(height)] + components = [] + + for y in range(height): + for x in range(width): + if seen[y][x] or not mask[y][x]: + continue + + stack = [(x, y)] + seen[y][x] = True + points = set() + + while stack: + cx, cy = stack.pop() + points.add((cx, cy)) + for ny in range(cy - 1, cy + 2): + for nx in range(cx - 1, cx + 2): + if nx < 0 or ny < 0 or nx >= width or ny >= height: + continue + if seen[ny][nx] or not mask[ny][nx]: + continue + seen[ny][nx] = True + stack.append((nx, ny)) + + xs = [point[0] for point in points] + ys = [point[1] for point in points] + bbox = (min(xs), min(ys), max(xs), max(ys)) + box_width = bbox[2] - bbox[0] + 1 + box_height = bbox[3] - bbox[1] + 1 + + if len(points) >= 40 and box_width >= 5 and box_height >= 15: + components.append({"points": points, "bbox": bbox, "area": len(points)}) + + components.sort(key=lambda component: component["bbox"][0]) + return components + + +def normalize(component, width=24, height=36): + x1, y1, x2, y2 = component["bbox"] + source_width = x2 - x1 + 1 + source_height = y2 - y1 + 1 + points = component["points"] + output = [] + + for y in range(height): + source_y = y1 + int((y + 0.5) * source_height / height) + row = [] + for x in range(width): + source_x = x1 + int((x + 0.5) * source_width / width) + row.append((source_x, source_y) in points) + output.append(row) + + return output + + +def template_distance(left, right): + total = len(left) * len(left[0]) + differences = 0 + intersection = 0 + union = 0 + + for left_row, right_row in zip(left, right): + for left_value, right_value in zip(left_row, right_row): + differences += left_value != right_value + intersection += left_value and right_value + union += left_value or right_value + + hamming = differences / total + iou = intersection / union if union else 0.0 + return hamming, iou + + +def template_font_candidates(font): + if font != "auto": + return [font] + + available = magick_fonts() + candidates = [candidate for candidate in AUTO_FONT_CANDIDATES if candidate in available] + return [*candidates, None] + + +def build_template_args(font, pointsize): + args = ["-background", "black", "-fill", "white"] + if font: + args.extend(["-font", font]) + args.extend(["-pointsize", str(pointsize), "label:0123456789"]) + return args + + +def build_templates(font, pointsize): + errors = [] + for candidate in template_font_candidates(font): + ppm, error = try_ppm_from_magick(build_template_args(candidate, pointsize)) + if ppm is None: + errors.append(f"{candidate or 'ImageMagick default'}: {error}") + continue + + width, height, raw = ppm + components = connected_components(white_mask(width, height, raw)) + if len(components) < 10: + errors.append(f"{candidate or 'ImageMagick default'}: rendered only {len(components)} digit components") + continue + return {str(index): [normalize(component)] for index, component in enumerate(components[:10])} + + if font == "auto": + sys.exit("could not render digit templates with any available ImageMagick font:\n " + "\n ".join(errors)) + sys.exit( + f"could not render digit templates with font {font!r}. " + "Install that font, use --font auto, or pass another ImageMagick font name." + ) + + +def validate_template_grid(grid, digit, sample_index, path): + if not isinstance(grid, list) or len(grid) != 36: + sys.exit(f"invalid template grid for digit {digit} sample {sample_index} in {path}: expected 36 rows") + for row in grid: + if not isinstance(row, list) or len(row) != 24 or any(not isinstance(value, bool) for value in row): + sys.exit( + f"invalid template grid for digit {digit} sample {sample_index} in {path}: " + "expected 24 boolean columns per row" + ) + + +def load_template_set(path): + try: + with open(path, "r", encoding="utf-8") as handle: + payload = json.load(handle) + except FileNotFoundError: + sys.exit(f"missing digit template set {path}; regenerate it from the regression dataset") + except json.JSONDecodeError as exc: + sys.exit(f"failed to parse digit template set {path}: {exc}") + + digits = payload.get("digits") if isinstance(payload, dict) else None + if not isinstance(digits, dict): + sys.exit(f"invalid digit template set {path}: missing digits object") + + templates = {} + for digit in "0123456789": + samples = digits.get(digit) + if not isinstance(samples, list) or not samples: + sys.exit(f"invalid digit template set {path}: missing samples for digit {digit}") + templates[digit] = [] + for index, grid in enumerate(samples): + validate_template_grid(grid, digit, index, path) + templates[digit].append(grid) + + return templates + + +def templates_from_dataset(paths, reference_crop, reference_size, scale_mode, cropped): + templates = {str(digit): [] for digit in range(10)} + for path in paths: + expected = os.path.splitext(os.path.basename(path))[0] + if not expected.isdigit(): + sys.exit(f"template source filename must be numeric: {path}") + + crop = resolve_crop(path, None, reference_crop, reference_size, scale_mode, cropped) + width, height, raw = load_queue_image(path, crop, cropped) + components = connected_components(orange_digit_mask(width, height, raw)) + if len(components) != len(expected): + sys.exit( + f"template source {path} produced {len(components)} digit components, " + f"but filename expects {len(expected)}" + ) + + for digit, component in zip(expected, components): + templates[digit].append(normalize(component)) + + missing = [digit for digit, samples in templates.items() if not samples] + if missing: + sys.exit(f"template dataset has no samples for digits: {', '.join(missing)}") + return templates + + +def write_template_set(output, templates, source_paths): + payload = { + "version": 1, + "normalize_size": [24, 36], + "source": "regression digit crops", + "source_files": [os.path.basename(path) for path in source_paths], + "digits": templates, + } + directory = os.path.dirname(output) + if directory: + os.makedirs(directory, exist_ok=True) + temp_path = f"{output}.tmp" + with open(temp_path, "w", encoding="utf-8") as handle: + json.dump(payload, handle, separators=(",", ":")) + handle.write("\n") + os.replace(temp_path, output) + + +def resolve_templates(template_set, font, pointsize): + if font: + return build_templates(font, pointsize) + return load_template_set(template_set) + + +def classify(component, templates): + sample = normalize(component) + ranked = [] + for digit, digit_templates in templates.items(): + for template in digit_templates: + hamming, iou = template_distance(sample, template) + ranked.append((hamming, -iou, digit, iou)) + ranked.sort() + hamming, negative_iou, digit, iou = ranked[0] + return digit, hamming, iou + + +def load_queue_image(image_path, crop, already_cropped=False): + if already_cropped: + return ppm_from_magick([image_path, "+repage"]) + + x, y, width, height = crop + return ppm_from_magick([image_path, "+repage", "-crop", f"{width}x{height}+{x}+{y}", "+repage"]) + + +def read_queue_number(image_path, crop, template_set, font, pointsize, already_cropped=False): + width, height, raw = load_queue_image(image_path, crop, already_cropped) + components = connected_components(orange_digit_mask(width, height, raw)) + templates = resolve_templates(template_set, font, pointsize) + + digits = [] + details = [] + for component in components: + digit, hamming, iou = classify(component, templates) + digits.append(digit) + details.append((digit, component["bbox"], component["area"], hamming, iou)) + + return "".join(digits), details + + +def scale_crop(reference_crop, reference_size, target_size, scale_mode): + reference_width, reference_height = reference_size + target_width, target_height = target_size + x, y, width, height = reference_crop + + if scale_mode == "width": + x_scale = target_width / reference_width + y_scale = x_scale + elif scale_mode == "independent": + x_scale = target_width / reference_width + y_scale = target_height / reference_height + else: + raise ValueError(f"unsupported scale mode: {scale_mode}") + + return ( + round(x * x_scale), + round(y * y_scale), + max(1, round(width * x_scale)), + max(1, round(height * y_scale)), + ) + + +def resolve_crop(image_path, explicit_crop, reference_crop, reference_size, scale_mode, already_cropped): + if already_cropped: + return None + if explicit_crop: + return explicit_crop + return scale_crop(reference_crop, reference_size, image_size(image_path), scale_mode) diff --git a/reforger_queue/portal.py b/reforger_queue/portal.py new file mode 100644 index 0000000..c4c3f06 --- /dev/null +++ b/reforger_queue/portal.py @@ -0,0 +1,75 @@ +import json +import os +import subprocess +import sys + +from .config import PROJECT_DIR + + +def portal_helper_path(): + return os.path.join(PROJECT_DIR, "portal_capture_frame") + + +def require_portal_helper(): + helper = portal_helper_path() + if not os.path.exists(helper): + sys.exit(f"missing {helper}; run `make` in {PROJECT_DIR}") + if not os.access(helper, os.X_OK): + sys.exit(f"portal helper is not executable: {helper}") + return helper + + +def read_portal_restore_token(path): + try: + with open(path, "r", encoding="utf-8") as handle: + return handle.read().strip() + except FileNotFoundError: + return "" + except OSError as exc: + sys.exit(f"failed to read portal restore token {path}: {exc}") + + +def write_portal_restore_token(path, token): + directory = os.path.dirname(path) + if directory: + os.makedirs(directory, exist_ok=True) + temp_path = f"{path}.tmp" + try: + with open(temp_path, "w", encoding="utf-8") as handle: + handle.write(token) + handle.write("\n") + os.replace(temp_path, path) + except OSError as exc: + sys.exit(f"failed to write portal restore token {path}: {exc}") + + +def capture_portal_window(output, restore_token_path, reselect, timeout): + helper = require_portal_helper() + restore_token = "" + if restore_token_path and not reselect: + restore_token = read_portal_restore_token(restore_token_path) + + command = [helper, "--window", output, "--timeout", str(timeout)] + if restore_token_path: + command.append("--persist") + if restore_token: + command.extend(["--restore-token", restore_token]) + + try: + result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + except subprocess.CalledProcessError as exc: + message = exc.stderr.strip() if exc.stderr else str(exc) + sys.exit(message) + + capture_info = {} + if result.stdout.strip(): + try: + capture_info = json.loads(result.stdout) + except json.JSONDecodeError as exc: + sys.exit(f"failed to parse portal capture metadata: {exc}: {result.stdout!r}") + + new_restore_token = str(capture_info.get("restore_token", "") or "") + if restore_token_path and new_restore_token: + write_portal_restore_token(restore_token_path, new_restore_token) + + return capture_info diff --git a/reforger_queue_read.py b/reforger_queue_read.py index c7a921d..1bd0ce2 100644 --- a/reforger_queue_read.py +++ b/reforger_queue_read.py @@ -1,1674 +1,5 @@ #!/usr/bin/env python3 -import argparse -import base64 -import hashlib -import json -import os -import re -import socket -import struct -import subprocess -import sys -import tempfile -import time -import uuid - - -DEFAULT_REFERENCE_SIZE = (2560, 1440) -DEFAULT_REFERENCE_CROP = (1050, 620, 100, 60) -DEFAULT_TEMPLATE_SET = os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates", "reforger_digits.json") -DEFAULT_POINTSIZE = 43 -AUTO_FONT_CANDIDATES = ( - "Roboto-Condensed", - "RobotoCondensed-Regular", - "Roboto-Condensed-Regular", - "Roboto", - "Adwaita-Sans", - "Adwaita-Sans-Bold", -) -SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) -SYSTEM_KWIN_HELPER = "/usr/local/bin/reforger-kwin-capture" -DEFAULT_PORTAL_RESTORE_TOKEN = os.path.expanduser( - "~/.local/state/anti-prestige-tool/portal-window-restore-token" -) -OBS_WEBSOCKET_CONFIG = os.path.expanduser("~/.config/obs-studio/plugin_config/obs-websocket/config.json") -OBS_DEFAULT_HOST = "127.0.0.1" -OBS_DEFAULT_PORT = 4455 -OBS_DEFAULT_SCENE = "APT" -OBS_DEFAULT_SOURCE = "APT Capture" -OBS_CANVAS_SIZE = (1920, 1080) -OBS_PIPEWIRE_INPUT_KIND = "pipewire-screen-capture-source" -OBS_RESELECT_TIMEOUT = 30.0 - - -def run_bytes(args): - try: - return subprocess.run(args, check=True, stdout=subprocess.PIPE).stdout - except FileNotFoundError: - sys.exit(f"missing required command: {args[0]}") - except subprocess.CalledProcessError as exc: - sys.exit(f"command failed ({exc.returncode}): {' '.join(args)}") - - -def run_text(args): - return run_bytes(args).decode("utf-8", "replace") - - -def obs_is_running(): - return subprocess.run( - ["pgrep", "-x", "obs"], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ).returncode == 0 - - -def tcp_port_open(host, port, timeout=0.25): - try: - with socket.create_connection((host, port), timeout=timeout): - return True - except OSError: - return False - - -def read_obs_websocket_config(path=OBS_WEBSOCKET_CONFIG): - try: - with open(path, "r", encoding="utf-8") as handle: - return json.load(handle) - except FileNotFoundError: - return {} - except json.JSONDecodeError as exc: - sys.exit(f"failed to parse OBS WebSocket config {path}: {exc}") - - -def write_obs_websocket_config(config, path=OBS_WEBSOCKET_CONFIG): - directory = os.path.dirname(path) - os.makedirs(directory, exist_ok=True) - temp_path = f"{path}.tmp" - with open(temp_path, "w", encoding="utf-8") as handle: - json.dump(config, handle, indent=2) - handle.write("\n") - os.replace(temp_path, path) - - -def ensure_obs_websocket_config_enabled(path=OBS_WEBSOCKET_CONFIG): - config = read_obs_websocket_config(path) - changed = not config.get("server_enabled", False) - - config.setdefault("alerts_enabled", False) - config.setdefault("auth_required", True) - config.setdefault("first_load", False) - config.setdefault("server_port", OBS_DEFAULT_PORT) - config["server_enabled"] = True - - if config.get("auth_required", True) and not config.get("server_password"): - sys.exit(f"OBS WebSocket auth is enabled but {path} has no server_password") - - if changed: - write_obs_websocket_config(config, path) - - return config, changed - - -def launch_obs(scene): - subprocess.Popen( - ["obs", "--scene", scene, "--minimize-to-tray"], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - start_new_session=True, - ) - - -def wait_for_port(host, port, timeout=15.0): - deadline = time.monotonic() + timeout - while time.monotonic() < deadline: - if tcp_port_open(host, port, timeout=0.25): - return True - time.sleep(0.25) - return False - - -def prepare_obs(host, port, scene, config_path=OBS_WEBSOCKET_CONFIG): - running = obs_is_running() - config, config_changed = ensure_obs_websocket_config_enabled(config_path) - effective_port = int(config.get("server_port", port)) - - if running and not tcp_port_open(host, effective_port): - if config_changed: - sys.exit( - "Enabled OBS WebSocket in the OBS config, but OBS is already running. " - "Restart OBS, then rerun this command." - ) - sys.exit( - f"OBS is running, but OBS WebSocket is not listening on {host}:{effective_port}. " - "Enable OBS WebSocket or restart OBS." - ) - - if not running: - launch_obs(scene) - if not wait_for_port(host, effective_port): - sys.exit(f"started OBS, but OBS WebSocket did not become available on {host}:{effective_port}") - - return config, effective_port - - -class ObsWebSocketClient: - def __init__(self, host, port, password=None, timeout=5.0): - self.host = host - self.port = port - self.password = password or "" - self.timeout = timeout - self.sock = None - - def __enter__(self): - self.connect() - return self - - def __exit__(self, exc_type, exc, traceback): - self.close() - - def connect(self): - self.sock = socket.create_connection((self.host, self.port), timeout=self.timeout) - self._http_upgrade() - hello = self.recv_json() - if hello.get("op") != 0: - raise RuntimeError(f"expected OBS Hello, got: {hello}") - - identify = {"rpcVersion": 1, "eventSubscriptions": 0} - auth = hello.get("d", {}).get("authentication") - if auth: - identify["authentication"] = self.auth_response(auth) - - self.send_json({"op": 1, "d": identify}) - identified = self.recv_json() - if identified.get("op") != 2: - raise RuntimeError(f"OBS identification failed: {identified}") - - def close(self): - if self.sock: - try: - self.sock.close() - finally: - self.sock = None - - def _http_upgrade(self): - key = base64.b64encode(os.urandom(16)).decode("ascii") - request = ( - f"GET / HTTP/1.1\r\n" - f"Host: {self.host}:{self.port}\r\n" - "Upgrade: websocket\r\n" - "Connection: Upgrade\r\n" - f"Sec-WebSocket-Key: {key}\r\n" - "Sec-WebSocket-Version: 13\r\n" - "Sec-WebSocket-Protocol: obswebsocket.json\r\n" - "\r\n" - ) - self.sock.sendall(request.encode("ascii")) - response = b"" - while b"\r\n\r\n" not in response: - chunk = self.sock.recv(4096) - if not chunk: - raise RuntimeError("OBS WebSocket closed during HTTP upgrade") - response += chunk - status_line = response.split(b"\r\n", 1)[0] - if b" 101 " not in status_line: - raise RuntimeError(f"OBS WebSocket upgrade failed: {status_line.decode('ascii', 'replace')}") - - def auth_response(self, auth): - salt = auth["salt"] - challenge = auth["challenge"] - secret = base64.b64encode(hashlib.sha256((self.password + salt).encode("utf-8")).digest()).decode("ascii") - return base64.b64encode(hashlib.sha256((secret + challenge).encode("utf-8")).digest()).decode("ascii") - - def read_exact(self, length): - chunks = [] - remaining = length - while remaining: - chunk = self.sock.recv(remaining) - if not chunk: - raise RuntimeError("OBS WebSocket closed unexpectedly") - chunks.append(chunk) - remaining -= len(chunk) - return b"".join(chunks) - - def recv_frame(self): - header = self.read_exact(2) - first, second = header - opcode = first & 0x0F - masked = bool(second & 0x80) - length = second & 0x7F - if length == 126: - length = struct.unpack("!H", self.read_exact(2))[0] - elif length == 127: - length = struct.unpack("!Q", self.read_exact(8))[0] - mask = self.read_exact(4) if masked else None - payload = self.read_exact(length) if length else b"" - if mask: - payload = bytes(byte ^ mask[index % 4] for index, byte in enumerate(payload)) - return opcode, payload - - def send_frame(self, opcode, payload): - if isinstance(payload, str): - payload = payload.encode("utf-8") - mask = os.urandom(4) - length = len(payload) - if length < 126: - header = struct.pack("!BB", 0x80 | opcode, 0x80 | length) - elif length <= 0xFFFF: - header = struct.pack("!BBH", 0x80 | opcode, 0x80 | 126, length) - else: - header = struct.pack("!BBQ", 0x80 | opcode, 0x80 | 127, length) - masked_payload = bytes(byte ^ mask[index % 4] for index, byte in enumerate(payload)) - self.sock.sendall(header + mask + masked_payload) - - def recv_json(self): - while True: - opcode, payload = self.recv_frame() - if opcode == 1: - return json.loads(payload.decode("utf-8")) - if opcode == 8: - raise RuntimeError("OBS WebSocket closed the connection") - if opcode == 9: - self.send_frame(10, payload) - - def send_json(self, message): - self.send_frame(1, json.dumps(message, separators=(",", ":"))) - - def request(self, request_type, request_data=None): - request_id = str(uuid.uuid4()) - message = { - "op": 6, - "d": { - "requestType": request_type, - "requestId": request_id, - }, - } - if request_data is not None: - message["d"]["requestData"] = request_data - self.send_json(message) - - while True: - response = self.recv_json() - if response.get("op") != 7: - continue - data = response.get("d", {}) - if data.get("requestId") != request_id: - continue - status = data.get("requestStatus", {}) - if not status.get("result"): - comment = status.get("comment", "no error details") - raise RuntimeError(f"OBS request {request_type} failed: {comment}") - return data.get("responseData", {}) - - -def decode_obs_image_data(image_data): - if "," in image_data: - image_data = image_data.split(",", 1)[1] - return base64.b64decode(image_data) - - -def save_obs_scene_screenshot(output, scene, host, port, password): - with ObsWebSocketClient(host, port, password=password) as client: - scenes = client.request("GetSceneList").get("scenes", []) - scene_names = {scene_info.get("sceneName") for scene_info in scenes} - if scene not in scene_names: - available = ", ".join(sorted(name for name in scene_names if name)) - sys.exit(f"OBS scene {scene!r} was not found. Available scenes: {available}") - - data = client.request( - "GetSourceScreenshot", - { - "sourceName": scene, - "imageFormat": "png", - }, - ) - image_bytes = decode_obs_image_data(data["imageData"]) - with open(output, "wb") as handle: - handle.write(image_bytes) - - -def scene_names(client): - return {scene.get("sceneName") for scene in client.request("GetSceneList").get("scenes", [])} - - -def ensure_obs_scene_exists(client, scene): - if scene not in scene_names(client): - client.request("CreateScene", {"sceneName": scene}) - - -def ensure_obs_video_settings(client): - settings = client.request("GetVideoSettings") - width, height = OBS_CANVAS_SIZE - if ( - settings.get("baseWidth") == width - and settings.get("baseHeight") == height - and settings.get("outputWidth") == width - and settings.get("outputHeight") == height - ): - return - - client.request( - "SetVideoSettings", - { - "baseWidth": width, - "baseHeight": height, - "outputWidth": width, - "outputHeight": height, - "fpsNumerator": settings.get("fpsNumerator", 60), - "fpsDenominator": settings.get("fpsDenominator", 1), - }, - ) - - -def get_scene_items(client, scene): - return client.request("GetSceneItemList", {"sceneName": scene}).get("sceneItems", []) - - -def find_pipewire_scene_item(client, scene): - fallback = None - for item in get_scene_items(client, scene): - if item.get("inputKind") != OBS_PIPEWIRE_INPUT_KIND: - continue - if fallback is None: - fallback = item - if item.get("sceneItemEnabled", True): - return item - return fallback - - -def disable_pipewire_scene_items(client, scene): - for item in get_scene_items(client, scene): - if item.get("inputKind") == OBS_PIPEWIRE_INPUT_KIND: - client.request( - "SetSceneItemEnabled", - { - "sceneName": scene, - "sceneItemId": item["sceneItemId"], - "sceneItemEnabled": False, - }, - ) - - -def existing_input_names(client): - return {item.get("inputName") for item in client.request("GetInputList").get("inputs", [])} - - -def unique_input_name(client, preferred): - names = existing_input_names(client) - if preferred not in names: - return preferred - - for index in range(2, 100): - candidate = f"{preferred} {index}" - if candidate not in names: - return candidate - - return f"{preferred} {int(time.time())}" - - -def create_pipewire_scene_item(client, scene, source_name): - source_name = unique_input_name(client, source_name) - response = client.request( - "CreateInput", - { - "sceneName": scene, - "inputName": source_name, - "inputKind": OBS_PIPEWIRE_INPUT_KIND, - "inputSettings": {"ShowCursor": False}, - "sceneItemEnabled": True, - }, - ) - item_id = response.get("sceneItemId") - if item_id is None: - item_id = client.request("GetSceneItemId", {"sceneName": scene, "sourceName": source_name})["sceneItemId"] - return {"sceneItemId": item_id, "sourceName": source_name} - - -def normalize_obs_scene_item_transform(client, scene, item_id): - width, height = OBS_CANVAS_SIZE - client.request( - "SetSceneItemTransform", - { - "sceneName": scene, - "sceneItemId": item_id, - "sceneItemTransform": { - "alignment": 5, - "positionX": 0.0, - "positionY": 0.0, - "rotation": 0.0, - "scaleX": 1.0, - "scaleY": 1.0, - "boundsType": "OBS_BOUNDS_SCALE_INNER", - "boundsAlignment": 0, - "boundsWidth": float(width), - "boundsHeight": float(height), - "cropToBounds": False, - "cropLeft": 0, - "cropTop": 0, - "cropRight": 0, - "cropBottom": 0, - }, - }, - ) - - -def ensure_obs_scene_ready(client, scene, source_name, force_reselect=False): - ensure_obs_scene_exists(client, scene) - ensure_obs_video_settings(client) - client.request("SetCurrentProgramScene", {"sceneName": scene}) - - if force_reselect: - disable_pipewire_scene_items(client, scene) - item = create_pipewire_scene_item(client, scene, source_name) - else: - item = find_pipewire_scene_item(client, scene) - if item is None: - item = create_pipewire_scene_item(client, scene, source_name) - - item_id = item["sceneItemId"] - source = item["sourceName"] - client.request("SetSceneItemEnabled", {"sceneName": scene, "sceneItemId": item_id, "sceneItemEnabled": True}) - normalize_obs_scene_item_transform(client, scene, item_id) - - active = client.request("GetSourceActive", {"sourceName": source}) - if not active.get("videoActive") or not active.get("videoShowing"): - sys.exit( - f"OBS source {source!r} is not active. Open OBS, select scene {scene!r}, " - "and reselect the Arma Reforger window for the PipeWire source." - ) - - return source - - -def write_obs_scene_screenshot(client, output, scene): - data = client.request( - "GetSourceScreenshot", - { - "sourceName": scene, - "imageFormat": "png", - }, - ) - image_bytes = decode_obs_image_data(data["imageData"]) - with open(output, "wb") as handle: - handle.write(image_bytes) - - -def save_obs_scene_screenshot_with_setup( - output, - scene, - source_name, - host, - port, - password, - force_reselect=False, - wait_for_nonblank=False, - timeout=OBS_RESELECT_TIMEOUT, -): - with ObsWebSocketClient(host, port, password=password) as client: - ensure_obs_scene_ready(client, scene, source_name, force_reselect=force_reselect) - deadline = time.monotonic() + timeout - while True: - write_obs_scene_screenshot(client, output, scene) - if not wait_for_nonblank or not image_is_blank(output): - return - if time.monotonic() >= deadline: - return - time.sleep(0.5) - - -def parse_ppm(data): - idx = 0 - - def next_token(): - nonlocal idx - while idx < len(data) and data[idx] in b" \t\r\n": - idx += 1 - if idx < len(data) and data[idx] == ord("#"): - while idx < len(data) and data[idx] not in b"\r\n": - idx += 1 - return next_token() - start = idx - while idx < len(data) and data[idx] not in b" \t\r\n": - idx += 1 - return data[start:idx] - - magic = next_token() - if magic != b"P6": - sys.exit("ImageMagick did not return binary PPM data") - - width = int(next_token()) - height = int(next_token()) - max_value = int(next_token()) - if max_value != 255: - sys.exit(f"unsupported PPM max value: {max_value}") - - while idx < len(data) and data[idx] in b" \t\r\n": - idx += 1 - - return width, height, data[idx:] - - -def ppm_from_magick(args): - return parse_ppm(run_bytes(["magick", *args, "-alpha", "off", "-depth", "8", "ppm:-"])) - - -def try_ppm_from_magick(args): - command = ["magick", *args, "-alpha", "off", "-depth", "8", "ppm:-"] - try: - result = subprocess.run(command, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - except FileNotFoundError: - sys.exit("missing required command: magick") - - if result.returncode != 0: - return None, result.stderr.decode("utf-8", "replace").strip() - - try: - return parse_ppm(result.stdout), "" - except Exception as exc: - return None, str(exc) - - -def magick_fonts(): - fonts = set() - for line in run_text(["magick", "-list", "font"]).splitlines(): - match = re.match(r"\s*Font:\s*(\S+)", line) - if match: - fonts.add(match.group(1)) - return fonts - - -def template_font_candidates(font): - if font != "auto": - return [font] - - available = magick_fonts() - candidates = [candidate for candidate in AUTO_FONT_CANDIDATES if candidate in available] - return [*candidates, None] - - -def image_size(image_path): - output = run_bytes(["magick", "identify", "-format", "%w %h", image_path]).decode("ascii") - width, height = output.split() - return int(width), int(height) - - -def image_is_blank(image_path, bright_threshold=12, min_nonblank_fraction=0.001): - width, height, raw = ppm_from_magick([image_path, "+repage", "-resize", "160x90!"]) - if not width or not height: - return True - - nonblank = 0 - total = width * height - for offset in range(0, len(raw), 3): - if max(raw[offset], raw[offset + 1], raw[offset + 2]) > bright_threshold: - nonblank += 1 - - return (nonblank / total) < min_nonblank_fraction - - -def orange_digit_mask(width, height, raw): - mask = [[False] * width for _ in range(height)] - for y in range(height): - row_offset = y * width * 3 - for x in range(width): - offset = row_offset + x * 3 - r, g, b = raw[offset], raw[offset + 1], raw[offset + 2] - mask[y][x] = r >= 115 and 55 <= g <= 205 and b <= 100 and r - g >= 20 - return mask - - -def white_mask(width, height, raw): - mask = [[False] * width for _ in range(height)] - for y in range(height): - row_offset = y * width * 3 - for x in range(width): - offset = row_offset + x * 3 - r, g, b = raw[offset], raw[offset + 1], raw[offset + 2] - mask[y][x] = r >= 120 and g >= 120 and b >= 120 - return mask - - -def connected_components(mask): - height = len(mask) - width = len(mask[0]) if height else 0 - seen = [[False] * width for _ in range(height)] - components = [] - - for y in range(height): - for x in range(width): - if seen[y][x] or not mask[y][x]: - continue - - stack = [(x, y)] - seen[y][x] = True - points = set() - - while stack: - cx, cy = stack.pop() - points.add((cx, cy)) - for ny in range(cy - 1, cy + 2): - for nx in range(cx - 1, cx + 2): - if nx < 0 or ny < 0 or nx >= width or ny >= height: - continue - if seen[ny][nx] or not mask[ny][nx]: - continue - seen[ny][nx] = True - stack.append((nx, ny)) - - xs = [point[0] for point in points] - ys = [point[1] for point in points] - bbox = (min(xs), min(ys), max(xs), max(ys)) - box_width = bbox[2] - bbox[0] + 1 - box_height = bbox[3] - bbox[1] + 1 - - if len(points) >= 40 and box_width >= 5 and box_height >= 15: - components.append({"points": points, "bbox": bbox, "area": len(points)}) - - components.sort(key=lambda component: component["bbox"][0]) - return components - - -def normalize(component, width=24, height=36): - x1, y1, x2, y2 = component["bbox"] - source_width = x2 - x1 + 1 - source_height = y2 - y1 + 1 - points = component["points"] - output = [] - - for y in range(height): - source_y = y1 + int((y + 0.5) * source_height / height) - row = [] - for x in range(width): - source_x = x1 + int((x + 0.5) * source_width / width) - row.append((source_x, source_y) in points) - output.append(row) - - return output - - -def template_distance(left, right): - total = len(left) * len(left[0]) - differences = 0 - intersection = 0 - union = 0 - - for left_row, right_row in zip(left, right): - for left_value, right_value in zip(left_row, right_row): - differences += left_value != right_value - intersection += left_value and right_value - union += left_value or right_value - - hamming = differences / total - iou = intersection / union if union else 0.0 - return hamming, iou - - -def build_template_args(font, pointsize): - args = [ - "-background", - "black", - "-fill", - "white", - ] - if font: - args.extend(["-font", font]) - args.extend( - [ - "-pointsize", - str(pointsize), - "label:0123456789", - ] - ) - return args - - -def build_templates(font, pointsize): - errors = [] - for candidate in template_font_candidates(font): - ppm, error = try_ppm_from_magick(build_template_args(candidate, pointsize)) - if ppm is None: - errors.append(f"{candidate or 'ImageMagick default'}: {error}") - continue - - width, height, raw = ppm - components = connected_components(white_mask(width, height, raw)) - if len(components) < 10: - errors.append(f"{candidate or 'ImageMagick default'}: rendered only {len(components)} digit components") - continue - return {str(index): normalize(component) for index, component in enumerate(components[:10])} - - if font == "auto": - sys.exit("could not render digit templates with any available ImageMagick font:\n " + "\n ".join(errors)) - sys.exit( - f"could not render digit templates with font {font!r}. " - "Install that font, use --font auto, or pass another ImageMagick font name." - ) - - -def validate_template_grid(grid, digit, sample_index, path): - if not isinstance(grid, list) or len(grid) != 36: - sys.exit(f"invalid template grid for digit {digit} sample {sample_index} in {path}: expected 36 rows") - for row in grid: - if not isinstance(row, list) or len(row) != 24 or any(not isinstance(value, bool) for value in row): - sys.exit( - f"invalid template grid for digit {digit} sample {sample_index} in {path}: " - "expected 24 boolean columns per row" - ) - - -def load_template_set(path): - try: - with open(path, "r", encoding="utf-8") as handle: - payload = json.load(handle) - except FileNotFoundError: - sys.exit(f"missing digit template set {path}; regenerate it from the regression dataset") - except json.JSONDecodeError as exc: - sys.exit(f"failed to parse digit template set {path}: {exc}") - - digits = payload.get("digits") if isinstance(payload, dict) else None - if not isinstance(digits, dict): - sys.exit(f"invalid digit template set {path}: missing digits object") - - templates = {} - for digit in "0123456789": - samples = digits.get(digit) - if not isinstance(samples, list) or not samples: - sys.exit(f"invalid digit template set {path}: missing samples for digit {digit}") - templates[digit] = [] - for index, grid in enumerate(samples): - validate_template_grid(grid, digit, index, path) - templates[digit].append(grid) - - return templates - - -def templates_from_dataset(paths, reference_crop, reference_size, scale_mode, cropped): - templates = {str(digit): [] for digit in range(10)} - for path in paths: - expected = os.path.splitext(os.path.basename(path))[0] - if not expected.isdigit(): - sys.exit(f"template source filename must be numeric: {path}") - - crop = resolve_crop(path, None, reference_crop, reference_size, scale_mode, cropped) - width, height, raw = load_queue_image(path, crop, cropped) - components = connected_components(orange_digit_mask(width, height, raw)) - if len(components) != len(expected): - sys.exit( - f"template source {path} produced {len(components)} digit components, " - f"but filename expects {len(expected)}" - ) - - for digit, component in zip(expected, components): - templates[digit].append(normalize(component)) - - missing = [digit for digit, samples in templates.items() if not samples] - if missing: - sys.exit(f"template dataset has no samples for digits: {', '.join(missing)}") - return templates - - -def write_template_set(output, templates, source_paths): - payload = { - "version": 1, - "normalize_size": [24, 36], - "source": "regression digit crops", - "source_files": [os.path.basename(path) for path in source_paths], - "digits": templates, - } - directory = os.path.dirname(output) - if directory: - os.makedirs(directory, exist_ok=True) - temp_path = f"{output}.tmp" - with open(temp_path, "w", encoding="utf-8") as handle: - json.dump(payload, handle, separators=(",", ":")) - handle.write("\n") - os.replace(temp_path, output) - - -def resolve_templates(template_set, font, pointsize): - if font: - return build_templates(font, pointsize) - return load_template_set(template_set) - - -def classify(component, templates): - sample = normalize(component) - ranked = [] - for digit, digit_templates in templates.items(): - if digit_templates and isinstance(digit_templates[0][0], bool): - digit_templates = [digit_templates] - for template in digit_templates: - hamming, iou = template_distance(sample, template) - ranked.append((hamming, -iou, digit, iou)) - ranked.sort() - hamming, negative_iou, digit, iou = ranked[0] - return digit, hamming, iou - - -def helper_path(): - if os.path.exists(SYSTEM_KWIN_HELPER) and os.access(SYSTEM_KWIN_HELPER, os.X_OK): - return SYSTEM_KWIN_HELPER - return os.path.join(SCRIPT_DIR, "kwin_capture_screen") - - -def require_helper(): - helper = helper_path() - if not os.path.exists(helper): - sys.exit(f"missing {helper}; run `make` in {SCRIPT_DIR}") - return helper - - -def run_helper(args, stdout=None, stderr=None): - helper = require_helper() - try: - return subprocess.run([helper, *args], check=True, stdout=stdout, stderr=stderr, text=True) - except subprocess.CalledProcessError as exc: - message = exc.stderr.strip() if exc.stderr else str(exc) - sys.exit(message) - - -def capture_named_screen(output, screen): - helper = require_helper() - subprocess.run( - [helper, "--capture-screen", screen, output], - check=True, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - text=True, - ) - - -def kwin_windows(): - result = run_helper(["--list-windows"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - try: - windows = json.loads(result.stdout) - except json.JSONDecodeError as exc: - sys.exit(f"failed to parse KWin window list JSON: {exc}: {result.stdout!r}") - if not isinstance(windows, list): - sys.exit("KWin window probe returned non-list JSON") - return windows - - -def rect_area(rect): - if not isinstance(rect, dict): - return 0 - return max(0, int(rect.get("width", 0) or 0)) * max(0, int(rect.get("height", 0) or 0)) - - -def window_geometry_text(window): - geometry = window.get("frameGeometry") or {} - x = int(geometry.get("x", 0) or 0) - y = int(geometry.get("y", 0) or 0) - width = int(geometry.get("width", 0) or 0) - height = int(geometry.get("height", 0) or 0) - return f"{x},{y},{width}x{height}" - - -def print_kwin_windows(windows): - print("internalId\tpid\tclass\toutput\tgeometry\tflags\tcaption") - for window in windows: - flags = [] - if window.get("normalWindow"): - flags.append("normal") - if window.get("fullScreen"): - flags.append("fullscreen") - if window.get("minimized"): - flags.append("minimized") - if window.get("hidden"): - flags.append("hidden") - if window.get("skipTaskbar"): - flags.append("skip-taskbar") - print( - f"{window.get('internalId', '')}\t" - f"{window.get('pid', '')}\t" - f"{window.get('resourceClass', '')}\t" - f"{window.get('output', '')}\t" - f"{window_geometry_text(window)}\t" - f"{','.join(flags)}\t" - f"{window.get('caption', '')}" - ) - - -def format_window_candidate(window): - return ( - f"internalId={window.get('internalId', '')} " - f"pid={window.get('pid', '')} " - f"class={window.get('resourceClass', '')!r} " - f"output={window.get('output', '')!r} " - f"geometry={window_geometry_text(window)} " - f"caption={window.get('caption', '')!r}" - ) - - -def print_window_candidates(windows, heading): - print(heading, file=sys.stderr) - candidates = sorted( - windows, - key=lambda window: ( - bool(window.get("normalWindow") or window.get("fullScreen")), - rect_area(window.get("frameGeometry")), - ), - reverse=True, - ) - for window in candidates[:20]: - print(f" {format_window_candidate(window)}", file=sys.stderr) - - -def kwin_auto_score(window): - caption = str(window.get("caption", "") or "") - resource_class = str(window.get("resourceClass", "") or "") - resource_name = str(window.get("resourceName", "") or "") - desktop_file_name = str(window.get("desktopFileName", "") or "") - window_role = str(window.get("windowRole", "") or "") - identity_text = " ".join((resource_class, resource_name, desktop_file_name, window_role)).lower() - caption_text = caption.lower() - - score = 0 - if "steam_app_1874880" in identity_text: - score += 200 - if re.search(r"\b(arma|reforger)\b", identity_text): - score += 120 - if caption_text == "arma reforger": - score += 100 - elif re.search(r"\barma reforger\b", caption_text): - score += 80 - elif re.search(r"\b(arma|reforger)\b", caption_text): - score += 20 - - if window.get("fullScreen"): - score += 15 - if window.get("normalWindow"): - score += 10 - if not window.get("minimized") and not window.get("hidden"): - score += 5 - - return score - - -def auto_kwin_window(windows): - scored = [(kwin_auto_score(window), window) for window in windows] - matches = [(score, window) for score, window in scored if score > 0] - if not matches: - print_window_candidates(windows, "no KWin window matched Arma/Reforger; current candidates:") - sys.exit("no Arma/Reforger KWin window found") - - ranked = sorted( - matches, - key=lambda item: ( - item[0], - bool(item[1].get("normalWindow") or item[1].get("fullScreen")), - not bool(item[1].get("minimized") or item[1].get("hidden")), - rect_area(item[1].get("frameGeometry")), - ), - reverse=True, - ) - top_score_value, top_window = ranked[0] - top_score = ( - top_score_value, - bool(top_window.get("normalWindow") or top_window.get("fullScreen")), - not bool(top_window.get("minimized") or top_window.get("hidden")), - rect_area(top_window.get("frameGeometry")), - ) - ambiguous = [ - window - for score, window in ranked - if ( - score, - bool(window.get("normalWindow") or window.get("fullScreen")), - not bool(window.get("minimized") or window.get("hidden")), - rect_area(window.get("frameGeometry")), - ) - == top_score - ] - if len(ambiguous) > 1: - print_window_candidates(ambiguous, "multiple equally ranked Arma/Reforger KWin windows matched:") - sys.exit("ambiguous Arma/Reforger KWin window match; pass --kwin-window-id or --kwin-window PID") - - return top_window - - -def kwin_window_by_id(windows, internal_id): - matches = [window for window in windows if str(window.get("internalId", "")) == str(internal_id)] - if not matches: - print_window_candidates(windows, f"KWin window id {internal_id!r} was not found; current candidates:") - sys.exit(f"KWin window id {internal_id!r} not found") - if len(matches) > 1: - print_window_candidates(matches, f"KWin window id {internal_id!r} matched more than one window:") - sys.exit(f"KWin window id {internal_id!r} is ambiguous") - return matches[0] - - -def kwin_window_by_pid(windows, pid): - matches = [window for window in windows if str(window.get("pid", "")) == str(pid)] - if not matches: - print_window_candidates(windows, f"KWin window PID {pid!r} was not found; current candidates:") - sys.exit(f"KWin window PID {pid!r} not found") - if len(matches) > 1: - print_window_candidates(matches, f"KWin window PID {pid!r} matched more than one window:") - sys.exit(f"KWin window PID {pid!r} is ambiguous; pass --kwin-window-id") - return matches[0] - - -def resolve_kwin_window_selector(selector): - windows = kwin_windows() - if selector == "auto": - return auto_kwin_window(windows) - if any(str(window.get("internalId", "")) == str(selector) for window in windows): - return kwin_window_by_id(windows, selector) - if str(selector).isdigit(): - return kwin_window_by_pid(windows, selector) - return kwin_window_by_id(windows, selector) - - -def capture_kwin_window(output, selector): - window = resolve_kwin_window_selector(selector) - internal_id = str(window.get("internalId", "")) - if not internal_id: - sys.exit(f"matched KWin window has no internalId: {format_window_candidate(window)}") - geometry = window.get("frameGeometry") or {} - width = int(geometry.get("width", 0) or 0) - height = int(geometry.get("height", 0) or 0) - if width <= 0 or height <= 0: - sys.exit(f"matched KWin window has invalid geometry: {format_window_candidate(window)}") - - fd, raw_path = tempfile.mkstemp(prefix="reforger-kwin-window-", suffix=".raw") - os.close(fd) - try: - result = run_helper(["--capture-window", internal_id, raw_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - capture_info = {} - if result.stdout.strip(): - try: - capture_info = json.loads(result.stdout) - except json.JSONDecodeError as exc: - sys.exit(f"failed to parse KWin CaptureWindow metadata: {exc}: {result.stdout!r}") - width = int(capture_info.get("width", width) or width) - height = int(capture_info.get("height", height) or height) - expected_size = width * height * 4 - raw_size = os.path.getsize(raw_path) - deadline = time.monotonic() + 2.0 - while raw_size != expected_size and time.monotonic() < deadline: - time.sleep(0.05) - raw_size = os.path.getsize(raw_path) - if raw_size == width * height * 4: - run_bytes(["magick", "-size", f"{width}x{height}", "-depth", "8", f"bgra:{raw_path}", output]) - else: - with open(raw_path, "rb") as handle: - header = handle.read(8) - if header.startswith(b"\x89PNG\r\n\x1a\n"): - run_bytes(["magick", raw_path, "+repage", output]) - else: - sys.exit( - f"KWin CaptureWindow returned {raw_size} raw bytes, but window geometry " - f"is {width}x{height} ({expected_size} BGRA bytes expected); " - f"metadata={capture_info}" - ) - finally: - try: - os.unlink(raw_path) - except OSError: - pass - return window - - -def portal_helper_path(): - return os.path.join(SCRIPT_DIR, "portal_capture_frame") - - -def require_portal_helper(): - helper = portal_helper_path() - if not os.path.exists(helper): - sys.exit(f"missing {helper}; run `make` in {SCRIPT_DIR}") - if not os.access(helper, os.X_OK): - sys.exit(f"portal helper is not executable: {helper}") - return helper - - -def read_portal_restore_token(path): - try: - with open(path, "r", encoding="utf-8") as handle: - return handle.read().strip() - except FileNotFoundError: - return "" - except OSError as exc: - sys.exit(f"failed to read portal restore token {path}: {exc}") - - -def write_portal_restore_token(path, token): - directory = os.path.dirname(path) - if directory: - os.makedirs(directory, exist_ok=True) - temp_path = f"{path}.tmp" - try: - with open(temp_path, "w", encoding="utf-8") as handle: - handle.write(token) - handle.write("\n") - os.replace(temp_path, path) - except OSError as exc: - sys.exit(f"failed to write portal restore token {path}: {exc}") - - -def capture_portal_window(output, restore_token_path, reselect, timeout): - helper = require_portal_helper() - restore_token = "" - if restore_token_path and not reselect: - restore_token = read_portal_restore_token(restore_token_path) - - command = [helper, "--window", output, "--timeout", str(timeout)] - if restore_token_path: - command.append("--persist") - if restore_token: - command.extend(["--restore-token", restore_token]) - - try: - result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - except subprocess.CalledProcessError as exc: - message = exc.stderr.strip() if exc.stderr else str(exc) - sys.exit(message) - - capture_info = {} - if result.stdout.strip(): - try: - capture_info = json.loads(result.stdout) - except json.JSONDecodeError as exc: - sys.exit(f"failed to parse portal capture metadata: {exc}: {result.stdout!r}") - - new_restore_token = str(capture_info.get("restore_token", "") or "") - if restore_token_path and new_restore_token: - write_portal_restore_token(restore_token_path, new_restore_token) - - return capture_info - - - -def parse_geometry(geometry): - match = re.match(r"(-?\d+),(-?\d+),(\d+)x(\d+)", geometry) - if not match: - raise ValueError(f"could not parse screen geometry: {geometry}") - return tuple(int(group) for group in match.groups()) - - -def screen_geometry(screen_name): - screens = kwin_screens() - for screen in screens: - if screen.get("name") == screen_name: - if not screen.get("enabled"): - sys.exit(f"screen {screen_name} is disabled") - geometry = screen.get("geometry") - if not geometry: - sys.exit(f"screen {screen_name} has no geometry") - return parse_geometry(geometry) - available = ", ".join(screen.get("name", "unknown") for screen in screens) - sys.exit(f"unknown screen {screen_name}; available screens: {available}") - - -def virtual_desktop_origin(): - enabled_geometries = [ - parse_geometry(screen["geometry"]) - for screen in kwin_screens() - if screen.get("enabled") and screen.get("geometry") - ] - if not enabled_geometries: - return 0, 0 - return min(geometry[0] for geometry in enabled_geometries), min(geometry[1] for geometry in enabled_geometries) - - -def capture_screen_via_fullscreen_crop(output, screen): - x, y, width, height = screen_geometry(screen) - origin_x, origin_y = virtual_desktop_origin() - crop_x = x - origin_x - crop_y = y - origin_y - - fd, full_path = tempfile.mkstemp(prefix="reforger-fullscreen-", suffix=".png") - os.close(fd) - try: - capture_spectacle(full_path, "fullscreen") - run_bytes(["magick", full_path, "-crop", f"{width}x{height}+{crop_x}+{crop_y}", "+repage", output]) - finally: - try: - os.unlink(full_path) - except OSError: - pass - - -def capture_spectacle(output, mode): - if mode == "fullscreen": - capture_arg = "--fullscreen" - elif mode == "current": - capture_arg = "--current" - else: - raise ValueError(f"unsupported capture mode: {mode}") - - subprocess.run( - ["spectacle", "--background", capture_arg, "--nonotify", "--output", output], - check=True, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - - -def capture_screenshot(output, mode, screen=None): - if screen: - try: - capture_named_screen(output, screen) - except subprocess.CalledProcessError: - capture_screen_via_fullscreen_crop(output, screen) - return - capture_spectacle(output, mode) - - -def kwin_screens(): - output = run_bytes(["qdbus6", "org.kde.KWin", "/KWin", "supportInformation"]).decode("utf-8", "replace") - screens = [] - current = None - - for line in output.splitlines(): - if re.match(r"^Screen \d+:", line): - if current: - screens.append(current) - current = {} - continue - if current is None: - continue - - stripped = line.strip() - if stripped.startswith("Name:"): - current["name"] = stripped.split(":", 1)[1].strip() - elif stripped.startswith("Enabled:"): - current["enabled"] = stripped.split(":", 1)[1].strip() == "1" - elif stripped.startswith("Geometry:"): - current["geometry"] = stripped.split(":", 1)[1].strip() - - if current: - screens.append(current) - - return screens - - -def print_screens(): - for screen in kwin_screens(): - enabled = "enabled" if screen.get("enabled") else "disabled" - geometry = screen.get("geometry", "no geometry") - print(f"{screen.get('name', 'unknown')}\t{enabled}\t{geometry}") - - -def load_queue_image(image_path, crop, already_cropped=False): - if already_cropped: - return ppm_from_magick([image_path, "+repage"]) - - x, y, width, height = crop - return ppm_from_magick([image_path, "+repage", "-crop", f"{width}x{height}+{x}+{y}", "+repage"]) - - -def read_queue_number(image_path, crop, template_set, font, pointsize, already_cropped=False): - width, height, raw = load_queue_image(image_path, crop, already_cropped) - components = connected_components(orange_digit_mask(width, height, raw)) - templates = resolve_templates(template_set, font, pointsize) - - digits = [] - details = [] - for component in components: - digit, hamming, iou = classify(component, templates) - digits.append(digit) - details.append((digit, component["bbox"], component["area"], hamming, iou)) - - return "".join(digits), details - - -def parse_crop(value): - parts = value.replace(",", " ").split() - if len(parts) != 4: - raise argparse.ArgumentTypeError("crop must be four numbers: x,y,width,height") - try: - return tuple(int(part) for part in parts) - except ValueError as exc: - raise argparse.ArgumentTypeError("crop values must be integers") from exc - - -def parse_size(value): - parts = value.lower().replace(",", "x").split("x") - if len(parts) != 2: - raise argparse.ArgumentTypeError("size must be WIDTHxHEIGHT") - try: - return int(parts[0]), int(parts[1]) - except ValueError as exc: - raise argparse.ArgumentTypeError("size values must be integers") from exc - - -def scale_crop(reference_crop, reference_size, target_size, scale_mode): - reference_width, reference_height = reference_size - target_width, target_height = target_size - x, y, width, height = reference_crop - - if scale_mode == "width": - x_scale = target_width / reference_width - y_scale = x_scale - elif scale_mode == "independent": - x_scale = target_width / reference_width - y_scale = target_height / reference_height - else: - raise ValueError(f"unsupported scale mode: {scale_mode}") - - return ( - round(x * x_scale), - round(y * y_scale), - max(1, round(width * x_scale)), - max(1, round(height * y_scale)), - ) - - -def resolve_crop(image_path, explicit_crop, reference_crop, reference_size, scale_mode, already_cropped): - if already_cropped: - return None - if explicit_crop: - return explicit_crop - return scale_crop(reference_crop, reference_size, image_size(image_path), scale_mode) - - -def main(): - parser = argparse.ArgumentParser(description="Read Arma Reforger queue number from a fixed screen crop.") - parser.add_argument("--image", help="Read from an existing image.") - parser.add_argument("--dataset", help="Read every image in a dataset directory.") - parser.add_argument( - "--build-template-set", - metavar="DATASET", - help=argparse.SUPPRESS, - ) - parser.add_argument( - "--template-output", - default=DEFAULT_TEMPLATE_SET, - help=argparse.SUPPRESS, - ) - parser.add_argument("--list-screens", action="store_true", help="List KWin output names for use with --screen.") - parser.add_argument( - "--list-kwin-windows", - action="store_true", - help="List KWin-managed windows with internal ids for --kwin-window-id.", - ) - parser.add_argument( - "--kwin-window", - metavar="auto|INTERNAL_ID|PID", - help="Capture a KWin window by internal id or PID, or use 'auto' to match Arma/Reforger.", - ) - parser.add_argument("--kwin-window-id", help="Capture a KWin window by explicit internal id.") - parser.add_argument( - "--obs-scene", - nargs="?", - const=OBS_DEFAULT_SCENE, - help=f"Read from an OBS scene screenshot via obs-websocket. Defaults to {OBS_DEFAULT_SCENE!r}.", - ) - parser.add_argument("--obs-host", default=OBS_DEFAULT_HOST, help="OBS WebSocket host.") - parser.add_argument("--obs-port", type=int, default=OBS_DEFAULT_PORT, help="OBS WebSocket port.") - parser.add_argument("--obs-password", help="OBS WebSocket password. Defaults to the local OBS config password.") - parser.add_argument( - "--obs-source", - default=OBS_DEFAULT_SOURCE, - help="PipeWire capture source name to create if the OBS scene has no PipeWire source.", - ) - parser.add_argument( - "--obs-reselect", - action="store_true", - help="Force a fresh OBS PipeWire source before reading the scene. This cannot choose a Wayland window automatically.", - ) - parser.add_argument( - "--obs-reselect-timeout", - type=float, - default=OBS_RESELECT_TIMEOUT, - help="Seconds to wait for a non-blank OBS frame after forcing source reselection.", - ) - parser.add_argument( - "--obs-config", - default=OBS_WEBSOCKET_CONFIG, - help="Path to OBS WebSocket config. Used for password lookup and enabling the server.", - ) - parser.add_argument( - "--portal-window", - action="store_true", - help="Capture a user-selected Wayland window through xdg-desktop-portal/PipeWire.", - ) - parser.add_argument( - "--portal-restore-token", - default=DEFAULT_PORTAL_RESTORE_TOKEN, - help="Path used to store the portal restore token for later runs.", - ) - parser.add_argument( - "--portal-reselect", - action="store_true", - help="Ignore the saved portal restore token and show the portal picker again.", - ) - parser.add_argument( - "--portal-timeout", - type=float, - default=60.0, - help="Seconds to wait for portal selection and the first PipeWire frame.", - ) - parser.add_argument("--cropped", action="store_true", help="Treat input image(s) as already cropped to the queue number.") - parser.add_argument("--capture", action="store_true", help="Capture the current monitor with Spectacle.") - parser.add_argument("--screen", help="KWin output name to capture explicitly, for example DP-3.") - parser.add_argument( - "--capture-mode", - choices=("current", "fullscreen"), - default="current", - help="Spectacle capture mode. Use current for one monitor, fullscreen for the whole desktop.", - ) - parser.add_argument("--crop", type=parse_crop, help="Absolute crop as x,y,width,height.") - parser.add_argument( - "--reference-crop", - type=parse_crop, - default=DEFAULT_REFERENCE_CROP, - help="Reference crop as x,y,width,height, scaled to the input image size when --crop is omitted.", - ) - parser.add_argument( - "--reference-size", - type=parse_size, - default=DEFAULT_REFERENCE_SIZE, - help="Reference screen size for --reference-crop scaling.", - ) - parser.add_argument( - "--scale-mode", - choices=("width", "independent"), - default="width", - help="How to scale --reference-crop. Width mode preserves a 16:9 UI layout and is the default.", - ) - parser.add_argument( - "--template-set", - default=DEFAULT_TEMPLATE_SET, - help="Digit template JSON used by default recognition.", - ) - parser.add_argument("--font", help="Render synthetic digit templates from an ImageMagick font instead of using --template-set.") - parser.add_argument("--pointsize", type=int, default=DEFAULT_POINTSIZE, help="Point size used for templates.") - parser.add_argument("--debug", action="store_true", help="Print component and match details.") - parser.add_argument("--show-crop", action="store_true", help="Print the resolved crop used for each full-size image.") - parser.add_argument("--save-input", help="Save the live captured/KWin/OBS input image for debugging.") - parser.add_argument( - "--expect-filenames", - action="store_true", - help="In dataset mode, compare detected values to each image filename stem.", - ) - args = parser.parse_args() - - if args.screen and not args.capture: - parser.error("--screen can only be used with --capture") - - if args.kwin_window_id and args.kwin_window: - parser.error("use only one of --kwin-window or --kwin-window-id") - - if args.obs_password and not args.obs_scene: - parser.error("--obs-password can only be used with --obs-scene") - - if args.portal_reselect and not args.portal_window: - parser.error("--portal-reselect can only be used with --portal-window") - - if args.portal_timeout <= 0: - parser.error("--portal-timeout must be positive") - - kwin_window_selector = args.kwin_window_id or args.kwin_window - if sum( - bool(value) - for value in ( - args.image, - args.dataset, - args.build_template_set, - args.capture, - args.list_screens, - args.list_kwin_windows, - args.obs_scene, - kwin_window_selector, - args.portal_window, - ) - ) != 1: - parser.error( - "choose exactly one of --image, --dataset, --capture, --obs-scene, --portal-window, " - "--kwin-window/--kwin-window-id, --list-screens, or --list-kwin-windows" - ) - - if args.list_screens: - print_screens() - return 0 - - if args.list_kwin_windows: - print_kwin_windows(kwin_windows()) - return 0 - - if args.build_template_set: - extensions = {".png", ".jpg", ".jpeg", ".webp", ".bmp"} - paths = [ - os.path.join(args.build_template_set, name) - for name in sorted(os.listdir(args.build_template_set)) - if os.path.splitext(name)[1].lower() in extensions - ] - if not paths: - print("no template source images found") - return 2 - templates = templates_from_dataset(paths, args.reference_crop, args.reference_size, args.scale_mode, args.cropped) - write_template_set(args.template_output, templates, paths) - print(args.template_output) - return 0 - - if args.dataset: - extensions = {".png", ".jpg", ".jpeg", ".webp", ".bmp"} - paths = [ - os.path.join(args.dataset, name) - for name in sorted(os.listdir(args.dataset)) - if os.path.splitext(name)[1].lower() in extensions - ] - if not paths: - print("no dataset images found") - return 2 - - status = 0 - passed = 0 - failed = 0 - for path in paths: - crop = resolve_crop(path, args.crop, args.reference_crop, args.reference_size, args.scale_mode, args.cropped) - if args.show_crop and crop: - print(f"{path}\tcrop={crop[0]},{crop[1]},{crop[2]},{crop[3]}") - number, details = read_queue_number(path, crop, args.template_set, args.font, args.pointsize, args.cropped) - if not number: - status = 2 - failed += 1 - suffix = "" - if args.expect_filenames: - expected = os.path.splitext(os.path.basename(path))[0] - suffix = f"\texpected={expected}\tFAIL" - print(f"{path}\tno digits found{suffix}") - continue - - suffix = "" - if args.expect_filenames: - expected = os.path.splitext(os.path.basename(path))[0] - if number == expected: - passed += 1 - suffix = f"\texpected={expected}\tPASS" - else: - failed += 1 - status = 1 - suffix = f"\texpected={expected}\tFAIL" - - print(f"{path}\t{number}{suffix}") - if args.debug: - for digit, bbox, area, hamming, iou in details: - print(f" digit={digit} bbox={bbox} area={area} hamming={hamming:.3f} iou={iou:.3f}") - if args.expect_filenames: - print(f"summary: {passed} passed, {failed} failed") - return status - - temp_path = None - image_path = args.image - kwin_window = None - portal_info = None - if args.capture or args.obs_scene or kwin_window_selector or args.portal_window: - fd, temp_path = tempfile.mkstemp(prefix="reforger-queue-", suffix=".png") - os.close(fd) - image_path = temp_path - - if args.capture: - capture_screenshot(image_path, args.capture_mode, args.screen) - - if kwin_window_selector: - kwin_window = capture_kwin_window(image_path, kwin_window_selector) - - if args.portal_window: - portal_info = capture_portal_window( - image_path, - args.portal_restore_token, - args.portal_reselect, - args.portal_timeout, - ) - - if args.obs_scene: - config, effective_port = prepare_obs(args.obs_host, args.obs_port, args.obs_scene, args.obs_config) - password = args.obs_password - if password is None and config.get("auth_required", True): - password = config.get("server_password", "") - save_obs_scene_screenshot_with_setup( - image_path, - args.obs_scene, - args.obs_source, - args.obs_host, - effective_port, - password, - force_reselect=args.obs_reselect, - wait_for_nonblank=args.obs_reselect, - timeout=args.obs_reselect_timeout, - ) - if image_is_blank(image_path): - sys.exit( - "OBS scene screenshot is blank. On this OBS/Wayland setup, PipeWire sources " - "only expose RestoreToken/ShowCursor settings over WebSocket, so the CLI cannot " - "select the Arma Reforger process automatically. Use a persistent monitor source " - "or reselect the Arma source in OBS." - ) - - if args.save_input and image_path and (args.capture or args.obs_scene or kwin_window_selector or args.portal_window): - run_bytes(["magick", image_path, "+repage", args.save_input]) - - try: - crop = resolve_crop(image_path, args.crop, args.reference_crop, args.reference_size, args.scale_mode, args.cropped) - if args.show_crop and crop: - print(f"crop={crop[0]},{crop[1]},{crop[2]},{crop[3]}") - number, details = read_queue_number(image_path, crop, args.template_set, args.font, args.pointsize, args.cropped) - finally: - if temp_path: - try: - os.unlink(temp_path) - except OSError: - pass - - if not number: - print("no digits found") - return 2 - - print(number) - if args.debug: - if kwin_window: - print(f"kwin-window {format_window_candidate(kwin_window)}") - if portal_info: - print(f"portal-window node_id={portal_info.get('node_id', '')} streams={len(portal_info.get('streams', []))}") - if portal_info.get("restore_token") and args.portal_restore_token: - print(f"portal-restore-token {args.portal_restore_token}") - for digit, bbox, area, hamming, iou in details: - print(f"digit={digit} bbox={bbox} area={area} hamming={hamming:.3f} iou={iou:.3f}") - - return 0 +from reforger_queue.cli import main if __name__ == "__main__": diff --git a/uv.lock b/uv.lock index 322d41a..7acb917 100644 --- a/uv.lock +++ b/uv.lock @@ -4,5 +4,5 @@ requires-python = ">=3.10" [[package]] name = "anti-prestige-tool" -version = "0.1.0" +version = "0.7.5" source = { virtual = "." }