Files

294 lines
9.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""Run Gemini image generation flow end-to-end via Playwright CLI."""
from __future__ import annotations
import argparse
import json
import os
import re
import subprocess
import sys
import time
from pathlib import Path
class FlowError(RuntimeError):
    """Signal that a subprocess command run as part of the flow failed."""
def run_command(
    cmd: list[str], *, capture_output: bool = True, check: bool = True
) -> subprocess.CompletedProcess[str]:
    """Run *cmd* as a subprocess in text mode and return the completed process.

    Args:
        cmd: Program and arguments, handed to ``subprocess.run`` as a list
            (no shell is involved).
        capture_output: When true, stdout is piped and stderr is merged into
            it, so ``proc.stdout`` holds the combined output. When false, the
            child inherits this process's streams and ``proc.stdout`` is
            ``None``.
        check: When true, a non-zero exit status raises ``FlowError``.

    Returns:
        The ``subprocess.CompletedProcess`` of the finished command.

    Raises:
        FlowError: If the command cannot be started at all (regardless of
            ``check``, since there is no completed process to return), or if
            ``check`` is true and the command exits with a non-zero status.
    """
    kwargs: dict[str, object] = {"text": True}
    if capture_output:
        kwargs["stdout"] = subprocess.PIPE
        kwargs["stderr"] = subprocess.STDOUT
        # Decode leniently: the child's output is only inspected or echoed
        # in error messages, so a byte that is invalid in the locale
        # encoding must not abort the flow with UnicodeDecodeError.
        kwargs["errors"] = "replace"
    try:
        proc = subprocess.run(cmd, **kwargs)
    except OSError as exc:
        # Launch failures (missing interpreter, exec format error, ...) are
        # reported through the same exception type as a failing command so
        # callers need a single handler.
        raise FlowError(
            f"Command could not be started: {' '.join(cmd)}\n{exc}"
        ) from exc
    if check and proc.returncode != 0:
        output = proc.stdout if capture_output else ""
        raise FlowError(
            f"Command failed ({proc.returncode}): {' '.join(cmd)}\n{output}"
        )
    return proc
def run_pw(pw_shared: Path, *args: str) -> str:
    """Invoke the shared Playwright wrapper and return its captured output.

    Args:
        pw_shared: Path of the wrapper executable.
        *args: Sub-command and arguments forwarded to the wrapper.

    Returns:
        The combined stdout/stderr text of the wrapper, or an empty string
        when nothing was captured.

    Raises:
        FlowError: Propagated from ``run_command`` when the wrapper fails.
    """
    argv = [str(pw_shared)]
    argv.extend(args)
    completed = run_command(argv, capture_output=True)
    if completed.stdout:
        return completed.stdout
    return ""
def is_login_required(pw_shared: Path) -> bool:
    """Report whether the current page looks like a signed-out Gemini page.

    A DOM probe is sent through the wrapper's ``eval`` command. The probe
    yields true when no Google-account button is present and either a
    ``ServiceLogin`` link or a sign-in labelled link/button exists.

    Args:
        pw_shared: Path of the shared Playwright wrapper executable.

    Returns:
        True when the wrapper output contains a line that is exactly
        ``true``; False otherwise.
    """
    # The fragments are concatenated without separators into one arrow
    # function; the escaped quotes are part of the text sent to the wrapper.
    probe_fragments = (
        "() => {",
        "const hasAccount = !!document.querySelector(",
        "'button[aria-label*=\\\"Google 账号\\\"], ",
        "button[aria-label*=\\\"Google Account\\\"]'",
        ");",
        "const hasService = !!document.querySelector('a[href*=\\\"ServiceLogin\\\"]');",
        "const hasLoginCtl = Array.from(document.querySelectorAll('a,button'))",
        ".some(el => /登录|Sign in/i.test((el.textContent || '').trim()));",
        "return !hasAccount && (hasService || hasLoginCtl);",
        "}",
    )
    probe_js = "".join(probe_fragments)
    wrapper_output = run_pw(pw_shared, "eval", probe_js)
    verdict = re.search(r"(?m)^true$", wrapper_output)
    return verdict is not None
def enter_image_tool(pw_shared: Path) -> None:
    """Switch the Gemini page into its image-creation tool.

    The page script first tries to click a button whose name matches one of
    the image labels; if that does not succeed it opens the tools menu and
    picks a matching menu item, attempting the menu strategy up to twice.
    The script throws when no entry point is found.

    Args:
        pw_shared: Path of the shared Playwright wrapper executable.

    Raises:
        FlowError: Propagated from ``run_pw`` when the wrapper command fails.
    """
    # The script text is sent to the wrapper verbatim, so it is kept exactly
    # as-is (including its flush-left layout).
    page_script = r"""
