当前位置: 首页 > news >正文

资讯是做网站还是公众号郑州网络营销与网站推广

资讯是做网站还是公众号,郑州网络营销与网站推广,策划公司名称大全集最新,wordpress 二级导航制作目录 1.TTL 1.1.设置消息过期时间 1.2.设置队列过期时间 2.死信队列 2.1.介绍 2.2.演示 3.延迟队列 3.1.模拟实现延迟队列 3.2.延迟队列插件 4.事务与消息分发 4.1.事务 4.2.消息分发 1.TTL 所谓的ttl,就是过期时间。对于rabbitmq,可以设置…

目录

1.TTL

1.1.设置消息过期时间

1.2.设置队列过期时间

2.死信队列

2.1.介绍

2.2.演示

3.延迟队列

3.1.模拟实现延迟队列

3.2.延迟队列插件

4.事务与消息分发

4.1.事务 

4.2.消息分发


1.TTL

所谓的ttl,就是过期时间。对于rabbitmq,可以设置队列和消息的过期时间

1.1.设置消息过期时间

(1)相关的交换机和队列

//ttl
@Bean("ttlQueue")
public Queue ttlQueue() {return QueueBuilder.durable(Constant.TTL_QUEUE).build();
}@Bean("ttlDirectExchange")
public DirectExchange ttlDirectExchange() {return ExchangeBuilder.directExchange(Constant.TTL_EXCHANGE).build();
}@Bean("ttlBinding")
public Binding ttlBinding(@Qualifier("ttlQueue") Queue queue,@Qualifier("ttlDirectExchange") Exchange exchange ) {return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();
}

(2)生产者 -- 生产消息的时候设置ttl

给消息设置过期时间,也就是设置消息对象的属性

第一种写法:

    //ttl@RequestMapping("/ttl")public String ttl() {MessagePostProcessor postProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("10000");//设置10秒后过期return message;}};rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl","a ttl test",postProcessor);return "ttl";}

第二种写法:

//ttl
@RequestMapping("/ttl")
public String ttl() {rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl","a ttl test",message -> {message.getMessageProperties().setExpiration("10000");//设置10秒后过期return message;});return "ttl";
}

(3)效果展示

消息在10s后就会自动删除 (本人承诺没有做任何的处理)

(4)设置消息ttl注意事项

 设置消息TTL,即时消息过期,也不会马上从队列中删除,而是在即将投递到消费者之前才会进行判定。如果每次都要扫描整个队列就会很低效。

给A消息设置30s过期,B消息设置10s过期,先将消息A存入队列再存B消息,此时B消息30s后才会被删除。

以上的两条消息是同时消失。

1.2.设置队列过期时间

给队列设置TTL,指的是队列中的消息在TTL后就会被删除,而非队列被删除。

(1)设置交换机和队列

代码给队列设置ttl,在创建队列中调用 ttl() 方法即可

//队列ttl
@Bean("ttlQueue2")
public Queue ttlQueue2() {return QueueBuilder.durable(Constant.TTL_QUEUE2).ttl(20 * 1000).build();//队列中的消息20s后被删除
}@Bean("ttlDirectExchange2")
public DirectExchange ttlDirectExchange2() {return ExchangeBuilder.directExchange(Constant.TTL_EXCHANGE).build();
}@Bean("ttlBinding2")
public Binding ttlBinding2(@Qualifier("ttlQueue") Queue queue,@Qualifier("ttlDirectExchange") Exchange exchange ) {return BindingBuilder.bind(queue).to(exchange).with("ttl2").noargs();
}

(2)生产者与效果

@RequestMapping("/ttl2")
public String ttl2() {rabbitTemplate.convertAndSend(Constant.TTL_EXCHANGE,"ttl2","a ttl2");return "ttl2";
}

给队列设置ttl,在web界面中就会有改效果出现。


如果把设置有ttl的消息发送到设置有ttl的队列中,那么过期时间取值小的一个。

2.死信队列

  • 所谓死信,就是因为种种原因无法被消费的消息。
  • 所以死信队列,就是用来存放死信的队列。
  • 死信到达死信队列的交换机称为DLX(Dead Letter Exchange)
2.1.介绍

(1)死信队列的图解

正常队列中的消息因为一些原因就会变成死信,然后经过一个特定的路由交换,最后到达一个指定的死信队列中,然后再投递给消费者。

