在分布式系统中,“消息只被消费一次” 是一个经典又棘手的问题。 在高并发(峰值 TPS 10万+)场景下,网络抖动、重试机制、重复投递、消费者故障等,都可能导致消息被多次处理。 如果幂等性设计不当,轻则重复扣库存,重则导致资金错误、业务数据混乱。
本文将带你从 消息队列 → 生产者 → 消费者 → 存储层 → 监控层 全链路拆解,实现真正的“精确一次(Exactly Once)” 消费语义。
🧩 1. 消息队列层面的保障
1.1 选择合适的 MQ
# ✅ RocketMQ - 推荐选择 - 支持事务消息(本地事务 + 消息半提交) - 消息重试机制完善 - 顺序消息支持 - 性能与一致性平衡优秀 # ⚙️ Kafka - 可选方案 - 支持精确一次语义(EOS) - 需要事务生产者 + 原子提交 offset 配合 - 配置复杂但吞吐更高1.2 生产者端的防重设计
// 唯一消息ID生成器 public class MessageIdGenerator { public static String generateMessageId(String businessKey) { return businessKey + "_" + System.currentTimeMillis() + "_" + ThreadLocalRandom.current().nextInt(1000); } } // 生产者发送事务消息 public class OrderMessageProducer { public SendResult sendOrderMessage(Order order) { Message message = new Message("ORDER_TOPIC", "CREATE", JSON.toJSONBytes(order)); // 设置唯一业务Key message.setKeys(order.getOrderNo()); // RocketMQ事务消息发送 return rocketMQTemplate.sendMessageInTransaction( "order-transaction-group", message, null); } }💡 要点:MessageId 必须具备全局唯一性且可追溯到业务主键,用于后续幂等判断。
🧱 2. 消费者端的幂等性设计
2.1 基于 Redis 分布式锁(适用于中等并发)
@Component public class OrderMessageConsumer { @Autowired private RedisTemplate<String, String> redisTemplate; @RocketMQMessageListener(topic = "ORDER_TOPIC", consumerGroup = "order-consumer-group") public void handleOrderMessage(Message message) { String orderNo = message.getKeys(); String lockKey = "order_consume_lock:" + orderNo; // 尝试获取分布式锁 Boolean lockAcquired = redisTemplate.opsForValue() .setIfAbsent(lockKey, "processing", Duration.ofMinutes(5)); if (!Boolean.TRUE.equals(lockAcquired)) { // 正在处理或已处理,直接返回