Faust: platforma pro proudové zpracování dat v Pythonu

25. 4. 2024
Doba čtení: 34 minut

Sdílet

 Autor: Root.cz s využitím DALL-E
Knihovna Faust zajišťuje proudové zpracování dat a je postavena nad Apache Kafkou. Nejedná se však o pouhou realizaci producentů a konzumentů, protože je možné používat lokální tabulky, asynchronní zpracování atd.

Obsah

1. Faust: platforma pro proudové zpracování dat v Pythonu

2. Potenciálně problematická část – instalace knihovny Faust

3. Instalace, konfigurace a spuštění Apache Kafky

4. Otestování činnosti Apache Kafky

5. Klasický producent posílající zprávy do Kafky

6. Klasický konzument, který čte zprávy ze specifikovaného tématu

7. Spuštění producenta a konzumenta

8. Konzument realizovaný formou workera postavený na knihovně Faust

9. Spuštění workera

10. Producent posílající zprávy do většího množství témat

11. Konzumace zpráv workerem z většího množství témat

12. Produkce zpráv ve formátu JSON

13. Worker přijímající zprávy ve formátu JSON

14. Využití modelů

15. Producent založený na knihovně Faust využívající model

16. Kombinace producenta a konzumenta – reálná síla knihovny Faust

17. Spuštění kombinace workerů s producenty a konzumenty

18. Obsah navazujícího článku

19. Repositář s demonstračními příklady

20. Odkazy na Internetu

1. Faust: platforma pro proudové zpracování dat v Pythonu

V dnešním článku se seznámíme s knihovnou Faust, která umožňuje realizaci platformy pro proudové zpracování dat (stream processing) založené na systému Apache Kafky. V knihovně Faust se nepracuje (resp. nemusí pracovat) se standardními samostatně běžícími producenty a konzumenty. Namísto toho se nabízí možnost realizace zpracování zpráv (událostí) z jednoho tématu s posíláním výsledků do dalšího tématu či témat s využitím asynchronně naprogramovaných workerů. Navíc je možné pro popis struktury zpráv použít takzvané modely, což vlastně není nic jiného než popis struktury zprávy pomocí třídy. Jak workeři, tak i modely jsou přitom zapisovány jako běžný kód v Pythonu, tj. na rozdíl od některých jiných systémů pro realizaci stream processingu se zde nemusí používat specializovaný doménově specifický jazyk (DSL).

Obrázek 1: Jednoduchý stream processing založený na systému Apache Kafky.

Navíc Faust podporuje persistenci s využitím tabulek (interně se vlastně jedná o persistentní slovníky), což je téma, kterému se budeme podrobněji věnovat v navazujícím článku.

Obrázek 2: Nepatrně složitější konfigurace stream processorů založených na systému Apache Kafky.

Poznámka: prakticky všechny operace jsou v knihovně Faust realizovány asynchronním kódem. Na tuto vlastnost si lze snadno zvyknout a konkrétně pro Python (s jeho GILem atd.) se jedná o elegantní řešení.

2. Potenciálně problematická část – instalace knihovny Faust

Teoreticky je možné knihovnu Faust nainstalovat naprosto stejným způsobem, jako jakoukoli jinou knihovnu dostupnou přes PyPi. Pro instalaci tímto stylem postačuje zadat příkaz:

$ pip install --user faust-streaming

Měly by se nainstalovat i všechny závislosti:

Collecting faust-streaming
  Downloading faust_streaming-0.11.0-cpc311-cpc311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (24 kB)
Collecting aiohttp<4.0,>=3.8.0 (from faust-streaming)
  Downloading aiohttp-3.9.5-cpc311-cpc311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.5 kB)
Collecting aiohttp-cors<2.0,>=0.7 (from faust-streaming)
  Downloading aiohttp_cors-0.7.0-py3-none-any.whl.metadata (20 kB)
Collecting aiokafka>=0.9.0 (from faust-streaming)
  Downloading aiokafka-0.10.0-cpc311-cpc311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (17 kB)
Requirement already satisfied: click<8.2,>=6.7 in ./.local/lib/python3.8/site-packages (from faust-streaming) (8.1.7)
Collecting mode-streaming>=0.4.0 (from faust-streaming)
  Downloading mode_streaming-0.4.1-py2.py3-none-any.whl.metadata (16 kB)
