本文共 550 字,大约阅读时间需要 1 分钟。
在进行 Storm 斜街 Kafka 作业时,Storm 的 Spout 作为 Kafka 的消费者,接收到消息后将其发送到 Bolt 进行输出。然而,开发者发现 Storm 一直提交的都是同样的偏移量(offset),这导致了数据的重复消费。这种现象对数据的准确性和系统的稳定性都产生了负面影响。
在查阅 GitHub 上的官方示例后,开发者发现问题出在 Bolt 的输出逻辑上。具体来说,Bolt 在处理消息后没有及时发送确认(ack),导致作为消费者的 Spout 无法接收到最新的偏移量信息。为了解决这个问题,建议在 Bolt 的 execute 方法中添加 collector.ack(input) 的调用,以确认消息的成功处理。
以下是修复后的代码示例:
```java public void execute(Tuple input) { String log = input.getString(0); if (log.length() > 0) { System.out.println("【info, partition1: hive log】: " + log); } collector.ack(input); //发送 ack 确认,避免重复消费转载地址:http://tpefk.baihongyu.com/