Kafka Connect: definice a kontrola schématu zpráv

16. 2. 2023
Doba čtení: 26 minut

Sdílet

 Autor: Apache Foundation
Na úvodní článek o frameworku Kafka Connect dnes navážeme a budeme se zabývat definicí a kontrolou schématu zpráv, což je v oblasti heterogenních architektur založených na mikroslužbách velmi užitečná a žádaná vlastnost.

Obsah

1. Kafka Connect: tvorba producentů a konzumentů bez nutnosti udržovat zdrojový kód (2.část)

2. Konektor typu sink, který kontroluj schéma zpráv

3. Poslání celého souboru JSON do zvoleného tématu

4. Chování konektoru pro zprávy bez schématu

5. Přidání schématu přímo do zprávy

6. Atribut typu „celé číslo“

7. Povinné a volitelné atributy

8. Schémata pro složitěji strukturované zprávy

9. Zalogování konkrétního problému při zpracování zprávy v DLQ

10. Příklady hlaviček zpráv uložených do DLQ

11. Zápis přijatých zpráv do relační databáze

12. Nastavení cesty k Java archivům s JDBC konektorem i ovladači

13. Konfigurace konektoru pro zápis zpráv do PostgreSQL

14. Změna jména tématu, ukázka konzumace zpráv s jejich zápisem do databáze

15. Kontrola zpráv zapsaných do databáze

16. Registr schémat

17. Obsah třetí části článku

18. Repositář s demonstračními příklady, konfiguračními soubory a testovacími zprávami

19. Odkazy na Internetu

1. Kontrola formátu zpráv vůči schématu

„When I started my journey with Apache Kafka, JSON was already everywhere. From Javascript UIs, through API calls, and even databases – it became a lingua franca of data exchange.“

Na úvodní článek o frameworku Kafka Connect dnes navážeme. V úvodní části dnešního textu se budeme zabývat způsobem definice a kontroly schématu zpráv, což je v oblasti heterogenních architektur založených na mikroslužbách (které se samy postupně vyvíjí) velmi užitečná a žádaná vlastnost.

Připomeňme si nejdříve, jakým způsobem je možné definovat konektor pro technologii Kafka Connect, který slouží pro definici konzumenta zpráv. Konzumované zprávy jsou ukládány do souboru nazvaného test.sink4.jsons a přitom se provádí kontrola, zda jak klíč zprávy (pokud existuje), tak i vlastní tělo zprávy jsou uloženy ve formátu JSON. Pokud tomu tak není, je zpráva přeposlána do takzvané dead letter queue, což je v našem případě konkrétně téma nazvané dlq_bad_jsons:

name=local-file-sink-json
connector.class=FileStreamSink
tasks.max=1
file=test.sink4.jsons
topics=connect-test-json
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq_bad_jsons
errors.deadletterqueue.topic.replication.factor=1

Tento konektor se spustí příkazem:

$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink-4.properties

Obsah DLQ si můžeme vypsat příkazem:

$ bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic dlq_bad_jsons --partition 0 --offset earliest

V tomto případě je ovšem výhodnější použít nástroj Kafkacat a netrápit se s uváděním offsetů a oddílů:

$ kafkacat -b localhost:9092 -t dlq_bad_jsons -C

2. Konektor typu sink, který kontroluj schéma zpráv

Další konektor, který nakonfigurujeme v rámci této kapitoly, se od konektoru popsaného v úvodní kapitole odlišuje především tím, že má vlastnosti key.converter.schemas.enable a value.converter.schemas.enable nastaveny na hodnotu true, což znamená, že se konektor bude snažit zjistit, zda zpracovávané zprávy odpovídají schématu (samotné schéma prozatím ovšem nebudeme mít nikde definováno):

name=local-file-sink-json-checked
connector.class=FileStreamSink
tasks.max=1
file=test.sink5.jsons
topics=connect-test-json
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq_bad_jsons
errors.deadletterqueue.topic.replication.factor=1

Tento konektor, jenž je uložen v souboru connect-file-sink-5.properties, se spustí příkazem:

$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-sink-5.properties

3. Poslání celého souboru JSON do zvoleného tématu

Nyní se pokusíme do tématu se jménem connect-test-json předat zprávu ve formátu JSON, ovšem bez specifikace schématu:

{
  "ID": 1,
  "Name": "Linus",
  "Surname": "Torvalds"
}

