Implementace front zpráv podle normy POSIX

21. 11. 2019
Doba čtení: 39 minut

Sdílet

Na článek popisující implementaci front zpráv představených poprvé v Systemu V a od té doby používaných i v dalších unixech i v Linuxu dnes navážeme. Popíšeme si alternativní implementaci front, tentokrát podle normy POSIX.

Obsah

1. Implementace front zpráv podle normy POSIX

2. Knihovna, s níž je nutné slinkovat demonstrační příklady

3. Vytvoření fronty, její otevření a poslání zprávy

4. Úplný zdrojový kód aplikace pro otevření fronty a poslání zprávy

5. Otevření existující fronty, získání podrobnějších informací o frontě a přečtení zprávy

6. Úplný zdrojový kód aplikace pro otevření fronty a přečtení zprávy

7. Přečtení atributů fronty

8. Nastavení parametrů vytvářené fronty

9. Kontinuální posílání a příjem zpráv (vytvoření pipeline)

10. Utilitka pro vytvoření nové prázdné fronty

11. Klient pro posílání zpráv s periodou jedné sekundy

12. Klient pro kontinuální čtení zpráv z fronty

13. Ukázka komunikace mezi několika klienty

14. Použití signálů a handlerů pro příjem zpráv

15. Vylepšení předchozího příkladu – korektní alokace paměti pro zprávu v příjemci

16. Sledování front přes virtuální souborový systém

17. Informace o systému front získané přes souborový systém procfs

18. Repositář s demonstračními příklady

19. Odkazy na Internetu

1. Implementace front zpráv podle normy POSIX

V úterním článku jsme se seznámili s implementací front zpráv používanou pro meziprocesovou komunikaci. Jednalo se o technologii založenou na implementaci použité poprvé v Systemu V, což je mj. i jeden z přímých předchůdců Solarisu, ovšem vlastnosti Systemu V se postupně dostaly do dalších unixových systémů i do Linuxu. Tyto fronty zpráv lze v současnosti použít prakticky kdekoli a navíc se jedná o relativně jednoduše zvládnutelnou technologii, pro jejíž využití postačuje znát pouze tři příkazy (ipcs, ipcmk a ipcrm) a čtyři knihovní funkce (volatelné z céčka a tím pádem i z prakticky jakéhokoli programovacího jazyka s FFI či obdobnou technologií). Existuje ovšem i alternativní implementace front zpráv, která je definována v normě POSIX, kterou do větší či menší míry implementují všechny moderní unixy i systém Linux.

Tato implementace může vypadat z pohledu programátora nepatrně složitěji, protože je k dispozici větší množství funkcí. Namísto původních čtyř funkcí lze využít celkem devět funkcí, jejichž jména v mnoha případech přímo odpovídají příslušným syscallům (voláním služeb jádra operačního systému):

# Funkce Odpovídající syscall Stručný popis funkce
1 mq_open mq_open otevření, popř. i vytvoření fronty zpráv
2 mq_close close uzavření fronty zpráv
3 mq_unlink mq_unlink smazání fronty zpráv
       
3 mq_send mq_timedsend poslání zprávy do fronty
4 mq_timedsend mq_timedsend poslání zprávy do fronty se specifikací timeoutu
5 mq_receive mq_timedreceive přečtení zprávy z fronty
6 mq_timedreceive mq_timedreceive přečtení zprávy z fronty se specifikací timeoutu
       
7 mq_notify mq_notify registrace asynchronního upozornění na novou zprávu
       
8 mq_getattr mq_getsetattr získání parametrů vybrané fronty
9 mq_setattr mq_getsetattr změna parametrů vybrané fronty

Ovšem mezi oběma implementacemi jsou i další, mnohdy velmi podstatné rozdíly. V POSIXových frontách je fronta jednoznačně identifikována svým jménem, takže není zapotřebí používat relativně křehký koncept generovaných klíčů. Dále se POSIXové fronty chovají jako běžné file deskriptory, což je ostatně vidět i na tom, že funkce mq_close používá syscall close. A nakonec POSIXové fronty umožňují použít (poslat, přečíst) zprávy s prioritou, což je pro mnohé implementace velmi důležité.

2. Knihovna, s níž je nutné slinkovat demonstrační příklady

Minule popsané funkce, které se používají pro ovládání front zpráv podle Systemu V, jsou vlastně pouhými rozhraními na příslušné syscally jádra systému, o čemž se ostatně můžeme přesvědčit pohledem na manuálovou stránku:

$ man syscalls | grep \ \ msg
 
       msgctl(2)                   2.0           See notes on ipc(2)
       msgget(2)                   2.0           See notes on ipc(2)
       msgrcv(2)                   2.0           See notes on ipc(2)
       msgsnd(2)                   2.0           See notes on ipc(2)

V případě dnes popisovaného systému front je situace nepatrně odlišná, protože je nutné slinkovat všechny demonstrační příklady s knihovnou nazvanou librt, která je v systému dostupná jak ve formě archivu objektových souborů (.a), tak i sdílené knihovny (.so). V příkladech popsaných v následujících kapitolách je tedy nutné použít přepínač -lrt předaný linkeru. Úplná varianta souboru Makefile bude upravena následujícím způsobem (viz zvýrazněné řádky):

CC=gcc
LINKER=gcc
 
CFLAGS=-O0 -Wall -pedantic
LFLAGS=-lrt
 
.PHONY: clean
 
all:    publisher subscriber
 
%.o:    %.c
        $(CC) -c -o $@ $(CFLAGS) $<
 
publisher:      publisher.o
        $(CC) -o $@ $< $(LFLAGS)
 
subscriber:     subscriber.o
        $(CC) -o $@ $< $(LFLAGS)
 
clean:
        rm -f *.o \
        rm -f publisher \
        rm -f subscriber
Poznámka: u příkladů 4, 5 a 6 je soubor Makefile poněkud komplikovanější, protože se překládá větší množství nástrojů.

3. Vytvoření fronty, její otevření a poslání zprávy

V této kapitole si ukážeme, jak lze vytvořit novou frontu, otevřít ji a poslat do ní zprávy. Fronty (podle POSIXu) jsou identifikovány svým jménem, které musí začínat lomítkem, za nímž může následovat maximálně 254 alfanumerických znaků (kromě lomítka). Jméno fronty tedy nahrazuje původní koncept založený na vygenerovaném klíči. V prvním příkladu použijeme toto jméno:

#define QUEUE_NAME "/queue1"

Vytvoření a současně i otevření fronty zajišťuje funkce mq_open, které se předávají čtyři parametry. V prvním parametru je předáno jméno fronty, v parametru druhém režim otevření/vytvoření fronty, v parametru třetím režim vytvoření nové fronty a konečně v parametru čtvrtém ukazatel na strukturu s popisem dalších parametrů fronty. V našem konkrétním případě je fronta vytvořena (O_CREAT) a otevřena v režimu čtení i zápisu (O_RDWR), ovšem s tím, že pokud již fronta existuje, dojde k chybě (O_EXCL). Režim vytvoření fronty je 0770 (S_IRXWU a S_IRWXG). Žádné další parametry fronty prozatím nejsou specifikovány, takže se v posledním parametru namísto ukazatele předává NULL. Samozřejmě nesmíme zapomenout zkontrolovat, zda se vrátil handle fronty, či zda došlo k nějaké chybě:

mqd_t message_queue_id;
 
message_queue_id = mq_open(QUEUE_NAME, O_RDWR | O_CREAT | O_EXCL, S_IRWXU | S_IRWXG, NULL);
 
if (message_queue_id == -1) {
    perror("Unable to create queue");
    return 2;
}

Poslání zprávy je ve skutečnosti velmi jednoduché, protože potřebujeme znát pouze handle fronty (viz předchozí odstavec), samotný text zprávy (libovolná sekvence bajtů, tedy včetně řetězců), délku zprávy a její prioritu. Pro tuto chvíli budeme u všech zpráv používat prioritu nastavenou na jedničku:

int status;
 
status = mq_send(message_queue_id, message_text, strlen(message_text)+1, priority);
 
if (status == -1) {
    perror("Unable to send message");
    return 2;
}
Poznámka: povšimněte si, že délka zprávy je obecně různá; navíc nesmíme zapomenout započítat do délky zprávy i ukončující nulu.

Samotné poslání zprávy do fronty může být blokující i neblokující operace, což je rozdíl, který poznáme ve chvíli, kdy je fronta zaplněna (ve výchozím nastavení po poslání deseti zpráv). Implicitně je zápis do fronty blokující operace, což znamená, že se ve funkci mq_send aktuální vlákno zastaví a čeká na dobu, kdy dojde k uvolnění fronty.

Na konci programu je slušné frontu zavřít, a to zavoláním funkce mq_close. I tato funkce může vracet chybový stav, s ním ovšem již žádným dalším způsobem nebudeme pracovat, pouze program ukončíme:

int status;
 
status = mq_close(message_queue_id);
if (status == -1) {
    perror("Unable to close message queue");
    return 2;
}

4. Úplný zdrojový kód aplikace pro otevření fronty a poslání zprávy

Úplný zdrojový kód aplikace, která po svém spuštění vytvoří a otevře novou frontu, do které pošle zprávu, bude vypadat následovně (celý zdrojový kód lze nalézt na adrese https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example1/publisher.c):

#include <stdio.h>
#include <string.h>
 
#include <mqueue.h>
 
#define QUEUE_NAME "/queue1"
 
int main(void)
{
    mqd_t message_queue_id;
    unsigned int priority = 0;
    char message_text[100];
    int status;
 
    message_queue_id = mq_open(QUEUE_NAME, O_RDWR | O_CREAT | O_EXCL, S_IRWXU | S_IRWXG, NULL);
    if (message_queue_id == -1) {
        perror("Unable to create queue");
        return 2;
    }
 
    strcpy(message_text, "Hello world!");
 
    status = mq_send(message_queue_id, message_text, strlen(message_text)+1, priority);
    if (status == -1) {
        perror("Unable to send message");
        return 2;
    }
 
    status = mq_close(message_queue_id);
    if (status == -1) {
        perror("Unable to close message queue");
        return 2;
    }
 
    return 0;
}

5. Otevření existující fronty a přečtení zprávy

Příjemce zprávy bude taktéž potřebovat přístup k frontě. Tato fronta již ale musí existovat, takže volání funkce mq_open bude v tomto případě nepatrně jednodušší. Zadáme totiž pouze jméno fronty a režim otevření fronty (O_RDRW); další dva nepovinné parametry funkce mq_open není zapotřebí specifikovat:

#define QUEUE_NAME "/queue1"
 
mqd_t message_queue_id;
 
message_queue_id = mq_open(QUEUE_NAME, O_RDWR);
if (message_queue_id == -1) {
    perror("Unable to open queue");
    return 2;
}

Poněkud komplikovanější může být přečtení zprávy, protože nemáme k dispozici (resp. alespoň prozatím nevíme jak získat) délku zprávy. Zatím tedy vytvoříme dostatečně velký buffer, do kterého bude zpráva načítána a jak adresu, tak i délku tohoto bufferu předáme funkci mq_receive. Poslední parametr může být NULL, popř. může obsahovat ukazatel na proměnnou, do které se uloží priorita právě získané zprávy:

int status;
 
status = mq_receive(message_queue_id, message_text, sizeof(message_text), &sender);
if (status == -1) {
    perror("Unable to receive message");
    return 2;
}

I čtení z fronty je, podobně jako zápis do ní, buď operace blokující nebo neblokující. Ve výchozím nastavení se jedná o blokující operaci, což znamená, že pokud je fronta prázdná, bude funkce mq_receive čekat až do chvíle, kdy je do fronty nějaká zpráva zapsána.

Na konci pochopitelně nesmíme zapomenout frontu opět uzavřít s případnou kontrolou, zda se uzavření podařilo:

status = mq_close(message_queue_id);
if (status == -1) {
    perror("Unable to close message queue");
    return 2;
}

6. Úplný zdrojový kód aplikace pro otevření fronty a přečtení zprávy

Opět se podívejme na úplný zdrojový kód aplikace, po jejímž spuštění se otevře fronta „/queue1“ a přečte se z ní zpráva. Úplný zdrojový kód tohoto příkladu naleznete na adrese https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example1/subscriber.c:

#include <stdio.h>
#include <string.h>
 
#include <mqueue.h>
 
#define QUEUE_NAME "/queue1"
 
int main(void)
{
    mqd_t message_queue_id;
    char message_text[10000];
    unsigned int sender;
    int status;
 
    message_queue_id = mq_open(QUEUE_NAME, O_RDWR);
    if (message_queue_id == -1) {
        perror("Unable to open queue");
        return 2;
    }
 
    status = mq_receive(message_queue_id, message_text, sizeof(message_text), &sender);
    if (status == -1) {
        perror("Unable to receive message");
        return 2;
    }
    printf("Received message (%d bytes) from %d: %s\n", status, sender, message_text);
 
    status = mq_close(message_queue_id);
    if (status == -1) {
        perror("Unable to close message queue");
        return 2;
    }
 
    return 0;
}

7. Přečtení atributů fronty

V předchozím příkladu jsme řešili jeden problém – jak zjistit maximální možnou délku zprávy umístěné do fronty. Tato informace je ve skutečnosti součástí atributů fronty a tyto atributy je možné přečíst funkcí mq_getattr. Jedná se o funkci naplňující prvky struktury typu mq_attr:

struct mq_attr
{
  __syscall_slong_t mq_flags;   /* Message queue flags.  */
  __syscall_slong_t mq_maxmsg;  /* Maximum number of messages.  */
  __syscall_slong_t mq_msgsize; /* Maximum message size.  */
  __syscall_slong_t mq_curmsgs; /* Number of messages currently queued.  */
  __syscall_slong_t __pad[4];
};

Samotné přečtení atributů lze realizovat například tímto kódem:

struct mq_attr msgq_attr;
 
mq_getattr(message_queue_id, &msgq_attr);
printf("Queue: %s\n", QUEUE_NAME);
printf("Max. messages:     %ld\n", msgq_attr.mq_maxmsg);
printf("Current messages:  %ld\n", msgq_attr.mq_curmsgs);
printf("Max. message size: %ld\n", msgq_attr.mq_msgsize);

Získání těchto informací je možné prakticky kdykoli, jak je to ostatně ukázáno v dalším příkladu:

#include <stdio.h>
#include <string.h>
 
#include <mqueue.h>
 
#define QUEUE_NAME "/queue2"
 
int main(void)
{
    mqd_t message_queue_id;
    char message_text[10000];
    unsigned int sender;
    int status;
 
    message_queue_id = mq_open(QUEUE_NAME, O_RDWR);
    if (message_queue_id == -1) {
        perror("Unable to open queue");
        return 2;
    }
 
    {
        struct mq_attr msgq_attr;
        mq_getattr(message_queue_id, &msgq_attr);
        printf("Queue: %s\n", QUEUE_NAME);
        printf("Max. messages:     %ld\n", msgq_attr.mq_maxmsg);
        printf("Current messages:  %ld\n", msgq_attr.mq_curmsgs);
        printf("Max. message size: %ld\n", msgq_attr.mq_msgsize);
    }
 
    status = mq_receive(message_queue_id, message_text, sizeof(message_text), &sender);
    if (status == -1) {
        perror("Unable to receive message");
        return 2;
    }
    printf("Received message (%d bytes) from %d: %s\n", status, sender, message_text);
 
    status = mq_close(message_queue_id);
    if (status == -1) {
        perror("Unable to close message queue");
        return 2;
    }
 
    return 0;
}

8. Nastavení parametrů vytvářené fronty

