Durable Queue aneb implementace front zpráv bez použití klasického message brokera

1. 8. 2019
Doba čtení: 39 minut

Sdílet

Minulý článek byl věnován projektu Dramatiq určenému především pro vývojáře používající jazyk Python. Dnes se pro změnu budeme věnovat jinému jazyku, konkrétně jazyku Clojure a knihovně nazvané příznačně Durable Queue.

Obsah

1. Durable Queue aneb implementace front zpráv bez použití klasického message brokera

2. Rozdíly mezi Durable Queue a dalšími implementacemi front zpráv

3. Instalace knihovny Durable Queue

4. Kostra aplikace používající knihovnu Durable Queue

5. Vložení zprávy do fronty, přečtení zprávy a změna stavu zprávy

6. První demonstrační příklad: inicializace knihovny Durable Queue

7. Druhý demonstrační příklad: použití funkcí put!take!

8. Třetí demonstrační příklad: použití dvojice front, blokující čtení zpráv

9. Čtvrtý demonstrační příklad: potvrzení zpracování zpráv funkcí complete!

10. Informace o tom, že zpráva nebyla zpracována a má být znovu vložena do fronty: funkce retry!

11. Omezení kapacity fronty na zadaný maximální počet zpráv

12. Implementace jednoduchého workera běžícího v samostatném vlákně

13. Ukázka spuštění workera

14. Speciální zpráva určená pro ukončení práce workera

15. Ukázka spuštění workera

16. Složitější příklad s větším počtem paralelně pracujících workerů

17. Ukázka spuštění workerů

18. Message broker RabbitMQ a programovací jazyk Clojure

19. Repositář s demonstračními příklady

20. Odkazy na Internetu

1. Durable Queue aneb implementace front zpráv bez použití klasického message brokera

V dnešní části seriálu o message brokerech se seznámíme se zajímavou knihovnou nazvanou Durable Queue. Jedná se o knihovnu určenou zejména pro ty vývojáře, kteří pracují s programovacím jazykem Clojure. Tato knihovna vývojářům nabízí možnost použití front zpráv v rámci jedné aplikace (přesněji řečeno v rámci jednoho běžícího procesu), ovšem s tím velmi důležitým dodatkem, že fronty (a pochopitelně tedy i jejich obsah – zprávy a jejich stav) bez problémů přežijí i případný restart či pád aplikace, protože jejich obsah je uložen a vhodným způsobem synchronizován na souborovém systému v nakonfigurovaném adresáři. Navíc je možné specifikovat i způsob synchronizace obsahu front s obsahem těchto souborů, takže si uživatel může zvolit očekávané chování aplikace na škále „nejrychlejší“–„nejbezpečnější“.

V dalším textu se nejprve budeme zabývat popisem možností knihovny Durable Queue a posléze si připomeneme i některé základní možnosti nabízené knihovnou core.anync a taktéž způsobem propojení programů psaných v jazyku Clojure s message brokerem RabbitMQ s využitím knihovny Langohr.

2. Rozdíly mezi Durable Queue a dalšími implementacemi front zpráv

V případě, že budeme chtít použít nějakou formu fronty zpráv popř. fronty úloh v aplikacích psaných v programovacím jazyku Clojure, máme v principu k dispozici čtyři různé způsoby, které se od sebe odlišují především tím, zda jsou fronty přímo součástí aplikace, zda „přežijí“ restart aplikace a zda jsou popř. fronty řešeny samostatným message brokerem (který navíc může běžet na jiném počítači):

  1. První způsob spočívá v použití fronty implementované přímo nějakým základním datovým typem programovacího jazyka Clojure. Taková fronta je potom sdílena mezi jednotlivými vlákny aplikace (přičemž typicky některá vlákna implementují producenty zpráv a další vlákna konzumenty). Jedná se o nejjednodušší řešení, protože není nutné instalovat a nastavovat message brokera, fronty jsou uloženy jen v operační paměti, takže nenastávají problémy s právy v souborovém systému atd. Navíc se pochopitelně jedná o řešení velmi rychlé. Ovšem i nevýhody jsou zřejmé – celé škálování se provádí v rámci jednoho procesu a případnou persistenci dat musí explicitně zajistit vývojář (a to není zdaleka tak jednoduché, jak by se možná na první pohled mohlo zdát).
  2. Explicitní použití fronty popsané v předchozím bodu je podle mého názoru zbytečně složité a může vést k potenciálním problémům při sdílení dat, odstraňování zpráv z front atd. atd. Proto může být v případě, že fronty budeme potřebovat použít pouze v rámci jednoho procesu, lepší použít funkce a makra nabízené standardní knihovnou nazvanoucore.async, která programátorům nabízí podobnou funkcionalitu, jako gorutiny a kanály v programovacím jazyku Go. Toto řešení může být pro většinu řešených problémů takřka ideální (navíc i zhruba stejně rychlé), ovšem s jedním dodatkem – o případnou persistenci front a zpráv v nich uložených se opět musí nějakým vhodným způsobem postaral vývojář aplikace, což může vést k potenciálním problémům.
  3. Třetí možnost pravděpodobně využijí především ti programátoři, kteří sice potřebují použít fronty v rámci jednoho procesu, ovšem s tím, že by zprávy měly přežít i případný restart aplikace. Jinými slovy – obsah front musí být vhodným způsobem uložen na souborovém systému. Tato možnost je implementována v projektu (knihovně) Durable Queue, který si popíšeme v dnešním článku. Již na tomto místě je však vhodné upozornit na to, že se nejedná o implementaci klasického message brokera, ale jen jeho lokální varianty použitelné pro jednu aplikaci (ovšem naopak tato aplikace může sama o sobě být message brokerem, pokud je to zapotřebí).
  4. Samozřejmě ale můžeme použít i běžné message brokery, které se o správu front postarají samy. Aplikace psaná v Clojure se k těmto brokerům většinou připojí s využitím nějakého standardního protokolu, který se v této oblasti prosadil. Může se jednat například o protokol STOMP, MQTT či AMQP (různé verze). S tímto konceptem jsme se ostatně již jednou setkali, a to konkrétně při popisu message brokera RabbitMQ a knihovny Langohr určené právě pro programovací jazyk Clojure. Ovšem ve skutečnosti jsou možnosti Clojure (podobně jako i dalších programovacích jazyků) mnohem větší, protože můžeme použít ty message brokery, které jsou dodávány/nabízeny jako služba. Příkladem může být SQS (Simple Queue Service), které je součástí AWS (Amazon Web Services) či IBM MQ. Toto řešení je z pohledu architektury aplikace většinou nejlepší, může dobře škálovat, ovšem pro některé úlohy bude zbytečně pomalé.

3. Instalace knihovny Durable Queue

Popišme si nyní způsob instalace knihovny Durable Queue.

Poznámka: v dalším textu se předpokládá, že máte nainstalován správce projektu Leiningen. Pokud tomu tak není, bude nutné si Leiningen nainstalovat, což ve skutečnosti není nic těžkého. Navíc se jedná o velmi užitečný projekt s mnoha přídavnými moduly, které využijete nejenom při testování, ale například i při přípravě dokumentace nebo ve chvíli, kdy se aplikace připravuje na nasazení (deployment). To, zda je Leiningen nainstalován a lze ho spustit, zjistíte velmi snadno například příkazem which lein.

Na úplném začátku si připravíme kostru projektu, který bude představovat nový projekt. Pro vytvoření této kostry použijeme Leiningen. Kostra projektu se při použití Leiningenu vytvoří příkazem:

$ lein new app durable-queue-1
 
Generating a project called durable-queue-1 based on the 'app' template.

Výsledkem tohoto příkazu by měla být následující adresářová struktura (obsah se může nepatrně odlišovat podle verze Leiningenu, ovšem samotná kostra bude stejná):

.
└── durable-queue-1
    ├── doc
    │   └── intro.md
    ├── LICENSE
    ├── project.clj
    ├── README.md
    ├── resources
    ├── src
    │   └── durable_queue_1
    │       └── core.clj
    └── test
        └── durable_queue_1
            └── core_test.clj

V dalším kroku přistoupíme k úpravám projektového souboru project.clj. Po vytvoření nového projektu by projektový soubor měl vypadat přibližně takto (pouze si pro jistotu zkontrolujte verzi interpretru jazyka Clojure; minimální požadovaná verze je 1.8.0):

