将PostgreSQL变更通过Debezium+Redis Stream同步到Redis中
source link: https://www.jdon.com/62569
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
将PostgreSQL变更通过Debezium+Redis Stream同步到Redis中
Debezium 是一个很好的工具,用于捕获数据库上发生的行级更改并将这些更改流式传输到我们选择的代理。
我们的目标是监听 PostgreSQL 更改并通过 Debezium 服务器将它们流式传输到 Redis 流。
通常将 Debizum 与 Kafka 一起使用,在我们的例子中,我们将通过使用 Redis Streams 来保持轻量级。
无需任何额外配置即可设置 Redis。
为了将 PostgreSQL 与 Debezium 一起使用,必须更改 postgreSQL 上的配置。
我们将在 postgreSQL 上使用的配置如下
listen_addresses = '*' port = 5432 max_connections = 20 shared_buffers = 128MB temp_buffers = 8MB work_mem = 4MB wal_level = logical max_wal_senders = 3 |
正如我们所见,我们使用 PostgreSQL 的logical_decoding。
从文档中:
logical_decoding逻辑解码是将数据库表的所有持久更改提取为连贯的、易于理解的格式的过程,无需详细了解数据库的内部状态即可解释该格式。
在 PostgreSQL中,逻辑解码是通过将描述存储级别更改的 预写日志的内容解码为特定于应用程序的形式(例如元组流或 SQL 语句)来实现的。
我们还将为 PostgreSQL 创建一个命名空间和一个表。命名空间和表将是监听更改的目标对象。
#!/bin/bash set -e psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL create schema test_schema; create table test_schema.employee( id SERIAL PRIMARY KEY, firstname TEXT NOT NULL, lastname TEXT NOT NULL, email TEXT not null, age INT NOT NULL, salary real, unique(email) ); EOSQL |
Debezium 必须能够与 PostgreSQL 服务器以及 redis 服务器交互。配置应该如下:
debezium.sink.type=redis debezium.sink.redis.address=redis:6379 debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector debezium.source.offset.storage.file.filename=data/offsets.dat debezium.source.offset.flush.interval.ms=0 debezium.source.database.hostname=postgres debezium.source.database.port=5432 debezium.source.database.user=postgres debezium.source.database.password=postgres debezium.source.database.dbname=postgres debezium.source.database.server.name=tutorial debezium.source.schema.whitelist=test_schema debezium.source.plugin.name=pgoutput |
通过检查配置,我们可以看到我们拥有 Debezium 与 PostgreSQL 数据库通信的必要信息
由于此演示将涉及三个不同的软件组件,因此docker compose 将派上用场:
version: '3.1' services: redis: image: redis ports: - 6379:6379 depends_on: - postgres postgres: image: postgres restart: always environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres volumes: - ./postgresql.conf:/etc/postgresql/postgresql.conf - ./init:/docker-entrypoint-initdb.d command: - "-c" - "config_file=/etc/postgresql/postgresql.conf" ports: - 5432:5432 debezium: image: debezium/server volumes: - ./conf:/debezium/conf - ./data:/debezium/data depends_on: - redis |
通过使用Compose,我们能够在同一个网络上启动三个不同的软件组件。这有助于组件之间通过使用Compose上指定的服务的dns名称进行交互。同时,我们之前创建的配置文件也被挂载到Docker容器中。
为了让堆栈运行,我们将执行以下命令
$ docker compose up |
由于它已经启动并运行,我们现在可以开始监听事件了。
我们将登录 Redis 并开始监听任何可能的数据库更新:
$ docker exec -it debezium-example-redis-1 redis-cli > xread block 1000000 streams tutorial.test_schema.employee $ |
这会阻塞到我们从流中接收到一个事件。
如果我们检查流名称,我们应该看到 {server-name}.{schema}.{table} 的模式。这将允许消费者仅订阅感兴趣的更改。
$ docker exec -it debezium-example-postgres-1 psql postgres postgres > insert into test_schema.employee (firstname,lastname,email,age,salary) values ('John','Doe 1','[email protected]',18,1234.23); > \q |
如果我们检查 redis 会话,我们应该看到我们收到了来自 Redis 流的事件:
127.0.0.1:6379> xread block 1000000 streams tutorial.test_schema.employee $ 1) 1) "tutorial.test_schema.employee" 2) 1) 1) "1663796657336-0" 2) 1) "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"}],\"optional\":false,\"name\":\"tutorial.test_schema.employee.Key\"},\"payload\":{\"id\":1}}" 2) "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"firstname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"lastname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"},{\"type\":\"float\",\"optional\":true,\"field\":\"salary\"}],\"optional\":true,\"name\":\"tutorial.test_schema.employee.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"default\":0,\"field\":\"id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"firstname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"lastname\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"age\"},{\"type\":\"float\",\"optional\":true,\"field\":\"salary\"}],\"optional\":true,\"name\":\"tutorial.test_schema.employee.Value\",\"field\":\"after\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"},\"default\":\"false\",\"field\":\"snapshot\"},{\"type\":\"string\",\"optional\":false,\"field\":\"db\"},{\"type\":\"string\",\"optional\":true,\"field\":\"sequence\"},{\"type\":\"string\",\"optional\":false,\"field\":\"schema\"},{\"type\":\"string\",\"optional\":false,\"field\":\"table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"txId\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"lsn\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"xmin\"}],\"optional\":false,\"name\":\"io.debezium.connector.postgresql.Source\",\"field\":\"source\"},{\"type\":\"string\",\"optional\":false,\"field\":\"op\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ms\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"total_order\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"data_collection_order\"}],\"optional\":true,\"field\":\"transaction\"}],\"optional\":false,\"name\":\"tutorial.test_schema.employee.Envelope\"},\"payload\":{\"before\":null,\"after\":{\"id\":1,\"firstname\":\"John\",\"lastname\":\"Doe 1\",\"email\":\"[email protected]\",\"age\":18,\"salary\":1234.23},\"source\":{\"version\":\ŕ.9.5.Final\",\"connector\":\"postgresql\",\"name\":\"tutorial\",\"ts_ms\":1663796656393,\"snapshot\":\"false\",\"db\":\"postgres\",\"sequence\":\"[null,\\\�\\\"]\",\"schema\":\"test_schema\",\"table\":\"employee\",\"txId\":738,\"lsn\":24289128,\"xmin\":null},\"op\":\"c\",\"ts_ms\":1663796657106,\"transaction\":null}}" (10.17s) 127.0.0.1:6379> |
您可以在GitHub 上找到源代码。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK