Komunikace s message brokery z programovacího jazyka Go

7. 3. 2019
Doba čtení: 36 minut

Sdílet

 Autor: NATS Project
Seriál o message brokerech by nebyl úplný, pokud bychom se neseznámili s implementací klientů v jazyku Go. Proto si ukážeme balíčky stomp a rmq. Taktéž se seznámíme se systémem NATS, který je naprogramován právě v Go.

Obsah

1. Komunikace s message brokery z programovacího jazyka Go

2. Implementace producenta a konzumenta zpráv v jazyce Go s využitím balíčku stomp

3. Komunikace klientů s message brokerem ve vlastní gorutině

4. Producent i konzument jako součást jediné aplikace se třemi paralelně běžícími gorutinami

5. Jedna z možných implementací message brokera naprogramovaná v nativním Go a používající Redis jako storage

6. Instalace a nastavení Redisu i rmq

7. Jednoduchý producent posílající textové zprávy

8. Konzument zpracovávající jednoduché textové zprávy

9. Marshalling a unmarshalling datových struktur do formátu JSON pro posílání složitějších zpráv

10. Úprava producenta takovým způsobem, aby posílal zprávy ve formátu JSON

11. Konzument akceptující zprávy ve formátu JSON

12. Přenos binárních zpráv

13. Vytvoření a poslání binární zprávy

14. Příjem binární zprávy

15. Systém NATS aneb implementace systému pro doručování zpráv v jazyku Go

16. Komunikační strategie a protokol použitý klienty systému NATS

17. NATS Streaming

18. Repositář s demonstračními příklady

19. Odkazy na předchozí části seriálu

20. Odkazy na Internetu

1. Komunikace s message brokery z programovacího jazyka Go

V předchozích částech seriálu o frontách zpráv a o message brokerech byla většina demonstračních příkladů, s nimiž jsme se seznámili, naprogramována v Pythonu. Výběr tohoto programovacího jazyka samozřejmě nebyl v žádném případě náhodný, protože demonstrační příklady (ale i reálné aplikace) vytvořené v Pythonu jsou v naprosté většině případů krátké a dostatečně přehledné. To je umožněno jak vlastnostmi samotného Pythonu (úsporný zápis zdrojového kódu a dynamický typový systém), tak i knihovnami zabezpečujícími rozhraní mezi Pythonem (resp. přesněji řečeno aplikacemi naprogramovanými v Pythonu) a samotným message brokerem. Jen pro připomenutí jsou v následující tabulce vypsány knihovny určené pro Python, které jsme až doposud používali:

Message broker/protokol Knihovna
Redis Queue (RQ) rq
RabbitMQ pika
Celery celery
Apache ActiveMQ stomp.py, python-qpid-proton

S message brokery je pochopitelně možné komunikovat i z aplikací vytvořených v jiných programovacích jazycích (ostatně viděli jsme jeden příklad naprogramovaný v jazyce Clojure). V enterprise sféře se bude v první řadě jednat o Javu, ovšem zapomenout nesmíme ani na jazyk, který se stává populární v oblasti síťových aplikací a utilit. Jedná se o programovací jazyk Go, jímž se podrobněji zabýváme v samostatném seriálu. Jazyk Go může být ve skutečnosti pro komunikaci s message brokery velmi vhodnou alternativou, protože umožňuje díky podpoře kanálů a gorutin velmi snadnou tvorbu asynchronně běžících funkcí. V dnešním článku se s některými možnostmi nabízenými v této oblasti programovacím jazykem Go alespoň ve stručnosti seznámíme.

V závěrečné části článku si řekneme základní informace o systému pojmenovaném NATS, který je naprogramován právě v jazyce Go a který nabízí uživatelům robustní a přitom systémově nenáročnou implementaci message brokera, který podporuje i takzvaný streaming. Klienty je díky existujícím rozhraním samozřejmě možné psát i v jiných jazycích.

2. Implementace producenta a konzumenta zpráv v jazyce Go s využitím balíčku stomp

V samotném závěru předchozího článku jsme si ukázali, jakým způsobem je možné naprogramovat klienty (zdroje zpráv i jejich příjemce), které komunikují s message brokerem Apache ActiveMQ s využitím jednoduchého protokolu STOMP. Připomeňme si, že jednoduchého producenta zpráv můžeme implementovat následujícím způsobem:

package main
 
import (
        "fmt"
 
        "github.com/go-stomp/stomp"
)
 
const serverAddr = "localhost:61613"
const queueName = "/queue/go_test"
 
const messageCount = 10
 
func sendMessages() {
        conn, err := stomp.Dial("tcp", serverAddr, nil)
        if err != nil {
                println("cannot connect to server", err.Error())
                return
        } else {
                println("connected to server", serverAddr)
        }
        defer conn.Disconnect()
 
        for i := 1; i <= messageCount; i++ {
                text := fmt.Sprintf("Message #%d", i)
                err = conn.Send(queueName, "text/plain", []byte(text), nil)
                if err != nil {
                        println("failed to send to server", err)
                        return
                } else {
                        println("message sent")
                }
        }
        println("sending EXIT message")
        err = conn.Send(queueName, "text/plain", []byte("EXIT"), nil)
        if err != nil {
                println("failed to send EXIT message to server", err)
                return
        } else {
                println("message sent")
        }
        println("sender finished")
}
 
func main() {
        sendMessages()
}

Zdrojový kód klienta obsahuje následující části:

  1. Připojení k message brokeru přes funkci stomp.Dial()
  2. Poslání zprávy metodou conn.Send()
  3. Odpojení klienta metodou conn.Disconnect()

Konzument zpráv bude implementován podobným způsobem, ostatně se podívejme na jeho úplný zdrojový kód:

package main
 
import (
        "github.com/go-stomp/stomp"
)
 
const serverAddr = "localhost:61613"
const queueName = "/queue/go_test"
 
func receiveMessages() {
        conn, err := stomp.Dial("tcp", serverAddr, nil)
 
        if err != nil {
                println("cannot connect to server", err.Error())
                return
        } else {
                println("connected to server", serverAddr)
        }
 
        defer conn.Disconnect()
 
        sub, err := conn.Subscribe(queueName, stomp.AckAuto)
        if err != nil {
                println("cannot subscribe to", queueName, err.Error())
                return
        }
        for {
                msg := <-sub.C
                text := string(msg.Body)
                if text != "EXIT" {
                        println("Received message", text)
                } else {
                        println("Received EXIT command")
                        break
                }
        }
        println("receiver finished")
}
 
func main() {
        receiveMessages()
}

Konzument obsahuje tyto části:

  1. Připojení k message brokeru přes funkci stomp.Dial()
  2. Přihlášení k odběru zpráv metodou conn.Subscribe()
  3. Příjem zprávy/zpráv z automaticky vytvořeného kanálu
  4. Odpojení klienta metodou conn.Disconnect()

3. Komunikace klientů s message brokerem ve vlastní gorutině

Mnohdy se setkáme s požadavkem na to, aby klient (ať již producent zpráv či jejich konzument) komunikovat s message brokerem v samostatně běžící gorutině, která by pracovala paralelně s dalšími gorutinami. Tento požadavek, který dává smysl zejména u webových služeb, je relativně snadné implementovat:

package main
 
import (
        "fmt"
 
        "github.com/go-stomp/stomp"
)
 
const serverAddr = "localhost:61613"
const queueName = "/queue/go_test"
 
const messageCount = 10
 
var stop = make(chan bool)
 
func sendMessages() {
        defer func() {
                stop <- true
        }()
 
        conn, err := stomp.Dial("tcp", serverAddr, nil)
        if err != nil {
                println("cannot connect to server", err.Error())
                return
        } else {
                println("connected to server", serverAddr)
        }
        defer conn.Disconnect()
 
        for i := 1; i <= messageCount; i++ {
                text := fmt.Sprintf("Message #%d", i)
                err = conn.Send(queueName, "text/plain", []byte(text), nil)
                if err != nil {
                        println("failed to send to server", err)
                        return
                } else {
                        println("message sent")
                }
        }
        println("sending EXIT message")
        err = conn.Send(queueName, "text/plain", []byte("EXIT"), nil)
        if err != nil {
                println("failed to send EXIT message to server", err)
                return
        } else {
                println("message sent")
        }
        println("sender finished")
}
 
