Obsah
1. Faust: platforma pro proudové zpracování dat v Pythonu
2. Potenciálně problematická část – instalace knihovny Faust
3. Instalace, konfigurace a spuštění Apache Kafky
4. Otestování činnosti Apache Kafky
5. Klasický producent posílající zprávy do Kafky
6. Klasický konzument, který čte zprávy ze specifikovaného tématu
7. Spuštění producenta a konzumenta
8. Konzument realizovaný formou workera postavený na knihovně Faust
10. Producent posílající zprávy do většího množství témat
11. Konzumace zpráv workerem z většího množství témat
12. Produkce zpráv ve formátu JSON
13. Worker přijímající zprávy ve formátu JSON
15. Producent založený na knihovně Faust využívající model
16. Kombinace producenta a konzumenta – reálná síla knihovny Faust
17. Spuštění kombinace workerů s producenty a konzumenty
19. Repositář s demonstračními příklady
1. Faust: platforma pro proudové zpracování dat v Pythonu
V dnešním článku se seznámíme s knihovnou Faust, která umožňuje realizaci platformy pro proudové zpracování dat (stream processing) založené na systému Apache Kafky. V knihovně Faust se nepracuje (resp. nemusí pracovat) se standardními samostatně běžícími producenty a konzumenty. Namísto toho se nabízí možnost realizace zpracování zpráv (událostí) z jednoho tématu s posíláním výsledků do dalšího tématu či témat s využitím asynchronně naprogramovaných workerů. Navíc je možné pro popis struktury zpráv použít takzvané modely, což vlastně není nic jiného než popis struktury zprávy pomocí třídy. Jak workeři, tak i modely jsou přitom zapisovány jako běžný kód v Pythonu, tj. na rozdíl od některých jiných systémů pro realizaci stream processingu se zde nemusí používat specializovaný doménově specifický jazyk (DSL).
Obrázek 1: Jednoduchý stream processing založený na systému Apache Kafky.
Navíc Faust podporuje persistenci s využitím tabulek (interně se vlastně jedná o persistentní slovníky), což je téma, kterému se budeme podrobněji věnovat v navazujícím článku.
Obrázek 2: Nepatrně složitější konfigurace stream processorů založených na systému Apache Kafky.
2. Potenciálně problematická část – instalace knihovny Faust
Teoreticky je možné knihovnu Faust nainstalovat naprosto stejným způsobem, jako jakoukoli jinou knihovnu dostupnou přes PyPi. Pro instalaci tímto stylem postačuje zadat příkaz:
$ pip install --user faust-streaming
Měly by se nainstalovat i všechny závislosti:
Collecting faust-streaming Downloading faust_streaming-0.11.0-cpc311-cpc311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (24 kB) Collecting aiohttp<4.0,>=3.8.0 (from faust-streaming) Downloading aiohttp-3.9.5-cpc311-cpc311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.5 kB) Collecting aiohttp-cors<2.0,>=0.7 (from faust-streaming) Downloading aiohttp_cors-0.7.0-py3-none-any.whl.metadata (20 kB) Collecting aiokafka>=0.9.0 (from faust-streaming) Downloading aiokafka-0.10.0-cpc311-cpc311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (17 kB) Requirement already satisfied: click<8.2,>=6.7 in ./.local/lib/python3.8/site-packages (from faust-streaming) (8.1.7) Collecting mode-streaming>=0.4.0 (from faust-streaming) Downloading mode_streaming-0.4.1-py2.py3-none-any.whl.metadata (16 kB) Collecting opentracing<=2.4.0,>=1.3.0 (from faust-streaming) Downloading opentracing-2.4.0.tar.gz (46 kB) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 46.2/46.2 kB 1.2 MB/s eta 0:00:00 Preparing metadata (setup.py) ... done Collecting terminaltables<4.0,>=3.1 (from faust-streaming) Downloading terminaltables-3.1.10-py2.py3-none-any.whl.metadata (3.5 kB) Requirement already satisfied: yarl<2.0,>=1.0 in ./.local/lib/python3.8/site-packages (from faust-streaming) (1.9.4) Collecting croniter>=0.3.16 (from faust-streaming) Downloading croniter-2.0.5-py2.py3-none-any.whl.metadata (25 kB) Requirement already satisfied: mypy-extensions in ./.local/lib/python3.8/site-packages (from faust-streaming) (1.0.0) Collecting venusian==3.1.0 (from faust-streaming) Downloading venusian-3.1.0-py3-none-any.whl.metadata (10 kB) Collecting intervaltree (from faust-streaming) Downloading intervaltree-3.1.0.tar.gz (32 kB) Preparing metadata (setup.py) ... done Requirement already satisfied: six in /usr/lib/python3/dist-packages (from faust-streaming) (1.14.0) Collecting aiosignal>=1.1.2 (from aiohttp<4.0,>=3.8.0->faust-streaming) Downloading aiosignal-1.3.1-py3-none-any.whl.metadata (4.0 kB) Requirement already satisfied: attrs>=17.3.0 in ./.local/lib/python3.8/site-packages (from aiohttp<4.0,>=3.8.0->faust-streaming) (23.1.0) Requirement already satisfied: frozenlist>=1.1.1 in ./.local/lib/python3.8/site-packages (from aiohttp<4.0,>=3.8.0->faust-streaming) (1.4.1) Requirement already satisfied: multidict<7.0,>=4.5 in ./.local/lib/python3.8/site-packages (from aiohttp<4.0,>=3.8.0->faust-streaming) (6.0.4) Requirement already satisfied: async-timeout<5.0,>=4.0 in ./.local/lib/python3.8/site-packages (from aiohttp<4.0,>=3.8.0->faust-streaming) (4.0.3) Requirement already satisfied: packaging in ./.local/lib/python3.8/site-packages (from aiokafka>=0.9.0->faust-streaming) (23.2) Collecting python-dateutil (from croniter>=0.3.16->faust-streaming) Downloading python_dateutil-2.9.0.post0-py2.py3-none-any.whl.metadata (8.4 kB) Collecting pytz>2021.1 (from croniter>=0.3.16->faust-streaming) Downloading pytz-2024.1-py2.py3-none-any.whl.metadata (22 kB) Collecting colorlog<7.0.0,>=6.0.0 (from mode-streaming>=0.4.0->faust-streaming) Downloading colorlog-6.8.2-py3-none-any.whl.metadata (10 kB) Requirement already satisfied: idna>=2.0 in /usr/lib/python3/dist-packages (from yarl<2.0,>=1.0->faust-streaming) (2.8) Collecting sortedcontainers<3.0,>=2.0 (from intervaltree->faust-streaming) Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl.metadata (10 kB) Downloading faust_streaming-0.11.0-cpc311-cpc311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.2/1.2 MB 3.1 MB/s eta 0:00:00 Downloading venusian-3.1.0-py3-none-any.whl (13 kB) Downloading aiohttp-3.9.5-cpc311-cpc311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.3/1.3 MB 2.7 MB/s eta 0:00:00 Downloading aiohttp_cors-0.7.0-py3-none-any.whl (27 kB) Downloading aiokafka-0.10.0-cpc311-cpc311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.1 MB) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.1/1.1 MB 2.4 MB/s eta 0:00:00 Downloading croniter-2.0.5-py2.py3-none-any.whl (20 kB) Downloading mode_streaming-0.4.1-py2.py3-none-any.whl (94 kB) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 94.7/94.7 kB 1.9 MB/s eta 0:00:00 Downloading terminaltables-3.1.10-py2.py3-none-any.whl (15 kB) Downloading aiosignal-1.3.1-py3-none-any.whl (7.6 kB) Downloading colorlog-6.8.2-py3-none-any.whl (11 kB) Downloading pytz-2024.1-py2.py3-none-any.whl (505 kB) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 505.5/505.5 kB 2.5 MB/s eta 0:00:00 Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl (29 kB) Downloading python_dateutil-2.9.0.post0-py2.py3-none-any.whl (229 kB) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 229.9/229.9 kB 2.4 MB/s eta 0:00:00 Building wheels for collected packages: opentracing, intervaltree Building wheel for opentracing (setup.py) ... done Created wheel for opentracing: filename=opentracing-2.4.0-py3-none-any.whl size=51403 sha256=32bff185cd7a87b97f48f23ba6447ac311acdaaadf2f0e9fa3a1a5371c09cf6867 Stored in directory: /home/ptisnovs/.cache/pip/wheels/c7/d1/cc/de262a2bc00e9fdfe32334094183aa9a78a2458dca9212145c Building wheel for intervaltree (setup.py) ... done Created wheel for intervaltree: filename=intervaltree-3.1.0-py2.py3-none-any.whl size=26102 sha256=091f3c2a6e2bcca88cbc2f214fbd466ee7e3fb5fea612ee6602164c4bc87107a Stored in directory: /home/ptisnovs/.cache/pip/wheels/45/23/de/5789a92962483fd33cb06674792b9697c1b3766d7c7742830e Successfully built opentracing intervaltree Installing collected packages: sortedcontainers, pytz, opentracing, venusian, terminaltables, python-dateutil, intervaltree, colorlog, aiosignal, aiokafka, croniter, aiohttp, mode-streaming, aiohttp-cors, faust-streaming Successfully installed aiohttp-3.9.5 aiohttp-cors-0.7.0 aiokafka-0.10.0 aiosignal-1.3.1 colorlog-6.8.2 croniter-2.0.5 faust-streaming-0.11.0 intervaltree-3.1.0 mode-streaming-0.4.1 opentracing-2.4.0 python-dateutil-2.9.0.post0 pytz-2024.1 sortedcontainers-2.4.0 terminaltables-3.1.10 venusian-3.1.0
To však bude bez problémů fungovat pouze pro starší verze Pythonu (CPythonu). V případě novějších verzí Pythonu (3.12, 3.13), v nichž došlo ke změně balíčků pro asynchronní běh aplikací, nebude možné takto nainstalovaný Faust použít. Někdy se tedy musíme uchýlit k jiné formě spuštění (což ani zdaleka není ideální situace!):
- Naklonujeme repositář https://github.com/faust-streaming/faust
- Vytvoříme virtuální prostředí Pythonu příkazem python3 -m venv venv && source venv/bin/activate
- Nainstalujeme do virtuálního prostředí všechny potřebné knihovny příkazem: ./scripts/install
Po těchto třech krocích bude možné knihovnu Faust používat v rámci právě inicializovaného virtuálního prostředí.
3. Instalace, konfigurace a spuštění Apache Kafky
Dnešní článek je zaměřen na ukázku některých možností nabízených knihovnou Faust, která interně využívá známý projekt Apache Kafka. To tedy znamená, že kromě instalace knihovny Faust budeme navíc potřebovat spustit (lokálně, vzdáleně, či v kontejneru) i Apache Kafku. Konkrétně nám bude postačovat spuštění jediného Kafka clusteru, přičemž se každý Kafka cluster skládá z jednoho běžícího Zookeepera a z jednoho brokera (tedy ze dvou procesů, které mezi sebou komunikují po nakonfigurovaných portech).
V praktické části budeme brokera Apache Kafky i Zookeepera spouštět lokálně (popř. z Dockeru či Podmana), takže je nejdříve nutné Apache Kafku nainstalovat. Není to ve skutečnosti vůbec nic složitého. V případě, že je na počítači nainstalováno JRE (běhové prostředí Javy), je instalace Apache Kafky pro testovací účely triviální. V článku si ukážeme instalaci verze 2.13–3.6.1, ovšem můžete si stáhnout i prakticky libovolnou novější či některé starší verze (3.5.x nebo 3.6.x, ovšem dále popsaný postup by měl být platný i pro ještě starší verze, v podstatě můžeme dojít až k verzi 2.4.0 a vše by mělo být funkční). Tarball s instalací Apache Kafky je možné získat z adresy https://downloads.apache.org/kafka/3.6.1/kafka2.13–3.6.1.tgz.
Stažení a rozbalení tarballu s Apache Kafkou zajistí následující sekvence příkazů:
$ wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz $ tar xvfz kafka_2.13-3.6.1.tgz $ cd kafka_2.13-3.6.1/
Po rozbalení staženého tarballu získáme adresář, v němž se nachází všechny potřebné Java archivy (JAR), konfigurační soubory (v podadresáři config) a několik pomocných skriptů (v podadresáři bin a bin/windows). Pro spuštění Zookeepera a brokerů je zapotřebí, jak jsme si již řekli v předchozím odstavci, mít nainstalovánu JRE (Java Runtime Environment) a samozřejmě též nějaký shell (BASH, cmd, …).
. ├── bin │ └── windows ├── config │ └── kraft ├── libs ├── licenses └── site-docs 7 directories
Mezi důležité soubory, které budeme používat v rámci dalších kapitol pro ukázku možností knihovny Faust, patří především skripty pro spouštění jednotlivých služeb. Tyto skripty jsou uloženy v podadresáři bin (a pro Windows ještě v dalším podadresáři windows):
Skript | Stručný popis |
---|---|
bin/kafka-server-start.sh | spuštění brokera |
bin/zookeeper-server-start.sh | spuštění Zookeepera |
bin/kafka-configs.sh | konfigurace brokerů |
bin/kafka-topics.sh | konfigurace témat, zjištění informace o tématech atd. |
bin/kafka-consumer-groups.sh | konfigurace, popř. zjištění informací o skupinách konzumentů |
Nyní již máme všechny potřebné nástroje k dispozici, takže Kafka cluster spustíme. Nejdříve je vždy nutné spustit Zookeepera a teprve poté brokera či brokery. Pro sledování činnosti obou procesů si můžete Zookeepera i brokera spustit v samostatném terminálu, využít nástroj screen atd.
Spuštění Zookeepera:
$ cd ${kafka_dir} $ bin/zookeeper-server-start.sh config/zookeeper.properties
Spuštění brokera:
$ cd ${kafka_dir} $ bin/kafka-server-start.sh config/server.properties
4. Otestování činnosti Apache Kafky
Pro otestování, jestli je právě zprovozněný Kafka cluster skutečně funkční, si vytvoříme jednoduché producenty a konzumenty zpráv. Použijeme přitom programovací jazyk Python, protože i demonstrační příklady využívající knihovnu Faust budou naprogramovány v Pythonu. Samozřejmě by bylo možné použít konzolového producenta a konzumenta, který je součástí instalace Apache Kafky, ovšem možnosti těchto nástrojů jsou ve skutečnosti velmi omezené a nebudou nám postačovat v případě, že budeme chtít produkovat či naopak konzumovat složitější datové struktury a objekty.
Před vytvořením producenta zpráv v Pythonu je nutné nainstalovat knihovnu, která zabezpečuje připojení ke clusteru Apache Kafky. Takových knihoven dnes existuje několik. Nejjednodušší z těchto knihoven (bez dalších závislostí atd.) je knihovna nazvaná kafka-python. Vzhledem k tomu, že autoři této knihovny nevydávají časté updaty, vznikl její fork nazvaný jednoduše kafka-python-ng; a právě tento fork dnes použijeme. Tuto knihovnu můžeme nainstalovat pouze pro aktivního uživatele (což je nejjednodušší a nevyžaduje to rootovská práva) s využitím příkazu pip, protože tato knihovna je pochopitelně dostupná na PyPi:
$ pip3 install --verbose kafka-python-ng Using pip 22.3.1 from /usr/lib/python3.11/site-packages/pip (python 3.11) Defaulting to user installation because normal site-packages is not writeable Collecting kafka-python-ng Downloading kafka_python_ng-2.2.2-py2.py3-none-any.whl (232 kB) ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 232.4/232.4 kB 1.5 MB/s eta 0:00:00 Installing collected packages: kafka-python-ng Successfully installed kafka-python-ng-2.2.2
5. Klasický producent posílající zprávy do Kafky
Producent, tedy aplikace posílající zprávy do Apache Kafky, je představován instancí třídy KafkaProducer. Konstruktoru této třídy se předává pouze seznam brokerů (v našem případě jediný broker, protože náš Kafka cluster obsahuje jediného brokera) a popř. i další nepovinné údaje. Mezi ně patří i takzvaný handler, který je zavolán před serializací každé zprávy.
Prakticky každou zprávu je ve skutečnosti nutné serializovat, protože zprávy v Apache Kafce tvoří dále nestrukturovaná sekvence bajtů (jedinou datovou strukturou, kterou není potřeba serializovat, je typ bytes). Tudíž například i řetězce (přesněji řečeno Pythonovské řetězce) se explicitně převádí na hodnotu typu bytes, tj. na neměnitelnou (immutable) sekvenci bajtů. Převod řetězců do tuto sekvenci je realizován metodou encode, které se předá vyžadované kódování, tj. mapování mezi znaky na vstupu a jedním až více bajty ve výsledné sekvenci. V našem skriptu použijeme dnes standardní kódování UTF-8:
producer = KafkaProducer( bootstrap_servers=[server], value_serializer=lambda x: x.encode("utf-8") )
Poslání zprávy se provádí metodou KafkaProducer.send. Zpráva je, jak již víme z předchozího textu, uložena do zvoleného tématu (topic) a skládá se z těla, popř. klíče a těla:
producer.send(topic, value=data)
V našem prvním demonstračním producentovi budou posílány řetězce, které se budou postupně měnit podle hodnoty čítače (counter). Mezi jednotlivé zprávy jsou vkládány časové pauzy o délce přibližně jedné sekundy:
for i in range(1000): print(i) message = f"Greeting #{i}" producer.send(topic, value=message) sleep(1)
Takto vypadá úplný zdrojový kód producenta zpráv, který naleznete na adrese https://github.com/tisnik/most-popular-python-libs/blob/master/faust/greeting_producer.py:
#!/usr/bin/env python3 from kafka import KafkaProducer from time import sleep server = "localhost:9092" topic = "greetings" print("Connecting to Kafka") producer = KafkaProducer( bootstrap_servers=[server], value_serializer=lambda x: x.encode("utf-8") ) print("Connected to Kafka") for i in range(1000): print(i) message = f"Greeting #{i}" producer.send(topic, value=message) sleep(1)
6. Klasický konzument, který čte zprávy ze specifikovaného tématu
Implementace jednoduchého konzumenta zpráv v programovacím jazyku Python je poměrně snadná a vlastně se ani příliš neliší od implementace konzumenta. Nejdříve je nutné vytvořit instanci třídy KafkaConsumer, přičemž se specifikuje téma (topic), skupina, seznam brokerů a nepovinně též informace o tom, jakým způsobem se má nastavit offset první zpracované zprávy. Většinou budeme potřebovat zpracovávat nejnovější (earliest) zprávy, takže nastavení konzumenta může vypadat například následovně:
consumer = KafkaConsumer( topic, group_id=group_id, bootstrap_servers=[server], auto_offset_reset="earliest" )
Samotná konzumace (tedy čtení a zpracování) zpráv může probíhat v nekonečné programové smyčce, přičemž každá zpráva je reprezentována objektem obsahujícím téma, číslo oddílu, offset v rámci oddílu, klíč zprávy a samozřejmě též obsah zprávy (klíč zprávy a tělo zprávy není žádným způsobem dekódováno, resp. deserializováno):
for message in consumer: print("%s:%d:%d: key=%s value=%s" % ( message.topic, message.partition, message.offset, message.key, message.value, ) )
A takto vypadá úplný zdrojový kód běžného konzumenta zpráv, který naleznete na adrese https://github.com/tisnik/most-popular-python-libs/blob/master/faust/greeting_consumer.py:
#!/usr/bin/env python3 import sys from kafka import KafkaConsumer server = "localhost:9092" topic = "greetings" group_id = "group1" print("Connecting to Kafka") consumer = KafkaConsumer( topic, group_id=group_id, bootstrap_servers=[server], auto_offset_reset="earliest" ) print("Connected to Kafka") try: for message in consumer: print( "%s:%d:%d: key=%s value=%s" % ( message.topic, message.partition, message.offset, message.key, message.value, ) ) except KeyboardInterrupt: sys.exit()
7. Spuštění producenta a konzumenta
Nyní si funkce producenta a konzumenta otestujeme. V jednom terminálu spustíme producenta zpráv:
$ python3 greeting_producer.py
Producent začne posílat zprávy:
Connecting to Kafka Connected to Kafka 0 1 2 3 4 5 ... ... ...
Ve druhém terminálu naopak spustíme konzumenta:
$ python3 greeting_consumer.py
Nyní by měl konzument začít přijímat nové zprávy posílané s periodou přibližně jedné sekundy:
Connecting to Kafka Connected to Kafka greetings:0:90: key=None value=b'Greeting #0' greetings:0:91: key=None value=b'Greeting #1' greetings:0:92: key=None value=b'Greeting #2' greetings:0:93: key=None value=b'Greeting #3' greetings:0:94: key=None value=b'Greeting #4' greetings:0:95: key=None value=b'Greeting #5'
8. Konzument realizovaný formou workera postavený na knihovně Faust
Podívejme se nyní na to, jak by se konzument realizoval s využitím knihovny Faust. Tato knihovna je postavena na poněkud odlišných konceptech, než je klasický producent a konzument. Namísto konzumenta se zde používá termín worker, načítané zprávy mohou být popsány modelem a vše je zabaleno v aplikaci, která řídí konzumaci či produkci zpráv (resp. celý tok dat). Začněme nejdříve posledním zmíněným termínem aplikace. Jedná se o koncept umožňující nejenom spouštění workerů, ale i další činnosti, například práci s tabulkami (což si popíšeme v navazujícím článku), práci s takzvanými okny (pohledů na část sekvence zpráv) atd.
Objekt představující aplikaci se vytvoří konstruktorem App, kterému se musí předat minimálně jeden povinný parametr – identifikátor aplikace, což je řetězec. Ovšem předat je možné i další parametry, zejména adresu Kafka brokeru, způsob (kodek) serializace a deserializace zpráv atd. Vytvoření aplikace tedy může vypadat následovně:
app = faust.App( "hello-world", broker="kafka://localhost:9092", value_serializer="raw", )
V dalším kroku zkonstruujeme ještě jeden objekt, který bude reprezentovat téma (topic). V tomto případě se předává jen jméno tématu, i když opět platí, že je možné předat i další nepovinné parametry (schéma, typ klíčů, typ těl zpráv atd. atd.):
greetings_topic = app.topic("greetings")
Nyní je již možné nadefinovat asynchronní funkci, která bude konzumovat zprávy přicházející do nakonfigurovaného tématu. Tato funkce je obalena dekorátorem @app.agent, přičemž i dekorátor akceptuje různé parametry. My mu prozatím předáme pouze objekt představující téma:
@app.agent(greetings_topic) async def greet(greetings): async for greeting in greetings: print(greeting)
Posledním krokem je spuštění aplikace, což je již triviální operace:
if __name__ == "__main__": app.main()
Úplný zdrojový kód workera je možné najít na adrese https://github.com/tisnik/most-popular-python-libs/blob/master/faust/greeting_faust_consumer.py:
import faust app = faust.App( "hello-world", broker="kafka://localhost:9092", value_serializer="raw", ) greetings_topic = app.topic("greetings") @app.agent(greetings_topic) async def greet(greetings): async for greeting in greetings: print(greeting) if __name__ == "__main__": app.main()
9. Spuštění workera
V případě, že se pokusíme o spuštění workera běžným způsobem, tj. zadáním:
$ python greeting_faust_consumer.py
ukáže se nám pouze nápověda s informacemi, jaké možnosti nám worker umožňuje:
Usage: greeting_faust_consumer.py [OPTIONS] COMMAND [ARGS]... Welcome, see list of commands and options below. Use --help for help, --version for version information. https://faust-streaming.github.io/faust Options: --console-port RANGE[1-65535] when --debug: Port to run debugger console on. [default: 50101; 1<=x<=65535] --blocking-timeout FLOAT when --debug: Blocking detector timeout. -l, --loglevel [crit|error|warn|info|debug] Logging level to use. [default: WARN] -f, --logfile FILE Path to logfile (default is <stderr>). -L, --loop [aio|eventlet|uvloop] Event loop implementation to use. [default: aio] --json Return output in machine-readable JSON format -D, --datadir DIRECTORY Directory to keep application state. [default: {conf.name}-data] -W, --workdir DIRECTORY Working directory to change to after start. --debug / --no-debug Enable debugging output, and the blocking detector. [default: no-debug] -q, --quiet / --no-quiet Silence output to <stdout>/<stderr> [default: no-quiet] -A, --app TEXT Path of Faust application to use, or the name of a module. --version Show the version and exit. --help Show this message and exit. Commands: agents List agents. clean-versions Delete old version directories. completion Output shell completion to be evaluated by the shell. livecheck Manage LiveCheck instances. model Show model detail. models List all available models as a tabulated list. reset Delete local table state. send Send message to agent/topic. tables List available tables. worker Start worker instance for given app.
Vlastní spuštění se provede příkazem:
$ python3 greeting_faust_consumer.py worker
Nejprve se zobrazí tabulka se základními informacemi o workerovi:
┌ƒaµS† v0.11.1.dev4+ga489db3b───────────────────────────────────┐ │ id │ hello-world │ │ transport │ [URL('kafka://localhost:9092')] │ │ store │ memory: │ │ web │ http://localhost:6066/ │ │ log │ -stderr- (warn) │ │ pid │ 1296383 │ │ hostname │ ptisnovs.xxx.yyy.zzz │ │ platform │ CPython 3.11.8 (Linux x86_64) │ │ + │ Cython (GCC 13.2.1 20231011 (Red Hat 13.2.1-4)) │ │ drivers │ │ │ transport │ aiokafka=0.10.0 │ │ web │ aiohttp=3.9.5 │ │ datadir │ /tmp/ramdisk/faust/hello-world-data │ │ appdir │ /tmp/ramdisk/faust/hello-world-data/v1 │ └─────────────┴─────────────────────────────────────────────────┘ starting➢ 😊
Ihned po výpisu zprávy „starting“ by měl worker začít se čtením a zpracováváním zpráv, které jsou konzumovány z tématu „greetings“. Informace o každé zkonzumované zprávě je vypisována do terminálu:
[2024-04-20 18:03:39,583] [1296383] [WARNING] b'Greeting #0' [2024-04-20 18:03:39,583] [1296383] [WARNING] b'Greeting #1' [2024-04-20 18:03:39,584] [1296383] [WARNING] b'Greeting #2' [2024-04-20 18:03:39,584] [1296383] [WARNING] b'Greeting #3' [2024-04-20 18:03:39,584] [1296383] [WARNING] b'Greeting #4'
Můžeme se navíc pokusit o modifikaci nastavení úrovně logovacích zpráv, abychom získali více informací o spuštěném workerovi (workerech). Tím se přesvědčíme, jaké téma se používá atd.:
+ƒaµS† v0.11.0+-----------------------------------------------------------------------+ | id | hello-world | | transport | [URL('kafka://localhost:9092')] | | store | memory: | | web | http://localhost:6066/ | | log | -stderr- (info) | | pid | 1539476 | | hostname | 123.123.123.123 | | platform | CPython 3.11.8 (Linux x86_64) | | + | Cython (GCC 13.2.1 20231011 (Red Hat 13.2.1-4)) | | drivers | | | transport | aiokafka=0.10.0 | | web | aiohttp=3.9.1 | | datadir | /home/ptisnovs/src/most-popular-python-libs/faust/hello-world-data | | appdir | /home/ptisnovs/src/most-popular-python-libs/faust/hello-world-data/v1 | +-------------+-----------------------------------------------------------------------+ [2024-04-24 09:16:23,967] [1539476] [INFO] [^Worker]: Starting... [2024-04-24 09:16:23,968] [1539476] [INFO] [^-App]: Starting... [2024-04-24 09:16:23,969] [1539476] [INFO] [^--Monitor]: Starting... [2024-04-24 09:16:23,969] [1539476] [INFO] [^--Producer]: Starting... [2024-04-24 09:16:23,969] [1539476] [INFO] [^---ProducerBuffer]: Starting... [2024-04-24 09:16:23,976] [1539476] [INFO] [^--CacheBackend]: Starting... [2024-04-24 09:16:23,976] [1539476] [INFO] [^--Web]: Starting... [2024-04-24 09:16:23,976] [1539476] [INFO] [^---Server]: Starting... [2024-04-24 09:16:23,977] [1539476] [INFO] [^--Consumer]: Starting... [2024-04-24 09:16:23,978] [1539476] [INFO] [^---AIOKafkaConsumerThread]: Starting... [2024-04-24 09:16:23,983] [1539476] [INFO] [^--LeaderAssignor]: Starting... [2024-04-24 09:16:23,983] [1539476] [INFO] [^--Producer]: Creating topic 'hello-world-__assignor-__leader' [2024-04-24 09:16:23,986] [1539476] [INFO] [^--ReplyConsumer]: Starting... [2024-04-24 09:16:23,986] [1539476] [INFO] [^--AgentManager]: Starting... [2024-04-24 09:16:23,986] [1539476] [INFO] [^---Agent: __main__.greet]: Starting... [2024-04-24 09:16:23,987] [1539476] [INFO] [^----OneForOneSupervisor: (1@0x7fac75d2e6d0)]: Starting... [2024-04-24 09:16:23,987] [1539476] [INFO] [^---Conductor]: Starting... [2024-04-24 09:16:23,987] [1539476] [INFO] [^--TableManager]: Starting... [2024-04-24 09:16:23,987] [1539476] [INFO] [^---Conductor]: Waiting for agents to start... [2024-04-24 09:16:23,987] [1539476] [INFO] [^---Conductor]: Waiting for tables to be registered... [2024-04-24 09:16:24,988] [1539476] [INFO] [^---Recovery]: Starting... [2024-04-24 09:16:24,989] [1539476] [INFO] [^--Producer]: Creating topic 'hello-world-__assignor-__leader' [2024-04-24 09:16:24,993] [1539476] [INFO] Updating subscribed topics to: +Requested Subscription-----------+ | topic name | +---------------------------------+ | greetings | | hello-world-__assignor-__leader | +---------------------------------+ [2024-04-24 09:16:24,994] [1539476] [INFO] Subscribed to topic(s): +Final Subscription---------------+ | topic name | +---------------------------------+ | greetings | | hello-world-__assignor-__leader | +---------------------------------+ [2024-04-24 09:16:25,003] [1539476] [INFO] Discovered coordinator 100 for group hello-world [2024-04-24 09:16:25,004] [1539476] [INFO] Revoking previously assigned partitions set() for group hello-world [2024-04-24 09:16:25,005] [1539476] [INFO] (Re-)joining group hello-world [2024-04-24 09:16:25,011] [1539476] [INFO] Joined group 'hello-world' (generation 41) with member_id faust-0.11.0-cebf28b2-040d-41a0-a1f0-06881d9a9b0a [2024-04-24 09:16:25,011] [1539476] [INFO] Elected group leader -- performing partition assignments using faust [2024-04-24 09:16:25,016] [1539476] [INFO] Successfully synced group hello-world with generation 41 [2024-04-24 09:16:25,016] [1539476] [INFO] Setting newly assigned partitions +Topic Partition Set--------------+------------+ | topic | partitions | +---------------------------------+------------+ | greetings | {0} | | hello-world-__assignor-__leader | {0} | +---------------------------------+------------+ for group hello-world [2024-04-24 09:16:25,017] [1539476] [INFO] Executing _on_partitions_assigned [2024-04-24 09:16:25,018] [1539476] [INFO] generation id 41 app consumers id 41 [2024-04-24 09:16:25,020] [1539476] [INFO] [^---Recovery]: Seek stream partitions to committed offsets. [2024-04-24 09:16:25,022] [1539476] [INFO] [^---Recovery]: Resuming flow... [2024-04-24 09:16:25,022] [1539476] [INFO] [^---Fetcher]: Starting... [2024-04-24 09:16:25,023] [1539476] [INFO] [^---Recovery]: Worker ready [2024-04-24 09:16:25,023] [1539476] [INFO] [^Worker]: Ready
10. Producent posílající zprávy do většího množství témat
Mezi přednosti knihovny Faust patří především to, že lze spustit větší množství asynchronně běžících workerů. Abychom si to ukázali v praxi na co nejjednodušším příkladu, upravíme nepatrně našeho producenta zpráv takovým způsobem, aby posílal zprávy do dvou témat nazvaných „greetings“ a „real_work“. Tato úprava je snadná, protože postačuje do programové smyčky přidat další volání metod producer.send. Výsledný kód takto upraveného producenta zpráv lze nalézt na adrese https://github.com/tisnik/most-popular-python-libs/blob/master/faust/multi_producer_raw.py:
#!/usr/bin/env python3 from kafka import KafkaProducer from time import sleep from json import dumps server = "localhost:9092" topic1 = "greetings" topic2 = "real_work" print("Connecting to Kafka") producer = KafkaProducer( bootstrap_servers=[server], value_serializer=lambda x: dumps(x).encode("utf-8") ) print("Connected to Kafka") for i in range(1000): print(i) message = f"Greeting #{i}" producer.send(topic1, value=message) sleep(1) work = f"Real work #{i*2}" producer.send(topic2, value=work) work = f"Real work #{i*2+1}" producer.send(topic2, value=work) sleep(1)
11. Konzumace zpráv workerem z většího množství témat
Jak bude vypadat konzument zpráv (resp. konzumenti zpráv) v případě, že použijeme knihovnu Faust? Díky tomu, že každý worker je realizován asynchronní funkcí, je řešení až překvapivě snadné – každý worker bude v takovém případě definován ve vlastní funkci s dekorátorem @app.agent a každý z těchto workerů zpracovává zprávy z odlišného tématu:
@app.agent(greetings_topic) async def greet(greetings): async for greeting in greetings: print(f"Greeter: {greeting}") @app.agent(real_work_topic) async def worker(works): async for work in works: print(f"Worker: {work}")
O spouštění, koordinaci práce atd. se postará knihovna Faust automaticky, takže tento problém vlastně vůbec nemusíme na aplikační úrovni řešit. Výsledná aplikace s workery je dostupná na adrese https://github.com/tisnik/most-popular-python-libs/blob/master/faust/greeting_worker_consumer_raw.py a vypadá následovně:
import faust app = faust.App( "hello-world", broker="kafka://localhost:9092", value_serializer="raw", ) greetings_topic = app.topic("greetings") real_work_topic = app.topic("real_work") @app.agent(greetings_topic) async def greet(greetings): async for greeting in greetings: print(f"Greeter: {greeting}") @app.agent(real_work_topic) async def worker(works): async for work in works: print(f"Worker: {work}") if __name__ == "__main__": app.main()
12. Produkce zpráv ve formátu JSON
Velmi často se v praxi setkáme s tím, že zprávy jsou posílány a přijímány ve formátu JSON. To lze v praxi realizovat velmi snadno. Nejdříve si to ukažme u producenta zpráv, který není (alespoň prozatím) založen na knihovně Faust. Při inicializaci producenta pouze musíme specifikovat, jak se mají zprávy serializovat, a to s využitím funkce json.dumps:
#!/usr/bin/env python3 from kafka import KafkaProducer from time import sleep from json import dumps server = "localhost:9092" topic1 = "greetings" topic2 = "real_work" print("Connecting to Kafka") producer = KafkaProducer( bootstrap_servers=[server], value_serializer=lambda x: dumps(x).encode("utf-8") ) print("Connected to Kafka") for i in range(1000): print(i) message = {"subject": "Greeting", "value": i} producer.send(topic1, value=message) sleep(1) work = {"subject": "Real work", "value": i*2} producer.send(topic2, value=work) work = {"subject": "Real work", "value": i*2+1} producer.send(topic2, value=work) sleep(1)
13. Worker přijímající zprávy ve formátu JSON
Úprava workera, jenž má přijímat zprávy ve formátu JSON, je ve skutečnosti ještě snadnější. Pouze musíme specifikovat parametr value_serializer při inicializaci aplikace (tedy při vytváření instance třídy faust.App). Namísto hodnoty „raw“ nyní použijeme hodnotu „json“:
import faust app = faust.App( "hello-world", broker="kafka://localhost:9092", value_serializer="json", ) greetings_topic = app.topic("greetings") real_work_topic = app.topic("real_work") @app.agent(greetings_topic) async def greet(greetings): async for greeting in greetings: print(f"Greeter: {greeting}") @app.agent(real_work_topic) async def worker(works): async for work in works: print(f"Worker: {work}") if __name__ == "__main__": app.main()
14. Využití modelů
Velká síla knihovny Faust spočívá v tom, že je možné formát přijímaných či naopak posílaných zpráv popsat pomocí modelů, což vlastně není nic jiného než deklarace třídy se specifikací názvů a typů atributů této třídy (viz například systém Pydantic). Například zprávy (v tomto případě spíše události) s hodnotami zadanými při rezervaci uživatele do nějakého systému lze popsat následujícím jednoduchým modelem:
class Registration(faust.Record): name: str surname: str id: int
Vše si opět otestujeme na dvojici producent-konzument. Producent bude (prozatím) posílat zprávy ve formátu JSON, takže použijeme serializaci slovníků do JSONu. Slovník je konstruován v pomocné funkci user:
#!/usr/bin/env python3 from kafka import KafkaProducer from time import sleep from json import dumps def user(name, surname, id): return { "name": name, "surname": surname, "id": id } server = "localhost:9092" topic = "registrations" print("Connecting to Kafka") producer = KafkaProducer( bootstrap_servers=[server], value_serializer=lambda x: dumps(x).encode("utf-8") ) print("Connected to Kafka") producer.send(topic, value=user("Eliška", "Najbrtová", 4)) producer.send(topic, value=user("Jenny", "Suk", 3)) producer.send(topic, value=user("Anička", "Šafářová", 0)) producer.send(topic, value=user("Sváťa", "Pulec", 3)) producer.send(topic, value=user("Blažej", "Motyčka", 8)) producer.send(topic, value=user("Eda", "Wasserfall", 0)) producer.send(topic, value=user("Přemysl", "Hájek", 10)) producer.flush() print("Done")
Mnohem zajímavější je realizace workera, v níž využijeme model představovaný třídou Registration. Povšimněte si, že se model použije při specifikaci tématu – kromě jména tématu zadáme, že tělo zprávy bude tvořeno hodnotami, které je možné reprezentovat právě instancí třídy Registration. Příslušná deserializace bude provedena zcela automaticky:
import faust app = faust.App( "registrations", broker="kafka://localhost:9092", #value_serializer="json", ) class Registration(faust.Record): name: str surname: str id: int registrations_topic = app.topic("registrations", key_type=str, value_type=Registration) @app.agent(registrations_topic) async def register(registrations): async for registration in registrations: print(f"Registration: {registration}") if __name__ == "__main__": app.main()
15. Producent založený na knihovně Faust využívající model
Prozatím jsme pro vytvoření producenta knihovnu Faust vůbec nepoužili. Ovšem nyní, když již víme, že zprávy lze v případě potřeby popsat modelem, si můžeme ukázat, jak bude vypadat producent, který namísto knihovny kafka-python použije právě Faust. Samotný producent je opět představován workerem, který v intervalu přibližně pěti sekund pošle do tématu registrations serializované instance třídy Registration. Povšimněte si, že se zde již nemusíme starat o způsob serializace (zda se jedná o JSON atd.), protože postačuje, aby se obě strany (tedy producent i konzument) pouze shodly na jménu tématu a modelu:
import faust app = faust.App( "registrations", broker="kafka://localhost:9092", ) class Registration(faust.Record): name: str surname: str id: int registrations_topic = app.topic("registrations", key_type=str, value_type=Registration) @app.timer(interval=5.0) async def example_sender(app): await registrations_topic.send( value=Registration("Eliška", "Najbrtová", 4)) await registrations_topic.send( value=Registration("Jenny", "Suk", 3)) await registrations_topic.send( value=Registration("Anička", "Šafářová", 0)) await registrations_topic.send( value=Registration("Sváťa", "Pulec", 3)) await registrations_topic.send( value=Registration("Blažej", "Motyčka", 8)) await registrations_topic.send( value=Registration("Eda", "Wasserfall", 0)) await registrations_topic.send( value=Registration("Přemysl", "Hájek", 10)) if __name__ == "__main__": app.main()
16. Kombinace producenta a konzumenta – reálná síla knihovny Faust
Nyní se již dostáváme k vytvoření jednoduchého stream processoru realizovaného kompletně v knihovně Faust. Tento procesor kombinovat producenta z konzumentem v jediné aplikaci. Později dodáme i transformaci zpráv:
import faust app = faust.App( "registrations", broker="kafka://localhost:9092", ) class Registration(faust.Record): name: str surname: str id: int registrations_topic = app.topic("registrations", key_type=str, value_type=Registration) @app.timer(interval=5.0) async def example_sender(app): await registrations_topic.send( value=Registration("Eliška", "Najbrtová", 4)) await registrations_topic.send( value=Registration("Jenny", "Suk", 3)) await registrations_topic.send( value=Registration("Anička", "Šafářová", 0)) await registrations_topic.send( value=Registration("Sváťa", "Pulec", 3)) await registrations_topic.send( value=Registration("Blažej", "Motyčka", 8)) await registrations_topic.send( value=Registration("Eda", "Wasserfall", 0)) await registrations_topic.send( value=Registration("Přemysl", "Hájek", 10)) @app.agent(registrations_topic) async def register(registrations): async for registration in registrations: print(f"Registration: {registration}") if __name__ == "__main__": app.main()
17. Spuštění kombinace workerů s producenty a konzumenty
Nyní nám již zbývá maličkost, a to spuštění výše uvedené aplikace, v níž běží asynchronně worker produkující data a současně i worker, který tato data naopak konzumuje. Spuštění se opět provede nám již známým způsobem:
$ python3 registration_consumer_producer.py worker
Nejprve by se měly zobrazit informace o právě inicializované aplikaci:
┌ƒaµS† v0.11.1.dev4+ga489db3b───────────────────────────────────┐ │ id │ registrations │ │ transport │ [URL('kafka://localhost:9092')] │ │ store │ memory: │ │ web │ http://localhost:6066/ │ │ log │ -stderr- (warn) │ │ pid │ 1508601 │ │ hostname │ ptisnovs.xxx.yyy.zzz │ │ platform │ CPython 3.11.8 (Linux x86_64) │ │ + │ Cython (GCC 13.2.1 20231011 (Red Hat 13.2.1-4)) │ │ drivers │ │ │ transport │ aiokafka=0.10.0 │ │ web │ aiohttp=3.9.5 │ │ datadir │ /tmp/ramdisk/faust/registrations-data │ │ appdir │ /tmp/ramdisk/faust/registrations-data/v1 │ └─────────────┴─────────────────────────────────────────────────┘ starting➢
A posléze by se měly v logu objevit informace o zkonzumovaných (přijatých) zprávách:
[2024-04-24 16:13:00,702] [1576077] [WARNING] Registration: <Registration: name='Eliška', surname='Najbrtová', id=4> [2024-04-24 16:13:00,703] [1576077] [WARNING] Registration: <Registration: name='Jenny', surname='Suk', id=3> [2024-04-24 16:13:00,704] [1576077] [WARNING] Registration: <Registration: name='Anička', surname='Šafářová', id=0> [2024-04-24 16:13:00,704] [1576077] [WARNING] Registration: <Registration: name='Sváťa', surname='Pulec', id=3> [2024-04-24 16:13:00,704] [1576077] [WARNING] Registration: <Registration: name='Blažej', surname='Motyčka', id=8> [2024-04-24 16:13:00,705] [1576077] [WARNING] Registration: <Registration: name='Eda', surname='Wasserfall', id=0> [2024-04-24 16:13:00,705] [1576077] [WARNING] Registration: <Registration: name='Přemysl', surname='Hájek', id=10>
18. Obsah navazujícího článku
V navazujícím článku popis možností knihovny Faust dokončíme. Ukážeme si zejména práci s „okny“ (windows), které umožňují prohlížet a zpracovávat sekvenci po sobě jdoucích zpráv. A taktéž si řekneme, jakým způsobem se využívají lokální tabulky, které jsou v některých případech užitečné pro meziuložení stavu celého systému (persistenci) realizujícího a řídícího tok zpráv.
19. Repositář s demonstračními příklady
Zdrojové kódy všech prozatím popsaných demonstračních příkladů určených pro programovací jazyk Python 3 byly uloženy do Git repositáře dostupného na adrese https://github.com/tisnik/most-popular-python-libs:
# | Demonstrační příklad | Stručný popis příkladu | Cesta |
---|---|---|---|
1 | greeting_producer.py | klasický producent zpráv vytvořený bez použití knihovny Faust (používá knihovnu Kafka-python-ng) | https://github.com/tisnik/most-popular-python-libs/blob/master/faust/greeting_producer.py |
2 | greeting_consumer.py | klasický konzument zpráv vytvořený bez použití knihovny Faust (používá knihovnu Kafka-python-ng) | https://github.com/tisnik/most-popular-python-libs/blob/master/faust/greeting_consumer.py |
3 | greeting_faust_consumer.py | worker definovaný s využitím knihovny Faust | https://github.com/tisnik/most-popular-python-libs/blob/master/faust/greeting_faust_consumer.py |
4 | multi_producer_raw.py | producent zpráv do většího množství témat, zprávy jsou posílány jako řetězce | https://github.com/tisnik/most-popular-python-libs/blob/master/faust/multi_producer_raw.py |
5 | greeting_worker_consumer_raw.py | dvojice workerů definovaných s využitím knihovny Faust pro zprávy posílané jako sekvence bajtů | https://github.com/tisnik/most-popular-python-libs/blob/master/faust/greeting_worker_consumer_raw.py |
6 | multi_producer_json.py | producent zpráv do většího množství témat, zprávy jsou serializovány do formátu JSON | https://github.com/tisnik/most-popular-python-libs/blob/master/faust/multi_producer_json.py |
7 | greeting_worker_consumer_json.py | dvojice workerů definovaných s využitím knihovny Faust pro zprávy ve formátu JSON | https://github.com/tisnik/most-popular-python-libs/blob/master/faust/greeting_worker_consumer_json.py |
8 | registration_producer.py | producent zpráv ve formátu JSON obsahujících atributy objektů (používá knihovnu Kafka-python-ng) | https://github.com/tisnik/most-popular-python-libs/blob/master/faust/registration_producer.py |
9 | registration_consumer.py | konzument zpráv založený na knihovně Faust (pracuje s modelem) | https://github.com/tisnik/most-popular-python-libs/blob/master/faust/registration_consumer.py |
10 | registration_producer_faust.py | producent zpráv založený na knihovně Faust (pracuje s modelem) | https://github.com/tisnik/most-popular-python-libs/blob/master/faust/registration_producer_faust.py |
11 | registration_consumer_producer.py | tok dat od producenta ke konzumentovi, založeno na knihovně Faust | https://github.com/tisnik/most-popular-python-libs/blob/master/faust/registration_consumer_producer.py |
20. Odkazy na Internetu
- Faust – Python Stream Processing
https://faust-streaming.github.io/faust/ - Knihovna Faust na GitHubu
https://github.com/faust-streaming/faust - faust 1.10.4 na Pypi
https://pypi.org/project/faust/ - Introduction to Kafka Stream Processing in Python using Faust (video)
https://www.youtube.com/watch?v=Nt96udaC5Zk - Windowing in Kafka Streams using Faust Framework in Python | Tumbling Window (video)
https://www.youtube.com/watch?v=ZlBXg9Kp8vE - Processing Complex Type & Producing Messages to Topic using Faust
https://www.youtube.com/watch?v=DpTJYtiYK9w - Stream Processing with Python, Kafka & Faust
https://towardsdatascience.com/stream-processing-with-python-kafka-faust-a11740d0910c - ETL Batch Processing With Kafka?
https://medium.com/swlh/etl-batch-processing-with-kafka-7f66f843e20d - ETL with Kafka
https://blog.codecentric.de/en/2018/03/etl-kafka/ - Building ETL Pipelines with Clojure and Transducers
https://www.grammarly.com/blog/engineering/building-etl-pipelines-with-clojure-and-transducers/ - pipeline (možné použít pro ETL)
https://clojuredocs.org/clojure.core.async/pipeline - On Track with Apache Kafka – Building a Streaming ETL Solution with Rail Data
https://www.confluent.io/blog/build-streaming-etl-solutions-with-kafka-and-rail-data/ - Kafka – Understanding Offset Commits
https://www.logicbig.com/tutorials/misc/kafka/committing-offsets.html - fundingcircle/jackdaw (na Clojars)
https://clojars.org/fundingcircle/jackdaw/versions/0.7.6 - Dokumentace ke knihovně jackdaw
https://cljdoc.org/d/fundingcircle/jackdaw/0.7.6/doc/readme - Jackdaw AdminClient API
https://cljdoc.org/d/fundingcircle/jackdaw/0.7.6/doc/jackdaw-adminclient-api - Jackdaw Client API
https://cljdoc.org/d/fundingcircle/jackdaw/0.7.6/doc/jackdaw-client-api - Kafka.clj
https://github.com/helins-io/kafka.clj - Kafka mirroring (MirrorMaker)
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330 - Mastering Kafka migration with MirrorMaker 2
https://developers.redhat.com/articles/2024/01/04/mastering-kafka-migration-mirrormaker-2 - Apache Kafka MirrorMaker 2 (MM2) Part 1: Theory
https://www.instaclustr.com/blog/kafka-mirrormaker-2-theory/#h-2-replication-in-kafka - Apache Kafka MirrorMaker 2 (MM2) Part 2: Practice
https://www.instaclustr.com/blog/apache-kafka-mirrormaker-2-practice/ - Demystifying Kafka MirrorMaker 2: Use cases and architecture
https://developers.redhat.com/articles/2023/11/13/demystifying-kafka-mirrormaker-2-use-cases-and-architecture# - How to use Kafka MirrorMaker 2.0 in data migration, replication and the use-cases
https://learn.microsoft.com/en-us/azure/hdinsight/kafka/kafka-mirrormaker-2–0-guide - Release Notes – Kafka – Version 2.4.0
https://archive.apache.org/dist/kafka/2.4.0/RELEASE_NOTES.html - Kafka Mirror Maker Best Practices
https://community.cloudera.com/t5/Community-Articles/Kafka-Mirror-Maker-Best-Practices/ta-p/249269 - Apache Kafka MirrorMaker 2 (MM2) Part 1: Theory
https://www.instaclustr.com/blog/kafka-mirrormaker-2-theory/ - Kcli: is a kafka read only command line browser.
https://github.com/cswank/kcli - Kcli: a kafka command line browser
https://go.libhunt.com/kcli-alternatives - Kafka Connect and Schemas
https://rmoff.net/2020/01/22/kafka-connect-and-schemas/ - JSON and schemas
https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/#json-schemas - What, why, when to use Apache Kafka, with an example
https://www.startdataengineering.com/post/what-why-and-how-apache-kafka/ - When NOT to use Apache Kafka?
https://www.kai-waehner.de/blog/2022/01/04/when-not-to-use-apache-kafka/ - Microservices: The Rise Of Kafka
https://movio.co/blog/microservices-rise-kafka/ - Building a Microservices Ecosystem with Kafka Streams and KSQL
https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/ - An introduction to Apache Kafka and microservices communication
https://medium.com/@ulymarins/an-introduction-to-apache-kafka-and-microservices-communication-bf0a0966d63 - kappa-architecture.com
http://milinda.pathirage.org/kappa-architecture.com/ - Questioning the Lambda Architecture
https://www.oreilly.com/ideas/questioning-the-lambda-architecture - Lambda architecture
https://en.wikipedia.org/wiki/Lambda_architecture - Kafka – ecosystem (Wiki)
https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem - The Kafka Ecosystem – Kafka Core, Kafka Streams, Kafka Connect, Kafka REST Proxy, and the Schema Registry
http://cloudurable.com/blog/kafka-ecosystem/index.html - A Kafka Operator for Kubernetes
https://github.com/krallistic/kafka-operator - Kafka Streams
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams - Kafka Streams
http://kafka.apache.org/documentation/streams/ - Kafka Streams (FAQ)
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Streams - Event stream processing
https://en.wikipedia.org/wiki/Event_stream_processing - Part 1: Apache Kafka for beginners – What is Apache Kafka?
https://www.cloudkarafka.com/blog/2016–11–30-part1-kafka-for-beginners-what-is-apache-kafka.html - What are some alternatives to Apache Kafka?
https://www.quora.com/What-are-some-alternatives-to-Apache-Kafka - What is the best alternative to Kafka?
https://www.slant.co/options/961/alternatives/~kafka-alternatives - A super quick comparison between Kafka and Message Queues
https://hackernoon.com/a-super-quick-comparison-between-kafka-and-message-queues-e69742d855a8?gi=e965191e72d0 - Kafka Queuing: Kafka as a Messaging System
https://dzone.com/articles/kafka-queuing-kafka-as-a-messaging-system - Apache Kafka Logs: A Comprehensive Guide
https://hevodata.com/learn/apache-kafka-logs-a-comprehensive-guide/ - Microservices – Not a free lunch!
http://highscalability.com/blog/2014/4/8/microservices-not-a-free-lunch.html - Microservices, Monoliths, and NoOps
http://blog.arungupta.me/microservices-monoliths-noops/ - Microservice Design Patterns
http://blog.arungupta.me/microservice-design-patterns/ - REST vs Messaging for Microservices – Which One is Best?
https://solace.com/blog/experience-awesomeness-event-driven-microservices/ - Kappa Architecture Our Experience
https://events.static.linuxfound.org/sites/events/files/slides/ASPgems%20-%20Kappa%20Architecture.pdf - Apache Kafka Streams and Tables, the stream-table duality
https://towardsdatascience.com/apache-kafka-streams-and-tables-the-stream-table-duality-ee904251a7e?gi=f22a29cd1854 - Configure Self-Managed Connectors
https://docs.confluent.io/kafka-connectors/self-managed/configuring.html#configure-self-managed-connectors - Schema Evolution and Compatibility
https://docs.confluent.io/platform/current/schema-registry/avro.html#schema-evolution-and-compatibility - Configuring Key and Value Converters
https://docs.confluent.io/kafka-connectors/self-managed/userguide.html#configuring-key-and-value-converters - Introduction to Kafka Connectors
https://www.baeldung.com/kafka-connectors-guide - Kafka CLI: command to list all consumer groups for a topic?
https://stackoverflow.com/questions/63883999/kafka-cli-command-to-list-all-consumer-groups-for-a-topic - Java Property File Processing
https://www.w3resource.com/java-tutorial/java-propertyfile-processing.php - Skipping bad records with the Kafka Connect JDBC sink connector
https://rmoff.net/2019/10/15/skipping-bad-records-with-the-kafka-connect-jdbc-sink-connector/ - Kafka Connect Deep Dive – Error Handling and Dead Letter Queues
https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/ - Errors and Dead Letter Queues
https://developer.confluent.io/learn-kafka/kafka-connect/error-handling-and-dead-letter-queues/ - Confluent Cloud Dead Letter Queue
https://docs.confluent.io/cloud/current/connectors/dead-letter-queue.html - Dead Letter Queues (DLQs) in Kafka
https://medium.com/@sannidhi.s.t/dead-letter-queues-dlqs-in-kafka-afb4b6835309 - Deserializer
https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-json.html#json-schema-serializer-and-deserializer - JSON, Kafka, and the need for schema
https://mikemybytes.com/2022/07/11/json-kafka-and-the-need-for-schema/ - Using Kafka Connect with Schema Registry
https://docs.confluent.io/platform/current/schema-registry/connect.html - Zpracování dat reprezentovaných ve formátu JSON nástrojem jq
https://www.root.cz/clanky/zpracovani-dat-reprezentovanych-ve-formatu-json-nastrojem-jq/ - Repositář projektu jq (GitHub)
https://github.com/stedolan/jq - GitHub stránky projektu jq
https://stedolan.github.io/jq/ - 5 modern alternatives to essential Linux command-line tools
https://opensource.com/article/20/6/modern-linux-command-line-tools - Návod k nástroji jq
https://stedolan.github.io/jq/tutorial/ - jq Manual (development version)
https://stedolan.github.io/jq/manual/ - Introducing JSON
https://www.json.org/json-en.html - Understanding JSON schema
https://json-schema.org/understanding-json-schema/index.html - JDBC Sink Connector for Confluent Platform
https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/overview.html#jdbc-sink-connector-for-cp - JDBC Connector (Source and Sink)
https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc - Introduction to Schema Registry in Kafka
https://medium.com/slalom-technology/introduction-to-schema-registry-in-kafka-915ccf06b902 - Understanding JSON Schema Compatibility
https://yokota.blog/2021/03/29/understanding-json-schema-compatibility/