我在使用Kafka消费者时遇到了困难(持续时间超时),其中它无限期地运行并且永远不会从方法中出来。要明白这可能与连接有关,我有时会看到它有点不一致。如果轮询停止响应,我该如何处理这个问题?下面给出的是Kafka消费者()的片段
public ConsumerRecords<K, V> poll(final Duration timeout) {
return poll(time.timer(timeout), true);
}
我从这里打电话给上面:
Duration timeout = Duration.ofSeconds(30);
while (true) {
final ConsumerRecords<recordID, topicName> records = consumer.poll(timeout);
System.out.println("record count is" + records.count());
}
我收到以下错误:
org. apache.kafka.通用.error.SerializationException:反序列化偏移量2处分区的键/值时出错。如果需要,请查找过去的记录以继续消费。
在试图解决上面遇到的问题时,我偶然发现了一些有用的信息。我将提供一段应该能够处理此问题的代码,但在此之前,了解导致此问题的原因很重要。
在向Apache Kafka生成或消费消息或数据时,我们需要该消息或数据的模式结构,在我的例子中是Avro模式。如果向Kafka生成的消息与该消息模式冲突,它将对消费产生影响。
在它消费记录的方法中,在您的消费者主题中添加以下代码——
请记住导入以下软件包:
导入org. apache.kafka.通用。主题分区;
导入org.jsoup.SerializationException;
try {
while (true) {
ConsumerRecords<String, GenericRecord> records = null;
try {
records = consumer.poll(10000);
} catch (SerializationException e) {
String s = e.getMessage().split("Error deserializing key/value
for partition ")[1].split(". If needed, please seek past the record to
continue consumption.")[0];
String topics = s.split("-")[0];
int offset = Integer.valueOf(s.split("offset ")[1]);
int partition = Integer.valueOf(s.split("-")[1].split(" at") .
[0]);
TopicPartition topicPartition = new TopicPartition(topics,
partition);
//log.info("Skipping " + topic + "-" + partition + " offset "
+ offset);
consumer.seek(topicPartition, offset + 1);
}
for (ConsumerRecord<String, GenericRecord> record : records) {
System.out.printf("value = %s \n", record.value());
}
}
} finally {
consumer.close();
}
我在设置测试环境时遇到了这个问题。
在代理上运行以下命令会打印出存储的记录,正如人们所期望的那样:
bin/kafka-console-consumer.sh --bootstrap-server="localhost:9092" --topic="foo" --from-beginning
结果发现Kafka服务器配置错误。要从外部IP地址连接,监听器
必须在kafka/config/server.properties
中具有一个合法的值,例如:
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092