Parametry fronty lze v případě potřeby i nastavit, ovšem v rámci mezí, o nichž se zmíníme v sedmnácté kapitole. Samotné nastavení se provádí přes datovou strukturu mq_attr:

struct mq_attr msgq_attr;
msgq_attr.mq_flags = 0;
msgq_attr.mq_maxmsg = 10;
msgq_attr.mq_msgsize = 20;
msgq_attr.mq_curmsgs = 1;

Ukazatel na tuto strukturu se předá do čtvrtého parametru funkce mq_open:

message_queue_id = mq_open(QUEUE_NAME, O_RDWR | O_CREAT | O_EXCL, S_IRWXU | S_IRWXG, &msgq_attr);
if (message_queue_id == -1) {
    perror("Unable to create queue");
    return 2;
}

Úplný zdrojový kód upraveného producenta zpráv může vypadat následovně:

#include <stdio.h>
#include <string.h>
 
#include <mqueue.h>
 
#define QUEUE_NAME "/queue3"
 
int main(void)
{
    mqd_t message_queue_id;
    unsigned int priority = 0;
    char message_text[100];
    int status;
 
    struct mq_attr msgq_attr;
    msgq_attr.mq_flags = 0;
    msgq_attr.mq_maxmsg = 10;
    msgq_attr.mq_msgsize = 20;
    msgq_attr.mq_curmsgs = 1;
 
    mq_unlink(QUEUE_NAME);
 
    message_queue_id = mq_open(QUEUE_NAME, O_RDWR | O_CREAT | O_EXCL, S_IRWXU | S_IRWXG, &msgq_attr);
    if (message_queue_id == -1) {
        perror("Unable to create queue");
        return 2;
    }
 
    strcpy(message_text, "Hello world!");
 
    status = mq_send(message_queue_id, message_text, strlen(message_text)+1, priority);
    if (status == -1) {
        perror("Unable to send message");
        return 2;
    }
 
    status = mq_close(message_queue_id);
    if (status == -1) {
        perror("Unable to close message queue");
        return 2;
    }
 
    return 0;
}

Konzument zpráv se prakticky nezmění, až na vylepšené vypsání informace o otevřené frontě do jediného řádku:

#include <stdio.h>
#include <string.h>
 
#include <mqueue.h>
 
#define QUEUE_NAME "/queue3"
 
int main(void)
{
    mqd_t message_queue_id;
    char message_text[10000];
    unsigned int sender;
    int status;
 
    message_queue_id = mq_open(QUEUE_NAME, O_RDWR);
    if (message_queue_id == -1) {
        perror("Unable to open queue");
        return 2;
    }
 
    {
        struct mq_attr msgq_attr;
        mq_getattr(message_queue_id, &msgq_attr);
        printf("Queue \"%s\":\n\t- stores at most %ld messages\n\t- large at most %ld bytes each\n\t- currently holds %ld messages\n", QUEUE_NAME, msgq_attr.mq_maxmsg, msgq_attr.mq_msgsize, msgq_attr.mq_curmsgs);
    }
 
    status = mq_receive(message_queue_id, message_text, sizeof(message_text), &sender);
    if (status == -1) {
        perror("Unable to receive message");
        return 2;
    }
    printf("Received message (%d bytes) from %d: %s\n", status, sender, message_text);
 
    status = mq_close(message_queue_id);
    if (status == -1) {
        perror("Unable to close message queue");
        return 2;
    }
 
    return 0;
}

9. Kontinuální posílání a příjem zpráv (vytvoření pipeline)

V navazujících kapitolách si ukážeme, jak lze realizovat kontinuální zápis a čtení zpráv s využitím jediné fronty a jak se systém bude chovat po připojení většího množství komunikujících uzlů k jedné frontě.

10. Utilitka pro vytvoření nové prázdné fronty

Pro vytvoření nové fronty, tedy pro obdobu příkazu ipcmk popsaného minule, můžeme použít následující jednoduchý program, který po svém spuštění smaže frontu /queue4 (pokud tedy taková fronta existuje) a vytvoří místo ní frontu novou s kapacitou deseti zpráv (což je většinou výchozí maximální povolená hodnota):

#include <stdio.h>
#include <string.h>
#include <unistd.h>
 
#include <mqueue.h>
 
#define QUEUE_NAME "/queue4"
 
int main(void)
{
    mqd_t message_queue_id;
 
    struct mq_attr msgq_attr;
    msgq_attr.mq_flags = 0;
    msgq_attr.mq_maxmsg = 10;
    msgq_attr.mq_msgsize = 20;
    msgq_attr.mq_curmsgs = 1;
 
    mq_unlink(QUEUE_NAME);
    message_queue_id = mq_open(QUEUE_NAME, O_RDWR | O_CREAT | O_EXCL, S_IRWXU | S_IRWXG, &msgq_attr);
    if (message_queue_id == -1) {
        perror("Unable to create queue");
        return 2;
    }
    return 0;
}
Poznámka: kapacita fronty pro pouhých deset zpráv může být pro mnoho účelů nedostatečná. V sedmnácté kapitole se seznámíme s jednou možností, jak kapacitu navýšit, pokud to ovšem nepřesáhne parametry nastavené při překladu jádra.

11. Klient pro posílání zpráv s periodou jedné sekundy

Klienta, který do fronty /queue4 posílá zprávy s frekvencí přibližně jedné sekundy, můžeme implementovat například následujícím způsobem:

#include <stdio.h>
#include <string.h>
#include <unistd.h>
 
#include <mqueue.h>
 
#define QUEUE_NAME "/queue4"
 
int main(void)
{
    mqd_t message_queue_id;
    unsigned int priority = 0;
    char message_text[100];
    int status;
    int msg_number = 1;
 
    struct mq_attr msgq_attr;
    msgq_attr.mq_flags = 0;
    msgq_attr.mq_maxmsg = 10;
    msgq_attr.mq_msgsize = 20;
    msgq_attr.mq_curmsgs = 1;
 
    message_queue_id = mq_open(QUEUE_NAME, O_WRONLY, S_IRWXU | S_IRWXG, &msgq_attr);
    if (message_queue_id == -1) {
        perror("Unable to create queue");
        return 2;
    }
 
    while (1) {
        struct mq_attr msgq_attr;
        sprintf(message_text, "Message #%d", msg_number);
        status = mq_send(message_queue_id, message_text, strlen(message_text)+1, priority);
        if (status == -1) {
            perror("Unable to send message");
            return 2;
        }
        mq_getattr(message_queue_id, &msgq_attr);
        printf("%ld/%ld\n", msgq_attr.mq_curmsgs, msgq_attr.mq_maxmsg);
        msg_number++;
 
        sleep(1);
    }
 
    status = mq_close(message_queue_id);
    if (status == -1) {
        perror("Unable to close message queue");
        return 2;
    }
 
    return 0;
}
Poznámka: toto řešení není zcela správné, protože při ukončení (kill, Ctrl+C) se explicitně nevolá funkce mq_close, ale spoléháme se na to, že se fronta uzavře v průběhu deaktivace programu.

12. Klient pro kontinuální čtení zpráv z fronty

Úprava předchozích konzumentů zpráv takovým způsobem, aby se zprávy četly kontinuálně, je snadná, jak je to ostatně ukázáno v dalším demonstračním příkladu:

#include <stdio.h>
#include <string.h>
 
#include <mqueue.h>
 
#define QUEUE_NAME "/queue4"
 
