广告实时调价数据流优化
一、背景
1.1 业务场景
在 oCPM(Optimized Cost Per Mille)智能出价场景中,广告系统需要根据实时转化效果动态调整出价。核心公式为:
1
| eCPM = Target_CPA × pCTR × pCVR × 校准因子 × 成本控制因子 × 1000
|
其中:
- Target_CPA:广告主设定的目标转化成本
- pCTR / pCVR:模型预估的点击率/转化率
- 校准因子:修正模型预估偏差,
calibration = 实际CVR / 预估CVR
- 成本控制因子:根据实际 CPA 与目标 CPA 的偏差调整出价
1.2 核心挑战
| 挑战 |
说明 |
| 时效性 |
数据越新,调价越精准;但实时处理成本高 |
| 准确性 |
需要归因服务确定转化归属,反作弊过滤虚假转化 |
| 稳定性 |
高 QPS 场景下,数据系统必须高可用 |
| 多窗口 |
需要同时维护 2h/24h/7d 等多个时间窗口的指标 |
1.3 优化目标
将数据上线时间从 10 分钟级提升到 1 分钟级,提升调价引擎的响应速度和准确性。
二、调价数据流全景
2.1 端到端数据流
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| ┌─────────────────────────────────────────────────────────────────────────────┐ │ 广告调价数据流 │ │ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ 曝光/点击 │ │ 转化回传 │ │ 归因服务 │ │ 反作弊过滤 │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ │ └───────────────┴───────────────┴───────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │ │ 消息队列 (Kafka/Tdbank) │ │ │ └─────────────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │ │ 流处理引擎 (Spark/Flink) │ │ │ │ 聚合计算 + 多时间窗口 │ │ │ └─────────────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │ │ 特征存储 (Redis/TPG) │ │ │ │ 多时间窗口聚合指标 │ │ │ └─────────────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │ │ 调价引擎 (Bidding Engine) │ │ │ │ 计算校准因子 + 成本控制 │ │ │ └─────────────────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘
|
2.2 关键上游服务
归因服务 (Attribution Service)
核心问题:一个转化应该归功于哪次广告曝光/点击?
常见归因模型:
- Last Click:归因到最后一次点击
- Last Touch:点击优先于曝光,归因到最后一次触达
- Time Decay:时间衰减加权
反作弊过滤 (Anti-Fraud Filter)
核心问题:过滤虚假转化,防止脏数据污染调价
检测维度:
- 设备维度:模拟器、Root设备、设备农场
- 行为维度:点击到安装时间异常短(点击注入)
- 网络维度:机房IP、代理IP
三、调价所需数据
3.1 实际效果数据(归因+反作弊后)
| 指标 |
说明 |
来源 |
| Impressions |
有效曝光数 |
曝光日志 |
| Clicks |
有效点击数 |
点击日志 |
| Conversions |
有效转化数 |
转化日志(归因后) |
| Cost |
实际消耗 |
计费系统 |
| CTR |
点击率 = Click/Impression |
计算 |
| CVR |
转化率 = Conversion/Click |
计算 |
| CPA |
转化成本 = Cost/Conversion |
计算 |
3.2 模型预估数据(用于计算校准因子)
| 指标 |
说明 |
采集点 |
| Σ pCTR |
每次曝光时的 pCTR 累加 |
曝光时记录 |
| Σ pCVR |
每次曝光时的 pCVR 累加 |
曝光时记录 |
| Σ pCVR|click |
每次点击时的 pCVR 累加 |
点击时记录 |
3.3 校准因子计算
1 2
| CTR 校准因子 = 实际点击数 / Σ pCTR CVR 校准因子 = 实际转化数 / Σ pCVR
|
四、多时间窗口处理策略
4.1 为什么需要多时间窗口?
| 窗口 |
样本量 |
时效性 |
适用场景 |
| 2h |
少 |
高 |
捕捉突发变化(素材衰退、竞争加剧) |
| 24h |
中 |
中 |
日常调价主力,平衡稳定性和响应速度 |
| 7d/10d |
多 |
低 |
提供基准线,防止短期波动误判 |
4.2 窗口选择策略
1 2 3 4 5 6 7 8 9 10 11
| func GetCalibration(adID string) *CalibrationData { for _, window := range []string{"2h", "24h", "7d"} { data := queryData(adID, window) if data.Clicks >= minClickThreshold[window] && data.Conversions >= minConvThreshold[window] { return calculateCalibration(data) } } return defaultCalibration }
|
4.3 多窗口融合策略
方案1:样本量加权
1 2
| weightedCVR = (cvr_2h × clicks_2h + cvr_24h × clicks_24h × 0.8 + cvr_7d × clicks_7d × 0.5) / (clicks_2h + clicks_24h × 0.8 + clicks_7d × 0.5)
|
方案2:置信度阈值切换
- 2h 样本 > 100 clicks → 用 2h 窗口
- 否则用 24h 窗口
- 24h 也不足 → 用 7d 窗口
4.4 分层计算策略(进阶优化)
不同时间窗口对时效性要求不同,可以采用分层并行计算,平衡计算成本和数据新鲜度:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| ┌─────────────────────────────────────────────────────────────────────────────┐ │ 分层计算架构 │ │ │ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │ │ 实时层 (Real-time) │ │ │ │ │ │ │ │ 窗口: 2h │ │ │ │ 计算: 内存实时聚合 (Flink/Spark Streaming) │ │ │ │ 更新: 秒级/分钟级 │ │ │ │ 存储: Redis │ │ │ │ │ │ │ │ 适用: 对时效性要求高,需要快速响应效果变化 │ │ │ └─────────────────────────────────────────────────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │ │ 近实时层 (Near Real-time) │ │ │ │ │ │ │ │ 窗口: 24h │ │ │ │ 计算: 离线任务 (Spark Batch) │ │ │ │ 更新: 小时级 (每小时跑一次) │ │ │ │ 存储: Redis / HBase │ │ │ │ │ │ │ │ 适用: 作为调价主力,1小时延迟可接受 │ │ │ └─────────────────────────────────────────────────────────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────────────────────────────┐ │ │ │ 离线层 (Batch) │ │ │ │ │ │ │ │ 窗口: 7d / 10d │ │ │ │ 计算: 离线任务 (Hive/Spark) │ │ │ │ 更新: 天级 (每天凌晨跑一次) │ │ │ │ 存储: HBase / MySQL │ │ │ │ │ │ │ │ 适用: 提供基准线,天级延迟完全可接受 │ │ │ └─────────────────────────────────────────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────────┘
|
分层计算的优势
| 优势 |
说明 |
| 资源高效 |
只有短窗口需要实时计算,长窗口用离线资源 |
| 成本优化 |
实时计算资源贵,离线资源便宜 |
| 并行处理 |
三层独立运行,互不阻塞 |
| 容错隔离 |
实时层故障不影响离线层,反之亦然 |
实现示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| type MultiLayerMetricsStore struct { realtime *RedisClient nearRT *RedisClient batch *HBaseClient }
func (s *MultiLayerMetricsStore) GetMetrics(adID string) *MultiWindowMetrics { return &MultiWindowMetrics{ Hour2: s.realtime.Get(adID), Hour24: s.nearRT.Get(adID), Day7: s.batch.Get(adID), } }
|
调度配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| realtime_job: window: 2h trigger: continuous sink: redis
nearrt_job: window: 24h schedule: "0 * * * *" source: hive sink: redis
batch_job: window: 7d schedule: "0 3 * * *" source: hive sink: hbase
|
数据流示意
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| 实时数据流 │ ┌───────────────┼───────────────┐ ▼ ▼ ▼ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ Kafka │ │ Kafka │ │ Hive │ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │ ▼ │ │ ┌─────────┐ │ │ │ Flink │ │ │ │ (常驻) │ │ │ └────┬────┘ │ │ │ ▼ ▼ │ ┌─────────┐ ┌─────────┐ │ │ Spark │ │ Spark │ │ │ (每小时) │ │ (每天) │ │ └────┬────┘ └────┬────┘ │ │ │ ▼ ▼ ▼ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ Redis │ │ Redis │ │ HBase │ │ (2h) │ │ (24h) │ │ (7d) │ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │ └───────────────┼───────────────┘ │ ▼ ┌───────────┐ │ 调价引擎 │ │ 合并查询 │ └───────────┘
|
与 V2 内存方案的对比
| 对比项 |
V2 全内存方案 |
分层计算方案 |
| 2h 窗口 |
内存计算 |
内存计算 |
| 24h 窗口 |
内存计算 |
离线小时级 |
| 7d 窗口 |
内存计算 |
离线天级 |
| 内存压力 |
高 (存全量) |
低 (只存短窗口) |
| 计算成本 |
高 |
低 |
| 延迟 |
全部分钟级 |
2h分钟级/24h小时级/7d天级 |
| 适用场景 |
全窗口高时效性要求 |
短窗口高时效,长窗口可接受延迟 |
选择建议:如果业务对 24h/7d 窗口的时效性要求不高(小时/天级延迟可接受),分层计算方案更经济;如果全部窗口都需要分钟级时效,则选择 V2 全内存方案。
五、V1 方案:查询时计算
5.1 架构
1 2 3 4 5 6 7
| ┌─────────────────────────────────────────────────────────────────────────────┐ │ V1 架构 │ │ │ │ Tdbank ──► SparkStreaming ──► TPG ──► AdProfile ──► SnsMixer ──► 出价 │ │ (1min) (存储) (8min) (5min) │ │ │ └─────────────────────────────────────────────────────────────────────────────┘
|
5.2 各组件职责
| 组件 |
职责 |
延迟 |
| SparkStreaming |
处理 Tdbank 3路数据流,聚合到 aid 维度,增量写入 TPG |
1 min |
| TPG |
存储 oCPM 广告全量明细数据 |
- |
| AdProfile |
每分钟执行多个 SQL,生成多时间窗口指标 |
8 min |
| SnsMixer |
缓存 AdProfile 数据,响应出价引擎查询 |
5 min TTL |
5.3 AdProfile 处理逻辑
1 2 3 4 5 6 7 8
| def refreshMetrics() { query10DayMetrics() query24HourMetrics() query2HourMetrics() queryFlowback() }
|
5.4 延迟分析
| 阶段 |
延迟 |
原因 |
| SparkStreaming |
1 min |
微批处理 batch interval |
| AdProfile SQL |
8 min |
串行执行,全量扫描 TPG |
| SnsMixer 缓存 |
0-5 min |
TTL 导致数据滞后 |
| 总计 |
10-14 min |
- |
5.5 核心问题
- 串行 SQL:4 个 SQL 串行执行,最大瓶颈
- 全量扫描:每次查询都扫描全量数据,无增量计算
- 多层缓存:SnsMixer 5min TTL 额外增加延迟
六、V2 方案:写入时计算
6.1 核心思路
把”查询时聚合”变成”写入时聚合”,用空间换时间
6.2 架构
1 2 3 4 5 6 7 8 9 10 11
| ┌─────────────────────────────────────────────────────────────────────────────┐ │ V2 架构 │ │ │ │ Tdbank ──► SparkStreaming ──► 内存全量计算 ──► 实时指标 ──► 出价引擎 │ │ (1min) (20s更新) (毫秒级) │ │ │ │ │ ▼ │ │ TPG │ │ (持久化+容错) │ │ │ └─────────────────────────────────────────────────────────────────────────────┘
|
6.3 核心组件设计
主进程 (SparkStreaming)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| readParams()
new processor configSparkStreaming() accu_class.init() all_class.init()
foreachRdd { data = getRealData() accu_class.merge(data) all_class.merge(data) index = all_class.getIndex() }
|
accu_class:增量数据 + 持久化
1 2 3 4 5 6 7 8 9 10 11
| val map
def init { new thread { writeToTpg() checkNeedRestore() thread.sleep(N分钟) } }
def merge()
|
all_class:全量数据 + 实时指标计算
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| val map val index
def init { restoreData() new thread { calculateIndex() removeOldData() writeCheckpoint() thread.sleep(20s) } }
def getIndex() def merge()
|
6.4 为什么快了?
| 优化点 |
V1 |
V2 |
效果 |
| 数据存储 |
TPG (磁盘) |
内存 map |
查询:秒级 → 毫秒级 |
| 计算方式 |
每次 SQL 全量扫描 |
增量 merge + 后台计算 |
8分钟 → 20秒 |
| 指标更新 |
串行阻塞 |
后台线程异步 |
非阻塞 |
| 缓存层 |
5min TTL |
无需额外缓存 |
减少 0-5min |
6.5 容错机制
- checkpoint:定期将内存数据写入 TPG
- restore:故障重启时从 TPG 恢复全量数据
- 离线对账:定期比对实时数据与离线数据,不一致时修复
七、业界方案对比
7.1 主流架构
| 公司 |
技术栈 |
延迟 |
| Meta |
Scribe → Spark/Flink → TAO |
秒级 |
| Google |
Pub/Sub → Dataflow → Bigtable |
秒级 |
| 字节 |
BMQ → Flink → Abase |
秒级 |
| 腾讯广告 V1 |
Tdbank → Spark → TPG → AdProfile |
10分钟级 |
| 腾讯广告 V2 |
Tdbank → Spark + 内存计算 |
1分钟级 |
7.2 Lambda vs Kappa 架构
| 架构 |
特点 |
适用场景 |
| Lambda |
批流分离,离线修正实时 |
数据一致性要求高 |
| Kappa |
纯流处理,Kafka 作为 source of truth |
追求低延迟 |
| V2 方案 |
类 Lambda,实时计算 + 离线对账 |
平衡延迟和一致性 |
八、最终效果
8.1 性能提升
| 指标 |
V1 |
V2 |
提升 |
| 数据上线延迟 |
10-14 分钟 |
< 1 分钟 |
10x |
| 指标计算频率 |
1 分钟/次 |
20 秒/次 |
3x |
| 查询响应时间 |
秒级 |
毫秒级 |
1000x |
8.2 业务收益
- 调价更精准:数据更新更快,校准因子更准确
- 成本控制更及时:快速响应超成本情况,压低出价
- 广告主 ROI 提升:首批上线的 15 个大广告主均有 10%+ ROI 提升
8.3 系统稳定性
- 容错能力:checkpoint + restore 机制保障数据不丢失
- 可运维性:离线对账机制,发现问题可快速修复
- 可扩展性:内存计算模式可水平扩展
九、面试要点总结
9.1 一句话概括
通过将”查询时聚合”变成”写入时聚合”,在内存中维护全量数据并增量更新,将数据上线延迟从 10 分钟优化到 1 分钟。
9.2 核心优化思路
- 计算前置:把聚合计算从查询时移到写入时
- 空间换时间:内存存储全量数据,避免磁盘 IO
- 增量计算:新数据 merge,不重复扫描历史数据
- 异步处理:后台线程计算指标,不阻塞主流程
9.3 延伸问题
Q: 内存不够怎么办?
A: 按广告 ID 分片,每个节点只存部分数据;冷数据落盘或淘汰。
Q: 如何保证数据一致性?
A: checkpoint 持久化 + 离线对账修复。
Q: 时间窗口怎么选择?
A: 优先短窗口(时效性高),样本不足时回退到长窗口;或样本量加权融合。
Q: 校准因子的作用?
A: 修正模型预估偏差。模型预估 CVR=3%,实际 CVR=2.4%,校准因子=0.8,压低出价。
十、参考资料
- 腾讯广告实时数据系统设计文档
- Flink 官方文档 - State Management
- 《广告算法》- 刘鹏
- Meta Engineering Blog - Real-time Data Infrastructure