feat(skill/gemini-music-web): 新增音乐生成与下载收集流程

This commit is contained in:
2026-03-04 02:04:48 +08:00
parent 4caa839154
commit aa72b570e1
4 changed files with 794 additions and 0 deletions
+404
View File
@@ -0,0 +1,404 @@
#!/usr/bin/env python3
"""Collect recent audio downloads into a target directory with manifest output."""
from __future__ import annotations
import argparse
import hashlib
import json
import re
import shutil
import subprocess
import sys
import time
import wave
from datetime import datetime, timezone
from pathlib import Path
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the collector's command-line options.

    Args:
        argv: Argument strings to parse. When ``None`` (the default) argparse
            reads ``sys.argv[1:]``, so existing ``parse_args()`` callers are
            unaffected. Passing a list makes the parser usable from tests and
            from other Python code.

    Returns:
        Namespace with ``source``, ``target``, ``since``, ``ext``, ``limit``,
        ``expected_count``, ``prefix``, ``batch_id``, ``manifest``, ``prompt``,
        ``move`` and ``no_dedupe_target``.
    """
    parser = argparse.ArgumentParser(
        description="Collect recent audio downloads into a target directory."
    )
    parser.add_argument(
        "--source",
        action="append",
        help=(
            "Source download directory. Repeatable. "
            "If omitted, auto-discovers Playwright temp downloads and then "
            "falls back to ~/Downloads."
        ),
    )
    parser.add_argument(
        "--target",
        required=True,
        help="Target directory for collected files.",
    )
    parser.add_argument(
        "--since",
        type=float,
        # Evaluated once, when the parser is built: "30 minutes before now".
        default=time.time() - 1800,
        help="Unix timestamp lower bound for file mtime. Default: now-1800s",
    )
    parser.add_argument(
        "--ext",
        default="mp3,wav,m4a,ogg,flac,aac",
        help="Comma-separated file extensions to include.",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=8,
        help="Maximum files to collect. Default: 8",
    )
    parser.add_argument(
        "--expected-count",
        type=int,
        default=None,
        help="Required minimum number of collected files.",
    )
    parser.add_argument(
        "--prefix",
        default="gemini-music",
        help="Filename prefix for collected files. Default: gemini-music",
    )
    parser.add_argument(
        "--batch-id",
        default=None,
        help="Batch ID used in output filenames. Default: current timestamp.",
    )
    parser.add_argument(
        "--manifest",
        default=None,
        help="Manifest output path. Default: <target>/<prefix>-<batch-id>-manifest.json",
    )
    parser.add_argument(
        "--prompt",
        default="",
        help="Prompt text to store in manifest.",
    )
    parser.add_argument(
        "--move",
        action="store_true",
        help="Move files instead of copying.",
    )
    parser.add_argument(
        "--no-dedupe-target",
        action="store_true",
        help="Disable hash dedupe against existing files in target directory.",
    )
    return parser.parse_args(argv)
def unique_path(path: Path) -> Path:
    """Return ``path`` if nothing exists there, else the first free numbered sibling.

    Siblings are named ``<stem>-<n><suffix>`` in the same directory, with
    ``n`` counting up from 2.
    """
    candidate = path
    counter = 1
    while candidate.exists():
        counter += 1
        candidate = path.parent / f"{path.stem}-{counter}{path.suffix}"
    return candidate
def collect_candidates(source: Path, since_ts: float, allowed_ext: set[str]) -> list[Path]:
    """Recursively find files under ``source`` that are recent enough.

    Args:
        source: Directory to scan. A missing path yields an empty list.
        since_ts: Unix timestamp; only files with ``st_mtime >= since_ts``
            are kept.
        allowed_ext: Lower-case extensions without the leading dot.

    Returns:
        Matching paths, newest first.
    """
    if not source.exists():
        return []

    dated: list[tuple[float, Path]] = []
    for path in source.rglob("*"):
        if not path.is_file():
            continue
        if path.suffix.lower().lstrip(".") not in allowed_ext:
            continue
        try:
            mtime = path.stat().st_mtime
        except OSError:
            # File vanished or became unreadable while scanning; skip it.
            continue
        if mtime >= since_ts:
            dated.append((mtime, path))

    # Sort on the mtime captured above rather than calling stat() again:
    # a second stat is wasted work and raises if the file has since been
    # removed (browser temp downloads are short-lived).
    dated.sort(key=lambda item: item[0], reverse=True)
    return [path for _, path in dated]
def discover_playwright_sources() -> list[Path]:
    """Locate ``playwright-mcp-output`` session directories in known temp roots.

    Returns:
        Resolved, de-duplicated directories, most recently modified first.
        Directories that cannot be stat'ed are left out.
    """
    patterns = (
        "/var/folders/*/*/T/playwright-mcp-output/*",
        "/private/var/folders/*/*/T/playwright-mcp-output/*",
        "/var/folders/*/*/*/T/playwright-mcp-output/*",
        "/private/var/folders/*/*/*/T/playwright-mcp-output/*",
        "/tmp/playwright-mcp-output/*",
    )
    root = Path("/")
    dated: list[tuple[float, Path]] = []
    seen: set[Path] = set()
    for pattern in patterns:
        # Path.glob() only accepts relative patterns, so strip the leading "/".
        for raw in root.glob(pattern.lstrip("/")):
            if not raw.is_dir():
                continue
            resolved = raw.resolve()
            # Several patterns can reach the same directory through symlinks.
            if resolved in seen:
                continue
            seen.add(resolved)
            try:
                mtime = resolved.stat().st_mtime
            except OSError:
                # Temp directories can be cleaned up at any moment; previously
                # this surfaced as an uncaught error in the sort key.
                continue
            dated.append((mtime, resolved))
    dated.sort(key=lambda item: item[0], reverse=True)
    return [path for _, path in dated]
def resolve_sources(raw_sources: list[str] | None) -> list[Path]:
    """Turn ``--source`` values into the list of directories to scan.

    Explicit sources are expanded and resolved, and returned as given.
    With no explicit sources, the discovered Playwright output directories
    are used, followed by ``~/Downloads``, with repeated paths dropped.
    """
    if not raw_sources:
        downloads = (Path.home() / "Downloads").resolve()
        discovered = [*discover_playwright_sources(), downloads]
        # dict.fromkeys keeps the first occurrence of each path, in order.
        return list(dict.fromkeys(discovered))
    return [Path(entry).expanduser().resolve() for entry in raw_sources]
def sha256_of_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at ``path``.

    The file is read in 1 MiB blocks rather than all at once.
    """
    hasher = hashlib.sha256()
    block_size = 1024 * 1024
    with path.open("rb") as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: stream.read(block_size), b""):
            hasher.update(block)
    return hasher.hexdigest()
