#!/usr/bin/env python3
"""Run Gemini image generation flow end-to-end via Playwright CLI."""

from __future__ import annotations

import argparse
import json
import os
import re
import subprocess
import sys
import time
from pathlib import Path


class FlowError(RuntimeError):
    """Raised when a subprocess command in the flow fails."""


def run_command(
    cmd: list[str], *, capture_output: bool = True, check: bool = True
) -> subprocess.CompletedProcess[str]:
    """Run *cmd* as a subprocess in text mode.

    Args:
        cmd: Argument vector; executed without a shell.
        capture_output: When true, stdout is captured and stderr is merged
            into it. When false, the child inherits this process's streams.
        check: When true, a non-zero exit status raises ``FlowError``.

    Returns:
        The completed process. ``stdout`` is populated only when
        ``capture_output`` is true.

    Raises:
        FlowError: If the command cannot be started at all (regardless of
            ``check``, since there is no result to return), or if it exits
            non-zero while ``check`` is true.
    """
    try:
        proc = subprocess.run(
            cmd,
            text=True,
            stdout=subprocess.PIPE if capture_output else None,
            stderr=subprocess.STDOUT if capture_output else None,
            check=False,
        )
    except OSError as exc:
        # Missing binary, exec format error, permission denied, ... Callers
        # only handle FlowError, so surface launch failures the same way.
        raise FlowError(
            f"Command could not be started: {' '.join(cmd)}\n{exc}"
        ) from exc

    if check and proc.returncode != 0:
        output = proc.stdout if capture_output else ""
        raise FlowError(
            f"Command failed ({proc.returncode}): {' '.join(cmd)}\n{output}"
        )
    return proc


def run_pw(pw_shared: Path, *args: str) -> str:
    """Invoke the shared Playwright wrapper with *args*.

    Returns the wrapper's combined stdout/stderr text (empty string if none).
    Raises ``FlowError`` when the wrapper exits non-zero.
    """
    proc = run_command([str(pw_shared), *args], capture_output=True)
    return proc.stdout or ""


def is_login_required(pw_shared: Path) -> bool:
    """Return True when the current page looks like a signed-out Gemini page.

    The page is probed through the wrapper's ``eval`` subcommand: login is
    considered required when no Google-account button is present and either
    a ``ServiceLogin`` link or a link/button whose text matches
    ``登录`` / ``Sign in`` exists. The result is read from the wrapper output
    by looking for a line that is exactly ``true``.
    """
    out = run_pw(
        pw_shared,
        "eval",
        (
            "() => {"
            "const hasAccount = !!document.querySelector("
            "'button[aria-label*=\\\"Google 账号\\\"], "
            "button[aria-label*=\\\"Google Account\\\"]'"
            ");"
            "const hasService = !!document.querySelector('a[href*=\\\"ServiceLogin\\\"]');"
            "const hasLoginCtl = Array.from(document.querySelectorAll('a,button'))"
            ".some(el => /登录|Sign in/i.test((el.textContent || '').trim()));"
            "return !hasAccount && (hasService || hasLoginCtl);"
            "}"
        ),
    )
    return bool(re.search(r"(?m)^true$", out))


def enter_image_tool(pw_shared: Path) -> None:
    """Switch the Gemini page into its image-creation tool.

    The injected script first tries clicking a button whose name matches one
    of the image labels; if that fails it opens the tools menu and clicks a
    matching menu item (attempted twice). The script throws when no entry
    point is found, which surfaces here as ``FlowError``.
    """
    js = r"""
async (page) => {
  const labels = [/创作图片/, /制作图片/, /Create image/i, /Image/i];

  const openToolMenu = async () => {
    const cn = page.getByRole('button', { name: '工具', exact: true }).first();
    if (await cn.count()) {
      await cn.click();
      return true;
    }
    const generic = page.getByRole('button', { name: /工具|Tools/i }).first();
    if (await generic.count()) {
      await generic.click();
      return true;
    }
    return false;
  };

  const tryCardButtons = async () => {
    for (const re of labels) {
      const btn = page.getByRole('button', { name: re }).first();
      if (await btn.count()) {
        try {
          await btn.click({ timeout: 2000 });
          return true;
        } catch (_) {
          // Overlay may intercept pointer. Fall through to menu strategy.
        }
      }
    }
    return false;
  };

  const tryToolMenu = async () => {
    const opened = await openToolMenu();
    if (!opened) return false;
    for (const re of labels) {
      const itemCheck = page.getByRole('menuitemcheckbox', { name: re }).first();
      if (await itemCheck.count()) {
        await itemCheck.click();
        return true;
      }
      const itemPlain = page.getByRole('menuitem', { name: re }).first();
      if (await itemPlain.count()) {
        await itemPlain.click();
        return true;
      }
    }
    return false;
  };

  let ok = await tryCardButtons();
  if (!ok) ok = await tryToolMenu();
  if (!ok) ok = await tryToolMenu();
  if (!ok) throw new Error('Image tool entry not found');
}
"""
    run_pw(pw_shared, "run-code", js)


