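I'm trying to follow the slowly-changing lookup cache design pattern (https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1) for a streaming pipeline that uses the Python SDK for Apache Beam on Dataflow.

The reference table for the lookup cache lives in BigQuery. We can read it and pass it in as a side input to a ParDo operation, but it never refreshes, no matter how we set up the triggers/windows.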
```python
import apache_beam as beam
from apache_beam.transforms import trigger


class FilterAlertDoFn(beam.DoFn):
    def process(self, element, alertlist):
        print(len(alertlist))
        print(alertlist)
        ...  # function logic


alert_input = (p | beam.io.Read(beam.io.BigQuerySource(query=ALERT_QUERY))
               | 'alert_side_input' >> beam.WindowInto(
                   beam.window.GlobalWindows(),
                   trigger=trigger.Repeatedly(trigger.AfterWatermark(
                       late=trigger.AfterCount(1)
                   )),
                   accumulation_mode=trigger.AccumulationMode.ACCUMULATING
               )
               | beam.Map(lambda elem: elem['SOMEKEY'])
               )
...
main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), beam.pvalue.AsList(alert_input))
```
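According to the built-in I/O page (https://beam.apache.org/documentation/io/built-in/), the Python SDK supports streaming only for the BigQuery sink. Does that mean a BigQuery read is a bounded source and therefore can't be refreshed with this approach?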
Setting a non-global window on the source results in an empty PCollection in the side input.
UPDATE:
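When I try to implement the strategy suggested in Pablo's answer, the ParDo operation that uses the side input never runs. A single input source feeds two outputs, one of which uses the side input. The branch without the side input still reaches its destination, but the side-input branch never enters `FilterAlertDoFn()`. If I replace the side input with a dummy value, the pipeline does enter the function. Is it perhaps waiting for a suitable window that doesn't exist?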
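To reason about the "waiting for a window" question, here is a plain-Python model (not Beam code) of the two window assignments involved. It assumes Beam's documented behavior that `FixedWindows(size)` aligns a timestamp down to a multiple of `size` (plus an offset), and that a `GlobalWindows` side input has a single window to which every main-input window maps. The readiness rule in `side_input_ready` is an assumption: the runner holds a main-input element back until the mapped side-input window has fired at least one pane. All names below are hypothetical.

```python
from typing import NamedTuple, Set, Union


class IntervalWindow(NamedTuple):
    """A [start, end) window in microseconds, modeled on Beam's IntervalWindow."""
    start: int
    end: int


# Stand-in for Beam's single global window.
GLOBAL_WINDOW = "GlobalWindow"

Window = Union[IntervalWindow, str]


def assign_fixed_window(ts_micros: int, size_micros: int,
                        offset_micros: int = 0) -> IntervalWindow:
    """Model FixedWindows(size, offset): align the timestamp down to a window boundary."""
    start = ts_micros - (ts_micros - offset_micros) % size_micros
    return IntervalWindow(start, start + size_micros)


def map_to_side_input_window(main_window: Window) -> Window:
    """A GlobalWindows side input has one window, so every main-input window maps to it."""
    return GLOBAL_WINDOW


def side_input_ready(fired_side_windows: Set[Window], main_window: Window) -> bool:
    """Assumed rule: the mapped side-input window must have fired at least once."""
    return map_to_side_input_window(main_window) in fired_side_windows
```

Under these assumptions, a main-input element in `FixedWindows(1)` always finds the global side-input window; what can stall it is that window never having fired, for example before any message has arrived on the trigger subscription.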
Using the same `FilterAlertDoFn()` as above, my side input and call now look like this:
```python
import apache_beam as beam
from apache_beam.transforms import trigger
from google.cloud import bigquery


def refresh_side_input(_):
    query = 'select col from table'
    client = bigquery.Client(project='gcp-project')
    query_job = client.query(query)
    return query_job.result()


trigger_input = (p | 'alert_ref_trigger' >> beam.io.ReadFromPubSub(
    subscription=known_args.trigger_subscription))

bigquery_side_input = beam.pvalue.AsSingleton((trigger_input
    | beam.WindowInto(beam.window.GlobalWindows(),
                      trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                      accumulation_mode=trigger.AccumulationMode.DISCARDING)
    | beam.Map(refresh_side_input)
))
...
# Passing this as side input doesn't work
main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), bigquery_side_input)

# Passing a dummy value as side input does work
main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), [1])
```
I've tried several versions of `refresh_side_input()`, and each one reports the expected result when I inspect its return value inside the function.
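One thing worth ruling out is the value `refresh_side_input()` hands to `AsSingleton`. `query_job.result()` returns a `RowIterator`, a single-pass page iterator that keeps a reference to the API client, so it may not serialize cleanly as a side-input value and cannot be iterated a second time. A minimal sketch, assuming the single `col` column from the query above, copies the rows into a plain list first (the helper `materialize_column` is hypothetical):

```python
from typing import Any, Iterable, List, Mapping


def materialize_column(rows: Iterable[Mapping[str, Any]], field: str) -> List[Any]:
    """Copy one column out of query rows into a plain, picklable list.

    BigQuery Row objects support row['name'] access, so plain dicts can
    stand in for them when testing.
    """
    return [row[field] for row in rows]


def refresh_side_input(_):
    # Imported lazily so the client library is only needed where this runs.
    from google.cloud import bigquery
    client = bigquery.Client(project='gcp-project')
    rows = client.query('select col from table').result()
    return materialize_column(rows, 'col')
```

The lazy import is a common Dataflow idiom rather than a requirement; the point of the sketch is that the side input then carries a list instead of a live iterator.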
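UPDATE 2:

I made some small modifications to Pablo's code and get the same behavior: the DoFn never executes. In the example below, I see `in_load_conversion_data` whenever I publish to `some_other_topic`, but I never see `in_DoFn` when publishing to `some_topic`.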
```python
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.transforms import trigger
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions


def load_my_conversion_data():
    return {'EURUSD': 1.1, 'USDMXN': 4.4}


def load_conversion_data(_):
    # I will suppose that these are currency conversions. E.g.
    # {'EURUSD': 1.1, 'USDMXN': 20}
    print('in_load_conversion_data')
    return load_my_conversion_data()


class ConvertTo(beam.DoFn):
    def __init__(self, target_currency):
        self.target_currency = target_currency

    def process(self, elm, rates):
        print('in_DoFn')
        elm = elm.attributes  # requires with_attributes=True on the source
        pair = '%s%s' % (elm['currency'], self.target_currency)
        if elm['currency'] == self.target_currency:
            yield elm
        elif pair in rates:
            # dict.update() returns None, so copy the attributes first
            result = dict(elm)
            result.update({'currency': self.target_currency,
                           'value': float(elm['value']) * rates[pair]})
            yield result
        else:
            return  # We drop that value


pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = True

some_topic = 'projects/some_project/topics/some_topic'
some_other_topic = 'projects/some_project/topics/some_other_topic'

with beam.Pipeline(options=pipeline_options) as p:
    table_pcv = beam.pvalue.AsSingleton((
        p
        | 'some_other_topic' >> beam.io.ReadFromPubSub(topic=some_other_topic, with_attributes=True)
        | 'some_other_window' >> beam.WindowInto(window.GlobalWindows(),
                                                 trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                                                 accumulation_mode=trigger.AccumulationMode.DISCARDING)
        | beam.Map(load_conversion_data)))

    _ = (p
         | 'some_topic' >> beam.io.ReadFromPubSub(topic=some_topic, with_attributes=True)
         | 'some_window' >> beam.WindowInto(window.FixedWindows(1))
         | beam.ParDo(ConvertTo('USD'), rates=table_pcv))
```
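Because the DoFn never runs, its conversion logic is hard to check inside the pipeline. A sketch of the same lookup as a plain function (the name `convert_attributes` is hypothetical) can be tested without Beam, assuming Pub/Sub attributes arrive as strings so that `value` has to be parsed before multiplying:

```python
from typing import Dict, Mapping, Optional


def convert_attributes(attrs: Mapping[str, str],
                       target_currency: str,
                       rates: Mapping[str, float]) -> Optional[Dict[str, object]]:
    """Return a converted copy of attrs, or None when no rate is known."""
    if attrs['currency'] == target_currency:
        return dict(attrs)  # already in the target currency
    pair = '%s%s' % (attrs['currency'], target_currency)  # e.g. 'EURUSD'
    if pair not in rates:
        return None  # dropped, as in ConvertTo
    result: Dict[str, object] = dict(attrs)
    result.update({'currency': target_currency,
                   'value': float(attrs['value']) * rates[pair]})
    return result
```

`ConvertTo.process` could then `yield` whatever this returns when it is not `None`, keeping the pipeline code thin.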