#!/usr/bin/env python3
"""
|
||
run_ui_test.py <platform> <shots_dir> <baselines_dir> [--update]
|
||
|
||
Drives Golf Score through a full interaction sequence on the Pebble emulator
|
||
via the QEMU HMP monitor (sendkey), captures a screenshot at each step, and
|
||
verifies APP_LOG output for expected state changes.
|
||
|
||
Log capture restarts fresh for each step so it does not hold a persistent
|
||
connection that would block pebble screenshot after ~60 s.
|
||
|
||
Known emulator limitation
|
||
─────────────────────────
|
||
QEMU's `sendkey key hold_ms` always delivers a momentary press regardless of
|
||
hold_ms — the Pebble click system's 700 ms long-press threshold is never
|
||
reached. Steps that depend on long-press are executed (firing the short-press
|
||
handler instead) and annotated "EMULATOR: long press → short press" in output.
|
||
Log verification is skipped for those steps; screenshots are still captured to
|
||
confirm the display does not break.
|
||
|
||
Key mapping (confirmed via probe)
|
||
──────────────────────────────────
|
||
UP → sendkey up
|
||
DOWN → sendkey down
|
||
SELECT → sendkey right
|
||
BACK → sendkey left (left on main window exits the app — expected)
|
||
|
||
Exit codes: 0 all pass 1 test failures 2 setup error
|
||
"""
|
||
|
||
import json, os, re, shutil, socket as _socket, struct
|
||
import subprocess, sys, time, zlib
|
||
|
||
# JSON state file that main() reads to look up the emulator's monitor port.
EMULATOR_STATE = '/tmp/pb-emulator.json'

# ── Timing ────────────────────────────────────────────────────────────────────
# Key hold durations passed to `sendkey` (milliseconds).
SHORT_MS = 100    # normal key hold
LONG_MS = 850     # sent for "long" presses, but the emulator treats it as SHORT_MS

# Pauses (seconds).
SETTLE_S = 0.25   # after a short press
LONG_S = 1.1      # after a "long" press
DRAW_S = 0.45     # extra render time before screenshotting
LOG_CAP_S = 2.5   # how long logs are captured per step

# ── Pixel comparison ──────────────────────────────────────────────────────────
CHANNEL_TOL = 10      # per-channel difference above which a pixel counts as different
PIXEL_THRESH = 0.01   # largest fraction of differing pixels that still passes


# ── QEMU HMP monitor ──────────────────────────────────────────────────────────

class Monitor:
    """Small client for the QEMU monitor socket used to inject key presses.

    Connects to ``localhost:<port>``, discards whatever the monitor prints,
    and sends ``sendkey`` commands.  Instances can also be used as context
    managers, which closes the socket on exit.
    """

    IO_TIMEOUT_S = 5        # socket timeout for connect/send
    DRAIN_TIMEOUT_S = 0.3   # silence that ends a drain

    def __init__(self, port):
        """Connect to the monitor on `port` and swallow its greeting.

        Raises OSError if the connection cannot be established; the socket
        is closed before the exception propagates so it is not leaked.
        """
        self.s = _socket.socket()
        try:
            self.s.settimeout(self.IO_TIMEOUT_S)
            self.s.connect(('localhost', port))
            self._drain()
        except BaseException:
            self.s.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def _drain(self):
        """Read and return everything currently pending on the socket.

        Reading stops when the socket stays silent for DRAIN_TIMEOUT_S, when
        a socket error occurs, or when the peer closes the connection.  The
        last case matters: recv() then returns b'' immediately instead of
        raising, so without the explicit check the loop would spin forever.
        """
        chunks = []
        self.s.settimeout(self.DRAIN_TIMEOUT_S)
        try:
            while True:
                chunk = self.s.recv(4096)
                if not chunk:       # peer closed the connection
                    break
                chunks.append(chunk)
        except OSError:
            # Timeout (nothing more to read) or a socket error: draining is
            # best-effort, so either simply ends it.
            pass
        finally:
            self.s.settimeout(self.IO_TIMEOUT_S)
        return b''.join(chunks)

    def _send(self, cmd):
        """Send one monitor command line and discard the monitor's reply."""
        self.s.sendall((cmd + '\n').encode())
        time.sleep(0.15)
        self._drain()

    def press(self, key, long=False):
        """Send ``sendkey <key> <hold_ms>`` and wait for the UI to settle.

        `long` selects LONG_MS / LONG_S instead of SHORT_MS / SETTLE_S for
        the hold time and the pause that follows.
        """
        hold = LONG_MS if long else SHORT_MS
        self._send(f'sendkey {key} {hold}')
        time.sleep(LONG_S if long else SETTLE_S)

    def close(self):
        """Close the socket; errors while closing are ignored."""
        try:
            self.s.close()
        except OSError:
            pass


# ── Per-step log capture ──────────────────────────────────────────────────────
# `pebble logs` is started BEFORE the action and killed BEFORE the screenshot.
# This avoids the persistent-connection interference that causes `pebble
# screenshot` to fail after ~16 calls when a long-lived `pebble logs` process
# holds the session.

def capture_logs(platform, action_fn, pattern, cap_s=LOG_CAP_S):
    """
    Start `pebble logs`, run action_fn, wait cap_s, kill, return match.

    platform  – emulator platform name passed to `pebble logs --emulator`
    action_fn – zero-argument callable to run while logs are captured, or None
    pattern   – regular expression searched in each log line, or None
    cap_s     – seconds to keep capturing after the action has run

    Returns (found: bool|None, info: str)
      None  → pattern was None (step skipped log check; action still runs)
      True  → pattern matched; info is the matching line
      False → pattern not matched; info contains recent lines

    The log process is always stopped before returning, including when
    action_fn raises, so a failed action cannot leave a `pebble logs`
    process behind holding the emulator session.
    """
    if pattern is None:
        if action_fn:
            action_fn()
        return None, "skipped"

    # Compile first so an invalid pattern fails before a process is spawned.
    matcher = re.compile(pattern)

    proc = subprocess.Popen(
        ['pebble', 'logs', '--emulator', platform],
        stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
        text=True, bufsize=1,
    )
    try:
        time.sleep(0.4)             # let pebble logs connect
        if action_fn:
            action_fn()
        time.sleep(cap_s)           # let log messages arrive
    finally:
        proc.terminate()
        try:
            out, _ = proc.communicate(timeout=1.5)
        except subprocess.TimeoutExpired:
            # terminate() was ignored; force it and collect what was logged.
            proc.kill()
            out, _ = proc.communicate()

    lines = out.splitlines()
    for line in lines:
        if matcher.search(line):
            return True, line.strip()
    recent = [entry.strip() for entry in lines[-4:] if entry.strip()]
    return False, f"pattern={pattern!r} recent={recent}"


# ── Screenshot ────────────────────────────────────────────────────────────────

def capture(platform, path, retries=3):
    """Save an emulator screenshot to `path`, retrying on failure.

    Waits DRAW_S first so the display can finish rendering, then runs
    `pebble screenshot` up to `retries` times with a 1.5 s pause between
    attempts.  Returns True as soon as one attempt exits with status 0 and
    `path` exists, otherwise False.
    """
    command = ['pebble', 'screenshot', '--emulator', platform, '--no-open', path]

    time.sleep(DRAW_S)
    attempts_left = retries
    while attempts_left > 0:
        attempts_left -= 1
        result = subprocess.run(command, capture_output=True)
        if result.returncode == 0 and os.path.exists(path):
            return True
        if attempts_left:
            # Transient connection blip: give the emulator a moment.
            time.sleep(1.5)
    return False


# ── PNG pixel comparison ──────────────────────────────────────────────────────