func main() {
        go sendMessages()
 
        <-stop
}
Poznámka: na konci funkce main musíme počkat na dokončení druhé gorutiny. Právě k tomuto účelu se používá pomocný kanál nazvaný jednoduše stop.

Prakticky stejným způsobem se upraví i příjemce zpráv:

package main
 
import (
        "github.com/go-stomp/stomp"
)
 
const serverAddr = "localhost:61613"
const queueName = "/queue/go_test"
 
var stop = make(chan bool)
 
func receiveMessages(subscribed chan bool) {
        defer func() {
                stop <- true
        }()
 
        conn, err := stomp.Dial("tcp", serverAddr, nil)
 
        if err != nil {
                println("cannot connect to server", err.Error())
                return
        } else {
                println("connected to server", serverAddr)
        }
 
        defer conn.Disconnect()
 
        sub, err := conn.Subscribe(queueName, stomp.AckAuto)
        if err != nil {
                println("cannot subscribe to", queueName, err.Error())
                return
        }
        close(subscribed)
 
        for {
                msg := <-sub.C
                text := string(msg.Body)
                if text != "EXIT" {
                        println("Received message", text)
                } else {
                        println("Received EXIT command")
                        break
                }
        }
        println("receiver finished")
}
 
func main() {
        subscribed := make(chan bool)
        go receiveMessages(subscribed)
 
        <-subscribed
 
        <-stop
}

4. Producent i konzument jako součást jediné aplikace se třemi paralelně běžícími gorutinami

Pro zajímavost se podívejme na způsob implementace producenta i konzumenta zpráv v jediné aplikaci, ve které se tedy používají tři gorutiny – hlavní gorutina, gorutina producenta zpráv a gorutina jejich konzumenta:

package main
 
import (
        "fmt"
        "github.com/go-stomp/stomp"
        "time"
)
 
const serverAddr = "localhost:61613"
const queueName = "/queue/go_test"
 
const messageCount = 10
 
var stop = make(chan bool)
 
func sendMessages() {
        defer func() {
                stop <- true
        }()
 
        conn, err := stomp.Dial("tcp", serverAddr, nil)
        if err != nil {
                println("Cannot connect to server", err.Error())
                return
        } else {
                println("Publisher part connected to server", serverAddr)
        }
        defer conn.Disconnect()
 
        time.Sleep(5 * time.Second)
 
        for i := 1; i <= messageCount; i++ {
                text := fmt.Sprintf("Message #%d", i)
                err = conn.Send(queueName, "text/plain", []byte(text), nil)
                if err != nil {
                        println("Failed to send to server", err)
                        return
                } else {
                        println("Message sent")
                }
        }
        println("Sending EXIT message")
        err = conn.Send(queueName, "text/plain", []byte("EXIT"), nil)
        if err != nil {
                println("Failed to send EXIT message to server", err)
                return
        } else {
                println("Message sent")
        }
        println("Publisher finished")
}
 
func receiveMessages(subscribed chan bool) {
        defer func() {
                stop <- true
        }()
 
        conn, err := stomp.Dial("tcp", serverAddr, nil)
 
        if err != nil {
                println("Cannot connect to server", err.Error())
                return
        } else {
                println("Subscriber part connected to server", serverAddr)
        }
 
        defer conn.Disconnect()
 
        time.Sleep(5 * time.Second)
 
        sub, err := conn.Subscribe(queueName, stomp.AckAuto)
        if err != nil {
                println("Cannot subscribe to", queueName, err.Error())
                return
        }
        close(subscribed)
 
        for {
                msg := <-sub.C
                text := string(msg.Body)
                if text != "EXIT" {
                        println("Received message", text)
                } else {
                        println("Received EXIT command")
                        break
                }
        }
        println("Subscriber finished")
}
 
func main() {
        go sendMessages()
        subscribed := make(chan bool)
        go receiveMessages(subscribed)
 
        <-subscribed
 
        <-stop
        <-stop
}

Na tomto místě se můžete oprávněně zeptat, proč vlastně pro komunikaci mezi několika gorutinami, které jsou součástí jediné aplikace, vlastně používáme message brokera se všemi komplikacemi, které toto řešení přináší. Můžeme totiž namísto toho vytvořit běžný komunikační kanál, nastavit mu vysokou kapacitu bufferu a používat přímo operátor ← pro přidání hodnoty (zprávy) do kanálu i pro vyjmutí a zpracování této hodnoty (zprávy) na straně druhé. To je samozřejmě pravda a toto řešení bude plně funkční a zajistí i dobrou výkonnost. Ovšem ve chvíli, kdy je zapotřebí zaručit, že zpráva bude skutečně doručena, i když dojde například k pádu aplikace, již může být použití message brokera užitečným řešením. A pokud se navíc přidá i požadavek na zajištění persistence zpráv, je message broker (například běžící pouze lokálně) nejenom vhodné, ale pro tuto část architektury celé aplikace vlastně i idiomatické řešení (idiomy se totiž netýkají jen programových konstrukcí, ale i celé architektury).

Poznámka: samozřejmě je možné použít pro čekání na obě gorutiny (zdroje zpráv i příjemce) dva samostatné kanály pojmenované například stopProducer a stopConsumer, čímž se automaticky zajistí, že například producent zpráv nezapíše do kanálu omylem dvě hodnoty. Na konci funkce main je potom prakticky jedno, ze kterého kanálu bude provedeno čtení dříve. Úprava programu s implementací producenta i konzumenta běžících v samostatném vláknu by tedy vypadala následovně:
package main
 
import (
        "fmt"
        "github.com/go-stomp/stomp"
        "time"
)
 
const serverAddr = "localhost:61613"
const queueName = "/queue/go_test"
 
const messageCount = 10
 
var stopProducer = make(chan bool)
var stopConsumer = make(chan bool)
 
func sendMessages(stop chan bool) {
        defer func() {
                stop <- true
        }()
 
        conn, err := stomp.Dial("tcp", serverAddr, nil)
        if err != nil {
                println("Cannot connect to server", err.Error())
                return
        } else {
                println("Publisher part connected to server", serverAddr)
        }
        defer conn.Disconnect()
 
        time.Sleep(5 * time.Second)
 
        for i := 1; i <= messageCount; i++ {
                text := fmt.Sprintf("Message #%d", i)
                err = conn.Send(queueName, "text/plain", []byte(text), nil)
                if err != nil {
                        println("Failed to send to server", err)
                        return
                } else {
                        println("Message sent")
                }
        }
        println("Sending EXIT message")
        err = conn.Send(queueName, "text/plain", []byte("EXIT"), nil)
        if err != nil {
                println("Failed to send EXIT message to server", err)
                return
        } else {
                println("Message sent")
        }
        println("Publisher finished")
}
 
func receiveMessages(subscribed chan bool, stop chan bool) {
        defer func() {
                stop <- true
        }()
 
        conn, err := stomp.Dial("tcp", serverAddr, nil)
 
        if err != nil {
                println("Cannot connect to server", err.Error())
                return
        } else {
                println("Subscriber part connected to server", serverAddr)
        }
 
        defer conn.Disconnect()
 
        time.Sleep(5 * time.Second)
 
        sub, err := conn.Subscribe(queueName, stomp.AckAuto)
        if err != nil {
                println("Cannot subscribe to", queueName, err.Error())
                return
        }
        close(subscribed)
 
        for {
                msg := <-sub.C
                text := string(msg.Body)
                if text != "EXIT" {
                        println("Received message", text)
                } else {
                        println("Received EXIT command")
                        break
                }
        }
        println("Subscriber finished")
}
 
