Obsah
1. Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw (2. část)
2. Jednoduchý producent a konzument zpráv naprogramovaný v Clojure
3. Specifikace, od jakého offsetu se budou zprávy číst
4. Malá odbočka: využití nástroje kafkacat v roli producenta nebo konzumenta zpráv
5. Instalace nástroje kafkacat
6. Praktické ukázky použití kafkacat v roli producenta i konzumenta zpráv
7. Serializace a deserializace zpráv – serdes
8. Použití formátu EDN (Extensible Data Notation)
9. Ukázka skutečného formátu zpráv uložených do tématu
10. Formát EDN bez konců řádků
11. Ukázka skutečného formátu zpráv uložených do tématu
13. Ukázka skutečného formátu zpráv uložených do tématu
14. Základy pokročilého streamingu – kolona (pipe)
15. Kolona s explicitním kódem konzumenta i producenta
17. Lepší řešení – využití Streams API
18. Repositář s demonstračními příklady
19. Odkazy na předchozí části seriálu o programovacím jazyku Clojure
1. Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw (2. část)
Posílání a konzumace zpráv ve streamovací platformě Apache Kafka není ve skutečnosti tak přímočará, jak by se možná mohlo z demonstračních příkladů uvedených v úvodním článku o kombinaci jazyka Clojure a Apache Kafky zdát. Z tohoto důvodu se tímto tématem budeme zabývat i dnes. Ovšem mnohem důležitější je skutečný „streaming“, který knihovna Jackdaw taktéž podporuje. Tímto tématem se tedy budeme taktéž zabývat. Uvidíme, že Jackdaw nám umožňuje zápis „funkcionální“ pipeliny (kolony), která může vypadat například následovně:
(-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (println "Received message with key: " k " and value:" v))) (j/map etl) (j/peek (fn [[k v]] (println "Transformed message with key:" k " and value:" v))) (j/to (:output topic-config)))
(defn etl [[message-key message-value]] [message-key {:result (+ (:x message-value) (:y message-value))}])
2. Jednoduchý producent a konzument zpráv naprogramovaný v Clojure
V této kapitole si zopakujeme dva demonstrační příklady, které byly popsány minule a nad kterými budeme v navazujících kapitolách stavět příklady nepatrně složitější. V prvním příkladu je realizován jednoduchý producent deseti zpráv. Zpráva je poslána do tématu nazvaného „test3“ a pokud toto téma neexistuje, bude automaticky vytvořeno. Jak text zprávy, tak i její klíč mohou obsahovat libovolné řetězce, pochopitelně včetně řetězců prázdných. Příklad využívá pouze funkce a makra ze jmenného prostoru jackdaw.client:
(ns produce-messages-3.core (:require [jackdaw.client :as jc] [clojure.pprint :as pp])) (def producer-config {"bootstrap.servers" "localhost:9092" "key.serializer" "org.apache.kafka.common.serialization.StringSerializer" "value.serializer" "org.apache.kafka.common.serialization.StringSerializer" "acks" "all" "client.id" "foo"}) (defn -main [& args] (with-open [producer (jc/producer producer-config)] (doseq [i (range 1 101)] (let [key (str i) value (str "Message #" i)] (println "Publishing message with key '" key "' and value '" value "'") (let [record-metadata (jc/produce! producer {:topic-name "test3"} key value)] (pp/pprint @record-metadata))) )))
Druhý demonstrační příklad představuje konzumenta zpráv, který je postaven nad funkcemi a makry ze jmenných prostorů jackdaw.client a jackdaw.client.log. Funkce ze druhého jmenného prostoru nám umožňuje pracovat s příchozími zprávami jako s nekonečnou línou sekvencí (infinite lazy sequence), což vede v jazyce Clojure k tvorbě idiomatického kódu:
(ns consume-messages-1.core (:require [jackdaw.client :as jc] [jackdaw.client.log :as jl] [clojure.pprint :as pp])) (def consumer-config {"bootstrap.servers" "localhost:9092" "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" "group.id" "group-A"}) (defn -main [& args] (with-open [consumer (-> (jc/consumer consumer-config) (jc/subscribe [{:topic-name "test3"}]))] (doseq [{:keys [key value partition timestamp offset]} (jl/log consumer 10)] (println "key: " key) (println "value: " value) (println "partition: " partition) (println "timestamp: " timestamp) (println "offset: " offset))))
# | Funkce pro konzumaci zpráv |
---|---|
1 | jackdaw.client.log/log-until |
2 | jackdaw.client.log/log-until-inactivity |
3. Specifikace, od jakého offsetu se budou zprávy číst
Při připojení konzumenta do zvolené skupiny konzumentů (viz konfigurační volbu group.id) se musí zjistit, od jakého offsetu (v rámci skupiny) se mají zprávy číst. Pokud tento offset není z nějakého důvodu nastaven (nové téma, vymazaná data atd.), lze volbou auto.offset.reset zvolit požadované chování, které je v našich konfiguračních příkladech součástí mapy consumer-config:
# | Hodnota auto.offset.reset | Stručný popis |
---|---|---|
1 | latest | automaticky nastaví offset na index posledního záznamu |
2 | earliest | automaticky nastaví offset na index prvního záznamu |
3 | nic | vyhodí výjimku, pokud není offset pro skupinu nastaven |
Příklad specifikace volby offsetu ve chvíli, kdy není skutečný offset z nějakého důvodu dostupný:
(def consumer-config { ... ... "auto.offset.reset" "earliest" ...})
Úprava konzumenta zpráv je v tomto případě jednoduchá a může vypadat následovně:
(ns consume-messages-2.core (:require [jackdaw.client :as jc] [jackdaw.client.log :as jl] [clojure.pprint :as pp])) (def consumer-config {"bootstrap.servers" "localhost:9092" "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" "auto.offset.reset" "earliest" "group.id" "group-A"}) (defn -main "I don't do a whole lot ... yet." [& args] (with-open [consumer (-> (jc/consumer consumer-config) (jc/subscribe [{:topic-name "test1"}]))] (doseq [{:keys [key value partition timestamp offset]} (jl/log consumer 10)] (println "key: " key) (println "value: " value) (println "partition: " partition) (println "timestamp: " timestamp) (println "offset: " offset))))
# | Název volby |
---|---|
1 | enable.auto.commit |
2 | auto.commit.interval.ms |
3 | offset.flush.interval.ms |
4 | offset.flush.timeout.ms |
4. Malá odbočka: využití nástroje kafkacat v roli producenta nebo konzumenta zpráv
Součástí ekosystému vytvořeného okolo Apache Kafky je i mnohdy velmi užitečný nástroj nazvaný kafkacat (autoři ho taktéž označují jako „netcat for Kafka“, v kontextu REST API by se hodilo i „curl for Kafka“). Tento nástroj, který naleznete na adrese https://github.com/edenhill/kafkacat slouží pro komunikaci s brokery přímo z příkazové řádky. Pochopitelně se s velkou pravděpodobností nebude jednat o řešení používané v produkčním kódu, ovšem možnost vytvořit producenta zpráv či jejich konzumenta přímo z CLI je použitelná jak při vývoji, tak i při řešení problémů, které mohou při běhu aplikace nastat. Tento nástroj budeme používat v dalších kapitolách při komunikaci s producenty či konzumenty naprogramovanými v Clojure, takže se v této kapitole krátce zmiňme o několika příkladech použití převzatých z oficiální dokumentace. Všechny ukázky předpokládají, že broker běží na lokálním počítači na portu 9092 a nepoužívá SSL resp. TLS.
Výpis informací o všech tématech a jejich konfigurace se provede následovně:
$ kafkacat -L -b localhost:9092
Spuštění nového producenta zpráv čtených ze souborů specifikovaných na příkazové řádce:
$ kafkacat -P -b localhost:9092 -t filedrop -p 0 file1.bin file2.txt /etc/motd dalsi_soubor.tgz
Přečtení posledních 1000 zpráv z tématu „téma1“. Po této operaci se konzument automaticky ukončí, tj. nebude čekat na další zprávy:
$ kafkacat -C -b localhost:9092 -t tema1 -p 0 -o -1000 -e
Spuštění konzumentů, kteří jsou přihlášení k tématu „téma1“:
$ kafkacat -b localhost:9092 -G skupina_konzumentů téma1
Přihlásit se lze i k odběru většího množství témat:
$ kafkacat -b localhost:9092 -G skupina_konzumentů téma1 téma2
5. Instalace nástroje kafkacat
Nástroj kafkacat se skládá z několika komponent, které jsou většinou naprogramovány v jazyku C popř. v C++. Překlad a slinkování lze provést takovým způsobem, že výsledkem bude jediný spustitelný soubor nazvaný též „kafkacat“, který bude obsahovat i všechny potřebné (tedy staticky slinkované) knihovny, což zjednodušuje nasazení této utility. Na druhou stranu však nebude možné použít běžné prostředky operačního systému při updatu knihoven, například při opravách CVE atd.
Napřed se provede naklonování repositáře:
$ git clone git@github.com:edenhill/kafkacat.git
po přesunu do naklonovaného repositáře:
$ cd kafkacat
se překlad provede běžnou trojkombinací skriptu configure a příkazů make+make install:
$ ./configure $ make $ sudo make install
Alternativně je ovšem možné použít připravený skript bootstrap.sh, který zajistí stažení všech potřebných knihoven (crypto atd.) s jejich následným překladem:
$ ./bootstrap.sh
Výsledkem je potom skutečně „tlustý“ binární soubor:
$ ls -l ~/bin/kafkacat -rwxrwxr-x. 1 ptisnovs ptisnovs 20987784 17. led 14.34 /home/ptisnovs/bin/kafkacat
Některé linuxové distribuce obsahují přímo ve svých repositářích balíček kafkacat, což samozřejmě celý proces instalace (a případných updatů) značně zjednodušuje. Například na systémech založených na Debianu postačuje použít:
$ apt-get install kafkacat
$ kafkacat Error: -b <broker,..> missing Usage: kafkacat [file1 file2 .. | topic1 topic2 ..]] kafkacat - Apache Kafka producer and consumer tool https://github.com/edenhill/kafkacat Copyright (c) 2014-2019, Magnus Edenhill Version 1.5.0-4-ge461fb (JSON, Avro, librdkafka 1.1.0 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer) ... ... ...
6. Praktické ukázky použití kafkacat v roli producenta i konzumenta zpráv
Nástroj kafkacat je možné použít společně s producenty a konzumenty, s nimiž se setkáme v navazujících kapitolách. Ve všech příkladech v této kapitole budeme používat téma (topic) nazvaný „upload“.
Konzument zpráv posílaných do tématu „upload“:
$ kafkacat -C -b localhost:9092 -t "upload"
Producent zpráv zapisovaných na standardní vstup uživatelem (co zpráva, to jeden řádek):
$ kafkacat -P -b localhost:9092 -t "upload"
Dtto, ale u každé zprávy lze specifikovat i klíč oddělený od těla zprávy dvojtečkou:
$ kafkacat -P -b localhost:9092 -t "upload" -K:
Producent deseti zpráv s různými hodnotami (vlastním tělem zprávy):
$ for i in {1..10}; do echo "Hello ${i}" | kafkacat -P -b localhost:9092 -t "upload"; done;
7. Serializace a deserializace zpráv – serdes
V samotném systému Apache Kafka se klíče i hodnoty (text) zpráv skládají z libovolné sekvence bajtů, kterým není v Kafce přikládán žádný význam (jedná se tedy o data a nikoli o informace). Ostatně tento způsob nakládání se zprávami je jedním z důvodů, proč Apache Kafka nabízí tak velký výkon při ukládání či naopak čtení zpráv. Ovšem aplikace, které zprávy vytváří či naopak konzumují, se zprávami pracují odlišným způsobem – zprávy představují určité informace reprezentované například pomocí složitějších datových struktur – map, záznamů, polí, řetězců atd. Pro převod informací do sekvence bajtů (na straně producenta) se používají serializátory a pro zpětný převod (na straně konzumenta) se používají deserializátory. V systému Apache Kafka se z tohoto důvodu poměrně často setkáme s označením serializer/deserializer, popř. se zkratkou, která z tohoto sousloví vznikla – serdes.
Serializaci i deserializaci klíčů popř. textů zpráv můžeme v programech provádět explicitně, což je ovšem zbytečně nepřehledné. Častěji se setkáme s tím, že se pouze na straně producenta i konzumenta specifikuje, který serde se používá a o serializaci či deserializaci se postará automaticky příslušná část knihovny Jackdaw. V dalších kapitolách si ukážeme serializaci do formátů EDN a JSON, přičemž druhý zmíněný formát má sice mnoho omezení, ovšem je stále velmi často používaný (a bude tomu tak i v budoucnu).
8. Použití formátu EDN (Extensible Data Notation)
Nejprve si ukažme použití formátu EDN, který je odvozen od syntaxe i sémantiky jazyka Clojure, ovšem podporován je i v dalších jazycích, například v Pythonu. Formát EDN neboli Extensible Data Notation vychází ze syntaxe a sémantiky programovacího jazyka Clojure, je tedy založen na S-výrazech rozšířených o možnost zápisu map (slovníků) a vektorů. Formát EDN je rozšířen primárně v ekosystému jazyka Clojure. Popis tohoto formátu (a tím pádem i popis syntaxe Clojure) naleznete na stránce https://github.com/edn-format/edn.
Serializátor a deserializátor EDN je implementován ve jmenném prostoru jackdaw.serdes.edn:
(require '[jackdaw.serdes.edn])
Použití EDN lze specifikovat již při vytváření nového tématu v mapě s konfigurací nově vytvářeného tématu:
{:topic-name topic-name :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.edn/serde) :value-serde (jackdaw.serdes.edn/serde) :topic-config {"cleanup.policy" "compact"}}]
Specifikovat lze serdes i při konstrukci producenta popř. konzumenta zpráv. U producenta se jedná o druhý nepovinný parametr konstruktoru jackdaw.client/producer:
(let [producer-config (merge broker-config {"acks" "all" "client.id" "foo"}) ; specifikace zpusobu serializace klicu i obsahu zpravy producer-serde-config {:key-serde (jackdaw.serdes.edn/serde) :value-serde (jackdaw.serdes.edn/serde)}] ;; poslani zvoleneho poctu zprav se serializaci klice i hodnoty (with-open [producer (jc/producer producer-config producer-serde-config)]
Při samotném poslání zprávy již není žádná další serializace zapotřebí – mapa obsahující další typy hodnot se automaticky převede do formátu EDN a uloží do tématu v Apache Kafce:
(let [topic {:topic-name topic-name} ; posilany klic message-key {:n i :foo "foo"} ; posilany obsah zpravy message-value {:bar "bar" :value i :recip (/ 1 (inc i)) :factorial (reduce * (range 1M (inc i))) :values (range i) }] (jc/produce! producer topic message-key message-value))
9. Ukázka skutečného formátu zpráv uložených do tématu
Ukažme si nyní, jak vypadá formát zpráv ukládaných do tématu ve chvíli, kdy je použit serializátor klíčů i textů do formátu EDN. Demonstrační příklad, který zprávy v tomto formátu vytvoří, vypadá následovně a je dostupný na adrese https://github.com/tisnik/message-queues-examples/blob/master/kafka/clojure/edn-1-serializer:
(ns edn-1-serializer.core (:require [jackdaw.admin :as ja] [jackdaw.client :as jc] [clojure.pprint :as pp])) (require '[jackdaw.serdes.edn]) (defn delete-topic [broker-config topic-name] (try (let [client (ja/->AdminClient broker-config)] (ja/delete-topics! client [{:topic-name topic-name}])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn new-topic [broker-config topic-name] (try (let [client (ja/->AdminClient broker-config) topic-config {:topic-name topic-name :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.edn/serde) :value-serde (jackdaw.serdes.edn/serde) :topic-config {"cleanup.policy" "compact"}}] (ja/create-topics! client [topic-config])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn produce-messages [broker-config topic-name messages] ;; konfigurace producenta zprav ve formatu EDN (let [producer-config (merge broker-config {"acks" "all" "client.id" "foo"}) ; specifikace zpusobu serializace klicu i obsahu zpravy producer-serde-config {:key-serde (jackdaw.serdes.edn/serde) :value-serde (jackdaw.serdes.edn/serde)}] ;; poslani zvoleneho poctu zprav se serializaci klice i hodnoty (with-open [producer (jc/producer producer-config producer-serde-config)] (doseq [i (range 0 messages)] (let [topic {:topic-name topic-name} ; posilany klic message-key {:n i :foo "foo"} ; posilany obsah zpravy message-value {:bar "bar" :value i :recip (/ 1 (inc i)) :factorial (reduce * (range 1M (inc i))) :values (range i) } record-metadata (jc/produce! producer topic message-key message-value)] (pp/pprint @record-metadata)))))) (defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"} topic-name "edn"] ;; na zacatku pro jistotu vymazeme tema "edn" (delete-topic broker-config topic-name) ;; vytvoreni noveho tematu akceptujiciho zpravy ve formatu EDN (new-topic broker-config topic-name) ;; poslani zvoleneho poctu zprav se serializaci klice i hodnoty (produce-messages broker-config topic-name 100)))
Po spuštění tohoto demonstračního příkladu příkazem:
$ lein run
…se vytvoří nové téma nazvané „edn“ (stávající téma je vymazáno, pokud ovšem existovalo) a do tohoto tématu se uloží celkem sto zpráv. Povšimněte si, jak postupně roste velikost textů zpráv (to kvůli ukládání hodnoty faktoriálu a sekvence/pole číselných hodnot):
{:topic-name "edn", :partition 0, :timestamp 1607090709548, :offset 0, :serialized-key-size 19, :serialized-value-size 59} {:topic-name "edn", :partition 0, :timestamp 1607090709590, :offset 1, :serialized-key-size 19, :serialized-value-size 63} {:topic-name "edn", :partition 0, :timestamp 1607090709598, :offset 2, :serialized-key-size 19, :serialized-value-size 65} ... ... ... {:topic-name "edn", :partition 0, :timestamp 1607090709956, :offset 97, :serialized-key-size 20, :serialized-value-size 495} {:topic-name "edn", :partition 0, :timestamp 1607090709959, :offset 98, :serialized-key-size 20, :serialized-value-size 500} {:topic-name "edn", :partition 0, :timestamp 1607090709961, :offset 99, :serialized-key-size 20, :serialized-value-size 506}
Čtení zpráv je realizováno utilitou kafkacat popsanou v páté kapitole. Povšimněte si, že specifikujeme způsob výpisu přijatých zpráv – nejprve se vypíše klíč, za ním hvězdička, za hvězdičkou tělo (text) zprávy a následuje další hvězdička. Výpis je ukončen novým řádkem:
$ kafkacat -C -b localhost:9092 -t edn -f '%k*%s*\n'
S výsledkem:
{:n 0, :foo "foo"} *{:bar "bar", :value 0, :recip 1, :factorial 1, :values ()} * {:n 1, :foo "foo"} *{:bar "bar", :value 1, :recip 1/2, :factorial 1M, :values (0)} * {:n 2, :foo "foo"} *{:bar "bar", :value 2, :recip 1/3, :factorial 2M, :values (0 1)} * ... ... ... *{:bar "bar", :value 98, :recip 1/99, :factorial 9426890448883247745626185743057242473809693764078951663494238777294707070023223798882976159207729119823605850588608460429412647567360000000000000000000000M, :values (0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97)} * {:n 99, :foo "foo"} *{:bar "bar", :value 99, :recip 1/100, :factorial 933262154439441526816992388562667004907159682643816214685929638952175999932299156089414639761565182862536979208272237582511852109168640000000000000000000000M, :values (0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98)} *
Z výpisu je patrných několik důležitých faktů:
- Ve formátu EDN lze pracovat se zlomky (ratio)
- Ve formátu EDN lze ukládat čísla s prakticky neomezeným rozsahem (a dodejme, že i přesností)
- Líné sekvence jsou (pochopitelně) realizovány, tj. je z nich vytvořena běžná sekvence
- Ve formátu EDN je za vlastními daty uložen i znak pro konec řádku, což může být matoucí
Podívejme se nyní na poslední zmíněnou vlastnost. Jednu zprávu vypsanou nástrojem kafkacat zvýrazníme – klíč bude vypsán tučně, hodnota (text) podtrženě:
{:n 0, :foo "foo"} *{:bar "bar", :value 0, :recip 1, :factorial 1, :values ()} * {:n 1, :foo "foo"} *{:bar "bar", :value 1, :recip 1/2, :factorial 1M, :values (0)} * {:n 2, :foo "foo"} *{:bar "bar", :value 2, :recip 1/3, :factorial 2M, :values (0 1)} *
V navazující kapitole si ukážeme, jak se onoho přebytečného (?) konce řádku zbavit.
10. Formát EDN bez konců řádků
Na konci předchozí kapitoly jsme si ukázali, že serdes realizovaný ve jmenném prostoru jackdaw.serdes.edn při serializaci přidává na konec dat prázdný řádek. To je poněkud nepříjemné zejména při prohlížení zpráv s využitím nástroje kafkacat či při konzumaci některými typy klientů. Existuje však alternativní způsob serializace zpráv, tentokrát ovšem založený na serdes realizovaný v modulu jackdaw.serdes (nyní bez .edn), jehož konstruktor se zavolá následujícím způsobem:
(require '[jackdaw.serdes]) (jackdaw.serdes/edn-serde)
Následuje výpis upravené verze demonstračního příkladu, který je založen na použití této varianty serdes:
(ns edn-2-serializer.core (:require [jackdaw.admin :as ja] [jackdaw.client :as jc] [clojure.pprint :as pp])) (require '[jackdaw.serdes]) (defn delete-topic [broker-config topic-name] (try (let [client (ja/->AdminClient broker-config)] (ja/delete-topics! client [{:topic-name topic-name}])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn new-topic [broker-config topic-name] (try (let [client (ja/->AdminClient broker-config) topic-config {:topic-name topic-name :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes/edn-serde) :value-serde (jackdaw.serdes/edn-serde) :topic-config {"cleanup.policy" "compact"}}] (ja/create-topics! client [topic-config])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn produce-messages [broker-config topic-name messages] ;; konfigurace producenta zprav ve formatu EDN (let [producer-config (merge broker-config {"acks" "all" "client.id" "foo"}) ; specifikace zpusobu serializace klicu i obsahu zpravy producer-serde-config {:key-serde (jackdaw.serdes/edn-serde) :value-serde (jackdaw.serdes/edn-serde)}] ;; poslani zvoleneho poctu zprav se serializaci klice i hodnoty (with-open [producer (jc/producer producer-config producer-serde-config)] (doseq [i (range 0 messages)] (let [topic {:topic-name topic-name} ; posilany klic message-key {:n i :foo "foo"} ; posilany obsah zpravy message-value {:bar "bar" :value i :recip (/ 1 (inc i)) :factorial (reduce * (range 1M (inc i))) :values (range i) } record-metadata (jc/produce! producer topic message-key message-value)] (pp/pprint @record-metadata)))))) (defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"} topic-name "edn2"] ;; na zacatku pro jistotu vymazeme tema "edn2" (delete-topic broker-config topic-name) ;; vytvoreni noveho tematu akceptujiciho zpravy ve formatu EDN (new-topic broker-config topic-name) ;; poslani 100 zprav se serializaci klice i hodnoty (produce-messages broker-config topic-name 100)))
11. Ukázka skutečného formátu zpráv uložených do tématu
Demonstrační příklad popsaný v předchozí kapitole spustíme. Výsledkem by mělo být sto zpráv poslaných do nově vytvořeného tématu nazvaného „edn2“:
{:topic-name "edn2", :partition 0, :timestamp 1607091366882, :offset 0, :serialized-key-size 18, :serialized-value-size 58} {:topic-name "edn2", :partition 0, :timestamp 1607091366928, :offset 1, :serialized-key-size 18, :serialized-value-size 62} {:topic-name "edn2", :partition 0, :timestamp 1607091366934, :offset 2, :serialized-key-size 18, :serialized-value-size 64} ... ... ... {:topic-name "edn2", :partition 0, :timestamp 1607091367286, :offset 97, :serialized-key-size 19, :serialized-value-size 494} {:topic-name "edn2", :partition 0, :timestamp 1607091367289, :offset 98, :serialized-key-size 19, :serialized-value-size 499} {:topic-name "edn2", :partition 0, :timestamp 1607091367292, :offset 99, :serialized-key-size 19, :serialized-value-size 505}
Opět se pokusme zprávy z tohoto tématu přečíst nástrojem kafkacat:
$ kafkacat -C -b localhost:9092 -t edn2 -f '%k*%s*\n'
Prvních deset zpráv vypadá následovně:
{:n 0, :foo "foo"}*{:bar "bar", :value 0, :recip 1, :factorial 1, :values ()}* {:n 1, :foo "foo"}*{:bar "bar", :value 1, :recip 1/2, :factorial 1M, :values (0)}* {:n 2, :foo "foo"}*{:bar "bar", :value 2, :recip 1/3, :factorial 2M, :values (0 1)}* {:n 3, :foo "foo"}*{:bar "bar", :value 3, :recip 1/4, :factorial 6M, :values (0 1 2)}* {:n 4, :foo "foo"}*{:bar "bar", :value 4, :recip 1/5, :factorial 24M, :values (0 1 2 3)}* {:n 5, :foo "foo"}*{:bar "bar", :value 5, :recip 1/6, :factorial 120M, :values (0 1 2 3 4)}* {:n 6, :foo "foo"}*{:bar "bar", :value 6, :recip 1/7, :factorial 720M, :values (0 1 2 3 4 5)}* {:n 7, :foo "foo"}*{:bar "bar", :value 7, :recip 1/8, :factorial 5040M, :values (0 1 2 3 4 5 6)}* {:n 8, :foo "foo"}*{:bar "bar", :value 8, :recip 1/9, :factorial 40320M, :values (0 1 2 3 4 5 6 7)}* {:n 9, :foo "foo"}*{:bar "bar", :value 9, :recip 1/10, :factorial 362880M, :values (0 1 2 3 4 5 6 7 8)}*
Poslední tři zprávy vypadají takto:
{:n 97, :foo "foo"}*{:bar "bar", :value 97, :recip 1/98, :factorial 96192759682482119853328425949563698712343813919172976158104477319333745612481875498805879175589072651261284189679678167647067832320000000000000000000000M, :values (0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96)}* {:n 98, :foo "foo"}*{:bar "bar", :value 98, :recip 1/99, :factorial 9426890448883247745626185743057242473809693764078951663494238777294707070023223798882976159207729119823605850588608460429412647567360000000000000000000000M, :values (0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97)}* {:n 99, :foo "foo"}*{:bar "bar", :value 99, :recip 1/100, :factorial 933262154439441526816992388562667004907159682643816214685929638952175999932299156089414639761565182862536979208272237582511852109168640000000000000000000000M, :values (0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98)}*
{:n 2, :foo "foo"}*{:bar "bar", :value 2, :recip 1/3, :factorial 2M, :values (0 1)}*
12. Použití formátu JSON
Poměrně často se při práci se streamovací platformou Apache Kafka setkáme se zprávami uloženými ve formátu JSON, který je v současnosti využíván (a někdy též až neskutečným způsobem zneužíván) i v mnoha dalších oblastech výpočetní techniky. Pro serializaci a deserializaci zpráv s využitím JSONu se používá serdes realizovaný ve jmenném prostoru jackdaw.serdes.json.
Deklarace tématu s popisem serdesu:
{:topic-name topic-name :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde) :topic-config {"cleanup.policy" "compact"}}]
Vytvoření producenta zpráv založeného na tomto serdesu:
(let [producer-config (merge broker-config {"acks" "all" "client.id" "foo"}) ; specifikace zpusobu serializace klicu i obsahu zpravy producer-serde-config {:key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}] ;; poslani specifikovaneho poctu zprav se serializaci klice i hodnoty (with-open [producer (jc/producer producer-config producer-serde-config)] ... ... ...))
Následuje výpis úplného zdrojového kódu příkladu produkujícího zprávy ve formátu JSON (včetně klíčů):
(ns json-serializer.core (:require [jackdaw.admin :as ja] [jackdaw.client :as jc] [clojure.pprint :as pp])) (require '[jackdaw.serdes.json]) (defn delete-topic [broker-config topic-name] (try (let [client (ja/->AdminClient broker-config)] (ja/delete-topics! client [{:topic-name topic-name}])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn new-topic [broker-config topic-name] (try (let [client (ja/->AdminClient broker-config) topic-config {:topic-name topic-name :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde) :topic-config {"cleanup.policy" "compact"}}] (ja/create-topics! client [topic-config])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn produce-messages [broker-config topic-name messages] ;; konfigurace producenta zprav ve formatu JSON (let [producer-config (merge broker-config {"acks" "all" "client.id" "foo"}) ; specifikace zpusobu serializace klicu i obsahu zpravy producer-serde-config {:key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}] ;; poslani specifikovaneho poctu zprav se serializaci klice i hodnoty (with-open [producer (jc/producer producer-config producer-serde-config)] (doseq [i (range 0 messages)] (let [topic {:topic-name topic-name} ; posilany klic message-key {:n i :foo "foo"} ; posilany obsah zpravy message-value {:bar "bar" :value i :recip (/ 1 (inc i)) :factorial (reduce * (range 1M (inc i))) :values (range i) } record-metadata (jc/produce! producer topic message-key message-value)] (pp/pprint @record-metadata)))))) (defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"} topic-name "json"] ;; na zacatku pro jistotu vymazeme tema "json" (delete-topic broker-config topic-name) ;; vytvoreni noveho tematu akceptujiciho zpravy ve formatu JSON (new-topic broker-config topic-name) ;; poslani 100 zprav se serializaci klice i hodnoty (produce-messages broker-config topic-name 100)))
13. Ukázka skutečného formátu zpráv uložených do tématu
Demonstrační příklad zmíněný v předchozí kapitole opět spustíme příkazem lein run:
{:topic-name "json", :partition 0, :timestamp 1607093463523, :offset 0, :serialized-key-size 19, :serialized-value-size 59} {:topic-name "json", :partition 0, :timestamp 1607093463577, :offset 1, :serialized-key-size 19, :serialized-value-size 62} {:topic-name "json", :partition 0, :timestamp 1607093463585, :offset 2, :serialized-key-size 19, :serialized-value-size 79} ... ... ... {:topic-name "json", :partition 0, :timestamp 1607093464057, :offset 97, :serialized-key-size 20, :serialized-value-size 509} {:topic-name "json", :partition 0, :timestamp 1607093464061, :offset 98, :serialized-key-size 20, :serialized-value-size 513} {:topic-name "json", :partition 0, :timestamp 1607093464066, :offset 99, :serialized-key-size 20, :serialized-value-size 504}
A výsledky, tedy klíče i hodnoty jednotlivých zpráv, si necháme vypsat nástrojem kafkacat:
$ kafkacat -C -b localhost:9092 -t json -f '%k*%s*\n'
Prvních deset zpráv vypadá následovně:
{"n":0,"foo":"foo"}*{"bar":"bar","value":0,"recip":1,"factorial":1,"values":[]}* {"n":1,"foo":"foo"}*{"bar":"bar","value":1,"recip":0.5,"factorial":1,"values":[0]}* {"n":2,"foo":"foo"}*{"bar":"bar","value":2,"recip":0.3333333333333333,"factorial":2,"values":[0,1]}* {"n":3,"foo":"foo"}*{"bar":"bar","value":3,"recip":0.25,"factorial":6,"values":[0,1,2]}* {"n":4,"foo":"foo"}*{"bar":"bar","value":4,"recip":0.2,"factorial":24,"values":[0,1,2,3]}* {"n":5,"foo":"foo"}*{"bar":"bar","value":5,"recip":0.1666666666666667,"factorial":120,"values":[0,1,2,3,4]}* {"n":6,"foo":"foo"}*{"bar":"bar","value":6,"recip":0.1428571428571429,"factorial":720,"values":[0,1,2,3,4,5]}* {"n":7,"foo":"foo"}*{"bar":"bar","value":7,"recip":0.125,"factorial":5040,"values":[0,1,2,3,4,5,6]}* {"n":8,"foo":"foo"}*{"bar":"bar","value":8,"recip":0.1111111111111111,"factorial":40320,"values":[0,1,2,3,4,5,6,7]}* {"n":9,"foo":"foo"}*{"bar":"bar","value":9,"recip":0.1,"factorial":362880,"values":[0,1,2,3,4,5,6,7,8]}*
Poslední tři zprávy vypadají takto:
{"n":97,"foo":"foo"}*{"bar":"bar","value":97,"recip":0.01020408163265306,"factorial":96192759682482119853328425949563698712343813919172976158104477319333745612481875498805879175589072651261284189679678167647067832320000000000000000000000,"values":[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96]}* {"n":98,"foo":"foo"}*{"bar":"bar","value":98,"recip":0.0101010101010101,"factorial":9426890448883247745626185743057242473809693764078951663494238777294707070023223798882976159207729119823605850588608460429412647567360000000000000000000000,"values":[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97]}* {"n":99,"foo":"foo"}*{"bar":"bar","value":99,"recip":0.01,"factorial":933262154439441526816992388562667004907159682643816214685929638952175999932299156089414639761565182862536979208272237582511852109168640000000000000000000000,"values":[0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98]}*
Z výpisu je patrných několik důležitých faktů:
- Ve formátu JSON není možné pracovat se zlomky (ratio), hodnoty se převedou na typ float
- Ve formátu JSON nelze ukládat čísla s prakticky neomezeným rozsahem, velké faktoriály se převedou na řetězec
- Líné sekvence jsou (pochopitelně) realizovány, tj. je z nich vytvořena běžné pole
14. Základy pokročilého streamingu – kolona (pipe)
V oblasti „proudového“ zpracování dat (ale nejenom v této oblasti) se poměrně často setkáme s názvem Extract, transform, load neboli ETL. Jedná se o název, který je tradičně používán i v jiných částech IT, například pro extrakci a zpracování dat získaných z CSV tabulek, relačních databází atd. s uložením výsledku do nějakého datového úložiště. V případě streamingu a konkrétně Apache Kafky je možné termín použít pro každý prvek kolony (pipe, pipeline), přičemž v tom nejjednodušším případě je zdrojem dat jedno téma a cílem (úložištěm) jiné téma:
[input] → (ETL) → [output]
ETL je (alespoň v typických případech) realizován bezstavovým algoritmem a právě z tohoto důvodu se zde může s výhodou použít jazyk Clojure.
Podívejme se nyní na způsob explicitního vytvoření jednoduchého ETL realizovaného spojením konzumenta z tématu „input“, nějaké transformační funkce a producenta do tématu „output“. Vstupní i výstupní zprávy budou používat formát JSON.
Na začátku pro jistotu obě témata „input“ i „output“ z Kafky vymažeme:
(let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (ja/delete-topics! client [{:topic-name "input"} {:topic-name "output"}]))
Témata opět vytvoříme se specifikací formátu JSON:
;; vytvoreni noveho tematu "input" akceptujiciho zpravy ve formatu JSON (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"}) topic-config {:topic-name "input" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde) :topic-config {"cleanup.policy" "compact"}}] (ja/create-topics! client [topic-config]))
;; vytvoreni noveho tematu "output" akceptujiciho zpravy ve formatu JSON (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"}) topic-config {:topic-name "output" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde) :topic-config {"cleanup.policy" "compact"}}] (ja/create-topics! client [topic-config]))
Konfigurace konzumenta zpráv:
(def consumer-config {"bootstrap.servers" "localhost:9092" "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" "auto.offset.reset" "earliest" "group.id" "group-A"})
Konfigurace producenta zpráv:
(def producer-config {"bootstrap.servers" "localhost:9092" "acks" "all" "client.id" "foo"})
Ještě musíme vytvořit mapy s konfigurací serializátoru a deserializátoru producenta i konzumenta:
;; konfigurace serializatoru a deserializatoru (def consumer-serde-config {:key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}) ;; konfigurace serializatoru a deserializatoru (def producer-serde-config {:key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)})
Nyní se dostáváme k samotné realizaci prvku pipeline. Otevřeme připojení konzumenta i producenta zpráv a následně budeme zprávy načítat takovým způsobem, jakoby se jednalo o (nekonečnou) sekvenci jazyka Clojure:
;; start jednoduche pipeline ; otevření připojení ke vstupnímu tématu – konzument (with-open [consumer (-> (jc/consumer consumer-config consumer-serde-config) (jc/subscribe [{:topic-name "input"}]))] ; otevření připojení k výstupnímu tématu – producent (with-open [producer (jc/producer producer-config producer-serde-config)] ; zpracování příchozích zpráv, jejich transformace a uložení (doseq [{:keys [key value partition timestamp offset]} (jl/log consumer 10)] (println "Incoming message:") (println "key: " key) (println "value: " value) (println "partition: " partition) (println "timestamp: " timestamp) (println "offset: " offset) ; transformace a uložení (let [result (+ (:x value) (:y value)) ; vypocet vysledku message-key {:id (.toString (java.util.UUID/randomUUID))} message-value {:result result} record-metadata (jc/produce! producer {:topic-name "output"} message-key message-value)] (println "Outgoing message:") (pp/pprint @record-metadata) (println "------------------")))))
Nyní do vstupního tématu „input“ pošleme několik zpráv v JSONu s očekávaným formátem:
$ echo '{"x":1, "y":2}' | kafkacat -P -b localhost:9092 -t input -k '{"id":1}' $ echo '{"x":-1, "y":2}' | kafkacat -P -b localhost:9092 -t input -k '{"id":1}' $ echo '{"x":1000, "y":2}' | kafkacat -P -b localhost:9092 -t input -k '{"id":1}
A podíváme se, jaké zprávy byly poslány do tématu „output“:
$ kafkacat -C -b localhost:9092 -t output -f '%k*%s*\n'
Povšimněte si, že se skutečně přijaly zprávy s výsledkem součtu atributů x a y:
{"id":"16668996-9fc8-464d-b7d8-f9cf1dcb4bb6"}*{"result":3}* {"id":"15ce8223-e29a-48b5-8274-dae42bbf2e90"}*{"result":1}* {"id":"fcac0963-7cfb-4403-8f88-4ab6abcf102c"}*{"result":1002}* % Reached end of topic output [0] at offset 3
15. Kolona s explicitním kódem konzumenta i producenta
Ukažme si nyní, jak by mohl vypadat zdrojový kód uceleného projektu realizujícího kolonu (pipe) s vlastní explicitně zapsanou realizací konzumenta i producenta. Tento příklad je založen na ukázkových kódech uvedených v rámci předchozí kapitoly:
(ns custom-pipe.core (:require [jackdaw.admin :as ja] [jackdaw.client :as jc] [jackdaw.client.log :as jl] [clojure.pprint :as pp])) (require '[jackdaw.serdes.json]) (defn delete-topic [broker-config topic-name] (try (let [client (ja/->AdminClient broker-config)] (ja/delete-topics! client [{:topic-name topic-name}])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn new-topic [broker-config topic-name] (try (let [client (ja/->AdminClient broker-config) topic-config {:topic-name topic-name :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde) :topic-config {"cleanup.policy" "compact"}}] (ja/create-topics! client [topic-config])) (catch Exception e (str "caught exception: " (.getMessage e))))) ;; konfigurace serializatoru a deserializatoru (def consumer-serde-config {:key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}) (defn pipe [broker-config input-topic output-topic] ;; konfigurace producenta i konzumenta zprav ve formatu JSON (let [producer-config (merge broker-config {"acks" "all" "client.id" "foo"}) consumer-config (merge broker-config {"key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" "auto.offset.reset" "earliest" "group.id" "group-A"}) ; specifikace zpusobu serializace klicu i obsahu zpravy producer-serde-config {:key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} consumer-serde-config {:key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}] ; otevreni konzumenta zprav (with-open [consumer (-> (jc/consumer consumer-config consumer-serde-config) (jc/subscribe [{:topic-name input-topic}]))] ; otevreni producenta zprav (with-open [producer (jc/producer producer-config producer-serde-config)] ; cteni zprav ze vstupniho tematu (doseq [{:keys [key value partition timestamp offset]} (jl/log consumer 10)] (println "Incoming message:") (println "key: " key) (println "value: " value) (println "partition: " partition) (println "timestamp: " timestamp) (println "offset: " offset) (println) ; vypocet vysledku s jeho poslanim do vystupniho tematu (let [result (+ (:x value) (:y value)) ; vypocet vysledku message-key {:id (.toString (java.util.UUID/randomUUID))} message-value {:result result} record-metadata (jc/produce! producer {:topic-name output-topic} message-key message-value)] (println "Result:") (println "value:" result) (println) (println "Outgoing message:") (pp/pprint @record-metadata) (println "------------------"))))))) (defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"} input-topic-name "input" output-topic-name "output"] ;; na zacatku pro jistotu vymazeme temata pouzivane pipou (delete-topic broker-config input-topic-name) (delete-topic broker-config output-topic-name) ;; vytvoreni novych temat akceptujiciho zpravy ve formatu JSON (new-topic broker-config input-topic-name) (new-topic broker-config output-topic-name) ;; spusteni kolony (println "Starting pipe") (pipe broker-config input-topic-name output-topic-name)))
Příklad spustíme klasicky příkazem lein run:
$ lein run
Vytvoříme několik zpráv a pošleme je do tématu:
$ echo '{"x":1, "y":2}' | kafkacat -P -b localhost:9092 -t input -k '{"id":1}' $ echo '{"x":-1, "y":2}' | kafkacat -P -b localhost:9092 -t input -k '{"id":1}' $ echo '{"x":1000, "y":2}' | kafkacat -P -b localhost:9092 -t input -k '{"id":1}
Projekt s pipeline by měl zareagovat na příchozí zprávy následovně:
Incoming message: key: {:id 1} value: {:x 1, :y 2} partition: 0 timestamp: 1607155890020 offset: 1 Result: value: 3 Outgoing message: {:topic-name "output", :partition 0, :timestamp 1607155890034, :offset 1, :serialized-key-size 45, :serialized-value-size 12} ------------------ Incoming message: key: {:id 1} value: {:x -1, :y 2} partition: 0 timestamp: 1607155893547 offset: 2 Result: value: 1 Outgoing message: {:topic-name "output", :partition 0, :timestamp 1607155893561, :offset 2, :serialized-key-size 45, :serialized-value-size 12} ------------------
Spustíme i konzumenta:
$ kafkacat -C -b localhost:9092 -t output -f '%k*%s*\n'
Ten vypíše svoje výsledky, tedy zprávy přijaté z tématu „output“:
{"id":"16668996-9fc8-464d-b7d8-f9cf1dcb4bb6"}*{"result":3}* % Reached end of topic output [0] at offset 1 {"id":"15ce8223-e29a-48b5-8274-dae42bbf2e90"}*{"result":1}* % Reached end of topic output [0] at offset 2 {"id":"fcac0963-7cfb-4403-8f88-4ab6abcf102c"}*{"result":1002}* % Reached end of topic output [0] at offset 3
16. Zobecnění ETL funkce
Předchozí demonstrační příklad je možné vylepšit, a to takovým způsobem, že umožníme specifikaci transformační (ETL) funkce, tedy vlastního „jádra“ příkladu – právě ETL funkce totiž určuje, jak se budou data převádět ze vstupního tématu do tématu výstupního. Budeme tedy chtít, aby se kolona spustila takto:
(pipe broker-config input-topic-name output-topic-name etl)))
Přičemž transformační funkce může vypadat například následovně:
(defn etl [input-value] {:key {:id (.toString (java.util.UUID/randomUUID))} :value (* (:x input-value) (:y input-value))})
Ve skutečnosti není úprava příkladu nijak složitá. Upravené části jsou podtrženy:
(ns custom-pipe-processing-function.core (:require [jackdaw.admin :as ja] [jackdaw.client :as jc] [jackdaw.client.log :as jl] [clojure.pprint :as pp])) (require '[jackdaw.serdes.json]) (defn delete-topic [broker-config topic-name] (try (let [client (ja/->AdminClient broker-config)] (ja/delete-topics! client [{:topic-name topic-name}])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn new-topic [broker-config topic-name] (try (let [client (ja/->AdminClient broker-config) topic-config {:topic-name topic-name :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde) :topic-config {"cleanup.policy" "compact"}}] (ja/create-topics! client [topic-config])) (catch Exception e (str "caught exception: " (.getMessage e))))) ;; konfigurace serializatoru a deserializatoru (def consumer-serde-config {:key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}) (defn pipe [broker-config input-topic output-topic etl-function] ;; konfigurace producenta i konzumenta zprav ve formatu JSON (let [producer-config (merge broker-config {"acks" "all" "client.id" "foo"}) consumer-config (merge broker-config {"key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer" "auto.offset.reset" "earliest" "group.id" "group-A"}) ; specifikace zpusobu serializace klicu i obsahu zpravy producer-serde-config {:key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} consumer-serde-config {:key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}] ; otevreni konzumenta zprav (with-open [consumer (-> (jc/consumer consumer-config consumer-serde-config) (jc/subscribe [{:topic-name input-topic}]))] ; otevreni producenta zprav (with-open [producer (jc/producer producer-config producer-serde-config)] ; cteni zprav ze vstupniho tematu (doseq [{:keys [key value partition timestamp offset]} (jl/log consumer 10)] (println "Incoming message:") (println "key: " key) (println "value: " value) (println "partition: " partition) (println "timestamp: " timestamp) (println "offset: " offset) (println) ; vypocet vysledku s jeho poslanim do vystupniho tematu (let [message (etl-function value) record-metadata (jc/produce! producer {:topic-name output-topic} (:key message) (:value message))] (println "Result:") (println "message" message) (println) (println "Outgoing message:") (pp/pprint @record-metadata) (println "------------------"))))))) (defn etl [input-value] {:key {:id (.toString (java.util.UUID/randomUUID))} :value (* (:x input-value) (:y input-value))}) (defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"} input-topic-name "input" output-topic-name "output"] ;; na zacatku pro jistotu vymazeme temata pouzivane pipou (delete-topic broker-config input-topic-name) (delete-topic broker-config output-topic-name) ;; vytvoreni novych temat akceptujiciho zpravy ve formatu JSON (new-topic broker-config input-topic-name) (new-topic broker-config output-topic-name) ;; spusteni kolony (println "Starting pipe") (pipe broker-config input-topic-name output-topic-name etl)))
Příklad zpracování zpráv pipelinou s odlišným výpočtem v ETL funkci:
------------------ Incoming message: key: {:id 1} value: {:x 10, :y 20} partition: 0 timestamp: 1607156843578 offset: 1 Result: message {:key {:id f4d5f7ed-70f7-4609-a3c3-65a9ba8c89f8}, :value 200} Outgoing message: {:topic-name "output", :partition 0, :timestamp 1607156843583, :offset 1, :serialized-key-size 45, :serialized-value-size 3} ------------------ Incoming message: key: {:id 1} value: {:x -1, :y 2} partition: 0 timestamp: 1607156846869 offset: 2 Result: message {:key {:id 0e6a9466-b07a-4b97-8d4c-ec4667668465}, :value -2} Outgoing message: {:topic-name "output", :partition 0, :timestamp 1607156846882, :offset 2, :serialized-key-size 45, :serialized-value-size 2} ------------------
17. Lepší řešení – využití Streams API
Předchozí řešení kolony mělo poměrně mnoho nevýhod. Patří k nim nízkoúrovňový přístup k platformě Apache Kafka, který vyžaduje deklaraci producenta i konzumenta, spojení logiky konzumenta i producenta, případné společné řešení chybových stavů atd. Existuje však mnohem lepší a v rámci jazyka Clojure i idiomatičtější způsob založení na Streams API realizovaného funkcemi a makry ve jmenném prostoru jackdaw.streams. Pojďme si nyní použití Streams API ukázat krok za krokem (tedy tak, jak by se jednotlivé formy zadávaly do interaktivní smyčky REPL).
Budeme používat především již zmíněný jmenný prostor jackdaw.streams:
(require '[jackdaw.serdes.json]) (require '[jackdaw.streams :as j])
Vymažeme vstupní i výstupní téma (podobně jako v předchozích příkladech):
(let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"})] (ja/delete-topics! client [{:topic-name "input"} {:topic-name "output"}]))
Témata vytvoříme znovu:
;; vytvoreni noveho tematu "input" akceptujiciho zpravy ve formatu JSON (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"}) input-topic {:topic-name "input" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde) :topic-config {"cleanup.policy" "compact"}}] (ja/create-topics! client [input-topic])) ;; vytvoreni noveho tematu "output" akceptujiciho zpravy ve formatu JSON (let [client (ja/->AdminClient {"bootstrap.servers" "localhost:9092"}) input-topic {:topic-name "output" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde) :topic-config {"cleanup.policy" "compact"}}] (ja/create-topics! client [input-topic]))
Mapa topic-config obsahuje konfigurace vstupního i výstupního tématu, včetně specifikace serdes. Zde se skutečně jedná o jediné místo, kde je serdes explicitně zmíněn:
;; konfigurace temat (def topic-config {:input {:topic-name "input" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output {:topic-name "output" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}})
Musíme nakonfigurovat i celou „aplikaci“, čímž je myšlena celá pipeline. Nejdůležitější volbou je zde jak broker, tak i ID aplikace:
(def app-config {"application.id" "pipe" "bootstrap.servers" "localhost:9092" "cache.max.bytes.buffering" "0"})
Vlastní transformační funkce, která realizuje operaci „T“ v „ETL“, akceptuje dvojici klíč+hodnota a vrací taktéž dvojici klíč+hodnota (jedná se tedy o dvouprvkové vektory). V našem případě bude v rámci transformace proveden součet atributů ze vstupní zprávy a výsledek uložen do atributu zprávy výsledné:
;; transformacni funkce (defn etl [[k v]] [k {:result (+ (:x v) (:y v))}])
Nejdůležitější částí je ovšem definice samotné kolony. Použijeme threading makro; na začátku je ze vstupního tématu vytvořen „stream“, což je vlastně z pohledu jazyka Clojure běžná (nekonečná) sekvence. Na prvky sekvence postupně aplikujeme funkce peek (ta stream nemění, jen volá jinou funkci s vedlejším efektem – použito například při logování), map (aplikace transformační funkce) a to (uložení výsledného/transformovaného prvku do výstupního tématu):
;; cela pipeline (kolona) (defn build-topology [builder] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (println "Received message with key: " k " and value:" v))) (j/map etl) (j/peek (fn [[k v]] (println "Transformed message with key:" k " and value:" v))) (j/to (:output topic-config))) builder)
Další dvě pomocné funkce „aplikaci“ spustí popř. zastaví. Vypadají následovně:
;; spusteni aplikace (defn start-app "Starts the stream processing application." [app-config] (let [builder (j/streams-builder) topology (build-topology builder) app (j/kafka-streams topology app-config)] (j/start app) (println "pipe is up") app)) ;; zastaveni aplikace (defn stop-app "Stops the stream processing application." [app] (j/close app) (println "pipe is down"))
Nyní zkusíme aplikaci spustit:
(def app (start-app app-config))
V této chvíli je možné přes kafkacat posílat zprávy do tématu „input“ a číst výsledky z tématu „output“.
Aplikaci lze kdykoli zastavit:
(stop-app app)
Předchozí kroky můžeme přepsat do skutečného projektu, k němuž se ještě vrátíme v navazujícím článku:
(ns stream-pipe-1.core (:require [jackdaw.admin :as ja] [jackdaw.client :as jc] [jackdaw.client.log :as jl] [jackdaw.serdes.json] [jackdaw.streams :as j] [clojure.pprint :as pp])) ;; konfigurace temat (def topic-config {:input {:topic-name "input" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)} :output {:topic-name "output" :partition-count 1 :replication-factor 1 :key-serde (jackdaw.serdes.json/serde) :value-serde (jackdaw.serdes.json/serde)}}) ;; konfigurace aplikace (def app-config {"application.id" "pipe" "bootstrap.servers" "localhost:9092" "cache.max.bytes.buffering" "0"}) (defn delete-topic [broker-config topic] (try (let [client (ja/->AdminClient broker-config)] (ja/delete-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) (defn new-topic [broker-config topic] (try (let [client (ja/->AdminClient broker-config)] (ja/create-topics! client [topic])) (catch Exception e (str "caught exception: " (.getMessage e))))) ;; transformacni funkce (defn etl [[k v]] [k {:result (+ (:x v) (:y v))}]) ;; cela pipeline (kolona) (defn build-topology [builder topic-config] (-> (j/kstream builder (:input topic-config)) (j/peek (fn [[k v]] (println "Received message with key: " k " and value:" v))) (j/map etl) (j/peek (fn [[k v]] (println "Transformed message with key:" k " and value:" v))) (j/to (:output topic-config))) builder) ;; spusteni aplikace (defn start-app "Starts the stream processing application." [app-config topic-config] (let [builder (j/streams-builder) topology (build-topology builder topic-config) app (j/kafka-streams topology app-config)] (j/start app) (println "pipe is up") app)) ;; zastaveni aplikace (defn stop-app "Stops the stream processing application." [app] (j/close app) (println "pipe is down")) (defn -main [& args] (let [broker-config {"bootstrap.servers" "localhost:9092"}] ;; na zacatku pro jistotu vymazeme temata pouzivane pipou (delete-topic broker-config (:input topic-config)) (delete-topic broker-config (:output topic-config)) ;; vytvoreni novych temat akceptujiciho zpravy ve formatu JSON (new-topic broker-config (:input topic-config)) (new-topic broker-config (:output topic-config)) ;; spusteni kolony (println "Starting pipe") (let [app (start-app app-config topic-config)] (println "App:" app))))
18. Repositář s demonstračními příklady
Zdrojové kódy všech dnes popsaných demonstračních příkladů vyvinutých v programovacím jazyku Clojure byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-):
19. Odkazy na předchozí části seriálu o programovacím jazyku Clojure
- Clojure 1: Úvod
http://www.root.cz/clanky/clojure-aneb-jazyk-umoznujici-tvorbu-bezpecnych-vicevlaknovych-aplikaci-pro-jvm/ - Clojure 2: Symboly, kolekce atd.
http://www.root.cz/clanky/clojure-aneb-jazyk-umoznujici-tvorbu-bezpecnych-vicevlaknovych-aplikaci-pro-jvm-2-cast/ - Clojure 3: Funkcionální programování
http://www.root.cz/clanky/clojure-aneb-jazyk-umoznujici-tvorbu-bezpecnych-vicevlaknovych-aplikaci-pro-jvm-3-cast-funkcionalni-programovani/ - Clojure 4: Kolekce, sekvence a lazy sekvence
http://www.root.cz/clanky/clojure-aneb-jazyk-umoznujici-tvorbu-bezpecnych-vicevlaknovych-aplikaci-pro-jvm-4-cast-kolekce-sekvence-a-lazy-sekvence/ - Clojure 5: Sekvence, lazy sekvence a paralelní programy
http://www.root.cz/clanky/clojure-a-bezpecne-aplikace-pro-jvm-sekvence-lazy-sekvence-a-paralelni-programy/ - Clojure 6: Podpora pro paralelní programování
http://www.root.cz/clanky/programovaci-jazyk-clojure-6-futures-nejsou-jen-financni-derivaty/ - Clojure 7: Další funkce pro paralelní programování
http://www.root.cz/clanky/programovaci-jazyk-clojure-7-dalsi-podpurne-prostredky-pro-paralelni-programovani/ - Clojure 8: Identity, stavy, neměnné hodnoty a reference
http://www.root.cz/clanky/programovaci-jazyk-clojure-8-identity-stavy-nemenne-hodnoty-a-referencni-typy/ - Clojure 9: Validátory, pozorovatelé a kooperace s Javou
http://www.root.cz/clanky/programovaci-jazyk-clojure-9-validatory-pozorovatele-a-kooperace-mezi-clojure-a-javou/ - Clojure 10: Kooperace mezi Clojure a Javou
http://www.root.cz/clanky/programovaci-jazyk-clojure-10-kooperace-mezi-clojure-a-javou-pokracovani/ - Clojure 11: Generátorová notace seznamu/list comprehension
http://www.root.cz/clanky/programovaci-jazyk-clojure-11-generatorova-notace-seznamu-list-comprehension/ - Clojure 12: Překlad programů z Clojure do bajtkódu JVM I:
http://www.root.cz/clanky/programovaci-jazyk-clojure-12-preklad-programu-z-clojure-do-bajtkodu-jvm/ - Clojure 13: Překlad programů z Clojure do bajtkódu JVM II:
http://www.root.cz/clanky/programovaci-jazyk-clojure-13-preklad-programu-z-clojure-do-bajtkodu-jvm-pokracovani/ - Clojure 14: Základy práce se systémem maker
http://www.root.cz/clanky/programovaci-jazyk-clojure-14-zaklady-prace-se-systemem-maker/ - Clojure 15: Tvorba uživatelských maker
http://www.root.cz/clanky/programovaci-jazyk-clojure-15-tvorba-uzivatelskych-maker/ - Programovací jazyk Clojure – triky při práci s řetězci
http://www.root.cz/clanky/programovaci-jazyk-clojure-triky-pri-praci-s-retezci/ - Programovací jazyk Clojure – triky při práci s kolekcemi
http://www.root.cz/clanky/programovaci-jazyk-clojure-triky-pri-praci-s-kolekcemi/ - Programovací jazyk Clojure – práce s mapami a množinami
http://www.root.cz/clanky/programovaci-jazyk-clojure-prace-s-mapami-a-mnozinami/ - Programovací jazyk Clojure – základy zpracování XML
http://www.root.cz/clanky/programovaci-jazyk-clojure-zaklady-zpracovani-xml/ - Programovací jazyk Clojure – testování s využitím knihovny Expectations
http://www.root.cz/clanky/programovaci-jazyk-clojure-testovani-s-vyuzitim-knihovny-expectations/ - Programovací jazyk Clojure – některé užitečné triky použitelné (nejenom) v testech
http://www.root.cz/clanky/programovaci-jazyk-clojure-nektere-uzitecne-triky-pouzitelne-nejenom-v-testech/ - Enlive – výkonný šablonovací systém pro jazyk Clojure
http://www.root.cz/clanky/enlive-vykonny-sablonovaci-system-pro-jazyk-clojure/ - Nástroj Leiningen a programovací jazyk Clojure: tvorba vlastních knihoven pro veřejný repositář Clojars
http://www.root.cz/clanky/nastroj-leiningen-a-programovaci-jazyk-clojure-tvorba-vlastnich-knihoven-pro-verejny-repositar-clojars/ - Novinky v Clojure verze 1.8.0
http://www.root.cz/clanky/novinky-v-clojure-verze-1–8–0/ - Asynchronní programování v Clojure s využitím knihovny core.async
http://www.root.cz/clanky/asynchronni-programovani-v-clojure-s-vyuzitim-knihovny-core-async/ - Asynchronní programování v Clojure s využitím knihovny core.async (pokračování)
http://www.root.cz/clanky/asynchronni-programovani-v-clojure-s-vyuzitim-knihovny-core-async-pokracovani/ - Asynchronní programování v Clojure s využitím knihovny core.async (dokončení)
http://www.root.cz/clanky/asynchronni-programovani-v-clojure-s-vyuzitim-knihovny-core-async-dokonceni/ - Vytváříme IRC bota v programovacím jazyce Clojure
http://www.root.cz/clanky/vytvarime-irc-bota-v-programovacim-jazyce-clojure/ - Gorilla REPL: interaktivní prostředí pro programovací jazyk Clojure
https://www.root.cz/clanky/gorilla-repl-interaktivni-prostredi-pro-programovaci-jazyk-clojure/ - Multimetody v Clojure aneb polymorfismus bez použití OOP
https://www.root.cz/clanky/multimetody-v-clojure-aneb-polymorfismus-bez-pouziti-oop/ - Práce s externími Java archivy v programovacím jazyku Clojure
https://www.root.cz/clanky/prace-s-externimi-java-archivy-v-programovacim-jazyku-clojure/ - Clojure 16: Složitější uživatelská makra
http://www.root.cz/clanky/programovaci-jazyk-clojure-16-slozitejsi-uzivatelska-makra/ - Clojure 17: Využití standardních maker v praxi
http://www.root.cz/clanky/programovaci-jazyk-clojure-17-vyuziti-standardnich-maker-v-praxi/ - Clojure 18: Základní techniky optimalizace aplikací
http://www.root.cz/clanky/programovaci-jazyk-clojure-18-zakladni-techniky-optimalizace-aplikaci/ - Clojure 19: Vývojová prostředí pro Clojure
http://www.root.cz/clanky/programovaci-jazyk-clojure-19-vyvojova-prostredi-pro-clojure/ - Clojure 20: Vývojová prostředí pro Clojure (Vimu s REPL)
http://www.root.cz/clanky/programovaci-jazyk-clojure-20-vyvojova-prostredi-pro-clojure-integrace-vimu-s-repl/ - Clojure 21: ClojureScript aneb překlad Clojure do JS
http://www.root.cz/clanky/programovaci-jazyk-clojure-21-clojurescript-aneb-preklad-clojure-do-javascriptu/ - Leiningen: nástroj pro správu projektů napsaných v Clojure
http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure/ - Leiningen: nástroj pro správu projektů napsaných v Clojure (2)
http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-2/ - Leiningen: nástroj pro správu projektů napsaných v Clojure (3)
http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-3/ - Leiningen: nástroj pro správu projektů napsaných v Clojure (4)
http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-4/ - Leiningen: nástroj pro správu projektů napsaných v Clojure (5)
http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-5/ - Leiningen: nástroj pro správu projektů napsaných v Clojure (6)
http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-6/ - Programovací jazyk Clojure a databáze (1.část)
http://www.root.cz/clanky/programovaci-jazyk-clojure-a-databaze-1-cast/ - Pluginy pro Leiningen
http://www.root.cz/clanky/leiningen-nastroj-pro-spravu-projektu-napsanych-v-clojure-pluginy-pro-leiningen/ - Programovací jazyk Clojure a knihovny pro práci s vektory a maticemi
http://www.root.cz/clanky/programovaci-jazyk-clojure-a-knihovny-pro-praci-s-vektory-a-maticemi/ - Programovací jazyk Clojure a knihovny pro práci s vektory a maticemi (2)
http://www.root.cz/clanky/programovaci-jazyk-clojure-a-knihovny-pro-praci-s-vektory-a-maticemi-2/ - Programovací jazyk Clojure: syntéza procedurálních textur s využitím knihovny Clisk
http://www.root.cz/clanky/programovaci-jazyk-clojure-synteza-proceduralnich-textur-s-vyuzitim-knihovny-clisk/ - Programovací jazyk Clojure: syntéza procedurálních textur s využitím knihovny Clisk (2)
http://www.root.cz/clanky/programovaci-jazyk-clojure-synteza-proceduralnich-textur-s-vyuzitim-knihovny-clisk-2/ - Seesaw: knihovna pro snadnou tvorbu GUI v jazyce Clojure
http://www.root.cz/clanky/seesaw-knihovna-pro-snadnou-tvorbu-gui-v-jazyce-clojure/ - Seesaw: knihovna pro snadnou tvorbu GUI v jazyce Clojure (2)
http://www.root.cz/clanky/seesaw-knihovna-pro-snadnou-tvorbu-gui-v-jazyce-clojure-2/ - Seesaw: knihovna pro snadnou tvorbu GUI v jazyce Clojure (3)
http://www.root.cz/clanky/seesaw-knihovna-pro-snadnou-tvorbu-gui-v-jazyce-clojure-3/ - Programovací jazyk Clojure a práce s Gitem
http://www.root.cz/clanky/programovaci-jazyk-clojure-a-prace-s-gitem/ - Programovací jazyk Clojure a práce s Gitem (2)
http://www.root.cz/clanky/programovaci-jazyk-clojure-a-prace-s-gitem-2/ - Programovací jazyk Clojure: syntéza procedurálních textur s využitím knihovny Clisk (dokončení)
http://www.root.cz/clanky/programovaci-jazyk-clojure-synteza-proceduralnich-textur-s-vyuzitim-knihovny-clisk-dokonceni/ - Pixie: lehký skriptovací jazyk s „kouzelnými“ schopnostmi
https://www.root.cz/clanky/pixie-lehky-skriptovaci-jazyk-s-kouzelnymi-schopnostmi/ - Programovací jazyk Pixie: funkce ze základní knihovny a použití FFI
https://www.root.cz/clanky/programovaci-jazyk-pixie-funkce-ze-zakladni-knihovny-a-pouziti-ffi/ - Novinky v Clojure verze 1.9.0
https://www.root.cz/clanky/novinky-v-clojure-verze-1–9–0/ - Validace dat s využitím knihovny spec v Clojure 1.9.0
https://www.root.cz/clanky/validace-dat-s-vyuzitim-knihovny-spec-v-clojure-1–9–0/ - Použití jazyka Gherkin při tvorbě testovacích scénářů pro aplikace psané v Clojure
https://www.root.cz/clanky/pouziti-jazyka-gherkin-pri-tvorbe-testovacich-scenaru-pro-aplikace-psane-v-nbsp-clojure/ - Použití jazyka Gherkin při tvorbě testovacích scénářů pro aplikace psané v Clojure (2)
https://www.root.cz/clanky/pouziti-jazyka-gherkin-pri-tvorbe-testovacich-scenaru-pro-aplikace-psane-v-nbsp-clojure-2/ - Incanter: prostředí pro statistické výpočty s grafickým výstupem založené na Clojure
https://www.root.cz/clanky/incanter-prostredi-pro-statisticke-vypocty-s-grafickym-vystupem-zalozene-na-clojure/ - Incanter: operace s maticemi
https://www.root.cz/clanky/incanter-operace-s-maticemi/ - Pixie: lehký skriptovací jazyk s „kouzelnými“ schopnostmi
https://www.root.cz/clanky/pixie-lehky-skriptovaci-jazyk-s-kouzelnymi-schopnostmi/ - Programovací jazyk Pixie: funkce ze základní knihovny a použití FFI
https://www.root.cz/clanky/programovaci-jazyk-pixie-funkce-ze-zakladni-knihovny-a-pouziti-ffi/ - Novinky v Clojure verze 1.9.0
https://www.root.cz/clanky/novinky-v-clojure-verze-1–9–0/ - Validace dat s využitím knihovny spec v Clojure 1.9.0
https://www.root.cz/clanky/validace-dat-s-vyuzitim-knihovny-spec-v-clojure-1–9–0/ - Použití jazyka Gherkin při tvorbě testovacích scénářů pro aplikace psané v Clojure
https://www.root.cz/clanky/pouziti-jazyka-gherkin-pri-tvorbe-testovacich-scenaru-pro-aplikace-psane-v-nbsp-clojure/ - Použití jazyka Gherkin při tvorbě testovacích scénářů pro aplikace psané v Clojure (2)
https://www.root.cz/clanky/pouziti-jazyka-gherkin-pri-tvorbe-testovacich-scenaru-pro-aplikace-psane-v-nbsp-clojure-2/ - Interpret programovacího jazyka Clojure integrovaný do Jupyter Notebooku
https://www.root.cz/clanky/interpret-programovaciho-jazyka-clojure-integrovany-do-jupyter-notebooku/ - Babashka: interpret Clojure určený pro rychlé spouštění utilit z příkazového řádku
https://www.root.cz/clanky/babashka-interpret-clojure-urceny-pro-rychle-spousteni-utilit-z-prikazoveho-radku/ - Pokročilý streaming založený na Apache Kafce, jazyku Clojure a knihovně Jackdaw
https://www.root.cz/clanky/pokrocily-streaming-zalozeny-na-apache-kafce-jazyku-clojure-a-knihovne-jackdaw/
20. Odkazy na Internetu
- ETL Batch Processing With Kafka?
https://medium.com/swlh/etl-batch-processing-with-kafka-7f66f843e20d - ETL with Kafka
https://blog.codecentric.de/en/2018/03/etl-kafka/ - On Track with Apache Kafka – Building a Streaming ETL Solution with Rail Data
https://www.confluent.io/blog/build-streaming-etl-solutions-with-kafka-and-rail-data/ - Kafka – Understanding Offset Commits
https://www.logicbig.com/tutorials/misc/kafka/committing-offsets.html - fundingcircle/jackdaw (na Clojars)
https://clojars.org/fundingcircle/jackdaw/versions/0.7.6 - Dokumentace ke knihovně jackdaw
https://cljdoc.org/d/fundingcircle/jackdaw/0.7.6/doc/readme - Jackdaw AdminClient API
https://cljdoc.org/d/fundingcircle/jackdaw/0.7.6/doc/jackdaw-adminclient-api - Jackdaw Client API
https://cljdoc.org/d/fundingcircle/jackdaw/0.7.6/doc/jackdaw-client-api - Kafka.clj
https://github.com/helins-io/kafka.clj - Použití nástroje Apache Kafka v aplikacích založených na mikroslužbách
https://www.root.cz/clanky/pouziti-nastroje-apache-kafka-v-aplikacich-zalozenych-na-mikrosluzbach/ - Apache Kafka: distribuovaná streamovací platforma
https://www.root.cz/clanky/apache-kafka-distribuovana-streamovaci-platforma/ - Real-Time Payments with Clojure and Apache Kafka (podcast)
https://www.evidentsystems.com/news/confluent-podcast-about-apache-kafka/ - Kafka and Clojure – Immutable event streams
https://practicalli.github.io/kafka-and-clojure/ - Kafka Streams, the Clojure way
https://blog.davemartin.me/posts/kafka-streams-the-clojure-way/ - dvlopt.kafka na GitHubu
https://github.com/helins-io/kafka.clj - kafka-streams-the-clojure-way na GitHubu
https://github.com/DaveWM/kafka-streams-the-clojure-way - babashka: A Clojure babushka for the grey areas of Bash
https://github.com/borkdude/babashka - Babashka and the Small Clojure Interpreter @ ClojureD 2020 (slajdy)
https://speakerdeck.com/borkdude/babashka-and-the-small-clojure-interpreter-at-clojured-2020 - Babashka: ukázky použití
https://github.com/borkdude/babashka/blob/master/doc/examples.md - clojureD 2020: „Babashka and Small Clojure Interpreter: Clojure in new contexts“ by Michiel Borkent
https://www.youtube.com/watch?v=Nw8aN-nrdEk&t=5s - Meetup #124 Babashka, implementing an nREPL server & game engines with Clojure
https://www.youtube.com/watch?v=0YmZYnwyHHc - The Last Programming Language (shrnutí vývoje programovacích jazyků)
https://www.youtube.com/watch?v=P2yr-3F6PQo - Shebang (Unix): Wikipedia EN
https://en.wikipedia.org/wiki/Shebang_(Unix) - Shebang (Unix): Wikipedia CZ
https://cs.wikipedia.org/wiki/Shebang_(Unix) - How to create Clojure notebooks in Jupyter
https://s01blog.wordpress.com/2017/12/10/how-to-create-clojure-notebooks-in-jupyter/ - Dokumentace k nástroji Conda
https://docs.conda.io/en/latest/ - Notebook interface
https://en.wikipedia.org/wiki/Notebook_interface - Jypyter: open source, interactive data science and scientific computing across over 40 programming languages
https://jupyter.org/ - Calysto Scheme
https://github.com/Calysto/calysto_scheme - scheme.py (základ projektu Calysto Scheme)
https://github.com/Calysto/calysto_scheme/blob/master/calysto_scheme/scheme.py - Humane test output for clojure.test
https://github.com/pjstadig/humane-test-output - iota
https://github.com/juxt/iota - 5 Differences between clojure.spec and Schema
https://lispcast.com/clojure.spec-vs-schema/ - Schema: Clojure(Script) library for declarative data description and validation
https://github.com/plumatic/schema - Zip archiv s Clojure 1.9.0
http://repo1.maven.org/maven2/org/clojure/clojure/1.9.0/clojure-1.9.0.zip - Clojure 1.9 is now available
https://clojure.org/news/2017/12/08/clojure19 - Deps and CLI Guide
https://clojure.org/guides/deps_and_cli - Changes to Clojure in Version 1.9
https://github.com/clojure/clojure/blob/master/changes.md - clojure.spec – Rationale and Overview
https://clojure.org/about/spec - Zip archiv s Clojure 1.8.0
http://repo1.maven.org/maven2/org/clojure/clojure/1.8.0/clojure-1.8.0.zip - Clojure 1.8 is now available
http://clojure.org/news/2016/01/19/clojure18 - Socket Server REPL
http://dev.clojure.org/display/design/Socket+Server+REPL - CLJ-1671: Clojure socket server
http://dev.clojure.org/jira/browse/CLJ-1671 - CLJ-1449: Add clojure.string functions for portability to ClojureScript
http://dev.clojure.org/jira/browse/CLJ-1449 - Launching a Socket Server
http://clojure.org/reference/repl_and_main#_launching_a_socket_server - API for clojure.string
http://clojure.github.io/clojure/branch-master/clojure.string-api.html - Clojars:
https://clojars.org/ - Seznam knihoven na Clojars:
https://clojars.org/projects - Clojure Cookbook: Templating HTML with Enlive
https://github.com/clojure-cookbook/clojure-cookbook/blob/master/07_webapps/7–11_enlive.asciidoc - An Introduction to Enlive
https://github.com/swannodette/enlive-tutorial/ - Enlive na GitHubu
https://github.com/cgrand/enlive - Expectations: příklady atd.
http://jayfields.com/expectations/ - Expectations na GitHubu
https://github.com/jaycfields/expectations - Lein-expectations na GitHubu
https://github.com/gar3thjon3s/lein-expectations - Testing Clojure With Expectations
https://semaphoreci.com/blog/2014/09/23/testing-clojure-with-expectations.html - Clojure testing TDD/BDD libraries: clojure.test vs Midje vs Expectations vs Speclj
https://www.reddit.com/r/Clojure/comments/1viilt/clojure_testing_tddbdd_libraries_clojuretest_vs/ - Testing: One assertion per test
http://blog.jayfields.com/2007/06/testing-one-assertion-per-test.html - Rewriting Your Test Suite in Clojure in 24 hours
http://blog.circleci.com/rewriting-your-test-suite-in-clojure-in-24-hours/ - Clojure doc: zipper
http://clojuredocs.org/clojure.zip/zipper - Clojure doc: parse
http://clojuredocs.org/clojure.xml/parse - Clojure doc: xml-zip
http://clojuredocs.org/clojure.zip/xml-zip - Clojure doc: xml-seq
http://clojuredocs.org/clojure.core/xml-seq - Parsing XML in Clojure
https://github.com/clojuredocs/guides - Clojure Zipper Over Nested Vector
https://vitalyper.wordpress.com/2010/11/23/clojure-zipper-over-nested-vector/ - Understanding Clojure's PersistentVector implementation
http://blog.higher-order.net/2009/02/01/understanding-clojures-persistentvector-implementation - Understanding Clojure's PersistentHashMap (deftwice…)
http://blog.higher-order.net/2009/09/08/understanding-clojures-persistenthashmap-deftwice.html - Assoc and Clojure's PersistentHashMap: part ii
http://blog.higher-order.net/2010/08/16/assoc-and-clojures-persistenthashmap-part-ii.html - Ideal Hashtrees (paper)
http://lampwww.epfl.ch/papers/idealhashtrees.pdf - Clojure home page
http://clojure.org/ - Clojure (downloads)
http://clojure.org/downloads - Clojure Sequences
http://clojure.org/sequences - Clojure Data Structures
http://clojure.org/data_structures - The Structure and Interpretation of Computer Programs: 2.2.1 Representing Sequences
http://mitpress.mit.edu/sicp/full-text/book/book-Z-H-15.html#%_sec2.2.1 - The Structure and Interpretation of Computer Programs: 3.3.1 Mutable List Structure
http://mitpress.mit.edu/sicp/full-text/book/book-Z-H-22.html#%_sec3.3.1 - Clojure – Functional Programming for the JVM
http://java.ociweb.com/mark/clojure/article.html - Clojure quick reference
http://faustus.webatu.com/clj-quick-ref.html - 4Clojure
http://www.4clojure.com/ - ClojureDoc (rozcestník s dokumentací jazyka Clojure)
http://clojuredocs.org/ - Clojure (na Wikipedia EN)
http://en.wikipedia.org/wiki/Clojure - Clojure (na Wikipedia CS)
http://cs.wikipedia.org/wiki/Clojure - SICP (The Structure and Interpretation of Computer Programs)
http://mitpress.mit.edu/sicp/ - Pure function
http://en.wikipedia.org/wiki/Pure_function - Funkcionální programování
http://cs.wikipedia.org/wiki/Funkcionální_programování - Čistě funkcionální (datové struktury, jazyky, programování)
http://cs.wikipedia.org/wiki/Čistě_funkcionální - Clojure Macro Tutorial (Part I, Getting the Compiler to Write Your Code For You)
http://www.learningclojure.com/2010/09/clojure-macro-tutorial-part-i-getting.html - Clojure Macro Tutorial (Part II: The Compiler Strikes Back)
http://www.learningclojure.com/2010/09/clojure-macro-tutorial-part-ii-compiler.html - Clojure Macro Tutorial (Part III: Syntax Quote)
http://www.learningclojure.com/2010/09/clojure-macro-tutorial-part-ii-syntax.html - Tech behind Tech: Clojure Macros Simplified
http://techbehindtech.com/2010/09/28/clojure-macros-simplified/ - Fatvat – Exploring functional programming: Clojure Macros
http://www.fatvat.co.uk/2009/02/clojure-macros.html - Eulerovo číslo
http://cs.wikipedia.org/wiki/Eulerovo_číslo - List comprehension
http://en.wikipedia.org/wiki/List_comprehension - List Comprehensions in Clojure
http://asymmetrical-view.com/2008/11/18/list-comprehensions-in-clojure.html - Clojure Programming Concepts: List Comprehension
http://en.wikibooks.org/wiki/Clojure_Programming/Concepts#List_Comprehension - Clojure core API: for macro
http://clojure.github.com/clojure/clojure.core-api.html#clojure.core/for - cirrus machina – The Clojure for macro
http://www.cirrusmachina.com/blog/comment/the-clojure-for-macro/ - Riastradh's Lisp Style Rules
http://mumble.net/~campbell/scheme/style.txt - Dynamic Languages Strike Back
http://steve-yegge.blogspot.cz/2008/05/dynamic-languages-strike-back.html - Scripting: Higher Level Programming for the 21st Century
http://www.tcl.tk/doc/scripting.html - Java Virtual Machine Support for Non-Java Languages
http://docs.oracle.com/javase/7/docs/technotes/guides/vm/multiple-language-support.html - Třída java.lang.String
http://docs.oracle.com/javase/7/docs/api/java/lang/String.html - Třída java.lang.StringBuffer
http://docs.oracle.com/javase/7/docs/api/java/lang/StringBuffer.html - Třída java.lang.StringBuilder
http://docs.oracle.com/javase/7/docs/api/java/lang/StringBuilder.html - StringBuffer versus String
http://www.javaworld.com/article/2076072/build-ci-sdlc/stringbuffer-versus-string.html - Threading macro (dokumentace k jazyku Clojure)
https://clojure.github.io/clojure/clojure.core-api.html#clojure.core/-> - Understanding the Clojure → macro
http://blog.fogus.me/2009/09/04/understanding-the-clojure-macro/ - clojure.inspector
http://clojure.github.io/clojure/clojure.inspector-api.html - The Clojure Toolbox
http://www.clojure-toolbox.com/ - Unit Testing in Clojure
http://nakkaya.com/2009/11/18/unit-testing-in-clojure/ - Testing in Clojure (Part-1: Unit testing)
http://blog.knoldus.com/2014/03/22/testing-in-clojure-part-1-unit-testing/ - API for clojure.test – Clojure v1.6 (stable)
https://clojure.github.io/clojure/clojure.test-api.html - Leiningen: úvodní stránka
http://leiningen.org/ - Leiningen: Git repository
https://github.com/technomancy/leiningen - leiningen-win-installer
http://leiningen-win-installer.djpowell.net/ - Clojure.org: Vars and the Global Environment
http://clojure.org/Vars - Clojure.org: Refs and Transactions
http://clojure.org/Refs - Clojure.org: Atoms
http://clojure.org/Atoms - Clojure.org: Agents as Asynchronous Actions
http://clojure.org/agents - Transient Data Structures
http://clojure.org/transients