我有一个从 Kafka 读取并写入 GCP 的管道。文件的记录太少。我想创建更大的文件。到目前为止,这就是我配置 Beam 的方式(至少我认为是一个相关参数)。我的问题是如何控制从 Beam 流管线生成的文件的大小?
windowDuration: 5
numShards: 0
batchIntervalMillis: 30000
checkpointDurationMillis: 30000
maxRecordsPerBatch: 60000000
下面是与流相关的Spark配置参数。
spark.default.parallelism=600
spark.ui.retainedStages=10
spark.ui.retainedJobs=10
spark.ui.retainedTasks=12000
spark.streaming.receiver.maxRate=350
spark.streaming.kafka.maxRatePerPartition=350
spark.streaming.ui.retainedBatches=40
spark.streaming.backpressure.enabled=true
spark.streaming.receiver.writeAheadLog.enable=false
spark.streaming.kafka.maxRatePerPartition=0
spark.streaming.stopGracefullyOnShutdown=true
spark.streaming.ui.retainedBatches=50
经过一些研究,我发现在运行Beam Streaming应用程序时,如果从Kafka获取高吞吐量数据,最好不要合并数据。原因是在像GCP这样的云环境中运行时,您需要为CPU /内存付费。如果您尝试合并,您将触发洗牌,这需要您支付更多的内存和 CPU 时间。我发现 GCP 已经提供了一种做到这一点的方法。本文“如何使用Cloud Functions自动连接Google Cloud Storage上的分片文件”中对此进行了描述。GCP 撰写功能就是为此目的而创建的。它将文件合并到位。这意味着您不必为此目的移动数据或分配资源。您只需调用该函数,这一切都发生在 GCP 服务器中。