【Flink】事务型Sink连接器、kafka连接器、写入到redis、mysql
迪丽瓦拉
2024-03-18 14:09:56
0

文章目录

  • 一 事务性 sink 连接器
    • 1 预写式日志接口
    • 2 两阶段提交接口
  • 二 kafka连接器
    • 0 导入依赖
    • 1 写入kafka
    • 2 读取kafka
  • 三 写入redis
    • 0 导入依赖
    • 1 代码编写
  • 四 写入MySQL
    • 1 自定义向MySQL中写入数据

一 事务性 sink 连接器

事务写入 sink 连接器需要和 Flink 的检查点机制集成,因为只有在检查点成功完成以后,事务写入 sink 连接器才会向外部系统 commit 数据。

为了简化事务性 sink 的实现,Flink 提供了两个模版用来实现自定义 sink 运算符。这两个模版都实现了 CheckpointListener 接口。CheckpointListener 接口将会从作业管理器接收到检查点完成的通知。

GenericWriteAheadSink 模版会收集检查点之前的所有的数据,并将数据存储到 sink任务的运算符状态中。状态保存到了检查点中,并在任务故障的情况下恢复。当任务接收到检查点完成的通知时,任务会将所有的数据写入到外部系统中。

TwoPhaseCommitSinkFunction 模版利用了外部系统的事务特性。对于每一个检查点,任务首先开始一个新的事务,并将接下来所有的数据都写到外部系统的当前事务上下文中去。当任务接收到检查点完成的通知时,sink 连接器将会 commit 这个事务。

1 预写式日志接口

GenericWriteAheadSink 使得 sink 运算符可以很方便的实现。这个运算符和 Flink 的检查点机制集成使用,目标是将每一条数据恰好一次写入到外部系统中去。需要注意的是,在发生故障的情况下,write-ahead log sink 可能会不止一次的发送相同的数据。所以GenericWriteAheadSink 无法提供完美无缺的恰好处理一次语义的一致性保证,而是仅能提供 at-least-once 这样的保证。

GenericWriteAheadSink 的原理是将接收到的所有数据都追加到有检查点分割好的预写式日志中去。每当 sink 运算符碰到检查点屏障,运算符将会开辟一个新的 section,并将接下来的所有数据都追加到新的 section 中去。WAL(预写式日志)将会保存到运算符状态中。由于 log 能被恢复,所有不会有数据丢失。

当 GenericWriteAheadSink 接收到检查点完成的通知时,将会发送对应检查点的 WAL中存储的所有数据。当所有数据发送成功,对应的检查点必须在内部提交。

检查点的提交分两步。第一步,sink 持久化检查点被提交的信息。第二步,删除 WAL中所有的数据。我们不能将 commit 信息保存在 Flink 应用程序状态中,因为状态不是持久化的,会在故障恢复时重置状态。相反,GenericWriteAheadSink 依赖于可插拔的组件在一个外部持久化存储中存储和查找提交信息。这个组件就是 CheckpointCommitter。

继承 GenericWriteAheadSink 的运算符需要提供三个构造器函数。

  • CheckpointCommitter
  • TypeSerializer,用来序列化输入数据。
  • 一个 job ID,传给 CheckpointCommitter,当应用重启时可以识别 commit 信息。

还有,write-ahead 运算符需要实现一个单独的方法:

boolean sendValues(Iterable values, long chkpntId, long timestamp)

当检查点完成时,GenericWriteAheadSink 调用 sendValues() 方法来将数据写入到外部存储系统中。这个方法接收一个检查点对应的所有数据的迭代器,检查点的 ID,检查点被处理时的时间戳。当数据写入成功时,方法必须返回 true,写入失败返回 false。

