feat(ui): Qt progress window with Prism-Launcher-inspired dark palette
CI / test (3.10) (push) Successful in 8s
CI / test (3.11) (push) Successful in 6s
CI / test (3.12) (push) Successful in 7s
CI / build-pyz (push) Successful in 4s
CI / release (push) Has been skipped

cloud-sync now ships a real Qt UI alongside the tkinter fallback.

Architecture:
  - HeadlessProgress: --no-gui path, plain stdout
  - TkProgressWindow: stdlib fallback when Qt isn't installed
  - QtProgressWindow: preferred path; supports both PySide6 and PyQt6
    (interchangeable APIs for our subset)

The factory in ui.py picks Qt → tkinter → headless. Tk stays so the
zipapp still works on bare Python with no extras.

Threading: QApplication runs on the main thread (started by run_with
via QDialog.exec). The restic worker runs on a daemon threading.Thread.
Cross-thread UI updates go via a Signal on a bridge QObject so Qt
auto-marshals them onto the main thread via a queued connection.

Cancellation: WM close + Cancel button both set a flag. sync.pull/push
pass ui.is_cancelled as restic.run's cancel_check; the subprocess gets
killed and returns -1 → exit 1.

Theme: Fusion style + Prism's dark palette (RGB values copied as facts
from PrismLauncher's DarkTheme.cpp). Override with PRISM_THEME=off.

