Post

SourceDB - Kafka - SinkDB 2 (MySql, Postgres)

SourceDB - Kafka - SinkDB 2 (MySql, Postgres)

구성도

Untitled

  • 로컬 환경

docker compose 파일 생성

  • MySQL(sourceDB), MySQL(sinkDB1), Kafka, Zookeeper, PostgreSQL(sinkDB2) 이미지로 구성
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
version: '3'
services:
  mysql:
    image: mysql:8.0
    container_name: mysql
    ports:
    - 3308:3306
    environment:
      MYSQL_ROOT_PASSWORD: admin
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw
    command:
      - --character-set-server=utf8mb4
      - --collation-server=utf8mb4_unicode_ci
    volumes:
      - C:/mysql/data:/var/lib/mysql
  zookeeper:
    container_name: zookeeper
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    
  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  
  mysql-sink:
    image: mysql:8.0
    container_name: mysql-sink
    ports:
      - 3307:3306
    environment:
      MYSQL_ROOT_PASSWORD: admin
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw
    command:
      - --character-set-server=utf8mb4
      - --collation-server=utf8mb4_unicode_ci
    volumes:
      - C:/mysql-sink/data:/var/lib/mysql

  postgresql:
    image: quay.io/debezium/postgres:9.6
    container_name: postgres-sink
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgresuser
      - POSTGRES_PASSWORD=postgrespw
    volumes:
      - C:/postgres/data:/var/lib/postgres
1
docker-compose -f docker-compose2.yml up -d

Untitled

DB Custom 테이블(users) 생성 및 유저 권한 부여

MySQL(source db)

Untitled

Untitled

MySQL(sink db1)

Untitled

Untitled

PostgreSQL(sink db2)

postgres 접속

Untitled

custom_sinkdb 생성 및 users 테이블 생성

Untitled

postgresuser 권한 부여

Untitled

Untitled

Debezium Connector 설치 & JDBC Connector 설치

connector 파일들 /opt/kafka_2.13-2.8.1/connectors/로 복사

1
2
3
docker cp debezium-connector-mysql-1.9.6.Final-plugin.tar.gz kafka:/opt/kafka_2.13-2.8.1/connectors/debezium-connector-mysql1.9.6.Final-plugin.tar.gz

docker cp confluentinc-kafka-connect-jdbc-10.7.0.zip kafka:/opt/kafka_2.13-2.8.1/connectors/

파일 압축 해제

Untitled

connect-distributed.properties 수정

plugin 경로 설정

Untitled

Untitled

Kafka 실행

1
connect-distributed.sh /opt/kafka/config/connect-distributed.properties

Untitled

포트 확인

Untitled

커넥터 플러그인 확인

Untitled

커넥터 설정 및 생성

source-custom-connector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
"name": "source-custom-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "mysqluser",
"database.password": "mysqlpw",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.allowPublicKeyRetrieval": "true",
"database.include.list": "sourcedb",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.sourcedb",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode":"rewrite" ,
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"$1",
"include.schema.changes": "true"
}
}'
  • Debezium MySQL 커넥터 사용
  • MySQL Souce DB 설정
  • 토픽 설정
  • value 변환에 JsonConverter를 사용
1
2
3
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode":"rewrite",
"include.schema.changes": "true"
  • transforms.unwrap.drop.tombstones: 레코드가 삭제되었을 때 해당 레코드를 무시할지 여부
  • transforms.unwrap.delete.handling.mode: 레코드가 삭제되었을 때 처리 방법
  • sourcedb 데이터베이스에서 변경 사항을 추적하여 dbserver1이라는 이름으로 Kafka 토픽에 전송합니다.
  • transforms 설정을 사용하여 Kafka 토픽 이름을 sourcedb.<table-name>형식으로 변경합니다. 이렇게 하면 추적 중인 테이블 이름으로 구분된 Kafka 토픽을 만들 수 있습니다.
  • "include.schema.changes": "true" 를 설정하면, 해당 커넥터는 스키마 변화를 감지하고 대응하는 ALTER TABLE 문을 자동으로 생성하여 데이터베이스에 반영하게 됩니다.
  • source-custom-connector 등록 확인

Untitled

1
NO Suitable driber found for jdbc:mysql://mysql-sink:3306/sinkdb?user=mysqluser&password=mysqlpw

Untitled

  1. mysql-connector-java 파일을 다운받는다.
  2. 해당 jar 파일을 Confluent의 connect 플러그인이 설치된 디렉토리에 넣는다.

    Untitled

