社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  MQ

在activemq中配置网络代理时,并非所有使用者都使用消息

Passionate developer • 4 年前 • 739 次点击  

我在同一台机器上有两个应用程序实例(尽管它也可能在不同的机器上),其中两个tomcat实例具有不同的端口,apache activemq嵌入在应用程序中。

我已经配置了一个静态代理网络,这样来自一个实例的消息也可以被所有其他实例使用(每个实例都可以是生产者和消费者)。

servlet:

package com.activemq.servlet;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

import javax.jms.JMSException;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.activemq.ActiveMQStartup;
import com.activemq.MQPublisher;
import com.activemq.SendMsg;
import com.activemq.SendMsgToAllInstance;
import com.activemq.TestPublisher;

/**
 * Servlet implementation class ActiveMQStartUpServlet
 */
@WebServlet(value = "/activeMQStartUpServlet", loadOnStartup = 1)
public class ActiveMQStartUpServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;
    private ActiveMQStartup mqStartup = null;
    private static final Map pooledPublishers = new HashMap();

    @Override
    public void init(ServletConfig config) throws ServletException {
        System.out.println("starting servelt--------------");
        super.init(config);
        //Apache Active MQ Startup
        mqStartup = new ActiveMQStartup();
        mqStartup.startBrokerService();

    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        System.out.println(req.getParameter("distributedMsg"));
        String mqConfig = null;
        String distributedMsg = req.getParameter("distributedMsg");
        String simpleMsg = req.getParameter("simpleMsg");
        if (distributedMsg != null && !distributedMsg.equals(""))
            mqConfig = "distributedMsg";
        else if (simpleMsg != null && !simpleMsg.equals(""))
            mqConfig = "simpleMsg";
        MQPublisher publisher = acquirePublisher(mqConfig);
        try {
            publisher.publish(mqConfig);
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            releasePublisher(publisher);
        }
    }

    @SuppressWarnings("unchecked")
    private void releasePublisher(MQPublisher publisher) {
        if (publisher == null) return;
        @SuppressWarnings("rawtypes")
        LinkedList publishers;
        TestPublisher poolablePublisher = (TestPublisher)publisher;
        publishers = getPooledPublishers(poolablePublisher.getConfigurationName());
        synchronized (publishers) {
            publishers.addLast(poolablePublisher);
        }

    }

    private MQPublisher acquirePublisher(String mqConfig) {
        LinkedList publishers = getPooledPublishers(mqConfig);
        MQPublisher publisher = getMQPubliser(publishers);
        if (publisher != null) return publisher;
        try {
            if (mqConfig.equals("distributedMsg"))
                return new TestPublisher(MQConfiguration.getConfiguration("distributedMsg"), new SendMsgToAllInstance());
            else    
                return new TestPublisher(MQConfiguration.getConfiguration("simpleMsg"), new SendMsg());
        }catch(Exception e){
            e.printStackTrace();
        }
        return null;
    }

    private LinkedList getPooledPublishers(String mqConfig) {
         LinkedList publishers = null;
         publishers = (LinkedList) pooledPublishers.get(mqConfig);
         if (publishers == null) {
             synchronized(pooledPublishers) {
                 publishers = (LinkedList) pooledPublishers.get(mqConfig);
                 if (publishers == null) {
                     publishers = new LinkedList();
                     pooledPublishers.put(mqConfig, publishers);
                 }
             }
         }
        return publishers;
    }

    private MQPublisher getMQPubliser(LinkedList publishers) {
        synchronized (publishers) {
            while (!publishers.isEmpty()) {
                TestPublisher publisher = (TestPublisher)publishers.removeFirst();
                return publisher;
            }
        }
        return null;
    }




}

配置:

package com.activemq.servlet;

import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.activemq.ActiveMQContext;

public class MQConfiguration {
    private static final Map configurations = new HashMap();
    private String mqConfig;
    private String topicName;
    private TopicConnection topicConnection = null;

    private MQConfiguration(String mqConfig, String string, String string2) {
        this.mqConfig = mqConfig;

        try {
            String topicFactoryConName = ActiveMQContext.getProperty(mqConfig);
            this.topicName = (mqConfig.equals("distributedMsg") ? ActiveMQContext.getProperty("distributedTopic"):ActiveMQContext.getProperty("normalTopic"));
            TopicConnectionFactory factory = (ActiveMQConnectionFactory) ActiveMQContext.getContext()
                    .lookup(topicFactoryConName);
            this.topicConnection = factory.createTopicConnection();
            this.topicConnection.start();
        } catch (Exception e) {
            System.out.println("error: " + e);
        }
    }

    public static MQConfiguration getConfiguration(String mqConfig) {
        if (mqConfig == null || "".equals(mqConfig)) {
            throw new IllegalArgumentException("mqConfig is null or empty");
        }

        MQConfiguration config = null;

        if (config != null) {
            return config;
        }
        synchronized (configurations) {
            config = (MQConfiguration) configurations.get(mqConfig);
            if (config == null) {
                config = new MQConfiguration(mqConfig, "userName", "userPassword");
            }
            configurations.put(mqConfig, config);
        }

        return config;
    }

    public String getMqConfig() {
        return this.mqConfig;
    }

    public TopicSession createTopicSession(boolean isTransacted, int autoAcknowledge) throws JMSException {
        if (this.topicConnection == null) {
            IllegalStateException ise = new IllegalStateException("topic connection not configured");
            throw ise;
        }
        return this.topicConnection.createTopicSession(isTransacted, autoAcknowledge);
    }