int main(void)
{
    mqd_t message_queue_id;
    char message_text[10000];
    unsigned int sender;
    int status;
 
    message_queue_id = mq_open(QUEUE_NAME, O_RDONLY);
    if (message_queue_id == -1) {
        perror("Unable to open queue");
        return 2;
    }
 
    {
        struct mq_attr msgq_attr;
        mq_getattr(message_queue_id, &msgq_attr);
        printf("Queue \"%s\":\n\t- stores at most %ld messages\n\t- large at most %ld bytes each\n\t- currently holds %ld messages\n", QUEUE_NAME, msgq_attr.mq_maxmsg, msgq_attr.mq_msgsize, msgq_attr.mq_curmsgs);
    }
 
    while (1) {
        status = mq_receive(message_queue_id, message_text, sizeof(message_text), &sender);
        if (status == -1) {
            perror("Unable to receive message");
            return 2;
        }
        printf("Received message (%d bytes) from %d: %s\n", status, sender, message_text);
    }
 
    status = mq_close(message_queue_id);
    if (status == -1) {
        perror("Unable to close message queue");
        return 2;
    }
 
    return 0;
}

13. Ukázka komunikace mezi několika klienty

Jeden producent a jeden konzument:

$ ./publisher
 
0/10
0/10
0/10
0/10
0/10
0/10
0/10
0/10
0/10
Poznámka: vypisuje se počet zpráv ve frontě a její kapacita.
$ ./subscriber 
 
Queue "/queue4":
        - stores at most 10 messages
        - large at most 20 bytes each
        - currently holds 0 messages
Received message (11 bytes) from 0: Message #1
Received message (11 bytes) from 0: Message #2
Received message (11 bytes) from 0: Message #3
Received message (11 bytes) from 0: Message #4
Received message (11 bytes) from 0: Message #5
Received message (11 bytes) from 0: Message #6

Připojení jednoho producenta a dvou konzumentů:

$ ./publisher
0/10
0/10
0/10
0/10
0/10
0/10
0/10
0/10
0/10
0/10
$ ./subscriber
 
Queue "/queue4":
        - stores at most 10 messages
        - large at most 20 bytes each
        - currently holds 0 messages
Received message (11 bytes) from 0: Message #1
Received message (11 bytes) from 0: Message #3
Received message (11 bytes) from 0: Message #5
Received message (11 bytes) from 0: Message #7
Received message (11 bytes) from 0: Message #9
$ ./subscriber 
 
Queue "/queue4":
        - stores at most 10 messages
        - large at most 20 bytes each
        - currently holds 0 messages
Received message (11 bytes) from 0: Message #2
Received message (11 bytes) from 0: Message #4
Received message (11 bytes) from 0: Message #6
Received message (11 bytes) from 0: Message #8
Received message (12 bytes) from 0: Message #10
Poznámka: vidíme, že se konzumenti dělí o práci.

Dva producenti a jeden konzument:

$ ./publisher
0/10
0/10
0/10
0/10
0/10
0/10
$ ./publisher
0/10
0/10
0/10
0/10
0/10
0/10
$ ./subscriber 
 
Queue "/queue4":
        - stores at most 10 messages
        - large at most 20 bytes each
        - currently holds 0 messages
Received message (11 bytes) from 0: Message #1
Received message (11 bytes) from 0: Message #1
Received message (11 bytes) from 0: Message #2
Received message (11 bytes) from 0: Message #2
Received message (11 bytes) from 0: Message #3
Received message (11 bytes) from 0: Message #3
Received message (11 bytes) from 0: Message #4
Received message (11 bytes) from 0: Message #4
Received message (11 bytes) from 0: Message #5
Poznámka: samozřejmě je možné vyzkoušet i složitější konfigurace.

14. Použití signálů a handlerů pro příjem zpráv

Zajímavý a poměrně užitečný koncept, s nímž se setkáme při práci s POSIXovými frontami zpráv, je možnost poslat signál (a následně ho zachytit v k tomu určeném handleru) ve chvíli, kdy je do fronty vložena nová zpráva. V následujícím demonstračním příkladu je tento koncept využit, protože samotná zpráva je zpracována v asynchronně spuštěném handleru. Nejdříve je nutné handler zaregistrovat, a to konkrétně s využitím funkce nazvané mq_notify. Registraci provedeme ve zvláštní uživatelské funkci, jejíž existence se nám bude později hodit:

void register_signal(mqd_t message_queue_id)
{
    struct sigevent sev;
    mqd_t mqdes;
 
    sev.sigev_notify = SIGEV_THREAD;
    sev.sigev_notify_function = on_message;
    sev.sigev_notify_attributes = NULL;
    sev.sigev_value.sival_ptr = &mqdes;
 
    if (mq_notify(message_queue_id, &sev) == -1)
    {
        perror("Unable to register event");
    }
    else
    {
        puts("Handler has been registered");
    }
}
Poznámka: povšimněte si, že se zde používá datová struktura sigevent, do které mj. vložíme ukazatel na funkci s implementací handleru (on_message). Samotná deklarace struktury sigevent je ve skutečnosti složitější, než její vlastní použití, protože vyplnit je nutné jen zvýrazněné položky (dvě z nich jsou dostupné i přes pomocná makra):
/* Structure to transport application-defined values with signals.  */
typedef struct sigevent
  {
    sigval_t sigev_value;
    int sigev_signo;
    int sigev_notify;
 
    union
      {
        int _pad[__SIGEV_PAD_SIZE];
 
        /* When SIGEV_SIGNAL and SIGEV_THREAD_ID set, LWP ID of the
           thread to receive the signal.  */
        __pid_t _tid;
 
        struct
          {
            void (*_function) (sigval_t);       /* Function to start.  */
            pthread_attr_t *_attribute;         /* Thread attributes.  */
          } _sigev_thread;
      } _sigev_un;
  } sigevent_t;
 
/* POSIX names to access some of the members.  */
#define sigev_notify_function   _sigev_un._sigev_thread._function
#define sigev_notify_attributes _sigev_un._sigev_thread._attribute

Samotná implementace handleru, v němž se z fronty přečte zpráva, je prozatím velmi přímočará, protože v ní využijeme globální proměnnou message_queue_id, v níž je uloženo ID fronty, s níž pracujeme. Přečtení zprávy v handleru se tedy nijak neodlišuje od předchozích demonstračních příkladů:

static void on_message(union sigval sv)
{
    unsigned int sender;
    char message_text[10000];
    int status;
 
    puts("On message...");
 
    status = mq_receive(message_queue_id, message_text, sizeof(message_text), &sender);
    if (status == -1) {
        perror("Unable to receive message");
        exit(2);
    }
    printf("Received message (%d bytes) from %d: %s\n", status, sender, message_text);
}

Na konci samotného handleru nesmíme zapomenout na to, že handler je zaregistrován pouze pro přijetí jediné zprávy. Pokud tedy budeme potřebovat podobným způsobem zpracovat větší množství zpráv, je nutné handler znovu zaregistrovat, což je nepatrná úprava, kterou provedeme v rámci dalšího demonstračního příkladu:

register_signal(message_queue_id);

Úplný zdrojový kód tohoto demonstračního příkladu vypadá následovně:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
 
#include <pthread.h>
#include <unistd.h>
#include <signal.h>
 
#include <mqueue.h>
 
#define QUEUE_NAME "/queue5"
 
mqd_t message_queue_id;
 
static void on_message(union sigval sv)
{
    unsigned int sender;
    char message_text[10000];
    int status;
 
    puts("On message...");
 
    status = mq_receive(message_queue_id, message_text, sizeof(message_text), &sender);
    if (status == -1) {
        perror("Unable to receive message");
        exit(2);
    }
    printf("Received message (%d bytes) from %d: %s\n", status, sender, message_text);
}
 
void register_signal(mqd_t message_queue_id)
{
    struct sigevent sev;
    mqd_t mqdes;
 
    sev.sigev_notify = SIGEV_THREAD;
    sev.sigev_notify_function = on_message;
    sev.sigev_notify_attributes = NULL;
    sev.sigev_value.sival_ptr = &mqdes;
 
    if (mq_notify(message_queue_id, &sev) == -1)
    {
        perror("Unable to register event");
    }
    else
    {
        puts("Handler has been registered");
    }
}
 