(defproject durable-queue-1 "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.8.0"]]
  :main ^:skip-aot durable-queue-1.core
  :target-path "target/%s"
  :profiles {:uberjar {:aot :all}})

Úprava projektového souboru spočívá v přidání informace o tom, že se v projektu bude používat knihovna durable-queue verze 1.8.0:

(defproject durable-queue-1 "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [factual/durable-queue "0.1.5"]]
  :main ^:skip-aot durable-queue-1.core
  :target-path "target/%s"
  :profiles {:uberjar {:aot :all}})

Posledním krokem konfigurace projektu je spuštění příkazu:

$ lein deps
 
Retrieving factual/durable-queue/0.1.5/durable-queue-0.1.5.pom from clojars
Retrieving factual/durable-queue/0.1.5/durable-queue-0.1.5.jar from clojars

Tento příkaz zajistí, že se do adresáře ~/.m2/ stáhnou všechny potřebné knihovny, tj. jak durable-queue, tak i její závislosti.

4. Kostra aplikace používající knihovnu Durable Queue

Ukažme si tedy kostru aplikace, která bude používat frontu, jejíž obsah bude uložen v souboru umístěném v adresáři „/tmp“ (můžete si pochopitelně zvolit i odlišné umístění, teoreticky i na síťovém souborovém systému). Nejprve je nutné načíst všechny funkce, makra a symboly z balíčku nazvaného durable-queue. Pro jednoduchost jména všech objektů vložíme přímo do aktivního jmenného prostoru, takže nebudeme muset používat prefix (neboli namísto durable-queue/stats zavoláme přímo a jednoduše stats):

(require '[durable-queue :refer :all])

Dále musíme knihovnu durable-queue inicializovat a určit, ve kterém adresáři budou uloženy soubory obsahující prvky jednotlivých front. Výsledkem úspěšné inicializace provedené funkcí queues je struktura, kterou (zde pro jednoduchost) navážeme na globální symbol q (samozřejmě si opět můžete zvolit libovolné jiné platné jméno):

(def q (queues "/tmp" {}))
Poznámka: pochopitelně je možné v případě potřeby použít lokální navázání vytvořené s využitím let, například uvnitř nějaké funkce:
(defn test
    []
    (let [q (queues "/tmp" {})]
        ...
        ...
        ...))

5. Vložení zprávy do fronty, přečtení zprávy a změna stavu zprávy

Ve chvíli, kdy je knihovna durable-queue inicializována, můžeme se začít dotazovat na stav front, posílat zprávy do fronty (operace typu enqueue), vybírat zprávy z fronty (operace typu dequeue) apod. Používají se přitom tyto základní funkce, které jsou všechny definovány v rámci jmenného prostoru durable-queue:

# Funkce Stručný popis funkce
1 stats vrátí informace o stavu všech front (ve formě mapy)
2 put! pošle do vybrané fronty zprávu
3 take! získá z vybrané fronty zprávu (blokující či neblokující operace)
Poznámka: povšimněte si, že jak funkce put!, tak i funkce take! končí vykřičníkem. Jedná se o velmi často používaný způsob pojmenování těch funkcí, které mění stav aplikace (resp. nějakého objektu). Podobně predikáty (funkce vracející pravdivostní hodnotu true nebo false) mívají na konci svého jména otazník. Příkladem je mnoho funkcí ze standardní knihovny Clojure.

Pro posílání zpráv, které je zajištěno funkcí put!, není zapotřebí fronty explicitně vytvářet – fronta (a k ní příslušející soubor nebo soubory) se automaticky vytvoří společně s první zprávou, která je do ní poslána. Můžeme tedy psát:

(put! q :queue-1 "zpráva")

Funkci put! jsme předali jak strukturu se stavem všech front (q), tak i jméno fronty (:queue-1) a vlastní zprávu („zpráva“). Tělo zprávy může být reprezentováno prakticky jakoukoli datovou strukturou programovacího jazyka Clojure; my ovšem pro jednoduchost použijeme buď řetězec nebo v pozdějších demonstračních příkladech raději takzvaný keyword.

Poznámka: v dalších příkladech budeme poměrně často používat identifikátory, které začínají dvojtečkou. Jedná se o takzvaná keywords, což je označení, které v programovacím jazyce Clojure neznamená běžné klíčové slovo, ale jednoznačné a neměnné jméno. To se typicky používá v těch případech, kdy budeme potřebovat použít symbolickou konstantu. Navíc je práce s keywords (porovnání na ekvivalenci) rychlejší, než kdybychom například použili řetězce.

Podobným způsobem lze zprávu z vybrané fronty přečíst:

(take! q :queue-1 1000 :timed-out)

Funkce take! může být buď blokující popř. může po určitém definovaném čase (timeout) vrátit programátorem specifikovanou hodnotu. První případ může být vhodný pro implementaci klasických workerů, ovšem v některých situacích není vhodné blokovat workera pro (teoreticky) nekonečný čas a proto lze funkci take! zavolat a předat jí jak maximální čas čekání specifikovaný v milisekundách, tak i hodnotu, která se vrátí v případě, že je tento čas překročen:

(take! q :queue-1 1000 :timed-out)

Předchozí volání vrátí buď hodnotu přečtenou z fronty :queue-1 nebo po jedné sekundě hodnotu :timed-out.

Taktéž se můžeme kdykoli dotázat na stav jednotlivých front, a to s využitím funkce stats:

(stats q)

Výsledkem je mapa, která vypadá takto:

{:enqueued 0,
 :retried 0,
 :completed 0,
 :in-progress 0,
 :num-slabs 1,
 :num-active-slabs 1}
Poznámka: pro „pěkně“ naformátovaný výstup použijte funkci pprint namísto funkce println. Vše bude ukázáno v prvním demonstračním příkladu.

Jednotlivé prvky v mapě mají následující význam:

# Prvek Stručný popis prvku
1 :enqueued počet zpráv vložených do fronty
2 :retried počet zpráv, jejichž doručení se muselo zopakovat
3 :completed počet přečtených a dokončených zpráv
4 :in-progress počet zpráv nacházejících se ve stavu zpracování
5 :num-slabs počet souborů s uložením stavu fronty (ve výchozím nastavení jeden)
6 :num-active-slabs počet aktivních souborů

Výsledek volání funkce take! je struktura představující získanou zprávu. Samotný text zprávy (nebo libovolná data, které byly součástí zprávy) se získají standardní funkcí defer. Ovšem ve chvíli, kdy je zpráva z fronty získána, je nutné (kdykoli později) systém informovat, zda zpráva byla zpracována či zda naopak došlo k nějaké chybě a je nutné zprávu zpracovat znovu. K tomu slouží dvojice funkcí:

# Funkce Stručný popis funkce
1 complete! dokončení zpracování zprávy
2 retry! zpracování se nepovedlo, zprávu je nutno doručit znovu
Poznámka: opět si povšimněte, že obě funkce zmíněné v předchozí tabulce končí vykřičníkem. Tyto funkce totiž mění stav zprávy a tím pádem (obecněji) stav celé aplikace.

V případě, že se zavolá funkce retry!, bude zpráva znovu vložena do fronty, a to na její konec (tj. nebude zpracována hned, ale obecně mnohem později, pochopitelně v závislosti na počtu dalších zpráv ve frontě).

6. První demonstrační příklad: inicializace knihovny Durable Queue

Dnešní první demonstrační příklad, který naleznete na adrese https://github.com/tisnik/message-queues-examples/tree/master/durable-queue/durable-queue-1, je velmi primitivní. Pouze v něm inicializujeme knihovnu durable-queue, nastavíme pracovní adresář, do kterého se budou ukládat soubory s prvky fronty a nakonec zavoláme výše popsanou funkci stats, která vypíše informace o stavu všech front.

(ns durable-queue-1.core
  (:gen-class))
 
(require '[durable-queue :refer :all])
 
(defn -main
  [& args]
  (let [q (queues "/tmp" {})]
       (println q)
       (println (stats q))))

Výsledek činnosti tohoto příkladu bude vypadat následovně:

#object[durable_queue$queues$reify__7083 0x6afd9125 durable_queue$queues$reify__7083@6afd9125]
{}

Můžeme vidět, že se nejdříve vypíše informace o struktuře nesoucí všechny stavové informace o frontách (interní struktura je pro nás nezajímavá) a na druhém řádku se vypíše prázdná mapa. Pokud by již nějaká fronta či fronty byly vytvořeny, samozřejmě by se v této mapě zobrazily všechny získané informace (viz další příklad).

7. Druhý demonstrační příklad: použití funkcí put! a take!

Druhý demonstrační příklad, který lze nalézt na adrese https://github.com/tisnik/message-queues-examples/tree/master/durable-queue/durable-queue-2, již provádí větší množství operací, než příklad první. Po inicializaci pošleme dvě zprávy, přičemž obě zprávy budou poslány do stejné fronty pojmenované pro jednoduchost :queue-1:

(put! q :queue-1 "task #A")
(put! q :queue-1 "task #B")

Následně jsou obě zprávy přečteny a tím pádem odstraněny z fronty:

(-> (take! q :queue-1) println)
(-> (take! q :queue-1) println)

Po každé operaci se vypíšou informace o stavu všech front, a to opět pomocí nám již známé funkce stats:

(pprint (stats q))

Úplný zdrojový kód dnešního druhého demonstračního příkladu vypadá následovně:

(ns durable-queue-2.core
  (:gen-class))
 
(require '[durable-queue :refer :all])
(require '[clojure.pprint :refer :all])
 
(defn -main
  [& args]
  (let [q (queues "/tmp" {})]
       (pprint (stats q))
       (put! q :queue-1 "task #A")
       (pprint (stats q))
       (put! q :queue-1 "task #B")
       (pprint (stats q))
       (-> (take! q :queue-1) println)
       (pprint (stats q))
       (-> (take! q :queue-1) println)
       (pprint (stats q))

S výsledky:

{}
 
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 1,
  :retried 0,
  :completed 0,
  :in-progress 0}}
 
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 2,
  :retried 0,
  :completed 0,
  :in-progress 0}}
 
< :in-progress | task #A >
 
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 2,
  :retried 0,
  :completed 0,
  :in-progress 1}}
 
