我的问题是关于Apache火花流如何通过提高并行化或将许多写入组合成一个更大的写入来处理需要很长时间的输出操作。在这种情况下,写入是对Neo4J的密码请求,但它可以应用于其他数据存储。
我有一个Apache火花流应用程序在Java写入2个数据存储:Elasticsearch和Neo4j。以下是版本:
Elasticsearch输出非常简单,因为我使用了Elasticsearch-Hadoop for Apache Spark库。
我们的输入是从Kafka接收的关于特定主题的流,我通过map函数反序列化流的元素,以创建一个<code>JavaDStream
代码如下所示。
dataStream.foreachRDD( rdd -> {
rdd.foreach( cypherQuery -> {
BoltDriverSingleton.getInstance().update(cypherQuery);
});
});
关于如何提高吞吐量,我有两个想法:
我想得太多了吗?我尝试了太多的研究,以至于我可能陷得太深了。
感谢任何可能提供帮助的人;这是我很长一段时间以来第一个StackOverflow问题,所以请留下反馈,我会及时回应并根据需要纠正这个问题。
我认为我们所需要的只是一个简单的地图/减少。以下内容应该允许我们解析RDD中的每条消息,然后将其一次性写入图形数据库。
dataStream.map( message -> {
return (ParseResult) Neo4JMessageParser.parse(message);
}).foreachRDD( rdd -> {
List<ParseResult> parseResults = rdd.collect();
String cypherQuery = Neo4JMessageParser.buildQuery(parseResults);
Neo4JRepository.update(cypherQuery);
// commit offsets
});
通过这样做,我们应该能够减少与必须为每个传入消息执行写入相关的开销。