Hi, I'm working on a Spark Streaming project in which I have to parse the data (protobuf messages) received from a Kafka stream.
I have no idea how to parse a protobuf message coming from Kafka.
I'm trying to understand the code below as a starting point for parsing the proto messages.
def main(args: Array[String]) {
val spark = SparkSession.builder
  .master("local")
  .appName("spark session example")
  .getOrCreate()
import spark.implicits._
val ds1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "student")
  .load()
// protobuf-generated classes have no implicit Spark encoder, so provide one via kryo
implicit val studentEncoder = org.apache.spark.sql.Encoders.kryo[Student]
// take the raw value bytes of each Kafka record and parse them as Student messages
val ds2 = ds1.map(row => row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))
val query = ds2.writeStream
  .outputMode("append")
  .format("console")
  .start()
query.awaitTermination()
}
Can someone give me some examples of how to parse protobuf messages step by step? I just need some references on how to use this in a Spark Streaming application.
I use Structured Streaming this way:
import MessagesProto  # your proto module (the .py file generated by protoc)
from datetime import datetime as dt
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import udf
def message_proto(value):
    # deserialize the raw Kafka value bytes into a protobuf message
    m = MessagesProto.message_x()
    m.ParseFromString(value)
    # map the message fields onto the columns declared in the schema below;
    # if z is an epoch timestamp in the proto, convert it with dt.fromtimestamp(m.z)
    return {'x': m.x,
            'z': m.z}

schema_impressions = StructType() \
    .add("x", StringType()) \
    .add("z", TimestampType())

proto_udf = udf(message_proto, schema_impressions)
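
Before wiring the UDF into Spark, it can help to round-trip a message locally. This is just a sanity-check sketch: it assumes MessagesProto is your generated module and that x is a string field of message_x (adjust the field names to your .proto):

# hypothetical local sanity check for the parser above
m = MessagesProto.message_x()
m.x = "some-id"                 # assumed string field
raw = m.SerializeToString()     # bytes, exactly what Kafka delivers in "value"
print(message_proto(raw))       # expect {'x': 'some-id', 'z': ...}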
class StructuredStreaming():
    def structured_streaming(self):
        # Kafka consumer properties must be prefixed with "kafka." to reach the
        # consumer. The key/value deserializers cannot be overridden: Spark always
        # delivers both as byte arrays, so the value is decoded by the UDF below.
        stream = self.spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", self.kafka_bootstrap_servers) \
            .option("subscribe", self.topic) \
            .option("startingOffsets", self.startingOffsets) \
            .option("kafka.max.poll.records", self.max_poll_records) \
            .option("kafka.auto.commit.interval.ms", self.auto_commit_interval_ms) \
            .option("kafka.session.timeout.ms", self.session_timeout_ms) \
            .load()
        self.query = stream \
            .select(col("value")) \
            .select(proto_udf("value").alias("value_udf")) \
            .select("value_udf.x", "value_udf.z")