首页
统计
关于
Search
1
Sealos3.0离线部署K8s集群
1,338 阅读
2
类的加载
888 阅读
3
Spring Cloud OAuth2.0
880 阅读
4
SpringBoot自动装配原理
769 阅读
5
集合不安全问题
670 阅读
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Canvas
React
Linux
容器
Docker
Containerd
Podman
Kubernetes
Python
FastApi
OpenCV
数据分析
牛牛生活
登录
Search
标签搜索
Java
CSS
mysql
RabbitMQ
JavaScript
Redis
OpenCV
JVM
Mybatis-Plus
Camunda
多线程
CSS3
Python
React
Canvas
Spring Cloud
注解和反射
Activiti
工作流
SpringBoot
蘇阿細
累计撰写
464
篇文章
累计收到
4
条评论
首页
栏目
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Canvas
React
Linux
容器
Docker
Containerd
Podman
Kubernetes
Python
FastApi
OpenCV
数据分析
牛牛生活
页面
统计
关于
搜索到
464
篇与
的结果
2024-03-03
新增的语义化标签
1. 布局标签<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>新增布局标签</title> </head> <body> <!-- 头部 --> <header> <h1>xxx商城</h1> </header> <hr> <!-- 主导航 --> <nav> <a href="#">首页</a> <a href="#">订单</a> <a href="#">购物车</a> <a href="#">我的</a> </nav> <!-- 主要内容 --> <div class="page-content"> <article> <h2>感动中国人物</h2> <section> <h3>第一名:孙笑川</h3> <p>男,籍贯四川,33岁</p> </section> <section> <h3>第二名:药水哥</h3> <p>真名刘波,男,籍贯湖北,30岁</p> </section> <section> <h3>第三名:Giao哥</h3> <p>男,籍贯河南,29岁</p> </section> </article> <!-- 侧边栏导航 --> <aside style="float: right;"> <nav> <ul> <li><a href="#">秒杀专区</a></li> <li><a href="#">会员专区</a></li> <li><a href="#">优惠券专区</a></li> <li><a href="#">品牌专区</a></li> </ul> </nav> </aside> </div> <hr> <footer> <nav> <a href="#">友情链接1</a> <a href="#">友情链接2</a> <a href="#">友情链接3</a> <a href="#">友情链接4</a> </nav> </footer> </body> </html>2. 状态标签<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>新增的状态标签</title> </head> <body> <span>手机电量:</span> <meter min="0" max="100" value="5" low="10" high="20" optimum="90"></meter> <span>当前进度:</span> <progress max="100" value="80"></progress> </body> </html>3. 列表标签<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>新增的列表标签</title> </head> <body> <form action="#"> <input type="text" list="myData"> <button>搜索</button> </form> <datalist id="myData"> <option value="孙笑川">孙笑川</option> <option value="药水哥">药水哥</option> <option value="Giao哥">Giao哥</option> </datalist> <hr> <details> <summary>如何一夜暴富</summary> <p>白日做梦</p> </details> </body> </html>4. 文本标签<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>新增的文本标签</title> </head> <body> <ruby> <span>魑魅魍魉</span> <rt>chī mèi wǎng liǎng</rt> </ruby> <hr> <p>Lorem ipsum <mark>dolor</mark> sit amet consectetur adipisicing elit. Ipsa in quae molestias id nesciunt consequatur ex deserunt enim reiciendis obcaecati!</p> </body> </html>
2024年03月03日
85 阅读
0 评论
0 点赞
2023-06-25
MongoDB 笔记
//进入test数据库 use test; //向user集合插入一个文档 db.user.insert({ username: '孙笑川' }); //查询user集合中的文档 db.user.find(); //再插入一个 db.user.insert({ username: '药水哥' }); //统计user集合中文档的数量 db.user.find().count(); //按条件查询 db.user.find({username: '孙笑川'}); //向user集合中username为孙笑川的文档添加一个age属性,值为33 db.user.update({username: '孙笑川'}, {$set: {age: 33}}); //使用{username: 'Giao哥'}替换username为药水哥的文档 db.user.replaceOne({username: '药水哥'}, {username: 'Giao哥'}); //删除username为孙笑川的文档的age属性 db.user.update({username: '孙笑川'}, {$unset: {age: 1}}) //向username为孙笑川的文档添加一个hobby:{cities: ['北京', '上海', '深圳']} //MongoDB的属性值也可以是一个文档,当一个文档的属性也是一个文档时,称这个文档为内嵌文档 db.user.update({username: '孙笑川'}, {$set: {hobby:{cities: ['北京', '上海', '深圳']}}}); //向username为Giao哥的文档添加一个hobby:{movies: ['007', '碟中谍', '加勒比海盗']} db.user.update({username: 'Giao哥'}, {$set: {hobby:{movies: ['007', '碟中谍', '加勒比海盗']}}}); //查询喜欢007电影的文档 //MongoDB支持内嵌文档的属性直接查询,此时属性名必须带引号 db.user.find({'hobby.movies':'007'}); //向Giao哥添加一个新的电影 //$push 向集合中添加一个新的元素 //$addToSet 向集合中添加一个新的元素(如果数组中已存在了该元素,则不执行添加) db.user.update({username: 'Giao哥'}, {$push: {'hobby.movies': '大话西游'}}); db.user.update({username: 'Giao哥'}, {$addToSet: {'hobby.movies': '大话西游'}}); //删除喜欢北京的用户文档 db.user.remove({'hobby.cities': '北京'}); //删除user集合 db.user.remove({}); db.user.drop(); show collections; //向number集合中插入20000条数据 for(var i = 1; i <= 20000; i++) { db.number.insert({num: i}); } //11s db.number.find().count(); db.number.drop(); var arr = []; for(var i = 1; i <= 20000; i++) { arr.push({num: i}); } db.number.insert(arr); //0.857s db.number.find().count(); //查找num为500的文档 db.number.find({num: 500}); //查找num大于500的文档 大于 $gt 大于等于 $gte 小于 $lt 小于等于 $lte 不等于 $ne db.number.find({num: {$gt: 500}}); //查找num大于40小于50的文档 db.number.find({num: {$gt: 40, $lt: 50}}); //查找number的前10条数据 db.number.find().limit(10); //查找number的11条到20条数据 skip 跳过指定数量的数据 //skip((pageNum - 1) * 10).limit(pageSize) db.number.find().skip(10).limit(10); //MongoDB会自定调整limit和skip的位置,无顺序影响 db.number.find().limit(10).skip(10); //文档间的关系 //一对一 db.couple.insert([ { name: '黄蓉', husband: { name: '郭靖' } }, { name: '潘金莲', husband: { name: '武大郎' } } ]); db.couple.find(); //一对多 用户 user 订单order(使用内嵌文档可能会出现数据越来越多难以维护的情况) db.user.insert([ {username: '孙笑川'}, {username: '药水哥'}, {username: 'Giao哥'} ]); db.user.find(); db.order.insert([ { list: ['苹果', '西瓜', '梨'], user_id: ObjectId("649847f13d750000000045e4") }, { list: ['铅笔', '钢笔', '圆珠笔'], user_id: ObjectId("649847f13d750000000045e5") }, { list: ['羽毛球', '篮球'], user_id: ObjectId("649847f13d750000000045e6") }, ]); db.order.find(); //查找孙笑川的订单 var user_id = db.user.findOne({username: '孙笑川'})._id; db.order.find({user_id: user_id}); //多对多 老师 teacher 学生 student db.teacher.insert([ {name: '孙笑川'}, {name: '药水哥'}, {name: 'Giao哥'} ]); db.teacher.find(); db.student.insert([ { name: '小明', teacher_ids: [ ObjectId("64984a183d750000000045ea"), ObjectId("64984a183d750000000045eb") ] }, { name: '小华', teacher_ids: [ ObjectId("64984a183d750000000045ea"), ObjectId("64984a183d750000000045ec") ] }, ]); db.student.find(); //查询工资小于1000或大于2500的员工 db.emp.find({$or: [{sal: {$lt: 1000}}, {sal: {$gt: 2500}}]}); //查询财务部的所有员工 var dept_no = db.dept.findOne({dname: '财务部'}).deptno; db.emp.find({depno: dept_no}) //为工资低于1000的员工加400工资 $inc 在原基础上增加 db.emp.updateMany({sal: {$lte: 1000}}, {$inc: {sal: 400}}); //查询emp并按照_id升序排列 //sort() 按照指定属性排序 1 升序 -1 降序 //先按sal升序,再按empno降序 db.emp.find({}).sort({sal: 1, empno: -1}); //在查询时,可以在第二个参数设置要查询的列(投影) db.emp.find({}, {ename: 1, _id: 0, sal: 1});
2023年06月25日
231 阅读
0 评论
0 点赞
2023-06-23
RabbitMQ - 其他补充
1. 幂等性(1)概念用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。(2)消息重复消费消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。(3)解决思路一般使用全局 ID、唯一标识(时间戳、UUID)或 MQ 消息的 id tag 来判断,亦或是zi定义全局唯一 id,每次消费消息时先判断 id 对应的消息是否已消费过(4)消费端的幂等性保障在海量订单生成的业务高峰期,生产端有可能就会重复发送消息,这时候消费端就要实现幂等性,即消息永远不会被消费多次(即使多次收到了一样的消息)。唯一ID + 指纹码机制,利用数据库主键去重利用 redis 的原子性实现(setnx)2. 优先级队列(1)设置方式在管理页面添加在代码中添加Map<String, Object> arguments = new HashMap<>(); //优先级范围 0-10(设置过大会存在性能问题) arguments.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);设置优先级之后的队列(2)测试Producerpublic class Producer { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.123.88"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); //创建连接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); /** * 创建队列 * 参数说明: * 1.队列名称 * 2.是否持久化存储消息 * 3.是否进行消息共享(只供一个消费者进行消费) * 4.是否自动删除(最后一个消费者断开连接后,是否自动删除该队列) * 5.其他参数 */ /** * 2023-06-15偷懒 优先级队列 */ Map<String, Object> arguments = new HashMap<>(); //优先级范围 0-10(设置过大会存在性能问题) arguments.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, true, false, false, arguments); for (int i = 1; i < 10; i++) { String msg = "hello world" + i; if (i == 5) { AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build(); channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes()); } else { channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } } } }Consumerpublic class Consumer { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.123.88"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println(new String(message.getBody())); CancelCallback cancelCallback = (consumerTag) -> System.out.println("消息消费由于某些原因中断"); /** * 接收消息 * 1.队列名称 * 2.消费成功后是否自动应答 * 3.未成功消费的回调 * 4.取消消费的回调 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }3. 惰性队列(1)使用场景RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念,惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或是由于维护停机等)而导致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。 默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存中,这样可以更快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留 一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间(同步操作),进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法, 但是效果始终不太理想,尤其是在消息量特别大的时候。(2)两种模式default(默认模式):在 3.6.0 之前的版本无需做任何变更lazy 模式:即惰性队列模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。注:如果要通过声明的方式改变已有队列的模式,那么只能先删除队列,然后再重新声明一个新的队列,同时已产生的消息会同步删除。在管理界面设置在代码中设置Map<String, Object> arguments = new HashMap<>(); arguments.put("x-queue-mode", "lazy"); channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);(3)内存开销对比
2023年06月23日
143 阅读
0 评论
0 点赞
2023-06-23
RabbitMQ - 发布确认补充
1. 发布确认在RabbitMQ不可用的情况下,如何处理无法投递的消息?在application.yml配置文件中添加# 消息确认类型 执行回调 publisher-confirm-type: correlated补充:NONE 禁用发布确认模式,是默认值CORRELATED 发布消息成功到交换器后会触发回调方法SIMPLE 包含两种效果:和 CORRELATED 值一样会触发回调方法在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker(1)队列配置@Configuration public class ConfirmQueueConfig { public static final String CONFIRM_EXCHANGE = "confirm.exchange"; public static final String CONFIRM_QUEUE = "confirm.queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; @Bean("confirmExchange") public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE); } @Bean("confirmQueue") public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs(); } }(2)消息生产者@Slf4j @RestController @RequestMapping("/confirm") public class SendConfirmMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{msg}/{routingKey}") @ApiOperation("发送消息(发布确认)") public void sendMsg(@PathVariable("msg") String msg, @PathVariable("routingKey") String routingKey) { log.info("当前时间:{},发送消息:{},routingKey:{}", new Date(), msg, routingKey); //正常发送 rabbitTemplate.convertAndSend(ConfirmQueueConfig.CONFIRM_EXCHANGE, routingKey, msg, new CorrelationData(UUID.randomUUID().toString())); } }(3)回调接口@Slf4j @Component public class ConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; /** * 注入 */ @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } /** * 交换机确认回调方法 * * @param correlationData correlation data for the callback. 回调消息的ID及相关的信息 * @param ack true for ack, false for nack * @param cause An optional cause, for nack, when available, otherwise null. */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (correlationData != null) { String id = correlationData.getId() != null ? correlationData.getId() : ""; if (ack) { log.info("交换机收到ID:{} 的消息", id); } else { log.error("交换机未收到ID:{} 的消息,原因:{}", id, cause); } } else { log.error("消息接收发生了异常!"); } } }(4)消费者@Slf4j @Component public class ConfirmConsumer { @RabbitListener(queues = ConfirmQueueConfig.CONFIRM_QUEUE) public void receiveMsg(Message message) { String msg = new String(message.getBody()); log.info("接收到 {} 队列的消息:{}", ConfirmQueueConfig.CONFIRM_QUEUE, msg); } }(5)测试可以看到,发送了两条消息,第一条消息的 RoutingKey 为 "key1",第二条消息的 RoutingKey 为 "key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了2.回退消息(1)Mandatory 参数在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,并且生产者是不知道消息被丢弃的。此时可通过设置 mandatory 参数,当消息在传递过程中不可达目的地时将消息返回给生产者(2)包含回退的队列设置@Configuration public class ConfirmQueueConfig { public static final String CONFIRM_EXCHANGE = "confirm.exchange"; public static final String CONFIRM_QUEUE = "confirm.queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; @Bean("confirmExchange") public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE); } @Bean("confirmQueue") public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs(); } }(3)消息生产者@Slf4j @RestController @RequestMapping("/confirm") public class SendConfirmMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{msg}/{routingKey}") @ApiOperation("发送消息(发布确认)") public void sendMsg(@PathVariable("msg") String msg, @PathVariable("routingKey") String routingKey) { log.info("当前时间:{},发送消息:{},routingKey:{}", new Date(), msg, routingKey); //正常发送 rabbitTemplate.convertAndSend(ConfirmQueueConfig.CONFIRM_EXCHANGE, routingKey, msg, new CorrelationData(UUID.randomUUID().toString())); } }(4)回调接口@Slf4j @Component public class ConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; /** * 注入 */ @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } /** * 交换机确认回调方法 * * @param correlationData correlation data for the callback. 回调消息的ID及相关的信息 * @param ack true for ack, false for nack * @param cause An optional cause, for nack, when available, otherwise null. */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (correlationData != null) { String id = correlationData.getId() != null ? correlationData.getId() : ""; if (ack) { log.info("交换机收到ID:{} 的消息", id); } else { log.error("交换机未收到ID:{} 的消息,原因:{}", id, cause); } } else { log.error("消息接收发生了异常!"); } } /** * 消息在传递过程中不可达目的地时,进行消息回退 * @param message the returned message. * @param replyCode the reply code. * @param replyText the reply text. * @param exchange the exchange. * @param routingKey the routing key. */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("消息:{},被交换机:{}退回,路由key:{},原因:{}", new String(message.getBody()), exchange, routingKey, replyText); } }(5)测试参考未设置消息回退时的测试结果,设置回退后,未匹配routingkey的消息被返回到了队列中3. 备份交换机备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列(1)队列配置@Configuration public class ConfirmQueueConfig { public static final String CONFIRM_EXCHANGE = "confirm.exchange"; public static final String CONFIRM_QUEUE = "confirm.queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; public static final String BACKUP_EXCHANGE = "backup.exchange"; public static final String BACKUP_QUEUE = "backup.queue"; public static final String WARRING_QUEUE = "warring.queue"; @Bean("confirmExchange") public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE); } @Bean("confirmQueue") public Queue confirmQueue() { return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs(); } @Bean("backupExchange") public FanoutExchange backupExchange() { return new FanoutExchange(BACKUP_EXCHANGE); } @Bean("backupQueue") public Queue backupQueue() { return QueueBuilder.durable(BACKUP_QUEUE).build(); } @Bean("warringQueue") public Queue warringQueue() { return QueueBuilder.durable(WARRING_QUEUE).build(); } @Bean public Binding queueBindingBackupExchange(@Qualifier("backupQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } @Bean public Binding queueBindingWarringExchange(@Qualifier("warringQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } }(5)报警消费者@Slf4j @Component public class WarringConsumer { @RabbitListener(queues = ConfirmQueueConfig.WARRING_QUEUE) public void receiveWarringMsg(Message message) { String msg = new String(message.getBody()); log.warn("发现不可路由消息:{}", msg); } }(5)生产者、回调接口配置同理(6)测试注:mandatory 参数可与备份交换机同时使用,但备份交换机优先级最高
2023年06月23日
158 阅读
0 评论
0 点赞
2023-06-22
RabbitMQ - 延时队列
1. 概念队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素希望在指定时间到了之后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。2. 使用场景订单在十分钟之内未支付则自动取消新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒用户注册成功后,如果三天内没有登录则进行短信提醒用户发起退款,如果三天内没有得到处理则通知相关运营人员预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议……4. RabbitMQ中的TTLTTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这 条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的 TTL,则优先取较小的值。5. 设置TTL的两种方式(1)消息设置ttlrabbitTemplate.convertAndSend("X", "XC", msg, message -> { //设置ttl message.getMessageProperties().setExpiration(ttlTime); return message; });(2)队列设置ttl//ttl arguments.put("x-message-ttl", 1000 * 10); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();(3)区别消息设置ttl,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外还需要注意的一点是,如果不设置 ttl,表示消息永远不会过期,如果将 ttl设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃队列设置 ttl 时,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队 列中)6. 整合 SpringBootpom.xml<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.sw.rabbitmq</groupId> <artifactId>RabbitmqDemo01</artifactId> <version>0.0.1-SNAPSHOT</version> <name>RabbitmqDemo01</name> <description>RabbitmqDemo01</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <!-- springboot web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- amqp --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>2.0.15</version> </dependency> <!-- lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- swagger2 --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <!-- swagger ui --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version> </dependency> <!-- test --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- rabbit test --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>application.ymlserver: port: 8088 spring: rabbitmq: host: 192.168.123.88 port: 5672 username: admin password: 123456swagger config@Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket defaultApi() { return new Docket(DocumentationType.SWAGGER_2) .select() .apis(RequestHandlerSelectors.basePackage("com.sw")) .paths(PathSelectors.any()) .build() .apiInfo(apiInfo()); } private ApiInfo apiInfo() { return new ApiInfo( "rabbitmq", "Rabbitmq-Demo", "1.0.0-SNAPSHOT", null, new Contact("suaxi", "http://www.test.com/", "test@qq.com"), "Apache 2.0", "https://www.apache.org/licenses/LICENSE-2.0.html", Collections.emptyList()); } }7. 队列TTL创建两个队列 QA 和 QB,两者队列 ttl 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是 direct,同时创建一个死信队列 QD(1)队列配置@Configuration public class TtlQueueConfig { //普通交换机 public static final String X_EXCHANGE = "X"; //死信交换机 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //普通队列 public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; //死信队列 public static final String DEAD_LETTER_QUEUE = "QD"; /** * 普通交换机 * * @return */ @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } /** * 死信交换机 * * @return */ @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } /** * 普通队列A * * @return */ @Bean("queueA") public Queue queueA() { Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); //ttl arguments.put("x-message-ttl", 1000 * 10); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } /** * 普通队列B * * @return */ @Bean("queueB") public Queue queueB() { Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); //ttl arguments.put("x-message-ttl", 1000 * 40); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } /** * 死信队列 * * @return */ @Bean("queueD") public Queue queueD() { return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); } /** * 绑定 * * @return */ @Bean public Binding queueABindingX(@Qualifier("queueA") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XA").noargs(); } /** * 绑定 * * @return */ @Bean public Binding queueBBindingX(@Qualifier("queueB") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XB").noargs(); } /** * 绑定 * * @return */ @Bean public Binding queueDBindingY(@Qualifier("queueD") Queue queue, @Qualifier("yExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("YD").noargs(); } }(2)消息生产者@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{msg}") @ApiOperation("发送消息") public void sendMsg(@PathVariable("msg") String msg) { log.info("当前时间:{},发送一条信息给两个TTL队列:{}", new Date(), msg); rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列" + msg); rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列" + msg); } }(3)消费者@Slf4j @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = "QD") public void receiveD(Message message, Channel channel) { String msg = new String(message.getBody(), StandardCharsets.UTF_8); log.info("当前时间:{},接收到死信队列的消息:{}", new Date(), msg); } }(4)测试消息分别在10s和40s之后变为死信消息,由处理死信队列消息的消费者消费8. 优化延时队列新增一个队列 QC,且不设置 ttl 时间(1)队列配置@Configuration public class TtlQueueConfig { //普通交换机 public static final String X_EXCHANGE = "X"; //死信交换机 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //普通队列 public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String QUEUE_C = "QC"; //死信队列 public static final String DEAD_LETTER_QUEUE = "QD"; /** * 普通交换机 * * @return */ @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } /** * 死信交换机 * * @return */ @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } /** * 普通队列A * * @return */ @Bean("queueA") public Queue queueA() { Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); //ttl arguments.put("x-message-ttl", 1000 * 10); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } /** * 普通队列B * * @return */ @Bean("queueB") public Queue queueB() { Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); //ttl arguments.put("x-message-ttl", 1000 * 40); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } /** * 死信队列 * * @return */ @Bean("queueD") public Queue queueD() { return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); } /** * 绑定 * * @return */ @Bean public Binding queueABindingX(@Qualifier("queueA") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XA").noargs(); } /** * 绑定 * * @return */ @Bean public Binding queueBBindingX(@Qualifier("queueB") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XB").noargs(); } /** * 绑定 * * @return */ @Bean public Binding queueDBindingY(@Qualifier("queueD") Queue queue, @Qualifier("yExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("YD").noargs(); } /** * 死信队列C * * @return */ @Bean("queueC") public Queue queueC() { Map<String, Object> arguments = new HashMap<>(2); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build(); } /** * 绑定 * * @return */ @Bean public Binding queueCBindingY(@Qualifier("queueC") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XC").noargs(); } }(2)消息生产者@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendExpirationMsg/{msg}/{ttlTime}") @ApiOperation("发送消息(自定义ttl)") public void sendMsg(@PathVariable("msg") String msg, @PathVariable("ttlTime") String ttlTime) { log.info("当前时间:{},发送一条过期时间为:{}ms 的信息给TTL队列:{}", new Date(), ttlTime, msg); rabbitTemplate.convertAndSend("X", "XC", msg, message -> { //设置ttl message.getMessageProperties().setExpiration(ttlTime); return message; }); } }(3)测试两条消息设置了不同的ttl(由生产者发送消息时决定),但 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行9. RabbitMQ插件实现延迟队列如果不能实现在消息粒度上的ttl,并使其在设置的ttl时间及时死亡,就无法设计成一个通用的延时队列。(1)安装延时队列插件通过官网 https://www.rabbitmq.com/community-plugins.html 可下载rabbitmq_delayed_message_exchange 插件,下载完成后解压到 RabbitMQ 的插件目录,进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启rabbitmq-plugins enable rabbitmq_delayed_message_exchange安装成功后开看到 The following plugins have been enabled 字眼(2)新增延时队列、交换机新增一个队列 delayed.queue,一个自定义交换机 delayed.exchange(3)队列配置@Configuration public class DelayQueueConfig { public static final String DELAYED_EXCHANGE = "delayed.exchange"; public static final String DELAYED_QUEUE = "delayed.queue"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; @Bean public CustomExchange delayedExchange() { /** * 参数数目:1.名称 2.类型 3.是否持久化 4.是否自动删除 5.其他参数 */ Map<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", false, false, arguments); } @Bean public Queue delayedQueue() { return new Queue(DELAYED_QUEUE); } @Bean public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUTING_KEY).noargs(); } }(4)消息生产者@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendDelayMsg/{msg}/{delayTime}") @ApiOperation("发送消息(基于 x-delayed-message 插件)") public void sendMsg(@PathVariable("msg") String msg, @PathVariable("delayTime") Integer delayTime) { log.info("当前时间:{},发送一条过期时间为:{}ms 的信息给delay.queue:{}", new Date(), delayTime, msg); rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_EXCHANGE, DelayQueueConfig.DELAYED_ROUTING_KEY, msg, message -> { //设置ttl message.getMessageProperties().setDelay(delayTime); return message; }); } }(5)消费者@Slf4j @Component public class DelayQueueConsumer { @RabbitListener(queues = DelayQueueConfig.DELAYED_QUEUE) public void receiveDelayQueue(Message message) { String msg = new String(message.getBody()); log.info("当前时间:{},收到延迟队列的消息:{}", new Date(), msg); } }(6)测试可看到ttl较小的消息优先被消费了(7)补充延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。延时队列还有很多其它选择,如:Java的DelayQueue,Redis的zset,Quartz任务调度或kafka的时间轮,可根据具体的业务场景进行选择
2023年06月22日
147 阅读
0 评论
0 点赞
2023-06-22
RabbitMQ - 死信队列
无法被消费的消息,即:producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中1. 来源消息 TTL 过期队列达到最大长度(队列满了,无法再添加数据到 mq 中)消息被拒绝(basic.reject 或 basic.nack)并且 requeue = false2. 消息 TTL 过期Producerpublic class Producer { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //死信消息 AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder().expiration("10000").build(); for (int i = 1; i < 11; i++) { String msg = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, msg.getBytes()); System.out.println("发送的消息是:" + msg); } } }Consumer1 (启动之后关闭,模拟消费者不能正常消息的情况)public class Consumer01 { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机 private static final String DEAD_EXCHANGE = "dead_exchange"; //普通队列 public static final String NORMAL_QUEUE = "normal_queue"; //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //声明普通/死信交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明普通队列 //普通队列绑定死信交换机 Map<String, Object> arguments = new HashMap<>(); //过期时间(过期时间可由生产者或者消费者设置,二选一) arguments.put("x-message-ttl", 10000); //死信交换机 arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //死信routingKey arguments.put("x-dead-letter-routing-key", "lisi"); channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); //死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定普通交换机 - 队列 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); //绑定死信交换机 - 队列 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); System.out.println("消费者c1等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer01接收的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {}); } }Consumer2 处理死信队列的消息public class Consumer02 { //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("消费者c2等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer02接收的(死信)消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {}); } }3. 队列达到最大长度Peoducerpublic class Producer { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); for (int i = 1; i < 11; i++) { String msg = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, msg.getBytes()); System.out.println("发送的消息是:" + msg); } } }Consumer1public class Consumer01 { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机 private static final String DEAD_EXCHANGE = "dead_exchange"; //普通队列 public static final String NORMAL_QUEUE = "normal_queue"; //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //声明普通/死信交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明普通队列 //普通队列绑定死信交换机 Map<String, Object> arguments = new HashMap<>(); //死信交换机 arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //死信routingKey arguments.put("x-dead-letter-routing-key", "lisi"); //队列长度 arguments.put("x-max-length", 6); channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); //死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定普通交换机 - 队列 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); //绑定死信交换机 - 队列 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); System.out.println("消费者c3等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer03接收的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {}); } }注:添加 arguments.put("x-max-length", 6) 参数Consumer2 处理死信队列的消息public class Consumer02 { //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("消费者c2等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer02接收的(死信)消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {}); } }4. 消息被拒绝Producer 同理 队列达到最大长度 的生产者Consumerpublic class Consumer04 { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机 private static final String DEAD_EXCHANGE = "dead_exchange"; //普通队列 public static final String NORMAL_QUEUE = "normal_queue"; //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //声明普通/死信交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明普通队列 //普通队列绑定死信交换机 Map<String, Object> arguments = new HashMap<>(); //死信交换机 arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //死信routingKey arguments.put("x-dead-letter-routing-key", "lisi"); channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); //死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定普通交换机 - 队列 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); //绑定死信交换机 - 队列 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); System.out.println("消费者c4等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { if ("info5".equals(new String(message.getBody(), StandardCharsets.UTF_8))) { System.out.println("Consumer04拒绝此消息:" + new String(message.getBody(), StandardCharsets.UTF_8)); channel.basicReject(message.getEnvelope().getDeliveryTag(), false); } else { System.out.println("Consumer04接收的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {}); } }启动生产者启动消费者启动处理死信队列消息的消费者(该消费者同理前三个处理死信队列消息的消费者)
2023年06月22日
140 阅读
0 评论
0 点赞
2023-06-20
RabbitMQ - 交换机
1. Exchanges(1)概念RabbitMQ 消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,生产者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息,是应该把这些消息放到特定队列还是说把他们放到许多队列中,亦或是应该丢弃它们,这就得由交换机的类型来决定。(2)交换机类型直接(direct)主题(topic)标题(headers)扇出(fanout)(3)无名exchangechannel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());之前的demo中生产者发送消息时未设置交换机,即第一个参数为空,表示使用的是默认交换机(又称无名交换机)2. 临时队列在之前的demo中使用的都是临时队列,一旦断开了消费者的连接,队列将被自动删除创建临时队列的方式:String queue = channel.queueDeclare().getQueue();3. 绑定binding 是 exchange 和 queue 之间的桥梁,即 exchange 和哪个队列进行了绑定4. Fanout(1)介绍将接收到的所有消息广播到它知道的所有队列中,RabbitMQ默认的交换机如下:(2)DemoProducerpublic class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String msg = scanner.next(); channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8)); System.out.println("发送的消息是:" + msg); } } }Consumerpublic class ReceiveLog01 { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //生成临时队列 String queueName = channel.queueDeclare().getQueue(); //绑定临时队列,其中routingKey为空字符串 channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("ReceiveLog01等待接收消息,接收到消息后在控制台打印..."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:" + msg); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } public class ReceiveLog02 { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //生成临时队列 String queueName = channel.queueDeclare().getQueue(); //绑定临时队列,其中routingKey为空字符串 channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("ReceiveLog02等待接收消息,接收到消息后在控制台打印..."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:" + msg); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }5. Direct(1)介绍Direct Exchange:消息只去到它绑定的 routingKey 队列中Fanout模式下每一个消费者都收到了消息,此处使用 Direct 模式,让消息根据 routing key 去到指定的地方,如下图:X绑定了Q1,Q2两个队列,绑定类型为direct,队列Q1绑定的orange,Q2绑定了两个key:black、green。在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1,绑定键为 black/green 的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。注:如果绑定的key都相同,则该模式与 Fanout 模式类似,变成了另一种形式的广播(2)DemoProducerpublic class DirectLog { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String msg = scanner.next(); channel.basicPublish(EXCHANGE_NAME, "info", null, msg.getBytes(StandardCharsets.UTF_8)); System.out.println("发送的消息是:" + msg); } } }Consumerpublic class ReceiveLogDirect01 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare("console", false, false, false, null); channel.queueBind("console", EXCHANGE_NAME, "info"); channel.queueBind("console", EXCHANGE_NAME, "warning"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:" + msg); }; channel.basicConsume("console", true, deliverCallback, consumerTag -> { }); } } public class ReceiveLogDirect02 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare("disk", false, false, false, null); channel.queueBind("console", EXCHANGE_NAME, "error"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:" + msg); }; channel.basicConsume("disk", true, deliverCallback, consumerTag -> { }); } }6. Topics(1)介绍发送到 topic 类型交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以 . 点号分隔开。这些单词可以是任意单词,且不能超过 255 个字节。规则列表中常用的替换符:*(星号) 可以代替一个单词#(井号) 可以替代零个或多个单词(2)举例下图绑定关系如下:Q1绑定的是中间带 orange ,单词总数为 3 个的字符串 *.orange.*Q2:绑定的是最后一个单词是 rabbit 且单词总数为 3 个的单词 *.*.rabbit第一个单词是 lazy 的多个单词 lazy.#以下是消息接收情况:quick.orange.rabbit 被队列 Q1Q2 接收到lazy.orange.elephant 被队列 Q1Q2 接收到quick.orange.fox 被队列 Q1 接收到lazy.brown.fox 被队列 Q2 接收到lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃lazy.orange.male.rabbit 是四个单词但匹配 Q2注:当一个队列绑定键是 #,那么这个队列将接收所有数据,类似于 fanout;如果队列绑定键中没有 # 和 * 出现,那么该队列绑定类型就是 direct (固定的routing key)(3)DemoProducerpublic class TopicLog { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); Map<String, String> map = new HashMap<>(); map.put(" quick.orange.rabbit", "被队列Q1Q2接收到"); map.put("lazy.orange.elephant", "被队列Q1Q2接收到"); map.put("quick.orange.fox", "被队列Q1接收到"); map.put(" lazy.brown.fox", "被队列Q2接收到"); map.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列Q2接收一次"); map.put("quick. brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃"); map.put(" quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃"); map.put("lazy.orange.male.rabbit", "是四个单词但匹配Q2"); for (Map.Entry<String, String> entry : map.entrySet()) { channel.basicPublish(EXCHANGE_NAME, entry.getKey(), null, entry.getValue().getBytes(StandardCharsets.UTF_8)); System.out.println("发送的消息是:" + entry.getValue()); } } }Consumerpublic class ReceiveLogTopic01 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.queueDeclare("Q1", false, false, false, null); channel.queueBind("Q1", EXCHANGE_NAME, "*.orange.*"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:" + msg + ",routingKey:" + delivery.getEnvelope().getRoutingKey()); }; channel.basicConsume("Q1", true, deliverCallback, consumerTag -> { }); } } public class ReceiveLogTopic02 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.queueDeclare("Q2", false, false, false, null); channel.queueBind("Q2", EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind("Q2", EXCHANGE_NAME, "lazy.#"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:" + msg + ",routingKey:" + delivery.getEnvelope().getRoutingKey()); }; channel.basicConsume("Q2", true, deliverCallback, consumerTag -> { }); } }
2023年06月20日
114 阅读
0 评论
0 点赞
2023-06-20
RabbitMQ - 发布确认
1. 发布确认原理生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的 消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。注:confirm 模式是异步的2. 发布确认策略(1)开启发布确认发布确认默认未开启,手动开启//开启发布确认 channel.confirmSelect();(2)单个确认发布是一种同步确认发布的方式,即发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量,但对于某些应用程序来说这可能已经足够了。public static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //开始时间 long startTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = i + ""; channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); //单个确认 boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("第" + i + "条消息发送成功"); } } //开始时间 long endTime = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条数据(单个确认模式)耗时:" + (endTime - startTime)); }(3)批量发布确认先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现了问题,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息,该方案也是同步的,也一样阻塞消息的发布。public static void publishMessageBatch() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); int batchSize = 100; //开始时间 long startTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = i + ""; channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); //批量确认 if (i % batchSize == 0) { boolean flag = channel.waitForConfirms(); if (flag) { System.out.println(i + "条消息发送成功"); } } } //开始时间 long endTime = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条数据(批量确认模式)耗时:" + (endTime - startTime)); }(4)异步确认发布利用回调函数来达到消息可靠性的传递public static void publishMessageAsync() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //使用线程安全的哈希表记录消息 ConcurrentSkipListMap<Long, Object> outstandingConfirms = new ConcurrentSkipListMap<>(); //确认成功 回调 ConfirmCallback ackCallback = (deliveryTag, multiple) -> { //2.删除已确认的消息,即剩下的就是确认失败的消息 if (multiple) { outstandingConfirms.headMap(deliveryTag).clear(); } else { outstandingConfirms.remove(deliveryTag); } System.out.println("确认的消息:" + deliveryTag); }; //确认失败 回调 ConfirmCallback nackCallback = (deliveryTag, multiple) -> { //3.打印未确认的消息 Object msg = outstandingConfirms.get(deliveryTag); System.out.println("未确认的消息tag:" + deliveryTag + ",未确认的消息内容:" + msg.toString()); }; //消息监听器 channel.addConfirmListener(ackCallback, nackCallback); //开始时间 long startTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = i + ""; channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); //1.记录已发送的消息 outstandingConfirms.put(channel.getNextPublishSeqNo(), msg); } //开始时间 long endTime = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条数据(异步确认模式)耗时:" + (endTime - startTime)); }(5)处理异步未确认消息把未确认的消息放到一个基于内存的且能被发布线程访问的队列,如:用 ConcurrentLinkedQueue 队列在 confirm callbacks 与发布线程之间进行消息的传递。(6)三种方式对比单独发布消息:同步等待确认,简单,但吞吐量非常有限批量发布消息:批量同步等待确认,简单,合理的吞吐量,但出现问题后很难判断是那条消息出现了问题异步处理: 最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来较为复杂
2023年06月20日
131 阅读
0 评论
0 点赞
1
...
18
19
20
...
58