def _probe_stdout(command: list[str]) -> str | None:
    """Run an external probe; return its stdout, or None if it is missing or fails."""
    try:
        proc = subprocess.run(command, check=False, capture_output=True, text=True)
    except OSError:
        # Typically the executable is not installed.
        return None
    if proc.returncode != 0:
        return None
    return proc.stdout or ""


def _metadata_from_ffprobe(path: Path) -> tuple[float | None, int | None]:
    """Read duration/bitrate from ffprobe's JSON output; (None, None) if unusable."""
    stdout = _probe_stdout(
        [
            "ffprobe",
            "-v",
            "error",
            "-show_entries",
            "format=duration,bit_rate",
            "-of",
            "json",
            str(path),
        ]
    )
    if stdout is None:
        return None, None
    try:
        fmt = json.loads(stdout or "{}").get("format", {})
        dur_raw = fmt.get("duration")
        br_raw = fmt.get("bit_rate")
        duration = float(dur_raw) if dur_raw not in (None, "") else None
        bitrate_kbps = int(int(br_raw) / 1000) if br_raw not in (None, "") else None
    except (ValueError, TypeError, AttributeError):
        # Malformed JSON, a non-numeric field, or an unexpected JSON shape.
        return None, None
    return duration, bitrate_kbps


def _metadata_from_afinfo(path: Path) -> tuple[float | None, int | None]:
    """Read duration/bitrate by scraping ``afinfo`` text output (macOS tool)."""
    stdout = _probe_stdout(["afinfo", str(path)])
    if stdout is None:
        return None, None
    duration_match = re.search(r"estimated duration:\s*([0-9.]+)\s*sec", stdout)
    bitrate_match = re.search(r"bit rate:\s*([0-9]+)\s*bits per second", stdout)
    duration = float(duration_match.group(1)) if duration_match else None
    bitrate_kbps = int(int(bitrate_match.group(1)) / 1000) if bitrate_match else None
    return duration, bitrate_kbps


