私信  •  关注

Pablo

Pablo 最近回复了
6 年前
回复了 Pablo 创建的主题 » 从BigQuery缓慢更改查找缓存-数据流Python流SDK

正如您所指出的,Java SDK允许您使用更多的流实用程序,如计时器和状态。这些实用程序有助于实现此类管道。

Python SDK缺少这些实用程序,特别是计时器。因此,我们需要使用一个hack,其中可以通过在 some_other_topic 在PubSub。

这也意味着您必须手动执行对BigQuery的查找。你可以用 apache_beam.io.gcp.bigquery_tools.BigQueryWrapper 类直接执行BigQuery的查找。

下面是刷新某些货币转换数据的管道示例。我还没有测试过,但我90%肯定它只需要很少的调整就可以工作。如果有帮助,请告诉我。

pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

def load_conversion_data(_):
  # I will suppose that these are currency conversions. E.g. 
  # {‘EURUSD’: 1.1, ‘USDMXN’ 20, …}
  return external_service.load_my_conversion_data()

table_pcv = beam.pvalue.AsSingleton((
  p
  | beam.io.gcp.ReadFromPubSub(topic=some_other_topic)
  | WindowInto(window.GlobalWindow(),
               trigger=trigger.Repeatedly(trigger.AfterCount(1),
               accumulation_mode=trigger.AccumulationMode.DISCARDING)
  | beam.Map(load_conversion_data)))


class ConvertTo(beam.DoFn):
  def __init__(self, target_currency):
    self.target_currenct = target_currency

  def process(self, elm, rates):
    if elm[‘currency’] == self.target_currency:
      yield elm
    elif ‘%s%s’ % (elm[‘currency’], self.target_currency) in rates:
      rate = rates[‘%s%s’ % (elm[‘currency’], self.target_currency)]
      result = {}.update(elm).update({‘currency’: self.target_currency,
                                      ‘value’: elm[‘value’]*rate})
      yield result
    else:
      return  # We drop that value


_ = (p 
     | beam.io.gcp.ReadFromPubSub(topic=some_topic)
     | beam.WindowInto(window.FixedWindows(1))
     | beam.ParDo(ConvertTo(‘USD’), rates=table_pcv))