Kafka生产者配置是确保消息可靠、高效传输的关键。以下是一些重要的注意事项和建议:
基本配置
- bootstrap.servers:指定连接Kafka集群的broker地址,建议配置多个broker以确保高可用性。
- key.serializer 和 value.serializer:指定消息key和value的序列化类型,必须写全类名。
可靠性与性能
- acks:控制消息的持久性和可靠性。常用配置包括
acks=1
(仅等待主分区确认)和acks=all
(等待所有ISR副本分区确认)。acks=0
时延最小但风险最大。 - retries:设置消息发送失败时的重试次数,建议根据业务需求设置合理的重试次数。
- max.in.flight.requests.per.connection:设置每个连接允许的最大未确认请求数,用于控制消息的传输延迟和吞吐量。为了保证消息顺序,可以设置为1,但这会降低吞吐量。
- batch.size:设置批量发送消息的大小,默认16KB。增大此值可以提高吞吐量。
- linger.ms:设置消息发送的延迟时间,以毫秒为单位。增大此值可以提高吞吐量。
其他重要配置
- compression.type:设置消息压缩的类型,如
gzip
、snappy
等。压缩可以减少网络传输量,提高性能。 - buffer.memory:设置生产者缓存消息的总体大小,避免内存溢出。
顺序性保证
- Kafka可以保证同一个分区内的消息是有序的。为了保证消息顺序,应设置
retries=0
和max.in.flight.requests.per.connection=1
。
代码示例
以下是一个简单的Kafka生产者配置示例:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
。。。。。