def _metadata_from_wav(path: Path) -> tuple[float | None, int | None]:
    """Compute duration/bitrate of a PCM WAV file with the stdlib ``wave`` module."""
    try:
        with wave.open(str(path), "rb") as wav_file:
            frames = wav_file.getnframes()
            frame_rate = wav_file.getframerate()
            channels = wav_file.getnchannels()
            sample_width = wav_file.getsampwidth()
    except (wave.Error, EOFError, OSError, ValueError):
        # EOFError is what wave.open raises for an empty or truncated file,
        # e.g. a download that has not finished yet.
        return None, None
    duration = (frames / frame_rate) if frame_rate else None
    bitrate_kbps = int((frame_rate * channels * sample_width * 8) / 1000)
    return duration, bitrate_kbps


def read_audio_metadata(path: Path) -> tuple[float | None, int | None]:
    """Best-effort lookup of an audio file's duration and bitrate.

    Backends are tried in order: ``ffprobe`` (broad codec/container support),
    ``afinfo`` (macOS fallback for compressed formats), then the stdlib
    ``wave`` reader for ``.wav`` files. A backend that yields neither value
    is treated as a miss and the next one is tried.

    Args:
        path: Audio file to inspect.

    Returns:
        ``(duration_sec, bitrate_kbps)``; either element is ``None`` when it
        could not be determined.
    """
    for probe in (_metadata_from_ffprobe, _metadata_from_afinfo):
        duration, bitrate_kbps = probe(path)
        if duration is not None or bitrate_kbps is not None:
            return duration, bitrate_kbps
    if path.suffix.lower() == ".wav":
        return _metadata_from_wav(path)
    return None, None
def iso_ts(ts: float) -> str:
    """Format a Unix timestamp as an ISO-8601 string in UTC (``+00:00`` offset)."""
    moment = datetime.fromtimestamp(ts, tz=timezone.utc)
    return moment.isoformat()
def collect_candidates_all_sources(
    sources: list[Path], since_ts: float, allowed_ext: set[str]
) -> tuple[list[Path], list[dict[str, object]]]:
    """Scan every source directory and merge the matches.

    Args:
        sources: Directories to scan, in priority order.
        since_ts: Unix timestamp lower bound for file mtime.
        allowed_ext: Lower-case extensions without the leading dot.

    Returns:
        ``(merged, tried)``. ``merged`` holds the matching files, newest
        first, with a file reachable via several sources listed once.
        ``tried`` has one ``{"source": ..., "matches": ...}`` entry per
        source, for the manifest.
    """

    def mtime_or_oldest(path: Path) -> float:
        # A file can disappear between scanning and sorting. Rank it last
        # instead of letting the sort raise; the caller already skips files
        # it cannot read.
        try:
            return path.stat().st_mtime
        except OSError:
            return float("-inf")

    tried: list[dict[str, object]] = []
    merged: list[Path] = []
    seen: set[Path] = set()
    for source in sources:
        files = collect_candidates(source, since_ts, allowed_ext)
        tried.append({"source": str(source), "matches": len(files)})
        for file_path in files:
            # De-duplicate on the resolved path but keep the path as found.
            resolved = file_path.resolve()
            if resolved in seen:
                continue
            seen.add(resolved)
            merged.append(file_path)
    merged.sort(key=mtime_or_oldest, reverse=True)
    return merged, tried
