feat(skill/gemini-video-web): add Gemini video creation skill

This commit is contained in:
2026-03-04 13:24:35 +08:00
parent 8fc5c6e128
commit 787a3334b6
4 changed files with 844 additions and 0 deletions
+403
View File
@@ -0,0 +1,403 @@
#!/usr/bin/env python3
"""Collect recent video downloads into a target directory with manifest output."""
from __future__ import annotations
import argparse
import hashlib
import json
import shutil
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
def parse_args() -> argparse.Namespace:
    """Declare the collector's command-line options and parse ``sys.argv``.

    ``--target`` is the only required option. ``--source`` may be given
    several times and accumulates into a list (``None`` when omitted).

    Returns:
        The populated ``argparse.Namespace``.
    """
    cli = argparse.ArgumentParser(
        description="Collect recent video downloads into a target directory."
    )
    add = cli.add_argument

    source_help = (
        "Source download directory. Repeatable. "
        "If omitted, auto-discovers Playwright temp downloads and then "
        "falls back to ~/Downloads."
    )
    add("--source", action="append", help=source_help)
    add("--target", required=True, help="Target directory for collected files.")

    # The default lower bound is fixed when the parser is built: 30 minutes
    # (1800 s) before "now".
    default_since = time.time() - 1800
    add(
        "--since",
        type=float,
        default=default_since,
        help="Unix timestamp lower bound for file mtime. Default: now-1800s",
    )
    add(
        "--ext",
        default="mp4,mov,webm,mkv,m4v,avi",
        help="Comma-separated file extensions to include.",
    )
    add("--limit", type=int, default=8, help="Maximum files to collect. Default: 8")
    add(
        "--expected-count",
        type=int,
        default=None,
        help="Required minimum number of collected files.",
    )
    add(
        "--prefix",
        default="gemini-video",
        help="Filename prefix for collected files. Default: gemini-video",
    )
    add(
        "--batch-id",
        default=None,
        help="Batch ID used in output filenames. Default: current timestamp.",
    )
    add(
        "--manifest",
        default=None,
        help="Manifest output path. Default: <target>/<prefix>-<batch-id>-manifest.json",
    )
    add("--prompt", default="", help="Prompt text to store in manifest.")
    add("--move", action="store_true", help="Move files instead of copying.")
    add(
        "--no-dedupe-target",
        action="store_true",
        help="Disable hash dedupe against existing files in target directory.",
    )
    return cli.parse_args()
def unique_path(path: Path) -> Path:
    """Return ``path`` itself, or the first free ``<stem>-N<suffix>`` sibling.

    If ``path`` does not exist it is returned unchanged. Otherwise numbered
    variants in the same directory are probed, starting at ``-2``, until one
    is found that does not exist yet.
    """
    candidate = path
    counter = 2
    while candidate.exists():
        candidate = path.parent / f"{path.stem}-{counter}{path.suffix}"
        counter += 1
    return candidate
def collect_candidates(source: Path, since_ts: float, allowed_ext: set[str]) -> list[Path]:
    """Recursively find recent files under ``source`` with an allowed extension.

    Args:
        source: Directory to scan; a missing path yields an empty list.
        since_ts: Unix timestamp; only files with ``mtime >= since_ts`` match.
        allowed_ext: Lower-case extensions without the leading dot.

    Returns:
        Matching paths, newest first by modification time.
    """
    if not source.exists():
        return []

    dated: list[tuple[float, Path]] = []
    for path in source.rglob("*"):
        # Check the extension first: it needs no filesystem access.
        if path.suffix.lower().lstrip(".") not in allowed_ext:
            continue
        if not path.is_file():
            continue
        try:
            mtime = path.stat().st_mtime
        except OSError:
            # The file vanished or became unreadable mid-scan; skip it.
            continue
        if mtime >= since_ts:
            dated.append((mtime, path))

    # Sort on the mtime captured above instead of calling stat() again, so a
    # file that disappears after the scan cannot crash the sort.
    dated.sort(key=lambda item: item[0], reverse=True)
    return [path for _, path in dated]
def discover_playwright_sources() -> list[Path]:
    """Locate ``playwright-mcp-output`` session directories in temp locations.

    Globs a fixed set of absolute patterns (``/var/folders``,
    ``/private/var/folders`` and ``/tmp`` layouts), keeps directories only,
    and de-duplicates them by resolved path.

    Returns:
        Resolved directories, most recently modified first. Directories that
        can no longer be stat'ed are left out.
    """
    globs = (
        "/var/folders/*/*/T/playwright-mcp-output/*",
        "/private/var/folders/*/*/T/playwright-mcp-output/*",
        "/var/folders/*/*/*/T/playwright-mcp-output/*",
        "/private/var/folders/*/*/*/T/playwright-mcp-output/*",
        "/tmp/playwright-mcp-output/*",
    )
    dated: list[tuple[float, Path]] = []
    seen: set[Path] = set()
    for pattern in globs:
        # Path.glob() only accepts relative patterns, so anchor at "/" and
        # strip the leading slash from the pattern.
        for raw in Path("/").glob(pattern.lstrip("/")):
            if not raw.is_dir():
                continue
            path = raw.resolve()
            if path in seen:
                continue
            seen.add(path)
            try:
                mtime = path.stat().st_mtime
            except OSError:
                # These are temp directories and may be removed while we
                # scan; a vanished one is not a usable source.
                continue
            dated.append((mtime, path))
    # Sort on the cached mtime rather than re-stat'ing inside the sort key.
    dated.sort(key=lambda item: item[0], reverse=True)
    return [path for _, path in dated]
