Debizium Mysql CDC Source 커넥터가 Avro 형식으로 데이터를 추출해 Kafka Topic에 Pub해주고 Confluent Kudu sink 커넥터가 해당 Topic을 Consume 하여 Impala Table에 적재하는 시나리오 입니다.
데모 시나리오 테스트 결과
- Debizium Mysql CDC Source 커넥터 동작 이상 없음 (Insert / Delete / Update / Upsert)
- Schema Resister 동작 이상 없음 (Source 테이블 컬럼명 데이터 타입 매핑 동작)
- Confluent Kudu sink 제한 사항 Topic에 적재된 CDC 데이터 Impala 적용시 Delete, Update, Upsert 지원 하지 않음 오로지 Insert 동작만 지원
Mysql CDC source connector 소스
{
"name": "source-mysql-kudu-hj0",
"config": {
"timestamp.column.name": "user",
"topic.creation.default.partitions": "1",
"topic.creation.default.delete.retention.ms": "60000",
"topic.creation.default.replication.factor": "1",
"database.allowPublicKeyRetrieval": "true",
"topic.creation.default.compression.type": "lz4",
"topic.creation.default.cleanup.policy": "compact",
"value.converter.schema.registry.url": "http://IP:8081",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",
"key.converter.schema.registry.url": "http://IP:8081",
"name": "source-mysql-kudu-hj0",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": "unwrap, route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode": "drop",
"transforms.unwrap.drop.tombstones": "false",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$1.$2.$3",
"database.hostname": "IP",
"database.port": "3306",
"database.user": "ishark",
"database.password": "ishark2020",
"database.server.name": "mysql111",
"snapshot.mode": "initial",
"time.precision.mode": "connect",
"database.history.kafka.bootstrap.servers": "IP:9092",
"database.history.kafka.topic": "history.mysql111.kudu_testh03",
"table.include.list": "testdb.kudu_testh03",
"include.schema.changes": "true",
"database.include.list": "testdb"
}
}
Kudu Sink Connector 소스
{
"name": "sink-mysql-kudu-hj0",
"config": {
"value.converter.schema.registry.url": "http://IP:8081",
"key.converter.schema.registry.url": "http://IP:8081",
"name": "sink-mysql-kudu-hj0",
"connector.class": "io.confluent.connect.kudu.KuduSinkConnector",
"tasks.max": "1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": "route",
"topics": "mysql110.testdb.kudu_testh02",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3",
"impala.server": "IP",
"impala.port": "21050",
"kudu.database": "test",
"impala.ldap.user": "",
"impala.ldap.password": "",
"kudu.tablet.replicas": "1",
"confluent.topic.bootstrap.servers": "IP:9092",
"confluent.topic.replication.factor": "1",
"table.name.format": "${topic}",
"pk.mode": "record_value",
"pk.fields": "id",
"auto.create": "true"
}
}
'Kafka' 카테고리의 다른 글
Apache Flume(Kafka To File) (0) | 2023.03.06 |
---|---|
Kafkaconnector(MysqlCDC To Impala&Kudu) (0) | 2023.02.22 |
Kcat(Kafkacat) (0) | 2023.02.20 |
Apache Beam (0) | 2023.01.26 |
Apache Flume(Kafka To Hadoop) (0) | 2022.10.21 |