def collect_existing_hashes(target: Path, allowed_ext: set[str]) -> set[str]:
    """Hash the audio files sitting directly inside ``target``.

    Only regular files whose lower-cased extension (without the dot) is in
    ``allowed_ext`` are considered; subdirectories are not descended into.
    Files that cannot be read are skipped.
    """
    digests: set[str] = set()
    audio_files = (
        entry
        for entry in target.iterdir()
        if entry.is_file() and entry.suffix.lower().lstrip(".") in allowed_ext
    )
    for entry in audio_files:
        try:
            digests.add(sha256_of_file(entry))
        except OSError:
            continue
    return digests
def write_manifest(manifest_path: Path, payload: dict[str, object]) -> None:
    """Write ``payload`` to ``manifest_path`` as indented UTF-8 JSON.

    Parent directories are created as needed and the file ends with a
    newline. Non-ASCII text is written as-is, not ``\\u``-escaped.

    Raises:
        TypeError: If ``payload`` contains a value JSON cannot represent.
    """
    # Serialise before touching the file: opening it first would truncate an
    # existing manifest even when serialisation then fails.
    text = json.dumps(payload, ensure_ascii=False, indent=2) + "\n"
    manifest_path.parent.mkdir(parents=True, exist_ok=True)
    manifest_path.write_text(text, encoding="utf-8")
