社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  MQ

在控制器中使用rabbitmq消息

zlatan • 4 年前 • 1483 次点击  

我目前正在学习服务、api和amqp消息传递系统,准确地说是rabbitmq,我正在关注这个 tutorial 关于rabbitmq消息传递。我把一切都做好了,但我想在我的项目中改变一些东西。我想从路由和控制器调用发布者和使用者脚本,而不是在终端中键入它们( php src/publisher.php php src/consumer.php )

首先,我创建了两个路由:

 Route::get('/send-message', 'ServiceAController@index');
 Route::get('/receive-message', 'ServiceBController@index');

第一个路由(send message)用于将http请求作为消息发送到rabbitmq,这是通过postman post请求完成的,在这里我插入了所需的参数。此路线的控制器工作正常,如下所示:

public function index(Request $request){
    //Returning status 200 and sending message if amount is in range
    if( (-100000000  <= $request->amount )  &&  ($request->amount <= 100000000 )){
        //Sending message to RabbitMQ
        $amount = $request->amount;
        $currency = $request->currency;

        //Saving request data to variable to publish it
        $messageContent = json_encode([
            'amount' => $amount * 100,
            'currency' => $currency,
        ]);

        //Sending broker message
        $host = 'secret';
        $port = 5672;
        $user = 'secret';
        $pass = 'secret';
        $vhost = 'secret';
        $exchange = 'balance';
        $queue = 'local_balance';

        $connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
        $channel = $connection->channel();
        /*
            The following code is the same both in the consumer and the producer.
            In this way we are sure we always have a queue to consume from and an
                exchange where to publish messages.
        */
        /*
            name: $queue
            passive: false
            durable: true // the queue will survive server restarts
            exclusive: false // the queue can be accessed in other channels
            auto_delete: false //the queue won't be deleted once the channel is closed.
        */
        $channel->queue_declare($queue, false, true, false, false);
        /*
            name: $exchange
            type: direct
            passive: false
            durable: true // the exchange will survive server restarts
            auto_delete: false //the exchange won't be deleted once the channel is closed.
        */
        $channel->exchange_declare($exchange, 'direct', false, true, false);
        $channel->queue_bind($queue, $exchange);
        $messageBody = $messageContent;
        $message = new AMQPMessage($messageBody, ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
        $channel->basic_publish($message, $exchange);
        $channel->close();
        $connection->close();

        //Returning json response of HTTP payload
        $response = json_encode([
            'amount' => +number_format($amount, 2, '.', ''),
            'currency' => $currency,
        ]);
        return $response;
    }else{
        //Returning status 400 if amount is not in acceptable range
        abort(400, 'Amount is not in acceptable range'); //Returning code 400 if condition isn't met
    }
}

但是我的 问题 从我将消费者代码放入ServiceBController开始,与前一个相同。我的ServiceBController如下所示:

public function index(){

    $host = 'secret';
    $port = 5672;
    $user = 'secret';
    $pass = 'secret';
    $vhost = 'secret';
    $exchange = 'balance';
    $queue = 'local_balance';

    $connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
    $channel = $connection->channel();
    /*
        The following code is the same both in the consumer and the producer.
        In this way we are sure we always have a queue to consume from and an
            exchange where to publish messages.
    */
    /*
        name: $queue
        passive: false
        durable: true // the queue will survive server restarts
        exclusive: false // the queue can be accessed in other channels
        auto_delete: false //the queue won't be deleted once the channel is closed.
    */
    $channel->queue_declare($queue, false, true, false, false);
    /*
        name: $exchange
        type: direct
        passive: false
        durable: true // the exchange will survive server restarts
        auto_delete: false //the exchange won't be deleted once the channel is closed.
    */
    $channel->exchange_declare($exchange, 'direct', false, true, false);
    $channel->queue_bind($queue, $exchange);
    /**
     * @param AMQPMessage $message
     */
    function process_message(AMQPMessage $message){
        $messageBody = json_decode($message->body);
        $amount = $messageBody->amount;
        $currency = $messageBody->currency;

        file_put_contents('C:/xampp/htdocs/nsoft/data' . '.json', $message->body);

        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    }
    /*
        queue: Queue from where to get the messages
        consumer_tag: Consumer identifier
        no_local: Don't receive messages published by this consumer.
        no_ack: Tells the server if the consumer will acknowledge the messages.
        exclusive: Request exclusive consumer access, meaning only this consumer can access the queue
        nowait:
        callback: A PHP Callback
    */
    $consumerTag = 'local.consumer';
    $channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
    /**
     * @param \PhpAmqpLib\Channel\AMQPChannel $channel
     * @param \PhpAmqpLib\Connection\AbstractConnection $connection
     */
    function shutdown($channel, $connection){
        $channel->close();
        $connection->close();
    }

    register_shutdown_function('shutdown', $channel, $connection);

    while (count($channel->callbacks)) {
        $channel->wait();
    }
}

在postman中使用get请求调用它之后,我得到以下错误:

symfony\component\debug\exception\fatalerrorexception:在第227行的文件C:\xampp\htdocs\nsoft\vendor\php amqplib\php amqplib\phpamqplib\wire\io\streamio.php中,超过了30秒的最长执行时间。

我已经被这个错误困扰了好几天了,似乎找不到解决办法,所以我需要一些人的帮助。我做错什么了?作为参考,当我将同一个使用者脚本作为单独的文件放置时,它也可以工作( src/consumer.php )当我通过我的终端呼叫时。如有任何帮助,我们将不胜感激。

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/43094
 
1483 次点击  
文章 [ 2 ]  |  最新文章 4 年前