我刚接触Kafka。我对kafka消息格式有点困惑。我测试了一个KafkaJS消费者。
const run = async () => {
await kafkaClient.consumer.subscribe({ topic: 'mytopic', fromBeginning: true })
await kafkaClient.consumer.run({
eachBatchAutoResolve: false,
eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
for (let message of batch.messages) {
if (!isRunning() || isStale()) break
processMessage(message)
resolveOffset(message.offset)
}
},
})
}
我使用控制台. log(message)
查看消息格式,它是这样的
{
magicByte: 2,
attributes: 0,
timestamp: '540669',
offset: '601953',
key: <Buffer 39 63 37 23>,
value: <Buffer 7b 65 32 65 37 38 ... 555 more bytes>,
headers: {
'myheader': <Buffer 61 6f>,
},
‧‧‧‧‧‧
}
我还尝试了一个在localhost上使用Spring boot构建的消费者。由于没有生产者,我使用Postman向kafka发送消息。Spring boot消费者收到的消息是这样的
{
body: 'this is body',
clientIp: 'this is IP
}
'this is body'
是我从Postman发送的内容。clientIp
的值是我的ip。
我注意到这是从KafkaJS中的message. value.toString()
返回的内容。为什么它们不同?使用不同框架构建的消费者如果连接到同一个kafka主题会得到不同的消息吗?
如果我想构建一个java消费者来接收和使用与KafkaJS消费者相同的消息格式,我应该尝试什么?
sole. log(message)
向您展示了从Kafka读取的整个反序列化消息,其中包括键、值和其他元数据。这个对象中的数据结构如何取决于语言(例如Java或Go将有自己的类或结构,这些类或结构将包含大部分相同的数据,但不一定以相同的方式结构)。
即使每种语言都有不同的对象来表示反序列化的Kafka消息,元数据(标头、时间戳…)、消息键和消息值始终存在,它们的结构可能不同但其值应该始终通过某种方法或属性可用。并且当序列化时,它始终具有Kafka协议定义的相同字节表示。