Collecting opentracing<=2.4.0,>=1.3.0 (from faust-streaming)
  Downloading opentracing-2.4.0.tar.gz (46 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 46.2/46.2 kB 1.2 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting terminaltables<4.0,>=3.1 (from faust-streaming)
  Downloading terminaltables-3.1.10-py2.py3-none-any.whl.metadata (3.5 kB)
Requirement already satisfied: yarl<2.0,>=1.0 in ./.local/lib/python3.8/site-packages (from faust-streaming) (1.9.4)
Collecting croniter>=0.3.16 (from faust-streaming)
  Downloading croniter-2.0.5-py2.py3-none-any.whl.metadata (25 kB)
Requirement already satisfied: mypy-extensions in ./.local/lib/python3.8/site-packages (from faust-streaming) (1.0.0)
Collecting venusian==3.1.0 (from faust-streaming)
  Downloading venusian-3.1.0-py3-none-any.whl.metadata (10 kB)
Collecting intervaltree (from faust-streaming)
  Downloading intervaltree-3.1.0.tar.gz (32 kB)
  Preparing metadata (setup.py) ... done
Requirement already satisfied: six in /usr/lib/python3/dist-packages (from faust-streaming) (1.14.0)
Collecting aiosignal>=1.1.2 (from aiohttp<4.0,>=3.8.0->faust-streaming)
  Downloading aiosignal-1.3.1-py3-none-any.whl.metadata (4.0 kB)
Requirement already satisfied: attrs>=17.3.0 in ./.local/lib/python3.8/site-packages (from aiohttp<4.0,>=3.8.0->faust-streaming) (23.1.0)
Requirement already satisfied: frozenlist>=1.1.1 in ./.local/lib/python3.8/site-packages (from aiohttp<4.0,>=3.8.0->faust-streaming) (1.4.1)
Requirement already satisfied: multidict<7.0,>=4.5 in ./.local/lib/python3.8/site-packages (from aiohttp<4.0,>=3.8.0->faust-streaming) (6.0.4)
Requirement already satisfied: async-timeout<5.0,>=4.0 in ./.local/lib/python3.8/site-packages (from aiohttp<4.0,>=3.8.0->faust-streaming) (4.0.3)
Requirement already satisfied: packaging in ./.local/lib/python3.8/site-packages (from aiokafka>=0.9.0->faust-streaming) (23.2)
Collecting python-dateutil (from croniter>=0.3.16->faust-streaming)
  Downloading python_dateutil-2.9.0.post0-py2.py3-none-any.whl.metadata (8.4 kB)
Collecting pytz>2021.1 (from croniter>=0.3.16->faust-streaming)
  Downloading pytz-2024.1-py2.py3-none-any.whl.metadata (22 kB)
Collecting colorlog<7.0.0,>=6.0.0 (from mode-streaming>=0.4.0->faust-streaming)
  Downloading colorlog-6.8.2-py3-none-any.whl.metadata (10 kB)
Requirement already satisfied: idna>=2.0 in /usr/lib/python3/dist-packages (from yarl<2.0,>=1.0->faust-streaming) (2.8)
Collecting sortedcontainers<3.0,>=2.0 (from intervaltree->faust-streaming)
  Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl.metadata (10 kB)
Downloading faust_streaming-0.11.0-cpc311-cpc311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.2 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.2/1.2 MB 3.1 MB/s eta 0:00:00
Downloading venusian-3.1.0-py3-none-any.whl (13 kB)
Downloading aiohttp-3.9.5-cpc311-cpc311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.3/1.3 MB 2.7 MB/s eta 0:00:00
Downloading aiohttp_cors-0.7.0-py3-none-any.whl (27 kB)
Downloading aiokafka-0.10.0-cpc311-cpc311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.1 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.1/1.1 MB 2.4 MB/s eta 0:00:00
Downloading croniter-2.0.5-py2.py3-none-any.whl (20 kB)
Downloading mode_streaming-0.4.1-py2.py3-none-any.whl (94 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 94.7/94.7 kB 1.9 MB/s eta 0:00:00
Downloading terminaltables-3.1.10-py2.py3-none-any.whl (15 kB)
Downloading aiosignal-1.3.1-py3-none-any.whl (7.6 kB)
Downloading colorlog-6.8.2-py3-none-any.whl (11 kB)
Downloading pytz-2024.1-py2.py3-none-any.whl (505 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 505.5/505.5 kB 2.5 MB/s eta 0:00:00
Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl (29 kB)
Downloading python_dateutil-2.9.0.post0-py2.py3-none-any.whl (229 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 229.9/229.9 kB 2.4 MB/s eta 0:00:00
Building wheels for collected packages: opentracing, intervaltree
  Building wheel for opentracing (setup.py) ... done
  Created wheel for opentracing: filename=opentracing-2.4.0-py3-none-any.whl size=51403 sha256=32bff185cd7a87b97f48f23ba6447ac311acdaaadf2f0e9fa3a1a5371c09cf6867
  Stored in directory: /home/ptisnovs/.cache/pip/wheels/c7/d1/cc/de262a2bc00e9fdfe32334094183aa9a78a2458dca9212145c
  Building wheel for intervaltree (setup.py) ... done
  Created wheel for intervaltree: filename=intervaltree-3.1.0-py2.py3-none-any.whl size=26102 sha256=091f3c2a6e2bcca88cbc2f214fbd466ee7e3fb5fea612ee6602164c4bc87107a
  Stored in directory: /home/ptisnovs/.cache/pip/wheels/45/23/de/5789a92962483fd33cb06674792b9697c1b3766d7c7742830e
Successfully built opentracing intervaltree
Installing collected packages: sortedcontainers, pytz, opentracing, venusian, terminaltables, python-dateutil, intervaltree, colorlog, aiosignal, aiokafka, croniter, aiohttp, mode-streaming, aiohttp-cors, faust-streaming
Successfully installed aiohttp-3.9.5 aiohttp-cors-0.7.0 aiokafka-0.10.0 aiosignal-1.3.1 colorlog-6.8.2 croniter-2.0.5 faust-streaming-0.11.0 intervaltree-3.1.0 mode-streaming-0.4.1 opentracing-2.4.0 python-dateutil-2.9.0.post0 pytz-2024.1 sortedcontainers-2.4.0 terminaltables-3.1.10 venusian-3.1.0

To však bude bez problémů fungovat pouze pro starší verze Pythonu (CPythonu). V případě novějších verzí Pythonu (3.12, 3.13), v nichž došlo ke změně balíčků pro asynchronní běh aplikací, nebude možné takto nainstalovaný Faust použít. Někdy se tedy musíme uchýlit k jiné formě spuštění (což ani zdaleka není ideální situace!):

  1. Naklonujeme repositář https://github.com/faust-streaming/faust
  2. Vytvoříme virtuální prostředí Pythonu příkazem python3 -m venv venv && source venv/bin/activate
  3. Nainstalujeme do virtuálního prostředí všechny potřebné knihovny příkazem: ./scripts/install

Po těchto třech krocích bude možné knihovnu Faust používat v rámci právě inicializovaného virtuálního prostředí.

3. Instalace, konfigurace a spuštění Apache Kafky

Dnešní článek je zaměřen na ukázku některých možností nabízených knihovnou Faust, která interně využívá známý projekt Apache Kafka. To tedy znamená, že kromě instalace knihovny Faust budeme navíc potřebovat spustit (lokálně, vzdáleně, či v kontejneru) i Apache Kafku. Konkrétně nám bude postačovat spuštění jediného Kafka clusteru, přičemž se každý Kafka cluster skládá z jednoho běžícího Zookeepera a z jednoho brokera (tedy ze dvou procesů, které mezi sebou komunikují po nakonfigurovaných portech).

V praktické části budeme brokera Apache Kafky i Zookeepera spouštět lokálně (popř. z Dockeru či Podmana), takže je nejdříve nutné Apache Kafku nainstalovat. Není to ve skutečnosti vůbec nic složitého. V případě, že je na počítači nainstalováno JRE (běhové prostředí Javy), je instalace Apache Kafky pro testovací účely triviální. V článku si ukážeme instalaci verze 2.13–3.6.1, ovšem můžete si stáhnout i prakticky libovolnou novější či některé starší verze (3.5.x nebo 3.6.x, ovšem dále popsaný postup by měl být platný i pro ještě starší verze, v podstatě můžeme dojít až k verzi 2.4.0 a vše by mělo být funkční). Tarball s instalací Apache Kafky je možné získat z adresy https://downloads.apache.or­g/kafka/3.6.1/kafka2.13–3.6.1.tgz.

Stažení a rozbalení tarballu s Apache Kafkou zajistí následující sekvence příkazů:

$ wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
$ tar xvfz kafka_2.13-3.6.1.tgz
$ cd kafka_2.13-3.6.1/

Po rozbalení staženého tarballu získáme adresář, v němž se nachází všechny potřebné Java archivy (JAR), konfigurační soubory (v podadresáři config) a několik pomocných skriptů (v podadresáři bin a bin/windows). Pro spuštění Zookeepera a brokerů je zapotřebí, jak jsme si již řekli v předchozím odstavci, mít nainstalovánu JRE (Java Runtime Environment) a samozřejmě též nějaký shell (BASH, cmd, …).

.
├── bin
│   └── windows
├── config
│   └── kraft
├── libs
├── licenses
└── site-docs
 
7 directories

Mezi důležité soubory, které budeme používat v rámci dalších kapitol pro ukázku možností knihovny Faust, patří především skripty pro spouštění jednotlivých služeb. Tyto skripty jsou uloženy v podadresáři bin (a pro Windows ještě v dalším podadresáři windows):

Skript Stručný popis
bin/kafka-server-start.sh spuštění brokera
bin/zookeeper-server-start.sh spuštění Zookeepera
   
bin/kafka-configs.sh konfigurace brokerů
bin/kafka-topics.sh konfigurace témat, zjištění informace o tématech atd.
bin/kafka-consumer-groups.sh konfigurace, popř. zjištění informací o skupinách konzumentů
Poznámka: většina výše uvedených skriptů byla upravena i pro spuštění ve Windows. Tyto varianty naleznete v podadresáři bin/windows a namísto koncovky .sh mají koncovku bat (batch).

Nyní již máme všechny potřebné nástroje k dispozici, takže Kafka cluster spustíme. Nejdříve je vždy nutné spustit Zookeepera a teprve poté brokera či brokery. Pro sledování činnosti obou procesů si můžete Zookeepera i brokera spustit v samostatném terminálu, využít nástroj screen atd.

Spuštění Zookeepera:

$ cd ${kafka_dir}
$ bin/zookeeper-server-start.sh config/zookeeper.properties

Spuštění brokera:

$ cd ${kafka_dir}
$ bin/kafka-server-start.sh config/server.properties
Poznámka: povšimněte si, že používáme konfigurační soubory dodávané přímo s Apache Kafkou, tedy soubory config/zookeeper.properties a config/server.properties. V případě potřeby si můžete tyto soubory zkopírovat, kopie upravit a ty dále použít.

4. Otestování činnosti Apache Kafky

Pro otestování, jestli je právě zprovozněný Kafka cluster skutečně funkční, si vytvoříme jednoduché producenty a konzumenty zpráv. Použijeme přitom programovací jazyk Python, protože i demonstrační příklady využívající knihovnu Faust budou naprogramovány v Pythonu. Samozřejmě by bylo možné použít konzolového producenta a konzumenta, který je součástí instalace Apache Kafky, ovšem možnosti těchto nástrojů jsou ve skutečnosti velmi omezené a nebudou nám postačovat v případě, že budeme chtít produkovat či naopak konzumovat složitější datové struktury a objekty.

Před vytvořením producenta zpráv v Pythonu je nutné nainstalovat knihovnu, která zabezpečuje připojení ke clusteru Apache Kafky. Takových knihoven dnes existuje několik. Nejjednodušší z těchto knihoven (bez dalších závislostí atd.) je knihovna nazvaná kafka-python. Vzhledem k tomu, že autoři této knihovny nevydávají časté updaty, vznikl její fork nazvaný jednoduše kafka-python-ng; a právě tento fork dnes použijeme. Tuto knihovnu můžeme nainstalovat pouze pro aktivního uživatele (což je nejjednodušší a nevyžaduje to rootovská práva) s využitím příkazu pip, protože tato knihovna je pochopitelně dostupná na PyPi:

$ pip3 install --verbose kafka-python-ng
 
Using pip 22.3.1 from /usr/lib/python3.11/site-packages/pip (python 3.11)
Defaulting to user installation because normal site-packages is not writeable
Collecting kafka-python-ng
  Downloading kafka_python_ng-2.2.2-py2.py3-none-any.whl (232 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 232.4/232.4 kB 1.5 MB/s eta 0:00:00
Installing collected packages: kafka-python-ng
Successfully installed kafka-python-ng-2.2.2
Poznámka: povšimněte si, že tato knihovna skutečně nemá žádné tranzitivní závislosti.

5. Klasický producent posílající zprávy do Kafky

Producent, tedy aplikace posílající zprávy do Apache Kafky, je představován instancí třídy KafkaProducer. Konstruktoru této třídy se předává pouze seznam brokerů (v našem případě jediný broker, protože náš Kafka cluster obsahuje jediného brokera) a popř. i další nepovinné údaje. Mezi ně patří i takzvaný handler, který je zavolán před serializací každé zprávy.

Prakticky každou zprávu je ve skutečnosti nutné serializovat, protože zprávy v Apache Kafce tvoří dále nestrukturovaná sekvence bajtů (jedinou datovou strukturou, kterou není potřeba serializovat, je typ bytes). Tudíž například i řetězce (přesněji řečeno Pythonovské řetězce) se explicitně převádí na hodnotu typu bytes, tj. na neměnitelnou (immutable) sekvenci bajtů. Převod řetězců do tuto sekvenci je realizován metodou encode, které se předá vyžadované kódování, tj. mapování mezi znaky na vstupu a jedním až více bajty ve výsledné sekvenci. V našem skriptu použijeme dnes standardní kódování UTF-8:

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=lambda x: x.encode("utf-8")
)

Poslání zprávy se provádí metodou KafkaProducer.send. Zpráva je, jak již víme z předchozího textu, uložena do zvoleného tématu (topic) a skládá se z těla, popř. klíče a těla:

producer.send(topic, value=data)

V našem prvním demonstračním producentovi budou posílány řetězce, které se budou postupně měnit podle hodnoty čítače (counter). Mezi jednotlivé zprávy jsou vkládány časové pauzy o délce přibližně jedné sekundy:

for i in range(1000):
    print(i)
    message = f"Greeting #{i}"
    producer.send(topic, value=message)
    sleep(1)
Poznámka: aby byla zpráva skutečně poslána, je vhodné zavolat metodu flush. V tomto konkrétním skriptu to neděláme, protože po naplnění bufferu nebo uplynutí určité doby je zpráva poslána i bez tohoto volání. Později ovšem budeme muset flush explicitně volat.

Takto vypadá úplný zdrojový kód producenta zpráv, který naleznete na adrese https://github.com/tisnik/most-popular-python-libs/blob/master/faust/gre­eting_producer.py:

#!/usr/bin/env python3
 
from kafka import KafkaProducer
from time import sleep
 
 
server = "localhost:9092"
topic = "greetings"
 
print("Connecting to Kafka")
producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=lambda x: x.encode("utf-8")
)
print("Connected to Kafka")
 
for i in range(1000):
    print(i)
    message = f"Greeting #{i}"
    producer.send(topic, value=message)
    sleep(1)
Poznámka: povšimněte si, že nám pro poslání zprávy postačuje znát adresu brokera a jméno tématu. Konzument (popsaný v navazující kapitole) je nepatrně složitější, protože je nutné specifikovat i jméno skupiny (consumer group).

6. Klasický konzument, který čte zprávy ze specifikovaného tématu

Implementace jednoduchého konzumenta zpráv v programovacím jazyku Python je poměrně snadná a vlastně se ani příliš neliší od implementace konzumenta. Nejdříve je nutné vytvořit instanci třídy KafkaConsumer, přičemž se specifikuje téma (topic), skupina, seznam brokerů a nepovinně též informace o tom, jakým způsobem se má nastavit offset první zpracované zprávy. Většinou budeme potřebovat zpracovávat nejnovější (earliest) zprávy, takže nastavení konzumenta může vypadat například následovně:

consumer = KafkaConsumer(
    topic, group_id=group_id,
    bootstrap_servers=[server],
    auto_offset_reset="earliest"
)

Samotná konzumace (tedy čtení a zpracování) zpráv může probíhat v nekonečné programové smyčce, přičemž každá zpráva je reprezentována objektem obsahujícím téma, číslo oddílu, offset v rámci oddílu, klíč zprávy a samozřejmě též obsah zprávy (klíč zprávy a tělo zprávy není žádným způsobem dekódováno, resp. deserializováno):

for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (
            message.topic,
            message.partition,
            message.offset,
            message.key,
            message.value,
        )
    )
Poznámka: ve skriptu je navíc přidána reakce na stisk klávesové zkratky Ctrl+C pro ukončení činnosti nekonečné smyčky.

A takto vypadá úplný zdrojový kód běžného konzumenta zpráv, který naleznete na adrese https://github.com/tisnik/most-popular-python-libs/blob/master/faust/gre­eting_consumer.py:

#!/usr/bin/env python3
 
import sys
from kafka import KafkaConsumer
 
server = "localhost:9092"
topic = "greetings"
group_id = "group1"
 
print("Connecting to Kafka")
consumer = KafkaConsumer(
    topic, group_id=group_id,
    bootstrap_servers=[server],
    auto_offset_reset="earliest"
)
print("Connected to Kafka")
 
try:
    for message in consumer:
        print(
            "%s:%d:%d: key=%s value=%s"
            % (
                message.topic,
                message.partition,
                message.offset,
                message.key,
                message.value,
            )
        )
except KeyboardInterrupt:
    sys.exit()

7. Spuštění producenta a konzumenta

Nyní si funkce producenta a konzumenta otestujeme. V jednom terminálu spustíme producenta zpráv:

$ python3 greeting_producer.py

Producent začne posílat zprávy:

Connecting to Kafka
Connected to Kafka
0
1
2
3
4
5
...
...
...

Ve druhém terminálu naopak spustíme konzumenta:

$ python3 greeting_consumer.py

Nyní by měl konzument začít přijímat nové zprávy posílané s periodou přibližně jedné sekundy:

Connecting to Kafka
Connected to Kafka
greetings:0:90: key=None value=b'Greeting #0'
greetings:0:91: key=None value=b'Greeting #1'
greetings:0:92: key=None value=b'Greeting #2'
greetings:0:93: key=None value=b'Greeting #3'
greetings:0:94: key=None value=b'Greeting #4'
greetings:0:95: key=None value=b'Greeting #5'
Poznámka: povšimněte si, že tělo zpráv je tvořeno hodnotami typu bytes a nikoli řetězcem. Převod na řetězec je ovšem triviální a lze ho provést zavoláním jediné metody.

8. Konzument realizovaný formou workera postavený na knihovně Faust

Podívejme se nyní na to, jak by se konzument realizoval s využitím knihovny Faust. Tato knihovna je postavena na poněkud odlišných konceptech, než je klasický producent a konzument. Namísto konzumenta se zde používá termín worker, načítané zprávy mohou být popsány modelem a vše je zabaleno v aplikaci, která řídí konzumaci či produkci zpráv (resp. celý tok dat). Začněme nejdříve posledním zmíněným termínem aplikace. Jedná se o koncept umožňující nejenom spouštění workerů, ale i další činnosti, například práci s tabulkami (což si popíšeme v navazujícím článku), práci s takzvanými okny (pohledů na část sekvence zpráv) atd.

Objekt představující aplikaci se vytvoří konstruktorem App, kterému se musí předat minimálně jeden povinný parametr – identifikátor aplikace, což je řetězec. Ovšem předat je možné i další parametry, zejména adresu Kafka brokeru, způsob (kodek) serializace a deserializace zpráv atd. Vytvoření aplikace tedy může vypadat následovně:

app = faust.App(
    "hello-world",
    broker="kafka://localhost:9092",
    value_serializer="raw",
)

V dalším kroku zkonstruujeme ještě jeden objekt, který bude reprezentovat téma (topic). V tomto případě se předává jen jméno tématu, i když opět platí, že je možné předat i další nepovinné parametry (schéma, typ klíčů, typ těl zpráv atd. atd.):

greetings_topic = app.topic("greetings")

Nyní je již možné nadefinovat asynchronní funkci, která bude konzumovat zprávy přicházející do nakonfigurovaného tématu. Tato funkce je obalena dekorátorem @app.agent, přičemž i dekorátor akceptuje různé parametry. My mu prozatím předáme pouze objekt představující téma:

@app.agent(greetings_topic)
async def greet(greetings):
    async for greeting in greetings:
        print(greeting)
Poznámka: často se v praxi setkáme s tím, že je parametr asynchronní funkce s workerem pojmenován stream.

Posledním krokem je spuštění aplikace, což je již triviální operace:

if __name__ == "__main__":
    app.main()

Úplný zdrojový kód workera je možné najít na adrese https://github.com/tisnik/most-popular-python-libs/blob/master/faust/gre­eting_faust_consumer.py:

import faust
 
app = faust.App(
    "hello-world",
    broker="kafka://localhost:9092",
    value_serializer="raw",
)
 
greetings_topic = app.topic("greetings")
 
@app.agent(greetings_topic)
async def greet(greetings):
    async for greeting in greetings:
        print(greeting)
 
 
if __name__ == "__main__":
    app.main()

9. Spuštění workera

V případě, že se pokusíme o spuštění workera běžným způsobem, tj. zadáním:

$ python greeting_faust_consumer.py

ukáže se nám pouze nápověda s informacemi, jaké možnosti nám worker umožňuje:

Usage: greeting_faust_consumer.py [OPTIONS] COMMAND [ARGS]...
 
  Welcome, see list of commands and options below.
 
  Use --help for help, --version for version information.
 
  https://faust-streaming.github.io/faust
 
Options:
  --console-port RANGE[1-65535]   when --debug: Port to run debugger console
                                  on.  [default: 50101; 1<=x<=65535]
  --blocking-timeout FLOAT        when --debug: Blocking detector timeout.
  -l, --loglevel [crit|error|warn|info|debug]
                                  Logging level to use.  [default: WARN]
  -f, --logfile FILE              Path to logfile (default is <stderr>).
  -L, --loop [aio|eventlet|uvloop]
                                  Event loop implementation to use.  [default:
                                  aio]
  --json                          Return output in machine-readable JSON
                                  format
  -D, --datadir DIRECTORY         Directory to keep application state.
                                  [default: {conf.name}-data]
  -W, --workdir DIRECTORY         Working directory to change to after start.
  --debug / --no-debug            Enable debugging output, and the blocking
                                  detector.  [default: no-debug]
  -q, --quiet / --no-quiet        Silence output to <stdout>/<stderr>
                                  [default: no-quiet]
  -A, --app TEXT                  Path of Faust application to use, or the
                                  name of a module.
  --version                       Show the version and exit.
  --help                          Show this message and exit.
 
Commands:
  agents          List agents.
  clean-versions  Delete old version directories.
  completion      Output shell completion to be evaluated by the shell.
  livecheck       Manage LiveCheck instances.
  model           Show model detail.
  models          List all available models as a tabulated list.
  reset           Delete local table state.
  send            Send message to agent/topic.
  tables          List available tables.
  worker          Start worker instance for given app.

Vlastní spuštění se provede příkazem:

$ python3 greeting_faust_consumer.py worker

Nejprve se zobrazí tabulka se základními informacemi o workerovi:

┌ƒaµS† v0.11.1.dev4+ga489db3b───────────────────────────────────┐
│ id          │ hello-world                                     │
│ transport   │ [URL('kafka://localhost:9092')]                 │
│ store       │ memory:                                         │
│ web         │ http://localhost:6066/                          │
│ log         │ -stderr- (warn)                                 │
│ pid         │ 1296383                                         │
│ hostname    │ ptisnovs.xxx.yyy.zzz                            │
│ platform    │ CPython 3.11.8 (Linux x86_64)                   │
│        +    │ Cython (GCC 13.2.1 20231011 (Red Hat 13.2.1-4)) │
│ drivers     │                                                 │
│   transport │ aiokafka=0.10.0                                 │
│   web       │ aiohttp=3.9.5                                   │
│ datadir     │ /tmp/ramdisk/faust/hello-world-data             │
│ appdir      │ /tmp/ramdisk/faust/hello-world-data/v1          │
└─────────────┴─────────────────────────────────────────────────┘
starting➢ 😊

Ihned po výpisu zprávy „starting“ by měl worker začít se čtením a zpracováváním zpráv, které jsou konzumovány z tématu „greetings“. Informace o každé zkonzumované zprávě je vypisována do terminálu:

[2024-04-20 18:03:39,583] [1296383] [WARNING] b'Greeting #0'
[2024-04-20 18:03:39,583] [1296383] [WARNING] b'Greeting #1'
[2024-04-20 18:03:39,584] [1296383] [WARNING] b'Greeting #2'
[2024-04-20 18:03:39,584] [1296383] [WARNING] b'Greeting #3'
[2024-04-20 18:03:39,584] [1296383] [WARNING] b'Greeting #4'
Poznámka: povšimněte si, že těla zpráv jsou reprezentována typem bytes, tj. neměnitelnou (immutable) sekvencí bajtů. Tento nedostatek samozřejmě později napravíme.

Můžeme se navíc pokusit o modifikaci nastavení úrovně logovacích zpráv, abychom získali více informací o spuštěném workerovi (workerech). Tím se přesvědčíme, jaké téma se používá atd.:

+ƒaµS† v0.11.0+-----------------------------------------------------------------------+
| id          | hello-world                                                           |
| transport   | [URL('kafka://localhost:9092')]                                       |
| store       | memory:                                                               |
| web         | http://localhost:6066/                                                |
| log         | -stderr- (info)                                                       |
| pid         | 1539476                                                               |
| hostname    | 123.123.123.123                                                       |
| platform    | CPython 3.11.8 (Linux x86_64)                                         |
|        +    | Cython (GCC 13.2.1 20231011 (Red Hat 13.2.1-4))                       |
| drivers     |                                                                       |
|   transport | aiokafka=0.10.0                                                       |
|   web       | aiohttp=3.9.1                                                         |
| datadir     | /home/ptisnovs/src/most-popular-python-libs/faust/hello-world-data    |
| appdir      | /home/ptisnovs/src/most-popular-python-libs/faust/hello-world-data/v1 |
+-------------+-----------------------------------------------------------------------+
[2024-04-24 09:16:23,967] [1539476] [INFO] [^Worker]: Starting...
[2024-04-24 09:16:23,968] [1539476] [INFO] [^-App]: Starting...
[2024-04-24 09:16:23,969] [1539476] [INFO] [^--Monitor]: Starting...
[2024-04-24 09:16:23,969] [1539476] [INFO] [^--Producer]: Starting...
[2024-04-24 09:16:23,969] [1539476] [INFO] [^---ProducerBuffer]: Starting...
[2024-04-24 09:16:23,976] [1539476] [INFO] [^--CacheBackend]: Starting...
[2024-04-24 09:16:23,976] [1539476] [INFO] [^--Web]: Starting...
[2024-04-24 09:16:23,976] [1539476] [INFO] [^---Server]: Starting...
[2024-04-24 09:16:23,977] [1539476] [INFO] [^--Consumer]: Starting...
[2024-04-24 09:16:23,978] [1539476] [INFO] [^---AIOKafkaConsumerThread]: Starting...
[2024-04-24 09:16:23,983] [1539476] [INFO] [^--LeaderAssignor]: Starting...
[2024-04-24 09:16:23,983] [1539476] [INFO] [^--Producer]: Creating topic 'hello-world-__assignor-__leader'
[2024-04-24 09:16:23,986] [1539476] [INFO] [^--ReplyConsumer]: Starting...
[2024-04-24 09:16:23,986] [1539476] [INFO] [^--AgentManager]: Starting...
[2024-04-24 09:16:23,986] [1539476] [INFO] [^---Agent: __main__.greet]: Starting...
[2024-04-24 09:16:23,987] [1539476] [INFO] [^----OneForOneSupervisor: (1@0x7fac75d2e6d0)]: Starting...
[2024-04-24 09:16:23,987] [1539476] [INFO] [^---Conductor]: Starting...
[2024-04-24 09:16:23,987] [1539476] [INFO] [^--TableManager]: Starting...
[2024-04-24 09:16:23,987] [1539476] [INFO] [^---Conductor]: Waiting for agents to start...
[2024-04-24 09:16:23,987] [1539476] [INFO] [^---Conductor]: Waiting for tables to be registered...
[2024-04-24 09:16:24,988] [1539476] [INFO] [^---Recovery]: Starting...
[2024-04-24 09:16:24,989] [1539476] [INFO] [^--Producer]: Creating topic 'hello-world-__assignor-__leader'
[2024-04-24 09:16:24,993] [1539476] [INFO] Updating subscribed topics to:
+Requested Subscription-----------+
| topic name                      |
+---------------------------------+
| greetings                       |
| hello-world-__assignor-__leader |
+---------------------------------+
[2024-04-24 09:16:24,994] [1539476] [INFO] Subscribed to topic(s):
+Final Subscription---------------+
| topic name                      |
+---------------------------------+
| greetings                       |
| hello-world-__assignor-__leader |
+---------------------------------+
[2024-04-24 09:16:25,003] [1539476] [INFO] Discovered coordinator 100 for group hello-world
[2024-04-24 09:16:25,004] [1539476] [INFO] Revoking previously assigned partitions set() for group hello-world
[2024-04-24 09:16:25,005] [1539476] [INFO] (Re-)joining group hello-world
[2024-04-24 09:16:25,011] [1539476] [INFO] Joined group 'hello-world' (generation 41) with member_id faust-0.11.0-cebf28b2-040d-41a0-a1f0-06881d9a9b0a
[2024-04-24 09:16:25,011] [1539476] [INFO] Elected group leader -- performing partition assignments using faust
[2024-04-24 09:16:25,016] [1539476] [INFO] Successfully synced group hello-world with generation 41
[2024-04-24 09:16:25,016] [1539476] [INFO] Setting newly assigned partitions
+Topic Partition Set--------------+------------+
| topic                           | partitions |
+---------------------------------+------------+
| greetings                       | {0}        |
| hello-world-__assignor-__leader | {0}        |
+---------------------------------+------------+ for group hello-world
[2024-04-24 09:16:25,017] [1539476] [INFO] Executing _on_partitions_assigned
[2024-04-24 09:16:25,018] [1539476] [INFO] generation id 41 app consumers id 41
[2024-04-24 09:16:25,020] [1539476] [INFO] [^---Recovery]: Seek stream partitions to committed offsets.
[2024-04-24 09:16:25,022] [1539476] [INFO] [^---Recovery]: Resuming flow...
[2024-04-24 09:16:25,022] [1539476] [INFO] [^---Fetcher]: Starting...
[2024-04-24 09:16:25,023] [1539476] [INFO] [^---Recovery]: Worker ready
[2024-04-24 09:16:25,023] [1539476] [INFO] [^Worker]: Ready

10. Producent posílající zprávy do většího množství témat

Mezi přednosti knihovny Faust patří především to, že lze spustit větší množství asynchronně běžících workerů. Abychom si to ukázali v praxi na co nejjednodušším příkladu, upravíme nepatrně našeho producenta zpráv takovým způsobem, aby posílal zprávy do dvou témat nazvaných „greetings“ a „real_work“. Tato úprava je snadná, protože postačuje do programové smyčky přidat další volání metod producer.send. Výsledný kód takto upraveného producenta zpráv lze nalézt na adrese https://github.com/tisnik/most-popular-python-libs/blob/master/faust/mul­ti_producer_raw.py:

#!/usr/bin/env python3
 
from kafka import KafkaProducer
from time import sleep
from json import dumps
 
server = "localhost:9092"
topic1 = "greetings"
topic2 = "real_work"
 
print("Connecting to Kafka")
producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=lambda x: dumps(x).encode("utf-8")
)
print("Connected to Kafka")
 
for i in range(1000):
    print(i)
    message = f"Greeting #{i}"
    producer.send(topic1, value=message)
    sleep(1)
    work = f"Real work #{i*2}"
    producer.send(topic2, value=work)
    work = f"Real work #{i*2+1}"
    producer.send(topic2, value=work)
    sleep(1)
Poznámka: povšimněte si, že se zprávy stále serializují stejným způsobem – z řetězců na hodnoty typu bytes.

11. Konzumace zpráv workerem z většího množství témat

Jak bude vypadat konzument zpráv (resp. konzumenti zpráv) v případě, že použijeme knihovnu Faust? Díky tomu, že každý worker je realizován asynchronní funkcí, je řešení až překvapivě snadné – každý worker bude v takovém případě definován ve vlastní funkci s dekorátorem @app.agent a každý z těchto workerů zpracovává zprávy z odlišného tématu:

@app.agent(greetings_topic)
async def greet(greetings):
    async for greeting in greetings:
        print(f"Greeter: {greeting}")
 
@app.agent(real_work_topic)
async def worker(works):
    async for work in works:
        print(f"Worker: {work}")

O spouštění, koordinaci práce atd. se postará knihovna Faust automaticky, takže tento problém vlastně vůbec nemusíme na aplikační úrovni řešit. Výsledná aplikace s workery je dostupná na adrese https://github.com/tisnik/most-popular-python-libs/blob/master/faust/gre­eting_worker_consumer_raw­.py a vypadá následovně:

import faust
 
app = faust.App(
    "hello-world",
    broker="kafka://localhost:9092",
    value_serializer="raw",
)
 
greetings_topic = app.topic("greetings")
real_work_topic = app.topic("real_work")
 
@app.agent(greetings_topic)
async def greet(greetings):
    async for greeting in greetings:
        print(f"Greeter: {greeting}")
 
@app.agent(real_work_topic)
async def worker(works):
    async for work in works:
        print(f"Worker: {work}")
 
 
if __name__ == "__main__":
    app.main()

12. Produkce zpráv ve formátu JSON

Velmi často se v praxi setkáme s tím, že zprávy jsou posílány a přijímány ve formátu JSON. To lze v praxi realizovat velmi snadno. Nejdříve si to ukažme u producenta zpráv, který není (alespoň prozatím) založen na knihovně Faust. Při inicializaci producenta pouze musíme specifikovat, jak se mají zprávy serializovat, a to s využitím funkce json.dumps:

#!/usr/bin/env python3
 
from kafka import KafkaProducer
from time import sleep
from json import dumps
 
server = "localhost:9092"
topic1 = "greetings"
topic2 = "real_work"
 
print("Connecting to Kafka")
producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=lambda x: dumps(x).encode("utf-8")
)
print("Connected to Kafka")
 