< :in-progress | task #B >
 
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 2,
  :retried 0,
  :completed 0,
  :in-progress 2}}
Poznámka: povšimněte si, že zprávy získané pomocí take! a dále nijak nezpracovány, jsou stále počítány do skupiny enqueued.

8. Třetí demonstrační příklad: použití dvojice front, blokující čtení zpráv

Ve třetím demonstračním příkladu použijeme dvojici front pojmenovaných :queue-1 a :queue-2. Do první z těchto front jsou vloženy dvě zprávy, do druhé fronty však pouze zpráva jediná. Ovšem posléze se budeme snažit přečíst z každé fronty dvě zprávy. To v důsledku znamená, že čtení ze druhé fronty celý proces zablokuje, což uvidíme po spuštění příkladu:

(ns durable-queue-3.core
  (:gen-class))
 
(require '[durable-queue :refer :all])
(require '[clojure.pprint :refer :all])
 
(defn -main
  [& args]
  (let [q (queues "/tmp" {})]
       (pprint (stats q))
       (put! q :queue-1 "task #A")
       (put! q :queue-1 "task #B")
       (put! q :queue-2 "task #C")
       (pprint (stats q))
       (println "Získávám dvě zprávy z front queue-1 a queue-2")
       (-> (take! q :queue-1) println)
       (-> (take! q :queue-2) println)
       (pprint (stats q))
       (println "Získávám další dvě zprávy z front queue-1 a queue-2")
       (-> (take! q :queue-1) println)
       (-> (take! q :queue-2) println)
       (pprint (stats q))))

Po spuštění tohoto demonstračního příkladu se postupně začne vypisovat stav jednotlivých front, ovšem na předposledním příkazu se příklad „zasekne“, protože jsme použili blokující čtení ve chvíli, kdy je druhá fronta prázdná a nikdo do ní již další prvky (zprávy) nevloží:

{}
 
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 2,
  :retried 0,
  :completed 0,
  :in-progress 0},
 "queue_2"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 1,
  :retried 0,
  :completed 0,
  :in-progress 0}}
 
Získávám dvě zprávy z front queue-1 a queue-2
< :in-progress | task #A >
< :in-progress | task #C >
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 2,
  :retried 0,
  :completed 0,
  :in-progress 1},
 "queue_2"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 1,
  :retried 0,
  :completed 0,
  :in-progress 1}}
Získávám další dvě zprávy z front queue-1 a queue-2
< :in-progress | task #B >

9. Čtvrtý demonstrační příklad: potvrzení zpracování zpráv funkcí complete!

Čtvrtý příklad již vychází z reálných projektů. Je v něm definována funkce nazvaná deque-and-complete-message, která nejprve načte zprávu ze zvolené fronty a následně potvrdí přijetí a zpracování zprávy funkcí complete!:

(defn deque-and-complete-message
  [q queue-name]
  (let [message (take! q queue-name)]
       (println "Message dequeued" message)
       (complete! message)
       (println "Message completed" message)))

Po spuštění tohoto příkladu si povšimněte, jak se postupně mění obsah čítačů :completed u obou sledovaných front:

{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 2,
  :retried 0,
  :completed 0,
  :in-progress 0},
 "queue_2"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 1,
  :retried 0,
  :completed 0,
  :in-progress 0}}
 
Získávám a kompletuji dvě zprávy z front queue-1 a queue-2
Message dequeued < :in-progress | task #A >
Message completed < :complete | task #A >
Message dequeued < :in-progress | task #C >
Message completed < :complete | task #C >
 
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 2,
  :retried 0,
  :completed 1,
  :in-progress 0},
 "queue_2"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 1,
  :retried 0,
  :completed 1,
  :in-progress 0}}
 
Získávám a kompletuji další dvě zprávy z front queue-1 a queue-2
Message dequeued < :in-progress | task #B >
Message completed < :complete | task #B >

Následuje výpis kompletního zdrojového kódu tohoto demonstračního příkladu:

(ns durable-queue-4.core
  (:gen-class))
 
(require '[durable-queue :refer :all])
(require '[clojure.pprint :refer :all])
 
(defn deque-and-complete-message
  [q queue-name]
  (let [message (take! q queue-name)]
       (println "Message dequeued" message)
       (complete! message)
       (println "Message completed" message)))
 
(defn -main
  [& args]
  (let [q (queues "/tmp" {})]
       (pprint (stats q))
       (put! q :queue-1 "task #A")
       (put! q :queue-1 "task #B")
       (put! q :queue-2 "task #C")
       (pprint (stats q))
       (println "Získávám a kompletuji dvě zprávy z front queue-1 a queue-2")
       (deque-and-complete-message q :queue-1)
       (deque-and-complete-message q :queue-2)
       (pprint (stats q))
       (println "Získávám a kompletuji další dvě zprávy z front queue-1 a queue-2")
       (deque-and-complete-message q :queue-1)
       (deque-and-complete-message q :queue-2)
       (pprint (stats q))))

10. Informace o tom, že zpráva nebyla zpracována a má být znovu vložena do fronty: funkce retry!

Pátý příklad se podobá příkladu předchozímu, ovšem namísto potvrzení zpracování zprávy funkcí complete! si naopak vyžádáme znovuposlání a tím pádem i opakované zpracování zprávy. Pro tento účel slouží funkce retry!:

(defn deque-and-retry-message
  [q queue-name]
  (let [message (take! q queue-name)]
       (println "Message dequeued" message)
       (retry! message)
       (println "Message completed" message)))

Ze záznamu činnosti tohoto příkladu po spuštění si povšimněte, jak postupně roste hodnota v čítačích :retried a nikoli :completed:

{}
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 2,
  :retried 0,
  :completed 0,
  :in-progress 0},
 "queue_2"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 1,
  :retried 0,
  :completed 0,
  :in-progress 0}}
 
Získávám dvě zprávy z front queue-1 a queue-2, které budou uloženy zpět
Message dequeued < :in-progress | task #A >
Message completed < :incomplete | task #A >
Message dequeued < :in-progress | task #C >
Message completed < :incomplete | task #C >
 
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 2,
  :retried 1,
  :completed 0,
  :in-progress 0},
 "queue_2"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 1,
  :retried 1,
  :completed 0,
  :in-progress 0}}
 
