kafka consumer 手动 ack

news/2025/2/27 5:50:58

在消费 Kafka 消息时,手动确认(acknowledge)消息的消费,可以通过使用 KafkaConsumer 类中的 commitSync()commitAsync() 方法来实现。这些方法将提交当前偏移量,确保在消费者崩溃时不会重新消费已处理的消息。

以下是一个简单的手动 ack 的示例代码:

1. 配置 KafkaConsumer 和手动确认消费

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class KafkaManualAckConsumer {
    public static void main(String[] args) {
        // 配置消费者的基本属性
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka 服务器地址
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // 消费者组ID
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息key反序列化
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息value反序列化
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 关闭自动提交,启用手动提交

        // 创建 KafkaConsumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("my-topic"));

        try {
            while (true) {
                // 拉取消息
                var records = consumer.poll(1000); // 拉取数据,等待最多1000ms

                // 处理每一条消息
                records.forEach(record -> {
                    System.out.println("Consumed message: " + record.value());

                    // 处理完消息后手动提交偏移量
                    // commitSync: 确保消息成功提交
                    consumer.commitSync();
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭消费者
            consumer.close();
        }
    }
}

2. 代码解析

  1. 配置消费者:
    • ENABLE_AUTO_COMMIT_CONFIG 设置为 false,禁用自动提交偏移量。这样就可以在处理完每条消息后手动提交。
  2. 消息消费与手动 ack:
    • poll(1000) 方法拉取最多 1000 毫秒内的消息。
    • commitSync() 方法用于同步提交当前的偏移量,即消费到的消息的位移,这样可以确保 Kafka 消费者确认该消息已处理。
  3. 异常处理:
    • 异常捕获块 catch 用于处理消费过程中可能出现的任何错误,确保程序不会崩溃。
  4. 关闭消费者:
    • finally 块中调用 consumer.close() 来关闭消费者连接。

3. 使用 commitAsync 提高性能(可选)

如果对性能要求更高,可以考虑使用 commitAsync() 方法,它不会阻塞当前线程,提交操作将在后台异步完成:

consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.out.println("Error committing offset: " + exception.getMessage());
    } else {
        System.out.println("Successfully committed offsets: " + offsets);
    }
});

这样你可以不阻塞线程,提高消费性能,尤其是在高吞吐量的环境中。
如果你有其他的需求或者想更细致地控制消费的细节,随时告诉我!


http://www.niftyadmin.cn/n/5869568.html

相关文章

HBase与MapReduce结合(二)——对HBase表中字段进行WordCount

目录 1. 数据文本2. pom.xml中依赖配置3. 工具类Util4. 导入数据ImportData5. 对HBase表进行WordCount6. 配置Job7. 结果参考 1. 数据文本 1_song1_2016-1-11 song1 singer1 man slow pc 2_song2_2016-1-11 song2 singer2 woman slow ios 3_song3_2016-1-11 song3 singer3 man…

Project #0 - C++ Primer前置知识学习

这次的Lab0主要以熟悉C 11和C 17的新特性。这里先把相关新特性分析一下。 目录 一、Doxygen注释 二、函数签名分析 三、构造函数与delete、explicit 四、static constexpr 解释 五、各种cast转换(重点) 5.1 const_cast与reinterpret_cast 5.2 static_cast与dynamic_cas…

eclipse 4.4.2 m2eclipse apache-maven-3.2.1

apache-maven-3.2.1-CSDN博客 eclipse 4.4.2 m2eclipse

Java SE与Java EE

Java SE&#xff08;Java 平台标准版&#xff09; Java SE 是 Java 平台的核心&#xff0c;提供了 Java 语言的基础功能。它包含了 Java 开发工具包&#xff08;JDK&#xff09;&#xff0c;其中有 Java 编译器&#xff08;javac&#xff09;、Java 虚拟机&#xff08;JVM&…

营销过程乌龟图模版

营销过程乌龟图模版 输入 公司现状产品服务客户问询客户期望电话、电脑系统品牌软件硬件材料 售前 - 沟通 - 确定需求 - 满足需求 - 售后 机料环 电话、电脑等设备软件硬件、系统品牌等工具材料 人 责任人协助者生产者客户 法 订单由谁评审控制程序营销过程控制程序顾客满意度…

SOC-ATF 安全启动BL1流程分析(1)

一、ATF 源码下载链接 1. ARM Trusted Firmware (ATF) 官方 GitHub 仓库 GitHub 地址: https://github.com/ARM-software/arm-trusted-firmware 这是 ATF 的官方源码仓库&#xff0c;包含最新的代码、文档和示例。 下载方式&#xff1a; 使用 Git 克隆仓库&#xff1a; git…

miqiu的分布式锁(二):实战——用JMeter验证JVM锁能否解决MySQL超卖问题

miqiu的分布式锁二&#xff1a;实战——用JMeter验证JVM锁能否解决MySQL超卖问题 实验背景 在秒杀场景中&#xff0c;超卖问题是典型的并发编程挑战。本文通过JMeter压测工具&#xff0c;验证基于JVM的两种锁机制&#xff08;synchronized/ReentrantLock&#xff09;对MySQL库…

kubernetes 初学命令

基础命令 kubectl 1. kubetcl get #查看node节点状态 kubectl get nodes #查看pods节点状态 kubectl get pods 2.kubectl run #kubectl run 命令在 pod 中创建并运行特定的镜像 kubectl run nginx --imagenginx --port 80 3.kubectl describe #看到pod 的详情 kubectl d…