#!/usr/bin/env python3
"""Capture a screenshot and read the Arma Reforger queue number from it.

Screenshots can come from an OBS scene (via obs-websocket), a KWin helper
binary, an xdg-desktop-portal helper binary, or Spectacle.  Digits are found
as orange connected components and classified against templates rendered
with ImageMagick.
"""
import argparse
import base64
import hashlib
import json
import os
import re
import socket
import struct
import subprocess
import sys
import tempfile
import time
import uuid

DEFAULT_REFERENCE_SIZE = (2560, 1440)
DEFAULT_REFERENCE_CROP = (1050, 620, 100, 60)
DEFAULT_FONT = "Roboto-Condensed"
DEFAULT_POINTSIZE = 43
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
SYSTEM_KWIN_HELPER = "/usr/local/bin/reforger-kwin-capture"
DEFAULT_PORTAL_RESTORE_TOKEN = os.path.expanduser(
    "~/.local/state/anti-prestige-tool/portal-window-restore-token"
)
OBS_WEBSOCKET_CONFIG = os.path.expanduser("~/.config/obs-studio/plugin_config/obs-websocket/config.json")
OBS_DEFAULT_HOST = "127.0.0.1"
OBS_DEFAULT_PORT = 4455
OBS_DEFAULT_SCENE = "APT"
OBS_DEFAULT_SOURCE = "APT Capture"
OBS_CANVAS_SIZE = (1920, 1080)
OBS_PIPEWIRE_INPUT_KIND = "pipewire-screen-capture-source"
OBS_RESELECT_TIMEOUT = 30.0


def run_bytes(args):
    """Run *args* and return its stdout as bytes; exit the process on failure."""
    try:
        return subprocess.run(args, check=True, stdout=subprocess.PIPE).stdout
    except FileNotFoundError:
        sys.exit(f"missing required command: {args[0]}")
    except subprocess.CalledProcessError as exc:
        sys.exit(f"command failed ({exc.returncode}): {' '.join(args)}")


def run_text(args):
    """Run *args* and return its stdout decoded as UTF-8 (invalid bytes replaced)."""
    return run_bytes(args).decode("utf-8", "replace")