for i in range(1000):
    print(i)
    message = {"subject": "Greeting", "value": i}
    producer.send(topic1, value=message)
    sleep(1)
    work = {"subject": "Real work", "value": i*2}
    producer.send(topic2, value=work)
    work = {"subject": "Real work", "value": i*2+1}
    producer.send(topic2, value=work)
    sleep(1)
Poznámka: tímto způsobem je možné snadno serializovat prakticky jakkoli komplikovanou datovou strukturu Pythonu.

13. Worker přijímající zprávy ve formátu JSON

Úprava workera, jenž má přijímat zprávy ve formátu JSON, je ve skutečnosti ještě snadnější. Pouze musíme specifikovat parametr value_serializer při inicializaci aplikace (tedy při vytváření instance třídy faust.App). Namísto hodnoty „raw“ nyní použijeme hodnotu „json“:

import faust
 
app = faust.App(
    "hello-world",
    broker="kafka://localhost:9092",
    value_serializer="json",
)
 
greetings_topic = app.topic("greetings")
real_work_topic = app.topic("real_work")
 
@app.agent(greetings_topic)
async def greet(greetings):
    async for greeting in greetings:
        print(f"Greeter: {greeting}")
 
@app.agent(real_work_topic)
async def worker(works):
    async for work in works:
        print(f"Worker: {work}")
 
 