Pyz size went 20 KB → 36 KB (added ui.py + ui_qt.py).
33 tests still green.
This commit is contained in:
2026-06-04 23:12:58 +02:00
parent 49d1cb3280
commit fe26ed309c
5 changed files with 475 additions and 11 deletions
+6 -3
View File
@@ -83,8 +83,8 @@ def parse(argv: list[str]) -> tuple[str, Args]:
def main(argv: list[str] | None = None) -> int: def main(argv: list[str] | None = None) -> int:
"""CLI entrypoint. Returns exit code (0=ok, 1=user cancel, 2=error).""" """CLI entrypoint. Returns exit code (0=ok, 1=user cancel, 2=error)."""
# Import here to keep CLI import light (test isolation). # Imports kept here so tests of parse() don't drag UI in.
from . import sync from . import sync, ui
try: try:
cmd, args = parse(sys.argv[1:] if argv is None else argv) cmd, args = parse(sys.argv[1:] if argv is None else argv)
@@ -92,8 +92,11 @@ def main(argv: list[str] | None = None) -> int:
return int(e.code) if isinstance(e.code, int) else 2 return int(e.code) if isinstance(e.code, int) else 2
action = {"pull": sync.pull, "push": sync.push}[cmd] action = {"pull": sync.pull, "push": sync.push}[cmd]
progress = ui.make_progress(headless=args.headless)
title = "Cloud sync — pulling" if cmd == "pull" else "Cloud sync — pushing"
try: try:
return action(args) return progress.run_with(lambda: action(args, progress), title)
except KeyboardInterrupt: except KeyboardInterrupt:
print("cloud-sync: cancelled", file=sys.stderr) print("cloud-sync: cancelled", file=sys.stderr)
return 1 return 1
+41 -3
View File
@@ -23,10 +23,12 @@ import stat
import subprocess import subprocess
import sys import sys
import tempfile import tempfile
import time
import urllib.request import urllib.request
import zipfile import zipfile
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import Callable
from .cli import Args from .cli import Args
@@ -107,13 +109,20 @@ def run(
env: dict[str, str] | None = None, env: dict[str, str] | None = None,
cwd: Path | None = None, cwd: Path | None = None,
timeout: int = 900, timeout: int = 900,
cancel_check: Callable[[], bool] | None = None,
) -> tuple[int, str]: ) -> tuple[int, str]:
"""Run restic. Inherits stderr to caller's terminal for live progress. """Run restic. Inherits stderr to caller's terminal for live progress.
Returns (returncode, captured_stdout).""" Returns (returncode, captured_stdout).
When cancel_check is supplied, polls every 100 ms; if it returns True,
kills restic and returns ``(-1, "")``.
"""
merged_env = dict(os.environ) merged_env = dict(os.environ)
if env: if env:
merged_env.update(env) merged_env.update(env)
p = subprocess.run( # noqa: S603 — controlled invocation
if cancel_check is None:
p_run = subprocess.run( # noqa: S603
[str(binary), *args], [str(binary), *args],
cwd=str(cwd) if cwd else None, cwd=str(cwd) if cwd else None,
env=merged_env, env=merged_env,
@@ -123,7 +132,36 @@ def run(
timeout=timeout, timeout=timeout,
check=False, check=False,
) )
return p.returncode, p.stdout return p_run.returncode, p_run.stdout
# Cancel-capable path: spawn + poll
p = subprocess.Popen( # noqa: S603
[str(binary), *args],
cwd=str(cwd) if cwd else None,
env=merged_env,
stdout=subprocess.PIPE,
stderr=sys.stderr,
text=True,
)
deadline = time.monotonic() + timeout
while p.poll() is None:
if cancel_check():
p.kill()
try:
p.wait(timeout=5)
except subprocess.TimeoutExpired:
pass
return -1, ""
if time.monotonic() > deadline:
p.kill()
try:
p.wait(timeout=5)
except subprocess.TimeoutExpired:
pass
raise subprocess.TimeoutExpired([str(binary), *args], timeout)
time.sleep(0.1)
out = p.stdout.read() if p.stdout else ""
return p.returncode, out
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
+26 -3
View File
@@ -23,25 +23,33 @@ from pathlib import Path
from . import restic, scope as scopemod from . import restic, scope as scopemod
from .cli import Args from .cli import Args
from .creds import read_credentials from .creds import read_credentials
from .ui import HeadlessProgress, Progress
def pull(args: Args) -> int: def pull(args: Args, progress: Progress | None = None) -> int:
"""Restore latest snapshot's files into pack_folder. """Restore latest snapshot's files into pack_folder.
If the repo has no snapshots yet, this is a no-op (first run on this If the repo has no snapshots yet, this is a no-op (first run on this
machine; nothing to restore). machine; nothing to restore).
""" """
ui = progress or HeadlessProgress()
ui.set_status("Reading credentials…")
discord_id, password = read_credentials(args.token_file) discord_id, password = read_credentials(args.token_file)
ui.set_status("Resolving restic binary…")
binary = restic.resolve_binary(args) binary = restic.resolve_binary(args)
repo = _restic_repo(args.url, discord_id, password) repo = _restic_repo(args.url, discord_id, password)
env = _restic_env() env = _restic_env()
# Check whether any snapshots exist ui.set_status("Checking remote snapshots")
code, out = restic.run( code, out = restic.run(
binary, binary,
["-r", repo, "--insecure-no-password", "snapshots", "--json", "--latest", "1"], ["-r", repo, "--insecure-no-password", "snapshots", "--json", "--latest", "1"],
env=env, env=env,
cancel_check=ui.is_cancelled,
) )
if code == -1:
return 1
if code != 0: if code != 0:
print( print(
f"cloud-sync: failed to list snapshots (restic exit {code})", f"cloud-sync: failed to list snapshots (restic exit {code})",
@@ -50,6 +58,7 @@ def pull(args: Args) -> int:
return 2 return 2
stripped = out.strip() stripped = out.strip()
if stripped in ("", "null", "[]"): if stripped in ("", "null", "[]"):
ui.set_status("No snapshots yet — nothing to pull")
print( print(
"cloud-sync: no snapshots yet for this user " "cloud-sync: no snapshots yet for this user "
"(first run on this machine?); nothing to pull" "(first run on this machine?); nothing to pull"
@@ -59,6 +68,7 @@ def pull(args: Args) -> int:
scope = scopemod.load(args.pack_folder) scope = scopemod.load(args.pack_folder)
_, exclude_from = scopemod.materialize_for_restic(args.pack_folder, scope) _, exclude_from = scopemod.materialize_for_restic(args.pack_folder, scope)
ui.set_status("Restoring files…")
code, _ = restic.run( code, _ = restic.run(
binary, binary,
[ [
@@ -68,17 +78,25 @@ def pull(args: Args) -> int:
"--exclude-file", str(exclude_from), "--exclude-file", str(exclude_from),
], ],
env=env, env=env,
cancel_check=ui.is_cancelled,
) )
if code == -1:
return 1
if code != 0: if code != 0:
print(f"cloud-sync: restic restore failed (exit {code})", file=sys.stderr) print(f"cloud-sync: restic restore failed (exit {code})", file=sys.stderr)
return 2 return 2
ui.set_status("Pull complete")
print("cloud-sync: pull ok") print("cloud-sync: pull ok")
return 0 return 0
def push(args: Args) -> int: def push(args: Args, progress: Progress | None = None) -> int:
"""Snapshot the in-scope files into the user's repo.""" """Snapshot the in-scope files into the user's repo."""
ui = progress or HeadlessProgress()
ui.set_status("Reading credentials…")
discord_id, password = read_credentials(args.token_file) discord_id, password = read_credentials(args.token_file)
ui.set_status("Resolving restic binary…")
binary = restic.resolve_binary(args) binary = restic.resolve_binary(args)
repo = _restic_repo(args.url, discord_id, password) repo = _restic_repo(args.url, discord_id, password)
env = _restic_env() env = _restic_env()
@@ -86,6 +104,7 @@ def push(args: Args) -> int:
scope = scopemod.load(args.pack_folder) scope = scopemod.load(args.pack_folder)
files_from, exclude_from = scopemod.materialize_for_restic(args.pack_folder, scope) files_from, exclude_from = scopemod.materialize_for_restic(args.pack_folder, scope)
ui.set_status("Uploading snapshot…")
code, _ = restic.run( code, _ = restic.run(
binary, binary,
[ [
@@ -98,10 +117,14 @@ def push(args: Args) -> int:
], ],
env=env, env=env,
cwd=args.pack_folder, cwd=args.pack_folder,
cancel_check=ui.is_cancelled,
) )
if code == -1:
return 1
if code != 0: if code != 0:
print(f"cloud-sync: restic backup failed (exit {code})", file=sys.stderr) print(f"cloud-sync: restic backup failed (exit {code})", file=sys.stderr)
return 2 return 2
ui.set_status("Push complete")
print("cloud-sync: push ok") print("cloud-sync: push ok")
return 0 return 0
+214
View File
@@ -0,0 +1,214 @@
"""Progress UI for cloud-sync operations.
Two implementations sharing the ``Progress`` protocol:
- :class:`HeadlessProgress` — no window; prints to stdout/stderr. Used when
``--no-gui`` is set or when tkinter import fails.
- :class:`TkProgressWindow` — tkinter modal window with status text +
indeterminate progress bar + Cancel button. Stdlib only.
The window runs in the main thread; the restic subprocess runs in a
worker thread. The window polls every 100 ms to check whether the worker
finished and whether the user clicked Cancel.
Future option: PySide6 / PyQt6 for a real Qt window matching Prism's
look. Gated behind ``cloud-sync[qt]`` extra in ``pyproject.toml``; not
implemented yet.
"""
from __future__ import annotations
import sys
import threading
from typing import Callable, Protocol
class Progress(Protocol):
"""Interface for status + cancellation reporting during a sync run."""
def set_status(self, msg: str) -> None: ...
def is_cancelled(self) -> bool: ...
def run_with(self, worker: Callable[[], int], title: str) -> int: ...
# ---------------------------------------------------------------------------
# Headless (text-only)
# ---------------------------------------------------------------------------
class HeadlessProgress:
"""No-op progress. Status messages go to stdout, errors to stderr."""
def set_status(self, msg: str) -> None:
print(f"cloud-sync: {msg}", flush=True)
def is_cancelled(self) -> bool:
return False
def run_with(self, worker: Callable[[], int], title: str) -> int:
self.set_status(title)
return worker()
# ---------------------------------------------------------------------------
# Tk window
# ---------------------------------------------------------------------------
class TkProgressWindow:
"""Modal tkinter window with status + indeterminate progress bar.
Usage::
ui = TkProgressWindow()
rc = ui.run_with(lambda: do_the_work(ui), title="Cloud pull")
"""
def __init__(self) -> None:
# Defer imports so headless fall-through doesn't blow up on
# tkinter-less Python builds.
import tkinter as tk
from tkinter import ttk
self._tk = tk
self._ttk = ttk
self._root = tk.Tk()
self._root.title("Cloud sync")
self._root.geometry("440x160")
self._root.resizable(False, False)
self._root.attributes("-topmost", True)
self._root.protocol("WM_DELETE_WINDOW", self._on_close)
frame = ttk.Frame(self._root, padding=20)
frame.pack(fill="both", expand=True)
self._title_var = tk.StringVar(value="Working…")
ttk.Label(
frame,
textvariable=self._title_var,
font=("TkDefaultFont", 11, "bold"),
).pack(anchor="w")
self._status_var = tk.StringVar(value="Starting…")
ttk.Label(frame, textvariable=self._status_var).pack(anchor="w", pady=(4, 8))
self._bar = ttk.Progressbar(frame, mode="indeterminate", length=400)
self._bar.pack(fill="x")
self._bar.start(15)
button_row = ttk.Frame(frame)
button_row.pack(fill="x", pady=(12, 0))
self._cancel_btn = ttk.Button(
button_row, text="Cancel", command=self._on_close
)
self._cancel_btn.pack(side="right")
self._cancelled = False
self._worker_rc: int | None = None
self._worker_exc: BaseException | None = None
self._center_on_screen()
# -- public API ----------------------------------------------------
def set_status(self, msg: str) -> None:
try:
self._status_var.set(msg)
except self._tk.TclError:
# window was destroyed
pass
def is_cancelled(self) -> bool:
return self._cancelled
def run_with(self, worker: Callable[[], int], title: str) -> int:
self._title_var.set(title)
self._status_var.set("Starting…")
def thread_target() -> None:
try:
self._worker_rc = worker()
except BaseException as e: # noqa: BLE001
self._worker_exc = e
t = threading.Thread(target=thread_target, daemon=True)
t.start()
self._root.after(100, self._poll, t)
self._root.mainloop()
if self._worker_exc is not None:
raise self._worker_exc
if self._worker_rc is None:
# User cancelled before worker reported
return 1
return self._worker_rc
# -- internals -----------------------------------------------------
def _poll(self, thread: threading.Thread) -> None:
if not thread.is_alive():
try:
self._bar.stop()
self._root.quit()
self._root.destroy()
except self._tk.TclError:
pass
return
self._root.after(100, self._poll, thread)
def _on_close(self) -> None:
# Mark cancelled; worker checks via is_cancelled. Don't destroy
# window — polling loop will clean up once worker exits.
self._cancelled = True
try:
self._status_var.set("Cancelling…")
self._cancel_btn.configure(state="disabled")
except self._tk.TclError:
pass
def _center_on_screen(self) -> None:
self._root.update_idletasks()
w = self._root.winfo_width()
h = self._root.winfo_height()
sw = self._root.winfo_screenwidth()
sh = self._root.winfo_screenheight()
x = max(0, (sw // 2) - (w // 2))
y = max(0, (sh // 2) - (h // 2))
self._root.geometry(f"+{x}+{y}")
# ---------------------------------------------------------------------------
# Factory
# ---------------------------------------------------------------------------
def make_progress(headless: bool) -> Progress:
"""Pick the best Progress impl for the runtime + flags.
Preference order:
1. Qt (PySide6 or PyQt6) — modern look, matches Prism's aesthetic.
2. tkinter — stdlib fallback; ships with most Python distributions.
3. headless — print to stdout/stderr only.
Override via ``--no-gui`` (forces headless).
"""
if headless:
return HeadlessProgress()
try:
from .ui_qt import QtProgressWindow
return QtProgressWindow()
except ImportError:
pass
except Exception as e: # noqa: BLE001
print(
f"cloud-sync: Qt init failed ({e}); falling back to tkinter",
file=sys.stderr,
)
try:
return TkProgressWindow()
except Exception as e: # noqa: BLE001
print(
f"cloud-sync: tkinter unavailable ({e}); falling back to headless",
file=sys.stderr,
)
return HeadlessProgress()
+186
View File
@@ -0,0 +1,186 @@
"""Qt progress UI for cloud-sync.
Supports both PySide6 (preferred — LGPL, official Qt binding) and PyQt6
(fallback — GPL/commercial). Same code runs on both because their
QtWidgets / QtCore APIs are interchangeable for our subset.
This module never imports Qt at top level. ``import_qt()`` raises
ImportError if neither binding is available; the factory in ``ui.py``
catches that and falls back to the tkinter window.
Threading model: ``QApplication`` runs on the main thread (started by
``run_with`` via ``QDialog.exec``); the restic worker runs on a daemon
``threading.Thread``; cross-thread UI updates go via a ``Signal`` on a
bridge ``QObject`` so Qt enqueues them onto the main thread.
"""
from __future__ import annotations
import sys
import threading
from typing import Any, Callable
def import_qt() -> tuple[Any, Any, Any]:
"""Return (QtWidgets, QtCore, Signal). Raises ImportError if neither
PySide6 nor PyQt6 is installed."""
try:
from PySide6 import QtCore, QtWidgets # type: ignore
return QtWidgets, QtCore, QtCore.Signal
except ImportError:
pass
try:
from PyQt6 import QtCore, QtWidgets # type: ignore
return QtWidgets, QtCore, QtCore.pyqtSignal
except ImportError as e:
raise ImportError(
"neither PySide6 nor PyQt6 is installed; "
"pip install 'cloud-sync[qt]' or pip install PySide6"
) from e
def _import_qtgui() -> Any:
"""Return whichever QtGui module is available (PySide6 or PyQt6)."""
try:
from PySide6 import QtGui # type: ignore
return QtGui
except ImportError:
from PyQt6 import QtGui # type: ignore
return QtGui
def _apply_prism_dark(app: Any) -> None:
"""Apply a Prism-Launcher-inspired dark palette + Fusion style.
Palette RGB values are facts taken from
PrismLauncher/launcher/ui/themes/DarkTheme.cpp (GPL-3.0). Numerical
color values themselves aren't copyrightable; only the surrounding
code is. We reimplement the same look from scratch under MIT.
Override via ``PRISM_THEME=off`` to keep the system / Qt-default
palette (useful if the user's desktop theme handles dark mode).
"""
import os
if os.environ.get("PRISM_THEME") == "off":
return
G = _import_qtgui()
app.setStyle("Fusion")
p = G.QPalette()
Role = G.QPalette.ColorRole
white = G.QColor("white")
p.setColor(Role.Window, G.QColor(49, 49, 49))
p.setColor(Role.WindowText, white)
p.setColor(Role.Base, G.QColor(34, 34, 34))
p.setColor(Role.AlternateBase, G.QColor(42, 42, 42))
p.setColor(Role.ToolTipBase, white)
p.setColor(Role.ToolTipText, white)
p.setColor(Role.Text, white)
p.setColor(Role.Button, G.QColor(48, 48, 48))
p.setColor(Role.ButtonText, white)
p.setColor(Role.BrightText, G.QColor("red"))
p.setColor(Role.Link, G.QColor(47, 163, 198))
p.setColor(Role.Highlight, G.QColor(150, 219, 89)) # Prism's signature green
p.setColor(Role.HighlightedText, G.QColor("black"))
p.setColor(Role.PlaceholderText, G.QColor("darkGray"))
app.setPalette(p)
class QtProgressWindow:
"""Modal Qt dialog: title + status + indeterminate progress + Cancel."""
def __init__(self) -> None:
QtWidgets, QtCore, Signal = import_qt()
self._QtCore = QtCore
# Bridge so worker thread can update UI via a queued signal.
class _Bridge(QtCore.QObject): # type: ignore[misc, valid-type]
status_changed = Signal(str)
finished = Signal(int)
app_existed = QtWidgets.QApplication.instance() is not None
self._app = QtWidgets.QApplication.instance() or QtWidgets.QApplication(
sys.argv
)
if not app_existed:
_apply_prism_dark(self._app)
self._dialog = QtWidgets.QDialog()
self._dialog.setWindowTitle("Cloud sync")
self._dialog.setFixedSize(440, 160)
# Block ESC + window close X → mark cancelled, don't accept
self._dialog.setWindowFlag(
QtCore.Qt.WindowType.WindowContextHelpButtonHint, False
)
layout = QtWidgets.QVBoxLayout(self._dialog)
layout.setContentsMargins(20, 20, 20, 20)
self._title_label = QtWidgets.QLabel("Working…")
font = self._title_label.font()
font.setBold(True)
font.setPointSize(font.pointSize() + 1)
self._title_label.setFont(font)
layout.addWidget(self._title_label)
self._status_label = QtWidgets.QLabel("Starting…")
layout.addWidget(self._status_label)
self._bar = QtWidgets.QProgressBar()
self._bar.setRange(0, 0) # indeterminate
self._bar.setTextVisible(False)
layout.addWidget(self._bar)
button_row = QtWidgets.QHBoxLayout()
button_row.addStretch(1)
self._cancel_btn = QtWidgets.QPushButton("Cancel")
button_row.addWidget(self._cancel_btn)
layout.addLayout(button_row)
self._bridge = _Bridge()
self._bridge.status_changed.connect(self._status_label.setText)
self._bridge.finished.connect(self._on_finished)
self._cancel_btn.clicked.connect(self._on_cancel)
self._dialog.rejected.connect(self._on_cancel)
self._cancelled = False
self._worker_rc: int | None = None
self._worker_exc: BaseException | None = None
# -- public API ----------------------------------------------------
def set_status(self, msg: str) -> None:
self._bridge.status_changed.emit(msg)
def is_cancelled(self) -> bool:
return self._cancelled
def run_with(self, worker: Callable[[], int], title: str) -> int:
self._title_label.setText(title)
self._status_label.setText("Starting…")
def thread_target() -> None:
try:
rc = worker()
except BaseException as e: # noqa: BLE001
self._worker_exc = e
rc = 1
self._worker_rc = rc
self._bridge.finished.emit(rc)
t = threading.Thread(target=thread_target, daemon=True)
t.start()
self._dialog.exec()
if self._worker_exc is not None:
raise self._worker_exc
return self._worker_rc if self._worker_rc is not None else 1
# -- internals -----------------------------------------------------
def _on_cancel(self) -> None:
self._cancelled = True
self._status_label.setText("Cancelling…")
self._cancel_btn.setEnabled(False)
def _on_finished(self, _rc: int) -> None:
self._dialog.accept()