18 4 月, 2024

Manufacturing

Processing Machinery

Switch

7 min read

1. The role of Exchange (switch)
In RabbitMQ, the producer will not directly deliver the message to the queue when sending the message, but first deliver the message to the switch, and then forward it to the specific queue by the switch, and then the queue will push or pull the message to the consumer to consume

create message routing key pull/push

Producer —————————–> Switch ————— ——————> Queue —————————> Consumer

2. The type of Exchange (switch)
2.1. Direct switch: Direct Exchange
A direct-connected switch is a switch with a routing function. A queue is bound to a switch. In addition, a routing_key is bound. When a message is sent, a binding_key needs to be specified, and the message is sent to the switch. At that time, it will be sent to the designated queue by the switch. The same binding_key can also be applied to multiple queues. In this way, when a switch is bound to multiple queues, it will be sent to the corresponding queue for processing.

Note 1: What is a routing key
Each message has an attribute called a routing key, which is simply a string

Note 2: Applicable scenarios for directly connected switches
Tasks with priority send messages to corresponding queues according to the priority of the task, so that more resources can be assigned to process high-priority queues.

2.2. Topic Exchange: Topic Exchange
Disadvantages of direct switch!
The routing_key scheme of the direct switch is very simple. If we want to send a message to multiple queues, the switch needs to be bound with a lot of routing_keys.
Assume that each switch is bound with a bunch of routing_keys connected to each queue. Then the management of messages will be extremely difficult. So RabbitMQ provides a topic switch, and the message sent to the topic switch needs to carry the routing_key of the specified rule.
The topic exchange will send data to the corresponding (multiple) queues according to this rule.

The routing_key of the topic switch needs to have certain rules. The binding_key of the switch and the queue needs to adopt the format of *.#.*…., each part is separated by ., where
* indicates a word
# means any number (zero or more) of words.

Example:
The binding key of queue Q1 is *.TT.*
The binding key of queue Q2 is TT.#

If a message carries a routing key of A.TT.B, then queue Q1 will receive
If a message carries a routing key of TT.AA.BB, then queue Q2 will receive

2.3. Fan-shaped switch: Fanout Exchange
A fan switch is the most basic type of switch, and what it can do is very simple—broadcast messages.
The fan-shaped switch will send all the messages it can receive to the queue bound to itself. Because broadcasting requires no “thinking,” fanned switches process messages the fastest of all switch types.

This switch has no routing key concept, even if you bind a routing key, it will be ignored.

2.4. Header switch: Headers exchange
2.5. Default switch
In fact, it is a direct exchange (direct exchange) whose name is pre-declared by RabbitMQ as an empty string. It has a special property that makes it especially useful for simple applications: that is, each newly created queue (queue) is automatically bound to the default exchange, and the binding routing key (routing key) name is the same as the queue name.

For example: when you declare a queue named “hello”, RabbitMQ will automatically bind it to the default switch, and the binding routing key name is also “hello”.
Therefore, when a message with a routing key named “hello” is sent to the default exchange, the message will be routed to the queue named “hello” by the default exchange

Exchanges with names like amq.*:
These are the switches that RabbitMQ creates by default. These queue names are reserved for RabbitMQ internal use and cannot be used by applications, otherwise a 403 (ACCESS_REFUSED) error will be thrown

2.6. Dead Letter Exchange
By default, if a message is delivered to an exchange and the exchange finds that there is no matching queue for the message, the message will be silently discarded.
In order to solve this problem, there is a switch in RabbitMQ called a dead letter switch. When the consumer cannot process the received message, it republishes the message to another queue, waiting for retry or manual intervention. The exchange and queue in this process are the so-called “Dead Letter Exchange and Queue”

3. Properties of the switch
In addition to the switch type, there are many other attributes that can be attached to the switch when declaring it, the most important of which are:
Name: switch name
Durability: Whether to persist. If persistent, the switch still exists after RabbitMQ restarts
Auto-delete: When all the message queues bound to it have finished using this switch, delete it
Arguments: extended parameters

4. Comprehensive case: the use of switches
rabbitmq02 #main module maven
rabbitmq-provider #producer springboot
rabbitmq-consumer #consumer springboot

Add dependencies to submodules
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
4.1. Direct Exchange

1. RabbitmqDirectConfig
2. SendMessageController
Key code: rabbitTemplate.convertAndSend(“testDirectExchange”, “testDirectRouting”, map);
sendDirectMessage: push messages (can also be changed to a scheduled task, depending on the requirements)

3. View the rabbitmq management interface
We are currently creating a consumer rabbitmq-consumer, the message has not been consumed, let’s go to the rabbitMq management page to see if the push is successful
Overview tab, you can view the message just created
4. Create a message receiving listening class DirectReceiver

