233 lines
7.1 KiB
Python
Executable File
233 lines
7.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Run Gemini music generation flow end-to-end via Playwright CLI."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
|
|
class FlowError(RuntimeError):
    """Error type used to report a failed subprocess command in the flow."""
|
|
|
|
|
|
def run_command(
    cmd: list[str], *, capture_output: bool = True, check: bool = True
) -> subprocess.CompletedProcess[str]:
    """Run *cmd* as a subprocess in text mode and return the finished process.

    Args:
        cmd: Program and arguments, handed to ``subprocess.run`` as a list.
        capture_output: When true, stdout is piped and stderr is merged into
            it, so ``proc.stdout`` holds the combined output. When false the
            streams are not redirected and ``proc.stdout`` is ``None``.
        check: When true, a non-zero exit status raises ``FlowError``.

    Returns:
        The ``subprocess.CompletedProcess`` of the finished command.

    Raises:
        FlowError: If the command cannot be started at all, or if it exits
            with a non-zero status while ``check`` is true.
    """
    stdout_target = subprocess.PIPE if capture_output else None
    stderr_target = subprocess.STDOUT if capture_output else None
    try:
        proc = subprocess.run(
            cmd, text=True, stdout=stdout_target, stderr=stderr_target
        )
    except OSError as exc:
        # A missing/non-runnable program raises OSError before any exit code
        # exists; report it through the same error type as other failures so
        # callers that handle FlowError also cover launch failures.
        raise FlowError(
            f"Command could not be started: {' '.join(cmd)}\n{exc}"
        ) from exc
    if check and proc.returncode != 0:
        output = (proc.stdout or "") if capture_output else ""
        raise FlowError(
            f"Command failed ({proc.returncode}): {' '.join(cmd)}\n{output}"
        )
    return proc
|
|
|
|
|
|
def run_pw(pw_shared: Path, *args: str) -> str:
    """Invoke the shared Playwright wrapper with *args* and return its output.

    The wrapper path is used as the program and *args* as its arguments.
    Output is captured through ``run_command`` (with its default ``check``),
    and an empty string is returned when nothing was captured.
    """
    command = [str(pw_shared)]
    command.extend(args)
    completed = run_command(command, capture_output=True)
    captured = completed.stdout
    return captured if captured else ""
|
|
|
|
|
|
def is_login_required(pw_shared: Path) -> bool:
    """Report whether the current page looks like it still needs a login.

    A JavaScript probe is sent to the wrapper's ``eval`` command. The probe
    evaluates to true when no Google account button is found (matched by
    aria-label) and the page contains either a link whose href includes
    ``ServiceLogin`` or an ``a``/``button`` whose text matches a sign-in label.

    Returns:
        True when the wrapper output contains a line that is exactly ``true``.
    """
    probe_parts = (
        "() => {",
        "const hasAccount = !!document.querySelector(",
        "'button[aria-label*=\\\"Google 账号\\\"], ",
        "button[aria-label*=\\\"Google Account\\\"]'",
        ");",
        "const hasService = !!document.querySelector('a[href*=\\\"ServiceLogin\\\"]');",
        "const hasLoginCtl = Array.from(document.querySelectorAll('a,button'))",
        ".some(el => /登录|Sign in/i.test((el.textContent || '').trim()));",
        "return !hasAccount && (hasService || hasLoginCtl);",
        "}",
    )
    probe = "".join(probe_parts)
    result = run_pw(pw_shared, "eval", probe)
    # The wrapper may print extra lines; only a standalone "true" line counts.
    return re.search(r"(?m)^true$", result) is not None