Jak tuto operaci provedeme, pokud je zpráva uložena v souboru? V tomto případě nemůžeme obsah souboru přesměrovat přímo producentovi, protože ten by obsah chápal jako několik uložených zpráv – každá zpráva na jednom řádku. To znamená, že následující příkaz je nekorektní:

$ cat bad_msg.json | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic connect-test-json

V tomto případě si budeme muset pomoci malým trikem, například použitím nástroje jq, který dokáže zprávu ve formátu JSON převést do kompaktního (jednořádkového) tvaru, pokud použijeme přepínač -c:

Korektní způsob poslání obsahu souboru jako jediné zprávy tedy bude vypadat takto:

$ jq -c . msg3.json | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic connect-test-json
Poznámka: nástroj jq, který je v praxi velmi užitečný, jsme si popsali v článku Zpracování dat reprezentovaných ve formátu JSON nástrojem jq.

4. Chování konektoru pro zprávy bez schématu

Nyní si otestujme, jak se bude konektor chovat v případě, že mu pošleme zprávu z předchozí kapitoly, tedy správu bez schématu:

{
  "ID": 1,
  "Name": "Linus",
  "Surname": "Torvalds"
}

Po poslání zprávy bad_msg.json do námi používaného tématu zůstane soubor test.sink5.jsons prázdný:

$ file test.sink5.jsons 
 
test.sink5.jsons: empty

To musí nutně znamenat, že zpráva byla uložena do DLQ, o čemž se ostatně snadno přesvědčíme zadáním příkazu:

kafkacat -b localhost:9092 -t dlq_bad_jsons -C \
  -f '\nKey (%K bytes): %k
  Value (%S bytes): %s
  Timestamp: %T
  Partition: %p
  Offset: %o
  Headers: %h\n'

S tímto výsledkem (pro větší přehlednost vynecháme záznamy do DLQ provedené v rámci předchozího článku):

Key (-1 bytes):
  Value (44 bytes): {"ID":1,"Name":"Linus","Surname":"Torvalds"}
  Timestamp: 1676301868176
  Partition: 0
  Offset: 163
  Headers:

5. Přidání schématu přímo do zprávy

Jedním z možných řešení „schematizace“ zpráv je přidání schématu přímo do vlastní zprávy. V tomto případě se tělo zprávy skládá z JSONu, který obsahuje dvojici atributů nazvaných schema a payload, přičemž význam obou atributů je zřejmý už z jejich názvu – první z atributů obsahuje schéma, druhý vlastní data. Z následující ukázky je zřejmé, jak může vypadat schéma zprávy předepisující JSON objekt se třemi atributy (fields), které jsou povinné a jsou typu řetězec:

{
  "schema": {
    "type": "struct",
    "optional": false,
    "version": 1,
    "fields": [
      {
        "field": "ID",
        "type": "string",
        "optional": false
      },
      {
        "field": "Name",
        "type": "string",
        "optional": false
      },
      {
        "field": "Surname",
        "type": "string",
        "optional": false
      }
    ]
  },
  "payload": {
    "ID": "1",
    "Name": "Linus",
    "Surname": "Torvalds"
  }
}

Vyzkoušejme nyní do tématu tuto zprávu poslat:

$ jq -c . msg3.json | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic connect-test-json

Nyní již bude soubor zachycující zprávy obsahovat tento řádek:

$ cat test.sink5.jsons 
 
Struct{ID=1,Name=Linus,Surname=Torvalds}
Poznámka: povšimněte si, jak se změnil formát zprávy – nyní se jedná o strukturu s pojmenovanými atributy a bez schématu.

6. Atribut typu „celé číslo“

U jednotlivých atributů, které má zpráva obsahovat, se kromě jejich jména (field) musí specifikovat minimálně datový typ (typ) a popř. i další omezení (rozsah hodnot, regulární výraz u řetězce apod.) Příkladem může být následující zpráva, která má v atributu ID obsahovat celé číslo (pro jednoduchost se znaménkem, i když ID typicky bývají bez znaménka). V takovém případě použijeme typ int64 a celá zpráva včetně schématu může vypadat následovně:

{
  "schema": {
    "type": "struct",
    "optional": false,
    "version": 1,
    "fields": [
      {
        "field": "ID",
        "type": "int64",
        "optional": false
      },
      {
        "field": "Name",
        "type": "string",
        "optional": false
      },
      {
        "field": "Surname",
        "type": "string",
        "optional": false
      }
    ]
  },
  "payload": {
    "ID": 1,
    "Name": "Linus",
    "Surname": "Torvalds"
  }
}
Poznámka: payload zprávy plně odpovídá schématu, takže bude zpráva bez problémů přijata.

