kafka connect
如何更改偏移量中的位置?Debezium Oracle 连接器在偏移量中维护两个关键值,一个名为 scn 的字段 和另一个名为commit_scn的字段。该scn字段是一个字符串,表示连接器在捕获更改时使用的低水位线起始位置。
找出包含连接器偏移量的主题的名称。这是根据设置为配置 offset.storage.topic
属性的值进行配置的。
找出连接器的最后一个偏移量、存储它的键并确定用于存储偏移量的分区。kafkacat这可以使用Kafka 代理安装提供的实用程序脚本来完成。一个示例可能如下所示:
kafkacat -b localhost:9092 -C -t my_connect_offsets -f 'Partition(%p) %k %s\n'
执行结果:
Partition(11) ["inventory-connector",{"server":"server1"}] {"scn":"324567897", "commit_scn":"324567897: 0x2832343233323:1"}
注: macOS可以使用
brew install kafkacat
安装
只有数据存在变更,才会在offset.storage.topic
设置的topic中记录offset, 如果数据长时间没有变更,记录的offset可能是很久之前的,出错重启的话就会出现问题,会出现找不到给定SCN偏移量的异常。
echo '["inventory-connector",{"server":"server1"}]|{"scn":"3245675000","commit_scn":"324567500"}' | \
kafkacat -P -b localhost -t my_connect_offsets -K \| -p 11
重启对应的task,就可以从设置的新的低水位线开始消费了。
Debezium 连接器在连接器偏移量中保持低水位线和高水位线 SCN 值。低水位线 SCN 表示起始位置,并且必须存在于可用的联机重做或归档日志中,以便连接器成功启动。当连接器报告找不到此偏移 SCN 时,这表明仍然可用的日志不包含 SCN,因此连接器无法从它停止的地方挖掘更改。
发生这种情况时,有两种选择。第一种是删除连接器的历史主题和偏移量并重新启动连接器,按照建议拍摄新快照。这将保证任何主题消费者都不会发生数据丢失。第二种是手动操作偏移量,将 SCN 推进到重做或存档日志中可用的位置。这将导致旧 SCN 值和新提供的 SCN 值之间发生的更改丢失,并且不会写入主题。不推荐这样做。