    public Topic getTopic() {
        try {
            return (Topic) ActiveMQContext.getContext().lookup(this.topicName);
        } catch (Exception e) {
            e.getMessage();
        }
        return null;
    }
}

出版商:

package com.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import com.activemq.servlet.MQConfiguration;

public class TestPublisher implements MQPublisher {
    private final String configurationName;
    private TopicSession topicSession = null;
    private TopicPublisher topicPublisher = null;

    public TestPublisher(MQConfiguration config, Object messageListener) throws JMSException {
        if (config == null) {
            throw new IllegalArgumentException("config == null");
        }
        Topic topic = config.getTopic();
        this.configurationName = config.getMqConfig();
        this.topicSession = config.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        this.topicPublisher = this.topicSession.createPublisher(topic);
        MessageConsumer msgConsumer = this.topicSession.createConsumer(topic);
        msgConsumer.setMessageListener((MessageListener) messageListener);
    }

    @Override
    public void publish(String msg) throws JMSException {
        this.topicPublisher.publish(createMessage(msg, this.topicSession));
    }

    private Message createMessage(String msg, Session session) throws JMSException {
        TextMessage message = session.createTextMessage(msg);
        return message;
    }

    public String getConfigurationName() {
        return this.configurationName;
    }
}

消费者:

package com.activemq;

import javax.jms.Message;
import javax.jms.MessageListener;

public class SendMsgToAllInstance implements MessageListener {

    @Override
    public void onMessage(Message arg0) {
        System.out.println("distributed message-------------");

        // We have call to dao layer to to fetch some data and cached it

    }

}

jndi:activemq-jndi.properties




    
# JNDI properties file to setup the JNDI server within ActiveMQ

#
# Default JNDI properties settings
#
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616
activemq.network.connector=static:(tcp://localhost:61620)

#activemq.network.connector=broker:(tcp://localhost:61619,network:static:tcp://localhost:61620)?persistent=false&useJmx=true
activemq.data.directory=data61619
activemq.jmx.port=1099

#
# Set the connection factory name(s) as well as the destination names. The connection factory name(s)
# as well as the second part (after the dot) of the left hand side of the destination definition
# must be used in the JNDI lookups.
#
connectionFactoryNames = distributedMsgFactory,simpleMsgFactory
topic.jms/distributedTopic=distributedTopic
topic.jms/normalTopic=normalTopic

distributedMsg=distributedMsgFactory
simpleMsg=simpleMsgFactory

distributedTopic=jms/distributedTopic
normalTopic=jms/normalTopic

activemq启动 :

package com.activemq;

import java.net.URI;

import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.security.JaasAuthenticationPlugin;

public class ActiveMQStartup {
    private final String bindAddress;
    private final String dataDirectory;
    private BrokerService broker = new BrokerService();
    protected final int numRestarts = 3;
    protected final int networkTTL = 2;
    protected final int consumerTTL = 2;
    protected final boolean dynamicOnly = true;
    protected final String networkBroker;
    protected final String jmxPort;

    public ActiveMQStartup() {
        ActiveMQContext context = new ActiveMQContext();
        context.loadJndiProperties();
        bindAddress = ActiveMQContext.getProperty("java.naming.provider.url");
        dataDirectory = ActiveMQContext.getProperty("activemq.data.directory");
        networkBroker = ActiveMQContext.getProperty("activemq.network.connector");
        jmxPort = ActiveMQContext.getProperty("activemq.jmx.port");
    }

    // Start activemq broker service
    public void startBrokerService() {
        try {
            broker.setDataDirectory("../" + dataDirectory);
            broker.setBrokerName(dataDirectory);
            broker.setUseShutdownHook(true);
            TransportConnector connector = new TransportConnector();
            connector.setUri(new URI(bindAddress));         

            //broker.setPlugins(new BrokerPlugin[]{new JaasAuthenticationPlugin()});
            ManagementContext mgContext = new ManagementContext();
            if (networkBroker != null && !networkBroker.isEmpty()) {
                NetworkConnector networkConnector = broker.addNetworkConnector(networkBroker);
                networkConnector.setName(dataDirectory);
                mgContext.setConnectorPort(Integer.parseInt(jmxPort));
                broker.setManagementContext(mgContext);
                configureNetworkConnector(networkConnector);
            }
            broker.setNetworkConnectorStartAsync(true);
            broker.addConnector(connector);
            broker.start();
        } catch (Exception e) {
            System.out.println("Failed to start Apache MQ Broker : " + e);
        }
    }

    private void configureNetworkConnector(NetworkConnector networkConnector) {
        networkConnector.setDuplex(true);
        networkConnector.setNetworkTTL(networkTTL);
        networkConnector.setDynamicOnly(dynamicOnly);
        networkConnector.setConsumerTTL(consumerTTL);
        //networkConnector.setStaticBridge(true);
    }

    // Stop broker service
    public void stopBrokerService() {
        try {
            broker.stop();
        } catch (Exception e) {
            System.out.println("Unable to stop the ApacheMQ Broker service " + e);
        }
    }
}

我正在逐个启动tomcat实例,看到代理之间的网络连接正在建立。

当我从instance1或instance2(第一次)发送消息时,它仅在该实例上消耗,但当我从第二个实例发送消息时,两者都消耗;

GIT中的代码: https://github.com/AratRana/ApacheActiveMQ

你能告诉我哪里错了吗?

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/40093
 
739 次点击  
文章 [ 1 ]  |  最新文章 4 年前