"""pull + push entry points + divergence detection. Backend: restic-rest-server. URL form: ``rest:://:@//`` Password = HTTP basic auth ONLY. Restic repos are initialised with ``--insecure-no-password``; encryption-at-rest delegated to LUKS on the host disk + TLS at the reverse proxy. Divergence detection (pull only): 1. Fetch remote latest snapshot id + time via ``snapshots --json --latest 1``. 2. Read ``/.cloud-sync/state.json`` for the last-pulled snapshot id. 3. Branches: - no state.json AND remote empty → no-op - no state.json AND remote non-empty → first-run restore - state.last_pulled == remote.id → up to date, skip - state.last_pulled != remote.id, no local edits since state.last_pulled_at → fast-forward restore - state.last_pulled != remote.id, local edits since state.last_pulled_at → CONFLICT dialog """ from __future__ import annotations import fnmatch import json import sys import urllib.parse from datetime import datetime, timezone from pathlib import Path from . import config as cfgmod, restic, scope as scopemod, state as statemod from .cli import Args from .creds import read_credentials from .ui import HeadlessProgress, Progress def pull(args: Args, progress: Progress | None = None) -> int: ui = progress or HeadlessProgress() # Opt-in gate: no sync.json → instance isn't sync-enabled, silent no-op. # This is the path the global Prism PreLaunch hook takes for instances # the player hasn't opted into sync. sync_cfg = cfgmod.read(args.pack_folder) if sync_cfg is None: return 0 # First-run login. If the user declines, skip without blocking the # launch (return 0 — non-fatal for Prism PreLaunch). if not args.token_file.exists(): if not _prompt_login_and_save(args, ui): ui.set_status("Sync skipped") print("instance-sync: no token; skipping pull") return 0 ui.set_status("Reading credentials…") discord_id, password = read_credentials(args.token_file) ui.set_status("Resolving restic binary…") binary = restic.resolve_binary(args) repo = _restic_repo(sync_cfg.url, discord_id, password, sync_cfg.instance_id) env = _restic_env() label = cfgmod.resolve_label(args.instance_label, sync_cfg.instance_id) ui.set_status("Checking remote snapshots…") code, out = restic.run( binary, ["-r", repo, "--insecure-no-password", "snapshots", "--json", "--latest", "1"], env=env, cancel_check=ui.is_cancelled, ) if code == -1: return 1 if code != 0: print( f"instance-sync: failed to list snapshots (restic exit {code})", file=sys.stderr, ) return 2 snapshots = _parse_snapshots(out) if not snapshots: statemod.clear(args.pack_folder) ui.set_status("No snapshots yet — nothing to pull") print("instance-sync: no snapshots yet for this user; nothing to pull") return 0 remote = snapshots[0] remote_id = remote["id"] remote_time = _parse_restic_time(remote["time"]) local_state = statemod.read(args.pack_folder) scope = scopemod.load(args.pack_folder) decision: str if local_state is None: decision = "use_remote" elif local_state.last_pulled_snapshot_id == remote_id: ui.set_status("Cloud is up to date") print("instance-sync: already at latest snapshot") return 0 else: modified = _find_modified_in_scope( args.pack_folder, scope, local_state.last_pulled_at ) if not modified: decision = "use_remote" else: decision = _ask_conflict(modified, remote_time, label) if decision is None: # UI unavailable in headless mode → conservative: cancel ui.set_status("Conflict detected; no UI available") print( "instance-sync: conflict detected (remote moved + local edits) " "but headless mode can't prompt; aborting", file=sys.stderr, ) return 1 if decision == "cancel": ui.set_status("Launch cancelled") return 1 if decision == "keep_local": ui.set_status("Keeping local; push will overwrite cloud on exit") print("instance-sync: keeping local copy") return 0 # decision == "use_remote" _, exclude_from = scopemod.materialize_for_restic(args.pack_folder, scope) ui.set_status("Restoring files…") code, _ = restic.run( binary, [ "-r", repo, "--insecure-no-password", "restore", "latest", "--target", str(args.pack_folder), "--exclude-file", str(exclude_from), ], env=env, cancel_check=ui.is_cancelled, ) if code == -1: return 1 if code != 0: print(f"instance-sync: restic restore failed (exit {code})", file=sys.stderr) return 2 statemod.write( args.pack_folder, statemod.State( instance_id=sync_cfg.instance_id, last_pulled_snapshot_id=remote_id, last_pulled_at=datetime.now(timezone.utc), ), ) ui.set_status("Pull complete") print("instance-sync: pull ok") return 0 def push(args: Args, progress: Progress | None = None) -> int: ui = progress or HeadlessProgress() # Opt-in gate — see pull(). sync_cfg = cfgmod.read(args.pack_folder) if sync_cfg is None: return 0 if not args.token_file.exists(): ui.set_status("No token; skipping push") print("instance-sync: no token; skipping push") return 0 ui.set_status("Reading credentials…") discord_id, password = read_credentials(args.token_file) ui.set_status("Resolving restic binary…") binary = restic.resolve_binary(args) repo = _restic_repo(sync_cfg.url, discord_id, password, sync_cfg.instance_id) env = _restic_env() # First-push-on-a-new-instance: probe + init if the per-instance repo # doesn't exist yet. Idempotent on subsequent pushes. code = _ensure_repo_initialized(binary, repo, env, ui) if code != 0: return code scope = scopemod.load(args.pack_folder) files_from, exclude_from = scopemod.materialize_for_restic(args.pack_folder, scope) ui.set_status("Uploading snapshot…") code, out = restic.run( binary, [ "-r", repo, "--insecure-no-password", "backup", "--files-from", str(files_from), "--exclude-file", str(exclude_from), "--host", "instance-sync", "--tag", "auto", "--json", ], env=env, cwd=args.pack_folder, cancel_check=ui.is_cancelled, ) if code == -1: return 1 if code != 0: print(f"instance-sync: restic backup failed (exit {code})", file=sys.stderr) return 2 new_id = _parse_backup_summary(out) if new_id is not None: statemod.write( args.pack_folder, statemod.State( instance_id=sync_cfg.instance_id, last_pulled_snapshot_id=new_id, last_pulled_at=datetime.now(timezone.utc), ), ) ui.set_status("Push complete") print("instance-sync: push ok") return 0 # --------------------------------------------------------------------------- # divergence detection # --------------------------------------------------------------------------- def _find_modified_in_scope( pack_folder: Path, scope: scopemod.Scope, since: datetime ) -> list[tuple[Path, datetime]]: """Return (rel_path, mtime) for in-scope files newer than ``since``. Walks each include root, skips paths matching any exclude glob. Stops early at 50 hits (we only need to know "any" + sample some for the conflict dialog). False positives are safer than false negatives — a spurious conflict shows a dismissable dialog; a missed conflict silently overwrites the user's edits. """ since_ts = since.timestamp() hits: list[tuple[Path, datetime]] = [] for include in scope.include: root = pack_folder / include.rstrip("/") if not root.exists(): continue candidates: list[Path] = [root] if root.is_file() else _walk_files(root) for f in candidates: try: mtime = f.stat().st_mtime except OSError: continue if mtime <= since_ts: continue rel = f.relative_to(pack_folder) if _matches_any(rel, scope.exclude): continue hits.append((rel, datetime.fromtimestamp(mtime, tz=timezone.utc))) if len(hits) >= 50: return hits return hits def _walk_files(root: Path): try: for p in root.rglob("*"): if p.is_file(): yield p except OSError: return def _matches_any(rel: Path, patterns: list[str]) -> bool: """Restic-style glob match against the relative path. Subset of restic's exclude semantics that covers our default scope: ``foo/`` — foo itself OR anything under it ``**/foo/`` — any ancestor directory named foo, recursively ``foo/bar*`` — fnmatch against the full relative path ``**/*.log`` — fnmatch the basename (and any tail subpath) """ rel_str = str(rel) parts = rel.parts for pat in patterns: if pat.endswith("/"): core = pat.rstrip("/") if core.startswith("**/"): tail = core[len("**/") :] if tail in parts: return True continue if rel_str == core or rel_str.startswith(core + "/"): return True continue if fnmatch.fnmatch(rel_str, pat): return True if pat.startswith("**/"): suffix = pat[len("**/") :] if fnmatch.fnmatch(rel.name, suffix): return True for i in range(len(parts)): if fnmatch.fnmatch("/".join(parts[i:]), suffix): return True return False def _ask_conflict( modified: list[tuple[Path, datetime]], remote_time: datetime, label: str, ) -> str | None: """Show the conflict dialog. Returns choice or None if no UI available.""" try: from .ui_qt import prompt_conflict_qt except ImportError: return None newest = max(modified, key=lambda h: h[1]) return prompt_conflict_qt( local_modified=_format_dt(newest[1]), remote_modified=_format_dt(remote_time), save_label=label, ) def _prompt_login_and_save(args: Args, ui: Progress) -> bool: """First-run login. Returns True if a token was saved, False if skipped.""" try: from .ui_qt import prompt_login_qt except ImportError: ui.set_status("No token and no UI; can't prompt") print( "instance-sync: no token at " f"{args.token_file} and no Qt UI available", file=sys.stderr, ) return False token = prompt_login_qt() if token is None: return False args.token_file.parent.mkdir(parents=True, exist_ok=True) args.token_file.write_text(token + "\n", encoding="utf-8") args.token_file.chmod(0o600) return True # --------------------------------------------------------------------------- # restic output parsing # --------------------------------------------------------------------------- def _parse_snapshots(out: str) -> list[dict]: s = out.strip() if not s or s == "null": return [] try: data = json.loads(s) except json.JSONDecodeError: return [] return data if isinstance(data, list) else [] def _parse_restic_time(s: str) -> datetime: """Restic emits e.g. ``2026-06-04T18:33:21.123456789Z``.""" if s.endswith("Z"): s = s[:-1] + "+00:00" # Python's fromisoformat doesn't accept nanosecond precision — trim to micro. if "." in s: head, _, tail = s.partition(".") frac, _, tz = tail.partition("+") if tz: tz = "+" + tz else: frac, _, tz = tail.partition("-") if tz: tz = "-" + tz s = f"{head}.{frac[:6]}{tz}" return datetime.fromisoformat(s).astimezone(timezone.utc) def _parse_backup_summary(out: str) -> str | None: """restic backup --json emits one JSON line per event; the final ``summary`` event carries ``snapshot_id``.""" for line in reversed(out.splitlines()): line = line.strip() if not line: continue try: ev = json.loads(line) except json.JSONDecodeError: continue if ev.get("message_type") == "summary" and "snapshot_id" in ev: return str(ev["snapshot_id"]) return None # --------------------------------------------------------------------------- # misc helpers # --------------------------------------------------------------------------- def _format_dt(dt: datetime) -> str: """Format a tz-aware datetime as 'Thursday, October 21, 2021 at 7:12 PM'. Hand-rolled instead of ``strftime("%-d")`` because GNU strftime's leading-zero stripping syntax is platform-specific (``%-d`` vs ``%#d``). """ local = dt.astimezone() weekday = local.strftime("%A") month = local.strftime("%B") hour = local.hour % 12 or 12 ampm = local.strftime("%p") return f"{weekday}, {month} {local.day}, {local.year} at {hour}:{local.minute:02d} {ampm}" def _restic_repo( base_url: str, discord_id: str, password: str, instance_id: str ) -> str: """Build ``rest:://:@///``. Per-instance subpath under the user's namespace. ``--private-repos`` on restic-rest-server only enforces the FIRST path segment (the username); deeper segments are user-controlled, so each instance gets its own isolated restic repo without server-side coordination. """ raw = base_url.strip() if raw.startswith("rest:"): raw = raw[len("rest:"):] raw = raw.rstrip("/") scheme_end = raw.find("://") if scheme_end <= 0: raise ValueError( f"url must include scheme (http:// or https://): {base_url!r}" ) scheme = raw[: scheme_end + 3] host_and_path = raw[scheme_end + 3 :] u = urllib.parse.quote(discord_id, safe="") p = urllib.parse.quote(password, safe="") iid = urllib.parse.quote(instance_id, safe="") return f"rest:{scheme}{u}:{p}@{host_and_path}/{discord_id}/{iid}/" def _ensure_repo_initialized( binary: Path, repo: str, env: dict[str, str], ui: "Progress" ) -> int: """`restic cat config` to probe; `restic init` if absent. Returns 0/1/2. Cheap to call before every push — `cat config` is a single HTTP GET. Idempotent: if the repo already exists, init is skipped. """ code, _ = restic.run( binary, ["-r", repo, "--insecure-no-password", "cat", "config"], env=env, cancel_check=ui.is_cancelled, ) if code == 0: return 0 if code == -1: return 1 # cat config failed → assume not initialized; init it. ui.set_status("Initializing remote repo…") code, _ = restic.run( binary, ["-r", repo, "--insecure-no-password", "init"], env=env, cancel_check=ui.is_cancelled, ) if code == -1: return 1 if code != 0: print( f"instance-sync: restic init failed (exit {code})", file=sys.stderr ) return 2 return 0 def _restic_env() -> dict[str, str]: return { # No RESTIC_PASSWORD — repos use --insecure-no-password. "RESTIC_PROGRESS_FPS": "0", }