Debezium Source Connector
- Debezium acts as a Source Connector
- A separate Debezium connector exists for each database (for MySQL, for PostgreSQL, for DB2, etc.)
- Shipped as jar files (Java class libraries), so a JDK must be installed
- When the connector creates topics, it writes shared metadata so that other Sink Connectors can read them
- Debezium homepage: https://debezium.io/
Adding Kafka Plugins
[root@localhost /]# wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.7.Final/debezium-connector-mysql-1.9.7.Final-plugin.tar.gz // download Debezium
[root@localhost /]# tar -xf debezium-connector-mysql-1.9.7.Final-plugin.tar.gz -C /kafka_2.12-3.3.1/plugins/
[root@localhost /]# ls -al /kafka_2.12-3.3.1/plugins/
drwxr-xr-x. 2 root root 4096 Oct 25 20:42 debezium-connector-mysql
/////////// Kafka only recognizes the plugin when it sits in its own directory under plugins/ ////////////
[root@localhost kafka_2.12-3.3.1]# vi config/connect-distributed.properties // configuration file for running Kafka Connect with multiple plugins
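Whether Connect will actually pick the plugin up can be sanity-checked before restarting the worker. A minimal sketch (the helper name `check_plugin_dir` is hypothetical; it only verifies that the directory exists and contains at least one jar, which is what the plugin scanner looks for):

```shell
# Hypothetical helper: report whether a plugin directory under plugin.path
# exists and contains at least one jar file.
check_plugin_dir() {
  if [ -d "$1" ] && ls "$1"/*.jar >/dev/null 2>&1; then
    echo "ok"
  else
    echo "missing"
  fi
}
# e.g. check_plugin_dir /kafka_2.12-3.3.1/plugins/debezium-connector-mysql
```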
...
18 # This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
19 # to be used with the examples, and some settings may differ from those used in a production system, especially
20 # the `bootstrap.servers` and those specifying replication factors.
21
22 # A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
23 bootstrap.servers=kafka_IP:9092
24
...
33 # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
34 # it to
35 key.converter.schemas.enable=false // metadata format errors occurred when writing to the topic; still investigating
36 value.converter.schemas.enable=false
37
...
78
79 # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
80 # (connectors, converters, transformations). The list should consist of top level directories that include
81 # any combination of:
82 # a) directories immediately containing jars with plugins and their dependencies
83 # b) uber-jars with plugins and their dependencies
84 # c) directories immediately containing the package directory structure of classes of plugins and their dependencies
85 # Examples:
86 plugin.path=/kafka_2.12-3.3.1/plugins // path to the Kafka connector libraries
[root@localhost kafka_2.12-3.3.1]# bin/connect-distributed.sh config/connect-distributed.properties &
...
[2022-11-07 02:09:14,818] INFO Added plugin 'io.debezium.connector.mysql.MySqlConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:172)
[2022-11-07 02:09:14,818] INFO Added plugin 'io.debezium.converters.ByteArrayConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:172)
...
[root@localhost kafka_2.12-3.3.1]# curl --request GET localhost:8083/connector-plugins // list the plugins registered with the Connect worker
[{"class":"io.debezium.connector.mysql.MySqlConnector","type":"source","version":"1.9.7.Final"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.1.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.1.0"}]
[root@localhost kafka_2.12-3.3.1]# curl --request GET localhost:8083/connectors // list the connectors registered with the worker
[] // empty until a connector is added
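Besides listing connectors, the Connect REST API exposes per-connector state at /connectors/&lt;name&gt;/status, which shows whether the connector and its tasks are RUNNING or FAILED. A small wrapper sketch, assuming the distributed worker is on localhost:8083 as above:

```shell
# Query the Connect REST API for a connector's runtime state
# (assumes the distributed worker is listening on localhost:8083).
connector_status() {
  curl -s --request GET "localhost:8083/connectors/$1/status"
}
# e.g. connector_status deb-source-connector
```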
Example: Adding a Debezium Connector
[root@localhost kafka_2.12-3.3.1]# curl --request POST localhost:8083/connectors \
-H 'Content-Type: application/json' -d \
'{
"name" : "deb-source-connector",
"config" :
{
"connector.class" : "io.debezium.connector.mysql.MySqlConnector",
"tasks.max" : "1",
"database.hostname" : "192.168.56.107",
"database.port" : "3306",
"database.user" : "cdc",
"database.password" : "cdc",
"database.server.id" : "1",
"database.server.name" : "dbserver2",
"database.allowPublicKeyRetrieval" : "true",
"database.include.list" : "test",
"database.history.kafka.bootstrap.servers" : "192.168.56.109:9092",
"database.history.kafka.topic" : "history.test",
"converters" : "datetime",
"datetime.type" : "com.darcytech.debezium.converter.MySqlDateTimeConverter",
"datetime.format.date" : "yyyy-MM-dd",
"datetime.format.time" : "HH:mm:ss",
"datetime.format.datetime" : "yyyy-MM-dd HH:mm:ss",
"datetime.format.timestamp" : "yyyy-MM-dd HH:mm:ss",
"datetime.format.timestamp.zone" : "UTC+9",
"key.converter" : "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable" : "true",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable" : "true",
"transforms" : "unwrap, Reroute",
"transforms.unwrap.type" : "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones" : "false",
"transforms.unwrap.delete.handling.mode" : "rewrite",
"time.precision.mode" : "connect",
"transforms.Reroute.type" : "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex" : "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.Reroute.topic.replacement" : "$1.$2.$3"
}
}'
[root@localhost kafka_2.12-3.3.1]# curl --request GET localhost:8083/connectors // kafka에 연결된 Connector 목록
["deb-source-connector"]
[root@localhost kafka_2.12-3.3.1]# bin/kafka-topics.sh --list --zookeeper localhost:2181 // topics are created once the DB connection succeeds
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
__consumer_offsets
connect-configs
connect-offsets
connect-status
dbserver1
dbserver1.schema.table
schema-changes.test
- When a connector is created, its details are stored in the connect-* topics
- Deleting a connector and re-adding it under the same name leaves the old entries in the connect-* topics, so changed settings are not applied
- Connector configuration parameter reference
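Because of the leftover entries described above, updating a connector in place or registering it under a new name is usually safer than delete-and-recreate; when a connector does need to go, the REST API removes it with a DELETE request. A sketch, assuming the worker on localhost:8083:

```shell
# Remove a connector from the Connect cluster via the REST API.
delete_connector() {
  curl -s --request DELETE "localhost:8083/connectors/$1"
}
# e.g. delete_connector deb-source-connector
```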
Changing Debezium Connector Settings
[root@localhost kafka_2.12-3.3.1]# curl --request PUT localhost:8083/connectors/deb-source-connector/config \
-H 'Content-Type: application/json' -d \
'{
"connector.class" : "io.debezium.connector.mysql.MySqlConnector",
"tasks.max" : "1",
"database.hostname" : "DB_IP",
"database.port" : "DB_Port",
"database.user" : "DB_user",
"database.password" : "DB_user_pw",
"database.server.id" : "DB_Server_id",
"database.server.name" : "dbserver1",
"database.allowPublicKeyRetrieval" : "true",
"database.include.list" : "test",
"database.serverTimezone" : "Asia/Seoul",
"database.history.kafka.bootstrap.servers" : "Kafka_IP:9092",
"database.history.kafka.topic" : "schema-changes.test",
"snapshot.mode" : "schema_only",
"key.converter" : "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable" : "true",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable" : "true",
"transforms" : "unwrap, addTopicPrefix",
"transforms.unwrap.type" : "io.debezium.transforms.ExtractNewRecordState",
"transforms.addTopicPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex" : "(.*)",
"transforms.addTopicPrefix.replacement" : "$1"
}'
- To update a running connector, PUT the new settings to connectors/connector_name/config
- Remove the outer "name" and "config" wrapper from the creation payload, change the values, then PUT
- Parameters you are not changing must still be included, since the PUT replaces the whole config
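The wrapper-stripping step above can be automated; a sketch assuming jq is installed (`extract_config` and the payload file name are illustrative):

```shell
# Read a POST-style creation payload on stdin and emit only the inner
# "config" object, which is the body the PUT endpoint expects.
extract_config() {
  jq -c '.config'
}
# e.g. extract_config < create-payload.json | curl --request PUT \
#        localhost:8083/connectors/deb-source-connector/config \
#        -H 'Content-Type: application/json' -d @-
```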
Debezium Connector Configuration Parameters
Parameter | Description |
value.converter | Converter class used to translate between the Kafka Connect in-memory format and the serialized format written to Kafka; governs the value format (JSON here) |
transforms | Comma-separated list of single message transforms (SMTs) applied to each record |
snapshot.mode | Controls when the connector takes a snapshot (initial topic creation): initial = full schema + data snapshot on first start, then stream from the binlog; initial_only = snapshot only, later binlog changes are not consumed; when_needed = snapshot whenever the stored binlog/GTID position is unusable; never = no snapshot, all data is produced from the binlog history; schema_only = snapshot the schema only, existing rows are skipped; schema_only_recovery = schema snapshot plus recovery of change history still present in the binlog |
key.converter | Same as value.converter, but for record keys (JSON here) |
database.serverTimezone | DB time zone; Debezium does not recognize "KST", so it must be set to "Asia/Seoul" |
database.server.name | Logical name of the DB server used by the connector; topics are created as server_name.schema.table |
database.include.list | Schemas whose data changes Debezium should capture |
database.history.kafka.topic | Topic storing the DB server's schema change history |
database.allowPublicKeyRetrieval | MySQL 8.0+ security feature letting the client automatically request the server's public key; set to "true" for the Java/JDBC connection |
Example: Debezium Connector - DB Integration Test
[root@localhost kafka_2.12-3.3.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbtest.test --from-beginning
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
{
"source" : {
"server" : "dbserver1"
},
"position" : {
"ts_sec" : 1667809343,
"file" : "mysql-bin.000002",
"pos" : 1279,
"gtids" : "093ea1d1-5e75-11ed-ae67-0800273d7f4a:1-5",
"snapshot" : true
},
"databaseName" : "",
"ddl" : "SET character_set_server=utf8mb4, collation_server=utf8mb4_general_ci",
"tableChanges" : [ ]
}
///// create database test;
///// create table tt1(IDX int not null);
[root@localhost kafka_2.12-3.3.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbtest.test --from-beginning
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
{
"source" : {
"server" : "dbserver1"
},
"position" : {
"ts_sec" : 1667809343,
"file" : "mysql-bin.000002",
"pos" : 1279,
"gtids" : "093ea1d1-5e75-11ed-ae67-0800273d7f4a:1-5",
"snapshot" : true
},
"databaseName" : "",
"ddl" : "SET character_set_server=utf8mb4, collation_server=utf8mb4_general_ci",
"tableChanges" : [ ]
}
{
"source" : {
"server" : "dbserver1"
},
"position" : {
"transaction_id" : null,
"ts_sec" : 1667809502,
"file" : "mysql-bin.000002",
"pos" : 1464,
"gtids" : "093ea1d1-5e75-11ed-ae67-0800273d7f4a:1-5",
"server_id" : 2
},
"databaseName" : "test",
"ddl" : "create database test",
"tableChanges" : [ ]
}
{
"source" : {
"server" : "dbserver1"
},
"position" : {
"transaction_id" : null,
"ts_sec" : 1667809513,
"file" : "mysql-bin.000002",
"pos" : 1663,
"gtids" : "093ea1d1-5e75-11ed-ae67-0800273d7f4a:1-6",
"server_id" : 2
},
"databaseName" : "test",
"ddl" : "create table tt1(IDX int not null)",
"tableChanges" : [ {
"type" : "CREATE",
"id" : "\"test\".\"tt1\"",
"table" : {
"defaultCharsetName" : "utf8mb4",
"primaryKeyColumnNames" : [ ],
"columns" : [ {
"name" : "IDX",
"jdbcType" : 4,
"typeName" : "INT",
"typeExpression" : "INT",
"charsetName" : null,
"position" : 1,
"optional" : false,
"autoIncremented" : false,
"generated" : false,
"comment" : null,
"hasDefaultValue" : false,
"enumValues" : [ ]
} ]
},
"comment" : null
} ]
}
///// insert into tt1 values ('1');
///// insert into tt1 values ('2');
[root@localhost kafka_2.12-3.3.1]# bin/kafka-topics.sh --list --zookeeper localhost:2181
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
__consumer_offsets
connect-configs
connect-offsets
connect-status
dbserver1
dbserver1.test.tt1
dbtest.test
[root@localhost kafka_2.12-3.3.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1.test.tt1 --from-beginning
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"IDX"}],"optional":false,"name":"dbserver1.test.tt1.Value"},"payload":{"IDX":1}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"IDX"}],"optional":false,"name":"dbserver1.test.tt1.Value"},"payload":{"IDX":2}}
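With schemas.enable=true the JSON converter wraps every row in a schema/payload envelope, as the records above show. When only the row values matter downstream, the stream can be piped through jq (assuming it is installed; `payload_only` is an illustrative name):

```shell
# Strip the Connect JSON envelope, keeping only the row values.
payload_only() {
  jq -c '.payload'
}
# e.g. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
#        --topic dbserver1.test.tt1 --from-beginning | payload_only
```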
Debezium Connector - DB Integration Test: Deletes
- Add the following to the transforms settings when configuring the Debezium connector
[root@localhost kafka_2.12-3.3.1]# curl --request POST localhost:8083/connectors \
-H 'Content-Type: application/json' -d \
'{
"name" : "deb-source-connector",
"config" :
{
"connector.class" : "io.debezium.connector.mysql.MySqlConnector",
"tasks.max" : "1",
"database.hostname" : "DB_IP",
"database.port" : "DB_Port",
"database.user" : "DB_user",
"database.password" : "DB_user_pw",
"database.server.id" : "DB_Server_id",
"database.server.name" : "dbserver1",
"database.allowPublicKeyRetrieval" : "true",
"database.include.list" : "test",
"database.history.kafka.bootstrap.servers" : "Kafka_IP:9092",
"database.history.kafka.topic" : "dbtest.test",
"snapshot.mode" : "schema_only",
"key.converter" : "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable" : "true",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable" : "true",
"transforms" : "unwrap, addTopicPrefix",
"transforms.unwrap.type" : "io.debezium.transforms.ExtractNewRecordState",
Option 1 ---------------------------------
"transforms.unwrap.drop.tombstones" : "false", // keep tombstone markers for delete events
"transforms.unwrap.delete.handling.mode" : "rewrite", // record deleted rows with a __deleted true/false flag
Option 2 ---------------------------------
"transforms.unwrap.add.fields" : "op, table", // record Insert/Update/Delete in __op and __table payload fields
"transforms.addTopicPrefix.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex" : "(.*)",
"transforms.addTopicPrefix.replacement" : "$1"
}
}'
Initial snapshot records ---------------------------------
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"IDX"},{"type":"string","optional":true,"field":"char1"},{"type":"string","optional":true,"field":"char2"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__table"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"dbserver1.test.tt1.Value"},
"payload":{"IDX":106,"char1":"l","char2":"as","__op":"r","__table":"tt1","__deleted":"false"}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"IDX"},{"type":"string","optional":true,"field":"char1"},{"type":"string","optional":true,"field":"char2"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__table"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"dbserver1.test.tt1.Value"},
"payload":{"IDX":107,"char1":"q","char2":"et","__op":"r","__table":"tt1","__deleted":"false"}}
Insert records ---------------------------------
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"IDX"},{"type":"string","optional":true,"field":"char1"},{"type":"string","optional":true,"field":"char2"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__table"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"dbserver1.test.tt1.Value"},
"payload":{"IDX":108,"char1":"c","char2":"PP","__op":"c","__table":"tt1","__deleted":"false"}}
Delete records ---------------------------------
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"IDX"},{"type":"string","optional":true,"field":"char1"},{"type":"string","optional":true,"field":"char2"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__table"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"dbserver1.test.tt1.Value"},
"payload":{"IDX":106,"char1":"l","char2":"as","__op":"d","__table":"tt1","__deleted":"true"}}
Update records ---------------------------------
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"IDX"},{"type":"string","optional":true,"field":"char1"},{"type":"string","optional":true,"field":"char2"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__table"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"dbserver1.test.tt1.Value"},
"payload":{"IDX":105,"char1":"h","char2":"AA","__op":"u","__table":"tt1","__deleted":"false"}}
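The __deleted flag written by the rewrite mode makes it easy to isolate delete events downstream; a sketch with jq (assumed installed; topic and field names are taken from the records above):

```shell
# Keep only records whose payload was marked deleted by
# ExtractNewRecordState with delete.handling.mode=rewrite.
deleted_rows() {
  jq -c 'select(.payload.__deleted == "true") | .payload'
}
# e.g. bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
#        --topic dbserver1.test.tt1 --from-beginning | deleted_rows
```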
ZooKeeper/Kafka/Connector Port Summary