易课寄在线购课系统开发笔记(二十二)–应用ActiveMQ实现同步索引库
同步索引库分析
方案一:在 ecourses-bms 中,添加课程的业务逻辑中,添加一个同步索引库的业务逻辑。
缺点:业务逻辑耦合度高,业务拆分不明确。
方案二:业务逻辑在 ecourses-search 中实现,调用服务在 ecourses-bms 实现,业务逻辑分开。
缺点:服务之间的耦合度变高,服务的启动有先后顺序。
方案三:使用消息队列。MQ是一个消息中间件。
MQ 是一个消息中间件,常见的有 ActiveMQ、RabbitMQ、Kafka
ActiveMQ
什么是 ActiveMQ
ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,尽管 JMS 规范出台已经是很久之前的事情了,但是 JMS 在当今的 J2EE 应用中间仍然扮演着特殊的地位。
主要特点:
-
多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire, Stomp REST, WS Notification, XMPP, AMQP;
-
完全支持 JMS1.1 和 J2EE 1.4 规范 (持久化,XA 消息,事务);
-
对 Spring 的支持,ActiveMQ 可以很容易内嵌到使用 Spring 的系统里面去,而且也支持 Spring 2.0 的特性;
-
通过了常见 J2EE 服务器 (如 Geronimo, JBoss 4, GlassFish, WebLogic) 的测试,其中通过 JCA 1.5 resource adaptors 的配置,可以让 ActiveMQ 可以自动的部署到任何兼容 J2EE 1.4 的商业服务器上;
-
支持多种传送协议: in-VM, TCP, SSL, NIO, UDP, JGroups, JXTA;
-
支持通过 JDBC 和 journal 提供高速的消息持久化;
-
从设计上保证了高性能的集群,客户端——服务器,点对点;
-
支持 Ajax;
-
支持与 Axis 的整合;
-
可以很容易得调用内嵌 JMS provider 进行测试。
ActiveMQ 的消息形式
对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
- StreamMessage —— Java原始值的数据流
- MapMessage —— 一套名称——值对
- TextMessage —— 一个字符串对象
- ObjectMessage —— 一个序列化的 Java 对象
- BytesMessage —— 一个字节的数据流
ActiveMQ 的安装
请参阅下面这篇笔记:
CentOS 6学习笔记(十一)–CentOS6环境安装ActiveMQ
ActiveMQ 的使用方法
Queue
Producer
生产者:生产消息,发送端。
把 jar 包添加到工程中。使用 5.11.2 版本的 jar 包。
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
</dependency>
第一步:创建 ConnectionFactory 对象,需要指定服务端 IP 及端口号;
第二步:使用 ConnectionFactory 对象创建一个 Connection 对象;
第三步:开启连接,调用 Connection 对象的 start 方法;
第四步:使用 Connection 对象创建一个 Session 对象;
第五步:使用 Session 对象创建一个 Destination 对象(topic、queue),此处创建一个 Queue 对象;
第六步:使用 Session 对象创建一个 Producer 对象;
第七步:创建一个 Message 对象,创建一个 TextMessage 对象;
第八步:使用 Producer 对象发送消息;
第九步:关闭资源。
@Test
public void testQueueProducer() throws Exception {
// 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
//brokerURL服务器的ip及端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.156:61616");
// 第二步:使用ConnectionFactory对象创建一个Connection对象。
Connection connection = connectionFactory.createConnection();
// 第三步:开启连接,调用Connection对象的start方法。
connection.start();
// 第四步:使用Connection对象创建一个Session对象。
//第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
//第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
//参数:队列的名称。
Queue queue = session.createQueue("test-queue");
// 第六步:使用Session对象创建一个Producer对象。
MessageProducer producer = session.createProducer(queue);
// 第七步:创建一个Message对象,创建一个TextMessage对象。
/*TextMessage message = new ActiveMQTextMessage();
message.setText("hello activeMq,this is my first test.");*/
TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
// 第八步:使用Producer对象发送消息。
producer.send(textMessage);
// 第九步:关闭资源。
producer.close();
session.close();
connection.close();
}
Consumer
消费者:接收消息。
第一步:创建一个 ConnectionFactory 对象;
第二步:从 ConnectionFactory 对象中获得一个 Connection 对象;
第三步:开启连接。调用 Connection 对象的 start 方法;
第四步:使用 Connection 对象创建一个 Session 对象;
第五步:使用 Session 对象创建一个 Destination 对象,和发送端保持一致 queue,并且队列的名称一致;
第六步:使用 Session 对象创建一个 Consumer 对象;
第七步:接收消息;
第八步:打印消息;
第九步:关闭资源。
@Test
public void testQueueConsumer() throws Exception {
// 第一步:创建一个ConnectionFactory对象。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.156:61616");
// 第二步:从ConnectionFactory对象中获得一个Connection对象。
Connection connection = connectionFactory.createConnection();
// 第三步:开启连接。调用Connection对象的start方法。
connection.start();
// 第四步:使用Connection对象创建一个Session对象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
Queue queue = session.createQueue("test-queue");
// 第六步:使用Session对象创建一个Consumer对象。
MessageConsumer consumer = session.createConsumer(queue);
// 第七步:接收消息。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = null;
//取消息的内容
text = textMessage.getText();
// 第八步:打印消息。
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//等待键盘输入
System.in.read();
// 第九步:关闭资源
consumer.close();
session.close();
connection.close();
}
Topic
Producer
使用步骤:
第一步:创建 ConnectionFactory 对象,需要指定服务端 IP 及端口号;
第二步:使用 ConnectionFactory 对象创建一个 Connection 对象;
第三步:开启连接,调用 Connection 对象的 start 方法;
第四步:使用 Connection 对象创建一个 Session 对象;
第五步:使用 Session 对象创建一个 Destination 对象(topic、queue),此处创建一个 Topic 对象;
第六步:使用 Session 对象创建一个 Producer 对象;
第七步:创建一个 Message 对象,创建一个 TextMessage 对象;
第八步:使用 Producer 对象发送消息;
第九步:关闭资源。
@Test
public void testTopicProducer() throws Exception {
// 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
// brokerURL服务器的ip及端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
// 第二步:使用ConnectionFactory对象创建一个Connection对象。
Connection connection = connectionFactory.createConnection();
// 第三步:开启连接,调用Connection对象的start方法。
connection.start();
// 第四步:使用Connection对象创建一个Session对象。
// 第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
// 第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个topic对象。
// 参数:话题的名称。
Topic topic = session.createTopic("test-topic");
// 第六步:使用Session对象创建一个Producer对象。
MessageProducer producer = session.createProducer(topic);
// 第七步:创建一个Message对象,创建一个TextMessage对象。
/*
* TextMessage message = new ActiveMQTextMessage(); message.setText(
* "hello activeMq,this is my first test.");
*/
TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
// 第八步:使用Producer对象发送消息。
producer.send(textMessage);
// 第九步:关闭资源。
producer.close();
session.close();
connection.close();
}
Consumer
消费者:接收消息。
第一步:创建一个 ConnectionFactory 对象;
第二步:从 ConnectionFactory 对象中获得一个 Connection 对象;
第三步:开启连接,调用 Connection 对象的 start 方法;
第四步:使用 Connection 对象创建一个 Session 对象;
第五步:使用 Session 对象创建一个 Destination 对象。和发送端保持一致 topic,并且话题的名称一致;
第六步:使用 Session 对象创建一个 Consumer 对象;
第七步:接收消息;
第八步:打印消息;
第九步:关闭资源。
@Test
public void testTopicConsumer() throws Exception {
// 第一步:创建一个ConnectionFactory对象。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.156:61616");
// 第二步:从ConnectionFactory对象中获得一个Connection对象。
Connection connection = connectionFactory.createConnection();
// 第三步:开启连接。调用Connection对象的start方法。
connection.start();
// 第四步:使用Connection对象创建一个Session对象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
Topic topic = session.createTopic("test-topic");
// 第六步:使用Session对象创建一个Consumer对象。
MessageConsumer consumer = session.createConsumer(topic);
// 第七步:接收消息。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = null;
// 取消息的内容
text = textMessage.getText();
// 第八步:打印消息。
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.out.println("topic的消费端03。。。。。");
// 等待键盘输入
System.in.read();
// 第九步:关闭资源
consumer.close();
session.close();
connection.close();
}