func main() {
        go sendMessages(stopProducer)
        subscribed := make(chan bool)
        go receiveMessages(subscribed, stopConsumer)
 
        <-subscribed
 
        <-stopProducer
        <-stopConsumer
}

5. Jedna z možných implementací message brokera naprogramovaná v nativním Go a používající Redis jako storage

Hned v úvodní části seriálu o message brokerech jsme se zabývali popisem nástroje nazvaného Redis Queue, jenž je postaven – jak již ostatně jeho název velmi dobře napovídá – na databázi Redis [1] [2], která se používá pro ukládání jednotlivých zpráv do front. Podobný koncept používá i další dnes popisovaná knihovna, která se jmenuje rmq a nalezneme ji na GitHubu na adrese https://github.com/adjust/rmq.

Jedná se o balíček určený pro vývojáře využívající programovací jazyk Go (ostatně i samotné rmq je vytvořeno v čistém Go), kteří potřebují vytvářet klienty připojované k message brokerovi, přičemž cílem autorů rmq je dosáhnout toho, aby kód vytvořený v klientech byl co nejkratší a navíc i snadno pochopitelný. Samotný message broker je opět tvořen databází Redis a balíček rmq „pouze“ zabezpečuje posílání, výběr a ukládání zpráv do Redisu.

6. Instalace a nastavení Redisu i rmq

Nejprve si řekněme, jak se vlastně rmq instaluje.

V prvním kroku musíme nainstalovat a spustit samotný Redis, což je snadné, protože je balíček s Redisem součástí většiny Linuxových distribucí. Například na Fedoře může instalace vypadat následovně:

$ sudo dnf install redis
 
Last metadata expiration check: 0:15:30 ago on Wed 24 Oct 2018, 22:50:11 CEST.
Dependencies resolved.
================================================================================
 Package           Arch            Version               Repository        Size
================================================================================
Installing:
 redis             x86_64          4.0.9-1.fc27          updates          580 k
Installing dependencies:
 jemalloc          x86_64          4.5.0-5.fc27          updates          210 k
 
Transaction Summary
================================================================================
Install  2 Packages
 
Total download size: 790 k
Installed size: 2.0 M
Is this ok [y/N]:
 

Na systémech založených na Debianu (včetně Ubuntu) lze pro instalaci použít příkaz:

$ apt-get install redis-server

V případě, že budete potřebovat použít nejnovější verzi Redisu, můžete si ho sami přeložit. Postup je jednoduchý (mj. i díky minimálním závislostem na dalších knihovnách) a je podrobně popsán na stránce https://redis.io/topics/quickstart.

Pro vlastní databázi, konfigurační soubor, žurnál a logy Redisu použijeme samostatný adresář, který vytvoříme v domácím adresáři připojeného uživatele:

$ mkdir redis
$ cd redis

Po instalaci se můžeme přesvědčit, že je skutečně k dispozici spustitelný soubor s implementací serveru i řádkového klienta:

$ whereis -b redis-cli
redis-cli: /usr/bin/redis-cli
$ whereis -b redis-server
redis-server: /usr/bin/redis-server

Následně přímo v tomto adresáři vytvoříme konfigurační soubor nazvaný redis.conf. Můžeme se přitom inspirovat souborem /etc/redis/redis.conf (Debian a systémy od něj odvozené), popř. /etc/redis.conf (Fedora, RHEL, CentOS), který je však poměrně rozsáhlý, protože kromě vlastních konfiguračních voleb obsahuje i podrobné informace o významu jednotlivých konfiguračních voleb. Tento soubor je taktéž dostupný na internetu na adrese https://raw.githubusercon­tent.com/antirez/redis/4.0/re­dis.conf.

Následuje výpis obsahu konfiguračního souboru, který je připraven pro lokální spuštění Redisu, bez nebezpečí, že se k běžícímu serveru připojí případný útočník. Důležité volby jsou zvýrazněny. Pokud se vám soubor nechce kopírovat, naleznete ho na adrese https://github.com/tisnik/pre­sentations/blob/master/re­dis/redis.conf:

bind 127.0.0.1
protected-mode yes
port 6379
tcp-backlog 511
timeout 0
tcp-keepalive 300
daemonize no
supervised no
pidfile /var/run/redis_6379.pid
loglevel notice
logfile redis.log
databases 16
always-show-logo yes
save 900 1
save 300 10
save 60 10000
stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum yes
dbfilename dump.rdb
dir .
slave-serve-stale-data yes
slave-read-only yes
repl-diskless-sync no
repl-diskless-sync-delay 5
repl-disable-tcp-nodelay no
slave-priority 100
lazyfree-lazy-eviction no
lazyfree-lazy-expire no
lazyfree-lazy-server-del no
slave-lazy-flush no
appendonly yes
appendfilename "appendonly.aof"
appendfsync everysec
no-appendfsync-on-rewrite no
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
aof-load-truncated yes
aof-use-rdb-preamble no
lua-time-limit 5000
slowlog-log-slower-than 10000
slowlog-max-len 128
latency-monitor-threshold 0
notify-keyspace-events ""
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
list-max-ziplist-size -2
list-compress-depth 0
set-max-intset-entries 512
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
hll-sparse-max-bytes 3000
activerehashing yes
client-output-buffer-limit normal 0 0 0
client-output-buffer-limit slave 256mb 64mb 60
client-output-buffer-limit pubsub 32mb 8mb 60
hz 10
aof-rewrite-incremental-fsync yes

Nyní již můžeme databázi Redis spustit, aniž by došlo k tomu, že bude její API viditelné ostatním počítačům připojeným do sítě:

$ redis-server redis.conf 

Na druhém terminálu pak již můžeme (čistě pro otestování) spustit klienta Redisu, který uživatelům nabízí interaktivní příkazový řádek:

$ redis-cli
127.0.0.1:6379>

Příkazem „ping“ můžeme otestovat, jestli se klient připojí k serveru a zda od něj dokáže získávat odpovědi:

127.0.0.1:6379> ping
PONG
 
127.0.0.1:6379> ping test
"test"

Nyní by měl být systém Redis připraven pro připojení klientů, kteří s využitím balíčku rmq využijí tuto databázi ve funkci storage message brokera. Nastává tedy čas pro instalaci balíčku rmq. Nejprve přejdeme do adresáře ~/go (ten již byl připraven v rámci instalace jazyka Go, kterou jsme si popsali minule):

$ cd ~/go

Dále v tomto adresáři zadáme následující příkaz:

$ go get https://github.com/adjust/rmq

V adresáři by se měla objevit následující struktura (pod)adresářů:

.
├── pkg
│   └── linux_amd64
│       └── github.com
│           └── adjust
└── src
    └── github.com
        ├── adjust
        │   ├── rmq
        │   │   └── example
        │   │       ├── batch_consumer
        │   │       ├── cleaner
        │   │       ├── consumer
        │   │       ├── handler
        │   │       ├── producer
        │   │       ├── purger
        │   │       └── returner
        │   └── uniuri
        └── go-redis
            └── redis
                ├── internal
                │   ├── consistenthash
                │   ├── hashtag
                │   ├── pool
                │   ├── proto
                │   └── util
                └── testdata
Poznámka: povšimněte si, že se ve skutečnosti nainstalovaly dva balíčky nazvané rmq a taktéž redis.

7. Jednoduchý producent posílající textové zprávy

Při implementaci producenta jednoduchých textových zpráv využijeme především funkci pojmenovanou OpenConnection, která se pokusí o připojení k Redisu:

func OpenConnection(tag, network, address string, db int) *redisConnection

Připojení může být realizováno následovně:

connection := rmq.OpenConnection("test_service_producer", "tcp", "localhost:6379", 1)

Získáme tak datovou strukturu, která mj. implementuje rozhraní nazvané Connection:

type Connection interface {
        OpenQueue(name string) Queue
        CollectStats(queueList []string) Stats
        GetOpenQueues() []string
}