def resolve_sources(raw_sources: list[str] | None) -> list[Path]:
if raw_sources:
return [Path(item).expanduser().resolve() for item in raw_sources]
auto_sources = discover_playwright_sources()
auto_sources.append((Path.cwd() / ".playwright-cli").resolve())
auto_sources.append((Path(__file__).resolve().parents[3] / ".playwright-cli").resolve())
auto_sources.append((Path.home() / "Downloads").resolve())
result: list[Path] = []
seen: set[Path] = set()
for path in auto_sources:
if path in seen:
continue
seen.add(path)
result.append(path)
return result
def sha256_of_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at ``path``.

    The file is read in 1 MiB blocks so large videos are never loaded into
    memory in one piece.
    """
    block_size = 1024 * 1024
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        # iter() with a sentinel stops at the first empty read (EOF).
        for block in iter(lambda: stream.read(block_size), b""):
            hasher.update(block)
    return hasher.hexdigest()
def to_float(value: object) -> float | None:
if value in (None, ""):
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def to_int(value: object) -> int | None:
if value in (None, ""):
return None
try:
return int(value)
except (TypeError, ValueError):
return None
def read_video_metadata(path: Path) -> tuple[float | None, int | None, int | None, int | None]:
try:
proc = subprocess.run(
[
"ffprobe",
"-v",
"error",
"-show_entries",
"format=duration,bit_rate",
"-show_entries",
"stream=width,height,bit_rate",
"-select_streams",
"v:0",
"-of",
"json",
str(path),
],
check=False,
capture_output=True,
text=True,
)
except OSError:
return None, None, None, None
if proc.returncode != 0:
return None, None, None, None
try:
payload = json.loads(proc.stdout or "{}")
except json.JSONDecodeError:
return None, None, None, None
fmt = payload.get("format", {})
streams = payload.get("streams") or []
stream0 = streams[0] if streams else {}
duration = to_float(fmt.get("duration"))
bitrate_raw = stream0.get("bit_rate") or fmt.get("bit_rate")
bitrate_kbps = None
bitrate_int = to_int(bitrate_raw)
if bitrate_int is not None:
bitrate_kbps = int(bitrate_int / 1000)
width = to_int(stream0.get("width"))
height = to_int(stream0.get("height"))
return duration, bitrate_kbps, width, height
def iso_ts(ts: float) -> str:
    """Format a Unix timestamp as an ISO-8601 string in UTC (``+00:00``)."""
    moment = datetime.fromtimestamp(ts, tz=timezone.utc)
    return moment.isoformat()
def collect_candidates_all_sources(
    sources: list[Path], since_ts: float, allowed_ext: set[str]
) -> tuple[list[Path], list[dict[str, object]]]:
    """Scan every source directory and merge the matches into one list.

    Args:
        sources: Directories to scan, in priority order.
        since_ts: Unix timestamp lower bound for file mtime.
        allowed_ext: Lower-case extensions without the leading dot.

    Returns:
        ``(merged, tried)``. ``merged`` holds the matching files from all
        sources, de-duplicated by resolved path and sorted newest first.
        ``tried`` has one ``{"source", "matches"}`` record per source, in
        scan order, for the manifest.
    """

    def mtime_or_oldest(path: Path) -> float:
        # A file can disappear between the scan and this sort. Rank it last
        # instead of letting stat() raise out of the sort key.
        try:
            return path.stat().st_mtime
        except OSError:
            return float("-inf")

    tried: list[dict[str, object]] = []
    merged: list[Path] = []
    seen: set[Path] = set()
    for source in sources:
        files = collect_candidates(source, since_ts, allowed_ext)
        tried.append({"source": str(source), "matches": len(files)})
        for file_path in files:
            # Overlapping sources (or symlinks) can reach the same file.
            resolved = file_path.resolve()
            if resolved in seen:
                continue
            seen.add(resolved)
            merged.append(file_path)
    merged.sort(key=mtime_or_oldest, reverse=True)
    return merged, tried
def collect_existing_hashes(target: Path, allowed_ext: set[str]) -> set[str]:
    """Hash the video files that already sit directly inside ``target``.

    Only regular files whose extension is in ``allowed_ext`` are considered;
    sub-directories are not descended into. Files that cannot be read are
    skipped.

    Returns:
        The set of hex SHA-256 digests of the matching files.
    """
    known: set[str] = set()
    for entry in target.iterdir():
        is_candidate = entry.is_file() and entry.suffix.lower().lstrip(".") in allowed_ext
        if not is_candidate:
            continue
        try:
            digest = sha256_of_file(entry)
        except OSError:
            continue
        known.add(digest)
    return known
def write_manifest(manifest_path: Path, payload: dict[str, object]) -> None:
    """Write ``payload`` to ``manifest_path`` as indented UTF-8 JSON.

    Missing parent directories are created. Non-ASCII text is written as-is
    (not ``\\u`` escaped) and the file ends with a newline.

    Raises:
        TypeError / ValueError: if ``payload`` is not JSON-serializable. In
            that case nothing on disk is touched.
        OSError: if the directory or file cannot be written.
    """
    # Serialize before opening the file: opening in "w" mode truncates, so a
    # serialization error must not destroy an existing manifest.
    text = json.dumps(payload, ensure_ascii=False, indent=2) + "\n"
    manifest_path.parent.mkdir(parents=True, exist_ok=True)
    manifest_path.write_text(text, encoding="utf-8")
def main() -> int:
    """Collect recent video files into the target directory and write a manifest.

    Candidates are gathered from the resolved source directories, newest
    first. Each one is hashed, skipped if its hash was already seen (in this
    run or, unless ``--no-dedupe-target`` is given, among files already in
    the target), then copied or moved to
    ``<prefix>-<batch-id>-<NN><ext>`` in the target. A file that cannot be
    read or transferred is reported on stderr and skipped, so one bad file
    does not abort the batch or prevent the manifest from being written.

    Returns:
        Process exit code: ``0`` on success; ``1`` when nothing matched,
        nothing was left after dedupe, or fewer than ``--expected-count``
        files were collected; ``2`` when ``--ext`` contains no usable
        extension.
    """
    args = parse_args()
    target = Path(args.target).expanduser().resolve()
    target.mkdir(parents=True, exist_ok=True)
    batch_id = args.batch_id or time.strftime("%Y%m%d-%H%M%S")
    manifest_path = (
        Path(args.manifest).expanduser().resolve()
        if args.manifest
        else target / f"{args.prefix}-{batch_id}-manifest.json"
    )
    # Normalize "--ext" into lower-case extensions without dots or blanks.
    allowed_ext = {
        ext.strip().lower().lstrip(".")
        for ext in args.ext.split(",")
        if ext.strip()
    }
    if not allowed_ext:
        print("No valid extensions provided.", file=sys.stderr)
        return 2

    sources = resolve_sources(args.source)
    candidates, tried_sources = collect_candidates_all_sources(sources, args.since, allowed_ext)
    if not candidates:
        payload = {
            "status": "no_matching_files",
            "created_at": iso_ts(time.time()),
            "batch_id": batch_id,
            "prompt": args.prompt,
            "target_dir": str(target),
            "since_ts": args.since,
            "sources_tried": tried_sources,
            "collected_count": 0,
            "files": [],
        }
        write_manifest(manifest_path, payload)
        print("No matching files found.")
        print(f"MANIFEST: {manifest_path}")
        return 1

    dedupe_target = not args.no_dedupe_target
    seen_hashes: set[str] = set()
    if dedupe_target:
        seen_hashes.update(collect_existing_hashes(target, allowed_ext))

    files: list[dict[str, object]] = []
    skipped_duplicates = 0
    for src in candidates:
        if len(files) >= args.limit:
            break
        try:
            src_hash = sha256_of_file(src)
            src_mtime = src.stat().st_mtime
        except OSError:
            # Unreadable or vanished since discovery; nothing to collect.
            continue
        if src_hash in seen_hashes:
            skipped_duplicates += 1
            continue

        idx = len(files) + 1
        dst = unique_path(target / f"{args.prefix}-{batch_id}-{idx:02d}{src.suffix.lower()}")
        # Resolve before a possible move, while the source still exists.
        source_path = str(src.resolve())
        try:
            if args.move:
                shutil.move(str(src), str(dst))
            else:
                shutil.copy2(str(src), str(dst))
        except OSError as exc:  # shutil.Error is an OSError subclass
            print(f"Failed to collect {src}: {exc}", file=sys.stderr)
            # Best-effort removal of a partial destination. Only done while
            # the source still exists, so the sole copy is never deleted.
            if src.exists() and dst.exists():
                try:
                    dst.unlink()
                except OSError:
                    pass
            continue

        duration_sec, bitrate_kbps, width, height = read_video_metadata(dst)
        file_entry = {
            "prompt": args.prompt,
            "generated_at": iso_ts(src_mtime),
            "source_filename": src.name,
            "source_path": source_path,
            "target_path": str(dst.resolve()),
            "sha256": src_hash,
            "file_size_bytes": dst.stat().st_size,
            "duration_sec": duration_sec,
            "bitrate_kbps": bitrate_kbps,
            "width": width,
            "height": height,
        }
        files.append(file_entry)
        seen_hashes.add(src_hash)

    status = "ok"
    exit_code = 0
    expected_count = args.expected_count
    if not files:
        status = "no_files_after_dedupe"
        exit_code = 1
    elif expected_count is not None and len(files) < expected_count:
        status = "insufficient_files"
        exit_code = 1

    payload = {
        "status": status,
        "created_at": iso_ts(time.time()),
        "batch_id": batch_id,
        "prompt": args.prompt,
        "target_dir": str(target),
        "sources_tried": tried_sources,
        "since_ts": args.since,
        "limit": args.limit,
        "expected_count": expected_count,
        "dedupe_target": dedupe_target,
        "skipped_duplicates": skipped_duplicates,
        "collected_count": len(files),
        "files": files,
    }
    write_manifest(manifest_path, payload)
    # One collected path per line, then the manifest location.
    for item in files:
        print(item["target_path"])
    print(f"MANIFEST: {manifest_path}")
    return exit_code
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
+276
View File
@@ -0,0 +1,276 @@
#!/usr/bin/env python3
"""Run Gemini video generation flow end-to-end via Playwright CLI."""
from __future__ import annotations
import argparse
import json
import os
import re
import subprocess
import sys
import time
from pathlib import Path
class FlowError(RuntimeError):
    """Error signalling that a subprocess command of the flow has failed."""
def run_command(
    cmd: list[str], *, capture_output: bool = True, check: bool = True
) -> subprocess.CompletedProcess[str]:
    """Run ``cmd`` in text mode and return the completed process.

    Args:
        cmd: Program and arguments (executed without a shell).
        capture_output: When true, stdout is captured and stderr is merged
            into it; when false, the child inherits this process's streams.
        check: When true, a non-zero exit status raises ``FlowError``.

    Returns:
        The ``subprocess.CompletedProcess``; ``stdout`` is ``None`` when
        output was not captured.

    Raises:
        FlowError: if the command cannot be started at all (regardless of
            ``check``), or if ``check`` is true and it exits non-zero. The
            message carries the command line and any captured output.
    """
    try:
        proc = subprocess.run(
            cmd,
            text=True,
            stdout=subprocess.PIPE if capture_output else None,
            stderr=subprocess.STDOUT if capture_output else None,
        )
    except OSError as exc:
        # Missing or non-executable program: surface it as the flow's own
        # error type so callers that only handle FlowError report it cleanly.
        raise FlowError(
            f"Command could not be started: {' '.join(cmd)}\n{exc}"
        ) from exc
    if check and proc.returncode != 0:
        output = proc.stdout if capture_output else ""
        raise FlowError(
            f"Command failed ({proc.returncode}): {' '.join(cmd)}\n{output}"
        )
    return proc
def run_pw(pw_shared: Path, *args: str) -> str:
    """Invoke the Playwright wrapper with ``args`` and return its output.

    The wrapper is run through ``run_command`` with output captured and the
    default failure check enabled. An empty string is returned when there is
    no output.
    """
    completed = run_command([str(pw_shared), *args], capture_output=True)
    output = completed.stdout
    return output if output else ""
def is_login_required(pw_shared: Path) -> bool:
    """Report whether the current page looks like a signed-out Gemini page.

    A JavaScript probe is evaluated in the page through the wrapper's
    ``eval`` command. The probe yields true when no Google-account button is
    present and either a ``ServiceLogin`` link or a link/button whose text
    matches "登录" / "Sign in" exists.

    Returns:
        ``True`` when the wrapper output contains a line that is exactly
        ``true``.
    """
    probe_js = (
        "() => {"
        "const hasAccount = !!document.querySelector("
        "'button[aria-label*=\\\"Google 账号\\\"], "
        "button[aria-label*=\\\"Google Account\\\"]'"
        ");"
        "const hasService = !!document.querySelector('a[href*=\\\"ServiceLogin\\\"]');"
        "const hasLoginCtl = Array.from(document.querySelectorAll('a,button'))"
        ".some(el => /登录|Sign in/i.test((el.textContent || '').trim()));"
        "return !hasAccount && (hasService || hasLoginCtl);"
        "}"
    )
    output = run_pw(pw_shared, "eval", probe_js)
    # The wrapper may print other lines too; look for a line that is only "true".
    return re.search(r"(?m)^true$", output) is not None
def enter_video_tool(pw_shared: Path) -> None:
    """Switch the Gemini page into its video-creation tool.

    Sends a Playwright script to the wrapper's ``run-code`` command. The
    script first tries to click a button whose name matches one of the video
    labels (Chinese or English), using a 2-second click timeout per button.
    If none can be clicked it opens the "工具" / "Tools" menu and clicks a
    matching ``menuitemcheckbox`` or ``menuitem``; the menu strategy is
    attempted up to two times. The script throws
    ``Video tool entry not found`` when every strategy fails.

    NOTE(review): a thrown script error presumably makes the wrapper exit
    non-zero, which ``run_pw`` would surface as ``FlowError`` — confirm
    against the wrapper.
    """
    # Raw string: the JavaScript below is passed to the wrapper verbatim.
    js = r"""
async (page) => {
const labels = [/创作视频/, /制作视频/, /Create video/i, /Video/i];
const openToolMenu = async () => {
const cn = page.getByRole('button', { name: '工具', exact: true }).first();
if (await cn.count()) {
await cn.click();
return true;
}
const generic = page.getByRole('button', { name: /工具|Tools/i }).first();
if (await generic.count()) {
await generic.click();
return true;
}
return false;
};
const tryCardButtons = async () => {
for (const re of labels) {
const btn = page.getByRole('button', { name: re }).first();
if (await btn.count()) {
try {
await btn.click({ timeout: 2000 });
return true;
} catch (_) {
// Overlay may intercept pointer. Fall through to menu strategy.
}
}
}
return false;
};
const tryToolMenu = async () => {
const opened = await openToolMenu();
if (!opened) return false;
for (const re of labels) {
const itemCheck = page.getByRole('menuitemcheckbox', { name: re }).first();
if (await itemCheck.count()) {
await itemCheck.click();
return true;
}
const itemPlain = page.getByRole('menuitem', { name: re }).first();
if (await itemPlain.count()) {
await itemPlain.click();
return true;
}
}
return false;
};
let ok = await tryCardButtons();
if (!ok) ok = await tryToolMenu();
if (!ok) ok = await tryToolMenu();
if (!ok) throw new Error('Video tool entry not found');
}
"""
    run_pw(pw_shared, "run-code", js)
def submit_and_download_one(pw_shared: Path, prompt: str) -> None:
    """Submit ``prompt`` to Gemini, wait for the answer, and start a download.

    The generated Playwright script, sent to the wrapper's ``run-code``
    command, does the following:

    1. Fills the prompt textbox and presses Enter.
    2. Waits up to 20 s for the "stop response" button to appear (a timeout
       here is ignored), then up to 480 s for it to disappear.
    3. Clicks the last button whose name matches a download label; throws
       ``Video download button not found`` if there is none.
    4. If a menu item naming a format/quality is visible, clicks it;
       otherwise clicks the first visible menu item, if any.
    5. Waits 3.5 s before returning.

    Args:
        pw_shared: Path to the Playwright wrapper executable.
        prompt: Prompt text; embedded in the script via ``json.dumps``.
    """
    # f-string: literal JavaScript braces are doubled; the only substitution
    # is the JSON-encoded prompt, which doubles as a JavaScript string literal.
    js = f"""