if __name__ == "__main__":
    app.main()

14. Využití modelů

Velká síla knihovny Faust spočívá v tom, že je možné formát přijímaných či naopak posílaných zpráv popsat pomocí modelů, což vlastně není nic jiného než deklarace třídy se specifikací názvů a typů atributů této třídy (viz například systém Pydantic). Například zprávy (v tomto případě spíše události) s hodnotami zadanými při rezervaci uživatele do nějakého systému lze popsat následujícím jednoduchým modelem:

class Registration(faust.Record):
    name: str
    surname: str
    id: int
Poznámka: povšimněte si, že se jedná o třídu odvozenou z bázové třídy faust.Record.

Vše si opět otestujeme na dvojici producent-konzument. Producent bude (prozatím) posílat zprávy ve formátu JSON, takže použijeme serializaci slovníků do JSONu. Slovník je konstruován v pomocné funkci user:

#!/usr/bin/env python3
 
from kafka import KafkaProducer
from time import sleep
from json import dumps
 
def user(name, surname, id):
    return {
            "name": name,
            "surname": surname,
            "id": id
            }
 
 
server = "localhost:9092"
topic = "registrations"
 
print("Connecting to Kafka")
producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=lambda x: dumps(x).encode("utf-8")
)
print("Connected to Kafka")
 