GenericWriteAheadSink 无法提供完美的 exactly-once 保证。有两个故障状况会导致数据可能被发送不止一次。

  • 当任务执行 sendValues() 方法时,程序挂掉了。如果外部系统无法原子性的写入所有数据(要么都写入要么都不写),一些数据可能会写入,而另一些数据并没有被写入。由于 checkpoint 还没有 commit,所以在任务恢复的过程中一些数据可能会被再次写入。
  • 所有数据都写入成功了,sendValues() 方法也返回 true 了;但在 CheckpointCommitter方法被调用之前程序挂了,或者 CheckpointCommitter 在 commit 检查点时失败了。那么在恢复的过程中,所有未被提交的检查点将会被重新写入。

2 两阶段提交接口

Flink 提供了 TwoPhaseCommitSinkFunction 接口来简化 sink 函数的实现。这个接口保证了端到端的 exactly-once 语义。2PC sink 函数是否提供这样的一致性保证取决于我们的实现细节。这里就需要讨论一个问题:“2PC 协议是否开销太大?”

通常来讲,为了保证分布式系统的一致性,2PC 是一个非常昂贵的方法。尽管如此,在 Flink 的语境下,2PC 协议针对每一个检查点只运行一次。TwoPhaseCommitSinkFunction和 WAL sink 很相似,不同点在于前者不会将数据收集到 state 中,而是会写入到外部系统事务的上下文中。

TwoPhaseCommitSinkFunction 实现了以下协议。在 sink 任务发送出第一条数据之前,任务将在外部系统中开始一个事务,所有接下来的数据将被写入这个事务的上下文中。当作业管理器初始化检查点并将检查点屏障插入到流中的时候,2PC 协议的投票阶段开始。当运算符接收到检查点屏障,运算符将保存它的状态,当保存完成时,运算符将发送
一个 acknowledgement 信息给作业管理器。当 sink 任务接收到检查点屏障时,运算符将会持久化它的状态,并准备提交当前的事务,以及 acknowledge JobManager 中的检查点。发送给作业管理器的 acknowledgement 信息类似于 2PC 协议中的 commit 投票。sink 任务还不能提交事务,因为它还没有保证所有的任务都已经完成了它们的检查点操作。sink
任务也会为下一个检查点屏障之前的所有数据开始一个新的事务。

当作业管理器成功接收到所有任务实例发出的检查点操作成功的通知时,作业管理器将会把检查点完成的通知发送给所有感兴趣的任务。这里的通知对应于 2PC 协议的提交命令。当 sink 任务接收到通知时,它将 commit 所有处于开启状态的事务。一旦 sink 任务 acknowledge 了检查点操作,它必须能够 commit 对应的事务,即使任务发生故障。如
果 commit 失败,数据将会丢失。

外部系统需要满足的要求:

  • 外部系统必须提供事务支持,或者 sink 的实现能在外部系统上模拟事务功能。
  • 在检查点操作期间,事务必须处于 open 状态,并接收这段时间数据的持续写入。
  • 事务必须等到检查点操作完成的通知到来才可以提交。在恢复周期中,可能需要一段时间等待。如果 sink 系统关闭了事务(例如超时了),那么未被 commit 的数据将会丢失。
  • sink 必须在进程挂掉后能够恢复事务。一些 sink 系统会提供事务 ID,用来 commit或者 abort 一个开始的事务。
  • commit 一个事务必须是一个幂等性操作。sink 系统或者外部系统能够观察到事务已经被提交,或者重复提交并没有副作用。

二 kafka连接器

0 导入依赖

org.apache.flinkflink-connector-kafka_${scala.binary.version}${flink.version}

使用flink + kafka完成每一个窗口的热门商品。

1 写入kafka

使用生产者将数据写入kafka

