Asked by: 小点点

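How can I change Debezium's default topic naming convention so that it works with the Confluent HDFS sink's automatic Hive table generation?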


I'm building a data synchronizer that captures data changes from a MySQL source and exports the data to Hive.

I chose Kafka Connect to implement this. I use Debezium as the source connector and the Confluent HDFS connector as the sink connector.

The problem is that Debezium names its Kafka topics using the following convention:

serverName.databaseName.tableName

In the Confluent HDFS sink properties, I have to configure the topic to match the name that Debezium generates:

"topic":"serverName.databaseName.tableName"

The Confluent HDFS sink connector then generates the following path in HDFS:

/topic/serverName.databaseName.tableName/partition=0

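This definitely causes problems in HDFS/Hive because the path contains the "." character. In fact, the external table that the Confluent HDFS sink connector generates automatically fails to be created because of the path problem: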

2020-05-08T00:42:02,717 ERROR [pool-6-thread-31] metastore.RetryingHMSHandler: MetaException(message:java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: hdfs://localhost:9000./null/topics/dbserver1.test_data_1.student1)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newMetaException(HiveMetaStore.java:6935)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:2050)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:147)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:108)
    at com.sun.proxy.$Proxy26.create_table_with_environment_context(Unknown Source)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$create_table_with_environment_context.getResult(ThriftHiveMetastore.java:14800)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$create_table_with_environment_context.getResult(ThriftHiveMetastore.java:14784)
    at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
    at org.apache.hadoop.hive.metastore.TUGIBasedProcessor$1.run(TUGIBasedProcessor.java:111)
    at org.apache.hadoop.hive.metastore.TUGIBasedProcessor$1.run(TUGIBasedProcessor.java:107)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.hadoop.hive.metastore.TUGIBasedProcessor.process(TUGIBasedProcessor.java:119)
    at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: hdfs://localhost:9000./null/topics/dbserver1.test_data_1.student1
    at org.apache.hadoop.fs.Path.initialize(Path.java:263)
    at org.apache.hadoop.fs.Path.<init>(Path.java:254)
    at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:143)
    at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:147)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1852)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1786)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:2035)
    ... 20 more
Caused by: java.net.URISyntaxException: Relative path in absolute URI: hdfs://localhost:9000./null/topics/dbserver1.test_data_1.student1
    at java.net.URI.checkPath(URI.java:1823)
    at java.net.URI.<init>(URI.java:745)
    at org.apache.hadoop.fs.Path.initialize(Path.java:260)
    ... 26 more

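So, is there any way to change Debezium's default topic naming convention, or to change the default path that the Confluent HDFS sink connector generates from the topic name?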


1 answer

Anonymous user

The HDFS connector replaces dots (and dashes) with underscores when it creates the Hive table.

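HDFS itself doesn't care about dots in the path. The problem is that there can't be a dot right after the port, and there is a /null in the path: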

hdfs://localhost:9000./null
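
The error message can be reproduced with plain java.net.URI, which is the class at the bottom of the stack trace. The sketch below assumes (this is one reading of the trace, not something the logs state outright) that the location reached the metastore as the authority "localhost:9000" plus the relative path "./null/topics/...". The class and method names are made up for illustration.

```java
import java.net.URI;
import java.net.URISyntaxException;

class RelativePathUriSketch {
    // Builds a URI from separate scheme/authority/path components and
    // returns the error message, or null when the URI is accepted.
    static String tryBuild(String scheme, String authority, String path) {
        try {
            new URI(scheme, authority, path, null, null);
            return null;
        } catch (URISyntaxException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Assumed split: a relative path ("./null/...") combined with a scheme is rejected.
        System.out.println(tryBuild("hdfs", "localhost:9000",
                "./null/topics/dbserver1.test_data_1.student1"));
        // An absolute path that still has dots in the topic name is accepted.
        System.out.println(tryBuild("hdfs", "localhost:9000",
                "/topics/dbserver1.test_data_1.student1"));
    }
}
```

If that reading is right, the dots inside the topic name are harmless, and the thing to track down is whichever setting produced the "./null" prefix.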

"Is there any way to change Debezium's default topic naming convention?"

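The solution isn't specific to Debezium. You can use RegexRouter, which is part of the base Apache Kafka Connect library, in the transforms configuration of either your source connector or your sink connector, depending on when you want to "fix" the problem.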
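
As a rough sketch of what such a configuration might look like, the Java snippet below lists the transform properties and previews the rename locally with java.util.regex. The alias "dotsToUnderscores", the pattern, and the replacement are hypothetical choices for a three-part Debezium topic name; only the RegexRouter class name and its regex and replacement properties come from Kafka Connect itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class TopicRenameSketch {
    // Hypothetical pattern: three dot-separated parts, rejoined with underscores.
    static final String REGEX = "([^.]+)\\.([^.]+)\\.([^.]+)";
    static final String REPLACEMENT = "$1_$2_$3";

    // Properties to merge into the connector configuration.
    // "dotsToUnderscores" is an arbitrary alias chosen for this example.
    static Map<String, String> transformConfig() {
        Map<String, String> cfg = new LinkedHashMap<>();
        cfg.put("transforms", "dotsToUnderscores");
        cfg.put("transforms.dotsToUnderscores.type",
                "org.apache.kafka.connect.transforms.RegexRouter");
        cfg.put("transforms.dotsToUnderscores.regex", REGEX);
        cfg.put("transforms.dotsToUnderscores.replacement", REPLACEMENT);
        return cfg;
    }

    // Local preview of the rename: the topic is rewritten only when the
    // whole name matches the pattern, otherwise it is left unchanged.
    static String preview(String topic) {
        Matcher m = Pattern.compile(REGEX).matcher(topic);
        return m.matches() ? m.replaceFirst(REPLACEMENT) : topic;
    }

    public static void main(String[] args) {
        transformConfig().forEach((k, v) -> System.out.println(k + "=" + v));
        System.out.println(preview("dbserver1.test_data_1.student1"));
    }
}
```

Topic names that don't have exactly three dot-separated parts fall through unchanged with this pattern, so adjust the regex if other topics should be renamed as well.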

You can also write your own transform and put it on Connect's plugin.path.