(2)消息称为死信的原因

  • 消息被拒绝,且设置了无法入队
  • 消息过期
  • 队列达到最大长度
2.2.演示

要演示死信队列的情况,就需要有两种队列和两种交换机。

(1)声明交换机和队列

@Configuration
public class DlConfig {//正常的交换机和队列@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constant.NORMAL_QUEUE).deadLetterExchange(Constant.DL_EXCHANGE)//绑定死信交换机.deadLetterRoutingKey("dlx")//死信交换机的路由规则.ttl(10000)//10秒后消息过期.maxLength(10L)//队列最大长度为10.build();}@Bean("normalExchange")public DirectExchange normalExchange() {return ExchangeBuilder.directExchange(Constant.NORMAL_EXCHANGE).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") DirectExchange normalExchange, @Qualifier("normalQueue")Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with("normal");}//死信交换机和队列@Bean("dlxQueue")public Queue dlxQueue() {return QueueBuilder.durable(Constant.DL_QUEUE).build();}@Bean("dlxExchange")public DirectExchange dlxExchange() {return ExchangeBuilder.directExchange(Constant.DL_EXCHANGE).build();}@Bean("dlxBinding")public Binding dlxBinding(@Qualifier("dlxExchange") DirectExchange dlxExchange, @Qualifier("dlxQueue")Queue dlxQueue) {return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("dlx");}}

上面我们知道了消息称为死信的条件,其中消息过期和超过队列最大长度可以在声明队列时实现。所以,有如下的代码改进

//正常的交换机和队列
@Bean("normalQueue")
public Queue normalQueue() {return QueueBuilder.durable(Constant.NORMAL_QUEUE).deadLetterExchange(Constant.DL_EXCHANGE)//绑定死信交换机.deadLetterRoutingKey("dlx")//死信交换机的路由规则.ttl(10000)//10秒后消息过期.maxLength(10L)//队列最大长度为10.build();
}

对于消息被拒,我们在消费者部分进行修改就好。

(2)生产者和消费者

生产者:可以选择模拟超出队列最大长度的情况

//dlx死信队列
@RequestMapping("/dlx")
public String dlx() {int maxLen = 12;for(int i=0;i<maxLen;i++) {rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","normal and dlx");}return "normal";
}

消费者:

@Component
public class DlListener {//监听正常队列,模拟拒绝消息//@RabbitListener(queues = Constant.NORMAL_QUEUE)public void normalListener() {System.out.println();}//监听死信队列@RabbitListener(queues = Constant.DL_QUEUE)public void dlxListener(Message message) {System.out.println("死信队列:"+ new String(message.getBody()));}}

队列就变成了这样子: 

(3)过期消息变成死信

(4)其他情况

比如拒绝消息和长度超过最大队列长度

3.延迟队列

延迟队列,指消息发送到队列后,消费者并不能马上拿到消息,而是等待指定的时间后才能消息该消息。

应用场景:

(1)智能家居:比如通过手机下达命令控制家里的家居,达到一定时间段就自动开启。

(2)日常管理:预定一个会议,在会议开始前15分钟就会通知参加人员

比如,我们经常使用的手机闹钟,就是类似于延迟队列的效果。

3.1.模拟实现延迟队列

对于原生的rabbitmq,并没有实现延迟队列的功能,但是我们可以通过TTL+死信队列来模拟实现。

(1)如何模拟实现

消费者需要订阅死信队列,生产者把延迟的消息放入正常队列中,当消息过期就会自动进入死信队列,消费者进而可以拿到消息。

对于TTL,我们是设置消息的TTL,也可以设置队列的过期时间。

(2)模拟实现

死信队列:

@Configuration
public class DlConfig {//正常的交换机和队列@Bean("normalQueue")public Queue normalQueue() {return QueueBuilder.durable(Constant.NORMAL_QUEUE).deadLetterExchange(Constant.DL_EXCHANGE)//绑定死信交换机.deadLetterRoutingKey("dlx")//死信交换机的路由规则.build();}@Bean("normalExchange")public DirectExchange normalExchange() {return ExchangeBuilder.directExchange(Constant.NORMAL_EXCHANGE).build();}@Bean("normalBinding")public Binding normalBinding(@Qualifier("normalExchange") DirectExchange normalExchange, @Qualifier("normalQueue")Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with("normal");}//死信交换机和队列@Bean("dlxQueue")public Queue dlxQueue() {return QueueBuilder.durable(Constant.DL_QUEUE).build();}@Bean("dlxExchange")public DirectExchange dlxExchange() {return ExchangeBuilder.directExchange(Constant.DL_EXCHANGE).build();}@Bean("dlxBinding")public Binding dlxBinding(@Qualifier("dlxExchange") DirectExchange dlxExchange, @Qualifier("dlxQueue")Queue dlxQueue) {return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("dlx");}}

生产者:生产者在生产消息的时候加上过期时间,也就是TTL

@RequestMapping("/delay")
public String delay() {rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","这是一条延迟消息",message -> {message.getMessageProperties().setExpiration("10000");//设置10秒后过期return message;});System.out.println("消息发送时间:"+new Date());return "delay";
}

消费者:消费者订阅死信队列

  //监听死信队列@RabbitListener(queues = Constant.DL_QUEUE)public void dlxListener(Message message) {System.out.println("消费时间: "+new Date() +",死信队列:"+ new String(message.getBody()));}

演示结果:

结果恰好是10秒后。

(3)存在的缺陷

前面我们知道,当多个携带TTL的消息进入队列中,并且前面消息的TTL大于后面的;那么就会出现,只有前面的消息过期,后面的消息才会跟着过期,这就是TTL+私信队列存在的问题。

所以我们使用一个插件,使用插件带来的延迟队列进行操作。

3.2.延迟队列插件

(1)下载插件并启用

下载地址:这个页面如果点不开,可以使用加速软件加速

Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

找到.ez文件:点击就会下载插件文件

确定安装目录:

像ubunto的就使用这两个目录的其中一个即可:

将文件复制到目录下:没有目录就创建

找到目录

安装插件前:

安装插件:直接将文件拖拽进来即可

安装插件后:多出来的插件

启动插件:

最后在rabbitmq客户端查看交换机类型

这就说明延迟插件启动成功,后续使用该交换机即可。

(2)定义交换机和队列


@Configuration
public class DelayConfig {@Bean("delayQueue")public Queue delayQueue() {return QueueBuilder.durable(Constant.DELAY_QUEUE).build();}@Bean("delayExchange")public Exchange delayExchange() {return ExchangeBuilder.directExchange(Constant.DELAY_EXCHANGE).delayed().build();}@Bean("delayBinding")public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue,@Qualifier("delayExchange") Exchange delayExchange) {return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay").noargs();}}

生产者:

@RequestMapping("/delay2")
public String delay2() {rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE,"normal","这是一条延迟消息",message -> {message.getMessageProperties().setExpiration("10000");//设置10秒后过期return message;});System.out.println("消息发送时间:"+new Date());return "delay";
}

消费者:

@Component
public class DelayListener {@RabbitListener(queues = Constant.DELAY_QUEUE)public void delayListener() {System.out.println("消息时间:"+new Date());}}

(3)演示

使用延迟插件就不会出现像上述TTL+死信队列的问题

如果需要关闭插件,执行下面的命令即可:

rabbitmq delayed message exchange

4.事务与消息分发

4.1.事务 

 事务,就是保证发送消息和接收消息是原子性的,要么全部成功,要么全部失败

(1)配置事务管理器

这里就是需要对AMQP客户端进行设置属性

//3.返回事务
@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);//true为开启事务return rabbitTemplate;
}@Beanpublic RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory){return new RabbitTransactionManager(connectionFactory);//事务管理器}

后续使用该对象就可以完成事务的操作

(2)准备队列和交换机

这里使用系统默认的交换机即可

   //事务@Bean("transQueue")public Queue transQueue() {return QueueBuilder.durable(Constant.TRANS_QUEUE).build();}

(3)消费者

    @RequestMapping("/trans")public String trans() {rabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"第一条消息");System.out.println("异常前");int a=9/0;//模拟发送异常rabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"第二条消息");return "trans";}

上面的消费者是没有使用事物的

(4)没有采取事务

这里指的是既没有开启事务,也没有在方法上加上@Transactional注解

运行结果:

异常前成功发送消息,异常后的消息没有进行发送成功。

(5)使用事务

    @Transactional@RequestMapping("/trans")public String trans() {transRabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"第一条消息");System.out.println("异常前");int a=9/0;//模拟发送异常transRabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"第二条消息");return "trans";}

这个时候发送了异常,队列中也是一条消息都没有的。

(6)事务小结

要完成一个事务的操作,这三个操作都不能少

配置对象和事务管理器:

@Bean("transRabbitTemplate")
public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;
}@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {return new RabbitTransactionManager(connectionFactory);
}

加上@Transactional注解:

@Transactional
@RequestMapping("/trans")
public String trans() {transRabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"第一条消息");//int a=9/0;//模拟发送异常transRabbitTemplate.convertAndSend("",Constant.TRANS_QUEUE,"第二条消息");return "trans";
}

还有一个注意事项,使用事务,最好把消息发送确认模式关闭

4.2.消息分发

(1)定义

多个消费者订阅同一个队列时,队列会轮询给消费者分发消息,每个消费者平均每分钟拿到的消息数目是一样的,这种情况看似挺好的,但是容易出现问题。

当每个消费者的消费能力不一样时,消费速度慢的,消息就会积压;而消费速度快的消费者,就会空闲,进而影响整体的吞吐量。

所以就有了消息分发,按照一定的规则,平均每分钟给不同的消费者分发不同数量的消息。

对于消息分发,有两个应用场景 -- 限流和非公平分发

(2)限流

消费者每次只能拿到一定数量的消息,只有消费并且确认后,才能继续拿到消息。所以需要配置成手动确认模式和限流参数

1)配置

2)相应代码

