#!/usr/bin/env python3
"""Run Gemini music generation flow end-to-end via Playwright CLI."""

from __future__ import annotations

import argparse
import json
import os
import re
import subprocess
import sys
import time
from pathlib import Path


class FlowError(RuntimeError):
    """Raised when a subprocess command in the flow fails or cannot start."""


def run_command(
    cmd: list[str], *, capture_output: bool = True, check: bool = True
) -> subprocess.CompletedProcess[str]:
    """Run *cmd* as a subprocess in text mode.

    Args:
        cmd: Program and arguments, passed to ``subprocess.run`` as a list
            (no shell involved).
        capture_output: When true, stdout is captured and stderr is merged
            into it; when false, the child inherits this process's streams
            and ``proc.stdout`` is ``None``.
        check: When true, a non-zero exit status raises ``FlowError``.

    Returns:
        The completed process object.

    Raises:
        FlowError: If the command cannot be started, or if ``check`` is true
            and it exits with a non-zero status.
    """
    try:
        proc = subprocess.run(
            cmd,
            text=True,
            stdout=subprocess.PIPE if capture_output else None,
            stderr=subprocess.STDOUT if capture_output else None,
        )
    except OSError as exc:
        # Callers only handle FlowError; surface launch failures (missing
        # binary, permission denied, bad exec format) the same way as a
        # failing command instead of leaking a raw traceback.
        raise FlowError(
            f"Command could not be started: {' '.join(cmd)}\n{exc}"
        ) from exc

    if check and proc.returncode != 0:
        output = proc.stdout if capture_output else ""
        raise FlowError(
            f"Command failed ({proc.returncode}): {' '.join(cmd)}\n{output}"
        )
    return proc


def run_pw(pw_shared: Path, *args: str) -> str:
    """Invoke the shared Playwright wrapper with *args* and return its output.

    Raises:
        FlowError: If the wrapper cannot be started or exits non-zero.
    """
    proc = run_command([str(pw_shared), *args], capture_output=True)
    return proc.stdout or ""


def is_login_required(pw_shared: Path) -> bool:
    """Return True when the current page looks like a logged-out Gemini page.

    The page is considered logged out when no Google account button is
    present and either a ``ServiceLogin`` link or a "Sign in" / "登录"
    control is found. The result is read from the wrapper's ``eval`` output
    by looking for a line consisting solely of ``true``.
    """
    out = run_pw(
        pw_shared,
        "eval",
        (
            "() => {"
            "const hasAccount = !!document.querySelector("
            "'button[aria-label*=\\\"Google 账号\\\"], "
            "button[aria-label*=\\\"Google Account\\\"]'"
            ");"
            "const hasService = !!document.querySelector('a[href*=\\\"ServiceLogin\\\"]');"
            "const hasLoginCtl = Array.from(document.querySelectorAll('a,button'))"
            ".some(el => /登录|Sign in/i.test((el.textContent || '').trim()));"
            "return !hasAccount && (hasService || hasLoginCtl);"
            "}"
        ),
    )
    return bool(re.search(r"(?m)^true$", out))


def enter_music_tool(pw_shared: Path) -> None:
    """Open the music-creation tool on the current Gemini page.

    Two strategies are tried in order: clicking a card button whose name
    matches one of the known labels, then opening the tools menu and picking
    a matching menu item (attempted twice). The injected script throws when
    neither strategy finds an entry, which surfaces here as ``FlowError``.
    """
    # The tools button is matched in both Chinese and English so the menu
    # fallback works on either UI language, like the other selectors here.
    # NOTE(review): the English label "Tools" is assumed; confirm against
    # the live UI.
    # NOTE(review): the second tryToolMenu() call clicks the tools button
    # again; if that toggles the menu closed the retry cannot succeed --
    # confirm against the live UI.
    js = r"""
async (page) => {
  const labels = [/创作音乐/, /制作音乐/, /Create music/i, /Music/i];

  const tryCardButtons = async () => {
    for (const re of labels) {
      const btn = page.getByRole('button', { name: re }).first();
      if (await btn.count()) {
        try {
          await btn.click({ timeout: 2000 });
          return true;
        } catch (_) {
          // Overlay may intercept pointer. Fall through to menu strategy.
        }
      }
    }
    return false;
  };

  const tryToolMenu = async () => {
    await page.getByRole('button', { name: /^(工具|Tools)$/i }).click();
    for (const re of labels) {
      const itemCheck = page.getByRole('menuitemcheckbox', { name: re }).first();
      if (await itemCheck.count()) {
        await itemCheck.click();
        return true;
      }
      const itemPlain = page.getByRole('menuitem', { name: re }).first();
      if (await itemPlain.count()) {
        await itemPlain.click();
        return true;
      }
    }
    return false;
  };

  let ok = await tryCardButtons();
  if (!ok) ok = await tryToolMenu();
  if (!ok) ok = await tryToolMenu();
  if (!ok) throw new Error('Music tool entry not found');
}
"""
    run_pw(pw_shared, "run-code", js)