Dnes nás bude zajímat jen jediná metoda z tohoto rozhraní, a to konkrétně metoda pro otevření (či vytvoření) fronty se zadaným jménem:

func (connection *redisConnection) OpenQueue(name string) Queue

Konkrétní příklad použití:

connection := rmq.OpenConnection("test_service_producer", "tcp", "localhost:6379", 1)
taskQueue := connection.OpenQueue("task_queue")

Jakmile je fronta získána či vytvořena, můžeme použít její metodu Publish pro publikování nějaké zprávy, tj. pro její poslání do fronty:

func (queue *redisQueue) Publish(payload string) bool

Opět si ukažme praktické použití:

connection := rmq.OpenConnection("test_service_producer", "tcp", "localhost:6379", 1)
taskQueue := connection.OpenQueue("task_queue")
delivery := "task payload 1"
taskQueue.Publish(delivery)

Celý zdrojový kód klienta je nakonec velmi krátký a přehledný:

package main
 
import "github.com/adjust/rmq"
 
func main() {
        connection := rmq.OpenConnection("test_service_producer", "tcp", "localhost:6379", 1)
        println("Connection object: ", connection)
 
        taskQueue := connection.OpenQueue("task_queue")
        println("Queue: ", taskQueue)
 
        delivery := "task payload 1"
        taskQueue.Publish(delivery)
 
        delivery = "task payload 2"
        taskQueue.Publish(delivery)
}
Poznámka: na straně Redisu se vytvoří dva objekty v databázi:
Objekt Stručný popis
rmq::connections množina aktivních připojení
rmq::queues množina front

8. Konzument zpracovávající jednoduché textové zprávy

Konzument zpráv je nepatrně složitější, protože se zde používá callback metoda nazvaná Consume. Po připojení a vytvoření fronty nám již známým způsobem:

connection := rmq.OpenConnection("test_service_consumer", "tcp", "localhost:6379", 1)
taskQueue := connection.OpenQueue("task_queue")

je nutné se přihlásit k odběru zpráv a zaregistrovat objekt typu Consumer, který bude zprávy odebírat:

taskQueue.StartConsuming(10, time.Second)
taskQueue.AddConsumer("consumer", NewConsumer())

Samotný odběratel zpráv musí implementovat toto rozhraní s jedinou metodou:

type Consumer interface {
        Consume(delivery Delivery)
}

Přičemž Delivery je rozhraní implementované datovou strukturou se zprávou:

type Delivery interface {
        Payload() string
        Ack() bool
        Reject() bool
        Push() bool
}

Důležité jsou metody pro potvrzení zprávy či naopak pro její nepotvrzení:

func (delivery *wrapDelivery) Ack() bool
func (delivery *wrapDelivery) Reject() bool

Ty využijeme v našem konzumentovi/příjemci zpráv:

func (consumer *Consumer) Consume(delivery rmq.Delivery) {
        println("consume begin")
 
        println(delivery.Payload())
        delivery.Ack()
 
        println("consume end")
}

Úplná implementace klienta může vypadat následovně:

package main
 
import (
        "github.com/adjust/rmq"
        "time"
)
 
type Consumer struct {
}
 
func NewConsumer() *Consumer {
        return &Consumer{}
}
 
func (consumer *Consumer) Consume(delivery rmq.Delivery) {
        println("consume begin")
 
        println(delivery.Payload())
        delivery.Ack()
 
        println("consume end")
}
 
func main() {
        connection := rmq.OpenConnection("test_service_consumer", "tcp", "localhost:6379", 1)
        println("Connection object: ", connection)
 
        taskQueue := connection.OpenQueue("task_queue")
        println("Queue: ", taskQueue)
 
        taskQueue.StartConsuming(10, time.Second)
        taskQueue.AddConsumer("consumer", NewConsumer())
        select {}
}

9. Marshalling a unmarshalling datových struktur do formátu JSON pro posílání složitějších zpráv

Často se setkáme i s požadavkem, aby byly zprávy posílány ve formátu JSON. Problematice převodu datových struktur z jazyka Go do JSONu (marshalling) a samozřejmě i zpětného převodu (unmarshalling) jsme se nedávno věnovali v samostatném článku Vývoj síťových aplikací v programovacím jazyku Go (práce s JSONem a rastrovými obrázky), takže jen ve stručnosti:

Pro převod libovolného typu (přesněji řečeno hodnoty libovolného typu) do JSONu se používá funkce nazvaná Marshal, kterou nalezneme v balíčku encoding/json:

func Marshal(v interface{}) ([]byte, error)

Opačná operace spočívá v importu dat z formátu JSON do interních datových struktur programovacího jazyka Go. Pro tuto operaci, která se nazývá unmarshalling, slouží následující funkce:

func Unmarshal(data []byte, v interface{}) error

Vstupem je v tomto případě pole (řez) bajtů, výstup je vrácen přes ukazatel předaný ve druhém parametru (což znamená, že se musíme sami postarat o případnou alokaci paměti pro strukturu či pro mapu). Samozřejmě, že při unmarshallingu může dojít k nějaké chybě, která je vrácena volající funkci. Pokud k chybě nedošlo, je návratová hodnota rovna nil.

10. Úprava producenta takovým způsobem, aby posílal zprávy ve formátu JSON

Vyzkoušejme si nyní upravit producenta zpráv tak, aby posílal obsah datové struktury nazvané TaskPayload ve formátu JSON. Strukturu nejdříve převedeme do JSONu a následně výsledek takzvaného „marshallingu“ pošleme do message brokera:

bytes, err := json.Marshal(payload)
if err != nil {
        println(err)
        return
}
taskQueue.PublishBytes(bytes)

Následuje výpis úplného zdrojového kódu tohoto příkladu:

package main
 
import (
        "encoding/json"
        "github.com/adjust/rmq"
)
 
type TaskPayload struct {
        Id     int32
        Name   string
        Param1 int32
        Param2 int32
}
 
func SendTask(taskQueue rmq.Queue, payload TaskPayload) {
        bytes, err := json.Marshal(payload)
        if err != nil {
                println(err)
                return
        }
        taskQueue.PublishBytes(bytes)
}
 
func main() {
        connection := rmq.OpenConnection("test_service_producer", "tcp", "localhost:6379", 1)
        println("Connection object: ", connection)
 
        taskQueue := connection.OpenQueue("task_queue")
        println("Queue: ", taskQueue)
 
        SendTask(taskQueue, TaskPayload{1, "test1", 0, 0})
        SendTask(taskQueue, TaskPayload{2, "test2", 6, 7})
}

11. Konzument akceptující zprávy ve formátu JSON

Konzument zpráv posílaných ve formátu JSON je složitější. Nejdříve totiž musíme získat obsah zprávy (payload), ten převést z JSONu do datové struktury (unmarshalling) a výsledek použít:

var task TaskPayload
if err := json.Unmarshal([]byte(delivery.Payload()), &task); err != nil {
        delivery.Reject()
        return
}
 
println("performing task", task.Id, "name", task.Name, "with parameters", task.Param1, task.Param2)
delivery.Ack()

Výsledný zdrojový kód může vypadat následovně:

package main
 
import (
        "encoding/json"
        "github.com/adjust/rmq"
        "time"
)
 
type TaskPayload struct {
        Id     int32
        Name   string
        Param1 int32
        Param2 int32
}
 
type Consumer struct {
}
 
func NewConsumer() *Consumer {
        return &Consumer{}
}
 
func (consumer *Consumer) Consume(delivery rmq.Delivery) {
        println("consume begin")
 
        println(delivery.Payload())
 
        var task TaskPayload
        if err := json.Unmarshal([]byte(delivery.Payload()), &task); err != nil {
                delivery.Reject()
                return
        }
 
        println("performing task", task.Id, "name", task.Name, "with parameters", task.Param1, task.Param2)
        delivery.Ack()
 
        println("consume end")
}
 
