我有一个在Spark 2.1.1上运行的流媒体作业,轮询Kafka 0.10。我正在使用Spark KafkaUtils类来创建一个DStream,一切都很好,直到我有数据因为保留策略而过时。当我停止工作进行一些更改时,我的问题就来了。如果有任何数据超出了主题,我会收到一个错误,说我的偏移超出了范围。我做了很多研究,包括查看spark源代码,我看到了很多评论,比如本期的评论:spark-19680——基本上是说数据不应该默默地丢失——所以auto.offset.reset被spark忽略了。不过,我最大的问题是,我现在能做什么?我的主题不会在spark中进行民意调查——它在启动时会消亡,但偏移量除外。我不知道如何重置偏移量,所以我的工作将重新开始。自从我读到这些检查点对于这种用途来说是不可靠的之后,我就没有启用过检查点。我曾经有很多代码来管理偏移,但如果有任何提交的偏移,spark似乎会忽略请求的偏移,所以我现在管理的偏移是这样的:
val stream = KafkaUtils.createDirectStream[String, T](
ssc,
PreferConsistent,
Subscribe[String, T](topics, kafkaParams))
stream.foreachRDD { (rdd, batchTime) =>
val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
Log.debug("processing new batch...")
val values = rdd.map(x => x.value())
val incomingFrame: Dataset[T] = SparkUtils.sparkSession.createDataset(values)(consumer.encoder()).persist
consumer.processDataset(incomingFrame, batchTime)
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
}
ssc.start()
ssc.awaitTermination()
作为一种变通方法,我一直在更改我的组ID,但这真的很糟糕。我知道这是预期的行为,不应该发生,我只需要知道如何让流重新运行。任何帮助都将不胜感激。
这是我写的一个代码块,直到一个真正的解决方案被引入到spark-streaming-kafka。它基本上根据您设置的偏移重置策略重置已老化的分区的偏移量。只需给它相同的地图参数,_params,你提供给KafkaUtils。在从驱动程序调用 KafkaUtils.create****Stream() 之前调用它。
final OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(_params.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toString().toUpperCase(Locale.ROOT));
if(OffsetResetStrategy.EARLIEST.equals(offsetResetStrategy) || OffsetResetStrategy.LATEST.equals(offsetResetStrategy)) {
LOG.info("Going to reset consumer offsets");
final KafkaConsumer<K,V> consumer = new KafkaConsumer<>(_params);
LOG.debug("Fetching current state");
final List<TopicPartition> parts = new LinkedList<>();
final Map<TopicPartition, OffsetAndMetadata> currentCommited = new HashMap<>();
for(String topic: this.topics()) {
List<PartitionInfo> info = consumer.partitionsFor(topic);
for(PartitionInfo i: info) {
final TopicPartition p = new TopicPartition(topic, i.partition());
final OffsetAndMetadata m = consumer.committed(p);
parts.add(p);
currentCommited.put(p, m);
}
}
final Map<TopicPartition, Long> begining = consumer.beginningOffsets(parts);
final Map<TopicPartition, Long> ending = consumer.endOffsets(parts);
LOG.debug("Finding what offsets need to be adjusted");
final Map<TopicPartition, OffsetAndMetadata> newCommit = new HashMap<>();
for(TopicPartition part: parts) {
final OffsetAndMetadata m = currentCommited.get(part);
final Long begin = begining.get(part);
final Long end = ending.get(part);
if(m == null || m.offset() < begin) {
LOG.info("Adjusting partition {}-{}; OffsetAndMeta={} Begining={} End={}", part.topic(), part.partition(), m, begin, end);
final OffsetAndMetadata newMeta;
if(OffsetResetStrategy.EARLIEST.equals(offsetResetStrategy)) {
newMeta = new OffsetAndMetadata(begin);
} else if(OffsetResetStrategy.LATEST.equals(offsetResetStrategy)) {
newMeta = new OffsetAndMetadata(end);
} else {
newMeta = null;
}
LOG.info("New offset to be {}", newMeta);
if(newMeta != null) {
newCommit.put(part, newMeta);
}
}
}
consumer.commitSync(newCommit);
consumer.close();
}
auto.offset.reset=latest/最早
仅在使用者首次启动时应用。
有Spark JIRA来解决这个问题,在那之前我们需要解决各种解决方法。https://issues.apache.org/jira/browse/SPARK-19680
尝试
auto.offset.reset=latest
或
auto.offset.reset=earliest
最早:自动将偏移重置为最早偏移
最新:自动将偏移量重置为最新偏移量
none:如果没有为使用者的组找到以前的偏移量,则向使用者引发异常
其他任何操作:向消费者抛出异常。
影响偏移值对应最小和最大配置的另一件事是日志保留策略。假设您有一个主题,保留时间配置为1小时。您产生了10条消息,然后一小时后您又发布了10条消息。最大偏移量仍然保持不变,但最小偏移量不能为0,因为Kafka已经删除了这些消息,因此可用的最小偏移量将为10。