def submit_and_download_one(pw_shared: Path, prompt: str) -> None:
    """Submit *prompt*, wait for generation to finish, and trigger a download.

    The prompt is embedded into the injected script as a JSON string
    literal. The script waits up to 15 s for the stop button to appear
    (ignoring a timeout there) and up to 240 s for it to disappear, then
    clicks the last download button and prefers the MP3 / audio-only menu
    item, falling back to the first menu item.
    """
    js = f"""
async (page) => {{
  const prompt = {json.dumps(prompt)};
  const input = page.getByRole('textbox', {{ name: /为 Gemini 输入提示|Enter a prompt/i }}).first();
  await input.click();
  await input.fill(prompt);
  await input.press('Enter');

  const stopBtn = page.getByRole('button', {{ name: /停止回答|Stop response/i }}).first();
  await stopBtn.waitFor({{ state: 'visible', timeout: 15000 }}).catch(() => {{}});
  await stopBtn.waitFor({{ state: 'hidden', timeout: 240000 }});

  const downloadBtn = page.getByRole('button', {{ name: /下载音乐作品|Download music/i }}).last();
  await downloadBtn.click();

  const mp3Item = page.getByRole('menuitem', {{ name: /纯音频|MP3/i }}).first();
  if (await mp3Item.count()) {{
    await mp3Item.click();
  }} else {{
    const anyItem = page.getByRole('menuitem').first();
    if (await anyItem.count()) await anyItem.click();
  }}
  await page.waitForTimeout(1200);
}}
"""
    run_pw(pw_shared, "run-code", js)


def parse_args() -> argparse.Namespace:
    """Parse command-line arguments for the generation flow."""
    parser = argparse.ArgumentParser(
        description="Generate music on Gemini web and collect downloaded files."
    )
    parser.add_argument(
        "--prompt", required=True, help="Prompt text for music generation."
    )
    parser.add_argument(
        "--target",
        required=True,
        help="Absolute output directory for collected files.",
    )
    parser.add_argument(
        "--count",
        type=int,
        default=1,
        help="Number of tracks to generate. Default: 1.",
    )
    parser.add_argument(
        "--no-headed",
        action="store_true",
        help="Run browser without headed mode.",
    )
    return parser.parse_args()


def main() -> int:
    """Drive the full flow and return the process exit code.

    Returns:
        1 for invalid arguments, missing prerequisites, or a failed flow
        command; 2 when Gemini is not logged in; otherwise the exit code of
        the collector script.
    """
    args = parse_args()
    if args.count < 1:
        print("--count must be a positive integer.", file=sys.stderr)
        return 1

    script_path = Path(__file__).resolve()
    repo_root = script_path.parents[3]
    # The wrapper location can be overridden through the environment.
    pw_shared = Path(
        os.environ.get("PW_SHARED_WRAPPER", str(repo_root / "tools/pw"))
    ).expanduser()
    collect_script = (script_path.parent / "collect_downloads.py").resolve()

    if not pw_shared.exists() or not pw_shared.is_file():
        print(f"Shared Playwright wrapper not found: {pw_shared}", file=sys.stderr)
        return 1
    if not os.access(pw_shared, os.X_OK):
        print(
            f"Shared Playwright wrapper is not executable: {pw_shared}",
            file=sys.stderr,
        )
        return 1
    if not collect_script.exists():
        print(f"Collector script not found: {collect_script}", file=sys.stderr)
        return 1

    target = Path(args.target).expanduser().resolve()
    target.mkdir(parents=True, exist_ok=True)
    # Recorded before any download is triggered and handed to the collector
    # as its --since argument.
    start_ts = time.time()

    try:
        os.environ["PLAYWRIGHT_SHARED_INIT_MODE"] = (
            "headless" if args.no_headed else "headed"
        )
        run_pw(pw_shared, "snapshot")
        run_pw(pw_shared, "goto", "https://gemini.google.com/app")
        run_pw(pw_shared, "snapshot")

        if is_login_required(pw_shared):
            print(
                "Gemini is not logged in. "
                "Please log in at https://gemini.google.com/app and rerun.",
                file=sys.stderr,
            )
            return 2

        enter_music_tool(pw_shared)

        for i in range(1, args.count + 1):
            current_prompt = args.prompt
            if args.count > 1:
                # Ask for a distinct variation per track when generating
                # several tracks from the same base prompt.
                current_prompt = (
                    f"{args.prompt}\n"
                    f"变体要求:这是第 {i} / {args.count} 首。保持风格一致,但旋律和节奏细节需要变化。"
                )
            submit_and_download_one(pw_shared, current_prompt)

        collect_cmd = [
            sys.executable,
            str(collect_script),
            "--target",
            str(target),
            "--since",
            str(start_ts),
            "--expected-count",
            str(args.count),
            "--limit",
            str(args.count),
            "--prefix",
            "gemini-music",
            "--prompt",
            args.prompt,
        ]
        # The collector writes straight to our stdout/stderr and its exit
        # code becomes ours, so neither capture nor check is wanted here.
        proc = run_command(collect_cmd, capture_output=False, check=False)
        return proc.returncode
    except FlowError as exc:
        print(str(exc), file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main())