Dokončení popisu komunikačních strategií poskytovaných knihovnou nanomsg

2. 5. 2019
Doba čtení: 47 minut

Sdílet

V dnešní části seriálu o message brokerech dokončíme popis základních komunikačních strategií poskytovaných knihovnou nanomsg. Všechny strategie budou pochopitelně implementovány i v demonstračních příkladech.

Obsah

1. Komunikační strategie Publish-Subscribe

2. Zpracování většího množství zpráv při použití strategie Publish-Subscribe

3. Nastavení tématu (topic) pro strategii Publish-Subscribe

4. Implementace producenta zpráv

5. Implementace různých konzumentů zpráv připojených k nastaveným tématům

6. Komunikační strategie Request-Response

7. Implementace klienta používajícího strategii Request-Response

8. Implementace serveru zpracovávajícího požadavky

9. Server zpracovávající dotazy v jediném vláknu

10. Chování serveru při zpracování velkého množství dotazů

11. Komunikační strategie Survey

12. Implementace klienta

13. Implementace serveru

14. Ukázka poslání otázky a zpracování odpovědí

15. Komunikační strategie Bus

16. Implementace prvního uzlu

17. Implementace druhého uzlu

18. Repositář s demonstračními příklady

19. Odkazy na předchozí části seriálu o message brokerech

20. Odkazy na Internetu

1. Komunikační strategie Publish-Subscribe

V předchozí části seriálu o message brokerech jsme se seznámili s některými možnostmi poskytovanými nativní knihovnou (naprogramovanou v céčku) nazvanou nanomsg. Jedná se o ideového nástupce knihovny ØMQ, kterou jsme si již v tomto seriálu popsali [1] [2] [3] [4]. Oproti ØMQ došlo z pohledu vývojáře k několika podstatným změnám a současně i ke zjednodušením, takže se například nemusíme starat o takzvaný kontext. S využitím knihovny nanomsg je možné vytvářet aplikace (resp. části aplikace), které mezi sebou komunikují pomocí šesti typů strategií, které jsou vypsány v následující tabulce:

# Strategie/vzor Stručný popis významu strategie
1 PAIR jedna z nejjednodušších komunikačních strategií s dvojicí uzlů a vazbou 1:1; komunikace je obecně obousměrná (samozřejmě lze použít i komunikaci jednosměrnou)
2 BUS složitější strategie, v níž se používá obecnější vazba M:N; tuto strategii si popíšeme v závěrečné části článku
3 PUBSUB klasická komunikační strategie PUB-SUB neboli PUBLISH-SUBSCRIBE
4 REQREP klasická komunikační strategie REQ-REP neboli REQUEST-RESPONSE, bude popsána v navazujících kapitolách
5 PIPELINE jednosměrná komunikace buď s vazbami 1:1 (jeden vysílač a jeden přijímač), popř. mezi více vysílači a několika přijímači
6 SURVEY speciální strategie umožňující získat stav více uzlů (procesů) jediným dotazem a mnoha odpovědmi; tato zcela nová strategie bude popsána v navazujících kapitolách

Strategii pojmenovanou PIPELINE a zajišťující jednosměrný přenos zpráv od vysílající aplikace (vlákna, procesu) k aplikaci přijímající jsme si již popsali minule, konkrétně v osmé kapitole. Jedná se čistě o jednosměrnou komunikaci naznačenou na prvním obrázku:

Obrázek 1: Jednosměrná komunikace využívající strategii PIPELINE.

Složitější komunikační schéma je zajištěno strategií PAIR popsané minule ve dvanácté kapitole, která umožňuje buď jednosměrné posílání zpráv z prvního uzlu do uzlu druhého:

Obrázek 2: Jednosměrná komunikace využívající strategii PAIR-PAIR.

nebo (a to častěji) obousměrné posílání zpráv:

Obrázek 3: Obousměrná komunikace využívající strategii PAIR-PAIR.

A konečně jsme si minule v sedmnácté kapitole ukázali (i když prozatím ve stručnosti) strategii typu PUB-SUB či zkráceně pouze PUBSUB. Tato velmi často používaná strategie umožňuje rozesílat zprávy libovolnému množství příjemců (či v mezním případě žádnému příjemci):

Obrázek 4: Jednosměrná komunikace využívající strategii PUB-SUB.

V dnešním článku nejdříve dokončíme popis strategie PUB-SUB a posléze se zaměříme na tři zbývající strategie REQREP, SURVEY a BUS, které jsou již nepatrně složitější, ovšem v některých případech i užitečnější.

Všechny dále popsané demonstrační příklady budou vyvinuty v programovacím jazyku C (většina jich je kompatibilních s ANSI C, pouze u některých požadujeme funkce, které v tomto standardu neexistují), protože rozhraními pro další programovací jazyky se budeme zabývat v navazující části tohoto seriálu. Příklady jsou navíc napsány s ohledem na co největší stručnost zápisu, takže u některých z nich nejsou provedeny všechny kontroly chyb, jež by se ovšem v produkčním kódu pochopitelně měly použít (na případné možnosti vylepšení se ovšem taktéž zaměříme).

Pro překlad, slinkování a spuštění je u každého příkladu použit Makefile, takže využijete příkaz/nástroj make, který by měl být nainstalován.

2. Zpracování většího množství zpráv při použití strategie Publish-Subscribe

V úvodním článku o knihovně nanomsg jsme si mj. popsali i velmi často používanou komunikační strategii nazvanou PUB-SUB, v níž spolu komunikují dva typy uzlů: zdroje zpráv (publishers) a konzumenti zpráv (subscribers). Konzumenti se mohou k odebírání zpráv kdykoli přihlásit a kdykoli se také odpojit, což zdroj zpráv nijak neovlivní – ten bude posílat zprávy nezávisle na tom, kdo je přijímá (v knihovně nanomsg se i filtrace zpráv na základě témat/topiců provádí až na straně konzumenta, nikoli na straně zdroje!).

Toto chování, v němž zcela cíleně chybí většina kontrolních a řídicích Lmechanismů, má ovšem jeden důsledek, který je poměrně problematický a který vyplývá z toho, že nanomsg ve skutečnosti není klasickým message brokerem, ale jen „pouhou“ knihovnou: pokud konzument zpráv nebude mít k dispozici dostatečné množství prostředků ke zpracování zpráv, může se stát, že nějakou zprávu nepřijme. Popsané chování si můžeme velmi snadno odsimulovat, a to tak, že vytvoříme producenta, který odešle předem nastavený počet zpráv, ovšem mezi posláním zpráv nebudou žádné pauzy – producent tedy bude zprávy posílat maximální možnou rychlostí.

Úplný zdrojový kód producenta vypadá následovně:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>
 
const int MAX_MESSAGES = 1000;
 
const char *URL = "ipc:///tmp/example7";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void send_message(const int socket, const char *message)
{
    int bytes;
 
    printf("Sending message '%s'\n", message);
    if ((bytes = nn_send(socket, message, strlen(message)+1, 0)) < 0) {
        report_error("nn_send");
    }
    printf("Message with length %d bytes sent, flushing\n", bytes);
    sleep(1);
}
 