def obs_is_running():
    """Return True when ``pgrep -x obs`` finds a process."""
    return subprocess.run(
        ["pgrep", "-x", "obs"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    ).returncode == 0


def tcp_port_open(host, port, timeout=0.25):
    """Return True when a TCP connection to ``host:port`` succeeds within *timeout*."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def read_obs_websocket_config(path=OBS_WEBSOCKET_CONFIG):
    """Load the OBS WebSocket JSON config; a missing file yields ``{}``.

    Exits the process when the file exists but is not valid JSON.
    """
    try:
        with open(path, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError as exc:
        sys.exit(f"failed to parse OBS WebSocket config {path}: {exc}")


def write_obs_websocket_config(config, path=OBS_WEBSOCKET_CONFIG):
    """Write *config* as JSON to *path* through a temp file and ``os.replace``."""
    directory = os.path.dirname(path)
    # A bare filename has no directory part; os.makedirs("") would raise.
    if directory:
        os.makedirs(directory, exist_ok=True)
    temp_path = f"{path}.tmp"
    with open(temp_path, "w", encoding="utf-8") as handle:
        json.dump(config, handle, indent=2)
        handle.write("\n")
    os.replace(temp_path, path)


def ensure_obs_websocket_config_enabled(path=OBS_WEBSOCKET_CONFIG):
    """Make sure the config at *path* has ``server_enabled`` set.

    Returns ``(config, changed)`` where *changed* tells whether the file had to
    be rewritten.  Exits when auth is required but no password is configured.
    """
    config = read_obs_websocket_config(path)
    changed = not config.get("server_enabled", False)
    config.setdefault("alerts_enabled", False)
    config.setdefault("auth_required", True)
    config.setdefault("first_load", False)
    config.setdefault("server_port", OBS_DEFAULT_PORT)
    config["server_enabled"] = True
    if config.get("auth_required", True) and not config.get("server_password"):
        sys.exit(f"OBS WebSocket auth is enabled but {path} has no server_password")
    if changed:
        write_obs_websocket_config(config, path)
    return config, changed


def launch_obs(scene):
    """Start OBS detached in its own session, minimized to tray, on *scene*."""
    subprocess.Popen(
        ["obs", "--scene", scene, "--minimize-to-tray"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        start_new_session=True,
    )


def wait_for_port(host, port, timeout=15.0):
    """Poll ``host:port`` until it accepts a connection or *timeout* elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if tcp_port_open(host, port, timeout=0.25):
            return True
        time.sleep(0.25)
    return False


def prepare_obs(host, port, scene, config_path=OBS_WEBSOCKET_CONFIG):
    """Ensure OBS is running with its WebSocket server reachable.

    Returns ``(config, effective_port)``; the port comes from the config file
    when it defines ``server_port``.  Exits with a message when the server
    cannot be reached.
    """
    running = obs_is_running()
    config, config_changed = ensure_obs_websocket_config_enabled(config_path)
    effective_port = int(config.get("server_port", port))
    if running and not tcp_port_open(host, effective_port):
        if config_changed:
            sys.exit(
                "Enabled OBS WebSocket in the OBS config, but OBS is already running. "
                "Restart OBS, then rerun this command."
            )
        sys.exit(
            f"OBS is running, but OBS WebSocket is not listening on {host}:{effective_port}. "
            "Enable OBS WebSocket or restart OBS."
        )
    if not running:
        launch_obs(scene)
        if not wait_for_port(host, effective_port):
            sys.exit(f"started OBS, but OBS WebSocket did not become available on {host}:{effective_port}")
    return config, effective_port


class ObsWebSocketClient:
    """Minimal blocking obs-websocket client built directly on a TCP socket.

    Usable as a context manager: entering connects and identifies, leaving
    closes the socket.
    """

    def __init__(self, host, port, password=None, timeout=5.0):
        self.host = host
        self.port = port
        self.password = password or ""
        self.timeout = timeout
        self.sock = None
        # Bytes received together with the HTTP upgrade response that already
        # belong to the first WebSocket frame.
        self._pending = b""

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc, traceback):
        self.close()

    def connect(self):
        """Open the socket, upgrade to WebSocket and complete the Hello/Identify exchange.

        Raises RuntimeError when the server does not answer as expected; the
        socket is closed before the error propagates.
        """
        self.sock = socket.create_connection((self.host, self.port), timeout=self.timeout)
        try:
            self._http_upgrade()
            hello = self.recv_json()
            if hello.get("op") != 0:
                raise RuntimeError(f"expected OBS Hello, got: {hello}")
            identify = {"rpcVersion": 1, "eventSubscriptions": 0}
            auth = hello.get("d", {}).get("authentication")
            if auth:
                identify["authentication"] = self.auth_response(auth)
            self.send_json({"op": 1, "d": identify})
            identified = self.recv_json()
            if identified.get("op") != 2:
                raise RuntimeError(f"OBS identification failed: {identified}")
        except BaseException:
            # __exit__ is not called when __enter__ raises, so release the
            # socket here instead of leaking it.
            self.close()
            raise

    def close(self):
        """Close the socket if one is open and forget any buffered bytes."""
        self._pending = b""
        if self.sock:
            try:
                self.sock.close()
            finally:
                self.sock = None

    def _http_upgrade(self):
        """Send the HTTP upgrade request and require a 101 status line."""
        key = base64.b64encode(os.urandom(16)).decode("ascii")
        request = (
            f"GET / HTTP/1.1\r\n"
            f"Host: {self.host}:{self.port}\r\n"
            "Upgrade: websocket\r\n"
            "Connection: Upgrade\r\n"
            f"Sec-WebSocket-Key: {key}\r\n"
            "Sec-WebSocket-Version: 13\r\n"
            "Sec-WebSocket-Protocol: obswebsocket.json\r\n"
            "\r\n"
        )
        self.sock.sendall(request.encode("ascii"))
        response = b""
        while b"\r\n\r\n" not in response:
            chunk = self.sock.recv(4096)
            if not chunk:
                raise RuntimeError("OBS WebSocket closed during HTTP upgrade")
            response += chunk
        headers, _, leftover = response.partition(b"\r\n\r\n")
        # The server may send its first frame in the same TCP segment as the
        # HTTP response; keep those bytes for read_exact instead of dropping them.
        self._pending = leftover
        status_line = headers.split(b"\r\n", 1)[0]
        if b" 101 " not in status_line:
            raise RuntimeError(f"OBS WebSocket upgrade failed: {status_line.decode('ascii', 'replace')}")

    def auth_response(self, auth):
        """Return base64(sha256(base64(sha256(password + salt)) + challenge))."""
        salt = auth["salt"]
        challenge = auth["challenge"]
        secret = base64.b64encode(hashlib.sha256((self.password + salt).encode("utf-8")).digest()).decode("ascii")
        return base64.b64encode(hashlib.sha256((secret + challenge).encode("utf-8")).digest()).decode("ascii")

    def read_exact(self, length):
        """Return exactly *length* bytes, raising RuntimeError if the peer closes early."""
        chunks = []
        remaining = length
        # getattr keeps this usable on instances created without __init__.
        pending = getattr(self, "_pending", b"")
        if pending and remaining:
            taken = pending[:remaining]
            self._pending = pending[len(taken):]
            chunks.append(taken)
            remaining -= len(taken)
        while remaining:
            chunk = self.sock.recv(remaining)
            if not chunk:
                raise RuntimeError("OBS WebSocket closed unexpectedly")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def _recv_frame_with_fin(self):
        """Read one frame and return ``(fin, opcode, payload)`` with the payload unmasked."""
        first, second = self.read_exact(2)
        fin = bool(first & 0x80)
        opcode = first & 0x0F
        masked = bool(second & 0x80)
        length = second & 0x7F
        if length == 126:
            length = struct.unpack("!H", self.read_exact(2))[0]
        elif length == 127:
            length = struct.unpack("!Q", self.read_exact(8))[0]
        mask = self.read_exact(4) if masked else None
        payload = self.read_exact(length) if length else b""
        if mask:
            payload = bytes(byte ^ mask[index % 4] for index, byte in enumerate(payload))
        return fin, opcode, payload

    def recv_frame(self):
        """Read one frame and return ``(opcode, payload)``."""
        _fin, opcode, payload = self._recv_frame_with_fin()
        return opcode, payload

    def send_frame(self, opcode, payload):
        """Send *payload* (str or bytes) as a single masked frame with FIN set."""
        if isinstance(payload, str):
            payload = payload.encode("utf-8")
        mask = os.urandom(4)
        length = len(payload)
        if length < 126:
            header = struct.pack("!BB", 0x80 | opcode, 0x80 | length)
        elif length <= 0xFFFF:
            header = struct.pack("!BBH", 0x80 | opcode, 0x80 | 126, length)
        else:
            header = struct.pack("!BBQ", 0x80 | opcode, 0x80 | 127, length)
        masked_payload = bytes(byte ^ mask[index % 4] for index, byte in enumerate(payload))
        self.sock.sendall(header + mask + masked_payload)

    def recv_json(self):
        """Return the next text message decoded as JSON.

        Pings (opcode 9) are answered with a pong, a close frame (opcode 8)
        raises RuntimeError, and other frames are ignored.  A text message
        split over continuation frames (opcode 0) is reassembled before
        decoding.
        """
        fragments = None
        while True:
            fin, opcode, payload = self._recv_frame_with_fin()
            if opcode == 8:
                raise RuntimeError("OBS WebSocket closed the connection")
            if opcode == 9:
                self.send_frame(10, payload)
                continue
            if opcode == 1:
                fragments = [payload]
            elif opcode == 0 and fragments is not None:
                fragments.append(payload)
            else:
                continue
            if fin:
                return json.loads(b"".join(fragments).decode("utf-8"))

    def send_json(self, message):
        """Serialize *message* compactly and send it as a text frame."""
        self.send_frame(1, json.dumps(message, separators=(",", ":")))

    def request(self, request_type, request_data=None):
        """Send an op-6 request and return the ``responseData`` of the matching op-7 reply.

        Messages with another op or request id are skipped.  Raises
        RuntimeError when the reply's ``requestStatus.result`` is falsy.
        """
        request_id = str(uuid.uuid4())
        message = {
            "op": 6,
            "d": {
                "requestType": request_type,
                "requestId": request_id,
            },
        }
        if request_data is not None:
            message["d"]["requestData"] = request_data
        self.send_json(message)
        while True:
            response = self.recv_json()
            if response.get("op") != 7:
                continue
            data = response.get("d", {})
            if data.get("requestId") != request_id:
                continue
            status = data.get("requestStatus", {})
            if not status.get("result"):
                comment = status.get("comment", "no error details")
                raise RuntimeError(f"OBS request {request_type} failed: {comment}")
            return data.get("responseData", {})


def decode_obs_image_data(image_data):
    """Base64-decode *image_data*, dropping everything up to the first comma if present."""
    if "," in image_data:
        image_data = image_data.split(",", 1)[1]
    return base64.b64decode(image_data)


def save_obs_scene_screenshot(output, scene, host, port, password):
    """Write a PNG screenshot of OBS *scene* to *output*; exit if the scene is unknown."""
    with ObsWebSocketClient(host, port, password=password) as client:
        known = scene_names(client)
        if scene not in known:
            available = ", ".join(sorted(name for name in known if name))
            sys.exit(f"OBS scene {scene!r} was not found. Available scenes: {available}")
        write_obs_scene_screenshot(client, output, scene)


def scene_names(client):
    """Return the set of scene names reported by ``GetSceneList``."""
    return {scene.get("sceneName") for scene in client.request("GetSceneList").get("scenes", [])}


def ensure_obs_scene_exists(client, scene):
    """Create *scene* when ``GetSceneList`` does not report it."""
    if scene not in scene_names(client):
        client.request("CreateScene", {"sceneName": scene})


def ensure_obs_video_settings(client):
    """Set base and output resolution to OBS_CANVAS_SIZE unless already equal.

    The frame rate reported by OBS is passed back unchanged (60/1 if absent).
    """
    settings = client.request("GetVideoSettings")
    width, height = OBS_CANVAS_SIZE
    if (
        settings.get("baseWidth") == width
        and settings.get("baseHeight") == height
        and settings.get("outputWidth") == width
        and settings.get("outputHeight") == height
    ):
        return
    client.request(
        "SetVideoSettings",
        {
            "baseWidth": width,
            "baseHeight": height,
            "outputWidth": width,
            "outputHeight": height,
            "fpsNumerator": settings.get("fpsNumerator", 60),
            "fpsDenominator": settings.get("fpsDenominator", 1),
        },
    )