func main() {
        connection := rmq.OpenConnection("test_service_consumer", "tcp", "localhost:6379", 1)
        println("Connection object: ", connection)
 
        taskQueue := connection.OpenQueue("task_queue")
        println("Queue: ", taskQueue)
 
        taskQueue.StartConsuming(10, time.Second)
        taskQueue.AddConsumer("consumer", NewConsumer())
        select {}
}

12. Přenos binárních zpráv

Mnohdy se setkáme s požadavkem, aby klienti do message brokera posílali binární zprávy, samozřejmě s tím, že po vyzvednutí bude mít zpráva naprosto stejný obsah, jako originální data (tj. nesmí dojít například k porušení obsahu nejvyšších bitů každého bajtu atd.).

Při psaní klientů naprogramovaných v Pythonu, kteří spolu komunikovali přes message broker Apache ActiveMQ, jsme se posílání binárních zpráv vyhnuli díky zakódování binárních dat do sekvence tisknutelných znaků, například s využitím Base64, který má tu výhodu, že algoritmus kódování/dekódování lze vytvořit velmi snadno (pokud již pro daný programovací jazyk neexistuje příslušná knihovna – většinou je již k dispozici). Nevýhodou tohoto řešení jsou delší zprávy, které musí být v message brokerovi uloženy (ve frontách) a – což je většinou kritičtější – pomalejší posílání či příjem zpráv – pokud se ovšem zpracovávají relativně krátké zprávy s menší frekvencí (desítky za sekundu pro jednoho producenta), nemělo by být zpoždění vůbec patrné.

Pro otestování posílání a příjmu zpráv s binárními daty vytvoříme producenta, který do vybrané fronty pošle data reprezentující rastrový obrázek ve formátu GIF. Samotný obrázek získáme snadno (jedná se o ikonu Vimu, v němž ostatně celý tento článek vznikl):

wget https://www.vim.org/images/vim_editor.gif

Jedná se o následující obrázek – ikonu:

Obrázek 1: Ikona použitá jako příklad binárních dat.

13. Vytvoření a poslání binární zprávy

Pro posílání textové zprávy jsme v předchozích dvou příkladech využívali metodu Publish:

func (queue *redisQueue) Publish(payload string) bool

U binárních dat to není vhodné, ovšem existuje i alternativní metoda nazvaná příznačně PublishBytes:

func (queue *redisQueue) PublishBytes(payload []byte) bool

Ve zdroji zpráv ještě uděláme jednu změnu – vytvoříme funkci, která načte obsah binárního souboru a ten následně pošle do zvolené fronty:

bytes, err := ioutil.ReadFile(filename)
if err == nil {
        println("Read", len(bytes), "bytes")
        SendBinaryMessage(binaryMessagesQueue, bytes)
        println("Sent")
} else {
        println("Error opening file", err)
}

Úplný zdrojový kód producenta binární zprávy může vypadat následovně:

package main
 
import (
        "github.com/adjust/rmq"
        "io/ioutil"
)
 
func SendBinaryMessage(binaryMessagesQueue rmq.Queue, binaryMessage []byte) {
        binaryMessagesQueue.PublishBytes(binaryMessage)
}
 
func SendFileContent(binaryMessagesQueue rmq.Queue, filename string) {
        bytes, err := ioutil.ReadFile(filename)
        if err == nil {
                println("Read", len(bytes), "bytes")
                SendBinaryMessage(binaryMessagesQueue, bytes)
                println("Sent")
        } else {
                println("Error opening file", err)
        }
}
 
func main() {
        connection := rmq.OpenConnection("binary_message_app", "tcp", "localhost:6379", 1)
        println("Connection object: ", connection)
 
        binaryMessagesQueue := connection.OpenQueue("binary_messages_queue")
        println("Queue: ", binaryMessagesQueue)
 
        SendFileContent(binaryMessagesQueue, "vim_editor.gif")
}

14. Příjem binární zprávy

Příjemce binární zprávy bude muset být také upraven. Zprávu přijmeme běžným způsobem, ovšem samotný její obsah (payload) bude uložen do souboru s využitím funkce WriteFile z balíčku ioutil. Povšimněte si, že této funkci je nutné předat i příznaky s přístupovými právy k souboru:

const FileFlags = 0664
 
payload := delivery.Payload()
 
println("Received binary message", len(payload), "bytes")
 
err := ioutil.WriteFile("received.gif", []byte(payload), FileFlags)
if err == nil {
        println("Written")
} else {
        println(err)
}
delivery.Ack()

Opět si samozřejmě ukážeme úplný zdrojový kód tohoto příkladu:

package main
 
import (
        "github.com/adjust/rmq"
        "io/ioutil"
        "time"
)
 
type Consumer struct{}
 
func NewConsumer() *Consumer {
        return &Consumer{}
}
 
func (consumer *Consumer) Consume(delivery rmq.Delivery) {
        const FileFlags = 0664
 
        println("consume begin")
        payload := delivery.Payload()
 
        println("Received binary message", len(payload), "bytes")
 
        err := ioutil.WriteFile("received.gif", []byte(payload), FileFlags)
        if err == nil {
                println("Written")
        } else {
                println(err)
        }
        delivery.Ack()
 
        println("consume end")
}
 
func main() {
        connection := rmq.OpenConnection("binary_messages_queue", "tcp", "localhost:6379", 1)
        println("Connection object: ", connection)
 
        binaryMessagesQueue := connection.OpenQueue("binary_messages_queue")
        println("Queue: ", binaryMessagesQueue)
 
        binaryMessagesQueue.StartConsuming(10, time.Second)
        binaryMessagesQueue.AddConsumer("consumer", NewConsumer())
        select {}
}

15. Systém NATS aneb implementace systému pro doručování zpráv v jazyku Go

V závěrečné části dnešního článku se zmíníme o systému pro doručování, ukládání a (re)distribuci zpráv, který se jmenuje NATS. Jedná se o poměrně úspěšný projekt, jenž je vyvinut právě v programovacím jazyku Go, což mu do jisté míry zajišťuje stabilitu i škálovatelnost. To jsou vlastnosti, které u message brokerů většinou očekáváme.

