社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

如何在python中处理传入的pubsub消息?

Larry Cai • 5 年前 • 912 次点击  

我已经在debian上创建了一个云计算引擎实例,并成功地创建了一个主题的push订阅

from google.cloud import pubsub_v1

project_id = "censored"
topic_name = "censored"
subscription_name = "censored"
endpoint = "https://censored.appspot.com/pubsub/push?token=censored"

def create_push_subscription(project_id,
                             topic_name,
                             subscription_name,
                             endpoint):
    """Create a new push subscription on the given topic."""
    # [START pubsub_create_push_subscription]

    subscriber = pubsub_v1.SubscriberClient()
    topic_path = subscriber.topic_path(project_id, topic_name)
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    push_config = pubsub_v1.types.PushConfig(
        push_endpoint=endpoint)

    subscription = subscriber.create_subscription(
        subscription_path, topic_path, push_config)

    print('Push subscription created: {}'.format(subscription))
    print('Endpoint for subscription is: {}'.format(endpoint))
    # [END pubsub_create_push_subscription]

create_push_subscription(project_id, topic_name, subscription_name, endpoint)

但我不确定收到的信息到底是怎么来的。我已经找到了这个解析消息的示例代码,但是我不确定如何让它在后台监控,并在收到消息时“激活”。

import argparse
import base64
import json
import sys
import time

from google.cloud import pubsub_v1

def summarize(message):
    # [START parse_message]
    data = message.data.decode('utf-8')
    attributes = message.attributes

    name = attributes['name']
    time_created = attributes['timeCreated']
    bucket_id = attributes['bucketId']
    object_id = attributes['objectId']
    generation = attributes['objectGeneration']
    description = (
        '\tName: {name}\n'
        '\tTime Created: {time_created}\n'
        '\tBucket ID: {bucket_id}\n'
        '\tObject ID: {object_id}\n'
        '\tGeneration: {generation}\n'
        ).format(
            name=name,
            time_created=time_created,
            bucket_id=bucket_id,
            object_id=object_id,
            generation=generation
            )

    if 'overwroteGeneration' in attributes:
        description += '\tOverwrote generation: %s\n' % (
            attributes['overwroteGeneration'])
    if 'overwrittenByGeneration' in attributes:
        description += '\tOverwritten by generation: %s\n' % (
            attributes['overwrittenByGeneration'])

    payload_format = attributes['payloadFormat']
    if payload_format == 'JSON_API_V1':
        object_metadata = json.loads(data)
        name = object_metadata['name']
        time_created = object_metadata['timeCreated']
        size = object_metadata['size']
        content_type = object_metadata['contentType']
        metageneration = object_metadata['metageneration']
        description += (
            '\tName: {name}\n'
            '\tTime Created: {time_created}\n'
            '\tContent type: {content_type}\n'
            '\tSize: {object_size}\n'
            '\tMetageneration: {metageneration}\n'
            ).format(
                name=name,
                time_created=time_created,
                content_type=content_type,
                object_size=size,
                metageneration=metageneration
                )
    return description
    print('Note for developer: If BucketId and ObjectId listed, utf encoding.')
    print('If not, JSON_V1 encoding. Adjust accordingly.')

    # [END parse_message]
while(True):
    print("signpost 1")
    summarize(message)
    print("signpost 2")
    time.sleep(10)
print("signpost 3")

例如,此代码将返回

NameError: name 'message' is not defined

这是意料之中的…

有人能帮我把它摆好吗?

我知道pull中的消息是不同的,因为在pull中会定义消息,但是如果可能的话,我希望保持push。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/40794
 
912 次点击  
文章 [ 1 ]  |  最新文章 5 年前