Obsah
1. Použití Apache ActiveMQ s protokolem STOMP
2. Zjednodušení producenta a konzumenta zpráv při použití strategie PUB-SUB
3. Totéž zjednodušení při použití komunikační strategie PUSH-PULL
4. Explicitní potvrzování zpráv (ACK) vybíraných z fronty
5. Záporné potvrzení zprávy (NACK)
6. Chování AMQ při záporném potvrzení
7. Použití vlastní fronty na straně příjemce při využití komunikace PUSH-PULL
9. Komunikace typu PUB-SUB bez transakce
10. Komunikace typu PUB-SUB v transakci
11. Komunikace typu PUSH-PULL bez transakce
12. Komunikace typu PUSH-PULL v transakci
13. Zrušení celé transakce (rollback namísto commit)
14. Využití heartbeat pro ověření funkčnosti jednotlivých částí systému
18. Repositář s demonstračními příklady
19. Odkazy na předchozí části seriálu
1. Použití Apache ActiveMQ s protokolem STOMP
V předchozím článku seriálu o message brokerech jsme se seznámili s nástrojem Apache ActiveMQ (AMQ) a ukázali jsme si i dva krátké demonstrační příklady, které AMQ používaly jak pro komunikační strategii PUB-SUB, tak i pro strategii PUSH-PULL (tj. pro klasické fronty zpráv). Dnes si ukážeme další možnosti nabízené nástrojem AMQ při použití protokolu STOMP (Streaming Text Oriented Messaging Protocol), který sice nedokáže využít všechny možnosti AMQ, ovšem základní architekturu systému založeného na kombinaci AMQ+STOMP celkem bez problémů dokážeme postavit (ostatně největší problémy nás většinou čekají při návrhu subsystému pro persistenci zpráv, popř. pro clusterování a load balancing).
V tabulce zobrazené pod tímto odstavcem jsou vypsány všechny příkazy podporované protokolem STOMP verze 1.2 (verze 1.1 se v tomto ohledu příliš neliší):
# | Příkaz | Stručný popis |
---|---|---|
1 | SEND | poslání zprávy |
2 | SUBSCRIBE | přihlášení k odběru zpráv |
3 | UNSUBSCRIBE | odhlášení od odběru zpráv |
4 | ACK | potvrzení zprávy |
5 | NACK | negativní potvrzení (vysvětlíme si příště) |
6 | BEGIN | začátek transakce (vysvětlíme si příště) |
7 | COMMIT | commit v transakci (provedení všech operací) |
8 | ABORT | rollback transakce |
9 | DISCONNECT | odpojení klienta |
Ještě před popisem demonstračních příkladů se krátce zmiňme o persistenci zpráv. Ve výchozím nastavení jsou zprávy ukládány do databáze KahaDB, která nabízí mj. i žurnálování. Soubory s databází naleznete v podadresáři „data/kahadb“:
$ ls -1 data/kahadb/ db-1.log db.data db.free db.redo lock
Konfiguraci ukládání zpráv nalezneme v souboru activemq.xml:
<broker brokerName="broker"> <persistenceAdapter> <kahaDB directory="activemq-data" journalMaxFileLength="32mb"/> </persistenceAdapter> </broker>
Bližší informace o konfiguraci i o principu činnosti KahaDB naleznete na stránkách:
- KahaDB
http://activemq.apache.org/kahadb.html - Understanding the KahaDB Message Store
https://access.redhat.com/documentation/en-US/Fuse_MQ_Enterprise/7.1/html/Configuring_Broker_Persistence/files/KahaDBOverview.html
Nastavení perzistence zpráv lze samozřejmě změnit, dokonce je možné i vyměnit typ databáze. Podrobnosti budou popsány příště v rámci konfigurace AMQ.
2. Zjednodušení producenta a konzumenta zpráv při použití strategie PUB-SUB
Nejprve si zopakujeme dva základní způsoby použití message brokera. Většinou jsou podporovány dvě rozdílně pracující komunikační strategie:
- PUB-SUB: zprávy jsou posílány producentem do message brokera, který má k dispozici dynamicky se měnící seznam příjemců (konzumentů) zpráv. Pokud příjemce daného tématu zpráv existuje, je mu zpráva přeposlána. Podobně se systém chová při existenci většího množství příjemců. Zpráva je od producenta přečtena i ve chvíli, kdy neexistuje žádný příjemce – v takovém případě je zahozena (pokud se interně používá fronta, je jen dočasná).
- PUSH-PULL: tato strategie je založena na použití fronty. Producent pošle zprávu message brokerovi, který ji uloží do fronty (každá fronta je pojmenovaná), konzumenti se mohou k frontě (frontám) připojit a zprávy přečíst. V tomto případě se zpráva doručí jen jedinému konzumentovi. Pokud se k frontě připojí větší množství konzumentů, je typicky použit algoritmus round-robin pro (polo)spravedlivé rozložení zátěže.
Ukažme si nejdříve implementaci producenta zpráv při použití strategie PUB-SUB. Jak jsme si již naznačili v předchozím textu, je každé zprávě přiřazeno téma (topic), které může být v případě použití protokolu STOMP specifikováno následovně:
"/topic/event" "/topic/téma2" "/topic/jiné_téma"
Producent zpráv se nejprve musí připojit k běžícímu message brokerovi. V demonstračním příkladu popsaném minule jsme pro připojení použili volání:
conn = stomp.Connection(host_and_ports=[("localhost", 61613)]) conn.start() conn.connect(login="admin", passcode="admin")
Ve skutečnosti je ovšem možné využít i zjednodušeného zápisu při volání konstruktoru Connection:
conn = stomp.Connection([("localhost", 61613)]) conn.start() conn.connect(login="admin", passcode="admin")
Zprávy se posílají metodou send, přičemž je nutné specifikovat jak téma, tak i vlastní zprávu:
conn.send("/topic/event", "Hello world!")
Podporovány jsou i další nepovinné parametry, například:
conn.send("/topic/event", "Hello world!", persistent='true') conn.send("/topic/event", "Hello world!", transaction='id-transakce') conn.send("/topic/event", "Hello world!", content_type='text/plain')
Po poslání všech zpráv je vhodné, aby se producent od message brokera odpojil:
conn.disconnect()
Podívejme se nyní na úplný zdrojový kód jednoduchého producenta zpráv, který použije dvě témata (topicy) se jmény „event“ a „event2“:
#!/usr/bin/env python import time import stomp destination1 = "/topic/event" destination2 = "/topic/event2" MESSAGES = 10 conn = stomp.Connection([("localhost", 61613)]) conn.start() conn.connect(login="admin", passcode="admin") for i in range(0, MESSAGES): message = "Hello world #{i}!".format(i=i) conn.send(destination1, message, persistent='true') conn.send(destination2, message, persistent='true') conn.disconnect()
Konzument (příjemce) zpráv je složitější, protože se samotný příjem zprávy (nebo vznik chyby) programuje s využitím callback metod. Samotný handler jednotlivých událostí může v nejjednodušším případě obsahovat pouze metody nazvané on_message (zavoláno při přijetí zprávy) a on_error (zavoláno v případě nějaké chyby). Existují ovšem i další callback metody, například on_disconnected a on_heartbeat, které si popíšeme později. Implementace handleru událostí vypadá takto:
class SimpleListener(object): def __init__(self, conn): self.conn = conn def on_message(self, headers, message): print("Received message: {m}".format(m=message)) def on_error(self, headers, message): print("Received an error {e}".format(e=message))
Samotné připojení příjemce zpráv je řešeno shodným programovým kódem, jako u producenta, ovšem navíc musíme zaregistrovat výše zmíněný handler metodou set_listener:
conn = stomp.Connection([("localhost", 61613)]) conn.set_listener('', SimpleListener(conn)) conn.start() conn.connect(login="admin", passcode="admin")
K odebírání zpráv pro zvolené téma se přihlásíme metodou subscribe:
conn.subscribe(id='simple_listener', destination="/topic/zvolené_téma", ack='auto')
Opět si ukažme úplný zdrojový kód jednoduchého konzumenta zpráv:
#!/usr/bin/env python import time import stomp class SimpleListener(object): def __init__(self, conn): self.conn = conn def on_message(self, headers, message): print("Received message: {m}".format(m=message)) def on_error(self, headers, message): print("Received an error {e}".format(e=message)) destination = "/topic/event" conn = stomp.Connection([("localhost", 61613)]) conn.set_listener('', SimpleListener(conn)) conn.start() conn.connect(login="admin", passcode="admin") conn.subscribe(id='simple_listener', destination=destination, ack='auto') print("Waiting for messages...") while True: time.sleep(10)
3. Totéž zjednodušení při použití komunikační strategie PUSH-PULL
Podobným způsobem můžeme implementovat producenta zpráv posílaných do front a tudíž používajících komunikační strategii PUSH-PULL a nikoli PUB-SUB. Na straně producenta se změní pouhé dva řádky, na nichž je specifikován cíl zpráv (z pohledu producenta je cílem nějaký bod message brokera). U strategie PUB-SUB byly cíle pojmenovány takto:
destination1 = "/topic/event" destination2 = "/topic/event2"
kdežto v případě strategie PUSH-PULL použijeme:
destination1 = "/queue/test" destination2 = "/queue/test2"
Nejjednodušší producent zpráv posílaných do dvojice front může vypadat takto:
#!/usr/bin/env python import time import stomp destination1 = "/queue/test" destination2 = "/queue/test2" MESSAGES = 10 conn = stomp.Connection([("localhost", 61613)]) conn.start() conn.connect(login="admin", passcode="admin") for i in range(0, MESSAGES): message = "Hello world #{i}!".format(i=i) conn.send(destination1, message, persistent='true') conn.send(destination2, message, persistent='true') conn.disconnect()
Konzument (příjemce) zpráv se změní jen nepatrně:
#!/usr/bin/env python import time import stomp class SimpleListener(object): def __init__(self, conn): self.conn = conn def on_message(self, headers, message): print("Received message: {m}".format(m=message)) def on_error(self, headers, message): print("Received an error {e}".format(e=message)) destination = "/queue/test" conn = stomp.Connection([("localhost", 61613)]) conn.set_listener('', SimpleListener(conn)) conn.start() conn.connect(login="admin", passcode="admin") conn.subscribe(id='simple_listener', destination=destination, ack='auto') print("Waiting for messages...") while True: time.sleep(10)
4. Explicitní potvrzování zpráv (ACK) vybíraných z fronty
U všech prozatím zmíněných konzumentů zpráv jsme používali automatické potvrzení jejich zpracování. To spočívá v tom, že se zpráva ihned po svém přijetí označí za zpracovanou a je vyřazena z příslušné fronty (message broker na ní zcela „zapomene“). Nastavení automatického potvrzení se provádělo přímo při přihlašování konzumenta zpráv k nějaké frontě či k tématu:
conn.subscribe(id='simple_listener', destination=destination, ack='auto')
Můžeme ovšem specifikovat i odlišné chování – explicitní potvrzení o tom, že zpráva byla zpracována:
conn.subscribe(id='simple_listener', destination=destination, ack='client')
V tomto případě je konzument povinen explicitně potvrdit zpracování zprávy:
def on_message(self, headers, message): print("Received message: {m}".format(m=message)) print(headers) message_id = headers['message-id'] subscription = headers['subscription'] self.conn.ack(message_id, subscription)
Nebo naopak zprávu nepotvrdit:
def on_message(self, headers, message): print("Received message: {m}".format(m=message)) print(headers) message_id = headers['message-id'] subscription = headers['subscription'] self.conn.nack(message_id, subscription)
Existuje ještě třetí stav – nepošle se ani ACK ani NACK. Tato zpráva není považována za zpracovanou a záleží na dalším nastavení (životnost), co se s ní stane.
Konzumenta zpráv upravíme pro explicitní potvrzování zpráv takto:
#!/usr/bin/env python import time import stomp class SimpleListener(object): def __init__(self, conn): self.conn = conn def on_message(self, headers, message): print("Received message: {m}".format(m=message)) print(headers) message_id = headers['message-id'] subscription = headers['subscription'] self.conn.ack(message_id, subscription) def on_error(self, headers, message): print("Received an error {e}".format(e=message)) destination = "/queue/test" conn = stomp.Connection(host_and_ports=[("localhost", 61613)]) conn.set_listener('', SimpleListener(conn)) conn.start() conn.connect(login="admin", passcode="admin") conn.subscribe(id='simple_listener', destination=destination, ack='client') print("Waiting for messages...") while True: time.sleep(10)
5. Záporné potvrzení zprávy (NACK)
Podívejme se nyní, jakým způsobem je možné realizovat konzumenta zpráv, který zprávy sice přečte ze zvolené fronty (zde konkrétně z fronty nazvané „test“), ovšem nepotvrdí korektní zpracování zprávy. Konzument tedy musí zprávu přečíst v rámci svého handleru a posléze zavolá metodu connection.nack(), které předá jednoznačné ID zprávy:
class SimpleListener: def __init__(self, conn): self.conn = conn def on_message(self, headers, message): print("Received message: {m}".format(m=message)) print(headers) message_id = headers['message-id'] subscription = headers['subscription'] self.conn.nack(message_id, subscription)
Další změny ve zdrojovém kódu konzumenta nenastaly, jak je to ostatně patrné i při pohledu na jeho zdrojový kód:
#!/usr/bin/env python import time import stomp class SimpleListener(object): def __init__(self, conn): self.conn = conn def on_message(self, headers, message): print("Received message: {m}".format(m=message)) print(headers) message_id = headers['message-id'] subscription = headers['subscription'] self.conn.nack(message_id, subscription) def on_error(self, headers, message): print("Received an error {e}".format(e=message)) destination = "/queue/test" conn = stomp.Connection(host_and_ports=[("localhost", 61613)]) conn.set_listener('', SimpleListener(conn)) conn.start() conn.connect(login="admin", passcode="admin") conn.subscribe(id='simple_listener', destination=destination, ack='client') print("Waiting for messages...") while True: time.sleep(10)
V navazující kapitole si ukážeme, co se vlastně stane se zprávami, u nichž nedošlo k potvrzení jejich zpracování. Message broker totiž musí v tomto případě předpokládat, že problém nastal již ve vlastním obsahu zprávy, tedy že se nutně nemusí jednat o špatně naimplementovaného klienta.
6. Chování AMQ při záporném potvrzení
Podívejme se nyní, co se stane se zprávami, které sice byly přijaty (přečteny z fronty), ovšem došlo k zápornému potvrzení jejich zpracování (NACK).
Následující výpis byl vytvořen s využitím příkazu activemq dstat, který vypíše informace o všech frontách a taktéž o všech tématech.
Name Queue Size Producer # Consumer # Enqueue # Dequeue # Forward # Memory % ActiveMQ.Advisory.Connection 0 0 0 6 0 0 0 ActiveMQ.Advisory.Consumer.Queue.test 0 0 0 10 0 0 0 ActiveMQ.Advisory.MasterBroker 0 0 0 1 0 0 0 ActiveMQ.Advisory.Queue 0 0 0 9 0 0 0 ActiveMQ.Advisory.Topic 0 0 0 2 0 0 0 ActiveMQ.DLQ 0 0 0 0 0 0 0 event 0 0 0 0 0 0 0 event2 0 0 0 0 0 0 0 test 0 0 0 1 1 0 0 test2 0 0 0 0 20 0 0
Nás ovšem budou zajímat pouze informace o frontách (queue), takže použijeme filtr představovaný příkazem activemq dstat queues. Nyní bude výsledek vypadat odlišně:
Name Queue Size Producer # Consumer # Enqueue # Dequeue # Forward # Memory % ActiveMQ.DLQ 0 0 0 0 0 0 0 test 0 0 0 1 1 0 0 test2 0 0 0 0 20 0 0
Z výpisu je patrné, že v systému AMQ existují v tomto okamžiku tři fronty, z nichž dvě test a test2 jsme vytvořili našimi demonstračními příklady a fronta ActiveMQ.DLQ byla vytvořena automaticky. Jméno poslední fronty je odvozeno z plného názvu „Dead Letter Queue“.
Pokud nyní spustíme výše popsaného producenta zpráv, mělo by se vygenerovat celkem dvacet zpráv, z nichž deset bude uloženo do fronty nazvané test a dalších deset do fronty nazvané test2:
Name Queue Size Producer # Consumer # Enqueue # Dequeue # Forward # Memory % ActiveMQ.DLQ 0 0 0 0 0 0 0 test 10 0 0 11 1 0 0 test2 10 0 0 10 20 0 0
Po spuštění konzumenta (příjemce) zpráv, který odpovídá stavovým kódem NACK se sice přečte všech deset zpráv z fronty test, ovšem tyto zprávy se ihned poté, co message broker získá stav NACK, uloží do speciální fronty ActiveMQ.DLQ:
Name Queue Size Producer # Consumer # Enqueue # Dequeue # Forward # Memory % ActiveMQ.DLQ 10 0 0 10 0 0 0 test 0 0 0 11 11 0 0 test2 10 0 0 10 20 0 0
Informace o těchto zprávách si můžeme relativně snadno zobrazit tímto příkazem:
$ ./activemq browse ActiveMQ.DLQ
Výsledky by mohly vypadat následovně:
JMS_HEADER_FIELD:JMSDestination = ActiveMQ.DLQ JMS_CUSTOM_FIELD:originalExpiration = 0 JMS_HEADER_FIELD:JMSDeliveryMode = persistent JMS_HEADER_FIELD:JMSMessageID = ID:localhost.localdomain-33489-1550598531973-3:3:-1:1:1 JMS_BODY_FIELD:JMSBytes:1 = JMS_CUSTOM_FIELD:OriginalDestination = test JMS_HEADER_FIELD:JMSExpiration = 0 JMS_HEADER_FIELD:JMSPriority = 4 JMS_HEADER_FIELD:JMSRedelivered = false JMS_HEADER_FIELD:JMSTimestamp = 1550607287050 JMS_HEADER_FIELD:JMSDestination = ActiveMQ.DLQ JMS_CUSTOM_FIELD:originalExpiration = 0 JMS_HEADER_FIELD:JMSDeliveryMode = persistent JMS_HEADER_FIELD:JMSMessageID = ID:localhost.localdomain-33489-1550598531973-3:3:-1:1:3 JMS_BODY_FIELD:JMSBytes:1 = JMS_CUSTOM_FIELD:OriginalDestination = test JMS_HEADER_FIELD:JMSExpiration = 0 JMS_HEADER_FIELD:JMSPriority = 4 JMS_HEADER_FIELD:JMSRedelivered = false JMS_HEADER_FIELD:JMSTimestamp = 1550607287052 ... ... ...
Povšimněte si, že je možné získat mj. i informaci o tom, ve které frontě byla zpráva původně uložena (hodnota atributu OriginalDestination).
V případě, že nám existence jediné fronty typu „DLQ“ pro všechny nepotvrzené zprávy nevyhovuje, můžeme si vytvořit vlastní pravidla editací souboru activemq.xml, který naleznete v podadresáři conf. Příkladem může být pravidlo pro zprávy, které byly původně poslány do fronty „test2“:
<broker> <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue="test2"> <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/> </deadLetterStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> </broker>
Toto nastavení můžeme provést pro všechny fronty, protože v názvu fronty je možné použít žolíkové znaky. Podporovány jsou znaky „.“ (oddělovač), „*“ (jakékoli jméno/slovo v cestě) a „>“ (musí být uvedeno na konci, rekurzivní výskyty začínající daným řetězcem). To znamená, že všechny fronty mohou být reprezentovány pouze znakem „>“:
<broker> <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">"> <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/> </deadLetterStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> </broker>
7. Použití vlastní fronty na straně příjemce při využití komunikace PUSH-PULL
V dalším demonstračním příkladu si ukážeme, jak je možné upravit konzumenta zpráv takovým způsobem, aby konzument akceptoval všechny zprávy bez blokování (prakticky okamžitě). Řešení spočívá v tom, že v konzumentovi použijeme jeho vlastní lokální frontu, do které se zprávy budou ukládat (enqueue) v jednom vláknu, zatímco v hlavním vláknu se budou zprávy z této fronty postupně vybírat a zpracovávat, nezávisle na tom, jak rychle či naopak pomalu jsou zprávy generovány. Pro implementaci vlastní lokální fronty použijeme standardní třídu Queue, která umožňuje provádění operací s frontou z většího množství vláken (operace jsou synchronizovány). Bližší informace o této užitečné třídě naleznete například na stránce https://docs.python.org/3/library/queue.html, popř. přímo z interaktivní konzole Pythonu:
$ python3 Python 3.6.3 (default, Oct 9 2017, 12:11:29) [GCC 7.2.1 20170915 (Red Hat 7.2.1-2)] on linux Type "help", "copyright", "credits" or "license" for more information. >>> help("queue")
S výsledkem:
Help on module queue: NAME queue - A multi-producer, multi-consumer queue. CLASSES builtins.Exception(builtins.BaseException) Empty Full builtins.object Queue LifoQueue PriorityQueue ... ... ...
V naší implementaci lokální fronty použijeme pouze metody Queue.put() a Queue.get():
>>> help("queue.Queue.put") queue.Queue.put = put(self, item, block=True, timeout=None) Put an item into the queue. If optional args 'block' is true and 'timeout' is None (the default), block if necessary until a free slot is available. If 'timeout' is a non-negative number, it blocks at most 'timeout' seconds and raises the Full exception if no free slot was available within that time. Otherwise ('block' is false), put an item on the queue if a free slot is immediately available, else raise the Full exception ('timeout' is ignored in that case).
Povšimněte si, že metoda Queue.get() je implicitně blokující, tj. pokud nebudou k dispozici žádné zprávy, bude hlavní vlákno pasivně čekat na jejich příchod:
>>> help("queue.Queue.get") Help on function get in queue.Queue: queue.Queue.get = get(self, block=True, timeout=None) Remove and return an item from the queue. If optional args 'block' is true and 'timeout' is None (the default), block if necessary until an item is available. If 'timeout' is a non-negative number, it blocks at most 'timeout' seconds and raises the Empty exception if no item was available within that time. Otherwise ('block' is false), return an item if one is immediately available, else raise the Empty exception ('timeout' is ignored in that case).
Nyní si ukažme vlastní implementaci. Přijetí zprávy je realizováno v handleru, jehož jedním atributem je právě reference na lokální frontu:
class SimpleListener(object): def __init__(self, queue): self.queue = queue def on_message(self, headers, message): print("Received message: {m}, putting it into local queue".format(m=message)) self.queue.put(message) def on_error(self, headers, message): print("Received an error {e}".format(e=message))
Naproti tomu zpracování zpráv z lokální fronty je snadné – využijeme zde blokující operaci Queue.get():
while True: item = q.get() print("Begin working on " + item) if item == "exit": break time.sleep(1) q.task_done() print("Finished working on " + item)
Úplný zdrojový kód konzumenta (příjemce) zprávy může vypadat takto:
#!/usr/bin/env python import time import stomp from queue import Queue class SimpleListener(object): def __init__(self, queue): self.queue = queue def on_message(self, headers, message): print("Received message: {m}, putting it into local queue".format(m=message)) self.queue.put(message) def on_error(self, headers, message): print("Received an error {e}".format(e=message)) q = Queue() destination = "/queue/test" conn = stomp.Connection([("localhost", 61613)]) conn.set_listener('', SimpleListener(q)) conn.start() conn.connect(login="admin", passcode="admin") conn.subscribe(id='simple_listener', destination=destination, ack='auto') print("Waiting for messages...") while True: item = q.get() print("Begin working on " + item) if item == "exit": break time.sleep(1) q.task_done() print("Finished working on " + item) conn.disconnect()
Samotný producent zpráv je prakticky totožný s předchozími producenty, ovšem poslední zprávou bude „exit“, kterým automaticky ukončíme konzumenta (příjemce) zprávy:
#!/usr/bin/env python import time import stomp destination1 = "/queue/test" destination2 = "/queue/test2" MESSAGES = 10 conn = stomp.Connection([("localhost", 61613)]) conn.start() conn.connect(login="admin", passcode="admin") for i in range(0, MESSAGES): message = "Hello world #{i}!".format(i=i) conn.send(destination1, message, persistent='true') conn.send(destination2, message, persistent='true') conn.send(destination1, "exit", persistent='true') conn.send(destination2, "exit", persistent='true') conn.disconnect() print("Done")
8. Transakce
Jednou z důležitých a mnohdy velmi užitečných vlastností protokolu STOMP, která je samozřejmě podporovaná i v systému AMQ, je možnost zpracovávat zprávy v transakci. Samotné transakce se specifikují prakticky stejným způsobem, jako například v databázových systémech – transakce je spuštěna metodou Connection.begin() a končí buď zavoláním metody Connection.commit() (dokončení transakce) nebo naopak zavoláním metody Connection.abort() (odvolání celé transakce, rollback).
Pro podporu transakcí existují v protokolu STOMP tři příkazy:
# | Příkaz | Stručný popis |
---|---|---|
1 | BEGIN | začátek transakce (vysvětlíme si příště) |
2 | COMMIT | commit v transakci (provedení všech operací) |
3 | ABORT | rollback transakce |
Použití těchto příkazů si můžeme vyzkoušet z klienta protokolu STOMP ovládaného z příkazového řádku:
$ stomp -H 127.0.0.1 -P 61613 -U admin -W admin
Příkazy zadávané uživatelem jsou zvýrazněny tučně:
> begin Transaction id: 72c39904-080c-478f-a540-550b2deb9d99 > send /queue/test x > send /queue/test y > send /queue/test z > commit Committing 72c39904-080c-478f-a540-550b2deb9d99 > begin Transaction id: 37d70eb2-5e91-4988-932b-e2acfe1bc275 > send /queue/test u > send /queue/test v > send /queue/test w > > rollback
Na straně příjemce zpráv se objeví pouze informace o přijetí prvních tří zpráv „x“, „y“ a „z“, zatímco zprávy „u“, „v“ a „w“ nejsou do fronty vloženy a tudíž je ani příjemce nemůže získat:
Waiting for messages... Received message: x Received message: y Received message: z
Rozdíl mezi použitím a nepoužitím transakcí si ukážeme na dvou příkladech:
- Producent-konzument používající PUB-SUB
- Producent-konzument používající PUSH-PULL
9. Komunikace typu PUB-SUB bez transakce
Zdrojový kód producenta zpráv:
#!/usr/bin/env python import time import stomp destination1 = "/topic/event" destination2 = "/topic/event2" MESSAGES = 10 conn = stomp.Connection([("localhost", 61613)]) conn.start() conn.connect(login="admin", passcode="admin") for i in range(0, MESSAGES): message = "Hello world #{i}!".format(i=i) print("Publishing message: " + message) conn.send(destination1, message, persistent='true') conn.send(destination2, message, persistent='true') time.sleep(1) print("Done") conn.disconnect()
Zdrojový kód konzumenta zpráv:
#!/usr/bin/env python import time import stomp class SimpleListener: def __init__(self, conn): self.conn = conn def on_message(self, headers, message): print("Received message: {m}".format(m=message)) def on_error(self, headers, message): print("Received an error {e}".format(e=message)) destination = "/topic/event" conn = stomp.Connection([("localhost", 61613)]) conn.set_listener('', SimpleListener(conn)) conn.start() conn.connect(login="admin", passcode="admin") conn.subscribe(id='simple_listener', destination=destination, ack='auto') print("Waiting for messages...") while True: time.sleep(10)
Chování takto nakonfigurovaného systému bude následující (vedle sebe jsou zobrazeny terminály producenta a konzumenta):
| Waiting for messages... Publishing message: Hello world #0! | Done | Received message: Hello world #0! Publishing message: Hello world #1! | Done | Received message: Hello world #1! Publishing message: Hello world #2! | Done | Received message: Hello world #2! Publishing message: Hello world #3! | Done | Received message: Hello world #3! Publishing message: Hello world #4! | Done | Received message: Hello world #4! Publishing message: Hello world #5! | Done | Received message: Hello world #5! Publishing message: Hello world #6! | Done | Received message: Hello world #6! Publishing message: Hello world #7! | Done | Received message: Hello world #7! Publishing message: Hello world #8! | Done | Received message: Hello world #8! Publishing message: Hello world #9! | Done | Received message: Hello world #9!
10. Komunikace typu PUB-SUB v transakci
Zdrojový kód producenta zpráv:
#!/usr/bin/env python import time import stomp destination1 = "/topic/event" destination2 = "/topic/event2" MESSAGES = 10 conn = stomp.Connection([("localhost", 61613)]) conn.start() conn.connect(login="admin", passcode="admin") transaction=conn.begin() print(transaction) for i in range(0, MESSAGES): message = "Hello world #{i}!".format(i=i) print("Publishing message: " + message) conn.send(destination1, message, persistent='true', transaction=transaction) conn.send(destination2, message, persistent='true', transaction=transaction) time.sleep(1) print("Done") conn.commit(transaction=transaction) conn.disconnect()
Zdrojový kód konzumenta zpráv:
#!/usr/bin/env python import time import stomp class SimpleListener: def __init__(self, conn): self.conn = conn def on_message(self, headers, message): print("Received message: {m}".format(m=message)) def on_error(self, headers, message): print("Received an error {e}".format(e=message)) destination = "/topic/event" conn = stomp.Connection([("localhost", 61613)]) conn.set_listener('', SimpleListener(conn)) conn.start() conn.connect(login="admin", passcode="admin") conn.subscribe(id='simple_listener', destination=destination, ack='auto') print("Waiting for messages...") while True: time.sleep(10)
Chování takto nakonfigurovaného systému bude následující (vedle sebe jsou zobrazeny terminály producenta a konzumenta):
| Waiting for messages... Publishing message: Hello world #0! | Done | Publishing message: Hello world #1! | Done | Publishing message: Hello world #2! | Done | Publishing message: Hello world #3! | Done | Publishing message: Hello world #4! | Done | Publishing message: Hello world #5! | Done | Publishing message: Hello world #6! | Done | Publishing message: Hello world #7! | Done | Publishing message: Hello world #8! | Done | Publishing message: Hello world #9! | Done | Received message: Hello world #0! | Received message: Hello world #1! | Received message: Hello world #2! | Received message: Hello world #3! | Received message: Hello world #4! | Received message: Hello world #5! | Received message: Hello world #6! | Received message: Hello world #7! | Received message: Hello world #8! | Received message: Hello world #9!
11. Komunikace typu PUSH-PULL bez transakce
Zdrojový kód producenta zpráv:
#!/usr/bin/env python import time import stomp destination1 = "/queue/test" destination2 = "/queue/test2" MESSAGES = 10 conn = stomp.Connection([("localhost", 61613)]) conn.start() conn.connect(login="admin", passcode="admin") for i in range(0, MESSAGES): message = "Hello world #{i}!".format(i=i) print("Publishing message: " + message) conn.send(destination1, message, persistent='true') conn.send(destination2, message, persistent='true') time.sleep(2) print("Done") conn.disconnect()
Chování systému:
| Waiting for messages... Publishing message: Hello world #0! | Done | Received message: Hello world #0! Publishing message: Hello world #1! | Done | Received message: Hello world #1! Publishing message: Hello world #2! | Done | Received message: Hello world #2! Publishing message: Hello world #3! | Done | Received message: Hello world #3! Publishing message: Hello world #4! | Done | Received message: Hello world #4! Publishing message: Hello world #5! | Done | Received message: Hello world #5! Publishing message: Hello world #6! | Done | Received message: Hello world #6! Publishing message: Hello world #7! | Done | Received message: Hello world #7! Publishing message: Hello world #8! | Done | Received message: Hello world #8! Publishing message: Hello world #9! | Done | Received message: Hello world #9!
12. Komunikace typu PUSH-PULL v transakci
Zdrojový kód producenta zpráv:
#!/usr/bin/env python import time import stomp destination1 = "/queue/test" destination2 = "/queue/test2" MESSAGES = 10 conn = stomp.Connection([("localhost", 61613)]) conn.start() conn.connect(login="admin", passcode="admin") transaction=conn.begin() print(transaction) for i in range(0, MESSAGES): message = "Hello world #{i}!".format(i=i) print("Publishing message: " + message) conn.send(destination1, message, persistent='true', transaction=transaction) conn.send(destination2, message, persistent='true', transaction=transaction) time.sleep(1) print("Done") conn.commit(transaction=transaction) conn.disconnect()
Chování systému:
| Waiting for messages... Publishing message: Hello world #0! | Done | Publishing message: Hello world #1! | Done | Publishing message: Hello world #2! | Done | Publishing message: Hello world #3! | Done | Publishing message: Hello world #4! | Done | Publishing message: Hello world #5! | Done | Publishing message: Hello world #6! | Done | Publishing message: Hello world #7! | Done | Publishing message: Hello world #8! | Done | Publishing message: Hello world #9! | Done | Received message: Hello world #0! | Received message: Hello world #1! | Received message: Hello world #2! | Received message: Hello world #3! | Received message: Hello world #4! | Received message: Hello world #5! | Received message: Hello world #6! | Received message: Hello world #7! | Received message: Hello world #8! | Received message: Hello world #9!
13. Zrušení celé transakce (rollback namísto commit)
Zdrojový kód producenta zpráv:
#!/usr/bin/env python import time import stomp destination1 = "/queue/test" destination2 = "/queue/test2" MESSAGES = 10 conn = stomp.Connection([("localhost", 61613)]) conn.start() conn.connect(login="admin", passcode="admin") transaction=conn.begin() print(transaction) for i in range(0, MESSAGES): message = "Hello world #{i}!".format(i=i) print("Publishing message: " + message) conn.send(destination1, message, persistent='true', transaction=transaction) conn.send(destination2, message, persistent='true', transaction=transaction) time.sleep(1) print("Done") conn.abort(transaction=transaction) conn.disconnect()
Chování systému:
| Waiting for messages... Publishing message: Hello world #0! | Done | Publishing message: Hello world #1! | Done | Publishing message: Hello world #2! | Done | Publishing message: Hello world #3! | Done | Publishing message: Hello world #4! | Done | Publishing message: Hello world #5! | Done | Publishing message: Hello world #6! | Done | Publishing message: Hello world #7! | Done | Publishing message: Hello world #8! | Done | Publishing message: Hello world #9! | Done |
14. Využití heartbeat pro ověření funkčnosti jednotlivých částí systému
Při připojení klientů k message brokerovi je možné specifikovat parametr heartbeats, kterým se specifikuje perioda „tepů“ posílaných klientem a taktéž perioda „tepů“, které klient očekává:
#!/usr/bin/env python import time import stomp destination1 = "/topic/event" destination2 = "/topic/event2" MESSAGES = 10 conn = stomp.Connection([("localhost", 61613)], heartbeats=(0, 0)) conn.start() conn.connect(login="admin", passcode="admin") for i in range(0, MESSAGES): message = "Hello world #{i}!".format(i=i) print("Publishing message: " + message) conn.send(destination1, message, persistent='true') conn.send(destination2, message, persistent='true') time.sleep(0.5) print("Done") conn.disconnect()
V druhém klientovi je nastaveno, že „tep“ je očekáván každou sekundu. Pokud není detekován, je klient odpojen:
#!/usr/bin/env python import time import stomp class SimpleListener: def __init__(self, conn): self.conn = conn def on_message(self, headers, message): print("Received message: {m}".format(m=message)) def on_error(self, headers, message): print("Received an error {e}".format(e=message)) def on_disconnected(self): print("Disconnected") def on_heartbeat(self): print("Heartbeat") def on_heartbeat_timeout(self): print("Heartbeat timeout") destination = "/topic/event" conn = stomp.Connection([("localhost", 61613)], heartbeats=(0,1000)) conn.set_listener('', SimpleListener(conn)) conn.start() conn.connect(login="admin", passcode="admin") conn.subscribe(id='simple_listener', destination=destination, ack='auto') print("Waiting for messages...") while True: time.sleep(10) print("Done...")
Na straně příjemce zpráv získáme jak informace o přijatých zprávách, tak i o „heartbeat“:
Received message: Hello world #7! Heartbeat Heartbeat Heartbeat Heartbeat Heartbeat Received message: Hello world #0! Received message: Hello world #1!
Pokud „heartbeat“ nepřijmeme ve specifikovaném intervalu, bude klient odpojen:
Heartbeat timeout: diff_receive=1.5015023681335151, time=2968372.729240428, lastrec=2968371.22773806 Disconnected Disconnected Heartbeat timeout
15. Posílání binárních zpráv
V posledním příkladu si ukážeme jednu z možností posílání zpráv obsahujících binární data. Protokol STOMP sice přímo podporuje práci s binárními daty (zprávy nemusí být čistě textové), ovšem na problémy narazíme na straně producentů a konzumentů, zejména ve chvíli, kdy je producent realizován v jiném programovacím jazyce než konzument (popř. v odlišné verzi interpretru – typicky Python 2 vs. Python 3). Taktéž případné ladění, popř. využití telnetu a podobných nástrojů při hledání problémů v systému je v případě přímého posílání binárních dat komplikovanější.
Z tohoto důvodu může být výhodnější binární data nejdříve nějakým způsobem zakódovat do sekvence binárních znaků, například s využitím Base64, který má tu výhodu, že algoritmus kódování/dekódování lze vytvořit velmi snadno (pokud již pro daný programovací jazyk neexistuje příslušná knihovna – většinou je již k dispozici). Nevýhodou jsou delší zprávy, které musí být v message brokerovi uloženy (ve frontách) a pomalejší posílání či příjem zpráv – pokud se ovšem zpracovávají relativně krátké zprávy s menší frekvencí (desítky za sekundu pro jednoho producenta), nemělo by být zpoždění vůbec patrné.
Pro otestování posílání a příjmu zpráv s binárními daty vytvoříme producenta, který do fronty pošle data s rastrovým obrázkem. Samotný obrázek získáme snadno (jedná se o ikonu Vimu, v němž tento článek vznikl):
wget https://www.vim.org/images/vim_editor.gif
Obrázek 1: Ikona použitá jako příklad binárních dat.
16. Producent binární zprávy
Na straně producenta (zdroje) zpráv jsou provedeny tyto operace:
- Načtení obsahu binárního souboru s rastrovým obrázkem.
- Převod na sekvenci čitelných znaků (podmnožina ASCII) pomocí Base64, konkrétně funkcí base64.b64encode().
- Výsledek bude poslán do message brokera jako běžná zpráva.
Realizace:
with open("vim_editor.gif", mode="rb") as file: binaryData = file.read() print("Read {l} bytes from binary file".format(l=len(binaryData))) textData = base64.b64encode(binaryData) conn.send(destination, textData, persistent='true') print("Sent {l} characters".format(l=len(textData)))
Producent tedy může vypadat takto:
#!/usr/bin/env python import time import stomp import base64 destination = "/queue/test" MESSAGES = 10 conn = stomp.Connection(host_and_ports=[("localhost", 61613)]) conn.start() conn.connect(login="admin", passcode="admin") with open("vim_editor.gif", mode="rb") as file: binaryData = file.read() print("Read {l} bytes from binary file".format(l=len(binaryData))) textData = base64.b64encode(binaryData) conn.send(destination, textData, persistent='true') print("Sent {l} characters".format(l=len(textData))) conn.disconnect()
Povšimněte si, že délka zprávy (ve znacích) je větší, než počet načtených bajtů z binárního souboru:
Read 1091 bytes from binary file Sent 1456 characters
Pokud se nyní podíváme na fyzický tvar zprávy uložené do fronty přes webové rozhraní Active MQ, uvidíme, že zpráva byla skutečně převedena do BASE64:
Obrázek 2: Takto vypadá zpráva uložená do fronty nazvané „test“. Vlastní obsah zprávy je červeně orámován.
17. Konzument binární zprávy
Na straně konzumenta se provedou opačné kroky:
- Přečtení zprávy, o které víme, že používá kódování Base64.
- Zpětný převod na objekt typu bytes.
- Uložení získané sekvence bajtů do nového binárního souboru.
Realizace:
def on_message(self, headers, message): binaryData = base64.b64decode(message) print("Received {l} characters".format(l=len(message))) print(" converted into {l} bytes".format(l=len(binaryData))) with open("output.gif", "wb") as f: f.write(binaryData)
Celý zdrojový kód producenta zpráv:
#!/usr/bin/env python import time import stomp import base64 class SimpleListener: def __init__(self, conn): self.conn = conn def on_message(self, headers, message): binaryData = base64.b64decode(message) print("Received {l} characters".format(l=len(message))) print(" converted into {l} bytes".format(l=len(binaryData))) with open("output.gif", "wb") as f: f.write(binaryData) def on_error(self, headers, message): print("Received an error {e}".format(e=message)) destination = "/queue/test" conn = stomp.Connection(host_and_ports=[("localhost", 61613)]) conn.set_listener('', SimpleListener(conn)) conn.start() conn.connect(login="admin", passcode="admin") conn.subscribe(id='simple_listener', destination=destination, ack='auto') print("Waiting for messages...") while True: time.sleep(10)
Po spuštění konzumenta by se měla zpráva načíst, převést na binární tvar a uložit do souboru „output.gif“:
Waiting for messages... Received 1456 characters converted into 1091 bytes
$ cmp -l vim_editor.gif output.gif $ echo $? 0
18. Repositář s demonstračními příklady
Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v Pythonu byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.
19. Odkazy na předchozí části seriálu
V této kapitole jsou uvedeny odkazy na všech devět předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií:
- Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/ - Celery: systém implementující asynchronní fronty úloh pro Python
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/ - Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/ - RabbitMQ: jedna z nejúspěšnějších implementací brokera
https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/ - Pokročilejší operace nabízené systémem RabbitMQ
https://www.root.cz/clanky/pokrocilejsi-operace-nabizene-systemem-rabbitmq/ - ØMQ: knihovna pro asynchronní předávání zpráv
https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/ - Další možnosti poskytované knihovnou ØMQ
https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/ - Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě
https://www.root.cz/clanky/dalsi-moznosti-nabizene-knihovnou-mq-implementace-protokolu-mq-v-ciste-jave/ - Apache ActiveMQ – další systém implementující message brokera
https://www.root.cz/clanky/apache-activemq-dalsi-system-implementujici-message-brokera/
20. Odkazy na Internetu
- Apache ActiveMQ
http://activemq.apache.org/ - Apache ActiveMQ Artemis
https://activemq.apache.org/artemis/ - Apache ActiveMQ Artemis User Manual
https://activemq.apache.org/artemis/docs/latest/index.html - KahaDB
http://activemq.apache.org/kahadb.html - Understanding the KahaDB Message Store
https://access.redhat.com/documentation/en-US/Fuse_MQ_Enterprise/7.1/html/Configuring_Broker_Persistence/files/KahaDBOverview.html - Command Line Tools (Apache ActiveMQ)
https://activemq.apache.org/activemq-command-line-tools-reference.html - stomp.py 4.1.21 na PyPi
https://pypi.org/project/stomp.py/ - Stomp Tutorial
https://access.redhat.com/documentation/en-US/Fuse_Message_Broker/5.5/html/Connectivity_Guide/files/FMBConnectivityStompTelnet.html - Heartbeat (computing)
https://en.wikipedia.org/wiki/Heartbeat_(computing) - Apache Camel
https://camel.apache.org/ - Red Hat Fuse
https://developers.redhat.com/products/fuse/overview/ - Confusion between ActiveMQ and ActiveMQ-Artemis?
https://serverfault.com/questions/873533/confusion-between-activemq-and-activemq-artemis - Staré stránky projektu HornetQ
http://hornetq.jboss.org/ - Snapshot JeroMQ verze 0.4.4
https://oss.sonatype.org/content/repositories/snapshots/org/zeromq/jeromq/0.4.4-SNAPSHOT/ - Difference between ActiveMQ vs Apache ActiveMQ Artemis
http://activemq.2283324.n4.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html - Microservices communications. Why you should switch to message queues
https://dev.to/matteojoliveau/microservices-communications-why-you-should-switch-to-message-queues–48ia - Stomp.py 4.1.19 documentation
https://stomppy.readthedocs.io/en/stable/ - Repositář knihovny JeroMQ
https://github.com/zeromq/jeromq/ - ØMQ – Distributed Messaging
http://zeromq.org/ - ØMQ Community
http://zeromq.org/community - Get The Software
http://zeromq.org/intro:get-the-software - PyZMQ Documentation
https://pyzmq.readthedocs.io/en/latest/ - Module: zmq.decorators
https://pyzmq.readthedocs.io/en/latest/api/zmq.decorators.html - ZeroMQ is the answer, by Ian Barber
https://vimeo.com/20605470 - ZeroMQ RFC
https://rfc.zeromq.org/ - ZeroMQ and Clojure, a brief introduction
https://antoniogarrote.wordpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/ - zeromq/czmq
https://github.com/zeromq/czmq - golang wrapper for CZMQ
https://github.com/zeromq/goczmq - ZeroMQ version reporting in Python
http://zguide.zeromq.org/py:version - A Go interface to ZeroMQ version 4
https://github.com/pebbe/zmq4 - Broker vs. Brokerless
http://zeromq.org/whitepapers:brokerless - Learning ØMQ with pyzmq
https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/ - Céčková funkce zmq_ctx_new
http://api.zeromq.org/4–2:zmq-ctx-new - Céčková funkce zmq_ctx_destroy
http://api.zeromq.org/4–2:zmq-ctx-destroy - Céčková funkce zmq_bind
http://api.zeromq.org/4–2:zmq-bind - Céčková funkce zmq_unbind
http://api.zeromq.org/4–2:zmq-unbind - Céčková C funkce zmq_connect
http://api.zeromq.org/4–2:zmq-connect - Céčková C funkce zmq_disconnect
http://api.zeromq.org/4–2:zmq-disconnect - Céčková C funkce zmq_send
http://api.zeromq.org/4–2:zmq-send - Céčková C funkce zmq_recv
http://api.zeromq.org/4–2:zmq-recv - Třída Context (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#context - Třída Socket (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket - Python binding
http://zeromq.org/bindings:python - Why should I have written ZeroMQ in C, not C++ (part I)
http://250bpm.com/blog:4 - Why should I have written ZeroMQ in C, not C++ (part II)
http://250bpm.com/blog:8 - About Nanomsg
https://nanomsg.org/ - Advanced Message Queuing Protocol
https://www.amqp.org/ - Advanced Message Queuing Protocol na Wikipedii
https://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol - Dokumentace k příkazu rabbitmqctl
https://www.rabbitmq.com/rabbitmqctl.8.html - RabbitMQ
https://www.rabbitmq.com/ - RabbitMQ Tutorials
https://www.rabbitmq.com/getstarted.html - RabbitMQ: Clients and Developer Tools
https://www.rabbitmq.com/devtools.html - RabbitMQ na Wikipedii
https://en.wikipedia.org/wiki/RabbitMQ - Streaming Text Oriented Messaging Protocol
https://en.wikipedia.org/wiki/Streaming_Text_Oriented_Messaging_Protocol - Message Queuing Telemetry Transport
https://en.wikipedia.org/wiki/MQTT - Erlang
http://www.erlang.org/ - pika 0.12.0 na PyPi
https://pypi.org/project/pika/ - Introduction to Pika
https://pika.readthedocs.io/en/stable/ - Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
http://clojurerabbitmq.info/ - AMQP 0–9–1 Model Explained
http://www.rabbitmq.com/tutorials/amqp-concepts.html - Part 1: RabbitMQ for beginners – What is RabbitMQ?
https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html - Downloading and Installing RabbitMQ
https://www.rabbitmq.com/download.html - celery na PyPi
https://pypi.org/project/celery/ - Databáze Redis (nejenom) pro vývojáře používající Python
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/ - Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/ - Redis Queue (RQ)
https://www.fullstackpython.com/redis-queue-rq.html - Python Celery & RabbitMQ Tutorial
https://tests4geeks.com/python-celery-rabbitmq-tutorial/ - Flower: Real-time Celery web-monitor
http://docs.celeryproject.org/en/latest/userguide/monitoring.html#flower-real-time-celery-web-monitor - Asynchronous Tasks With Django and Celery
https://realpython.com/asynchronous-tasks-with-django-and-celery/ - First Steps with Celery
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html - node-celery
https://github.com/mher/node-celery - Full Stack Python: web development
https://www.fullstackpython.com/web-development.html - Introducing RQ
https://nvie.com/posts/introducing-rq/ - Asynchronous Tasks with Flask and Redis Queue
https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue - rq-dashboard
https://github.com/eoranged/rq-dashboard - Stránky projektu Redis
https://redis.io/ - Introduction to Redis
https://redis.io/topics/introduction - Try Redis
http://try.redis.io/ - Redis tutorial, April 2010 (starší, ale pěkně udělaný)
https://static.simonwillison.net/static/2010/redis-tutorial/ - Python Redis
https://redislabs.com/lp/python-redis/ - Redis: key-value databáze v paměti i na disku
https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/ - Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
http://www.cloudsvet.cz/?p=253 - Praktický úvod do Redis (2): transakce
http://www.cloudsvet.cz/?p=256 - Praktický úvod do Redis (3): cluster
http://www.cloudsvet.cz/?p=258 - Connection pool
https://en.wikipedia.org/wiki/Connection_pool - Instant Redis Sentinel Setup
https://github.com/ServiceStack/redis-config - How to install REDIS in LInux
https://linuxtechlab.com/how-install-redis-server-linux/ - Redis RDB Dump File Format
https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format - Lempel–Ziv–Welch
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch - Redis Persistence
https://redis.io/topics/persistence - Redis persistence demystified
http://oldblog.antirez.com/post/redis-persistence-demystified.html - Redis reliable queues with Lua scripting
http://oldblog.antirez.com/post/250 - Ost (knihovna)
https://github.com/soveran/ost - NoSQL
https://en.wikipedia.org/wiki/NoSQL - Shard (database architecture)
https://en.wikipedia.org/wiki/Shard_%28database_architecture%29 - What is sharding and why is it important?
https://stackoverflow.com/questions/992988/what-is-sharding-and-why-is-it-important - What Is Sharding?
https://btcmanager.com/what-sharding/ - Redis clients
https://redis.io/clients - Category:Lua-scriptable software
https://en.wikipedia.org/wiki/Category:Lua-scriptable_software - Seriál Programovací jazyk Lua
https://www.root.cz/serialy/programovaci-jazyk-lua/ - Redis memory usage
http://nosql.mypopescu.com/post/1010844204/redis-memory-usage - Ukázka konfigurace Redisu pro lokální testování
https://github.com/tisnik/presentations/blob/master/redis/redis.conf - Resque
https://github.com/resque/resque - Nested transaction
https://en.wikipedia.org/wiki/Nested_transaction - Publish–subscribe pattern
https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern - Messaging pattern
https://en.wikipedia.org/wiki/Messaging_pattern - Using pipelining to speedup Redis queries
https://redis.io/topics/pipelining - Pub/Sub
https://redis.io/topics/pubsub - ZeroMQ distributed messaging
http://zeromq.org/ - ZeroMQ: Modern & Fast Networking Stack
https://www.igvita.com/2010/09/03/zeromq-modern-fast-networking-stack/ - Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
https://stackoverflow.com/questions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr - Python & Redis PUB/SUB
https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7 - Message broker
https://en.wikipedia.org/wiki/Message_broker - RESP Arrays
https://redis.io/topics/protocol#array-reply - Redis Protocol specification
https://redis.io/topics/protocol - Redis Pub/Sub: Intro Guide
https://www.redisgreen.net/blog/pubsub-intro/ - Redis Pub/Sub: Howto Guide
https://www.redisgreen.net/blog/pubsub-howto/ - Comparing Publish-Subscribe Messaging and Message Queuing
https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message - Apache Kafka
https://kafka.apache.org/ - Iron
http://www.iron.io/mq - kue (založeno na Redisu, určeno pro node.js)
https://github.com/Automattic/kue - Cloud Pub/Sub
https://cloud.google.com/pubsub/ - Introduction to Redis Streams
https://redis.io/topics/streams-intro - glob (programming)
https://en.wikipedia.org/wiki/Glob_(programming) - Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
https://www.slideshare.net/sylvinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2 - Enqueueing internals
http://python-rq.org/contrib/ - queue — A synchronized queue class
https://docs.python.org/3/library/queue.html - Queue – A thread-safe FIFO implementation
https://pymotw.com/2/Queue/ - Queues
http://queues.io/ - Windows Subsystem for Linux Documentation
https://docs.microsoft.com/en-us/windows/wsl/about - RestMQ
http://restmq.com/ - ActiveMQ
http://activemq.apache.org/ - Amazon MQ
https://aws.amazon.com/amazon-mq/ - Amazon Simple Queue Service
https://aws.amazon.com/sqs/ - Celery: Distributed Task Queue
http://www.celeryproject.org/ - Disque, an in-memory, distributed job queue
https://github.com/antirez/disque - rq-dashboard
https://github.com/eoranged/rq-dashboard - Projekt RQ na PyPi
https://pypi.org/project/rq/ - rq-dashboard 0.3.12
https://pypi.org/project/rq-dashboard/ - Job queue
https://en.wikipedia.org/wiki/Job_queue - Why we moved from Celery to RQ
https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq - Running multiple workers using Celery
https://serverfault.com/questions/655387/running-multiple-workers-using-celery - celery — Distributed processing
http://docs.celeryproject.org/en/latest/reference/celery.html - Chains
https://celery.readthedocs.io/en/latest/userguide/canvas.html#chains - Routing
http://docs.celeryproject.org/en/latest/userguide/routing.html#automatic-routing - Celery Distributed Task Queue in Go
https://github.com/gocelery/gocelery/ - Python Decorators
https://wiki.python.org/moin/PythonDecorators - Periodic Tasks
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html - celery.schedules
http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab - Pros and cons to use Celery vs. RQ
https://stackoverflow.com/questions/13440875/pros-and-cons-to-use-celery-vs-rq - Priority queue
https://en.wikipedia.org/wiki/Priority_queue - Jupyter
https://jupyter.org/ - How IPython and Jupyter Notebook work
https://jupyter.readthedocs.io/en/latest/architecture/how_jupyter_ipython_work.html - Context Managers
http://book.pythontips.com/en/latest/context_managers.html