加入收藏 | 设为首页 | 会员中心 | 我要投稿 厦门网 (https://www.xiamenwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列(15) - DataStream Connectors之Kafka

发布时间:2019-01-19 08:43:05 所属栏目:教程 来源:孙金城
导读:一、聊什么 为了满足本系列读者的需求,我先介绍一下Kafka在Apache Flink中的使用。所以本篇以一个简单的示例,向大家介绍在Apache Flink中如何使用Kafka。 二、Kafka 简介 Apache Kafka是一个分布式发布-订阅消息传递系统。 它最初由LinkedIn公司开发,Li

除了看日志,我们可以用命令显示的查询我们是否成功的创建了flink-topic,如下:

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --list --zookeeper localhost:2181 
  2.  
  3. flink-tipic 

如果输出flink-tipic那么说明我们的Topic成功创建了。

那么Topic是保存在哪里?Kafka是怎样进行消息的发布和订阅的呢?为了直观,我们看如下Kafka架构示意图简单理解一下:

Apache Flink 漫谈系列(15) - DataStream Connectors之Kafka

简单介绍一下,Kafka利用ZooKeeper来存储集群信息,也就是上面我们启动的Kafka Server 实例,一个集群中可以有多个Kafka Server 实例,Kafka Server叫做Broker,我们创建的Topic可以在一个或多个Broker中。Kafka利用Push模式发送消息,利用Pull方式拉取消息。

3. 发送消息

如何向已经存在的Topic中发送消息呢,当然我们可以API的方式编写代码发送消息。同时,还可以利用命令方式来便捷的发送消息,如下:

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flink-topic 
  2. >Kafka test msg 
  3. >Kafka connector 

上面我们发送了两条消息Kafka test msg 和 Kafka connector 到 flink-topic Topic中。

4. 读取消息

如果读取指定Topic的消息呢?同样可以API和命令两种方式都可以完成,我们以命令方式读取flink-topic的消息,如下:

  1. jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink-topic --from-beginning 
  2. Kafka test msg 
  3. Kafka connector 

其中--from-beginning 描述了我们从Topic开始位置读取消息。

三、Flink Kafka Connector

前面我们以最简单的方式安装了Kafka环境,那么我们以上面的环境介绍Flink Kafka Connector的使用。Flink Connector相关的基础知识会在《Apache Flink 漫谈系列(14) - Connectors》中介绍,这里我们直接介绍与Kafka Connector相关的内容。

Apache Flink 中提供了多个版本的Kafka Connector,本篇以flink-1.7.0版本为例进行介绍。

1. mvn 依赖

要使用Kakfa Connector需要在我们的pom中增加对Kafka Connector的依赖,如下:

  1. <dependency> 
  2. <groupId>org.apache.flink</groupId> 
  3. <artifactId>flink-connector-kafka_2.11</artifactId> 
  4. <version>1.7.0</version> 
  5. </dependency> 

Flink Kafka Consumer需要知道如何将Kafka中的二进制数据转换为Java / Scala对象。 DeserializationSchema允许用户指定这样的模式。 为每个Kafka消息调用 T deserialize(byte [] message)方法,从Kafka传递值。

2. Examples

我们示例读取Kafka的数据,再将数据做简单处理之后写入到Kafka中。我们需要再创建一个用于写入的Topic,如下:

  1. bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic-output 

所以示例中我们Source利用flink-topic, Sink用slink-topic-output。

(1) Simple ETL

我们假设Kafka中存储的就是一个简单的字符串,所以我们需要一个用于对字符串进行serialize和deserialize的实现,也就是我们要定义一个实现DeserializationSchema和SerializationSchema 的序列化和反序列化的类。因为我们示例中是字符串,所以我们自定义一个KafkaMsgSchema实现类,然后在编写Flink主程序。

  • KafkaMsgSchema - 完整代码
    1. import org.apache.flink.api.common.serialization.DeserializationSchema; 
    2. import org.apache.flink.api.common.serialization.SerializationSchema; 
    3. import org.apache.flink.api.common.typeinfo.BasicTypeInfo; 
    4. import org.apache.flink.api.common.typeinfo.TypeInformation; 
    5. import org.apache.flink.util.Preconditions; 
    6.  
    7. import java.io.IOException; 
    8. import java.io.ObjectInputStream; 
    9. import java.io.ObjectOutputStream; 
    10. import java.nio.charset.Charset; 
    11.  
    12. public class KafkaMsgSchema implements DeserializationSchema<String>, SerializationSchema<String> { 
    13.     private static final long serialVersionUID = 1L; 
    14.     private transient Charset charset; 
    15.  
    16.     public KafkaMsgSchema() { 
    17. // 默认UTF-8编码 
    18.         this(Charset.forName("UTF-8")); 
    19.     } 
    20.  
    21.     public KafkaMsgSchema(Charset charset) { 
    22.         this.charset = Preconditions.checkNotNull(charset); 
    23.     } 
    24.  
    25.     public Charset getCharset() { 
    26.         return this.charset; 
    27.     } 
    28.  
    29.     public String deserialize(byte[] message) { 
    30. // 将Kafka的消息反序列化为java对象 
    31.         return new String(message, charset); 
    32.     } 
    33.  
    34.     public boolean isEndOfStream(String nextElement) { 
    35. // 流永远不结束 
    36.         return false; 
    37.     } 
    38.  
    39.     public byte[] serialize(String element) { 
    40. // 将java对象序列化为Kafka的消息 
    41.         return element.getBytes(this.charset); 
    42.     } 
    43.  
    44.     public TypeInformation<String> getProducedType() { 
    45. // 定义产生的数据Typeinfo 
    46.         return BasicTypeInfo.STRING_TYPE_INFO; 
    47.     } 
    48.  
    49.     private void writeObject(ObjectOutputStream out) throws IOException { 
    50.         out.defaultWriteObject(); 
    51.         out.writeUTF(this.charset.name()); 
    52.     } 
    53.  
    54.     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { 
    55.         in.defaultReadObject(); 
    56.         String charsetName = in.readUTF(); 
    57.         this.charset = Charset.forName(charsetName); 
    58.     } 
  • 主程序 - 完整代码
    1. import org.apache.flink.api.common.functions.MapFunction; 
    2. import org.apache.flink.api.java.utils.ParameterTool; 
    3. import org.apache.flink.streaming.api.datastream.DataStream; 
    4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
    5. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; 
    6. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; 
    7. import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; 
    8.  
    9. import java.util.Properties; 
    10.  
    11. public class KafkaExample { 
    12.     public static void main(String[] args) throws Exception { 
    13.         // 用户参数获取 
    14.         final ParameterTool parameterTool = ParameterTool.fromArgs(args); 
    15.         // Stream 环境 
    16.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    17.  
    18.         // Source的topic 
    19.         String sourceTopic = "flink-topic"; 
    20.         // Sink的topic 
    21.         String sinkTopic = "flink-topic-output"; 
    22.         // broker 地址 
    23.         String broker = "localhost:9092"; 
    24.  
    25.         // 属性参数 - 实际投产可以在命令行传入 
    26.         Properties p = parameterTool.getProperties(); 
    27.         p.putAll(parameterTool.getProperties()); 
    28.         p.put("bootstrap.servers", broker); 
    29.  
    30.         env.getConfig().setGlobalJobParameters(parameterTool); 
    31.  
    32.         // 创建消费者 
    33.         FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>( 
    34.                 sourceTopic, 
    35.                 new KafkaMsgSchema(), 
    36.                 p); 
    37.         // 设置读取最早的数据 
    38. //        consumer.setStartFromEarliest(); 
    39.  
    40.         // 读取Kafka消息 
    41.         DataStream<String> input = env.addSource(consumer); 
    42.  
    43.  
    44.         // 数据处理 
    45.         DataStream<String> result = input.map(new MapFunction<String, String>() { 
    46.             public String map(String s) throws Exception { 
    47.                 String msg = "Flink study ".concat(s); 
    48.                 System.out.println(msg); 
    49.                 return msg; 
    50.             } 
    51.         }); 
    52.  
    53.         // 创建生产者 
    54.         FlinkKafkaProducer producer = new FlinkKafkaProducer<String>( 
    55.                 sinkTopic, 
    56.                 new KeyedSerializationSchemaWrapper<String>(new KafkaMsgSchema()), 
    57.                 p, 
    58.                 FlinkKafkaProducer.Semantic.AT_LEAST_ONCE); 
    59.  
    60.         // 将数据写入Kafka指定Topic中 
    61.         result.addSink(producer); 
    62.  
    63.         // 执行job 
    64.         env.execute("Kafka Example"); 
    65.     } 

(编辑:厦门网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读