sink-custom-connector1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
"name": "sink-custom-connector1",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://mysql-sink:3307/custom_sinkdb?user=mysqluser&password=mysqlpw",
"auto.create": "false",
"auto.evolve": "true",
"delete.enabled": "true",
"insert.mode": "upsert",
"pk.mode": "record_key",
"table.name.format":"${topic}",
"tombstones.on.delete": "true",
"connection.user": "mysqluser",
"connection.password": "mysqlpw",
"topics.regex": "dbserver1.sourcedb.(.*)",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap, route, TimestampConverter",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode":"rewrite" ,
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "update_date",
"include.schema.changes": "true"
}
}'
  • Kafka Connect에서 MySQL 브로커로 데이터를 전송하는 JDBC Sink Connector를 구성
  • MySQL Sink DB 설정
  • delete.enabled : 삭제된 레코드를 Sink에 쓸지 여부를 결정합니다. 이를 true로 설정하여 삭제된 레코드도 Sink에 쓰도록 합니다.
  • tombstones.on.delete : 삭제 시 토픽에 빈 메시지를 작성하는 옵션입니다. 이를 true로 설정하여 레코드 삭제 시 빈 메시지가 작성되도록 합니다.
  • sink-custom-connector1 등록 및 설정 확인

Untitled

sink-custom-connector2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
"name": "sink-custom-connector2",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://postgres-sink:5432/custom_sinkdb?user=postgresuser&password=postgrespw",
"auto.create": "false",
"auto.evolve": "true",
"delete.enabled": "true",
"insert.mode": "upsert",
"pk.mode": "record_key",
"table.name.format":"${topic}",
"tombstones.on.delete": "true",
"connection.user": "postgresuser",
"connection.password": "postgrespw",
"topics.regex": "dbserver1.sourcedb.(.*)",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap, route, TimestampConverter",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode":"rewrite" ,
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.field": "update_date",
"include.schema.changes": "true"
}
}'
  • Kafka Connect에서 Postgres 브로커로 데이터를 전송하는 JDBC Sink Connector 구성
  • PostgreSQL Sink DB 설정
  • sink-custom-connector1과 같음
  • sink-custom-connector2 생성 & 설정 확인

Untitled

Kafka 동작 영상

Kafka 동작 영상.mp4

데이터 삽입

Source_DB(MySQL)에 데이터 삽입

Untitled

Kafka Consumer 동작 확인

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      },
      {
        "type": "string",
        "optional": false,
        "field": "email"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "age"
      },
      {
        "type": "string",
        "optional": true,
        "field": "__deleted"
      }
    ],
    "optional": false,
    "name": "dbserver1.sourcedb.users.Value"
  },
  "payload": {
    "id": 1,
    "name": "Ham",
    "email": "tomy8964@naver.com",
    "age": 25,
    "__deleted": "false"
  }
}

Sink_DB_1(MySQL)에 데이터 삽입 반영 확인

Untitled

Sink_DB_2(PostgreSQL)에 데이터 삽입 반영 확인

Untitled

데이터 수정

Source_DB(MySQL)에서 데이터 수정

Untitled

Kafka Consumer 동작 확인

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      },
      {
        "type": "string",
        "optional": false,
        "field": "email"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "age"
      },
      {
        "type": "string",
        "optional": true,
        "field": "__deleted"
      }
    ],
    "optional": false,
    "name": "dbserver1.sourcedb.users.Value"
  },
  "payload": {
    "id": 1,
    "name": "John",
    "email": "john@gmail.com",
    "age": 25,
    "__deleted": "false"
  }
}

Sink_DB_1(MySQL)에 데이터 수정 반영 확인

Untitled

Sink_DB_2(PostgreSQL)에 데이터 수정 반영 확인

Untitled

데이터 삭제

Source_DB(MySQL)에서 데이터 삭제

Untitled

Untitled

Kafka Consumer 동작 확인

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      },
      {
        "type": "string",
        "optional": false,
        "field": "email"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "age"
      },
      {
        "type": "string",
        "optional": true,
        "field": "__deleted"
      }
    ],
    "optional": false,
    "name": "dbserver1.sourcedb.users.Value"
  },
  "payload": {
    "id": 1,
    "name": "Ham",
    "email": "tomy8964@naver.com",
    "age": 25,
    "__deleted": "true"
  }
}

Sink_DB_1(MySQL)에 데이터 삭제 반영 확인

Untitled

Sink_DB_2(PostgreSQL)에 데이터 삭제 반영 확인

Untitled

테이블 수정

Source_DB(MySQL)에서 테이블 수정

Untitled

Kafka Consumer 동작 확인

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": false,
        "field": "name"
      },
      {
        "type": "string",
        "optional": false,
        "field": "email"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "age"
      },
      {
        "type": "string",
        "optional": true,
        "field": "home"
      },
      {
        "type": "string",
        "optional": true,
        "field": "__deleted"
      }
    ],
    "optional": false,
    "name": "dbserver1.sourcedb.users.Value"
  },
  "payload": {
    "id": 1,
    "name": "Ham",
    "email": "tomy8964@naver.com",
    "age": 25,
    "home": "Seoul",
    "__deleted": "false"
  }
}

Sink_DB_1(MySQL)에 테이블 수정 반영 확인

Untitled

Sink_DB_2(PostgreSQL)에 테이블 수정 반영 확인

Untitled

This post is licensed under CC BY 4.0 by the author.