#!/usr/bin/env python3 """ run_ui_test.py [--update] Drives Golf Score through a full interaction sequence on the Pebble emulator via the QEMU HMP monitor (sendkey), captures a screenshot at each step, and verifies APP_LOG output for expected state changes. Log capture restarts fresh for each step so it does not hold a persistent connection that would block pebble screenshot after ~60 s. Known emulator limitation ───────────────────────── QEMU's `sendkey key hold_ms` always delivers a momentary press regardless of hold_ms — the Pebble click system's 700 ms long-press threshold is never reached. Steps that depend on long-press are executed (firing the short-press handler instead) and annotated "EMULATOR: long press → short press" in output. Log verification is skipped for those steps; screenshots are still captured to confirm the display does not break. Key mapping (confirmed via probe) ────────────────────────────────── UP → sendkey up DOWN → sendkey down SELECT → sendkey right BACK → sendkey left (left on main window exits the app — expected) Exit codes: 0 all pass 1 test failures 2 setup error """ import json, os, re, shutil, socket as _socket, struct import subprocess, sys, time, zlib EMULATOR_STATE = '/tmp/pb-emulator.json' # ── Timing ──────────────────────────────────────────────────────────────────── SHORT_MS = 100 # ms normal key hold LONG_MS = 850 # ms sent, but emulator treats as SHORT_MS (see note above) SETTLE_S = 0.25 # s pause after short press LONG_S = 1.1 # s pause after "long" press DRAW_S = 0.45 # s extra render time before screenshotting LOG_CAP_S = 2.5 # s how long to capture logs per step # ── Pixel comparison ────────────────────────────────────────────────────────── CHANNEL_TOL = 10 PIXEL_THRESH = 0.01 # ── QEMU HMP monitor ────────────────────────────────────────────────────────── class Monitor: def __init__(self, port): self.s = _socket.socket() self.s.settimeout(5) self.s.connect(('localhost', port)) self._drain() def _drain(self): buf = b'' self.s.settimeout(0.3) try: while True: buf += self.s.recv(4096) except OSError: pass self.s.settimeout(5) def _send(self, cmd): self.s.sendall((cmd + '\n').encode()) time.sleep(0.15) self._drain() def press(self, key, long=False): hold = LONG_MS if long else SHORT_MS self._send(f'sendkey {key} {hold}') time.sleep(LONG_S if long else SETTLE_S) def close(self): try: self.s.close() except OSError: pass # ── Per-step log capture ────────────────────────────────────────────────────── # pebble logs is started BEFORE the action and killed BEFORE the screenshot. # This avoids the persistent-connection interference that causes pebble # screenshot to fail after ~16 calls when a long-lived pebble-logs process # holds the session. def capture_logs(platform, action_fn, pattern, cap_s=LOG_CAP_S): """ Start `pebble logs`, run action_fn, wait cap_s, kill, return match. Returns (found: bool|None, info: str) None → pattern was None (step skipped log check) True → pattern matched False → pattern not matched; info contains recent lines """ if pattern is None: if action_fn: action_fn() return None, "skipped" proc = subprocess.Popen( ['pebble', 'logs', '--emulator', platform], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True, bufsize=1, ) time.sleep(0.4) # let pebble logs connect if action_fn: action_fn() time.sleep(cap_s) # let log messages arrive proc.terminate() try: out, _ = proc.communicate(timeout=1.5) except subprocess.TimeoutExpired: proc.kill() out, _ = proc.communicate() lines = out.splitlines() for line in lines: if re.search(pattern, line): return True, line.strip() recent = [l.strip() for l in lines[-4:] if l.strip()] return False, f"pattern={pattern!r} recent={recent}" # ── Screenshot ──────────────────────────────────────────────────────────────── def capture(platform, path, retries=3): """Take screenshot; retry on failure (transient connection blip).""" time.sleep(DRAW_S) for attempt in range(retries): r = subprocess.run( ['pebble', 'screenshot', '--emulator', platform, '--no-open', path], capture_output=True, ) if r.returncode == 0 and os.path.exists(path): return True if attempt < retries - 1: time.sleep(1.5) return False # ── PNG pixel comparison ────────────────────────────────────────────────────── def _load_png(path): with open(path, 'rb') as f: raw = f.read() assert raw[:8] == b'\x89PNG\r\n\x1a\n' pos, w, h, idats = 8, None, None, [] while pos < len(raw): n = struct.unpack_from('>I', raw, pos)[0] tag = raw[pos+4:pos+8] body = raw[pos+8:pos+8+n] pos += 12 + n if tag == b'IHDR': w, h = struct.unpack('>II', body[:8]) elif tag == b'IDAT': idats.append(body) elif tag == b'IEND': break data = zlib.decompress(b''.join(idats)) stride = w * 3 px, prev, idx = [], bytes(stride), 0 for _ in range(h): f = data[idx]; idx += 1 row = bytearray(data[idx:idx+stride]); idx += stride if f == 1: for i in range(3, stride): row[i] = (row[i] + row[i-3]) & 0xFF elif f == 2: for i in range(stride): row[i] = (row[i] + prev[i]) & 0xFF elif f == 3: for i in range(stride): a = row[i-3] if i >= 3 else 0 row[i] = (row[i] + (a + prev[i]) // 2) & 0xFF elif f == 4: for i in range(stride): a = row[i-3] if i >= 3 else 0; b = prev[i] c = prev[i-3] if i >= 3 else 0; p = a + b - c pa, pb, pc = abs(p-a), abs(p-b), abs(p-c) pr = a if pa<=pb and pa<=pc else (b if pb<=pc else c) row[i] = (row[i] + pr) & 0xFF prev = bytes(row) px.append([tuple(row[i:i+3]) for i in range(0, stride, 3)]) return w, h, px def compare_images(a, b): try: aw, ah, apx = _load_png(a) bw, bh, bpx = _load_png(b) except Exception as e: return False, f"load error: {e}" if aw != bw or ah != bh: return False, f"size {aw}×{ah} vs {bw}×{bh}" diffs = sum( 1 for y in range(ah) for x in range(aw) if any(abs(apx[y][x][c] - bpx[y][x][c]) > CHANNEL_TOL for c in range(3)) ) frac = diffs / (aw * ah) return frac <= PIXEL_THRESH, f"{diffs}/{aw*ah} px differ ({frac:.2%})" # ── Test step actions ───────────────────────────────────────────────────────── # All action functions receive `mon` via closure from main(). # They are defined as lambdas/functions that call mon.press(). # Long presses use mon.press(key, long=True) — note emulator limitation above. def make_steps(mon): """Return the full step list, closing over `mon`.""" # Short-hand helpers def up(): mon.press('up') def down(): mon.press('down') def sel(): mon.press('right') # SELECT = right arrow def back(): mon.press('left') # BACK = left arrow def long_up(): mon.press('up', long=True) def long_down(): mon.press('down', long=True) def long_sel(): mon.press('right', long=True) def reset(): long_sel() # open settings down(); down() # → Reset Round sel() # trigger reset time.sleep(0.4) def score_hole2_advance(): for _ in range(5): up() long_up(); long_up() # putts (fires short in emulator) sel() # advance to hole 3 def score_hole3(): up(); up(); up() def open_scorecard(): sel() # item 0 = View Scorecard def open_hole_picker(): down(); sel() # item 0→1 (Jump to Hole), SELECT def jump_hole1(): sel() # select hole 1 time.sleep(0.35) # wait for return-to-main timer def open_controls(): down(); down(); down() # item 0→3 (Controls) sel() def reset_from_settings(): up() # Controls→Reset Round sel() time.sleep(0.35) # ── Step list ────────────────────────────────────────────────────────────── # (step_id, description, action_fn | None, log_pattern | None) # # log_pattern = None for steps that either have no state change to verify # or where the emulator limitation prevents the expected handler from firing. EMULATOR_NOTE = " ⚠ EMULATOR: long press fires as short press (sendkey limitation)" return [ # ── Clean start ─────────────────────────────────────────────────────── ("00_reset", "Reset round via Settings → HOLE 1 0 str 0 ptt", reset, r"ACT:RESET"), ("01_initial", "Initial state: HOLE 1 STROKES 0 PUTTS 0", None, None), # ── Stroke counter ──────────────────────────────────────────────────── ("02_up_1", "UP → STROKES 1", up, r"ACT:UP hole=1 str=1 ptt=0"), ("03_up_3", "UP × 2 → STROKES 3", lambda: [up(), up()], r"ACT:UP hole=1 str=3 ptt=0"), # ── Putt counter (long press — emulator fires short press instead) ──── ("04_putt_attempt", f"Hold UP (emulator → short UP) STROKES becomes 4", long_up, None), # skip: emulator fires ACT:UP not ACT:LONG_UP ("05_putt_attempt2", f"Hold UP × 2 (emulator → short UP × 2) STROKES becomes 6", lambda: [long_up(), long_up()], None), # ── DOWN correction ─────────────────────────────────────────────────── ("06_down_corrects", "DOWN × 4 → STROKES 2 (correcting emulator over-count)", lambda: [down(), down(), down(), down()], r"ACT:DOWN hole=1 str=2 ptt=0"), # ── Long DOWN (putt decrement — same limitation) ────────────────────── ("07_long_down_attempt", "Hold DOWN (emulator → short DOWN) STROKES becomes 1", long_down, None), # ── Correct back to known state ─────────────────────────────────────── ("08_restore_state", "UP → STROKES 2 (restore to known state for next steps)", up, r"ACT:UP hole=1 str=2 ptt=0"), # ── Hole advance ────────────────────────────────────────────────────── ("09_hole_2", "SELECT → HOLE 2 (hole 1 saved: 2 str)", sel, r"ACT:SELECT hole=2"), # ── Multi-hole data for scorecard ───────────────────────────────────── ("10_hole2_scored", "Score hole 2: UP × 5, long UP × 2 (→ 7 str), SELECT → HOLE 3", score_hole2_advance, r"ACT:SELECT hole=3"), ("11_hole3_scored", "Score hole 3: UP × 3", score_hole3, r"ACT:UP hole=3 str=3 ptt=0"), # ── Settings menu ───────────────────────────────────────────────────── ("12_settings", "Hold SELECT → Settings menu", long_sel, r"ACT:SETTINGS"), # ── Scorecard ───────────────────────────────────────────────────────── ("13_scorecard", "SELECT → View Scorecard", open_scorecard, r"ACT:SCORECARD"), ("14_scorecard_scrolled", "DOWN × 3 → scroll scorecard", lambda: [down(), down(), down()], None), ("15_back_to_settings", "BACK → Settings menu", back, None), # ── Hole picker ─────────────────────────────────────────────────────── ("16_hole_picker", "DOWN + SELECT → Jump to Hole picker (HOLE 3 pre-selected)", open_hole_picker, None), ("17_picker_up2", "UP × 2 → HOLE 1 highlighted", lambda: [up(), up()], None), ("18_jumped_hole1", "SELECT → jump to HOLE 1 (2 str)", jump_hole1, r"ACT:JUMP hole=1"), # ── Controls / help ─────────────────────────────────────────────────── ("19_settings_fresh", "Hold SELECT → Settings (fresh open from main)", long_sel, r"ACT:SETTINGS"), ("20_controls", "DOWN × 3 + SELECT → Controls cheatsheet", open_controls, r"ACT:CONTROLS"), ("21_controls_scrolled", "DOWN × 2 → scroll controls", lambda: [down(), down()], None), ("22_back_to_settings", "BACK → Settings (Controls highlighted)", back, None), # ── Reset round ─────────────────────────────────────────────────────── ("23_after_reset", "UP + SELECT → Reset Round → HOLE 1 0 str", reset_from_settings, r"ACT:RESET"), # ── Floor checks ────────────────────────────────────────────────────── ("24_stroke_floor", "DOWN at 0 → STROKES stays 0", down, r"ACT:DOWN hole=1 str=0 ptt=0"), ("25_long_down_floor", "Hold DOWN at 0 (emulator → short DOWN) STROKES stays 0", long_down, None), ] # ── Runner ──────────────────────────────────────────────────────────────────── def main(): if len(sys.argv) < 4: print(f"Usage: {sys.argv[0]} [--update]") return 2 platform = sys.argv[1] shots_dir = sys.argv[2] baselines_dir = sys.argv[3] update_mode = '--update' in sys.argv # Locate monitor port try: state = json.load(open(EMULATOR_STATE)) pdata = state.get(platform) if not pdata: print(f"ERROR: '{platform}' not in {EMULATOR_STATE}", file=sys.stderr) return 2 sdk_ver = next(iter(pdata)) mon_port = pdata[sdk_ver]['qemu']['monitor'] except Exception as e: print(f"ERROR: {e}", file=sys.stderr) return 2 print(f" Monitor port {mon_port} ({platform})") try: mon = Monitor(mon_port) except Exception as e: print(f" ERROR: monitor: {e}", file=sys.stderr) return 2 os.makedirs(shots_dir, exist_ok=True) os.makedirs(baselines_dir, exist_ok=True) steps = make_steps(mon) failures = [] for step_id, desc, action, log_pattern in steps: shot = os.path.join(shots_dir, f"{platform}_{step_id}.png") baseline = os.path.join(baselines_dir, f"{platform}_{step_id}.png") # Capture logs and run action together; logs killed before screenshot log_ok, log_info = capture_logs(platform, action, log_pattern) # Screenshot (taken AFTER log process is dead — no connection conflict) if not capture(platform, shot): print(f" ✗ [{step_id}] screenshot failed") failures.append(step_id) continue # Baseline update or comparison if update_mode: shutil.copy2(shot, baseline) img_result = "baseline saved" elif os.path.exists(baseline): img_ok, img_result = compare_images(baseline, shot) if not img_ok: failures.append(step_id) else: img_ok, img_result = None, "no baseline" # Determine display mark img_failed = not update_mode and img_ok is False log_failed = log_ok is False # None = skipped, not failure step_failed = img_failed or log_failed mark = "↑" if update_mode else ("✗" if step_failed else "✓") print(f" {mark} [{step_id}] {desc}") if log_ok is True: print(f" log ✓ {log_info}") elif log_ok is False: print(f" log ✗ {log_info}") failures.append(step_id) # log_ok is None → silently skipped if not update_mode and img_ok is not None: img_sym = "img ✓" if img_ok else "img ✗" print(f" {img_sym} {img_result}") elif update_mode: print(f" {img_result}") mon.close() failures = list(dict.fromkeys(failures)) # deduplicate if failures: print(f"\n {len(failures)} failure(s): {', '.join(failures)}") return 1 return 0 if __name__ == '__main__': sys.exit(main())