提问者:小点点

Kafka消息格式与不同帧的关系


我刚接触Kafka。我对kafka消息格式有点困惑。我测试了一个KafkaJS消费者。

const run = async () => {
    await kafkaClient.consumer.subscribe({ topic: 'mytopic', fromBeginning: true })
    await kafkaClient.consumer.run({
        eachBatchAutoResolve: false,
        eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
            for (let message of batch.messages) {
                if (!isRunning() || isStale()) break
                processMessage(message)
                resolveOffset(message.offset)
            }
        },
    })
}

我使用控制台. log(message)查看消息格式,它是这样的

{
  magicByte: 2,
  attributes: 0,
  timestamp: '540669',
  offset: '601953',
  key: <Buffer 39 63 37 23>,
  value: <Buffer 7b 65 32 65 37 38 ... 555 more bytes>,
  headers: {
    'myheader': <Buffer 61  6f>,
  },
  ‧‧‧‧‧‧
}

我还尝试了一个在localhost上使用Spring boot构建的消费者。由于没有生产者,我使用Postman向kafka发送消息。Spring boot消费者收到的消息是这样的

{
   body: 'this is body',
   clientIp: 'this is IP
}

'this is body'是我从Postman发送的内容。clientIp的值是我的ip。

我注意到这是从KafkaJS中的message. value.toString()返回的内容。为什么它们不同?使用不同框架构建的消费者如果连接到同一个kafka主题会得到不同的消息吗?

如果我想构建一个java消费者来接收和使用与KafkaJS消费者相同的消息格式,我应该尝试什么?


共1个答案

匿名用户

sole. log(message)向您展示了从Kafka读取的整个反序列化消息,其中包括键、值和其他元数据。这个对象中的数据结构如何取决于语言(例如Java或Go将有自己的类或结构,这些类或结构将包含大部分相同的数据,但不一定以相同的方式结构)。

即使每种语言都有不同的对象来表示反序列化的Kafka消息,元数据(标头、时间戳…)、消息键和消息值始终存在,它们的结构可能不同但其值应该始终通过某种方法或属性可用。并且当序列化时,它始终具有Kafka协议定义的相同字节表示。