提问者:小点点

来自Kafka主题的Spark Streaming抛出的偏移量超出范围,没有重新启动流的选项


我有一个在Spark 2.1.1上运行的流媒体作业,轮询Kafka 0.10。我正在使用Spark KafkaUtils类来创建一个DStream,一切都很好,直到我有数据因为保留策略而过时。当我停止工作进行一些更改时,我的问题就来了。如果有任何数据超出了主题,我会收到一个错误,说我的偏移超出了范围。我做了很多研究,包括查看spark源代码,我看到了很多评论,比如本期的评论:spark-19680——基本上是说数据不应该默默地丢失——所以auto.offset.reset被spark忽略了。不过,我最大的问题是,我现在能做什么?我的主题不会在spark中进行民意调查——它在启动时会消亡,但偏移量除外。我不知道如何重置偏移量,所以我的工作将重新开始。自从我读到这些检查点对于这种用途来说是不可靠的之后,我就没有启用过检查点。我曾经有很多代码来管理偏移,但如果有任何提交的偏移,spark似乎会忽略请求的偏移,所以我现在管理的偏移是这样的:

val stream = KafkaUtils.createDirectStream[String, T](
    ssc,
    PreferConsistent,
    Subscribe[String, T](topics, kafkaParams))

stream.foreachRDD { (rdd, batchTime) =>
    val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    Log.debug("processing new batch...")

    val values = rdd.map(x => x.value())
    val incomingFrame: Dataset[T] = SparkUtils.sparkSession.createDataset(values)(consumer.encoder()).persist

    consumer.processDataset(incomingFrame, batchTime)
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
}
ssc.start()
ssc.awaitTermination()

作为一种变通方法,我一直在更改我的组ID,但这真的很糟糕。我知道这是预期的行为,不应该发生,我只需要知道如何让流重新运行。任何帮助都将不胜感激。


共3个答案

匿名用户

这是我写的一个代码块,直到一个真正的解决方案被引入到spark-streaming-kafka。它基本上根据您设置的偏移重置策略重置已老化的分区的偏移量。只需给它相同的地图参数,_params,你提供给KafkaUtils。在从驱动程序调用 KafkaUtils.create****Stream() 之前调用它。

final OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(_params.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toString().toUpperCase(Locale.ROOT));
if(OffsetResetStrategy.EARLIEST.equals(offsetResetStrategy) || OffsetResetStrategy.LATEST.equals(offsetResetStrategy)) {
    LOG.info("Going to reset consumer offsets");
    final KafkaConsumer<K,V> consumer = new KafkaConsumer<>(_params);

    LOG.debug("Fetching current state");
    final List<TopicPartition> parts = new LinkedList<>();
    final Map<TopicPartition, OffsetAndMetadata> currentCommited = new HashMap<>();
    for(String topic: this.topics()) {
        List<PartitionInfo> info = consumer.partitionsFor(topic);
        for(PartitionInfo i: info) {
            final TopicPartition p = new TopicPartition(topic, i.partition());
            final OffsetAndMetadata m = consumer.committed(p);
            parts.add(p);
            currentCommited.put(p, m);
        }
    }
    final Map<TopicPartition, Long> begining = consumer.beginningOffsets(parts);
    final Map<TopicPartition, Long> ending = consumer.endOffsets(parts);

    LOG.debug("Finding what offsets need to be adjusted");
    final Map<TopicPartition, OffsetAndMetadata> newCommit = new HashMap<>();
    for(TopicPartition part: parts) {
        final OffsetAndMetadata m = currentCommited.get(part);
        final Long begin = begining.get(part);
        final Long end = ending.get(part);

        if(m == null || m.offset() < begin) {
            LOG.info("Adjusting partition {}-{}; OffsetAndMeta={} Begining={} End={}", part.topic(), part.partition(), m, begin, end);

            final OffsetAndMetadata newMeta;
            if(OffsetResetStrategy.EARLIEST.equals(offsetResetStrategy)) {
                newMeta = new OffsetAndMetadata(begin);
            } else if(OffsetResetStrategy.LATEST.equals(offsetResetStrategy)) {
                newMeta = new OffsetAndMetadata(end);
            } else {
                newMeta = null;
            }

            LOG.info("New offset to be {}", newMeta);
            if(newMeta != null) {
                newCommit.put(part, newMeta);
            }
        }

    }
    consumer.commitSync(newCommit);
    consumer.close();
}

匿名用户

auto.offset.reset=latest/最早仅在使用者首次启动时应用。

有Spark JIRA来解决这个问题,在那之前我们需要解决各种解决方法。https://issues.apache.org/jira/browse/SPARK-19680

匿名用户

尝试

auto.offset.reset=latest

auto.offset.reset=earliest

最早:自动将偏移重置为最早偏移

最新:自动将偏移量重置为最新偏移量

none:如果没有为使用者的组找到以前的偏移量,则向使用者引发异常

其他任何操作:向消费者抛出异常。

影响偏移值对应最小和最大配置的另一件事是日志保留策略。假设您有一个主题,保留时间配置为1小时。您产生了10条消息,然后一小时后您又发布了10条消息。最大偏移量仍然保持不变,但最小偏移量不能为0,因为Kafka已经删除了这些消息,因此可用的最小偏移量将为10。