void publisher(const char *url)
{
    int socket;
    int endpoint;
    int i;
 
    if ((socket = nn_socket(AF_SP, NN_PUB)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_bind(socket, url)) < 0) {
        report_error("nn_bind");
    }
    puts("Remote endpoint bound to the socket");
 
    for (i=0; i < MAX_MESSAGES; i++) {
        char buffer[45];
        int number = rand() % 10000;
        sprintf(buffer, "Hello, this is my top secret PIN: %04d", number);
        send_message(socket, buffer);
    }
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(const int argc, const char **argv)
{
    publisher(URL);
    return 0;
}

Samotný konzument, jehož úplný zdrojový kód je zobrazen pod tímto odstavcem, zůstal prakticky ve stejné podobě, s jakou jsme se seznámili v předchozím článku:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>
 
const char *URL = "ipc:///tmp/example7";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void receive_message(const int socket)
{
    char *message = NULL;
    int bytes;
 
    if ((bytes = nn_recv(socket, &message, NN_MSG, 0)) < 0) {
        report_error("nn_recv");
    }
    printf("Received message '%s' with length %d bytes\n", message, bytes);
    if (nn_freemsg(message) < 0) {
        report_error("nn_freemsg");
    }
}
 
void subscriber(const char *url)
{
    int socket;
    int endpoint;
    int messages;
 
    if ((socket = nn_socket(AF_SP, NN_SUB)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if (nn_setsockopt(socket, NN_SUB, NN_SUB_SUBSCRIBE, "", 0) < 0) {
        report_error("nn_setsockopt");
    }
 
    if ((endpoint = nn_connect(socket, url)) < 0) {
        report_error("nn_connect");
    }
    puts("Endpoint connected to socket");
 
    puts("Waiting for messages...");
 
    while (1) {
        receive_message(socket);
        messages++;
        printf("Processed %d messages so far\n", messages);
    }
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(int argc, char **argv)
{
    subscriber(URL);
    return 0;
}

Podívejme se ještě na soubor Makefile využitý jak pro překlad, tak i spuštění producenta i konzumenta:

CC=gcc
LINKER=gcc
 
LIBS=nanomsg
 
CFLAGS=-O0 -Wall -ansi -pedantic
LFLAGS=-l$(LIBS)
 
LIBRARY_PATH=/usr/local/lib64/
 
.PHONY: clean run_publisher run_subscriber
 
all:    publisher subscriber
 
%.o:    %.c
        $(CC) -c -o $@ $(CFLAGS) $<
 
publisher:      publisher.o
        $(CC) -o $@ $(LFLAGS) $<
 
subscriber:     subscriber.o
        $(CC) -o $@ $(LFLAGS) $<
 
clean:
        rm -f publisher.o \
        rm -f subscriber.o \
        rm -f publisher \
        rm -f subscriber
 
run_publisher:
        LD_LIBRARY_PATH=$(LIBRARY_PATH) ./publisher
 
run_subscriber:
        LD_LIBRARY_PATH=$(LIBRARY_PATH) ./subscriber

Podívejme se nyní na chování takto vytvořené architektury sestávající ze dvou komunikujících uzlů – jednoho producenta a jednoho konzumenta zpráv. Nejprve, například z nového terminálu, přeložíme a spustíme konzumenta zpráv:

$ make consumer
 
$ make run_consumer
 

Dále pochopitelně budeme muset přeložit a spustit producenta:

$ make producer
 
$ make run_producer
 

Chování producenta je jednoduché a snadno pochopitelné – bude postupně vytvářet a posílat (publikovat) zadaný počet zpráv.

Konzument se bude snažit tyto zprávy přijímat, ovšem (v závislosti na mnoha faktorech) se mu to nemusí vždy podařit a může nějakou zprávu vynechat:

...
...
...
Processed 564 messages so far
Received message 'Hello, this is my top secret PIN: 8382' with length 39 bytes
Processed 565 messages so far
Received message 'Hello, this is my top secret PIN: 5421' with length 39 bytes
Processed 566 messages so far
Poznámka: můžeme zde vidět, že konzument zpracoval pouze 566 zpráv z celkového počtu 1000.

Na toto chování můžeme v praxi narazit v mnoha případech. Zde se konkrétně jedná o fakt, že je jak producent, tak i konzument spuštěn na jednom počítači, takže záleží na tom, kdy dojde k přepnutí kontextu. U aplikací, jejichž uzly jsou umístěny na různých počítačích a pro komunikaci se používá síť, se může situace dále zkomplikovat. Jedno z možných řešení spočívá v použití takzvaného zařízení (device) a v lepší správě bufferů na straně producenta, ovšem my si vyzkoušíme, jak se chování změní ve chvíli, kdy se mezi posláním jednotlivých zpráv vloží krátká pauza. Úprava bude provedena následujícím způsobem:

void send_message(const int socket, const char *message)
{
    int bytes;
 
    printf("Sending message '%s'\n", message);
    if ((bytes = nn_send(socket, message, strlen(message)+1, 0)) < 0) {
        report_error("nn_send");
    }
    printf("Message with length %d bytes sent, flushing\n", bytes);
    usleep(100*1000);
}

S výsledkem, který je již mnohem lepší:

...
...
...
Processed 96 messages so far
Received message '8586 is my top secret PIN' with length 26 bytes
Processed 97 messages so far
Received message '8094 is my top secret PIN' with length 26 bytes
Processed 98 messages so far
Received message '7539 is my top secret PIN' with length 26 bytes
Processed 99 messages so far

Zbývá nám vyřešit ještě jeden problém, který spočívá v tom, že konzument nepřijme první poslanou zprávu. Je to ostatně patrné i při pohledu na jeho výstup zobrazený před tímto odstavcem.

Toto chování je ve skutečnosti možné napravit, a to zcela jednoduše. Postačuje totiž, aby se vložila krátká pauza mezi voláním funkcí nn_bind() a vlastním posíláním zpráv:

if ((endpoint = nn_bind(socket, url)) < 0) {
    report_error("nn_bind");
}
puts("Remote endpoint bound to the socket");
 
sleep(1);
 
for (i=0; i < MAX_MESSAGES; i++) {
    char buffer[45];
    int number = rand() % 10000;
    sprintf(buffer, "%04d is my top secret PIN", number);
    send_message(socket, buffer);
}

Výsledek je již mnohem lepší a bude odpovídat očekávanému chování:

...
...
...
Processed 97 messages so far
Received message '8586 is my top secret PIN' with length 26 bytes
Processed 98 messages so far
Received message '8094 is my top secret PIN' with length 26 bytes
Processed 99 messages so far
Received message '7539 is my top secret PIN' with length 26 bytes
Processed 100 messages so far
Poznámka: zde jsme poprvé narazili na to, že knihovna nanomsg je poměrně nízkoúrovňová, takže je nutné při tvorbě aplikací myslet i na chování konkrétních operačních systémů. V knihovnách napsaných nad nanomsg by měly být tyto stavy vyřešeny takovým způsobem, aby se o ně již nemuseli programátoři sami starat.

3. Nastavení tématu (topic) pro strategii Publish-Subscribe

Při použití komunikační strategie PUB-SUB se velmi často setkáme s požadavkem na to, aby se zprávy filtrovaly podle nastaveného tématu (topic). V klasických implementacích message brokerů bývají zprávy rozšířeny o hlavičku a téma bývá součástí této hlavičky. Ovšem knihovna nanomsg je nízkoúrovňová, takže samotnou zprávu považuje za dále nijak neinterpretovaná binární data. Tématem je poté libovolný prefix, tj. několik prvních znaků zprávy, přičemž filtrace probíhá až u konzumenta, nikoli u zdroje zpráv. Samotné nastavení filtrace zpráv lze provést funkcí nn_setsockopt(), které se předá prefix a jeho délka:

if (nn_setsockopt(socket, NN_SUB, NN_SUB_SUBSCRIBE, topic, strlen(topic)) < 0) {
    report_error("nn_setsockopt");
}
else {
    printf("Topic set to '%s'\n", topic);
}

4. Implementace producenta zpráv

Zdroj (producent) zpráv, které budou filtrovány na základě nastaveného tématu na straně konzumentů, se nebude nijak zásadně lišit od již popsaných producentů, pouze si vystačíme jen se sto zprávami. Jeho zdrojový kód je následující:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>
 
const int MAX_MESSAGES = 100;
 
const char *URL = "ipc:///tmp/example10";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void send_message(const int socket, const char *message)
{
    int bytes;

    printf("Sending message '%s'\n", message);
    if ((bytes = nn_send(socket, message, strlen(message)+1, 0)) < 0) {
        report_error("nn_send");
    }
    printf("Message with length %d bytes sent, flushing\n", bytes);
    usleep(100*1000);
}
 
void publisher(const char *url)
{
    int socket;
    int endpoint;
    int i;
 
    if ((socket = nn_socket(AF_SP, NN_PUB)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_bind(socket, url)) < 0) {
        report_error("nn_bind");
    }
    puts("Remote endpoint bound to the socket");
 
    for (i=0; i < MAX_MESSAGES; i++) {
        char buffer[45];
        int number = rand() % 10000;
        sprintf(buffer, "%04d is my top secret PIN", number);
        send_message(socket, buffer);
    }
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(const int argc, const char **argv)
{
    publisher(URL);
    return 0;
}
Poznámka: povšimněte si, že zprávy vždy začínají celým čtyřciferným číslem od 0000 do 9999.

5. Implementace různých konzumentů zpráv připojených k nastaveným tématům

Zatímco u producenta došlo jen k nepatrné změně, konkrétně k přesunu čísla s PINem na začátek zprávy, budou konzumenti odlišní. První z nich bude zpracovávat jen ty zprávy, které začínají znakem „0“, které je zde považováno za téma:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/pubsub.h>
 
const char *URL = "ipc:///tmp/example10";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void receive_message(const int socket)
{
    char *message = NULL;
    int bytes;
 
    if ((bytes = nn_recv(socket, &message, NN_MSG, 0)) < 0) {
        report_error("nn_recv");
    }
    printf("Received message '%s' with length %d bytes\n", message, bytes);
    if (nn_freemsg(message) < 0) {
        report_error("nn_freemsg");
    }
}
 
void subscriber(const char *url)
{
    const char *topic = "0";
 
    int socket;
    int endpoint;
    int messages;
 
    if ((socket = nn_socket(AF_SP, NN_SUB)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if (nn_setsockopt(socket, NN_SUB, NN_SUB_SUBSCRIBE, topic, strlen(topic)) < 0) {
        report_error("nn_setsockopt");
    }
    else {
        printf("Topic set to '%s'\n", topic);
    }
 
    if ((endpoint = nn_connect(socket, url)) < 0) {
        report_error("nn_connect");
    }
    puts("Endpoint connected to socket");
 
    puts("Waiting for messages...");
 
    while (1) {
        receive_message(socket);
        messages++;
        printf("Processed %d messages so far\n", messages);
    }
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(int argc, char **argv)
{
    subscriber(URL);
    return 0;
}

Druhý konzument se odlišuje pouze v jinak nastaveném tématu:

const char *topic = "1";

A konečně třetí konzument akceptuje všechny zprávy.

Chování jednotlivých konzumentů:

$ make run_subscriber_0
 
LD_LIBRARY_PATH=/usr/local/lib64/ ./subscriber_0
Socket created
Topic set to '0'
Endpoint connected to socket
Waiting for messages...
Received message '0886 is my top secret PIN' with length 26 bytes
Processed 1 messages so far
Received message '0492 is my top secret PIN' with length 26 bytes
Processed 2 messages so far
Received message '0027 is my top secret PIN' with length 26 bytes
Processed 3 messages so far
Received message '0059 is my top secret PIN' with length 26 bytes
Processed 4 messages so far
Received message '0540 is my top secret PIN' with length 26 bytes
Processed 5 messages so far
Received message '0925 is my top secret PIN' with length 26 bytes
Processed 6 messages so far
Received message '0336 is my top secret PIN' with length 26 bytes
Processed 7 messages so far
Received message '0846 is my top secret PIN' with length 26 bytes
Processed 8 messages so far
Received message '0545 is my top secret PIN' with length 26 bytes
Processed 9 messages so far
Received message '0364 is my top secret PIN' with length 26 bytes
Processed 10 messages so far
Received message '0012 is my top secret PIN' with length 26 bytes
Processed 11 messages so far
$ make run_subscriber_1
 
LD_LIBRARY_PATH=/usr/local/lib64/ ./subscriber_1
Socket created
Topic set to '1'
Endpoint connected to socket
Waiting for messages...
Received message '1421 is my top secret PIN' with length 26 bytes
Processed 1 messages so far
Received message '1530 is my top secret PIN' with length 26 bytes
Processed 2 messages so far
Received message '1393 is my top secret PIN' with length 26 bytes
Processed 3 messages so far
Received message '1873 is my top secret PIN' with length 26 bytes
Processed 4 messages so far
Received message '1729 is my top secret PIN' with length 26 bytes
Processed 5 messages so far
Received message '1313 is my top secret PIN' with length 26 bytes
Processed 6 messages so far
Received message '1087 is my top secret PIN' with length 26 bytes
Processed 7 messages so far
$ make run_subscriber_all
 
LD_LIBRARY_PATH=/usr/local/lib64/ ./subscriber_all
Socket created
Endpoint connected to socket
Waiting for messages...
Received message '0886 is my top secret PIN' with length 26 bytes
Processed 1 messages so far
Received message '2777 is my top secret PIN' with length 26 bytes
Processed 2 messages so far
Received message '6915 is my top secret PIN' with length 26 bytes
Processed 3 messages so far
Received message '7793 is my top secret PIN' with length 26 bytes
Processed 4 messages so far
Received message '8335 is my top secret PIN' with length 26 bytes
Processed 5 messages so far
Received message '5386 is my top secret PIN' with length 26 bytes
Processed 6 messages so far
Received message '0492 is my top secret PIN' with length 26 bytes
Processed 7 messages so far
Received message '6649 is my top secret PIN' with length 26 bytes
Processed 8 messages so far
Received message '1421 is my top secret PIN' with length 26 bytes
Processed 9 messages so far
Received message '2362 is my top secret PIN' with length 26 bytes
Processed 10 messages so far
Poznámka: můžeme vidět, že první konzument skutečně akceptuje pouze zprávy začínající na řetězec „0“, druhý konzument zprávy začínající řetězcem „1“ a třetí konzument čte všechny zprávy bez ohledu na jejich prefix.

6. Komunikační strategie Request-Response

Další velmi často používaná komunikační strategie se nazývá Request-Response nebo zkráceně REQ-REP popř. pouze REQREP. Při použití této strategie spolu komunikují dva typy uzlů – server a teoreticky neomezené množství klientů. Server přijímá požadavky (request) a odpovídá na ně (response), přičemž je možné, aby požadavky posílalo několik klientů (a jeden klient naopak může v případě potřeby posílat požadavky více serverům). Tato velmi asymetrická komunikace se strategií REQ-REP je v praxi poměrně častá, ostatně je na ní založen i známý protokol HTTP a jeho pozdější varianty (dokonce se někdy nad HTTP staví jiné komunikační strategie, což je ovšem neoptimální řešení).

Nejjednodušší forma komunikace probíhá mezi pouhými dvěma uzly, jak je to naznačeno na dalším obrázku:

Obrázek 5: Obousměrná komunikace využívající strategii REQ-REP mezi jedním klientem a jedním serverem.

Samozřejmě nám však nic nebrání v použití většího množství klientů, které se připojí k jednomu serveru:

Obrázek 6: Obousměrná komunikace využívající strategii REQ-REP mezi dvěma klienty a jedním serverem.

Poznámka: nikdo pochopitelně nebrání ani tomu, aby se jeden klient postupně připojoval na různé servery (ovšem s využitím různých socketů).

Tato komunikační strategie vyžaduje, aby server používal sockety typu NN_REP a klienti sockety typu NN_REQ. Server typicky využívá funkci nn_bind(), klienti funkci nn_connect(), protože port (či koncový bod) serveru je z pohledu administrace systému „stabilní“, na rozdíl od klientů. Komunikaci vždy iniciuje klient posláním požadavku (request); server na tento požadavek odpovídá (response). Jedná se o strategii umožňující load balancing a taktéž znovuodeslání požadavku, pokud v nastaveném čase nepřišla žádná odpověď.

7. Implementace klienta používajícího strategii Request-Response

Nejprve si ukažme, jak vypadá implementace klienta používajícího strategii REQ-REP. Ta je ve skutečnosti velmi jednoduchá, protože postačuje otevřít socket se specifikací strategie NN_REQ, poslat požadavek s využitím funkce nn_send() a následně počkat a přijmout výsledek (odpověď serveru) funkcí nn_recv(). Pro lepší přehlednost příkladu je poslání požadavku implementováno v uživatelské funkci nazvané send_request() a přijetí odpovědi serveru ve funkci nazvané receive_response().

Vytvoření socketu a připojení k serveru:

if ((socket = nn_socket(AF_SP, NN_REQ)) < 0) {
    report_error("nn_socket");
}
 
if ((endpoint = nn_connect(socket, url)) < 0) {
    report_error("nn_connect");
}

Poslání zprávy (přesněji řečeno požadavku):

void send_request(const int socket, const char *message)
{
    int bytes;
 
    printf("Sending message '%s'\n", message);
    if ((bytes = nn_send(socket, message, strlen(message)+1, 0)) < 0) {
        report_error("nn_send");
    }
    printf("Message with length %d bytes sent, flushing\n", bytes);
    sleep(1);
}

Příjem zprávy (přesněji řečeno odpovědi serveru):

void receive_response(const int socket)
{
    char *response = NULL;
    int bytes;
 
    if ((bytes = nn_recv(socket, &response, NN_MSG, 0)) < 0) {
        report_error("nn_recv");
    }
    printf("Received response '%s' with length %d bytes\n", response, bytes);
    if (nn_freemsg(response) < 0) {
        report_error("nn_freemsg");
    }
}

Úplný zdrojový kód klienta vypadá následovně:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>
 
const char *URL = "ipc:///tmp/example11";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void send_request(const int socket, const char *message)
{
    int bytes;
 
    printf("Sending message '%s'\n", message);
    if ((bytes = nn_send(socket, message, strlen(message)+1, 0)) < 0) {
        report_error("nn_send");
    }
    printf("Message with length %d bytes sent, flushing\n", bytes);
    sleep(1);
}
 
void receive_response(const int socket)
{
    char *response = NULL;
    int bytes;
 
    if ((bytes = nn_recv(socket, &response, NN_MSG, 0)) < 0) {
        report_error("nn_recv");
    }
    printf("Received response '%s' with length %d bytes\n", response, bytes);
    if (nn_freemsg(response) < 0) {
        report_error("nn_freemsg");
    }
}
 
void client(const char *url)
{
    int socket;
    int endpoint;
 
    if ((socket = nn_socket(AF_SP, NN_REQ)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_connect(socket, url)) < 0) {
        report_error("nn_connect");
    }
    puts("Remote endpoint added to the socket");
 
    send_request(socket, "Hello from 'first'!");
 
    puts("Waiting for response...");
    receive_response(socket);
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(const int argc, const char **argv)
{
    client(URL);
    return 0;
}

8. Implementace serveru zpracovávajícího požadavky

Server založený na strategii REQ-REP bude používat socket typu NN_REP a pro vytvoření koncového bodu pochopitelně použijeme funkci nn_bind() a nikoli nn_connect():

if ((socket = nn_socket(AF_SP, NN_REP)) < 0) {
    report_error("nn_socket");
}
 
if ((endpoint = nn_bind(socket, url)) < 0) {
    report_error("nn_bind");
}

Zcela nejjednodušší varianta serveru přijme pouze jediný požadavek a pošle na něj odpověď zpět klientovi. Žádné další požadavky již nebudou akceptovány, protože se server odpojí:

receive_request(socket);
 
send_response(socket, "ACK!");
 
if (nn_shutdown(socket, endpoint) < 0) {
    report_error("nn_shutdown");
}

Samozřejmě si opět ukážeme úplný zdrojový kód serveru, který vypadá následovně:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>
 
const char *URL = "ipc:///tmp/example11";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void receive_request(const int socket)
{
    char *message = NULL;
    int bytes;
 
    if ((bytes = nn_recv(socket, &message, NN_MSG, 0)) < 0) {
        report_error("nn_recv");
    }
    printf("Received message '%s' with length %d bytes\n", message, bytes);
    if (nn_freemsg(message) < 0) {
        report_error("nn_freemsg");
    }
}
 
void send_response(const int socket, const char *response)
{
    int bytes;
 
    printf("Sending response '%s'\n", response);
    if ((bytes = nn_send(socket, response, strlen(response)+1, 0)) < 0) {
        report_error("nn_send");
    }
    printf("Response with length %d bytes sent, flushing\n", bytes);
    sleep(1);
}
 
void server(const char *url)
{
    int socket;
    int endpoint;
 
    if ((socket = nn_socket(AF_SP, NN_REP)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_bind(socket, url)) < 0) {
        report_error("nn_bind");
    }
    puts("Endpoint bound to socket");
 
    puts("Waiting for message...");
    receive_request(socket);
 
    send_response(socket, "ACK!");
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(int argc, char **argv)
{
    server(URL);
    return 0;
}

9. Server zpracovávající dotazy v jediném vláknu

Pochopitelně nám nic nebrání ve vytvoření serveru, který bude jednotlivé dotazy zpracovávat postupně a v takovém pořadí, v jakém je bude přijímat. Prozatím nepoužijeme žádnou formu multitaskingu ani multithreadingu, protože toto řešení není ideální – výhodnější je použít takzvaná zařízení (device).

Zdrojový kód takto upraveného serveru naleznete na adrese https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/13_req_res_single_thre­ad/server.c:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>
 
const char *URL = "ipc:///tmp/example12";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void receive_request(socket)
{
    char *message = NULL;
    int bytes;
 
    if ((bytes = nn_recv(socket, &message, NN_MSG, 0)) < 0) {
        report_error("nn_recv");
    }
    printf("Received message '%s' with length %d bytes\n", message, bytes);
    if (nn_freemsg(message) < 0) {
        report_error("nn_freemsg");
    }
}
 
void send_response(const int socket, const char *response)
{
    int bytes;
 
    printf("Sending response '%s'\n", response);
    if ((bytes = nn_send(socket, response, strlen(response)+1, 0)) < 0) {
        report_error("nn_send");
    }
    printf("Response with length %d bytes sent, flushing\n", bytes);
    sleep(1);
}
 
void server(const char *url)
{
    int socket;
    int endpoint;
 
    if ((socket = nn_socket(AF_SP, NN_REP)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_bind(socket, url)) < 0) {
        report_error("nn_bind");
    }
    puts("Endpoint bound to socket");
 
    puts("Waiting for messages...");
    while (1) {
        receive_request(socket);
        send_response(socket, "ACK!");
    }
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(int argc, char **argv)
{
    server(URL);
    return 0;
}

10. Chování serveru při zpracování velkého množství dotazů

Na rozdíl od strategie PUB-SUB by se při použití strategie REQ-REP neměly žádné zprávy ztrácet. Můžeme se o tom snadno přesvědčit úpravou klienta i serveru.

Klient, jehož zdrojový kód je možné nalézt na adrese https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/13_req_res_single_thre­ad/client.c, je nakonfigurován tak, aby poslal celkem 100 zpráv:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>
 
const int MESSAGES = 100;
 
const char *URL = "ipc:///tmp/example13";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void send_request(const int socket, const char *message)
{
    int bytes;
 
    printf("Sending message '%s'\n", message);
    if ((bytes = nn_send(socket, message, strlen(message)+1, 0)) < 0) {
        report_error("nn_send");
    }
    printf("Message with length %d bytes sent, flushing\n", bytes);
}
 
void receive_response(socket)
{
    char *response = NULL;
    int bytes;
 
    if ((bytes = nn_recv(socket, &response, NN_MSG, 0)) < 0) {
        report_error("nn_recv");
    }
    printf("Received response '%s' with length %d bytes\n", response, bytes);
    if (nn_freemsg(response) < 0) {
        report_error("nn_freemsg");
    }
}
 
void client(const char *url)
{
    int socket;
    int endpoint;
    int i;
 
    if ((socket = nn_socket(AF_SP, NN_REQ)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_connect(socket, url)) < 0) {
        report_error("nn_connect");
    }
    puts("Remote endpoint added to the socket");
 
    for (i=0; i < MESSAGES; i++) {
        send_request(socket, "Hello from 'first'!");
 
        puts("Waiting for response...");
        receive_response(socket);
    }
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(const int argc, const char **argv)
{
    client(URL);
    return 0;
}

Server tyto zprávy bude přijímat a současně vypíše, kolik zpráv již dokázal zpracovat:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>
 
const char *URL = "ipc:///tmp/example13";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void receive_request(socket)
{
    char *message = NULL;
    int bytes;
 
    if ((bytes = nn_recv(socket, &message, NN_MSG, 0)) < 0) {
        report_error("nn_recv");
    }
    printf("Received message '%s' with length %d bytes\n", message, bytes);
    if (nn_freemsg(message) < 0) {
        report_error("nn_freemsg");
    }
}
 
void send_response(const int socket, const char *response)
{
    int bytes;
 
    printf("Sending response '%s'\n", response);
    if ((bytes = nn_send(socket, response, strlen(response)+1, 0)) < 0) {
        report_error("nn_send");
    }
    printf("Response with length %d bytes sent, flushing\n", bytes);
}
 
void server(const char *url)
{
    int socket;
    int endpoint;
    int messages;
 
    if ((socket = nn_socket(AF_SP, NN_REP)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_bind(socket, url)) < 0) {
        report_error("nn_bind");
    }
    puts("Endpoint bound to socket");
 
    puts("Waiting for messages...");
    while (1) {
        receive_request(socket);
        send_response(socket, "ACK!");
        messages++;
        printf("Processed %d messages so far\n", messages);
    }
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(int argc, char **argv)
{
    server(URL);
    return 0;
}

Chování klienta:

Chování serveru:

$ make run_server
 
LD_LIBRARY_PATH=/usr/local/lib64/ ./server
Socket created
Endpoint bound to socket
Waiting for messages...
Received message 'Hello from 'first'!' with length 20 bytes
Sending response 'ACK!'
Response with length 5 bytes sent, flushing
Processed 1 messages so far
Received message 'Hello from 'first'!' with length 20 bytes
Sending response 'ACK!'
...
...
...
Processed 97 messages so far
Received message 'Hello from 'first'!' with length 20 bytes
Sending response 'ACK!'
Response with length 5 bytes sent, flushing
Processed 98 messages so far
Received message 'Hello from 'first'!' with length 20 bytes
Sending response 'ACK!'
Response with length 5 bytes sent, flushing
Processed 99 messages so far
Received message 'Hello from 'first'!' with length 20 bytes
Sending response 'ACK!'
Response with length 5 bytes sent, flushing
Processed 100 messages so far

Chování klienta:

$ make run_client 
 
LD_LIBRARY_PATH=/usr/local/lib64/ ./client
Socket created
Remote endpoint added to the socket
Sending message 'Hello from 'first'!'
Message with length 20 bytes sent, flushing
Waiting for response...
Received response 'ACK!' with length 5 bytes
Sending message 'Hello from 'first'!'
Message with length 20 bytes sent, flushing
Waiting for response...
...
...
...
Poznámka: při komunikaci tímto způsobem nedochází ke ztrátám zpráv, klient ovšem v některých případech musí déle čekat na akceptaci zprávy a poslání odpovědi.

11. Komunikační strategie Survey

V pořadí již pátou komunikační strategií, se kterou se dnes seznámíme, je strategie nazvaná Survey. Jedná se o dosti příhodný název, protože v této strategii vystupuje jeden uzel, který se dotáže ostatních uzlů například na jejich stav a uzly následně mohou odpovídat. Dotazující se uzel má nastavenu dobu čekání, takže může jednotlivé odpovědi zaznamenat a po uplynutí zadaného časového okamžiku zjistit, které uzly vůbec neodpověděly (nebo to nestihly, což je ovšem z pohledu dotazujícího to samé):

Obrázek 7: Komunikační strategie Survey.

Tato strategie je v praxi velmi užitečná, protože umožňuje například sledování velkého množství čidel, počítačů v clusteru atd., aniž by tato zařízení musela aktivně někam posílat svůj stav, typicky s využitím strategie PUB-SUB. Nevýhodou může být nárazový způsob využívání sítě, to ovšem již do určité míry záleží na její topologii.

12. Implementace klienta

Implementace klienta odpovídajícího na otázky serveru je relativně jednoduchá a nalezneme ji na adrese https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/15_survey/client.c. Povšimněte si, že klient v odpovědi pošle pseudonáhodné číslo:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/survey.h>
 
const char *URL = "ipc:///tmp/example14";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void receive_question(const int socket)
{
    char *message = NULL;
    int bytes;
 
    if ((bytes = nn_recv(socket, &message, NN_MSG, 0)) < 0) {
        report_error("nn_recv");
    }
    printf("Received message '%s' with length %d bytes\n", message, bytes);
    if (nn_freemsg(message) < 0) {
        report_error("nn_freemsg");
    }
}
 
void send_answer(const int socket, const char *answer)
{
    int bytes;
 
    printf("Sending answer '%s'\n", answer);
    if ((bytes = nn_send(socket, answer, strlen(answer)+1, 0)) < 0) {
        report_error("nn_send");
    }
    printf("Answer with length %d bytes sent, flushing\n", bytes);
}
 
#define ANSWER_LENGTH 100
 
void client(const char *url)
{
    int socket;
    int endpoint;
    char answer[ANSWER_LENGTH];
    int number;
 
    if ((socket = nn_socket(AF_SP, NN_RESPONDENT)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_connect(socket, url)) < 0) {
        report_error("nn_connect");
    }
    puts("Remote endpoint added to the socket");
 
    while (1) {
        receive_question(socket);
        puts("Question received");
 
        /* nemame vypocetni vykon Hlubiny mysleni... */
        srand((unsigned) getpid());
        number = rand() % 100;
 
        snprintf(answer, ANSWER_LENGTH, "It must be %d", number);
        send_answer(socket, answer);
        puts("Answer sent");
    }
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(const int argc, const char **argv)
{
    client(URL);
    return 0;
}

13. Implementace serveru

Server, který implementuje strategii Survey, nalezneme na adrese https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/15_survey/server.c. Jeho základem je tento kód posílající dotaz a očekávající libovolné množství odpovědí:

send_survey(socket, "What do you get when you multiply six by nine?");
puts("Survey send, waiting for answers...");
 
while (1) {
    receive_answer(socket);
    answers++;
    printf("Processed %d answers so far\n", answers);
}

Následuje výpis úplného zdrojového kódu tohoto příkladu:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/survey.h>
 
const char *URL = "ipc:///tmp/example14";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void send_survey(const int socket, const char *message)
{
    int bytes;

    printf("Sending message '%s'\n", message);
    if ((bytes = nn_send(socket, message, strlen(message)+1, 0)) < 0) {
        report_error("nn_send");
    }
    printf("Message with length %d bytes sent, flushing\n", bytes);
}
 
void receive_answer(const int socket)
{
    char *response = NULL;
    int bytes;
 
    if ((bytes = nn_recv(socket, &response, NN_MSG, 0)) < 0) {
        report_error("nn_recv");
    }
    printf("Received answer '%s' with length %d bytes\n", response, bytes);
    if (nn_freemsg(response) < 0) {
        report_error("nn_freemsg");
    }
}
 
void wait_for_clients(int seconds)
{
    int i;
    puts("Waiting for clients to connect...");
    for (i=10; i>0; i--) {
        printf("%d  ", i);
        fflush(stdout);
        sleep(1);
    }
    puts("\nDone");
}
 
void server(const char *url)
{
    int socket;
    int endpoint;
    int answers;
 
    if ((socket = nn_socket(AF_SP, NN_SURVEYOR)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_bind(socket, url)) < 0) {
        report_error("nn_bind");
    }
    puts("Endpoint bound to socket");
 
    wait_for_clients(10);
 
    send_survey(socket, "What do you get when you multiply six by nine?");
    puts("Survey send, waiting for answers...");
 
    while (1) {
        receive_answer(socket);
        answers++;
        printf("Processed %d answers so far\n", answers);
    }
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(int argc, char **argv)
{
    server(URL);
    return 0;
}

14. Ukázka poslání otázky a zpracování odpovědí

Podívejme se nyní na to, jakým způsobem bude pracovat server. Nejprve se spustí a počká deset sekund na připojení klientů (ty spustíme například v paralelně běžících terminálech):

Socket created
Endpoint bound to socket
Waiting for clients to connect...
10  9  8  7  6  5  4  3  2  1
Done

Následně server pošle dotaz, a to všem aktuálně připojeným klientům. Ihned po poslání dotazu začne očekávat jednotlivé odpovědi:

Sending message 'What do you get when you multiply six by nine?'
Message with length 47 bytes sent, flushing
Survey send, waiting for answers...

Odpovědi skutečně dostane, a to v našem konkrétním případě tři (od třech klientů):

Received answer 'It must be 99' with length 14 bytes
Processed 1 answers so far
Received answer 'It must be 88' with length 14 bytes
Processed 2 answers so far
Received answer 'It must be 73' with length 14 bytes
Processed 3 answers so far

Následně ovšem již další odpovědi nepřijdou a dojde k detekovatelné chybě (timeout):

nn_recv: Connection timed out
make: *** [Makefile:34: run_server] Error 1
Poznámka: v praxi by samozřejmě bylo možné, aby se server z timeoutu nějakým způsobem „vzpamatoval“ a pokračoval v další činnosti.

15. Komunikační strategie Bus

Poslední komunikační strategie, s níž se seznámíme a která je knihovnou nanomsg podporována, se jmenuje Bus neboli sběrnice. Tato strategie obecně podporuje komunikaci libovolného uzlu s ostatními uzly, ovšem musíme si uvědomit, že zde není k dispozici jediný centrální message broker ani skutečný message bus. Efektu sběrnice se dosahuje takovým způsobem, že se jednotlivé uzly propojí mezi sebou, což je sice nejobecnější schéma, ovšem taktéž nejvíce náchylné na různé neočekávané výpadky a rozpady připojení. Taktéž samotná konfigurace aplikace může být problematická, protože jednotlivé uzly o sobě musí navzájem vědět, přesněji řečeno musí být k dispozici konfigurační mechanismus popř. mechanismus nalézání uzlů. V demonstračním příkladu z tohoto důvodu pro jednoduchost použijeme pouze dva uzly, přičemž jeden bude zprávy pouze přijímat a druhý vysílat.

Poznámka: v praxi se většinou uzly musí propojit složitějším způsobem a většinou není zcela zřejmé, který uzel má používat nn_bind() a který nn_connect(). Z hlediska administrace bývá výhodnější namísto sběrnice použít jinou architekturu s centrálním uzlem – message brokerem –, pokud tomu ovšem nezabraňují požadavky na rychlost posílání zpráv či struktura sítě.

16. Implementace prvního uzlu

První uzel, jehož úplný zdrojový kód je uložen na adrese https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/16_bus/node1.c, pouze na svém portu očekává zprávy. Je tedy pasivní a sám o sobě žádné zprávy dalším uzlům neposílá:

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/bus.h>
 
const char *URL = "ipc:///tmp/example14";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void receive_message(const int socket)
{
    char *message = NULL;
    int bytes;
 
    if ((bytes = nn_recv(socket, &message, NN_MSG, 0)) < 0) {
        report_error("nn_recv");
    }
    printf("Received message '%s' with length %d bytes\n", message, bytes);
    if (nn_freemsg(message) < 0) {
        report_error("nn_freemsg");
    }
}
 
void node1(const char *url)
{
    int socket;
    int endpoint;
    int messages = 0;
 
    if ((socket = nn_socket(AF_SP, NN_BUS)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    if ((endpoint = nn_bind(socket, url)) < 0) {
        report_error("nn_bind");
    }
    printf("Remote endpoint %s bound to the socket\n", url);
 
    puts("Waiting for messages...");
 
    while (1) {
        receive_message(socket);
        messages++;
        printf("Processed %d messages so far\n", messages);
    }
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(const int argc, const char **argv)
{
    node1(URL);
    return 0;
}

17. Implementace druhého uzlu

Druhý uzel naopak zprávy posílá, a to uzlu prvnímu. Zdrojový kód implementace tohoto uzlu naleznete zde:

ict ve školství 24

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nanomsg/nn.h>
#include <nanomsg/bus.h>
 
const char *URL = "ipc:///tmp/example14";
 
void report_error(const char *func)
{
    fprintf(stderr, "%s: %s\n", func, nn_strerror(nn_errno()));
    exit(1);
}
 
void send_message(const int socket, const char *message)
{
    int bytes;
 
    printf("Sending message '%s'\n", message);
    if ((bytes = nn_send(socket, message, strlen(message)+1, 0)) < 0) {
        report_error("nn_send");
    }
    printf("Message with length %d bytes sent, flushing\n", bytes);
    sleep(1);
}
 
#define BUF_LEN 100
 
void node2(const char *url)
{
    int socket;
    int endpoint;
    char buffer[BUF_LEN];
    int i;
 
    if ((socket = nn_socket(AF_SP, NN_BUS)) < 0) {
        report_error("nn_socket");
    }
    puts("Socket created");
 
    endpoint = nn_connect(socket, url);
    if (endpoint < 0) {
        report_error("nn_connect");
    }
    printf("Connected to the remote %s endpoint\n", url);
    sleep(1);
 
    for (i=0; i<10; i++) {
        sprintf(buffer, "Message #%d from node2", i);
        send_message(socket, buffer);
    }
 
    if (nn_shutdown(socket, endpoint) < 0) {
        report_error("nn_shutdown");
    }
    puts("Shutdown completed");
}
 
int main(const int argc, const char **argv)
{
    node2(URL);
    return 0;
}

18. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů vyvinutých v programovacím jazyku Go byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce. Každý příklad se skládá ze dvou samostatně překládaných a spouštěných souborů – producenta zpráv a konzumenta zpráv:

Příklad Skript/kód Popis Cesta
1 publisher.c producent zpráv https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/07_publisher_subscriber_1000_mes­sages/publisher.c
1 subscriber.c konzument zpráv https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/07_publisher_subscriber_1000_mes­sages/subscriber.c
1 Makefile soubor pro překlad i spuštění jednotlivých částí aplikace https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/07_publisher_subscriber_1000_mes­sages/Makefile
       
2 publisher.c producent zpráv bez pauz mezi jejich posíláním https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/08_publisher_subscriber_no_sle­ep/publisher.c
2 subscriber.c konzument zpráv https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/08_publisher_subscriber_no_sle­ep/subscriber.c
2 Makefile soubor pro překlad i spuštění jednotlivých částí aplikace https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/08_publisher_subscriber_no_sle­ep/Makefile
       
3 publisher.c producent zpráv s pauzami mezi jejich posíláním https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/09_pub_sub_tiny_sleep/pu­blisher.c
3 subscriber.c konzument zpráv https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/09_pub_sub_tiny_sleep/sub­scriber.c
3 Makefile soubor pro překlad i spuštění jednotlivých částí aplikace https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/09_pub_sub_tiny_sleep/Ma­kefile
       
4 publisher.c producent zpráv s pauzami mezi jejich posíláním https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/10_pub_sub_tiny_sleep_pro­per_init/publisher.c
4 subscriber.c konzument zpráv, korektní inicializace https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/10_pub_sub_tiny_sleep_pro­per_init/subscriber.c
4 Makefile soubor pro překlad i spuštění jednotlivých částí aplikace https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/10_pub_sub_tiny_sleep_pro­per_init/Makefile
       
5 publisher.c producent zpráv https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/11_pub_sub_topics/publisher­.c
5 subscriber0.c příjemce zpráv se zadaným tématem https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/11_pub_sub_topics/subscri­ber0.c
5 subscriber1.c příjemce zpráv se zadaným tématem https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/11_pub_sub_topics/subscri­ber1.c
5 subscriber_all.c příjemce zpráv se všemi tématy https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/11_pub_sub_topics/subscri­ber_all.c
5 Makefile soubor pro překlad i spuštění jednotlivých částí aplikace https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/11_pub_sub_topics/Make­file
       
6 client.c klient používající ke komunikaci strategii REQ-REP https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/12_req_res/client.c
6 server.c server používající ke komunikaci strategii REQ-REP https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/12_req_res/server.c
6 Makefile soubor pro překlad i spuštění jednotlivých částí aplikace https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/12_req_res/Makefile
       
7 client.c klient používající ke komunikaci strategii REQ-REP https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/13_req_res_single_thre­ad/client.c
7 server.c server běžící v jednom vlákně https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/13_req_res_single_thre­ad/server.c
7 Makefile soubor pro překlad i spuštění jednotlivých částí aplikace https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/13_req_res_single_thre­ad/Makefile
       
8 client.c klient – benchmark https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/14_req_resp_fast/clien­t.c
8 server.c server použitý pro benchmark https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/14_req_resp_fast/server­.c
8 Makefile soubor pro překlad i spuštění jednotlivých částí aplikace https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/14_req_resp_fast/Makefi­le
       
9 client.c klient odpovídající na dotazy ve strategii Survey https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/15_survey/client.c
9 server.c server posílající dotaze vy strategii Survey https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/15_survey/server.c
9 Makefile soubor pro překlad i spuštění jednotlivých částí aplikace https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/15_survey/Makefile
       
10 node1.c první uzel využívající strategii Bus https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/16_bus/node1.c
10 node2.c druhý uzel využívající strategii Bus https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/16_bus/node2.c
10 Makefile soubor pro překlad i spuštění jednotlivých částí aplikace https://github.com/tisnik/message-queues-examples/blob/master/nanom­sg/16_bus/Makefile

19. Odkazy na předchozí části seriálu o message brokerech

V této kapitole jsou uvedeny odkazy na všech šestnáct předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií message brokerů:

  1. Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
    https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/
  2. Celery: systém implementující asynchronní fronty úloh pro Python
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/
  3. Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/
  4. RabbitMQ: jedna z nejúspěšnějších implementací brokera
    https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/
  5. Pokročilejší operace nabízené systémem RabbitMQ
    https://www.root.cz/clanky/po­krocilejsi-operace-nabizene-systemem-rabbitmq/
  6. ØMQ: knihovna pro asynchronní předávání zpráv
    https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/
  7. Další možnosti poskytované knihovnou ØMQ
    https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/
  8. Využití zařízení v knihovně ØMQ při tvorbě systému se složitější architekturou
    https://www.root.cz/clanky/vyuziti-zarizeni-v-knihovne-mq-pri-tvorbe-systemu-se-slozitejsi-architekturou/
  9. Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě
    https://www.root.cz/clanky/dalsi-moznosti-nabizene-knihovnou-mq-implementace-protokolu-mq-v-ciste-jave/
  10. Apache ActiveMQ – další systém implementující message brokera
    https://www.root.cz/clanky/apache-activemq-dalsi-system-implementujici-message-brokera/
  11. Použití Apache ActiveMQ s protokolem STOMP
    https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-stomp/
  12. Použití Apache ActiveMQ s protokolem AMQP, jazyk Go a message brokeři
    https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-amqp-jazyk-go-a-message-brokeri/
  13. Komunikace s message brokery z programovacího jazyka Go
    https://www.root.cz/clanky/komunikace-s-message-brokery-z-programovaciho-jazyka-go/
  14. Použití message brokeru NATS
    https://www.root.cz/clanky/pouziti-message-brokeru-nats/
  15. NATS Streaming Server
    https://www.root.cz/clanky/nats-streaming-server/
  16. Implementace různých komunikačních strategií s využitím knihovny nanomsg
    https://www.root.cz/clanky/im­plementace-ruznych-komunikacnich-strategii-s-vyuzitim-knihovny-nanomsg/

20. Odkazy na Internetu

  1. nanomsg na GitHubu
    https://github.com/nanomsg/nanomsg
  2. Referenční příručka knihovny nanomsg
    https://nanomsg.org/v1.1.5/na­nomsg.html
  3. nng (nanomsg-next-generation)
    https://github.com/nanomsg/nng
  4. Differences between nanomsg and ZeroMQ
    https://nanomsg.org/documentation-zeromq.html
  5. NATS
    https://nats.io/about/
  6. NATS Streaming Concepts
    https://nats.io/documenta­tion/streaming/nats-streaming-intro/
  7. NATS Streaming Server
    https://nats.io/download/nats-io/nats-streaming-server/
  8. NATS Introduction
    https://nats.io/documentation/
  9. NATS Client Protocol
    https://nats.io/documenta­tion/internals/nats-protocol/
  10. NATS Messaging (Wikipedia)
    https://en.wikipedia.org/wi­ki/NATS_Messaging
  11. Stránka Apache Software Foundation
    http://www.apache.org/
  12. Informace o portu 5672
    http://www.tcp-udp-ports.com/port-5672.htm
  13. Třída MessagingHandler knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._handlers.MessagingHan­dler-class.html
  14. Třída Event knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._events.Event-class.html
  15. package stomp (Go)
    https://godoc.org/github.com/go-stomp/stomp
  16. Go language library for STOMP protocol
    https://github.com/go-stomp/stomp
  17. python-qpid-proton 0.26.0 na PyPi
    https://pypi.org/project/python-qpid-proton/
  18. Qpid Proton
    http://qpid.apache.org/proton/
  19. Using the AMQ Python Client
    https://access.redhat.com/do­cumentation/en-us/red_hat_amq/7.1/html-single/using_the_amq_python_client/
  20. Apache ActiveMQ
    http://activemq.apache.org/
  21. Apache ActiveMQ Artemis
    https://activemq.apache.org/artemis/
  22. Apache ActiveMQ Artemis User Manual
    https://activemq.apache.or­g/artemis/docs/latest/index­.html
  23. KahaDB
    http://activemq.apache.or­g/kahadb.html
  24. Understanding the KahaDB Message Store
    https://access.redhat.com/do­cumentation/en-US/Fuse_MQ_Enterprise/7.1/html/Con­figuring_Broker_Persisten­ce/files/KahaDBOverview.html
  25. Command Line Tools (Apache ActiveMQ)
    https://activemq.apache.org/activemq-command-line-tools-reference.html
  26. stomp.py 4.1.21 na PyPi
    https://pypi.org/project/stomp.py/
  27. Stomp Tutorial
    https://access.redhat.com/do­cumentation/en-US/Fuse_Message_Broker/5.5/html/Con­nectivity_Guide/files/FMBCon­nectivityStompTelnet.html
  28. Heartbeat (computing)
    https://en.wikipedia.org/wi­ki/Heartbeat_(computing)
  29. Apache Camel
    https://camel.apache.org/
  30. Red Hat Fuse
    https://developers.redhat­.com/products/fuse/overvi­ew/
  31. Confusion between ActiveMQ and ActiveMQ-Artemis?
    https://serverfault.com/qu­estions/873533/confusion-between-activemq-and-activemq-artemis
  32. Staré stránky projektu HornetQ
    http://hornetq.jboss.org/
  33. Snapshot JeroMQ verze 0.4.4
    https://oss.sonatype.org/con­tent/repositories/snapshot­s/org/zeromq/jeromq/0.4.4-SNAPSHOT/
  34. Difference between ActiveMQ vs Apache ActiveMQ Artemis
    http://activemq.2283324.n4­.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html
  35. Microservices communications. Why you should switch to message queues
    https://dev.to/matteojoli­veau/microservices-communications-why-you-should-switch-to-message-queues–48ia
  36. Stomp.py 4.1.19 documentation
    https://stomppy.readthedoc­s.io/en/stable/
  37. Repositář knihovny JeroMQ
    https://github.com/zeromq/jeromq/
  38. ØMQ – Distributed Messaging
    http://zeromq.org/
  39. ØMQ Community
    http://zeromq.org/community
  40. Get The Software
    http://zeromq.org/intro:get-the-software
  41. PyZMQ Documentation
    https://pyzmq.readthedocs­.io/en/latest/
  42. Module: zmq.decorators
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.deco­rators.html
  43. ZeroMQ is the answer, by Ian Barber
    https://vimeo.com/20605470
  44. ZeroMQ RFC
    https://rfc.zeromq.org/
  45. ZeroMQ and Clojure, a brief introduction
    https://antoniogarrote.wor­dpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/
  46. zeromq/czmq
    https://github.com/zeromq/czmq
  47. golang wrapper for CZMQ
    https://github.com/zeromq/goczmq
  48. ZeroMQ version reporting in Python
    http://zguide.zeromq.org/py:version
  49. A Go interface to ZeroMQ version 4
    https://github.com/pebbe/zmq4
  50. Broker vs. Brokerless
    http://zeromq.org/whitepa­pers:brokerless
  51. Learning ØMQ with pyzmq
    https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/
  52. Céčková funkce zmq_ctx_new
    http://api.zeromq.org/4–2:zmq-ctx-new
  53. Céčková funkce zmq_ctx_destroy
    http://api.zeromq.org/4–2:zmq-ctx-destroy
  54. Céčková funkce zmq_bind
    http://api.zeromq.org/4–2:zmq-bind
  55. Céčková funkce zmq_unbind
    http://api.zeromq.org/4–2:zmq-unbind
  56. Céčková C funkce zmq_connect
    http://api.zeromq.org/4–2:zmq-connect
  57. Céčková C funkce zmq_disconnect
    http://api.zeromq.org/4–2:zmq-disconnect
  58. Céčková C funkce zmq_send
    http://api.zeromq.org/4–2:zmq-send
  59. Céčková C funkce zmq_recv
    http://api.zeromq.org/4–2:zmq-recv
  60. Třída Context (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#con­text
  61. Třída Socket (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#soc­ket
  62. Python binding
    http://zeromq.org/bindings:python
  63. Why should I have written ZeroMQ in C, not C++ (part I)
    http://250bpm.com/blog:4
  64. Why should I have written ZeroMQ in C, not C++ (part II)
    http://250bpm.com/blog:8
  65. About Nanomsg
    https://nanomsg.org/
  66. Advanced Message Queuing Protocol
    https://www.amqp.org/
  67. Advanced Message Queuing Protocol na Wikipedii
    https://en.wikipedia.org/wi­ki/Advanced_Message_Queuin­g_Protocol
  68. Dokumentace k příkazu rabbitmqctl
    https://www.rabbitmq.com/rab­bitmqctl.8.html
  69. RabbitMQ
    https://www.rabbitmq.com/
  70. RabbitMQ Tutorials
    https://www.rabbitmq.com/get­started.html
  71. RabbitMQ: Clients and Developer Tools
    https://www.rabbitmq.com/dev­tools.html
  72. RabbitMQ na Wikipedii
    https://en.wikipedia.org/wi­ki/RabbitMQ
  73. Streaming Text Oriented Messaging Protocol
    https://en.wikipedia.org/wi­ki/Streaming_Text_Oriented_Mes­saging_Protocol
  74. Message Queuing Telemetry Transport
    https://en.wikipedia.org/wiki/MQTT
  75. Erlang
    http://www.erlang.org/
  76. pika 0.12.0 na PyPi
    https://pypi.org/project/pika/
  77. Introduction to Pika
    https://pika.readthedocs.i­o/en/stable/
  78. Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
    http://clojurerabbitmq.info/
  79. AMQP 0–9–1 Model Explained
    http://www.rabbitmq.com/tutorials/amqp-concepts.html
  80. Part 1: RabbitMQ for beginners – What is RabbitMQ?
    https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html
  81. Downloading and Installing RabbitMQ
    https://www.rabbitmq.com/dow­nload.html
  82. celery na PyPi
    https://pypi.org/project/celery/
  83. Databáze Redis (nejenom) pro vývojáře používající Python
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/
  84. Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/
  85. Redis Queue (RQ)
    https://www.fullstackpython.com/redis-queue-rq.html
  86. Python Celery & RabbitMQ Tutorial
    https://tests4geeks.com/python-celery-rabbitmq-tutorial/
  87. Flower: Real-time Celery web-monitor
    http://docs.celeryproject­.org/en/latest/userguide/mo­nitoring.html#flower-real-time-celery-web-monitor
  88. Asynchronous Tasks With Django and Celery
    https://realpython.com/asynchronous-tasks-with-django-and-celery/
  89. First Steps with Celery
    http://docs.celeryproject­.org/en/latest/getting-started/first-steps-with-celery.html
  90. node-celery
    https://github.com/mher/node-celery
  91. Full Stack Python: web development
    https://www.fullstackpython.com/web-development.html
  92. Introducing RQ
    https://nvie.com/posts/introducing-rq/
  93. Asynchronous Tasks with Flask and Redis Queue
    https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue
  94. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  95. Stránky projektu Redis
    https://redis.io/
  96. Introduction to Redis
    https://redis.io/topics/introduction
  97. Try Redis
    http://try.redis.io/
  98. Redis tutorial, April 2010 (starší, ale pěkně udělaný)
    https://static.simonwilli­son.net/static/2010/redis-tutorial/
  99. Python Redis
    https://redislabs.com/lp/python-redis/
  100. Redis: key-value databáze v paměti i na disku
    https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/
  101. Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
    http://www.cloudsvet.cz/?p=253
  102. Praktický úvod do Redis (2): transakce
    http://www.cloudsvet.cz/?p=256
  103. Praktický úvod do Redis (3): cluster
    http://www.cloudsvet.cz/?p=258
  104. Connection pool
    https://en.wikipedia.org/wi­ki/Connection_pool
  105. Instant Redis Sentinel Setup
    https://github.com/ServiceStack/redis-config
  106. How to install REDIS in LInux
    https://linuxtechlab.com/how-install-redis-server-linux/
  107. Redis RDB Dump File Format
    https://github.com/sripat­hikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format
  108. Lempel–Ziv–Welch
    https://en.wikipedia.org/wi­ki/Lempel%E2%80%93Ziv%E2%80%93­Welch
  109. Redis Persistence
    https://redis.io/topics/persistence
  110. Redis persistence demystified
    http://oldblog.antirez.com/post/redis-persistence-demystified.html
  111. Redis reliable queues with Lua scripting
    http://oldblog.antirez.com/post/250
  112. Ost (knihovna)
    https://github.com/soveran/ost
  113. NoSQL
    https://en.wikipedia.org/wiki/NoSQL
  114. Shard (database architecture)
    https://en.wikipedia.org/wi­ki/Shard_%28database_archi­tecture%29
  115. What is sharding and why is it important?
    https://stackoverflow.com/qu­estions/992988/what-is-sharding-and-why-is-it-important
  116. What Is Sharding?
    https://btcmanager.com/what-sharding/
  117. Redis clients
    https://redis.io/clients
  118. Category:Lua-scriptable software
    https://en.wikipedia.org/wi­ki/Category:Lua-scriptable_software
  119. Seriál Programovací jazyk Lua
    https://www.root.cz/seria­ly/programovaci-jazyk-lua/
  120. Redis memory usage
    http://nosql.mypopescu.com/pos­t/1010844204/redis-memory-usage
  121. Ukázka konfigurace Redisu pro lokální testování
    https://github.com/tisnik/pre­sentations/blob/master/re­dis/redis.conf
  122. Resque
    https://github.com/resque/resque
  123. Nested transaction
    https://en.wikipedia.org/wi­ki/Nested_transaction
  124. Publish–subscribe pattern
    https://en.wikipedia.org/wi­ki/Publish%E2%80%93subscri­be_pattern
  125. Messaging pattern
    https://en.wikipedia.org/wi­ki/Messaging_pattern
  126. Using pipelining to speedup Redis queries
    https://redis.io/topics/pipelining
  127. Pub/Sub
    https://redis.io/topics/pubsub
  128. ZeroMQ distributed messaging
    http://zeromq.org/
  129. ZeroMQ: Modern & Fast Networking Stack
    https://www.igvita.com/2010/09/03/ze­romq-modern-fast-networking-stack/
  130. Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
    https://stackoverflow.com/qu­estions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr
  131. Python & Redis PUB/SUB
    https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7
  132. Message broker
    https://en.wikipedia.org/wi­ki/Message_broker
  133. RESP Arrays
    https://redis.io/topics/protocol#array-reply
  134. Redis Protocol specification
    https://redis.io/topics/protocol
  135. Redis Pub/Sub: Intro Guide
    https://www.redisgreen.net/blog/pubsub-intro/
  136. Redis Pub/Sub: Howto Guide
    https://www.redisgreen.net/blog/pubsub-howto/
  137. Comparing Publish-Subscribe Messaging and Message Queuing
    https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message
  138. Apache Kafka
    https://kafka.apache.org/
  139. Iron
    http://www.iron.io/mq
  140. kue (založeno na Redisu, určeno pro node.js)
    https://github.com/Automattic/kue
  141. Cloud Pub/Sub
    https://cloud.google.com/pubsub/
  142. Introduction to Redis Streams
    https://redis.io/topics/streams-intro
  143. glob (programming)
    https://en.wikipedia.org/wi­ki/Glob_(programming)
  144. Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
    https://www.slideshare.net/syl­vinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2
  145. Enqueueing internals
    http://python-rq.org/contrib/
  146. queue — A synchronized queue class
    https://docs.python.org/3/li­brary/queue.html
  147. Queue – A thread-safe FIFO implementation
    https://pymotw.com/2/Queue/
  148. Queues
    http://queues.io/
  149. Windows Subsystem for Linux Documentation
    https://docs.microsoft.com/en-us/windows/wsl/about
  150. RestMQ
    http://restmq.com/
  151. ActiveMQ
    http://activemq.apache.org/
  152. Amazon MQ
    https://aws.amazon.com/amazon-mq/
  153. Amazon Simple Queue Service
    https://aws.amazon.com/sqs/
  154. Celery: Distributed Task Queue
    http://www.celeryproject.org/
  155. Disque, an in-memory, distributed job queue
    https://github.com/antirez/disque
  156. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  157. Projekt RQ na PyPi
    https://pypi.org/project/rq/
  158. rq-dashboard 0.3.12
    https://pypi.org/project/rq-dashboard/
  159. Job queue
    https://en.wikipedia.org/wi­ki/Job_queue
  160. Why we moved from Celery to RQ
    https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq
  161. Running multiple workers using Celery
    https://serverfault.com/qu­estions/655387/running-multiple-workers-using-celery
  162. celery — Distributed processing
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.html
  163. Chains
    https://celery.readthedoc­s.io/en/latest/userguide/can­vas.html#chains
  164. Routing
    http://docs.celeryproject­.org/en/latest/userguide/rou­ting.html#automatic-routing
  165. Celery Distributed Task Queue in Go
    https://github.com/gocelery/gocelery/
  166. Python Decorators
    https://wiki.python.org/mo­in/PythonDecorators
  167. Periodic Tasks
    http://docs.celeryproject­.org/en/latest/userguide/pe­riodic-tasks.html
  168. celery.schedules
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.schedules.html#celery­.schedules.crontab
  169. Pros and cons to use Celery vs. RQ
    https://stackoverflow.com/qu­estions/13440875/pros-and-cons-to-use-celery-vs-rq
  170. Priority queue
    https://en.wikipedia.org/wi­ki/Priority_queue
  171. Jupyter
    https://jupyter.org/
  172. How IPython and Jupyter Notebook work
    https://jupyter.readthedoc­s.io/en/latest/architectu­re/how_jupyter_ipython_wor­k.html
  173. Context Managers
    http://book.pythontips.com/en/la­test/context_managers.html

Autor článku

Vystudoval VUT FIT a v současné době pracuje na projektech vytvářených v jazycích Python a Go.