producer.send(topic, value=user("Eliška", "Najbrtová", 4))
producer.send(topic, value=user("Jenny", "Suk", 3))
producer.send(topic, value=user("Anička", "Šafářová", 0))
producer.send(topic, value=user("Sváťa", "Pulec", 3))
producer.send(topic, value=user("Blažej", "Motyčka", 8))
producer.send(topic, value=user("Eda", "Wasserfall", 0))
producer.send(topic, value=user("Přemysl", "Hájek", 10))
producer.flush()
 
print("Done")

Mnohem zajímavější je realizace workera, v níž využijeme model představovaný třídou Registration. Povšimněte si, že se model použije při specifikaci tématu – kromě jména tématu zadáme, že tělo zprávy bude tvořeno hodnotami, které je možné reprezentovat právě instancí třídy Registration. Příslušná deserializace bude provedena zcela automaticky:

import faust
 
app = faust.App(
    "registrations",
    broker="kafka://localhost:9092",
    #value_serializer="json",
)
 
 
class Registration(faust.Record):
    name: str
    surname: str
    id: int
 
 
registrations_topic = app.topic("registrations", key_type=str, value_type=Registration)
 
@app.agent(registrations_topic)
async def register(registrations):
    async for registration in registrations:
        print(f"Registration: {registration}")
 
 