async (page) => {{
const prompt = {json.dumps(prompt)};
const input = page.getByRole('textbox', {{ name: /为 Gemini 输入提示|Enter a prompt/i }}).first();
await input.click();
await input.fill(prompt);
await input.press('Enter');
const stopBtn = page.getByRole('button', {{ name: /停止回答|Stop response/i }}).first();
await stopBtn.waitFor({{ state: 'visible', timeout: 20000 }}).catch(() => {{}});
await stopBtn.waitFor({{ state: 'hidden', timeout: 480000 }});
const downloadBtn = page.getByRole('button', {{ name: /下载视频|下载视频作品|Download video|下载作品|Download/i }}).last();
if (!(await downloadBtn.count())) {{
throw new Error('Video download button not found');
}}
await downloadBtn.click();
const preferredItem = page.getByRole('menuitem', {{ name: /MP4|高清视频|High quality|最高质量|1080p|720p/i }}).first();
if (await preferredItem.isVisible().catch(() => false)) {{
await preferredItem.click();
}} else {{
const anyItem = page.getByRole('menuitem').first();
if (await anyItem.isVisible().catch(() => false)) {{
await anyItem.click();
}}
}}
await page.waitForTimeout(3500);
}}
"""
    run_pw(pw_shared, "run-code", js)
def retry_click_latest_download(pw_shared: Path) -> None:
    """Click the last download button on the page once more.

    Sends a Playwright script to the wrapper's ``run-code`` command that
    clicks the last button whose name matches a download label and then
    waits 5 s. The script throws
    ``Video download button not found for retry`` when no such button
    exists. Unlike the first download attempt, no menu item is selected
    afterwards.
    """
    # Raw string: the JavaScript below is passed to the wrapper verbatim.
    js = r"""