def get_scene_items(client, scene):
    """Return the ``sceneItems`` list of *scene*."""
    return client.request("GetSceneItemList", {"sceneName": scene}).get("sceneItems", [])


def find_pipewire_scene_item(client, scene):
    """Return the first enabled PipeWire item in *scene*.

    Falls back to the first PipeWire item of any state, or None if there is none.
    """
    fallback = None
    for item in get_scene_items(client, scene):
        if item.get("inputKind") != OBS_PIPEWIRE_INPUT_KIND:
            continue
        if fallback is None:
            fallback = item
        if item.get("sceneItemEnabled", True):
            return item
    return fallback


def disable_pipewire_scene_items(client, scene):
    """Disable every PipeWire scene item in *scene*."""
    for item in get_scene_items(client, scene):
        if item.get("inputKind") == OBS_PIPEWIRE_INPUT_KIND:
            client.request(
                "SetSceneItemEnabled",
                {
                    "sceneName": scene,
                    "sceneItemId": item["sceneItemId"],
                    "sceneItemEnabled": False,
                },
            )


def existing_input_names(client):
    """Return the set of input names reported by ``GetInputList``."""
    return {item.get("inputName") for item in client.request("GetInputList").get("inputs", [])}


def unique_input_name(client, preferred):
    """Return *preferred*, or a suffixed variant of it, that is not an existing input name.

    Suffixes 2..99 are tried first; after that the current Unix time is used.
    """
    names = existing_input_names(client)
    if preferred not in names:
        return preferred
    for index in range(2, 100):
        candidate = f"{preferred} {index}"
        if candidate not in names:
            return candidate
    return f"{preferred} {int(time.time())}"


def create_pipewire_scene_item(client, scene, source_name):
    """Create a PipeWire input in *scene*; return its ``sceneItemId`` and ``sourceName``."""
    source_name = unique_input_name(client, source_name)
    response = client.request(
        "CreateInput",
        {
            "sceneName": scene,
            "inputName": source_name,
            "inputKind": OBS_PIPEWIRE_INPUT_KIND,
            "inputSettings": {"ShowCursor": False},
            "sceneItemEnabled": True,
        },
    )
    item_id = response.get("sceneItemId")
    if item_id is None:
        # The reply carried no id, so look the new item up by source name.
        item_id = client.request("GetSceneItemId", {"sceneName": scene, "sourceName": source_name})["sceneItemId"]
    return {"sceneItemId": item_id, "sourceName": source_name}


def normalize_obs_scene_item_transform(client, scene, item_id):
    """Reset the item's transform: origin position, unit scale, no crop, canvas-sized bounds."""
    width, height = OBS_CANVAS_SIZE
    client.request(
        "SetSceneItemTransform",
        {
            "sceneName": scene,
            "sceneItemId": item_id,
            "sceneItemTransform": {
                "alignment": 5,
                "positionX": 0.0,
                "positionY": 0.0,
                "rotation": 0.0,
                "scaleX": 1.0,
                "scaleY": 1.0,
                "boundsType": "OBS_BOUNDS_SCALE_INNER",
                "boundsAlignment": 0,
                "boundsWidth": float(width),
                "boundsHeight": float(height),
                "cropToBounds": False,
                "cropLeft": 0,
                "cropTop": 0,
                "cropRight": 0,
                "cropBottom": 0,
            },
        },
    )


def ensure_obs_scene_ready(client, scene, source_name, force_reselect=False):
    """Prepare *scene* with an enabled, normalized PipeWire source and return its name.

    With *force_reselect* the existing PipeWire items are disabled and a new
    input is created.  Exits when the chosen source is not active and showing.
    """
    ensure_obs_scene_exists(client, scene)
    ensure_obs_video_settings(client)
    client.request("SetCurrentProgramScene", {"sceneName": scene})
    if force_reselect:
        disable_pipewire_scene_items(client, scene)
        item = create_pipewire_scene_item(client, scene, source_name)
    else:
        item = find_pipewire_scene_item(client, scene)
        if item is None:
            item = create_pipewire_scene_item(client, scene, source_name)
    item_id = item["sceneItemId"]
    source = item["sourceName"]
    client.request("SetSceneItemEnabled", {"sceneName": scene, "sceneItemId": item_id, "sceneItemEnabled": True})
    normalize_obs_scene_item_transform(client, scene, item_id)
    active = client.request("GetSourceActive", {"sourceName": source})
    if not active.get("videoActive") or not active.get("videoShowing"):
        sys.exit(
            f"OBS source {source!r} is not active. Open OBS, select scene {scene!r}, "
            "and reselect the Arma Reforger window for the PipeWire source."
        )
    return source


def write_obs_scene_screenshot(client, output, scene):
    """Request a PNG screenshot of *scene* and write the decoded bytes to *output*."""
    data = client.request(
        "GetSourceScreenshot",
        {
            "sourceName": scene,
            "imageFormat": "png",
        },
    )
    image_bytes = decode_obs_image_data(data["imageData"])
    with open(output, "wb") as handle:
        handle.write(image_bytes)


def save_obs_scene_screenshot_with_setup(
    output,
    scene,
    source_name,
    host,
    port,
    password,
    force_reselect=False,
    wait_for_nonblank=False,
    timeout=OBS_RESELECT_TIMEOUT,
):
    """Set up the OBS scene, then write a screenshot of it to *output*.

    With *wait_for_nonblank* the screenshot is retaken every 0.5 s until it is
    not blank or *timeout* seconds have passed; the last frame is kept.
    """
    with ObsWebSocketClient(host, port, password=password) as client:
        ensure_obs_scene_ready(client, scene, source_name, force_reselect=force_reselect)
        deadline = time.monotonic() + timeout
        while True:
            write_obs_scene_screenshot(client, output, scene)
            if not wait_for_nonblank or not image_is_blank(output):
                return
            if time.monotonic() >= deadline:
                return
            time.sleep(0.5)


def parse_binary_ppm(data):
    """Parse binary PPM (``P6``) bytes and return ``(width, height, pixels)``.

    Exits the process when the magic is not ``P6``, the max value is not 255,
    or the raster holds fewer than ``width * height * 3`` bytes.
    """
    whitespace = b" \t\r\n"
    size = len(data)
    idx = 0

    def next_token():
        nonlocal idx
        while True:
            while idx < size and data[idx] in whitespace:
                idx += 1
            if idx < size and data[idx] == ord("#"):
                # Header comment: skip to end of line, then look again.
                while idx < size and data[idx] not in b"\r\n":
                    idx += 1
                continue
            break
        start = idx
        while idx < size and data[idx] not in whitespace:
            idx += 1
        return data[start:idx]

    magic = next_token()
    if magic != b"P6":
        sys.exit("ImageMagick did not return binary PPM data")
    width = int(next_token())
    height = int(next_token())
    max_value = int(next_token())
    if max_value != 255:
        sys.exit(f"unsupported PPM max value: {max_value}")
    # Exactly one whitespace byte separates the header from the raster.
    # Skipping more would swallow pixel bytes whose value is 9, 10, 13 or 32.
    if idx < size and data[idx] in whitespace:
        idx += 1
    pixels = data[idx:]
    if len(pixels) < width * height * 3:
        sys.exit(f"truncated PPM data: expected {width * height * 3} bytes, got {len(pixels)}")
    return width, height, pixels