|
|
|
|
|
|
def enter_music_tool(pw_shared: Path) -> None:
    """Switch the Gemini page into its music tool via the wrapper's ``run-code``.

    The embedded Playwright script first looks for a button whose name matches
    one of the music labels and clicks it (2 s click timeout per attempt). If
    no such click succeeds, it opens the tools menu and clicks a matching
    ``menuitemcheckbox`` or ``menuitem``; the menu strategy is attempted up to
    twice. The script throws when no entry point is found.
    NOTE(review): presumably the wrapper exits non-zero on a thrown error,
    which ``run_pw`` would surface as ``FlowError`` - confirm against the
    wrapper.
    """
    entry_script = r"""
    async (page) => {
      const labels = [/创作音乐/, /制作音乐/, /Create music/i, /Music/i];

      const tryCardButtons = async () => {
        for (const re of labels) {
          const btn = page.getByRole('button', { name: re }).first();
          if (await btn.count()) {
            try {
              await btn.click({ timeout: 2000 });
              return true;
            } catch (_) {
              // Overlay may intercept pointer. Fall through to menu strategy.
            }
          }
        }
        return false;
      };

      const tryToolMenu = async () => {
        await page.getByRole('button', { name: '工具', exact: true }).click();
        for (const re of labels) {
          const itemCheck = page.getByRole('menuitemcheckbox', { name: re }).first();
          if (await itemCheck.count()) {
            await itemCheck.click();
            return true;
          }
          const itemPlain = page.getByRole('menuitem', { name: re }).first();
          if (await itemPlain.count()) {
            await itemPlain.click();
            return true;
          }
        }
        return false;
      };

      let ok = await tryCardButtons();
      if (!ok) ok = await tryToolMenu();
      if (!ok) ok = await tryToolMenu();
      if (!ok) throw new Error('Music tool entry not found');
    }
    """
    run_pw(pw_shared, "run-code", entry_script)