7. Povinné a volitelné atributy

Podívejme se na následující zprávu, která má ve svém schématu předepsány tři povinné atributy pojmenované ID, Name a Surname. Všechny zmíněné atributy jsou povinné (resp. přesněji řečeno nejsou volitelné), ovšem v samotném těle zprávy chybí uvedení atributu Surname. Z tohoto důvodu nebude zpráva přijata (bude přeposlána do DLQ):

{
  "schema": {
    "type": "struct",
    "optional": false,
    "version": 1,
    "fields": [
      {
        "field": "ID",
        "type": "int64",
        "optional": false
      },
      {
        "field": "Name",
        "type": "string",
        "optional": false
      },
      {
        "field": "Surname",
        "type": "string",
        "optional": false
      }
    ]
  },
  "payload": {
    "ID": 1,
    "Surname": "Torvalds"
  }
}

V případě, že budeme chtít zpracovat i zprávy, jejichž některé atributy nejsou povinné, je zapotřebí takové atributy správně nakonfigurovat, což je ostatně dobře patrné při pohledu na zvýrazněnou vlastnost atributu Name. Nyní bude zpráva zpracována korektně (a nebude tedy poslána do DLQ):

{
  "schema": {
    "type": "struct",
    "optional": false,
    "version": 1,
    "fields": [
      {
        "field": "ID",
        "type": "int64",
        "optional": false
      },
      {
        "field": "Name",
        "type": "string",
        "optional": true
      },
      {
        "field": "Surname",
        "type": "string",
        "optional": false
      }
    ]
  },
  "payload": {
    "ID": 1,
    "Surname": "Torvalds"
  }
}

8. Schémata pro složitěji strukturované zprávy

Pro zprávy, které mají složitější strukturu, než pouhý výčet atributů primitivního datového typu je možné použít různé alternativní serializační formáty, z nichž dnes nejpopulárnější je Apache Avro. Tímto důležitým tématem se budeme zabývat příště.

9. Zalogování konkrétního problému při zpracování zprávy v DLQ

Prozatím jsme do DLQ ukládali „pouze“ těla takových zpráv, které nebyly korektně zpracovány, a to z různých důvodů – v naprosté většině případů proto, že neměly korektní formát (nejednalo se tedy o JSON) a/nebo neodpovídaly specifikovanému schématu (popř. nemělo samotné schéma správný formát). Ovšem konkrétní informace o tom, proč byla zpráva „zahozena“ do DLQ, se vlastně zpracováním a přesunem zprávy ztratí. Aby se do DLQ uložila i tato potenciálně velmi důležitá informace, je nutné konfigurační soubor konektoru nepatrně upravit přidáním řádku errors.deadletterqueue.con­text.headers.enable=true. Tím se zajistí, že se do hlavičky (resp. hlaviček) zprávy přeposlané do DLQ zapíše i konkrétní problém, který nastal, což většinou znamená zápis celého výpisu zásobníkových rámců JVM). Upravený soubor s konfigurací konektoru vypadá takto:

name=local-file-sink-json-checked
connector.class=FileStreamSink
tasks.max=1
file=test.sink6.jsons
topics=connect-test-json
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq_bad_jsons
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true

10. Příklady hlaviček zpráv uložených do DLQ

Pokud nyní spustíme konektor s konfigurací uvedenou v předchozí kapitole a pošleme do tématu connect-test-json několik zpráv, které nejsou (z různých důvodů) korektní, budeme se následně moci podívat na to, jaké informace a v jaké úrovni podrobností jsou zapsány do DLQ. Proto si necháme vypsat i hlavičky zpráv, což se v případě použití nástroje kafkacat provádí následovně:

kafkacat -b localhost:9092 -t dlq_bad_jsons -C \
  -f '\nKey (%K bytes): %k
  Value (%S bytes): %s
  Timestamp: %T
  Partition: %p
  Offset: %o
  Headers: %h\n'

Kafkacat by měl u každé zprávy vypsat nejenom její tělo (value), ale i všechny hlavičky (headers), z nichž je možné relativně snadno vyčíst, proč nebyla zpráva zvalidována. Tyto důvody jsou na výpisu zvýrazněny:

Key (-1 bytes):
  Value (44 bytes): {"ID":1,"Name":"Linus","Surname":"Torvalds"}
  Timestamp: 1676302494257
  Partition: 0
  Offset: 164
  Headers: __connect.errors.topic=connect-test-json,__connect.errors.partition=0,__connect.errors.offset=113,__connect.errors.connector.name=local-file-sink-json-checked,__connect.errors.task.id=0,__connect.errors.stage=VALUE_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.
DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields.
If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:328)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:516)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:516)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
 
 
Key (-1 bytes):
  Value (258 bytes): {"schema":{"type":"struct","optional":false,"version":1,"fields":[{"field":"ID","type":"int64","optional":false},{"field":"Name","type":"string","optional":false},{"field":"Surname","type":"string","optional":false}]},"payload":{"ID":1,"Surname":"Torvalds"}}
  Timestamp: 1676302522399
  Partition: 0
  Offset: 165
  Headers: __connect.errors.topic=connect-test-json,__connect.errors.partition=0,__connect.errors.offset=116,__connect.errors.connector.name=local-file-sink-json-checked,__connect.errors.task.id=0,__connect.errors.stage=VALUE_CONVERTER,__connect.errors.class.name=org.apache.kafka.connect.json.JsonConverter,__connect.errors.exception.class.name=org.apache.kafka.connect.errors.DataException,__connect.errors.exception.message=Invalid null value for required STRING field,__connect.errors.exception.stacktrace=org.apache.kafka.connect.errors.
DataException: Invalid null value for required STRING field
    at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:685)
    at org.apache.kafka.connect.json.JsonConverter.lambda$static$11(JsonConverter.java:133)
    at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:730)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:343)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:516)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:516)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Poznámka: na těchto dvou malých ukázkách je patrné, proč je zápis chyb do hlaviček zpráv ukládaných do DLQ ve výchozím nastavení zakázán – informace o chybě totiž může být delší, než samotná zpráva.

11. Zápis přijatých zpráv do relační databáze

Poměrně velkou předností frameworku Kafka Connect je fakt, že je v současné době k dispozici velké množství konektorů, které umožňují Kafku velmi snadno propojit s dalšími zdroji či cíli dat (S3, MQTT, SQS atd.). Mezi potenciálně velmi užitečné konektory patří i JDBC konektor (konektory), které podporují posílání zpráv získaných z relační databáze nebo naopak ukládání zpráv do relační databáze. Takovou možnost jsme si ostatně zmínili již v úvodním článku:

Obrázek 1: Zprávy jsou přes jeden konektor načítány z databáze (například z PostgreSQL) a posílány do zvoleného tématu v Apache Kafce. Odtud si je načítá (konzumuje) další konektor, který zprávy ukládá do Hadoopu.

V následujících kapitolách si proto ukážeme, jak lze ukládat zprávy do známé databáze PostgreSQL. Nejdříve je však nutné stáhnout a rozbalit balíček s JDBC ovladači i vlastními konektory. Tento balíček naleznete na adrese https://www.confluent.io/hub/con­fluentinc/kafka-connect-jdbc.

12. Nastavení cesty k Java archivům s JDBC konektorem i ovladači

Po rozbalení je nutné korektně nastavit cestu k Java archivům získaným ze staženého souboru. Tato cesta se nastavuje v souboru config/connect-standalone.properties a v mém konkrétním případě jsem provedl změny, které jsou zvýrazněny:

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
 
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
 
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
 
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/home/ptisnovs/kafka/kafka_2.12-3.3.2/libs/,/home/ptisnovs/kafka/confluentinc-kafka-connect-jdbc-10.6.0/lib/,
Poznámka: je zapotřebí si dát pozor na to, že hodnota uložená v plugin.path se zapisuje odlišně, než CLASSPATH (jsou použity odlišné oddělovače). Pokud použijete relativní cesty nebo ~ pro označení domovského adresáře, bude chování konektorů poněkud nevyzpytatelné (ne vše bude plně funkční). Z tohoto důvodu jsem použil absolutní cesty.

13. Konfigurace konektoru pro zápis zpráv do PostgreSQL

Podívejme se nyní na to, jak by mohla vypadat konfigurace jednoduchého konektoru typu sink, který bude konzumované zprávy zapisovat do databáze PostgreSQL. V konfiguraci nalezneme především odlišnou třídu s implementací konektoru:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector

Nejdůležitější konfigurační volbou je connection.url, který obsahuje (na databázi závislý) „connection string“. V našem konkrétním případě se budeme připojovat k lokálně běžícímu Postgresu s uvedeným jménem a heslem (což v praxi bude nutné řešit jinak – bezpečněji). A jméno databáze, do které se budou zprávy zapisovat, se jmenuje kafka_sink:

connection.url=jdbc:postgresql://localhost:5432/kafka_sink?user=postgres&password=postgres

A konečně se volbou auto.create povolí tvorba tabulky/tabulek pro zápis dat:

auto.create=true

Celá konfigurace konektoru vypadá následovně:

name=db-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=connect-test-db
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
connection.url=jdbc:postgresql://localhost:5432/kafka_sink?user=postgres&password=postgres
auto.create=true
delete.enabled=false

Do jaké tabulky se však budou zprávy zapisovat? Jméno tabulky je přímo odvozeno od jména tématu, což bude poněkud problematické, neboť se ve jméně tabulky objeví znaky „-“. O tom se lze snadno přesvědčit spuštěním konektoru, posláním zprávy a kontrolou databáze kafka_sink v klientu psql (podrobněji si to ukážeme o několik odstavců níže):

kafka_sink=# \d
              List of relations
 Schema |      Name       | Type  |  Owner
--------+-----------------+-------+----------
 public | connect-test-db | table | postgres
(1 rows)

14. Změna jména tématu, ukázka konzumace zpráv s jejich zápisem do databáze

Aby bylo možné snadněji pracovat s databázovou tabulkou vytvářenou a naplňovanou konektorem, změníme jméno tématu konzumované konektorem. Konkrétně použijeme téma nazvané test_table (což není příliš přiléhavé, ale v databázi se nám tato tabulka bude snadno hledat):

name=db-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=test_table
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
connection.url=jdbc:postgresql://localhost:5432/kafka_sink?user=postgres&password=postgres
auto.create=true
delete.enabled=false

Před spuštěním konektoru se přesvědčíme, že v Postgresu existuje databáze kafka_sink a popř. ji vytvoříme příkazem CREATE DATABASE:

$ psql -U postgres
 
Password for user postgres:
psql (9.6.10)
Type "help" for help.
 
 
postgres=# \l
                                   List of databases
     Name     |  Owner   | Encoding |   Collate   |    Ctype    |   Access privileges
--------------+----------+----------+-------------+-------------+-----------------------
 aggregator   | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 |
 controller   | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 |
 cve          | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 |
 kafka_sink   | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 | 
 notification | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 |
 postgres     | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 |
 ptisnovs     | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 |
 template0    | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 | =c/postgres          +
              |          |          |             |             | postgres=CTc/postgres
 template1    | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 | =c/postgres          +
              |          |          |             |             | postgres=CTc/postgres
 test         | postgres | UTF8     | en_US.UTF-8 | en_US.UTF-8 |
(11 rows)

Spustíme nově nakonfigurovaný konektor a pošleme do něj zprávy s korektním schématem. V logu by se mj. měly objevit zprávy o vytvoření nové tabulky. Povšimněte si, že sloupce v tabulce jsou získány ze schématu, tedy tabulka test_table bude mít tři sloupce ID, Name a Surname:

[2023-02-13 19:38:35,693] INFO [db-sink|task-0] Checking PostgreSql dialect for existence of TABLE "test_table" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:586)
[2023-02-13 19:38:35,698] INFO [db-sink|task-0] Using PostgreSql dialect TABLE "test_table" absent (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:594)
[2023-02-13 19:38:35,700] INFO [db-sink|task-0] Creating table with sql: CREATE TABLE "test_table" (
"ID" TEXT NOT NULL,
"Name" TEXT NOT NULL,
"Surname" TEXT NOT NULL) (io.confluent.connect.jdbc.sink.DbStructure:122)
[2023-02-13 19:38:35,710] INFO [db-sink|task-0] Checking PostgreSql dialect for existence of TABLE "test_table" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:586)
[2023-02-13 19:38:35,715] INFO [db-sink|task-0] Using PostgreSql dialect TABLE "test_table" present (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:594)
[2023-02-13 19:38:35,727] INFO [db-sink|task-0] Checking PostgreSql dialect for type of TABLE "test_table" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect:880)
[2023-02-13 19:38:35,730] INFO [db-sink|task-0] Setting metadata for table "test_table" to Table{name='"test_table"', type=TABLE columns=[Column{'Name', isPrimaryKey=false, allowsNull=false, sqlType=text}, Column{'Surname', isPrimaryKey=false, allowsNull=false, sqlType=text}, Column{'ID', isPrimaryKey=false, allowsNull=false, sqlType=text}]} (io.confluent.connect.jdbc.util.TableDefinitions:64)
Poznámka: v případě, že se tabulka nevytvoří nebo dojde k výjimce při vytváření nové tabulky, zkontrolujte si connection string a taktéž to, zda má uživatel (specifikovaný v connection stringu) právo na vytváření nových tabulek.