int main(void)
{
    int status;
 
    message_queue_id = mq_open(QUEUE_NAME, O_RDONLY);
    if (message_queue_id == -1) {
        perror("Unable to open queue");
        return 2;
    }
 
    register_signal(message_queue_id);
 
    pause();
 
    status = mq_close(message_queue_id);
    if (status == -1) {
        perror("Unable to close message queue");
        return 2;
    }
 
    return 0;
}
Poznámka: překladem a spuštěním producenta i konzumenta se můžeme snadno přesvědčit, že handler pro obsluhu zprávy z fronty je skutečně spuštěn pouze ve chvíli, kdy je do fronty zařazena nová zpráva. To znamená, že pokud je konzument spuštěn v okamžiku, kdy je fronta zaplněná, nezpracují se žádné zprávy, protože konzument nezíská informaci o tom, že přišla nová zpráva. Z tohoto důvodu můžete použít tento nástroj pro „vymazání“ fronty, přesněji řečeno pro přečtení všech zpráv, které se aktuálně ve frontě nachází (zde se používá neblokující čtení):
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
 
#include <mqueue.h>
 
#define QUEUE_NAME "/queue5"
 
int main(void)
{
    mqd_t message_queue_id;
    unsigned int sender;
    char message_text[10000];
    int status;
 
    message_queue_id = mq_open(QUEUE_NAME, O_RDONLY | O_NONBLOCK);
    if (message_queue_id == -1) {
        perror("Unable to get queue");
        return 2;
    }
 
    while (1) {
        status = mq_receive(message_queue_id, message_text, sizeof(message_text), &sender);
        if (status == -1) {
            if (errno == EAGAIN) {
                puts("Message queue is empty...");
                break;
            }
            perror("Unable to receive message");
            return 2;
        }
        printf("Received message (%d bytes) from %d: %s\n", status, sender, message_text);
    }
 
    status = mq_close(message_queue_id);
    if (status == -1) {
        perror("Unable to close message queue");
        return 2;
    }
 
    return 0;
}

15. Vylepšení předchozího příkladu – korektní alokace paměti pro zprávu v příjemci

Poslední demonstrační příklad, který si dnes ukážeme, vychází z příkladu předchozího. Došlo v něm k vylepšení alokace paměti, do které se ukládají zprávy získané z fronty. Namísto mnohdy zbytečně velkého (ovšem potenciálně nedostatečného) bufferu můžeme přímo z fronty získat informaci o tom, jak velký blok s daty se má očekávat:

long message_size;
 
if (mq_getattr(message_queue_id, &msgq_attr) == -1) {
    perror("Can not read message queue attributes");
}
 
message_size = msgq_attr.mq_msgsize;

Tento blok je následně naalokován, použit pro příjem zprávy a následně explicitně uvolněn z paměti:

message_text = malloc(message_size * sizeof(char));
if (message_text == NULL) {
    perror("Allocation error");
}
 
status = mq_receive(message_queue_id, message_text, message_size, &sender);
if (status == -1) {
    perror("Unable to receive message");
    exit(2);
}
printf("Received message (%d bytes) from %d: %s\n", status, sender, message_text);
free(message_text);

Taktéž na konci samotného handleru provedeme jeho opětovnou registraci, takže bude připraven přečíst a zpracovat další zprávy, které do fronty budou postupně přicházet:

register_signal(message_queue_id);

Další změny ve zdrojovém kódu příkladu jsou jen nepatrné:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
 
#include <pthread.h>
#include <unistd.h>
#include <signal.h>
 
#include <mqueue.h>
 
#define QUEUE_NAME "/queue6"
 
void register_signal(mqd_t message_queue_id);
 
mqd_t message_queue_id;
 
static void on_message(union sigval sv)
{
    unsigned int sender;
    struct mq_attr msgq_attr;
 
    char *message_text;
    int status;
    long message_size;
 
    puts("On message...");
    if (mq_getattr(message_queue_id, &msgq_attr) == -1) {
        perror("Can not read message queue attributes");
    }
 
    message_size = msgq_attr.mq_msgsize;
    message_text = malloc(message_size * sizeof(char));
    if (message_text == NULL) {
        perror("Allocation error");
    }
 
    status = mq_receive(message_queue_id, message_text, message_size, &sender);
    if (status == -1) {
        perror("Unable to receive message");
        exit(2);
    }
    printf("Received message (%d bytes) from %d: %s\n", status, sender, message_text);
    free(message_text);
    register_signal(message_queue_id);
}
 
void register_signal(mqd_t message_queue_id)
{
    struct sigevent sev;
    mqd_t mqdes;
 
    sev.sigev_notify = SIGEV_THREAD;
    sev.sigev_notify_function = on_message;
    sev.sigev_notify_attributes = NULL;
    sev.sigev_value.sival_ptr = &mqdes;
 
    if (mq_notify(message_queue_id, &sev) == -1)
    {
        perror("Unable to register event");
    }
    else
    {
        puts("Handler has been registered");
    }
}
 
int main(void)
{
    int status;
 
    message_queue_id = mq_open(QUEUE_NAME, O_RDONLY);
    if (message_queue_id == -1) {
        perror("Unable to open queue");
        return 2;
    }
 
    register_signal(message_queue_id);
 
    pause();
 
    status = mq_close(message_queue_id);
    if (status == -1) {
        perror("Unable to close message queue");
        return 2;
    }
 
    return 0;
}

16. Sledování front přes virtuální souborový systém

Mezi jednu z předností POSIXové implementace front zpráv patří fakt, že stav front je možné sledovat pouze s využitím základních nástrojů určených pro práci se soubory; není tedy nutné (a vlastně ani možné) používat minule popsané nástroje ipcs, ipcmk či ipcrm). Namísto toho lze příkazem mount připojit virtuální souborový systém, který bude informace o frontách obsahovat a poskytovat uživatelům. Tuto operaci je nutné provést s právy superuživatele (a to pouze za předpokladu, že již není virtuální souborový systém připraven):

# mkdir /dev/mqueue
# mount -t mqueue none /dev/mqueue

Dále již můžeme pracovat pod běžným uživatelským účtem, přesněji řečeno pod účtem toho uživatele, jehož proces fronty vytvořil. Získání seznamu front je triviální, neboť ho lze přečíst jako obsah adresáře /dev/mqueue:

$ ls -l /dev/mqueue/
 
total 0
-rwxrwx---. 1 ptisnovs ptisnovs 80 Nov 20 18:08 queue3
-rwxrwx---. 1 ptisnovs ptisnovs 80 Nov 20 18:08 queue4
-rwxrwx---. 1 ptisnovs ptisnovs 80 Nov 20 18:16 queue6
Poznámka: povšimněte si přístupových práv k frontám (-rwxrwx---). Tato práva přesně odpovídají příznakům, které jsme použili při volání funkce mq_open. Můžeme si pochopitelně vyzkoušet nastavit odlišná práva, například právo zápisu (jen zápisu) pro vlastníka a právo pro čtení pro skupinu:
#define QUEUE_NAME "/queue6"
 
mq_unlink(QUEUE_NAME);
message_queue_id = mq_open(QUEUE_NAME, O_RDWR | O_CREAT | O_EXCL, S_IWUSR | S_IRGRP, &msgq_attr);
if (message_queue_id == -1) {
    perror("Unable to create queue");
    return 2;
}

Nový obsah virtuálního souborového systému se změní, a to konkrétně u fronty pojmenované /queue6:

$ ls -l /dev/mqueue/
total 0
-rwxrwx---. 1 ptisnovs ptisnovs 80 Nov 20 18:08 queue3
-rwxrwx---. 1 ptisnovs ptisnovs 80 Nov 20 18:08 queue4
--w-r-----. 1 ptisnovs ptisnovs 80 Nov 20 18:33 queue6

Samozřejmě si můžeme prohlédnout obsah jednotlivých souborů reprezentujících vytvořené fronty, například nástrojem cat, otevřením v textovém editoru atd. atd.:

$ cat /dev/mqueue/queue3 
 
