
Debezium - Debezium Connector

희쨔응 2022. 12. 22. 09:13

Debezium Source Connector

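  • Debezium acts as a source connector.
  • Each database has its own Debezium connector (for MySQL, PostgreSQL, Db2, and so on).
  • Connectors ship as JAR files (Java class libraries), so a JDK must be installed.
  • When the connector creates a topic, it writes records with common metadata so that other sink connectors can read them.
  • Debezium homepage: https://debezium.io/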

 

Adding Kafka Plugins

[root@localhost /]# wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.7.Final/debezium-connector-mysql-1.9.7.Final-plugin.tar.gz  // download Debezium

[root@localhost /]# tar -xf debezium-connector-mysql-1.9.7.Final-plugin.tar.gz -C /kafka_2.12-3.3.1/plugins/

[root@localhost /]# ls -al /kafka_2.12-3.3.1/plugins/

drwxr-xr-x. 2 root root 4096 Oct 25 20:42 debezium-connector-mysql

/////////// The connector must sit in its own directory under plugins for Kafka to recognize it ////////////
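The layout requirement above can be checked before starting the worker. The following is a minimal sketch, not part of Kafka: `find_plugin_dirs` is a hypothetical helper that lists the subdirectories of the plugins directory that directly contain at least one JAR. It only covers the "directory of JARs" layout.

```python
from pathlib import Path


def find_plugin_dirs(plugin_root: str) -> list:
    """Return the names of subdirectories that directly contain at least one .jar file.

    Hypothetical helper for illustration; it does not inspect the JAR contents.
    """
    root = Path(plugin_root)
    return sorted(
        child.name
        for child in root.iterdir()
        if child.is_dir() and any(child.glob("*.jar"))
    )
```

For the directory used in this post, the call would be `find_plugin_dirs("/kafka_2.12-3.3.1/plugins")`, and `debezium-connector-mysql` should appear in the result.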



[root@localhost kafka_2.12-3.3.1]# vi config/connect-distributed.properties                 // worker configuration file for using multiple Kafka plugins

...

# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.

# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=kafka_IP:9092

...

# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false                      // a metadata format error occurs when records are stored in the topic; still being resolved
value.converter.schemas.enable=false

...

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
plugin.path=/kafka_2.12-3.3.1/plugins                        // path to the Kafka connector libraries
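To double-check which values the worker will actually see, the properties file can be read back programmatically. The sketch below is a simplified parser written for this post, not Kafka's own loader: it handles only `key=value` lines and `#` comments. Note that the `//` notes in the listing above are annotations for the reader; Java properties files have no inline comments, so they would become part of the value if left in the real file.

```python
def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def plugin_paths(props: dict) -> list:
    """Split plugin.path on commas, as the comment in the properties file describes."""
    return [p.strip() for p in props.get("plugin.path", "").split(",") if p.strip()]
```

Calling `plugin_paths(parse_properties(open("config/connect-distributed.properties").read()))` should return the plugin directory configured above.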



[root@localhost kafka_2.12-3.3.1]# bin/connect-distributed.sh config/connect-distributed.properties &

...

[2022-11-07 02:09:14,818] INFO Added plugin 'io.debezium.connector.mysql.MySqlConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:172)
[2022-11-07 02:09:14,818] INFO Added plugin 'io.debezium.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:172)

...



[root@localhost kafka_2.12-3.3.1]# curl --request GET localhost:8083/connector-plugins     // check the plugins added to Kafka Connect
[{"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.9.7.Final"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.1.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.1.0"}]
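The plugin list is a JSON array, so it is easy to check from a script whether the Debezium class was loaded. A minimal sketch, assuming the response body has the shape shown above (`installed_connector_classes` is a name chosen for this example):

```python
import json


def installed_connector_classes(response_body: str) -> dict:
    """Map each connector class name to its (type, version) pair."""
    plugins = json.loads(response_body)
    return {p["class"]: (p["type"], p["version"]) for p in plugins}
```

The body can be fetched with `urllib.request.urlopen("http://localhost:8083/connector-plugins").read().decode()`, after which `"io.debezium.connector.mysql.MySqlConnector" in installed_connector_classes(body)` tells you whether the plugin is available.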



[root@localhost kafka_2.12-3.3.1]# curl --request GET localhost:8083/connectors        // list the connectors registered with Kafka Connect

[]                                                                                                     // a connector appears in the list only after it is added

 

Example: Adding a Debezium Connector

[root@localhost kafka_2.12-3.3.1]# curl --request POST localhost:8083/connectors -H 'Content-Type: application/json' -d '{

    "name" : "deb-source-connector",

    "config" :
    {
        "connector.class" : "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max" : "1",
        "database.hostname" : "192.168.56.107",
        "database.port" : "3306",
        "database.user" : "cdc",
        "database.password" : "cdc",
        "database.server.id" : "1",
        "database.server.name" : "dbserver2",
        "database.allowPublicKeyRetrieval" : "true",
        "database.include.list" : "test",
        "database.history.kafka.bootstrap.servers" : "192.168.56.109:9092",
        "database.history.kafka.topic" : "histroy.test",
        "converters" : "datetime",
        "datetime.type" : "com.darcytech.debezium.converter.MySqlDateTimeConverter",
        "datetime.format.date" : "yyyy-MM-dd",
        "datetime.format.time" : "HH:mm:ss",
        "datetime.format.datetime" : "yyyy-MM-dd HH:mm:ss",
        "datetime.format.timestamp" : "yyyy-MM-dd HH:mm:ss",
        "datetime.format.timestamp.zone" : "UTC+9",
        "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable" : "true",
        "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable" : "true",
        "transforms" : "unwrap, Reroute",
        "transforms.unwrap.type" : "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.delete.handling.mode" : "rewrite",
        "transforms.unwrap.drop.tombstones" : "false",
        "transforms.unwrap.drop.deletes" : "false",
        "time.precision.mode" : "connect",
        "transforms.Reroute.type" : "io.debezium.transforms.ByLogicalTableRouter",
        "transforms.Reroute.topic.regex" : "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.Reroute.topic.replacement" : "$1.$2.$3"
    }

}'
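Kafka Connect only hands a transform the `transforms.<alias>.*` keys whose alias is listed in `transforms`, so a key with a misspelled prefix or alias never reaches that transform. The following is a rough pre-flight check written for this post, not a Kafka Connect feature: `check_transform_keys` is a hypothetical helper that flags keys with an undeclared alias or a prefix that is a letter-transposition of `transforms`.

```python
def check_transform_keys(config: dict) -> list:
    """Return config keys that look like transform settings but would not be applied."""
    aliases = [a.strip() for a in config.get("transforms", "").split(",") if a.strip()]
    problems = []
    for key in config:
        if key == "transforms":
            continue
        head, _, rest = key.partition(".")
        if head == "transforms":
            # transforms.<alias>.<setting>: the alias must be declared.
            alias = rest.split(".", 1)[0]
            if alias not in aliases:
                problems.append(key)
        elif sorted(head) == sorted("transforms"):
            # Same letters in a different order: a transposition typo in the prefix.
            problems.append(key)
    return problems
```

Running it on the `config` object before sending the POST request gives a list of suspicious keys; an empty list means none were found by this (deliberately simple) heuristic.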



[root@localhost kafka_2.12-3.3.1]# curl --request GET localhost:8083/connectors        // list the connectors registered with Kafka Connect

["deb-source-connector"]

[root@localhost kafka_2.12-3.3.1]# bin/kafka-topics.sh --list --zookeeper localhost:2181                     // topics are created once the DB connection succeeds

OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
__consumer_offsets
connect-configs
connect-offsets
connect-status
dbserver1

dbserver1.schema.table
schema-changes.test
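  • When a connector is created, its details are stored in the connect-* topics.
  • If you delete a connector and add it again under the same name, the entries left in the connect-* topics remain, so changed settings are not applied.
  • Connector configuration parameter reference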

How to Change a Debezium Connector's Configuration

[root@localhost kafka_2.12-3.3.1]# curl --request PUT localhost:8083/connectors/deb-source-connector/config -H 'Content-Type: application/json' -d '{
        "connector.class" : "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max" : "1",
        "database.hostname" : "DB_IP",
        "database.port" : "DB_Port",
        "database.user" : "DB_user",
        "database.password" : "DB_user_pw",
        "database.server.id" : "DB_Server_id",
        "database.server.name" : "dbserver1",
        "database.allowPublicKeyRetrieval" : "true",
        "database.include.list" : "test",
        "database.serverTimezone" : "Asia/Seoul",
        "database.history.kafka.bootstrap.servers" : "Kafka_IP:9092",
        "database.history.kafka.topic" : "schema-changes.test",
        "snapshot.mode" : "schema_only",
        "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable" : "true",
        "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable" : "true",
        "transforms" : "unwrap, addTopicPrefix",
        "transforms.unwrap.type" : "io.debezium.transforms.ExtractNewRecordState",
        "transforms.addTopicPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.addTopicPrefix.regex" : "(.*)",
        "transforms.addTopicPrefix.replacement" : "$1"
}'
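  • To change a connector's settings, PUT the configuration again to connectors/connector_name/config.
  • Remove the "name" and "config" wrapper from the original request, change the values you need, and PUT the flat settings object.
  • Parameters whose values do not change must be included as well.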
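The three rules above can be expressed as a small transformation from the original POST payload to the PUT body. This is a minimal sketch; `build_put_body` and `put_url` are helper names made up for this post, and the base URL is whatever your Connect worker listens on.

```python
import json


def build_put_body(post_payload: dict, changes: dict) -> str:
    """Return the JSON body for PUT /connectors/<name>/config.

    The body is the full flat config (no "name"/"config" wrapper) with the changes applied,
    so unchanged parameters are carried over automatically.
    """
    config = dict(post_payload["config"])  # copy so the original payload is untouched
    config.update(changes)
    return json.dumps(config, indent=4)


def put_url(base_url: str, post_payload: dict) -> str:
    """Build the config endpoint from the connector name in the POST payload."""
    return f"{base_url}/connectors/{post_payload['name']}/config"
```

For example, `build_put_body(payload, {"snapshot.mode": "schema_only"})` yields a body that can be passed to `curl --request PUT ... -d` as shown earlier.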

Debezium Connector Configuration Parameters

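Parameter / Description

value.converter
    Converter class used to translate between the Kafka Connect format and the serialized format written to Kafka.
    This class controls the format of record values.
    The JSON format is used here.
transforms
    Comma-separated aliases of the single message transforms (SMTs) applied to each record, in the order listed.
snapshot.mode
    Criteria for taking the snapshot (the initial topic data) when the connector starts.
    initial : runs a snapshot only when no offsets have been recorded for the logical server name, that is, on first start
    initial_only : same as initial, but the connector stops after the snapshot and does not read changes from the binlog
    when_needed : runs a snapshot at startup whenever it is needed, for example when no offsets exist or the recorded binlog position or GTID is no longer available on the server
    never : never runs a snapshot; on first start the connector reads from the beginning of the binlog
    schema_only : snapshots the schemas only, not the existing rows; only changes made after the connector starts are captured
    schema_only_recovery : recovery mode for a connector that was already capturing changes; rebuilds a lost or corrupted schema history topic on restart
key.converter
    Converter class used to translate between the Kafka Connect format and the serialized format written to Kafka.
    This class controls the format of record keys.
    The JSON format is used here.
database.serverTimezone
    Time zone of the database.
    Debezium does not recognize KST, so specify Asia/Seoul instead.
database.server.name
    Logical name of the DB server used by the connector.
    Topics are created as server-name.schema.table.
database.include.list
    List of schemas (databases) whose changes Debezium captures.
database.history.kafka.topic
    Topic in which the connector stores the schema changes of the DB server.
database.allowPublicKeyRetrieval
    Needed from MySQL 8.0 onward for security reasons.
    Lets the client automatically request the public key from the server.
    Set to "true" for the Java (JDBC) connection.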
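A few of the rules in this table are mechanical enough to check in code. The sketch below is written for this post under two assumptions: the set of `snapshot.mode` values is the one listed above, and the default mode is `initial`. `table_topic` and `validate` are hypothetical helper names.

```python
SNAPSHOT_MODES = {
    "initial", "initial_only", "when_needed",
    "never", "schema_only", "schema_only_recovery",
}


def table_topic(server_name: str, schema: str, table: str) -> str:
    """Build the per-table topic name: server-name.schema.table."""
    return f"{server_name}.{schema}.{table}"


def validate(config: dict) -> list:
    """Return a list of human-readable problems found in a connector config."""
    errors = []
    mode = config.get("snapshot.mode", "initial")  # assumed default
    if mode not in SNAPSHOT_MODES:
        errors.append(f"unknown snapshot.mode: {mode}")
    if config.get("database.serverTimezone") == "KST":
        errors.append("use a region ID such as Asia/Seoul instead of KST")
    return errors
```

With the settings used in this post, `table_topic("dbserver1", "test", "tt1")` gives the topic name that shows up in the topic list later on.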

 

Debezium Connector - DB Integration Test Example

[root@localhost kafka_2.12-3.3.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbtest.test --from-beginning
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
{
"source" : {
"server" : "dbserver1"
},
"position" : {
"ts_sec" : 1667809343,
"file" : "mysql-bin.000002",
"pos" : 1279,
"gtids" : "093ea1d1-5e75-11ed-ae67-0800273d7f4a:1-5",
"snapshot" : true
},
"databaseName" : "",
"ddl" : "SET character_set_server=utf8mb4, collation_server=utf8mb4_general_ci",
"tableChanges" : [ ]
}



///// create database test;

///// create table tt1(IDX int not null);



[root@localhost kafka_2.12-3.3.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbtest.test --from-beginning

OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
{
"source" : {
"server" : "dbserver1"
},
"position" : {
"ts_sec" : 1667809343,
"file" : "mysql-bin.000002",
"pos" : 1279,
"gtids" : "093ea1d1-5e75-11ed-ae67-0800273d7f4a:1-5",
"snapshot" : true
},
"databaseName" : "",
"ddl" : "SET character_set_server=utf8mb4, collation_server=utf8mb4_general_ci",
"tableChanges" : [ ]
}
{
"source" : {
"server" : "dbserver1"
},
"position" : {
"transaction_id" : null,
"ts_sec" : 1667809502,
"file" : "mysql-bin.000002",
"pos" : 1464,
"gtids" : "093ea1d1-5e75-11ed-ae67-0800273d7f4a:1-5",
"server_id" : 2
},
"databaseName" : "test",
"ddl" : "create database test",
"tableChanges" : [ ]
}
{
"source" : {
"server" : "dbserver1"
},
"position" : {
"transaction_id" : null,
"ts_sec" : 1667809513,
"file" : "mysql-bin.000002",
"pos" : 1663,
"gtids" : "093ea1d1-5e75-11ed-ae67-0800273d7f4a:1-6",
"server_id" : 2
},
"databaseName" : "test",
"ddl" : "create table tt1(IDX int not null)",
"tableChanges" : [ {
"type" : "CREATE",
"id" : "\"test\".\"tt1\"",
"table" : {
"defaultCharsetName" : "utf8mb4",
"primaryKeyColumnNames" : [ ],
"columns" : [ {
"name" : "IDX",
"jdbcType" : 4,
"typeName" : "INT",
"typeExpression" : "INT",
"charsetName" : null,
"position" : 1,
"optional" : false,
"autoIncremented" : false,
"generated" : false,
"comment" : null,
"hasDefaultValue" : false,
"enumValues" : [ ]
} ]
},
"comment" : null
} ]
}
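The schema history topic prints one pretty-printed JSON object after another, so a plain `json.loads` on the whole console output fails. A minimal sketch for walking through such output uses `json.JSONDecoder.raw_decode`, which parses one object and reports where it ended. The function names are chosen for this example, and the input is assumed to contain only the JSON objects (no JVM warning line).

```python
import json


def iter_json_objects(text: str):
    """Yield each top-level JSON object from text that holds several objects back to back."""
    decoder = json.JSONDecoder()
    pos = 0
    while pos < len(text):
        # Skip whitespace between objects; raw_decode does not accept leading whitespace.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            break
        obj, pos = decoder.raw_decode(text, pos)
        yield obj


def ddl_statements(text: str) -> list:
    """Return (databaseName, ddl) pairs in the order they were recorded."""
    return [(o["databaseName"], o["ddl"]) for o in iter_json_objects(text)]
```

Applied to the output above, this would list the `SET character_set_server...`, `create database test`, and `create table tt1(IDX int not null)` statements in order.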



///// insert into tt1 values ('1');

///// insert into tt1 values ('2');



[root@localhost kafka_2.12-3.3.1]# bin/kafka-topics.sh --list --zookeeper localhost:2181
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
__consumer_offsets
connect-configs
connect-offsets
connect-status
dbserver1
dbserver1.test.tt1
dbtest.test

[root@localhost kafka_2.12-3.3.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1.test.tt1 --from-beginning
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"IDX"}],"optional":false,"name":"dbserver1.test.tt1.Value"},"payload":{"IDX":1}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"IDX"}],"optional":false,"name":"dbserver1.test.tt1.Value"},"payload":{"IDX":2}}
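Because `value.converter.schemas.enable` is `true`, every record carries a `schema` part next to the actual row in `payload`. A consumer that only needs the row values can drop the envelope. A minimal sketch, assuming one JSON record per line as in the console output above (`payload_rows` is a name made up for this example):

```python
import json


def payload_rows(lines) -> list:
    """Return the payload dict of each JsonConverter record, ignoring non-JSON lines."""
    rows = []
    for line in lines:
        line = line.strip()
        if not line.startswith("{"):
            continue  # skip JVM warnings and other console noise
        record = json.loads(line)
        rows.append(record["payload"])
    return rows
```

Feeding it the lines printed by `kafka-console-consumer.sh` for `dbserver1.test.tt1` should give back the two inserted rows.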

 

Debezium Connector - DB Integration Test: Delete Handling

  • Add the following to the transform settings when configuring the Debezium connector
[root@localhost kafka_2.12-3.3.1]# curl --request POST localhost:8083/connectors -H 'Content-Type: application/json' -d '{
    "name" : "deb-source-connector",
    "config" :
    {
        "connector.class" : "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max" : "1",
        "database.hostname" : "DB_IP",
        "database.port" : "DB_Port",
        "database.user" : "DB_user",
        "database.password" : "DB_user_pw",
        "database.server.id" : "DB_Server_id",
        "database.server.name" : "dbserver1",
        "database.allowPublicKeyRetrieval" : "true",
        "database.include.list" : "test",
        "database.history.kafka.bootstrap.servers" : "Kafka_IP:9092",
        "database.history.kafka.topic" : "dbtest.test",
        "snapshot.mode" : "schema_only",
        "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable" : "true",
        "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable" : "true",
        "transforms" : "unwrap, addTopicPrefix",
        "transforms.unwrap.type" : "io.debezium.transforms.ExtractNewRecordState",

Method 1 ---------------------------------

        "transforms.unwrap.drop.tombstones" : "false",                            // keep the tombstone record for delete events
        "transforms.unwrap.delete.handling.mode" : "rewrite",                // record whether the row was deleted as true/false (__deleted)

Method 2 ---------------------------------

        "transforms.unwrap.add.fields" : "op, table",                                  // add __op and __table to the payload, recording insert/update/delete

        "transforms.addTopicPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.addTopicPrefix.regex" : "(.*)",
        "transforms.addTopicPrefix.replacement" : "$1"
    }
}'



Initial topic records ---------------------------------

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"IDX"},{"type":"string","optional":true,"field":"char1"},{"type":"string","optional":true,"field":"char2"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__table"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"dbserver1.test.tt1.Value"},

"payload":{"IDX":106,"char1":"l","char2":"as","__op":"r","__table":"tt1","__deleted":"false"}}


{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"IDX"},{"type":"string","optional":true,"field":"char1"},{"type":"string","optional":true,"field":"char2"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__table"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"dbserver1.test.tt1.Value"},

"payload":{"IDX":107,"char1":"q","char2":"et","__op":"r","__table":"tt1","__deleted":"false"}}



Insert topic record ---------------------------------

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"IDX"},{"type":"string","optional":true,"field":"char1"},{"type":"string","optional":true,"field":"char2"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__table"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"dbserver1.test.tt1.Value"},

"payload":{"IDX":108,"char1":"c","char2":"PP","__op":"c","__table":"tt1","__deleted":"false"}}



Delete topic record ---------------------------------

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"IDX"},{"type":"string","optional":true,"field":"char1"},{"type":"string","optional":true,"field":"char2"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__table"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"dbserver1.test.tt1.Value"},

"payload":{"IDX":106,"char1":"l","char2":"as","__op":"d","__table":"tt1","__deleted":"true"}}



Update topic record ---------------------------------

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"IDX"},{"type":"string","optional":true,"field":"char1"},{"type":"string","optional":true,"field":"char2"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__table"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"dbserver1.test.tt1.Value"},

"payload":{"IDX":105,"char1":"h","char2":"AA","__op":"u","__table":"tt1","__deleted":"false"}}
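With `__op` and `__deleted` in every payload, a downstream consumer can rebuild the current state of the table by replaying the records in order. The following is a minimal in-memory sketch, not a sink connector: `apply_events` is a hypothetical helper, and using `IDX` as the row key is an assumption made for this example.

```python
def apply_events(payloads, key="IDX") -> dict:
    """Replay flattened change events into a dict keyed by the assumed key column."""
    table = {}
    for p in payloads:
        # Drop the metadata fields added by the transform (__op, __table, __deleted).
        row = {k: v for k, v in p.items() if not k.startswith("__")}
        if p.get("__deleted") == "true" or p.get("__op") == "d":
            table.pop(row[key], None)
        else:
            # r (snapshot read), c (insert) and u (update) all upsert the row.
            table[row[key]] = row
    return table
```

Replaying the five payloads shown above in the order listed leaves rows 105, 107, and 108 in the result, with row 106 removed by the delete event.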

 

ZooKeeper/Kafka/Connector Port Summary