广告实时调价数据流优化:从10分钟到1分钟

广告实时调价数据流优化

一、背景

1.1 业务场景

在 oCPM(Optimized Cost Per Mille)智能出价场景中,广告系统需要根据实时转化效果动态调整出价。核心公式为:

1
eCPM = Target_CPA × pCTR × pCVR × 校准因子 × 成本控制因子 × 1000

其中:

  • Target_CPA:广告主设定的目标转化成本
  • pCTR / pCVR:模型预估的点击率/转化率
  • 校准因子:修正模型预估偏差,calibration = 实际CVR / 预估CVR
  • 成本控制因子:根据实际 CPA 与目标 CPA 的偏差调整出价

1.2 核心挑战

挑战 说明
时效性 数据越新,调价越精准;但实时处理成本高
准确性 需要归因服务确定转化归属,反作弊过滤虚假转化
稳定性 高 QPS 场景下,数据系统必须高可用
多窗口 需要同时维护 2h/24h/7d 等多个时间窗口的指标

1.3 优化目标

将数据上线时间从 10 分钟级提升到 1 分钟级,提升调价引擎的响应速度和准确性。


二、调价数据流全景

2.1 端到端数据流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
┌─────────────────────────────────────────────────────────────────────────────┐
│ 广告调价数据流 │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 曝光/点击 │ │ 转化回传 │ │ 归因服务 │ │ 反作弊过滤 │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │
│ └───────────────┴───────────────┴───────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 消息队列 (Kafka/Tdbank) │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 流处理引擎 (Spark/Flink) │ │
│ │ 聚合计算 + 多时间窗口 │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 特征存储 (Redis/TPG) │ │
│ │ 多时间窗口聚合指标 │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 调价引擎 (Bidding Engine) │ │
│ │ 计算校准因子 + 成本控制 │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

2.2 关键上游服务

归因服务 (Attribution Service)

核心问题:一个转化应该归功于哪次广告曝光/点击?

常见归因模型:

  • Last Click:归因到最后一次点击
  • Last Touch:点击优先于曝光,归因到最后一次触达
  • Time Decay:时间衰减加权

反作弊过滤 (Anti-Fraud Filter)

核心问题:过滤虚假转化,防止脏数据污染调价

检测维度:

  • 设备维度:模拟器、Root设备、设备农场
  • 行为维度:点击到安装时间异常短(点击注入)
  • 网络维度:机房IP、代理IP

三、调价所需数据

3.1 实际效果数据(归因+反作弊后)

指标 说明 来源
Impressions 有效曝光数 曝光日志
Clicks 有效点击数 点击日志
Conversions 有效转化数 转化日志(归因后)
Cost 实际消耗 计费系统
CTR 点击率 = Click/Impression 计算
CVR 转化率 = Conversion/Click 计算
CPA 转化成本 = Cost/Conversion 计算

3.2 模型预估数据(用于计算校准因子)

指标 说明 采集点
Σ pCTR 每次曝光时的 pCTR 累加 曝光时记录
Σ pCVR 每次曝光时的 pCVR 累加 曝光时记录
Σ pCVR|click 每次点击时的 pCVR 累加 点击时记录

3.3 校准因子计算

1
2
CTR 校准因子 = 实际点击数 / Σ pCTR
CVR 校准因子 = 实际转化数 / Σ pCVR

四、多时间窗口处理策略

4.1 为什么需要多时间窗口?

窗口 样本量 时效性 适用场景
2h 捕捉突发变化(素材衰退、竞争加剧)
24h 日常调价主力,平衡稳定性和响应速度
7d/10d 提供基准线,防止短期波动误判

4.2 窗口选择策略

1
2
3
4
5
6
7
8
9
10
11
func GetCalibration(adID string) *CalibrationData {
// 优先用短窗口(时效性高),样本不足时回退到长窗口
for _, window := range []string{"2h", "24h", "7d"} {
data := queryData(adID, window)
if data.Clicks >= minClickThreshold[window] &&
data.Conversions >= minConvThreshold[window] {
return calculateCalibration(data)
}
}
return defaultCalibration // 样本不足,不校准
}

