#!/usr/bin/env python3
"""Collect recent image downloads into a target directory with manifest output."""

from __future__ import annotations

import argparse
import hashlib
import json
import re
import shutil
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

# Fixed 8-byte signature that starts every PNG file.
_PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"

# Read size used when hashing files, so large images are never fully in memory.
_HASH_CHUNK_SIZE = 1024 * 1024

# Locations probed for Playwright download directories when no --source is given.
_PLAYWRIGHT_GLOBS = (
    "/var/folders/*/*/T/playwright-mcp-output/*",
    "/private/var/folders/*/*/T/playwright-mcp-output/*",
    "/var/folders/*/*/*/T/playwright-mcp-output/*",
    "/private/var/folders/*/*/*/T/playwright-mcp-output/*",
    "/tmp/playwright-mcp-output/*",
)


def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments from ``sys.argv``."""
    parser = argparse.ArgumentParser(
        description="Collect recent image downloads into a target directory."
    )
    parser.add_argument(
        "--source",
        action="append",
        help=(
            "Source download directory. Repeatable. "
            "If omitted, auto-discovers Playwright temp downloads and then "
            "falls back to ~/Downloads."
        ),
    )
    parser.add_argument(
        "--target",
        required=True,
        help="Target directory for collected files.",
    )
    parser.add_argument(
        "--since",
        type=float,
        default=time.time() - 1800,
        help="Unix timestamp lower bound for file mtime. Default: now-1800s",
    )
    parser.add_argument(
        "--ext",
        default="png,jpg,jpeg,webp",
        help="Comma-separated file extensions to include.",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=8,
        help="Maximum files to collect. Default: 8",
    )
    parser.add_argument(
        "--expected-count",
        type=int,
        default=None,
        help="Required minimum number of collected files.",
    )
    parser.add_argument(
        "--prefix",
        default="gemini-image",
        help="Filename prefix for collected files. Default: gemini-image",
    )
    parser.add_argument(
        "--batch-id",
        default=None,
        help="Batch ID used in output filenames. Default: current timestamp.",
    )
    parser.add_argument(
        "--manifest",
        default=None,
        help=(
            "Manifest output path. "
            "Default: <target>/<prefix>-<batch-id>-manifest.json"
        ),
    )
    parser.add_argument(
        "--prompt",
        default="",
        help="Prompt text to store in manifest.",
    )
    parser.add_argument(
        "--move",
        action="store_true",
        help="Move files instead of copying.",
    )
    parser.add_argument(
        "--no-dedupe-target",
        action="store_true",
        help="Disable hash dedupe against existing files in target directory.",
    )
    return parser.parse_args()


def _safe_mtime(path: Path, default: float | None = None) -> float | None:
    """Return the mtime of *path*, or *default* when it cannot be stat'ed.

    Download directories are volatile: a file seen during a scan may be gone
    by the time it is inspected again, so a failing ``stat`` is not an error.
    """
    try:
        return path.stat().st_mtime
    except OSError:
        return default


def unique_path(path: Path) -> Path:
    """Return *path* if it is free, else the first free ``<stem>-<n><suffix>``.

    Numbering starts at 2 and the candidate stays in the same directory.
    """
    if not path.exists():
        return path
    idx = 2
    while True:
        candidate = path.parent / f"{path.stem}-{idx}{path.suffix}"
        if not candidate.exists():
            return candidate
        idx += 1


def collect_candidates(source: Path, since_ts: float, allowed_ext: set[str]) -> list[Path]:
    """Return files under *source* matching the filters, newest first.

    Args:
        source: Directory searched recursively; a missing path yields ``[]``.
        since_ts: Inclusive lower bound on the file mtime (Unix timestamp).
        allowed_ext: Lower-case extensions without the leading dot.
    """
    if not source.exists():
        return []
    dated: list[tuple[float, Path]] = []
    for path in source.rglob("*"):
        if not path.is_file():
            continue
        if path.suffix.lower().lstrip(".") not in allowed_ext:
            continue
        mtime = _safe_mtime(path)
        if mtime is not None and mtime >= since_ts:
            dated.append((mtime, path))
    # Sort on the mtime captured above rather than re-stat'ing: a file that
    # vanishes between the scan and the sort must not raise here.
    dated.sort(key=lambda item: item[0], reverse=True)
    return [path for _, path in dated]


def discover_playwright_sources() -> list[Path]:
    """Return existing Playwright output directories, most recently modified first.

    Directories are resolved and de-duplicated, so the ``/var`` and
    ``/private/var`` spellings of one location are reported once.
    """
    dated: list[tuple[float, Path]] = []
    seen: set[Path] = set()
    for pattern in _PLAYWRIGHT_GLOBS:
        for raw in Path("/").glob(pattern.lstrip("/")):
            if not raw.is_dir():
                continue
            path = raw.resolve()
            if path in seen:
                continue
            seen.add(path)
            mtime = _safe_mtime(path)
            if mtime is None:
                # Removed while scanning; nothing could be collected from it.
                continue
            dated.append((mtime, path))
    dated.sort(key=lambda item: item[0], reverse=True)
    return [path for _, path in dated]


def resolve_sources(raw_sources: list[str] | None) -> list[Path]:
    """Return the source directories to scan, in priority order.

    Explicit sources are expanded and resolved as given. Otherwise the list is
    the discovered Playwright directories, ``.playwright-cli`` under the
    current directory and under the script's third-level ancestor, and finally
    ``~/Downloads``, with duplicates removed.
    """
    if raw_sources:
        return [Path(item).expanduser().resolve() for item in raw_sources]

    auto_sources = discover_playwright_sources()
    auto_sources.append((Path.cwd() / ".playwright-cli").resolve())
    # The script may be installed fewer than four levels below the filesystem
    # root, in which case parents[3] does not exist and is skipped.
    ancestors = Path(__file__).resolve().parents
    if len(ancestors) > 3:
        auto_sources.append((ancestors[3] / ".playwright-cli").resolve())
    auto_sources.append((Path.home() / "Downloads").resolve())
    # dict.fromkeys keeps the first occurrence of each path, preserving order.
    return list(dict.fromkeys(auto_sources))


def sha256_of_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in chunks.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(_HASH_CHUNK_SIZE), b""):
            digest.update(chunk)
    return digest.hexdigest()


def dimensions_from_sips(path: Path) -> tuple[int, int] | None:
    """Return ``(width, height)`` as reported by the ``sips`` command.

    Returns ``None`` when ``sips`` cannot be started, exits non-zero, or its
    output lacks either dimension.
    """
    try:
        proc = subprocess.run(
            ["sips", "-g", "pixelWidth", "-g", "pixelHeight", str(path)],
            check=False,
            capture_output=True,
            text=True,
        )
    except OSError:
        return None
    if proc.returncode != 0:
        return None
    width_match = re.search(r"pixelWidth:\s+(\d+)", proc.stdout)
    height_match = re.search(r"pixelHeight:\s+(\d+)", proc.stdout)
    if not width_match or not height_match:
        return None
    return int(width_match.group(1)), int(height_match.group(1))


def dimensions_from_png(path: Path) -> tuple[int, int] | None:
    """Return ``(width, height)`` read from the IHDR chunk of a PNG file.

    Returns ``None`` when the file is unreadable, shorter than 24 bytes, does
    not start with the PNG signature, or its first chunk is not ``IHDR``.
    """
    try:
        with path.open("rb") as fh:
            header = fh.read(24)
    except OSError:
        return None
    if len(header) < 24 or header[:8] != _PNG_SIGNATURE:
        return None
    # Bytes 8-12 hold the chunk length and 12-16 the chunk type; the width and
    # height that follow are only meaningful inside an IHDR chunk.
    if header[12:16] != b"IHDR":
        return None
    width = int.from_bytes(header[16:20], "big")
    height = int.from_bytes(header[20:24], "big")
    return width, height


def read_dimensions(path: Path) -> tuple[int, int] | None:
    """Return image dimensions via ``sips``, falling back to PNG header parsing.

    The fallback applies only to files with a ``.png`` suffix; other formats
    yield ``None`` when ``sips`` gives no answer.
    """
    dims = dimensions_from_sips(path)
    if dims:
        return dims
    if path.suffix.lower() == ".png":
        return dimensions_from_png(path)
    return None


def iso_ts(ts: float) -> str:
    """Format a Unix timestamp as an ISO-8601 string in UTC."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()


