Broker Kafka集群包含一个或多个服务器,这种服务器被称为broker Topic 每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处) Partition parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件 Producer 负责发布消息到Kafka broker Consumer 消费消息。每个consumer属于一个特定的consumer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。
使用librdkafka开发一个producer的步骤: librdkafka: 1. conf 设置 kafka conf: rd_kafka_conf_new(): rd_kafka_conf_set() topic conf: rd_kafka_topic_conf_new(): rd_kafka_topic_conf_set()
设置conf回调,消息发送成功或者失败都会调用 rd_kafka_conf_set_dr_cb() rd_kafka_conf_set_dr_msg_cb()
创建kafka rd_kafka_new() 设置系统日志 rd_kafka_set_logger() rd_kafka_set_log_level() 添加下游brokers: rd_kafka_brokers_add()
创建新的topic rd_kafka_topic_new()
producer: rd_kafka_produce() 发送后,设置时间观察,第二个参数是阻塞等待时间,一般设置为0,rd_kafka_poll()
销毁操作 rd_kafka_topic_destroy() rd_kafka_destroy() rd_kafka_wait_destroyed(2000)
一些数据结构的解释
- Brokers
librdkafka 只需要一份最初的brokers列表(至少包含一个broker)。它将连接所有”metadata.broker.list”或者是rd_kafka_brokers_add()函数添加的brokers,然后向每个brokers申请一些元数据信息:包含brokers的完整列表、topic、partitions以及它们在Kafka 集群中的leaders broker信息。
Brokers名字的形式为:host:port; 其中port是可选的,默认是9092,host是任何一个可以解析的hostname或者ipv4或者ipv6地址。如果host是多个地址,librdkafka将会在每一次连接尝试中循环连接这些地址。包含所有broker 地址的DNS记录可以用来提供可靠的bootstrap broker。
|