#!/usr/bin/env python3
"""Collect recent video downloads into a target directory with manifest output."""

from __future__ import annotations

import argparse
import hashlib
import json
import shutil
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        The populated namespace.
    """
    parser = argparse.ArgumentParser(
        description="Collect recent video downloads into a target directory."
    )
    parser.add_argument(
        "--source",
        action="append",
        help=(
            "Source download directory. Repeatable. "
            "If omitted, auto-discovers Playwright temp downloads and then "
            "falls back to ~/Downloads."
        ),
    )
    parser.add_argument(
        "--target",
        required=True,
        help="Target directory for collected files.",
    )
    parser.add_argument(
        "--since",
        type=float,
        # Evaluated when the parser is built, i.e. "30 minutes before now".
        default=time.time() - 1800,
        help="Unix timestamp lower bound for file mtime. Default: now-1800s",
    )
    parser.add_argument(
        "--ext",
        default="mp4,mov,webm,mkv,m4v,avi",
        help="Comma-separated file extensions to include.",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=8,
        help="Maximum files to collect. Default: 8",
    )
    parser.add_argument(
        "--expected-count",
        type=int,
        default=None,
        help="Required minimum number of collected files.",
    )
    parser.add_argument(
        "--prefix",
        default="gemini-video",
        help="Filename prefix for collected files. Default: gemini-video",
    )
    parser.add_argument(
        "--batch-id",
        default=None,
        help="Batch ID used in output filenames. Default: current timestamp.",
    )
    parser.add_argument(
        "--manifest",
        default=None,
        help=(
            "Manifest output path. "
            "Default: <target>/<prefix>-<batch-id>-manifest.json"
        ),
    )
    parser.add_argument(
        "--prompt",
        default="",
        help="Prompt text to store in manifest.",
    )
    parser.add_argument(
        "--move",
        action="store_true",
        help="Move files instead of copying.",
    )
    parser.add_argument(
        "--no-dedupe-target",
        action="store_true",
        help="Disable hash dedupe against existing files in target directory.",
    )
    return parser.parse_args(argv)


def _mtime_or_zero(path: Path) -> float:
    """Return the mtime of *path*, or 0.0 when it cannot be stat'ed.

    Used as a sort key so that an entry which disappears between discovery
    and sorting sinks to the end of a newest-first ordering instead of
    raising.
    """
    try:
        return path.stat().st_mtime
    except OSError:
        return 0.0


def unique_path(path: Path) -> Path:
    """Return *path* if it is free, else the first free ``stem-N.suffix`` sibling.

    Numbering starts at 2 (``name-2.ext``, ``name-3.ext``, ...).
    """
    if not path.exists():
        return path
    from itertools import count

    for idx in count(2):
        candidate = path.with_name(f"{path.stem}-{idx}{path.suffix}")
        if not candidate.exists():
            return candidate
    raise AssertionError("unreachable")  # count() never terminates


def collect_candidates(source: Path, since_ts: float, allowed_ext: set[str]) -> list[Path]:
    """Recursively list files under *source* matching extension and age.

    Args:
        source: Directory to scan; a missing path yields an empty list.
        since_ts: Keep only files whose mtime is ``>= since_ts``.
        allowed_ext: Lower-case extensions without the leading dot.

    Returns:
        Matching paths, newest first.
    """
    if not source.exists():
        return []
    dated: list[tuple[float, Path]] = []
    for path in source.rglob("*"):
        if not path.is_file():
            continue
        if path.suffix.lower().lstrip(".") not in allowed_ext:
            continue
        try:
            mtime = path.stat().st_mtime
        except OSError:
            continue
        if mtime >= since_ts:
            dated.append((mtime, path))
    # Sort on the mtime captured above: re-stat'ing here could raise if a
    # file is removed while the scan is still in progress.
    dated.sort(key=lambda item: item[0], reverse=True)
    return [path for _, path in dated]


def discover_playwright_sources() -> list[Path]:
    """Find existing ``playwright-mcp-output`` subdirectories in temp locations.

    Returns:
        Resolved, de-duplicated directories, most recently modified first.
    """
    globs = (
        "/var/folders/*/*/T/playwright-mcp-output/*",
        "/private/var/folders/*/*/T/playwright-mcp-output/*",
        "/var/folders/*/*/*/T/playwright-mcp-output/*",
        "/private/var/folders/*/*/*/T/playwright-mcp-output/*",
        "/tmp/playwright-mcp-output/*",
    )
    root = Path("/")
    candidates: list[Path] = []
    seen: set[Path] = set()
    for pattern in globs:
        # Path.glob() needs a relative pattern, hence the stripped slash.
        for raw in root.glob(pattern.lstrip("/")):
            if not raw.is_dir():
                continue
            path = raw.resolve()
            if path in seen:
                continue
            seen.add(path)
            candidates.append(path)
    # Temp directories may vanish at any time; never let the sort key raise.
    candidates.sort(key=_mtime_or_zero, reverse=True)
    return candidates


def resolve_sources(raw_sources: list[str] | None) -> list[Path]:
    """Turn ``--source`` values into the ordered list of directories to scan.

    Explicit sources are expanded and resolved as given. Otherwise the list
    is: discovered Playwright output directories, ``./.playwright-cli``,
    ``.playwright-cli`` under the fourth ancestor of this script, and
    ``~/Downloads`` - de-duplicated with order preserved.
    """
    if raw_sources:
        return [Path(item).expanduser().resolve() for item in raw_sources]

    auto_sources = discover_playwright_sources()
    auto_sources.append((Path.cwd() / ".playwright-cli").resolve())
    # NOTE(review): parents[3] is presumably the repository root - confirm.
    # Guarded because a script installed at a shallow path has fewer than
    # four ancestors and indexing would raise IndexError.
    ancestors = Path(__file__).resolve().parents
    if len(ancestors) > 3:
        auto_sources.append((ancestors[3] / ".playwright-cli").resolve())
    auto_sources.append((Path.home() / "Downloads").resolve())
    # dict.fromkeys keeps the first occurrence of each path, in order.
    return list(dict.fromkeys(auto_sources))


def sha256_of_file(path: Path) -> str:
    """Return the hex SHA-256 digest of *path*, read in 1 MiB chunks.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()


