提问者:小点点

SparkStreaming: DirectStreamRDD到dataframe[重复]


我正在研究火花流上下文,它从avro序列化kafka主题中获取数据,如下所示。

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "schema.registry.url" -> "http://localhost:8081",
  "key.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "value.deserializer" -> "io.confluent.kafka.serializers.KafkaAvroDeserializer",
  "group.id" -> "1"
)

使用Kafka utils我正在创建直接流,如下所示

val topics = Set("mysql-foobar")


val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String,String](
    topics,
    kafkaParams)
)

我也将数据写入控制台作为

stream.foreachRDD ( rdd => {
  rdd.foreachPartition(iterator => {
    while (iterator.hasNext) {
      val next = iterator.next()
      println(next.value())
    }
  })
})

现在我想从这些RDD创建数据帧。是否有可能我已经审查并测试了许多来自stackoverflow的解决方案,但遇到了一些问题。堆栈溢出解决方案是这样的,也是这样的。我的输出如下所示

{"c1": 4, "c2": "Jarry", "create_ts": 1536758512000, "update_ts": 1537204805000}

共1个答案

匿名用户

由于您使用的是Confluent序列化程序,并且它们目前无法提供与Spark的轻松集成,因此您可以通过AbsaOSS在Github上查看一个相对较新的库来帮助解决此问题。

但是基本上,你使用火花结构化流来获取数据帧,不要尝试使用Dstream来RDD数据帧…

你可以在这里找到你想要的例子

另请参阅将Spark结构化流与Kafka Schema注册表集成的其他示例