4.3 多窗口融合策略

方案1:样本量加权

1
2
weightedCVR = (cvr_2h × clicks_2h + cvr_24h × clicks_24h × 0.8 + cvr_7d × clicks_7d × 0.5) 
/ (clicks_2h + clicks_24h × 0.8 + clicks_7d × 0.5)

方案2:置信度阈值切换

  • 2h 样本 > 100 clicks → 用 2h 窗口
  • 否则用 24h 窗口
  • 24h 也不足 → 用 7d 窗口

4.4 分层计算策略(进阶优化)

不同时间窗口对时效性要求不同,可以采用分层并行计算,平衡计算成本和数据新鲜度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
┌─────────────────────────────────────────────────────────────────────────────┐
│ 分层计算架构 │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 实时层 (Real-time) │ │
│ │ │ │
│ │ 窗口: 2h │ │
│ │ 计算: 内存实时聚合 (Flink/Spark Streaming) │ │
│ │ 更新: 秒级/分钟级 │ │
│ │ 存储: Redis │ │
│ │ │ │
│ │ 适用: 对时效性要求高,需要快速响应效果变化 │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 近实时层 (Near Real-time) │ │
│ │ │ │
│ │ 窗口: 24h │ │
│ │ 计算: 离线任务 (Spark Batch) │ │
│ │ 更新: 小时级 (每小时跑一次) │ │
│ │ 存储: Redis / HBase │ │
│ │ │ │
│ │ 适用: 作为调价主力,1小时延迟可接受 │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 离线层 (Batch) │ │
│ │ │ │
│ │ 窗口: 7d / 10d │ │
│ │ 计算: 离线任务 (Hive/Spark) │ │
│ │ 更新: 天级 (每天凌晨跑一次) │ │
│ │ 存储: HBase / MySQL │ │
│ │ │ │
│ │ 适用: 提供基准线,天级延迟完全可接受 │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

分层计算的优势

优势 说明
资源高效 只有短窗口需要实时计算,长窗口用离线资源
成本优化 实时计算资源贵,离线资源便宜
并行处理 三层独立运行,互不阻塞
容错隔离 实时层故障不影响离线层,反之亦然

实现示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 分层数据源
type MultiLayerMetricsStore struct {
realtime *RedisClient // 2h 窗口,实时更新
nearRT *RedisClient // 24h 窗口,小时更新
batch *HBaseClient // 7d 窗口,天级更新
}

// 查询时合并三层数据
func (s *MultiLayerMetricsStore) GetMetrics(adID string) *MultiWindowMetrics {
return &MultiWindowMetrics{
Hour2: s.realtime.Get(adID), // 实时层
Hour24: s.nearRT.Get(adID), // 近实时层
Day7: s.batch.Get(adID), // 离线层
}
}

调度配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 实时层 - Flink Job (常驻)
realtime_job:
window: 2h
trigger: continuous
sink: redis

# 近实时层 - Spark 定时任务
nearrt_job:
window: 24h
schedule: "0 * * * *" # 每小时执行
source: hive
sink: redis

# 离线层 - Hive 定时任务
batch_job:
window: 7d
schedule: "0 3 * * *" # 每天凌晨3点执行
source: hive
sink: hbase

数据流示意

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
                 实时数据流

┌───────────────┼───────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Kafka │ │ Kafka │ │ Hive │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
▼ │ │
┌─────────┐ │ │
│ Flink │ │ │
│ (常驻) │ │ │
└────┬────┘ │ │
│ ▼ ▼
│ ┌─────────┐ ┌─────────┐
│ │ Spark │ │ Spark │
│ │ (每小时) │ │ (每天) │
│ └────┬────┘ └────┬────┘
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Redis │ │ Redis │ │ HBase │
│ (2h) │ │ (24h) │ │ (7d) │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└───────────────┼───────────────┘