def to_float(value: object) -> float | None:
    """Convert *value* to ``float``; ``None``, ``""`` or bad input give ``None``."""
    if value in (None, ""):
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def to_int(value: object) -> int | None:
    """Convert *value* to ``int``; ``None``, ``""`` or bad input give ``None``."""
    if value in (None, ""):
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def read_video_metadata(path: Path) -> tuple[float | None, int | None, int | None, int | None]:
    """Probe *path* with ``ffprobe`` for basic video properties.

    Returns:
        ``(duration_sec, bitrate_kbps, width, height)``. Every element is
        ``None`` when ffprobe cannot be started, exits non-zero, or prints
        output that is not a JSON object; individual elements are ``None``
        when the corresponding field is absent or unparsable.
    """
    nothing = (None, None, None, None)
    command = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration,bit_rate",
        "-show_entries",
        "stream=width,height,bit_rate",
        "-select_streams",
        "v:0",
        "-of",
        "json",
        str(path),
    ]
    try:
        proc = subprocess.run(command, check=False, capture_output=True, text=True)
    except OSError:
        # Typically: ffprobe is not installed / not executable.
        return nothing
    if proc.returncode != 0:
        return nothing
    try:
        payload = json.loads(proc.stdout or "{}")
    except json.JSONDecodeError:
        return nothing
    if not isinstance(payload, dict):
        return nothing

    # "or" fallbacks also cover keys that are present but null.
    fmt = payload.get("format") or {}
    streams = payload.get("streams") or []
    stream0 = streams[0] if streams else {}

    duration = to_float(fmt.get("duration"))
    # Prefer the video stream's bit rate, fall back to the container's.
    bitrate_int = to_int(stream0.get("bit_rate") or fmt.get("bit_rate"))
    bitrate_kbps = None if bitrate_int is None else int(bitrate_int / 1000)
    width = to_int(stream0.get("width"))
    height = to_int(stream0.get("height"))
    return duration, bitrate_kbps, width, height


