我能够创建一个数据流管道,它从pub/sub读取数据,并在处理后以流模式写入大查询。
现在,我想以批处理模式运行我的管道,以降低成本,而不是流模式。
目前我的流水线正在使用动态目的地在bigquery中进行流式插入。我想知道是否有办法使用动态目的地执行批量插入操作。
下面是
public class StarterPipeline {
public interface StarterPipelineOption extends PipelineOptions {
/**
* Set this required option to specify where to read the input.
*/
@Description("Path of the file to read from")
@Default.String(Constants.pubsub_event_pipeline_url)
String getInputFile();
void setInputFile(String value);
}
@SuppressWarnings("serial")
public static void main(String[] args) throws SocketTimeoutException {
StarterPipelineOption options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(StarterPipelineOption.class);
Pipeline p = Pipeline.create(options);
PCollection<String> datastream = p.apply("Read Events From Pubsub",
PubsubIO.readStrings().fromSubscription(Constants.pubsub_event_pipeline_url));
PCollection<String> windowed_items = datastream.apply(Window.<String>into(new GlobalWindows())
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(300))))
.withAllowedLateness(Duration.standardDays(10)).discardingFiredPanes());
// Write into Big Query
windowed_items.apply("Read and make event table row", new
ReadEventJson_bigquery())
.apply("Write_events_to_BQ",
BigQueryIO.writeTableRows().to(new DynamicDestinations<TableRow, String>() {
public String getDestination(ValueInSingleWindow<TableRow> element) {
String destination = EventSchemaBuilder
.fetch_destination_based_on_event(element.getValue().get("event").toString());
return destination;
}
@Override
public TableDestination getTable(String table) {
String destination =
EventSchemaBuilder.fetch_table_name_based_on_event(table);
return new TableDestination(destination, destination);
}
@Override
public TableSchema getSchema(String table) {
TableSchema table_schema =
EventSchemaBuilder.fetch_table_schema_based_on_event(table);
return table_schema;
}
}).withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
p.run().waitUntilFinish();
log.info("Events Pipeline Job Stopped");
}
}
批处理或流由PCollection决定,因此您需要将数据流PCollection从Pub/Sub转换为批处理PCollection以写入BigQuery。允许这样做的转换是GroupIntoBatches
请注意,由于此转换使用键值对,批处理将仅包含单个键的元素。对于非KV元素,请检查此相关答案。
使用此转换将PCollection作为批处理创建后,请像使用流PCollection一样应用带有动态目标的BigQuery写入。
匿名用户
您可以通过对流式作业使用文件加载来限制成本。插入方法部分指出,BigQueryIO. Write支持两种方法将数据插入到使用BigQueryIO.Write.与方法(org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method)指定的BigQuery中。如果未提供任何方法,则将根据输入PCollection选择默认方法。有关方法的更多信息,请参阅BigQueryIO.Write.Method。
不同的插入方法提供了成本、配额和数据展示一致性的不同权衡。有关这些权衡的更多信息,请参阅BigQuery留档。
相关问题
- Android:在模块jefied-play-services-测量和jefied-play-services-测量-impl中发现重复类
- 在Hashmap中查找匹配的键/值对
- 如何迭代Hashmap并与同一Hashmap中的其他键进行组合以比较它们的对象
- HashCode-如果相等的对象碰巧在同一个桶中散列会发生什么?
- 如何防止对数组中类对象的重复引用?
- 如何以及何时在HashMap中完成重新散列
- 在hashmap或hashtable中重新散列的成本
- HashMap如何识别内部数组中的哪些位置包含元素?
- 当HashMap增加其大小时,HashMap中值的索引会发生什么?
- @BeforeClass在ktor测试类中不工作
- Jest vanilla JavaScript JSDOM刷新失败,切换beforeAll到before每一个后的第二次测试中断
- 在笑话中,定义全局变量是否与在BeforeAll中定义相同?
- 静态编程语言中@BeforeAll的正确解决方法是什么
- 线程“main”java. lang.NoClassDefFoundError中的异常:在Intellij[Spring boot]中
- 线程“main”java. lang.NoClassDefFoundError中的异常:org/apache/log4j/ProvisionNode
- 异步管道是否从服务中定义并从组件变量指向的可观察对象取消订阅?
- 结合主体时不更新在模板中的异步管道可观察
- 组件中的Angular 2重复订阅
- 应该在ngOnDestroy()中将Angular组件变量设置为null吗?
- Angular2处理非组件类中的订阅