Kafka 常见面试问题
本文最后更新于 2024年1月10日 凌晨
Kafka 常见面试问题
基础
Kafka 简介
Apache Kafka 是由 LinkedIn 开发并后来捐献给 Apache 软件基金会的一个开源流处理平台。它基于发布-订阅的消息队列架构,设计用于构建高吞吐、持久、可扩展且能够处理大量实时数据流的系统。
Kafka 的主要作用:
- 消息中间件: Kafka 可以作为高性能的消息队列使用,支持消息的生产和消费,解耦数据生产者和消费者之间的依赖关系。
- 实时数据处理: 它支持实时数据流的摄取、存储和处理,可以配合流处理框架(如 Apache Storm、Apache Flink 或 Spark Streaming)来实现复杂的数据处理逻辑。
- 日志聚合: Kafka 常被用于收集来自分布式应用的日志,作为一个高性能的日志聚合解决方案。
- 事件源(Event Sourcing): Kafka 可以存储数据变更记录作为事件来源,提供了回放历史数据以及状态恢复的能力。
Kafka 的适用场景:
- 日志聚合系统: Kafka 可以从各种服务中收集日志,并提供集中化日志处理。
- 流式数据处理: Kafka 可以作为实时流数据的处理管道,实现数据的实时摄取、传输和处理。
- 事件驱动架构: Kafka 非常适合作为微服务架构中的事件总线,促进服务间的异步通信和解耦。
- 数据集成: Kafka 可以将数据从独立系统同步到集中式数据存储,如数据湖、数据仓库。
- 持久化存储: Kafka 具备将数据持久化保存的能力,便于数据回溯和复用。
Kafka 因为其独特的设计,特别适用于需要高可靠性和可扩展性的大数据实时处理场景。
Kafka 的缺点
Kafka 作为一个分布式消息系统,在其优势的同时也存在一些缺点,以下是一些 Kafka 的缺点:
- 非实时性: Kafka 是通过批量发送数据的方式来提高吞吐量的,因此并不是真正的实时消息系统。对于一些对实时性要求极高的应用场景,可能不太适合。
- 不支持 MQTT 协议: Kafka 不支持 MQTT(Message Queuing Telemetry Transport)协议,这对于一些依赖 MQTT 协议的应用来说可能是一个限制。
- 不直接支持物联网传感数据接入: Kafka 并没有专门为物联网(IoT)传感数据设计的接入方式,这使得在 IoT 场景下的接入可能相对复杂。
- 局部有序性: Kafka 保证同一分区内的消息有序,但无法保证全局有序性。对于一些需要全局有序性的应用场景,Kafka 可能需要进行额外的处理。
- 监控不完善: Kafka 的监控在某些方面可能不够完善,用户可能需要安装额外的插件或集成其他监控工具来满足特定的监控需求。
- 依赖 Zookeeper: Kafka 依赖 Zookeeper 来进行元数据的管理,这增加了系统的复杂性。尽管 Zookeeper 是一个可靠的分布式协调服务,但引入了额外的依赖关系和维护成本。
这些缺点并不意味着 Kafka 不适用于各种场景,而是需要根据具体的应用需求和场景来选择合适的消息系统。在某些特定场景下,这些缺点可能需要通过合适的架构设计和技术组合来弥补。
架构组件
Apache Kafka 是一个分布式的流平台,广泛用于构建实时数据管道和流应用程序。它能够高效地处理高吞吐量的数据,并能够持久化消息。下面是 Kafka 的几个关键组件以及它们的作用:
- Producer(生产者):
- 生产者负责向Kafka的Topic发送消息。它可以指定消息需要发送到的Topic,Kafka会负责将消息路由到正确的Broker和Partition。
- 生产者在发送消息时可以选择要求不同级别的确认(ACK)从Broker接收到,这涉及到消息可靠性的保障。
- 生产者还可以为消息指定一个key,保证具有相同key的消息被发送到相同的Partition中,这样可以保证消费的有序性。
- Broker(代理节点):
- Broker是Kafka集群的节点,用来存储数据并提供数据传输服务。一个集群由多个Broker组成。
- 每个Broker在集群中有唯一ID,并且持有一部分数据的Partition。
- Broker负责维护足够的副本来确保数据的可用性和容错性。
- Topic and Partition(主题和分区):
- Topic是消息的逻辑分组,可以看作是一类消息的分类。
- 每个Topic可以分为多个Partition,Partition是物理上的概念,代表了Topic的分裂,使得消息能够分布存储在不同的Broker上。
- Consumer(消费者):
- 消费者从指定的Topic读取消息进行处理。消费者所属的Consumer Group可以保证同一个Group内不同Consumer平均分摊消息消费任务。
- 如果消费者数量多于Partition数量,则部分消费者会处于空闲状态。
- Kafka跟踪每个Consumer Group的消费进度通过Offset,这个Offset表示该Consumer Group在每个Partition上消费到的位置。
- Consumer Group(消费者组):
- Consumer Group由多个Consumer组成,每个Consumer通常会读取不同Partition的消息。
- Kafka为每个Consumer Group维护一个Offset,用于追踪每个Partition上的读取进度。
- 当Consumer处理消息失败或者集群发生重新平衡时,Consumer Group可以通过Offset恢复消费状态。
其他重要概念
- Zookeeper:
- Kafka群集依赖Zookeeper来进行集群管理和配置信息的同步。Zookeeper协调broker之间的状态,以及Consumer的offset等重要信息存储。
- Replication(副本机制):
- 对于Partition的数据,Kafka通过Replication机制来保证数据的可靠性。Partition会有一个Leader和多个Follower,所有的读写操作都由Leader处理,而Follower负责复制数据。
- Controller(控制器):
- Controller是一个特殊的Broker,负责管理Partition的Leader选举以及各种故障恢复操作。
使用Kafka可以构建强大和可敏感伸缩的实时数据处理管道,常见的用例包括日志聚合、实时流处理、事件源、事件通知和许多其他用于大规模数据传输的场景。
与传统消息传递的对比
传统的消息传递方法主要包括两种模型:队列(Queue)和发布-订阅(Publish-Subscribe)。
队列(Queue):
- 在队列模型中,消息被发送到一个队列中,然后从队列中被一个或多个消费者取出。每条消息只能被一个消费者接收,确保消息的一致性传递。
- 消费者从队列中拉取消息,处理完消息后,确认消息已被处理,消息从队列中移除。
- 这种模型适用于点对点的通信,其中一个生产者发送消息给一个特定的消费者。
发布-订阅(Publish-Subscribe):
- 在发布-订阅模型中,消息发送者(发布者)将消息发布到一个主题(Topic)中,而订阅者可以选择订阅感兴趣的主题。
- 每个订阅者都可以接收到发布到其订阅主题的消息,实现了消息的广播。
- 这种模型适用于一对多的通信,其中一个生产者可以向多个消费者广播消息。
这两种传统的消息传递方法在消息系统中有广泛的应用。队列模型适用于需要点对点通信和确保消息被唯一消费的场景,而发布-订阅模型适用于广播消息给多个订阅者的场景。这些方法在构建分布式系统、消息队列系统、实时通信系统等方面都具有重要作用。
Kafka 的优势
Kafka 相对于传统的消息传递方法具有多个优势,其中一些主要的优势包括:
- 高性能:
- Kafka 是为高吞吐量和低延迟而设计的,能够处理大规模的数据流。其设计特点允许每秒处理数兆字节的读写操作。
- 支持批量操作,能够高效地处理大量的消息。
- 可扩展性:
- Kafka 集群可以透明地扩展,可以方便地增加新的服务器进入集群,从而满足系统需求的扩展性。
- 容错性:
- Kafka 提供了数据冗余和副本机制,每个 Partition 的数据会被复制到多个服务器上。在某个 Broker 失效时,Zookeeper 会通知生产者和消费者切换到其他可用的 Broker,从而确保系统的容错性。
- 持久性:
- Kafka 采用持久性存储,可以保留生产者产生的数据,即使消费者没有及时处理也不会导致数据丢失。
- 每个消息都会在磁盘上保留一定的时间,以确保消息的持久性。
- 水平伸缩:
- Kafka 的设计允许数据水平分区,使得集群能够有效地分布和处理大量的数据。
- 多订阅者模型:
- Kafka 支持发布-订阅模型,允许多个消费者同时订阅一个主题,实现了消息的多播,适用于广播消息给多个订阅者的场景。
总体而言,Kafka 在分布式消息传递系统中具有出色的性能、可扩展性、容错性和持久性,使其成为处理大规模数据流的理想选择。
与其他消息组件对比
Kafka 相对于其他消息组件的特点如下:
ActiveMQ:
- 单机吞吐量:万级,比 RocketMQ、Kafka 稍低。
- Topic 数量影响:未提供具体数据。
- 时效性:毫秒级。
- 可用性:高,基于主从架构实现高可用。
- 消息可靠性:有较低的概率丢失数据。
- 功能支持:MQ 领域的功能极其完备。
RabbitMQ:
- 单机吞吐量:同 ActiveMQ。
- Topic 数量影响:未提供具体数据。
- 时效性:微秒级,低延迟。
- 可用性:同 ActiveMQ。
- 消息可靠性:基本不丢失。
- 功能支持:基于 Erlang 开发,性能强,功能完备。
RocketMQ:
- 单机吞吐量:10 万级,支撑高吞吐。
- Topic 数量影响:Topic 数量可以达到几百/几千的级别,吞吐量下降较小。
- 时效性:毫秒级。
- 可用性:非常高,分布式架构。
- 消息可靠性:经过参数优化配置,可以做到零丢失。
- 功能支持:MQ 功能较为完善,分布式,扩展性好。
Kafka:
- 单机吞吐量:10 万级,高吞吐,常用于大数据实时计算和日志采集。
- Topic 数量影响:Topic 数量从几十到几百个时,吞吐量会大幅下降,建议控制 Topic 数量。
- 时效性:延迟在毫秒级以内。
- 可用性:非常高,分布式,一个数据多个副本,少数机器宕机不丢失数据,不导致不可用。
- 消息可靠性:与 RocketMQ 相当。
- 功能支持:功能相对简单,主要支持 MQ 基本功能,在大数据领域的实时计算和日志采集中被广泛使用。
工作流程
Broker 的作用
在 Kafka 中,broker 是 Kafka 集群中的一个节点或服务器。每个 broker 负责存储和管理一部分数据,并提供数据的读写服务。
以下是 broker 在 Kafka 中的主要意义:
- 数据存储: Broker 负责存储 Kafka 中的所有主题(topics)的分区(partitions)数据。每个分区都会在集群中的不同的 broker 上有多个副本,以提供容错性和高可用性。
- 数据处理: Broker 负责处理生产者发送的消息,并将消息写入相应的分区。它也处理消费者的读取请求,从存储中检索消息并将其传递给消费者。
- 负载均衡: Kafka 集群中的多个 broker 一起工作以实现负载均衡。每个分区的副本会分布在不同的 broker 上,从而提高集群的整体性能和容错性。
- 元数据管理: Broker 负责维护关于整个 Kafka 集群的元数据,包括主题、分区、副本的分布信息。这些元数据对于生产者和消费者在集群中找到正确的 broker 以及进行数据交互至关重要。
- 领导者选举: 对于每个分区,都会有一个 broker 被选为领导者(leader),负责处理该分区的所有读写请求。其他副本作为追随者(followers)进行数据复制。Broker 参与领导者选举以确保系统的高可用性。
- 与 Zookeeper 交互: Broker 与 Zookeeper 协同工作,以进行领导者选举、存储元数据、监控集群状态等操作。Zookeeper 用于维护 Kafka 集群的整体协调。
总体而言,broker 是 Kafka 集群中的关键组件,它们共同协作以提供高性能、高可用性和可伸缩性的分布式消息系统。
Zookeeper 在 Kafka 中的作用
ZooKeeper(Zk)在 Kafka 中扮演着关键的角色,其主要作用包括以下几点:
- 元数据存储:
- Kafka 的元数据,如 topic、分区、副本分配等信息,都存放在 ZooKeeper 中。这样做的好处是可以实现集群中各个节点之间的协调和一致性。
- Consumer 消费状态管理(0.8 版本之前):
- 在 Kafka 0.8 版本之前,ZooKeeper 负责管理 Consumer 的消费状态,包括 Consumer Group 的管理、offset 的存储和管理等。Consumer Group 是一组逻辑上关联的 Consumer 实例,它们共同消费一个或多个分区的消息。ZooKeeper 为 Consumer Group 提供了协调和管理的功能。
- Leader 选举和 Broker 管理:
- ZooKeeper 协助 Kafka 进行 Broker 的 Leader 选举。每个分区都有一个 Leader,负责处理读写请求,而其他副本则作为 Follower。ZooKeeper 协助在 Broker 故障或新 Broker 加入时进行 Leader 的选举过程。
- Offset 管理(0.8 版本之前):
- 在 Kafka 0.8 版本之前,Consumer 的 offset(消息偏移量)信息也是由 ZooKeeper 来管理的。这样可以确保 Consumer 可以从上次消费的位置继续消费消息,实现了对消息的可靠性消费。
总体而言,ZooKeeper 在 Kafka 中充当了分布式协调服务的角色,协助 Kafka 集群的各个组件进行协同工作。由于 Kafka 自身实现了对元数据的本地化存储和对 offset 的本地存储,因此在新版本中对 ZooKeeper 的依赖逐渐减小,但 ZooKeeper 仍然扮演着关键的协调和管理角色。
Kafka 可以脱离 Kookeeper 单独使用吗
在目前的 Kafka 无法完全脱离 ZooKeeper 而独立运行。ZooKeeper 在 Kafka 中承担着关键的角色,主要用于存储元数据、管理消费者组、进行 Broker 的 Leader 选举等任务。因此,ZooKeeper 是 Kafka 集群正常运行所必需的组件之一。
Kafka 的元数据,包括主题(topic)、分区(partition)、副本分配等信息,都存储在 ZooKeeper 中。而且,ZooKeeper 还负责协调 Kafka 集群中的各个 Broker 以及消费者组的活动状态。
虽然 Kafka 2.8.0 版本引入了 KRaft,这是一个实验性的特性,旨在替代 ZooKeeper 用于管理 Kafka 的元数据。KRaft 允许 Kafka Broker 之间通过内部协议进行直接通信,而无需依赖外部的 ZooKeeper。但目前来说,KRaft 仍然处于实验阶段,不建议在生产环境中广泛使用。
因此,尽管 Kafka 未来可能会逐步减少对 ZooKeeper 的依赖,但在目前版本中,ZooKeeper 仍然是 Kafka 集群正常运行所必需的组件。
高吞吐的原理
Kafka 实现高吞吐的原理主要包括以下关键因素:
- 分区机制: Kafka 将每个主题分为多个分区,每个分区在不同的服务器上进行存储和处理。分区允许数据在集群中并行处理,提高吞吐量。每个分区可以被多个消费者并发读取,建立一对一的消费者与分区关系。
- 批量发送和异步写入: 生产者在向 Kafka 发送消息时采用批量发送方式,将多条消息一次性打包发送给 Kafka,降低网络通信开销。Kafka 写入消息的方式是异步的,生产者无需等待确认所有消息都被写入,极大提高写入吞吐量。
- 零拷贝技术: Kafka 使用零拷贝技术提高数据传输效率。通过内存映射文件(Mmap)技术,直接将文件数据传输到网络或存储设备,避免用户空间和内核空间之间的数据拷贝,提升读写性能。
- 批量消费和消费者组: Kafka 支持批量消费,允许消费者一次性获取多条消息进行处理,减少处理过程中的开销。同时,Kafka 支持将多个消费者组织成一个消费者组,每个消费者组独立消费分区中的消息,实现了消费的并发性和水平扩展能力。
- 集群并行处理: Kafka 通过增加 Broker 节点实现集群的水平扩展和并行处理。每个 Broker 节点负责处理一部分分区,实现了分布式的读取和写入操作。这样的设计将负载均衡地分散到多个节点上,提高整体吞吐量。
- 读写文件依赖 OS 文件系统的页缓存: Kafka 利用操作系统文件系统的页缓存进行读写文件,而不是在 JVM 内部缓存数据。这种方式能够充分利用操作系统的内存管理机制,提高内存利用率。
- Sendfile 技术(零拷贝): Kafka 使用 Sendfile 技术实现零拷贝,避免了传统网络 IO 的四步流程(读取数据、拷贝到用户缓冲区、用户缓冲区写入网络、网络发送数据)。零拷贝技术通过直接在内核和磁盘之间传输数据,减少了数据在用户空间和内核空间之间的拷贝,提高了数据传输效率。
- 支持 End-to-End 的压缩: Kafka 支持消息的端到端压缩,通过在生产者端进行消息压缩,在消费者端进行解压缩,可以减小网络传输的数据量,提高吞吐效率。
- 顺序读写: Kafka 的文件存储采用顺序 IO,这使得 Kafka 在写入和读取数据时能够充分利用磁盘的顺序读写能力,提高性能。此外,Kafka 对于消息的写入和读取操作都具有常量时间的复杂度,保证了高效的消息处理。
综上所述,Kafka 实现高吞吐的原理通过合理的分区机制、批量发送和异步写入、零拷贝技术、批量消费和消费者组、以及集群并行处理等综合应用,使得 Kafka 能够高效地处理大规模的数据流,达到高吞吐量的目标。
数据存储
文件存储设计
Kafka 的高效文件存储设计具有以下特点:
- 分割大文件: Kafka 将一个 Topic 中的一个 Partition 的大文件分割成多个小文件段(Segment)。这种设计使得定期清理或删除已经被消费的文件段变得更加容易,从而减少磁盘空间的占用。
- 快速定位和确认消息大小: Kafka 在设计中采用了索引信息,这使得系统可以快速定位消息,并确定响应的最大大小。通过索引的帮助,Kafka 能够以高效的方式处理消息的读取和确认。
- 内存中的索引: Kafka 将索引文件的元数据全部映射到内存中,避免了频繁的磁盘 IO 操作。这样一来,对索引的访问可以在内存中快速完成,提高了读写的效率。
- 稀疏存储的索引文件: Kafka 采用了索引文件的稀疏存储方式,这意味着索引文件中的数据并不是完全密集存储的。这种设计降低了索引文件元数据占用空间的大小,减小了对磁盘空间的需求,提高了存储的效率。
通过这些设计特点,Kafka 实现了高效的文件存储和消息管理,保证了系统的性能和可维护性。这样的存储设计使得 Kafka 能够应对大规模数据流的高吞吐量和低延迟的需求。
分区的目的
Kafka 分区的目的主要有两方面的好处:
- 负载均衡: 分区能够实现负载均衡,将消息数据均匀地分散到 Kafka 集群的各个 Broker 节点上。每个分区由一个特定的 Broker 负责,通过增加分区数量和 Broker 节点,可以有效地分摊整个系统的负载,提高整体的处理能力。
- 提高并发度和效率: 对于消费者而言,分区使得多个消费者可以并发地处理消息。每个消费者组可以独立地消费一个或多个分区,从而提高了并发度。这种设计适用于大规模的数据流处理场景,使得多个消费者能够同时处理不同分区的消息,提高了整体的效率和吞吐量。
通过合理设置分区数量,可以根据实际需求来平衡负载和提高并发度,使 Kafka 在大规模和高并发的场景下更加高效地处理数据流。
分区策略
在 Kafka 内部,存在两种默认的分区分配策略:Range 和 RoundRobin。
Range 分区策略
Range 是 Kafka 的默认分区策略。该策略对每个主题(Topic)进行分区,首先对同一个主题内的分区按序号进行排序,然后按照字母顺序对消费者进行排序。接着,通过将分区总数除以消费者线程总数来确定每个消费者线程应该消费的分区数量。如果无法整除,前面的几个消费者线程将多消费一个分区。
举例说明:假设有 10 个分区、两个消费者(C1、C2),以及 3 个消费者线程。计算方式为 10 / 3 = 3 余 1。
- C1-0 将消费 0, 1, 2, 3 分区
- C2-0 将消费 4, 5, 6 分区
- C2-1 将消费 7, 8, 9 分区
第一步是将所有主题分区组成 TopicAndPartition
列表,然后按 hashCode
进行排序,最后通过轮询的方式分配给每个消费线程。
RoundRobin 分区策略
RoundRobin 策略是另一种可选的分区策略。这个策略将所有主题的分区均匀地分配给每个消费者线程,以实现负载均衡。
例如,有两个消费者(C1、C2)和 10 个分区,RoundRobin 分配方式如下:
- C1-0 将消费 0, 2, 4, 6, 8 分区
- C2-0 将消费 1, 3, 5, 7, 9 分区
RoundRobin 策略简单地将每个分区均匀地轮流分配给不同的消费者线程。
选择使用哪种分区策略通常取决于业务需求和应用场景。Range 策略适合需要按照特定规则对分区进行排序的情况,而 RoundRobin 策略适用于简单的负载均衡需求。
调整分区数
Kafka 分区数在创建主题时是可以指定的,但是一旦创建后,Kafka 不支持直接减少分区数。这是由于分区数的减少涉及到一些复杂的数据迁移和一致性问题,因此 Kafka 设计上选择了不支持减少分区数。
增加分区数是可能的,可以通过修改主题的配置或使用 Kafka 提供的工具来实现。增加分区数可以提高并行性和吞吐量,适应业务增长的需要。
减少分区数的问题主要在于如何处理已经存在的数据。如果要减少分区数,可能需要考虑以下问题:
- 数据迁移: 已经存在的分区中可能有大量的数据,如果要减少分区数,就需要将这些数据合并到较少的分区中。这涉及到数据的重新分布和迁移,可能会对性能和可用性产生影响。
- 有序性: Kafka 保证每个分区内的消息有序,如果要将多个分区的数据合并到一个较少的分区中,就需要确保合并后的分区仍然保持有序性。这可能需要一些复杂的操作来维护消息的顺序。
- 消息丢失: 如果在减少分区数的过程中发生错误,可能会导致消息丢失。这需要谨慎处理,采取一些保护措施来确保数据的完整性。
综合考虑这些因素,Kafka 设计上更倾向于支持增加分区数,而在减少分区数时需要开发者小心谨慎,并可能需要进行一些手动操作以确保数据的完整性和一致性。
消费指定分区
Kafka 消费者是可以消费指定分区的消息的。在 Kafka 中,每个消费者可以订阅一个或多个主题,而每个主题可以被划分为多个分区。消费者可以通过指定分区的方式来消费特定分区的消息。
消费者在消费消息时,可以通过以下方式来指定分区:
手动指定分区: 消费者可以通过代码中显式指定要消费的分区,从而只消费该分区的消息。这种方式允许消费者有更精细的控制,可以选择性地消费特定分区的消息。
1
consumer.assign(Arrays.asList(new TopicPartition("topicName", partitionNumber)));
自动分配分区: 消费者也可以选择让 Kafka 自动分配分区,这时 Kafka 会根据消费者组的协调过程将分区均匀地分配给消费者。在这种情况下,每个消费者只消费被分配到的分区的消息。
1
consumer.subscribe(Arrays.asList("topicName"));
通过手动指定分区或者让 Kafka 自动分配分区,消费者可以根据业务需求有选择地消费特定分区的消息。这为灵活的消息消费提供了支持。
提高远程用户的吞吐量
要提高远程用户的吞吐量,特别是当用户位于与 Kafka broker 不同的数据中心时,可以考虑以下策略:
- 调优 Socket 缓冲区大小: 增加 Socket 缓冲区大小可以帮助摊销长网络延迟。对于远程用户,由于网络传输的延迟相对较高,通过调整 Socket 缓冲区大小,可以提高数据在网络上传输的效率。适当增加缓冲区大小有助于减少网络传输的频率,从而提高吞吐量。
- 使用更高带宽的网络连接: 考虑使用更高带宽的网络连接,以提高数据传输速率。选择更高速的网络连接可以缩短数据传输的时间,从而增加吞吐量。
- 合理设置副本数: 在跨数据中心的情况下,可以考虑调整副本数以提高数据可用性。然而,需要权衡副本数和数据同步的网络开销。增加副本数可能会增加数据同步的延迟,但有助于提高数据的可用性。
- 使用压缩算法: 在远程传输过程中使用压缩算法可以减小数据量,降低网络传输的成本。Kafka 支持消息的压缩,可以在生产者端进行消息压缩,然后在消费者端进行解压缩,以减少网络传输的数据量。
- 考虑网络优化: 进行网络优化,例如使用专用线路或优化网络拓扑结构,以减少网络延迟和提高带宽利用率。
综合考虑以上策略,可以更好地适应远程用户的场景,提高数据传输的效率和吞吐量。需要根据具体的网络环境和业务需求进行调优和配置。
Topic 设置分区、Broker
在 Kafka 中,创建 Topic 时,可以通过以下规则将分区放置到不同的 Broker 中:
- 副本因子限制: 副本因子不能大于 Broker 的个数。副本因子表示每个分区有多少个副本。
- 第一个分区的放置: 第一个分区(编号为 0)的第一个副本的放置位置是随机从 Broker 列表中选择的。这决定了该 Topic 的第一个分区的主副本所在的 Broker。
- 其他分区的放置: 其他分区的第一个副本的放置位置相对于第 0 个分区的主副本依次往后移。例如,如果有 5 个 Broker 和 5 个分区,第一个分区的主副本放在第四个 Broker 上,那么第二个分区将放在第五个 Broker 上,第三个分区将放在第一个 Broker 上,依此类推。
- 剩余副本的放置: 剩余的副本相对于第一个副本的放置位置是由一个随机产生的数值
nextReplicaShift
决定的。这确保了剩余的副本位置是相对随机的,增加了分区的均衡性。
总体来说,Kafka 通过这些规则在创建 Topic 时将分区和副本分散放置在不同的 Broker 上,以实现负载均衡和容错性。这样的设计保证了每个分区的主副本和副本分布在不同的 Broker 上,提高了整个集群的可靠性和性能。
如何清理过期数据
在 Kafka 中,清理过期数据通常使用删除(delete)和压缩(compaction)策略。以下是这两种策略的简要说明:
删除策略:
- 按时间清理:
- 设置
log.retention.hours
参数,指定消息在日志中保留的最长时间。超过该时间的消息将被删除。
- 设置
- 按大小清理:
- 设置
log.retention.bytes
参数,指定日志段(segment)的最大大小。当一个日志段达到指定大小时,旧的消息将被删除。
- 设置
- 定期清理:
- 设置定期清理的策略,可以根据时间或大小的条件,定期触发清理操作。
压缩策略:
- 启用 Cleaner:
- 设置
log.cleaner.enable=true
启用 Cleaner,该功能默认为关闭状态。
- 设置
- 设置 Cleanup Policy 为 Compact:
- 在 topic 的配置中设置
log.cleanup.policy=compact
启用压缩策略。
- 在 topic 的配置中设置
- Compaction 压缩策略:
- 在 Kafka 中,Compaction 是指只保留每个 key 的最后一个版本的数据。通过将具有相同 key 的消息聚合,只保留最后一次出现时的数据,从而减少存储占用。
- 删除无效 Key:
- 当某个 key 的最新版本的消息没有内容时,Compaction 策略会删除该 key。
清理策略的选择取决于应用的需求和特定场景。删除策略适用于按照时间或大小清理数据,而压缩策略适用于保留每个 key 的最新版本,以减少存储空间。在实际应用中,可以根据业务需求和数据特性进行配置。
Message 结构
在 Kafka 中,一条消息(Record)通常由以下几个部分组成:
- Attributes(属性):
- 该字节包含有关消息的元数据属性。最低的 2 位包含用于消息的压缩编解码器。其他位应设置为 0。
- CRC(校验码):
- CRC 是消息字节的其余部分的 CRC 32。这用于检查代理和使用者上的消息的完整性。
- Key(键):
- Key 是可选的参数,用于分区分配。Key 可以为 null。
- Magic Byte(版本标识):
- 这是用于允许向后兼容的消息二进制格式演变的版本 ID。当前值为 0。
- Offset(偏移量):
- Offset 是 Kafka 中用作日志序列号的偏移量。当 Producer 发送消息时,它实际上并不知道偏移量,并且可以填写它喜欢的任何值。
- Value(值):
- Value 是实际的消息内容,作为不透明的字节数组。Kafka 支持递归消息,在这种情况下,它本身可能包含消息集。消息可以为 null。
每个消息都有以上基本元素,它们共同构成了 Kafka 中的消息体。这些信息在生产者和消费者之间进行传递,确保了 Kafka 的强大而灵活的消息处理能力。
Kafka 有几种数据保留策略
在 Kafka 中,数据保留策略主要通过两个配置来控制:log.retention.hours
和 log.retention.bytes
。
按照过期时间保留(按小时保留):
- 参数:
log.retention.hours
- 描述:指定消息在日志中保留的时间,超过这个时间将被删除。
- 例子:
log.retention.hours=168
表示消息在日志中保留 7 天(168 小时)。
- 参数:
按照存储的消息大小保留:
- 参数:
log.retention.bytes
- 描述:指定日志分区中存储的消息总字节数的上限。当日志大小达到这个值时,较早的消息将被删除。
- 例子:
log.retention.bytes=1073741824
表示当消息总大小达到 1 GB 时,较早的消息将被删除。
- 参数:
这两种策略可以同时启用,也可以只选择其中一种。如果两种策略都启用,任何一种条件满足都将导致消息被删除。
这些配置可以在 Kafka 的 Topic 级别进行设置,也可以在 Broker 级别进行全局设置。Topic 级别的设置将覆盖全局设置。
数据过期策略
在 Kafka 中,如果同时设置了按照过期时间保留和按照存储大小保留两种数据保留策略,当其中一种策略的条件满足时,Kafka 会执行数据清除工作。
在你的例子中,当消息总大小达到 10 GB 时,即按照存储大小保留的条件满足,Kafka 将执行数据清除。这意味着较早的消息将被删除,以确保消息总大小不超过 10 GB。此时,与过期时间(7 天)无关,因为按照存储大小保留的条件首先得到满足。
Kafka 的数据清除是在后台异步进行的,因此不会立即删除消息,而是在后续的清理周期中执行。清理的频率取决于 log.retention.check.interval.ms
参数的设置。默认情况下,该参数为 5 分钟,意味着每隔 5 分钟 Kafka 都会检查是否需要执行数据清除操作。
默认接收到的最大信息
Kafka 服务器默认配置中可以接收的最大消息大小是通过参数 message.max.bytes
控制的。在 Kafka 的配置中,这个参数的默认值是 1000000 字节,即 1 MB。
这个参数影响生产者发送消息和消费者接收消息的最大大小。如果要传输的消息大小超过这个限制,就需要调整相应的配置参数,以确保 Kafka 服务器能够处理更大的消息。
在生产环境中,如果需要处理较大的消息,建议仔细评估系统的需求,并根据实际情况调整 Kafka 的相关配置参数。
数据消费
消息的有序性
在 Kafka 中,保证消息有序性的主要原理是在单个分区内,消息的写入和读取是有序的。每个分区维护了一个持久化的消息日志(log),该日志以追加写入的方式记录消息。以下是 Kafka 实现消息有序性的关键机制:
- Producer 写入顺序: Kafka Producer 向分区写入消息时,会按照消息的写入顺序依次追加到分区的消息日志中。这确保了同一分区内的消息是有序的。
- Consumer 顺序消费: Kafka Consumer 从分区读取消息时,它会按照消息在分区中的顺序进行消费。每个消费者组内的每个消费者负责消费分区的不同部分,但同一分区内的消息顺序由单个消费者保证。
- 分区独立: 每个分区是独立的消息队列,不同分区之间的消息不进行排序。这意味着跨分区的消息顺序不受保证。Kafka 的并行性和吞吐量可以通过增加分区数来提高,但这也意味着可能失去了全局的消息有序性。
需要注意的是,如果业务场景对全局有序性要求很高,可以将所有消息写入一个分区,但这样可能会牺牲一些性能。因此,在设计 Kafka 主题时,需要根据业务需求权衡全局有序性和并行性。
总体而言,Kafka 通过分区内的有序写入和消费来满足大多数场景下的消息有序性需求。
ACK
在 Kafka 中,acks
参数用于表示生产者在发送消息后等待的确认数。该参数有三个取值:0、1 和 -1,分别对应不同的确认模式。
Ack=0:
- 生产者发送消息后不等待 broker 的确认,直接继续发送下一条消息。
- Broker 在接收到消息后,即使还没有将消息写入磁盘,也会立即返回 ack。这种模式下,存在丢失数据的风险,因为消息可能在写入磁盘之前就已经返回了 ack。
Ack=1:
- 生产者等待 partition 的 leader broker 的 ack。Leader 在接收到消息并成功写入磁盘后,会返回 ack。
- 如果在 follower 同步成功之前 leader 故障,可能会丢失数据,因为 follower 可能尚未同步成功。
- 生产者可以继续发送下一条消息,而不必等待 follower 的同步。
Ack=-1 (All):
- 生产者等待 partition 的 leader 和所有的 follower broker 的 ack。
- 只有当 leader 和所有的 follower 都成功写入磁盘后,才会返回 ack。这是最安全的模式,保证数据不会丢失,但会增加写入的延迟。
- 在生产环境中,主要以 Ack=-1 为主,因为它提供了最高的数据可靠性。如果生产者可以容忍一些数据的丢失,并且希望获得更低的延迟,可以考虑使用 Ack=1。
此外,min.insync.replicas
参数在 Ack 为 -1 时生效,表示 ISR(In-Sync Replicas)中应答的最小 follower 数量。默认为 1,这意味着只要 leader 本身,即使没有其他 follower,也会返回 ack,不能保证不丢失数据。为了保证数据可靠性,需要将 min.insync.replicas
设置为大于等于 2,确保有其他副本同步到数据。
在设置 retries
参数时,通常会将其配置为 Integer.MAX_VALUE
,表示无限重试。这样,即使上述条件不完全满足,写入会一直重试,以确保数据被成功发送给多个副本。这在要求极高数据可靠性的场景下使用。
总体而言,Ack=-1
是在生产环境中常用的设置,提供了最高的数据可靠性。如果对数据的实时性要求较高,可以考虑使用 Ack=1
,但需注意可能存在的数据丢失风险。Ack=0
一般只用于测试场景,不适用于生产环境。选择合适的 Ack 模式需要根据业务需求和对数据可靠性的要求做出权衡。
完全不丢结论:ack=-1 + min.insync.replicas>=2 +无限重试
ISR、OSR、AR
在 Kafka 中,ISR、OSR、AR 是与副本(Replica)相关的概念,用于描述副本的状态和同步情况:
- ISR(In-Sync Replicas): 副本同步队列。ISR 表示与 leader 副本保持同步的一组副本。具体来说,ISR 包含了那些在一定时间内与 leader 副本保持相对实时同步的副本。当 follower 副本与 leader 副本的同步延迟在一定范围内时,该 follower 副本被认为是在 ISR 中。ISR 中的副本可以参与到 leader 副本的选举中,用于维持高可用性和故障恢复。
- OSR(Out-of-Sync Replicas): 不同步的副本。OSR 包含那些与 leader 副本同步延迟超过一定阈值的副本。当 follower 副本的同步延迟过大,无法及时跟上 leader 副本的变化时,该副本会被移出 ISR,进入 OSR 列表。OSR 中的副本在一定条件下有可能重新加入到 ISR 中,但在 OSR 中的副本不会参与 leader 选举。
- AR(Assigned Replicas): 所有副本。AR 表示分区中的所有副本,包括 ISR 中的副本和 OSR 中的副本。AR = ISR + OSR。
这些概念主要用于 Kafka 的副本管理和数据复制机制。ISR 的概念使得 Kafka 能够保持高可用性,即使部分副本同步延迟,仍然能够保持一定的一致性,而 OSR 中的副本则表示可能需要进行故障恢复或者重新同步的副本。
LEO、HW、LSO、LW
在 Kafka 中,LEO、HW、LSO、LW 等表示不同的概念:
- LEO(LogEndOffset): 是 LogEndOffset 的简称,代表当前日志文件中下一条消息的位置。LEO 是一个相对于分区的位移(offset),表示当前日志中最后一条消息的下一条消息的位置。
- HW(High Watermark): 水位或水印,通常用于表示消费者可以安全地读取的位置。在 Kafka 中,HW 是 ISR(In-Sync Replicas,同步副本集)中最小的 LEO(LogEndOffset)。消费者最多只能消费到 HW 所在的位置上一条消息,以确保消费者读取的是已经复制到多数副本的消息,从而确保消息的一致性。
- LSO(Last Stable Offset): 是 LastStableOffset 的简称,对未完成的事务而言,LSO 的值等于事务中第一条消息的位置(firstUnstableOffset),对已完成的事务而言,它的值与 HW 相同。LSO 用于处理事务性消息,确保只有已经提交的事务消息才会被读取。
- LW(Low Watermark): 低水位,代表 AR(Assigned Replicas,所有副本)集合中最小的 logStartOffset(日志起始位置)值。LW 是 ISR 中所有副本的 logStartOffset 的最小值,用于处理 ISR 中副本不一致的情况。
这些概念在 Kafka 中用于管理分区的复制、事务和消费者的读取进度等方面,确保数据的一致性和可靠性。
数据传输的事务
数据传输的事务通常分为以下三种级别:
- 最多一次(At Most Once): 消息不会被重复发送,最多被传输一次。这意味着在传输过程中可能发生消息丢失,但绝不会出现消息重复的情况。
- 最少一次(At Least Once): 消息不会被漏发送,最少被传输一次。这意味着在传输过程中可能发生消息重复,但绝不会出现消息丢失的情况。通常,这种级别的实现会使用一些手段来确保消息的可靠传输,例如重试机制。
- 精确一次(Exactly Once): 数据传输的最高级别,要求不会漏传输也不会重复传输,确保每个消息都被接收一次且仅一次。实现精确一次传输通常需要使用一些复杂的协议和机制,例如事务性消息和幂等性操作,以保证在任何情况下都能够实现精确一次的传输。
选择使用哪种级别的事务取决于应用程序的要求以及对于消息传输可靠性和性能的权衡。不同的应用场景可能需要不同级别的事务来满足特定的需求。
不重复消费
Kafka 保证不重复消费的关键在于保证消息队列的消费具备幂等性。以下是一些常见的保证机制:
- 数据库唯一键约束: 如果消费的操作是往数据库中写入数据,可以利用数据库的唯一键约束来保证不重复插入。在插入之前,先检查数据库中是否已存在相同唯一键的记录,如果已存在,则不执行插入操作。
- 使用全局唯一标识: 在生产者发送消息时,为每条消息附加一个全局唯一标识,例如订单 ID。消费者在接收到消息后,先检查全局唯一标识在本地存储(例如 Redis)中是否已存在。如果不存在,进行消息处理,并将唯一标识存储在本地;如果已存在,则跳过消息处理,保证不重复处理相同的消息。
- 幂等性处理: 消费者的处理逻辑应当具备幂等性,即多次执行不会产生不同的效果。这可以通过设计消费者的业务逻辑,使其在处理相同消息多次时具有相同的结果。
- 事务性操作: 如果消费者的处理涉及多个步骤,可以考虑使用事务性操作。消息的处理要么全部成功,要么全部失败回滚,以确保消息不会被重复处理。
综合以上方法,可以有效地保证 Kafka 消费的幂等性,从而达到不重复消费的目的。选择合适的机制取决于具体业务场景和需求。
不丢失消息
在 Kafka 中,要确保不丢失消息,需要考虑生产者和消费者两个方面的情况。
保证不丢失消息的策略:
消费端弄丢了数据:
- 关闭自动提交 offset:在消费者端关闭自动提交 offset 的功能,而是由消费者在处理完消息后手动提交 offset,以确保消费者在处理消息时宕机不会导致 offset 提交,从而保证消息不会丢失。
- 幂等性处理:消费者需要保证消息处理的幂等性,即多次处理相同消息的效果与处理一次相同,以应对可能的重复消费情况。
Kafka 弄丢了数据:
- 设置
replication.factor
参数:确保每个分区至少有 2 个副本,可以通过给 Topic 设置replication.factor
参数来实现。这样即使一个 broker 宕机,还有其他副本可以保证数据的可用性。 - 设置
min.insync.replicas
参数:在 Kafka 服务端设置min.insync.replicas
参数,要求一个 leader 至少感知到有至少一个 follower 保持联系,以确保 leader 切换时有至少一个 follower 跟随。 - 设置
acks=all
:在生产者端设置acks=all
,要求每条数据必须写入所有 replica 之后才认为写入成功。 - 设置
retries=MAX
:在生产者端设置retries=MAX
,保证一旦写入失败,生产者将无限重试。
- 设置
这些策略结合起来,可以有效地保障 Kafka 中消息的可靠性,确保数据不会在生产、传输和消费过程中丢失。
精确一次 Exactly Once
在分布式消息系统中,精确一次消费(Exactly-Once Semantic)是最难实现的一种消息传递保证,特别是在存在系统故障和消息重传的环境中。对于 Apache Kafka 来说,要实现精准一次消费,需要从生产者、Kafka 代理(Broker)、消费者三个部分共同努力来减少消息的丢失和重复。
Kafka 0.11 及以后的版本中引入的事务功能和幂等性生产者是实现精确一次语义的关键。
幂等性生产者(Idempotent Producer):
- Kafka 提供了幂等性生产者的配置,通常通过设置
enable.idempotence=true
启用。 - 幂等性生产者可以确保即使生产者发生重试,消息也只会被写入一次。
- Kafka 通过使用内部的序列号来跟踪和删除重复的消息。
- Kafka 提供了幂等性生产者的配置,通常通过设置
事务式生产者(Transactional Producer):
- 生产者可以启动一个事务,将多个写操作包裹在内,确保它们要么全部完成,要么全部不做。
- 这允许跨多个分区和主题的写作操作被视为一个原子操作。
- 通过
transactional.id
配置事务 ID,并调用initTransactions
方法来启用。 - 使用
beginTransaction
和commitTransaction
来包裹事务。
消费者的幂等性消费:
- 消费者处理逻辑需要设计成幂等的,即使重复消费相同的消息也不会影响最终结果。
- 消费者可以使用事务来读取来自特定事务的消息,通过事务边界(commit 或 abort)协调。
消费者位移提交(Offsets Committing):
- 精确一次处理的另一个关键组成部分是消费者提交处理过的消息的 offsets。
- 在事务性处理中,消费者提交的 offsets 和生产者发送的消息应该位于同一个事务中。
- Kafka 0.11+支持消费者在事务内提交 offset,这意味着 offset 的提交和消息的生产成为一个原子操作。
实现步骤:
为了确保 Kafka 实现精确一次语义,可以遵循以下步骤:
- 配置幂等性生产者。
- 配置事务式生产者,并正确处理事务(确保 begin 和 commit/abort 匹配使用)。
- 消费端需要开启读取已提交的消息(read_committed)来只接收那些已经在生产方提交的消息。
- 消费端处理消息的业务逻辑要实现为幂等的。
- 在同一个事务中,将消费的 offset 和生产的消息一起提交,使其成为一个原子操作。
- 处理错误和异常情况时,确保事务能够被适当地中止。
注意事项:
- 实现 Kafka 的精确一次语义需要仔细设计系统和错误处理策略。
- 幂等性生产者和事务对生产者的性能有一定影响,因为它们需要额外的协调和状态管理。
- 也需要考虑消费者的消费速率和业务逻辑处理时间,以避免消费者成为性能瓶颈。
正确实现了精确一次消费之后,系统应该能够在出现故障后恢复,而不会造成消息的丢失或重复消费。不过要注意,在分布式系统中完全的精确一次语义是非常难以保证的,通常需要结合业务逻辑上的幂等性来设计系统。
Pull 模式与Push 模式
Kafka 使用的是 Pull 模式。最初,在设计 Kafka 时,考虑了消息是由消费者主动拉取还是由 broker 推送的问题。在这方面,Kafka 采用了一种传统的设计,即生产者(producer)将消息推送到 broker,而消费者(consumer)从 broker 拉取消息。
一些其他消息系统采用了推送(Push)模式,将消息主动推送给下游的消费者。然而,Push 模式存在一些问题,例如 broker 推送速率大于消费者消费速率时,容易导致消费者崩溃。为了让消费者以最大速率和最高效率消费消息,Kafka 选择了传统的拉取(Pull)模式。
在 Pull 模式下,消费者可以自主决定是否以批量方式从 broker 拉取数据。这种自主决定的优势在于,消费者可以根据自身的消费能力灵活调整拉取策略,避免推送速率过高或过低的问题。此外,Pull 模式下,消费者可以选择阻塞直到新消息到达,从而有效地避免了不断轮询的情况。
总体而言,Kafka 的 Pull 模式为消费者提供了更大的灵活性和可控性,使其能够根据实际需求进行自主调整。
Offset 作用
在 Kafka 中,可以通过 offset 寻找数据的过程如下:
- 找到对应的 Segment:
- 使用二分法找到小于目标 offset 的 segment。Segment 是 Kafka 中数据的存储单元,每个 Segment 包含一个. Log 文件和一个. Index 文件,文件名中包含了该 Segment 覆盖的 offset 范围。
- 计算消息在 Segment 中的偏移量:
- 目标 offset 减去 Segment 覆盖的起始 offset,得到消息在这个 Segment 中的相对偏移量。
- 在 Index 文件中查找索引:
- 使用二分法在 Index 文件中找到对应的索引项。Index 文件包含了每个消息在 Segment 中的起始位置和对应的消息 offset。
- 在 Log 文件中顺序查找消息:
- 根据找到的索引项,到对应的 Log 文件中,从偏移量的位置开始,按照 Kafka 存储消息的格式顺序查找,直到找到目标 offset 对应的消息。
在这个过程中,Kafka 是按照 Log 文件的存储格式来判断一条消息是否结束的。这个过程保证了可以通过 offset 快速定位到消息的位置,实现了高效的消息检索。需要注意的是,由于 Kafka 的消息是以批次(Batch)的形式进行存储,因此在 Log 文件中可能会存在多条消息,需要按照消息的格式逐条解析以找到目标消息。