Table of contents
- Given problem
- Solution with Debezium
- Using Docker Compose to define all softwares
- Source code
- Some notes about configuration of this project
- Some problems happen in this project
- Wrapping up
In Microservice architectural pattern or CQRS pattern that is combined with Event Sourcing, all things that we will work is events. It makes our our system reliable, fault-tolerance, asynchronous, …
Then, how do we implement it in the real projects?
Solution with Debezium
In this article, we will use Debezium to deal the above problem. In reality, Debezium is built on top of Kafka Connect, Kafka, and some other driver of databases.
Before jumping into source code, we need to understand about the architecture of Debezium.
Debezium will contain some important parts:
This part will be database that we usually in project such as MySQL, PostgreSQL, Oracle, MariaDB, …
This part will be used to contain all data that we get from Source part. Our data will be processed based on our intention.
If we want to statistic errors, rates about requests, …, we can use Elastic Search.
If we want to cache all things, we can use Infinispan.
This part contains:
Debezium library will read binlog file of MySQL, or transaction log of the other databases to convert CRUD operations in database into events of Debezium.
Kafka Connect will receive data from Debezium. Then it pushes data into the specific topic that we configure.
Using Docker Compose to define all softwares
In this project, we will use some tools:
- MySQL 8.0
- Kafka 2.5 with Zookeeper
Below is the content of docker-compose.yaml file.
version: '3' services: # https://hub.docker.com/_/mysql mysqldb: image: mysql:8.0 container_name: mysql ports: - 3307:3306 command: --max-connections=1000 environment: - MYSQL_DATABASE=employee_sample - MYSQL_ROOT_PASSWORD=12345 volumes: - mysqldb_data:/var/lib/mysql restart: always redis: image: redis:5.0.7 container_name: redis ports: - 6379:6379 volumes: - redis_volume:/data restart: always zookeeper-server: image: bitnami/zookeeper:latest container_name: zookeeper-server-dev ports: - 2181:2181 environment: - ALLOW_ANONYMOUS_LOGIN=yes restart: always volumes: - zookeeper_data:/bitnami networks: - backend kafka-server: image: bitnami/kafka:latest container_name: kafka-server-dev ports: - 9092:9092 environment: - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_LISTENERS=PLAINTEXT://:9092 - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 restart: always depends_on: - zookeeper-server volumes: - kafka_data:/bitnami networks: - backend # local means the volumes zookeeper_data, kafka_data are created on the same Docker host # where we run our container. # It is equivalent to: # docker volume create --driver local --name zookeeper_data # docker volume create --driver local --name kafka_data volumes: mysqldb_data: driver: local zookeeper_data: driver: local kafka_data: driver: local redis_volume: driver: local networks: backend: driver: bridge
When we run with command docker-compose up, we need to configure binlog file with row mode.
To view source code of this project, you can refer to this link on github.
Some notes about configuration of this project
To understand the meaning of each parameter in Debezium’s configuration, we need to look at the below table.
|connector.class||The connector’s name when registered with the Kafka Connect service.||io.debezium.connector.mysql.MySqlConnector|
|offset.storage.file.filename||Specify the name of file that we want||/tmp/file.dat|
|offset.flush.interval.ms||Interval at which to try committing offsets for tasks of Kafka Connect||60000|
|name||The name of this connector that we want||table_name-mysql-connector|
|database.server.id||By default, a random number is generated between 5400 and 6400, though we recommend setting an explicit value.||a random value|
|database.server.name||This is the logical name of the MySQL server or cluster||mysqlServer-employee|
|database.hostname||The MySQL server address||127.0.0.1|
|database.port||The MySQL server port number||3306|
|database.user||The MySQL user with the appropriate privileges.||root|
|database.password||The MySQL user’s password.||**|
|database.dbname||The MySQL database name that we want to listen data change event||employee|
|table.whitelist||A list of all tables that we want to monitor. Each identifier is of the form databaseName.tableName. By default the connector will monitor every non-system table in each monitored database||employee_sample.employee|
|database.whitelist||An optional comma-separated list of regular expressions that match database names to be monitored; any database name not included in the whitelist will be excluded from monitoring. By default all databases will be monitored. May not be used with database.blacklist.||employee_sample,order_sample|
|debezium.source.database.history||In this option, we can have two values: io.debezium.relational.history.MemoryDatabaseHistory or io.debezium.relational.history.FileDatabaseHistory||io.debezium.relational.history.MemoryDatabaseHistory|
|debezium.source.database.history.file.filename||If we select the value io.debezium.relational.history.FileDatabaseHistory, then we need to point the path of file||/path/db-history.dat|
|database.history.kafka.topic||Define a topic that Kafka Connect will push data into it. It is as same as a topic that we defined with Kafka||dbHistory.table_name|
|database.history.kafka.bootstrap.servers||A list of Kafka brokers that the connector uses to write and recover DDL statements to the database history topic||localhost:9092|
Some problems happen in this project
Timeout with Kafka
Error configuring an instance of KafkaDatabaseHistory; check the logs for details
In order to create KafkaDatabaseHistory right away, we need to configure some important options for Kafka.
The value of this option is equal to the topic that we are configured in kafka.
The value of this option is the IP address of Kafka.
debezium.source.database.history option for the mysql connector
The default value is io.debezium.relational.history.KafkaDatabaseHistory. If we use non-Kafka deployments, we need to use one the following values:
io.debezium.relational.history.FileDatabaseHistory (along with debezium.source.database.history.file.filename property);
io.debezium.relational.history.MemoryDatabaseHistory for test environments.
Access the volume in local host
This volume directory is protected by Linux, so we can not access it by using cd command.
To solve this problem, we can use command
sudo ls <volume_name>.
Client does not support authentication protocol requested by server; consider upgrading MySQL client
When we use MySQL 8.0, we will encounter this problem. Because mysql-binlog-connector-java library depends on the mysql-connector-java library in the old version that uses the different encryption algorithm.
To solve this problem, we will configure MySQL 8.0 server that accepts the password of the old encryption algorithm.
- MySQL 8.0 uses Secure Password Authentication algorithm for password.
- The older version of MySQL uses sha2_password authentication algorithm.
Before jump to fix it, we need to know the name of our current user. Then, we can use the below command:
ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY 'our_password';
Understanding about some configurations when we are working with debezium in our project.
Take care about configuration because it can cause some problems.
Thanks for your reading.
Timeout when deploying Kafka
Fix bug about authenticating in MySQL 8.0