Obsah
1. Témata, oddíly a replikace v systému Apache Kafka
2. Základní koncepty, na nichž je Apache Kafka postavena
3. Nejjednodušší konfigurace tématu: jediný broker, jeden oddíl, bez replikace
4. Téma rozdělené na větší počet oddílů
5. Téma s jedním replikovaným oddílem
6. Téma s větším množstvím replikovaných oddílů
9. Konfigurace a spuštění Zookeepera, test zda Zookeeper přijímá příkazy
10. Konfigurace a spuštění brokera
11. Komunikace přes téma s jediným oddílem a bez replikace
12. Větší množství konzumentů z jedné skupiny vs. z více skupin
13. Komunikace přes téma s větším množstvím oddílů
14. Souběžná konzumace zpráv konzumenty z jedné skupiny
15. Spuštění většího množství brokerů
16. Vytvoření a použití tématu s jedním replikovaným oddílem
17. Chování při nedostupnosti brokeru (brokerů)
18. Vytvoření a použití tématu s několika replikovanými oddíly
19. Repositář s pomocnými skripty a konfiguračními soubory
1. Témata, oddíly a replikace v systému Apache Kafka
Systém Apache Kafka je v současnosti velmi rozšířen a používá se v mnoha oblastech IT. Někdy se setkáme s tím, že je Apache Kafka nasazena a využívána jako pouhý „vylepšený“ message broker, tj. jako centrální část celé architektury sloužící pro komunikaci mezi jednotlivými (mikro)službami a nástroji. Ovšem možnosti Apache Kafky jsou ve skutečnosti poněkud větší, a to díky poměrně unikátnímu způsobu práce s tzv. tématy (topic). Navíc Apache Kafka dokáže zajistit svoji velkou dostupnost a odolnost vůči pádům jednotlivých komponent či síťové infrastruktury (resilience). Tomuto tématu, s nímž do značné míry souvisí tzv. replikace oddílů a systém leader-follower(s), se budeme věnovat v dnešním článku.
Obrázek 1: Logo nástroje Apache Kafka, kterému se budeme dnes věnovat.
2. Základní koncepty, na nichž je Apache Kafka postavena
V této kapitole si ve stručnosti vysvětlíme základní koncepty, na nichž je Apache Kafka postavena. Systém Apache Kafka umožňuje ukládání zpráv (zde se ovšem poměrně často taktéž používá termín záznam – record) do různých témat (topic), přičemž každé téma je obecně rozděleno do oddílů neboli partition. Samozřejmě je možné pro téma vyhradit pouze jediný oddíl (což je ostatně výchozí nastavení, které se asi nejvíce podobá klasickým message brokerům) a tvářit se, že máme k dispozici „vylepšenou“ frontu – ostatně přesně takto lze s Kafkou začít a pro mnohé účely může být tato konfigurace dostatečná. Rozdělení tématu do většího množství oddílů se provádí z několika důvodů. Jedním z nich je snaha o rozdělení zátěže (load balancing), protože jednotlivé oddíly mohou být provozovány na různých počítačích v mnohdy i velmi rozsáhlém clusteru (většinou se jedná o zátěž disků, nikoli CPU).
Obrázek 2: Kafka nemusí být nasazena jako „pouhý“ message broker, ale může sloužit i jako primární zdroj dat pro další mikroslužby. To je základ pro architekturu Kappa.
Dále se dělení provádí z toho důvodu, že každý oddíl obsahuje sekvenci neměnných (immutable) zpráv, přičemž nové zprávy se pouze připojují na konec oddílu (append-only log). Zprávy z oddílů je možné číst (konzumovat) nezávisle na ostatních oddílech a zajistit tak potřebný load balancing (jak uvidíme dále, je tato možnost realizována přes skupiny konzumentů – consumer groups). Každá zpráva uložená do oddílu má přiřazen jednoznačný offset (reprezentovaný v Javě typem long, což je dostatečně vysoká hodnota na to, aby v reálném nasazení nedošlo k jejímu přetečení).
U většiny reálných nasazení Apache Kafky se taktéž počítá s využitím většího množství instancí brokerů, z nichž je vytvořen cluster (nazývaný Kafka Cluster). A právě při takovém uspořádání se setkáme s důležitým termínem replikace – každý oddíl je totiž typicky replikován na několika message brokerech v clusteru (ovšem nemusí se jednat o všechny brokery, replikace se provádí například na tři brokery ve větším clusteru, což si ostatně vyzkoušíme v dalších kapitolách).
To však není vše, jelikož je ve skutečnosti konfigurace poněkud složitější resp. může být složitější – každý oddíl totiž může být replikován na více počítačích, přičemž jeden z těchto oddílů je takzvaným „leaderem“ a ostatní jsou „followeři“. Zápis nových zpráv popř. čtení se provádí vždy jen v rámci leaderu, ovšem změny jsou replikovány na všechny kopie oddílu. Ve chvíli, kdy z nějakého (libovolného) důvodu dojde k pádu „leadera“, převezme jeho roli jeden z dalších uzlů. Pokud tedy existuje N uzlů s replikou oddílu, bude systém funkční i ve chvíli, kdy zhavaruje N-1 uzlů! (i to si vyzkoušíme).
3. Nejjednodušší konfigurace tématu: jediný broker, jeden oddíl, bez replikace
Podívejme se nejdříve na tu nejjednodušší možnou konfiguraci tématu. Jedná se o konfiguraci, v níž je téma spravováno jediným brokerem a není prováděna žádná replikace. Zprávy (události) jsou tedy fyzicky uloženy pouze v jediném souboru, přičemž zápis je všemi producenty prováděn na konec (což je očekávané chování), zatímco čtení zpráv může být konzumenty provedeno od libovolného offsetu. Příkladem je situace, kdy je konzument zpráv opožděn za producentem zpráv, protože čte zprávu na offsetu 4 zatímco producent bude zapisovat zprávu s offsetem 9:
write | +---+---+---+---+---+---+---+---+---+ v | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... +---+---+---+---+---+---+---+---+---+ ^ | read
Druhá situace nastane ve chvíli, kdy je producent pomalejší než konzument a konzument dojde na konec tématu. V tomto případě konzument bude čekat na příchod (resp. přesněji řečeno na připojení) nové zprávy na konec tématu:
write | +---+---+---+---+---+---+---+---+---+ v | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... +---+---+---+---+---+---+---+---+---+ ^ | read
A samozřejmě je možné k tématu připojit větší množství konzumentů. Pokud bude každý z konzumentů součástí jiné skupiny konzumentů, bude čtení zpráv probíhat pro každou skupinu nezávisle na ostatních skupinách. Jinými slovy – každá skupina konzumentů si „schraňuje“ svůj offset, jenž se může lišit od offsetu ostatních skupin:
write | +---+---+---+---+---+---+---+---+---+ v | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... +---+---+---+---+---+---+---+---+---+ ^ ^ ^ ^ | | | | | | read-group-1 | | | | | read-group-2 read-group-4 | read-group-3
4. Téma rozdělené na větší počet oddílů
Poněkud komplikovanější, ale velmi častá je taková konfigurace tématu, kde dochází k rozdělení zpráv do několika oddílů. V takovém případě producent či producenti nezapisují zprávy do jednoho oddílu (samozřejmě na konec), ale zápis je proveden pouze do jednoho z vybraných oddílů. O tom, do kterého oddílu bude zápis (resp. připojení) zprávy proveden, se rozhoduje na základě klíče připojeného ke zprávě. Samotná zpráva je totiž chápána jako dvě sekvence bajtů – klíč zprávy a tělo zprávy. A právě na základě klíče se vypočítá hash a algoritmus implementovaný v samotném brokeru rozhodne, do kterého oddílu bude zpráva uložena:
+---+---+---+---+---+---+ partition #0 | 0 | 1 | 2 | 3 | 4 | 5 | ... +---+---+---+---+---+---+ partition #1 | 0 | 1 | 2 | ... +---+---+---+ partition #2 | ... +---+---+---+---+---+---+---+---+---+ partition #3 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... +---+---+---+---+---+---+---+---+---+
Po přijetí nové zprávy tedy může být zápis proveden do tohoto místa:
write | +---+---+---+---+---+---+ v partition #0 | 0 | 1 | 2 | 3 | 4 | 5 | ... +---+---+---+---+---+---+ partition #1 | 0 | 1 | 2 | ... +---+---+---+ partition #2 | ... +---+---+---+---+---+---+---+---+---+ partition #3 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... +---+---+---+---+---+---+---+---+---+
Nebo se může broker rozhodnout pro připojení zprávy do posledního oddílu atd. atd.:
+---+---+---+---+---+---+ partition #0 | 0 | 1 | 2 | 3 | 4 | 5 | ... +---+---+---+---+---+---+ partition #1 | 0 | 1 | 2 | ... +---+---+---+ partition #2 | ... +---+---+---+---+---+---+---+---+---+ partition #3 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... +---+---+---+---+---+---+---+---+---+ ^ | write
A jak se provádí čtení? Kafka ve chvíli, kdy je téma rozděleno do větví, musí přiřadit jednotlivé konzumenty (z jedné skupiny konzumentů) k nějakému oddílu. Nejjednodušší je situace, kdy má nadá skupina konzumentů stejný počet konzumentů, jako je počet oddílů, což je ostatně doporučované řešení. Pak je každý konzument přiřazen jednomu oddílu a konzumuje tedy pouze podmnožinu zpráv:
+---+---+---+---+---+---+ partition #0 | 0 | 1 | 2 | 3 | 4 | 5 | ............... konzument #1 +---+---+---+---+---+---+ partition #1 | 0 | 1 | 2 | ........................... konzument #3 +---+---+---+ partition #2 | ....................................... konzument #2 +---+---+---+---+---+---+---+---+---+ partition #3 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |.... konzument #4 +---+---+---+---+---+---+---+---+---+
Konzumentů v jedné skupině ovšem může být méně, než je počet oddílů. V takovém případě musí některý konzument číst zprávy z většího množství oddílů:
+---+---+---+---+---+---+ partition #0 | 0 | 1 | 2 | 3 | 4 | 5 | ........... konzument #1 +---+---+---+---+---+---+ : partition #1 | 0 | 1 | 2 | ................................ konzument #3 +---+---+---+ partition #2 | ............................................ konzument #2 +---+---+---+---+---+---+---+---+---+ : partition #3 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |........ +---+---+---+---+---+---+---+---+---+
Konzumentů ovšem může být i více. Tehdy nějaký konzument v daný okamžik nepracuje, tj. nepřijímá zprávy a situace může vypadat takto:
+---+---+---+---+---+---+ partition #0 | 0 | 1 | 2 | 3 | 4 | 5 | ............... konzument #1 +---+---+---+---+---+---+ partition #1 | 0 | 1 | 2 | ........................... konzument #3 +---+---+---+ partition #2 | ....................................... konzument #2 +---+---+---+---+---+---+---+---+---+ partition #3 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |.... konzument #4 +---+---+---+---+---+---+---+---+---+ konzument #5 konzument #6 konzument #7
5. Téma s jedním replikovaným oddílem
Další možná konfigurace tématu může vypadat tak, že téma má sice jediný oddíl, ovšem tento oddíl je replikován mezi několika brokery. Příkladem může být oddíl replikovaný mezi trojicí brokerů. V takovém případě je jeden z těchto oddílů nazvaný leader a veškeré operace viděné zvnějšku Kafky (tedy posílání zpráv a jejich konzumace) probíhá právě s leaderem. Ostatní repliky jsou nazvané follower(s), protože pouze sledují leadera a synchronizují svůj obsah s leaderem:
write | +---+---+---+---+---+---+---+---+---+ v | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... (leader) +---+---+---+---+---+---+---+---+---+ ^ ^ | | read | sync | | v +---+---+---+---+---+---+---+---+---+ | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | (follower) +---+---+---+---+---+---+---+---+---+ ^ | | sync | | v +---+---+---+---+---+---+---+---+---+ | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | (follower) +---+---+---+---+---+---+---+---+---+
K čemu je to však dobré? V případě, že nějaký broker z celého Kafka clusteru zhavaruje a bude obsahovat oddíl typu follower, bude komunikace pokračovat dál a teprve po znovupřipojení brokera se follower postupně sesynchronizuje s leaderem. Zajímavější situace nastane ve chvíli, kdy zhavaruje samotný leader. V takovém případě Kafka „povýší“nějakého followera za nového leadera. V případě, že téma (resp. oddíl) je replikováno na N brokerů, může jich zhavarovat N-1 a systém bude stále funkční. I toto chování si pochopitelně postupně otestujeme.
6. Téma s větším množstvím replikovaných oddílů
Možnosti popsané ve čtvrté a v páté kapitole je pochopitelně možné v případě potřeby zkombinovat a vytvořit tak konfiguraci tématu, které bude rozděleno na větší množství oddílů a tyto oddíly budou replikovány mezi větší množství brokerů:
+---+---+---+---+---+---+ oddíl #0 | 0 | 1 | 2 | 3 | 4 | 5 | ... +---+---+---+---+---+---+ oddíl #1 | 0 | 1 | 2 | ... +---+---+---+ (leader) oddíl #2 | ... +---+---+---+---+---+---+---+---+---+ oddíl #3 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... +---+---+---+---+---+---+---+---+---+ ^ | | sync | | v +---+---+---+---+---+---+ oddíl #0 | 0 | 1 | 2 | 3 | 4 | 5 | ... +---+---+---+---+---+---+ oddíl #1 | 0 | 1 | 2 | ... +---+---+---+ oddíl #2 | ... (follower) +---+---+---+---+---+---+---+---+---+ oddíl #3 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... +---+---+---+---+---+---+---+---+---+
Opět zde můžeme vidět, že oddíl na jednom z brokerů bude zvolen za leadera a ostatní oddíly budou sledovat změny leadera a aplikovat je na své straně. A samozřejmě při výpadku leadera je jeden z následovníků zvolen za leadera a činnost celé infrastruktury Apache Kafky tak může pokračovat dál.
7. Praktická část
Druhá část dnešního článku bude zaměřená více prakticky. Nejdříve vytvoříme tu nejjednodušší instanci Kafka clusteru, která se skládá z jednoho běžícího Zookeepera a jednoho brokera. Na této instanci si otestujeme způsoby vytváření témat, chování většího množství konzumentů při připojení k tématu, použití většího množství oddílů pro téma atd. Ovšem zajímavější je situace, kdy je spuštěno větší množství brokerů a kdy je navíc nějaký oddíl replikován mezi tyto brokery. V této chvíli by mělo být možné brokera (či brokery) zastavit s tím, že zbývající brokeři se postarají o zachování funkcionality Apache Kafky (resp. přesněji řečeno celého Kafka clusteru). To, zda je tomu skutečně tak, si taktéž ověříme.
8. Instalace Apache Kafky
V praktické části budeme brokery Apache Kafky i Zookeeper spouštět lokálně (popř. z Dockeru), takže je nejdříve nutné Kafku nainstalovat. Není to vůbec nic složitého. V případě, že je na počítači nainstalováno JRE (běhové prostředí Javy), je instalace Kafky pro testovací účely triviální. V článku si ukážeme instalaci verze 3.3.2, ovšem můžete si stáhnout i nejnovější verzi 3.4.0, která byla vydána prakticky přesně před měsícem. Tarball s instalací Kafky lze získat z adresy https://downloads.apache.org/kafka/3.3.2/kafka2.13–3.3.2.tgz. Stažení a rozbalení tarballu:
$ wget https://downloads.apache.org/kafka/3.3.2/kafka_2.13-3.3.2.tgz $ tar xvfz kafka_2.13-3.3.2.tgz $ cd kafka_2.13-3.3.2/
Po rozbalení staženého tarballu získáme adresář, v němž se nachází všechny potřebné Java archivy (JAR), konfigurační soubory (v podadresáři config) a několik pomocných skriptů (v podadresáři bin). Pro spuštění Zookeepera a brokerů je zapotřebí mít nainstalovánu JRE (Java Runtime Environment) a samozřejmě též nějaký shell (BASH, cmd, …).
. ├── bin │ └── windows ├── config │ └── kraft ├── libs ├── licenses └── site-docs 7 directories
Mezi důležité soubory, které budeme používat v rámci dalších kapitol, patří především skripty pro spouštění jednotlivých služeb, konfiguraci témat, produkci zpráv či pro jejich konzumaci. Tyto skripty jsou uloženy v podadresáři bin (a pro Windows ještě v dalším podadresáři windows):
Skript | Stručný popis |
---|---|
bin/kafka-server-start.sh | spuštění brokera |
bin/zookeeper-server-start.sh | spuštění Zookeepera |
bin/kafka-configs.sh | konfigurace brokerů |
bin/kafka-topics.sh | konfigurace témat, zjištění informace o tématech atd. |
bin/kafka-consumer-groups.sh | konfigurace popř. zjištění informací o skupinách konzumentů |
bin/kafka-run-class.sh | spuštění konkrétní třídy z Apache Kafky (například pro zjištění informací o skupinách konzumentů) |
bin/kafka-console-producer.sh | jednoduchý producent zpráv |
bin/kafka-console-consumer.sh | jednoduchý konzument zpráv |
Používat budeme i několik konfiguračních souborů. Ty jsou pro změnu uloženy v podadresáři config a jedná se o soubory ve formátu Java property (file), tj. vlastně se jedná o sekvence dvojic klíč:hodnota (s podporou zápisu poznámek):
Konfigurační soubor | Stručný popis |
---|---|
config/server.properties | konfigurace brokeru |
config/zookeeper.properties | konfigurace Zookeepera |
9. Konfigurace a spuštění Zookeepera, test zda Zookeeper přijímá příkazy
Po (doufejme že úspěšné) instalaci Kafky již můžeme spustit Zookeeper a jednu instanci brokera (a to přesně v tomto pořadí!). Konfigurace Zookeepera je uložena ve výše zmíněném souboru config/zookeeper.properties a zajímat nás budou především následující tři konfigurační volby – adresář, kam ZooKeeper ukládá svoje data, port, který použijí brokeři a omezení počtu připojení jednoho klienta v daný okamžik:
dataDir=/tmp/zookeeper clientPort=2181 maxClientCnxns=0 admin.enableServer=false
Nyní již můžeme Zookeepera spustit:
$ cd kafka/kafka_2.12-3.3.2/ $ bin/zookeeper-server-start.sh config/zookeeper.properties
Průběh inicializace Zookeepera je vypisován na terminál:
[2023-02-04 08:37:49,555] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ... ... ... [2023-02-04 08:37:49,591] INFO (org.apache.zookeeper.server.ZooKeeperServer) [2023-02-04 08:37:49,591] INFO ______ _ (org.apache.zookeeper.server.ZooKeeperServer) [2023-02-04 08:37:49,591] INFO |___ / | | (org.apache.zookeeper.server.ZooKeeperServer) [2023-02-04 08:37:49,591] INFO / / ___ ___ | | __ ___ ___ _ __ ___ _ __ (org.apache.zookeeper.server.ZooKeeperServer) [2023-02-04 08:37:49,592] INFO / / / _ \ / _ \ | |/ / / _ \ / _ \ | '_ \ / _ \ | '__| (org.apache.zookeeper.server.ZooKeeperServer) [2023-02-04 08:37:49,592] INFO / /__ | (_) | | (_) | | < | __/ | __/ | |_) | | __/ | | (org.apache.zookeeper.server.ZooKeeperServer) [2023-02-04 08:37:49,592] INFO /_____| \___/ \___/ |_|\_\ \___| \___| | .__/ \___| |_| (org.apache.zookeeper.server.ZooKeeperServer) [2023-02-04 08:37:49,592] INFO | | (org.apache.zookeeper.server.ZooKeeperServer) [2023-02-04 08:37:49,592] INFO |_| (org.apache.zookeeper.server.ZooKeeperServer) ... ... ... [2023-02-04 08:37:49,691] INFO zookeeper.request_throttler.shutdownTimeout = 10000 (org.apache.zookeeper.server.RequestThrottler) [2023-02-04 08:37:49,706] INFO Using checkIntervalMs=60000 maxPerMinute=10000 maxNeverUsedIntervalMs=0 (org.apache.zookeeper.server.ContainerManager)
Ověření, zda Zookeeper běží a přijímá požadavky, můžeme provést standardním nástrojem telnet. Nejprve se k Zookeeperovi připojíme, a to konkrétně na port 2181:
$ telnet localhost 2181 Trying 192.168.1.34... Connected to 192.168.1.34. Escape character is '^]'.
Připojit se pochopitelně můžeme i ke vzdálenému stroji s běžícím Zookeeperem, pokud nám to umožňuje konfigurace sítě a nastavení firewallů:
$ telnet 192.168.1.34 2181 Trying 192.168.1.34... Connected to 192.168.1.34. Escape character is '^]'.
Nyní Zookeeper čeká na zadání four letter word command, tedy příkazu, který je zapsán formou čtyř znaků. Příkladem může být příkaz srvr. Po jeho zápisu Zookeeper odpoví a odpojí se:
srvr Zookeeper version: 3.6.3--6401e4ad2087061bc6b9f80dec2d69f2e3c8660a, built on 04/08/2021 16:35 GMT Latency min/avg/max: 0/0.0/0 Received: 1 Sent: 0 Connections: 1 Outstanding: 0 Zxid: 0x0 Mode: standalone Node count: 5 Connection closed by foreign host.
Mezi další podporované čtyřznakové příkazy patří:
Hodnota 32bitového slova | Odpovídající příkaz |
---|---|
1936881266 | srvr |
1937006964 | stat |
2003003491 | wchc |
1685417328 | dump |
1668445044 | crst |
1936880500 | srst |
1701738089 | envi |
1668247142 | conf |
1751217000 | hash |
2003003507 | wchs |
2003003504 | wchp |
1684632179 | dirs |
1668247155 | cons |
1835955314 | mntr |
1769173615 | isro |
1920298859 | ruok |
1735683435 | gtmk |
1937010027 | stmk |
10. Konfigurace a spuštění brokera
Nyní, když již běží jedna instance Zookeepera, si můžeme spustit brokera. Podívejme se ovšem nejdříve na jeho konfiguraci. Výchozí konfigurace jednoho brokera je uložená v souboru config/server.properties. Samotný konfigurační soubor obsahuje několik sekcí:
- Port, na kterém broker naslouchá
- Jednoznačné (unikátní) ID brokera
- Počet použitých vláken pro IO operace a počet vláken pro komunikaci.
- Velikost bufferů, maximální povolená velikost požadavků (což omezuje velikost zprávy) atd.
- Nastavení počtu partitions
- Nastavení retence dat
- Připojení k Zookeeperovi
Takto vypadá výchozí konfigurace (po odstranění původních komentářů a přidání komentářů vlastních):
# ########################### Jednoznačná identifikace brokera #################### # broker.id=0 num.io.threads=8 # ######################## Komunikace s producenty i konzumenty ################### # socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 # ########################### Adresář se soubory s daty oddílů #################### # log.dirs=/tmp/kafka-logs # ################### Výchozí konfigurace oddílů pro nová témata ################## # num.partitions=1 num.recovery.threads.per.data.dir=1 # ######################## Interní témata s offsety a transakcemi ################# # offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 # ############################# Log Retention Policy ############################## # log.retention.hours=168 log.retention.check.interval.ms=300000 # ############################# Zookeeper ######################################### # zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 # ############################# Group Coordinator Settings ######################## # group.initial.rebalance.delay.ms=0
Spuštění jednoho brokera vypadá i probíhá jednoduše:
$ cd kafka/kafka_2.12-3.3.2/ $ bin/kafka-server-start.sh config/server.properties
Broker by měl vypsat minimálně informaci o tom, že se připojil k Zookeeperu a že dokázal inicializovat adresář pro uložení dat (zpráv) oddílů:
[2023-02-04 08:41:47,105] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) [2023-02-04 08:41:47,506] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util) [2023-02-04 08:41:47,587] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler) [2023-02-04 08:41:47,589] INFO starting (kafka.server.KafkaServer) [2023-02-04 08:41:47,590] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer) [2023-02-04 08:41:47,606] INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient) ... ... ... [2023-02-04 08:49:52,076] INFO [KafkaServer id=0] started (kafka.server.KafkaServer) [2023-02-04 08:49:52,167] INFO [BrokerToControllerChannelManager broker=0 name=alterPartition]: Recorded new controller, from now on will use node localhost:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread) [2023-02-04 08:49:52,184] INFO [BrokerToControllerChannelManager broker=0 name=forwarding]: Recorded new controller, from now on will use node localhost:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)
Alternativně je možné ZooKeepera i Kafku (jednu instanci brokera) spustit v Dockeru:
$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 spotify/kafka
11. Komunikace přes téma s jediným oddílem a bez replikace
Nejprve si ukažme komunikaci mezi producenty zpráv a jejich konzumenty v případě, že jak producenti, tak i konzumenti používají společné téma (topic) s jediným oddílem, který navíc není replikovaný. Jedná se tedy o konfiguraci, která byla popsána ve třetí kapitole. V dalším textu budeme předpokládat, že je již spuštěn jak jeden Zookeeper, tak i jeden Kafka broker.
Budeme potřebovat další tři terminály (nebo okna spravovaná přes tmux atd.):
- V prvním terminálu budeme spouštět příkazy pro konfiguraci Kafky a pro zjišťování jejího stavu
- Ve druhém terminálu spustíme producenta zpráv.
- Ve třetím terminálu spustíme konzumenta zpráv.
Pro větší přehlednost bude mít každý z terminálů nastaven odlišnou výzvu (prompt):
kafka $ producer $ consumer $
Vypíšeme si seznam témat; ten by měl být prázdný:
kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
Vytvoříme nové téma nazvané topic1 a necháme si opět vypsat seznam témat:
kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic1 Created topic topic1. kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list topic1
Získáme i podrobnější informace o tématu (s jediným oddílem):
kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic1 Topic: topic1 TopicId: 3Xfn9Hu1QhmRAdXKmEua-w PartitionCount: 1 ReplicationFactor: 1 Configs: Topic: topic1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Vypíšeme si (prozatím prázdný) seznam skupin konzumentů:
kafka $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
V dalším terminálu spustíme producenta zpráv a pošleme do tématu topic1 tři zprávy s hodnotou (tělem) „foo“, „bar“ a „baz“:
producer $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic1 >foo >bar >baz <Ctrl+D>
Spustíme konzumenta zpráv a zajistíme, aby zprávy četl od začátku tématu (jinak by čekal na nejnovější zprávy a již uložené zprávy by ignoroval):
consumer $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic1 --from-beginning foo bar baz
Opět si zobrazíme seznam skupin konzumentů:
kafka $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list console-consumer-73416
A vypíšeme si o nich i podrobnější informace:
kafka $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID console-consumer-73416 topic1 0 - 3 - console-consumer-938ac3be-f3b6-4e93-aab4-74e34b9a3ac5 /192.168.1.34 console-consumer
Lepší však bude vytvořit konzumenta s explicitně specifikovaným jménem skupiny konzumentů:
consumer $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic1 --group topic1-group1
Nyní bude výpis informací o skupinách konzumentů vypadat takto:
kafka $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID topic1-group1 topic1 0 - 3 - console-consumer-3dc1ad8a-14fb-4758-b25e-9319436b409b /192.168.1.34 console-consumer kafka $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
Po poslání další zprávy provede konzument její zpracování a upraví si „svůj“ offset, díky čemuž může Kafka spočítat i lag, tj. zpoždění konzumentů vůči producentovi (měřené v počtu nepřečtených zpráv):
kafka $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID topic1-group1 topic1 0 4 4 0 console-consumer-3dc1ad8a-14fb-4758-b25e-9319436b409b /192.168.1.34 console-consumer
12. Větší množství konzumentů z jedné skupiny vs. z více skupin
V této kapitole si ukážeme chování Kafky ve chvíli, kdy se k vybranému tématu připojí větší množství konzumentů, kteří mohou patřit buď do jedné skupiny nebo do většího množství skupin.
Nejprve nové téma vytvoříme. Není to sice nutné, protože téma dokáže vytvořit i připojený klient, ale proč nebýt explicitní:
kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic2 Created topic topic2.
Vypíšeme si podrobnější informace o právě vytvořeném tématu. Vidíme, že téma má jediný oddíl a jediného leadera (což je ten samý oddíl):
kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic2 Topic: topic2 TopicId: 43_uP1TFTuOd-lXgKxbIaA PartitionCount: 1 ReplicationFactor: 1 Configs: Topic: topic2 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
V samostatných terminálech spustíme dvojici konzumentů, kteří budou patřit do stejné skupiny nazvané topic2-group1:
consumer1 $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic2 --group topic2-group1 consumer2 $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic2 --group topic2-group1
A samozřejmě v dalším terminálu spustíme producenta, který bude do tématu předávat zprávy:
producer $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic2 >foo >bar >baz <Ctrl+D>
Povšimněte si, že pouze jeden z producentů (náhodně vybraný) bude dostávat zprávy, zatímco druhý bude pouze čekat na zprávy, které nedojdou. Jedná se tedy o následující situaci, kdy v rámci jedné skupiny může být k jedinému oddílu připojen jediný konzument:
+---+---+---+---+---+---+ partition #0 | 0 | 1 | 2 | 3 | 4 | 5 | ............... konzument #1 +---+---+---+---+---+---+ konzument #2 ... ... ... konzument #N
Co se však bude dít ve chvíli, kdy budeme mít dvojici konzumentů, ovšem každý bude patřit do jiné skupiny? I to si pochopitelně otestujeme. Vytvoříme pro tento účel nové téma:
kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic3 Created topic topic3. kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic3 Topic: topic3 TopicId: o1Szo6FoQLu67SUK4m9QsA PartitionCount: 1 ReplicationFactor: 1 Configs: Topic: topic3 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
V samostatných terminálech spustíme dvojici konzumentů, každý ovšem bude patřit do jiné skupiny:
consumer1 $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic3 --group topic3-group1 consumer2 $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic3 --group topic3-group2
Spustíme producenta zpráv:
producer $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic3 >foo >bar >baz <Ctrl+D>
Nyní by oba konzumenti měli dostávat stejné zprávy! Tj. bude se jednat o tuto konfiguraci:
write | +---+---+---+---+---+---+---+---+---+ v | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... +---+---+---+---+---+---+---+---+---+ ^ ^ | | topic3-group1- | | topic3-group2
O tom, že konzumenti z různých skupin mají uložen svůj offset nezávisle na dalších skupinách, se lze snadno přesvědčit:
kafka $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID topic3-group1 topic3 0 3 3 0 console-consumer-bec8112f-8c21-46e0-8a22-997ec663615f /192.168.1.34 console-consumer GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID topic3-group2 topic3 0 3 3 0 console-consumer-feb4ff84-460c-4ddb-959c-f949fc18968f /192.168.1.34 console-consumer
13. Komunikace přes téma s větším množstvím oddílů
Vyzkoušejme si nyní další situaci, konkrétně stav, kdy je jedno téma rozděleno na dva oddíly na jediném Kafka brokeru. Takové téma je nutné vytvořit explicitně s použitím parametru –partitions:
kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic4 --partitions 2 Created topic topic4.
Nyní bude konfigurace tématu vypsaná samotnou Kafkou vypadat odlišně, protože se vypíšou informace o obou oddílech:
kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic4 Topic: topic4 TopicId: 4qaw38skShK9elEG-gwEIA PartitionCount: 2 ReplicationFactor: 1 Configs: Topic: topic4 Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: topic4 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Spusťme opět dva konzumenty patřící do stejné skupiny:
consumer1 $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic4 --group topic4-group1 consumer2 $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic4 --group topic4-group1
Kafka broker v tomto případě konzumenty rozdělí po jednotlivých oddílech:
+---+---+---+---+---+---+ partition #0 | 0 | 1 | 2 | 3 | 4 | 5 | ............... konzument #1 +---+---+---+---+---+---+ partition #1 | 0 | 1 | 2 | ........................... konzument #2 +---+---+---+
O tom se ostatně můžeme velmi snadno přesvědčit:
kafka $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID topic4-group1 topic4 0 0 0 0 console-consumer-00d59376-f63e-4833-adfa-b0578c275244 /192.168.1.34 console-consumer topic4-group1 topic4 1 0 0 0 console-consumer-22ddccc4-6ea4-4b97-9c32-e85755788e35 /192.168.1.34 console-consumer
Teoreticky by nyní měly být zprávy rozdělovány „spravedlivě“ mezi oba oddíly:
producer $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic4 >foo >bar >baz <Ctrl+D>
Ve skutečnosti však k rozdělení nedojde, o čemž se lze snadno přesvědčit – jeden z konzumentů (nevíme který) bude dostávat všechny zprávy, druhý konzument nebude zprávy dostávat žádné!
+---+---+---+---+---+---+ partition #0 | 0 | 1 | 2 | 3 | 4 | 5 | ............... konzument #1 +---+---+---+---+---+---+ partition #1 | ....................................... konzument #2 +
14. Souběžná konzumace zpráv konzumenty z jedné skupiny
Zprávy se (ve výchozím nastavení) rozdělují do jednotlivých oddílů v tématu na základě svého klíče. Ten lze u posílaných (resp. produkovaných) zpráv specifikovat podobně jako tělo zprávy. Musíme však producenta ovládaného z příkazové řádky vhodným způsobem nakonfigurovat tak, aby věděl, jakým způsobem je klíč (sekvence bajtů) oddělen od těla zprávy (což je taktéž sekvence bajtů). Jako oddělovač můžeme použít například dvojtečku. Producent se nakonfiguruje následovně:
producer $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic4 --property parse.key=true --property key.separator=":"
Do tématu nyní pošleme šest zpráv, každou s odlišným klíčem (zadán před dvojtečkou):
>1:a >2:b >3:c >foo:1 >bar:2 >baz:3 <Ctrl+D>
Pokud jsou současně spuštění dva konzumenti patřící do stejné skupiny, uvidíme, že zprávy by měly být mezi konzumenty rozděleny a to zhruba férově:
consumer1 $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic4 --group topic4-group1 b 1 3
consumer2 $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic4 --group topic4-group1 a c 2
Dosáhli jsme tedy tohoto stavu:
+---+---+---+ partition #0 | 0 | 1 | 2 | ................ konzument #1 +---+---+---+ partition #1 | 0 | 1 | 2 | ................ konzument #2 +---+---+---+
O stavu jednotlivých oddílů se můžeme snadno přesvědčit zadáním následujícího příkazu:
kafka $ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID topic4-group1 topic4 0 3 3 0 console-consumer-00d59376-f63e-4833-adfa-b0578c275244 /192.168.1.34 console-consumer topic4-group1 topic4 1 6 6 0 console-consumer-22ddccc4-6ea4-4b97-9c32-e85755788e35 /192.168.1.34 console-consumer
Z výpisu je patrné, že ve druhém oddílu je větší množství zpráv. To je způsobeno zprávami, které jsme do stejného tématu poslali v rámci předchozí kapitoly. Tyto zprávy neměly klíče a proto byly umístěny do jediného oddílu.
15. Spuštění většího množství brokerů
V dalším textu si ukážeme, jakým způsobem je možné vytvořit oddíly replikované mezi větší množství brokerů. Ovšem nejprve musíme tyto brokery spustit. Každý broker musí mít unikátní ID a musí běžet na svém vlastním (prozatím neobsazeném) portu. Pro spuštění tří brokerů tedy musíme mít tři konfigurační soubory, které postupně vypadají následovně (důležité řádky jsou zvýrazněny):
Konfigurační soubor server1.properties:
broker.id=0 listeners=PLAINTEXT://:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0
Konfigurační soubor server2.properties:
broker.id=1 listeners=PLAINTEXT://:9093 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs-2 num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0
Konfigurační soubor server3.properties:
broker.id=2 listeners=PLAINTEXT://:9094 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs-3 num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0
Jednotlivé instance Kafka brokerů postupně spustíme s využitím následujících skriptů:
Skript pro první broker:
bin/kafka-server-start.sh config/server1.properties
Skript pro druhý broker:
bin/kafka-server-start.sh config/server2.properties
Skript pro třetí broker:
bin/kafka-server-start.sh config/server3.properties
16. Vytvoření a použití tématu s jedním replikovaným oddílem
Nyní, když máme spuštěny tři brokery, si vytvoříme téma s jediným oddílem, který bude replikován na dva vybrané brokery:
kafka $ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic5 --replica-assignment 0:1 Created topic topic5.
Nyní si vypišme konfiguraci nového tématu tak, jak ji vidí Apache Kafka:
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic5 Topic: topic5 TopicId: 3XcqjGiHT-OpkO1jIuqKcQ PartitionCount: 1 ReplicationFactor: 2 Configs: Topic: topic5 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Informace, které jsme dříve do jisté míry ignorovali, se nyní stávají důležité:
- V sekci Leader je uvedeno ID brokeru, který je v daný okamžik leaderem
- V sekci Replicas vidíme zápis 0,1, což jsou ID brokerů, kde jsou uloženy repliky prvního oddílu (s indexem 0)
- A konečně v sekci Isr taktéž vidíme zápis 0,1, což jsou ID brokerů, kteří obsahují synchronizované repliky oddílů (in-sync replicas)
Vytvořili jsme tedy téma s touto konfigurací:
write | +---+---+---+---+---+---+---+---+---+ v | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... (leader) +---+---+---+---+---+---+---+---+---+ ^ ^ | | read | sync | | v +---+---+---+---+---+---+---+---+---+ | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | (follower) +---+---+---+---+---+---+---+---+---+
Pro zajímavost si vytvořme další téma, které bude obsahovat jeden oddíl replikovaný na všechny tři servery:
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic6 --replica-assignment 0:1:2 Created topic topic6.
Konfigurace tohoto tématu:
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic6 Topic: topic6 TopicId: 1kEi0YYaQ5uYot7xAsN3Eg PartitionCount: 1 ReplicationFactor: 3 Configs: Topic: topic6 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Jedná se tedy o tuto strukturu:
write | +---+---+---+---+---+---+---+---+---+ v | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... (leader) +---+---+---+---+---+---+---+---+---+ ^ ^ | | read | sync | | v +---+---+---+---+---+---+---+---+---+ | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | (follower) +---+---+---+---+---+---+---+---+---+ ^ | | sync | | v +---+---+---+---+---+---+---+---+---+ | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | (follower) +---+---+---+---+---+---+---+---+---+
A nakonec – téma nazvané topic7 bude taktéž obsahovat jediný oddíl replikovaný na všechny tři brokery, ovšem nyní zvolíme jako leadera třetí broker (s ID nastaveným na 2):
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic7 --replica-assignment 2:0:1 Created topic topic7.
Konfigurace tohoto tématu:
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic7 Topic: topic7 TopicId: 55pDYK4gSYSB2Cc4Me3eIA PartitionCount: 1 ReplicationFactor: 3 Configs: Topic: topic7 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
V tomto případě se jedná o tuto strukturu:
+---+---+---+---+---+---+---+---+---+ | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | (follower) +---+---+---+---+---+---+---+---+---+ ^ | | sync | | v +---+---+---+---+---+---+---+---+---+ | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | (follower) +---+---+---+---+---+---+---+---+---+ ^ | | read sync | | | | write v v | +---+---+---+---+---+---+---+---+---+ v | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... (leader) +---+---+---+---+---+---+---+---+---+
17. Chování při nedostupnosti brokeru (brokerů)
V této kapitole použijeme téma nazvané topic7, které bylo vytvořeno v rámci předchozí kapitoly. Do tohoto tématu budeme posílat zprávy konzumentem ovládaným z příkazové řádky:
producer $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic7 >foo >bar >baz
Konzument se připojí – což je možná zvláštní – k prvnímu brokeru, i když ve skutečnosti je leaderem třetí broker. To je však interní záležitost Apache Kafky:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic7 --group topic7-group1 foo bar baz
Pokud nyní klávesou Ctrl+C nebo příkazem kill ukončíme činnost třetího brokera, který je leaderem, vypíše producent toto varování:
[2023-03-06 13:07:58,594] WARN [Producer clientId=console-producer] Connection to node 2 (unknown.home/192.168.1.34:9094) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Podobné varování vypíše i konzument:
[2023-03-06 13:07:41,241] WARN [Consumer clientId=console-consumer, groupId=topic7-group1] Connection to node 2 (unknown.home/192.168.1.34:9094) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Současně však dojde k přepojení na jiného brokera, který se stane leaderem. Který prober to je zjistíme opět z konfigurace tématu:
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic7 Topic: topic7 TopicId: 55pDYK4gSYSB2Cc4Me3eIA PartitionCount: 1 ReplicationFactor: 3 Configs: Topic: topic7 Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1
Nyní se tedy leaderem stává broker číslo 0, repliky jsou stále nakonfigurovány pro brokery 0, 1 i 2, ovšem synchronizováni jsou jen brokeři 0 a 1 (což je logické, protože zbývající broker neběží).
Nyní třetí broker opět nastartujeme. V logu brokerů by se měly objevit zprávy o synchronizaci témat (což může nějakou dobu trvat):
[2023-03-06 13:08:45,041] INFO [Partition topic7-0 broker=0] ISR updated to 0,1,2 and version updated to 2 (kafka.cluster.Partition)
A v konfiguraci tématu by opět měli být v sekci Isr vypsáni všichni brokeři (ovšem leader se už nezmění – není k tomu ostatně důvod):
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic7 Topic: topic7 TopicId: 55pDYK4gSYSB2Cc4Me3eIA PartitionCount: 1 ReplicationFactor: 3 Configs: Topic: topic7 Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1,2
18. Vytvoření a použití tématu s několika replikovanými oddíly
V závěrečné kapitole si ukažme, jak se vytvoří téma s několika oddíly, které jsou replikovány. Jedná se tedy o nejsložitější možnou konfiguraci, která umožňuje souběžnou činnost více konzumentů a navíc zajišťuje dostupnost oddílů i v případě, kdy nějaký broker zhavaruje. Téma bude mít tuto konfiguraci:
- První oddíl bude replikován na prvním a druhém brokeru
- Druhý oddíl bude replikován na prvním a třetím brokeru
- Třetí oddíl bude replikován na druhém a třetím brokeru
Čísla oddílů i ID brokerů začínají od nuly, takže téma vytvoříme takto (povšimněte si syntaxe zápisu posledního parametru):
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic8 --replica-assignment 0:1,0:2,1:2 Created topic topic8.
Samotná konfigurace tématu se nyní vypíše v této poměrně dobře pochopitelné podobě:
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic8 Topic: topic8 TopicId: sLky-pgtQAibNaoguwwDVA PartitionCount: 3 ReplicationFactor: 2 Configs: Topic: topic8 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: topic8 Partition: 1 Leader: 0 Replicas: 0,2 Isr: 0,2 Topic: topic8 Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
19. Repositář s pomocnými skripty a konfiguračními soubory
Zdrojové kódy dnes použitých producentů a konzumentů byly společně s konfiguračními soubory Zookeeperu a brokerů uloženy do repositáře, jenž je dostupný na adrese https://github.com/tisnik/slides/. V případě, že nebudete chtít klonovat celý repositář, můžete namísto toho použít odkazy na jednotlivé demonstrační příklady i další soubory, které naleznete v následující tabulce:
20. Odkazy na Internetu
- Kcli: is a kafka read only command line browser.
https://github.com/cswank/kcli - Kcli: a kafka command line browser
https://go.libhunt.com/kcli-alternatives - Kafka Connect and Schemas
https://rmoff.net/2020/01/22/kafka-connect-and-schemas/ - JSON and schemas
https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/#json-schemas - What, why, when to use Apache Kafka, with an example
https://www.startdataengineering.com/post/what-why-and-how-apache-kafka/ - When NOT to use Apache Kafka?
https://www.kai-waehner.de/blog/2022/01/04/when-not-to-use-apache-kafka/ - Microservices: The Rise Of Kafka
https://movio.co/blog/microservices-rise-kafka/ - Building a Microservices Ecosystem with Kafka Streams and KSQL
https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/ - An introduction to Apache Kafka and microservices communication
https://medium.com/@ulymarins/an-introduction-to-apache-kafka-and-microservices-communication-bf0a0966d63 - kappa-architecture.com
http://milinda.pathirage.org/kappa-architecture.com/ - Questioning the Lambda Architecture
https://www.oreilly.com/ideas/questioning-the-lambda-architecture - Lambda architecture
https://en.wikipedia.org/wiki/Lambda_architecture - Kafka – ecosystem (Wiki)
https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem - The Kafka Ecosystem – Kafka Core, Kafka Streams, Kafka Connect, Kafka REST Proxy, and the Schema Registry
http://cloudurable.com/blog/kafka-ecosystem/index.html - A Kafka Operator for Kubernetes
https://github.com/krallistic/kafka-operator - Kafka Streams
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams - Kafka Streams
http://kafka.apache.org/documentation/streams/ - Kafka Streams (FAQ)
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Streams - Event stream processing
https://en.wikipedia.org/wiki/Event_stream_processing - Part 1: Apache Kafka for beginners – What is Apache Kafka?
https://www.cloudkarafka.com/blog/2016–11–30-part1-kafka-for-beginners-what-is-apache-kafka.html - What are some alternatives to Apache Kafka?
https://www.quora.com/What-are-some-alternatives-to-Apache-Kafka - What is the best alternative to Kafka?
https://www.slant.co/options/961/alternatives/~kafka-alternatives - A super quick comparison between Kafka and Message Queues
https://hackernoon.com/a-super-quick-comparison-between-kafka-and-message-queues-e69742d855a8?gi=e965191e72d0 - Kafka Queuing: Kafka as a Messaging System
https://dzone.com/articles/kafka-queuing-kafka-as-a-messaging-system - Apache Kafka Logs: A Comprehensive Guide
https://hevodata.com/learn/apache-kafka-logs-a-comprehensive-guide/ - Microservices – Not a free lunch!
http://highscalability.com/blog/2014/4/8/microservices-not-a-free-lunch.html - Microservices, Monoliths, and NoOps
http://blog.arungupta.me/microservices-monoliths-noops/ - Microservice Design Patterns
http://blog.arungupta.me/microservice-design-patterns/ - REST vs Messaging for Microservices – Which One is Best?
https://solace.com/blog/experience-awesomeness-event-driven-microservices/ - Kappa Architecture Our Experience
https://events.static.linuxfound.org/sites/events/files/slides/ASPgems%20-%20Kappa%20Architecture.pdf - Apache Kafka Streams and Tables, the stream-table duality
https://towardsdatascience.com/apache-kafka-streams-and-tables-the-stream-table-duality-ee904251a7e?gi=f22a29cd1854 - Configure Self-Managed Connectors
https://docs.confluent.io/kafka-connectors/self-managed/configuring.html#configure-self-managed-connectors - Schema Evolution and Compatibility
https://docs.confluent.io/platform/current/schema-registry/avro.html#schema-evolution-and-compatibility - Configuring Key and Value Converters
https://docs.confluent.io/kafka-connectors/self-managed/userguide.html#configuring-key-and-value-converters - Introduction to Kafka Connectors
https://www.baeldung.com/kafka-connectors-guide - Kafka CLI: command to list all consumer groups for a topic?
https://stackoverflow.com/questions/63883999/kafka-cli-command-to-list-all-consumer-groups-for-a-topic - Java Property File Processing
https://www.w3resource.com/java-tutorial/java-propertyfile-processing.php - Skipping bad records with the Kafka Connect JDBC sink connector
https://rmoff.net/2019/10/15/skipping-bad-records-with-the-kafka-connect-jdbc-sink-connector/ - Kafka Connect Deep Dive – Error Handling and Dead Letter Queues
https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/ - Errors and Dead Letter Queues
https://developer.confluent.io/learn-kafka/kafka-connect/error-handling-and-dead-letter-queues/ - Confluent Cloud Dead Letter Queue
https://docs.confluent.io/cloud/current/connectors/dead-letter-queue.html - Dead Letter Queues (DLQs) in Kafka
https://medium.com/@sannidhi.s.t/dead-letter-queues-dlqs-in-kafka-afb4b6835309 - Deserializer
https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-json.html#json-schema-serializer-and-deserializer - JSON, Kafka, and the need for schema
https://mikemybytes.com/2022/07/11/json-kafka-and-the-need-for-schema/ - Using Kafka Connect with Schema Registry
https://docs.confluent.io/platform/current/schema-registry/connect.html - Zpracování dat reprezentovaných ve formátu JSON nástrojem jq
https://www.root.cz/clanky/zpracovani-dat-reprezentovanych-ve-formatu-json-nastrojem-jq/ - Repositář projektu jq (GitHub)
https://github.com/stedolan/jq - GitHub stránky projektu jq
https://stedolan.github.io/jq/ - 5 modern alternatives to essential Linux command-line tools
https://opensource.com/article/20/6/modern-linux-command-line-tools - Návod k nástroji jq
https://stedolan.github.io/jq/tutorial/ - jq Manual (development version)
https://stedolan.github.io/jq/manual/ - Introducing JSON
https://www.json.org/json-en.html - Understanding JSON schema
https://json-schema.org/understanding-json-schema/index.html - JDBC Sink Connector for Confluent Platform
https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/overview.html#jdbc-sink-connector-for-cp - JDBC Connector (Source and Sink)
https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc