上一章 Kafka专题 第二篇:安装环境 我们已经配置好了开发环境,这一章来介绍Kafka主题。
本专题涉及的代码都基于Golang语言编写,后续有机会会考虑加入Java。
一、创建主题
Kafka消息的生产和消费都是针对主题的,所以我们先来创建需要的主题。
- Confluent Cloud
- 登录管理页面,点击创建主题
Confluent Cloud可以很简单在管理页面进行主题创建。
主题可以自定义,其他的可以先默认,包括分区数(Partitions)为6和复制系数(replication.factor)为3。我们晚点再来解释这几个参数。
- 本地jar包
- 本地jar包则可通过以下命令来创建:
$ cd kafka_2.13-3.2.1 $ bin/kafka-topics.sh --bootstrap-server localhost:9092 \ --create --topic my-topic \ --partitions 6 --replication-factor 3
- Offset Explorer 2客户端
我们上一节已经通过Offset Explorer 2客户端连接到Kafka集群了。也同样可以在客户端上进行主题创建:
除了上述几种方式,当然还可以直接编写代码来作为客户端访问Kafka集群。
接下来的系列文章都将只展示代码示例,其他方式如果需要可自行摸索。
首先,在Go中已经有开源的Kafka客户端框架,主流的是下面几个:
ㅤ | 优点 | 缺点 |
作为Kafka主要贡献和维护机构,框架质量有保障,文档和功能也最齐全。 | 需要依赖调用C语言库,源码查看起来也会比较费劲。 | |
go语言原生,功能支持齐全,有兼容和稳定性保证。 | 文档欠缺,而且源码几乎没有注释。 | |
go语言原生,文档相对较好,使用context上下文。 | 目前只支持到2.7.1版本,且不支持Admin Client |
综上,很可惜,三个主流框架各有优缺点(也许我们可以自己写一个🤔),接下来我们会选择confluent-kafka-go来做示例。
package main import ( "context" "github.com/confluentinc/confluent-kafka-go/kafka" "log" ) func main() { // 1.创建AdminClient并连接Broker代理 client, err := kafka.NewAdminClient(&kafka.ConfigMap{ "bootstrap.servers": "localhost:9092,localhost:9093", "allow.auto.create.topics": false, }) if err != nil { log.Fatalf("error creating admin client: %v", err) } defer client.Close() // 2.检查主题是否存在 // 2.1方式一:获取所有主题列表再遍历筛选 topic := "my-topic" metadata, err := client.GetMetadata(nil, true, 5000) if err != nil { log.Fatalf("error getting all topic list: %v", err) } for _, t := range metadata.Topics { if t.Topic == topic { log.Fatalf("topic(%s) already exists", topic) } } // 2.2方式二:获取指定主题的元数据来判断 metadata, err = client.GetMetadata(&topic, false, 5000) if err != nil { log.Fatalf("error getting metadata of topic(%s): %v", topic, err) } exists := metadata.Topics[topic].Error.Code() == kafka.ErrUnknownTopicOrPart if exists { log.Fatalf("topic(%s) already exists", topic) } // 3.创建主题 ctx := context.Background() result, err := client.CreateTopics(ctx, []kafka.TopicSpecification{ { Topic: topic, NumPartitions: 6, ReplicationFactor: 3, }, }) if err != nil { log.Fatalf("error creating topic(%s): %v", topic, err) } log.Printf("succeed creating topic(%s): %v", topic, result) }
- 创建主题
- 主题的参数
- 最佳实践
三、删除主题
四、其他配置
不建议自动创建主题:
Sending a Message to Kafka 例如,如果序列化消息失败,则可能是 SerializationException,如果缓冲区已满,则可能是 Buffer ExhaustedException 或 TimeoutException,如果发送线程被中断,则可能是 InterruptException
KafkaProducer 有两种类型的错误。可重试错误是可以通过再次发送消息来解决的错误。例如,可以解决连接错误,因为连接可能会重新建立。当为分区选举新的领导者并刷新客户端元数据时,可以解决“不是分区的领导者”错误。 KafkaProducer 可以配置为自动重试这些错误,因此只有在重试次数用尽且错误未解决时,应用程序代码才会出现可重试异常。重试无法解决某些错误,例如“消息大小太大”。在这些情况下,KafkaProducer 不会尝试重试并会立即返回异常
这些回调在生产者的主线程中执行。 这保证了当我们向同一个分区相继发送两个消息时,它们的回调将按照我们发送它们的顺序执行。但这也意味着回调应该是合理的快速,以避免延迟生产者和阻止其他消息的发送。我们不建议在回调中执行阻塞性操作。 相反,你应该使用另一个线程来同时执行任何阻塞性操作/
client.id是客户端和它所使用的应用程序的逻辑标识符。 它可以是任何字符串,并将被代理(Broker)用来识别从客户端发送的消息。它在日志和指标以及配额中使用。选择一个好的客户端名称将使故障排除更容易--它是 "我们看到来自IP 104.27.155.134的认证失败率很高 "和 "看起来订单验证服务认证失败了--你能让Laura看一下吗?"之间的区别。
acks参数控制了在生产者认为写入成功之前,必须有多少个分区副本收到该记录。默认情况下,Kafka会在leader收到记录后回应说记录被成功写入(Apache Kafka的3.0版本预计会改变这个默认值)。这个选项对写入的消息的持续时间有很大影响,根据你的使用情况,默认值可能不是最好的选择。 第7章深入讨论了Kafka的可靠性保证,但现在让我们回顾一下acks参数的三个允许值
您可以将传递超时配置为您希望等待消息发送的最长时间,通常为几分钟,然后保留默认的重试次数(实际上是无限的)。使用此配置,生产者将继续重试,只要有时间继续尝试(或直到成功)。这是考虑重试的一种更合理的方式。我们调整重试的正常过程是:“在代理崩溃的情况下,领导者选举通常需要 30 秒才能完成,所以为了安全起见,让我们继续重试 120 秒。”无需将此心理对话转换为重试次数和重试间隔时间,只需将 Deliver.timeout.ms 配置为 120。