Obsah
1. Kafka Connect: tvorba producentů a konzumentů bez nutnosti udržovat zdrojový kód (2.část)
2. Konektor typu sink, který kontroluj schéma zpráv
3. Poslání celého souboru JSON do zvoleného tématu
4. Chování konektoru pro zprávy bez schématu
5. Přidání schématu přímo do zprávy
7. Povinné a volitelné atributy
8. Schémata pro složitěji strukturované zprávy
9. Zalogování konkrétního problému při zpracování zprávy v DLQ
10. Příklady hlaviček zpráv uložených do DLQ
11. Zápis přijatých zpráv do relační databáze
12. Nastavení cesty k Java archivům s JDBC konektorem i ovladači
13. Konfigurace konektoru pro zápis zpráv do PostgreSQL
14. Změna jména tématu, ukázka konzumace zpráv s jejich zápisem do databáze
15. Kontrola zpráv zapsaných do databáze
18. Repositář s demonstračními příklady, konfiguračními soubory a testovacími zprávami
1. Kontrola formátu zpráv vůči schématu
„When I started my journey with Apache Kafka, JSON was already everywhere. From Javascript UIs, through API calls, and even databases – it became a lingua franca of data exchange.“
Na úvodní článek o frameworku Kafka Connect dnes navážeme. V úvodní části dnešního textu se budeme zabývat způsobem definice a kontroly schématu zpráv, což je v oblasti heterogenních architektur založených na mikroslužbách (které se samy postupně vyvíjí) velmi užitečná a žádaná vlastnost.
Připomeňme si nejdříve, jakým způsobem je možné definovat konektor pro technologii Kafka Connect, který slouží pro definici konzumenta zpráv. Konzumované zprávy jsou ukládány do souboru nazvaného test.sink4.jsons a přitom se provádí kontrola, zda jak klíč zprávy (pokud existuje), tak i vlastní tělo zprávy jsou uloženy ve formátu JSON. Pokud tomu tak není, je zpráva přeposlána do takzvané dead letter queue, což je v našem případě konkrétně téma nazvané dlq_bad_jsons:
name=local-file-sink-json connector.class=FileStreamSink tasks.max=1 file=test.sink4.jsons topics=connect-test-json key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false errors.tolerance=all errors.deadletterqueue.topic.name=dlq_bad_jsons errors.deadletterqueue.topic.replication.factor=1
Tento konektor se spustí příkazem:
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink-4.properties
Obsah DLQ si můžeme vypsat příkazem:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dlq_bad_jsons --partition 0 --offset earliest
V tomto případě je ovšem výhodnější použít nástroj Kafkacat a netrápit se s uváděním offsetů a oddílů:
$ kafkacat -b localhost:9092 -t dlq_bad_jsons -C
2. Konektor typu sink, který kontroluj schéma zpráv
Další konektor, který nakonfigurujeme v rámci této kapitoly, se od konektoru popsaného v úvodní kapitole odlišuje především tím, že má vlastnosti key.converter.schemas.enable a value.converter.schemas.enable nastaveny na hodnotu true, což znamená, že se konektor bude snažit zjistit, zda zpracovávané zprávy odpovídají schématu (samotné schéma prozatím ovšem nebudeme mít nikde definováno):
name=local-file-sink-json-checked connector.class=FileStreamSink tasks.max=1 file=test.sink5.jsons topics=connect-test-json key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true errors.tolerance=all errors.deadletterqueue.topic.name=dlq_bad_jsons errors.deadletterqueue.topic.replication.factor=1
Tento konektor, jenž je uložen v souboru connect-file-sink-5.properties, se spustí příkazem:
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink-5.properties
3. Poslání celého souboru JSON do zvoleného tématu
Nyní se pokusíme do tématu se jménem connect-test-json předat zprávu ve formátu JSON, ovšem bez specifikace schématu:
{ "ID": 1, "Name": "Linus", "Surname": "Torvalds" }
Jak tuto operaci provedeme, pokud je zpráva uložena v souboru? V tomto případě nemůžeme obsah souboru přesměrovat přímo producentovi, protože ten by obsah chápal jako několik uložených zpráv – každá zpráva na jednom řádku. To znamená, že následující příkaz je nekorektní:
$ cat bad_msg.json | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic connect-test-json
V tomto případě si budeme muset pomoci malým trikem, například použitím nástroje jq, který dokáže zprávu ve formátu JSON převést do kompaktního (jednořádkového) tvaru, pokud použijeme přepínač -c:
Korektní způsob poslání obsahu souboru jako jediné zprávy tedy bude vypadat takto:
$ jq -c . msg3.json | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic connect-test-json
4. Chování konektoru pro zprávy bez schématu
Nyní si otestujme, jak se bude konektor chovat v případě, že mu pošleme zprávu z předchozí kapitoly, tedy správu bez schématu:
{ "ID": 1, "Name": "Linus", "Surname": "Torvalds" }
Po poslání zprávy bad_msg.json do námi používaného tématu zůstane soubor test.sink5.jsons prázdný:
$ file test.sink5.jsons test.sink5.jsons: empty
To musí nutně znamenat, že zpráva byla uložena do DLQ, o čemž se ostatně snadno přesvědčíme zadáním příkazu:
kafkacat -b localhost:9092 -t dlq_bad_jsons -C \ -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Offset: %o Headers: %h\n'
S tímto výsledkem (pro větší přehlednost vynecháme záznamy do DLQ provedené v rámci předchozího článku):
Key (-1 bytes): Value (44 bytes): {"ID":1,"Name":"Linus","Surname":"Torvalds"} Timestamp: 1676301868176 Partition: 0 Offset: 163 Headers:
5. Přidání schématu přímo do zprávy
Jedním z možných řešení „schematizace“ zpráv je přidání schématu přímo do vlastní zprávy. V tomto případě se tělo zprávy skládá z JSONu, který obsahuje dvojici atributů nazvaných schema a payload, přičemž význam obou atributů je zřejmý už z jejich názvu – první z atributů obsahuje schéma, druhý vlastní data. Z následující ukázky je zřejmé, jak může vypadat schéma zprávy předepisující JSON objekt se třemi atributy (fields), které jsou povinné a jsou typu řetězec:
{ "schema": { "type": "struct", "optional": false, "version": 1, "fields": [ { "field": "ID", "type": "string", "optional": false }, { "field": "Name", "type": "string", "optional": false }, { "field": "Surname", "type": "string", "optional": false } ] }, "payload": { "ID": "1", "Name": "Linus", "Surname": "Torvalds" } }
Vyzkoušejme nyní do tématu tuto zprávu poslat:
$ jq -c . msg3.json | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic connect-test-json
Nyní již bude soubor zachycující zprávy obsahovat tento řádek:
$ cat test.sink5.jsons Struct{ID=1,Name=Linus,Surname=Torvalds}
6. Atribut typu „celé číslo“
U jednotlivých atributů, které má zpráva obsahovat, se kromě jejich jména (field) musí specifikovat minimálně datový typ (typ) a popř. i další omezení (rozsah hodnot, regulární výraz u řetězce apod.) Příkladem může být následující zpráva, která má v atributu ID obsahovat celé číslo (pro jednoduchost se znaménkem, i když ID typicky bývají bez znaménka). V takovém případě použijeme typ int64 a celá zpráva včetně schématu může vypadat následovně:
{ "schema": { "type": "struct", "optional": false, "version": 1, "fields": [ { "field": "ID", "type": "int64", "optional": false }, { "field": "Name", "type": "string", "optional": false }, { "field": "Surname", "type": "string", "optional": false } ] }, "payload": { "ID": 1, "Name": "Linus", "Surname": "Torvalds" } }
7. Povinné a volitelné atributy
Podívejme se na následující zprávu, která má ve svém schématu předepsány tři povinné atributy pojmenované ID, Name a Surname. Všechny zmíněné atributy jsou povinné (resp. přesněji řečeno nejsou volitelné), ovšem v samotném těle zprávy chybí uvedení atributu Surname. Z tohoto důvodu nebude zpráva přijata (bude přeposlána do DLQ):
{ "schema": { "type": "struct", "optional": false, "version": 1, "fields": [ { "field": "ID", "type": "int64", "optional": false }, { "field": "Name", "type": "string", "optional": false }, { "field": "Surname", "type": "string", "optional": false } ] }, "payload": { "ID": 1, "Surname": "Torvalds" } }
V případě, že budeme chtít zpracovat i zprávy, jejichž některé atributy nejsou povinné, je zapotřebí takové atributy správně nakonfigurovat, což je ostatně dobře patrné při pohledu na zvýrazněnou vlastnost atributu Name. Nyní bude zpráva zpracována korektně (a nebude tedy poslána do DLQ):
{ "schema": { "type": "struct", "optional": false, "version": 1, "fields": [ { "field": "ID", "type": "int64", "optional": false }, { "field": "Name", "type": "string", "optional": true }, { "field": "Surname", "type": "string", "optional": false } ] }, "payload": { "ID": 1, "Surname": "Torvalds" } }
8. Schémata pro složitěji strukturované zprávy
Pro zprávy, které mají složitější strukturu, než pouhý výčet atributů primitivního datového typu je možné použít různé alternativní serializační formáty, z nichž dnes nejpopulárnější je Apache Avro. Tímto důležitým tématem se budeme zabývat příště.
9. Zalogování konkrétního problému při zpracování zprávy v DLQ
Prozatím jsme do DLQ ukládali „pouze“ těla takových zpráv, které nebyly korektně zpracovány, a to z různých důvodů – v naprosté většině případů proto, že neměly korektní formát (nejednalo se tedy o JSON) a/nebo neodpovídaly specifikovanému schématu (popř. nemělo samotné schéma správný formát). Ovšem konkrétní informace o tom, proč byla zpráva „zahozena“ do DLQ, se vlastně zpracováním a přesunem zprávy ztratí. Aby se do DLQ uložila i tato potenciálně velmi důležitá informace, je nutné konfigurační soubor konektoru nepatrně upravit přidáním řádku errors.deadletterqueue.context.headers.enable=true. Tím se zajistí, že se do hlavičky (resp. hlaviček) zprávy přeposlané do DLQ zapíše i konkrétní problém, který nastal, což většinou znamená zápis celého výpisu zásobníkových rámců JVM). Upravený soubor s konfigurací konektoru vypadá takto:
name=local-file-sink-json-checked connector.class=FileStreamSink tasks.max=1 file=test.sink6.jsons topics=connect-test-json key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true errors.tolerance=all errors.deadletterqueue.topic.name=dlq_bad_jsons errors.deadletterqueue.topic.replication.factor=1 errors.deadletterqueue.context.headers.enable=true
10. Příklady hlaviček zpráv uložených do DLQ
Pokud nyní spustíme konektor s konfigurací uvedenou v předchozí kapitole a pošleme do tématu connect-test-json několik zpráv, které nejsou (z různých důvodů) korektní, budeme se následně moci podívat na to, jaké informace a v jaké úrovni podrobností jsou zapsány do DLQ. Proto si necháme vypsat i hlavičky zpráv, což se v případě použití nástroje kafkacat provádí následovně:
kafkacat -b localhost:9092 -t dlq_bad_jsons -C \ -f '\nKey (%K bytes): %k Value (%S bytes): %s Timestamp: %T Partition: %p Offset: %o Headers: %h\n'
Kafkacat by měl u každé zprávy vypsat nejenom její tělo (value), ale i všechny hlavičky (headers), z nichž je možné relativně snadno vyčíst, proč nebyla zpráva zvalidována. Tyto důvody jsou na výpisu zvýrazněny:
Key (-1 bytes): Value (44 bytes): {"ID":1,"Name":"Linus","Surname":"Torvalds"} Timestamp: 1676302494257 Partition: 0 Offset: 164 Headers: __connect.errors.topic=connect-test-json,__connect.errors.partition=0,__connect.errors.offset=113,__connect.errors.connector.name=local-file-sink-json-checked,__connect.errors.task.id=0,__connect.errors.stage=VALUE_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors. DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration. at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:328) at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88) at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:516) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:516) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Key (-1 bytes): Value (258 bytes): {"schema":{"type":"struct","optional":false,"version":1,"fields":[{"field":"ID","type":"int64","optional":false},{"field":"Name","type":"string","optional":false},{"field":"Surname","type":"string","optional":false}]},"payload":{"ID":1,"Surname":"Torvalds"}} Timestamp: 1676302522399 Partition: 0 Offset: 165 Headers: __connect.errors.topic=connect-test-json,__connect.errors.partition=0,__connect.errors.offset=116,__connect.errors.connector.name=local-file-sink-json-checked,__connect.errors.task.id=0,__connect.errors.stage=VALUE_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Invalid null value for required STRING field,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors. DataException: Invalid null value for required STRING field at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:685) at org.apache.kafka.connect.json.JsonConverter.lambda$static$11(JsonConverter.java:133) at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:730) at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:343) at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88) at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:516) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:516) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
11. Zápis přijatých zpráv do relační databáze
Poměrně velkou předností frameworku Kafka Connect je fakt, že je v současné době k dispozici velké množství konektorů, které umožňují Kafku velmi snadno propojit s dalšími zdroji či cíli dat (S3, MQTT, SQS atd.). Mezi potenciálně velmi užitečné konektory patří i JDBC konektor (konektory), které podporují posílání zpráv získaných z relační databáze nebo naopak ukládání zpráv do relační databáze. Takovou možnost jsme si ostatně zmínili již v úvodním článku:
Obrázek 1: Zprávy jsou přes jeden konektor načítány z databáze (například z PostgreSQL) a posílány do zvoleného tématu v Apache Kafce. Odtud si je načítá (konzumuje) další konektor, který zprávy ukládá do Hadoopu.
V následujících kapitolách si proto ukážeme, jak lze ukládat zprávy do známé databáze PostgreSQL. Nejdříve je však nutné stáhnout a rozbalit balíček s JDBC ovladači i vlastními konektory. Tento balíček naleznete na adrese https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc.
12. Nastavení cesty k Java archivům s JDBC konektorem i ovladači
Po rozbalení je nutné korektně nastavit cestu k Java archivům získaným ze staženého souboru. Tato cesta se nastavuje v souboru config/connect-standalone.properties a v mém konkrétním případě jsem provedl změny, které jsou zvýrazněny:
# These are defaults. This file just demonstrates how to override some settings. bootstrap.servers=localhost:9092 # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will # need to configure these based on the format they want their data in when loaded from or stored into Kafka key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply # it to key.converter.schemas.enable=false value.converter.schemas.enable=false offset.storage.file.filename=/tmp/connect.offsets # Flush much faster than normal, which is useful for testing/debugging offset.flush.interval.ms=10000 # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins # (connectors, converters, transformations). The list should consist of top level directories that include # any combination of: # a) directories immediately containing jars with plugins and their dependencies # b) uber-jars with plugins and their dependencies # c) directories immediately containing the package directory structure of classes of plugins and their dependencies # Note: symlinks will be followed to discover dependencies or plugins. # Examples: # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors, plugin.path=/home/ptisnovs/kafka/kafka_2.12-3.3.2/libs/,/home/ptisnovs/kafka/confluentinc-kafka-connect-jdbc-10.6.0/lib/,
13. Konfigurace konektoru pro zápis zpráv do PostgreSQL
Podívejme se nyní na to, jak by mohla vypadat konfigurace jednoduchého konektoru typu sink, který bude konzumované zprávy zapisovat do databáze PostgreSQL. V konfiguraci nalezneme především odlišnou třídu s implementací konektoru:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
Nejdůležitější konfigurační volbou je connection.url, který obsahuje (na databázi závislý) „connection string“. V našem konkrétním případě se budeme připojovat k lokálně běžícímu Postgresu s uvedeným jménem a heslem (což v praxi bude nutné řešit jinak – bezpečněji). A jméno databáze, do které se budou zprávy zapisovat, se jmenuje kafka_sink:
connection.url=jdbc:postgresql://localhost:5432/kafka_sink?user=postgres&password=postgres
A konečně se volbou auto.create povolí tvorba tabulky/tabulek pro zápis dat:
auto.create=true
Celá konfigurace konektoru vypadá následovně:
name=db-sink connector.class=io.confluent.connect.jdbc.JdbcSinkConnector tasks.max=1 topics=connect-test-db key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true connection.url=jdbc:postgresql://localhost:5432/kafka_sink?user=postgres&password=postgres auto.create=true delete.enabled=false
Do jaké tabulky se však budou zprávy zapisovat? Jméno tabulky je přímo odvozeno od jména tématu, což bude poněkud problematické, neboť se ve jméně tabulky objeví znaky „-“. O tom se lze snadno přesvědčit spuštěním konektoru, posláním zprávy a kontrolou databáze kafka_sink v klientu psql (podrobněji si to ukážeme o několik odstavců níže):
kafka_sink=# \d List of relations Schema | Name | Type | Owner --------+-----------------+-------+---------- public | connect-test-db | table | postgres (1 rows)
14. Změna jména tématu, ukázka konzumace zpráv s jejich zápisem do databáze
Aby bylo možné snadněji pracovat s databázovou tabulkou vytvářenou a naplňovanou konektorem, změníme jméno tématu konzumované konektorem. Konkrétně použijeme téma nazvané test_table (což není příliš přiléhavé, ale v databázi se nám tato tabulka bude snadno hledat):
name=db-sink connector.class=io.confluent.connect.jdbc.JdbcSinkConnector tasks.max=1 topics=test_table key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true connection.url=jdbc:postgresql://localhost:5432/kafka_sink?user=postgres&password=postgres auto.create=true delete.enabled=false
Před spuštěním konektoru se přesvědčíme, že v Postgresu existuje databáze kafka_sink a popř. ji vytvoříme příkazem CREATE DATABASE:
$ psql -U postgres Password for user postgres: psql (9.6.10) Type "help" for help. postgres=# \l List of databases Name | Owner | Encoding | Collate | Ctype | Access privileges --------------+----------+----------+-------------+-------------+----------------------- aggregator | postgres | UTF8 | en_US.UTF-8 | en_US.UTF-8 | controller | postgres | UTF8 | en_US.UTF-8 | en_US.UTF-8 | cve | postgres | UTF8 | en_US.UTF-8 | en_US.UTF-8 | kafka_sink | postgres | UTF8 | en_US.UTF-8 | en_US.UTF-8 | notification | postgres | UTF8 | en_US.UTF-8 | en_US.UTF-8 | postgres | postgres | UTF8 | en_US.UTF-8 | en_US.UTF-8 | ptisnovs | postgres | UTF8 | en_US.UTF-8 | en_US.UTF-8 | template0 | postgres | UTF8 | en_US.UTF-8 | en_US.UTF-8 | =c/postgres + | | | | | postgres=CTc/postgres template1 | postgres | UTF8 | en_US.UTF-8 | en_US.UTF-8 | =c/postgres + | | | | | postgres=CTc/postgres test | postgres | UTF8 | en_US.UTF-8 | en_US.UTF-8 | (11 rows)
Spustíme nově nakonfigurovaný konektor a pošleme do něj zprávy s korektním schématem. V logu by se mj. měly objevit zprávy o vytvoření nové tabulky. Povšimněte si, že sloupce v tabulce jsou získány ze schématu, tedy tabulka test_table bude mít tři sloupce ID, Name a Surname:
[2023-02-13 19:38:35,693] INFO [db-sink|task-0] Checking PostgreSql dialect for existence of TABLE "test_table" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:586) [2023-02-13 19:38:35,698] INFO [db-sink|task-0] Using PostgreSql dialect TABLE "test_table" absent (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:594) [2023-02-13 19:38:35,700] INFO [db-sink|task-0] Creating table with sql: CREATE TABLE "test_table" ( "ID" TEXT NOT NULL, "Name" TEXT NOT NULL, "Surname" TEXT NOT NULL) (io.confluent.connect.jdbc.sink.DbStructure:122) [2023-02-13 19:38:35,710] INFO [db-sink|task-0] Checking PostgreSql dialect for existence of TABLE "test_table" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:586) [2023-02-13 19:38:35,715] INFO [db-sink|task-0] Using PostgreSql dialect TABLE "test_table" present (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:594) [2023-02-13 19:38:35,727] INFO [db-sink|task-0] Checking PostgreSql dialect for type of TABLE "test_table" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:880) [2023-02-13 19:38:35,730] INFO [db-sink|task-0] Setting metadata for table "test_table" to Table{name='"test_table"', type=TABLE columns=[Column{'Name', isPrimaryKey=false, allowsNull=false, sqlType=text}, Column{'Surname', isPrimaryKey=false, allowsNull=false, sqlType=text}, Column{'ID', isPrimaryKey=false, allowsNull=false, sqlType=text}]} (io.confluent.connect.jdbc.util.TableDefinitions:64)
15. Kontrola zpráv zapsaných do databáze
Nyní nastal čas na kontrolu, zda byly všechny zprávy skutečně naším konektorem zapsány do databáze. Nejprve se proto přihlásíme do Postgresu přes konzoli psql:
$ psql -U postgres Password for user postgres: psql (9.6.10) Type "help" for help.
Dále se přepneme do databáze kafka_sink. Tato databáze by měla existovat, protože kontrolu jsme provedli již v předchozí kapitole:
postgres=# \c kafka_sink You are now connected to database "kafka_sink" as user "postgres".
Vypíšeme si všechny tabulky popř. další objekty ze zvolené databáze:
kafka_sink=# \d List of relations Schema | Name | Type | Owner --------+-----------------+-------+---------- public | connect-test-db | table | postgres public | test_table | table | postgres (2 rows)
Vidíme, že existuje jak poněkud nešťastně pojmenovaná tabulka connect-test-db vytvořená prvním „databázovým“ konektorem, tak i tabulka druhá. Bude nás zajímat především struktura druhé tabulky, takže si ji necháme zobrazit:
kafka_sink=# \d+ test_table Table "public.test_table" Column | Type | Modifiers | Storage | Stats target | Description ---------+------+-----------+----------+--------------+------------- ID | text | not null | extended | | Name | text | not null | extended | | Surname | text | not null | extended | |
A konečně si pochopitelně můžeme nechat vypsat zprávy zapsané do této tabulky:
kafka_sink=# select * from test_table; ID | Name | Surname ----+-------+---------- 1 | Linus | Torvalds (1 row)
16. Registr schémat
Prozatím jsme posílali schéma těla každé zprávy přímo v dané zprávě. To je sice na první pohled velmi flexibilní řešení, ovšem znemožňuje to efektivní centrální správu schémat popř. zajištění jednotného schématu pro všechny zprávy ve vybraném tématu. I tuto problematiku je možné řešit, a to konkrétně s využitím takzvaného registru schémat (https://docs.confluent.io/platform/current/schema-registry/index.html#sr-overview). Jedná se o samostatně běžící službu, která poskytuje schémata Kafce i konektorům a navíc umožňuje správu verzí těchto schémat atd. Touto velmi zajímavou a užitečnou problematikou se budeme zabývat příště.
17. Obsah třetí části článku
Ve třetí části článku o frameworku Kafka Connect se budeme zabývat především prací s registrem schémat, o němž jsme se ve stručnosti zmínili v předchozí kapitole. Tento registr se totiž dá použít k mnoha účelům, například pro podporu transformace zpráv mezi různými verzemi schématu atd. – tyto transformace (popř. akceptace starších schémat) se tedy nemusí provádět ručně v programovém kódu, což je velmi pracné a mnohdy se musí provádět ve více producentech a/nebo konzumentech.
18. Repositář s demonstračními příklady, konfiguračními soubory a testovacími zprávami
Demonstrační příklady ukázané minule byly společně s konfiguracemi konektorů určenými pro Kafka Connect a ukázkovými zprávami uloženy do repositáře, jenž je dostupný na adrese https://github.com/tisnik/slides/. V případě, že nebudete chtít klonovat celý repositář, můžete namísto toho použít odkazy na jednotlivé demonstrační příklady i další soubory, které naleznete v následující tabulce:
# | Soubor | Stručný popis | Adresa |
---|---|---|---|
1 | producer1.go | jednoduchý producent zpráv pro Apache Kafku naprogramovaný v jazyku Go (Sarama) | https://github.com/tisnik/slides/blob/master/files/kafka/producer1.go |
2 | producer2.go | jednoduchý producent zpráv pro Apache Kafku naprogramovaný v jazyku Go (Sarama) | https://github.com/tisnik/slides/blob/master/files/kafka/producer2.go |
3 | consumer1.go | jednoduchý konzument zpráv pro Apache Kafku naprogramovaný v jazyku Go (Confluent Kafka) | https://github.com/tisnik/slides/blob/master/files/kafka/consumer1.go |
4 | consumer2.go | jednoduchý konzument zpráv pro Apache Kafku naprogramovaný v jazyku Go (Confluent Kafka) | https://github.com/tisnik/slides/blob/master/files/kafka/consumer2.go |
5 | producer.py | jednoduchý producent zpráv pro Apache Kafku naprogramovaný v jazyku Python | https://github.com/tisnik/slides/blob/master/files/kafka/producer.py |
6 | consumer.py | jednoduchý konzument zpráv pro Apache Kafku naprogramovaný v jazyku Python | https://github.com/tisnik/slides/blob/master/files/kafka/consumer.py |
7 | SimpleProducer.java | jednoduchý producent zpráv pro Apache Kafku naprogramovaný v jazyku Java | https://github.com/tisnik/slides/blob/master/files/kafka/SimpleProducer.java |
8 | SimpleConsumer.java | jednoduchý konzument zpráv npro Apache Kafku aprogramovaný v jazyku Java | https://github.com/tisnik/slides/blob/master/files/kafka/SimpleConsumer.java |
9 | connect-file-sink.properties | konfigurace konektoru technologie Kafka Connect: zápis přijatých zpráv (v řetězcovém formátu) do souboru | https://github.com/tisnik/slides/blob/master/files/kafka/connect-file-sink.properties |
10 | connect-file-sink-2.properties | konfigurace konektoru technologie Kafka Connect: zápis přijatých zpráv ve formátu JSON bez kontroly schématu | https://github.com/tisnik/slides/blob/master/files/kafka/connect-file-sink-2.properties |
11 | connect-file-sink-3.properties | konfigurace konektoru technologie Kafka Connect: dtto, ale konektor není ukončen po přijetí nekorektní zprávy | https://github.com/tisnik/slides/blob/master/files/kafka/connect-file-sink-3.properties |
12 | connect-file-sink-4.properties | konfigurace konektoru technologie Kafka Connect: nekorektní zprávy budou uloženy do DLQ | https://github.com/tisnik/slides/blob/master/files/kafka/connect-file-sink-4.properties |
13 | connect-file-sink-5.properties | konfigurace konektoru technologie Kafka Connect: kontrola zpráv oproti schématu | https://github.com/tisnik/slides/blob/master/files/kafka/connect-file-sink-5.properties |
14 | connect-file-sink-6.properties | konfigurace konektoru technologie Kafka Connect: zápis konkrétní chyby do DLQ | https://github.com/tisnik/slides/blob/master/files/kafka/connect-file-sink-6.properties |
15 | db-sink-1.properties | konfigurace konektoru technologie Kafka Connect: uložení zpráv do databáze | https://github.com/tisnik/slides/blob/master/files/kafka/db-sink-1.properties |
16 | db-sink-2.properties | konfigurace konektoru technologie Kafka Connect: uložení zpráv do databáze (vhodnější jméno tabulky) | https://github.com/tisnik/slides/blob/master/files/kafka/db-sink-2.properties |
17 | msg1.json | zpráva uložená ve formátu JSON obsahující i schéma | https://github.com/tisnik/slides/blob/master/files/kafka/msg1.json |
18 | msg2.json | schéma s jedním celočíselným atributem, včetně vlastního těla zprávy | https://github.com/tisnik/slides/blob/master/files/kafka/msg2.json |
19 | msg3.json | nekorektní zpráva: chybějící povinné atributy předepsané schématem | https://github.com/tisnik/slides/blob/master/files/kafka/msg3.json |
20 | msg4.json | schéma s nepovinnými atributy + korektní zpráva, která odpovídá schématu | https://github.com/tisnik/slides/blob/master/files/kafka/msg4.json |
21 | bad_msg.json | zpráva ve formátu JSON, která ovšem neobsahuje schéma | https://github.com/tisnik/slides/blob/master/files/kafka/bad_msg.json |
19. Odkazy na Internetu
- Kafka Connect and Schemas
https://rmoff.net/2020/01/22/kafka-connect-and-schemas/ - JSON and schemas
https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/#json-schemas - What, why, when to use Apache Kafka, with an example
https://www.startdataengineering.com/post/what-why-and-how-apache-kafka/ - When NOT to use Apache Kafka?
https://www.kai-waehner.de/blog/2022/01/04/when-not-to-use-apache-kafka/ - Microservices: The Rise Of Kafka
https://movio.co/blog/microservices-rise-kafka/ - Building a Microservices Ecosystem with Kafka Streams and KSQL
https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/ - An introduction to Apache Kafka and microservices communication
https://medium.com/@ulymarins/an-introduction-to-apache-kafka-and-microservices-communication-bf0a0966d63 - kappa-architecture.com
http://milinda.pathirage.org/kappa-architecture.com/ - Questioning the Lambda Architecture
https://www.oreilly.com/ideas/questioning-the-lambda-architecture - Lambda architecture
https://en.wikipedia.org/wiki/Lambda_architecture - Kafka – ecosystem (Wiki)
https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem - The Kafka Ecosystem – Kafka Core, Kafka Streams, Kafka Connect, Kafka REST Proxy, and the Schema Registry
http://cloudurable.com/blog/kafka-ecosystem/index.html - A Kafka Operator for Kubernetes
https://github.com/krallistic/kafka-operator - Kafka Streams
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams - Kafka Streams
http://kafka.apache.org/documentation/streams/ - Kafka Streams (FAQ)
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Streams - Event stream processing
https://en.wikipedia.org/wiki/Event_stream_processing - Part 1: Apache Kafka for beginners – What is Apache Kafka?
https://www.cloudkarafka.com/blog/2016–11–30-part1-kafka-for-beginners-what-is-apache-kafka.html - What are some alternatives to Apache Kafka?
https://www.quora.com/What-are-some-alternatives-to-Apache-Kafka - What is the best alternative to Kafka?
https://www.slant.co/options/961/alternatives/~kafka-alternatives - A super quick comparison between Kafka and Message Queues
https://hackernoon.com/a-super-quick-comparison-between-kafka-and-message-queues-e69742d855a8?gi=e965191e72d0 - Kafka Queuing: Kafka as a Messaging System
https://dzone.com/articles/kafka-queuing-kafka-as-a-messaging-system - Configure Self-Managed Connectors
https://docs.confluent.io/kafka-connectors/self-managed/configuring.html#configure-self-managed-connectors - Schema Evolution and Compatibility
https://docs.confluent.io/platform/current/schema-registry/avro.html#schema-evolution-and-compatibility - Configuring Key and Value Converters
https://docs.confluent.io/kafka-connectors/self-managed/userguide.html#configuring-key-and-value-converters - Introduction to Kafka Connectors
https://www.baeldung.com/kafka-connectors-guide - Kafka CLI: command to list all consumer groups for a topic?
https://stackoverflow.com/questions/63883999/kafka-cli-command-to-list-all-consumer-groups-for-a-topic - Java Property File Processing
https://www.w3resource.com/java-tutorial/java-propertyfile-processing.php - Skipping bad records with the Kafka Connect JDBC sink connector
https://rmoff.net/2019/10/15/skipping-bad-records-with-the-kafka-connect-jdbc-sink-connector/ - Kafka Connect Deep Dive – Error Handling and Dead Letter Queues
https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/ - Errors and Dead Letter Queues
https://developer.confluent.io/learn-kafka/kafka-connect/error-handling-and-dead-letter-queues/ - Confluent Cloud Dead Letter Queue
https://docs.confluent.io/cloud/current/connectors/dead-letter-queue.html - Dead Letter Queues (DLQs) in Kafka
https://medium.com/@sannidhi.s.t/dead-letter-queues-dlqs-in-kafka-afb4b6835309 - Deserializer
https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-json.html#json-schema-serializer-and-deserializer - JSON, Kafka, and the need for schema
https://mikemybytes.com/2022/07/11/json-kafka-and-the-need-for-schema/ - Using Kafka Connect with Schema Registry
https://docs.confluent.io/platform/current/schema-registry/connect.html - Zpracování dat reprezentovaných ve formátu JSON nástrojem jq
https://www.root.cz/clanky/zpracovani-dat-reprezentovanych-ve-formatu-json-nastrojem-jq/ - Repositář projektu jq (GitHub)
https://github.com/stedolan/jq - GitHub stránky projektu jq
https://stedolan.github.io/jq/ - 5 modern alternatives to essential Linux command-line tools
https://opensource.com/article/20/6/modern-linux-command-line-tools - Návod k nástroji jq
https://stedolan.github.io/jq/tutorial/ - jq Manual (development version)
https://stedolan.github.io/jq/manual/ - Introducing JSON
https://www.json.org/json-en.html - Understanding JSON schema
https://json-schema.org/understanding-json-schema/index.html - JDBC Sink Connector for Confluent Platform
https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/overview.html#jdbc-sink-connector-for-cp - JDBC Connector (Source and Sink)
https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc - Introduction to Schema Registry in Kafka
https://medium.com/slalom-technology/introduction-to-schema-registry-in-kafka-915ccf06b902 - Understanding JSON Schema Compatibility
https://yokota.blog/2021/03/29/understanding-json-schema-compatibility/