def iso_ts(ts: float) -> str:
    """Format Unix timestamp *ts* as a timezone-aware UTC ISO-8601 string."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()


def collect_candidates_all_sources(
    sources: list[Path], since_ts: float, allowed_ext: set[str]
) -> tuple[list[Path], list[dict[str, object]]]:
    """Scan every source and merge the matches.

    Returns:
        ``(merged, tried)`` where ``merged`` holds the matching files of all
        sources (de-duplicated by resolved path, newest first) and ``tried``
        holds one ``{"source": ..., "matches": ...}`` record per source.
    """
    tried: list[dict[str, object]] = []
    merged: list[Path] = []
    seen: set[Path] = set()
    for source in sources:
        files = collect_candidates(source, since_ts, allowed_ext)
        tried.append({"source": str(source), "matches": len(files)})
        for file_path in files:
            resolved = file_path.resolve()
            if resolved in seen:
                continue
            seen.add(resolved)
            merged.append(file_path)
    # A file may disappear after its source was scanned; keep the sort safe.
    merged.sort(key=_mtime_or_zero, reverse=True)
    return merged, tried


def collect_existing_hashes(target: Path, allowed_ext: set[str]) -> set[str]:
    """Hash the files directly inside *target* that have an allowed extension.

    Unreadable files are skipped. Subdirectories are not descended into.
    """
    hashes: set[str] = set()
    for path in target.iterdir():
        if not path.is_file():
            continue
        if path.suffix.lower().lstrip(".") not in allowed_ext:
            continue
        try:
            hashes.add(sha256_of_file(path))
        except OSError:
            continue
    return hashes


def write_manifest(manifest_path: Path, payload: dict[str, object]) -> None:
    """Write *payload* as indented UTF-8 JSON, creating parent directories."""
    manifest_path.parent.mkdir(parents=True, exist_ok=True)
    with manifest_path.open("w", encoding="utf-8") as fh:
        json.dump(payload, fh, ensure_ascii=False, indent=2)
        fh.write("\n")


def main(argv: list[str] | None = None) -> int:
    """Run the collector.

    Args:
        argv: Command-line arguments; ``None`` reads ``sys.argv[1:]``.

    Returns:
        Process exit code: 0 on success, 1 when nothing (or fewer files than
        ``--expected-count``) was collected, 2 when ``--ext`` is empty.
    """
    args = parse_args(argv)
    target = Path(args.target).expanduser().resolve()
    target.mkdir(parents=True, exist_ok=True)

    batch_id = args.batch_id or time.strftime("%Y%m%d-%H%M%S")
    if args.manifest:
        manifest_path = Path(args.manifest).expanduser().resolve()
    else:
        manifest_path = target / f"{args.prefix}-{batch_id}-manifest.json"

    allowed_ext = {
        ext.strip().lower().lstrip(".") for ext in args.ext.split(",") if ext.strip()
    }
    if not allowed_ext:
        print("No valid extensions provided.", file=sys.stderr)
        return 2

    sources = resolve_sources(args.source)
    candidates, tried_sources = collect_candidates_all_sources(
        sources, args.since, allowed_ext
    )
    if not candidates:
        payload = {
            "status": "no_matching_files",
            "created_at": iso_ts(time.time()),
            "batch_id": batch_id,
            "prompt": args.prompt,
            "target_dir": str(target),
            "since_ts": args.since,
            "sources_tried": tried_sources,
            "collected_count": 0,
            "files": [],
        }
        write_manifest(manifest_path, payload)
        print("No matching files found.")
        print(f"MANIFEST: {manifest_path}")
        return 1

    dedupe_target = not args.no_dedupe_target
    # Hashes also accumulate during the run, so duplicates within one batch
    # are skipped even when target dedupe is disabled.
    seen_hashes: set[str] = set()
    if dedupe_target:
        seen_hashes.update(collect_existing_hashes(target, allowed_ext))

    files: list[dict[str, object]] = []
    skipped_duplicates = 0
    for src in candidates:
        if len(files) >= args.limit:
            break
        try:
            src_hash = sha256_of_file(src)
        except OSError:
            continue
        if src_hash in seen_hashes:
            skipped_duplicates += 1
            continue

        idx = len(files) + 1
        dst = unique_path(
            target / f"{args.prefix}-{batch_id}-{idx:02d}{src.suffix.lower()}"
        )
        try:
            # Capture source facts before the transfer: after --move the
            # source no longer exists and cannot be stat'ed or resolved.
            src_mtime = src.stat().st_mtime
            source_path = str(src.resolve())
            if args.move:
                shutil.move(str(src), str(dst))
            else:
                shutil.copy2(str(src), str(dst))
            size_bytes = dst.stat().st_size
        except OSError as exc:
            # One bad file must not abort the batch before the manifest is
            # written; report it and carry on with the next candidate.
            print(f"Skipping {src}: {exc}", file=sys.stderr)
            continue

        duration_sec, bitrate_kbps, width, height = read_video_metadata(dst)
        files.append(
            {
                "prompt": args.prompt,
                "generated_at": iso_ts(src_mtime),
                "source_filename": src.name,
                "source_path": source_path,
                "target_path": str(dst.resolve()),
                "sha256": src_hash,
                "file_size_bytes": size_bytes,
                "duration_sec": duration_sec,
                "bitrate_kbps": bitrate_kbps,
                "width": width,
                "height": height,
            }
        )
        seen_hashes.add(src_hash)

    expected_count = args.expected_count
    if not files:
        status, exit_code = "no_files_after_dedupe", 1
    elif expected_count is not None and len(files) < expected_count:
        status, exit_code = "insufficient_files", 1
    else:
        status, exit_code = "ok", 0

    payload = {
        "status": status,
        "created_at": iso_ts(time.time()),
        "batch_id": batch_id,
        "prompt": args.prompt,
        "target_dir": str(target),
        "sources_tried": tried_sources,
        "since_ts": args.since,
        "limit": args.limit,
        "expected_count": expected_count,
        "dedupe_target": dedupe_target,
        "skipped_duplicates": skipped_duplicates,
        "collected_count": len(files),
        "files": files,
    }
    write_manifest(manifest_path, payload)

    for item in files:
        print(item["target_path"])
    print(f"MANIFEST: {manifest_path}")
    return exit_code


if __name__ == "__main__":
    raise SystemExit(main())