Obsah
1. Fronty zpráv podle Systemu V
2. Tři příkazy a čtveřice knihovních funkcí použitých při práci s frontami zpráv
4. Získání klíče použitého pro vytvoření nové fronty
5. Vytvoření nové fronty funkcí msgget
6. Poslání zprávy do fronty funkcí msgsnd
7. Připojení k existující frontě a přečtení zprávy funkcí msgrcv
8. Spuštění producenta i konzumenta zprávy
9. Připojení k existující frontě a přečtení zprávy
10. Producent posílající více zpráv
11. Konzument akceptující postupně všechny zprávy
12. Chování producenta při zaplnění fronty
13. Poslání zprávy do fronty bez čekání
14. Přečtení doplňujících informací o frontě funkcí msgctl
15. Filtrace zpráv podle nastaveného typu
16. Možné konfigurace: více producentů a jeden konzument, jeden konzument a více producentů atd.
17. Konzumenti a producenti s filtrací zpráv
18. Obsah dalšího pokračování seriálu
19. Repositář s demonstračními příklady
1. Fronty zpráv podle Systemu V
Pokud se podíváme na seznam předchozích částí seriálu o message brokerech, uvidíme, že jsme se prozatím zabývali popisem mnoha více či méně komplikovaných a současně i pokročilých technologií, od relativně jednoduchých knihoven typu ØMQ a Nanomsg přes klasické message brokery typu RabbitMQ a Apache ActiveMQ až po sofistikované systémy, které se již z klasifikace message brokerů vymykají (příkladem je NATS Streaming Server a Nsq). Všechny výše zmíněné nástroje je nutné explicitně nainstalovat a patřičným způsobem nakonfigurovat.
Ovšem už v základní instalaci Linuxu (a vlastně i většiny dalších unixových systémů – až na několik výjimek) mají vývojáři k dispozici hned dvě implementace front zpráv určených pro meziprocesovou komunikaci v rámci jednoho systému. První implementace vychází ze Systemu V, druhá je pak definována v POSIXu. V dnešním článku se zaměříme na popis prvního systému založeného na implementaci použité v Systemu V, což je mj. i jeden z přímých předchůdců Solarisu, ovšem vlastnosti Systemu V se postupně dostaly do dalších unixových systémů i do Linuxu. Tyto fronty zpráv lze v současnosti použít prakticky kdekoli a navíc se jedná o relativně jednoduše zvládnutelnou technologii, pro jejíž využití postačuje znát pouze tři příkazy a čtyři knihovní funkce (volatelné z céčka a tím pádem i z prakticky jakéhokoli programovacího jazyka s FFI či obdobnou technologií). Ovšem pozor – ani jedna implementace plně nenahrazuje klasické message brokery – jedná se o nástroje vhodné skutečně především pro implementaci komunikace mezi několika procesy, které tak nemusí být pevně svázány; navíc je veškerá komunikace asynchronní. Ovšem například není zajištěna persistence zpráv, potvrzení zpracování zprávy atd.
2. Tři příkazy a čtveřice knihovních funkcí použitých při práci s frontami zpráv
V úvodní kapitole jsme si mj. řekli, že pro práci se systémem front zpráv odvozených od Systemu V je zapotřebí znát pouze trojici příkazů a čtveřici knihovních funkcí. Nejprve se podívejme na příkazy, které se při práci s frontami používají:
# | Příkaz | Stručný popis příkazu |
---|---|---|
1 | ipcs | získání informací o vybraném prostředku používaném pro IPC |
2 | ipcmk | vytvoření prostředku používaného pro IPC (mk=make) |
3 | ipcrm | odstranění prostředku používaného pro IPC (rm=remove) |
V aplikacích a nástrojích vytvářených v programovacím jazyku C (což je ve světě Unixu standardní systémový jazyk) se pro práci s frontami zpráv mohou použít následující čtyři funkce, jejichž konkrétní příklady použití si ukážeme v navazujících kapitolách:
# | Funkce | Stručný popis funkce |
---|---|---|
1 | msgget | vytvoření nové fronty či získání existující fronty s daným identifikátorem (celé číslo) |
2 | msgctl | provedení jedné ze tří funkcí: informace o frontě, změna nastavení fronty nebo smazání fronty |
3 | msgsnd | poslání zprávy do fronty (s čekáním či bez čekání při zaplnění fronty, podle nastavení příznaků) |
4 | msgrcv | přečtení zprávy z fronty (s čekáním či bez čekání v případě prázdné fronty, podle nastavení příznaků) |
Všechny čtyři výše zmíněné funkce budou použity v dnešních demonstračních příkladech.
3. Příkazy ipcs a ipcrm
Nejprve se podívejme na základní operace poskytované příkazy ipcs a ipcrm, které byly vypsány v tabulce v předchozí kapitole. Tyto příkazy je možné použít pro administraci front, zjišťování jejich aktuálního stavu, jejich mazání atd. Pokud spustíme příkaz ipcs bez parametrů, zobrazí se seznam všech existujících prostředků použitých pro IPC, tedy pro komunikaci mezi několika procesy. Jak již víme, může se jednat o semafory, sdílenou paměť a právě i o fronty zpráv:
$ ipcs
Na mém testovacím počítači se původně (před spuštěním dnešních demonstračních příkazů) vypsaly informace pouze o několika regionech sdílené paměti, ovšem žádné fronty ani semafory neexistovaly, resp. přesněji řečeno je žádné aplikace nevytvořily a nepoužívaly:
------ Message Queues -------- key msqid owner perms used-bytes messages ------ Shared Memory Segments -------- key shmid owner perms bytes nattch status 0x00000000 1146880 ptisnovs 600 524288 2 dest 0x00000000 2686977 ptisnovs 600 7925760 2 dest 0x00000000 2719746 ptisnovs 600 393216 2 dest 0x00000000 2588675 ptisnovs 600 7925760 2 dest 0x00000000 2818052 ptisnovs 600 524288 2 dest 0x00000000 2850821 ptisnovs 600 393216 2 dest 0x00000000 4423686 ptisnovs 600 155648 2 dest 0x00000000 3145735 ptisnovs 600 2867200 2 dest 0x00000000 3112968 ptisnovs 600 2867200 2 dest 0x00000000 3342345 ptisnovs 600 135168 2 dest 0x00000000 3309578 ptisnovs 600 135168 2 dest 0x00000000 4456459 ptisnovs 600 155648 2 dest ------ Semaphore Arrays -------- key semid owner perms nsems
Zobrazit je možné i informace o vlastnících jednotlivých prostředků, a to konkrétně přepínačem -c:
$ ipcs -c ------ Message Queues Creators/Owners -------- msqid perms cuid cgid uid gid 0 660 ptisnovs ptisnovs ptisnovs ptisnovs ------ Shared Memory Segment Creators/Owners -------- shmid perms cuid cgid uid gid 1146880 600 ptisnovs ptisnovs ptisnovs ptisnovs ------ Semaphore Arrays Creators/Owners -------- semid perms cuid cgid uid gid
A dokonce si můžeme zobrazit i informace o procesech, které prostředky naposledy využily:
$ ipcs -p ------ Message Queues PIDs -------- msqid owner lspid lrpid 0 ptisnovs 5586 5560 ------ Shared Memory Creator/Last-op PIDs -------- shmid owner cpid lpid 1146880 ptisnovs 1901 1811
Využití (utilizaci) prostředků získáme příkazem:
$ ipcs -u ------ Shared Memory Status -------- segments allocated 3 pages allocated 320 pages resident 112 pages swapped 0 Swap performance: 0 attempts 0 successes ------ Semaphore Status -------- used arrays = 0 allocated semaphores = 0 ------ Messages Status -------- allocated queues = 0 used headers = 0 used space = 0 bytes
Aktuálně nastavené limity (počet prostředků, maximální kapacita paměti, počet front atd.) je možné vypsat po zadání přepínače -l:
$ ipcs -l ------ Shared Memory Limits -------- max number of segments = 4096 max seg size (kbytes) = 32768 max total shared memory (kbytes) = 8388608 min seg size (bytes) = 1 ------ Semaphore Limits -------- max number of arrays = 128 max semaphores per array = 250 max semaphores system wide = 32000 max ops per semop call = 32 semaphore max value = 32767 ------ Messages Limits -------- max queues system wide = 1250 max size of message (bytes) = 8192 default max size of queue (bytes) = 16384
Nás však budou zajímat pouze informace o frontách, takže navíc přidáme i přepínač -q:
$ ipcs -l -q ------ Messages Limits -------- max queues system wide = 937 max size of message (bytes) = 8192 default max size of queue (bytes) = 16384
Na dalším testovacím počítači zobrazíme počet front – bude zpočátku nulový:
$ ipcs -q ------ Message Queues -------- key msqid owner perms used-bytes messages
Pokud by nějaká fronta existovala, byl by výpis odlišný (zde konkrétně ukazuje sedm zpráv o celkové délce 700 bajtů):
$ ipcs -q ------ Message Queues -------- key msqid owner perms used-bytes messages 0x2e010dbb 0 tester 660 700 7
Fronta má ID=0 (msqid=message queue ID), takže ji můžeme smazat příkazem icrm:
$ ipcrm -q 0
Nyní již při novém výpisu front frontu s ID=0 neuvidíme:
$ ipcs -q ------ Message Queues -------- key msqid owner perms used-bytes messages
4. Získání klíče použitého pro vytvoření nové fronty
Dva základní příkazy pro práci (nejenom) s frontami už známe, takže se nyní podívejme na to, jak lze s frontami pracovat z programovacího jazyka C či z libovolného programovacího jazyka, který má přístup ke standardním céčkovským knihovnách (Python a LuaJIT přes FFI atd. atd.). Pro vytvoření fronty či pro přístup k existující frontě se používá takzvaný klíč (key), což je celé číslo typu key_t, které by v ideálním případě mělo být na daném systému unikátní a jedinečné. Z tohoto důvodu se klíč vytváří funkcí ftok, které se předá jméno libovolného souboru a další celé číslo. Funkce frok na základě inode předaného souboru a druhého čísla (z něhož se v tradičních systémech přebírá spodních osm bitů) vytvoří klíč, který lze použít ve všech procesech, které budou frontu používat. Pokud tato funkce z nějakého důvodu zhavaruje (neexistující soubor atd.), vrátí se chybová hodnota –1:
key_t key; key = ftok("/home/tester/.bashrc", 1234); if (key == -1) { perror("Unable to generate key"); return 2; } printf("Key: %x\n", key);
Teoreticky by se pro zadanou dvojici jméno_souboru+celé číslo měl vytvořit unikátní klíč, který se nebude rovnat klíči získanému s jakýmikoli jinými parametry. Ovšem pokud je v systému připojeno (mount) více souborových systémů, mohou být inode různých souborů (na různých souborových systémech) shodné a tím pádem bude shodný i vygenerovaný klíč. Také záleží na konkrétním algoritmu výpočtu – někdy se bere v úvahu jen 16 bitů z čísla inode, což může vést ke kolizi i v případě, že se používá jen jediný připojený systém. Na druhou stranu se většinou příliš mnoho front nepoužívá (což jsme ostatně viděli i v předchozí kapitole), takže nebezpečí reálné kolize bude nízké.
key_t key; key = ftok("/home/tester/.bashrc", 'x');
5. Vytvoření nové fronty funkcí msgget
Jakmile máme k dispozici klíč, který bude shodný pro všechny procesy používající frontu, můžeme se pokusit o vytvoření nové fronty, a to konkrétně funkcí msgget. Jméno této funkce může být poněkud matoucí, ovšem jedná se o funkci pro získání obecné fronty – ať již existující, tak nové. V případě, že tato funkce vrátí kladné číslo nebo nulu, jedná se o identifikátor fronty, pokud vrátí hodnotu –1, značí to chybu:
int queue_id; queue_id = msgget(key, IPC_CREAT | 0660); if (queue_id == -1) { perror("Unable to get message queue identifier"); return 2; }
Povšimněte si druhého parametru funkce msgget. Zde se předává několik bitových příznaků, přičemž příznak (bit) IPC_CREAT udává, že se má vytvořit nová fronta a další příznaky slouží k nastavení práv ke frontě (ta jsou podobná jako práva k souborům, ovšem až na příznak x, který zde nemá význam – frontu není možné „spustit“). V našem případě jsme nastavili příznaky 0660 (osmičkově), tedy čtení i zápis pro vlastníka i skupinu. Ovšem pochopitelně můžeme namísto čísla použít i kombinaci symbolických konstant S_IRUSR, S_IWUSR, S_IRGRP, S_IWGRP, S_IROTH a S_IWOTH, osobně mi ovšem oktalový zápis přijde mnohem čitelnější.
Nastavené příznaky přístupu uvidíme i ve výstupu příkazu ipcs:
$ ipcs -q ------ Message Queues -------- key msqid owner perms used-bytes messages 0x2e010dbb 0 tester 660 700 7
6. Poslání zprávy do fronty funkcí msgsnd
Nyní, když již máme frontu vytvořenou, se můžeme pokusit do ní poslat nějakou zprávu. Zpráva má formát struktury, v níž je na začátku uložen typ zprávy (nenulové číslo) následované libovolnou sekvencí bajtů. Pro jednoduchost budeme používat zprávy s konstantní délkou 100 bajtů reprezentované touto strukturou (typ ovšem není součástí délky zprávy):
typedef struct { long message_type; char message_text[100]; } t_message; t_message message; message.message_type = 1; strcpy(message.message_text, "Hello world!");
Poslání zprávy do fronty s ID uloženým v proměnné queue_id zajišťuje funkce msgsnd, které musíme předat jak ID fronty (první parametr), tak i strukturu se zprávou (druhý parametr), její délku (třetí parametr) a případné další příznaky (čtvrtý parametr), které si vysvětlíme v navazujících kapitolách:
status = msgsnd(queue_id, (void*)&message, sizeof(message.message_text), 0); if (status == -1) { perror("Can not send message"); return 1; }
Podívejme se nyní na zdrojový kód jednoduchého producenta zpráv, který po svém spuštění vytvoří frontu a pošle do ní jedinou zprávu s obsahem „Hello world!“. V programu se kontroluje, zda se podařilo frontu vytvořit a zda se poslání zprávy skutečně podařilo či nikoli:
#include <stdio.h> #include <string.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> typedef struct { long message_type; char message_text[100]; } t_message; int main(void) { t_message message; key_t key; int queue_id; int status; message.message_type = 1; strcpy(message.message_text, "Hello world!"); key = ftok("/home/tester/.bashrc", 1234); if (key == -1) { perror("Unable to generate key"); return 2; } printf("Key: %x\n", key); queue_id = msgget(key, IPC_CREAT | 0660); if (queue_id == -1) { perror("Unable to get message queue identifier"); return 2; } printf("Message queue identifier: %x\n", queue_id); status = msgsnd(queue_id, (void*)&message, sizeof(message.message_text), 0); if (status == -1) { perror("Can not send message"); return 1; } return 0; }
7. Připojení k existující frontě a přečtení zprávy funkcí msgrcv
Nyní si ukažme, jakým způsobem se můžeme připojit k existující frontě a přečíst z ní zprávu, popř. počkat na poslání zprávy. Nejdříve je opět nutné získat klíč (celočíselnou hodnotu), který musí být stejný, jako tomu bylo v producentovi zpráv. Použijeme tedy shodný soubor i shodnou konstantu předanou ve druhém parametru funkce ftok:
key_t key; key = ftok("/home/tester/.bashrc", 1234); if (key == -1) { perror("Unable to generate key"); return 2; } printf("Key: %x\n", key);
Dále se pokusíme o otevření fronty, a to opět s využitím funkce msgget. Ovšem zatímco pro vytvoření fronty bylo nutné předat příznak IPC_CREAT a režim přístupu:
queue_id = msgget(key, IPC_CREAT | 0660);
Pro otevření již existující fronty můžeme ve druhém parametry předat nulu (počítá se tedy, že fronta existuje):
queue_id = msgget(key, 0); if (queue_id == -1) { perror("Unable to get message queue identifier"); return 2; }
Zpráva se přečte funkcí msgrcv:
status = msgrcv(queue_id, (void*)&message, sizeof(message.message_text), 0, 0);
Této funkci se opět předává identifikátor fronty (první parametr), ukazatel na strukturu pro uložení přečtené zprávy (druhý parametr), maximální očekávaná délka zprávy (třetí parametr), typ zprávy (čtvrtý parametr) a případné další příznaky ovlivňující příjem zprávy. Povšimněte si, že typ zprávy jsme nastavili na nulu. Tato speciální hodnota říká, že se má přečíst první zpráva z fronty, bez ohledu na její skutečný typ. V případě, že by byl typ kladným číslem, přečetla by se první zpráva s tímto nastaveným typem (pokud existuje). Pokud by se naopak jednalo o číslo záporné, přečetla by se první zpráva s typem menším než absolutní hodnota tohoto čísla. Díky tomu lze tento parametr použít pro filtraci zpráv, což si ukážeme v dalších demonstračních příkladech. V posledním parametru lze příznakovými bity určit, zda se má čekat na zprávu (výchozí chování odpovídající klasické frontě), popř. jaká operace se má vykonat ve chvíli, kdy je přijatá zpráva delší než nastavený limit.
Implementace konzumenta (příjemce) jediné zprávy může vypadat následovně:
#include <stdio.h> #include <string.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> typedef struct { long message_type; char message_text[100]; } t_message; int main(void) { t_message message; key_t key; int queue_id; int status; key = ftok("/home/tester/.bashrc", 1234); if (key == -1) { perror("Unable to generate key"); return 2; } printf("Key: %x\n", key); queue_id = msgget(key, 0); if (queue_id == -1) { perror("Unable to get message queue identifier"); return 2; } printf("Message queue identifier: %x\n", queue_id); status = msgrcv(queue_id, (void*)&message, sizeof(message.message_text), 0, 0); if (status == -1) { perror("Can not receive message"); return 1; } printf("Message type: %ld\n", message.message_type); printf("Message text: %s\n", message.message_text); return 0; }
8. Spuštění producenta i konzumenta zprávy
Jak producenta zprávy, tak i jejího konzumenta přeložíme běžným způsobem. Můžeme použít následující soubor Makefile, který je značně zjednodušen, protože nepotřebujeme linkovat žádné další knihovny (příště už bude situace nepatrně komplikovanější, protože budeme linkovat knihovnu rt):
CC=gcc LINKER=gcc CFLAGS=-O0 -Wall -pedantic LFLAGS= .PHONY: clean all: publisher subscriber %.o: %.c $(CC) -c -o $@ $(CFLAGS) $< publisher: publisher.o $(CC) -o $@ $< $(LFLAGS) subscriber: subscriber.o $(CC) -o $@ $< $(LFLAGS) clean: rm -f *.o \ rm -f publisher \ rm -f subscriber
Překlad a slinkování producenta i konzumenta:
$ make gcc -c -o publisher.o -O0 -Wall -pedantic publisher.c gcc -o publisher publisher.o gcc -c -o subscriber.o -O0 -Wall -pedantic subscriber.c gcc -o subscriber subscriber.o
Nejprve spustíme producenta zprávy. Ten by měl vypsat klíč, který získal a taktéž identifikátor nově vytvořené fronty:
$ ./publisher Key: d2010dbb Message queue identifier: 8000
A následně spustíme konzumenta. Ten opět vypíše klíč (musí být shodný s klíčem předchozím), identifikátor fronty a konečně zprávu, kterou z fronty přečetl:
$ ./subscriber Key: d2010dbb Message queue identifier: 8000 Message type: 1 Message text: Hello world!
9. Prozkoumání stavu fronty příkazem ipcs
Můžeme si pochopitelně vyzkoušet, jakým způsobem ovlivnilo spuštění producenta a konzumenta stav front, jenž dokážeme získat příkazem ipcs.
Ještě před prvním spuštění producenta by žádná fronta neměla existovat, což jsme si již ověřili v úvodních kapitolách:
$ ipcs -q ------ Message Queues -------- key msqid owner perms used-bytes messages
Po spuštění producenta by se měla fronta vytvořit a současně by se do ní měla zapsat zpráva o délce 100 bajtů:
$ ipcs -q ------ Message Queues -------- key msqid owner perms used-bytes messages 0xd2010dbb 32768 tester 660 100 1
Prozkoumat můžeme i konkrétní frontu, resp. přesněji řečeno frontu se zadaným ID, v našem případě ID=32768:
$ ipcs -q -i 32768 Message Queue msqid=32768 uid=1000 gid=1000 cuid=1000 cgid=1000 mode=0660 cbytes=100 qbytes=16384 qnum=1 lspid=15028 lrpid=15002 send_time=Sat Nov 16 21:58:51 2019 rcv_time=Sat Nov 16 21:57:09 2019 change_time=Sat Nov 16 21:56:02 2019
Po spuštění konzumenta, který zprávu přečte, se pochopitelně změní i stav fronty, což si můžeme velmi snadno ověřit:
$ ipcs -q ------ Message Queues -------- key msqid owner perms used-bytes messages 0xd2010dbb 32768 tester 660 0 0 $ ipcs -q -i 32768 Message Queue msqid=32768 uid=1000 gid=1000 cuid=1000 cgid=1000 mode=0660 cbytes=0 qbytes=16384 qnum=0 lspid=15028 lrpid=15077 send_time=Sat Nov 16 21:58:51 2019 rcv_time=Sat Nov 16 22:04:42 2019 change_time=Sat Nov 16 21:56:02 2019
Vidíme, že fronta sice stále existuje, ovšem neobsahuje žádné zprávy a její zaplněná kapacita je tedy nula bajtů.
$ ipcs -q ------ Message Queues -------- key msqid owner perms used-bytes messages 0xd200017f 0 ptisnovs 660 200 2
Informace o frontě číslo 0:
$ ipcs -q -i 0 Message Queue msqid=0 uid=1000 gid=1000 cuid=1000 cgid=1000 mode=0660 cbytes=100 qbytes=16384 qnum=1 lspid=5043 lrpid=0 send_time=Fri Nov 15 16:01:24 2019 rcv_time=Not set change_time=Fri Nov 15 15:44:27 2019
10. Producent posílající více zpráv
Nepatrnou úpravou producenta můžeme zajistit, aby se zprávy vytvářely nepřetržitě s intervalem přibližně jedné sekundy. Každá zpráva bude obsahovat odlišnou zprávu obsahující mj. i hodnotu počitadla:
while (1) { sprintf(message.message_text, "Message #%d", msg_number); msg_number++; status = msgsnd(queue_id, (void*)&message, sizeof(message.message_text), 0); if (status == -1) { perror("Can not send message"); return 1; } sleep(1); }
Úplný zdrojový kód takto upraveného producenta zpráv může vypadat následovně:
#include <stdio.h> #include <string.h> #include <unistd.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> typedef struct { long message_type; char message_text[100]; } t_message; int main(void) { t_message message; key_t key; int queue_id; int status; int msg_number = 1; message.message_type = 1; key = ftok("/home/tester/.bashrc", 1234); if (key == -1) { perror("Unable to generate key"); return 2; } printf("Key: %x\n", key); queue_id = msgget(key, IPC_CREAT | 0660); if (queue_id == -1) { perror("Unable to get message queue identifier"); return 2; } printf("Message queue identifier: %x\n", queue_id); while (1) { sprintf(message.message_text, "Message #%d", msg_number); msg_number++; status = msgsnd(queue_id, (void*)&message, sizeof(message.message_text), 0); if (status == -1) { perror("Can not send message"); return 1; } sleep(1); } return 0; }
Producenta spustíme po dobu přibližně jedné minuty a poté zkontrolujeme stav fronty. Měla by obsahovat přibližně šedesát zpráv a zaplněná kapacita by měla být přesně 100*počet_zpráv:
$ ipcs -q ------ Message Queues -------- key msqid owner perms used-bytes messages 0xd2010dbb 32768 tester 660 6600 66 $ ipcs -q -i 32768 Message Queue msqid=32768 uid=1000 gid=1000 cuid=1000 cgid=1000 mode=0660 cbytes=6600 qbytes=16384 qnum=66 lspid=15115 lrpid=15077 send_time=Sat Nov 16 22:11:09 2019 rcv_time=Sat Nov 16 22:04:42 2019 change_time=Sat Nov 16 21:56:02 2019
11. Konzument akceptující postupně všechny zprávy
Podobným způsobem můžeme upravit i konzumenta zpráv, který ovšem bude přijímat všechny zprávy bez zbytečné jednosekundové pauzy:
while (1) { status = msgrcv(queue_id, (void*)&message, sizeof(message.message_text), 0, 0); if (status == -1) { perror("Can not receive message"); return 1; } printf("Message type: %ld\n", message.message_type); printf("Message text: %s\n", message.message_text); }
Úplný zdrojový kód upraveného konzumenta zpráv:
#include <stdio.h> #include <string.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> typedef struct { long message_type; char message_text[100]; } t_message; int main(void) { t_message message; key_t key; int queue_id; int status; key = ftok("/home/tester/.bashrc", 1234); if (key == -1) { perror("Unable to generate key"); return 2; } printf("Key: %x\n", key); queue_id = msgget(key, 0); if (queue_id == -1) { perror("Unable to get message queue identifier"); return 2; } printf("Message queue identifier: %x\n", queue_id); while (1) { status = msgrcv(queue_id, (void*)&message, sizeof(message.message_text), 0, 0); if (status == -1) { perror("Can not receive message"); return 1; } printf("Message type: %ld\n", message.message_type); printf("Message text: %s\n", message.message_text); } return 0; }
Po překladu a spuštění konzumenta zpráv by se měly prakticky okamžitě načíst všechny zprávy, které byly uloženy do fronty:
$ ./subscriber tester@tester-ThinkPad-T410 ~/temp/message-queues-examples/unix-messages/system-v/example2 $ ./subscriber Key: d2010dbb Message queue identifier: 0 Message type: 1 Message text: Message #1 Message type: 1 Message text: Message #2 Message type: 1 Message text: Message #3 Message type: 1 Message text: Message #4 ... ... ... Message type: 1 Message text: Message #66 ... ... ... konzument čeká na další zprávy
12. Chování producenta při zaplnění fronty
Nyní vyzkoušíme, co se stane ve chvíli, kdy producent zaplní celou kapacitu fronty (konzument nebude spuštěn). Producenta z předchozích kapitol nepatrně upravíme takovým způsobem, aby zprávy posílal co nejrychleji, bez sekundových prodlev:
#include <stdio.h> #include <string.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> typedef struct { long message_type; char message_text[100]; } t_message; int main(void) { t_message message; key_t key; int queue_id; int status; int msg_number = 1; message.message_type = 1; key = ftok("/home/tester/.bashrc", 1234); if (key == -1) { perror("Unable to generate key"); return 2; } printf("Key: %x\n", key); queue_id = msgget(key, IPC_CREAT | 0660); if (queue_id == -1) { perror("Unable to get message queue identifier"); return 2; } printf("Message queue identifier: %x\n", queue_id); while (1) { sprintf(message.message_text, "Message #%d", msg_number); msg_number++; status = msgsnd(queue_id, (void*)&message, sizeof(message.message_text), 0); if (status == -1) { perror("Can not send message"); return 1; } } return 0; }
Producenta přeložíme a následně běžným způsobem spustíme:
$ ./publisher Key: d2010dbb Message queue identifier: 0
V této chvíli se producent zastaví, protože došlo k naplnění fronty. Můžeme se o tom velmi snadno přesvědčit:
$ ipcs -q ------ Message Queues -------- key msqid owner perms used-bytes messages 0xd2010dbb 0 tester 660 16300 163
Fronta nyní obsahuje 163 zpráv, každou o délce 100 bajtů, což znamená, že z kapacity 16384 je obsazeno 16300 bajtů a další zpráva se do zbylých 84 bajtů již nevleze:
$ ipcs -q -i 0 Message Queue msqid=0 uid=1000 gid=1000 cuid=1000 cgid=1000 mode=0660 cbytes=16300 qbytes=16384 qnum=163 lspid=4139 lrpid=4118 send_time=Sun Nov 17 16:39:20 2019 rcv_time=Sun Nov 17 16:38:28 2019 change_time=Sun Nov 17 16:38:05 2019
13. Poslání zprávy do fronty bez čekání
Prozatím jsme zprávy do fronty posílali takto:
status = msgsnd(queue_id, (void*)&message, sizeof(message.message_text), 0);
Vzhledem k tomu, že poslední parametr obsahoval nulu, bylo chování funkce msgsnd blokující – ve chvíli, kdy byla fronta zaplněna, čekala funkce msgsnd tak dlouho, dokud nedošlo k ukončení procesu, ke smazání fronty či k uvolnění místa ve frontě. Někdy však můžeme chtít, aby se v případě, že je fronta plná, o této situaci producent dozvěděl, a to okamžitě. I to je možné – postačuje v posledním parametru funkce msgsnd předat příznak IPC_NOWAIT:
status = msgsnd(queue_id, (void*)&message, sizeof(message.message_text), IPC_NOWAIT);
V případě, že je fronta plná, vrátí se chyba (-1) a proměnná errno bude obsahovat konstantu EAGAIN (z hlavičkového souboru errno.h), kterou můžeme testovat.
#include <stdio.h> #include <string.h> #include <errno.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> typedef struct { long message_type; char message_text[100]; } t_message; int main(void) { t_message message; key_t key; int queue_id; int status; int msg_number = 1; message.message_type = 1; key = ftok("/home/tester/.bashrc", 1234); if (key == -1) { perror("Unable to generate key"); return 2; } printf("Key: %x\n", key); queue_id = msgget(key, IPC_CREAT | 0660); if (queue_id == -1) { perror("Unable to get message queue identifier"); return 2; } printf("Message queue identifier: %x\n", queue_id); while (1) { sprintf(message.message_text, "Message #%d", msg_number); msg_number++; status = msgsnd(queue_id, (void*)&message, sizeof(message.message_text), IPC_NOWAIT); if (status == -1) { if (errno == EAGAIN) { puts("Message queue is full"); } perror("Can not send message"); return 1; } } return 0; }
Opět si otestujme chování producenta zpráv:
$ ./publisher Key: d2010dbb Message queue identifier: 0 Message queue is full Can not send message: Resource temporarily unavailable
Vidíme, že v tomto případě byl producent ukončen a správně napsal, z jakého důvodu.
14. Přečtení doplňujících informací o frontě funkcí msgctl
Prozatím jsme použili tři knihovní funkce ze seznamu uvedeného ve druhé kapitole – msgget, msgsnd a msgrcv. Zbývá nám použít čtvrtou funkci nazvanou msgctl. Ukažme si, jak lze tuto funkci použít pro přečtení informací o frontě, a to kdykoli – klidně i ve chvíli, kdy se do fronty přidávají nové zprávy popř. se zprávy odebírají. Informace o stavu fronty se přenesou do struktury typu msqid_ds:
struct msqid_ds buf;
Samotné přečtení informací zajistí právě funkce msgctl, pokud ve druhém parametru předáme příznak IPC_STAT. Návratový kód funkce indikuje případnou chybu:
status = msgctl(queue_id, IPC_STAT, &buf); if (status == -1) { perror("Can not read message queue info"); return 1; }
Získané informace lze vypsat – některé mají podobu časového razítka, které převedeme na řetězec funkcí ctime, další obsahují celočíselné údaje (počet zpráv, PID procesů používajících frontu atd.). Pro otestování, zda je PID uvedeno správně, se vypisuje i PID procesu, který právě běží:
printf("Last sent: %s", ctime(&buf.msg_stime)); printf("Last recv: %s", ctime(&buf.msg_rtime)); printf("Last change: %s", ctime(&buf.msg_ctime)); printf("Messages in queue: %ld\n", buf.msg_qnum); printf("PID of last sender: %d\n", buf.msg_lspid); printf("PID of last receiver: %d\n", buf.msg_lrpid); printf("PID of this process: %d\n\n", getpid());
Upravený producent zpráv může vypadat takto:
#include <stdio.h> #include <string.h> #include <unistd.h> #include <time.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> typedef struct { long message_type; char message_text[100]; } t_message; int main(void) { t_message message; key_t key; int queue_id; int status; int msg_number = 1; message.message_type = 1; key = ftok("/home/tester/.bashrc", 1234); if (key == -1) { perror("Unable to generate key"); return 2; } printf("Key: %x\n", key); queue_id = msgget(key, IPC_CREAT | 0660); if (queue_id == -1) { perror("Unable to get message queue identifier"); return 2; } printf("Message queue identifier: %x\n", queue_id); while (1) { sprintf(message.message_text, "Message #%d", msg_number); msg_number++; status = msgsnd(queue_id, (void*)&message, sizeof(message.message_text), 0); if (status == -1) { perror("Can not send message"); return 1; } { struct msqid_ds buf; status = msgctl(queue_id, IPC_STAT, &buf); if (status == -1) { perror("Can not read message queue info"); return 1; } printf("Last sent: %s", ctime(&buf.msg_stime)); printf("Last recv: %s", ctime(&buf.msg_rtime)); printf("Last change: %s", ctime(&buf.msg_ctime)); printf("Messages in queue: %ld\n", buf.msg_qnum); printf("PID of last sender: %d\n", buf.msg_lspid); printf("PID of last receiver: %d\n", buf.msg_lrpid); printf("PID of this process: %d\n\n", getpid()); } sleep(1); } return 0; }
Příklad výpisu provedeného producentem po jeho spuštění:
Key: d200017f Message queue identifier: 0 Message type: 1 Message text: Hello world! Last sent: Fri Nov 15 20:34:12 2019 Last recv: Fri Nov 15 20:22:14 2019 Last change: Fri Nov 15 19:57:25 2019 Messages in queue: 24 PID of last sender: 4590 PID of last receiver: 4168 PID of this process: 4590
15. Filtrace zpráv podle nastaveného typu
Prozatím jsme příliš nevyužívali typ zprávy, což je hodnota typu long, která musí být nenulová a musí být ke každé zprávě přiřazena. Upravme tedy producenta zpráv takovým způsobem, aby bylo možné typ zprávy (celé číslo) nastavit z příkazového řádku:
int main(int argc, char **argv) { t_message message; if (argc < 1) { puts("Usage: ./producer message_type"); return 1; } message.message_type = atoi(argv[1]);
Úplný kód producenta se změní následovně:
#include <stdio.h> #include <string.h> #include <stdlib.h> #include <unistd.h> #include <time.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> typedef struct { long message_type; char message_text[100]; } t_message; int main(int argc, char **argv) { t_message message; key_t key; int queue_id; int status; int msg_number = 1; if (argc < 1) { puts("Usage: ./producer message_type"); return 1; } message.message_type = atoi(argv[1]); key = ftok("/home/tester/.bashrc", 5678); if (key == -1) { perror("Unable to generate key"); return 2; } printf("Key: %x\n", key); queue_id = msgget(key, IPC_CREAT | 0660); if (queue_id == -1) { perror("Unable to get message queue identifier"); return 2; } printf("Message queue identifier: %x\n", queue_id); while (1) { sprintf(message.message_text, "Message #%d", msg_number); msg_number++; status = msgsnd(queue_id, (void*)&message, sizeof(message.message_text), 0); if (status == -1) { perror("Can not send message"); return 1; } { struct msqid_ds buf; status = msgctl(queue_id, IPC_STAT, &buf); if (status == -1) { perror("Can not read message queue info"); return 1; } printf("Last sent: %s", ctime(&buf.msg_stime)); printf("Last recv: %s", ctime(&buf.msg_rtime)); printf("Last change: %s", ctime(&buf.msg_ctime)); printf("Messages in queue: %ld\n", buf.msg_qnum); printf("PID of last sender: %d\n", buf.msg_lspid); printf("PID of last receiver: %d\n", buf.msg_lrpid); printf("PID of this process: %d\n\n", getpid()); } sleep(1); } return 0; }
Prakticky stejným způsobem upravíme konzumenta, ovšem navíc provedeme další změnu – budeme filtrovat zprávy již ve funkci msgrcv, přesněji řečeno určíme, že klient bude akceptovat pouze ty zprávy, které mají nastaven příslušný typ:
status = msgrcv(queue_id, (void*)&message, sizeof(message.message_text), message_type, 0);
Pro úplnost si opět (dnes již naposledy) ukažme celý zdrojový kód konzumenta:
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> typedef struct { long message_type; char message_text[100]; } t_message; int main(int argc, char **argv) { t_message message; key_t key; int queue_id; int status; long message_type; if (argc < 1) { puts("Usage: ./subscriber message_type"); return 1; } message_type = atoi(argv[1]); key = ftok("/home/tester/.bashrc", 5678); if (key == -1) { perror("Unable to generate key"); return 2; } printf("Key: %x\n", key); queue_id = msgget(key, 0); if (queue_id == -1) { perror("Unable to get message queue identifier"); return 2; } printf("Message queue identifier: %x\n", queue_id); while (1) { status = msgrcv(queue_id, (void*)&message, sizeof(message.message_text), message_type, 0); if (status == -1) { perror("Can not receive message"); return 1; } printf("Message type: %ld\n", message.message_type); printf("Message text: %s\n", message.message_text); } return 0; }
16. Možné konfigurace: více producentů a jeden konzument, jeden konzument a více producentů atd.
Poslední verze producenta a konzumenta zpráv nám umožňuje sledovat činnost message brokera v praxi. Nejdříve (což již známe) spustíme jediného producenta a jediného konzumenta:
$ ./publisher 10 Last change: Sun Nov 17 20:50:08 2019 Messages in queue: 1 PID of last sender: 7258 PID of last receiver: 0 PID of this process: 7258 Last sent: Sun Nov 17 20:50:09 2019 Last recv: Thu Jan 1 01:00:00 1970 Last change: Sun Nov 17 20:50:08 2019 Messages in queue: 2 PID of last sender: 7258 PID of last receiver: 0 PID of this process: 7258 Last sent: Sun Nov 17 20:50:10 2019 ... ... ...
$ ./subscriber 10 Key: 2e010dbb Message queue identifier: 8001 Message type: 10 Message text: Message #1 Message type: 10 Message text: Message #2 Message type: 10 Message text: Message #3 ... ... ...
Jeden producent a dva konzumenti:
$ ./publisher 10 Key: 2e010dbb Message queue identifier: 8001 Last sent: Sun Nov 17 20:52:27 2019 Last recv: Sun Nov 17 20:52:27 2019 Last change: Sun Nov 17 20:50:08 2019 Messages in queue: 0 PID of last sender: 7329 PID of last receiver: 7298 PID of this process: 7329 Last sent: Sun Nov 17 20:52:28 2019 Last recv: Sun Nov 17 20:52:28 2019 Last change: Sun Nov 17 20:50:08 2019 Messages in queue: 0 PID of last sender: 7329 PID of last receiver: 7299 PID of this process: 7329 ... ... ...
$ ./subscriber 10 Key: 2e010dbb Message queue identifier: 8001 Message type: 10 Message text: Message #1 Message type: 10 Message text: Message #3 ... ... ...
$ ./subscriber 10 Key: 2e010dbb Message queue identifier: 8001 Message type: 10 Message text: Message #2 Message type: 10 Message text: Message #4 ... ... ...
Dva producenti a jeden konzument:
$ ./publisher 10 ... ... ...
$ ./publisher 10 ... ... ...
$ ./subscriber 10 Key: 2e010dbb Message queue identifier: 8001 Message type: 10 Message text: Message #1 Message type: 10 Message text: Message #1 Message type: 10 Message text: Message #2 Message type: 10 Message text: Message #2 Message type: 10 Message text: Message #3 Message type: 10 Message text: Message #3 Message type: 10 ... ... ...
A nakonec dva producenti a dva konzumenti:
$ ./publisher 10 ... ... ...
$ ./publisher 10 ... ... ...
$ ./subscriber 10 Key: 2e010dbb Message queue identifier: 8001 Message type: 10 Message text: Message #1 Message type: 10 Message text: Message #2 Message type: 10 Message text: Message #3 Message type: 10 Message text: Message #4 Message type: 10 Message text: Message #5 Message type: 10 Message text: Message #6
$ ./subscriber 10 Key: 2e010dbb Message queue identifier: 8001 Message type: 10 Message text: Message #1 Message type: 10 Message text: Message #2 Message type: 10 Message text: Message #3 Message type: 10 Message text: Message #4 Message type: 10 Message text: Message #5 Message type: 10 Message text: Message #6
17. Konzumenti a producenti s filtrací zpráv
Samozřejmě můžeme zajistit, aby jeden producent posílal zprávy jen jedinému konzumentovi, a to přes společnou (jedinou) frontu. Děje se tak nastavením typu zprávy. V dalším příkladu bude existovat producent zpráv s typem 1, druhý producent bude posílat zprávy typu 2. A navíc budou existovat dva konzumenti, jeden pro typ 1 a druhý pro typ 2:
$ ./publisher 1 ... ... ...
$ ./publisher 2 ... ... ...
$ ./subscriber 1 Key: 2e010dbb Message queue identifier: 8001 Message type: 1 Message text: Message #1 Message type: 1 Message text: Message #2 Message type: 1 Message text: Message #3 Message type: 1 ... ... ...
$ ./subscriber 2 Key: 2e010dbb Message queue identifier: 8001 Message type: 2 Message text: Message #1 Message type: 2 Message text: Message #2 Message type: 2 ... ... ...
Poslední příklad bude navíc obsahovat konzumenta, který správy nefiltruje a akceptuje tedy jakoukoli zprávu:
$ ./publisher 1 ... ... ...
$ ./publisher 2 ... ... ...
$ ./subscriber 1 Key: 2e010dbb Message queue identifier: 8001 Message type: 1 Message text: Message #1 Message type: 1 Message text: Message #3 Message type: 1 Message text: Message #4 Message type: 1 Message text: Message #6 Message type: 1 Message text: Message #7
$ ./subscriber 2 Key: 2e010dbb Message queue identifier: 8001 Message type: 2 Message text: Message #1 Message type: 2 Message text: Message #3 Message type: 2 Message text: Message #5 Message type: 2 Message text: Message #6 Message type: 2 Message text: Message #8 Message type: 2 Message text: Message #9 Message type: 2 Message text: Message #11
$ ./subscriber Key: 2e010dbb Message queue identifier: 8001 Message type: 2 Message text: Message #2 Message type: 2 Message text: Message #4 Message type: 1 Message text: Message #2 Message type: 2 Message text: Message #7 Message type: 1 Message text: Message #5 Message type: 2 Message text: Message #10 Message type: 1 Message text: Message #8
18. Obsah dalšího pokračování seriálu
V navazující části seriálu o message brokerech se seznámíme s druhou technologií, kterou lze v Linuxu (a mnoha Unixech) použít pro posílání zpráv mezi procesy běžícími na stejném počítači. Ukážeme si totiž použití front zpráv definovaných v POSIXu. V Linuxu je práce s těmito frontami zpráv realizována přímo v jádře operačního systému a nabízí několik režimů činnosti, které nebyly ve frontách podle Systemu V podporovány. Navíc je možné „POSIXové fronty“ připojit jako zvláštní souborový systém, což může být v některých případech velmi užitečné.
19. Repositář s demonstračními příklady
Zdrojové kódy všech dnes popsaných demonstračních příkladů vyvinutých v programovacím jazyku Go byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má stále ještě doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce:
20. Odkazy na Internetu
- POSIX message queues in Linux
https://www.softprayog.in/programming/interprocess-communication-using-posix-message-queues-in-linux - How is a message queue implemented in the Linux kernel?
https://unix.stackexchange.com/questions/6930/how-is-a-message-queue-implemented-in-the-linux-kernel/6935 - ‘IPCS’ command in Linux with examples
https://www.geeksforgeeks.org/ipcs-command-linux-examples/ - System V IPC: Message Queues
https://nitish712.blogspot.com/2012/11/system-v-ipc-message-queues.html - How to create, check and delete IPC share memory, semaphare and message queue on linux
https://fibrevillage.com/sysadmin/225-how-to-create-check-and-delete-ipc-share-memory-semaphare-and-message-queue-on-linux - MQ_OVERVIEW(7): Linux Programmer's Manual
http://man7.org/linux/man-pages/man7/mq_overview.7.html - mq_overview (7) – Linux Man Pages
https://www.systutorials.com/docs/linux/man/7-mq_overview/ - POSIX.4 Message Queues (+ rozšíření QNX)
https://users.pja.edu.pl/~jms/qnx/help/watcom/clibref/mq_overview.html - System V message queues in Linux
https://www.softprayog.in/programming/interprocess-communication-using-system-v-message-queues-in-linux - Linux System V and POSIX IPC Examples
http://hildstrom.com/projects/ipc_sysv_posix/index.html - Programming Tutorial – Linux: Message Queues
https://ccppcoding.blogspot.com/2013/03/linux-message-queues.html - Go wrapper for POSIX Message Queues
https://github.com/syucream/posix_mq - Stránka projektu NSQ
https://nsq.io/ - Dokumentace k projektu NSQ
https://nsq.io/overview/design.html - Dokumentace ke klientovi pro Go
https://godoc.org/github.com/nsqio/go-nsq - Dokumentace ke klientovi pro Python
https://pynsq.readthedocs.io/en/latest/ - Binární tarbally s NSQ
https://nsq.io/deployment/installing.html - GitHub repositář projektu NSQ
https://github.com/nsqio/nsq - Klienti pro NSQ
https://nsq.io/clients/client_libraries.html - Klient pro Go
https://github.com/nsqio/go-nsq - Klient pro Python
https://github.com/nsqio/pynsq - An Example of Using NSQ From Go
http://tleyden.github.io/blog/2014/11/12/an-example-of-using-nsq-from-go/ - Go Go Gadget
https://word.bitly.com/post/29550171827/go-go-gadget - Simplehttp
https://github.com/bitly/simplehttp - Dramatiq: simple task processing
https://dramatiq.io/ - Cookbook (for Dramatiq)
https://dramatiq.io/cookbook.html - Balíček dramatiq na PyPi
https://pypi.org/project/dramatiq/ - Dramatiq dashboard
https://github.com/Bogdanp/dramatiq_dashboard - Dramatiq na Redditu
https://www.reddit.com/r/dramatiq/ - A Dramatiq broker that can be used with Amazon SQS
https://github.com/Bogdanp/dramatiq_sqs - nanomsg na GitHubu
https://github.com/nanomsg/nanomsg - Referenční příručka knihovny nanomsg
https://nanomsg.org/v1.1.5/nanomsg.html - nng (nanomsg-next-generation)
https://github.com/nanomsg/nng - Differences between nanomsg and ZeroMQ
https://nanomsg.org/documentation-zeromq.html - NATS
https://nats.io/about/ - NATS Streaming Concepts
https://nats.io/documentation/streaming/nats-streaming-intro/ - NATS Streaming Server
https://nats.io/download/nats-io/nats-streaming-server/ - NATS Introduction
https://nats.io/documentation/ - NATS Client Protocol
https://nats.io/documentation/internals/nats-protocol/ - NATS Messaging (Wikipedia)
https://en.wikipedia.org/wiki/NATS_Messaging - Stránka Apache Software Foundation
http://www.apache.org/ - Informace o portu 5672
http://www.tcp-udp-ports.com/port-5672.htm - Třída MessagingHandler knihovny Qpid Proton
https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/proton._handlers.MessagingHandler-class.html - Třída Event knihovny Qpid Proton
https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/proton._events.Event-class.html - package stomp (Go)
https://godoc.org/github.com/go-stomp/stomp - Go language library for STOMP protocol
https://github.com/go-stomp/stomp - python-qpid-proton 0.26.0 na PyPi
https://pypi.org/project/python-qpid-proton/ - Qpid Proton
http://qpid.apache.org/proton/ - Using the AMQ Python Client
https://access.redhat.com/documentation/en-us/red_hat_amq/7.1/html-single/using_the_amq_python_client/ - Apache ActiveMQ
http://activemq.apache.org/ - Apache ActiveMQ Artemis
https://activemq.apache.org/artemis/ - Apache ActiveMQ Artemis User Manual
https://activemq.apache.org/artemis/docs/latest/index.html - KahaDB
http://activemq.apache.org/kahadb.html - Understanding the KahaDB Message Store
https://access.redhat.com/documentation/en-US/Fuse_MQ_Enterprise/7.1/html/Configuring_Broker_Persistence/files/KahaDBOverview.html - Command Line Tools (Apache ActiveMQ)
https://activemq.apache.org/activemq-command-line-tools-reference.html - stomp.py 4.1.21 na PyPi
https://pypi.org/project/stomp.py/ - Stomp Tutorial
https://access.redhat.com/documentation/en-US/Fuse_Message_Broker/5.5/html/Connectivity_Guide/files/FMBConnectivityStompTelnet.html - Heartbeat (computing)
https://en.wikipedia.org/wiki/Heartbeat_(computing) - Apache Camel
https://camel.apache.org/ - Red Hat Fuse
https://developers.redhat.com/products/fuse/overview/ - Confusion between ActiveMQ and ActiveMQ-Artemis?
https://serverfault.com/questions/873533/confusion-between-activemq-and-activemq-artemis - Staré stránky projektu HornetQ
http://hornetq.jboss.org/ - Snapshot JeroMQ verze 0.4.4
https://oss.sonatype.org/content/repositories/snapshots/org/zeromq/jeromq/0.4.4-SNAPSHOT/ - Difference between ActiveMQ vs Apache ActiveMQ Artemis
http://activemq.2283324.n4.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html - Microservices communications. Why you should switch to message queues
https://dev.to/matteojoliveau/microservices-communications-why-you-should-switch-to-message-queues–48ia - Stomp.py 4.1.19 documentation
https://stomppy.readthedocs.io/en/stable/ - Repositář knihovny JeroMQ
https://github.com/zeromq/jeromq/ - ØMQ – Distributed Messaging
http://zeromq.org/ - ØMQ Community
http://zeromq.org/community - Get The Software
http://zeromq.org/intro:get-the-software - PyZMQ Documentation
https://pyzmq.readthedocs.io/en/latest/ - Module: zmq.decorators
https://pyzmq.readthedocs.io/en/latest/api/zmq.decorators.html - ZeroMQ is the answer, by Ian Barber
https://vimeo.com/20605470 - ZeroMQ RFC
https://rfc.zeromq.org/ - ZeroMQ and Clojure, a brief introduction
https://antoniogarrote.wordpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/ - zeromq/czmq
https://github.com/zeromq/czmq - golang wrapper for CZMQ
https://github.com/zeromq/goczmq - ZeroMQ version reporting in Python
http://zguide.zeromq.org/py:version - A Go interface to ZeroMQ version 4
https://github.com/pebbe/zmq4 - Broker vs. Brokerless
http://zeromq.org/whitepapers:brokerless - Learning ØMQ with pyzmq
https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/ - Céčková funkce zmq_ctx_new
http://api.zeromq.org/4–2:zmq-ctx-new - Céčková funkce zmq_ctx_destroy
http://api.zeromq.org/4–2:zmq-ctx-destroy - Céčková funkce zmq_bind
http://api.zeromq.org/4–2:zmq-bind - Céčková funkce zmq_unbind
http://api.zeromq.org/4–2:zmq-unbind - Céčková C funkce zmq_connect
http://api.zeromq.org/4–2:zmq-connect - Céčková C funkce zmq_disconnect
http://api.zeromq.org/4–2:zmq-disconnect - Céčková C funkce zmq_send
http://api.zeromq.org/4–2:zmq-send - Céčková C funkce zmq_recv
http://api.zeromq.org/4–2:zmq-recv - Třída Context (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#context - Třída Socket (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket - Python binding
http://zeromq.org/bindings:python - Why should I have written ZeroMQ in C, not C++ (part I)
http://250bpm.com/blog:4 - Why should I have written ZeroMQ in C, not C++ (part II)
http://250bpm.com/blog:8 - About Nanomsg
https://nanomsg.org/ - Advanced Message Queuing Protocol
https://www.amqp.org/ - Advanced Message Queuing Protocol na Wikipedii
https://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol - Dokumentace k příkazu rabbitmqctl
https://www.rabbitmq.com/rabbitmqctl.8.html - RabbitMQ
https://www.rabbitmq.com/ - RabbitMQ Tutorials
https://www.rabbitmq.com/getstarted.html - RabbitMQ: Clients and Developer Tools
https://www.rabbitmq.com/devtools.html - RabbitMQ na Wikipedii
https://en.wikipedia.org/wiki/RabbitMQ - Streaming Text Oriented Messaging Protocol
https://en.wikipedia.org/wiki/Streaming_Text_Oriented_Messaging_Protocol - Message Queuing Telemetry Transport
https://en.wikipedia.org/wiki/MQTT - Erlang
http://www.erlang.org/ - pika 0.12.0 na PyPi
https://pypi.org/project/pika/ - Introduction to Pika
https://pika.readthedocs.io/en/stable/ - Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
http://clojurerabbitmq.info/ - AMQP 0–9–1 Model Explained
http://www.rabbitmq.com/tutorials/amqp-concepts.html - Part 1: RabbitMQ for beginners – What is RabbitMQ?
https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html - Downloading and Installing RabbitMQ
https://www.rabbitmq.com/download.html - celery na PyPi
https://pypi.org/project/celery/ - Databáze Redis (nejenom) pro vývojáře používající Python
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/ - Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/ - Redis Queue (RQ)
https://www.fullstackpython.com/redis-queue-rq.html - Python Celery & RabbitMQ Tutorial
https://tests4geeks.com/python-celery-rabbitmq-tutorial/ - Flower: Real-time Celery web-monitor
http://docs.celeryproject.org/en/latest/userguide/monitoring.html#flower-real-time-celery-web-monitor - Asynchronous Tasks With Django and Celery
https://realpython.com/asynchronous-tasks-with-django-and-celery/ - First Steps with Celery
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html - node-celery
https://github.com/mher/node-celery - Full Stack Python: web development
https://www.fullstackpython.com/web-development.html - Introducing RQ
https://nvie.com/posts/introducing-rq/ - Asynchronous Tasks with Flask and Redis Queue
https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue - rq-dashboard
https://github.com/eoranged/rq-dashboard - Stránky projektu Redis
https://redis.io/ - Introduction to Redis
https://redis.io/topics/introduction - Try Redis
http://try.redis.io/ - Redis tutorial, April 2010 (starší, ale pěkně udělaný)
https://static.simonwillison.net/static/2010/redis-tutorial/ - Python Redis
https://redislabs.com/lp/python-redis/ - Redis: key-value databáze v paměti i na disku
https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/ - Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
http://www.cloudsvet.cz/?p=253 - Praktický úvod do Redis (2): transakce
http://www.cloudsvet.cz/?p=256 - Praktický úvod do Redis (3): cluster
http://www.cloudsvet.cz/?p=258 - Connection pool
https://en.wikipedia.org/wiki/Connection_pool - Instant Redis Sentinel Setup
https://github.com/ServiceStack/redis-config - How to install REDIS in LInux
https://linuxtechlab.com/how-install-redis-server-linux/ - Redis RDB Dump File Format
https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format - Lempel–Ziv–Welch
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch - Redis Persistence
https://redis.io/topics/persistence - Redis persistence demystified
http://oldblog.antirez.com/post/redis-persistence-demystified.html - Redis reliable queues with Lua scripting
http://oldblog.antirez.com/post/250 - Ost (knihovna)
https://github.com/soveran/ost - NoSQL
https://en.wikipedia.org/wiki/NoSQL - Shard (database architecture)
https://en.wikipedia.org/wiki/Shard_%28database_architecture%29 - What is sharding and why is it important?
https://stackoverflow.com/questions/992988/what-is-sharding-and-why-is-it-important - What Is Sharding?
https://btcmanager.com/what-sharding/ - Redis clients
https://redis.io/clients - Category:Lua-scriptable software
https://en.wikipedia.org/wiki/Category:Lua-scriptable_software - Seriál Programovací jazyk Lua
https://www.root.cz/serialy/programovaci-jazyk-lua/ - Redis memory usage
http://nosql.mypopescu.com/post/1010844204/redis-memory-usage - Ukázka konfigurace Redisu pro lokální testování
https://github.com/tisnik/presentations/blob/master/redis/redis.conf - Resque
https://github.com/resque/resque - Nested transaction
https://en.wikipedia.org/wiki/Nested_transaction - Publish–subscribe pattern
https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern - Messaging pattern
https://en.wikipedia.org/wiki/Messaging_pattern - Using pipelining to speedup Redis queries
https://redis.io/topics/pipelining - Pub/Sub
https://redis.io/topics/pubsub - ZeroMQ distributed messaging
http://zeromq.org/ - ZeroMQ: Modern & Fast Networking Stack
https://www.igvita.com/2010/09/03/zeromq-modern-fast-networking-stack/ - Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
https://stackoverflow.com/questions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr - Python & Redis PUB/SUB
https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7 - Message broker
https://en.wikipedia.org/wiki/Message_broker - RESP Arrays
https://redis.io/topics/protocol#array-reply - Redis Protocol specification
https://redis.io/topics/protocol - Redis Pub/Sub: Intro Guide
https://www.redisgreen.net/blog/pubsub-intro/ - Redis Pub/Sub: Howto Guide
https://www.redisgreen.net/blog/pubsub-howto/ - Comparing Publish-Subscribe Messaging and Message Queuing
https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message - Apache Kafka
https://kafka.apache.org/ - Iron
http://www.iron.io/mq - kue (založeno na Redisu, určeno pro node.js)
https://github.com/Automattic/kue - Cloud Pub/Sub
https://cloud.google.com/pubsub/ - Introduction to Redis Streams
https://redis.io/topics/streams-intro - glob (programming)
https://en.wikipedia.org/wiki/Glob_(programming) - Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
https://www.slideshare.net/sylvinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2 - Enqueueing internals
http://python-rq.org/contrib/ - queue — A synchronized queue class
https://docs.python.org/3/library/queue.html - Queue – A thread-safe FIFO implementation
https://pymotw.com/2/Queue/ - Queues
http://queues.io/ - Windows Subsystem for Linux Documentation
https://docs.microsoft.com/en-us/windows/wsl/about - RestMQ
http://restmq.com/ - ActiveMQ
http://activemq.apache.org/ - Amazon MQ
https://aws.amazon.com/amazon-mq/ - Amazon Simple Queue Service
https://aws.amazon.com/sqs/ - Celery: Distributed Task Queue
http://www.celeryproject.org/ - Disque, an in-memory, distributed job queue
https://github.com/antirez/disque - rq-dashboard
https://github.com/eoranged/rq-dashboard - Projekt RQ na PyPi
https://pypi.org/project/rq/ - rq-dashboard 0.3.12
https://pypi.org/project/rq-dashboard/ - Job queue
https://en.wikipedia.org/wiki/Job_queue - Why we moved from Celery to RQ
https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq - Running multiple workers using Celery
https://serverfault.com/questions/655387/running-multiple-workers-using-celery - celery — Distributed processing
http://docs.celeryproject.org/en/latest/reference/celery.html - Chains
https://celery.readthedocs.io/en/latest/userguide/canvas.html#chains - Routing
http://docs.celeryproject.org/en/latest/userguide/routing.html#automatic-routing - Celery Distributed Task Queue in Go
https://github.com/gocelery/gocelery/ - Python Decorators
https://wiki.python.org/moin/PythonDecorators - Periodic Tasks
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html - celery.schedules
http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab - Pros and cons to use Celery vs. RQ
https://stackoverflow.com/questions/13440875/pros-and-cons-to-use-celery-vs-rq - Priority queue
https://en.wikipedia.org/wiki/Priority_queue - Jupyter
https://jupyter.org/ - How IPython and Jupyter Notebook work
https://jupyter.readthedocs.io/en/latest/architecture/how_jupyter_ipython_work.html - Context Managers
http://book.pythontips.com/en/latest/context_managers.html