springboot与消息

概述

  • 大多数应用中,可以通过消息服务中间件来提升系统异步通信、扩展解耦能力

  • 消息服务中两个重要概念

    • 消息代理(message broker)
    • 目的地(destination)

    当消息发送者发送消息后,由消息代理接管,消息代理保证消息传递到目的地

  • 消息队列主要有两种形式的目的地

    • 队列:点对点消息通信(point-to-point)
    • 主题:发布(publish)订阅(subscribe)消息通信
  • 消息代理规范

    • JMS JAVA消息服务 eg: ActiveMQ HornetMQ
    • AMQP 高级消息队列协议规范 eg:RabbitMQ

Spring支持

  • spring-jms提供了对JMS的支持
  • spring-rabbit提供对AMQP的支持
  • 需要ConnectionFactory的实现来连接消息代理
  • 提供JmsTemplate、RabbitTemplate来发送消息
  • @JmsListener、@RabbitListener注解在方法上代理发布的消息
  • @EnableJms、@EnableRabbit开启支持

Spring Boot自动配置

  • JmsAutoConfiguration
  • RabbitAutoConfiguration

RabiitMQ

简介

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端

安装

在linux安装RabiitMQ时安装的tags选择带management可以提供web管理界面

RabiitMQ主机端口为5672,Web管理端口为15672

路由

路由有4种模式,其中headers一般不用

  • direct 点对点模式,只有发送消息时的路由键和队列绑定的路由键一致才会收到
  • fanout 广播模式,所有队列都会收到,速度最快
  • topic 匹配模式,只有发送消息的路由键与队列绑定的路由键匹配才会收到。eg:队列绑定的名为sun ,路由发送的路由名为sun.#或sun.*(其中#表示一个单词,*表示多个单词)

死信队列

死信条件:

  • 消息过期,自身过期或者所在队列超时
  • nack,且requene为false
  • 超过队列的maxlength

消费者示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class consumer {

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection connection=connectionFactory.newConnection();
Channel channel = connection.createChannel();

String exchange="test_dlx_exchange";
String quene="test_dlx";
String routKey="dlx.#";

Map<String, Object> arguments=new HashMap<String, Object>();
/**
* 增加死信队列的arguments
*/
arguments.put("x-dead-letter-exchange","dlx.exchange");
channel.exchangeDeclare(exchange,BuiltinExchangeType.TOPIC,true,false,null);
channel.queueDeclare(quene,true,false,false,arguments);
channel.queueBind(quene,exchange,routKey);

/**
* 死信队列声明
*/
channel.exchangeDeclare("dlx.exchange",BuiltinExchangeType.TOPIC,true,false,null);
channel.queueDeclare("dlx.quene",true,false,false,null);
channel.queueBind("dlx.quene","dlx.exchange","#");
/**
* autoAck直接为true测试
*/
channel.basicConsume(quene,true,new MyConsumer(channel));
}

}

consumer示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class MyConsumer extends DefaultConsumer {

private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel=channel;
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));


}
}

生产者示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Producter {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("60.205.218.74");
factory.setPort(5672);
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

String exChange="test_dlx_exchange";
String routKey="dlx.save";


for(int i=0;i<1;i++){
String msg="hellow ops rabbit mq dlx ";
AMQP.BasicProperties prps=new AMQP.BasicProperties.Builder()
.deliveryMode(2)//消息持久化
.contentEncoding("UTF-8")
.expiration("10000")
.build();
channel.basicPublish(exChange,routKey,true,prps,msg.getBytes());
}

/**
* 关闭通道和链接
*/
channel.close();
connection.close();
}
}

流控

消费者示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class consumer {

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("60.205.218.74");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection connection=connectionFactory.newConnection();
Channel channel = connection.createChannel();

String exchange="test_limit_exchange";
String quene="test_limit";
String routKey="confrim.#";

channel.exchangeDeclare(exchange,BuiltinExchangeType.TOPIC);
channel.queueDeclare(quene,true,false,false,null);
channel.queueBind(quene,exchange,routKey);
/**
* 消费者手工签收 autoAck必须为false
*/
channel.basicQos(0,1,false);
channel.basicConsume(quene,false,new MyConsumer(channel));
}

}

consumer示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class MyConsumer extends DefaultConsumer {

private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel=channel;
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
/**
* multiple是否一次性都ack签收
*/
channel.basicAck(envelope.getDeliveryTag(),false);

}
}

整合Springboot

  • RabbitAutoConfiguration 自动配置类
  • CachingConnectionFactory 连接工厂
  • RabbitTemplate 用于发送接收消息
  • AmqpAdmin 用于管理队列

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Springboo12rabbitmqApplicationTests {

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void contextLoads() {

String exchange="exnchange.direct";
String routkey="sun";
Message message;
Map<String,Object> object=new HashMap();
object.put("index","1");
object.put("data",Arrays.asList("1","ss","我擦123哦"));
// rabbitTemplate.send(exchange,routkey,message);
//只需要传入要发送的对象,自动序列化,保存发送给rabbitmq,默认当成消息
rabbitTemplate.convertAndSend(exchange,routkey,object);
}

@Test
public void recevie(){
Object sun = rabbitTemplate.receiveAndConvert("sun");
System.out.println(sun);

}

}

@RabbitListener

eg:

1
2
3
4
5
6
7
8
9
@Service
public class BookService {

@RabbitListener(queues="sun")
public void recive(Object o){
System.out.println("收到消息"+o);
}

}

AmqpAdmin

可以使用AmqpAdmin进行管理路由和队列