def main() -> int:
    """Collect recent audio downloads into the target directory and write a manifest.

    Candidates are gathered from the resolved sources, de-duplicated by
    SHA-256 (optionally also against files already in the target), copied or
    moved under ``<prefix>-<batch-id>-NN<ext>`` names, and described in a
    JSON manifest. Collected paths and the manifest path are printed to
    stdout.

    Returns:
        ``0`` on success; ``1`` when no file was collected or fewer than
        ``--expected-count`` were; ``2`` when ``--ext`` has no usable entry.
    """
    args = parse_args()
    target = Path(args.target).expanduser().resolve()
    target.mkdir(parents=True, exist_ok=True)
    batch_id = args.batch_id or time.strftime("%Y%m%d-%H%M%S")
    manifest_path = (
        Path(args.manifest).expanduser().resolve()
        if args.manifest
        else target / f"{args.prefix}-{batch_id}-manifest.json"
    )
    allowed_ext = {
        ext.strip().lower().lstrip(".")
        for ext in args.ext.split(",")
        if ext.strip()
    }
    if not allowed_ext:
        print("No valid extensions provided.", file=sys.stderr)
        return 2

    sources = resolve_sources(args.source)
    candidates, tried_sources = collect_candidates_all_sources(sources, args.since, allowed_ext)
    if not candidates:
        payload = {
            "status": "no_matching_files",
            "created_at": iso_ts(time.time()),
            "batch_id": batch_id,
            "prompt": args.prompt,
            "target_dir": str(target),
            "since_ts": args.since,
            "sources_tried": tried_sources,
            "collected_count": 0,
            "files": [],
        }
        write_manifest(manifest_path, payload)
        print("No matching files found.")
        print(f"MANIFEST: {manifest_path}")
        return 1

    dedupe_target = not args.no_dedupe_target
    seen_hashes: set[str] = set()
    if dedupe_target:
        seen_hashes.update(collect_existing_hashes(target, allowed_ext))

    files: list[dict[str, object]] = []
    skipped_duplicates = 0
    for src in candidates:
        if len(files) >= args.limit:
            break
        try:
            src_hash = sha256_of_file(src)
        except OSError:
            continue
        if src_hash in seen_hashes:
            skipped_duplicates += 1
            continue
        idx = len(files) + 1
        dst = unique_path(target / f"{args.prefix}-{batch_id}-{idx:02d}{src.suffix.lower()}")
        try:
            # Read the mtime before a possible move removes the source.
            src_mtime = src.stat().st_mtime
            if args.move:
                shutil.move(str(src), str(dst))
            else:
                shutil.copy2(str(src), str(dst))
            file_size = dst.stat().st_size
        except OSError as exc:
            # Hashing above already tolerates unreadable files; treat a failed
            # stat/copy/move the same way so one bad file cannot abort the run
            # before the manifest is written. shutil.Error is an OSError too.
            print(f"Skipping {src}: {exc}", file=sys.stderr)
            continue
        duration_sec, bitrate_kbps = read_audio_metadata(dst)
        files.append(
            {
                "prompt": args.prompt,
                "generated_at": iso_ts(src_mtime),
                "source_filename": src.name,
                "source_path": str(src.resolve()),
                "target_path": str(dst.resolve()),
                "sha256": src_hash,
                "file_size_bytes": file_size,
                "duration_sec": duration_sec,
                "bitrate_kbps": bitrate_kbps,
            }
        )
        # Also rejects later candidates with identical content in this run.
        seen_hashes.add(src_hash)

    status = "ok"
    exit_code = 0
    expected_count = args.expected_count
    if not files:
        status = "no_files_after_dedupe"
        exit_code = 1
    elif expected_count is not None and len(files) < expected_count:
        status = "insufficient_files"
        exit_code = 1

    payload = {
        "status": status,
        "created_at": iso_ts(time.time()),
        "batch_id": batch_id,
        "prompt": args.prompt,
        "target_dir": str(target),
        "sources_tried": tried_sources,
        "since_ts": args.since,
        "limit": args.limit,
        "expected_count": expected_count,
        "dedupe_target": dedupe_target,
        "skipped_duplicates": skipped_duplicates,
        "collected_count": len(files),
        "files": files,
    }
    write_manifest(manifest_path, payload)
    for item in files:
        print(item["target_path"])
    print(f"MANIFEST: {manifest_path}")
    return exit_code
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
+230
View File
@@ -0,0 +1,230 @@
#!/usr/bin/env bash
set -euo pipefail
# Print the command-line synopsis and one example invocation to stdout.
# The heredoc delimiter is quoted, so the text is emitted without expansion.
usage() {
cat <<'EOF'
Usage:
run_music_flow.sh --prompt "<text>" --target /abs/output/dir [--count N] [--session NAME] [--no-headed]
Example:
run_music_flow.sh \
--prompt "创作一段 90 BPM 的 lo-fi hiphop,温暖、夜晚、钢琴和刷镲,时长 30 秒。" \
--target /Users/xd/java/xhs/output/gemini-music \
--count 2
EOF
}
# Defaults; each may be overridden by the options parsed below.
PROMPT=""
TARGET=""
COUNT=1
SESSION="gmw$(date +%s)"   # per-run session name passed to the Playwright wrapper
HEADED=1

# require_value OPTION REMAINING_ARGC
# Abort with a message when a value-taking option is the last argument.
# Without this check `shift 2` fails under `set -e` and the script exits
# with no explanation.
require_value() {
  if [[ "$2" -lt 2 ]]; then
    echo "Missing value for $1" >&2
    usage
    exit 1
  fi
}

while [[ $# -gt 0 ]]; do
  case "$1" in
    --prompt)
      require_value "$1" "$#"
      PROMPT="${2:-}"
      shift 2
      ;;
    --target)
      require_value "$1" "$#"
      TARGET="${2:-}"
      shift 2
      ;;
    --count)
      require_value "$1" "$#"
      COUNT="${2:-1}"
      shift 2
      ;;
    --session)
      require_value "$1" "$#"
      SESSION="${2:-$SESSION}"
      shift 2
      ;;
    --no-headed)
      HEADED=0
      shift
      ;;
    -h|--help)
      usage
      exit 0
      ;;
    *)
      echo "Unknown arg: $1" >&2
      usage
      exit 1
      ;;
  esac
done
# --- Argument validation -----------------------------------------------------
if [[ -z "$PROMPT" || -z "$TARGET" ]]; then
  echo "Both --prompt and --target are required." >&2
  usage
  exit 1