┌───────────┐
│ 调价引擎 │
│ 合并查询 │
└───────────┘

与 V2 内存方案的对比

对比项 V2 全内存方案 分层计算方案
2h 窗口 内存计算 内存计算
24h 窗口 内存计算 离线小时级
7d 窗口 内存计算 离线天级
内存压力 高 (存全量) 低 (只存短窗口)
计算成本
延迟 全部分钟级 2h分钟级/24h小时级/7d天级
适用场景 全窗口高时效性要求 短窗口高时效,长窗口可接受延迟

选择建议:如果业务对 24h/7d 窗口的时效性要求不高(小时/天级延迟可接受),分层计算方案更经济;如果全部窗口都需要分钟级时效,则选择 V2 全内存方案。


五、V1 方案:查询时计算

5.1 架构

1
2
3
4
5
6
7
┌─────────────────────────────────────────────────────────────────────────────┐
│ V1 架构 │
│ │
│ Tdbank ──► SparkStreaming ──► TPG ──► AdProfile ──► SnsMixer ──► 出价 │
│ (1min) (存储) (8min) (5min) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

5.2 各组件职责

组件 职责 延迟
SparkStreaming 处理 Tdbank 3路数据流,聚合到 aid 维度,增量写入 TPG 1 min
TPG 存储 oCPM 广告全量明细数据 -
AdProfile 每分钟执行多个 SQL,生成多时间窗口指标 8 min
SnsMixer 缓存 AdProfile 数据,响应出价引擎查询 5 min TTL

5.3 AdProfile 处理逻辑

1
2
3
4
5
6
7
8
// 串行执行多个 SQL
def refreshMetrics() {
query10DayMetrics() // 2 min - 全量扫描 TPG
query24HourMetrics() // 2 min
query2HourMetrics() // 1.5 min
queryFlowback() // 2.5 min
// 总计 8 min
}

5.4 延迟分析

阶段 延迟 原因
SparkStreaming 1 min 微批处理 batch interval
AdProfile SQL 8 min 串行执行,全量扫描 TPG
SnsMixer 缓存 0-5 min TTL 导致数据滞后
总计 10-14 min -

5.5 核心问题

  1. 串行 SQL:4 个 SQL 串行执行,最大瓶颈
  2. 全量扫描:每次查询都扫描全量数据,无增量计算
  3. 多层缓存:SnsMixer 5min TTL 额外增加延迟

六、V2 方案:写入时计算

6.1 核心思路

把”查询时聚合”变成”写入时聚合”,用空间换时间

6.2 架构

1
2
3
4
5
6
7
8
9
10
11
┌─────────────────────────────────────────────────────────────────────────────┐
│ V2 架构 │
│ │
│ Tdbank ──► SparkStreaming ──► 内存全量计算 ──► 实时指标 ──► 出价引擎 │
│ (1min) (20s更新) (毫秒级) │
│ │ │
│ ▼ │
│ TPG │
│ (持久化+容错) │
│ │
└─────────────────────────────────────────────────────────────────────────────┘

6.3 核心组件设计

主进程 (SparkStreaming)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 读取配置
readParams()

// 初始化
new processor // 自定义实时数据解析
configSparkStreaming() // 配置 SparkStreaming
accu_class.init() // 初始化增量数据线程
all_class.init() // 初始化全量数据线程

// 订阅数据
foreachRdd {
data = getRealData() // 获取最近1min实时数据
accu_class.merge(data) // 增量数据合并
all_class.merge(data) // 全局数据合并
index = all_class.getIndex() // 获取实时指标
}

accu_class:增量数据 + 持久化

1
2
3
4
5
6
7
8
9
10
11
val map  // 存储增量数据(内存)

def init {
new thread {
writeToTpg() // 数据写入 TPG,同时清空 map
checkNeedRestore() // 需要时基于 TPG 修复数据
thread.sleep(N分钟)
}
}