Původní varianta NATSu byla vytvořena Derekem Collisonem; zajímavé je, že tato varianta nebyla naprogramována v jazyce Go, ale v programovacím jazyce Ruby. Dnes se ovšem budeme zabývat moderní (a jedinou podporovanou) verzí systému NATS, která byla přeportována do jazyka Go. Celý systém NATS se skládá z několika komponent:

  1. V první řadě se jedná o samotný server, jenž se spouští příkazem gnatsd. Server je naprogramovaný v Go a při jeho vývoji bylo dbáno na to, aby byla zaručena vysoká dostupnost celé služby a přitom byla samotná služba s běžícím serverem málo náročná na systémové zdroje, především na spotřebu operační paměti (to má v době Dockeru a podobných nástrojů poměrně velký význam).
  2. Dalším typem komponenty jsou programátorská rozhraní pro klienty, která v současnosti existují pro několik ekosystémů (což je většinou kombinace programovacího jazyka, knihoven a popř. jeho virtuálního stroje); viz též tabulky s podporovanými ekosystémy, které jsou zobrazeny pod tímto odstavcem.
  3. Třetí komponentou je NATS Streaming Server, který je opět naprogramován v Go a který si popíšeme v sedmnácté kapitole.
  4. Čtvrtým typem komponenty je takzvaný NATS Connector Framework zajišťující propojení systému NATS s dalšími technologiemi (XMPP, logování, notifikační služby aj.). Ten je naprogramovaný v Javě a v současnosti je podporován například konektor pro Redis (https://github.com/nats-io/nats-connector-redis).

Oficiálně jsou podporována rozhraní pro následující ekosystémy:

# Programovací jazyk/ekosystém
1 C
2 C#
3 Elixir
4 Go
5 Java
6 NGINX
7 Node.js
8 Python Asyncio
9 Python Tornado
10 Ruby
11 TypeScript

Existují však i další rozhraní, která sice nejsou podporována oficiálně, ale většinou jsou dostatečně stabilní na to, aby byla reálně použitelná:

# Programovací jazyk/ekosystém
1 .NET
2 Arduino
3 Clojure
4 Elm
5 Erlang
6 Haskell
7 Lua
8 MicroPython
9 PHP
10 Perl
11 Python
12 Python Twisted
13 Qt5 C++
14 Rust
15 Scala
16 Spring API
17 Swift

16. Komunikační strategie a protokol použitý klienty systému NATS

NATS svým uživatelům nabízí klasickou komunikační strategii typu pub-sub (zprávy jsou doručovány na základě nastaveného tématu – topicu či v jiné terminologii subjectu), s níž jsme se již seznámili v předchozích článcích. Ovšem kromě toho je možné využít i strategie request-reply (ta je implementačně nejsložitější, protože vyžaduje určitou koordinaci obou klientů se zapamatováním kontextu) a taktéž strategii typu push-pull (zde se právě využívají fronty zpráv). Strategie request-reply se dále rozděluje na dvě podkategorie podle toho, zda se zpráva doručuje jen jedinému příjemci (point-to-point) nebo více příjemcům (one-to-many). Ve chvíli, kdy klient pošle žádost (request), vytvoří se pro ni takzvaný inbox, který mohou příjemci použít po odeslání odpovědi. S příklady použití této zajímavé strategie se seznámíme v navazujícím článku.

Samotný komunikační protokol používaný klienty při přístupu k systému NATS je textový a je navržen takovým způsobem, aby bylo možné samotnou komunikaci naprogramovat relativně snadno, popř. si komunikaci otestovat přes telnet či podobný nástroj (což může být velmi vhodné při hledání chyb v celém systému; ostatně ti administrátoři, kteří museli ladit nějaký systém využívající binární protokol jistě budou souhlasit). V samotném protokolu se rozeznává jen několik typů příkazů, které jsou vypsány v další tabulce:

Příkaz Posílá Stručný popis
INFO server posláno klientovi po jeho připojení k serveru
CONNECT klient žádost o připojení se specifikací jeho parametrů
PUB klient publikace zprávy klientem (formát si popíšeme příště)
SUB klient přihlášení klienta k odebírání určitého tématu
UNSUB klient odhlášení klienta od odebírání určitého tématu
MSG server poslání zprávy serverem odebírateli
PING klient i server klasický systém ping-pong pro ověření, zda druhá strana (ještě) komunikuje
PONG klient i server klasický systém ping-pong pro ověření, zda druhá strana (ještě) komunikuje
+OK server potvrzení zprávy či příkazu (+ je součástí příkazu)
-ERR server informace o tom, že poslaná zpráva nebo příkaz nemá správný formát (- je opět součástí příkazu)
Poznámka: i když je samotný protokol textový, jsou těla zpráv považována za binární data, čehož se často využívá například v oblasti IoT, kdy čidla mohou přímo posílat naměřená data, bez nutnosti jejich převodu do JSONu či podobného formátu.
Poznámka: zajímavé je porovnání s již popsaným protokolem STOMP, který používá prakticky stejný přístup, i když se pochopitelně jednotlivé příkazy od sebe odlišují (v NATS chybí přímé příkazy pro transakce atd.).

17. NATS Streaming

Jedná se o rozšíření klasického message brokera o další užitečné vlastnosti, zejména o konfigurovatelnou persistenci zpráv, dále o systém zaručující doručení zprávy (jak na message brokera, tak i ke klientovi – odběrateli zpráv) a především pak systémem určujícím maximální množství zpráv čekajících na doručení (publisher rate limit, rate limiting per subscriber). Nesmíme zapomenout ani na další poměrně unikátní vlastnost: možnost opětovného „přehrátí“ zpráv podle zadaných kritérií (ke zprávám je přiřazen určitý subject a taktéž časové razítko, takže pořadí zpráv je možné zaručit – ostatně právě proto se také používá termín „streaming“). Mimochodem: časové razítko má přesnost v řádu nanosekund.

Oficiálně podporovaní klienti komponenty NATS Streaming:

# Jazyk/ekosystém
1 C
2 C#
3 Go
4 Java
5 Node.js
6 Python Asyncio
7 Ruby

Podrobněji si tento systém vysvětlíme příště.

bitcoin_skoleni

18. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v programovacím jazyku Go byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.

Příklad Skript/kód Popis Cesta
1 publisher.go producent zpráv naprogramovaný v Go https://github.com/tisnik/message-queues-examples/blob/master/go-stomp/push-pull/publisher.go
1 subscriber.go konzument zpráv naprogramovaný v Go https://github.com/tisnik/message-queues-examples/blob/master/go-stomp/push-pull/subscriber.go
       
2 publisher.go producent zpráv naprogramovaný v Go využívající gorutiny https://github.com/tisnik/message-queues-examples/blob/master/go-stomp/push-pull-goroutines/publisher.go
2 subscriber.go konzument zpráv naprogramovaný v Go využívající gorutiny https://github.com/tisnik/message-queues-examples/blob/master/go-stomp/push-pull-goroutines/subscriber.go
       
3 publisher_subscriber.go producent i konzument zpráv běžící paralelně v jediném procesu https://github.com/tisnik/message-queues-examples/blob/master/go-stomp/push-pull-one-app/publisher_subscriber.go
       
4 publisher_subscriber.go další verze producenta a konzumenta zpráv běžících paralelně v jediném procesu https://github.com/tisnik/message-queues-examples/blob/master/go-stomp/push-pull-one-app-2/publisher_subscriber.go
       
5 producer.go producent textových zpráv založených na systému rmq https://github.com/tisnik/message-queues-examples/blob/master/go-rmq/01-producer-consumer/producer.go
5 consumer.go konzument textových zpráv založených na systému rmq https://github.com/tisnik/message-queues-examples/blob/master/go-rmq/01-producer-consumer/consumer.go
       
6 producer.go producent JSON zpráv založených na systému rmq https://github.com/tisnik/message-queues-examples/blob/master/go-rmq/02-producer-consumer-json/producer.go
6 consumer.go konzument JSON zpráv založených na systému rmq https://github.com/tisnik/message-queues-examples/blob/master/go-rmq/02-producer-consumer-json/consumer.go
       
7 producer.go producent binárních zpráv založených na systému rmq https://github.com/tisnik/message-queues-examples/blob/master/go-rmq/03-producer-consumer-binary/producer.go
7 consumer.go konzument binárních zpráv založených na systému rmq https://github.com/tisnik/message-queues-examples/blob/master/go-rmq/03-producer-consumer-binary/consumer.go
7 get_image skript pro stažení testovacího obrázku https://github.com/tisnik/message-queues-examples/blob/master/go-rmq/03-producer-consumer-binary/get_image

19. Odkazy na předchozí části seriálu

V této kapitole jsou uvedeny odkazy na všech jedenáct předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií:

  1. Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
    https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/
  2. Celery: systém implementující asynchronní fronty úloh pro Python
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/
  3. Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/
  4. RabbitMQ: jedna z nejúspěšnějších implementací brokera
    https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/
  5. Pokročilejší operace nabízené systémem RabbitMQ
    https://www.root.cz/clanky/po­krocilejsi-operace-nabizene-systemem-rabbitmq/
  6. ØMQ: knihovna pro asynchronní předávání zpráv
    https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/
  7. Další možnosti poskytované knihovnou ØMQ
    https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/
  8. Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě
    https://www.root.cz/clanky/dalsi-moznosti-nabizene-knihovnou-mq-implementace-protokolu-mq-v-ciste-jave/
  9. Apache ActiveMQ – další systém implementující message brokera
    https://www.root.cz/clanky/apache-activemq-dalsi-system-implementujici-message-brokera/
  10. Použití Apache ActiveMQ s protokolem STOMP
    https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-stomp/
  11. Použití Apache ActiveMQ s protokolem AMQP, jazyk Go a message brokeři
    https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-amqp-jazyk-go-a-message-brokeri/

20. Odkazy na Internetu

  1. NATS
    https://nats.io/about/
  2. NATS Streaming Concepts
    https://nats.io/documenta­tion/streaming/nats-streaming-intro/
  3. NATS Streaming Server
    https://nats.io/download/nats-io/nats-streaming-server/
  4. NATS Introduction
    https://nats.io/documentation/
  5. NATS Client Protocol
    https://nats.io/documenta­tion/internals/nats-protocol/
  6. NATS Messaging (Wikipedia)
    https://en.wikipedia.org/wi­ki/NATS_Messaging
  7. Stránka Apache Software Foundation
    http://www.apache.org/
  8. Informace o portu 5672
    http://www.tcp-udp-ports.com/port-5672.htm
  9. Třída MessagingHandler knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._handlers.MessagingHan­dler-class.html
  10. Třída Event knihovny Qpid Proton
    https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/pro­ton._events.Event-class.html
  11. package stomp (Go)
    https://godoc.org/github.com/go-stomp/stomp
  12. Go language library for STOMP protocol
    https://github.com/go-stomp/stomp
  13. python-qpid-proton 0.26.0 na PyPi
    https://pypi.org/project/python-qpid-proton/
  14. Qpid Proton
    http://qpid.apache.org/proton/
  15. Using the AMQ Python Client
    https://access.redhat.com/do­cumentation/en-us/red_hat_amq/7.1/html-single/using_the_amq_python_client/
  16. Apache ActiveMQ
    http://activemq.apache.org/
  17. Apache ActiveMQ Artemis
    https://activemq.apache.org/artemis/
  18. Apache ActiveMQ Artemis User Manual
    https://activemq.apache.or­g/artemis/docs/latest/index­.html
  19. KahaDB
    http://activemq.apache.or­g/kahadb.html
  20. Understanding the KahaDB Message Store
    https://access.redhat.com/do­cumentation/en-US/Fuse_MQ_Enterprise/7.1/html/Con­figuring_Broker_Persisten­ce/files/KahaDBOverview.html
  21. Command Line Tools (Apache ActiveMQ)
    https://activemq.apache.org/activemq-command-line-tools-reference.html
  22. stomp.py 4.1.21 na PyPi
    https://pypi.org/project/stomp.py/
  23. Stomp Tutorial
    https://access.redhat.com/do­cumentation/en-US/Fuse_Message_Broker/5.5/html/Con­nectivity_Guide/files/FMBCon­nectivityStompTelnet.html
  24. Heartbeat (computing)
    https://en.wikipedia.org/wi­ki/Heartbeat_(computing)
  25. Apache Camel
    https://camel.apache.org/
  26. Red Hat Fuse
    https://developers.redhat­.com/products/fuse/overvi­ew/
  27. Confusion between ActiveMQ and ActiveMQ-Artemis?
    https://serverfault.com/qu­estions/873533/confusion-between-activemq-and-activemq-artemis
  28. Staré stránky projektu HornetQ
    http://hornetq.jboss.org/
  29. Snapshot JeroMQ verze 0.4.4
    https://oss.sonatype.org/con­tent/repositories/snapshot­s/org/zeromq/jeromq/0.4.4-SNAPSHOT/
  30. Difference between ActiveMQ vs Apache ActiveMQ Artemis
    http://activemq.2283324.n4­.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html
  31. Microservices communications. Why you should switch to message queues
    https://dev.to/matteojoli­veau/microservices-communications-why-you-should-switch-to-message-queues–48ia
  32. Stomp.py 4.1.19 documentation
    https://stomppy.readthedoc­s.io/en/stable/
  33. Repositář knihovny JeroMQ
    https://github.com/zeromq/jeromq/
  34. ØMQ – Distributed Messaging
    http://zeromq.org/
  35. ØMQ Community
    http://zeromq.org/community
  36. Get The Software
    http://zeromq.org/intro:get-the-software
  37. PyZMQ Documentation
    https://pyzmq.readthedocs­.io/en/latest/
  38. Module: zmq.decorators
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.deco­rators.html
  39. ZeroMQ is the answer, by Ian Barber
    https://vimeo.com/20605470
  40. ZeroMQ RFC
    https://rfc.zeromq.org/
  41. ZeroMQ and Clojure, a brief introduction
    https://antoniogarrote.wor­dpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/
  42. zeromq/czmq
    https://github.com/zeromq/czmq
  43. golang wrapper for CZMQ
    https://github.com/zeromq/goczmq
  44. ZeroMQ version reporting in Python
    http://zguide.zeromq.org/py:version
  45. A Go interface to ZeroMQ version 4
    https://github.com/pebbe/zmq4
  46. Broker vs. Brokerless
    http://zeromq.org/whitepa­pers:brokerless
  47. Learning ØMQ with pyzmq
    https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/
  48. Céčková funkce zmq_ctx_new
    http://api.zeromq.org/4–2:zmq-ctx-new
  49. Céčková funkce zmq_ctx_destroy
    http://api.zeromq.org/4–2:zmq-ctx-destroy
  50. Céčková funkce zmq_bind
    http://api.zeromq.org/4–2:zmq-bind
  51. Céčková funkce zmq_unbind
    http://api.zeromq.org/4–2:zmq-unbind
  52. Céčková C funkce zmq_connect
    http://api.zeromq.org/4–2:zmq-connect
  53. Céčková C funkce zmq_disconnect
    http://api.zeromq.org/4–2:zmq-disconnect
  54. Céčková C funkce zmq_send
    http://api.zeromq.org/4–2:zmq-send
  55. Céčková C funkce zmq_recv
    http://api.zeromq.org/4–2:zmq-recv
  56. Třída Context (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#con­text
  57. Třída Socket (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#soc­ket
  58. Python binding
    http://zeromq.org/bindings:python
  59. Why should I have written ZeroMQ in C, not C++ (part I)
    http://250bpm.com/blog:4
  60. Why should I have written ZeroMQ in C, not C++ (part II)
    http://250bpm.com/blog:8
  61. About Nanomsg
    https://nanomsg.org/
  62. Advanced Message Queuing Protocol
    https://www.amqp.org/
  63. Advanced Message Queuing Protocol na Wikipedii
    https://en.wikipedia.org/wi­ki/Advanced_Message_Queuin­g_Protocol
  64. Dokumentace k příkazu rabbitmqctl
    https://www.rabbitmq.com/rab­bitmqctl.8.html
  65. RabbitMQ
    https://www.rabbitmq.com/
  66. RabbitMQ Tutorials
    https://www.rabbitmq.com/get­started.html
  67. RabbitMQ: Clients and Developer Tools
    https://www.rabbitmq.com/dev­tools.html
  68. RabbitMQ na Wikipedii
    https://en.wikipedia.org/wi­ki/RabbitMQ
  69. Streaming Text Oriented Messaging Protocol
    https://en.wikipedia.org/wi­ki/Streaming_Text_Oriented_Mes­saging_Protocol
  70. Message Queuing Telemetry Transport
    https://en.wikipedia.org/wiki/MQTT
  71. Erlang
    http://www.erlang.org/
  72. pika 0.12.0 na PyPi
    https://pypi.org/project/pika/
  73. Introduction to Pika
    https://pika.readthedocs.i­o/en/stable/
  74. Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
    http://clojurerabbitmq.info/
  75. AMQP 0–9–1 Model Explained
    http://www.rabbitmq.com/tutorials/amqp-concepts.html
  76. Part 1: RabbitMQ for beginners – What is RabbitMQ?
    https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html
  77. Downloading and Installing RabbitMQ
    https://www.rabbitmq.com/dow­nload.html
  78. celery na PyPi
    https://pypi.org/project/celery/
  79. Databáze Redis (nejenom) pro vývojáře používající Python
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/
  80. Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/
  81. Redis Queue (RQ)
    https://www.fullstackpython.com/redis-queue-rq.html
  82. Python Celery & RabbitMQ Tutorial
    https://tests4geeks.com/python-celery-rabbitmq-tutorial/
  83. Flower: Real-time Celery web-monitor
    http://docs.celeryproject­.org/en/latest/userguide/mo­nitoring.html#flower-real-time-celery-web-monitor
  84. Asynchronous Tasks With Django and Celery
    https://realpython.com/asynchronous-tasks-with-django-and-celery/
  85. First Steps with Celery
    http://docs.celeryproject­.org/en/latest/getting-started/first-steps-with-celery.html
  86. node-celery
    https://github.com/mher/node-celery
  87. Full Stack Python: web development
    https://www.fullstackpython.com/web-development.html
  88. Introducing RQ
    https://nvie.com/posts/introducing-rq/
  89. Asynchronous Tasks with Flask and Redis Queue
    https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue
  90. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  91. Stránky projektu Redis
    https://redis.io/
  92. Introduction to Redis
    https://redis.io/topics/introduction
  93. Try Redis
    http://try.redis.io/
  94. Redis tutorial, April 2010 (starší, ale pěkně udělaný)
    https://static.simonwilli­son.net/static/2010/redis-tutorial/
  95. Python Redis
    https://redislabs.com/lp/python-redis/
  96. Redis: key-value databáze v paměti i na disku
    https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/
  97. Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
    http://www.cloudsvet.cz/?p=253
  98. Praktický úvod do Redis (2): transakce
    http://www.cloudsvet.cz/?p=256
  99. Praktický úvod do Redis (3): cluster
    http://www.cloudsvet.cz/?p=258
  100. Connection pool
    https://en.wikipedia.org/wi­ki/Connection_pool
  101. Instant Redis Sentinel Setup
    https://github.com/ServiceStack/redis-config
  102. How to install REDIS in LInux
    https://linuxtechlab.com/how-install-redis-server-linux/
  103. Redis RDB Dump File Format
    https://github.com/sripat­hikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format
  104. Lempel–Ziv–Welch
    https://en.wikipedia.org/wi­ki/Lempel%E2%80%93Ziv%E2%80%93­Welch
  105. Redis Persistence
    https://redis.io/topics/persistence
  106. Redis persistence demystified
    http://oldblog.antirez.com/post/redis-persistence-demystified.html
  107. Redis reliable queues with Lua scripting
    http://oldblog.antirez.com/post/250
  108. Ost (knihovna)
    https://github.com/soveran/ost
  109. NoSQL
    https://en.wikipedia.org/wiki/NoSQL
  110. Shard (database architecture)
    https://en.wikipedia.org/wi­ki/Shard_%28database_archi­tecture%29
  111. What is sharding and why is it important?
    https://stackoverflow.com/qu­estions/992988/what-is-sharding-and-why-is-it-important
  112. What Is Sharding?
    https://btcmanager.com/what-sharding/
  113. Redis clients
    https://redis.io/clients
  114. Category:Lua-scriptable software
    https://en.wikipedia.org/wi­ki/Category:Lua-scriptable_software
  115. Seriál Programovací jazyk Lua
    https://www.root.cz/seria­ly/programovaci-jazyk-lua/
  116. Redis memory usage
    http://nosql.mypopescu.com/pos­t/1010844204/redis-memory-usage
  117. Ukázka konfigurace Redisu pro lokální testování
    https://github.com/tisnik/pre­sentations/blob/master/re­dis/redis.conf
  118. Resque
    https://github.com/resque/resque
  119. Nested transaction
    https://en.wikipedia.org/wi­ki/Nested_transaction
  120. Publish–subscribe pattern
    https://en.wikipedia.org/wi­ki/Publish%E2%80%93subscri­be_pattern
  121. Messaging pattern
    https://en.wikipedia.org/wi­ki/Messaging_pattern
  122. Using pipelining to speedup Redis queries
    https://redis.io/topics/pipelining
  123. Pub/Sub
    https://redis.io/topics/pubsub
  124. ZeroMQ distributed messaging
    http://zeromq.org/
  125. ZeroMQ: Modern & Fast Networking Stack
    https://www.igvita.com/2010/09/03/ze­romq-modern-fast-networking-stack/
  126. Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
    https://stackoverflow.com/qu­estions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr
  127. Python & Redis PUB/SUB
    https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7
  128. Message broker
    https://en.wikipedia.org/wi­ki/Message_broker
  129. RESP Arrays
    https://redis.io/topics/protocol#array-reply
  130. Redis Protocol specification
    https://redis.io/topics/protocol
  131. Redis Pub/Sub: Intro Guide
    https://www.redisgreen.net/blog/pubsub-intro/
  132. Redis Pub/Sub: Howto Guide
    https://www.redisgreen.net/blog/pubsub-howto/
  133. Comparing Publish-Subscribe Messaging and Message Queuing
    https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message
  134. Apache Kafka
    https://kafka.apache.org/
  135. Iron
    http://www.iron.io/mq
  136. kue (založeno na Redisu, určeno pro node.js)
    https://github.com/Automattic/kue
  137. Cloud Pub/Sub
    https://cloud.google.com/pubsub/
  138. Introduction to Redis Streams
    https://redis.io/topics/streams-intro
  139. glob (programming)
    https://en.wikipedia.org/wi­ki/Glob_(programming)
  140. Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
    https://www.slideshare.net/syl­vinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2
  141. Enqueueing internals
    http://python-rq.org/contrib/
  142. queue — A synchronized queue class
    https://docs.python.org/3/li­brary/queue.html
  143. Queue – A thread-safe FIFO implementation
    https://pymotw.com/2/Queue/
  144. Queues
    http://queues.io/
  145. Windows Subsystem for Linux Documentation
    https://docs.microsoft.com/en-us/windows/wsl/about
  146. RestMQ
    http://restmq.com/
  147. ActiveMQ
    http://activemq.apache.org/
  148. Amazon MQ
    https://aws.amazon.com/amazon-mq/
  149. Amazon Simple Queue Service
    https://aws.amazon.com/sqs/
  150. Celery: Distributed Task Queue
    http://www.celeryproject.org/
  151. Disque, an in-memory, distributed job queue
    https://github.com/antirez/disque
  152. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  153. Projekt RQ na PyPi
    https://pypi.org/project/rq/
  154. rq-dashboard 0.3.12
    https://pypi.org/project/rq-dashboard/
  155. Job queue
    https://en.wikipedia.org/wi­ki/Job_queue
  156. Why we moved from Celery to RQ
    https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq
  157. Running multiple workers using Celery
    https://serverfault.com/qu­estions/655387/running-multiple-workers-using-celery
  158. celery — Distributed processing
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.html
  159. Chains
    https://celery.readthedoc­s.io/en/latest/userguide/can­vas.html#chains
  160. Routing
    http://docs.celeryproject­.org/en/latest/userguide/rou­ting.html#automatic-routing
  161. Celery Distributed Task Queue in Go
    https://github.com/gocelery/gocelery/
  162. Python Decorators
    https://wiki.python.org/mo­in/PythonDecorators
  163. Periodic Tasks
    http://docs.celeryproject­.org/en/latest/userguide/pe­riodic-tasks.html
  164. celery.schedules
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.schedules.html#celery­.schedules.crontab
  165. Pros and cons to use Celery vs. RQ
    https://stackoverflow.com/qu­estions/13440875/pros-and-cons-to-use-celery-vs-rq
  166. Priority queue
    https://en.wikipedia.org/wi­ki/Priority_queue
  167. Jupyter
    https://jupyter.org/
  168. How IPython and Jupyter Notebook work
    https://jupyter.readthedoc­s.io/en/latest/architectu­re/how_jupyter_ipython_wor­k.html
  169. Context Managers
    http://book.pythontips.com/en/la­test/context_managers.html

Autor článku

Vystudoval VUT FIT a v současné době pracuje na projektech vytvářených v jazycích Python a Go.