Získávám další dvě zprávy z front queue-1 a queue-2, které budou uloženy zpět
Message dequeued < :in-progress | task #B >
Message completed < :incomplete | task #B >
Message dequeued < :in-progress | task #C >
Message completed < :incomplete | task #C >
 
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 2,
  :retried 2,
  :completed 0,
  :in-progress 0},
 "queue_2"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 1,
  :retried 2,
  :completed 0,
  :in-progress 0}}

Opět si ukažme úplný zdrojový kód tohoto příkladu:

(ns durable-queue-5.core
  (:gen-class))
 
(require '[durable-queue :refer :all])
(require '[clojure.pprint :refer :all])
 
(defn deque-and-retry-message
  [q queue-name]
  (let [message (take! q queue-name)]
       (println "Message dequeued" message)
       (retry! message)
       (println "Message completed" message)))
 
(defn -main
  [& args]
  (let [q (queues "/tmp" {})]
       (pprint (stats q))
       (put! q :queue-1 "task #A")
       (put! q :queue-1 "task #B")
       (put! q :queue-2 "task #C")
       (pprint (stats q))
       (println "Získávám dvě zprávy z front queue-1 a queue-2, které budou uloženy zpět")
       (deque-and-retry-message q :queue-1)
       (deque-and-retry-message q :queue-2)
       (pprint (stats q))
       (println "Získávám další dvě zprávy z front queue-1 a queue-2, které budou uloženy zpět")
       (deque-and-retry-message q :queue-1)
       (deque-and-retry-message q :queue-2)
       (pprint (stats q))))

11. Omezení kapacity fronty na zadaný maximální počet zpráv

Při inicializaci knihovny Durable Queue můžeme mj. specifikovat i maximální počet zpráv, které je možné uložit do front(y). V takovém okamžiku se kromě funkce take! stane i funkce put! blokující, tj. její provádění se zastaví ve chvíli, kdy je fronta plná a funkce se dokončí až tehdy, pokud se z fronty přečte alespoň jeden prvek:

(let [q (queues "/tmp" {:max-queue-size 10})]
     ...
     ...
     ...)

Pokud se budeme snažit do takové fronty (s kapacitou deseti zpráv) vložit dvacet zpráv, zápis se zablokuje po desáté zprávě:

{}
 
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 1,
  :retried 0,
  :completed 0,
  :in-progress 0}}
 
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 2,
  :retried 0,
  :completed 0,
  :in-progress 0}}
 
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 3,
  :retried 0,
  :completed 0,
  :in-progress 0}}
 
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 4,
  :retried 0,
  :completed 0,
  :in-progress 0}}
 
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 5,
  :retried 0,
  :completed 0,
  :in-progress 0}}
 
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 6,
  :retried 0,
  :completed 0,
  :in-progress 0}}
 
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 7,
  :retried 0,
  :completed 0,
  :in-progress 0}}
 
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 8,
  :retried 0,
  :completed 0,
  :in-progress 0}}
 
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 9,
  :retried 0,
  :completed 0,
  :in-progress 0}}
 
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 10,
  :retried 0,
  :completed 0,
  :in-progress 0}}

Úplný zdrojový kód tohoto demonstračního příkladu vypadá následovně:

(ns durable-queue-6.core
  (:gen-class))
 
(require '[durable-queue :refer :all])
(require '[clojure.pprint :refer :all])
 
(defn deque-and-retry-message
  [q queue-name]
  (let [message (take! q queue-name)]
       (println "Message dequeued" message)
       (retry! message)
       (println "Message completed" message)))
 
(defn -main
  [& args]
  (let [q (queues "/tmp" {:max-queue-size 10})]
       (pprint (stats q))
       (doseq [i (range 20)]
           (put! q :queue-1 (str "task #" i))
           (pprint (stats q)))))

12. Implementace jednoduchého workera běžícího v samostatném vlákně

Podívejme se nyní na způsob implementace jednoduchého workera. Může se jednat o funkci, která postupně vybírá zprávy z vybrané fronty a nějak je zpracovává. V našem případě bude zpracování simulováno voláním funkce (sleep 2000):

(defn worker
  [q queue-name]
  (println "Worker started")
  (while true
     (let [message (take! q queue-name)]
        (println "Worker received message" (deref message))
        (complete! message)
        (sleep 2000)
        (println "Worker completed message" (deref message)))))

Workera spustíme v samostatném vláknu a pochopitelně mu musíme předat jak strukturu q, tak i jméno fronty, z níž má vybírat zprávy:

(.start (Thread. (fn [] (worker q :queue-1))))

V hlavním vláknu naopak budeme vytvářet zprávy, ovšem s vyšší frekvencí – přibližně jednu zprávu za sekundu:

(doseq [i (range 10)]
    (println "Enqueuing task #" i)
    (put! q :queue-1 (str "task #" i))
    (pprint (stats q))
    (sleep 1000))))

Úplný zdrojový kód příkladu s workerem může vypadat následovně:

(ns durable-queue-7.core
  (:gen-class))
 
(require '[durable-queue :refer :all])
(require '[clojure.pprint :refer :all])
 
(defn sleep
  [amount]
  (Thread/sleep amount))
 
(defn worker
  [q queue-name]
  (println "Worker started")
  (while true
     (let [message (take! q queue-name)]
        (println "Worker received message" (deref message))
        (complete! message)
        (sleep 2000)
        (println "Worker completed message" (deref message)))))
 
(defn -main
  [& args]
  (let [q (queues "/tmp" {:max-queue-size 10})]
       (pprint (stats q))
       (println "Starting worker")
       (.start (Thread. (fn [] (worker q :queue-1))))
       (doseq [i (range 10)]
           (println "Enqueuing task #" i)
           (put! q :queue-1 (str "task #" i))
           (pprint (stats q))
           (sleep 1000))))

13. Ukázka spuštění workera

Předchozí příklad po spuštění bude vypisovat zprávy typické pro systém typu producent-konzument nebo producent-worker. Celkem se zpracuje dvacet zpráv:

{}
Starting worker
Worker started
Enqueuing task # 0
{"queue_1"
 Worker received message task #0
{:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 1,
  :retried 0,
  :completed 0,
  :in-progress 1}}
Enqueuing task # 1
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 2,
  :retried 0,
  :completed 1,
  :in-progress 0}}
Worker completed message task #0
Worker received message task #1
Enqueuing task # 2
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 3,
  :retried 0,
  :completed 2,
  :in-progress 0}}
Enqueuing task # 3
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 4,
  :retried 0,
  :completed 2,
  :in-progress 0}}
Worker completed message task #1
Worker received message task #2
Enqueuing task # 4
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 5,
  :retried 0,
  :completed 3,
  :in-progress 0}}
Enqueuing task # 5
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 6,
  :retried 0,
  :completed 3,
  :in-progress 0}}
Worker completed message task #2
Worker received message task #3
Enqueuing task # 6
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 7,
  :retried 0,
  :completed 4,
  :in-progress 0}}
Enqueuing task # 7
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 8,
  :retried 0,
  :completed 4,
  :in-progress 0}}
Worker completed message task #3
Worker received message task #4
Enqueuing task # 8
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 9,
  :retried 0,
  :completed 5,
  :in-progress 0}}
Enqueuing task # 9
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 10,
  :retried 0,
  :completed 5,
  :in-progress 0}}
Worker completed message task #4
Worker received message task #5
Worker completed message task #5
Worker received message task #6
Worker completed message task #6
Worker received message task #7
Worker completed message task #7
Worker received message task #8
Worker completed message task #8
Worker received message task #9

14. Speciální zpráva určená pro ukončení práce workera

Pokud budeme chtít mít možnost nějakým způsobem workera ukončit, můžeme pro tuto operaci použít speciální zprávu. Může se jednat například o zprávu s tělem :exit. Samotný kód workera je v tomto případě nutné nepatrně upravit a namísto smyčky while použít konstrukci loop-recur:

(defn worker
  [q queue-name]
  (println "Worker started")
  (loop []
    (let [message (take! q queue-name)
          value   (deref message)]
        (if (=  value :exit)
            (println "Stopping worker")
            (do
                (println "Worker received message" value)
                (complete! message)
                (sleep 2000)
                (println "Worker completed message" value)
                (recur))))))
Poznámka: zcela korektní by bylo potvrdit i zprávu :exit.

Naplánování deseti úloh ukončených příkazem :exit je v tomto případě triviální:

(doseq [i (range 10)]
    (println "Enqueuing task #" i)
    (put! q :queue-1 (str "task #" i))
    (pprint (stats q))
    (sleep 1000))
(println "Enqueuing task to stop worker")
(put! q :queue-1 :exit)
(println "All tasks has been scheduled")))

Celý příklad bude vypadat takto:

(ns durable-queue-8.core
  (:gen-class))
 
(require '[durable-queue :refer :all])
(require '[clojure.pprint :refer :all])
 
(defn sleep
  [amount]
  (Thread/sleep amount))
 
(defn worker
  [q queue-name]
  (println "Worker started")
  (loop []
    (let [message (take! q queue-name)
          value   (deref message)]
        (if (=  value :exit)
            (println "Stopping worker")
            (do
                (println "Worker received message" value)
                (complete! message)
                (sleep 2000)
                (println "Worker completed message" value)
                (recur))))))
 
(defn -main
  [& args]
  (let [q (queues "/tmp" {:max-queue-size 10})]
       (pprint (stats q))
       (println "Starting worker")
       (.start (Thread. (fn [] (worker q :queue-1))))
       (doseq [i (range 10)]
           (println "Enqueuing task #" i)
           (put! q :queue-1 (str "task #" i))
           (pprint (stats q))
           (sleep 1000))
       (println "Enqueuing task to stop worker")
       (put! q :queue-1 :exit)
       (println "All tasks has been scheduled")))

15. Ukázka spuštění workera

Samozřejmě si opět ukážeme, jak bude vypadat spuštění příkladu s jedním workerem ukončovaným speciální zprávou :exit:

{}
Starting worker
Enqueuing task #0
Worker started
Worker received message task #0
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 1,
  :retried 0,
  :completed 0,
  :in-progress 1}}
Enqueuing task # 1
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 2,
  :retried 0,
  :completed 1,
  :in-progress 0}}
Worker completed message task #0
Worker received message task #1
Enqueuing task # 2
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 3,
  :retried 0,
  :completed 2,
  :in-progress 0}}
Enqueuing task # 3
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 4,
  :retried 0,
  :completed 2,
  :in-progress 0}}
Worker completed message task #1
Worker received message task #2
Enqueuing task # 4
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 5,
  :retried 0,
  :completed 3,
  :in-progress 0}}
Enqueuing task # 5
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 6,
  :retried 0,
  :completed 3,
  :in-progress 0}}
Worker completed message task #2
Worker received message task #3
Enqueuing task # 6
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 7,
  :retried 0,
  :completed 4,
  :in-progress 0}}
Enqueuing task # 7
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 8,
  :retried 0,
  :completed 4,
  :in-progress 0}}
Worker completed message task #3
Worker received message task #4
Enqueuing task # 8
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 9,
  :retried 0,
  :completed 5,
  :in-progress 0}}
Enqueuing task # 9
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 10,
  :retried 0,
  :completed 5,
  :in-progress 0}}
Worker completed message task #4
Worker received message task #5
Enqueuing task to stop worker
All tasks has been scheduled
Worker completed message task #5
Worker received message task #6
Worker completed message task #6
Worker received message task #7
Worker completed message task #7
Worker received message task #8
Worker completed message task #8
Worker received message task #9
Worker completed message task #9
Stopping worker

16. Složitější příklad s větším počtem paralelně pracujících workerů

Dnešní devátý a současně i poslední příklad již bude složitější, protože v něm vytvoříme trojici paralelně pracujících workerů, přičemž každý z nich bude zpracovávat zprávy z vlastní fronty:

(def queue-names
    [:queue-1 :queue-2 :queue-3])

Spuštění tří workerů, každého v samostatném vláknu a pro vlastní frontu:

(doseq [queue queue-names]
    (.start (Thread. (fn [] (worker q queue)))))

Naplánování práce pro všechny workery, včetně speciální zprávy :exit:

(doseq [queue queue-names]
    (doseq [i (range 10)]
        (println "Enqueuing task #" i)
        (put! q queue (str "task #" i))
        (pprint (stats q))
        (sleep 500))
    (println "Enqueuing task to stop worker subscribed to queue" (name queue))
    (put! q queue :exit))

Následuje výpis úplného zdrojového kódu dnešního posledního demonstračního příkladu:

(ns durable-queue-9.core
  (:gen-class))
 
(require '[durable-queue :refer :all])
(require '[clojure.pprint :refer :all])
 
(defn sleep
  [amount]
  (Thread/sleep amount))
 
(defn worker
  [q queue-name]
  (println "Worker started, using queue" queue-name)
  (loop []
    (let [message (take! q queue-name)
          value   (deref message)]
        (if (=  value :exit)
            (println "Stopping worker that use queue" (name queue-name))
            (do
                (println "Worker received message" value "from queue" (name queue-name))
                (complete! message)
                (sleep 2000)
                (println "Worker completed message" value "from queue" (name queue-name))
                (recur))))))
 
(def queue-names
    [:queue-1 :queue-2 :queue-3])
 
(defn -main
  [& args]
  (let [q (queues "/tmp" {:max-queue-size 10})]
       (pprint (stats q))
 
       (println "Starting workers")
       (doseq [queue queue-names]
           (.start (Thread. (fn [] (worker q queue)))))
 
       (doseq [queue queue-names]
           (doseq [i (range 10)]
               (println "Enqueuing task #" i)
               (put! q queue (str "task #" i))
               (pprint (stats q))
               (sleep 500))
           (println "Enqueuing task to stop worker subscribed to queue" (name queue))
           (put! q queue :exit))
 
       (println "All tasks has been scheduled")))

17. Ukázka spuštění workerů

Na závěr si ukažme, jak se budou workeři chovat po svém spuštění a při paralelním zpracování zpráv:

{}
Starting workers
Enqueuing task # Worker started, using queue :queue-1
Worker started, using queue :queue-3
Worker started, using queue :queue-2
0
{"queue_1"
Worker received message  task #0 from queue queue-1
{:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 1,
  :retried 0,
  :completed 0,
  :in-progress 1}}
Enqueuing task # 1
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 2,
  :retried 0,
  :completed 1,
  :in-progress 0}}
Enqueuing task to stop worker subscribed to queue queue-1
...
...
...
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 11,
  :retried 0,
  :completed 6,
  :in-progress 0},
 "queue_2"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 11,
  :retried 0,
  :completed 4,
  :in-progress 0},
 "queue_3"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 3,
  :retried 0,
  :completed 1,
  :in-progress 0}}
Worker completed message task #5 from queue queue-1
Worker received message task #6 from queue queue-1
Enqueuing task # 3
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 11,
  :retried 0,
  :completed 7,
  :in-progress 0},
 "queue_2"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 11,
  :retried 0,
  :completed 4,
  :in-progress 0},
 "queue_3"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 4,
  :retried 0,
  :completed 1,
  :in-progress 0}}
Worker completed message task #3 from queue queue-2
Worker received message task #4 from queue queue-2
Enqueuing task # 6
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 11,
  :retried 0,
  :completed 7,
  :in-progress 0},
 "queue_2"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 11,
  :retried 0,
  :completed 5,
  :in-progress 0},
 "queue_3"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 7,
  :retried 0,
  :completed 2,
  :in-progress 0}}
Worker completed message task #6 from queue queue-1
Worker received message task #7 from queue queue-1
Enqueuing task # 7
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 11,
  :retried 0,
  :completed 8,
  :in-progress 0},
 "queue_2"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 11,
  :retried 0,
  :completed 5,
  :in-progress 0},
 "queue_3"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 8,
  :retried 0,
  :completed 2,
  :in-progress 0}}
Worker completed message task #1 from queue queue-3
Worker received message task #2 from queue queue-3
Enqueuing task # 8
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 11,
  :retried 0,
  :completed 8,
  :in-progress 0},
 "queue_2"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 11,
  :retried 0,
  :completed 5,
  :in-progress 0},
 "queue_3"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 9,
  :retried 0,
  :completed 3,
  :in-progress 0}}
Enqueuing task # 9
Worker completed message task #4 from queue queue-2
Worker received message task #5 from queue queue-2
{"queue_1"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 11,
  :retried 0,
  :completed 8,
  :in-progress 0},
 "queue_2"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 11,
  :retried 0,
  :completed 6,
  :in-progress 0},
 "queue_3"
 {:num-slabs 1,
  :num-active-slabs 1,
  :enqueued 10,
  :retried 0,
  :completed 3,
  :in-progress 0}}