if __name__ == "__main__":
    app.main()

15. Producent založený na knihovně Faust využívající model

Prozatím jsme pro vytvoření producenta knihovnu Faust vůbec nepoužili. Ovšem nyní, když již víme, že zprávy lze v případě potřeby popsat modelem, si můžeme ukázat, jak bude vypadat producent, který namísto knihovny kafka-python použije právě Faust. Samotný producent je opět představován workerem, který v intervalu přibližně pěti sekund pošle do tématu registrations serializované instance třídy Registration. Povšimněte si, že se zde již nemusíme starat o způsob serializace (zda se jedná o JSON atd.), protože postačuje, aby se obě strany (tedy producent i konzument) pouze shodly na jménu tématu a modelu:

import faust
 
app = faust.App(
    "registrations",
    broker="kafka://localhost:9092",
)
 
 
class Registration(faust.Record):
    name: str
    surname: str
    id: int
 
 
registrations_topic = app.topic("registrations", key_type=str, value_type=Registration)
 
@app.timer(interval=5.0)
async def example_sender(app):
    await registrations_topic.send(
        value=Registration("Eliška", "Najbrtová", 4))
    await registrations_topic.send(
        value=Registration("Jenny", "Suk", 3))
    await registrations_topic.send(
        value=Registration("Anička", "Šafářová", 0))
    await registrations_topic.send(
        value=Registration("Sváťa", "Pulec", 3))
    await registrations_topic.send(
        value=Registration("Blažej", "Motyčka", 8))
    await registrations_topic.send(
        value=Registration("Eda", "Wasserfall", 0))
    await registrations_topic.send(
        value=Registration("Přemysl", "Hájek", 10))
 
 
if __name__ == "__main__":
    app.main()
Poznámka: povšimněte si, že se opět jedná o plně asynchronní kód.

16. Kombinace producenta a konzumenta – reálná síla knihovny Faust

Nyní se již dostáváme k vytvoření jednoduchého stream processoru realizovaného kompletně v knihovně Faust. Tento procesor kombinovat producenta z konzumentem v jediné aplikaci. Později dodáme i transformaci zpráv:

import faust
 
app = faust.App(
    "registrations",
    broker="kafka://localhost:9092",
)
 
 
class Registration(faust.Record):
    name: str
    surname: str
    id: int
 
 
registrations_topic = app.topic("registrations", key_type=str, value_type=Registration)
 
@app.timer(interval=5.0)
async def example_sender(app):
    await registrations_topic.send(
        value=Registration("Eliška", "Najbrtová", 4))
    await registrations_topic.send(
        value=Registration("Jenny", "Suk", 3))
    await registrations_topic.send(
        value=Registration("Anička", "Šafářová", 0))
    await registrations_topic.send(
        value=Registration("Sváťa", "Pulec", 3))
    await registrations_topic.send(
        value=Registration("Blažej", "Motyčka", 8))
    await registrations_topic.send(
        value=Registration("Eda", "Wasserfall", 0))
    await registrations_topic.send(
        value=Registration("Přemysl", "Hájek", 10))
 
 
@app.agent(registrations_topic)
async def register(registrations):
    async for registration in registrations:
        print(f"Registration: {registration}")
 
 
if __name__ == "__main__":
    app.main()
Poznámka: na tomto místě je asi zřejmé, jak bude vypadat skutečná transformace. Worker přijímající zprávy, tj. funkce s dekorátorem @app.agent bude posílat výsledky pomocí await topic.send() do „výstupního“ tématu. Právě takto jednoduše, vlastně jen základními funkcemi, se může realizovat celý stream processing.

17. Spuštění kombinace workerů s producenty a konzumenty

Nyní nám již zbývá maličkost, a to spuštění výše uvedené aplikace, v níž běží asynchronně worker produkující data a současně i worker, který tato data naopak konzumuje. Spuštění se opět provede nám již známým způsobem:

$ python3 registration_consumer_producer.py worker

Nejprve by se měly zobrazit informace o právě inicializované aplikaci:

┌ƒaµS† v0.11.1.dev4+ga489db3b───────────────────────────────────┐
│ id          │ registrations                                   │
│ transport   │ [URL('kafka://localhost:9092')]                 │
│ store       │ memory:                                         │
│ web         │ http://localhost:6066/                          │
│ log         │ -stderr- (warn)                                 │
│ pid         │ 1508601                                         │
│ hostname    │ ptisnovs.xxx.yyy.zzz                            │
│ platform    │ CPython 3.11.8 (Linux x86_64)                   │
│        +    │ Cython (GCC 13.2.1 20231011 (Red Hat 13.2.1-4)) │
│ drivers     │                                                 │
│   transport │ aiokafka=0.10.0                                 │
│   web       │ aiohttp=3.9.5                                   │
│ datadir     │ /tmp/ramdisk/faust/registrations-data           │
│ appdir      │ /tmp/ramdisk/faust/registrations-data/v1        │
└─────────────┴─────────────────────────────────────────────────┘
starting➢

