Design and Implementation of an Online Course Purchase System(22)

Applying ActiveMQ to Synchronize Index Libraries

Posted by Chen Xingxu on June 3, 2020

易课寄在线购课系统开发笔记(二十二)–应用ActiveMQ实现同步索引库

同步索引库分析

方案一:在 ecourses-bms 中,添加课程的业务逻辑中,添加一个同步索引库的业务逻辑。

缺点:业务逻辑耦合度高,业务拆分不明确。

方案二:业务逻辑在 ecourses-search 中实现,调用服务在 ecourses-bms 实现,业务逻辑分开。

缺点:服务之间的耦合度变高,服务的启动有先后顺序。

方案三:使用消息队列。MQ是一个消息中间件。

MQ 是一个消息中间件,常见的有 ActiveMQ、RabbitMQ、Kafka

ActiveMQ

什么是 ActiveMQ

ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,尽管 JMS 规范出台已经是很久之前的事情了,但是 JMS 在当今的 J2EE 应用中间仍然扮演着特殊的地位。

主要特点:

  1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire, Stomp REST, WS Notification, XMPP, AMQP;

  2. 完全支持 JMS1.1 和 J2EE 1.4 规范 (持久化,XA 消息,事务);

  3. 对 Spring 的支持,ActiveMQ 可以很容易内嵌到使用 Spring 的系统里面去,而且也支持 Spring 2.0 的特性;

  4. 通过了常见 J2EE 服务器 (如 Geronimo, JBoss 4, GlassFish, WebLogic) 的测试,其中通过 JCA 1.5 resource adaptors 的配置,可以让 ActiveMQ 可以自动的部署到任何兼容 J2EE 1.4 的商业服务器上;

  5. 支持多种传送协议: in-VM, TCP, SSL, NIO, UDP, JGroups, JXTA;

  6. 支持通过 JDBC 和 journal 提供高速的消息持久化;

  7. 从设计上保证了高性能的集群,客户端——服务器,点对点;

  8. 支持 Ajax;

  9. 支持与 Axis 的整合;

  10. 可以很容易得调用内嵌 JMS provider 进行测试。

ActiveMQ 的消息形式

对于消息的传递有两种类型:

一种是点对点的,即一个生产者和一个消费者一一对应;

另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

  • StreamMessage —— Java原始值的数据流
  • MapMessage —— 一套名称——值对
  • TextMessage —— 一个字符串对象
  • ObjectMessage —— 一个序列化的 Java 对象
  • BytesMessage —— 一个字节的数据流

ActiveMQ 的安装

请参阅下面这篇笔记:

CentOS 6学习笔记(十一)–CentOS6环境安装ActiveMQ

ActiveMQ 的使用方法

Queue

Producer

生产者:生产消息,发送端。

把 jar 包添加到工程中。使用 5.11.2 版本的 jar 包。

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
</dependency>

第一步:创建 ConnectionFactory 对象,需要指定服务端 IP 及端口号;

第二步:使用 ConnectionFactory 对象创建一个 Connection 对象;

第三步:开启连接,调用 Connection 对象的 start 方法;

第四步:使用 Connection 对象创建一个 Session 对象;

第五步:使用 Session 对象创建一个 Destination 对象(topic、queue),此处创建一个 Queue 对象;

第六步:使用 Session 对象创建一个 Producer 对象;

第七步:创建一个 Message 对象,创建一个 TextMessage 对象;

第八步:使用 Producer 对象发送消息;

第九步:关闭资源。

@Test
public void testQueueProducer() throws Exception {
    // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
    //brokerURL服务器的ip及端口号
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.156:61616");
    // 第二步:使用ConnectionFactory对象创建一个Connection对象。
    Connection connection = connectionFactory.createConnection();
    // 第三步:开启连接,调用Connection对象的start方法。
    connection.start();
    // 第四步:使用Connection对象创建一个Session对象。
    //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
    //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
    //参数:队列的名称。
    Queue queue = session.createQueue("test-queue");
    // 第六步:使用Session对象创建一个Producer对象。
    MessageProducer producer = session.createProducer(queue);
    // 第七步:创建一个Message对象,创建一个TextMessage对象。
    /*TextMessage message = new ActiveMQTextMessage();
    message.setText("hello activeMq,this is my first test.");*/
    TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
    // 第八步:使用Producer对象发送消息。
    producer.send(textMessage);
    // 第九步:关闭资源。
    producer.close();
    session.close();
    connection.close();
}

Consumer

消费者:接收消息。

第一步:创建一个 ConnectionFactory 对象;

第二步:从 ConnectionFactory 对象中获得一个 Connection 对象;