def submit_and_download_one(pw_shared: Path, prompt: str) -> None:
    """Submit *prompt*, wait for the response to finish, and download the image.

    The prompt is embedded into the injected script as a JSON string literal
    (``json.dumps``), so quotes and newlines in it cannot break the script.
    The script waits for the stop-response button to appear (best effort)
    and then disappear, clicks the last download button on the page, picks a
    format/size menu item if a menu shows up, and throws when no download
    event starts. Script failures surface here as ``FlowError``.
    """
    js = f"""
async (page) => {{
  const prompt = {json.dumps(prompt)};
  const input = page.getByRole('textbox', {{ name: /为 Gemini 输入提示|Enter a prompt/i }}).first();
  await input.click();
  await input.fill(prompt);
  await input.press('Enter');

  const stopBtn = page.getByRole('button', {{ name: /停止回答|Stop response/i }}).first();
  await stopBtn.waitFor({{ state: 'visible', timeout: 15000 }}).catch(() => {{}});
  await stopBtn.waitFor({{ state: 'hidden', timeout: 240000 }});

  const downloadBtn = page.getByRole('button', {{
    name: /下载完整尺寸的图片|下载图片|Download full size|Download image|Download/i
  }}).last();
  if (!(await downloadBtn.count())) {{
    throw new Error('Image download button not found');
  }}

  const downloadPromise = page.waitForEvent('download', {{ timeout: 45000 }}).catch(() => null);
  await downloadBtn.click();

  const preferredItem = page.getByRole('menuitem', {{
    name: /完整尺寸|Full size|PNG|JPG|JPEG|WEBP/i
  }}).first();
  if (await preferredItem.isVisible().catch(() => false)) {{
    await preferredItem.click();
  }} else {{
    const anyItem = page.getByRole('menuitem').first();
    if (await anyItem.isVisible().catch(() => false)) {{
      await anyItem.click();
    }}
  }}

  const download = await downloadPromise;
  if (!download) {{
    const failedToast = page.getByText(/下载失败|Download failed|无法下载|保存失败/i).first();
    if (await failedToast.isVisible().catch(() => false)) {{
      throw new Error('Image download failed');
    }}
    throw new Error('Image download did not start');
  }}
  await download.path().catch(() => null);
  await page.waitForTimeout(800);
}}
"""
    run_pw(pw_shared, "run-code", js)


def retry_click_latest_download(pw_shared: Path) -> None:
    """Click the last download button on the page once more.

    Used as a fallback when collection did not succeed. The injected script
    throws if the button is missing or no download event starts, which
    surfaces here as ``FlowError``.
    """
    js = r"""
async (page) => {
  const btn = page.getByRole('button', {
    name: /下载完整尺寸的图片|下载图片|Download full size|Download image|Download/i
  }).last();
  if (!(await btn.count())) {
    throw new Error('Image download button not found for retry');
  }
  const downloadPromise = page.waitForEvent('download', { timeout: 45000 }).catch(() => null);
  await btn.click();
  const download = await downloadPromise;
  if (!download) {
    throw new Error('Retry image download did not start');
  }
  await download.path().catch(() => null);
  await page.waitForTimeout(800);
}
"""
    run_pw(pw_shared, "run-code", js)


