Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

附录F 供应商数据同步链路

1. 背景

数字商品平台需要从外部供应商同步供给数据。本方案讨论的是一条通用的供应商数据同步链路,并以酒店供给全量同步为例展开。酒店数据规模大、结构复杂、变化频率不一致:酒店名称、地址、设施、图片等静态信息变化较慢;房型、套餐、最低价、可售状态等半动态信息需要更高频刷新;下单前房态房价必须实时确认。

本设计聚焦一个典型任务:

通过遍历所有城市,从供应商拉取酒店信息
酒店规模约 100 万
任务预计运行 10 小时
需要支持断点续跑、失败补偿、数据追溯和质量监控

这类任务不能只依赖进程内状态做一个长循环。第一阶段更推荐设计成 Batch + Checkpoint + DLQ 的可恢复流水线:任务可以按城市和分页顺序遍历,进度持久化在数据库里,失败后从 checkpoint 继续。任务分片和分布式 Worker 抢占可以作为后续优化项目,而不是一开始就进入主链路。

2. 设计目标

  1. 可恢复:任务中断后可以从 checkpoint 继续,不从头重跑。
  2. 可追溯:保存供应商原始数据 Raw Snapshot,支持问题排查和回放。
  3. 可治理:通过标准化、质量校验、Diff、版本控制,避免错误数据污染平台模型。
  4. 可补偿:失败数据进入 DLQ,支持自动重试、人工修复和重新投递。
  5. 可观测:实时查看任务进度、失败原因、供应商质量和业务影响指标。
  6. 不影响交易安全:列表页可缓存,详情页更接近实时,创单前必须实时确认。

3. 核心难点

难点说明设计策略
任务时间长100 万酒店跑 10 小时,中途失败概率高Batch + Page/Cursor Checkpoint
数据量大全量同步可能包含酒店、房型、图片、设施等大 payloadRaw Snapshot 存引用,主表保持轻量
供应商不稳定超时、限流、5xx、分页游标失效限流、熔断、指数退避、DLQ
模型不一致供应商酒店/房型/套餐与平台 Resource/SPU/SKU/Offer 不一致标准化映射 + supplier mapping
数据质量不稳定字段缺失、城市映射失败、价格异常、坐标漂移分层质量校验 + 部分成功
发布风险同步成功不代表可以发布sync version、snapshot version、publish version 分离
下游一致性DB 更新成功但 ES、缓存、事件可能失败Outbox + 索引补偿

4. 总体架构

Full Sync Task
  → Sync Batch
  → Page Fetch
  → Raw Snapshot
  → Normalize
  → Quality Check
  → Resource Mapping
  → Diff
  → Publish
  → Search / Cache / Downstream Event
  → Metrics / DLQ / Compensation

架构图见:

供应商数据同步链路架构图

Data Flow Diagram 见:

供应商数据同步 Data Flow Diagram

图文件:

  • ecommerce-book/images/supplier-sync-architecture.png
  • ecommerce-book/images/supplier-sync-architecture.svg
  • ecommerce-book/images/supplier-sync-data-flow.png
  • ecommerce-book/images/supplier-sync-data-flow.svg

5. 任务模型

5.1 Task:同步任务定义

supplier_sync_task 描述“要同步什么、怎么同步、多久同步一次”。