def _load_png(path):
    """Decode a PNG file into rows of (r, g, b) tuples using only the stdlib.

    Supports non-interlaced, 8-bit-per-channel truecolour images, with or
    without alpha (PNG colour types 2 and 6); an alpha channel is dropped.
    Chunk CRCs are not verified and chunks other than IHDR/IDAT/IEND are
    ignored.

    Returns (width, height, pixels) where pixels[y][x] is an (r, g, b) tuple.

    Raises ValueError for anything that cannot be decoded faithfully: a bad
    signature, a missing header, an unsupported pixel format, an unknown
    filter type, or truncated pixel data.  zlib.error and OSError propagate
    unchanged.
    """
    with open(path, 'rb') as f:
        raw = f.read()
    if raw[:8] != b'\x89PNG\r\n\x1a\n':
        raise ValueError(f"{path}: not a PNG file (bad signature)")

    # Walk the chunk list: 4-byte length, 4-byte tag, body, 4-byte CRC.
    pos, header, idats = 8, None, []
    while pos + 8 <= len(raw):
        n = struct.unpack_from('>I', raw, pos)[0]
        tag = raw[pos + 4:pos + 8]
        body = raw[pos + 8:pos + 8 + n]
        pos += 12 + n
        if tag == b'IHDR':
            header = body
        elif tag == b'IDAT':
            idats.append(body)
        elif tag == b'IEND':
            break

    if header is None or len(header) < 13:
        raise ValueError(f"{path}: missing or truncated IHDR chunk")
    w, h, depth, colour_type, _, _, interlace = struct.unpack(
        '>IIBBBBB', header[:13])

    # Bytes per pixel for the colour types this decoder understands.
    bpp = {2: 3, 6: 4}.get(colour_type)
    if depth != 8 or bpp is None or interlace:
        raise ValueError(
            f"{path}: unsupported PNG format (bit depth {depth}, "
            f"colour type {colour_type}, interlace {interlace})")

    data = zlib.decompress(b''.join(idats))
    stride = w * bpp
    if len(data) < h * (stride + 1):
        raise ValueError(f"{path}: truncated pixel data")

    px, prev, idx = [], bytes(stride), 0
    for _ in range(h):
        # Every scanline is one filter-type byte followed by `stride` bytes.
        ftype = data[idx]
        row = bytearray(data[idx + 1:idx + 1 + stride])
        idx += 1 + stride

        if ftype == 1:          # Sub: add the byte one pixel to the left
            for i in range(bpp, stride):
                row[i] = (row[i] + row[i - bpp]) & 0xFF
        elif ftype == 2:        # Up: add the byte directly above
            for i in range(stride):
                row[i] = (row[i] + prev[i]) & 0xFF
        elif ftype == 3:        # Average of left and above
            for i in range(stride):
                left = row[i - bpp] if i >= bpp else 0
                row[i] = (row[i] + (left + prev[i]) // 2) & 0xFF
        elif ftype == 4:        # Paeth predictor
            for i in range(stride):
                left = row[i - bpp] if i >= bpp else 0
                above = prev[i]
                upper_left = prev[i - bpp] if i >= bpp else 0
                p = left + above - upper_left
                pa, pb, pc = abs(p - left), abs(p - above), abs(p - upper_left)
                if pa <= pb and pa <= pc:
                    predictor = left
                elif pb <= pc:
                    predictor = above
                else:
                    predictor = upper_left
                row[i] = (row[i] + predictor) & 0xFF
        elif ftype != 0:        # 0 = None: bytes are stored as-is
            raise ValueError(f"{path}: unknown PNG filter type {ftype}")

        prev = bytes(row)
        px.append([tuple(row[i:i + 3]) for i in range(0, stride, bpp)])
    return w, h, px
|
||
|
||
def compare_images(a, b):
    """Compare two PNG files pixel by pixel.

    A pixel counts as different when any of its three channels differs by
    more than CHANNEL_TOL.  The images match when the fraction of differing
    pixels is at most PIXEL_THRESH.

    Returns (ok, message).  Files that cannot be loaded, or whose dimensions
    differ, yield ok=False with an explanatory message.
    """
    try:
        width_a, height_a, pixels_a = _load_png(a)
        width_b, height_b, pixels_b = _load_png(b)
    except Exception as e:
        return False, f"load error: {e}"

    if (width_a, height_a) != (width_b, height_b):
        return False, f"size {width_a}×{height_a} vs {width_b}×{height_b}"

    diffs = 0
    for y in range(height_a):
        for x in range(width_a):
            for c in range(3):
                if abs(pixels_a[y][x][c] - pixels_b[y][x][c]) > CHANNEL_TOL:
                    diffs += 1
                    break       # one bad channel is enough for this pixel

    frac = diffs / (width_a * height_a)
    verdict = frac <= PIXEL_THRESH
    return verdict, f"{diffs}/{width_a*height_a} px differ ({frac:.2%})"


# ── Test step actions ─────────────────────────────────────────────────────────
# All action functions close over the `mon` passed to make_steps().
# They are small zero-argument callables that call mon.press().
# Long presses use mon.press(key, long=True) — note emulator limitation above.

def make_steps(mon):
    """Return the full step list, closing over `mon`.

    `mon` only needs a ``press(key, long=False)`` method.

    Each step is a 4-tuple ``(step_id, description, action, log_pattern)``:
      action      – zero-argument callable that drives the UI, or None
      log_pattern – regular expression expected in the app log, or None for
                    steps that either have no state change to verify or
                    where the emulator limitation prevents the expected
                    handler from firing.
    """

    # Short-hand helpers
    def up():
        mon.press('up')

    def down():
        mon.press('down')

    def sel():
        mon.press('right')          # SELECT = right arrow

    def back():
        mon.press('left')           # BACK = left arrow

    def long_up():
        mon.press('up', long=True)

    def long_down():
        mon.press('down', long=True)

    def long_sel():
        mon.press('right', long=True)

    def repeat(action, count):
        """Return an action that runs `action` `count` times in a row."""
        def run():
            for _ in range(count):
                action()
        return run

    def reset():
        long_sel()                  # open settings
        down()
        down()                      # → Reset Round
        sel()                       # trigger reset
        time.sleep(0.4)

    def score_hole2_advance():
        for _ in range(5):
            up()
        long_up()
        long_up()                   # putts (fires short in emulator)
        sel()                       # advance to hole 3

    def score_hole3():
        up()
        up()
        up()

    def open_scorecard():
        sel()                       # item 0 = View Scorecard

    def open_hole_picker():
        down()
        sel()                       # item 0→1 (Jump to Hole), SELECT

    def jump_hole1():
        sel()                       # select hole 1
        time.sleep(0.35)            # wait for return-to-main timer

    def open_controls():
        down()
        down()
        down()                      # item 0→3 (Controls)
        sel()

    def reset_from_settings():
        up()                        # Controls→Reset Round
        sel()
        time.sleep(0.35)

    # NOTE(review): steps 00, 12 and 19 open Settings with a long SELECT and
    # still verify the log (ACT:RESET / ACT:SETTINGS).  If the sendkey
    # long-press limitation also applies to SELECT, those steps would receive
    # a short SELECT instead — confirm against the emulator.

    # ── Step list ──────────────────────────────────────────────────────────────
    # (step_id, description, action_fn | None, log_pattern | None)
    return [
        # ── Clean start ───────────────────────────────────────────────────────
        ("00_reset",
         "Reset round via Settings → HOLE 1 0 str 0 ptt",
         reset,
         r"ACT:RESET"),

        ("01_initial",
         "Initial state: HOLE 1 STROKES 0 PUTTS 0",
         None, None),

        # ── Stroke counter ────────────────────────────────────────────────────
        ("02_up_1",
         "UP → STROKES 1",
         up,
         r"ACT:UP hole=1 str=1 ptt=0"),

        ("03_up_3",
         "UP × 2 → STROKES 3",
         repeat(up, 2),
         r"ACT:UP hole=1 str=3 ptt=0"),

        # ── Putt counter (long press — emulator fires short press instead) ────
        ("04_putt_attempt",
         "Hold UP (emulator → short UP) STROKES becomes 4",
         long_up,
         None),     # skip: emulator fires ACT:UP not ACT:LONG_UP

        ("05_putt_attempt2",
         "Hold UP × 2 (emulator → short UP × 2) STROKES becomes 6",
         repeat(long_up, 2),
         None),

        # ── DOWN correction ───────────────────────────────────────────────────
        ("06_down_corrects",
         "DOWN × 4 → STROKES 2 (correcting emulator over-count)",
         repeat(down, 4),
         r"ACT:DOWN hole=1 str=2 ptt=0"),

        # ── Long DOWN (putt decrement — same limitation) ──────────────────────
        ("07_long_down_attempt",
         "Hold DOWN (emulator → short DOWN) STROKES becomes 1",
         long_down,
         None),

        # ── Correct back to known state ───────────────────────────────────────
        ("08_restore_state",
         "UP → STROKES 2 (restore to known state for next steps)",
         up,
         r"ACT:UP hole=1 str=2 ptt=0"),

        # ── Hole advance ──────────────────────────────────────────────────────
        ("09_hole_2",
         "SELECT → HOLE 2 (hole 1 saved: 2 str)",
         sel,
         r"ACT:SELECT hole=2"),

        # ── Multi-hole data for scorecard ─────────────────────────────────────
        ("10_hole2_scored",
         "Score hole 2: UP × 5, long UP × 2 (→ 7 str), SELECT → HOLE 3",
         score_hole2_advance,
         r"ACT:SELECT hole=3"),

        ("11_hole3_scored",
         "Score hole 3: UP × 3",
         score_hole3,
         r"ACT:UP hole=3 str=3 ptt=0"),

        # ── Settings menu ─────────────────────────────────────────────────────
        ("12_settings",
         "Hold SELECT → Settings menu",
         long_sel,
         r"ACT:SETTINGS"),

        # ── Scorecard ─────────────────────────────────────────────────────────
        ("13_scorecard",
         "SELECT → View Scorecard",
         open_scorecard,
         r"ACT:SCORECARD"),

        ("14_scorecard_scrolled",
         "DOWN × 3 → scroll scorecard",
         repeat(down, 3),
         None),

        ("15_back_to_settings",
         "BACK → Settings menu",
         back,
         None),

        # ── Hole picker ───────────────────────────────────────────────────────
        ("16_hole_picker",
         "DOWN + SELECT → Jump to Hole picker (HOLE 3 pre-selected)",
         open_hole_picker,
         None),

        ("17_picker_up2",
         "UP × 2 → HOLE 1 highlighted",
         repeat(up, 2),
         None),

        ("18_jumped_hole1",
         "SELECT → jump to HOLE 1 (2 str)",
         jump_hole1,
         r"ACT:JUMP hole=1"),

        # ── Controls / help ───────────────────────────────────────────────────
        ("19_settings_fresh",
         "Hold SELECT → Settings (fresh open from main)",
         long_sel,
         r"ACT:SETTINGS"),

        ("20_controls",
         "DOWN × 3 + SELECT → Controls cheatsheet",
         open_controls,
         r"ACT:CONTROLS"),

        ("21_controls_scrolled",
         "DOWN × 2 → scroll controls",
         repeat(down, 2),
         None),

        ("22_back_to_settings",
         "BACK → Settings (Controls highlighted)",
         back,
         None),

        # ── Reset round ───────────────────────────────────────────────────────
        ("23_after_reset",
         "UP + SELECT → Reset Round → HOLE 1 0 str",
         reset_from_settings,
         r"ACT:RESET"),

        # ── Floor checks ──────────────────────────────────────────────────────
        ("24_stroke_floor",
         "DOWN at 0 → STROKES stays 0",
         down,
         r"ACT:DOWN hole=1 str=0 ptt=0"),

        ("25_long_down_floor",
         "Hold DOWN at 0 (emulator → short DOWN) STROKES stays 0",
         long_down,
         None),
    ]


# ── Runner ────────────────────────────────────────────────────────────────────

def main():
    """Run every UI step against the emulator and report the results.

    Command line: <platform> <shots_dir> <baselines_dir> [--update]

    For each step the action is run (with log capture when the step has a
    log pattern), a screenshot is saved to shots_dir, and the screenshot is
    either compared with the baseline of the same name or, with --update,
    copied over it.  A step without a baseline is reported as passing on the
    image side.

    Returns the process exit code:
      0 – all steps passed
      1 – at least one step failed (screenshot, image or log)
      2 – setup error (bad arguments, emulator state, monitor connection)
    """
    if len(sys.argv) < 4:
        print(f"Usage: {sys.argv[0]} <platform> <shots_dir> <baselines_dir> [--update]")
        return 2

    platform, shots_dir, baselines_dir = sys.argv[1:4]
    update_mode = '--update' in sys.argv

    # Locate monitor port
    try:
        with open(EMULATOR_STATE) as fh:
            state = json.load(fh)
        pdata = state.get(platform)
        if not pdata:
            print(f"ERROR: '{platform}' not in {EMULATOR_STATE}", file=sys.stderr)
            return 2
        sdk_ver = next(iter(pdata))     # first SDK version listed
        mon_port = pdata[sdk_ver]['qemu']['monitor']
    except Exception as e:
        print(f"ERROR: {e}", file=sys.stderr)
        return 2

    print(f" Monitor port {mon_port} ({platform})")

    try:
        mon = Monitor(mon_port)
    except Exception as e:
        print(f" ERROR: monitor: {e}", file=sys.stderr)
        return 2

    failures = []
    try:
        os.makedirs(shots_dir, exist_ok=True)
        os.makedirs(baselines_dir, exist_ok=True)

        for step_id, desc, action, log_pattern in make_steps(mon):
            shot = os.path.join(shots_dir, f"{platform}_{step_id}.png")
            baseline = os.path.join(baselines_dir, f"{platform}_{step_id}.png")

            # Capture logs and run action together; logs killed before screenshot
            log_ok, log_info = capture_logs(platform, action, log_pattern)

            # Screenshot (taken AFTER log process is dead — no connection conflict)
            if not capture(platform, shot):
                print(f" ✗ [{step_id}] screenshot failed")
                failures.append(step_id)
                continue

            # Baseline update or comparison; img_ok stays None when there is
            # nothing to compare against.
            img_ok = None
            if update_mode:
                shutil.copy2(shot, baseline)
                img_result = "baseline saved"
            elif os.path.exists(baseline):
                img_ok, img_result = compare_images(baseline, shot)
            else:
                img_result = "no baseline"

            img_failed = img_ok is False
            log_failed = log_ok is False    # None = skipped, not failure
            step_failed = img_failed or log_failed
            if step_failed:
                failures.append(step_id)

            mark = "↑" if update_mode else ("✗" if step_failed else "✓")
            print(f" {mark} [{step_id}] {desc}")

            if log_ok is True:
                print(f" log ✓ {log_info}")
            elif log_ok is False:
                print(f" log ✗ {log_info}")
            # log_ok is None → silently skipped

            if update_mode:
                print(f" {img_result}")
            elif img_ok is not None:
                img_sym = "img ✓" if img_ok else "img ✗"
                print(f" {img_sym} {img_result}")
    finally:
        # Release the monitor socket even if a step raised.
        mon.close()

    if failures:
        print(f"\n {len(failures)} failure(s): {', '.join(failures)}")
        return 1
    return 0
|
||
|
||
|
||
# Script entry point: main()'s return value becomes the process exit code.
if __name__ == '__main__':
    raise SystemExit(main())
|