Meetup PostgreSQL Lille
Gwendoline ROTROU, Julien RIOU
25 Janvier 2024
|
|
Extraction de la donnée depuis une source
Transformation de la donnée pour correspondre au format attendu
Chargement dans une destination
SELECT * FROM... WHERE id BETWEEN...Détection des changements dans une base de données
Déclenche des actions en fonction d’événements
OLD et NEWAFTER INSERT OR UPDATE OR DELETESystème de publication et de souscription
NOTIFYLISTENDécodage d’événements grâce aux Write-Ahead Logs (WAL)
CREATE PUBLICATIONCREATE SUBSCRIPTION*.include.list*.exclude.listINSERT, UPDATE, DELETE)pgoutputAPI REST de Kafka Connect
curl -XPOST -d @payload.json -s localhost:8083/connectors
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.ApplicationName": "debezium",
"database.dbname": "test",
"database.hostname": "${file:/path/to/secrets/test:hostname}",
"database.password": "${file:/path/to/secrets/test:password}",
"database.port": "${file:/path/to/secrets/test:port}",
"database.ssl.mode": "required",
"database.user": "${file:/path/to/secrets/test:user}",
"decimal.handling.mode": "double",
"field.name.adjustment.mode": "avro",
"heartbeat.action.query": "update ovh_debezium_heartbeat set ts=now();",
"heartbeat.interval.ms": "1000",
"incremental.snapshot.chunk.size": "100000",
"name": "test",
"plugin.name": "pgoutput",
"producer.override.compression.type": "lz4",
"publication.name": "debezium_test",
"sanitize.field.names": "true",
"schema.name.adjustment.mode": "avro",
"signal.data.collection": "public.ovh_debezium_signal",
"slot.name": "debezium_test",
"status.update.interval.ms": "1000",
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.delete.retention.ms": "604800000",
"topic.creation.default.partitions": "6",
"topic.creation.default.replication.factor": "3",
"topic.heartbeat.prefix": "namespace.debezium.heartbeat",
"topic.prefix": "namespace.test",
"transforms": "schema,public",
"transforms.public.regex": "namespace\\.test\\.public\\.(.+)",
"transforms.public.replacement": "namespace.test.$1",
"transforms.public.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.schema.regex": "namespace\\.test\\.((?!public).+)\\.(.+)",
"transforms.schema.replacement": "namespace.test.$1.$2",
"transforms.schema.type": "org.apache.kafka.connect.transforms.RegexRouter"
}
ansible-playbook -e '{"database_name":"test"}' database-connector-create.yml
ansible-playbook -e '{"database_name":"test"}' database-connector-delete.yml
{
"database.password": "${file:/path/to/secrets/test:password}"
}
Redémarrage manuel du connecteur
io.debezium.DebeziumException: Couldn't obtain encoding for database test
events
events_2023_10events_2023_11events_2023_12events_2024_01test=> select count(*) from events;
count
------------
1314817483
(1 row)
test=> select count(*) from only events;
count
-------
0
(1 row)
Transformation d’un nom de table en un nom de topic à l’aide de regex
{
"transforms": "events",
"transforms.events.type": "org.apache.kafka.connect.transforms.RegexRouter"
"transforms.events.regex": "namespace\\.database\\.public\\.events_[0-9]{4}_[0-9]{2}",
"transforms.events.replacement": "namespace.database.public.events",
}
{
"transforms": "public",
"transforms.public.type": "org.apache.kafka.connect.transforms.RegexRouter"
"transforms.public.regex": "namespace\\.test\\.public\\.(.+)",
"transforms.public.replacement": "namespace.test.$1",
}
Technique de stockage des attributs trop grands
UPDATE sans changement de l’attribut
{
"op": "u",
"before": null,
"after": {
"namespace.database.public.toasty.Value": {
"id": "2e142822-cc92-4c4f-9fdc-ec45504753a9",
"updated_at": {
"string": "2024-01-17T10:53:34.929630Z"
},
"clob": {
"string": "__debezium_unavailable_value"
}
...
ALTER TABLE... REPLICA IDENTITY DEFAULT
unavailable.value.placeholderALTER TABLE... REPLICA IDENTITY FULL
{
"heartbeat.interval.ms": "1000",
"heartbeat.action.query": "update ovh_debezium_heartbeat set ts=now();"
}
{
"topic.creation.default.partitions": "6",
"topic.creation.default.replication.factor": "3"
}
partitions (6) * replication factor (3))Topic routing
14 topics
252 partitions
-99.88 %
{
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.delete.retention.ms": "604800000"
}
{
"producer.override.compression.type": "lz4"
}