第三步:开启连接。调用 Connection 对象的 start 方法;

第四步:使用 Connection 对象创建一个 Session 对象;

第五步:使用 Session 对象创建一个 Destination 对象,和发送端保持一致 queue,并且队列的名称一致;

第六步:使用 Session 对象创建一个 Consumer 对象;

第七步:接收消息;

第八步:打印消息;

第九步:关闭资源。

@Test
public void testQueueConsumer() throws Exception {
    // 第一步:创建一个ConnectionFactory对象。
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.156:61616");
    // 第二步:从ConnectionFactory对象中获得一个Connection对象。
    Connection connection = connectionFactory.createConnection();
    // 第三步:开启连接。调用Connection对象的start方法。
    connection.start();
    // 第四步:使用Connection对象创建一个Session对象。
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
    Queue queue = session.createQueue("test-queue");
    // 第六步:使用Session对象创建一个Consumer对象。
    MessageConsumer consumer = session.createConsumer(queue);
    // 第七步:接收消息。
    consumer.setMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message) {
            try {
                TextMessage textMessage = (TextMessage) message;
                String text = null;
                //取消息的内容
                text = textMessage.getText();
                // 第八步:打印消息。
                System.out.println(text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });
    //等待键盘输入
    System.in.read();
    // 第九步:关闭资源
    consumer.close();
    session.close();
    connection.close();
}

Topic

Producer

使用步骤:

第一步:创建 ConnectionFactory 对象,需要指定服务端 IP 及端口号;

第二步:使用 ConnectionFactory 对象创建一个 Connection 对象;

第三步:开启连接,调用 Connection 对象的 start 方法;

第四步:使用 Connection 对象创建一个 Session 对象;

第五步:使用 Session 对象创建一个 Destination 对象(topic、queue),此处创建一个 Topic 对象;

第六步:使用 Session 对象创建一个 Producer 对象;

第七步:创建一个 Message 对象,创建一个 TextMessage 对象;

第八步:使用 Producer 对象发送消息;

第九步:关闭资源。

@Test
public void testTopicProducer() throws Exception {
    // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
    // brokerURL服务器的ip及端口号
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
    // 第二步:使用ConnectionFactory对象创建一个Connection对象。
    Connection connection = connectionFactory.createConnection();
    // 第三步:开启连接,调用Connection对象的start方法。
    connection.start();
    // 第四步:使用Connection对象创建一个Session对象。
    // 第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
    // 第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个topic对象。
    // 参数:话题的名称。
    Topic topic = session.createTopic("test-topic");
    // 第六步:使用Session对象创建一个Producer对象。
    MessageProducer producer = session.createProducer(topic);
    // 第七步:创建一个Message对象,创建一个TextMessage对象。
    /*
     * TextMessage message = new ActiveMQTextMessage(); message.setText(
     * "hello activeMq,this is my first test.");
     */
    TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
    // 第八步:使用Producer对象发送消息。
    producer.send(textMessage);
    // 第九步:关闭资源。
    producer.close();
    session.close();
    connection.close();
}

Consumer

消费者:接收消息。

第一步:创建一个 ConnectionFactory 对象;

第二步:从 ConnectionFactory 对象中获得一个 Connection 对象;

第三步:开启连接,调用 Connection 对象的 start 方法;

第四步:使用 Connection 对象创建一个 Session 对象;

第五步:使用 Session 对象创建一个 Destination 对象。和发送端保持一致 topic,并且话题的名称一致;

第六步:使用 Session 对象创建一个 Consumer 对象;

第七步:接收消息;

第八步:打印消息;

第九步:关闭资源。

@Test
public void testTopicConsumer() throws Exception {
    // 第一步:创建一个ConnectionFactory对象。
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.156:61616");
    // 第二步:从ConnectionFactory对象中获得一个Connection对象。
    Connection connection = connectionFactory.createConnection();
    // 第三步:开启连接。调用Connection对象的start方法。
    connection.start();
    // 第四步:使用Connection对象创建一个Session对象。
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
    Topic topic = session.createTopic("test-topic");
    // 第六步:使用Session对象创建一个Consumer对象。
    MessageConsumer consumer = session.createConsumer(topic);
    // 第七步:接收消息。
    consumer.setMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message) {
            try {
                TextMessage textMessage = (TextMessage) message;
                String text = null;
                // 取消息的内容
                text = textMessage.getText();
                // 第八步:打印消息。
                System.out.println(text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });
    System.out.println("topic的消费端03。。。。。");
    // 等待键盘输入
    System.in.read();
    // 第九步:关闭资源
    consumer.close();
    session.close();
    connection.close();
}