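As you pointed out, the Java SDK gives you more streaming utilities, such as timers and state. These utilities make it easier to implement pipelines like this one.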
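The Python SDK lacks these utilities, timers in particular. We therefore need a workaround: the side input is reloaded whenever a message is published to `some_other_topic` in Pub/Sub.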
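Any process that publishes to `some_other_topic` can trigger the reload. The message content does not matter, because only its arrival fires the trigger. Below is a minimal sketch that assumes the `google-cloud-pubsub` client library. The helper name `trigger_refresh`, the `b"refresh"` payload, and the project and topic names in the usage comment are hypothetical.

```python
def trigger_refresh(publisher, topic_path, payload=b"refresh"):
    """Publish a message whose arrival makes the pipeline reload its side input.

    `publisher` is expected to be a google.cloud.pubsub_v1.PublisherClient
    (or any object with the same publish() method). The payload is arbitrary
    (hypothetical placeholder).
    """
    future = publisher.publish(topic_path, payload)
    # Block until Pub/Sub accepts the message; result() returns the message ID.
    return future.result()


# Usage (needs credentials and an existing topic; names are placeholders):
# from google.cloud import pubsub_v1
# publisher = pubsub_v1.PublisherClient()
# topic_path = publisher.topic_path("my-project", "some_other_topic")
# trigger_refresh(publisher, topic_path)
```

You could run this from a cron job or Cloud Scheduler. Refreshes then happen on whatever schedule suits you, without the pipeline needing timers.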
This also means you have to perform the BigQuery lookup manually. You can use the `apache_beam.io.gcp.bigquery_tools.BigQueryWrapper` class to query BigQuery directly.
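The following shows one way the `external_service.load_my_conversion_data()` placeholder in the pipeline could be backed by a BigQuery query. This sketch uses the `google-cloud-bigquery` client library (`Client.query(...).result()`) in place of `BigQueryWrapper`. The table `my_dataset.conversion_rates` and its `pair`/`rate` columns are hypothetical.

```python
def load_rates_from_bigquery(client, table="my_dataset.conversion_rates"):
    """Return a dict like {"EURUSD": 1.1, "USDMXN": 20.0} from a BigQuery table.

    `client` is expected to be a google.cloud.bigquery.Client. The default
    table name and its `pair` / `rate` columns are hypothetical placeholders.
    """
    query = "SELECT pair, rate FROM `%s`" % table
    rows = client.query(query).result()  # waits for the query job to finish
    # BigQuery rows support lookup by column name.
    return {row["pair"]: float(row["rate"]) for row in rows}


# Inside the pipeline, for example:
# from google.cloud import bigquery
# def load_conversion_data(_):
#     return load_rates_from_bigquery(bigquery.Client())
```

Passing the client in as a parameter keeps the function easy to exercise with a stub. It also lets you decide where the client is created on the workers.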
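Below is an example pipeline that refreshes some currency conversion data. I haven't tested it, but I'm 90% sure it will work with only minor tweaks. Let me know if it helps.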
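```python
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import trigger, window

# Pub/Sub sources require a streaming pipeline.
pipeline_options = PipelineOptions(streaming=True)
p = beam.Pipeline(options=pipeline_options)

# some_topic, some_other_topic and external_service are placeholders:
# substitute your own topic paths and data-loading code.

def load_conversion_data(_):
    # I will suppose that these are currency conversions. E.g.
    # {"EURUSD": 1.1, "USDMXN": 20, ...}
    return external_service.load_my_conversion_data()

# Each message on some_other_topic fires the trigger and reloads the table.
table_pcv = beam.pvalue.AsSingleton(
    p
    | "ReadRefresh" >> beam.io.ReadFromPubSub(topic=some_other_topic)
    | "GlobalWindow" >> beam.WindowInto(
        window.GlobalWindows(),
        trigger=trigger.Repeatedly(trigger.AfterCount(1)),
        accumulation_mode=trigger.AccumulationMode.DISCARDING)
    | "LoadRates" >> beam.Map(load_conversion_data))


class ConvertTo(beam.DoFn):
    def __init__(self, target_currency):
        self.target_currency = target_currency

    def process(self, elm, rates):
        key = "%s%s" % (elm["currency"], self.target_currency)
        if elm["currency"] == self.target_currency:
            yield elm
        elif key in rates:
            # dict.update() returns None, so build a new dict instead.
            yield dict(elm, currency=self.target_currency,
                       value=elm["value"] * rates[key])
        # Otherwise there is no known rate and we drop the value.


_ = (p
     | "ReadEvents" >> beam.io.ReadFromPubSub(topic=some_topic)
     # Assumes each message is a JSON object with "currency" and "value".
     | "ParseJson" >> beam.Map(json.loads)
     | "FixedWindows" >> beam.WindowInto(window.FixedWindows(1))
     | "Convert" >> beam.ParDo(ConvertTo("USD"), rates=table_pcv))

p.run()
```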
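The per-element logic in `ConvertTo.process` does not depend on Beam. One option is to pull it into a plain function, so it can be unit-tested without a streaming runner. This is a sketch. The function name `convert` is hypothetical, and it mirrors the DoFn by returning a list (empty when no rate is known) instead of yielding.

```python
def convert(elm, rates, target_currency):
    """Convert one {"currency": ..., "value": ...} record to target_currency.

    Returns a list with zero or one element, matching what
    ConvertTo.process yields for the same inputs.
    """
    if elm["currency"] == target_currency:
        return [elm]
    key = "%s%s" % (elm["currency"], target_currency)  # e.g. "EURUSD"
    if key in rates:
        # Copy the record so the input dict is left untouched.
        return [dict(elm, currency=target_currency,
                     value=elm["value"] * rates[key])]
    return []  # no known rate: drop the element


rates = {"EURUSD": 1.1, "USDMXN": 20}
print(convert({"currency": "EUR", "value": 10}, rates, "USD"))
print(convert({"currency": "GBP", "value": 5}, rates, "USD"))  # []
```

With this in place, `ConvertTo.process` could reduce to `yield from convert(elm, rates, self.target_currency)`.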