def parse_args() -> argparse.Namespace:
    """Parse command-line arguments from ``sys.argv``."""
    parser = argparse.ArgumentParser(
        description="Generate images on Gemini web and collect downloaded files."
    )
    parser.add_argument("--prompt", required=True, help="Prompt text for image generation.")
    parser.add_argument(
        "--target", required=True, help="Absolute output directory for collected files."
    )
    parser.add_argument(
        "--count", type=int, default=1, help="Number of images to generate. Default: 1."
    )
    parser.add_argument(
        "--no-headed",
        action="store_true",
        help="Run browser without headed mode.",
    )
    return parser.parse_args()


def main() -> int:
    """Drive the full flow and return a process exit code.

    Exit codes: ``0`` on success, ``1`` on invalid arguments, missing
    tooling, an unusable target directory or a failed flow command, ``2``
    when Gemini is not logged in; otherwise the collector script's own
    return code is passed through.
    """
    args = parse_args()
    if args.count < 1:
        print("--count must be a positive integer.", file=sys.stderr)
        return 1

    script_path = Path(__file__).resolve()
    wrapper_override = os.environ.get("PW_SHARED_WRAPPER")
    if wrapper_override is not None:
        pw_shared = Path(wrapper_override).expanduser()
    else:
        # Only derive the repository-relative default when it is actually
        # needed, so an explicit override works from any script location.
        pw_shared = (script_path.parents[3] / "tools/pw").expanduser()
    collect_script = (script_path.parent / "collect_downloads.py").resolve()

    if not pw_shared.exists() or not pw_shared.is_file():
        print(f"Shared Playwright wrapper not found: {pw_shared}", file=sys.stderr)
        return 1
    if not os.access(pw_shared, os.X_OK):
        print(f"Shared Playwright wrapper is not executable: {pw_shared}", file=sys.stderr)
        return 1
    if not collect_script.exists():
        print(f"Collector script not found: {collect_script}", file=sys.stderr)
        return 1

    target = Path(args.target).expanduser().resolve()
    try:
        target.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        # E.g. the path exists as a regular file, or is not writable.
        print(f"Cannot create target directory {target}: {exc}", file=sys.stderr)
        return 1

    # Passed to the collector as --since so it can restrict itself to files
    # produced by this run.
    start_ts = time.time()

    try:
        os.environ["PLAYWRIGHT_SHARED_INIT_MODE"] = (
            "headless" if args.no_headed else "headed"
        )
        run_pw(pw_shared, "snapshot")
        run_pw(pw_shared, "goto", "https://gemini.google.com/app")
        run_pw(pw_shared, "snapshot")

        if is_login_required(pw_shared):
            print(
                "Gemini is not logged in. Please log in at https://gemini.google.com/app and rerun.",
                file=sys.stderr,
            )
            return 2

        enter_image_tool(pw_shared)

        for i in range(1, args.count + 1):
            current_prompt = args.prompt
            if args.count > 1:
                # Ask for a distinct variation per image when generating several.
                current_prompt = (
                    f"{args.prompt}\n"
                    f"变体要求:这是第 {i} / {args.count} 张。保持主题一致,但构图和光影细节需要变化。"
                )
            submit_and_download_one(pw_shared, current_prompt)

        collect_cmd = [
            sys.executable,
            str(collect_script),
            "--target",
            str(target),
            "--since",
            str(start_ts),
            "--expected-count",
            str(args.count),
            "--limit",
            str(args.count),
            "--prefix",
            "gemini-image",
            "--prompt",
            args.prompt,
        ]
        proc = run_command(collect_cmd, capture_output=False, check=False)
        if proc.returncode == 0:
            return 0

        # Fallback: click latest image download button once and retry collection.
        try:
            retry_click_latest_download(pw_shared)
        except FlowError as exc:
            # Keep the collector's exit code, but say why the retry was skipped.
            print(f"Retry download failed: {exc}", file=sys.stderr)
            return proc.returncode

        retry_proc = run_command(collect_cmd, capture_output=False, check=False)
        return retry_proc.returncode
    except FlowError as exc:
        print(str(exc), file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main())