提问者:小点点

Kafka消费者民意调查无限期运行,不返回任何内容


我在使用Kafka消费者时遇到了困难(持续时间超时),其中它无限期地运行并且永远不会从方法中出来。要明白这可能与连接有关,我有时会看到它有点不一致。如果轮询停止响应,我该如何处理这个问题?下面给出的是Kafka消费者()的片段

public ConsumerRecords<K, V> poll(final Duration timeout) {
    return poll(time.timer(timeout), true);
}

我从这里打电话给上面:

Duration timeout = Duration.ofSeconds(30);
    while (true) {
        final ConsumerRecords<recordID, topicName> records = consumer.poll(timeout);
        System.out.println("record count is" + records.count());
}

我收到以下错误:

org. apache.kafka.通用.error.SerializationException:反序列化偏移量2处分区的键/值时出错。如果需要,请查找过去的记录以继续消费。


共2个答案

匿名用户

在试图解决上面遇到的问题时,我偶然发现了一些有用的信息。我将提供一段应该能够处理此问题的代码,但在此之前,了解导致此问题的原因很重要。

在向Apache Kafka生成或消费消息或数据时,我们需要该消息或数据的模式结构,在我的例子中是Avro模式。如果向Kafka生成的消息与该消息模式冲突,它将对消费产生影响。

在它消费记录的方法中,在您的消费者主题中添加以下代码——

请记住导入以下软件包:

导入org. apache.kafka.通用。主题分区;
导入org.jsoup.SerializationException;

try {
        while (true) {
            ConsumerRecords<String, GenericRecord> records = null;
            try {
                records = consumer.poll(10000);
            } catch (SerializationException e) {
                String s = e.getMessage().split("Error deserializing key/value 
for partition ")[1].split(". If needed, please seek past the record to 
continue consumption.")[0];
                String topics = s.split("-")[0];
                int offset = Integer.valueOf(s.split("offset ")[1]);
                int partition = Integer.valueOf(s.split("-")[1].split(" at") . 
   [0]);

                TopicPartition topicPartition = new TopicPartition(topics, 
 partition);
                //log.info("Skipping " + topic + "-" + partition + " offset " 
 + offset);
                consumer.seek(topicPartition, offset + 1);
            }


            for (ConsumerRecord<String, GenericRecord> record : records) {

                System.out.printf("value = %s \n", record.value());


            }

        }


    } finally {
        consumer.close();
    }

匿名用户

我在设置测试环境时遇到了这个问题。

在代理上运行以下命令会打印出存储的记录,正如人们所期望的那样:

bin/kafka-console-consumer.sh --bootstrap-server="localhost:9092" --topic="foo" --from-beginning

结果发现Kafka服务器配置错误。要从外部IP地址连接,监听器必须在kafka/config/server.properties中具有一个合法的值,例如:

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092