public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);Properties properties = new Properties();properties.put("bootstrap.servers","hadoop101:9092");env.readTextFile("E:\\develop\\MyWork\\flink2022tutorial\\src\\main\\resources\\UserBehavior.csv").addSink(new FlinkKafkaProducer(// topic名字"user-behavior",new SimpleStringSchema(),properties));
}

2 读取kafka

public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);Properties properties = new Properties();properties.setProperty("bootstrap.servers", "hadoop101:9092");properties.setProperty("group.id", "consumer-group");properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("auto.offset.reset", "latest");env.addSource(new FlinkKafkaConsumer("user-behavior",new SimpleStringSchema(),properties)).map(new MapFunction() {@Overridepublic Example7.UserBehavior map(String value) throws Exception {String[] arr = value.split(",");return new Example7.UserBehavior(arr[0],arr[1],arr[2],arr[3],Long.parseLong(arr[4]) * 1000L);}}).filter(r -> r.behavior.equals("pv")).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Example7.UserBehavior element, long recordTimestamp) {return element.timestamp;}})).keyBy(r -> true).window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(5))).process(new ProcessWindowFunction() {@Overridepublic void process(Boolean aBoolean, Context context, Iterable elements, Collector out) throws Exception {HashMap hashMap = new HashMap<>();for (Example7.UserBehavior element : elements) {if(hashMap.containsKey(element.itemId)){hashMap.put(element.itemId,hashMap.get(element.itemId) + 1L);}else {hashMap.put(element.itemId,1L);}}ArrayList> arrayList = new ArrayList<>();for (String key : hashMap.keySet()) {arrayList.add(Tuple2.of(key,hashMap.get(key)));}arrayList.sort(new Comparator>() {@Overridepublic int compare(Tuple2 o1, Tuple2 o2) {return o1.f1.intValue() - o2.f1.intValue();}});StringBuilder result = new StringBuilder();result.append("============================\n").append("窗口:" + new Timestamp(context.window().getStart()) + "~" + new Timestamp(context.window().getEnd())).append("\n");for(int i = 0; i < 3; i++){result.append("第" + (i + 1) + "名的商品ID是:" + arrayList.get(i).f0).append(";浏览次数是:" + arrayList.get(i).f1).append("\n");}out.collect(result.toString());}}).print();env.execute();
}

先打开消费者,然后启动生产者,观察消费者是否可以正确消费到数据。

三 写入redis

0 导入依赖

org.apache.bahirflink-connector-redis_2.111.0

将此条配置放到依赖项的最后面。

1 代码编写

public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource> stream = env.fromElements(Tuple2.of("key", 1), Tuple2.of("key", 2));FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop101").build();// 幂等性写入,最终结果为key,2stream.addSink(new RedisSink>(conf,new MyRedisMapper()));env.execute();
}public static class MyRedisMapper implements RedisMapper>{@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET,"tuple");}@Overridepublic String getKeyFromData(Tuple2 stringIntegerTuple2) {return stringIntegerTuple2.f0;}@Overridepublic String getValueFromData(Tuple2 stringIntegerTuple2) {return stringIntegerTuple2.f1.toString();}
}

四 写入MySQL

1 自定义向MySQL中写入数据

public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.fromElements(Tuple2.of("key", 1), Tuple2.of("key", 2)).addSink(new RichSinkFunction>() {private Connection conn;private PreparedStatement insertStmt;private PreparedStatement updateStmt;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);conn = DriverManager.getConnection("jdbc:mysql://hadoop101:3306/test","root","123456");insertStmt = conn.prepareStatement("INSERT INTO tuple (k, v) VALUES (?, ?)");updateStmt = conn.prepareStatement("UPDATE tuple SET v = ? WHERE k = ?");}// 实现幂等性写入@Overridepublic void invoke(Tuple2 value, Context context) throws Exception {super.invoke(value,context);updateStmt.setInt(1, value.f1);updateStmt.setString(2, value.f0);updateStmt.execute();if (updateStmt.getUpdateCount() == 0) {insertStmt.setString(1, value.f0);insertStmt.setInt(2, value.f1);insertStmt.execute();}}@Overridepublic void close() throws Exception {super.close();insertStmt.close();updateStmt.close();conn.close();}});env.execute();
}

相关内容