def collect_candidates_all_sources(
    sources: list[Path], since_ts: float, allowed_ext: set[str]
) -> tuple[list[Path], list[dict[str, object]]]:
    """Scan every source and merge the matches, newest first.

    Returns:
        A pair ``(files, tried)``. ``files`` holds each matching file once,
        de-duplicated by resolved path. ``tried`` has one
        ``{"source", "matches"}`` entry per scanned source.
    """
    tried: list[dict[str, object]] = []
    merged: list[Path] = []
    seen: set[Path] = set()
    for source in sources:
        files = collect_candidates(source, since_ts, allowed_ext)
        tried.append({"source": str(source), "matches": len(files)})
        for file_path in files:
            resolved = file_path.resolve()
            if resolved in seen:
                continue
            seen.add(resolved)
            merged.append(file_path)
    # Files that disappeared since the scan sort last instead of raising.
    merged.sort(key=lambda p: _safe_mtime(p, float("-inf")), reverse=True)
    return merged, tried


def collect_existing_hashes(target: Path, allowed_ext: set[str]) -> set[str]:
    """Return SHA-256 digests of matching files directly inside *target*.

    Sub-directories are not descended into, unreadable files are skipped, and
    a missing target yields an empty set.
    """
    hashes: set[str] = set()
    if not target.is_dir():
        return hashes
    for path in target.iterdir():
        if not path.is_file():
            continue
        if path.suffix.lower().lstrip(".") not in allowed_ext:
            continue
        try:
            hashes.add(sha256_of_file(path))
        except OSError:
            continue
    return hashes


def write_manifest(manifest_path: Path, payload: dict[str, object]) -> None:
    """Write *payload* as indented UTF-8 JSON, creating parent directories."""
    manifest_path.parent.mkdir(parents=True, exist_ok=True)
    with manifest_path.open("w", encoding="utf-8") as fh:
        json.dump(payload, fh, ensure_ascii=False, indent=2)
        fh.write("\n")


def main() -> int:
    """Collect matching files into the target directory and write a manifest.

    Returns:
        ``0`` on success; ``1`` when nothing was collected or fewer files than
        ``--expected-count`` were collected; ``2`` when ``--ext`` contains no
        usable extension.
    """
    args = parse_args()

    target = Path(args.target).expanduser().resolve()
    target.mkdir(parents=True, exist_ok=True)

    batch_id = args.batch_id or time.strftime("%Y%m%d-%H%M%S")
    manifest_path = (
        Path(args.manifest).expanduser().resolve()
        if args.manifest
        else target / f"{args.prefix}-{batch_id}-manifest.json"
    )

    allowed_ext = {
        ext.strip().lower().lstrip(".") for ext in args.ext.split(",") if ext.strip()
    }
    if not allowed_ext:
        print("No valid extensions provided.", file=sys.stderr)
        return 2

    sources = resolve_sources(args.source)
    candidates, tried_sources = collect_candidates_all_sources(
        sources, args.since, allowed_ext
    )

    if not candidates:
        payload = {
            "status": "no_matching_files",
            "created_at": iso_ts(time.time()),
            "batch_id": batch_id,
            "prompt": args.prompt,
            "target_dir": str(target),
            "since_ts": args.since,
            "sources_tried": tried_sources,
            "collected_count": 0,
            "files": [],
        }
        write_manifest(manifest_path, payload)
        print("No matching files found.")
        print(f"MANIFEST: {manifest_path}")
        return 1

    dedupe_target = not args.no_dedupe_target
    seen_hashes: set[str] = set()
    if dedupe_target:
        seen_hashes.update(collect_existing_hashes(target, allowed_ext))

    files: list[dict[str, object]] = []
    skipped_duplicates = 0

    for src in candidates:
        if len(files) >= args.limit:
            break

        try:
            src_hash = sha256_of_file(src)
        except OSError:
            continue
        if src_hash in seen_hashes:
            skipped_duplicates += 1
            continue

        idx = len(files) + 1
        dst = unique_path(target / f"{args.prefix}-{batch_id}-{idx:02d}{src.suffix.lower()}")

        # One unreadable or vanished file must not abort the batch before the
        # manifest is written, so transfer failures skip that file only.
        try:
            src_mtime = src.stat().st_mtime
            if args.move:
                shutil.move(str(src), str(dst))
            else:
                shutil.copy2(str(src), str(dst))
        except OSError as exc:
            print(f"Skipping {src}: {exc}", file=sys.stderr)
            continue

        dims = read_dimensions(dst)
        files.append(
            {
                "prompt": args.prompt,
                "generated_at": iso_ts(src_mtime),
                "source_filename": src.name,
                "source_path": str(src.resolve()),
                "target_path": str(dst.resolve()),
                "sha256": src_hash,
                "width": dims[0] if dims else None,
                "height": dims[1] if dims else None,
            }
        )
        # Remember the hash so identical files later in this run are skipped.
        seen_hashes.add(src_hash)

    status = "ok"
    exit_code = 0
    expected_count = args.expected_count
    if not files:
        status = "no_files_after_dedupe"
        exit_code = 1
    elif expected_count is not None and len(files) < expected_count:
        status = "insufficient_files"
        exit_code = 1

    payload = {
        "status": status,
        "created_at": iso_ts(time.time()),
        "batch_id": batch_id,
        "prompt": args.prompt,
        "target_dir": str(target),
        "sources_tried": tried_sources,
        "since_ts": args.since,
        "limit": args.limit,
        "expected_count": expected_count,
        "dedupe_target": dedupe_target,
        "skipped_duplicates": skipped_duplicates,
        "collected_count": len(files),
        "files": files,
    }
    write_manifest(manifest_path, payload)

    for item in files:
        print(item["target_path"])
    print(f"MANIFEST: {manifest_path}")
    return exit_code


if __name__ == "__main__":
    raise SystemExit(main())