Obsah
1. Durable Queue aneb implementace front zpráv bez použití klasického message brokera
2. Rozdíly mezi Durable Queue a dalšími implementacemi front zpráv
3. Instalace knihovny Durable Queue
4. Kostra aplikace používající knihovnu Durable Queue
5. Vložení zprávy do fronty, přečtení zprávy a změna stavu zprávy
6. První demonstrační příklad: inicializace knihovny Durable Queue
7. Druhý demonstrační příklad: použití funkcí put! a take!
8. Třetí demonstrační příklad: použití dvojice front, blokující čtení zpráv
9. Čtvrtý demonstrační příklad: potvrzení zpracování zpráv funkcí complete!
10. Informace o tom, že zpráva nebyla zpracována a má být znovu vložena do fronty: funkce retry!
11. Omezení kapacity fronty na zadaný maximální počet zpráv
12. Implementace jednoduchého workera běžícího v samostatném vlákně
14. Speciální zpráva určená pro ukončení práce workera
16. Složitější příklad s větším počtem paralelně pracujících workerů
18. Message broker RabbitMQ a programovací jazyk Clojure
19. Repositář s demonstračními příklady
1. Durable Queue aneb implementace front zpráv bez použití klasického message brokera
V dnešní části seriálu o message brokerech se seznámíme se zajímavou knihovnou nazvanou Durable Queue. Jedná se o knihovnu určenou zejména pro ty vývojáře, kteří pracují s programovacím jazykem Clojure. Tato knihovna vývojářům nabízí možnost použití front zpráv v rámci jedné aplikace (přesněji řečeno v rámci jednoho běžícího procesu), ovšem s tím velmi důležitým dodatkem, že fronty (a pochopitelně tedy i jejich obsah – zprávy a jejich stav) bez problémů přežijí i případný restart či pád aplikace, protože jejich obsah je uložen a vhodným způsobem synchronizován na souborovém systému v nakonfigurovaném adresáři. Navíc je možné specifikovat i způsob synchronizace obsahu front s obsahem těchto souborů, takže si uživatel může zvolit očekávané chování aplikace na škále „nejrychlejší“–„nejbezpečnější“.
V dalším textu se nejprve budeme zabývat popisem možností knihovny Durable Queue a posléze si připomeneme i některé základní možnosti nabízené knihovnou core.anync a taktéž způsobem propojení programů psaných v jazyku Clojure s message brokerem RabbitMQ s využitím knihovny Langohr.
2. Rozdíly mezi Durable Queue a dalšími implementacemi front zpráv
V případě, že budeme chtít použít nějakou formu fronty zpráv popř. fronty úloh v aplikacích psaných v programovacím jazyku Clojure, máme v principu k dispozici čtyři různé způsoby, které se od sebe odlišují především tím, zda jsou fronty přímo součástí aplikace, zda „přežijí“ restart aplikace a zda jsou popř. fronty řešeny samostatným message brokerem (který navíc může běžet na jiném počítači):
- První způsob spočívá v použití fronty implementované přímo nějakým základním datovým typem programovacího jazyka Clojure. Taková fronta je potom sdílena mezi jednotlivými vlákny aplikace (přičemž typicky některá vlákna implementují producenty zpráv a další vlákna konzumenty). Jedná se o nejjednodušší řešení, protože není nutné instalovat a nastavovat message brokera, fronty jsou uloženy jen v operační paměti, takže nenastávají problémy s právy v souborovém systému atd. Navíc se pochopitelně jedná o řešení velmi rychlé. Ovšem i nevýhody jsou zřejmé – celé škálování se provádí v rámci jednoho procesu a případnou persistenci dat musí explicitně zajistit vývojář (a to není zdaleka tak jednoduché, jak by se možná na první pohled mohlo zdát).
- Explicitní použití fronty popsané v předchozím bodu je podle mého názoru zbytečně složité a může vést k potenciálním problémům při sdílení dat, odstraňování zpráv z front atd. atd. Proto může být v případě, že fronty budeme potřebovat použít pouze v rámci jednoho procesu, lepší použít funkce a makra nabízené standardní knihovnou nazvanoucore.async, která programátorům nabízí podobnou funkcionalitu, jako gorutiny a kanály v programovacím jazyku Go. Toto řešení může být pro většinu řešených problémů takřka ideální (navíc i zhruba stejně rychlé), ovšem s jedním dodatkem – o případnou persistenci front a zpráv v nich uložených se opět musí nějakým vhodným způsobem postaral vývojář aplikace, což může vést k potenciálním problémům.
- Třetí možnost pravděpodobně využijí především ti programátoři, kteří sice potřebují použít fronty v rámci jednoho procesu, ovšem s tím, že by zprávy měly přežít i případný restart aplikace. Jinými slovy – obsah front musí být vhodným způsobem uložen na souborovém systému. Tato možnost je implementována v projektu (knihovně) Durable Queue, který si popíšeme v dnešním článku. Již na tomto místě je však vhodné upozornit na to, že se nejedná o implementaci klasického message brokera, ale jen jeho lokální varianty použitelné pro jednu aplikaci (ovšem naopak tato aplikace může sama o sobě být message brokerem, pokud je to zapotřebí).
- Samozřejmě ale můžeme použít i běžné message brokery, které se o správu front postarají samy. Aplikace psaná v Clojure se k těmto brokerům většinou připojí s využitím nějakého standardního protokolu, který se v této oblasti prosadil. Může se jednat například o protokol STOMP, MQTT či AMQP (různé verze). S tímto konceptem jsme se ostatně již jednou setkali, a to konkrétně při popisu message brokera RabbitMQ a knihovny Langohr určené právě pro programovací jazyk Clojure. Ovšem ve skutečnosti jsou možnosti Clojure (podobně jako i dalších programovacích jazyků) mnohem větší, protože můžeme použít ty message brokery, které jsou dodávány/nabízeny jako služba. Příkladem může být SQS (Simple Queue Service), které je součástí AWS (Amazon Web Services) či IBM MQ. Toto řešení je z pohledu architektury aplikace většinou nejlepší, může dobře škálovat, ovšem pro některé úlohy bude zbytečně pomalé.
3. Instalace knihovny Durable Queue
Popišme si nyní způsob instalace knihovny Durable Queue.
Na úplném začátku si připravíme kostru projektu, který bude představovat nový projekt. Pro vytvoření této kostry použijeme Leiningen. Kostra projektu se při použití Leiningenu vytvoří příkazem:
$ lein new app durable-queue-1 Generating a project called durable-queue-1 based on the 'app' template.
Výsledkem tohoto příkazu by měla být následující adresářová struktura (obsah se může nepatrně odlišovat podle verze Leiningenu, ovšem samotná kostra bude stejná):
. └── durable-queue-1 ├── doc │ └── intro.md ├── LICENSE ├── project.clj ├── README.md ├── resources ├── src │ └── durable_queue_1 │ └── core.clj └── test └── durable_queue_1 └── core_test.clj
V dalším kroku přistoupíme k úpravám projektového souboru project.clj. Po vytvoření nového projektu by projektový soubor měl vypadat přibližně takto (pouze si pro jistotu zkontrolujte verzi interpretru jazyka Clojure; minimální požadovaná verze je 1.8.0):
(defproject durable-queue-1 "0.1.0-SNAPSHOT" :description "FIXME: write description" :url "http://example.com/FIXME" :license {:name "Eclipse Public License" :url "http://www.eclipse.org/legal/epl-v10.html"} :dependencies [[org.clojure/clojure "1.8.0"]] :main ^:skip-aot durable-queue-1.core :target-path "target/%s" :profiles {:uberjar {:aot :all}})
Úprava projektového souboru spočívá v přidání informace o tom, že se v projektu bude používat knihovna durable-queue verze 1.8.0:
(defproject durable-queue-1 "0.1.0-SNAPSHOT" :description "FIXME: write description" :url "http://example.com/FIXME" :license {:name "Eclipse Public License" :url "http://www.eclipse.org/legal/epl-v10.html"} :dependencies [[org.clojure/clojure "1.8.0"] [factual/durable-queue "0.1.5"]] :main ^:skip-aot durable-queue-1.core :target-path "target/%s" :profiles {:uberjar {:aot :all}})
Posledním krokem konfigurace projektu je spuštění příkazu:
$ lein deps Retrieving factual/durable-queue/0.1.5/durable-queue-0.1.5.pom from clojars Retrieving factual/durable-queue/0.1.5/durable-queue-0.1.5.jar from clojars
Tento příkaz zajistí, že se do adresáře ~/.m2/ stáhnou všechny potřebné knihovny, tj. jak durable-queue, tak i její závislosti.
4. Kostra aplikace používající knihovnu Durable Queue
Ukažme si tedy kostru aplikace, která bude používat frontu, jejíž obsah bude uložen v souboru umístěném v adresáři „/tmp“ (můžete si pochopitelně zvolit i odlišné umístění, teoreticky i na síťovém souborovém systému). Nejprve je nutné načíst všechny funkce, makra a symboly z balíčku nazvaného durable-queue. Pro jednoduchost jména všech objektů vložíme přímo do aktivního jmenného prostoru, takže nebudeme muset používat prefix (neboli namísto durable-queue/stats zavoláme přímo a jednoduše stats):
(require '[durable-queue :refer :all])
Dále musíme knihovnu durable-queue inicializovat a určit, ve kterém adresáři budou uloženy soubory obsahující prvky jednotlivých front. Výsledkem úspěšné inicializace provedené funkcí queues je struktura, kterou (zde pro jednoduchost) navážeme na globální symbol q (samozřejmě si opět můžete zvolit libovolné jiné platné jméno):
(def q (queues "/tmp" {}))
(defn test [] (let [q (queues "/tmp" {})] ... ... ...))
5. Vložení zprávy do fronty, přečtení zprávy a změna stavu zprávy
Ve chvíli, kdy je knihovna durable-queue inicializována, můžeme se začít dotazovat na stav front, posílat zprávy do fronty (operace typu enqueue), vybírat zprávy z fronty (operace typu dequeue) apod. Používají se přitom tyto základní funkce, které jsou všechny definovány v rámci jmenného prostoru durable-queue:
# | Funkce | Stručný popis funkce |
---|---|---|
1 | stats | vrátí informace o stavu všech front (ve formě mapy) |
2 | put! | pošle do vybrané fronty zprávu |
3 | take! | získá z vybrané fronty zprávu (blokující či neblokující operace) |
Pro posílání zpráv, které je zajištěno funkcí put!, není zapotřebí fronty explicitně vytvářet – fronta (a k ní příslušející soubor nebo soubory) se automaticky vytvoří společně s první zprávou, která je do ní poslána. Můžeme tedy psát:
(put! q :queue-1 "zpráva")
Funkci put! jsme předali jak strukturu se stavem všech front (q), tak i jméno fronty (:queue-1) a vlastní zprávu („zpráva“). Tělo zprávy může být reprezentováno prakticky jakoukoli datovou strukturou programovacího jazyka Clojure; my ovšem pro jednoduchost použijeme buď řetězec nebo v pozdějších demonstračních příkladech raději takzvaný keyword.
Podobným způsobem lze zprávu z vybrané fronty přečíst:
(take! q :queue-1 1000 :timed-out)
Funkce take! může být buď blokující popř. může po určitém definovaném čase (timeout) vrátit programátorem specifikovanou hodnotu. První případ může být vhodný pro implementaci klasických workerů, ovšem v některých situacích není vhodné blokovat workera pro (teoreticky) nekonečný čas a proto lze funkci take! zavolat a předat jí jak maximální čas čekání specifikovaný v milisekundách, tak i hodnotu, která se vrátí v případě, že je tento čas překročen:
(take! q :queue-1 1000 :timed-out)
Předchozí volání vrátí buď hodnotu přečtenou z fronty :queue-1 nebo po jedné sekundě hodnotu :timed-out.
Taktéž se můžeme kdykoli dotázat na stav jednotlivých front, a to s využitím funkce stats:
(stats q)
Výsledkem je mapa, která vypadá takto:
{:enqueued 0, :retried 0, :completed 0, :in-progress 0, :num-slabs 1, :num-active-slabs 1}
Jednotlivé prvky v mapě mají následující význam:
# | Prvek | Stručný popis prvku |
---|---|---|
1 | :enqueued | počet zpráv vložených do fronty |
2 | :retried | počet zpráv, jejichž doručení se muselo zopakovat |
3 | :completed | počet přečtených a dokončených zpráv |
4 | :in-progress | počet zpráv nacházejících se ve stavu zpracování |
5 | :num-slabs | počet souborů s uložením stavu fronty (ve výchozím nastavení jeden) |
6 | :num-active-slabs | počet aktivních souborů |
Výsledek volání funkce take! je struktura představující získanou zprávu. Samotný text zprávy (nebo libovolná data, které byly součástí zprávy) se získají standardní funkcí defer. Ovšem ve chvíli, kdy je zpráva z fronty získána, je nutné (kdykoli později) systém informovat, zda zpráva byla zpracována či zda naopak došlo k nějaké chybě a je nutné zprávu zpracovat znovu. K tomu slouží dvojice funkcí:
# | Funkce | Stručný popis funkce |
---|---|---|
1 | complete! | dokončení zpracování zprávy |
2 | retry! | zpracování se nepovedlo, zprávu je nutno doručit znovu |
V případě, že se zavolá funkce retry!, bude zpráva znovu vložena do fronty, a to na její konec (tj. nebude zpracována hned, ale obecně mnohem později, pochopitelně v závislosti na počtu dalších zpráv ve frontě).
6. První demonstrační příklad: inicializace knihovny Durable Queue
Dnešní první demonstrační příklad, který naleznete na adrese https://github.com/tisnik/message-queues-examples/tree/master/durable-queue/durable-queue-1, je velmi primitivní. Pouze v něm inicializujeme knihovnu durable-queue, nastavíme pracovní adresář, do kterého se budou ukládat soubory s prvky fronty a nakonec zavoláme výše popsanou funkci stats, která vypíše informace o stavu všech front.
(ns durable-queue-1.core (:gen-class)) (require '[durable-queue :refer :all]) (defn -main [& args] (let [q (queues "/tmp" {})] (println q) (println (stats q))))
Výsledek činnosti tohoto příkladu bude vypadat následovně:
#object[durable_queue$queues$reify__7083 0x6afd9125 durable_queue$queues$reify__7083@6afd9125] {}
Můžeme vidět, že se nejdříve vypíše informace o struktuře nesoucí všechny stavové informace o frontách (interní struktura je pro nás nezajímavá) a na druhém řádku se vypíše prázdná mapa. Pokud by již nějaká fronta či fronty byly vytvořeny, samozřejmě by se v této mapě zobrazily všechny získané informace (viz další příklad).
7. Druhý demonstrační příklad: použití funkcí put! a take!
Druhý demonstrační příklad, který lze nalézt na adrese https://github.com/tisnik/message-queues-examples/tree/master/durable-queue/durable-queue-2, již provádí větší množství operací, než příklad první. Po inicializaci pošleme dvě zprávy, přičemž obě zprávy budou poslány do stejné fronty pojmenované pro jednoduchost :queue-1:
(put! q :queue-1 "task #A") (put! q :queue-1 "task #B")
Následně jsou obě zprávy přečteny a tím pádem odstraněny z fronty:
(-> (take! q :queue-1) println) (-> (take! q :queue-1) println)
Po každé operaci se vypíšou informace o stavu všech front, a to opět pomocí nám již známé funkce stats:
(pprint (stats q))
Úplný zdrojový kód dnešního druhého demonstračního příkladu vypadá následovně:
(ns durable-queue-2.core (:gen-class)) (require '[durable-queue :refer :all]) (require '[clojure.pprint :refer :all]) (defn -main [& args] (let [q (queues "/tmp" {})] (pprint (stats q)) (put! q :queue-1 "task #A") (pprint (stats q)) (put! q :queue-1 "task #B") (pprint (stats q)) (-> (take! q :queue-1) println) (pprint (stats q)) (-> (take! q :queue-1) println) (pprint (stats q))
S výsledky:
{} {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 1, :retried 0, :completed 0, :in-progress 0}} {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 2, :retried 0, :completed 0, :in-progress 0}} < :in-progress | task #A > {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 2, :retried 0, :completed 0, :in-progress 1}} < :in-progress | task #B > {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 2, :retried 0, :completed 0, :in-progress 2}}
8. Třetí demonstrační příklad: použití dvojice front, blokující čtení zpráv
Ve třetím demonstračním příkladu použijeme dvojici front pojmenovaných :queue-1 a :queue-2. Do první z těchto front jsou vloženy dvě zprávy, do druhé fronty však pouze zpráva jediná. Ovšem posléze se budeme snažit přečíst z každé fronty dvě zprávy. To v důsledku znamená, že čtení ze druhé fronty celý proces zablokuje, což uvidíme po spuštění příkladu:
(ns durable-queue-3.core (:gen-class)) (require '[durable-queue :refer :all]) (require '[clojure.pprint :refer :all]) (defn -main [& args] (let [q (queues "/tmp" {})] (pprint (stats q)) (put! q :queue-1 "task #A") (put! q :queue-1 "task #B") (put! q :queue-2 "task #C") (pprint (stats q)) (println "Získávám dvě zprávy z front queue-1 a queue-2") (-> (take! q :queue-1) println) (-> (take! q :queue-2) println) (pprint (stats q)) (println "Získávám další dvě zprávy z front queue-1 a queue-2") (-> (take! q :queue-1) println) (-> (take! q :queue-2) println) (pprint (stats q))))
Po spuštění tohoto demonstračního příkladu se postupně začne vypisovat stav jednotlivých front, ovšem na předposledním příkazu se příklad „zasekne“, protože jsme použili blokující čtení ve chvíli, kdy je druhá fronta prázdná a nikdo do ní již další prvky (zprávy) nevloží:
{} {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 2, :retried 0, :completed 0, :in-progress 0}, "queue_2" {:num-slabs 1, :num-active-slabs 1, :enqueued 1, :retried 0, :completed 0, :in-progress 0}} Získávám dvě zprávy z front queue-1 a queue-2 < :in-progress | task #A > < :in-progress | task #C > {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 2, :retried 0, :completed 0, :in-progress 1}, "queue_2" {:num-slabs 1, :num-active-slabs 1, :enqueued 1, :retried 0, :completed 0, :in-progress 1}} Získávám další dvě zprávy z front queue-1 a queue-2 < :in-progress | task #B >
9. Čtvrtý demonstrační příklad: potvrzení zpracování zpráv funkcí complete!
Čtvrtý příklad již vychází z reálných projektů. Je v něm definována funkce nazvaná deque-and-complete-message, která nejprve načte zprávu ze zvolené fronty a následně potvrdí přijetí a zpracování zprávy funkcí complete!:
(defn deque-and-complete-message [q queue-name] (let [message (take! q queue-name)] (println "Message dequeued" message) (complete! message) (println "Message completed" message)))
Po spuštění tohoto příkladu si povšimněte, jak se postupně mění obsah čítačů :completed u obou sledovaných front:
{"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 2, :retried 0, :completed 0, :in-progress 0}, "queue_2" {:num-slabs 1, :num-active-slabs 1, :enqueued 1, :retried 0, :completed 0, :in-progress 0}} Získávám a kompletuji dvě zprávy z front queue-1 a queue-2 Message dequeued < :in-progress | task #A > Message completed < :complete | task #A > Message dequeued < :in-progress | task #C > Message completed < :complete | task #C > {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 2, :retried 0, :completed 1, :in-progress 0}, "queue_2" {:num-slabs 1, :num-active-slabs 1, :enqueued 1, :retried 0, :completed 1, :in-progress 0}} Získávám a kompletuji další dvě zprávy z front queue-1 a queue-2 Message dequeued < :in-progress | task #B > Message completed < :complete | task #B >
Následuje výpis kompletního zdrojového kódu tohoto demonstračního příkladu:
(ns durable-queue-4.core (:gen-class)) (require '[durable-queue :refer :all]) (require '[clojure.pprint :refer :all]) (defn deque-and-complete-message [q queue-name] (let [message (take! q queue-name)] (println "Message dequeued" message) (complete! message) (println "Message completed" message))) (defn -main [& args] (let [q (queues "/tmp" {})] (pprint (stats q)) (put! q :queue-1 "task #A") (put! q :queue-1 "task #B") (put! q :queue-2 "task #C") (pprint (stats q)) (println "Získávám a kompletuji dvě zprávy z front queue-1 a queue-2") (deque-and-complete-message q :queue-1) (deque-and-complete-message q :queue-2) (pprint (stats q)) (println "Získávám a kompletuji další dvě zprávy z front queue-1 a queue-2") (deque-and-complete-message q :queue-1) (deque-and-complete-message q :queue-2) (pprint (stats q))))
10. Informace o tom, že zpráva nebyla zpracována a má být znovu vložena do fronty: funkce retry!
Pátý příklad se podobá příkladu předchozímu, ovšem namísto potvrzení zpracování zprávy funkcí complete! si naopak vyžádáme znovuposlání a tím pádem i opakované zpracování zprávy. Pro tento účel slouží funkce retry!:
(defn deque-and-retry-message [q queue-name] (let [message (take! q queue-name)] (println "Message dequeued" message) (retry! message) (println "Message completed" message)))
Ze záznamu činnosti tohoto příkladu po spuštění si povšimněte, jak postupně roste hodnota v čítačích :retried a nikoli :completed:
{} {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 2, :retried 0, :completed 0, :in-progress 0}, "queue_2" {:num-slabs 1, :num-active-slabs 1, :enqueued 1, :retried 0, :completed 0, :in-progress 0}} Získávám dvě zprávy z front queue-1 a queue-2, které budou uloženy zpět Message dequeued < :in-progress | task #A > Message completed < :incomplete | task #A > Message dequeued < :in-progress | task #C > Message completed < :incomplete | task #C > {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 2, :retried 1, :completed 0, :in-progress 0}, "queue_2" {:num-slabs 1, :num-active-slabs 1, :enqueued 1, :retried 1, :completed 0, :in-progress 0}} Získávám další dvě zprávy z front queue-1 a queue-2, které budou uloženy zpět Message dequeued < :in-progress | task #B > Message completed < :incomplete | task #B > Message dequeued < :in-progress | task #C > Message completed < :incomplete | task #C > {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 2, :retried 2, :completed 0, :in-progress 0}, "queue_2" {:num-slabs 1, :num-active-slabs 1, :enqueued 1, :retried 2, :completed 0, :in-progress 0}}
Opět si ukažme úplný zdrojový kód tohoto příkladu:
(ns durable-queue-5.core (:gen-class)) (require '[durable-queue :refer :all]) (require '[clojure.pprint :refer :all]) (defn deque-and-retry-message [q queue-name] (let [message (take! q queue-name)] (println "Message dequeued" message) (retry! message) (println "Message completed" message))) (defn -main [& args] (let [q (queues "/tmp" {})] (pprint (stats q)) (put! q :queue-1 "task #A") (put! q :queue-1 "task #B") (put! q :queue-2 "task #C") (pprint (stats q)) (println "Získávám dvě zprávy z front queue-1 a queue-2, které budou uloženy zpět") (deque-and-retry-message q :queue-1) (deque-and-retry-message q :queue-2) (pprint (stats q)) (println "Získávám další dvě zprávy z front queue-1 a queue-2, které budou uloženy zpět") (deque-and-retry-message q :queue-1) (deque-and-retry-message q :queue-2) (pprint (stats q))))
11. Omezení kapacity fronty na zadaný maximální počet zpráv
Při inicializaci knihovny Durable Queue můžeme mj. specifikovat i maximální počet zpráv, které je možné uložit do front(y). V takovém okamžiku se kromě funkce take! stane i funkce put! blokující, tj. její provádění se zastaví ve chvíli, kdy je fronta plná a funkce se dokončí až tehdy, pokud se z fronty přečte alespoň jeden prvek:
(let [q (queues "/tmp" {:max-queue-size 10})] ... ... ...)
Pokud se budeme snažit do takové fronty (s kapacitou deseti zpráv) vložit dvacet zpráv, zápis se zablokuje po desáté zprávě:
{} {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 1, :retried 0, :completed 0, :in-progress 0}} {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 2, :retried 0, :completed 0, :in-progress 0}} {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 3, :retried 0, :completed 0, :in-progress 0}} {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 4, :retried 0, :completed 0, :in-progress 0}} {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 5, :retried 0, :completed 0, :in-progress 0}} {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 6, :retried 0, :completed 0, :in-progress 0}} {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 7, :retried 0, :completed 0, :in-progress 0}} {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 8, :retried 0, :completed 0, :in-progress 0}} {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 9, :retried 0, :completed 0, :in-progress 0}} {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 10, :retried 0, :completed 0, :in-progress 0}}
Úplný zdrojový kód tohoto demonstračního příkladu vypadá následovně:
(ns durable-queue-6.core (:gen-class)) (require '[durable-queue :refer :all]) (require '[clojure.pprint :refer :all]) (defn deque-and-retry-message [q queue-name] (let [message (take! q queue-name)] (println "Message dequeued" message) (retry! message) (println "Message completed" message))) (defn -main [& args] (let [q (queues "/tmp" {:max-queue-size 10})] (pprint (stats q)) (doseq [i (range 20)] (put! q :queue-1 (str "task #" i)) (pprint (stats q)))))
12. Implementace jednoduchého workera běžícího v samostatném vlákně
Podívejme se nyní na způsob implementace jednoduchého workera. Může se jednat o funkci, která postupně vybírá zprávy z vybrané fronty a nějak je zpracovává. V našem případě bude zpracování simulováno voláním funkce (sleep 2000):
(defn worker [q queue-name] (println "Worker started") (while true (let [message (take! q queue-name)] (println "Worker received message" (deref message)) (complete! message) (sleep 2000) (println "Worker completed message" (deref message)))))
Workera spustíme v samostatném vláknu a pochopitelně mu musíme předat jak strukturu q, tak i jméno fronty, z níž má vybírat zprávy:
(.start (Thread. (fn [] (worker q :queue-1))))
V hlavním vláknu naopak budeme vytvářet zprávy, ovšem s vyšší frekvencí – přibližně jednu zprávu za sekundu:
(doseq [i (range 10)] (println "Enqueuing task #" i) (put! q :queue-1 (str "task #" i)) (pprint (stats q)) (sleep 1000))))
Úplný zdrojový kód příkladu s workerem může vypadat následovně:
(ns durable-queue-7.core (:gen-class)) (require '[durable-queue :refer :all]) (require '[clojure.pprint :refer :all]) (defn sleep [amount] (Thread/sleep amount)) (defn worker [q queue-name] (println "Worker started") (while true (let [message (take! q queue-name)] (println "Worker received message" (deref message)) (complete! message) (sleep 2000) (println "Worker completed message" (deref message))))) (defn -main [& args] (let [q (queues "/tmp" {:max-queue-size 10})] (pprint (stats q)) (println "Starting worker") (.start (Thread. (fn [] (worker q :queue-1)))) (doseq [i (range 10)] (println "Enqueuing task #" i) (put! q :queue-1 (str "task #" i)) (pprint (stats q)) (sleep 1000))))
13. Ukázka spuštění workera
Předchozí příklad po spuštění bude vypisovat zprávy typické pro systém typu producent-konzument nebo producent-worker. Celkem se zpracuje dvacet zpráv:
{} Starting worker Worker started Enqueuing task # 0 {"queue_1" Worker received message task #0 {:num-slabs 1, :num-active-slabs 1, :enqueued 1, :retried 0, :completed 0, :in-progress 1}} Enqueuing task # 1 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 2, :retried 0, :completed 1, :in-progress 0}} Worker completed message task #0 Worker received message task #1 Enqueuing task # 2 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 3, :retried 0, :completed 2, :in-progress 0}} Enqueuing task # 3 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 4, :retried 0, :completed 2, :in-progress 0}} Worker completed message task #1 Worker received message task #2 Enqueuing task # 4 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 5, :retried 0, :completed 3, :in-progress 0}} Enqueuing task # 5 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 6, :retried 0, :completed 3, :in-progress 0}} Worker completed message task #2 Worker received message task #3 Enqueuing task # 6 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 7, :retried 0, :completed 4, :in-progress 0}} Enqueuing task # 7 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 8, :retried 0, :completed 4, :in-progress 0}} Worker completed message task #3 Worker received message task #4 Enqueuing task # 8 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 9, :retried 0, :completed 5, :in-progress 0}} Enqueuing task # 9 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 10, :retried 0, :completed 5, :in-progress 0}} Worker completed message task #4 Worker received message task #5 Worker completed message task #5 Worker received message task #6 Worker completed message task #6 Worker received message task #7 Worker completed message task #7 Worker received message task #8 Worker completed message task #8 Worker received message task #9
14. Speciální zpráva určená pro ukončení práce workera
Pokud budeme chtít mít možnost nějakým způsobem workera ukončit, můžeme pro tuto operaci použít speciální zprávu. Může se jednat například o zprávu s tělem :exit. Samotný kód workera je v tomto případě nutné nepatrně upravit a namísto smyčky while použít konstrukci loop-recur:
(defn worker [q queue-name] (println "Worker started") (loop [] (let [message (take! q queue-name) value (deref message)] (if (= value :exit) (println "Stopping worker") (do (println "Worker received message" value) (complete! message) (sleep 2000) (println "Worker completed message" value) (recur))))))
Naplánování deseti úloh ukončených příkazem :exit je v tomto případě triviální:
(doseq [i (range 10)] (println "Enqueuing task #" i) (put! q :queue-1 (str "task #" i)) (pprint (stats q)) (sleep 1000)) (println "Enqueuing task to stop worker") (put! q :queue-1 :exit) (println "All tasks has been scheduled")))
Celý příklad bude vypadat takto:
(ns durable-queue-8.core (:gen-class)) (require '[durable-queue :refer :all]) (require '[clojure.pprint :refer :all]) (defn sleep [amount] (Thread/sleep amount)) (defn worker [q queue-name] (println "Worker started") (loop [] (let [message (take! q queue-name) value (deref message)] (if (= value :exit) (println "Stopping worker") (do (println "Worker received message" value) (complete! message) (sleep 2000) (println "Worker completed message" value) (recur)))))) (defn -main [& args] (let [q (queues "/tmp" {:max-queue-size 10})] (pprint (stats q)) (println "Starting worker") (.start (Thread. (fn [] (worker q :queue-1)))) (doseq [i (range 10)] (println "Enqueuing task #" i) (put! q :queue-1 (str "task #" i)) (pprint (stats q)) (sleep 1000)) (println "Enqueuing task to stop worker") (put! q :queue-1 :exit) (println "All tasks has been scheduled")))
15. Ukázka spuštění workera
Samozřejmě si opět ukážeme, jak bude vypadat spuštění příkladu s jedním workerem ukončovaným speciální zprávou :exit:
{} Starting worker Enqueuing task #0 Worker started Worker received message task #0 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 1, :retried 0, :completed 0, :in-progress 1}} Enqueuing task # 1 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 2, :retried 0, :completed 1, :in-progress 0}} Worker completed message task #0 Worker received message task #1 Enqueuing task # 2 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 3, :retried 0, :completed 2, :in-progress 0}} Enqueuing task # 3 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 4, :retried 0, :completed 2, :in-progress 0}} Worker completed message task #1 Worker received message task #2 Enqueuing task # 4 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 5, :retried 0, :completed 3, :in-progress 0}} Enqueuing task # 5 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 6, :retried 0, :completed 3, :in-progress 0}} Worker completed message task #2 Worker received message task #3 Enqueuing task # 6 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 7, :retried 0, :completed 4, :in-progress 0}} Enqueuing task # 7 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 8, :retried 0, :completed 4, :in-progress 0}} Worker completed message task #3 Worker received message task #4 Enqueuing task # 8 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 9, :retried 0, :completed 5, :in-progress 0}} Enqueuing task # 9 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 10, :retried 0, :completed 5, :in-progress 0}} Worker completed message task #4 Worker received message task #5 Enqueuing task to stop worker All tasks has been scheduled Worker completed message task #5 Worker received message task #6 Worker completed message task #6 Worker received message task #7 Worker completed message task #7 Worker received message task #8 Worker completed message task #8 Worker received message task #9 Worker completed message task #9 Stopping worker
16. Složitější příklad s větším počtem paralelně pracujících workerů
Dnešní devátý a současně i poslední příklad již bude složitější, protože v něm vytvoříme trojici paralelně pracujících workerů, přičemž každý z nich bude zpracovávat zprávy z vlastní fronty:
(def queue-names [:queue-1 :queue-2 :queue-3])
Spuštění tří workerů, každého v samostatném vláknu a pro vlastní frontu:
(doseq [queue queue-names] (.start (Thread. (fn [] (worker q queue)))))
Naplánování práce pro všechny workery, včetně speciální zprávy :exit:
(doseq [queue queue-names] (doseq [i (range 10)] (println "Enqueuing task #" i) (put! q queue (str "task #" i)) (pprint (stats q)) (sleep 500)) (println "Enqueuing task to stop worker subscribed to queue" (name queue)) (put! q queue :exit))
Následuje výpis úplného zdrojového kódu dnešního posledního demonstračního příkladu:
(ns durable-queue-9.core (:gen-class)) (require '[durable-queue :refer :all]) (require '[clojure.pprint :refer :all]) (defn sleep [amount] (Thread/sleep amount)) (defn worker [q queue-name] (println "Worker started, using queue" queue-name) (loop [] (let [message (take! q queue-name) value (deref message)] (if (= value :exit) (println "Stopping worker that use queue" (name queue-name)) (do (println "Worker received message" value "from queue" (name queue-name)) (complete! message) (sleep 2000) (println "Worker completed message" value "from queue" (name queue-name)) (recur)))))) (def queue-names [:queue-1 :queue-2 :queue-3]) (defn -main [& args] (let [q (queues "/tmp" {:max-queue-size 10})] (pprint (stats q)) (println "Starting workers") (doseq [queue queue-names] (.start (Thread. (fn [] (worker q queue))))) (doseq [queue queue-names] (doseq [i (range 10)] (println "Enqueuing task #" i) (put! q queue (str "task #" i)) (pprint (stats q)) (sleep 500)) (println "Enqueuing task to stop worker subscribed to queue" (name queue)) (put! q queue :exit)) (println "All tasks has been scheduled")))
17. Ukázka spuštění workerů
Na závěr si ukažme, jak se budou workeři chovat po svém spuštění a při paralelním zpracování zpráv:
{} Starting workers Enqueuing task # Worker started, using queue :queue-1 Worker started, using queue :queue-3 Worker started, using queue :queue-2 0 {"queue_1" Worker received message task #0 from queue queue-1 {:num-slabs 1, :num-active-slabs 1, :enqueued 1, :retried 0, :completed 0, :in-progress 1}} Enqueuing task # 1 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 2, :retried 0, :completed 1, :in-progress 0}} Enqueuing task to stop worker subscribed to queue queue-1 ... ... ... {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 11, :retried 0, :completed 6, :in-progress 0}, "queue_2" {:num-slabs 1, :num-active-slabs 1, :enqueued 11, :retried 0, :completed 4, :in-progress 0}, "queue_3" {:num-slabs 1, :num-active-slabs 1, :enqueued 3, :retried 0, :completed 1, :in-progress 0}} Worker completed message task #5 from queue queue-1 Worker received message task #6 from queue queue-1 Enqueuing task # 3 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 11, :retried 0, :completed 7, :in-progress 0}, "queue_2" {:num-slabs 1, :num-active-slabs 1, :enqueued 11, :retried 0, :completed 4, :in-progress 0}, "queue_3" {:num-slabs 1, :num-active-slabs 1, :enqueued 4, :retried 0, :completed 1, :in-progress 0}} Worker completed message task #3 from queue queue-2 Worker received message task #4 from queue queue-2 Enqueuing task # 6 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 11, :retried 0, :completed 7, :in-progress 0}, "queue_2" {:num-slabs 1, :num-active-slabs 1, :enqueued 11, :retried 0, :completed 5, :in-progress 0}, "queue_3" {:num-slabs 1, :num-active-slabs 1, :enqueued 7, :retried 0, :completed 2, :in-progress 0}} Worker completed message task #6 from queue queue-1 Worker received message task #7 from queue queue-1 Enqueuing task # 7 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 11, :retried 0, :completed 8, :in-progress 0}, "queue_2" {:num-slabs 1, :num-active-slabs 1, :enqueued 11, :retried 0, :completed 5, :in-progress 0}, "queue_3" {:num-slabs 1, :num-active-slabs 1, :enqueued 8, :retried 0, :completed 2, :in-progress 0}} Worker completed message task #1 from queue queue-3 Worker received message task #2 from queue queue-3 Enqueuing task # 8 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 11, :retried 0, :completed 8, :in-progress 0}, "queue_2" {:num-slabs 1, :num-active-slabs 1, :enqueued 11, :retried 0, :completed 5, :in-progress 0}, "queue_3" {:num-slabs 1, :num-active-slabs 1, :enqueued 9, :retried 0, :completed 3, :in-progress 0}} Enqueuing task # 9 Worker completed message task #4 from queue queue-2 Worker received message task #5 from queue queue-2 {"queue_1" {:num-slabs 1, :num-active-slabs 1, :enqueued 11, :retried 0, :completed 8, :in-progress 0}, "queue_2" {:num-slabs 1, :num-active-slabs 1, :enqueued 11, :retried 0, :completed 6, :in-progress 0}, "queue_3" {:num-slabs 1, :num-active-slabs 1, :enqueued 10, :retried 0, :completed 3, :in-progress 0}} Enqueuing task to stop worker subscribed to queue queue-3 All tasks has been scheduled Worker completed message task #7 from queue queue-1 Worker received message task #8 from queue queue-1 Worker completed message task #2 from queue queue-3 Worker received message task #3 from queue queue-3 Worker completed message task #5 from queue queue-2 Worker received message task #6 from queue queue-2 Worker completed message task #8 from queue queue-1 Worker received message task #9 from queue queue-1 Worker completed message task #3 from queue queue-3 Worker received message task #4 from queue queue-3 Worker completed message task #6 from queue queue-2 Worker received message task #7 from queue queue-2 Worker completed message task #9 from queue queue-1 Stopping worker that use queue queue-1 Worker completed message task #4 from queue queue-3 Worker received message task #5 from queue queue-3 Worker completed message task #7 from queue queue-2 Worker received message task #8 from queue queue-2 Worker completed message task #5 from queue queue-3 Worker received message task #6 from queue queue-3 Worker completed message task #8 from queue queue-2 Worker received message task #9 from queue queue-2 Worker completed message task #6 from queue queue-3 Worker received message task #7 from queue queue-3 Worker completed message task #9 from queue queue-2 Stopping worker that use queue queue-2 Worker completed message task #7 from queue queue-3 Worker received message task #8 from queue queue-3 Worker completed message task #8 from queue queue-3 Worker received message task #9 from queue queue-3 Worker completed message task #9 from queue queue-3 Stopping worker that use queue queue-3
18. Message broker RabbitMQ a programovací jazyk Clojure
V závěrečné části článku si připomeneme, jakým způsobem lze se systémem RabbitMQ komunikovat z programovacího jazyka Clojure. Pro tento jazyk vzniklo hned několik knihoven, které rozhraní k RabbitMQ realizují. Většina těchto knihoven je postavena na tzv. Java interop (interoperabilita mezi Javou a Clojure). Rozdíly mezi knihovnami spočívají v tom, zda se skutečně jedná o pouhou úzkou vrstvičku mezi Javou a Clojure či zda knihovna realizuje vlastní složitější (a abstraktnější) framework. Protože jsme se zaměřili na RabbitMQ a nikoli nad ním postavenými systémy, použijeme knihovnu Langohr, která nám nebude poskytovat příliš abstraktní operace, což je dobře, protože jediné, co budeme potřebovat, je získávání zpráv z fronty s jejich dalším zpracováním.
Samotná implementace konzumenta zpráv (vzniklá úpravou getting started příkladu) je při použití programovacího jazyka Clojure nepatrně delší, než je tomu v případě Pythonu. Je tomu tak především proto, že knihovna Langohr je rozdělena na víc částí a budeme muset provést import čtyř konkrétních jmenných prostorů:
(require '[langohr.core :as rabbit-mq]) (require '[langohr.channel :as l-channel]) (require '[langohr.queue :as l-queue]) (require '[langohr.consumers :as l-consumers])
Dále je v konzumentovi deklarována callback funkce zavolaná při příjmu každé zprávy. Povšimněte si, že tělo zprávy (poslední parametr) je typu bytes, ovšem v těle callback funkce ze sekvence bajtů vytvoříme řetězec. Zajímavý je i destructuring [1] použitý u druhého parametru. Jedná se o specialitu nabízenou některými Lispovskými jazyky ve chvíli, kdy se do funkcí předávají sekvence, vektory nebo mapy (slovníky):
(defn message-handler [ch {:keys [content-type delivery-tag type] :as meta} ^bytes payload] (println (format "Received a message: %s" (String. payload "UTF-8"))))
Zbývá nám provést připojení k RabbitMQ a vytvoření komunikačního kanálu:
(let [conn (rabbit-mq/connect) ch (l-channel/open conn)]
Další postup je prakticky totožný s kódem naprogramovaným v Pythonu: deklarace fronty, s níž se pracuje, přihlášení k příjmu zpráv s registrací callback funkce a na konci aplikace úklid – uzavření komunikačního kanálu a uzavření připojení k RabbitMQ:
(l-queue/declare ch "test" {:exclusive false :auto-delete false}) (l-consumers/subscribe ch "test" message-handler {:auto-ack true}) (println (format "Connected to channel id: %d" (.getChannelNumber ch))) (Thread/sleep 10000) (println "Disconnecting...") (rabbit-mq/close ch) (rabbit-mq/close conn)))
Výsledný zdrojový kód realizující celého konzumenta vypadá následovně:
(ns example-01.core (:gen-class)) (require '[langohr.core :as rabbit-mq]) (require '[langohr.channel :as l-channel]) (require '[langohr.queue :as l-queue]) (require '[langohr.consumers :as l-consumers]) (defn message-handler [ch {:keys [content-type delivery-tag type] :as meta} ^bytes payload] (println (format "Received a message: %s" (String. payload "UTF-8")))) (defn -main [& args] (let [conn (rabbit-mq/connect) ch (l-channel/open conn)] (l-queue/declare ch "test" {:exclusive false :auto-delete false}) (l-consumers/subscribe ch "test" message-handler {:auto-ack true}) (println (format "Connected to channel id: %d" (.getChannelNumber ch))) (Thread/sleep 10000) (println "Disconnecting...") (rabbit-mq/close ch) (rabbit-mq/close conn)))
19. Repositář s demonstračními příklady
Zdrojové kódy všech dnes popsaných demonstračních příkladů vyvinutých v programovacím jazyku Clojure byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má stále ještě doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce:
20. Odkazy na Internetu
- durable-queue na GitHubu
https://github.com/Factual/durable-queue - awesome-clojure
https://github.com/razum2um/awesome-clojure - Dramatiq: simple task processing
https://dramatiq.io/ - Lightweight fast persistent queue in Java using Berkley DB
https://sysgears.com/articles/lightweight-fast-persistent-queue-in-java-using-berkley-db/ - Rozhraní BlockingQueue
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html - Třída SynchronousQueue
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/SynchronousQueue.html - Třída ConcurrentLinkedQueue
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html - A Guide to Java SynchronousQueue
https://www.baeldung.com/java-synchronous-queue - PersistentQueue pro Clojure
https://github.com/clojure/clojure/blob/master/src/jvm/clojure/lang/PersistentQueue.java - Cookbook (for Dramatiq)
https://dramatiq.io/cookbook.html - Balíček dramatiq na PyPi
https://pypi.org/project/dramatiq/ - Dramatiq dashboard
https://github.com/Bogdanp/dramatiq_dashboard - Dramatiq na Redditu
https://www.reddit.com/r/dramatiq/ - A Dramatiq broker that can be used with Amazon SQS
https://github.com/Bogdanp/dramatiq_sqs - nanomsg na GitHubu
https://github.com/nanomsg/nanomsg - Referenční příručka knihovny nanomsg
https://nanomsg.org/v1.1.5/nanomsg.html - nng (nanomsg-next-generation)
https://github.com/nanomsg/nng - Differences between nanomsg and ZeroMQ
https://nanomsg.org/documentation-zeromq.html - NATS
https://nats.io/about/ - NATS Streaming Concepts
https://nats.io/documentation/streaming/nats-streaming-intro/ - NATS Streaming Server
https://nats.io/download/nats-io/nats-streaming-server/ - NATS Introduction
https://nats.io/documentation/ - NATS Client Protocol
https://nats.io/documentation/internals/nats-protocol/ - NATS Messaging (Wikipedia)
https://en.wikipedia.org/wiki/NATS_Messaging - Stránka Apache Software Foundation
http://www.apache.org/ - Informace o portu 5672
http://www.tcp-udp-ports.com/port-5672.htm - Třída MessagingHandler knihovny Qpid Proton
https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/proton._handlers.MessagingHandler-class.html - Třída Event knihovny Qpid Proton
https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/proton._events.Event-class.html - package stomp (Go)
https://godoc.org/github.com/go-stomp/stomp - Go language library for STOMP protocol
https://github.com/go-stomp/stomp - python-qpid-proton 0.26.0 na PyPi
https://pypi.org/project/python-qpid-proton/ - Qpid Proton
http://qpid.apache.org/proton/ - Using the AMQ Python Client
https://access.redhat.com/documentation/en-us/red_hat_amq/7.1/html-single/using_the_amq_python_client/ - Apache ActiveMQ
http://activemq.apache.org/ - Apache ActiveMQ Artemis
https://activemq.apache.org/artemis/ - Apache ActiveMQ Artemis User Manual
https://activemq.apache.org/artemis/docs/latest/index.html - KahaDB
http://activemq.apache.org/kahadb.html - Understanding the KahaDB Message Store
https://access.redhat.com/documentation/en-US/Fuse_MQ_Enterprise/7.1/html/Configuring_Broker_Persistence/files/KahaDBOverview.html - Command Line Tools (Apache ActiveMQ)
https://activemq.apache.org/activemq-command-line-tools-reference.html - stomp.py 4.1.21 na PyPi
https://pypi.org/project/stomp.py/ - Stomp Tutorial
https://access.redhat.com/documentation/en-US/Fuse_Message_Broker/5.5/html/Connectivity_Guide/files/FMBConnectivityStompTelnet.html - Heartbeat (computing)
https://en.wikipedia.org/wiki/Heartbeat_(computing) - Apache Camel
https://camel.apache.org/ - Red Hat Fuse
https://developers.redhat.com/products/fuse/overview/ - Confusion between ActiveMQ and ActiveMQ-Artemis?
https://serverfault.com/questions/873533/confusion-between-activemq-and-activemq-artemis - Staré stránky projektu HornetQ
http://hornetq.jboss.org/ - Snapshot JeroMQ verze 0.4.4
https://oss.sonatype.org/content/repositories/snapshots/org/zeromq/jeromq/0.4.4-SNAPSHOT/ - Difference between ActiveMQ vs Apache ActiveMQ Artemis
http://activemq.2283324.n4.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html - Microservices communications. Why you should switch to message queues
https://dev.to/matteojoliveau/microservices-communications-why-you-should-switch-to-message-queues–48ia - Stomp.py 4.1.19 documentation
https://stomppy.readthedocs.io/en/stable/ - Repositář knihovny JeroMQ
https://github.com/zeromq/jeromq/ - ØMQ – Distributed Messaging
http://zeromq.org/ - ØMQ Community
http://zeromq.org/community - Get The Software
http://zeromq.org/intro:get-the-software - PyZMQ Documentation
https://pyzmq.readthedocs.io/en/latest/ - Module: zmq.decorators
https://pyzmq.readthedocs.io/en/latest/api/zmq.decorators.html - ZeroMQ is the answer, by Ian Barber
https://vimeo.com/20605470 - ZeroMQ RFC
https://rfc.zeromq.org/ - ZeroMQ and Clojure, a brief introduction
https://antoniogarrote.wordpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/ - zeromq/czmq
https://github.com/zeromq/czmq - golang wrapper for CZMQ
https://github.com/zeromq/goczmq - ZeroMQ version reporting in Python
http://zguide.zeromq.org/py:version - A Go interface to ZeroMQ version 4
https://github.com/pebbe/zmq4 - Broker vs. Brokerless
http://zeromq.org/whitepapers:brokerless - Learning ØMQ with pyzmq
https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/ - Céčková funkce zmq_ctx_new
http://api.zeromq.org/4–2:zmq-ctx-new - Céčková funkce zmq_ctx_destroy
http://api.zeromq.org/4–2:zmq-ctx-destroy - Céčková funkce zmq_bind
http://api.zeromq.org/4–2:zmq-bind - Céčková funkce zmq_unbind
http://api.zeromq.org/4–2:zmq-unbind - Céčková C funkce zmq_connect
http://api.zeromq.org/4–2:zmq-connect - Céčková C funkce zmq_disconnect
http://api.zeromq.org/4–2:zmq-disconnect - Céčková C funkce zmq_send
http://api.zeromq.org/4–2:zmq-send - Céčková C funkce zmq_recv
http://api.zeromq.org/4–2:zmq-recv - Třída Context (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#context - Třída Socket (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket - Python binding
http://zeromq.org/bindings:python - Why should I have written ZeroMQ in C, not C++ (part I)
http://250bpm.com/blog:4 - Why should I have written ZeroMQ in C, not C++ (part II)
http://250bpm.com/blog:8 - About Nanomsg
https://nanomsg.org/ - Advanced Message Queuing Protocol
https://www.amqp.org/ - Advanced Message Queuing Protocol na Wikipedii
https://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol - Dokumentace k příkazu rabbitmqctl
https://www.rabbitmq.com/rabbitmqctl.8.html - RabbitMQ
https://www.rabbitmq.com/ - RabbitMQ Tutorials
https://www.rabbitmq.com/getstarted.html - RabbitMQ: Clients and Developer Tools
https://www.rabbitmq.com/devtools.html - RabbitMQ na Wikipedii
https://en.wikipedia.org/wiki/RabbitMQ - Streaming Text Oriented Messaging Protocol
https://en.wikipedia.org/wiki/Streaming_Text_Oriented_Messaging_Protocol - Message Queuing Telemetry Transport
https://en.wikipedia.org/wiki/MQTT - Erlang
http://www.erlang.org/ - pika 0.12.0 na PyPi
https://pypi.org/project/pika/ - Introduction to Pika
https://pika.readthedocs.io/en/stable/ - Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
http://clojurerabbitmq.info/ - AMQP 0–9–1 Model Explained
http://www.rabbitmq.com/tutorials/amqp-concepts.html - Part 1: RabbitMQ for beginners – What is RabbitMQ?
https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html - Downloading and Installing RabbitMQ
https://www.rabbitmq.com/download.html - celery na PyPi
https://pypi.org/project/celery/ - Databáze Redis (nejenom) pro vývojáře používající Python
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/ - Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/ - Redis Queue (RQ)
https://www.fullstackpython.com/redis-queue-rq.html - Python Celery & RabbitMQ Tutorial
https://tests4geeks.com/python-celery-rabbitmq-tutorial/ - Flower: Real-time Celery web-monitor
http://docs.celeryproject.org/en/latest/userguide/monitoring.html#flower-real-time-celery-web-monitor - Asynchronous Tasks With Django and Celery
https://realpython.com/asynchronous-tasks-with-django-and-celery/ - First Steps with Celery
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html - node-celery
https://github.com/mher/node-celery - Full Stack Python: web development
https://www.fullstackpython.com/web-development.html - Introducing RQ
https://nvie.com/posts/introducing-rq/ - Asynchronous Tasks with Flask and Redis Queue
https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue - rq-dashboard
https://github.com/eoranged/rq-dashboard - Stránky projektu Redis
https://redis.io/ - Introduction to Redis
https://redis.io/topics/introduction - Try Redis
http://try.redis.io/ - Redis tutorial, April 2010 (starší, ale pěkně udělaný)
https://static.simonwillison.net/static/2010/redis-tutorial/ - Python Redis
https://redislabs.com/lp/python-redis/ - Redis: key-value databáze v paměti i na disku
https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/ - Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
http://www.cloudsvet.cz/?p=253 - Praktický úvod do Redis (2): transakce
http://www.cloudsvet.cz/?p=256 - Praktický úvod do Redis (3): cluster
http://www.cloudsvet.cz/?p=258 - Connection pool
https://en.wikipedia.org/wiki/Connection_pool - Instant Redis Sentinel Setup
https://github.com/ServiceStack/redis-config - How to install REDIS in LInux
https://linuxtechlab.com/how-install-redis-server-linux/ - Redis RDB Dump File Format
https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format - Lempel–Ziv–Welch
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch - Redis Persistence
https://redis.io/topics/persistence - Redis persistence demystified
http://oldblog.antirez.com/post/redis-persistence-demystified.html - Redis reliable queues with Lua scripting
http://oldblog.antirez.com/post/250 - Ost (knihovna)
https://github.com/soveran/ost - NoSQL
https://en.wikipedia.org/wiki/NoSQL - Shard (database architecture)
https://en.wikipedia.org/wiki/Shard_%28database_architecture%29 - What is sharding and why is it important?
https://stackoverflow.com/questions/992988/what-is-sharding-and-why-is-it-important - What Is Sharding?
https://btcmanager.com/what-sharding/ - Redis clients
https://redis.io/clients - Category:Lua-scriptable software
https://en.wikipedia.org/wiki/Category:Lua-scriptable_software - Seriál Programovací jazyk Lua
https://www.root.cz/serialy/programovaci-jazyk-lua/ - Redis memory usage
http://nosql.mypopescu.com/post/1010844204/redis-memory-usage - Ukázka konfigurace Redisu pro lokální testování
https://github.com/tisnik/presentations/blob/master/redis/redis.conf - Resque
https://github.com/resque/resque - Nested transaction
https://en.wikipedia.org/wiki/Nested_transaction - Publish–subscribe pattern
https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern - Messaging pattern
https://en.wikipedia.org/wiki/Messaging_pattern - Using pipelining to speedup Redis queries
https://redis.io/topics/pipelining - Pub/Sub
https://redis.io/topics/pubsub - ZeroMQ distributed messaging
http://zeromq.org/ - ZeroMQ: Modern & Fast Networking Stack
https://www.igvita.com/2010/09/03/zeromq-modern-fast-networking-stack/ - Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
https://stackoverflow.com/questions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr - Python & Redis PUB/SUB
https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7 - Message broker
https://en.wikipedia.org/wiki/Message_broker - RESP Arrays
https://redis.io/topics/protocol#array-reply - Redis Protocol specification
https://redis.io/topics/protocol - Redis Pub/Sub: Intro Guide
https://www.redisgreen.net/blog/pubsub-intro/ - Redis Pub/Sub: Howto Guide
https://www.redisgreen.net/blog/pubsub-howto/ - Comparing Publish-Subscribe Messaging and Message Queuing
https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message - Apache Kafka
https://kafka.apache.org/ - Iron
http://www.iron.io/mq - kue (založeno na Redisu, určeno pro node.js)
https://github.com/Automattic/kue - Cloud Pub/Sub
https://cloud.google.com/pubsub/ - Introduction to Redis Streams
https://redis.io/topics/streams-intro - glob (programming)
https://en.wikipedia.org/wiki/Glob_(programming) - Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
https://www.slideshare.net/sylvinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2 - Enqueueing internals
http://python-rq.org/contrib/ - queue — A synchronized queue class
https://docs.python.org/3/library/queue.html - Queue – A thread-safe FIFO implementation
https://pymotw.com/2/Queue/ - Queues
http://queues.io/ - Windows Subsystem for Linux Documentation
https://docs.microsoft.com/en-us/windows/wsl/about - RestMQ
http://restmq.com/ - ActiveMQ
http://activemq.apache.org/ - Amazon MQ
https://aws.amazon.com/amazon-mq/ - Amazon Simple Queue Service
https://aws.amazon.com/sqs/ - Celery: Distributed Task Queue
http://www.celeryproject.org/ - Disque, an in-memory, distributed job queue
https://github.com/antirez/disque - rq-dashboard
https://github.com/eoranged/rq-dashboard - Projekt RQ na PyPi
https://pypi.org/project/rq/ - rq-dashboard 0.3.12
https://pypi.org/project/rq-dashboard/ - Job queue
https://en.wikipedia.org/wiki/Job_queue - Why we moved from Celery to RQ
https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq - Running multiple workers using Celery
https://serverfault.com/questions/655387/running-multiple-workers-using-celery - celery — Distributed processing
http://docs.celeryproject.org/en/latest/reference/celery.html - Chains
https://celery.readthedocs.io/en/latest/userguide/canvas.html#chains - Routing
http://docs.celeryproject.org/en/latest/userguide/routing.html#automatic-routing - Celery Distributed Task Queue in Go
https://github.com/gocelery/gocelery/ - Python Decorators
https://wiki.python.org/moin/PythonDecorators - Periodic Tasks
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html - celery.schedules
http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab - Pros and cons to use Celery vs. RQ
https://stackoverflow.com/questions/13440875/pros-and-cons-to-use-celery-vs-rq - Priority queue
https://en.wikipedia.org/wiki/Priority_queue - Jupyter
https://jupyter.org/ - Context Managers
http://book.pythontips.com/en/latest/context_managers.html