site logo

Marico's space

用于 Gmail 收件箱分析的 Apify Actor:仅 refresh-token 的 OAuth、异步路由、按功能配额

服务器技术 2026-05-16 15:07:50 12

最近折腾了一个 Gmail 收件箱分析工具,踩了几个坑,这篇把设计思路说清楚。开源地址:apify-gmail-inbox-intel。需要强调的是,这不是爬虫也不是批量发送工具,只是一个基于 gmail.readonly 权限范围的收件箱数据分析工具。本文不是教程,更像是设计复盘。

遇到过"哪个客户的邮件我忘了回复"或者"平均回复周期是多久"这类问题吗?这个工具就是来解决这类需求的。

为什么选 Apify Actor

我同时需要三样东西:无服务器运行时、按结果计费、以及规范的输入校验。Apify 全部搞定,不用写后端。托管端点、数据集存储、键值存储(用于状态管理)、以及已经付费的开发者用户群体,这些都现成的。

这个 Actor 通过单一入口暴露四个功能:

  • thread_search — 按 q 查询 Gmail 邮件串,支持分页,返回元数据 + 消息计数
  • reply_metrics — 针对每个邮件串,计算我回复了多少、对方回复了多少、最后回复距今多久、是否超时
  • summarizer — 可选的 OpenAI LLM(人工智能)摘要功能,需要自备 API(应用程序接口)密钥
  • unread_digest — 列出最近 N 小时内未读邮件串,按标签分组

设计决策 1:仅 refresh-token 的 OAuth

早期最难的决定是 OAuth,有两条路:

  1. Actor 侧做 3-legged OAuth — Actor 托管回调 URL,换 code,换 token,存 token。
  2. 仅 refresh-token 模式 — 用户自己完成 OAuth 流程,把 {refresh_token, client_id, client_secret} 作为 Actor 输入传给我。

我选了方案 2,理由:

  • Apify Actors 没有稳定的、每个用户独立的 HTTPS 回调 URL。每次运行是一个 job,不是常驻服务器。
  • "我们从不存储你的 Gmail token" 是更容易让人接受的隐私承诺。
  • 我不想成为别人邮箱的密钥持有者。

Actor 里的流程是这样的:

# src/gmail_client.py — 核心逻辑
async def get_access_token(oauth_token: dict) -> str: resp = await httpx_client.post( "https://oauth2.googleapis.com/token", data={ "grant_type": "refresh_token", "refresh_token": oauth_token["refresh_token"], "client_id": oauth_token["client_id"], "client_secret": oauth_token["client_secret"], }, ) return resp.json()["access_token"]

Access token 只存在内存里。Job 结束 → 进程销毁 → token 消失。尽力而为,但至少我的代码路径不会把 token 持久化到 Apify 存储里。

设计决策 2:一个 async 路由,而不是四个 Actor

拆成四个 Actor 看起来很诱人。没拆,原因是:

  • 营销层面。一个带四个 feature 枚举值的 Actor 只有一个商店页、一个评分、一堆评论。拆成四个 Actor,流量全部分散。
  • 共享 OAuth 和共享配额。Token 交换、错误处理、mask helpers、KVS(键值存储)配额管理 — 全部复用。

src/main.py 就是一个路由:

FEATURES = { "thread_search": thread_search.run, "reply_metrics": reply_metrics.run, "summarizer": summarizer.run, "unread_digest": digest.run,
} async def main(): actor_input = await Actor.get_input() or {} feature = actor_input.get("feature") if feature not in FEATURES: raise ValueError(f"Unknown feature: {feature}") await FEATURES[feature](actor_input)

每个 feature 模块通过同一个共享文件定义自己的 INPUT_SCHEMA.json 语义 — feature 枚举驱动下游各 handler 的校验逻辑。

设计决策 3:配额存在 Apify KVS(键值存储)里

免费额度每月 100 个邮件串。这个计数器需要跨多次运行持久化。Apify KeyValueStore 是天然的选择 — 不需要额外的数据库,持久化,作用域限定在 Actor 范围内。

# src/quota.py — 核心逻辑
async def check_and_increment(user_id: str, feature: str, n: int): kvs = await Actor.open_key_value_store() key = f"quota/{user_id}/{month_key()}/{feature}" used = (await kvs.get_value(key)) or 0 if used + n > FREE_LIMIT: raise QuotaExceeded(feature, used, FREE_LIMIT) await kvs.set_value(key, used + n)

月份切换靠 year-month 字符串 key 实现 — 不需要 cron,不需要迁移,不会有漂移。Pro 版加个 flag 直接跳过检查。

测试

六个 pytest 测试,pytest.ini 里配了 asyncio_mode = auto。覆盖的场景:

  • 路由拒绝未知 feature
  • 四个 feature 在 dry_run=True 时干净利落地短路退出
  • 配额超限抛异常、未超限正常放行
[pytest]
asyncio_mode = auto

就这一行配置,决定了"6 个测试通过"和"6 个测试全报错:缺少事件循环"的区别。踩过坑才记住的。

定价模型

  • 免费版:每月 100 个邮件串
  • Pro 版:$19/月(5000 个邮件串元数据 + 100 次 LLM 摘要)
  • 按量付费附加:$0.50/1000 个邮件串元数据,$0.005/次摘要

Apify 处理账单,我专注写代码。

如果重做会怎么改

  • Webhook 触发 — 现在 unread_digest 是手动触发。定时触发 + 飞书/钉钉推送才是下一步该做的。
  • 标签级规则reply_metrics 现在是全局统计。给销售团队看每个标签的 SLA 矩阵会更有价值。
  • 多账号扇出 — 一次运行,多个 OAuth token,合并成一份数据集。

代码

  • 仓库:apify-gmail-inbox-intel
  • 许可证:MIT
  • Actor 清单:.actor/actor.json + INPUT_SCHEMA.json,想 fork 的话直接用

如果你也在做收件箱相关的自动化工作,欢迎试玩。有没有我遗漏的理由必须用完整的 3-legged OAuth?评论区聊。

原文链接:https://dev.to/foxck016077/an-apify-actor-for-gmail-inbox-analytics--refresh-token-only-oauth-async-router-per-feature-quota-3k8