交换机和队列:

@Configuration
public class QosConfig {@Bean("qosQueue")public Queue qosQueue() {return QueueBuilder.durable(Constant.QOS_QUEUE).build();}@Bean("qosExchange")public Exchange qosExchange() {return ExchangeBuilder.directExchange(Constant.QOS_EXCHANGE).delayed().build();}@Bean("qosBinding")public Binding qosBinding(@Qualifier("qosQueue") Queue delayQueue, @Qualifier("qosExchange") Exchange delayExchange) {return BindingBuilder.bind(delayQueue).to(delayExchange).with("qos").noargs();}}

生产者:

@RequestMapping("/qos")
public String Qos() {for(int i=0;i<20;i++) {rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE,"qos","a qos test"+i);}return "qos";
}

消费者:

@Component
public class QosListener {@RabbitListener(queues = Constant.QOS_QUEUE)public void qosListener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("消费:"+new String(message.getBody()));//channel.basicAck(deliveryTag, false); //不进行消息确认}catch (Exception e){channel.basicNack(deliveryTag, false, false);}}
}

3)演示

一下子往队列中发送20条消息,但是消费者一下子只能拿到5条消息

但是没有确认,就只有五条消息,也拿不到后续的消息。

(3)负载均衡

模拟实现负载均衡,可以把限流参数修改成1,消费确认完成一条消息才能继续拿。