fi

# Force base 10: a value such as "08" would otherwise be parsed as an invalid
# octal number by bash arithmetic, both here and in the later generation loop.
if ! [[ "$COUNT" =~ ^[0-9]+$ ]] || (( 10#$COUNT < 1 )); then
  echo "--count must be a positive integer." >&2
  exit 1
fi
COUNT=$((10#$COUNT))

# --- Tool locations ----------------------------------------------------------
CODEX_HOME="${CODEX_HOME:-$HOME/.codex}"
PWCLI="${PWCLI:-$CODEX_HOME/skills/playwright/scripts/playwright_cli.sh}"
# The collector is expected to sit next to this script.
COLLECT_SCRIPT="$(cd "$(dirname "$0")" && pwd)/collect_downloads.py"

# --- Dependency checks -------------------------------------------------------
if ! command -v npx >/dev/null 2>&1; then
  echo "npx is required." >&2
  exit 1
fi
# python3 is used for JSON escaping, the start timestamp and the collector.
if ! command -v python3 >/dev/null 2>&1; then
  echo "python3 is required." >&2
  exit 1
fi
if [[ ! -x "$PWCLI" ]]; then
  echo "Playwright wrapper not found or not executable: $PWCLI" >&2
  exit 1
fi
if [[ ! -f "$COLLECT_SCRIPT" ]]; then
  echo "Collector script not found: $COLLECT_SCRIPT" >&2
  exit 1
fi
# Run the Playwright CLI wrapper ($PWCLI) bound to this run's session name;
# all arguments are forwarded unchanged.
pw() {
"$PWCLI" --session "$SESSION" "$@"
}
# Print $1 encoded as a JSON string literal (quotes included), so it can be
# pasted into generated JavaScript as a string constant.
json_escape() {
  python3 -c 'import json, sys; print(json.dumps(sys.argv[1]))' "$1"
}
# Succeed (exit 0) when the open page looks logged out.
# The page script reports true when no Google-account button is present and
# either a ServiceLogin link or a "登录"/"Sign in" link or button is found.
is_login_required() {
  local out
  out="$(
    pw eval "() => {
const hasAccount = !!document.querySelector('button[aria-label*=\\\"Google 账号\\\"], button[aria-label*=\\\"Google Account\\\"]');
const hasService = !!document.querySelector('a[href*=\\\"ServiceLogin\\\"]');
const hasLoginCtl = Array.from(document.querySelectorAll('a,button')).some(el => /登录|Sign in/i.test((el.textContent || '').trim()));
return !hasAccount && (hasService || hasLoginCtl);
}"
  )"
  # Match a line that is exactly "true". grep replaces ripgrep, which the
  # script never checks for; the here-string avoids an `echo | ... -q`
  # pipeline, which can fail under `pipefail` when the reader exits early.
  grep -q '^true$' <<<"$out"
}
# Switch the open Gemini page into its music tool by running the script below
# through `pw run-code`.
# Strategy, as coded in the script: first try clicking a button whose
# accessible name matches one of `labels`; if that fails, open the "工具" menu
# and click a matching menuitemcheckbox or menuitem; retry the menu once;
# throw if nothing was found.
# NOTE(review): `labels` includes English names, but the menu button is matched
# only by the exact Chinese name '工具' — confirm behaviour on an English UI.
enter_music_tool() {
local js
# Quoted delimiter ('JS'): the shell performs no expansion inside the script.
js="$(cat <<'JS'
const labels = [/创作音乐/, /制作音乐/, /Create music/i, /Music/i];
const tryCardButtons = async () => {
for (const re of labels) {
const btn = page.getByRole('button', { name: re }).first();
if (await btn.count()) {
try {
await btn.click({ timeout: 2000 });
return true;
} catch (_) {
// Overlay may intercept pointer. Fall through to menu strategy.
}
}
}
return false;
};
const tryToolMenu = async () => {
await page.getByRole('button', { name: '工具', exact: true }).click();
for (const re of labels) {
const itemCheck = page.getByRole('menuitemcheckbox', { name: re }).first();
if (await itemCheck.count()) {
await itemCheck.click();
return true;
}
const itemPlain = page.getByRole('menuitem', { name: re }).first();
if (await itemPlain.count()) {
await itemPlain.click();
return true;
}
}
return false;
};
let ok = await tryCardButtons();
if (!ok) ok = await tryToolMenu();
if (!ok) {
// Re-open the tool menu once and retry as a last attempt.
ok = await tryToolMenu();
}
if (!ok) {
throw new Error('Music tool entry not found');
}
JS
)"
# The wrapper's stdout is discarded.
pw run-code "$js" >/dev/null
}
# Submit one prompt ($1) to Gemini, wait for the response to finish, then
# trigger a download of the generated track.
# Steps, as coded in the script: fill the prompt textbox and press Enter; wait
# up to 15 s for the stop button to appear (a timeout here is ignored), then up
# to 240 s for it to disappear; click the last download button; choose the MP3
# menu item if present, otherwise the first menu item; pause 1200 ms.
# NOTE(review): apart from that fixed pause, nothing here waits for the
# download itself to complete.
submit_and_download_one() {
local track_prompt="$1"
local escaped
# JSON-encode the prompt so it can be embedded as a JavaScript string literal.
escaped="$(json_escape "$track_prompt")"
local js
# Unquoted delimiter (JS): $escaped is expanded by the shell into the script.
js="$(cat <<JS
const prompt = $escaped;
const input = page.getByRole('textbox', { name: /为 Gemini 输入提示|Enter a prompt/i }).first();
await input.click();
await input.fill(prompt);
await input.press('Enter');
const stopBtn = page.getByRole('button', { name: /停止回答|Stop response/i }).first();
await stopBtn.waitFor({ state: 'visible', timeout: 15000 }).catch(() => {});
await stopBtn.waitFor({ state: 'hidden', timeout: 240000 });
const downloadBtn = page.getByRole('button', { name: /下载音乐作品|Download music/i }).last();
await downloadBtn.click();
const mp3Item = page.getByRole('menuitem', { name: /纯音频|MP3/i }).first();
if (await mp3Item.count()) {
await mp3Item.click();
} else {
const anyItem = page.getByRole('menuitem').first();
if (await anyItem.count()) await anyItem.click();
}
await page.waitForTimeout(1200);
JS
)"
# The wrapper's stdout is discarded.
pw run-code "$js" >/dev/null
}
# --- Main flow -----------------------------------------------------------------
mkdir -p "$TARGET"