Note 1: The new version of jdk date and formatting
LocalDateTime.now().format(DateTimeFormatter.ofPattern(“yyyy-MM-dd HH:mm:ss”));

Note 2: What is the relationship between rabbitTemplate and amqpTemplate
Looking at the source code, you will find that rabbitTemplate is implemented from the amqpTemplate interface. There is no difference between the two in use, and the functions are the same.

Note 3: Do not write @Configuration as @Configurable, these two look alike
@Configuration This annotation can be used instead of XML files.
Objects manually new, under normal circumstances, Spring cannot rely on injection, this time you can use

@Configurable annotation

producer

package com.zking.rabbitmqproduct.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* Directly connected to the switch
*/
@Configuration
public class RabbitmqDirectConfig {
/**
* Define queue
* @return
*/
@Bean
public Queue directQueue(){
return new Queue(“direct-queue”);
}

/**
* Custom direct switch
* @return
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange(“direct-exchange”,true,false);
}

/**
* Bind the queue to the switch and set the routing key
* @return
*/
@Bean
public Binding binding(){
return BindingBuilder. bind(directQueue())
.to(directExchange())
.with(“direct_routing_key”);
}

}
consumer

package com.zking.rabbitmqconsumer.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RabbitListener(queues = {“direct-queue”})
public class DirectReceiver {

@RabbitHandler
public void handler(Map<String,Object> json){
System.out.println(json);
}
}
4.2. Topic Exchange
RabbitTopicConfig

producer

package com.zking.rabbitmqproduct.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqTopicConfig {

@Bean
public Queue queueA(){
return new Queue(“queue-a”);
}

@Bean
public Queue queueB(){
return new Queue(“queue-b”);
}

@Bean
public Queue queueC(){
return new Queue(“queue-c”);
}

/**
* 定义主题交换机
* @return
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(“topic-exchange”,true,false);
}

@Bean
public Binding bindingA(){
return BindingBuilder.bind(queueA())
.to(topicExchange())
.with(“topic.person.aa”);
}

@Bean
public Binding bindingB(){
return BindingBuilder.bind(queueB())
.to(topicExchange())
.with(“topic.person.bb”);
}

@Bean
public Binding bindingC(){
return BindingBuilder.bind(queueC())
.to(topicExchange())
.with(“topic.person.*”);
}
}

consumers

package com.zking.rabbitmqconsumer.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class TopicReceiver {

@RabbitListener(queues = {“queue-a”})
@RabbitHandler
public void handlerA(Map<String,Object> json){
System.out.println(“The message from queue-a has been received: “+json);
}

@RabbitListener(queues = {“queue-b”})
@RabbitHandler
public void handlerB(Map<String,Object> json){
System.out.println(“The message from queue-b has been received: “+json);
}

@RabbitListener(queues = {“queue-c”})
@RabbitHandler
public void handlerC(Map<String,Object> json){
System.out.println(“The message from queue-c has been received: “+json);
}
}
4.3. Fanout Exchange
//Because it is a fan-type switch, the routing key does not need to be configured, and the configuration does not work, and the routing key is not configured in the two places
BindingBuilder.bind(queueA()).to(fanoutExchange());
rabbitTemplate. convertAndSend(RabbitFanoutConfig. EXCHANGE_NAME, null, map);
producer

package com.zking.rabbitmqproduct.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqFanoutConfig {

@Bean
public Queue queueX(){
return new Queue(“queue-x”);
}

@Bean
public Queue queueY(){
return new Queue(“queue-y”);
}

@Bean
public Queue queueZ(){
return new Queue(“queue-z”);
}

/**
* 定义扇形交换机,与路由键无关
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(“fanout-exchange”,true,false);
}

@Bean
public Binding bindingX(){
return BindingBuilder.bind(queueX())
.to(fanoutExchange());
}

@Bean
public Binding bindingY(){
return BindingBuilder.bind(queueY())
.to(fanoutExchange());
}

@Bean
public Binding bindingZ(){
return BindingBuilder.bind(queueZ())
.to(fanoutExchange());
}
}

consumer

package com.zking.rabbitmqconsumer.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class FanoutReceiver {

@RabbitListener(queues = {“queue-x”})
@RabbitHandler
public void handlerY(Map<String,Object> json){
System.out.println(“已接受到队列queue-x传递过来的消息:”+json);
}

@RabbitListener(queues = {“queue-y”})
@RabbitHandler
public void handlerX(Map<String,Object> json){
System.out.println(“已接受到队列queue-y传递过来的消息:”+json);
}

@RabbitListener(queues = {“queue-z”})
@RabbitHandler
public void handlerZ(Map<String,Object> json){
System.out.println(“已接受到队列queue-z传递过来的消息:”+json);
}
}