def ppm_from_magick(args):
    """Run ``magick`` with *args* producing 8-bit PPM on stdout; return ``(width, height, raw)``."""
    data = run_bytes(["magick", *args, "-alpha", "off", "-depth", "8", "ppm:-"])
    return parse_binary_ppm(data)


def image_size(image_path):
    """Return ``(width, height)`` of *image_path* as reported by ``magick identify``."""
    output = run_bytes(["magick", "identify", "-format", "%w %h", image_path]).decode("ascii")
    width, height = output.split()
    return int(width), int(height)


def image_is_blank(image_path, bright_threshold=12, min_nonblank_fraction=0.001):
    """Return True when fewer than *min_nonblank_fraction* of pixels exceed *bright_threshold*.

    The image is resized to 160x90 first; a pixel counts as non-blank when any
    of its channels is brighter than the threshold.
    """
    width, height, raw = ppm_from_magick([image_path, "+repage", "-resize", "160x90!"])
    if not width or not height:
        return True
    nonblank = 0
    total = width * height
    # Stop before a trailing partial triple so indexing cannot run past the end.
    for offset in range(0, len(raw) - 2, 3):
        if max(raw[offset], raw[offset + 1], raw[offset + 2]) > bright_threshold:
            nonblank += 1
    return (nonblank / total) < min_nonblank_fraction


def orange_digit_mask(width, height, raw):
    """Return a ``height`` x ``width`` boolean grid marking pixels that pass the orange test."""
    mask = [[False] * width for _ in range(height)]
    for y in range(height):
        row_offset = y * width * 3
        for x in range(width):
            offset = row_offset + x * 3
            r, g, b = raw[offset], raw[offset + 1], raw[offset + 2]
            mask[y][x] = r >= 115 and 55 <= g <= 205 and b <= 100 and r - g >= 20
    return mask


def white_mask(width, height, raw):
    """Return a ``height`` x ``width`` boolean grid marking pixels with all channels >= 120."""
    mask = [[False] * width for _ in range(height)]
    for y in range(height):
        row_offset = y * width * 3
        for x in range(width):
            offset = row_offset + x * 3
            r, g, b = raw[offset], raw[offset + 1], raw[offset + 2]
            mask[y][x] = r >= 120 and g >= 120 and b >= 120
    return mask


def connected_components(mask):
    """Find 8-connected components of True cells in *mask*.

    Only components with at least 40 cells and a bounding box at least 5 wide
    and 15 tall are kept.  Each result is a dict with ``points``, ``bbox``
    ``(x1, y1, x2, y2)`` and ``area``; results are sorted by left edge.
    """
    height = len(mask)
    width = len(mask[0]) if height else 0
    seen = [[False] * width for _ in range(height)]
    components = []
    for y in range(height):
        for x in range(width):
            if seen[y][x] or not mask[y][x]:
                continue
            stack = [(x, y)]
            seen[y][x] = True
            points = set()
            while stack:
                cx, cy = stack.pop()
                points.add((cx, cy))
                for ny in range(cy - 1, cy + 2):
                    for nx in range(cx - 1, cx + 2):
                        if nx < 0 or ny < 0 or nx >= width or ny >= height:
                            continue
                        if seen[ny][nx] or not mask[ny][nx]:
                            continue
                        seen[ny][nx] = True
                        stack.append((nx, ny))
            xs = [point[0] for point in points]
            ys = [point[1] for point in points]
            bbox = (min(xs), min(ys), max(xs), max(ys))
            box_width = bbox[2] - bbox[0] + 1
            box_height = bbox[3] - bbox[1] + 1
            if len(points) >= 40 and box_width >= 5 and box_height >= 15:
                components.append({"points": points, "bbox": bbox, "area": len(points)})
    components.sort(key=lambda component: component["bbox"][0])
    return components


def normalize(component, width=24, height=36):
    """Resample the component's bounding box onto a ``height`` x ``width`` boolean grid.

    Each output cell samples the source cell under its centre.
    """
    x1, y1, x2, y2 = component["bbox"]
    source_width = x2 - x1 + 1
    source_height = y2 - y1 + 1
    points = component["points"]
    output = []
    for y in range(height):
        source_y = y1 + int((y + 0.5) * source_height / height)
        row = []
        for x in range(width):
            source_x = x1 + int((x + 0.5) * source_width / width)
            row.append((source_x, source_y) in points)
        output.append(row)
    return output


def template_distance(left, right):
    """Compare two boolean grids and return ``(hamming, iou)``.

    *hamming* is the fraction of differing cells; *iou* is intersection over
    union of the True cells, or 0.0 when both grids are empty.
    """
    total = len(left) * len(left[0])
    differences = 0
    intersection = 0
    union = 0
    for left_row, right_row in zip(left, right):
        for left_value, right_value in zip(left_row, right_row):
            differences += left_value != right_value
            intersection += left_value and right_value
            union += left_value or right_value
    hamming = differences / total
    iou = intersection / union if union else 0.0
    return hamming, iou


def build_templates(font, pointsize):
    """Render ``0123456789`` with ImageMagick and return ``{digit: normalized grid}``.

    Exits when fewer than ten components are found in the rendering.
    """
    width, height, raw = ppm_from_magick(
        [
            "-background",
            "black",
            "-fill",
            "white",
            "-font",
            font,
            "-pointsize",
            str(pointsize),
            "label:0123456789",
        ]
    )
    components = connected_components(white_mask(width, height, raw))
    if len(components) < 10:
        sys.exit(f"template rendering produced only {len(components)} digit components")
    # Components are sorted left to right, so position equals digit value.
    return {str(index): normalize(component) for index, component in enumerate(components[:10])}


def classify(component, templates):
    """Return ``(digit, hamming, iou)`` for the template closest to *component*.

    Candidates are ranked by lowest hamming distance, then highest IoU, then
    digit string.
    """
    sample = normalize(component)
    ranked = []
    for digit, template in templates.items():
        hamming, iou = template_distance(sample, template)
        ranked.append((hamming, -iou, digit, iou))
    ranked.sort()
    hamming, _negative_iou, digit, iou = ranked[0]
    return digit, hamming, iou


def helper_path():
    """Return the system KWin helper if it is executable, else the copy next to this script."""
    if os.path.exists(SYSTEM_KWIN_HELPER) and os.access(SYSTEM_KWIN_HELPER, os.X_OK):
        return SYSTEM_KWIN_HELPER
    return os.path.join(SCRIPT_DIR, "kwin_capture_screen")


def require_helper():
    """Return the KWin helper path, exiting when the file does not exist."""
    helper = helper_path()
    if not os.path.exists(helper):
        sys.exit(f"missing {helper}; run `make` in {SCRIPT_DIR}")
    return helper


def run_helper(args, stdout=None, stderr=None):
    """Run the KWin helper with *args* in text mode and return the CompletedProcess.

    On a non-zero exit the process exits with the helper's stderr, or with the
    exception text when stderr was not captured.
    """
    helper = require_helper()
    try:
        return subprocess.run([helper, *args], check=True, stdout=stdout, stderr=stderr, text=True)
    except subprocess.CalledProcessError as exc:
        message = exc.stderr.strip() if exc.stderr else str(exc)
        sys.exit(message)