def merge() // 将最近 1min 数据归并入 map

all_class:全量数据 + 实时指标计算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
val map    // 存储全量数据(内存)
val index // 存储最新的实时指标

def init {
restoreData() // 启动时从 TPG 恢复全量数据

new thread {
calculateIndex() // 基于全量数据计算实时指标
removeOldData() // 清除过期数据
writeCheckpoint() // 可选:写入 checkpoint
thread.sleep(20s) // 20秒计算一次
}
}

def getIndex() // 返回最新的 index
def merge() // 将最近 1min 数据归并入 map

6.4 为什么快了?

优化点 V1 V2 效果
数据存储 TPG (磁盘) 内存 map 查询:秒级 → 毫秒级
计算方式 每次 SQL 全量扫描 增量 merge + 后台计算 8分钟 → 20秒
指标更新 串行阻塞 后台线程异步 非阻塞
缓存层 5min TTL 无需额外缓存 减少 0-5min

6.5 容错机制

  1. checkpoint:定期将内存数据写入 TPG
  2. restore:故障重启时从 TPG 恢复全量数据
  3. 离线对账:定期比对实时数据与离线数据,不一致时修复

七、业界方案对比

7.1 主流架构

公司 技术栈 延迟
Meta Scribe → Spark/Flink → TAO 秒级
Google Pub/Sub → Dataflow → Bigtable 秒级
字节 BMQ → Flink → Abase 秒级
腾讯广告 V1 Tdbank → Spark → TPG → AdProfile 10分钟级
腾讯广告 V2 Tdbank → Spark + 内存计算 1分钟级

7.2 Lambda vs Kappa 架构

架构 特点 适用场景
Lambda 批流分离,离线修正实时 数据一致性要求高
Kappa 纯流处理,Kafka 作为 source of truth 追求低延迟
V2 方案 类 Lambda,实时计算 + 离线对账 平衡延迟和一致性

八、最终效果

8.1 性能提升

指标 V1 V2 提升
数据上线延迟 10-14 分钟 < 1 分钟 10x
指标计算频率 1 分钟/次 20 秒/次 3x
查询响应时间 秒级 毫秒级 1000x

8.2 业务收益

  1. 调价更精准:数据更新更快,校准因子更准确
  2. 成本控制更及时:快速响应超成本情况,压低出价
  3. 广告主 ROI 提升:首批上线的 15 个大广告主均有 10%+ ROI 提升

8.3 系统稳定性

  1. 容错能力:checkpoint + restore 机制保障数据不丢失
  2. 可运维性:离线对账机制,发现问题可快速修复
  3. 可扩展性:内存计算模式可水平扩展

九、面试要点总结

9.1 一句话概括

通过将”查询时聚合”变成”写入时聚合”,在内存中维护全量数据并增量更新,将数据上线延迟从 10 分钟优化到 1 分钟。

9.2 核心优化思路

  1. 计算前置:把聚合计算从查询时移到写入时
  2. 空间换时间:内存存储全量数据,避免磁盘 IO
  3. 增量计算:新数据 merge,不重复扫描历史数据
  4. 异步处理:后台线程计算指标,不阻塞主流程

9.3 延伸问题

Q: 内存不够怎么办?
A: 按广告 ID 分片,每个节点只存部分数据;冷数据落盘或淘汰。

Q: 如何保证数据一致性?
A: checkpoint 持久化 + 离线对账修复。

Q: 时间窗口怎么选择?
A: 优先短窗口(时效性高),样本不足时回退到长窗口;或样本量加权融合。

Q: 校准因子的作用?
A: 修正模型预估偏差。模型预估 CVR=3%,实际 CVR=2.4%,校准因子=0.8,压低出价。


十、参考资料

  1. 腾讯广告实时数据系统设计文档
  2. Flink 官方文档 - State Management
  3. 《广告算法》- 刘鹏
  4. Meta Engineering Blog - Real-time Data Infrastructure