|
|
|
|
|
|
def submit_and_download_one(pw_shared: Path, prompt: str) -> None:
    """Submit *prompt* once and trigger the download of the resulting track.

    The embedded Playwright script, run through the wrapper's ``run-code``:
      1. fills the prompt textbox and presses Enter;
      2. waits up to 15 s for the stop button to appear (a timeout here is
         ignored) and then up to 240 s for it to disappear;
      3. clicks the last download button and picks the MP3/audio-only menu
         item, or the first menu item when that one is absent;
      4. waits 1.2 s before returning.

    The prompt is embedded with ``json.dumps`` so quotes and newlines in it
    become a valid JavaScript string literal.
    """
    prompt_literal = json.dumps(prompt)
    script = f"""
    async (page) => {{
      const prompt = {prompt_literal};
      const input = page.getByRole('textbox', {{ name: /为 Gemini 输入提示|Enter a prompt/i }}).first();
      await input.click();
      await input.fill(prompt);
      await input.press('Enter');

      const stopBtn = page.getByRole('button', {{ name: /停止回答|Stop response/i }}).first();
      await stopBtn.waitFor({{ state: 'visible', timeout: 15000 }}).catch(() => {{}});
      await stopBtn.waitFor({{ state: 'hidden', timeout: 240000 }});

      const downloadBtn = page.getByRole('button', {{ name: /下载音乐作品|Download music/i }}).last();
      await downloadBtn.click();

      const mp3Item = page.getByRole('menuitem', {{ name: /纯音频|MP3/i }}).first();
      if (await mp3Item.count()) {{
        await mp3Item.click();
      }} else {{
        const anyItem = page.getByRole('menuitem').first();
        if (await anyItem.count()) await anyItem.click();
      }}

      await page.waitForTimeout(1200);
    }}
    """
    run_pw(pw_shared, "run-code", script)
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse the script's command-line options from ``sys.argv``.

    Options:
        --prompt     required prompt text.
        --target     required output directory.
        --count      integer number of tracks, default 1.
        --no-headed  flag; stored as ``no_headed`` (False unless given).
    """
    cli = argparse.ArgumentParser(
        description="Generate music on Gemini web and collect downloaded files."
    )
    # Each entry is (flag, keyword arguments for add_argument), in CLI order.
    option_specs = (
        ("--prompt", {"required": True, "help": "Prompt text for music generation."}),
        (
            "--target",
            {"required": True, "help": "Absolute output directory for collected files."},
        ),
        (
            "--count",
            {"type": int, "default": 1, "help": "Number of tracks to generate. Default: 1."},
        ),
        (
            "--no-headed",
            {"action": "store_true", "help": "Run browser without headed mode."},
        ),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    return cli.parse_args()
|
|
|
|
|
|
def main() -> int:
    """Run the whole flow: validate inputs, generate tracks, collect files.

    Steps, in order:
      1. Parse the CLI arguments and reject ``--count`` below 1.
      2. Locate the shared Playwright wrapper (``PW_SHARED_WRAPPER`` if set,
         otherwise ``tools/pw`` under the directory three levels above this
         script's directory) and the sibling ``collect_downloads.py``.
      3. Create the target directory.
      4. Open Gemini, stop if a login is required, enter the music tool.
      5. Submit the prompt ``--count`` times, downloading each result.
      6. Run the collector script and return its exit status.

    Returns:
        1 on invalid input, missing prerequisites, or a ``FlowError``;
        2 when Gemini requires a login; otherwise the collector script's
        exit status.
    """
    args = parse_args()
    if args.count < 1:
        print("--count must be a positive integer.", file=sys.stderr)
        return 1

    script_path = Path(__file__).resolve()
    wrapper_setting = os.environ.get("PW_SHARED_WRAPPER")
    if wrapper_setting is None:
        # Derive the repo-relative default only when no override is given, so
        # an unusually shallow script location cannot break an explicit
        # PW_SHARED_WRAPPER setting.
        try:
            wrapper_setting = str(script_path.parents[3] / "tools/pw")
        except IndexError:
            print(
                "Cannot derive the repository root from the script location; "
                "set PW_SHARED_WRAPPER.",
                file=sys.stderr,
            )
            return 1
    pw_shared = Path(wrapper_setting).expanduser()
    collect_script = (script_path.parent / "collect_downloads.py").resolve()

    # is_file() is already False for a missing path, so one check suffices.
    if not pw_shared.is_file():
        print(f"Shared Playwright wrapper not found: {pw_shared}", file=sys.stderr)
        return 1
    if not os.access(pw_shared, os.X_OK):
        print(f"Shared Playwright wrapper is not executable: {pw_shared}", file=sys.stderr)
        return 1
    if not collect_script.exists():
        print(f"Collector script not found: {collect_script}", file=sys.stderr)
        return 1

    target = Path(args.target).expanduser().resolve()
    try:
        target.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        # E.g. the path exists as a regular file, or permission is denied.
        print(f"Cannot create target directory {target}: {exc}", file=sys.stderr)
        return 1
    # Timestamp taken before any generation; passed to the collector as --since.
    start_ts = time.time()

    try:
        os.environ["PLAYWRIGHT_SHARED_INIT_MODE"] = (
            "headless" if args.no_headed else "headed"
        )
        run_pw(pw_shared, "snapshot")
        run_pw(pw_shared, "goto", "https://gemini.google.com/app")
        run_pw(pw_shared, "snapshot")

        if is_login_required(pw_shared):
            print(
                "Gemini is not logged in. Please log in at https://gemini.google.com/app and rerun.",
                file=sys.stderr,
            )
            return 2

        enter_music_tool(pw_shared)

        for i in range(1, args.count + 1):
            current_prompt = args.prompt
            if args.count > 1:
                # Ask for a distinct variation per track when several are requested.
                current_prompt = (
                    f"{args.prompt}\n"
                    f"变体要求:这是第 {i} / {args.count} 首。保持风格一致,但旋律和节奏细节需要变化。"
                )
            submit_and_download_one(pw_shared, current_prompt)

        collect_cmd = [
            sys.executable,
            str(collect_script),
            "--target",
            str(target),
            "--since",
            str(start_ts),
            "--expected-count",
            str(args.count),
            "--limit",
            str(args.count),
            "--prefix",
            "gemini-music",
            "--prompt",
            args.prompt,
        ]
        # The collector's output is not captured and its exit status becomes
        # this script's exit status.
        proc = run_command(collect_cmd, capture_output=False, check=False)
        return proc.returncode
    except FlowError as exc:
        print(str(exc), file=sys.stderr)
        return 1
|
|
|
|
|
|
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
|