
Streaming data with Debezium, DBMS and Kafka


  • An example of streaming data between two databases through Kafka connectors
  • I set up two sink DBs for this exercise.
  • (mysql - kafka - mysql, postgres)
  • Environment: local; reference:

https://github.com/pranav1699/debezium-kafka-cdc

Untitled

Step 1: Creating the docker compose file

MySQL - Kafka

Pull the Zookeeper, MySQL, and Kafka images and run them in Docker.

  • docker-compose.yml
version: '3'
services:
  mysql:
    image: mysql:8.0
    container_name: mysql
    ports:
    - 3306:3306
    environment:
      MYSQL_ROOT_PASSWORD: admin
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw
    command:
      - --character-set-server=utf8mb4
      - --collation-server=utf8mb4_unicode_ci
    volumes:
      - C:/mysql/data:/var/lib/mysql
  zookeeper:
    container_name: zookeeper
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    
  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  • docker-compose
docker-compose up -d

Untitled1

  • (This screenshot was taken at the end of the exercise.)
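`docker-compose up -d` returns before MySQL and Kafka are actually accepting connections. Below is a minimal standard-library Python sketch that polls the host ports published in the compose file before you create tables and connectors. The helper names `wait_for_port` and `wait_for_all` are hypothetical; they are not part of Docker or Kafka.

```python
import socket
import time

# Host ports published by the compose file above (the later sink containers add 3307 and 5432).
COMPOSE_PORTS = {"mysql": 3306, "zookeeper": 2181, "kafka": 9092}


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Poll host:port until a TCP connection succeeds; return False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError (e.g. ConnectionRefusedError) while nothing listens.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


def wait_for_all(ports: dict, host: str = "127.0.0.1", timeout: float = 60.0) -> dict:
    """Return {service: reachable?} for every published port."""
    return {name: wait_for_port(host, port, timeout) for name, port in ports.items()}
```

For example, `print(wait_for_all(COMPOSE_PORTS))` after `docker-compose up -d`. An open port shows that something is listening. It does not prove the service has finished initializing.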

  • Create a table in the source MySQL (:3306) DB and grant privileges

/assets/img/2023-10-39-Kafka/Untitled
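The exact statements used here appear only in the screenshot. For reference, the Debezium MySQL connector documentation asks for SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE and REPLICATION CLIENT on the connector's user. The Python sketch below renders those statements for the `mysqluser` account created by the compose file's MYSQL_USER variable. The function name is hypothetical, and the `'%'` host is an assumption.

```python
from __future__ import annotations

# Privileges listed in the Debezium MySQL connector docs for the connector's user.
DEBEZIUM_MYSQL_PRIVILEGES = (
    "SELECT",
    "RELOAD",
    "SHOW DATABASES",
    "REPLICATION SLAVE",
    "REPLICATION CLIENT",
)


def debezium_grant_sql(user: str, host: str = "%") -> list[str]:
    """Render GRANT statements for a Debezium user (no escaping: trusted names only).

    The default host '%' is an assumption about how the account was created.
    """
    privileges = ", ".join(DEBEZIUM_MYSQL_PRIVILEGES)
    return [
        f"GRANT {privileges} ON *.* TO '{user}'@'{host}';",
        "FLUSH PRIVILEGES;",
    ]


if __name__ == "__main__":
    # Paste the output into a MySQL shell opened as root.
    for statement in debezium_grant_sql("mysqluser"):
        print(statement)
```

On MySQL 8.0 the binary log is enabled by default in ROW format, which is the format Debezium reads.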

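  • Install the Debezium connector & JDBC connector (copy the archives into the container, then extract them there; Kafka Connect loads plugins only from extracted directories or JARs on its plugin path)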
docker cp debezium-connector-mysql-1.9.6.Final-plugin.tar.gz kafka:/opt/kafka_2.13-2.8.1/connectors/

docker cp confluentinc-kafka-connect-jdbc-10.7.0.zip kafka:/opt/kafka_2.13-2.8.1/connectors/

/assets/img/2023-10-39-Kafka/Untitled

  • Edit connect-distributed.properties

  • Set the plugin path

/assets/img/2023-10-39-Kafka/Untitled

/assets/img/2023-10-39-Kafka/Untitled
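The plugin path value itself is visible only in the screenshots. Kafka Connect reads `plugin.path` as a comma-separated list of directories. The simplified Python sketch below shows which directories a properties file actually lists. It handles only plain `key=value` lines, not the `:` separators or line continuations that the full Java properties format allows. The `/opt/kafka/connectors` value in the example is an assumption.

```python
from __future__ import annotations


def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines, skipping blanks and #/! comments."""
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:  # silently skip lines this sketch cannot parse
            props[key.strip()] = value.strip()
    return props


def plugin_dirs(text: str) -> list[str]:
    """Return the directories listed in plugin.path (comma-separated)."""
    value = parse_properties(text).get("plugin.path", "")
    return [part.strip() for part in value.split(",") if part.strip()]


# Assumed excerpt; the real values are in the screenshots above.
EXAMPLE = """
# connect-distributed.properties (excerpt)
bootstrap.servers=localhost:9092
plugin.path=/opt/kafka/connectors
"""

if __name__ == "__main__":
    print(plugin_dirs(EXAMPLE))
```

Each listed directory should hold one subdirectory per extracted plugin, such as the extracted Debezium and JDBC connector folders.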

connect-distributed.sh /opt/kafka/config/connect-distributed.properties
  • Run Kafka Connect (connect-distributed.sh)

/assets/img/2023-10-39-Kafka/Untitled

  • Check the ports

/assets/img/2023-10-39-Kafka/Untitled

  • Check the connector plugins

/assets/img/2023-10-39-Kafka/Untitled
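The plugin check above calls the Kafka Connect REST API's `GET /connector-plugins`. That endpoint returns a JSON list of objects with `class`, `type` and `version` fields. Below is a standard-library Python sketch that reports which of the connector classes used in this post are missing. The helper names are hypothetical.

```python
from __future__ import annotations

import json
import urllib.request

# Connector classes this walkthrough relies on.
REQUIRED_CLASSES = frozenset({
    "io.debezium.connector.mysql.MySqlConnector",
    "io.confluent.connect.jdbc.JdbcSinkConnector",
})


def fetch_plugins(base_url: str = "http://localhost:8083") -> list[dict]:
    """GET /connector-plugins from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connector-plugins") as resp:
        return json.load(resp)


def missing_plugins(plugins: list[dict], required=REQUIRED_CLASSES) -> set[str]:
    """Return the required connector classes that are not installed."""
    installed = {plugin.get("class") for plugin in plugins}
    return set(required) - installed
```

With Connect running, `print(missing_plugins(fetch_plugins()))` should print `set()` once both archives are extracted into the plugin path.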

  • Configure and create source-test-connector

  • The connector uses Debezium's MySqlConnector

  • Specify the database connection info

  • Specify the topic

/assets/img/2023-10-39-Kafka/Untitled
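Kafka Connect creates a connector from a `POST /connectors` request whose body is `{"name": ..., "config": {...}}`, which is what the screenshot's request does. The source-test-connector settings appear only in the screenshot. The illustrative values below are borrowed from the source-custom-connector config shown later in this post and trimmed to the connection-related keys. The helper names are hypothetical.

```python
from __future__ import annotations

import json
import urllib.request

# Illustrative values, borrowed from the source-custom-connector config in this post.
SOURCE_CONFIG = {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "mysqluser",
    "database.password": "mysqlpw",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "sourcedb",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.sourcedb",
}


def connector_request(name: str, config: dict,
                      base_url: str = "http://localhost:8083") -> urllib.request.Request:
    """Build (but do not send) the POST /connectors request."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(name: str, config: dict, base_url: str = "http://localhost:8083") -> dict:
    """Send the request; urlopen raises HTTPError for error statuses (e.g. the name already exists)."""
    with urllib.request.urlopen(connector_request(name, config, base_url)) as resp:
        return json.load(resp)
```

When rerunning the exercise, `PUT /connectors/{name}/config` takes the config object alone as its body and creates or updates the connector. That can be more convenient than deleting and re-posting.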

  • Check the source connector config

/assets/img/2023-10-39-Kafka/Untitled
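Besides the config, `GET /connectors/{name}/status` shows whether the connector and each of its tasks are running. A connector can report RUNNING while one of its tasks has FAILED, and a failed task carries its stack trace in a `trace` field. A Python sketch that checks both follows. The helper names and the sample status dictionaries used with it are hypothetical.

```python
from __future__ import annotations

import json
import urllib.request


def fetch_status(name: str, base_url: str = "http://localhost:8083") -> dict:
    """GET /connectors/{name}/status from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def is_running(status: dict) -> bool:
    """True only when the connector and every one of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    return connector_ok and bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)


def failed_traces(status: dict) -> list[str]:
    """Collect the stack traces of FAILED tasks."""
    return [t.get("trace", "") for t in status.get("tasks", []) if t.get("state") == "FAILED"]
```

For example, `is_running(fetch_status("source-test-connector"))`. When it returns False, `failed_traces(...)` usually contains the real cause.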

  • Check the topic list

/assets/img/2023-10-39-Kafka/Untitled

  • Insert data into the source MySQL DB

/assets/img/2023-10-39-Kafka/Untitled

  • Check that the Kafka consumer receives the change

/assets/img/2023-10-39-Kafka/Untitled
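Without an unwrap transform, a Debezium change event's value holds `before`, `after`, `source` and `op`. The `op` codes are `c` (create), `u` (update), `d` (delete) and `r` (snapshot read). With `schemas.enable=true`, the JSON converter wraps the value as `{"schema": ..., "payload": ...}`. The screenshot doesn't show whether source-test-connector used an unwrap transform, so treat this Python sketch as a helper for reading raw envelopes printed by a console consumer. The function name and the sample messages are made up.

```python
from __future__ import annotations

import json

OPS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def summarize(raw: str) -> str:
    """Describe one Debezium envelope printed by a console consumer."""
    event = json.loads(raw)
    if event is None:
        return "tombstone"  # null value emitted after a delete
    payload = event.get("payload", event)  # drop the schema/payload wrapper if present
    if payload is None:
        return "tombstone"
    op = OPS.get(payload["op"], payload["op"])
    # Deletes have no "after" state, so fall back to "before".
    row = payload["after"] if payload["after"] is not None else payload["before"]
    return f"{op} on {payload['source']['table']}: {row}"
```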

  • I added one more MySQL instance for the sink to the docker compose file used for the source connector.
version: '3'
services:
  mysql:
    image: mysql:8.0
    container_name: mysql
    ports:
    - 3306:3306
    environment:
      MYSQL_ROOT_PASSWORD: admin
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw
    command:
      - --character-set-server=utf8mb4
      - --collation-server=utf8mb4_unicode_ci
    volumes:
      - C:/mysql/data:/var/lib/mysql
  zookeeper:
    container_name: zookeeper
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    
  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  
  mysql-sink:
    image: mysql:8.0
    container_name: mysql-sink
    ports:
      - 3307:3306
    environment:
      MYSQL_ROOT_PASSWORD: admin
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw
    command:
      - --character-set-server=utf8mb4
      - --collation-server=utf8mb4_unicode_ci
    volumes:
      - C:/mysql-sink/data:/var/lib/mysql

/assets/img/2023-10-39-Kafka/Untitled

  • Create the database & table in the sink MySQL DB and grant privileges

/assets/img/2023-10-39-Kafka/Untitled

  • Install the JDBC connector

/assets/img/2023-10-39-Kafka/Untitled

  • Check the plugins

/assets/img/2023-10-39-Kafka/Untitled

  • Create sink-test-connector via the REST API

/assets/img/2023-10-39-Kafka/Untitled

/assets/img/2023-10-39-Kafka/Untitled
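With `insert.mode=upsert` and `pk.mode=record_key`, the JDBC sink writes each record as an insert-or-update keyed on the record key. With `delete.enabled=true`, a tombstone (null value) deletes the row with that key. The sink-test-connector settings appear only in the screenshots; this sketch assumes they use these same modes, as the sink-custom-connector later in the post does. It runs SQLite's `INSERT ... ON CONFLICT` purely as a stand-in for the MySQL/PostgreSQL statements the connector generates. The columns follow the users table from the consumer output later on; the helper names and data are made up.

```python
from __future__ import annotations

import sqlite3
from typing import Optional

UPSERT_SQL = (
    "INSERT INTO users (id, name, email, age) VALUES (:id, :name, :email, :age) "
    "ON CONFLICT(id) DO UPDATE SET "
    "name = excluded.name, email = excluded.email, age = excluded.age"
)


def make_sink() -> sqlite3.Connection:
    """In-memory stand-in for the sink's users table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL, "
        "email TEXT NOT NULL, age INTEGER)"
    )
    return conn


def apply_record(conn: sqlite3.Connection, key: dict, value: Optional[dict]) -> None:
    """Apply one Kafka record the way insert.mode=upsert + delete.enabled=true would."""
    if value is None:
        # Tombstone: delete the row identified by the record key.
        conn.execute("DELETE FROM users WHERE id = :id", key)
    else:
        # pk.mode=record_key: primary-key columns come from the key, the rest from the value.
        conn.execute(UPSERT_SQL, {**value, **key})
```

Replaying the same record twice leaves the table unchanged. That idempotence is why upsert mode is a common choice for CDC sinks.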

  • Add data to the source MySQL DB and check the Kafka consumer

/assets/img/2023-10-39-Kafka/Untitled

  • An error occurred:
No suitable driver found for jdbc:mysql://mysql-sink:3306/sinkdb?user=mysqluser&password=mysqlpw

/assets/img/2023-10-39-Kafka/Untitled

Download MySQL Connector/J, the JDBC driver for MySQL.

  • Put that jar file into the directory where Confluent's JDBC connector plugin is installed.

/assets/img/2023-10-39-Kafka/Untitled
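The error means no JDBC driver visible to the connector accepted the `jdbc:mysql:` URL. The Confluent JDBC connector archive bundles drivers for some databases, including PostgreSQL, which is why the later PostgreSQL sink needs no extra step. It does not bundle MySQL Connector/J, so that jar has to sit next to the connector's own jars, and the Connect worker must be restarted to pick it up. A small Python sanity check for the jar's presence follows. The function name is hypothetical; the glob patterns cover the `mysql-connector-j-*` and older `mysql-connector-java-*` file names.

```python
from __future__ import annotations

from pathlib import Path

# File-name patterns of MySQL Connector/J releases (newer releases use mysql-connector-j-*).
MYSQL_DRIVER_GLOBS = ("mysql-connector-j-*.jar", "mysql-connector-java-*.jar")


def find_mysql_driver(plugin_dir: str) -> list[Path]:
    """Recursively look for Connector/J jars under the JDBC connector's plugin directory."""
    root = Path(plugin_dir)
    found: set[Path] = set()
    for pattern in MYSQL_DRIVER_GLOBS:
        found.update(root.rglob(pattern))
    return sorted(found)
```

For example, inside the kafka container, `find_mysql_driver("/opt/kafka_2.13-2.8.1/connectors")` should list the copied jar. An empty list means the connector still cannot see the driver.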

  • Re-check the plugins

/assets/img/2023-10-39-Kafka/Untitled

  • Insert data into the source MySQL DB

/assets/img/2023-10-39-Kafka/Untitled

  • Connect to sinkdb

/assets/img/2023-10-39-Kafka/Untitled

  • Verify that the change is reflected in sinkdb

  • Delete from the source DB

/assets/img/2023-10-39-Kafka/Untitled

  • The deletion is reflected in the sink DB

/assets/img/2023-10-39-Kafka/Untitled

/assets/img/2023-10-39-Kafka/Untitled

/assets/img/2023-10-39-Kafka/Untitled

/assets/img/2023-10-39-Kafka/Untitled

  • Create your own custom table in the source and capture its data changes into the target table

  • Add source-custom-connector

curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
"name": "source-custom-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "mysqluser",
"database.password": "mysqlpw",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.allowPublicKeyRetrieval": "true",
"database.include.list": "sourcedb",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.sourcedb",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode":"rewrite",
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"$1"
}
}'
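In this config, `ExtractNewRecordState` (the `unwrap` transform) replaces each Debezium envelope with the plain row state. `delete.handling.mode=rewrite` adds a `__deleted` field, which shows up as `"__deleted":"false"` in the consumer output below. `drop.tombstones=false` keeps the null-valued tombstones that a sink can turn into deletes. Below is a rough Python imitation of that behaviour on plain dictionaries. The real SMT works on Connect records and schemas and has more options; the function name is hypothetical.

```python
from __future__ import annotations

from typing import Optional


def unwrap(envelope: Optional[dict], drop_tombstones: bool = False) -> list[Optional[dict]]:
    """Rough imitation of ExtractNewRecordState with delete.handling.mode=rewrite.

    Returns the record values passed downstream (an empty list means the record is dropped).
    """
    if envelope is None:  # tombstone that follows a delete
        return [] if drop_tombstones else [None]
    if envelope["op"] == "d":
        row = dict(envelope["before"])  # deletes keep the last known row state...
        row["__deleted"] = "true"       # ...flagged as deleted
    else:
        row = dict(envelope["after"])   # c / u / r: keep only the new row state
        row["__deleted"] = "false"
    return [row]
```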
  • Check the source-custom-connector config

/assets/img/2023-10-39-Kafka/Untitled

  • Check the Kafka consumer

/assets/img/2023-10-39-Kafka/Untitled

  • Create a new DB and table

/assets/img/2023-10-39-Kafka/Untitled

  • Insert data into the source DB

/assets/img/2023-10-39-Kafka/Untitled

  • Check the Kafka consumer
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"email"},{"type":"int32","optional":true,"field":"age"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"dbserver1.sourcedb.users.Value"},"payload":{"id":1,"name":"Ham","email":"tomy8964@naver.com","age":25,"__deleted":"false"}}
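With `value.converter.schemas.enable=true`, JsonConverter writes every value as a `schema` + `payload` pair, which is why the message above carries the field types. The JDBC sink relies on that schema to map fields to columns. Below is a short Python sketch that separates the two parts of the message above. The function name is hypothetical.

```python
from __future__ import annotations

import json

# The consumer output above, copied verbatim.
SAMPLE = '{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"email"},{"type":"int32","optional":true,"field":"age"},{"type":"string","optional":true,"field":"__deleted"}],"optional":false,"name":"dbserver1.sourcedb.users.Value"},"payload":{"id":1,"name":"Ham","email":"tomy8964@naver.com","age":25,"__deleted":"false"}}'


def split_message(raw: str) -> tuple[str, list[str], dict]:
    """Return (schema name, field names, payload) of a JsonConverter message with schemas enabled."""
    message = json.loads(raw)
    schema = message["schema"]
    fields = [field["field"] for field in schema["fields"]]
    return schema["name"], fields, message["payload"]
```

The schema name `dbserver1.sourcedb.users.Value` follows the Debezium topic pattern `<database.server.name>.<database>.<table>`.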
  • Add sink-custom-connector
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
"name": "sink-custom-connector1",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://mysql-sink:3306/custom_sinkdb?user=mysqluser&password=mysqlpw",
"auto.create": "false",
"auto.evolve": "true",
"delete.enabled": "true",
"insert.mode": "upsert",
"pk.mode": "record_key",
"table.name.format":"${topic}",
"tombstones.on.delete": "true",
"connection.user": "mysqluser",
"connection.password": "mysqlpw",
"topics.regex": "dbserver1.sourcedb.(.*)",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap, route, TimestampConverter",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode":"rewrite" ,
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "update_date"
}
}'
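The sink applies two more transforms after `unwrap`. RegexRouter rewrites the topic only when the regex matches the whole topic name. Here `$3` keeps the third group, so `dbserver1.sourcedb.users` becomes `users`, which `table.name.format=${topic}` then uses as the table name. TimestampConverter parses `update_date` with the `yyyy-MM-dd HH:mm:ss` pattern when that field arrives as a string. Below is a Python approximation of both, with hypothetical helper names. Java's `$n` replacement syntax is translated to Python's `\g<n>`. The UTC interpretation mirrors Kafka's TimestampConverter, which parses and formats in UTC.

```python
import re
from datetime import datetime, timezone


def regex_route(topic: str, regex: str, replacement: str) -> str:
    """Approximate RegexRouter: rewrite only on a full match, otherwise keep the topic."""
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic
    # Translate Java-style $1, $2 ... into Python's \g<1>, \g<2> ...
    py_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return match.expand(py_replacement)


def to_timestamp(value: str, fmt: str = "%Y-%m-%d %H:%M:%S") -> datetime:
    """Parse a 'yyyy-MM-dd HH:mm:ss' string as a UTC timestamp."""
    return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
```

The same `regex_route` also shows that the source connector's addTopicPrefix router, `(.*)` → `$1`, leaves topic names unchanged.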
  • Check the sink-custom-connector config

/assets/img/2023-10-39-Kafka/Untitled

  • Confirm the data inserted into the source DB

/assets/img/2023-10-39-Kafka/Untitled

  • Check the Kafka consumer on the dbserver1.sourcedb.users topic

/assets/img/2023-10-39-Kafka/Untitled

  • Create a new sink custom DB and table

/assets/img/2023-10-39-Kafka/Untitled

  • Confirm that the source DB data is reflected in the new sink custom DB

/assets/img/2023-10-39-Kafka/Untitled

  • To add one more target DB, run a postgres image in Docker
version: '3'
services:
  mysql:
    image: mysql:8.0
    container_name: mysql
    ports:
    - 3306:3306
    environment:
      MYSQL_ROOT_PASSWORD: admin
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw
    command:
      - --character-set-server=utf8mb4
      - --collation-server=utf8mb4_unicode_ci
    volumes:
      - C:/mysql/data:/var/lib/mysql
  zookeeper:
    container_name: zookeeper
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    
  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  
  mysql-sink:
    image: mysql:8.0
    container_name: mysql-sink
    ports:
      - 3307:3306
    environment:
      MYSQL_ROOT_PASSWORD: admin
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw
    command:
      - --character-set-server=utf8mb4
      - --collation-server=utf8mb4_unicode_ci
    volumes:
      - C:/mysql-sink/data:/var/lib/mysql

  postgresql:
    image: quay.io/debezium/postgres:9.6
    container_name: postgres-sink
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgresuser
      - POSTGRES_PASSWORD=postgrespw
    volumes:
      - C:/postgres/data:/var/lib/postgres
  • Connect to postgres

/assets/img/2023-10-39-Kafka/Untitled

  • Create custom_sinkdb and the users table

/assets/img/2023-10-39-Kafka/Untitled

  • Grant privileges to postgresuser

/assets/img/2023-10-39-Kafka/Untitled

/assets/img/2023-10-39-Kafka/Untitled

  • Create sink-custom-connector2 (kafka-postgres)
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
"name": "sink-custom-connector2",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://postgres-sink:5432/custom_sinkdb?user=postgresuser&password=postgrespw",
"auto.create": "false",
"auto.evolve": "true",
"delete.enabled": "true",
"insert.mode": "upsert",
"pk.mode": "record_key",
"table.name.format":"${topic}",
"tombstones.on.delete": "true",
"connection.user": "postgresuser",
"connection.password": "postgrespw",
"topics.regex": "dbserver1.sourcedb.(.*)",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap, route, TimestampConverter",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode":"rewrite" ,
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "update_date"
}
}'
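sink-custom-connector1 and sink-custom-connector2 share every setting except the connector name, `connection.url`, `connection.user` and `connection.password`. The Python sketch below builds both configs from one template and confirms that only those keys differ, which keeps the two sinks from drifting apart when one of them is edited. The helper names are hypothetical; the values are copied from the two curl commands.

```python
from __future__ import annotations

import json

# Settings common to both sink connectors in this post.
COMMON_SINK_CONFIG = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "auto.create": "false",
    "auto.evolve": "true",
    "delete.enabled": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "table.name.format": "${topic}",
    "tombstones.on.delete": "true",
    "topics.regex": "dbserver1.sourcedb.(.*)",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "transforms": "unwrap, route, TimestampConverter",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": r"([^.]+)\.([^.]+)\.([^.]+)",
    "transforms.route.replacement": "$3",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
    "transforms.TimestampConverter.target.type": "Timestamp",
    "transforms.TimestampConverter.field": "update_date",
}


def jdbc_sink_config(url: str, user: str, password: str) -> dict:
    """Copy the common settings and add the target-specific connection keys."""
    return {**COMMON_SINK_CONFIG, "connection.url": url,
            "connection.user": user, "connection.password": password}


def config_diff(a: dict, b: dict) -> dict:
    """Map each key whose value differs between a and b to its (a, b) pair."""
    return {k: (a.get(k), b.get(k)) for k in sorted(a.keys() | b.keys()) if a.get(k) != b.get(k)}


def request_body(name: str, config: dict) -> str:
    """JSON body for POST /connectors (json.dumps re-escapes the regex backslashes)."""
    return json.dumps({"name": name, "config": config}, indent=2)


MYSQL_SINK = jdbc_sink_config(
    "jdbc:mysql://mysql-sink:3306/custom_sinkdb?user=mysqluser&password=mysqlpw",
    "mysqluser", "mysqlpw")
POSTGRES_SINK = jdbc_sink_config(
    "jdbc:postgresql://postgres-sink:5432/custom_sinkdb?user=postgresuser&password=postgrespw",
    "postgresuser", "postgrespw")
```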
  • sink-custom-connector2 created & config check

/assets/img/2023-10-39-Kafka/Untitled

  • Verify that the change is reflected in the postgres sinkdb

/assets/img/2023-10-39-Kafka/Untitled

  • Delete the row with id=1 from the MySQL source DB

/assets/img/2023-10-39-Kafka/Untitled

/assets/img/2023-10-39-Kafka/Untitled

  • Verify the deletion in mysql-sink

/assets/img/2023-10-39-Kafka/Untitled

  • Verify the deletion in postgres-sink

/assets/img/2023-10-39-Kafka/Untitled


References: Gachon SW Academy

This post is licensed under CC BY 4.0 by the author.