15. Kontrola zpráv zapsaných do databáze

Nyní nastal čas na kontrolu, zda byly všechny zprávy skutečně naším konektorem zapsány do databáze. Nejprve se proto přihlásíme do Postgresu přes konzoli psql:

$ psql -U postgres
 
Password for user postgres:
psql (9.6.10)
Type "help" for help.
 

Dále se přepneme do databáze kafka_sink. Tato databáze by měla existovat, protože kontrolu jsme provedli již v předchozí kapitole:

postgres=# \c kafka_sink
You are now connected to database "kafka_sink" as user "postgres".
 

Vypíšeme si všechny tabulky popř. další objekty ze zvolené databáze:

kafka_sink=# \d
 
              List of relations
 Schema |      Name       | Type  |  Owner
--------+-----------------+-------+----------
 public | connect-test-db | table | postgres
 public | test_table      | table | postgres
(2 rows)

Vidíme, že existuje jak poněkud nešťastně pojmenovaná tabulka connect-test-db vytvořená prvním „databázovým“ konektorem, tak i tabulka druhá. Bude nás zajímat především struktura druhé tabulky, takže si ji necháme zobrazit:

kafka_sink=# \d+ test_table
 
                     Table "public.test_table"
 Column  | Type | Modifiers | Storage  | Stats target | Description
---------+------+-----------+----------+--------------+-------------
 ID      | text | not null  | extended |              |
 Name    | text | not null  | extended |              |
 Surname | text | not null  | extended |              |

A konečně si pochopitelně můžeme nechat vypsat zprávy zapsané do této tabulky:

kafka_sink=# select * from test_table;
 
 ID | Name  | Surname
----+-------+----------
 1  | Linus | Torvalds
(1 row)
Poznámka: celý postup sice může na první pohled vypadat poněkud komplikovaně, ve skutečnosti je však konfigurace i spuštění „databázového“ konektoru otázka několika minut.

16. Registr schémat

