#!/usr/bin/env python3 """Collect recent audio downloads into a target directory with manifest output.""" from __future__ import annotations import argparse import hashlib import json import re import shutil import subprocess import sys import time import wave from datetime import datetime, timezone from pathlib import Path def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Collect recent audio downloads into a target directory." ) parser.add_argument( "--source", action="append", help=( "Source download directory. Repeatable. " "If omitted, auto-discovers Playwright temp downloads and then " "falls back to ~/Downloads." ), ) parser.add_argument( "--target", required=True, help="Target directory for collected files.", ) parser.add_argument( "--since", type=float, default=time.time() - 1800, help="Unix timestamp lower bound for file mtime. Default: now-1800s", ) parser.add_argument( "--ext", default="mp3,wav,m4a,ogg,flac,aac", help="Comma-separated file extensions to include.", ) parser.add_argument( "--limit", type=int, default=8, help="Maximum files to collect. Default: 8", ) parser.add_argument( "--expected-count", type=int, default=None, help="Required minimum number of collected files.", ) parser.add_argument( "--prefix", default="gemini-music", help="Filename prefix for collected files. Default: gemini-music", ) parser.add_argument( "--batch-id", default=None, help="Batch ID used in output filenames. Default: current timestamp.", ) parser.add_argument( "--manifest", default=None, help="Manifest output path. Default: /--manifest.json", ) parser.add_argument( "--prompt", default="", help="Prompt text to store in manifest.", ) parser.add_argument( "--move", action="store_true", help="Move files instead of copying.", ) parser.add_argument( "--no-dedupe-target", action="store_true", help="Disable hash dedupe against existing files in target directory.", ) return parser.parse_args() def unique_path(path: Path) -> Path: if not path.exists(): return path stem = path.stem suffix = path.suffix parent = path.parent idx = 2 while True: candidate = parent / f"{stem}-{idx}{suffix}" if not candidate.exists(): return candidate idx += 1 def collect_candidates(source: Path, since_ts: float, allowed_ext: set[str]) -> list[Path]: files: list[Path] = [] if not source.exists(): return files for path in source.rglob("*"): if not path.is_file(): continue ext = path.suffix.lower().lstrip(".") if ext not in allowed_ext: continue try: mtime = path.stat().st_mtime except OSError: continue if mtime >= since_ts: files.append(path) files.sort(key=lambda p: p.stat().st_mtime, reverse=True) return files def discover_playwright_sources() -> list[Path]: globs = ( "/var/folders/*/*/T/playwright-mcp-output/*", "/private/var/folders/*/*/T/playwright-mcp-output/*", "/var/folders/*/*/*/T/playwright-mcp-output/*", "/private/var/folders/*/*/*/T/playwright-mcp-output/*", "/tmp/playwright-mcp-output/*", ) candidates: list[Path] = [] seen: set[Path] = set() for pattern in globs: for raw in Path("/").glob(pattern.lstrip("/")): if not raw.is_dir(): continue path = raw.resolve() if path in seen: continue seen.add(path) candidates.append(path) candidates.sort(key=lambda p: p.stat().st_mtime, reverse=True) return candidates def resolve_sources(raw_sources: list[str] | None) -> list[Path]: if raw_sources: return [Path(item).expanduser().resolve() for item in raw_sources] auto_sources = discover_playwright_sources() auto_sources.append((Path.cwd() / ".playwright-cli").resolve()) auto_sources.append((Path(__file__).resolve().parents[3] / ".playwright-cli").resolve()) auto_sources.append((Path.home() / "Downloads").resolve()) result: list[Path] = [] seen: set[Path] = set() for path in auto_sources: if path in seen: continue seen.add(path) result.append(path) return result def sha256_of_file(path: Path) -> str: digest = hashlib.sha256() with path.open("rb") as fh: while True: chunk = fh.read(1024 * 1024) if not chunk: break digest.update(chunk) return digest.hexdigest() def read_audio_metadata(path: Path) -> tuple[float | None, int | None]: # Prefer ffprobe for broad codec/container support. try: proc = subprocess.run( [ "ffprobe", "-v", "error", "-show_entries", "format=duration,bit_rate", "-of", "json", str(path), ], check=False, capture_output=True, text=True, ) except OSError: proc = None if proc and proc.returncode == 0: try: payload = json.loads(proc.stdout or "{}") fmt = payload.get("format", {}) dur_raw = fmt.get("duration") br_raw = fmt.get("bit_rate") duration = float(dur_raw) if dur_raw not in (None, "") else None bitrate_kbps = ( int(int(br_raw) / 1000) if br_raw not in (None, "") else None ) return duration, bitrate_kbps except (ValueError, TypeError, json.JSONDecodeError): pass # macOS fallback for compressed formats when ffprobe is unavailable. try: proc = subprocess.run( ["afinfo", str(path)], check=False, capture_output=True, text=True, ) except OSError: proc = None if proc and proc.returncode == 0: duration_match = re.search(r"estimated duration:\s*([0-9.]+)\s*sec", proc.stdout) bitrate_match = re.search(r"bit rate:\s*([0-9]+)\s*bits per second", proc.stdout) duration = float(duration_match.group(1)) if duration_match else None bitrate_kbps = int(int(bitrate_match.group(1)) / 1000) if bitrate_match else None if duration is not None or bitrate_kbps is not None: return duration, bitrate_kbps # Fallback for WAV without external dependencies. if path.suffix.lower() == ".wav": try: with wave.open(str(path), "rb") as wav_file: frames = wav_file.getnframes() frame_rate = wav_file.getframerate() channels = wav_file.getnchannels() sample_width = wav_file.getsampwidth() duration = (frames / frame_rate) if frame_rate else None bitrate_kbps = int((frame_rate * channels * sample_width * 8) / 1000) return duration, bitrate_kbps except (wave.Error, OSError, ValueError): return None, None return None, None def iso_ts(ts: float) -> str: return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat() def collect_candidates_all_sources( sources: list[Path], since_ts: float, allowed_ext: set[str] ) -> tuple[list[Path], list[dict[str, object]]]: tried: list[dict[str, object]] = [] merged: list[Path] = [] seen: set[Path] = set() for source in sources: files = collect_candidates(source, since_ts, allowed_ext) tried.append({"source": str(source), "matches": len(files)}) for file_path in files: resolved = file_path.resolve() if resolved in seen: continue seen.add(resolved) merged.append(file_path) merged.sort(key=lambda p: p.stat().st_mtime, reverse=True) return merged, tried def collect_existing_hashes(target: Path, allowed_ext: set[str]) -> set[str]: hashes: set[str] = set() for path in target.iterdir(): if not path.is_file(): continue ext = path.suffix.lower().lstrip(".") if ext not in allowed_ext: continue try: hashes.add(sha256_of_file(path)) except OSError: continue return hashes def write_manifest(manifest_path: Path, payload: dict[str, object]) -> None: manifest_path.parent.mkdir(parents=True, exist_ok=True) with manifest_path.open("w", encoding="utf-8") as fh: json.dump(payload, fh, ensure_ascii=False, indent=2) fh.write("\n") def main() -> int: args = parse_args() target = Path(args.target).expanduser().resolve() target.mkdir(parents=True, exist_ok=True) batch_id = args.batch_id or time.strftime("%Y%m%d-%H%M%S") manifest_path = ( Path(args.manifest).expanduser().resolve() if args.manifest else target / f"{args.prefix}-{batch_id}-manifest.json" ) allowed_ext = { ext.strip().lower().lstrip(".") for ext in args.ext.split(",") if ext.strip() } if not allowed_ext: print("No valid extensions provided.", file=sys.stderr) return 2 sources = resolve_sources(args.source) candidates, tried_sources = collect_candidates_all_sources(sources, args.since, allowed_ext) if not candidates: payload = { "status": "no_matching_files", "created_at": iso_ts(time.time()), "batch_id": batch_id, "prompt": args.prompt, "target_dir": str(target), "since_ts": args.since, "sources_tried": tried_sources, "collected_count": 0, "files": [], } write_manifest(manifest_path, payload) print("No matching files found.") print(f"MANIFEST: {manifest_path}") return 1 dedupe_target = not args.no_dedupe_target seen_hashes: set[str] = set() if dedupe_target: seen_hashes.update(collect_existing_hashes(target, allowed_ext)) files: list[dict[str, object]] = [] skipped_duplicates = 0 for src in candidates: if len(files) >= args.limit: break try: src_hash = sha256_of_file(src) except OSError: continue if src_hash in seen_hashes: skipped_duplicates += 1 continue idx = len(files) + 1 dst = target / f"{args.prefix}-{batch_id}-{idx:02d}{src.suffix.lower()}" dst = unique_path(dst) src_mtime = src.stat().st_mtime if args.move: shutil.move(str(src), str(dst)) else: shutil.copy2(str(src), str(dst)) duration_sec, bitrate_kbps = read_audio_metadata(dst) file_entry = { "prompt": args.prompt, "generated_at": iso_ts(src_mtime), "source_filename": src.name, "source_path": str(src.resolve()), "target_path": str(dst.resolve()), "sha256": src_hash, "file_size_bytes": dst.stat().st_size, "duration_sec": duration_sec, "bitrate_kbps": bitrate_kbps, } files.append(file_entry) seen_hashes.add(src_hash) status = "ok" exit_code = 0 expected_count = args.expected_count if not files: status = "no_files_after_dedupe" exit_code = 1 elif expected_count is not None and len(files) < expected_count: status = "insufficient_files" exit_code = 1 payload = { "status": status, "created_at": iso_ts(time.time()), "batch_id": batch_id, "prompt": args.prompt, "target_dir": str(target), "sources_tried": tried_sources, "since_ts": args.since, "limit": args.limit, "expected_count": expected_count, "dedupe_target": dedupe_target, "skipped_duplicates": skipped_duplicates, "collected_count": len(files), "files": files, } write_manifest(manifest_path, payload) for item in files: print(item["target_path"]) print(f"MANIFEST: {manifest_path}") return exit_code if __name__ == "__main__": raise SystemExit(main())