A posléze by se měly v logu objevit informace o zkonzumovaných (přijatých) zprávách:

bitcoin_skoleni

[2024-04-24 16:13:00,702] [1576077] [WARNING] Registration: <Registration: name='Eliška', surname='Najbrtová', id=4>
[2024-04-24 16:13:00,703] [1576077] [WARNING] Registration: <Registration: name='Jenny', surname='Suk', id=3>
[2024-04-24 16:13:00,704] [1576077] [WARNING] Registration: <Registration: name='Anička', surname='Šafářová', id=0>
[2024-04-24 16:13:00,704] [1576077] [WARNING] Registration: <Registration: name='Sváťa', surname='Pulec', id=3>
[2024-04-24 16:13:00,704] [1576077] [WARNING] Registration: <Registration: name='Blažej', surname='Motyčka', id=8>
[2024-04-24 16:13:00,705] [1576077] [WARNING] Registration: <Registration: name='Eda', surname='Wasserfall', id=0>
[2024-04-24 16:13:00,705] [1576077] [WARNING] Registration: <Registration: name='Přemysl', surname='Hájek', id=10>

18. Obsah navazujícího článku

V navazujícím článku popis možností knihovny Faust dokončíme. Ukážeme si zejména práci s „okny“ (windows), které umožňují prohlížet a zpracovávat sekvenci po sobě jdoucích zpráv. A taktéž si řekneme, jakým způsobem se využívají lokální tabulky, které jsou v některých případech užitečné pro meziuložení stavu celého systému (persistenci) realizujícího a řídícího tok zpráv.

19. Repositář s demonstračními příklady

Zdrojové kódy všech prozatím popsaných demonstračních příkladů určených pro programovací jazyk Python 3 byly uloženy do Git repositáře dostupného na adrese https://github.com/tisnik/most-popular-python-libs:

# Demonstrační příklad Stručný popis příkladu Cesta
1 greeting_producer.py klasický producent zpráv vytvořený bez použití knihovny Faust (používá knihovnu Kafka-python-ng) https://github.com/tisnik/most-popular-python-libs/blob/master/faust/gre­eting_producer.py
2 greeting_consumer.py klasický konzument zpráv vytvořený bez použití knihovny Faust (používá knihovnu Kafka-python-ng) https://github.com/tisnik/most-popular-python-libs/blob/master/faust/gre­eting_consumer.py
       
3 greeting_faust_consumer.py worker definovaný s využitím knihovny Faust https://github.com/tisnik/most-popular-python-libs/blob/master/faust/gre­eting_faust_consumer.py
       
4 multi_producer_raw.py producent zpráv do většího množství témat, zprávy jsou posílány jako řetězce https://github.com/tisnik/most-popular-python-libs/blob/master/faust/mul­ti_producer_raw.py
5 greeting_worker_consumer_raw.py dvojice workerů definovaných s využitím knihovny Faust pro zprávy posílané jako sekvence bajtů https://github.com/tisnik/most-popular-python-libs/blob/master/faust/gre­eting_worker_consumer_raw­.py
6 multi_producer_json.py producent zpráv do většího množství témat, zprávy jsou serializovány do formátu JSON https://github.com/tisnik/most-popular-python-libs/blob/master/faust/mul­ti_producer_json.py
7 greeting_worker_consumer_json.py dvojice workerů definovaných s využitím knihovny Faust pro zprávy ve formátu JSON https://github.com/tisnik/most-popular-python-libs/blob/master/faust/gre­eting_worker_consumer_json­.py
       
8 registration_producer.py producent zpráv ve formátu JSON obsahujících atributy objektů (používá knihovnu Kafka-python-ng) https://github.com/tisnik/most-popular-python-libs/blob/master/faust/re­gistration_producer.py
9 registration_consumer.py konzument zpráv založený na knihovně Faust (pracuje s modelem) https://github.com/tisnik/most-popular-python-libs/blob/master/faust/re­gistration_consumer.py
10 registration_producer_faust.py producent zpráv založený na knihovně Faust (pracuje s modelem) https://github.com/tisnik/most-popular-python-libs/blob/master/faust/re­gistration_producer_faust­.py
11 registration_consumer_producer.py tok dat od producenta ke konzumentovi, založeno na knihovně Faust https://github.com/tisnik/most-popular-python-libs/blob/master/faust/re­gistration_consumer_produ­cer.py