async (page) => {
const labels = [/创作图片/, /制作图片/, /Create image/i, /Image/i];
const openToolMenu = async () => {
const cn = page.getByRole('button', { name: '工具', exact: true }).first();
if (await cn.count()) {
await cn.click();
return true;
}
const generic = page.getByRole('button', { name: /工具|Tools/i }).first();
if (await generic.count()) {
await generic.click();
return true;
}
return false;
};
const tryCardButtons = async () => {
for (const re of labels) {
const btn = page.getByRole('button', { name: re }).first();
if (await btn.count()) {
try {
await btn.click({ timeout: 2000 });
return true;
} catch (_) {
// Overlay may intercept pointer. Fall through to menu strategy.
}
}
}
return false;
};
const tryToolMenu = async () => {
const opened = await openToolMenu();
if (!opened) return false;
for (const re of labels) {
const itemCheck = page.getByRole('menuitemcheckbox', { name: re }).first();
if (await itemCheck.count()) {
await itemCheck.click();
return true;
}
const itemPlain = page.getByRole('menuitem', { name: re }).first();
if (await itemPlain.count()) {
await itemPlain.click();
return true;
}
}
return false;
};
let ok = await tryCardButtons();
if (!ok) ok = await tryToolMenu();
if (!ok) ok = await tryToolMenu();
if (!ok) throw new Error('Image tool entry not found');
}
"""
    run_pw(pw_shared, "run-code", page_script)
def submit_and_download_one(pw_shared: Path, prompt: str) -> None:
    """Submit one prompt, wait for the response, and trigger one download.

    The page script fills the prompt box and presses Enter, waits for the
    stop-response button to appear and then disappear, clicks the last
    download button, picks a menu item if a menu is visible, and throws when
    no download event is observed.

    Args:
        pw_shared: Path of the shared Playwright wrapper executable.
        prompt: Prompt text; embedded in the script as a JSON string literal.

    Raises:
        FlowError: Propagated from ``run_pw`` when the wrapper command fails.
    """
    # JSON encoding yields a quoted, escaped literal that is spliced into the
    # script source below.
    prompt_literal = json.dumps(prompt)
    page_script = f"""
async (page) => {{
const prompt = {prompt_literal};
const input = page.getByRole('textbox', {{ name: /为 Gemini 输入提示|Enter a prompt/i }}).first();
await input.click();
await input.fill(prompt);
await input.press('Enter');
const stopBtn = page.getByRole('button', {{ name: /停止回答|Stop response/i }}).first();
await stopBtn.waitFor({{ state: 'visible', timeout: 15000 }}).catch(() => {{}});
await stopBtn.waitFor({{ state: 'hidden', timeout: 240000 }});
const downloadBtn = page.getByRole('button', {{ name: /下载完整尺寸的图片|下载图片|Download full size|Download image|Download/i }}).last();
if (!(await downloadBtn.count())) {{
throw new Error('Image download button not found');
}}
const downloadPromise = page.waitForEvent('download', {{ timeout: 45000 }}).catch(() => null);
await downloadBtn.click();
const preferredItem = page.getByRole('menuitem', {{ name: /完整尺寸|Full size|PNG|JPG|JPEG|WEBP/i }}).first();
if (await preferredItem.isVisible().catch(() => false)) {{
await preferredItem.click();
}} else {{
const anyItem = page.getByRole('menuitem').first();
if (await anyItem.isVisible().catch(() => false)) {{
await anyItem.click();
}}
}}
const download = await downloadPromise;
if (!download) {{
const failedToast = page.getByText(/下载失败|Download failed|无法下载|保存失败/i).first();
if (await failedToast.isVisible().catch(() => false)) {{
throw new Error('Image download failed');
}}
throw new Error('Image download did not start');
}}
await download.path().catch(() => null);
await page.waitForTimeout(800);
}}
"""
    run_pw(pw_shared, "run-code", page_script)
def retry_click_latest_download(pw_shared: Path) -> None:
    """Click the last download button on the page once more.

    The page script throws when no download button exists or when no
    download event is observed after the click.

    Args:
        pw_shared: Path of the shared Playwright wrapper executable.

    Raises:
        FlowError: Propagated from ``run_pw`` when the wrapper command fails.
    """
    # Sent to the wrapper verbatim; the text is kept exactly as-is.
    page_script = r"""