Prozatím jsme posílali schéma těla každé zprávy přímo v dané zprávě. To je sice na první pohled velmi flexibilní řešení, ovšem znemožňuje to efektivní centrální správu schémat popř. zajištění jednotného schématu pro všechny zprávy ve vybraném tématu. I tuto problematiku je možné řešit, a to konkrétně s využitím takzvaného registru schémat (https://docs.confluent.i­o/platform/current/schema-registry/index.html#sr-overview). Jedná se o samostatně běžící službu, která poskytuje schémata Kafce i konektorům a navíc umožňuje správu verzí těchto schémat atd. Touto velmi zajímavou a užitečnou problematikou se budeme zabývat příště.

bitcoin_skoleni

17. Obsah třetí části článku

Ve třetí části článku o frameworku Kafka Connect se budeme zabývat především prací s registrem schémat, o němž jsme se ve stručnosti zmínili v předchozí kapitole. Tento registr se totiž dá použít k mnoha účelům, například pro podporu transformace zpráv mezi různými verzemi schématu atd. – tyto transformace (popř. akceptace starších schémat) se tedy nemusí provádět ručně v programovém kódu, což je velmi pracné a mnohdy se musí provádět ve více producentech a/nebo konzumentech.

18. Repositář s demonstračními příklady, konfiguračními soubory a testovacími zprávami

Demonstrační příklady ukázané minule byly společně s konfiguracemi konektorů určenými pro Kafka Connect a ukázkovými zprávami uloženy do repositáře, jenž je dostupný na adrese https://github.com/tisnik/slides/. V případě, že nebudete chtít klonovat celý repositář, můžete namísto toho použít odkazy na jednotlivé demonstrační příklady i další soubory, které naleznete v následující tabulce:

# Soubor Stručný popis Adresa
1 producer1.go jednoduchý producent zpráv pro Apache Kafku naprogramovaný v jazyku Go (Sarama) https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/producer1.go
2 producer2.go jednoduchý producent zpráv pro Apache Kafku naprogramovaný v jazyku Go (Sarama) https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/producer2.go
3 consumer1.go jednoduchý konzument zpráv pro Apache Kafku naprogramovaný v jazyku Go (Confluent Kafka) https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/consumer1.go
4 consumer2.go jednoduchý konzument zpráv pro Apache Kafku naprogramovaný v jazyku Go (Confluent Kafka) https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/consumer2.go
5 producer.py jednoduchý producent zpráv pro Apache Kafku naprogramovaný v jazyku Python https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/producer.py
6 consumer.py jednoduchý konzument zpráv pro Apache Kafku naprogramovaný v jazyku Python https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/consumer.py
7 SimpleProducer.java jednoduchý producent zpráv pro Apache Kafku naprogramovaný v jazyku Java https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/SimpleProducer.java
8 SimpleConsumer.java jednoduchý konzument zpráv npro Apache Kafku aprogramovaný v jazyku Java https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/SimpleConsumer.java
       
9 connect-file-sink.properties konfigurace konektoru technologie Kafka Connect: zápis přijatých zpráv (v řetězcovém formátu) do souboru https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/connect-file-sink.properties
10 connect-file-sink-2.properties konfigurace konektoru technologie Kafka Connect: zápis přijatých zpráv ve formátu JSON bez kontroly schématu https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/connect-file-sink-2.properties
11 connect-file-sink-3.properties konfigurace konektoru technologie Kafka Connect: dtto, ale konektor není ukončen po přijetí nekorektní zprávy https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/connect-file-sink-3.properties
12 connect-file-sink-4.properties konfigurace konektoru technologie Kafka Connect: nekorektní zprávy budou uloženy do DLQ https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/connect-file-sink-4.properties
13 connect-file-sink-5.properties konfigurace konektoru technologie Kafka Connect: kontrola zpráv oproti schématu https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/connect-file-sink-5.properties
14 connect-file-sink-6.properties konfigurace konektoru technologie Kafka Connect: zápis konkrétní chyby do DLQ https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/connect-file-sink-6.properties
15 db-sink-1.properties konfigurace konektoru technologie Kafka Connect: uložení zpráv do databáze https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/db-sink-1.properties
16 db-sink-2.properties konfigurace konektoru technologie Kafka Connect: uložení zpráv do databáze (vhodnější jméno tabulky) https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/db-sink-2.properties
       
17 msg1.json zpráva uložená ve formátu JSON obsahující i schéma https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/msg1.json
18 msg2.json schéma s jedním celočíselným atributem, včetně vlastního těla zprávy https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/msg2.json
19 msg3.json nekorektní zpráva: chybějící povinné atributy předepsané schématem https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/msg3.json
20 msg4.json schéma s nepovinnými atributy + korektní zpráva, která odpovídá schématu https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/msg4.json
21 bad_msg.json zpráva ve formátu JSON, která ovšem neobsahuje schéma https://github.com/tisnik/sli­des/blob/master/files/kaf­ka/bad_msg.json

19. Odkazy na Internetu

  1. Kafka Connect and Schemas
    https://rmoff.net/2020/01/22/kafka-connect-and-schemas/
  2. JSON and schemas
    https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/#json-schemas
  3. What, why, when to use Apache Kafka, with an example
    https://www.startdataengi­neering.com/post/what-why-and-how-apache-kafka/
  4. When NOT to use Apache Kafka?
    https://www.kai-waehner.de/blog/2022/01/04/when-not-to-use-apache-kafka/
  5. Microservices: The Rise Of Kafka
    https://movio.co/blog/microservices-rise-kafka/
  6. Building a Microservices Ecosystem with Kafka Streams and KSQL
    https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/
  7. An introduction to Apache Kafka and microservices communication
    https://medium.com/@ulymarins/an-introduction-to-apache-kafka-and-microservices-communication-bf0a0966d63
  8. kappa-architecture.com
    http://milinda.pathirage.org/kappa-architecture.com/
  9. Questioning the Lambda Architecture
    https://www.oreilly.com/i­deas/questioning-the-lambda-architecture
  10. Lambda architecture
    https://en.wikipedia.org/wi­ki/Lambda_architecture
  11. Kafka – ecosystem (Wiki)
    https://cwiki.apache.org/con­fluence/display/KAFKA/Eco­system
  12. The Kafka Ecosystem – Kafka Core, Kafka Streams, Kafka Connect, Kafka REST Proxy, and the Schema Registry
    http://cloudurable.com/blog/kafka-ecosystem/index.html
  13. A Kafka Operator for Kubernetes
    https://github.com/krallistic/kafka-operator
  14. Kafka Streams
    https://cwiki.apache.org/con­fluence/display/KAFKA/Kaf­ka+Streams
  15. Kafka Streams
    http://kafka.apache.org/do­cumentation/streams/
  16. Kafka Streams (FAQ)
    https://cwiki.apache.org/con­fluence/display/KAFKA/FAQ#FAQ-Streams
  17. Event stream processing
    https://en.wikipedia.org/wi­ki/Event_stream_processing
  18. Part 1: Apache Kafka for beginners – What is Apache Kafka?
    https://www.cloudkarafka.com/blog/2016–11–30-part1-kafka-for-beginners-what-is-apache-kafka.html
  19. What are some alternatives to Apache Kafka?
    https://www.quora.com/What-are-some-alternatives-to-Apache-Kafka
  20. What is the best alternative to Kafka?
    https://www.slant.co/opti­ons/961/alternatives/~kaf­ka-alternatives
  21. A super quick comparison between Kafka and Message Queues
    https://hackernoon.com/a-super-quick-comparison-between-kafka-and-message-queues-e69742d855a8?gi=e965191e72d0
  22. Kafka Queuing: Kafka as a Messaging System
    https://dzone.com/articles/kafka-queuing-kafka-as-a-messaging-system
  23. Configure Self-Managed Connectors
    https://docs.confluent.io/kafka-connectors/self-managed/configuring.html#configure-self-managed-connectors
  24. Schema Evolution and Compatibility
    https://docs.confluent.io/plat­form/current/schema-registry/avro.html#schema-evolution-and-compatibility
  25. Configuring Key and Value Converters
    https://docs.confluent.io/kafka-connectors/self-managed/userguide.html#configuring-key-and-value-converters
  26. Introduction to Kafka Connectors
    https://www.baeldung.com/kafka-connectors-guide
  27. Kafka CLI: command to list all consumer groups for a topic?
    https://stackoverflow.com/qu­estions/63883999/kafka-cli-command-to-list-all-consumer-groups-for-a-topic
  28. Java Property File Processing
    https://www.w3resource.com/java-tutorial/java-propertyfile-processing.php
  29. Skipping bad records with the Kafka Connect JDBC sink connector
    https://rmoff.net/2019/10/15/skipping-bad-records-with-the-kafka-connect-jdbc-sink-connector/
  30. Kafka Connect Deep Dive – Error Handling and Dead Letter Queues
    https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/
  31. Errors and Dead Letter Queues
    https://developer.confluent.io/learn-kafka/kafka-connect/error-handling-and-dead-letter-queues/
  32. Confluent Cloud Dead Letter Queue
    https://docs.confluent.io/clou­d/current/connectors/dead-letter-queue.html
  33. Dead Letter Queues (DLQs) in Kafka
    https://medium.com/@sannidhi.s.t/dead-letter-queues-dlqs-in-kafka-afb4b6835309
  34. Deserializer
    https://docs.confluent.io/plat­form/current/schema-registry/serdes-develop/serdes-json.html#json-schema-serializer-and-deserializer
  35. JSON, Kafka, and the need for schema
    https://mikemybytes.com/2022/07/11/json-kafka-and-the-need-for-schema/
  36. Using Kafka Connect with Schema Registry
    https://docs.confluent.io/plat­form/current/schema-registry/connect.html
  37. Zpracování dat reprezentovaných ve formátu JSON nástrojem jq
    https://www.root.cz/clanky/zpracovani-dat-reprezentovanych-ve-formatu-json-nastrojem-jq/
  38. Repositář projektu jq (GitHub)
    https://github.com/stedolan/jq
  39. GitHub stránky projektu jq
    https://stedolan.github.io/jq/
  40. 5 modern alternatives to essential Linux command-line tools
    https://opensource.com/ar­ticle/20/6/modern-linux-command-line-tools
  41. Návod k nástroji jq
    https://stedolan.github.i­o/jq/tutorial/
  42. jq Manual (development version)
    https://stedolan.github.io/jq/manual/
  43. Introducing JSON
    https://www.json.org/json-en.html
  44. Understanding JSON schema
    https://json-schema.org/understanding-json-schema/index.html
  45. JDBC Sink Connector for Confluent Platform
    https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/overview.html#jdbc-sink-connector-for-cp
  46. JDBC Connector (Source and Sink)
    https://www.confluent.io/hub/con­fluentinc/kafka-connect-jdbc
  47. Introduction to Schema Registry in Kafka
    https://medium.com/slalom-technology/introduction-to-schema-registry-in-kafka-915ccf06b902
  48. Understanding JSON Schema Compatibility
    https://yokota.blog/2021/03/29/un­derstanding-json-schema-compatibility/

Autor článku

Vystudoval VUT FIT a v současné době pracuje na projektech vytvářených v jazycích Python a Go.