A first look at RabbitMQ
When we first met queues, it was through AQS (AbstractQueuedSynchronizer), which internally maintains a FIFO (first in, first out) wait queue. RabbitMQ, the subject of today's notes, is also built around first-in, first-out queues, but its queues store messages rather than waiting threads.
RabbitMQ is an open-source message broker written in Erlang. It implements AMQP 0-9-1 and, through plugins, other protocols such as STOMP and MQTT.
Synchronous calls cause several problems in a project. First, failures cascade: when a service provider has a problem, every consumer that calls it is affected too, and the whole project can become unavailable. Second, coupling is high: every new requirement means modifying the caller's source code. Third, performance suffers: each step of a synchronous call chain starts only after the previous step has finished, so a slow call makes the caller wait for a long time.
Asynchronous communication addresses these problems, and a RabbitMQ message queue is one way to implement it.
Advantages of RabbitMQ
Having looked at the drawbacks of synchronous calls and the benefits of asynchronous communication, let's go through the advantages of RabbitMQ.
Application decoupling
Take a typical e-commerce project as an example. It usually has a payment service, an order service, an inventory service, a logistics service, and so on. If the order service calls the inventory and logistics services directly after a user places an order, a failure in any one of them makes the order fail. If the logistics service is instead invoked asynchronously through a message queue, a logistics failure does not affect ordering and payment: the messages wait in the queue while we fix the logistics service, and once the bug is fixed it processes them and everything returns to normal.
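To make the decoupling idea concrete, here is a minimal sketch in plain Java. It is only an in-memory model, not RabbitMQ code: the class DecouplingSketch and its methods are hypothetical names, and a LinkedBlockingQueue stands in for the broker's queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory model; a real system would publish to RabbitMQ instead.
class DecouplingSketch {
    // Stands in for the queue between the order service and the logistics service.
    private final BlockingQueue<String> logisticsQueue = new LinkedBlockingQueue<>();
    private final List<String> shipped = new ArrayList<>();

    // The order service only enqueues a message; it never calls logistics directly.
    String placeOrder(String orderId) {
        logisticsQueue.offer(orderId);
        return "order accepted: " + orderId;
    }

    // The logistics service works through whatever accumulated while it was down.
    int recoverLogistics() {
        List<String> backlog = new ArrayList<>();
        logisticsQueue.drainTo(backlog); // removes messages in FIFO order
        shipped.addAll(backlog);
        return backlog.size();
    }

    int pending() {
        return logisticsQueue.size();
    }

    List<String> shipped() {
        return shipped;
    }

    public static void main(String[] args) {
        DecouplingSketch shop = new DecouplingSketch();
        // Logistics is "down": orders still succeed and messages pile up.
        System.out.println(shop.placeOrder("A1"));
        System.out.println(shop.placeOrder("A2"));
        System.out.println("pending while logistics is down: " + shop.pending());
        // Logistics comes back and processes the backlog.
        System.out.println("processed after recovery: " + shop.recoverLogistics());
        System.out.println("shipped: " + shop.shipped());
    }
}
```

The point of the sketch is that placeOrder never depends on the state of the logistics side; the queue absorbs the outage.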
Traffic peak clipping
Take everyday life as an example. Shopping festivals such as Double Eleven (November 11) and Double Twelve (December 12) bring big discounts and a flood of orders. If the order traffic is not controlled, the servers can easily crash. A message queue can act as a buffer: we can cap the number of orders accepted, or spread the orders out and process them over a period of time. Some users then receive their order confirmation a short while after placing the order rather than immediately, but the traffic spike stays under control.
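The buffering idea can be sketched with a bounded queue from the JDK. This is an illustrative model under assumptions, not RabbitMQ code: PeakClippingSketch, the capacity of 3 and the batch size of 2 are all made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical in-memory model of a bounded buffer in front of the order service.
class PeakClippingSketch {
    private final BlockingQueue<String> buffer;

    PeakClippingSketch(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // offer() returns false instead of blocking when the buffer is full,
    // which caps the number of orders accepted during a spike.
    boolean submit(String orderId) {
        return buffer.offer(orderId);
    }

    // The backend takes at most batchSize orders per tick, whatever the inbound rate.
    List<String> processTick(int batchSize) {
        List<String> batch = new ArrayList<>();
        buffer.drainTo(batch, batchSize);
        return batch;
    }

    public static void main(String[] args) {
        PeakClippingSketch gate = new PeakClippingSketch(3);
        // Five orders arrive at once, but the buffer only holds three.
        for (int i = 1; i <= 5; i++) {
            System.out.println("order-" + i + " accepted: " + gate.submit("order-" + i));
        }
        // The backend drains the buffer at its own pace.
        System.out.println("tick 1: " + gate.processTick(2));
        System.out.println("tick 2: " + gate.processTick(2));
    }
}
```

Rejecting with offer() corresponds to limiting the number of orders; draining in fixed-size batches corresponds to spreading the processing over time.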
Asynchronous processing
A synchronous call executes the subsystems one after another, so the total execution time is the sum of the time spent in each subsystem. With asynchronous processing, each subsystem consumes its own messages from its own queue and performs the corresponding operation locally. Asynchronous processing can therefore greatly improve the system's response time and throughput.
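A small worked calculation shows the difference in what the caller has to wait for. Everything here is an assumption for illustration: the class LatencySketch, the step durations, and the 5 ms cost of publishing one message are invented numbers, not measurements.

```java
import java.util.List;

// Hypothetical model: all durations below are made-up numbers for illustration.
class LatencySketch {
    // Synchronous chain: the caller waits for every step in turn.
    static long synchronousMillis(List<Long> stepMillis) {
        return stepMillis.stream().mapToLong(Long::longValue).sum();
    }

    // Asynchronous hand-off: the caller waits for its own step
    // plus one publish per downstream queue.
    static long asynchronousMillis(long ownStepMillis, long publishMillis, int downstreamQueues) {
        return ownStepMillis + publishMillis * downstreamQueues;
    }

    public static void main(String[] args) {
        // Assumed: order = 50 ms, inventory = 150 ms, logistics = 200 ms.
        long sync = synchronousMillis(List.of(50L, 150L, 200L));
        // Assumed: the order step takes 50 ms and each publish costs 5 ms.
        long async = asynchronousMillis(50L, 5L, 2);
        System.out.println("synchronous: " + sync + " ms");   // 50 + 150 + 200 = 400
        System.out.println("asynchronous: " + async + " ms"); // 50 + 5 * 2 = 60
    }
}
```

Note that the downstream work still takes the same time in total; what shrinks is the time the caller is blocked.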
Types of MQ
There are many message queue products. The best known are RabbitMQ, RocketMQ, ActiveMQ, and Kafka. Let's introduce them briefly.
RabbitMQ is written in Erlang. It implements AMQP 0-9-1 and, through plugins, other protocols such as STOMP and MQTT. Its availability is high, its single-node throughput is moderate, its message latency is at the microsecond level, and its message reliability is high.
ActiveMQ is open-source middleware from Apache, written in Java. It supports multiple programming languages and protocols, such as OpenWire, STOMP, REST, AMQP, and XMPP. Its availability is average, its single-node throughput is on the low side, its message latency is at the millisecond level, and its message reliability is average.
RocketMQ is a message queue developed by Alibaba, also written in Java, which was open sourced and later donated to the Apache community. It uses its own custom protocol and offers high throughput, high availability, high message reliability, and message latency at the millisecond level.
Kafka is a message queue written in Scala and Java. It was originally developed at LinkedIn and is now maintained by the Apache community. It uses its own custom protocol, has very high availability and very high single-node throughput, and is mostly used in distributed architectures. Its message reliability is average.
RabbitMQ performs well across all of these dimensions, so it is a good choice to learn first.
RabbitMQ component architecture
Publisher: the producer of messages.
Consumer: the consumer of messages.
Broker: receives and distributes messages. The RabbitMQ server is the message broker.
Virtual host: a virtual host (vhost) is similar to the namespace concept in Nacos. When several users share the same RabbitMQ server, it can be divided into multiple vhosts, and each user creates exchanges and queues in their own vhost. The vhost must be specified when connecting, and the default vhost is /.
Connection: a TCP connection between a producer or consumer and the broker.
Channel: a lightweight logical connection that carries data in both directions. Publishing messages, subscribing to queues, and receiving messages are all done over a channel. Because opening and closing TCP connections is expensive for the operating system, channels were introduced so that many of them can share a single TCP connection.
Exchange: receives messages sent by producers and routes them to queues on the server.
Queue: a message queue holds messages until they are delivered to consumers. It is the container for messages: a message can be routed to one or more queues, where it waits for a consumer to pick it up.
Binding: the association between an exchange and a queue. A binding can include a routing key, which the exchange uses to decide which queues a message is routed to.
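To see how exchange, binding, routing key and queue fit together, here is a rough in-memory model of a direct-style exchange. It is a sketch under assumptions, not the RabbitMQ client API: DirectExchangeSketch, its methods, and the queue and routing key names are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model of a direct exchange; not the RabbitMQ client API.
class DirectExchangeSketch {
    // queue name -> messages waiting for a consumer
    private final Map<String, Queue<String>> queues = new HashMap<>();
    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    // A binding associates a queue with the exchange under a routing key.
    void bind(String queueName, String routingKey) {
        declareQueue(queueName);
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queueName);
    }

    // Copies the message into every queue bound with this routing key
    // and returns how many queues received it.
    int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, List.of());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    // Returns the oldest message in the queue, or null if there is none.
    String consume(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("inventory.queue", "order.created");
        exchange.bind("logistics.queue", "order.created");
        exchange.bind("refund.queue", "order.cancelled");

        System.out.println("routed to: " + exchange.publish("order.created", "order A1"));
        System.out.println("routed to: " + exchange.publish("order.unknown", "order A2"));
        System.out.println("inventory got: " + exchange.consume("inventory.queue"));
        System.out.println("logistics got: " + exchange.consume("logistics.queue"));
        System.out.println("refund got: " + exchange.consume("refund.queue"));
    }
}
```

In the model, a message whose routing key matches no binding reaches no queue at all, which is also worth keeping in mind when publishing to a real broker.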
Spring AMQP
Here is the official introduction to Spring AMQP:
The Spring AMQP project applies Spring’s core concepts to the development of AMQP-based messaging solutions. It provides a “template” as a high-level abstraction for sending and receiving messages. It also provides support for message-driven POJOs with “listener containers”. These libraries help manage AMQP resources while facilitating the use of dependency injection and declarative configuration. In all of these cases, you will see similarities with the JMS support in the Spring Framework.
Features
A listener container for processing inbound messages asynchronously
Templates for sending and receiving messages
RabbitAdmin for automatically declaring queues, exchanges and bindings
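As a hedged illustration of the automatic declaration feature: with the Spring Boot AMQP starter, a Queue bean in the application context is normally declared on the broker once the application opens its first connection. The class name SimpleQueueConfig is an assumption made for this sketch, and the queue name simple.queue is the one used in the Hello World example in this article.

```java
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the name is an assumption for this sketch.
@Configuration
public class SimpleQueueConfig {

    // A Queue bean is picked up and declared on the broker automatically.
    @Bean
    public Queue simpleQueue() {
        // The single-argument constructor creates a durable,
        // non-exclusive, non-auto-delete queue.
        return new Queue("simple.queue");
    }
}
```

Declaring the queue this way means it does not have to be created by hand in the management console first.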
Let's use Spring AMQP to implement a basic Hello World message queue:
First create two modules, a publisher (producer) module and a consumer module. The publisher sends messages and the consumer processes them.
Since both modules need the Spring AMQP dependency, declare it in the pom file of the parent project:
<!-- AMQP dependency, includes RabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Then configure the RabbitMQ connection details in the application.yaml of the publisher module:
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 192.168.220.135
    port: 5672
    username: xiaowei
    password: 123456
    virtual-host: /
Then write the code in the test class of the publisher module:
package cn.itcast.mq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringAmqpTest {

    @Autowired // injected by Spring, much like the RedisTemplate we have used before
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue() {
        // The queue name is used as the routing key on the default exchange
        String queueName = "simple.queue";
        String message = "hello, spring amqp!";
        rabbitTemplate.convertAndSend(queueName, message);
    }
}
At this point the producer can send messages. Run the test, then log in to the RabbitMQ management console, where the queue now shows one message. This assumes that the queue simple.queue already exists on the broker.
Next, configure the dependency and application.yaml of the consumer module, exactly as for the producer:
<!-- AMQP dependency, includes RabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 192.168.220.135
    port: 5672
    username: xiaowei
    password: 123456
    virtual-host: /
Then write a class to receive producer messages:
package cn.itcast.mq.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component // register the class as a bean managed by the Spring container
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue") // simple.queue is the name of the queue created earlier
    public void listenSimpleQueue(String msg) { // the producer sent a String, so the parameter is a String too
        System.out.println("Consumer received a message from simple.queue: [" + msg + "]");
    }
}
Start the consumer module and check the console: the consumer has received the message sent by the producer.
Refresh the RabbitMQ management console and you will see that the message has been consumed.