anti-prestige-tool/reforger_queue_read.py
2026-05-01 10:26:41 +01:00

528 lines
18 KiB
Python

#!/usr/bin/env python3
import argparse
import os
import re
import subprocess
import sys
import tempfile
DEFAULT_REFERENCE_SIZE = (2560, 1440)
DEFAULT_REFERENCE_CROP = (1050, 620, 100, 60)
DEFAULT_FONT = "Roboto-Condensed"
DEFAULT_POINTSIZE = 43
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
def run_bytes(args):
try:
return subprocess.run(args, check=True, stdout=subprocess.PIPE).stdout
except FileNotFoundError:
sys.exit(f"missing required command: {args[0]}")
except subprocess.CalledProcessError as exc:
sys.exit(f"command failed ({exc.returncode}): {' '.join(args)}")
def ppm_from_magick(args):
data = run_bytes(["magick", *args, "-alpha", "off", "-depth", "8", "ppm:-"])
idx = 0
def next_token():
nonlocal idx
while idx < len(data) and data[idx] in b" \t\r\n":
idx += 1
if idx < len(data) and data[idx] == ord("#"):
while idx < len(data) and data[idx] not in b"\r\n":
idx += 1
return next_token()
start = idx
while idx < len(data) and data[idx] not in b" \t\r\n":
idx += 1
return data[start:idx]
magic = next_token()
if magic != b"P6":
sys.exit("ImageMagick did not return binary PPM data")
width = int(next_token())
height = int(next_token())
max_value = int(next_token())
if max_value != 255:
sys.exit(f"unsupported PPM max value: {max_value}")
while idx < len(data) and data[idx] in b" \t\r\n":
idx += 1
return width, height, data[idx:]
def image_size(image_path):
output = run_bytes(["magick", "identify", "-format", "%w %h", image_path]).decode("ascii")
width, height = output.split()
return int(width), int(height)
def orange_digit_mask(width, height, raw):
mask = [[False] * width for _ in range(height)]
for y in range(height):
row_offset = y * width * 3
for x in range(width):
offset = row_offset + x * 3
r, g, b = raw[offset], raw[offset + 1], raw[offset + 2]
mask[y][x] = r >= 115 and 55 <= g <= 205 and b <= 100 and r - g >= 20
return mask
def white_mask(width, height, raw):
mask = [[False] * width for _ in range(height)]
for y in range(height):
row_offset = y * width * 3
for x in range(width):
offset = row_offset + x * 3
r, g, b = raw[offset], raw[offset + 1], raw[offset + 2]
mask[y][x] = r >= 120 and g >= 120 and b >= 120
return mask
def connected_components(mask):
height = len(mask)
width = len(mask[0]) if height else 0
seen = [[False] * width for _ in range(height)]
components = []
for y in range(height):
for x in range(width):
if seen[y][x] or not mask[y][x]:
continue
stack = [(x, y)]
seen[y][x] = True
points = set()
while stack:
cx, cy = stack.pop()
points.add((cx, cy))
for ny in range(cy - 1, cy + 2):
for nx in range(cx - 1, cx + 2):
if nx < 0 or ny < 0 or nx >= width or ny >= height:
continue
if seen[ny][nx] or not mask[ny][nx]:
continue
seen[ny][nx] = True
stack.append((nx, ny))
xs = [point[0] for point in points]
ys = [point[1] for point in points]
bbox = (min(xs), min(ys), max(xs), max(ys))
box_width = bbox[2] - bbox[0] + 1
box_height = bbox[3] - bbox[1] + 1
if len(points) >= 40 and box_width >= 5 and box_height >= 15:
components.append({"points": points, "bbox": bbox, "area": len(points)})
components.sort(key=lambda component: component["bbox"][0])
return components
def normalize(component, width=24, height=36):
x1, y1, x2, y2 = component["bbox"]
source_width = x2 - x1 + 1
source_height = y2 - y1 + 1
points = component["points"]
output = []
for y in range(height):
source_y = y1 + int((y + 0.5) * source_height / height)
row = []
for x in range(width):
source_x = x1 + int((x + 0.5) * source_width / width)
row.append((source_x, source_y) in points)
output.append(row)
return output
def template_distance(left, right):
total = len(left) * len(left[0])
differences = 0
intersection = 0
union = 0
for left_row, right_row in zip(left, right):
for left_value, right_value in zip(left_row, right_row):
differences += left_value != right_value
intersection += left_value and right_value
union += left_value or right_value
hamming = differences / total
iou = intersection / union if union else 0.0
return hamming, iou
def build_templates(font, pointsize):
width, height, raw = ppm_from_magick(
[
"-background",
"black",
"-fill",
"white",
"-font",
font,
"-pointsize",
str(pointsize),
"label:0123456789",
]
)
components = connected_components(white_mask(width, height, raw))
if len(components) < 10:
sys.exit(f"template rendering produced only {len(components)} digit components")
return {str(index): normalize(component) for index, component in enumerate(components[:10])}
def classify(component, templates):
sample = normalize(component)
ranked = []
for digit, template in templates.items():
hamming, iou = template_distance(sample, template)
ranked.append((hamming, -iou, digit, iou))
ranked.sort()
hamming, negative_iou, digit, iou = ranked[0]
return digit, hamming, iou
def helper_path():
return os.path.join(SCRIPT_DIR, "kwin_capture_screen")
def capture_named_screen(output, screen):
helper = helper_path()
if not os.path.exists(helper):
sys.exit(f"missing {helper}; run `make` in {SCRIPT_DIR}")
subprocess.run(
[helper, screen, output],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
def parse_geometry(geometry):
match = re.match(r"(-?\d+),(-?\d+),(\d+)x(\d+)", geometry)
if not match:
raise ValueError(f"could not parse screen geometry: {geometry}")
return tuple(int(group) for group in match.groups())
def screen_geometry(screen_name):
screens = kwin_screens()
for screen in screens:
if screen.get("name") == screen_name:
if not screen.get("enabled"):
sys.exit(f"screen {screen_name} is disabled")
geometry = screen.get("geometry")
if not geometry:
sys.exit(f"screen {screen_name} has no geometry")
return parse_geometry(geometry)
available = ", ".join(screen.get("name", "unknown") for screen in screens)
sys.exit(f"unknown screen {screen_name}; available screens: {available}")
def virtual_desktop_origin():
enabled_geometries = [
parse_geometry(screen["geometry"])
for screen in kwin_screens()
if screen.get("enabled") and screen.get("geometry")
]
if not enabled_geometries:
return 0, 0
return min(geometry[0] for geometry in enabled_geometries), min(geometry[1] for geometry in enabled_geometries)
def capture_screen_via_fullscreen_crop(output, screen):
x, y, width, height = screen_geometry(screen)
origin_x, origin_y = virtual_desktop_origin()
crop_x = x - origin_x
crop_y = y - origin_y
fd, full_path = tempfile.mkstemp(prefix="reforger-fullscreen-", suffix=".png")
os.close(fd)
try:
capture_spectacle(full_path, "fullscreen")
run_bytes(["magick", full_path, "-crop", f"{width}x{height}+{crop_x}+{crop_y}", "+repage", output])
finally:
try:
os.unlink(full_path)
except OSError:
pass
def capture_spectacle(output, mode):
if mode == "fullscreen":
capture_arg = "--fullscreen"
elif mode == "current":
capture_arg = "--current"
else:
raise ValueError(f"unsupported capture mode: {mode}")
subprocess.run(
["spectacle", "--background", capture_arg, "--nonotify", "--output", output],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
def capture_screenshot(output, mode, screen=None):
if screen:
try:
capture_named_screen(output, screen)
except subprocess.CalledProcessError:
capture_screen_via_fullscreen_crop(output, screen)
return
capture_spectacle(output, mode)
def kwin_screens():
output = run_bytes(["qdbus6", "org.kde.KWin", "/KWin", "supportInformation"]).decode("utf-8", "replace")
screens = []
current = None
for line in output.splitlines():
if re.match(r"^Screen \d+:", line):
if current:
screens.append(current)
current = {}
continue
if current is None:
continue
stripped = line.strip()
if stripped.startswith("Name:"):
current["name"] = stripped.split(":", 1)[1].strip()
elif stripped.startswith("Enabled:"):
current["enabled"] = stripped.split(":", 1)[1].strip() == "1"
elif stripped.startswith("Geometry:"):
current["geometry"] = stripped.split(":", 1)[1].strip()
if current:
screens.append(current)
return screens
def print_screens():
for screen in kwin_screens():
enabled = "enabled" if screen.get("enabled") else "disabled"
geometry = screen.get("geometry", "no geometry")
print(f"{screen.get('name', 'unknown')}\t{enabled}\t{geometry}")
def load_queue_image(image_path, crop, already_cropped=False):
if already_cropped:
return ppm_from_magick([image_path, "+repage"])
x, y, width, height = crop
return ppm_from_magick([image_path, "+repage", "-crop", f"{width}x{height}+{x}+{y}", "+repage"])
def read_queue_number(image_path, crop, font, pointsize, already_cropped=False):
width, height, raw = load_queue_image(image_path, crop, already_cropped)
components = connected_components(orange_digit_mask(width, height, raw))
templates = build_templates(font, pointsize)
digits = []
details = []
for component in components:
digit, hamming, iou = classify(component, templates)
digits.append(digit)
details.append((digit, component["bbox"], component["area"], hamming, iou))
return "".join(digits), details
def parse_crop(value):
parts = value.replace(",", " ").split()
if len(parts) != 4:
raise argparse.ArgumentTypeError("crop must be four numbers: x,y,width,height")
try:
return tuple(int(part) for part in parts)
except ValueError as exc:
raise argparse.ArgumentTypeError("crop values must be integers") from exc
def parse_size(value):
parts = value.lower().replace(",", "x").split("x")
if len(parts) != 2:
raise argparse.ArgumentTypeError("size must be WIDTHxHEIGHT")
try:
return int(parts[0]), int(parts[1])
except ValueError as exc:
raise argparse.ArgumentTypeError("size values must be integers") from exc
def scale_crop(reference_crop, reference_size, target_size, scale_mode):
reference_width, reference_height = reference_size
target_width, target_height = target_size
x, y, width, height = reference_crop
if scale_mode == "width":
x_scale = target_width / reference_width
y_scale = x_scale
elif scale_mode == "independent":
x_scale = target_width / reference_width
y_scale = target_height / reference_height
else:
raise ValueError(f"unsupported scale mode: {scale_mode}")
return (
round(x * x_scale),
round(y * y_scale),
max(1, round(width * x_scale)),
max(1, round(height * y_scale)),
)
def resolve_crop(image_path, explicit_crop, reference_crop, reference_size, scale_mode, already_cropped):
if already_cropped:
return None
if explicit_crop:
return explicit_crop
return scale_crop(reference_crop, reference_size, image_size(image_path), scale_mode)
def main():
parser = argparse.ArgumentParser(description="Read Arma Reforger queue number from a fixed screen crop.")
parser.add_argument("--image", help="Read from an existing image.")
parser.add_argument("--dataset", help="Read every image in a dataset directory.")
parser.add_argument("--list-screens", action="store_true", help="List KWin output names for use with --screen.")
parser.add_argument("--cropped", action="store_true", help="Treat input image(s) as already cropped to the queue number.")
parser.add_argument("--capture", action="store_true", help="Capture the current monitor with Spectacle.")
parser.add_argument("--screen", help="KWin output name to capture explicitly, for example DP-3.")
parser.add_argument(
"--capture-mode",
choices=("current", "fullscreen"),
default="current",
help="Spectacle capture mode. Use current for one monitor, fullscreen for the whole desktop.",
)
parser.add_argument("--crop", type=parse_crop, help="Absolute crop as x,y,width,height.")
parser.add_argument(
"--reference-crop",
type=parse_crop,
default=DEFAULT_REFERENCE_CROP,
help="Reference crop as x,y,width,height, scaled to the input image size when --crop is omitted.",
)
parser.add_argument(
"--reference-size",
type=parse_size,
default=DEFAULT_REFERENCE_SIZE,
help="Reference screen size for --reference-crop scaling.",
)
parser.add_argument(
"--scale-mode",
choices=("width", "independent"),
default="width",
help="How to scale --reference-crop. Width mode preserves a 16:9 UI layout and is the default.",
)
parser.add_argument("--font", default=DEFAULT_FONT, help="Font used for synthetic digit templates.")
parser.add_argument("--pointsize", type=int, default=DEFAULT_POINTSIZE, help="Point size used for templates.")
parser.add_argument("--debug", action="store_true", help="Print component and match details.")
parser.add_argument("--show-crop", action="store_true", help="Print the resolved crop used for each full-size image.")
parser.add_argument(
"--expect-filenames",
action="store_true",
help="In dataset mode, compare detected values to each image filename stem.",
)
args = parser.parse_args()
if args.screen and not args.capture:
parser.error("--screen can only be used with --capture")
if sum(bool(value) for value in (args.image, args.dataset, args.capture, args.list_screens)) != 1:
parser.error("choose exactly one of --image, --dataset, --capture, or --list-screens")
if args.list_screens:
print_screens()
return 0
if args.dataset:
extensions = {".png", ".jpg", ".jpeg", ".webp", ".bmp"}
paths = [
os.path.join(args.dataset, name)
for name in sorted(os.listdir(args.dataset))
if os.path.splitext(name)[1].lower() in extensions
]
if not paths:
print("no dataset images found")
return 2
status = 0
passed = 0
failed = 0
for path in paths:
crop = resolve_crop(path, args.crop, args.reference_crop, args.reference_size, args.scale_mode, args.cropped)
if args.show_crop and crop:
print(f"{path}\tcrop={crop[0]},{crop[1]},{crop[2]},{crop[3]}")
number, details = read_queue_number(path, crop, args.font, args.pointsize, args.cropped)
if not number:
status = 2
failed += 1
suffix = ""
if args.expect_filenames:
expected = os.path.splitext(os.path.basename(path))[0]
suffix = f"\texpected={expected}\tFAIL"
print(f"{path}\tno digits found{suffix}")
continue
suffix = ""
if args.expect_filenames:
expected = os.path.splitext(os.path.basename(path))[0]
if number == expected:
passed += 1
suffix = f"\texpected={expected}\tPASS"
else:
failed += 1
status = 1
suffix = f"\texpected={expected}\tFAIL"
print(f"{path}\t{number}{suffix}")
if args.debug:
for digit, bbox, area, hamming, iou in details:
print(f" digit={digit} bbox={bbox} area={area} hamming={hamming:.3f} iou={iou:.3f}")
if args.expect_filenames:
print(f"summary: {passed} passed, {failed} failed")
return status
temp_path = None
image_path = args.image
if args.capture:
fd, temp_path = tempfile.mkstemp(prefix="reforger-queue-", suffix=".png")
os.close(fd)
image_path = temp_path
capture_screenshot(image_path, args.capture_mode, args.screen)
try:
crop = resolve_crop(image_path, args.crop, args.reference_crop, args.reference_size, args.scale_mode, args.cropped)
if args.show_crop and crop:
print(f"crop={crop[0]},{crop[1]},{crop[2]},{crop[3]}")
number, details = read_queue_number(image_path, crop, args.font, args.pointsize, args.cropped)
finally:
if temp_path:
try:
os.unlink(temp_path)
except OSError:
pass
if not number:
print("no digits found")
return 2
print(number)
if args.debug:
for digit, bbox, area, hamming, iou in details:
print(f"digit={digit} bbox={bbox} area={area} hamming={hamming:.3f} iou={iou:.3f}")
return 0
if __name__ == "__main__":
raise SystemExit(main())