Qos(Quality of Service,服务质量)概念:

当网络发生拥塞的时候,所有的数据流都有可能被丢弃;为满足用户对不同应用不同服务质量的要求,就需要网络能根据用户的要求分配和调度资源,对不同的数据流提供不同的服务质量:对实时性强且重要的数据报文优先处理;对于实时性不强的普通数据报文,提供较低的处理优先级,网络拥塞时甚至丢弃。QoS应运而生。支持QoS功能的设备,能够提供传输品质服务;针对某种类别的数据流,可以为它赋予某个级别的传输优先级,来标识它的相对重要性,并使用设备所提供的各种优先级转发策略、拥塞避免等机制为这些数据流提供特殊的传输服务。配置了QoS的网络环境,增加了网络性能的可预知性,并能够有效地分配网络带宽,更加合理地利用网络资源。

为什么要设置Qos:

在RabbitMQ中,队列向消费者发送消息,如果没有设置Qos的值,那么队列中有多少消息就发送多少消息给消费者,完全不管消费者是否能够消费完,这样可能就会形成大量未ack的消息在缓存区堆积,因为这些消息未收到消费者发送的ack,所以只能暂时存储在缓存区中,等待ack,然后删除对应消息。这样的话,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用
basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量,一旦数量达到配置的数量,RabbitMQ
将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时
RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ
将会感知这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。
虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的RAM
消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同
100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1
是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。

Qos的取值问题:

在传输效率和消费者消费速度之间做一个平衡。这个值是需要不断尝试的,因为太低,信道传输消息效率太低,如果太高
,消费者来不及确认消息导致消息积累问题,内存消耗不断增大。

<>不公平分发

概念:
如果采用默认消息分发策略,消息是轮询发送的。但是消费者之前存在处理快慢问题,如果A处理慢,B处理快,他们接受同样数量的消息显然是不合理的。
引出不公平分发:
就是在这样情况下,不公平分发出现了,简而言之就是能者多劳,处理快的多处理,处理慢的少处理。
如何实现不公平分发:

那么如何实现呢?上面介绍了basicQos,如果我们将qos的值设为1,那么你想一想会出现什么情况呢?信道中只允许传输一条消息,那么当这条消息处理完后,队列会立马发送下一条消息,所以这个时候快的不断处理,慢的等待当前处理完再处理下一条。这样就实现了能者多劳。

代码实现:
生产者:
public class Producer { public static final String QUEUE_NAME =
"test_basic_qos"; public static final String EXCHANGE_NAME = "test_basic_qos";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME,
false,false,false,null); channel.exchangeDeclare(EXCHANGE_NAME,
BuiltinExchangeType.DIRECT); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,
"basic.qos"); Scanner scanner = new Scanner(System.in); while(scanner.hasNext())
{ String message = scanner.next(); System.out.println("发送消息为:" + message);
channel.basicPublish(EXCHANGE_NAME,"basic.qos",null,message.getBytes(
StandardCharsets.UTF_8)); } } }
C1(快的消费者):
public class Consumer02 { public static final String QUEUE_NAME =
"test_basic_qos"; public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback =
(consumerTag, message) -> { try { SleepUtils.sleep(2); } catch (
InterruptedException e) { e.printStackTrace(); } System.out.println("高性能服务器接受:"
+ new String(message.getBody())); channel.basicAck(message.getEnvelope().
getDeliveryTag(),false); }; CancelCallback cancelCallback = consumerTag -> {};
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback); } }
C2(慢的消费者):
public class Consumer01 { public static final String QUEUE_NAME =
"test_basic_qos"; public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel(); channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag,message) -> { try { SleepUtils.
sleep(30); } catch (InterruptedException e) { e.printStackTrace(); } System.out.
println("低性能服务器接受:" + new String(message.getBody())); channel.basicAck(message.
getEnvelope().getDeliveryTag(),true); }; CancelCallback cancelCallback =
consumerTag-> {}; channel.basicConsume(QUEUE_NAME,false,deliverCallback,
cancelCallback); } }
结果:

<>预取值

概念:
设置消费者信道最大传输信息数。
测试:

我们将慢的消费者preCount取值为5,快的消费者预取值为2,然后发送7条消息(为了保证快的消费者只处理2条,我们要在2s内能发送7条数据,这样保证后面的消息全部发送给慢的消费者,避免快的消费者处理完了消息,又将发送后续消息。)
代码:
参考上面代码,只是修改了qos值。
结果:

分析:
因为快的消费者信道满了,不能再发送消息,所以消息只能发送给慢的服务器,这就是basicQos用法。

技术
下载桌面版
GitHub
Gitee
SourceForge
百度网盘(提取码:draw)
云服务器优惠
华为云优惠券
腾讯云优惠券
阿里云优惠券
Vultr优惠券
站点信息
问题反馈
邮箱:[email protected]
吐槽一下
QQ群:766591547
关注微信