QSIZE:0          NOTIFY:0     SIGNO:0     NOTIFY_PID:0
$ cat /dev/mqueue/queue4
 
QSIZE:0          NOTIFY:0     SIGNO:0     NOTIFY_PID:0
$ cat /dev/mqueue/queue6
 
QSIZE:55         NOTIFY:0     SIGNO:0     NOTIFY_PID:0

V případě, že je spuštěn příjemce zpráv založený na použití signálů a handlerů, může situace v případě poslední fronty vypadat odlišně, protože se zobrazí informace o počtu notifikací i o ID procesu, který notifikace zpracovává:

$ cat /dev/mqueue/queue6
 
QSIZE:110        NOTIFY:2     SIGNO:0     NOTIFY_PID:619

17. Informace o systému front získané přes souborový systém procfs

Další informace o systému front zpráv jsou dostupné ve virtuálním souborovém systému procfs (viz například shrnující článek dostupný na adrese https://en.wikipedia.org/wi­ki/Procfs). Některé z těchto informací (či možná lépe řečeno atributů) je možné s příslušnými právy nastavit, další jsou určeny pouze pro čtení a jsou nastavovány při překladu jádra operačního systému. Konkrétně se jedná o následující soubory:

Soubor Význam
/proc/sys/fs/mqueue/msg_default kapacita zpráv pro nově vytvářené fronty
/proc/sys/fs/mqueue/msg_max maximální možná kapacita zpráv pro nově vytvářené fronty
/proc/sys/fs/mqueue/msgsize_default výchozí velikost zpráv pro nově vytvářené fronty
/proc/sys/fs/mqueue/msgsize_max maximální možná hodnota předchozí volby
/proc/sys/fs/mqueue/queues_max celkový počet front, které lze vytvořit

Výchozí hodnoty jsou odlišné podle verze jádra, ale pochopitelně je můžeme velmi snadno přečíst a dále zpracovat:

bitcoin_skoleni

$ cat /proc/sys/fs/mqueue/msg_default
10
 
$ cat /proc/sys/fs/mqueue/msg_max
10
 
$ cat /proc/sys/fs/mqueue/queues_max
256
 
$ cat /proc/sys/fs/mqueue/msgsize_default
8192
 
$ cat /proc/sys/fs/mqueue/msgsize_max
8192

Ke změně hodnot některých atributů (nikoli těch, které končí příponou _max) je nutné použít práva roota a projeví se při zakládání nových front.

18. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů vyvinutých v programovacím jazyku Go byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má stále ještě doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce:

Příklad Zdrojový kód Stručný popis Cesta
1 publisher.c producent jediné zprávy https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example1/
2 subscriber.c příjemce jediné zprávy https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example1/
3 Makefile Makefile pro překlad a slinkování https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example1/Makefile
       
4 publisher.c vylepšení producenta, informace o frontě https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example2/
5 subscriber.c příjemce jediné zprávy https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example2/
6 Makefile Makefile pro překlad a slinkování https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example2/Makefile
       
7 publisher.c nastavení vlastností nové fronty https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example3/
8 subscriber.c příjemce jediné zprávy https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example3/
9 Makefile Makefile pro překlad a slinkování https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example3/Makefile
       
10 publisher.c posílání zpráv s periodou jedné sekundy https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example4/
11 subscriber.c kontinuální příjem zpráv https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example4/
12 create.c vytvoření nové prázdné fronty https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example4/
13 Makefile Makefile pro překlad a slinkování https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example4/Makefile
       
14 publisher.c posílání zpráv s periodou jedné sekundy https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example5/
15 subscriber.c kontinuální příjem zpráv založený na signálech a handleru https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example5/
16 create.c vytvoření nové prázdné fronty https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example5/
17 empty.c vyprázdnění fronty https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example5/
18 Makefile Makefile pro překlad a slinkování https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example5/Makefile
       
19 publisher.c posílání zpráv s periodou jedné sekundy https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example6/
20 subscriber.c kontinuální příjem zpráv založený na signálech a handleru https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example6/
21 create.c vytvoření nové prázdné fronty https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example6/
22 empty.c vyprázdnění fronty https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example6/
23 Makefile Makefile pro překlad a slinkování https://github.com/tisnik/message-queues-examples/blob/master/unix-messages/posix/example6/

19. Odkazy na Internetu

  1. procfs
    https://en.wikipedia.org/wiki/Procfs
  2. POSIX message queues in Linux
    https://www.softprayog.in/pro­gramming/interprocess-communication-using-posix-message-queues-in-linux
  3. How is a message queue implemented in the Linux kernel?
    https://unix.stackexchange­.com/questions/6930/how-is-a-message-queue-implemented-in-the-linux-kernel/6935
  4. ‘IPCS’ command in Linux with examples
    https://www.geeksforgeeks.org/ipcs-command-linux-examples/
  5. System V IPC: Message Queues
    https://nitish712.blogspot­.com/2012/11/system-v-ipc-message-queues.html
  6. How to create, check and delete IPC share memory, semaphare and message queue on linux
    https://fibrevillage.com/sysadmin/225-how-to-create-check-and-delete-ipc-share-memory-semaphare-and-message-queue-on-linux
  7. MQ_OVERVIEW(7): Linux Programmer's Manual
    http://man7.org/linux/man-pages/man7/mq_overview.7.html
  8. mq_overview (7) – Linux Man Pages
    https://www.systutorials.com/doc­s/linux/man/7-mq_overview/
  9. POSIX.4 Message Queues (+ rozšíření QNX)
    https://users.pja.edu.pl/~jms/qnx/hel­p/watcom/clibref/mq_overvi­ew.html
  10. System V message queues in Linux
    https://www.softprayog.in/pro­gramming/interprocess-communication-using-system-v-message-queues-in-linux
  11. Linux System V and POSIX IPC Examples
    http://hildstrom.com/projec­ts/ipc_sysv_posix/index.html
  12. Programming Tutorial – Linux: Message Queues
    https://ccppcoding.blogspot­.com/2013/03/linux-message-queues.html
  13. Go wrapper for POSIX Message Queues
    https://github.com/syucream/posix_mq
  14. Stránka projektu NSQ
    https://nsq.io/
  15. Dokumentace k projektu NSQ
    https://nsq.io/overview/design.html
  16. Dokumentace ke klientovi pro Go
    https://godoc.org/github.com/nsqio/go-nsq
  17. Dokumentace ke klientovi pro Python
    https://pynsq.readthedocs­.io/en/latest/
  18. Binární tarbally s NSQ
    https://nsq.io/deployment/in­stalling.html
  19. GitHub repositář projektu NSQ
    https://github.com/nsqio/nsq
  20. Klienti pro NSQ
    https://nsq.io/clients/cli­ent_libraries.html
  21. Klient pro Go
    https://github.com/nsqio/go-nsq
  22. Klient pro Python
    https://github.com/nsqio/pynsq
  23. An Example of Using NSQ From Go
    http://tleyden.github.io/blog/2014/11/12/an-example-of-using-nsq-from-go/
  24. Go Go Gadget
    https://word.bitly.com/pos­t/29550171827/go-go-gadget
  25. Simplehttp
    https://github.com/bitly/simplehttp
  26. Dramatiq: simple task processing
    https://dramatiq.io/
  27. Cookbook (for Dramatiq)
    https://dramatiq.io/cookbook.html
  28. Balíček dramatiq na PyPi
    https://pypi.org/project/dramatiq/
  29. Dramatiq dashboard
    https://github.com/Bogdan­p/dramatiq_dashboard
  30. Dramatiq na Redditu
    https://www.reddit.com/r/dramatiq/
  31. A Dramatiq broker that can be used with Amazon SQS
    https://github.com/Bogdan­p/dramatiq_sqs
  32. nanomsg na GitHubu
    https://github.com/nanomsg/nanomsg
  33. Referenční příručka knihovny nanomsg
    https://nanomsg.org/v1.1.5/na­nomsg.html
  34. nng (nanomsg-next-generation)
    https://github.com/nanomsg/nng
  35. Differences between nanomsg and ZeroMQ
    https://nanomsg.org/documentation-zeromq.html
  36. NATS
    https://nats.io/about/
  37. NATS Streaming Concepts
    https://nats.io/documenta­tion/streaming/nats-streaming-intro/
  38. NATS Streaming Server
    https://nats.io/download/nats-io/nats-streaming-server/
  39. NATS Introduction
    https://nats.io/documentation/
  40. NATS Client Protocol
    https://nats.io/documenta­tion/internals/nats-protocol/
  41. NATS Messaging (Wikipedia)
    https://en.wikipedia.org/wi­ki/NATS_Messaging
  42. Stránka Apache Software Foundation
    http://www.apache.org/
  43. Informace o portu 5672
    http://www.tcp-udp-ports.com/port-5672.htm
  44. Třída MessagingHandler knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._handlers.MessagingHan­dler-class.html
  45. Třída Event knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._events.Event-class.html
  46. package stomp (Go)
    https://godoc.org/github.com/go-stomp/stomp
  47. Go language library for STOMP protocol
    https://github.com/go-stomp/stomp
  48. python-qpid-proton 0.26.0 na PyPi
    https://pypi.org/project/python-qpid-proton/
  49. Qpid Proton
    http://qpid.apache.org/proton/
  50. Using the AMQ Python Client
    https://access.redhat.com/do­cumentation/en-us/red_hat_amq/7.1/html-single/using_the_amq_python_client/
  51. Apache ActiveMQ
    http://activemq.apache.org/
  52. Apache ActiveMQ Artemis
    https://activemq.apache.org/artemis/
  53. Apache ActiveMQ Artemis User Manual
    https://activemq.apache.or­g/artemis/docs/latest/index­.html
  54. KahaDB
    http://activemq.apache.or­g/kahadb.html
  55. Understanding the KahaDB Message Store
    https://access.redhat.com/do­cumentation/en-US/Fuse_MQ_Enterprise/7.1/html/Con­figuring_Broker_Persisten­ce/files/KahaDBOverview.html
  56. Command Line Tools (Apache ActiveMQ)
    https://activemq.apache.org/activemq-command-line-tools-reference.html
  57. stomp.py 4.1.21 na PyPi
    https://pypi.org/project/stomp.py/
  58. Stomp Tutorial
    https://access.redhat.com/do­cumentation/en-US/Fuse_Message_Broker/5.5/html/Con­nectivity_Guide/files/FMBCon­nectivityStompTelnet.html
  59. Heartbeat (computing)
    https://en.wikipedia.org/wi­ki/Heartbeat_(computing)
  60. Apache Camel
    https://camel.apache.org/
  61. Red Hat Fuse
    https://developers.redhat­.com/products/fuse/overvi­ew/
  62. Confusion between ActiveMQ and ActiveMQ-Artemis?
    https://serverfault.com/qu­estions/873533/confusion-between-activemq-and-activemq-artemis
  63. Staré stránky projektu HornetQ
    http://hornetq.jboss.org/
  64. Snapshot JeroMQ verze 0.4.4
    https://oss.sonatype.org/con­tent/repositories/snapshot­s/org/zeromq/jeromq/0.4.4-SNAPSHOT/
  65. Difference between ActiveMQ vs Apache ActiveMQ Artemis
    http://activemq.2283324.n4­.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html
  66. Microservices communications. Why you should switch to message queues
    https://dev.to/matteojoli­veau/microservices-communications-why-you-should-switch-to-message-queues–48ia
  67. Stomp.py 4.1.19 documentation
    https://stomppy.readthedoc­s.io/en/stable/
  68. Repositář knihovny JeroMQ
    https://github.com/zeromq/jeromq/
  69. ØMQ – Distributed Messaging
    http://zeromq.org/
  70. ØMQ Community
    http://zeromq.org/community
  71. Get The Software
    http://zeromq.org/intro:get-the-software
  72. PyZMQ Documentation
    https://pyzmq.readthedocs­.io/en/latest/
  73. Module: zmq.decorators
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.deco­rators.html
  74. ZeroMQ is the answer, by Ian Barber
    https://vimeo.com/20605470
  75. ZeroMQ RFC
    https://rfc.zeromq.org/
  76. ZeroMQ and Clojure, a brief introduction
    https://antoniogarrote.wor­dpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/
  77. zeromq/czmq
    https://github.com/zeromq/czmq
  78. golang wrapper for CZMQ
    https://github.com/zeromq/goczmq
  79. ZeroMQ version reporting in Python
    http://zguide.zeromq.org/py:version
  80. A Go interface to ZeroMQ version 4
    https://github.com/pebbe/zmq4
  81. Broker vs. Brokerless
    http://zeromq.org/whitepa­pers:brokerless
  82. Learning ØMQ with pyzmq
    https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/
  83. Céčková funkce zmq_ctx_new
    http://api.zeromq.org/4–2:zmq-ctx-new
  84. Céčková funkce zmq_ctx_destroy
    http://api.zeromq.org/4–2:zmq-ctx-destroy
  85. Céčková funkce zmq_bind
    http://api.zeromq.org/4–2:zmq-bind
  86. Céčková funkce zmq_unbind
    http://api.zeromq.org/4–2:zmq-unbind
  87. Céčková C funkce zmq_connect
    http://api.zeromq.org/4–2:zmq-connect
  88. Céčková C funkce zmq_disconnect
    http://api.zeromq.org/4–2:zmq-disconnect
  89. Céčková C funkce zmq_send
    http://api.zeromq.org/4–2:zmq-send
  90. Céčková C funkce zmq_recv
    http://api.zeromq.org/4–2:zmq-recv
  91. Třída Context (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#con­text
  92. Třída Socket (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#soc­ket
  93. Python binding
    http://zeromq.org/bindings:python
  94. Why should I have written ZeroMQ in C, not C++ (part I)
    http://250bpm.com/blog:4
  95. Why should I have written ZeroMQ in C, not C++ (part II)
    http://250bpm.com/blog:8
  96. About Nanomsg
    https://nanomsg.org/
  97. Advanced Message Queuing Protocol
    https://www.amqp.org/
  98. Advanced Message Queuing Protocol na Wikipedii
    https://en.wikipedia.org/wi­ki/Advanced_Message_Queuin­g_Protocol
  99. Dokumentace k příkazu rabbitmqctl
    https://www.rabbitmq.com/rab­bitmqctl.8.html
  100. RabbitMQ
    https://www.rabbitmq.com/
  101. RabbitMQ Tutorials
    https://www.rabbitmq.com/get­started.html
  102. RabbitMQ: Clients and Developer Tools
    https://www.rabbitmq.com/dev­tools.html
  103. RabbitMQ na Wikipedii
    https://en.wikipedia.org/wi­ki/RabbitMQ
  104. Streaming Text Oriented Messaging Protocol
    https://en.wikipedia.org/wi­ki/Streaming_Text_Oriented_Mes­saging_Protocol
  105. Message Queuing Telemetry Transport
    https://en.wikipedia.org/wiki/MQTT
  106. Erlang
    http://www.erlang.org/
  107. pika 0.12.0 na PyPi
    https://pypi.org/project/pika/
  108. Introduction to Pika
    https://pika.readthedocs.i­o/en/stable/
  109. Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
    http://clojurerabbitmq.info/
  110. AMQP 0–9–1 Model Explained
    http://www.rabbitmq.com/tutorials/amqp-concepts.html
  111. Part 1: RabbitMQ for beginners – What is RabbitMQ?
    https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html
  112. Downloading and Installing RabbitMQ
    https://www.rabbitmq.com/dow­nload.html
  113. celery na PyPi
    https://pypi.org/project/celery/
  114. Databáze Redis (nejenom) pro vývojáře používající Python
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/
  115. Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/
  116. Redis Queue (RQ)
    https://www.fullstackpython.com/redis-queue-rq.html
  117. Python Celery & RabbitMQ Tutorial
    https://tests4geeks.com/python-celery-rabbitmq-tutorial/
  118. Flower: Real-time Celery web-monitor
    http://docs.celeryproject­.org/en/latest/userguide/mo­nitoring.html#flower-real-time-celery-web-monitor
  119. Asynchronous Tasks With Django and Celery
    https://realpython.com/asynchronous-tasks-with-django-and-celery/
  120. First Steps with Celery
    http://docs.celeryproject­.org/en/latest/getting-started/first-steps-with-celery.html
  121. node-celery
    https://github.com/mher/node-celery
  122. Full Stack Python: web development
    https://www.fullstackpython.com/web-development.html
  123. Introducing RQ
    https://nvie.com/posts/introducing-rq/
  124. Asynchronous Tasks with Flask and Redis Queue
    https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue
  125. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  126. Stránky projektu Redis
    https://redis.io/
  127. Introduction to Redis
    https://redis.io/topics/introduction
  128. Try Redis
    http://try.redis.io/
  129. Redis tutorial, April 2010 (starší, ale pěkně udělaný)
    https://static.simonwilli­son.net/static/2010/redis-tutorial/
  130. Python Redis
    https://redislabs.com/lp/python-redis/
  131. Redis: key-value databáze v paměti i na disku
    https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/
  132. Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
    http://www.cloudsvet.cz/?p=253
  133. Praktický úvod do Redis (2): transakce
    http://www.cloudsvet.cz/?p=256
  134. Praktický úvod do Redis (3): cluster
    http://www.cloudsvet.cz/?p=258
  135. Connection pool
    https://en.wikipedia.org/wi­ki/Connection_pool
  136. Instant Redis Sentinel Setup
    https://github.com/ServiceStack/redis-config
  137. How to install REDIS in LInux
    https://linuxtechlab.com/how-install-redis-server-linux/
  138. Redis RDB Dump File Format
    https://github.com/sripat­hikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format
  139. Lempel–Ziv–Welch
    https://en.wikipedia.org/wi­ki/Lempel%E2%80%93Ziv%E2%80%93­Welch
  140. Redis Persistence
    https://redis.io/topics/persistence
  141. Redis persistence demystified
    http://oldblog.antirez.com/post/redis-persistence-demystified.html
  142. Redis reliable queues with Lua scripting
    http://oldblog.antirez.com/post/250
  143. Ost (knihovna)
    https://github.com/soveran/ost
  144. NoSQL
    https://en.wikipedia.org/wiki/NoSQL
  145. Shard (database architecture)
    https://en.wikipedia.org/wi­ki/Shard_%28database_archi­tecture%29
  146. What is sharding and why is it important?
    https://stackoverflow.com/qu­estions/992988/what-is-sharding-and-why-is-it-important
  147. What Is Sharding?
    https://btcmanager.com/what-sharding/
  148. Redis clients
    https://redis.io/clients
  149. Category:Lua-scriptable software
    https://en.wikipedia.org/wi­ki/Category:Lua-scriptable_software
  150. Seriál Programovací jazyk Lua
    https://www.root.cz/seria­ly/programovaci-jazyk-lua/
  151. Redis memory usage
    http://nosql.mypopescu.com/pos­t/1010844204/redis-memory-usage
  152. Ukázka konfigurace Redisu pro lokální testování
    https://github.com/tisnik/pre­sentations/blob/master/re­dis/redis.conf
  153. Resque
    https://github.com/resque/resque
  154. Nested transaction
    https://en.wikipedia.org/wi­ki/Nested_transaction
  155. Publish–subscribe pattern
    https://en.wikipedia.org/wi­ki/Publish%E2%80%93subscri­be_pattern
  156. Messaging pattern
    https://en.wikipedia.org/wi­ki/Messaging_pattern
  157. Using pipelining to speedup Redis queries
    https://redis.io/topics/pipelining
  158. Pub/Sub
    https://redis.io/topics/pubsub
  159. ZeroMQ distributed messaging
    http://zeromq.org/
  160. ZeroMQ: Modern & Fast Networking Stack
    https://www.igvita.com/2010/09/03/ze­romq-modern-fast-networking-stack/
  161. Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
    https://stackoverflow.com/qu­estions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr
  162. Python & Redis PUB/SUB
    https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7
  163. Message broker
    https://en.wikipedia.org/wi­ki/Message_broker
  164. RESP Arrays
    https://redis.io/topics/protocol#array-reply
  165. Redis Protocol specification
    https://redis.io/topics/protocol
  166. Redis Pub/Sub: Intro Guide
    https://www.redisgreen.net/blog/pubsub-intro/
  167. Redis Pub/Sub: Howto Guide
    https://www.redisgreen.net/blog/pubsub-howto/
  168. Comparing Publish-Subscribe Messaging and Message Queuing
    https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message
  169. Apache Kafka
    https://kafka.apache.org/
  170. Iron
    http://www.iron.io/mq
  171. kue (založeno na Redisu, určeno pro node.js)
    https://github.com/Automattic/kue
  172. Cloud Pub/Sub
    https://cloud.google.com/pubsub/
  173. Introduction to Redis Streams
    https://redis.io/topics/streams-intro
  174. glob (programming)
    https://en.wikipedia.org/wi­ki/Glob_(programming)
  175. Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
    https://www.slideshare.net/syl­vinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2
  176. Enqueueing internals
    http://python-rq.org/contrib/
  177. queue — A synchronized queue class
    https://docs.python.org/3/li­brary/queue.html
  178. Queue – A thread-safe FIFO implementation
    https://pymotw.com/2/Queue/
  179. Queues
    http://queues.io/
  180. Windows Subsystem for Linux Documentation
    https://docs.microsoft.com/en-us/windows/wsl/about
  181. RestMQ
    http://restmq.com/
  182. ActiveMQ
    http://activemq.apache.org/
  183. Amazon MQ
    https://aws.amazon.com/amazon-mq/
  184. Amazon Simple Queue Service
    https://aws.amazon.com/sqs/
  185. Celery: Distributed Task Queue
    http://www.celeryproject.org/
  186. Disque, an in-memory, distributed job queue
    https://github.com/antirez/disque
  187. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  188. Projekt RQ na PyPi
    https://pypi.org/project/rq/
  189. rq-dashboard 0.3.12
    https://pypi.org/project/rq-dashboard/
  190. Job queue
    https://en.wikipedia.org/wi­ki/Job_queue
  191. Why we moved from Celery to RQ
    https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq
  192. Running multiple workers using Celery
    https://serverfault.com/qu­estions/655387/running-multiple-workers-using-celery
  193. celery — Distributed processing
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.html
  194. Chains
    https://celery.readthedoc­s.io/en/latest/userguide/can­vas.html#chains
  195. Routing
    http://docs.celeryproject­.org/en/latest/userguide/rou­ting.html#automatic-routing
  196. Celery Distributed Task Queue in Go
    https://github.com/gocelery/gocelery/
  197. Python Decorators
    https://wiki.python.org/mo­in/PythonDecorators
  198. Periodic Tasks
    http://docs.celeryproject­.org/en/latest/userguide/pe­riodic-tasks.html
  199. celery.schedules
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.schedules.html#celery­.schedules.crontab
  200. Pros and cons to use Celery vs. RQ
    https://stackoverflow.com/qu­estions/13440875/pros-and-cons-to-use-celery-vs-rq
  201. Priority queue
    https://en.wikipedia.org/wi­ki/Priority_queue
  202. Jupyter
    https://jupyter.org/
  203. How IPython and Jupyter Notebook work
    https://jupyter.readthedoc­s.io/en/latest/architectu­re/how_jupyter_ipython_wor­k.html
  204. Context Managers
    http://book.pythontips.com/en/la­test/context_managers.html

Autor článku

Vystudoval VUT FIT a v současné době pracuje na projektech vytvářených v jazycích Python a Go.