后续代码跟上述是差不多了

http://www.yidumall.com/news/100883.html

相关文章:

  • 怎么优化一个网站南昌seo教程
  • 做类似猪八戒网的网站seo和网络推广有什么区别
  • 网课编程品牌seo推广
  • wordpress设置登录和跳转到主页网站优化外包多少钱
  • 网站正在建设中手机版培训心得体会800字
  • wordpress点评站信息流广告推广
  • 两学一做 知识竞赛网站什么叫做关键词
  • 嘉兴建站模板系统郑州关键词排名公司电话
  • 给个龙做罗拉的网站南宁最新消息今天
  • wordpress需要登录济南网站优化公司排名
  • 阳谷做网站广州百度推广开户
  • 怎么免费建立一个网站今日军事新闻热点事件
  • 一个新网站关键词怎么做SEO优化5月新冠病毒最新消息
  • 做一个公司的网站怎么做呢快速优化seo
  • 外国ps修图网站黄页推广平台有哪些
  • wordpress 网站加载过慢6艾滋病多久能检查出来
  • 江苏商城网站制作公司关键词首页优化
  • 平面设计最好的网站网上国网app
  • 郑州外贸建站平台关键词排名优化
  • 做网站第二年要续费吗关键词排名软件官网
  • 绿色家园网站怎么做seo流量优化
  • 怎么网上推广自己的产品seo接单一个月能赚多少钱
  • 低成本做网站 白之家友情链接交换源码
  • 飞天云服务器杭州seo搜索引擎优化
  • 做免费互动小游戏的网站网络营销的主要方法
  • 蚌埠网站建设文章微信群推广
  • 网站公安备案流程10条重大新闻事件
  • 外贸跨境电商平台有哪些上海还能推seo吗
  • 做自己的网站的一般步骤今天新闻
  • 拱墅网站建设今日新闻消息