def capture_named_screen(output, screen):
    """Capture *screen* to *output* with the KWin helper.

    Raises subprocess.CalledProcessError on failure so callers can fall back.
    """
    helper = require_helper()
    subprocess.run(
        [helper, "--capture-screen", screen, output],
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        text=True,
    )


def kwin_windows():
    """Return the list of windows the KWin helper prints as JSON for ``--list-windows``."""
    result = run_helper(["--list-windows"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    try:
        windows = json.loads(result.stdout)
    except json.JSONDecodeError as exc:
        sys.exit(f"failed to parse KWin window list JSON: {exc}: {result.stdout!r}")
    if not isinstance(windows, list):
        sys.exit("KWin window probe returned non-list JSON")
    return windows


def rect_area(rect):
    """Return width * height of *rect*, clamping negatives to 0; non-dicts give 0."""
    if not isinstance(rect, dict):
        return 0
    return max(0, int(rect.get("width", 0) or 0)) * max(0, int(rect.get("height", 0) or 0))


def window_geometry_text(window):
    """Format the window's ``frameGeometry`` as ``x,y,WIDTHxHEIGHT`` (missing values as 0)."""
    geometry = window.get("frameGeometry") or {}
    x = int(geometry.get("x", 0) or 0)
    y = int(geometry.get("y", 0) or 0)
    width = int(geometry.get("width", 0) or 0)
    height = int(geometry.get("height", 0) or 0)
    return f"{x},{y},{width}x{height}"


def print_kwin_windows(windows):
    """Print a tab-separated table of *windows* to stdout."""
    print("internalId\tpid\tclass\toutput\tgeometry\tflags\tcaption")
    for window in windows:
        flags = []
        if window.get("normalWindow"):
            flags.append("normal")
        if window.get("fullScreen"):
            flags.append("fullscreen")
        if window.get("minimized"):
            flags.append("minimized")
        if window.get("hidden"):
            flags.append("hidden")
        if window.get("skipTaskbar"):
            flags.append("skip-taskbar")
        print(
            f"{window.get('internalId', '')}\t"
            f"{window.get('pid', '')}\t"
            f"{window.get('resourceClass', '')}\t"
            f"{window.get('output', '')}\t"
            f"{window_geometry_text(window)}\t"
            f"{','.join(flags)}\t"
            f"{window.get('caption', '')}"
        )


def format_window_candidate(window):
    """Return a one-line ``key=value`` description of *window*."""
    return (
        f"internalId={window.get('internalId', '')} "
        f"pid={window.get('pid', '')} "
        f"class={window.get('resourceClass', '')!r} "
        f"output={window.get('output', '')!r} "
        f"geometry={window_geometry_text(window)} "
        f"caption={window.get('caption', '')!r}"
    )


def print_window_candidates(windows, heading):
    """Print *heading* and up to 20 windows to stderr, normal/fullscreen and largest first."""
    print(heading, file=sys.stderr)
    candidates = sorted(
        windows,
        key=lambda window: (
            bool(window.get("normalWindow") or window.get("fullScreen")),
            rect_area(window.get("frameGeometry")),
        ),
        reverse=True,
    )
    for window in candidates[:20]:
        print(f"  {format_window_candidate(window)}", file=sys.stderr)


def kwin_auto_score(window):
    """Score how strongly *window* looks like the Arma Reforger window.

    Returns 0 when neither the identity fields nor the caption match.  State
    bonuses (fullscreen, normal, visible) only rank windows that matched, so
    an unrelated visible window is never treated as a candidate.
    """
    caption = str(window.get("caption", "") or "")
    resource_class = str(window.get("resourceClass", "") or "")
    resource_name = str(window.get("resourceName", "") or "")
    desktop_file_name = str(window.get("desktopFileName", "") or "")
    window_role = str(window.get("windowRole", "") or "")
    identity_text = " ".join((resource_class, resource_name, desktop_file_name, window_role)).lower()
    caption_text = caption.lower()
    score = 0
    if "steam_app_1874880" in identity_text:
        score += 200
    if re.search(r"\b(arma|reforger)\b", identity_text):
        score += 120
    if caption_text == "arma reforger":
        score += 100
    elif re.search(r"\barma reforger\b", caption_text):
        score += 80
    elif re.search(r"\b(arma|reforger)\b", caption_text):
        score += 20
    if score == 0:
        return 0
    if window.get("fullScreen"):
        score += 15
    if window.get("normalWindow"):
        score += 10
    if not window.get("minimized") and not window.get("hidden"):
        score += 5
    return score


def _kwin_rank_key(score, window):
    """Return the sort key used to rank matching windows (higher is better)."""
    return (
        score,
        bool(window.get("normalWindow") or window.get("fullScreen")),
        not bool(window.get("minimized") or window.get("hidden")),
        rect_area(window.get("frameGeometry")),
    )


def auto_kwin_window(windows):
    """Return the single best-ranked Arma/Reforger window.

    Exits when no window matches or when several share the top rank.
    """
    scored = [(kwin_auto_score(window), window) for window in windows]
    matches = [(score, window) for score, window in scored if score > 0]
    if not matches:
        print_window_candidates(windows, "no KWin window matched Arma/Reforger; current candidates:")
        sys.exit("no Arma/Reforger KWin window found")
    ranked = sorted(matches, key=lambda item: _kwin_rank_key(item[0], item[1]), reverse=True)
    top_score_value, top_window = ranked[0]
    top_key = _kwin_rank_key(top_score_value, top_window)
    ambiguous = [window for score, window in ranked if _kwin_rank_key(score, window) == top_key]
    if len(ambiguous) > 1:
        print_window_candidates(ambiguous, "multiple equally ranked Arma/Reforger KWin windows matched:")
        sys.exit("ambiguous Arma/Reforger KWin window match; pass --kwin-window-id or --kwin-window PID")
    return top_window


def kwin_window_by_id(windows, internal_id):
    """Return the one window whose ``internalId`` equals *internal_id*; exit otherwise."""
    matches = [window for window in windows if str(window.get("internalId", "")) == str(internal_id)]
    if not matches:
        print_window_candidates(windows, f"KWin window id {internal_id!r} was not found; current candidates:")
        sys.exit(f"KWin window id {internal_id!r} not found")
    if len(matches) > 1:
        print_window_candidates(matches, f"KWin window id {internal_id!r} matched more than one window:")
        sys.exit(f"KWin window id {internal_id!r} is ambiguous")
    return matches[0]


def kwin_window_by_pid(windows, pid):
    """Return the one window whose ``pid`` equals *pid*; exit otherwise."""
    matches = [window for window in windows if str(window.get("pid", "")) == str(pid)]
    if not matches:
        print_window_candidates(windows, f"KWin window PID {pid!r} was not found; current candidates:")
        sys.exit(f"KWin window PID {pid!r} not found")
    if len(matches) > 1:
        print_window_candidates(matches, f"KWin window PID {pid!r} matched more than one window:")
        sys.exit(f"KWin window PID {pid!r} is ambiguous; pass --kwin-window-id")
    return matches[0]


def resolve_kwin_window_selector(selector):
    """Resolve ``auto``, an internal id, or a PID to a window from the current KWin list.

    An exact internal-id match wins over interpreting a numeric selector as a PID.
    """
    windows = kwin_windows()
    if selector == "auto":
        return auto_kwin_window(windows)
    if any(str(window.get("internalId", "")) == str(selector) for window in windows):
        return kwin_window_by_id(windows, selector)
    if str(selector).isdigit():
        return kwin_window_by_pid(windows, selector)
    return kwin_window_by_id(windows, selector)


def capture_kwin_window(output, selector):
    """Capture the window chosen by *selector* to *output* and return its window dict.

    The helper writes to a temp file.  When its size equals width*height*4 it
    is converted as raw BGRA; otherwise a PNG signature is accepted, and
    anything else exits with an error.  The temp file is always removed.
    """
    window = resolve_kwin_window_selector(selector)
    internal_id = str(window.get("internalId", ""))
    if not internal_id:
        sys.exit(f"matched KWin window has no internalId: {format_window_candidate(window)}")
    geometry = window.get("frameGeometry") or {}
    width = int(geometry.get("width", 0) or 0)
    height = int(geometry.get("height", 0) or 0)
    if width <= 0 or height <= 0:
        sys.exit(f"matched KWin window has invalid geometry: {format_window_candidate(window)}")
    fd, raw_path = tempfile.mkstemp(prefix="reforger-kwin-window-", suffix=".raw")
    os.close(fd)
    try:
        result = run_helper(["--capture-window", internal_id, raw_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        capture_info = {}
        if result.stdout.strip():
            try:
                capture_info = json.loads(result.stdout)
            except json.JSONDecodeError as exc:
                sys.exit(f"failed to parse KWin CaptureWindow metadata: {exc}: {result.stdout!r}")
        # Size reported by the helper overrides the window geometry when present.
        width = int(capture_info.get("width", width) or width)
        height = int(capture_info.get("height", height) or height)
        expected_size = width * height * 4
        raw_size = os.path.getsize(raw_path)
        # Give the file up to two seconds to reach the expected size.
        deadline = time.monotonic() + 2.0
        while raw_size != expected_size and time.monotonic() < deadline:
            time.sleep(0.05)
            raw_size = os.path.getsize(raw_path)
        if raw_size == expected_size:
            run_bytes(["magick", "-size", f"{width}x{height}", "-depth", "8", f"bgra:{raw_path}", output])
        else:
            with open(raw_path, "rb") as handle:
                header = handle.read(8)
            if header.startswith(b"\x89PNG\r\n\x1a\n"):
                run_bytes(["magick", raw_path, "+repage", output])
            else:
                sys.exit(
                    f"KWin CaptureWindow returned {raw_size} raw bytes, but window geometry "
                    f"is {width}x{height} ({expected_size} BGRA bytes expected); "
                    f"metadata={capture_info}"
                )
    finally:
        try:
            os.unlink(raw_path)
        except OSError:
            pass
    return window


def portal_helper_path():
    """Return the path of the portal capture helper next to this script."""
    return os.path.join(SCRIPT_DIR, "portal_capture_frame")


def require_portal_helper():
    """Return the portal helper path, exiting when it is missing or not executable."""
    helper = portal_helper_path()
    if not os.path.exists(helper):
        sys.exit(f"missing {helper}; run `make` in {SCRIPT_DIR}")
    if not os.access(helper, os.X_OK):
        sys.exit(f"portal helper is not executable: {helper}")
    return helper


def read_portal_restore_token(path):
    """Return the stripped token stored at *path*, or ``""`` when the file is missing."""
    try:
        with open(path, "r", encoding="utf-8") as handle:
            return handle.read().strip()
    except FileNotFoundError:
        return ""
    except OSError as exc:
        sys.exit(f"failed to read portal restore token {path}: {exc}")


def write_portal_restore_token(path, token):
    """Write *token* plus a newline to *path* through a temp file; exit on OSError."""
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    temp_path = f"{path}.tmp"
    try:
        with open(temp_path, "w", encoding="utf-8") as handle:
            handle.write(token)
            handle.write("\n")
        os.replace(temp_path, path)
    except OSError as exc:
        sys.exit(f"failed to write portal restore token {path}: {exc}")


def capture_portal_window(output, restore_token_path, reselect, timeout):
    """Capture a window through the portal helper and return its JSON metadata dict.

    A saved restore token is passed to the helper unless *reselect* is set; a
    new token reported by the helper is written back to *restore_token_path*.
    """
    helper = require_portal_helper()
    restore_token = ""
    if restore_token_path and not reselect:
        restore_token = read_portal_restore_token(restore_token_path)
    command = [helper, "--window", output, "--timeout", str(timeout)]
    if restore_token_path:
        command.append("--persist")
    if restore_token:
        command.extend(["--restore-token", restore_token])
    try:
        result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    except subprocess.CalledProcessError as exc:
        message = exc.stderr.strip() if exc.stderr else str(exc)
        sys.exit(message)
    capture_info = {}
    if result.stdout.strip():
        try:
            capture_info = json.loads(result.stdout)
        except json.JSONDecodeError as exc:
            sys.exit(f"failed to parse portal capture metadata: {exc}: {result.stdout!r}")
    new_restore_token = str(capture_info.get("restore_token", "") or "")
    if restore_token_path and new_restore_token:
        write_portal_restore_token(restore_token_path, new_restore_token)
    return capture_info


def parse_geometry(geometry):
    """Parse a leading ``X,Y,WIDTHxHEIGHT`` into four ints; raise ValueError if absent."""
    match = re.match(r"(-?\d+),(-?\d+),(\d+)x(\d+)", geometry)
    if not match:
        raise ValueError(f"could not parse screen geometry: {geometry}")
    return tuple(int(group) for group in match.groups())


def screen_geometry(screen_name):
    """Return ``(x, y, width, height)`` of the enabled screen named *screen_name*.

    Exits when the screen is unknown, disabled, or has no geometry.
    """
    screens = kwin_screens()
    for screen in screens:
        if screen.get("name") == screen_name:
            if not screen.get("enabled"):
                sys.exit(f"screen {screen_name} is disabled")
            geometry = screen.get("geometry")
            if not geometry:
                sys.exit(f"screen {screen_name} has no geometry")
            return parse_geometry(geometry)
    available = ", ".join(screen.get("name", "unknown") for screen in screens)
    sys.exit(f"unknown screen {screen_name}; available screens: {available}")


def virtual_desktop_origin():
    """Return the minimum x and minimum y over all enabled screens, or ``(0, 0)`` if none."""
    enabled_geometries = [
        parse_geometry(screen["geometry"])
        for screen in kwin_screens()
        if screen.get("enabled") and screen.get("geometry")
    ]
    if not enabled_geometries:
        return 0, 0
    return min(geometry[0] for geometry in enabled_geometries), min(geometry[1] for geometry in enabled_geometries)


def capture_screen_via_fullscreen_crop(output, screen):
    """Take a fullscreen Spectacle capture and crop it to *screen*'s geometry."""
    x, y, width, height = screen_geometry(screen)
    origin_x, origin_y = virtual_desktop_origin()
    # Screen positions are relative to the virtual desktop; the capture starts at its origin.
    crop_x = x - origin_x
    crop_y = y - origin_y
    fd, full_path = tempfile.mkstemp(prefix="reforger-fullscreen-", suffix=".png")
    os.close(fd)
    try:
        capture_spectacle(full_path, "fullscreen")
        run_bytes(["magick", full_path, "-crop", f"{width}x{height}+{crop_x}+{crop_y}", "+repage", output])
    finally:
        try:
            os.unlink(full_path)
        except OSError:
            pass


def capture_spectacle(output, mode):
    """Capture with Spectacle in ``fullscreen`` or ``current`` mode to *output*.

    Raises ValueError for any other mode and CalledProcessError when Spectacle fails.
    """
    if mode == "fullscreen":
        capture_arg = "--fullscreen"
    elif mode == "current":
        capture_arg = "--current"
    else:
        raise ValueError(f"unsupported capture mode: {mode}")
    subprocess.run(
        ["spectacle", "--background", capture_arg, "--nonotify", "--output", output],
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )


def capture_screenshot(output, mode, screen=None):
    """Capture to *output*: a named *screen* if given, else Spectacle in *mode*.

    A failing KWin helper capture falls back to cropping a fullscreen capture.
    """
    if screen:
        try:
            capture_named_screen(output, screen)
        except subprocess.CalledProcessError:
            capture_screen_via_fullscreen_crop(output, screen)
        return
    capture_spectacle(output, mode)


def kwin_screens():
    """Parse ``Screen N:`` sections of KWin's supportInformation into dicts.

    Each dict may carry ``name``, ``enabled`` (bool) and ``geometry`` (str).
    Sections in which none of those fields were found are dropped.
    """
    output = run_text(["qdbus6", "org.kde.KWin", "/KWin", "supportInformation"])
    screens = []
    current = None
    for line in output.splitlines():
        if re.match(r"^Screen \d+:", line):
            if current:
                screens.append(current)
            current = {}
            continue
        if current is None:
            # Ignore everything before the first screen section.
            continue
        stripped = line.strip()
        if stripped.startswith("Name:"):
            current["name"] = stripped.split(":", 1)[1].strip()
        elif stripped.startswith("Enabled:"):
            current["enabled"] = stripped.split(":", 1)[1].strip() == "1"
        elif stripped.startswith("Geometry:"):
            current["geometry"] = stripped.split(":", 1)[1].strip()
    if current:
        screens.append(current)
    return screens


def print_screens():
    """Print one tab-separated line per KWin screen: name, enabled state, geometry."""
    for screen in kwin_screens():
        enabled = "enabled" if screen.get("enabled") else "disabled"
        geometry = screen.get("geometry", "no geometry")
        print(f"{screen.get('name', 'unknown')}\t{enabled}\t{geometry}")


def load_queue_image(image_path, crop, already_cropped=False):
    """Load *image_path* as ``(width, height, raw)``, cropped to *crop* unless *already_cropped*."""
    if already_cropped:
        return ppm_from_magick([image_path, "+repage"])
    x, y, width, height = crop
    return ppm_from_magick([image_path, "+repage", "-crop", f"{width}x{height}+{x}+{y}", "+repage"])


def read_queue_number(image_path, crop, font, pointsize, already_cropped=False):
    """Recognise the digits in the queue-number area of *image_path*.

    Returns ``(text, details)`` where *details* holds one
    ``(digit, bbox, area, hamming, iou)`` tuple per component, left to right.
    """
    width, height, raw = load_queue_image(image_path, crop, already_cropped)
    components = connected_components(orange_digit_mask(width, height, raw))
    templates = build_templates(font, pointsize)
    digits = []
    details = []
    for component in components:
        digit, hamming, iou = classify(component, templates)
        digits.append(digit)
        details.append((digit, component["bbox"], component["area"], hamming, iou))
    return "".join(digits), details


def parse_crop(value):
    """argparse type: parse ``x,y,width,height`` (commas or spaces) into a tuple of ints."""
    parts = value.replace(",", " ").split()
    if len(parts) != 4:
        raise argparse.ArgumentTypeError("crop must be four numbers: x,y,width,height")
    try:
        return tuple(int(part) for part in parts)
    except ValueError as exc:
        raise argparse.ArgumentTypeError("crop values must be integers") from exc


def parse_size(value):
    """argparse type: parse ``WIDTHxHEIGHT`` (or ``WIDTH,HEIGHT``) into two ints."""
    parts = value.lower().replace(",", "x").split("x")
    if len(parts) != 2:
        raise argparse.ArgumentTypeError("size must be WIDTHxHEIGHT")
    try:
        return int(parts[0]), int(parts[1])
    except ValueError as exc:
        raise argparse.ArgumentTypeError("size values must be integers") from exc
raise argparse.ArgumentTypeError("size values must be integers") from exc def scale_crop(reference_crop, reference_size, target_size, scale_mode): reference_width, reference_height = reference_size target_width, target_height = target_size x, y, width, height = reference_crop if scale_mode == "width": x_scale = target_width / reference_width y_scale = x_scale elif scale_mode == "independent": x_scale = target_width / reference_width y_scale = target_height / reference_height else: raise ValueError(f"unsupported scale mode: {scale_mode}") return ( round(x * x_scale), round(y * y_scale), max(1, round(width * x_scale)), max(1, round(height * y_scale)), ) def resolve_crop(image_path, explicit_crop, reference_crop, reference_size, scale_mode, already_cropped): if already_cropped: return None if explicit_crop: return explicit_crop return scale_crop(reference_crop, reference_size, image_size(image_path), scale_mode) def main(): parser = argparse.ArgumentParser(description="Read Arma Reforger queue number from a fixed screen crop.") parser.add_argument("--image", help="Read from an existing image.") parser.add_argument("--dataset", help="Read every image in a dataset directory.") parser.add_argument("--list-screens", action="store_true", help="List KWin output names for use with --screen.") parser.add_argument( "--list-kwin-windows", action="store_true", help="List KWin-managed windows with internal ids for --kwin-window-id.", ) parser.add_argument( "--kwin-window", metavar="auto|INTERNAL_ID|PID", help="Capture a KWin window by internal id or PID, or use 'auto' to match Arma/Reforger.", ) parser.add_argument("--kwin-window-id", help="Capture a KWin window by explicit internal id.") parser.add_argument( "--obs-scene", nargs="?", const=OBS_DEFAULT_SCENE, help=f"Read from an OBS scene screenshot via obs-websocket. 
Defaults to {OBS_DEFAULT_SCENE!r}.", ) parser.add_argument("--obs-host", default=OBS_DEFAULT_HOST, help="OBS WebSocket host.") parser.add_argument("--obs-port", type=int, default=OBS_DEFAULT_PORT, help="OBS WebSocket port.") parser.add_argument("--obs-password", help="OBS WebSocket password. Defaults to the local OBS config password.") parser.add_argument( "--obs-source", default=OBS_DEFAULT_SOURCE, help="PipeWire capture source name to create if the OBS scene has no PipeWire source.", ) parser.add_argument( "--obs-reselect", action="store_true", help="Force a fresh OBS PipeWire source before reading the scene. This cannot choose a Wayland window automatically.", ) parser.add_argument( "--obs-reselect-timeout", type=float, default=OBS_RESELECT_TIMEOUT, help="Seconds to wait for a non-blank OBS frame after forcing source reselection.", ) parser.add_argument( "--obs-config", default=OBS_WEBSOCKET_CONFIG, help="Path to OBS WebSocket config. Used for password lookup and enabling the server.", ) parser.add_argument( "--portal-window", action="store_true", help="Capture a user-selected Wayland window through xdg-desktop-portal/PipeWire.", ) parser.add_argument( "--portal-restore-token", default=DEFAULT_PORTAL_RESTORE_TOKEN, help="Path used to store the portal restore token for later runs.", ) parser.add_argument( "--portal-reselect", action="store_true", help="Ignore the saved portal restore token and show the portal picker again.", ) parser.add_argument( "--portal-timeout", type=float, default=60.0, help="Seconds to wait for portal selection and the first PipeWire frame.", ) parser.add_argument("--cropped", action="store_true", help="Treat input image(s) as already cropped to the queue number.") parser.add_argument("--capture", action="store_true", help="Capture the current monitor with Spectacle.") parser.add_argument("--screen", help="KWin output name to capture explicitly, for example DP-3.") parser.add_argument( "--capture-mode", choices=("current", "fullscreen"), 
default="current", help="Spectacle capture mode. Use current for one monitor, fullscreen for the whole desktop.", ) parser.add_argument("--crop", type=parse_crop, help="Absolute crop as x,y,width,height.") parser.add_argument( "--reference-crop", type=parse_crop, default=DEFAULT_REFERENCE_CROP, help="Reference crop as x,y,width,height, scaled to the input image size when --crop is omitted.", ) parser.add_argument( "--reference-size", type=parse_size, default=DEFAULT_REFERENCE_SIZE, help="Reference screen size for --reference-crop scaling.", ) parser.add_argument( "--scale-mode", choices=("width", "independent"), default="width", help="How to scale --reference-crop. Width mode preserves a 16:9 UI layout and is the default.", ) parser.add_argument("--font", default=DEFAULT_FONT, help="Font used for synthetic digit templates.") parser.add_argument("--pointsize", type=int, default=DEFAULT_POINTSIZE, help="Point size used for templates.") parser.add_argument("--debug", action="store_true", help="Print component and match details.") parser.add_argument("--show-crop", action="store_true", help="Print the resolved crop used for each full-size image.") parser.add_argument("--save-input", help="Save the live captured/KWin/OBS input image for debugging.") parser.add_argument( "--expect-filenames", action="store_true", help="In dataset mode, compare detected values to each image filename stem.", ) args = parser.parse_args() if args.screen and not args.capture: parser.error("--screen can only be used with --capture") if args.kwin_window_id and args.kwin_window: parser.error("use only one of --kwin-window or --kwin-window-id") if args.obs_password and not args.obs_scene: parser.error("--obs-password can only be used with --obs-scene") if args.portal_reselect and not args.portal_window: parser.error("--portal-reselect can only be used with --portal-window") if args.portal_timeout <= 0: parser.error("--portal-timeout must be positive") kwin_window_selector = args.kwin_window_id or 
args.kwin_window if sum( bool(value) for value in ( args.image, args.dataset, args.capture, args.list_screens, args.list_kwin_windows, args.obs_scene, kwin_window_selector, args.portal_window, ) ) != 1: parser.error( "choose exactly one of --image, --dataset, --capture, --obs-scene, --portal-window, " "--kwin-window/--kwin-window-id, --list-screens, or --list-kwin-windows" ) if args.list_screens: print_screens() return 0 if args.list_kwin_windows: print_kwin_windows(kwin_windows()) return 0 if args.dataset: extensions = {".png", ".jpg", ".jpeg", ".webp", ".bmp"} paths = [ os.path.join(args.dataset, name) for name in sorted(os.listdir(args.dataset)) if os.path.splitext(name)[1].lower() in extensions ] if not paths: print("no dataset images found") return 2 status = 0 passed = 0 failed = 0 for path in paths: crop = resolve_crop(path, args.crop, args.reference_crop, args.reference_size, args.scale_mode, args.cropped) if args.show_crop and crop: print(f"{path}\tcrop={crop[0]},{crop[1]},{crop[2]},{crop[3]}") number, details = read_queue_number(path, crop, args.font, args.pointsize, args.cropped) if not number: status = 2 failed += 1 suffix = "" if args.expect_filenames: expected = os.path.splitext(os.path.basename(path))[0] suffix = f"\texpected={expected}\tFAIL" print(f"{path}\tno digits found{suffix}") continue suffix = "" if args.expect_filenames: expected = os.path.splitext(os.path.basename(path))[0] if number == expected: passed += 1 suffix = f"\texpected={expected}\tPASS" else: failed += 1 status = 1 suffix = f"\texpected={expected}\tFAIL" print(f"{path}\t{number}{suffix}") if args.debug: for digit, bbox, area, hamming, iou in details: print(f" digit={digit} bbox={bbox} area={area} hamming={hamming:.3f} iou={iou:.3f}") if args.expect_filenames: print(f"summary: {passed} passed, {failed} failed") return status temp_path = None image_path = args.image kwin_window = None portal_info = None if args.capture or args.obs_scene or kwin_window_selector or 
args.portal_window: fd, temp_path = tempfile.mkstemp(prefix="reforger-queue-", suffix=".png") os.close(fd) image_path = temp_path if args.capture: capture_screenshot(image_path, args.capture_mode, args.screen) if kwin_window_selector: kwin_window = capture_kwin_window(image_path, kwin_window_selector) if args.portal_window: portal_info = capture_portal_window( image_path, args.portal_restore_token, args.portal_reselect, args.portal_timeout, ) if args.obs_scene: config, effective_port = prepare_obs(args.obs_host, args.obs_port, args.obs_scene, args.obs_config) password = args.obs_password if password is None and config.get("auth_required", True): password = config.get("server_password", "") save_obs_scene_screenshot_with_setup( image_path, args.obs_scene, args.obs_source, args.obs_host, effective_port, password, force_reselect=args.obs_reselect, wait_for_nonblank=args.obs_reselect, timeout=args.obs_reselect_timeout, ) if image_is_blank(image_path): sys.exit( "OBS scene screenshot is blank. On this OBS/Wayland setup, PipeWire sources " "only expose RestoreToken/ShowCursor settings over WebSocket, so the CLI cannot " "select the Arma Reforger process automatically. Use a persistent monitor source " "or reselect the Arma source in OBS." 
) if args.save_input and image_path and (args.capture or args.obs_scene or kwin_window_selector or args.portal_window): run_bytes(["magick", image_path, "+repage", args.save_input]) try: crop = resolve_crop(image_path, args.crop, args.reference_crop, args.reference_size, args.scale_mode, args.cropped) if args.show_crop and crop: print(f"crop={crop[0]},{crop[1]},{crop[2]},{crop[3]}") number, details = read_queue_number(image_path, crop, args.font, args.pointsize, args.cropped) finally: if temp_path: try: os.unlink(temp_path) except OSError: pass if not number: print("no digits found") return 2 print(number) if args.debug: if kwin_window: print(f"kwin-window {format_window_candidate(kwin_window)}") if portal_info: print(f"portal-window node_id={portal_info.get('node_id', '')} streams={len(portal_info.get('streams', []))}") if portal_info.get("restore_token") and args.portal_restore_token: print(f"portal-restore-token {args.portal_restore_token}") for digit, bbox, area, hamming, iou in details: print(f"digit={digit} bbox={bbox} area={area} hamming={hamming:.3f} iou={iou:.3f}") return 0 if __name__ == "__main__": raise SystemExit(main())