Obsah
1. Implementace různých komunikačních strategií s využitím knihovny nanomsg
2. Rozdíly mezi projekty nanomsg a ZeroMQ
3. Základní koncepty, na nichž je knihovna nanomsg postavena
4. Podporované přenosové mechanismy
5. Rozhraní pro další programovací jazyky
6. Instalace nanomsg ze zdrojových kódů
7. Demonstrační příklady ukazující základní komunikační vzory (strategie)
8. Aplikace posílající zprávy s využitím strategie PIPELINE
9. Aplikace přijímající zprávy s využitím strategie PIPELINE
10. Soubor Makefile pro překlad obou klientů
11. Vylepšení obou uzlů – přidání kontroly návratových kódů všech funkcí
12. Oboustranná komunikace mezi dvěma uzly (strategie PAIR)
13. Realizace prvního uzlu (klienta)
14. Realizace druhého uzlu (serveru)
15. Refaktoring klienta i serveru
16. Chování uzlů ve chvíli, kdy zvolíme špatnou komunikační strategii
17. Využití komunikační strategie PUBSUB
18. Repositář s demonstračními příklady
19. Odkazy na předchozí části seriálu o message brokerech
1. Implementace různých komunikačních strategií s využitím knihovny nanomsg
V dnešní části seriálu o message brokerech i o technologiích, které s message brokery souvisí, se seznámíme s projektem nazvaným nanomsg. Jedná se o následovníka (stále velmi často používané) knihovny ZeroMQ neboli ØMQ, s níž jsme se seznámili v trojici článků [1] [2] a taktéž [3]. Připomeňme si ve stručnosti, k čemu je vlastně knihovna ØMQ určena. Jedná se o relativně nízkoúrovňovou knihovnu vyvinutou v programovacím jazyce C++, která vývojářům nabízí implementaci různých tzv. komunikačních strategií. Tyto strategie je možné využít a popř. i vhodně zkombinovat při implementaci aplikací se složitější architekturou, v níž mezi sebou jednotlivé části komunikují s využitím synchronních či asynchronních zpráv, popř. přes takzvané brokery.
Na knihovnu ØMQ se ovšem můžeme dívat i z jiného pohledu, protože se jedná o abstrakci nad klasickými Berkeley sockety, ovšem s mnoha vylepšeními. V ØMQ je totiž možné zprávy odesílat asynchronně; samotné zpracování zpráv je provedeno na pozadí (ve vlastním vláknu), nemusíme se starat o délku zpráv ani o jejich případnou fragmentaci a do určité míry jsme odstíněni od toho, jaký konkrétní protokol bude pro komunikaci použit (IPC, TCP, atd.). Toto zjednodušení se ještě více projeví v těch programovacích jazycích, které se mohou postarat o automatické uvolňování prostředků (což je mj. i případ Pythonu, v němž je vytvořeno prvních pět demonstračních příkladů, s nimiž se seznámíme v navazujících kapitolách).
2. Rozdíly mezi projekty nanomsg a ZeroMQ
Mezi projekty nanomsg a ZeroMQ přirozeně najdeme několik rozdílů, které jsou velmi pěkně shrnuty na stránce https://nanomsg.org/documentation-zeromq.html. Jedná se o následující podstatné rozdíly:
- Nepoužívá se žádný kontext, tj. programátor nemusí volat funkcezmq_ctx_new a zmq_ctx_destroy.
- Samotné sockety vytvářené funkcí nn_socket jsou reprezentovány celým číslem; záporné číslo značí chybu.
- Sémantika funkcí nn_send, nn_recv atd. odpovídá POSIXu.
- Změnil se i samotný implementační jazyk, protože se přešlo od C++ k čistému céčku (což je minimálně zajímavý směr vývoje).
- Počet komunikačních strategií se rozšířil na šest, viz též navazující kapitoly.
- Při práci se zprávami je podporováno „zero-copy“, což je technika, kterou si vysvětlíme příště.
3. Základní koncepty, na nichž je knihovna nanomsg postavena
V knihovně nanomsg jsou připraveny funkce navržené takovým způsobem, aby se s jejich využitím daly implementovat protokoly založené na jedné ze šesti základních komunikačních strategiích (neboli komunikačních vzorech). Jména a stručné charakteristiky těchto strategií naleznete v následující tabulce:
# | Strategie/vzor | Stručný popis významu strategie |
---|---|---|
1 | PAIR | jedna z nejjednodušších komunikačních strategií s dvojicí uzlů a vazbou 1:1; komunikace je obecně obousměrná (samozřejmě lze použít i komunikaci jednosměrnou) |
2 | BUS | složitější strategie, v níž se používá obecnější vazba M:N; tuto strategii si popíšeme příště |
3 | PUBSUB | klasická komunikační strategie PUB-SUB neboli PUBLISH-SUBSCRIBE |
4 | REQREP | klasická komunikační strategie REQ-REP neboli REQUEST-RESPONSE, opět bude popsána příště |
5 | PIPELINE | jednosměrná komunikace buď s vazbami 1:1 (jeden vysílač a jeden přijímač), popř. mezi více vysílači a několika přijímači |
6 | SURVEY | speciální strategie umožňující získat stav více uzlů (procesů) jediným dotazem a mnoha odpovědmi; tato zcela nová strategie bude popsána v navazujícím článku |
U jednotlivých strategií/vzorů se v čase běhu aplikace kontroluje, zda jsou použity správně. Příkladem může být strategie pojmenovaná PIPELINE, v níž jeden komunikující uzel používá socket typu PUSH pro vysílání zpráv a druhý uzel naopak socket typu PULL pro příjem zpráv. V případě, že se pokusíme poslat zprávu opačným směrem, dojde k běhové chybě, kterou lze snadno detekovat, což si ukážeme v šestnácté kapitole.
Jen pro připomenutí, jaké strategie poskytuje původní knihovna ØMQ:
- PAIR – jednosměrné či obousměrné propojení dvou procesů, z nichž každý může běžet na odlišném počítači. Tato strategie se nejvíce přibližuje běžnému použití klasických Berkeley socketů. Prakticky stejná strategie, i když implementovaná odlišným způsobem, je součástí knihovny nanomsg.
- REQ-REP – jedná se o komunikaci typu požadavek-odpověď. Požadavky posílají klienti, odpovědi generuje server, který dokáže obsloužit prakticky libovolné množství klientů. Podobnou strategii nalezneme i v dnes popisované knihovně nanomsg pod názvem REQREP.
- PUB-SUB – server zde publikuje zprávy, k jejichž odběru se mohou přihlásit různí klienti. Zprávy je možné filtrovat na straně klientů (tato vlastnost se ovšem ve starších verzích ØMQ odlišuje). I tuto strategii nalezneme v knihovně nanomsg, pouze pod nepatrně odlišným názvem.
- PUSH-PULL – rozšíření předchozí strategie PUB-SUB: server či servery vytváří zprávy zpracovávané buď přímo připojenými workery nebo celou kolonou (pipeline) workerů. Tato strategie je v knihovně nanomsg rozšířena a zobecněna ve strategii pojmenované PIPELINE.
4. Podporované přenosové mechanismy
Aplikace (tj. jak klienti, tak i servery), které spolu mají komunikovat s využitím knihovny nanomsg, mohou využívat různé přenosové mechanismy. Ty jsou vypsány v následující tabulce, přičemž jsou jednotlivé mechanismy seřazeny podle své „lokality“. Například přenosový mechanismus nazvaný INPROC je použitelný pouze v případě, že spolu komunikují jednotlivé části aplikace běžící v rámci stejného procesu, typicky každá ve svém vlastním vláknu. Menší „lokalitu“ nabízí přenosový mechanismus pojmenovaný IPC, jenž umožňuje komunikovat mezi několika procesy, které ovšem musí běžet na stejném počítači. Pro komunikaci mezi aplikacemi běžícími na různých počítačích v rámci intranetu či internetu se používá buď klasické TCP (tj. skutečné přenosy dat přes protokol TCP) nebo WS (web sockety):
# | Přenosový mechanismus | Stručný popis |
---|---|---|
1 | INPROC | komunikace v rámci jednoho procesu, například typizovaná komunikace mezi několika vlákny (obecně nejrychlejší řešení) |
2 | IPC | komunikace mezi několika procesy běžícími na jednom počítači |
3 | TCP | komunikace mezi procesy běžícími na různých počítačích s využitím protokolu TCP |
4 | WS | komunikace mezi procesy běžícími na různých počítačích s využitím web socketů |
5. Rozhraní pro další programovací jazyky
Jak jsme si již řekli v úvodních kapitolách, je knihovna nanomsg vyvinuta v programovacím jazyku C a z tohoto důvodu je jejím primárním rozhraním právě API vázané na céčko. Ovšem to pochopitelně v žádném případě neznamená, že by tuto užitečnou knihovnu nebylo možné použít i v dalších programovacích jazycích. Ve skutečnosti je tomu právě naopak, protože existuje celá řada již hotových a otestovaných rozhraní, které nanomsg zpřístupní i vývojářům, kteří z různých důvodů preferují jiné programovací jazyky. Tato rozhraní jsou vypsána v následující tabulce:
Jazyk (platforma) | Knihovna/projekt s rozhraním |
---|---|
C | nanomsg |
NNG (nová reimplementace, viz poznámka v úvodní kapitole) | |
C++ | nanomsgxx |
cppnanomsg | |
nngpp (pro NNG) | |
Clojure | jnanomsg (voláno přes standardní Java interop) |
D | nanomsg-wrapper |
Dylan | nanomsg-dylan |
Erlang | enm |
Fortran | nanofort |
Go | mangos (reimplementace v Go) |
mangos v2 (druhá verze) | |
go-nanomsg | |
Haskell | nanomsg-haskell |
nanomsg | |
Haxe | hx-nanomsg |
Swift | swiftc nanomsg |
Java | jnano |
jnanomsg | |
nngjvm (pro NNG) | |
JavaScript (Node.js) | node-nanomsg |
Lua | lua-nanomsg |
luajit-nanomsg (pro systém LuaJIT) | |
luananomsg | |
.NET | NNanomsg |
Ocaml | onanomsg |
Perl | NanoMsg::Raw |
PHP | php-nano |
PicoLisp | picolisp-nanomsg FFI bindings |
Python | nanomsg-python |
pynanomsg | |
nnpy | |
pynng (pro NNG, prozatím ve vývoji) | |
R | rnanomsg |
Ruby | nn-core |
nanomsg | |
Rust | rust-nanomsg |
nng-rs (opět pro NNG) | |
Scheme (CHICKEN) | chicken-nanomsg |
Smalltalk | NanoStrand |
6. Instalace nanomsg ze zdrojových kódů
Knihovnu nanomsg i k ní příslušející nástroje lze nainstalovat buď ze zdrojových kódů nebo z repositářů Linuxové distribuce. Vzhledem k tomu, že v distribučních balíčcích nemusí být dostupná vždy poslední verze této knihovny, si ukážeme, jak se nanomsg překládá ze zdrojových kódů. Není to ve skutečnosti nic těžkého; budeme potřebovat pouze překladač céčka (například GCC), linker a taktéž nástroje make a CMake. Pokud budeme chtít, aby se při překladu vytvořily i soubory s nápovědou, je vyžadován nástroj asciidoctor, ovšem v tomto případě se jedná pouze o volitelný krok (překlad i bez asciidoctoru proběhne bez problémů).
Nejdříve je nutné stáhnout archiv se zdrojovými kódy knihovny a dalších k ní příslušejících nástrojů. V dnešním článku se zabýváme poslední stabilní verzí 1.1.5, která je dostupná na adrese https://github.com/nanomsg/nanomsg/archive/1.1.5.zip. Z této adresy archiv získáme takto:
$ wget https://github.com/nanomsg/nanomsg/archive/1.1.5.zip --2019-04-11 20:03:44-- https://github.com/nanomsg/nanomsg/archive/1.1.5.zip Resolving github.com (github.com)... 192.30.253.113, 192.30.253.112 Connecting to github.com (github.com)|192.30.253.113|:443... connected. HTTP request sent, awaiting response... 302 Found Location: https://codeload.github.com/nanomsg/nanomsg/zip/1.1.5 [following] --2019-04-11 20:03:45-- https://codeload.github.com/nanomsg/nanomsg/zip/1.1.5 Resolving codeload.github.com (codeload.github.com)... 192.30.253.120, 192.30.253.121 Connecting to codeload.github.com (codeload.github.com)|192.30.253.120|:443... connected. HTTP request sent, awaiting response... 200 OK Length: unspecified [application/zip] Saving to: ‘1.1.5.zip’ 2019-04-11 20:03:46 (1,16 MB/s) - ‘1.1.5.zip’ saved [661276]
Ve druhém kroku archiv běžným způsobem rozbalíme, což je triviální:
$ unzip 1.1.5.zip Archive: 1.1.5.zip 1749fd7b039165a91b8d556b4df18e3e632ad830 creating: nanomsg-1.1.5/
Následně přejdeme do adresáře nanomsg-1.1.5, který vznikl po rozbalení archivu:
$ cd nanomsg-1.1.5/
Nyní je již možné přistoupit k vlastnímu překladu. Necháme si vygenerovat soubor Makefile s využitím utility cmake:
$ cmake . -- The C compiler identification is GNU 7.3.1 -- Performing Test NN_HAVE_GCC_ATOMIC_BUILTINS -- Performing Test NN_HAVE_GCC_ATOMIC_BUILTINS - Success CMake Warning at CMakeLists.txt:294 (message): Could not find asciidoctor: skipping docs -- Configuring done -- Generating done -- Build files have been written to: /home/ptisnovs/nanomsg-1.1.5
Nyní již máme soubor Makefile vytvořený, takže je možné spustit nástroj make, který provede vlastní překlad a slinkování výsledné knihovny i podpůrných nástrojů (samotný překlad bude na moderním HW trvat několik sekund):
$ make Scanning dependencies of target nanomsg [ 1%] Building C object src/CMakeFiles/nanomsg.dir/core/ep.c.o [ 1%] Building C object src/CMakeFiles/nanomsg.dir/core/global.c.o [ 2%] Building C object src/CMakeFiles/nanomsg.dir/core/pipe.c.o [ 2%] Building C object src/CMakeFiles/nanomsg.dir/core/poll.c.o [ 3%] Building C object src/CMakeFiles/nanomsg.dir/core/sock.c.o ... ... ... [100%] Linking C executable symbol [100%] Built target symbol
Výsledkem překladu by měly být minimálně tyto soubory:
Soubor | Stručný popis |
---|---|
/usr/local/lib64/libnanomsg.so.5.1.0 | samotná knihovna s implementací všech přenosových mechanismů |
/usr/local/lib64/libnanomsg.so.5 | symbolický odkaz na předchozí soubor |
/usr/local/lib64/libnanomsg.so | symbolický odkaz na předchozí soubor, typicky se právě tento soubor předává linkeru |
/usr/local/include/nanomsg/nn.h | hlavičkový soubor se základními funkcemi a datovými typy knihovny nanomsg |
/usr/local/include/nanomsg/pair.h | konstanty a funkce používané v komunikační strategii PAIR |
/usr/local/include/nanomsg/pipeline.h | konstanty a funkce používané v komunikační strategii PIPELINE |
/usr/local/include/nanomsg/pubsub.h | konstanty a funkce používané v komunikační strategii PUBSUB |
/usr/local/include/nanomsg/survey.h | konstanty a funkce používané v komunikační strategii SURVEY |
/usr/local/include/nanomsg/reqrep.h | konstanty a funkce používané v komunikační strategii REQREP |
/usr/local/include/nanomsg/bus.h | konstanty a funkce používané v komunikační strategii BUS |
7. Demonstrační příklady ukazující základní komunikační vzory (strategie)
V navazujících kapitolách si ukážeme demonstrační příklady, v nichž postupně použijeme základní komunikační vzory (neboli komunikační strategie), které nám knihovna nanomsg nabízí. Všechny příklady budou vyvinuty v programovacím jazyku C (jsou kompatibilní s ANSI C), protože rozhraními pro další programovací jazyky se budeme zabývat v navazující části tohoto seriálu. Příklady jsou navíc napsány s ohledem na co největší stručnost zápisu, takže u některých z nich nejsou provedeny všechny kontroly chyb, jež by se ovšem v produkčním kódu pochopitelně měly použít (na případné možnosti vylepšení se ovšem taktéž zaměříme).
Pro překlad, slinkování a spuštění je u každého příkladu použit Makefile, takže využijete příkaz/nástroj make, který by měl být nainstalován (ovšem bez tohoto nástroje nebude možné zkompilovat ani nainstalovat samotnou knihovnu nanomsg, viz též předchozí kapitolu).
Ve všech dále popisovaných aplikacích se používá stejný postup:
- Nejprve je nutné vytvořit a inicializovat socket, a to s využitím funkce nazvané nn_socket.
- Socket se na straně serveru otevře funkcí nn_bind, zatímco u klienta se používá funkce nn_connect. Rozdíl mezi oběma funkcemi odpovídá postupu při navazování připojení: server naslouchá na určitém portu (pokud se jedná o TCP), zatímco klient se k portu připojuje. To, který komunikující uzel bude serverem a který klientem, je většinou ponecháno na rozhodnutí programátora.
- Následně je již možné posílat zprávy, a to buď jedním směrem (strategiePIPELINE apod.) nebo směry oběma (strategie PAIR apod.). Pro posílání zpráv je určena funkce nn_send, pro jejich příjem pak funkce pojmenovaná nn_recv. Použití „recv“ namísto celého slova „receive“ vychází ze snahy sémanticky se přiblížit klasickým Berkeley socketům.
- Nakonec se spojení ukončí zavoláním funkcenn_shutdown.
Jen pro připomenutí si ukažme, do jaké míry je odlišné pojetí knihovny ØMQ. Tato knihovna totiž používá takzvaný kontext, který je nutné vytvořit ještě před konstrukcí socketu a na konci aplikace je ho nutné korektně deaktivovat. Příkladem může být jednoduchý klient přijímající zprávy na lokální adrese a portu 5556:
#include <unistd.h> #include <stdio.h> #include <string.h> #include <zmq.h> #define BUFFER_LENGTH 32 int main() { char buffer[BUFFER_LENGTH]; char *address = "tcp://localhost:5556"; void *context = zmq_ctx_new(); void *socket = zmq_socket(context, ZMQ_PAIR); zmq_connect(socket, address); printf("Connected to address %s\n", address); while (1) { int num = zmq_recv(socket, buffer, BUFFER_LENGTH-1, 0); buffer[num] = '\0'; printf("Received '%s'\n", buffer); } zmq_close(socket); zmq_ctx_destroy(context); return 0; }
Vidíme, že základní postup sice skutečně zůstává zachován: zmq_socket, zmq_connect, zmq_send/zmq_recv a konečně zmq_close, ovšem navíc se musíme postarat o kontext pomocí funkcí zmq_ctx_new a zmq_ctx_destroy.
8. Aplikace posílající zprávy s využitím strategie PIPELINE
Strategie nazvaná PIPELINE zajišťuje jednosměrný přenos zpráv od vysílající aplikace (vlákna, procesu) k aplikaci přijímající. V tom nejjednodušším případě existuje pouze jediný vysílač (neboli zdroj zpráv, producent) a jediný přijímač (konzument). Nejprve se podívejme na implementaci producenta, která je nepatrně jednodušší, zejména s ohledem na to, že se nemusí pracovat s bufferem pro příjem zprávy. Zdrojový kód naleznete na adrese úplný zdrojový kód producenta; podrobnější popis jednotlivých kroků bude uveden pod tímto výpisem:
#include <stdio.h> #include <string.h> #include <unistd.h> #include <nanomsg/nn.h> #include <nanomsg/pipeline.h> const char *URL = "ipc:///tmp/example1"; void sender(const char *url, const char *message) { int message_size = strlen(message) + 1; int socket; int endpoint; int bytes; socket = nn_socket(AF_SP, NN_PUSH); puts("Socket created"); endpoint = nn_connect(socket, url); puts("Remote endpoint added to the socket"); printf("Sending message '%s'\n", message); bytes = nn_send(socket, message, message_size, 0); printf("Message with length %d bytes sent, flushing", bytes); sleep(1); puts("Done"); nn_shutdown(socket, endpoint); } int main(const int argc, const char **argv) { sender(URL, "Hello"); sender(URL, "world"); sender(URL, "!"); return 0; }
Popišme si nyní jednotlivé části producenta. Zajímavé je už určení URL použité pro připojení ke konzumentovi. Povšimněte si, že se URL skládá z určení komunikačního (přenosového) mechanismu zmíněného ve čtvrté kapitole, za nímž následují znaky „://“ (bez uvozovek). Za těmito znaky již následuje konkrétní určení adresy, které je ovšem závislé na použitém přenosovém mechanismu. U IPC se jedná o jméno speciálního souboru, v našem případě o jméno „/tmp/example1“ (proto se v URL nachází trojice lomítek za sebou, i když jsou podporovány i relativní cesty):
const char *URL = "ipc:///tmp/example1";
Samotná implementace producenta se nachází ve funkci sender. Nejprve vytvoříme socket s uvedením jeho typu, což je u strategie PIPELINE typ NN_PUSH u producenta a NN_PULL u konzumenta:
socket = nn_socket(AF_SP, NN_PUSH);
Dále k socketu přiřadíme koncový bod specifikovaný adresou (URL):
endpoint = nn_connect(socket, url);
V této chvíli je již možné přes socket posílat zprávy případným konzumentům. Poslání zprávy je realizováno funkcí nn_send, které se předá jak socket, tak i vlastní zpráva současně s její délkou. Vzhledem k tomu, že zpráva je považována za sekvenci bajtů a nikoli za řetězec ukončený nulou (ASCIIZ), je nutné délku skutečně vypočítat a specifikovat:
bytes = nn_send(socket, message, message_size, 0);
Dále – což je ovšem velké zjednodušení – je nutné počkat, až se zpráva skutečně přenese z interního bufferu, který je knihovnou nanomsg udržován. Pro dosažení co největší jednoduchosti použijeme funkci sleep:
sleep(1);
Zcela poslední operací je uzavření připojení a všech alokovaných prostředků, což zajišťuje funkce nn_shutdown:
nn_shutdown(socket, endpoint);
Tímto způsobem jsou poslány celkem tři zprávy.
9. Aplikace přijímající zprávy s využitím strategie PIPELINE
Podobným způsobem je možné realizovat klienta, který bude zprávy přijímat. Vysílací strana používá socket typu NN_PUSH, strana přijímací tedy použije socket NN_PULL:
socket = nn_socket(AF_SP, NN_PULL);
Odlišné bude i navázání připojení. Ve vysílači jsme použili nn_connect, zde tedy musíme pro zachování symetrie použít nn_bind (popř. volání otočit, podle toho, který z klientů je stabilnější a který tedy má plnit funkci serveru):
nn_bind(socket, url);
Zpráva se čte/přijímá funkcí nn_recv, které musíme předat identifikátor socketu, adresu proměnné, která bude obsahovat ukazatel na automaticky alokovaný buffer (jedná se tedy o ukazatel na ukazatel), délku zprávy popř. konstantu NN_MSG pokud se má buffer alokovat automaticky a konečně případné příznaky:
char *message = NULL; int bytes = nn_recv(socket, &message, NN_MSG, 0);
Vzhledem k tomu, že byl buffer alokován automaticky knihovnou nanomsg, musíme zaručit i jeho pozdější dealokaci, a to zavoláním:
nn_freemsg(message);
Úplný zdrojový kód přijímače naleznete na adrese https://github.com/tisnik/message-queues-examples/blob/master/nanomsg/01_simple_sender_receiver/receiver.c. Jeho ukončení se provede například pomocí Ctrl+C nebo příkazem kill:
#include <stdio.h> #include <nanomsg/nn.h> #include <nanomsg/pipeline.h> const char *URL = "ipc:///tmp/example1"; void receiver(const char *url) { int socket; socket = nn_socket(AF_SP, NN_PULL); puts("Socket created"); nn_bind(socket, url); puts("Endpoint bound to socket"); puts("Waiting for messages..."); while (1) { char *message = NULL; int bytes = nn_recv(socket, &message, NN_MSG, 0); printf("Received message '%s' with length %d bytes\n", message, bytes); nn_freemsg(message); } } int main(int argc, char **argv) { receiver(URL); return 0; }
10. Soubor Makefile pro překlad obou klientů
Na adrese https://github.com/tisnik/message-queues-examples/blob/master/nanomsg/01_simple_sender_receiver/Makefile naleznete soubor Makefile určený pro překlad obou klientů, tj. jak vysílače, tak i přijímače:
CC=gcc LINKER=gcc LIBS=nanomsg CFLAGS=-O0 -Wall -ansi -pedantic LFLAGS=-l$(LIBS) LIBRARY_PATH=/usr/local/lib64/ .PHONY: clean run_sender run_receiver all: sender receiver %.o: %.c $(CC) -c -o $@ $(CFLAGS) $< sender: sender.o $(CC) -o $@ $(LFLAGS) $< receiver: receiver.o $(CC) -o $@ $(LFLAGS) $< clean: rm -f sender.o \ rm -f receiver.o \ rm -f sender \ rm -f receiver run_sender: LD_LIBRARY_PATH=$(LIBRARY_PATH) ./sender run_receiver: LD_LIBRARY_PATH=$(LIBRARY_PATH) ./receiver
Samotný překlad obou klientů zajistí příkaz:
$ make
Spuštění přijímače v jednom terminálu se provede příkazem:
$ make run_receiver
Spuštění vysílače pak příkazem:
$ make run_sender
11. Vylepšení obou uzlů – přidání kontroly návratových kódů všech funkcí
Oba uzlů je vhodné vylepšit, a to takovým způsobem, že se přidají kontroly návratových kódů všech funkcí z knihovny nanomsg. Většina těchto funkcí vrací záporné číslo v případě chyby, což je stav, který můžeme velmi snadno otestovat. Navíc ještě přidáme jak do vysílače, tak i do přijímače volání funkce nn_shutdown, aby se skutečně provedlo korektní odpojení obou klientů (to sice není zcela striktně vyžadováno, ovšem jedná se o dobré vychování programátora).
Nejprve si ukažme upravený zdrojový kód vysílače:
#include <stdlib.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <nanomsg/nn.h> #include <nanomsg/pipeline.h> const char *URL = "ipc:///tmp/example2"; void report_error(const char *func) { fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno())); exit(1); } void sender(const char *url, const char *message) { int message_size = strlen(message) + 1; int socket; int endpoint; int bytes; if ((socket = nn_socket(AF_SP, NN_PUSH)) < 0) { report_error("nn_socket"); } puts("Socket created"); if ((endpoint = nn_connect(socket, url)) < 0) { report_error("nn_connect"); } puts("Remote endpoint added to the socket"); printf("Sending message '%s'\n", message); if ((bytes = nn_send(socket, message, message_size, 0)) < 0) { report_error("nn_send"); } printf("Message with length %d bytes sent, flushing", bytes); sleep(1); puts("Done"); if (nn_shutdown(socket, endpoint) < 0) { report_error("nn_shutdown"); } } int main(const int argc, const char **argv) { sender(URL, "Hello"); sender(URL, "world"); sender(URL, "!"); return 0; }
Následuje upravený zdrojový kód přijímače:
#include <stdlib.h> #include <stdio.h> #include <nanomsg/nn.h> #include <nanomsg/pipeline.h> const char *URL = "ipc:///tmp/example2"; void report_error(const char *func) { fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno())); exit(1); } void receiver(const char *url) { int socket; int endpoint; if ((socket = nn_socket(AF_SP, NN_PULL)) < 0) { report_error("nn_socket"); } puts("Socket created"); if ((endpoint = nn_bind(socket, url)) < 0) { report_error("nn_bind"); } puts("Endpoint bound to socket"); puts("Waiting for messages..."); while (1) { char *message = NULL; int bytes; if ((bytes = nn_recv(socket, &message, NN_MSG, 0)) < 0) { report_error("nn_recv"); } printf("Received message '%s' with length %d bytes\n", message, bytes); if (nn_freemsg(message) < 0) { report_error("nn_freemsg"); } } } int main(int argc, char **argv) { receiver(URL); return 0; }
12. Oboustranná komunikace mezi dvěma uzly (strategie PAIR)
Druhá komunikační strategie, s níž se v dnešním článku alespoň ve stručnosti seznámíme, se jmenuje PAIR. Tato strategie umožňuje, aby mezi sebou jednotlivé uzly komunikovaly oboustranně, což nebylo při použití strategie PIPELINE možné (přesněji možné bylo, ovšem otevřením dvou komunikačních kanálů, což je velmi křehké řešení). I u strategie PAIR vystupuje jeden z komunikujících uzlů ve funkci serveru (otevírá svůj port a očekává, že se na něj připojí klient) a druhý uzel ve funkci klienta. Ovšem až na tento rozdíl jsou si po navázání spojení oba uzly rovnocenné, tj. každý z nich může vysílat i přijímat zprávy, a to libovolným způsobem, který si zvolí sám programátor. Ten například může implementovat jednoduchý systém typu dotaz-odpověď (což je ovšem lepší realizovat strategií REQREP) či skutečně použít plnohodnotný oboustranný komunikační kanál.
13. Realizace prvního uzlu (klienta)
Podívejme se nejprve na způsob realizace prvního uzlu, který je naprogramován jako klient, protože používá funkci nn_connect a nikoli nn_bind. Po navázání připojení pošle klient zprávu serveru a očekává jeho odpověď, opět ve funkci běžné zprávy. To, že se jedná o odpověď, je tedy řešeno čistě logikou aplikace. Pořadí volání funkcí knihovny nanomsg:
- nn_socket
- nn_connect
- nn_send
- nn_recv + nn_freemsg
- nn_shutdown
Úplný zdrojový kód klienta naleznete na adrese https://github.com/tisnik/message-queues-examples/blob/master/nanomsg/03_simple_pair_communication/first.c:
#include <stdlib.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <nanomsg/nn.h> #include <nanomsg/pair.h> const char *URL = "ipc:///tmp/example3"; void report_error(const char *func) { fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno())); exit(1); } void first(const char *url) { int socket; int endpoint; int bytes; char *message = NULL; char *response = NULL; if ((socket = nn_socket(AF_SP, NN_PAIR)) < 0) { report_error("nn_socket"); } puts("Socket created"); if ((endpoint = nn_connect(socket, url)) < 0) { report_error("nn_connect"); } puts("Remote endpoint added to the socket"); message = "Hello from 'first'!"; printf("Sending message '%s'\n", message); if ((bytes = nn_send(socket, message, strlen(message)+1, 0)) < 0) { report_error("nn_send"); } printf("Message with length %d bytes sent, flushing\n", bytes); sleep(1); puts("Waiting for response..."); response = NULL; if ((bytes = nn_recv(socket, &response, NN_MSG, 0)) < 0) { report_error("nn_recv"); } printf("Received response '%s' with length %d bytes\n", response, bytes); if (nn_freemsg(response) < 0) { report_error("nn_freemsg"); } if (nn_shutdown(socket, endpoint) < 0) { report_error("nn_shutdown"); } puts("Shutdown completed"); } int main(const int argc, const char **argv) { first(URL); return 0; }
14. Realizace druhého uzlu (serveru)
Druhý uzel je realizován jako server, opět z toho důvodu, že používá funkci nn_bind a nikoli nn_connect. Jedná se o velmi primitivní server, který přijme zprávu od klienta a následně mu pošle textovou odpověď „ACK!“. Pořadí volání funkcí knihovny nanomsg je nyní odlišné:
- nn_socket
- nn_bind
- nn_recv + nn_freemsg
- nn_send
- nn_shutdown
Úplný zdrojový kód serveru naleznete na adrese https://github.com/tisnik/message-queues-examples/blob/master/nanomsg/03_simple_pair_communication/second.c:
#include <stdlib.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <nanomsg/nn.h> #include <nanomsg/pair.h> const char *URL = "ipc:///tmp/example3"; void report_error(const char *func) { fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno())); exit(1); } void second(const char *url) { int socket; int endpoint; int bytes; char *message = NULL; char *response = NULL; if ((socket = nn_socket(AF_SP, NN_PAIR)) < 0) { report_error("nn_socket"); } puts("Socket created"); if ((endpoint = nn_bind(socket, url)) < 0) { report_error("nn_bind"); } puts("Endpoint bound to socket"); puts("Waiting for message..."); if ((bytes = nn_recv(socket, &message, NN_MSG, 0)) < 0) { report_error("nn_recv"); } printf("Received message '%s' with length %d bytes\n", message, bytes); if (nn_freemsg(message) < 0) { report_error("nn_freemsg"); } response = "ACK!"; printf("Sending response '%s'\n", response); if ((bytes = nn_send(socket, response, strlen(response)+1, 0)) < 0) { report_error("nn_send"); } printf("Response with length %d bytes sent, flushing\n", bytes); sleep(1); if (nn_shutdown(socket, endpoint) < 0) { report_error("nn_shutdown"); } puts("Shutdown completed"); } int main(int argc, char **argv) { second(URL); return 0; }
15. Refaktoring klienta i serveru
Oba výše realizované uzly, tj. jak klienta, tak i server, můžeme upravit takovým způsobem, aby se namísto dlouhé „špagety“ s voláním funkcí knihovny nanomsg použilo několik specializovaných uživatelských funkcí. Například klient bude používat funkce se sémanticky správnými názvy send_message a receive_response. Jeho úplný zdrojový kód bude vypadat následovně:
#include <stdlib.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <nanomsg/nn.h> #include <nanomsg/pair.h> const char *URL = "ipc:///tmp/example4"; void report_error(const char *func) { fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno())); exit(1); } void send_message(const int socket, const char *message) { int bytes; printf("Sending message '%s'\n", message); if ((bytes = nn_send(socket, message, strlen(message)+1, 0)) < 0) { report_error("nn_send"); } printf("Message with length %d bytes sent, flushing\n", bytes); sleep(1); } void receive_response(socket) { char *response = NULL; int bytes; if ((bytes = nn_recv(socket, &response, NN_MSG, 0)) < 0) { report_error("nn_recv"); } printf("Received response '%s' with length %d bytes\n", response, bytes); if (nn_freemsg(response) < 0) { report_error("nn_freemsg"); } } void first(const char *url) { int socket; int endpoint; if ((socket = nn_socket(AF_SP, NN_PAIR)) < 0) { report_error("nn_socket"); } puts("Socket created"); if ((endpoint = nn_connect(socket, url)) < 0) { report_error("nn_connect"); } puts("Remote endpoint added to the socket"); send_message(socket, "Hello from 'first'!"); puts("Waiting for response..."); receive_response(socket); if (nn_shutdown(socket, endpoint) < 0) { report_error("nn_shutdown"); } puts("Shutdown completed"); } int main(const int argc, const char **argv) { first(URL); return 0; }
V případě serveru použijeme naopak uživatelské funkce pojmenované receive_message a send_response, takže jeho modifikovaný zdrojový kód bude vypadat takto:
#include <stdlib.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <nanomsg/nn.h> #include <nanomsg/pair.h> const char *URL = "ipc:///tmp/example4"; void report_error(const char *func) { fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno())); exit(1); } void receive_message(socket) { char *message = NULL; int bytes; if ((bytes = nn_recv(socket, &message, NN_MSG, 0)) < 0) { report_error("nn_recv"); } printf("Received message '%s' with length %d bytes\n", message, bytes); if (nn_freemsg(message) < 0) { report_error("nn_freemsg"); } } void send_response(const int socket, const char *response) { int bytes; printf("Sending response '%s'\n", response); if ((bytes = nn_send(socket, response, strlen(response)+1, 0)) < 0) { report_error("nn_send"); } printf("Response with length %d bytes sent, flushing\n", bytes); sleep(1); } void second(const char *url) { int socket; int endpoint; if ((socket = nn_socket(AF_SP, NN_PAIR)) < 0) { report_error("nn_socket"); } puts("Socket created"); if ((endpoint = nn_bind(socket, url)) < 0) { report_error("nn_bind"); } puts("Endpoint bound to socket"); puts("Waiting for message..."); receive_message(socket); send_response(socket, "ACK!"); if (nn_shutdown(socket, endpoint) < 0) { report_error("nn_shutdown"); } puts("Shutdown completed"); } int main(int argc, char **argv) { second(URL); return 0; }
16. Chování uzlů ve chvíli, kdy zvolíme špatnou komunikační strategii
Nyní si zkusíme otestovat, co se stane ve chvíli, kdy při implementaci jednotlivých uzlů zvolíme špatnou komunikační strategii. Příkladem může být klient z předchozího příkladu, u nějž ovšem namísto strategie PAIR zvolíme strategii PIPELINE. Úprava (resp. přesněji řečeno rozbití) zdrojového kódu je přímočará:
#include <stdlib.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <nanomsg/nn.h> #include <nanomsg/pipeline.h> const char *URL = "ipc:///tmp/example5"; void report_error(const char *func) { fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno())); exit(1); } void send_message(const int socket, const char *message) { int bytes; printf("Sending message '%s'\n", message); if ((bytes = nn_send(socket, message, strlen(message)+1, 0)) < 0) { report_error("nn_send"); } printf("Message with length %d bytes sent, flushing\n", bytes); sleep(1); } void receive_response(socket) { char *response = NULL; int bytes; if ((bytes = nn_recv(socket, &response, NN_MSG, 0)) < 0) { report_error("nn_recv"); } printf("Received response '%s' with length %d bytes\n", response, bytes); if (nn_freemsg(response) < 0) { report_error("nn_freemsg"); } } void first(const char *url) { int socket; int endpoint; if ((socket = nn_socket(AF_SP, NN_PULL)) < 0) { report_error("nn_socket"); } puts("Socket created"); if ((endpoint = nn_connect(socket, url)) < 0) { report_error("nn_connect"); } puts("Remote endpoint added to the socket"); send_message(socket, "Hello from 'first'!"); puts("Waiting for response..."); receive_response(socket); if (nn_shutdown(socket, endpoint) < 0) { report_error("nn_shutdown"); } puts("Shutdown completed"); } int main(const int argc, const char **argv) { first(URL); return 0; }
Při pokusu o spuštění klienta získáme pouze informaci o tom, že zvolená kombinace není správná – ve strategii PIPELINE není možné uplatnit obousměrnou komunikaci:
Socket created Remote endpoint added to the socket Sending message 'Hello from 'first'!' nn_send: Operation not supported
Podobným způsobem samozřejmě můžeme upravit/rozbít i serverovou část:
#include <stdlib.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <nanomsg/nn.h> #include <nanomsg/pipeline.h> const char *URL = "ipc:///tmp/example5"; void report_error(const char *func) { fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno())); exit(1); } void receive_message(socket) { char *message = NULL; int bytes; if ((bytes = nn_recv(socket, &message, NN_MSG, 0)) < 0) { report_error("nn_recv"); } printf("Received message '%s' with length %d bytes\n", message, bytes); if (nn_freemsg(message) < 0) { report_error("nn_freemsg"); } } void send_response(const int socket, const char *response) { int bytes; printf("Sending response '%s'\n", response); if ((bytes = nn_send(socket, response, strlen(response)+1, 0)) < 0) { report_error("nn_send"); } printf("Response with length %d bytes sent, flushing\n", bytes); sleep(1); } void second(const char *url) { int socket; int endpoint; if ((socket = nn_socket(AF_SP, NN_PUSH)) < 0) { report_error("nn_socket"); } puts("Socket created"); if ((endpoint = nn_bind(socket, url)) < 0) { report_error("nn_bind"); } puts("Endpoint bound to socket"); puts("Waiting for message..."); receive_message(socket); send_response(socket, "ACK!"); if (nn_shutdown(socket, endpoint) < 0) { report_error("nn_shutdown"); } puts("Shutdown completed"); } int main(int argc, char **argv) { second(URL); return 0; }
Ani pokus o spuštění serveru nedopadne zcela korektně, což je patrné z následujícího výpisu:
Socket created Endpoint bound to socket Waiting for message... nn_recv: Operation not supported
17. Využití komunikační strategie PUBSUB
V samotném závěru článku si ještě ukážeme způsob využití komunikační strategie PUBSUB, s níž jsme se již v tomto seriálu mnohokrát setkali. Tato strategie umožňuje rozesílat zprávy libovolnému množství příjemců. Zdroj zpráv bude v tomto případě implementován jako server a typ použitého socketu bude NN_PUB. Podívejme se na zdrojový kód producenta/zdroje zpráv. Vidíme, že producent posílá PIN všech platebních karet:
#include <stdlib.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <nanomsg/nn.h> #include <nanomsg/pubsub.h> const char *URL = "ipc:///tmp/example6"; void report_error(const char *func) { fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno())); exit(1); } void send_message(const int socket, const char *message) { int bytes; printf("Sending message '%s'\n", message); if ((bytes = nn_send(socket, message, strlen(message)+1, 0)) < 0) { report_error("nn_send"); } printf("Message with length %d bytes sent, flushing\n", bytes); sleep(1); } void publisher(const char *url) { int socket; int endpoint; if ((socket = nn_socket(AF_SP, NN_PUB)) < 0) { report_error("nn_socket"); } puts("Socket created"); if ((endpoint = nn_bind(socket, url)) < 0) { report_error("nn_bind"); } puts("Remote endpoint bound to the socket"); while (1) { char buffer[45]; int number = rand() % 10000; sprintf(buffer, "Hello, this is my top secret PIN: %04d", number); send_message(socket, buffer); } if (nn_shutdown(socket, endpoint) < 0) { report_error("nn_shutdown"); } puts("Shutdown completed"); } int main(const int argc, const char **argv) { publisher(URL); return 0; }
Konzumenta zpráv je již nutné realizovat nepatrně složitějším způsobem, protože musíme specifikovat téma (topic), které má být přijímáno. Pro jednoduchost se přihlásíme k příjmu všech témat:
if (nn_setsockopt(socket, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) { report_error("nn_setsockopt"); }
Samotné zprávy jsou již přijímány klasicky v nekonečné smyčce:
#include <stdlib.h> #include <stdio.h> #include <string.h> #include <unistd.h> #include <nanomsg/nn.h> #include <nanomsg/pubsub.h> const char *URL = "ipc:///tmp/example6"; void report_error(const char *func) { fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno())); exit(1); } void receive_message(socket) { char *message = NULL; int bytes; if ((bytes = nn_recv(socket, &message, NN_MSG, 0)) < 0) { report_error("nn_recv"); } printf("Received message '%s' with length %d bytes\n", message, bytes); if (nn_freemsg(message) < 0) { report_error("nn_freemsg"); } } void subscriber(const char *url) { int socket; int endpoint; if ((socket = nn_socket(AF_SP, NN_SUB)) < 0) { report_error("nn_socket"); } puts("Socket created"); if (nn_setsockopt(socket, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) { report_error("nn_setsockopt"); } if ((endpoint = nn_connect(socket, url)) < 0) { report_error("nn_connect"); } puts("Endpoint connected to socket"); puts("Waiting for messages..."); while (1) { receive_message(socket); } if (nn_shutdown(socket, endpoint) < 0) { report_error("nn_shutdown"); } puts("Shutdown completed"); } int main(int argc, char **argv) { subscriber(URL); return 0; }
18. Repositář s demonstračními příklady
Zdrojové kódy všech dnes popsaných demonstračních příkladů vyvinutých v programovacím jazyku Go byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce. Každý příklad se skládá ze dvou samostatně překládaných a spouštěných souborů – producenta zpráv a konzumenta zpráv:
19. Odkazy na předchozí části seriálu o message brokerech
V této kapitole jsou uvedeny odkazy na všech čtrnáct předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií message brokerů:
- Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/ - Celery: systém implementující asynchronní fronty úloh pro Python
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/ - Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/ - RabbitMQ: jedna z nejúspěšnějších implementací brokera
https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/ - Pokročilejší operace nabízené systémem RabbitMQ
https://www.root.cz/clanky/pokrocilejsi-operace-nabizene-systemem-rabbitmq/ - ØMQ: knihovna pro asynchronní předávání zpráv
https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/ - Další možnosti poskytované knihovnou ØMQ
https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/ - Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě
https://www.root.cz/clanky/dalsi-moznosti-nabizene-knihovnou-mq-implementace-protokolu-mq-v-ciste-jave/ - Apache ActiveMQ – další systém implementující message brokera
https://www.root.cz/clanky/apache-activemq-dalsi-system-implementujici-message-brokera/ - Použití Apache ActiveMQ s protokolem STOMP
https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-stomp/ - Použití Apache ActiveMQ s protokolem AMQP, jazyk Go a message brokeři
https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-amqp-jazyk-go-a-message-brokeri/ - Komunikace s message brokery z programovacího jazyka Go
https://www.root.cz/clanky/komunikace-s-message-brokery-z-programovaciho-jazyka-go/ - Použití message brokeru NATS
https://www.root.cz/clanky/pouziti-message-brokeru-nats/ - NATS Streaming Server
https://www.root.cz/clanky/nats-streaming-server/
20. Odkazy na Internetu
- nanomsg na GitHubu
https://github.com/nanomsg/nanomsg - Referenční příručka knihovny nanomsg
https://nanomsg.org/v1.1.5/nanomsg.html - nng (nanomsg-next-generation)
https://github.com/nanomsg/nng - Differences between nanomsg and ZeroMQ
https://nanomsg.org/documentation-zeromq.html - NATS
https://nats.io/about/ - NATS Streaming Concepts
https://nats.io/documentation/streaming/nats-streaming-intro/ - NATS Streaming Server
https://nats.io/download/nats-io/nats-streaming-server/ - NATS Introduction
https://nats.io/documentation/ - NATS Client Protocol
https://nats.io/documentation/internals/nats-protocol/ - NATS Messaging (Wikipedia)
https://en.wikipedia.org/wiki/NATS_Messaging - Stránka Apache Software Foundation
http://www.apache.org/ - Informace o portu 5672
http://www.tcp-udp-ports.com/port-5672.htm - Třída MessagingHandler knihovny Qpid Proton
https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/proton._handlers.MessagingHandler-class.html - Třída Event knihovny Qpid Proton
https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/proton._events.Event-class.html - package stomp (Go)
https://godoc.org/github.com/go-stomp/stomp - Go language library for STOMP protocol
https://github.com/go-stomp/stomp - python-qpid-proton 0.26.0 na PyPi
https://pypi.org/project/python-qpid-proton/ - Qpid Proton
http://qpid.apache.org/proton/ - Using the AMQ Python Client
https://access.redhat.com/documentation/en-us/red_hat_amq/7.1/html-single/using_the_amq_python_client/ - Apache ActiveMQ
http://activemq.apache.org/ - Apache ActiveMQ Artemis
https://activemq.apache.org/artemis/ - Apache ActiveMQ Artemis User Manual
https://activemq.apache.org/artemis/docs/latest/index.html - KahaDB
http://activemq.apache.org/kahadb.html - Understanding the KahaDB Message Store
https://access.redhat.com/documentation/en-US/Fuse_MQ_Enterprise/7.1/html/Configuring_Broker_Persistence/files/KahaDBOverview.html - Command Line Tools (Apache ActiveMQ)
https://activemq.apache.org/activemq-command-line-tools-reference.html - stomp.py 4.1.21 na PyPi
https://pypi.org/project/stomp.py/ - Stomp Tutorial
https://access.redhat.com/documentation/en-US/Fuse_Message_Broker/5.5/html/Connectivity_Guide/files/FMBConnectivityStompTelnet.html - Heartbeat (computing)
https://en.wikipedia.org/wiki/Heartbeat_(computing) - Apache Camel
https://camel.apache.org/ - Red Hat Fuse
https://developers.redhat.com/products/fuse/overview/ - Confusion between ActiveMQ and ActiveMQ-Artemis?
https://serverfault.com/questions/873533/confusion-between-activemq-and-activemq-artemis - Staré stránky projektu HornetQ
http://hornetq.jboss.org/ - Snapshot JeroMQ verze 0.4.4
https://oss.sonatype.org/content/repositories/snapshots/org/zeromq/jeromq/0.4.4-SNAPSHOT/ - Difference between ActiveMQ vs Apache ActiveMQ Artemis
http://activemq.2283324.n4.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html - Microservices communications. Why you should switch to message queues
https://dev.to/matteojoliveau/microservices-communications-why-you-should-switch-to-message-queues–48ia - Stomp.py 4.1.19 documentation
https://stomppy.readthedocs.io/en/stable/ - Repositář knihovny JeroMQ
https://github.com/zeromq/jeromq/ - ØMQ – Distributed Messaging
http://zeromq.org/ - ØMQ Community
http://zeromq.org/community - Get The Software
http://zeromq.org/intro:get-the-software - PyZMQ Documentation
https://pyzmq.readthedocs.io/en/latest/ - Module: zmq.decorators
https://pyzmq.readthedocs.io/en/latest/api/zmq.decorators.html - ZeroMQ is the answer, by Ian Barber
https://vimeo.com/20605470 - ZeroMQ RFC
https://rfc.zeromq.org/ - ZeroMQ and Clojure, a brief introduction
https://antoniogarrote.wordpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/ - zeromq/czmq
https://github.com/zeromq/czmq - golang wrapper for CZMQ
https://github.com/zeromq/goczmq - ZeroMQ version reporting in Python
http://zguide.zeromq.org/py:version - A Go interface to ZeroMQ version 4
https://github.com/pebbe/zmq4 - Broker vs. Brokerless
http://zeromq.org/whitepapers:brokerless - Learning ØMQ with pyzmq
https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/ - Céčková funkce zmq_ctx_new
http://api.zeromq.org/4–2:zmq-ctx-new - Céčková funkce zmq_ctx_destroy
http://api.zeromq.org/4–2:zmq-ctx-destroy - Céčková funkce zmq_bind
http://api.zeromq.org/4–2:zmq-bind - Céčková funkce zmq_unbind
http://api.zeromq.org/4–2:zmq-unbind - Céčková C funkce zmq_connect
http://api.zeromq.org/4–2:zmq-connect - Céčková C funkce zmq_disconnect
http://api.zeromq.org/4–2:zmq-disconnect - Céčková C funkce zmq_send
http://api.zeromq.org/4–2:zmq-send - Céčková C funkce zmq_recv
http://api.zeromq.org/4–2:zmq-recv - Třída Context (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#context - Třída Socket (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket - Python binding
http://zeromq.org/bindings:python - Why should I have written ZeroMQ in C, not C++ (part I)
http://250bpm.com/blog:4 - Why should I have written ZeroMQ in C, not C++ (part II)
http://250bpm.com/blog:8 - About Nanomsg
https://nanomsg.org/ - Advanced Message Queuing Protocol
https://www.amqp.org/ - Advanced Message Queuing Protocol na Wikipedii
https://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol - Dokumentace k příkazu rabbitmqctl
https://www.rabbitmq.com/rabbitmqctl.8.html - RabbitMQ
https://www.rabbitmq.com/ - RabbitMQ Tutorials
https://www.rabbitmq.com/getstarted.html - RabbitMQ: Clients and Developer Tools
https://www.rabbitmq.com/devtools.html - RabbitMQ na Wikipedii
https://en.wikipedia.org/wiki/RabbitMQ - Streaming Text Oriented Messaging Protocol
https://en.wikipedia.org/wiki/Streaming_Text_Oriented_Messaging_Protocol - Message Queuing Telemetry Transport
https://en.wikipedia.org/wiki/MQTT - Erlang
http://www.erlang.org/ - pika 0.12.0 na PyPi
https://pypi.org/project/pika/ - Introduction to Pika
https://pika.readthedocs.io/en/stable/ - Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
http://clojurerabbitmq.info/ - AMQP 0–9–1 Model Explained
http://www.rabbitmq.com/tutorials/amqp-concepts.html - Part 1: RabbitMQ for beginners – What is RabbitMQ?
https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html - Downloading and Installing RabbitMQ
https://www.rabbitmq.com/download.html - celery na PyPi
https://pypi.org/project/celery/ - Databáze Redis (nejenom) pro vývojáře používající Python
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/ - Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/ - Redis Queue (RQ)
https://www.fullstackpython.com/redis-queue-rq.html - Python Celery & RabbitMQ Tutorial
https://tests4geeks.com/python-celery-rabbitmq-tutorial/ - Flower: Real-time Celery web-monitor
http://docs.celeryproject.org/en/latest/userguide/monitoring.html#flower-real-time-celery-web-monitor - Asynchronous Tasks With Django and Celery
https://realpython.com/asynchronous-tasks-with-django-and-celery/ - First Steps with Celery
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html - node-celery
https://github.com/mher/node-celery - Full Stack Python: web development
https://www.fullstackpython.com/web-development.html - Introducing RQ
https://nvie.com/posts/introducing-rq/ - Asynchronous Tasks with Flask and Redis Queue
https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue - rq-dashboard
https://github.com/eoranged/rq-dashboard - Stránky projektu Redis
https://redis.io/ - Introduction to Redis
https://redis.io/topics/introduction - Try Redis
http://try.redis.io/ - Redis tutorial, April 2010 (starší, ale pěkně udělaný)
https://static.simonwillison.net/static/2010/redis-tutorial/ - Python Redis
https://redislabs.com/lp/python-redis/ - Redis: key-value databáze v paměti i na disku
https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/ - Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
http://www.cloudsvet.cz/?p=253 - Praktický úvod do Redis (2): transakce
http://www.cloudsvet.cz/?p=256 - Praktický úvod do Redis (3): cluster
http://www.cloudsvet.cz/?p=258 - Connection pool
https://en.wikipedia.org/wiki/Connection_pool - Instant Redis Sentinel Setup
https://github.com/ServiceStack/redis-config - How to install REDIS in LInux
https://linuxtechlab.com/how-install-redis-server-linux/ - Redis RDB Dump File Format
https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format - Lempel–Ziv–Welch
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch - Redis Persistence
https://redis.io/topics/persistence - Redis persistence demystified
http://oldblog.antirez.com/post/redis-persistence-demystified.html - Redis reliable queues with Lua scripting
http://oldblog.antirez.com/post/250 - Ost (knihovna)
https://github.com/soveran/ost - NoSQL
https://en.wikipedia.org/wiki/NoSQL - Shard (database architecture)
https://en.wikipedia.org/wiki/Shard_%28database_architecture%29 - What is sharding and why is it important?
https://stackoverflow.com/questions/992988/what-is-sharding-and-why-is-it-important - What Is Sharding?
https://btcmanager.com/what-sharding/ - Redis clients
https://redis.io/clients - Category:Lua-scriptable software
https://en.wikipedia.org/wiki/Category:Lua-scriptable_software - Seriál Programovací jazyk Lua
https://www.root.cz/serialy/programovaci-jazyk-lua/ - Redis memory usage
http://nosql.mypopescu.com/post/1010844204/redis-memory-usage - Ukázka konfigurace Redisu pro lokální testování
https://github.com/tisnik/presentations/blob/master/redis/redis.conf - Resque
https://github.com/resque/resque - Nested transaction
https://en.wikipedia.org/wiki/Nested_transaction - Publish–subscribe pattern
https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern - Messaging pattern
https://en.wikipedia.org/wiki/Messaging_pattern - Using pipelining to speedup Redis queries
https://redis.io/topics/pipelining - Pub/Sub
https://redis.io/topics/pubsub - ZeroMQ distributed messaging
http://zeromq.org/ - ZeroMQ: Modern & Fast Networking Stack
https://www.igvita.com/2010/09/03/zeromq-modern-fast-networking-stack/ - Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
https://stackoverflow.com/questions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr - Python & Redis PUB/SUB
https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7 - Message broker
https://en.wikipedia.org/wiki/Message_broker - RESP Arrays
https://redis.io/topics/protocol#array-reply - Redis Protocol specification
https://redis.io/topics/protocol - Redis Pub/Sub: Intro Guide
https://www.redisgreen.net/blog/pubsub-intro/ - Redis Pub/Sub: Howto Guide
https://www.redisgreen.net/blog/pubsub-howto/ - Comparing Publish-Subscribe Messaging and Message Queuing
https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message - Apache Kafka
https://kafka.apache.org/ - Iron
http://www.iron.io/mq - kue (založeno na Redisu, určeno pro node.js)
https://github.com/Automattic/kue - Cloud Pub/Sub
https://cloud.google.com/pubsub/ - Introduction to Redis Streams
https://redis.io/topics/streams-intro - glob (programming)
https://en.wikipedia.org/wiki/Glob_(programming) - Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
https://www.slideshare.net/sylvinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2 - Enqueueing internals
http://python-rq.org/contrib/ - queue — A synchronized queue class
https://docs.python.org/3/library/queue.html - Queue – A thread-safe FIFO implementation
https://pymotw.com/2/Queue/ - Queues
http://queues.io/ - Windows Subsystem for Linux Documentation
https://docs.microsoft.com/en-us/windows/wsl/about - RestMQ
http://restmq.com/ - ActiveMQ
http://activemq.apache.org/ - Amazon MQ
https://aws.amazon.com/amazon-mq/ - Amazon Simple Queue Service
https://aws.amazon.com/sqs/ - Celery: Distributed Task Queue
http://www.celeryproject.org/ - Disque, an in-memory, distributed job queue
https://github.com/antirez/disque - rq-dashboard
https://github.com/eoranged/rq-dashboard - Projekt RQ na PyPi
https://pypi.org/project/rq/ - rq-dashboard 0.3.12
https://pypi.org/project/rq-dashboard/ - Job queue
https://en.wikipedia.org/wiki/Job_queue - Why we moved from Celery to RQ
https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq - Running multiple workers using Celery
https://serverfault.com/questions/655387/running-multiple-workers-using-celery - celery — Distributed processing
http://docs.celeryproject.org/en/latest/reference/celery.html - Chains
https://celery.readthedocs.io/en/latest/userguide/canvas.html#chains - Routing
http://docs.celeryproject.org/en/latest/userguide/routing.html#automatic-routing - Celery Distributed Task Queue in Go
https://github.com/gocelery/gocelery/ - Python Decorators
https://wiki.python.org/moin/PythonDecorators - Periodic Tasks
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html - celery.schedules
http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab - Pros and cons to use Celery vs. RQ
https://stackoverflow.com/questions/13440875/pros-and-cons-to-use-celery-vs-rq - Priority queue
https://en.wikipedia.org/wiki/Priority_queue - Jupyter
https://jupyter.org/ - How IPython and Jupyter Notebook work
https://jupyter.readthedocs.io/en/latest/architecture/how_jupyter_ipython_work.html - Context Managers
http://book.pythontips.com/en/latest/context_managers.html