Enqueuing task to stop worker subscribed to queue queue-3
All tasks has been scheduled
Worker completed message task #7 from queue queue-1
Worker received message task #8 from queue queue-1
Worker completed message task #2 from queue queue-3
Worker received message task #3 from queue queue-3
Worker completed message task #5 from queue queue-2
Worker received message task #6 from queue queue-2
Worker completed message task #8 from queue queue-1
Worker received message task #9 from queue queue-1
Worker completed message task #3 from queue queue-3
Worker received message task #4 from queue queue-3
Worker completed message task #6 from queue queue-2
Worker received message task #7 from queue queue-2
Worker completed message task #9 from queue queue-1
Stopping worker that use queue queue-1
Worker completed message task #4 from queue queue-3
Worker received message task #5 from queue queue-3
Worker completed message task #7 from queue queue-2
Worker received message task #8 from queue queue-2
Worker completed message task #5 from queue queue-3
Worker received message task #6 from queue queue-3
Worker completed message task #8 from queue queue-2
Worker received message task #9 from queue queue-2
Worker completed message task #6 from queue queue-3
Worker received message task #7 from queue queue-3
Worker completed message task #9 from queue queue-2
Stopping worker that use queue queue-2
Worker completed message task #7 from queue queue-3
Worker received message task #8 from queue queue-3
Worker completed message task #8 from queue queue-3
Worker received message task #9 from queue queue-3
Worker completed message task #9 from queue queue-3
Stopping worker that use queue queue-3

18. Message broker RabbitMQ a programovací jazyk Clojure

V závěrečné části článku si připomeneme, jakým způsobem lze se systémem RabbitMQ komunikovat z programovacího jazyka Clojure. Pro tento jazyk vzniklo hned několik knihoven, které rozhraní k RabbitMQ realizují. Většina těchto knihoven je postavena na tzv. Java interop (interoperabilita mezi Javou a Clojure). Rozdíly mezi knihovnami spočívají v tom, zda se skutečně jedná o pouhou úzkou vrstvičku mezi Javou a Clojure či zda knihovna realizuje vlastní složitější (a abstraktnější) framework. Protože jsme se zaměřili na RabbitMQ a nikoli nad ním postavenými systémy, použijeme knihovnu Langohr, která nám nebude poskytovat příliš abstraktní operace, což je dobře, protože jediné, co budeme potřebovat, je získávání zpráv z fronty s jejich dalším zpracováním.

Samotná implementace konzumenta zpráv (vzniklá úpravou getting started příkladu) je při použití programovacího jazyka Clojure nepatrně delší, než je tomu v případě Pythonu. Je tomu tak především proto, že knihovna Langohr je rozdělena na víc částí a budeme muset provést import čtyř konkrétních jmenných prostorů:

(require '[langohr.core      :as rabbit-mq])
(require '[langohr.channel   :as l-channel])
(require '[langohr.queue     :as l-queue])
(require '[langohr.consumers :as l-consumers])

Dále je v konzumentovi deklarována callback funkce zavolaná při příjmu každé zprávy. Povšimněte si, že tělo zprávy (poslední parametr) je typu bytes, ovšem v těle callback funkce ze sekvence bajtů vytvoříme řetězec. Zajímavý je i destructuring [1] použitý u druhého parametru. Jedná se o specialitu nabízenou některými Lispovskými jazyky ve chvíli, kdy se do funkcí předávají sekvence, vektory nebo mapy (slovníky):

(defn message-handler
    [ch {:keys [content-type delivery-tag type] :as meta} ^bytes payload]
    (println (format "Received a message: %s" (String. payload "UTF-8"))))

Zbývá nám provést připojení k RabbitMQ a vytvoření komunikačního kanálu:

(let [conn  (rabbit-mq/connect)
      ch    (l-channel/open conn)]

Další postup je prakticky totožný s kódem naprogramovaným v Pythonu: deklarace fronty, s níž se pracuje, přihlášení k příjmu zpráv s registrací callback funkce a na konci aplikace úklid – uzavření komunikačního kanálu a uzavření připojení k RabbitMQ:

bitcoin_skoleni

(l-queue/declare ch "test" {:exclusive false :auto-delete false})
(l-consumers/subscribe ch "test" message-handler {:auto-ack true})
(println (format "Connected to channel id: %d" (.getChannelNumber ch)))
(Thread/sleep 10000)
(println "Disconnecting...")
(rabbit-mq/close ch)
(rabbit-mq/close conn)))

Výsledný zdrojový kód realizující celého konzumenta vypadá následovně:

(ns example-01.core
    (:gen-class))
 
(require '[langohr.core      :as rabbit-mq])
(require '[langohr.channel   :as l-channel])
(require '[langohr.queue     :as l-queue])
(require '[langohr.consumers :as l-consumers])
 
 
(defn message-handler
    [ch {:keys [content-type delivery-tag type] :as meta} ^bytes payload]
    (println (format "Received a message: %s" (String. payload "UTF-8"))))
 
 
(defn -main
    [& args]
    (let [conn  (rabbit-mq/connect)
          ch    (l-channel/open conn)]
      (l-queue/declare ch "test" {:exclusive false :auto-delete false})
      (l-consumers/subscribe ch "test" message-handler {:auto-ack true})
      (println (format "Connected to channel id: %d" (.getChannelNumber ch)))
      (Thread/sleep 10000)
      (println "Disconnecting...")
      (rabbit-mq/close ch)
      (rabbit-mq/close conn)))

19. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů vyvinutých v programovacím jazyku Clojure byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má stále ještě doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce:

Příklad Skript Stručný popis Cesta
1 durable-queue-1 inicializace knihovny Durable Queue https://github.com/tisnik/message-queues-examples/blob/master/durable-queue/durable-queue-1/
2 durable-queue-2 použití funkcí put! a take! https://github.com/tisnik/message-queues-examples/blob/master/durable-queue/durable-queue-2/
3 durable-queue-3 použití dvojice front, blokující čtení zpráv https://github.com/tisnik/message-queues-examples/blob/master/durable-queue/durable-queue-3/
4 durable-queue-4 potvrzení zpracování zpráv funkcí complete! https://github.com/tisnik/message-queues-examples/blob/master/durable-queue/durable-queue-4/
5 durable-queue-5 informace o tom, že zpráva nebyla zpracována a má být znovu vložena do fronty funkcí retry! https://github.com/tisnik/message-queues-examples/blob/master/durable-queue/durable-queue-5/
6 durable-queue-6 omezení kapacity fronty na zadaný maximální počet zpráv https://github.com/tisnik/message-queues-examples/blob/master/durable-queue/durable-queue-6/
7 durable-queue-7 implementace jednoduchého workera běžícího v samostatném vlákně https://github.com/tisnik/message-queues-examples/blob/master/durable-queue/durable-queue-7/
8 durable-queue-8 speciální zpráva určená pro ukončení práce workera https://github.com/tisnik/message-queues-examples/blob/master/durable-queue/durable-queue-8/
9 durable-queue-9 složitější příklad s větším počtem paralelně pracujících workerů https://github.com/tisnik/message-queues-examples/blob/master/durable-queue/durable-queue-9/

20. Odkazy na Internetu

  1. durable-queue na GitHubu
    https://github.com/Factual/durable-queue
  2. awesome-clojure
    https://github.com/razum2um/awesome-clojure
  3. Dramatiq: simple task processing
    https://dramatiq.io/
  4. Lightweight fast persistent queue in Java using Berkley DB
    https://sysgears.com/arti­cles/lightweight-fast-persistent-queue-in-java-using-berkley-db/
  5. Rozhraní BlockingQueue
    https://docs.oracle.com/ja­vase/7/docs/api/java/util/con­current/BlockingQueue.html
  6. Třída SynchronousQueue
    https://docs.oracle.com/ja­vase/7/docs/api/java/util/con­current/SynchronousQueue.html
  7. Třída ConcurrentLinkedQueue
    https://docs.oracle.com/ja­vase/7/docs/api/java/util/con­current/ConcurrentLinkedQu­eue.html
  8. A Guide to Java SynchronousQueue
    https://www.baeldung.com/java-synchronous-queue
  9. PersistentQueue pro Clojure
    https://github.com/clojure/clo­jure/blob/master/src/jvm/clo­jure/lang/PersistentQueue­.java
  10. Cookbook (for Dramatiq)
    https://dramatiq.io/cookbook.html
  11. Balíček dramatiq na PyPi
    https://pypi.org/project/dramatiq/
  12. Dramatiq dashboard
    https://github.com/Bogdan­p/dramatiq_dashboard
  13. Dramatiq na Redditu
    https://www.reddit.com/r/dramatiq/
  14. A Dramatiq broker that can be used with Amazon SQS
    https://github.com/Bogdan­p/dramatiq_sqs
  15. nanomsg na GitHubu
    https://github.com/nanomsg/nanomsg
  16. Referenční příručka knihovny nanomsg
    https://nanomsg.org/v1.1.5/na­nomsg.html
  17. nng (nanomsg-next-generation)
    https://github.com/nanomsg/nng
  18. Differences between nanomsg and ZeroMQ
    https://nanomsg.org/documentation-zeromq.html
  19. NATS
    https://nats.io/about/
  20. NATS Streaming Concepts
    https://nats.io/documenta­tion/streaming/nats-streaming-intro/
  21. NATS Streaming Server
    https://nats.io/download/nats-io/nats-streaming-server/
  22. NATS Introduction
    https://nats.io/documentation/
  23. NATS Client Protocol
    https://nats.io/documenta­tion/internals/nats-protocol/
  24. NATS Messaging (Wikipedia)
    https://en.wikipedia.org/wi­ki/NATS_Messaging
  25. Stránka Apache Software Foundation
    http://www.apache.org/
  26. Informace o portu 5672
    http://www.tcp-udp-ports.com/port-5672.htm
  27. Třída MessagingHandler knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._handlers.MessagingHan­dler-class.html
  28. Třída Event knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._events.Event-class.html
  29. package stomp (Go)
    https://godoc.org/github.com/go-stomp/stomp
  30. Go language library for STOMP protocol
    https://github.com/go-stomp/stomp
  31. python-qpid-proton 0.26.0 na PyPi
    https://pypi.org/project/python-qpid-proton/
  32. Qpid Proton
    http://qpid.apache.org/proton/
  33. Using the AMQ Python Client
    https://access.redhat.com/do­cumentation/en-us/red_hat_amq/7.1/html-single/using_the_amq_python_client/
  34. Apache ActiveMQ
    http://activemq.apache.org/
  35. Apache ActiveMQ Artemis
    https://activemq.apache.org/artemis/
  36. Apache ActiveMQ Artemis User Manual
    https://activemq.apache.or­g/artemis/docs/latest/index­.html
  37. KahaDB
    http://activemq.apache.or­g/kahadb.html
  38. Understanding the KahaDB Message Store
    https://access.redhat.com/do­cumentation/en-US/Fuse_MQ_Enterprise/7.1/html/Con­figuring_Broker_Persisten­ce/files/KahaDBOverview.html
  39. Command Line Tools (Apache ActiveMQ)
    https://activemq.apache.org/activemq-command-line-tools-reference.html
  40. stomp.py 4.1.21 na PyPi
    https://pypi.org/project/stomp.py/
  41. Stomp Tutorial
    https://access.redhat.com/do­cumentation/en-US/Fuse_Message_Broker/5.5/html/Con­nectivity_Guide/files/FMBCon­nectivityStompTelnet.html
  42. Heartbeat (computing)
    https://en.wikipedia.org/wi­ki/Heartbeat_(computing)
  43. Apache Camel
    https://camel.apache.org/
  44. Red Hat Fuse
    https://developers.redhat­.com/products/fuse/overvi­ew/
  45. Confusion between ActiveMQ and ActiveMQ-Artemis?
    https://serverfault.com/qu­estions/873533/confusion-between-activemq-and-activemq-artemis
  46. Staré stránky projektu HornetQ
    http://hornetq.jboss.org/
  47. Snapshot JeroMQ verze 0.4.4
    https://oss.sonatype.org/con­tent/repositories/snapshot­s/org/zeromq/jeromq/0.4.4-SNAPSHOT/
  48. Difference between ActiveMQ vs Apache ActiveMQ Artemis
    http://activemq.2283324.n4­.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html
  49. Microservices communications. Why you should switch to message queues
    https://dev.to/matteojoli­veau/microservices-communications-why-you-should-switch-to-message-queues–48ia
  50. Stomp.py 4.1.19 documentation
    https://stomppy.readthedoc­s.io/en/stable/
  51. Repositář knihovny JeroMQ
    https://github.com/zeromq/jeromq/
  52. ØMQ – Distributed Messaging
    http://zeromq.org/
  53. ØMQ Community
    http://zeromq.org/community
  54. Get The Software
    http://zeromq.org/intro:get-the-software
  55. PyZMQ Documentation
    https://pyzmq.readthedocs­.io/en/latest/
  56. Module: zmq.decorators
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.deco­rators.html
  57. ZeroMQ is the answer, by Ian Barber
    https://vimeo.com/20605470
  58. ZeroMQ RFC
    https://rfc.zeromq.org/
  59. ZeroMQ and Clojure, a brief introduction
    https://antoniogarrote.wor­dpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/
  60. zeromq/czmq
    https://github.com/zeromq/czmq
  61. golang wrapper for CZMQ
    https://github.com/zeromq/goczmq
  62. ZeroMQ version reporting in Python
    http://zguide.zeromq.org/py:version
  63. A Go interface to ZeroMQ version 4
    https://github.com/pebbe/zmq4
  64. Broker vs. Brokerless
    http://zeromq.org/whitepa­pers:brokerless
  65. Learning ØMQ with pyzmq
    https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/
  66. Céčková funkce zmq_ctx_new
    http://api.zeromq.org/4–2:zmq-ctx-new
  67. Céčková funkce zmq_ctx_destroy
    http://api.zeromq.org/4–2:zmq-ctx-destroy
  68. Céčková funkce zmq_bind
    http://api.zeromq.org/4–2:zmq-bind
  69. Céčková funkce zmq_unbind
    http://api.zeromq.org/4–2:zmq-unbind
  70. Céčková C funkce zmq_connect
    http://api.zeromq.org/4–2:zmq-connect
  71. Céčková C funkce zmq_disconnect
    http://api.zeromq.org/4–2:zmq-disconnect
  72. Céčková C funkce zmq_send
    http://api.zeromq.org/4–2:zmq-send
  73. Céčková C funkce zmq_recv
    http://api.zeromq.org/4–2:zmq-recv
  74. Třída Context (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#con­text
  75. Třída Socket (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#soc­ket
  76. Python binding
    http://zeromq.org/bindings:python
  77. Why should I have written ZeroMQ in C, not C++ (part I)
    http://250bpm.com/blog:4
  78. Why should I have written ZeroMQ in C, not C++ (part II)
    http://250bpm.com/blog:8
  79. About Nanomsg
    https://nanomsg.org/
  80. Advanced Message Queuing Protocol
    https://www.amqp.org/
  81. Advanced Message Queuing Protocol na Wikipedii
    https://en.wikipedia.org/wi­ki/Advanced_Message_Queuin­g_Protocol
  82. Dokumentace k příkazu rabbitmqctl
    https://www.rabbitmq.com/rab­bitmqctl.8.html
  83. RabbitMQ
    https://www.rabbitmq.com/
  84. RabbitMQ Tutorials
    https://www.rabbitmq.com/get­started.html
  85. RabbitMQ: Clients and Developer Tools
    https://www.rabbitmq.com/dev­tools.html
  86. RabbitMQ na Wikipedii
    https://en.wikipedia.org/wi­ki/RabbitMQ
  87. Streaming Text Oriented Messaging Protocol
    https://en.wikipedia.org/wi­ki/Streaming_Text_Oriented_Mes­saging_Protocol
  88. Message Queuing Telemetry Transport
    https://en.wikipedia.org/wiki/MQTT
  89. Erlang
    http://www.erlang.org/
  90. pika 0.12.0 na PyPi
    https://pypi.org/project/pika/
  91. Introduction to Pika
    https://pika.readthedocs.i­o/en/stable/
  92. Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
    http://clojurerabbitmq.info/
  93. AMQP 0–9–1 Model Explained
    http://www.rabbitmq.com/tutorials/amqp-concepts.html
  94. Part 1: RabbitMQ for beginners – What is RabbitMQ?
    https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html
  95. Downloading and Installing RabbitMQ
    https://www.rabbitmq.com/dow­nload.html
  96. celery na PyPi
    https://pypi.org/project/celery/
  97. Databáze Redis (nejenom) pro vývojáře používající Python
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/
  98. Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/
  99. Redis Queue (RQ)
    https://www.fullstackpython.com/redis-queue-rq.html
  100. Python Celery & RabbitMQ Tutorial
    https://tests4geeks.com/python-celery-rabbitmq-tutorial/
  101. Flower: Real-time Celery web-monitor
    http://docs.celeryproject­.org/en/latest/userguide/mo­nitoring.html#flower-real-time-celery-web-monitor
  102. Asynchronous Tasks With Django and Celery
    https://realpython.com/asynchronous-tasks-with-django-and-celery/
  103. First Steps with Celery
    http://docs.celeryproject­.org/en/latest/getting-started/first-steps-with-celery.html
  104. node-celery
    https://github.com/mher/node-celery
  105. Full Stack Python: web development
    https://www.fullstackpython.com/web-development.html
  106. Introducing RQ
    https://nvie.com/posts/introducing-rq/
  107. Asynchronous Tasks with Flask and Redis Queue
    https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue
  108. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  109. Stránky projektu Redis
    https://redis.io/
  110. Introduction to Redis
    https://redis.io/topics/introduction
  111. Try Redis
    http://try.redis.io/
  112. Redis tutorial, April 2010 (starší, ale pěkně udělaný)
    https://static.simonwilli­son.net/static/2010/redis-tutorial/
  113. Python Redis
    https://redislabs.com/lp/python-redis/
  114. Redis: key-value databáze v paměti i na disku
    https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/
  115. Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
    http://www.cloudsvet.cz/?p=253
  116. Praktický úvod do Redis (2): transakce
    http://www.cloudsvet.cz/?p=256
  117. Praktický úvod do Redis (3): cluster
    http://www.cloudsvet.cz/?p=258
  118. Connection pool
    https://en.wikipedia.org/wi­ki/Connection_pool
  119. Instant Redis Sentinel Setup
    https://github.com/ServiceStack/redis-config
  120. How to install REDIS in LInux
    https://linuxtechlab.com/how-install-redis-server-linux/
  121. Redis RDB Dump File Format
    https://github.com/sripat­hikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format
  122. Lempel–Ziv–Welch
    https://en.wikipedia.org/wi­ki/Lempel%E2%80%93Ziv%E2%80%93­Welch
  123. Redis Persistence
    https://redis.io/topics/persistence
  124. Redis persistence demystified
    http://oldblog.antirez.com/post/redis-persistence-demystified.html
  125. Redis reliable queues with Lua scripting
    http://oldblog.antirez.com/post/250
  126. Ost (knihovna)
    https://github.com/soveran/ost
  127. NoSQL
    https://en.wikipedia.org/wiki/NoSQL
  128. Shard (database architecture)
    https://en.wikipedia.org/wi­ki/Shard_%28database_archi­tecture%29
  129. What is sharding and why is it important?
    https://stackoverflow.com/qu­estions/992988/what-is-sharding-and-why-is-it-important
  130. What Is Sharding?
    https://btcmanager.com/what-sharding/
  131. Redis clients
    https://redis.io/clients
  132. Category:Lua-scriptable software
    https://en.wikipedia.org/wi­ki/Category:Lua-scriptable_software
  133. Seriál Programovací jazyk Lua
    https://www.root.cz/seria­ly/programovaci-jazyk-lua/
  134. Redis memory usage
    http://nosql.mypopescu.com/pos­t/1010844204/redis-memory-usage
  135. Ukázka konfigurace Redisu pro lokální testování
    https://github.com/tisnik/pre­sentations/blob/master/re­dis/redis.conf
  136. Resque
    https://github.com/resque/resque
  137. Nested transaction
    https://en.wikipedia.org/wi­ki/Nested_transaction
  138. Publish–subscribe pattern
    https://en.wikipedia.org/wi­ki/Publish%E2%80%93subscri­be_pattern
  139. Messaging pattern
    https://en.wikipedia.org/wi­ki/Messaging_pattern
  140. Using pipelining to speedup Redis queries
    https://redis.io/topics/pipelining
  141. Pub/Sub
    https://redis.io/topics/pubsub
  142. ZeroMQ distributed messaging
    http://zeromq.org/
  143. ZeroMQ: Modern & Fast Networking Stack
    https://www.igvita.com/2010/09/03/ze­romq-modern-fast-networking-stack/
  144. Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
    https://stackoverflow.com/qu­estions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr
  145. Python & Redis PUB/SUB
    https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7
  146. Message broker
    https://en.wikipedia.org/wi­ki/Message_broker
  147. RESP Arrays
    https://redis.io/topics/protocol#array-reply
  148. Redis Protocol specification
    https://redis.io/topics/protocol
  149. Redis Pub/Sub: Intro Guide
    https://www.redisgreen.net/blog/pubsub-intro/
  150. Redis Pub/Sub: Howto Guide
    https://www.redisgreen.net/blog/pubsub-howto/
  151. Comparing Publish-Subscribe Messaging and Message Queuing
    https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message
  152. Apache Kafka
    https://kafka.apache.org/
  153. Iron
    http://www.iron.io/mq
  154. kue (založeno na Redisu, určeno pro node.js)
    https://github.com/Automattic/kue
  155. Cloud Pub/Sub
    https://cloud.google.com/pubsub/
  156. Introduction to Redis Streams
    https://redis.io/topics/streams-intro
  157. glob (programming)
    https://en.wikipedia.org/wi­ki/Glob_(programming)
  158. Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
    https://www.slideshare.net/syl­vinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2
  159. Enqueueing internals
    http://python-rq.org/contrib/
  160. queue — A synchronized queue class
    https://docs.python.org/3/li­brary/queue.html
  161. Queue – A thread-safe FIFO implementation
    https://pymotw.com/2/Queue/
  162. Queues
    http://queues.io/
  163. Windows Subsystem for Linux Documentation
    https://docs.microsoft.com/en-us/windows/wsl/about
  164. RestMQ
    http://restmq.com/
  165. ActiveMQ
    http://activemq.apache.org/
  166. Amazon MQ
    https://aws.amazon.com/amazon-mq/
  167. Amazon Simple Queue Service
    https://aws.amazon.com/sqs/
  168. Celery: Distributed Task Queue
    http://www.celeryproject.org/
  169. Disque, an in-memory, distributed job queue
    https://github.com/antirez/disque
  170. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  171. Projekt RQ na PyPi
    https://pypi.org/project/rq/
  172. rq-dashboard 0.3.12
    https://pypi.org/project/rq-dashboard/
  173. Job queue
    https://en.wikipedia.org/wi­ki/Job_queue
  174. Why we moved from Celery to RQ
    https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq
  175. Running multiple workers using Celery
    https://serverfault.com/qu­estions/655387/running-multiple-workers-using-celery
  176. celery — Distributed processing
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.html
  177. Chains
    https://celery.readthedoc­s.io/en/latest/userguide/can­vas.html#chains
  178. Routing
    http://docs.celeryproject­.org/en/latest/userguide/rou­ting.html#automatic-routing
  179. Celery Distributed Task Queue in Go
    https://github.com/gocelery/gocelery/
  180. Python Decorators
    https://wiki.python.org/mo­in/PythonDecorators
  181. Periodic Tasks
    http://docs.celeryproject­.org/en/latest/userguide/pe­riodic-tasks.html
  182. celery.schedules
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.schedules.html#celery­.schedules.crontab
  183. Pros and cons to use Celery vs. RQ
    https://stackoverflow.com/qu­estions/13440875/pros-and-cons-to-use-celery-vs-rq
  184. Priority queue
    https://en.wikipedia.org/wi­ki/Priority_queue
  185. Jupyter
    https://jupyter.org/
  186. Context Managers
    http://book.pythontips.com/en/la­test/context_managers.html
Seriál: Message brokery

Autor článku

Vystudoval VUT FIT a v současné době pracuje na projektech vytvářených v jazycích Python a Go.