CREATE TABLE supplier_sync_task (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    task_code VARCHAR(64) NOT NULL,
    supplier_id BIGINT NOT NULL,
    category_code VARCHAR(32) NOT NULL,
    sync_mode VARCHAR(32) NOT NULL COMMENT 'FULL/INCREMENTAL/PUSH/REFRESH',
    data_scope VARCHAR(64) NOT NULL COMMENT 'RESOURCE/PRODUCT/OFFER/STOCK_PRICE',
    schedule_type VARCHAR(32) NOT NULL COMMENT 'CRON/MANUAL/PUSH',
    cron_expr VARCHAR(64) DEFAULT NULL,
    status VARCHAR(32) NOT NULL COMMENT 'ENABLED/DISABLED',
    concurrency_policy VARCHAR(32) NOT NULL DEFAULT 'SKIP_IF_RUNNING'
        COMMENT 'SKIP_IF_RUNNING/CANCEL_PREVIOUS/ALLOW_PARALLEL',
    last_batch_id VARCHAR(64) DEFAULT NULL,
    owner_team VARCHAR(64) DEFAULT NULL,
    created_at DATETIME NOT NULL,
    updated_at DATETIME NOT NULL,
    UNIQUE KEY uk_task_code (task_code),
    KEY idx_supplier_category (supplier_id, category_code)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='供应商同步任务定义';

样例:

task_code: hotel_supplier_full_resource
supplier_id: 1001
category_code: HOTEL
sync_mode: FULL
data_scope: RESOURCE
schedule_type: MANUAL
status: ENABLED
concurrency_policy: SKIP_IF_RUNNING
owner_team: product-sync

5.2 Batch:一次任务执行批次

supplier_sync_batch 记录一次任务执行的状态、水位、统计和版本。

CREATE TABLE supplier_sync_batch (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    batch_id VARCHAR(64) NOT NULL,
    task_code VARCHAR(64) NOT NULL,
    trigger_source VARCHAR(32) NOT NULL COMMENT 'CRON/MANUAL/COMPENSATION',
    trigger_id VARCHAR(64) DEFAULT NULL COMMENT '外部触发幂等 ID',
    supplier_id BIGINT NOT NULL,
    category_code VARCHAR(32) NOT NULL,
    sync_mode VARCHAR(32) NOT NULL,
    data_scope VARCHAR(64) NOT NULL,
    status VARCHAR(32) NOT NULL COMMENT 'PENDING/RUNNING/SUCCESS/PARTIAL_FAILED/FAILED/CANCELLED',
    sync_batch_version BIGINT NOT NULL,
    start_checkpoint VARCHAR(512) DEFAULT NULL,
    end_checkpoint VARCHAR(512) DEFAULT NULL,
    total_count INT NOT NULL DEFAULT 0,
    success_count INT NOT NULL DEFAULT 0,
    failed_count INT NOT NULL DEFAULT 0,
    skipped_count INT NOT NULL DEFAULT 0,
    current_city_code VARCHAR(64) DEFAULT NULL,
    current_page INT DEFAULT NULL,
    progress_percent DECIMAL(5,2) NOT NULL DEFAULT 0.00,
    worker_id VARCHAR(64) DEFAULT NULL,
    lease_token VARCHAR(64) DEFAULT NULL,
    lease_until DATETIME DEFAULT NULL,
    heartbeat_at DATETIME DEFAULT NULL,
    last_heartbeat_stage VARCHAR(64) DEFAULT NULL,
    last_heartbeat_message VARCHAR(512) DEFAULT NULL,
    last_checkpoint_at DATETIME DEFAULT NULL,
    created_at DATETIME NOT NULL,
    started_at DATETIME DEFAULT NULL,
    finished_at DATETIME DEFAULT NULL,
    updated_at DATETIME NOT NULL,
    error_message VARCHAR(1024) DEFAULT NULL,
    UNIQUE KEY uk_batch_id (batch_id),
    UNIQUE KEY uk_task_trigger (task_code, trigger_id),
    KEY idx_task_status (task_code, status),
    KEY idx_status_lease (status, lease_until),
    KEY idx_supplier_time (supplier_id, started_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='供应商同步批次';

样例:

batch_id: batch_20260427_hotel_full_001
task_code: hotel_supplier_full_resource
trigger_source: MANUAL
trigger_id: req_20260427_0001
supplier_id: 1001
category_code: HOTEL
sync_mode: FULL
data_scope: RESOURCE
status: RUNNING
sync_batch_version: 202604270001
total_count: 1000000
success_count: 688200
failed_count: 320
skipped_count: 12000
current_city_code: BKK
current_page: 120
progress_percent: 68.82
worker_id: hotel-sync-worker-pod-a1b2c3-12345-20260427T103000Z
lease_token: 7f2d4c77-5d5b-4f1f-aeb0-74f7f21c6e2a
lease_until: 2026-04-27 10:35:00
heartbeat_at: 2026-04-27 10:30:00
last_heartbeat_stage: FETCHING
last_heartbeat_message: fetching city=BKK page=120
last_checkpoint_at: 2026-04-27 10:29:50

6. 任务创建、互斥与执行恢复

6.1 任务创建流程

一次同步任务通常由定时调度、运营手动触发或系统补偿触发。无论来源是什么,都不应该直接启动一个进程开始跑,而是先创建 batch,再由执行器领取 batch。

触发同步
  → 查询 supplier_sync_task
  → 检查任务是否 ENABLED
  → 检查 trigger_id 幂等
  → 检查互斥策略
  → 创建 supplier_sync_batch(status=PENDING)
  → 执行器抢占 batch
  → 执行同步

创建 batch 时要初始化:

字段说明
batch_id本次执行唯一 ID
trigger_source / trigger_id触发来源和外部请求幂等 ID
sync_batch_version本次同步批次版本
status初始为 PENDING
start_checkpoint本次任务起点,通常为空或上次成功水位
end_checkpoint当前进度,任务执行过程中不断推进
total_count预计处理数量,可先为空或估算
worker_id / lease_token执行器抢占后写入

任务创建也要做幂等。运营后台重复点击、调度器重试、网络超时后重发,都可能重复触发同一个任务。推荐由调用方传入 trigger_id,例如运营后台的 manual_request_id 或调度系统的 fire_id

同一个 task_code + trigger_id
  → 只允许创建一个 batch
  → 重复请求直接返回已存在 batch

如果是定时任务,可以用计划触发时间生成 trigger_id

trigger_id = hotel_supplier_full_resource:2026-04-27T02:00:00Z

6.2 上一次任务还没执行完怎么办

同一个供应商、同一个品类、同一个数据范围的全量任务,通常不应该同时跑多个,否则会造成供应商限流、重复写入、发布版本乱序和进度混乱。这里需要显式定义互斥策略。

策略含义适用场景
SKIP_IF_RUNNING如果已有运行中的 batch,新触发直接跳过定时全量同步、普通刷新
CANCEL_PREVIOUS取消旧 batch,启动新 batch人工修复后需要重新跑全量
ALLOW_PARALLEL允许并行,但必须保证数据范围不重叠不同城市、不同供应商、不同数据 scope

默认建议使用 SKIP_IF_RUNNING。创建 batch 前先检查:

SELECT batch_id, status, heartbeat_at, lease_until
FROM supplier_sync_batch
WHERE task_code = ?
  AND status IN ('PENDING', 'RUNNING')
ORDER BY created_at DESC
LIMIT 1;

如果存在未完成 batch:

concurrency_policy = SKIP_IF_RUNNING
  → 不创建新 batch,记录 SKIPPED 日志

concurrency_policy = CANCEL_PREVIOUS
  → 将旧 batch 标记 CANCELLED
  → 创建新 batch

concurrency_policy = ALLOW_PARALLEL
  → 检查数据范围是否重叠
  → 不重叠才允许创建

相关答辩提示已统一收录到附录B

6.3 Batch 抢占

即使第一阶段不做任务分片,也建议 batch 由执行器通过 CAS 抢占,避免多个进程同时执行同一个 batch。抢占不是“查出来再更新”,而是用一条带条件的 UPDATE 完成。

UPDATE supplier_sync_batch
SET status = 'RUNNING',
    worker_id = ?,
    lease_token = ?,
    lease_until = DATE_ADD(NOW(), INTERVAL 5 MINUTE),
    heartbeat_at = NOW(),
    last_heartbeat_stage = 'CLAIMED',
    last_heartbeat_message = 'batch claimed',
    started_at = IFNULL(started_at, NOW()),
    updated_at = NOW()
WHERE batch_id = ?
  AND status = 'PENDING';

rows_affected = 1 表示抢占成功;rows_affected = 0 表示已经被其他执行器抢走,当前 worker 必须放弃执行。

对于机器重启、进程 OOM、发布中断后遗留的 RUNNING batch,可以允许抢占 lease 已经过期的 batch:

UPDATE supplier_sync_batch
SET worker_id = ?,
    lease_token = ?,
    lease_until = DATE_ADD(NOW(), INTERVAL 5 MINUTE),
    heartbeat_at = NOW(),
    last_heartbeat_stage = 'RECLAIMED',
    last_heartbeat_message = 'expired batch reclaimed',
    updated_at = NOW()
WHERE batch_id = ?
  AND status = 'RUNNING'
  AND lease_until < NOW();

注意,这里只抢占“租约过期”的任务,不抢占“心跳正常”的任务。否则一个慢请求、一次 GC 或一次网络抖动都可能导致双 worker 写同一个 batch。

6.4 worker_idlease_token

worker_id 用来标识“哪个执行器实例在跑任务”,lease_token 用来标识“本次抢占的所有权”。两者要同时使用。

字段作用是否稳定
worker_id标识执行器实例,方便排查、日志关联和监控展示进程生命周期内稳定
lease_token标识一次抢占行为,防止旧 worker 恢复后覆盖新 worker每次抢占重新生成

worker_id 可以用“服务名 + 机器/容器名 + 进程号 + 启动时间”生成:

func GenerateWorkerID(serviceName string) string {
    host := os.Getenv("POD_NAME")
    if host == "" {
        host = os.Getenv("HOSTNAME")
    }
    if host == "" {
        host, _ = os.Hostname()
    }

    pid := os.Getpid()
    startedAt := time.Now().UTC().Format("20060102T150405Z")
    return fmt.Sprintf("%s-%s-%d-%s", serviceName, host, pid, startedAt)
}

示例:

worker_id   = hotel-sync-worker-pod-a1b2c3-12345-20260427T103000Z
lease_token = 7f2d4c77-5d5b-4f1f-aeb0-74f7f21c6e2a

为什么还需要 lease_token?因为容器名或机器名可能复用,旧进程在长 GC 后也可能恢复。只有 worker_id 不够严格;lease_token 能保证“只有当前这次抢占的持有者”才能续租、推进 checkpoint 和结束任务。

所有关键更新都必须带上三个条件:

WHERE batch_id = ?
  AND worker_id = ?
  AND lease_token = ?

如果更新影响行数为 0,要立即停止当前任务,并记录 LEASE_LOST 日志。

6.5 心跳与租约

长任务不能只依赖 status=RUNNING 判断是否还活着。机器重启、进程 OOM、发布重启都可能导致状态永远卡在 RUNNING。因此 batch 要同时有“租约”和“心跳”。

概念解决的问题典型字段
心跳 Heartbeatworker 是否还活着heartbeat_atlast_heartbeat_stage
租约 Lease当前谁拥有任务执行权worker_idlease_tokenlease_until
Checkpoint任务恢复时从哪里继续end_checkpointlast_checkpoint_at

执行器每 15 到 30 秒续租一次,租约建议设置为 2 到 5 分钟。心跳间隔要远小于租约时长,给短暂网络抖动留下余量。

UPDATE supplier_sync_batch
SET heartbeat_at = NOW(),
    lease_until = DATE_ADD(NOW(), INTERVAL 5 MINUTE),
    last_heartbeat_stage = ?,
    last_heartbeat_message = ?,
    updated_at = NOW()
WHERE batch_id = ?
  AND worker_id = ?
  AND lease_token = ?
  AND status = 'RUNNING';

心跳建议上报的不只是“我还活着”,还要包含当前阶段:

阶段含义示例 message
FETCHING正在请求供应商接口fetching city=BKK page=120
SNAPSHOT_SAVING正在保存 Raw Snapshotsaving raw snapshot page=120
NORMALIZING正在做字段标准化normalizing 100 hotels
VALIDATING正在做质量校验validating schema and city mapping
PUBLISHING正在发布平台模型publishing resource changes
CHECKPOINTING正在推进 checkpointcheckpoint to page=121

如果心跳更新失败:

rows_affected = 0
  → 当前 worker 不再拥有任务
  → 停止拉取供应商
  → 停止写平台表
  → 打印 LEASE_LOST 日志
  → 退出执行

这一步非常关键。不能因为“当前进程还活着”就继续跑,因为数据库里的执行权可能已经被新 worker 抢走。

6.6 心跳正常但 Checkpoint 不动怎么办

心跳和 checkpoint 是两个维度。心跳正常只能说明 worker 还活着,不代表任务在前进。可能出现:

  1. 供应商接口一直卡在慢请求。
  2. 某个城市数据量异常大。
  3. Raw Snapshot 存储变慢。
  4. 发布阶段被数据库锁阻塞。
  5. worker 进入了内部死循环,但心跳线程仍然正常。

因此需要同时监控:

heartbeat_lag = now - heartbeat_at
checkpoint_lag = now - last_checkpoint_at

处理策略:

现象判断动作
heartbeat_lag 超过租约worker 失联允许新 worker 抢占
heartbeat_lag 正常,checkpoint_lag 过大worker 活着但进度卡住告警,不立即抢占
heartbeat_lag 正常,阶段长期不变某阶段阻塞根据阶段定位供应商、存储或发布问题

不要在心跳正常时强行抢占。否则可能造成两个 worker 同时处理同一页,只是其中一个更慢。

6.7 机器重启后如何恢复

机器重启后,原 worker 不再续租。调度器或新 worker 会发现:

SELECT batch_id
FROM supplier_sync_batch
WHERE status = 'RUNNING'
  AND lease_until < NOW();

恢复流程:

worker-01 执行 batch
  → 机器重启,心跳停止
  → lease_until 过期
  → worker-02 生成新的 worker_id 和 lease_token
  → worker-02 抢占过期 batch
  → 读取 end_checkpoint
  → 从 city/page/cursor 继续

这时可能重复处理上一页,所以处理逻辑必须幂等:

supplier_id + supplier_resource_code + supplier_product_code

Checkpoint 负责减少重跑范围,幂等负责保证重复处理也不会写错。

6.8 进度上报

进度不要只写日志,要落到 batch 表,便于运营后台、告警系统和排查工具读取。

每处理完一页,更新 checkpoint、统计、进度和心跳:

UPDATE supplier_sync_batch
SET end_checkpoint = ?,
    current_city_code = ?,
    current_page = ?,
    success_count = success_count + ?,
    failed_count = failed_count + ?,
    skipped_count = skipped_count + ?,
    progress_percent = ?,
    heartbeat_at = NOW(),
    lease_until = DATE_ADD(NOW(), INTERVAL 5 MINUTE),
    last_heartbeat_stage = 'CHECKPOINTING',
    last_heartbeat_message = ?,
    last_checkpoint_at = NOW(),
    updated_at = NOW()
WHERE batch_id = ?
  AND worker_id = ?
  AND lease_token = ?
  AND status = 'RUNNING';

上报频率建议按“页”或“固定时间窗口”控制:

上报方式优点缺点
每条酒店上报精确DB 写入过多
每页上报性能和准确性平衡失败时最多重复一页
每 30 秒上报写入少进度略滞后

推荐:每页处理完成后推进 checkpoint,同时每 15 到 30 秒续租心跳。如果一页处理时间可能超过心跳间隔,则需要独立心跳协程,不能等整页处理完成才心跳。

6.9 边界场景处理

场景风险处理
定时任务重复触发同一任务多个 batch 并发concurrency_policy=SKIP_IF_RUNNING
人工重复点击执行重复创建全量任务task_code + status 互斥
机器重启batch 卡在 RUNNINGlease 过期后新 worker 抢占
旧 worker 恢复覆盖新 worker checkpoint更新时校验 worker_id + lease_token
心跳正常但 checkpoint 不动worker 活着但卡住告警定位,不立即抢占
checkpoint 更新失败下次重复处理上一页页内写入必须幂等
checkpoint 先更新后处理失败数据被跳过必须先处理成功再推进 checkpoint
供应商短暂失败任务频繁失败指数退避、限流、熔断
任务被取消仍有 worker 在跑worker 每页检查 batch status
发布新版本进程退出checkpoint + lease 恢复

7. Checkpoint 与断点续跑

7.1 为什么需要 Checkpoint

100 万酒店、10 小时任务,如果只把进度放在内存里,会有三个问题:

  1. 任务中断后恢复困难。
  2. 机器重启后只能从头开始。
  3. 进度不可观测,不知道当前卡在哪里。

因此,第一阶段主设计不引入任务分片,而是在 supplier_sync_batch 上保存 checkpoint。任务仍然可以按城市和分页遍历,但每处理完一页就推进一次 checkpoint。

batch_001
  → city = BKK, page = 1
  → city = BKK, page = 2
  → ...
  → city = JKT, page = 1
  → ...

7.2 Checkpoint 存储

Checkpoint 可以先复用 supplier_sync_batch.start_checkpointsupplier_sync_batch.end_checkpoint,也可以在后续演进中拆出独立 checkpoint 表。

主链路里的 checkpoint 建议记录:

字段含义
city_code当前遍历到哪个城市
page当前处理到第几页
cursor供应商返回的下一页游标
last_supplier_hotel_id上一次成功处理的供应商酒店 ID
success_count当前批次已成功处理数量
failed_count当前批次失败数量
updated_atcheckpoint 更新时间

7.3 Checkpoint 是什么

Checkpoint 是同步任务“跑到哪里了”的进度记录。它用于断点续跑。

示例:

{
  "city_code": "BKK",
  "page": 120,
  "cursor": "abc123",
  "last_supplier_hotel_id": "H998877"
}

如果 Bangkok 第 120 页失败,下次可以从 page 120 或 cursor abc123 继续,而不是从第一页重跑。

7.4 Checkpoint 怎么使用

推荐顺序是:先处理本页数据,再推进 checkpoint

拉取 BKK page=120
  → 保存 Raw Snapshot
  → 标准化
  → 质量校验
  → 平台模型映射
  → Diff / Publish
  → 本页处理成功
  → checkpoint = BKK page=121

不要先推进 checkpoint 再处理数据,否则机器在中间宕机会跳过未处理页面。

机器重启时的恢复流程:

机器重启 / 进程退出
  → 调度器重新启动 batch
  → 读取 batch.end_checkpoint
  → 从 city/page/cursor 继续拉取
  → 已处理过的一页允许重复处理
  → 通过 supplier_id + supplier_resource_code 幂等去重

Checkpoint 只能保证“不大范围重跑”,不能保证“绝不重复处理”。因此它必须和幂等设计配合使用。

8. 拉取与限流

同步任务按城市和分页拉取:

city = BKK
page_size = 100
page = 1..N

容量估算:

1000000 hotels / 10 hours = 27.8 hotels/s

如果每页 100 个酒店:

1000000 / 100 = 10000 pages
10000 pages / 10 hours = 0.28 page/s

如果需要逐个拉酒店详情:

1000000 detail calls / 10 hours = 27.8 QPS

拉取并发度要受供应商限流约束:

fetch_concurrency = min(供应商限流 QPS / 单请求 QPS, 系统处理能力)

必须支持:

  1. 每供应商限流。
  2. 每城市请求节流。
  3. 超时控制。
  4. 失败指数退避。
  5. 供应商异常时熔断。

9. Raw Snapshot 与标准化

9.1 Raw Snapshot

Raw Snapshot 是供应商原始响应数据的快照。它不是平台商品模型,也不是最终发布数据,而是证据和可回放数据。

作用:

  1. 排查问题:线上价格或酒店信息异常时,可以还原供应商当时返回了什么。
  2. 支持回放:修复映射规则后,可以用原始数据重新跑同步。
  3. 支持 Diff:比较本次和上次数据变化。
  4. 明确责任:区分供应商数据错误和平台清洗映射错误。

9.2 Snapshot 表

CREATE TABLE supplier_sync_snapshot (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    snapshot_id VARCHAR(64) NOT NULL,
    batch_id VARCHAR(64) NOT NULL,
    supplier_id BIGINT NOT NULL,
    category_code VARCHAR(32) NOT NULL,
    supplier_resource_code VARCHAR(128) DEFAULT NULL,
    supplier_product_code VARCHAR(128) DEFAULT NULL,
    snapshot_type VARCHAR(32) NOT NULL COMMENT 'RAW/NORMALIZED',
    snapshot_version BIGINT NOT NULL,
    payload_ref VARCHAR(512) DEFAULT NULL,
    payload_hash VARCHAR(64) NOT NULL,
    created_at DATETIME NOT NULL,
    UNIQUE KEY uk_snapshot_id (snapshot_id),
    KEY idx_batch (batch_id),
    KEY idx_supplier_object (supplier_id, supplier_resource_code, supplier_product_code)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='供应商同步快照';

样例:

snapshot_id: rs_20260427_000001
batch_id: batch_20260427_hotel_full_001
supplier_id: 1001
category_code: HOTEL
supplier_resource_code: hotel_8848
supplier_product_code: room_deluxe
snapshot_type: RAW
snapshot_version: 8
payload_ref: s3://hotel-sync/raw/2026/04/27/batch001/BKK/page120.json
payload_hash: 9a0f...e31c

9.3 标准化

供应商字段需要转换成平台标准模型:

供应商字段平台字段
supplier_hotel_idsupplier_resource_code
hotel_nameresource_name
city_codeplatform_city_id
addressaddress
latitudegeo.lat
longitudegeo.lng
facilitiesext_info.facilities

标准化后生成 NORMALIZED snapshot。

10. 质量校验

质量校验分为五层:

校验层校验内容失败处理
Schema 校验必填字段、类型、枚举、时间格式、货币单位进入失败明细
主数据校验城市、国家、商圈、品牌是否存在进入人工映射
模型校验是否能映射 Resource / SPU / SKU / Offer阻断发布
交易校验价格异常、库存异常、可售状态矛盾高风险拦截
业务规则校验站点、渠道、品类是否允许售卖审核或灰度

质量校验要支持部分成功。100 万酒店同步中,不能因为 100 条失败就整批失败。

成功数据:继续发布
失败数据:写入 DLQ
高风险数据:进入审核或人工修复

11. 平台模型映射

酒店通常作为 Resource 沉淀:

supplier_hotel_id
  → supplier_product_mapping
  → platform_resource_id

如果 mapping 存在:

更新 resource / ext_info / room 信息

如果 mapping 不存在:

创建 resource
创建 supplier mapping
必要时创建 SPU / SKU / Offer

酒店同步的核心落库模型:

平台模型说明
resource_tab酒店资源
resource_ext_hotel_tab酒店扩展信息,如地址、设施、坐标、评分
supplier_product_mapping_tab供应商酒店 ID 与平台酒店 ID 的映射
product_spu_tab需要平台售卖承接时创建
product_sku_tab固定售卖单元,部分酒店业务可不沉淀完整 SKU
product_offer_tab套餐、房型、房价计划等销售配置

12. 版本与 Diff

版本分为三类:

版本含义用途
sync_batch_version本次同步任务版本排查哪次同步带来了变化
data_snapshot_version原始/标准化数据快照版本支持回放、diff、回滚
publish_version平台正式发布版本控制搜索、缓存、下游事件一致性

Diff 是标准化后的数据与当前线上发布版本之间的变化。

Normalized Snapshot
  vs
Current Published Resource

Diff 类型:

Diff 类型示例动作
NO_CHANGE无变化跳过
CONTENT_CHANGED酒店名称、地址变化更新详情缓存
IMAGE_CHANGED图片变化更新图片和缓存
GEO_CHANGED城市、坐标变化高风险,进入审核
ROOM_CHANGED房型变化更新房型或 Offer
SELLABILITY_CHANGED可售状态变化刷新可售状态

Diff 表:

CREATE TABLE supplier_sync_diff_log (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    diff_id VARCHAR(64) NOT NULL,
    batch_id VARCHAR(64) NOT NULL,
    supplier_id BIGINT NOT NULL,
    category_code VARCHAR(32) NOT NULL,
    supplier_resource_code VARCHAR(128) DEFAULT NULL,
    supplier_product_code VARCHAR(128) DEFAULT NULL,
    platform_resource_id BIGINT DEFAULT NULL,
    spu_id BIGINT DEFAULT NULL,
    sku_id BIGINT DEFAULT NULL,
    offer_id BIGINT DEFAULT NULL,
    old_publish_version BIGINT DEFAULT NULL,
    new_snapshot_version BIGINT NOT NULL,
    diff_type VARCHAR(64) NOT NULL COMMENT 'NO_CHANGE/CONTENT_CHANGED/PRICE_CHANGED/STOCK_CHANGED/RULE_CHANGED',
    changed_fields JSON NOT NULL,
    risk_level VARCHAR(32) NOT NULL COMMENT 'LOW/MEDIUM/HIGH',
    action VARCHAR(64) NOT NULL COMMENT 'IGNORE/AUTO_PUBLISH/REVIEW/DLQ',
    created_at DATETIME NOT NULL,
    UNIQUE KEY uk_diff_id (diff_id),
    KEY idx_batch (batch_id),
    KEY idx_action (action)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='供应商同步差异日志';

样例:

diff_id: diff_20260427_000001
batch_id: batch_20260427_hotel_full_001
supplier_id: 1001
category_code: HOTEL
supplier_resource_code: hotel_8848
platform_resource_id: 50001
old_publish_version: 22
new_snapshot_version: 8
diff_type: CONTENT_CHANGED
changed_fields:
[
  {"field": "address", "old": "Old Road", "new": "New Road"},
  {"field": "facilities", "old": ["wifi"], "new": ["wifi", "pool"]}
]
risk_level: LOW
action: AUTO_PUBLISH

13. 发布与下游刷新

发布时生成新的 publish_version

resource_id = 50001
old_publish_version = 21
new_publish_version = 22

发布后通过 Outbox 发事件:

HotelResourceUpdated
HotelMappingCreated
HotelContentChanged
HotelSearchIndexRefreshRequired

下游动作:

  1. 搜索索引刷新。
  2. 详情缓存失效。
  3. 商品质量报表更新。
  4. 数据平台 CDC。
  5. 营销、计价、订单读取新版本商品上下文。

14. DLQ 与补偿

14.1 为什么用 MySQL DLQ

酒店同步失败通常不是单纯消息失败,而是字段缺失、映射失败、价格异常、发布失败、索引失败等需要人工修复、状态流转和审计的问题。因此推荐:

Kafka DLQ:短期失败消息缓冲,可选
MySQL DLQ:权威问题单和补偿状态

14.2 DLQ 表

CREATE TABLE supplier_sync_dead_letter (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    dead_letter_id VARCHAR(64) NOT NULL,
    batch_id VARCHAR(64) NOT NULL,
    task_code VARCHAR(64) NOT NULL,
    sync_mode VARCHAR(32) NOT NULL,
    category_code VARCHAR(32) NOT NULL,
    supplier_id BIGINT NOT NULL,
    supplier_resource_code VARCHAR(128) DEFAULT NULL,
    supplier_product_code VARCHAR(128) DEFAULT NULL,
    platform_resource_id BIGINT DEFAULT NULL,
    spu_id BIGINT DEFAULT NULL,
    sku_id BIGINT DEFAULT NULL,
    offer_id BIGINT DEFAULT NULL,
    error_stage VARCHAR(64) NOT NULL COMMENT 'ADAPTER/VALIDATION/MAPPING/PUBLISH/INDEX',
    error_type VARCHAR(64) NOT NULL COMMENT 'RETRYABLE/NON_RETRYABLE/MAPPING_REQUIRED/RISK_BLOCKED',
    error_code VARCHAR(128) NOT NULL,
    error_message VARCHAR(1024) NOT NULL,
    raw_payload_ref VARCHAR(512) DEFAULT NULL,
    raw_payload_hash VARCHAR(64) DEFAULT NULL,
    normalized_payload_ref VARCHAR(512) DEFAULT NULL,
    status VARCHAR(32) NOT NULL DEFAULT 'PENDING'
        COMMENT 'PENDING/RETRYING/MANUAL_FIX/RESOLVED/IGNORED/FAILED',
    retry_count INT NOT NULL DEFAULT 0,
    max_retry_count INT NOT NULL DEFAULT 5,
    next_retry_at DATETIME DEFAULT NULL,
    last_retry_at DATETIME DEFAULT NULL,
    owner_team VARCHAR(64) DEFAULT NULL,
    assignee VARCHAR(64) DEFAULT NULL,
    fix_note VARCHAR(1024) DEFAULT NULL,
    created_at DATETIME NOT NULL,
    updated_at DATETIME NOT NULL,
    resolved_at DATETIME DEFAULT NULL,
    UNIQUE KEY uk_dead_letter_id (dead_letter_id),
    UNIQUE KEY uk_dedup (
        batch_id,
        supplier_id,
        supplier_resource_code,
        supplier_product_code,
        error_stage,
        raw_payload_hash
    ),
    KEY idx_status_next_retry (status, next_retry_at),
    KEY idx_supplier_status (supplier_id, status),
    KEY idx_category_status (category_code, status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='供应商同步死信队列';

样例:

dead_letter_id: dlq_20260427_000001
batch_id: batch_20260427_hotel_full_001
task_code: hotel_supplier_full_resource
sync_mode: FULL
category_code: HOTEL
supplier_id: 1001
supplier_resource_code: hotel_8848
error_stage: MAPPING
error_type: MAPPING_REQUIRED
error_code: CITY_NOT_FOUND
error_message: supplier city code BKK-OLD cannot map to platform city
raw_payload_ref: s3://hotel-sync/raw/2026/04/27/batch001/BKK/page120.json
status: MANUAL_FIX
owner_team: product-sync
assignee: ops_user_01

14.3 状态机

PENDING
  → RETRYING
  → RESOLVED

PENDING
  → MANUAL_FIX
  → RETRYING
  → RESOLVED

PENDING
  → IGNORED

RETRYING
  → FAILED

14.4 补偿 Job

SELECT *
FROM supplier_sync_dead_letter
WHERE status IN ('PENDING', 'FAILED')
  AND next_retry_at <= NOW()
  AND retry_count < max_retry_count
ORDER BY next_retry_at ASC
LIMIT 100;

重试时间使用指数退避:

next_retry_at = now + min(2^retry_count minutes, 1 hour)

15. 监控指标

指标类型指标
任务进度总城市数、已完成城市数、当前城市、当前 page/cursor
处理统计酒店总数、成功数、失败数、跳过数
性能指标任务耗时、供应商 QPS、平均耗时、P99 耗时
质量指标字段缺失率、映射失败率、重复数据率、异常价格率
新鲜度指标数据延迟、过期数据比例、热门酒店刷新延迟
补偿指标DLQ 数量、重试成功率、人工修复数量
下游指标ES 刷新失败数、缓存刷新失败数、事件发布失败数

核心指标公式:

同步成功率 = 成功处理酒店数 / 总酒店数
映射失败率 = 映射失败酒店数 / 总酒店数
字段缺失率 = 缺失关键字段酒店数 / 总酒店数
数据新鲜度延迟 = now - last_success_sync_time
DLQ 修复率 = resolved_dlq_count / total_dlq_count

16. 异常场景

异常处理
某城市同步失败从该城市对应 checkpoint 继续
某页接口超时从 page checkpoint 重试
单个酒店字段缺失写入 DLQ,不阻塞整批
供应商限流降低 worker 数,指数退避
城市映射失败进入人工映射,修复后重新投递
ES 刷新失败Outbox 补偿重试
发布版本异常保留旧版本,新版本不生效

17. 答辩材料

本专题相关总结、常见问题和参考回答已统一收录到附录B

18. 后续优化项目

18.1 任务分片

当单批次同步时间继续变长,或者需要多个 Worker 并行提升吞吐时,可以把任务从“Batch + Checkpoint”演进为“Batch + Shard + Checkpoint”。

典型分片方式:

batch_001
  ├─ city_shard_BKK
  ├─ city_shard_JKT
  ├─ city_shard_SIN
  └─ ...

Shard 表可以这样设计:

CREATE TABLE supplier_sync_shard (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    batch_id VARCHAR(64) NOT NULL,
    shard_type VARCHAR(32) NOT NULL COMMENT 'CITY',
    shard_key VARCHAR(128) NOT NULL COMMENT 'city_code or city_id',
    status VARCHAR(32) NOT NULL COMMENT 'PENDING/RUNNING/SUCCESS/FAILED',
    checkpoint VARCHAR(1024) DEFAULT NULL,
    total_count INT DEFAULT 0,
    success_count INT DEFAULT 0,
    failed_count INT DEFAULT 0,
    skipped_count INT DEFAULT 0,
    worker_id VARCHAR(64) DEFAULT NULL,
    lease_token VARCHAR(64) DEFAULT NULL,
    lease_until DATETIME DEFAULT NULL,
    heartbeat_at DATETIME DEFAULT NULL,
    started_at DATETIME DEFAULT NULL,
    finished_at DATETIME DEFAULT NULL,
    updated_at DATETIME NOT NULL,
    UNIQUE KEY uk_batch_shard (batch_id, shard_key),
    KEY idx_status (status),
    KEY idx_lease (status, lease_until),
    KEY idx_updated_at (updated_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='供应商同步分片';

18.2 分布式 Worker 抢占

多个 Worker 可以通过数据库 CAS 抢占 PENDING shard:

UPDATE supplier_sync_shard
SET status = 'RUNNING',
    worker_id = 'worker-01',
    lease_token = 'token-abc',
    lease_until = DATE_ADD(NOW(), INTERVAL 5 MINUTE),
    heartbeat_at = NOW(),
    updated_at = NOW()
WHERE id = 123
  AND status = 'PENDING';

rows_affected = 1 表示抢占成功,rows_affected = 0 表示已经被其他 Worker 抢走。

执行过程中 Worker 定期续租:

UPDATE supplier_sync_shard
SET heartbeat_at = NOW(),
    lease_until = DATE_ADD(NOW(), INTERVAL 5 MINUTE)
WHERE id = ?
  AND worker_id = ?
  AND lease_token = ?
  AND status = 'RUNNING';

如果 Worker 宕机,租约过期后,调度器把 shard 释放回 PENDING,其他 Worker 读取 shard checkpoint 继续执行。

18.3 Redis 抢占与数据库权威状态

当 batch 或 shard 数量非常多,多个 worker 高频抢占数据库导致压力上升时,可以引入 Redis 作为抢占加速层。

基本做法:

worker 抢 Redis 锁
  → SET lock:sync:batch:{batch_id} value NX EX 300
  → 抢到 Redis 锁后,再 CAS 更新 MySQL batch
  → MySQL 更新成功,才真正执行任务
  → 执行期间同时续 Redis 锁和 MySQL lease

Redis 抢锁示例:

SET lock:sync:batch:batch_001 worker_id:lease_token NX EX 300

续租和释放必须用 Lua 校验 value,不能直接 DEL

if redis.call("GET", key) == value then
    return redis.call("EXPIRE", key, ttl)
else
    return 0
end

释放锁同理:

if redis.call("GET", key) == value then
    return redis.call("DEL", key)
else
    return 0
end

Redis 抢占的关键原则:

  1. Redis 只做短期锁,不做任务事实表。
  2. MySQL 仍然是 batch 状态、checkpoint、统计和审计的权威存储。
  3. worker 只有同时持有 Redis 锁和 MySQL lease,才允许继续执行。
  4. 如果 Redis 锁续租失败,但 MySQL lease 还在,可以选择停止任务并释放 MySQL lease,避免双写风险。
  5. 如果 MySQL lease 更新失败,即使 Redis 锁还在,也必须停止任务。

是否使用 Redis,要看瓶颈在哪里。对于“一个 10 小时酒店全量任务”的第一阶段,MySQL CAS 足够简单可靠;对于“上万个 shard、大量 worker 高频抢占”的阶段,Redis 才更有价值。

18.4 为什么放在后续优化

任务分片和分布式 Worker 会引入额外复杂度:

  1. Shard 状态机。
  2. Worker 租约和心跳。
  3. 旧 Worker 恢复后的并发写保护。
  4. 跨 shard 的批次统计聚合。
  5. 热点城市和长尾城市的任务倾斜。

如果第一阶段的 10 小时任务可以接受,优先实现 Batch + Checkpoint + DLQ 的简单闭环。等同步窗口、供应商限流、数据规模或恢复时间成为瓶颈,再引入 shard 和分布式 Worker。