# Taken before the browser opens; passed to the collector as --since.
start_ts="$(python3 -c 'import time; print(time.time())')"

# Open Gemini, headed unless --no-headed was given.
open_args=("https://gemini.google.com/app")
if [[ "$HEADED" -eq 1 ]]; then
  open_args+=(--headed)
fi
pw open "${open_args[@]}" >/dev/null
pw snapshot >/dev/null

if is_login_required; then
  echo "Gemini is not logged in. Please log in at https://gemini.google.com/app and rerun." >&2
  exit 2
fi

enter_music_tool

# Generate and download COUNT tracks. With more than one track, each prompt
# gets a suffix asking for a variation.
i=1
while (( i <= COUNT )); do
  if [[ "$COUNT" -gt 1 ]]; then
    current_prompt="$PROMPT
变体要求:这是第 $i / $COUNT 首。保持风格一致,但旋律和节奏细节需要变化。"
  else
    current_prompt="$PROMPT"
  fi
  submit_and_download_one "$current_prompt"
  i=$((i + 1))
done

# Gather the downloads; the collector's exit status becomes this script's.
python3 "$COLLECT_SCRIPT" \
  --target "$TARGET" \
  --since "$start_ts" \
  --expected-count "$COUNT" \
  --limit "$COUNT" \
  --prefix "gemini-music" \
  --prompt "$PROMPT"