async (page) => {
const btn = page.getByRole('button', { name: /下载完整尺寸的图片|下载图片|Download full size|Download image|Download/i }).last();
if (!(await btn.count())) {
throw new Error('Image download button not found for retry');
}
const downloadPromise = page.waitForEvent('download', { timeout: 45000 }).catch(() => null);
await btn.click();
const download = await downloadPromise;
if (!download) {
throw new Error('Retry image download did not start');
}
await download.path().catch(() => null);
await page.waitForTimeout(800);
}
"""
    run_pw(pw_shared, "run-code", page_script)
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the generation flow.

    Returns:
        A namespace with ``prompt`` and ``target`` (both required strings),
        ``count`` (int, default 1) and ``no_headed`` (bool, default False).
    """
    cli = argparse.ArgumentParser(
        description="Generate images on Gemini web and collect downloaded files."
    )
    add_option = cli.add_argument
    add_option(
        "--prompt",
        required=True,
        help="Prompt text for image generation.",
    )
    add_option(
        "--target",
        required=True,
        help="Absolute output directory for collected files.",
    )
    add_option(
        "--count",
        type=int,
        default=1,
        help="Number of images to generate. Default: 1.",
    )
    add_option(
        "--no-headed",
        action="store_true",
        help="Run browser without headed mode.",
    )
    options = cli.parse_args()
    return options
def main() -> int:
    """Run the Gemini image flow end to end and return a process exit code.

    The flow validates the arguments, locates the Playwright wrapper and the
    collector script, creates the target directory, opens the Gemini app,
    checks for a signed-out page, enters the image tool, submits the prompt
    ``--count`` times, and finally runs the collector script. If the
    collector reports failure, the latest download button is clicked once
    more and the collector is run a second time.

    Returns:
        0 when collection succeeds; 2 when Gemini requires login; 1 for
        invalid arguments, missing tooling, an uncreatable target directory,
        or a failed flow command; otherwise the collector's exit status.
    """
    args = parse_args()
    if args.count < 1:
        print("--count must be a positive integer.", file=sys.stderr)
        return 1

    script_path = Path(__file__).resolve()
    wrapper_override = os.environ.get("PW_SHARED_WRAPPER")
    if wrapper_override is not None:
        pw_shared = Path(wrapper_override).expanduser()
    else:
        # The default wrapper is looked up relative to an ancestor directory
        # of this script. Only derive it when no override is given, and
        # report a shallow script location instead of raising IndexError.
        try:
            repo_root = script_path.parents[3]
        except IndexError:
            print(
                "Cannot derive the default Playwright wrapper path from "
                f"{script_path}; set PW_SHARED_WRAPPER.",
                file=sys.stderr,
            )
            return 1
        pw_shared = (repo_root / "tools/pw").expanduser()
    collect_script = (script_path.parent / "collect_downloads.py").resolve()

    # is_file() is already False for a missing path.
    if not pw_shared.is_file():
        print(f"Shared Playwright wrapper not found: {pw_shared}", file=sys.stderr)
        return 1
    if not os.access(pw_shared, os.X_OK):
        print(f"Shared Playwright wrapper is not executable: {pw_shared}", file=sys.stderr)
        return 1
    if not collect_script.exists():
        print(f"Collector script not found: {collect_script}", file=sys.stderr)
        return 1

    target = Path(args.target).expanduser().resolve()
    try:
        target.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        print(f"Cannot create target directory {target}: {exc}", file=sys.stderr)
        return 1

    # Timestamp taken before any browser work; passed to the collector as
    # its --since value.
    start_ts = time.time()
    try:
        os.environ["PLAYWRIGHT_SHARED_INIT_MODE"] = (
            "headless" if args.no_headed else "headed"
        )
        run_pw(pw_shared, "snapshot")
        run_pw(pw_shared, "goto", "https://gemini.google.com/app")
        run_pw(pw_shared, "snapshot")
        if is_login_required(pw_shared):
            print(
                "Gemini is not logged in. Please log in at https://gemini.google.com/app and rerun.",
                file=sys.stderr,
            )
            return 2
        enter_image_tool(pw_shared)
        for i in range(1, args.count + 1):
            current_prompt = args.prompt
            if args.count > 1:
                # Ask for a distinct variant per image when several are requested.
                current_prompt = (
                    f"{args.prompt}\n"
                    f"变体要求:这是第 {i} / {args.count} 张。保持主题一致,但构图和光影细节需要变化。"
                )
            submit_and_download_one(pw_shared, current_prompt)
        collect_cmd = [
            sys.executable,
            str(collect_script),
            "--target",
            str(target),
            "--since",
            str(start_ts),
            "--expected-count",
            str(args.count),
            "--limit",
            str(args.count),
            "--prefix",
            "gemini-image",
            "--prompt",
            args.prompt,
        ]
        proc = run_command(collect_cmd, capture_output=False, check=False)
        if proc.returncode == 0:
            return 0
        # Fallback: click latest image download button once and retry collection.
        try:
            retry_click_latest_download(pw_shared)
        except FlowError as exc:
            # Keep the collector's original status, but say why the retry
            # did not happen instead of failing silently.
            print(f"Retry download failed: {exc}", file=sys.stderr)
            return proc.returncode
        retry_proc = run_command(collect_cmd, capture_output=False, check=False)
        return retry_proc.returncode
    except FlowError as exc:
        print(str(exc), file=sys.stderr)
        return 1
# Script entry point: exit with the status code computed by main().
if __name__ == "__main__":
    sys.exit(main())