20. Odkazy na Internetu

  1. Faust – Python Stream Processing
    https://faust-streaming.github.io/faust/
  2. Knihovna Faust na GitHubu
    https://github.com/faust-streaming/faust
  3. faust 1.10.4 na Pypi
    https://pypi.org/project/faust/
  4. Introduction to Kafka Stream Processing in Python using Faust (video)
    https://www.youtube.com/wat­ch?v=Nt96udaC5Zk
  5. Windowing in Kafka Streams using Faust Framework in Python | Tumbling Window (video)
    https://www.youtube.com/wat­ch?v=ZlBXg9Kp8vE
  6. Processing Complex Type & Producing Messages to Topic using Faust
    https://www.youtube.com/wat­ch?v=DpTJYtiYK9w
  7. Stream Processing with Python, Kafka & Faust
    https://towardsdatascience.com/stream-processing-with-python-kafka-faust-a11740d0910c
  8. ETL Batch Processing With Kafka?
    https://medium.com/swlh/etl-batch-processing-with-kafka-7f66f843e20d
  9. ETL with Kafka
    https://blog.codecentric.de/en/2018/03/e­tl-kafka/
  10. Building ETL Pipelines with Clojure and Transducers
    https://www.grammarly.com/blog/en­gineering/building-etl-pipelines-with-clojure-and-transducers/
  11. pipeline (možné použít pro ETL)
    https://clojuredocs.org/clo­jure.core.async/pipeline
  12. On Track with Apache Kafka – Building a Streaming ETL Solution with Rail Data
    https://www.confluent.io/blog/build-streaming-etl-solutions-with-kafka-and-rail-data/
  13. Kafka – Understanding Offset Commits
    https://www.logicbig.com/tu­torials/misc/kafka/commit­ting-offsets.html
  14. fundingcircle/jackdaw (na Clojars)
    https://clojars.org/fundin­gcircle/jackdaw/versions/0­.7.6
  15. Dokumentace ke knihovně jackdaw
    https://cljdoc.org/d/fundin­gcircle/jackdaw/0.7.6/doc/re­adme
  16. Jackdaw AdminClient API
    https://cljdoc.org/d/fundin­gcircle/jackdaw/0.7.6/doc/jac­kdaw-adminclient-api
  17. Jackdaw Client API
    https://cljdoc.org/d/fundin­gcircle/jackdaw/0.7.6/doc/jac­kdaw-client-api
  18. Kafka.clj
    https://github.com/helins-io/kafka.clj
  19. Kafka mirroring (MirrorMaker)
    https://cwiki.apache.org/con­fluence/pages/viewpage.ac­tion?pageId=27846330
  20. Mastering Kafka migration with MirrorMaker 2
    https://developers.redhat­.com/articles/2024/01/04/mas­tering-kafka-migration-mirrormaker-2
  21. Apache Kafka MirrorMaker 2 (MM2) Part 1: Theory
    https://www.instaclustr.com/blog/kafka-mirrormaker-2-theory/#h-2-replication-in-kafka
  22. Apache Kafka MirrorMaker 2 (MM2) Part 2: Practice
    https://www.instaclustr.com/blog/a­pache-kafka-mirrormaker-2-practice/
  23. Demystifying Kafka MirrorMaker 2: Use cases and architecture
    https://developers.redhat­.com/articles/2023/11/13/de­mystifying-kafka-mirrormaker-2-use-cases-and-architecture#
  24. How to use Kafka MirrorMaker 2.0 in data migration, replication and the use-cases
    https://learn.microsoft.com/en-us/azure/hdinsight/kafka/kafka-mirrormaker-2–0-guide
  25. Release Notes – Kafka – Version 2.4.0
    https://archive.apache.or­g/dist/kafka/2.4.0/RELEASE_NO­TES.html
  26. Kafka Mirror Maker Best Practices
    https://community.cloudera­.com/t5/Community-Articles/Kafka-Mirror-Maker-Best-Practices/ta-p/249269
  27. Apache Kafka MirrorMaker 2 (MM2) Part 1: Theory
    https://www.instaclustr.com/blog/kafka-mirrormaker-2-theory/
  28. Kcli: is a kafka read only command line browser.
    https://github.com/cswank/kcli
  29. Kcli: a kafka command line browser
    https://go.libhunt.com/kcli-alternatives
  30. Kafka Connect and Schemas
    https://rmoff.net/2020/01/22/kafka-connect-and-schemas/
  31. JSON and schemas
    https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/#json-schemas
  32. What, why, when to use Apache Kafka, with an example
    https://www.startdataengi­neering.com/post/what-why-and-how-apache-kafka/
  33. When NOT to use Apache Kafka?
    https://www.kai-waehner.de/blog/2022/01/04/when-not-to-use-apache-kafka/
  34. Microservices: The Rise Of Kafka
    https://movio.co/blog/microservices-rise-kafka/
  35. Building a Microservices Ecosystem with Kafka Streams and KSQL
    https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/
  36. An introduction to Apache Kafka and microservices communication
    https://medium.com/@ulymarins/an-introduction-to-apache-kafka-and-microservices-communication-bf0a0966d63
  37. kappa-architecture.com
    http://milinda.pathirage.org/kappa-architecture.com/
  38. Questioning the Lambda Architecture
    https://www.oreilly.com/i­deas/questioning-the-lambda-architecture
  39. Lambda architecture
    https://en.wikipedia.org/wi­ki/Lambda_architecture
  40. Kafka – ecosystem (Wiki)
    https://cwiki.apache.org/con­fluence/display/KAFKA/Eco­system
  41. The Kafka Ecosystem – Kafka Core, Kafka Streams, Kafka Connect, Kafka REST Proxy, and the Schema Registry
    http://cloudurable.com/blog/kafka-ecosystem/index.html
  42. A Kafka Operator for Kubernetes
    https://github.com/krallistic/kafka-operator
  43. Kafka Streams
    https://cwiki.apache.org/con­fluence/display/KAFKA/Kaf­ka+Streams
  44. Kafka Streams
    http://kafka.apache.org/do­cumentation/streams/
  45. Kafka Streams (FAQ)
    https://cwiki.apache.org/con­fluence/display/KAFKA/FAQ#FAQ-Streams
  46. Event stream processing
    https://en.wikipedia.org/wi­ki/Event_stream_processing
  47. Part 1: Apache Kafka for beginners – What is Apache Kafka?
    https://www.cloudkarafka.com/blog/2016–11–30-part1-kafka-for-beginners-what-is-apache-kafka.html
  48. What are some alternatives to Apache Kafka?
    https://www.quora.com/What-are-some-alternatives-to-Apache-Kafka
  49. What is the best alternative to Kafka?
    https://www.slant.co/opti­ons/961/alternatives/~kaf­ka-alternatives
  50. A super quick comparison between Kafka and Message Queues
    https://hackernoon.com/a-super-quick-comparison-between-kafka-and-message-queues-e69742d855a8?gi=e965191e72d0
  51. Kafka Queuing: Kafka as a Messaging System
    https://dzone.com/articles/kafka-queuing-kafka-as-a-messaging-system
  52. Apache Kafka Logs: A Comprehensive Guide
    https://hevodata.com/learn/apache-kafka-logs-a-comprehensive-guide/
  53. Microservices – Not a free lunch!
    http://highscalability.com/blog/2014/4/8/mi­croservices-not-a-free-lunch.html
  54. Microservices, Monoliths, and NoOps
    http://blog.arungupta.me/microservices-monoliths-noops/
  55. Microservice Design Patterns
    http://blog.arungupta.me/microservice-design-patterns/
  56. REST vs Messaging for Microservices – Which One is Best?
    https://solace.com/blog/experience-awesomeness-event-driven-microservices/
  57. Kappa Architecture Our Experience
    https://events.static.linux­found.org/sites/events/fi­les/slides/ASPgems%20-%20Kappa%20Architecture.pdf
  58. Apache Kafka Streams and Tables, the stream-table duality
    https://towardsdatascience.com/apache-kafka-streams-and-tables-the-stream-table-duality-ee904251a7e?gi=f22a29cd1854
  59. Configure Self-Managed Connectors
    https://docs.confluent.io/kafka-connectors/self-managed/configuring.html#configure-self-managed-connectors
  60. Schema Evolution and Compatibility
    https://docs.confluent.io/plat­form/current/schema-registry/avro.html#schema-evolution-and-compatibility
  61. Configuring Key and Value Converters
    https://docs.confluent.io/kafka-connectors/self-managed/userguide.html#configuring-key-and-value-converters
  62. Introduction to Kafka Connectors
    https://www.baeldung.com/kafka-connectors-guide
  63. Kafka CLI: command to list all consumer groups for a topic?
    https://stackoverflow.com/qu­estions/63883999/kafka-cli-command-to-list-all-consumer-groups-for-a-topic
  64. Java Property File Processing
    https://www.w3resource.com/java-tutorial/java-propertyfile-processing.php
  65. Skipping bad records with the Kafka Connect JDBC sink connector
    https://rmoff.net/2019/10/15/skipping-bad-records-with-the-kafka-connect-jdbc-sink-connector/
  66. Kafka Connect Deep Dive – Error Handling and Dead Letter Queues
    https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/
  67. Errors and Dead Letter Queues
    https://developer.confluent.io/learn-kafka/kafka-connect/error-handling-and-dead-letter-queues/
  68. Confluent Cloud Dead Letter Queue
    https://docs.confluent.io/clou­d/current/connectors/dead-letter-queue.html
  69. Dead Letter Queues (DLQs) in Kafka
    https://medium.com/@sannidhi.s.t/dead-letter-queues-dlqs-in-kafka-afb4b6835309
  70. Deserializer
    https://docs.confluent.io/plat­form/current/schema-registry/serdes-develop/serdes-json.html#json-schema-serializer-and-deserializer
  71. JSON, Kafka, and the need for schema
    https://mikemybytes.com/2022/07/11/json-kafka-and-the-need-for-schema/
  72. Using Kafka Connect with Schema Registry
    https://docs.confluent.io/plat­form/current/schema-registry/connect.html
  73. Zpracování dat reprezentovaných ve formátu JSON nástrojem jq
    https://www.root.cz/clanky/zpracovani-dat-reprezentovanych-ve-formatu-json-nastrojem-jq/
  74. Repositář projektu jq (GitHub)
    https://github.com/stedolan/jq
  75. GitHub stránky projektu jq
    https://stedolan.github.io/jq/
  76. 5 modern alternatives to essential Linux command-line tools
    https://opensource.com/ar­ticle/20/6/modern-linux-command-line-tools
  77. Návod k nástroji jq
    https://stedolan.github.i­o/jq/tutorial/
  78. jq Manual (development version)
    https://stedolan.github.io/jq/manual/
  79. Introducing JSON
    https://www.json.org/json-en.html
  80. Understanding JSON schema
    https://json-schema.org/understanding-json-schema/index.html
  81. JDBC Sink Connector for Confluent Platform
    https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/overview.html#jdbc-sink-connector-for-cp
  82. JDBC Connector (Source and Sink)
    https://www.confluent.io/hub/con­fluentinc/kafka-connect-jdbc
  83. Introduction to Schema Registry in Kafka
    https://medium.com/slalom-technology/introduction-to-schema-registry-in-kafka-915ccf06b902
  84. Understanding JSON Schema Compatibility
    https://yokota.blog/2021/03/29/un­derstanding-json-schema-compatibility/
Seriál: Message brokery

Autor článku

Vystudoval VUT FIT a v současné době pracuje na projektech vytvářených v jazycích Python a Go.