async (page) => {
const btn = page.getByRole('button', { name: /下载视频|下载视频作品|Download video|下载作品|Download/i }).last();
if (!(await btn.count())) {
throw new Error('Video download button not found for retry');
}
await btn.click();
await page.waitForTimeout(5000);
}
"""
    run_pw(pw_shared, "run-code", js)
def parse_args() -> argparse.Namespace:
    """Declare the flow runner's command-line options and parse ``sys.argv``.

    ``--prompt`` and ``--target`` are required; ``--count`` defaults to 1 and
    ``--no-headed`` is an off-by-default flag.
    """
    cli = argparse.ArgumentParser(
        description="Generate videos on Gemini web and collect downloaded files."
    )
    add = cli.add_argument
    add("--prompt", required=True, help="Prompt text for video generation.")
    add("--target", required=True, help="Absolute output directory for collected files.")
    add("--count", type=int, default=1, help="Number of videos to generate. Default: 1.")
    add("--no-headed", action="store_true", help="Run browser without headed mode.")
    return cli.parse_args()
def main() -> int:
    """Drive the Gemini video flow end to end and collect the downloads.

    Steps: validate arguments and helper scripts, open Gemini through the
    shared Playwright wrapper, stop if the page requires login, enter the
    video tool, submit the prompt ``--count`` times (each followed by a
    download click), then run the collector script on files newer than the
    start of this run. If the collector fails, the latest download button is
    clicked once more and the collector is run a second time.

    Returns:
        ``0`` on success; ``2`` when Gemini needs a login; ``1`` for invalid
        arguments, missing helpers or a failed wrapper command; otherwise
        the collector's own exit status.
    """
    args = parse_args()
    if args.count < 1:
        print("--count must be a positive integer.", file=sys.stderr)
        return 1

    script_path = Path(__file__).resolve()
    wrapper_override = os.environ.get("PW_SHARED_WRAPPER")
    if wrapper_override is not None:
        pw_shared = Path(wrapper_override).expanduser()
    else:
        # The default wrapper lives at <repo root>/tools/pw, with the repo
        # root three levels above this script's directory. Look it up only
        # when needed, and fail with a message instead of an IndexError when
        # the script is not nested that deep.
        try:
            repo_root = script_path.parents[3]
        except IndexError:
            print(
                "Cannot derive the repository root from the script location; "
                "set PW_SHARED_WRAPPER.",
                file=sys.stderr,
            )
            return 1
        pw_shared = (repo_root / "tools/pw").expanduser()
    collect_script = (script_path.parent / "collect_downloads.py").resolve()

    if not pw_shared.exists() or not pw_shared.is_file():
        print(f"Shared Playwright wrapper not found: {pw_shared}", file=sys.stderr)
        return 1
    if not os.access(pw_shared, os.X_OK):
        print(f"Shared Playwright wrapper is not executable: {pw_shared}", file=sys.stderr)
        return 1
    if not collect_script.exists():
        print(f"Collector script not found: {collect_script}", file=sys.stderr)
        return 1

    target = Path(args.target).expanduser().resolve()
    target.mkdir(parents=True, exist_ok=True)
    # Passed to the collector as --since so only this run's files qualify.
    start_ts = time.time()
    try:
        # Exported through the environment for the wrapper subprocesses.
        os.environ["PLAYWRIGHT_SHARED_INIT_MODE"] = (
            "headless" if args.no_headed else "headed"
        )
        run_pw(pw_shared, "snapshot")
        run_pw(pw_shared, "goto", "https://gemini.google.com/app")
        run_pw(pw_shared, "snapshot")
        if is_login_required(pw_shared):
            print(
                "Gemini is not logged in. Please log in at https://gemini.google.com/app and rerun.",
                file=sys.stderr,
            )
            return 2
        enter_video_tool(pw_shared)
        for i in range(1, args.count + 1):
            current_prompt = args.prompt
            if args.count > 1:
                # Ask for a distinct variation per video in a multi-video run.
                current_prompt = (
                    f"{args.prompt}\n"
                    f"变体要求:这是第 {i} / {args.count} 条视频。保持主题一致,但镜头和节奏细节需要变化。"
                )
            submit_and_download_one(pw_shared, current_prompt)
        collect_cmd = [
            sys.executable,
            str(collect_script),
            "--target",
            str(target),
            "--since",
            str(start_ts),
            "--expected-count",
            str(args.count),
            "--limit",
            str(args.count),
            "--prefix",
            "gemini-video",
            # "--prompt=<value>" rather than two argv items: argparse rejects
            # a separate value that starts with "-" as a missing argument.
            f"--prompt={args.prompt}",
        ]
        proc = run_command(collect_cmd, capture_output=False, check=False)
        if proc.returncode == 0:
            return 0
        # Fallback: click latest video download button once and retry collection.
        # NOTE(review): the collector skips files whose hash already exists in
        # the target, so after a partially successful first pass the retry
        # can still report fewer than --expected-count — confirm that this is
        # acceptable for count > 1.
        try:
            retry_click_latest_download(pw_shared)
        except FlowError:
            return proc.returncode
        retry_proc = run_command(collect_cmd, capture_output=False, check=False)
        return retry_proc.returncode
    except FlowError as exc:
        print(str(exc), file=sys.stderr)
        return 1
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())