1. 生产者TCP链接管理
1.1 Kafka生产者程序概览
Kafka 的 Java 生产者 API 主要的对象就是 KafkaProducer。通常我们开发一个生产者的步骤有 4 步。
* 构造生产者对象所需的参数对象。
* 利用第 1 步的参数对象,创建 KafkaProducer 对象实例。
* 使用 KafkaProducer 的 send 方法发送消息。
* 调用 KafkaProducer 的 close 方法关闭生产者并释放各种系统资源。
上面这 4 步写成 Java 代码的话大概是这个样子:
Properties props = new Properties (); props.put(“参数1”, “参数1的值”);
props.put(“参数2”, “参数2的值”); …… try (Producer producer = new
KafkaProducer<>(props)) { producer.send(new ProducerRecord(……), callback); …… }
1.2 TCP链接管理
1.2.1 创建时机
生产者有三种情况可能创建TCP链接
1. 创建KafkaProducer实例时
在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线程开始运行时首先会创建与
Broker 的连接。
2. 更新元数据后
* 当 Producer 尝试给一个不存在的主题发送消息时,Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送
METADATA 请求给 Kafka 集群,去尝试获取最新的元数据信息。
* Producer 通过 metadata.max.age.ms 参数定期地去更新元数据信息。该参数的默认值是 300000,即 5
分钟,也就是说不管集群那边是否有变化,Producer 每 5 分钟都会强制刷新一次元数据以保证它是最及时的数据。
3. 当要发送消息时
当要发送消息时,Producer 发现尚不存在与目标 Broker 的连接,也会创建一个。
1.2.2 关闭时机
Producer端关闭TCP链接有两种方式:一种是用户主动关闭;一种是 Kafka 自动关闭。
1. 主动关闭
这里的主动关闭实际上是广义的主动关闭,甚至包括用户调用 kill -9 主动“杀掉”Producer 应用。当然最推荐的方式还是调用
producer.close() 方法来关闭。
2. kafka自动关闭
这与 Producer 端参数 connections.max.idle.ms 的值有关。默认情况下该参数值是 9 分钟,即如果在 9
分钟内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。用户可以在 Producer 端设置
connections.max.idle.ms=-1 禁掉这种机制。一旦被设置成 -1,TCP 连接将成为永久长连接。当然这只是软件层面的“长连接”机制,由于
Kafka 创建的这些 Socket 连接都开启了 keepalive,因此 keepalive 探活机制还是会遵守的。
1.2.3 总结
* KafkaProducer 实例创建时启动 Sender 线程,从而创建与 bootstrap.servers 中所有 Broker 的 TCP 连接。
* KafkaProducer 实例首次更新元数据信息之后,还会再次创建与集群中所有 Broker 的 TCP 连接。
* 如果 Producer 端发送消息到某台 Broker 时发现没有与该 Broker 的 TCP 连接,那么也会立即创建连接。
* 如果设置 Producer 端 connections.max.idle.ms 参数大于 0,则步骤 1 中创建的 TCP
连接会被自动关闭;如果设置该参数 =-1,那么步骤 1 中创建的 TCP 连接将无法被关闭,从而成为“僵尸”连接。
2. 消费者TCP链接管理
2.1 创建时机
和生产者不同的是,构建 KafkaConsumer 实例时是不会创建任何 TCP 连接的。TCP 连接是在调用 KafkaConsumer.poll
方法时被创建的。再细粒度地说,在 poll 方法内部有 3 个时机可以创建 TCP 连接。
* 发起 FindCoordinator 请求时(往集群中负载最小的那台Broker发送)。
* 连接协调者时。
* 消费数据时。
2.2 创建的连接数量
消费者程序会创建 3 类 TCP 连接:
* 确定协调者和获取集群元数据。
* 连接协调者,令其执行组成员管理操作。
* 执行实际的消息获取。
2.3 关闭时机
和生产者类似,消费者关闭 Socket 也分为主动关闭和 Kafka 自动关闭。主动关闭是指你显式地调用消费者 API
的方法去关闭消费者,具体方式就是手动调用 KafkaConsumer.close() 方法,或者是执行 Kill 命令,不论是 Kill -2 还是 Kill
-9;而 Kafka 自动关闭是由消费者端参数 connection.max.idle.ms 控制的,该参数现在的默认值是 9 分钟,即如果某个 Socket
连接上连续 9 分钟都没有任何请求“过境”的话,那么消费者会强行“杀掉”这个 Socket 连接。