feat(skills): 初始化多技能包并完善发布流程
- 新增 git-push skill 与 agent 元数据,加入安全推送与 commit message 规范 - 新增 xiaohongshu-engage / xiaohongshu-publish-note 两个技能及元数据 - 新增 gemini-image-web skill、元数据与下载整理脚本 - 新增 .gitignore 以忽略常见生成产物
This commit is contained in:
@@ -0,0 +1,129 @@
|
||||
---
|
||||
name: gemini-image-web
|
||||
description: "Generate images in Gemini web via browser automation, download results, and collect downloaded files into a local target folder with manifest output. Use when users ask to create Gemini web images, need multiple images generated through repeated requests, or need structured outputs (paths, metadata, dedupe) for downstream publishing workflows."
|
||||
---
|
||||
|
||||
# Gemini Image Web
|
||||
|
||||
## Workflow
|
||||
|
||||
1. Open Gemini web and confirm user is logged in.
|
||||
2. Set output directory and target image count.
|
||||
3. Send one image-generation prompt per request until target count is reached.
|
||||
4. For each request, wait until generation ends (`停止回答` button disappears), then download.
|
||||
5. Collect downloaded files into target folder with batch naming, dedupe, and manifest.
|
||||
6. Return file paths, manifest path, and failure summary.
|
||||
|
||||
## 1) Prerequisites
|
||||
|
||||
- Ensure browser session can access Gemini (`https://gemini.google.com/app`).
|
||||
- If login, captcha, or MFA is required, pause and ask user to complete it manually.
|
||||
- Decide output directory before generation, for example:
|
||||
- `/Users/xd/java/xhs/output/gemini`
|
||||
|
||||
## 2) Open Gemini
|
||||
|
||||
- Navigate to Gemini app page.
|
||||
- Confirm login state by checking account/avatar area.
|
||||
- If not logged in, stop and ask user to complete login manually.
|
||||
- If model selection is needed, choose a model that supports image output.
|
||||
|
||||
## 3) Multi-Image Generation Strategy
|
||||
|
||||
- Gemini web currently returns one image per request.
|
||||
- If user asks for `N` images, run `N` requests in sequence.
|
||||
- Keep a shared base prompt, then apply per-image variants only when needed.
|
||||
- Record a `download_start_ts` before each download action.
|
||||
|
||||
Prompt construction rules:
|
||||
|
||||
- Keep a single clear subject per prompt.
|
||||
- Include visual style, lighting, composition, and aspect ratio.
|
||||
- Include banned elements only if user requests negative constraints.
|
||||
|
||||
## 4) Wait For Completion (Explicit End Condition)
|
||||
|
||||
- After submit, wait for generation state to appear.
|
||||
- Treat generation as complete only when:
|
||||
- `停止回答` button disappears, and
|
||||
- latest assistant response has downloadable image action.
|
||||
- If refs are stale or state is unclear, re-snapshot and retry once.
|
||||
|
||||
## 5) Download Images
|
||||
|
||||
- Download from the latest assistant response block (not old history blocks).
|
||||
- Click `下载完整尺寸的图片`.
|
||||
- Wait for download completion toast/progress to end before next request.
|
||||
- Repeat until target count is reached or retry budget is exhausted.
|
||||
|
||||
## 6) Collect Downloaded Files
|
||||
|
||||
Use bundled script:
|
||||
|
||||
```bash
|
||||
python3 scripts/collect_downloads.py \
|
||||
--source /var/folders/.../playwright-mcp-output/<session-id> \
|
||||
--source ~/Downloads \
|
||||
--target /ABS/PATH/TO/output/gemini \
|
||||
--since <download_start_unix_ts> \
|
||||
--limit <max_to_collect> \
|
||||
--expected-count <required_count> \
|
||||
--prefix gemini \
|
||||
--batch-id <run_id> \
|
||||
--prompt "<prompt_used>"
|
||||
```
|
||||
|
||||
Script behavior:
|
||||
|
||||
- Source strategy:
|
||||
- Prefer Playwright temp download directory first.
|
||||
- Fallback to `~/Downloads` when primary source has no matches.
|
||||
- Filters to image extensions (`png,jpg,jpeg,webp`).
|
||||
- Uses batch naming (`<prefix>-<batch-id>-NN.ext`).
|
||||
- Dedupes by SHA-256 (current run + existing target files).
|
||||
- Captures dimensions (`width`, `height`) and writes JSON manifest.
|
||||
- Prints absolute output paths and manifest path.
|
||||
|
||||
## 7) Failure Handling By Step
|
||||
|
||||
- Login step:
|
||||
- If login/captcha/MFA blocks, stop and ask user to complete manually.
|
||||
- Generation step:
|
||||
- If failed once, retry once with minimal prompt rewrite.
|
||||
- If still failing, record failure reason and continue remaining quota if requested.
|
||||
- Completion detection step:
|
||||
- If `停止回答` does not disappear within timeout, retry snapshot+wait once.
|
||||
- If still stuck, mark timeout and skip this request.
|
||||
- Download step:
|
||||
- If click intercepted or stale ref, re-snapshot and retry once.
|
||||
- If no file detected after timeout, mark download failure for that request.
|
||||
- Collection step:
|
||||
- If no matching files, return manifest with failure status.
|
||||
- If dedupe removes all files, return manifest with `no_files_after_dedupe`.
|
||||
- If collected count < required count, return `insufficient_files`.
|
||||
|
||||
## 8) Return Output
|
||||
|
||||
Return:
|
||||
|
||||
- prompt used
|
||||
- target count and successful count
|
||||
- absolute file paths for collected files
|
||||
- manifest absolute path
|
||||
- retries, failures, and skipped duplicates
|
||||
|
||||
## 9) Reliability Rules
|
||||
|
||||
- Re-snapshot after navigation, model switch, and generation completion.
|
||||
- If refs are stale or click intercepted, re-snapshot and retry once.
|
||||
- Do not assume static selectors across Gemini updates; rely on visible text and role-first matching.
|
||||
|
||||
## 10) Boundaries
|
||||
|
||||
- Do not bypass login verification, captcha, paywalls, or security checks.
|
||||
- Do not submit disallowed or unsafe image prompts.
|
||||
- Stop before posting to third-party platforms; this skill only generates and collects images.
|
||||
|
||||
## Scripts
|
||||
|
||||
- `scripts/collect_downloads.py`: Collect recent downloaded images with fallback sources, dedupe, and manifest.
|
||||
@@ -0,0 +1,4 @@
|
||||
interface:
|
||||
display_name: "Gemini Image Web"
|
||||
short_description: "Generate Gemini images via web, multi-request, dedupe, and manifest."
|
||||
default_prompt: "Use $gemini-image-web to generate one image per Gemini request until target count is reached, download full-size outputs, then collect files with fallback source strategy, dedupe, and manifest metadata."
|
||||
+367
@@ -0,0 +1,367 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Collect recent image downloads into a target directory with manifest output."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Collect recent image downloads into a target directory."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--source",
|
||||
action="append",
|
||||
help=(
|
||||
"Source download directory. Repeatable. "
|
||||
"If omitted, auto-discovers Playwright temp downloads and then "
|
||||
"falls back to ~/Downloads."
|
||||
),
|
||||
)
|
||||
parser.add_argument(
|
||||
"--target",
|
||||
required=True,
|
||||
help="Target directory for collected files.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--since",
|
||||
type=float,
|
||||
default=time.time() - 1800,
|
||||
help="Unix timestamp lower bound for file mtime. Default: now-1800s",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--ext",
|
||||
default="png,jpg,jpeg,webp",
|
||||
help="Comma-separated file extensions to include.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--limit",
|
||||
type=int,
|
||||
default=8,
|
||||
help="Maximum files to collect. Default: 8",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--expected-count",
|
||||
type=int,
|
||||
default=None,
|
||||
help="Required minimum number of collected files.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--prefix",
|
||||
default="gemini",
|
||||
help="Filename prefix for collected files. Default: gemini",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--batch-id",
|
||||
default=None,
|
||||
help="Batch ID used in output filenames. Default: current timestamp.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--manifest",
|
||||
default=None,
|
||||
help="Manifest output path. Default: <target>/<prefix>-<batch-id>-manifest.json",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--prompt",
|
||||
default="",
|
||||
help="Prompt text to store in manifest.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--move",
|
||||
action="store_true",
|
||||
help="Move files instead of copying.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--no-dedupe-target",
|
||||
action="store_true",
|
||||
help="Disable hash dedupe against existing files in target directory.",
|
||||
)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def unique_path(path: Path) -> Path:
|
||||
if not path.exists():
|
||||
return path
|
||||
stem = path.stem
|
||||
suffix = path.suffix
|
||||
parent = path.parent
|
||||
idx = 2
|
||||
while True:
|
||||
candidate = parent / f"{stem}-{idx}{suffix}"
|
||||
if not candidate.exists():
|
||||
return candidate
|
||||
idx += 1
|
||||
|
||||
|
||||
def collect_candidates(source: Path, since_ts: float, allowed_ext: set[str]) -> list[Path]:
|
||||
files: list[Path] = []
|
||||
if not source.exists():
|
||||
return files
|
||||
for path in source.iterdir():
|
||||
if not path.is_file():
|
||||
continue
|
||||
ext = path.suffix.lower().lstrip(".")
|
||||
if ext not in allowed_ext:
|
||||
continue
|
||||
try:
|
||||
mtime = path.stat().st_mtime
|
||||
except OSError:
|
||||
continue
|
||||
if mtime >= since_ts:
|
||||
files.append(path)
|
||||
files.sort(key=lambda p: p.stat().st_mtime, reverse=True)
|
||||
return files
|
||||
|
||||
|
||||
def discover_playwright_sources() -> list[Path]:
|
||||
globs = (
|
||||
"/var/folders/*/*/*/T/playwright-mcp-output/*",
|
||||
"/tmp/playwright-mcp-output/*",
|
||||
)
|
||||
candidates: list[Path] = []
|
||||
seen: set[Path] = set()
|
||||
for pattern in globs:
|
||||
for raw in Path("/").glob(pattern.lstrip("/")):
|
||||
if not raw.is_dir():
|
||||
continue
|
||||
path = raw.resolve()
|
||||
if path in seen:
|
||||
continue
|
||||
seen.add(path)
|
||||
candidates.append(path)
|
||||
candidates.sort(key=lambda p: p.stat().st_mtime, reverse=True)
|
||||
return candidates
|
||||
|
||||
|
||||
def resolve_sources(raw_sources: list[str] | None) -> list[Path]:
|
||||
if raw_sources:
|
||||
return [Path(item).expanduser().resolve() for item in raw_sources]
|
||||
auto_sources = discover_playwright_sources()
|
||||
auto_sources.append((Path.home() / "Downloads").resolve())
|
||||
result: list[Path] = []
|
||||
seen: set[Path] = set()
|
||||
for path in auto_sources:
|
||||
if path in seen:
|
||||
continue
|
||||
seen.add(path)
|
||||
result.append(path)
|
||||
return result
|
||||
|
||||
|
||||
def sha256_of_file(path: Path) -> str:
|
||||
digest = hashlib.sha256()
|
||||
with path.open("rb") as fh:
|
||||
while True:
|
||||
chunk = fh.read(1024 * 1024)
|
||||
if not chunk:
|
||||
break
|
||||
digest.update(chunk)
|
||||
return digest.hexdigest()
|
||||
|
||||
|
||||
def dimensions_from_sips(path: Path) -> tuple[int, int] | None:
|
||||
try:
|
||||
proc = subprocess.run(
|
||||
["sips", "-g", "pixelWidth", "-g", "pixelHeight", str(path)],
|
||||
check=False,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
except OSError:
|
||||
return None
|
||||
if proc.returncode != 0:
|
||||
return None
|
||||
width_match = re.search(r"pixelWidth:\s+(\d+)", proc.stdout)
|
||||
height_match = re.search(r"pixelHeight:\s+(\d+)", proc.stdout)
|
||||
if not width_match or not height_match:
|
||||
return None
|
||||
return int(width_match.group(1)), int(height_match.group(1))
|
||||
|
||||
|
||||
def dimensions_from_png(path: Path) -> tuple[int, int] | None:
|
||||
try:
|
||||
with path.open("rb") as fh:
|
||||
header = fh.read(24)
|
||||
except OSError:
|
||||
return None
|
||||
if len(header) < 24 or header[:8] != b"\x89PNG\r\n\x1a\n":
|
||||
return None
|
||||
width = int.from_bytes(header[16:20], "big")
|
||||
height = int.from_bytes(header[20:24], "big")
|
||||
return width, height
|
||||
|
||||
|
||||
def read_dimensions(path: Path) -> tuple[int, int] | None:
|
||||
dims = dimensions_from_sips(path)
|
||||
if dims:
|
||||
return dims
|
||||
if path.suffix.lower() == ".png":
|
||||
return dimensions_from_png(path)
|
||||
return None
|
||||
|
||||
|
||||
def iso_ts(ts: float) -> str:
|
||||
return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
|
||||
|
||||
|
||||
def select_source_candidates(
|
||||
sources: list[Path], since_ts: float, allowed_ext: set[str]
|
||||
) -> tuple[Path | None, list[Path], list[dict[str, object]]]:
|
||||
tried: list[dict[str, object]] = []
|
||||
for source in sources:
|
||||
files = collect_candidates(source, since_ts, allowed_ext)
|
||||
tried.append({"source": str(source), "matches": len(files)})
|
||||
if files:
|
||||
return source, files, tried
|
||||
return None, [], tried
|
||||
|
||||
|
||||
def collect_existing_hashes(target: Path, allowed_ext: set[str]) -> set[str]:
|
||||
hashes: set[str] = set()
|
||||
for path in target.iterdir():
|
||||
if not path.is_file():
|
||||
continue
|
||||
ext = path.suffix.lower().lstrip(".")
|
||||
if ext not in allowed_ext:
|
||||
continue
|
||||
try:
|
||||
hashes.add(sha256_of_file(path))
|
||||
except OSError:
|
||||
continue
|
||||
return hashes
|
||||
|
||||
|
||||
def write_manifest(manifest_path: Path, payload: dict[str, object]) -> None:
|
||||
manifest_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with manifest_path.open("w", encoding="utf-8") as fh:
|
||||
json.dump(payload, fh, ensure_ascii=False, indent=2)
|
||||
fh.write("\n")
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
target = Path(args.target).expanduser().resolve()
|
||||
target.mkdir(parents=True, exist_ok=True)
|
||||
batch_id = args.batch_id or time.strftime("%Y%m%d-%H%M%S")
|
||||
manifest_path = (
|
||||
Path(args.manifest).expanduser().resolve()
|
||||
if args.manifest
|
||||
else target / f"{args.prefix}-{batch_id}-manifest.json"
|
||||
)
|
||||
|
||||
allowed_ext = {
|
||||
ext.strip().lower().lstrip(".")
|
||||
for ext in args.ext.split(",")
|
||||
if ext.strip()
|
||||
}
|
||||
if not allowed_ext:
|
||||
print("No valid extensions provided.", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
sources = resolve_sources(args.source)
|
||||
selected_source, candidates, tried_sources = select_source_candidates(
|
||||
sources, args.since, allowed_ext
|
||||
)
|
||||
if not candidates:
|
||||
payload = {
|
||||
"status": "no_matching_files",
|
||||
"created_at": iso_ts(time.time()),
|
||||
"batch_id": batch_id,
|
||||
"prompt": args.prompt,
|
||||
"target_dir": str(target),
|
||||
"since_ts": args.since,
|
||||
"sources_tried": tried_sources,
|
||||
"collected_count": 0,
|
||||
"files": [],
|
||||
}
|
||||
write_manifest(manifest_path, payload)
|
||||
print("No matching files found.")
|
||||
print(f"MANIFEST: {manifest_path}")
|
||||
return 1
|
||||
|
||||
dedupe_target = not args.no_dedupe_target
|
||||
seen_hashes: set[str] = set()
|
||||
if dedupe_target:
|
||||
seen_hashes.update(collect_existing_hashes(target, allowed_ext))
|
||||
|
||||
files: list[dict[str, object]] = []
|
||||
skipped_duplicates = 0
|
||||
for src in candidates:
|
||||
if len(files) >= args.limit:
|
||||
break
|
||||
try:
|
||||
src_hash = sha256_of_file(src)
|
||||
except OSError:
|
||||
continue
|
||||
if src_hash in seen_hashes:
|
||||
skipped_duplicates += 1
|
||||
continue
|
||||
|
||||
idx = len(files) + 1
|
||||
dst = target / f"{args.prefix}-{batch_id}-{idx:02d}{src.suffix.lower()}"
|
||||
dst = unique_path(dst)
|
||||
src_mtime = src.stat().st_mtime
|
||||
if args.move:
|
||||
shutil.move(str(src), str(dst))
|
||||
else:
|
||||
shutil.copy2(str(src), str(dst))
|
||||
dims = read_dimensions(dst)
|
||||
file_entry = {
|
||||
"prompt": args.prompt,
|
||||
"generated_at": iso_ts(src_mtime),
|
||||
"source_filename": src.name,
|
||||
"source_path": str(src.resolve()),
|
||||
"target_path": str(dst.resolve()),
|
||||
"sha256": src_hash,
|
||||
"width": dims[0] if dims else None,
|
||||
"height": dims[1] if dims else None,
|
||||
}
|
||||
files.append(file_entry)
|
||||
seen_hashes.add(src_hash)
|
||||
|
||||
status = "ok"
|
||||
exit_code = 0
|
||||
expected_count = args.expected_count
|
||||
if not files:
|
||||
status = "no_files_after_dedupe"
|
||||
exit_code = 1
|
||||
elif expected_count is not None and len(files) < expected_count:
|
||||
status = "insufficient_files"
|
||||
exit_code = 1
|
||||
|
||||
payload = {
|
||||
"status": status,
|
||||
"created_at": iso_ts(time.time()),
|
||||
"batch_id": batch_id,
|
||||
"prompt": args.prompt,
|
||||
"target_dir": str(target),
|
||||
"source_dir": str(selected_source) if selected_source else None,
|
||||
"sources_tried": tried_sources,
|
||||
"since_ts": args.since,
|
||||
"limit": args.limit,
|
||||
"expected_count": expected_count,
|
||||
"dedupe_target": dedupe_target,
|
||||
"skipped_duplicates": skipped_duplicates,
|
||||
"collected_count": len(files),
|
||||
"files": files,
|
||||
}
|
||||
write_manifest(manifest_path, payload)
|
||||
|
||||
for item in files:
|
||||
print(item["target_path"])
|
||||
print(f"MANIFEST: {manifest_path}")
|
||||
return exit_code
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user