Obsah
1. Komunikace s message brokery z programovacího jazyka Go
2. Implementace producenta a konzumenta zpráv v jazyce Go s využitím balíčku stomp
3. Komunikace klientů s message brokerem ve vlastní gorutině
4. Producent i konzument jako součást jediné aplikace se třemi paralelně běžícími gorutinami
6. Instalace a nastavení Redisu i rmq
7. Jednoduchý producent posílající textové zprávy
8. Konzument zpracovávající jednoduché textové zprávy
9. Marshalling a unmarshalling datových struktur do formátu JSON pro posílání složitějších zpráv
10. Úprava producenta takovým způsobem, aby posílal zprávy ve formátu JSON
11. Konzument akceptující zprávy ve formátu JSON
13. Vytvoření a poslání binární zprávy
15. Systém NATS aneb implementace systému pro doručování zpráv v jazyku Go
16. Komunikační strategie a protokol použitý klienty systému NATS
18. Repositář s demonstračními příklady
19. Odkazy na předchozí části seriálu
1. Komunikace s message brokery z programovacího jazyka Go
V předchozích částech seriálu o frontách zpráv a o message brokerech byla většina demonstračních příkladů, s nimiž jsme se seznámili, naprogramována v Pythonu. Výběr tohoto programovacího jazyka samozřejmě nebyl v žádném případě náhodný, protože demonstrační příklady (ale i reálné aplikace) vytvořené v Pythonu jsou v naprosté většině případů krátké a dostatečně přehledné. To je umožněno jak vlastnostmi samotného Pythonu (úsporný zápis zdrojového kódu a dynamický typový systém), tak i knihovnami zabezpečujícími rozhraní mezi Pythonem (resp. přesněji řečeno aplikacemi naprogramovanými v Pythonu) a samotným message brokerem. Jen pro připomenutí jsou v následující tabulce vypsány knihovny určené pro Python, které jsme až doposud používali:
Message broker/protokol | Knihovna |
---|---|
Redis Queue (RQ) | rq |
RabbitMQ | pika |
Celery | celery |
Apache ActiveMQ | stomp.py, python-qpid-proton |
S message brokery je pochopitelně možné komunikovat i z aplikací vytvořených v jiných programovacích jazycích (ostatně viděli jsme jeden příklad naprogramovaný v jazyce Clojure). V enterprise sféře se bude v první řadě jednat o Javu, ovšem zapomenout nesmíme ani na jazyk, který se stává populární v oblasti síťových aplikací a utilit. Jedná se o programovací jazyk Go, jímž se podrobněji zabýváme v samostatném seriálu. Jazyk Go může být ve skutečnosti pro komunikaci s message brokery velmi vhodnou alternativou, protože umožňuje díky podpoře kanálů a gorutin velmi snadnou tvorbu asynchronně běžících funkcí. V dnešním článku se s některými možnostmi nabízenými v této oblasti programovacím jazykem Go alespoň ve stručnosti seznámíme.
V závěrečné části článku si řekneme základní informace o systému pojmenovaném NATS, který je naprogramován právě v jazyce Go a který nabízí uživatelům robustní a přitom systémově nenáročnou implementaci message brokera, který podporuje i takzvaný streaming. Klienty je díky existujícím rozhraním samozřejmě možné psát i v jiných jazycích.
2. Implementace producenta a konzumenta zpráv v jazyce Go s využitím balíčku stomp
V samotném závěru předchozího článku jsme si ukázali, jakým způsobem je možné naprogramovat klienty (zdroje zpráv i jejich příjemce), které komunikují s message brokerem Apache ActiveMQ s využitím jednoduchého protokolu STOMP. Připomeňme si, že jednoduchého producenta zpráv můžeme implementovat následujícím způsobem:
package main import ( "fmt" "github.com/go-stomp/stomp" ) const serverAddr = "localhost:61613" const queueName = "/queue/go_test" const messageCount = 10 func sendMessages() { conn, err := stomp.Dial("tcp", serverAddr, nil) if err != nil { println("cannot connect to server", err.Error()) return } else { println("connected to server", serverAddr) } defer conn.Disconnect() for i := 1; i <= messageCount; i++ { text := fmt.Sprintf("Message #%d", i) err = conn.Send(queueName, "text/plain", []byte(text), nil) if err != nil { println("failed to send to server", err) return } else { println("message sent") } } println("sending EXIT message") err = conn.Send(queueName, "text/plain", []byte("EXIT"), nil) if err != nil { println("failed to send EXIT message to server", err) return } else { println("message sent") } println("sender finished") } func main() { sendMessages() }
Zdrojový kód klienta obsahuje následující části:
- Připojení k message brokeru přes funkci stomp.Dial()
- Poslání zprávy metodou conn.Send()
- Odpojení klienta metodou conn.Disconnect()
Konzument zpráv bude implementován podobným způsobem, ostatně se podívejme na jeho úplný zdrojový kód:
package main import ( "github.com/go-stomp/stomp" ) const serverAddr = "localhost:61613" const queueName = "/queue/go_test" func receiveMessages() { conn, err := stomp.Dial("tcp", serverAddr, nil) if err != nil { println("cannot connect to server", err.Error()) return } else { println("connected to server", serverAddr) } defer conn.Disconnect() sub, err := conn.Subscribe(queueName, stomp.AckAuto) if err != nil { println("cannot subscribe to", queueName, err.Error()) return } for { msg := <-sub.C text := string(msg.Body) if text != "EXIT" { println("Received message", text) } else { println("Received EXIT command") break } } println("receiver finished") } func main() { receiveMessages() }
Konzument obsahuje tyto části:
- Připojení k message brokeru přes funkci stomp.Dial()
- Přihlášení k odběru zpráv metodou conn.Subscribe()
- Příjem zprávy/zpráv z automaticky vytvořeného kanálu
- Odpojení klienta metodou conn.Disconnect()
3. Komunikace klientů s message brokerem ve vlastní gorutině
Mnohdy se setkáme s požadavkem na to, aby klient (ať již producent zpráv či jejich konzument) komunikovat s message brokerem v samostatně běžící gorutině, která by pracovala paralelně s dalšími gorutinami. Tento požadavek, který dává smysl zejména u webových služeb, je relativně snadné implementovat:
package main import ( "fmt" "github.com/go-stomp/stomp" ) const serverAddr = "localhost:61613" const queueName = "/queue/go_test" const messageCount = 10 var stop = make(chan bool) func sendMessages() { defer func() { stop <- true }() conn, err := stomp.Dial("tcp", serverAddr, nil) if err != nil { println("cannot connect to server", err.Error()) return } else { println("connected to server", serverAddr) } defer conn.Disconnect() for i := 1; i <= messageCount; i++ { text := fmt.Sprintf("Message #%d", i) err = conn.Send(queueName, "text/plain", []byte(text), nil) if err != nil { println("failed to send to server", err) return } else { println("message sent") } } println("sending EXIT message") err = conn.Send(queueName, "text/plain", []byte("EXIT"), nil) if err != nil { println("failed to send EXIT message to server", err) return } else { println("message sent") } println("sender finished") } func main() { go sendMessages() <-stop }
Prakticky stejným způsobem se upraví i příjemce zpráv:
package main import ( "github.com/go-stomp/stomp" ) const serverAddr = "localhost:61613" const queueName = "/queue/go_test" var stop = make(chan bool) func receiveMessages(subscribed chan bool) { defer func() { stop <- true }() conn, err := stomp.Dial("tcp", serverAddr, nil) if err != nil { println("cannot connect to server", err.Error()) return } else { println("connected to server", serverAddr) } defer conn.Disconnect() sub, err := conn.Subscribe(queueName, stomp.AckAuto) if err != nil { println("cannot subscribe to", queueName, err.Error()) return } close(subscribed) for { msg := <-sub.C text := string(msg.Body) if text != "EXIT" { println("Received message", text) } else { println("Received EXIT command") break } } println("receiver finished") } func main() { subscribed := make(chan bool) go receiveMessages(subscribed) <-subscribed <-stop }
4. Producent i konzument jako součást jediné aplikace se třemi paralelně běžícími gorutinami
Pro zajímavost se podívejme na způsob implementace producenta i konzumenta zpráv v jediné aplikaci, ve které se tedy používají tři gorutiny – hlavní gorutina, gorutina producenta zpráv a gorutina jejich konzumenta:
package main import ( "fmt" "github.com/go-stomp/stomp" "time" ) const serverAddr = "localhost:61613" const queueName = "/queue/go_test" const messageCount = 10 var stop = make(chan bool) func sendMessages() { defer func() { stop <- true }() conn, err := stomp.Dial("tcp", serverAddr, nil) if err != nil { println("Cannot connect to server", err.Error()) return } else { println("Publisher part connected to server", serverAddr) } defer conn.Disconnect() time.Sleep(5 * time.Second) for i := 1; i <= messageCount; i++ { text := fmt.Sprintf("Message #%d", i) err = conn.Send(queueName, "text/plain", []byte(text), nil) if err != nil { println("Failed to send to server", err) return } else { println("Message sent") } } println("Sending EXIT message") err = conn.Send(queueName, "text/plain", []byte("EXIT"), nil) if err != nil { println("Failed to send EXIT message to server", err) return } else { println("Message sent") } println("Publisher finished") } func receiveMessages(subscribed chan bool) { defer func() { stop <- true }() conn, err := stomp.Dial("tcp", serverAddr, nil) if err != nil { println("Cannot connect to server", err.Error()) return } else { println("Subscriber part connected to server", serverAddr) } defer conn.Disconnect() time.Sleep(5 * time.Second) sub, err := conn.Subscribe(queueName, stomp.AckAuto) if err != nil { println("Cannot subscribe to", queueName, err.Error()) return } close(subscribed) for { msg := <-sub.C text := string(msg.Body) if text != "EXIT" { println("Received message", text) } else { println("Received EXIT command") break } } println("Subscriber finished") } func main() { go sendMessages() subscribed := make(chan bool) go receiveMessages(subscribed) <-subscribed <-stop <-stop }
Na tomto místě se můžete oprávněně zeptat, proč vlastně pro komunikaci mezi několika gorutinami, které jsou součástí jediné aplikace, vlastně používáme message brokera se všemi komplikacemi, které toto řešení přináší. Můžeme totiž namísto toho vytvořit běžný komunikační kanál, nastavit mu vysokou kapacitu bufferu a používat přímo operátor ← pro přidání hodnoty (zprávy) do kanálu i pro vyjmutí a zpracování této hodnoty (zprávy) na straně druhé. To je samozřejmě pravda a toto řešení bude plně funkční a zajistí i dobrou výkonnost. Ovšem ve chvíli, kdy je zapotřebí zaručit, že zpráva bude skutečně doručena, i když dojde například k pádu aplikace, již může být použití message brokera užitečným řešením. A pokud se navíc přidá i požadavek na zajištění persistence zpráv, je message broker (například běžící pouze lokálně) nejenom vhodné, ale pro tuto část architektury celé aplikace vlastně i idiomatické řešení (idiomy se totiž netýkají jen programových konstrukcí, ale i celé architektury).
package main import ( "fmt" "github.com/go-stomp/stomp" "time" ) const serverAddr = "localhost:61613" const queueName = "/queue/go_test" const messageCount = 10 var stopProducer = make(chan bool) var stopConsumer = make(chan bool) func sendMessages(stop chan bool) { defer func() { stop <- true }() conn, err := stomp.Dial("tcp", serverAddr, nil) if err != nil { println("Cannot connect to server", err.Error()) return } else { println("Publisher part connected to server", serverAddr) } defer conn.Disconnect() time.Sleep(5 * time.Second) for i := 1; i <= messageCount; i++ { text := fmt.Sprintf("Message #%d", i) err = conn.Send(queueName, "text/plain", []byte(text), nil) if err != nil { println("Failed to send to server", err) return } else { println("Message sent") } } println("Sending EXIT message") err = conn.Send(queueName, "text/plain", []byte("EXIT"), nil) if err != nil { println("Failed to send EXIT message to server", err) return } else { println("Message sent") } println("Publisher finished") } func receiveMessages(subscribed chan bool, stop chan bool) { defer func() { stop <- true }() conn, err := stomp.Dial("tcp", serverAddr, nil) if err != nil { println("Cannot connect to server", err.Error()) return } else { println("Subscriber part connected to server", serverAddr) } defer conn.Disconnect() time.Sleep(5 * time.Second) sub, err := conn.Subscribe(queueName, stomp.AckAuto) if err != nil { println("Cannot subscribe to", queueName, err.Error()) return } close(subscribed) for { msg := <-sub.C text := string(msg.Body) if text != "EXIT" { println("Received message", text) } else { println("Received EXIT command") break } } println("Subscriber finished") } func main() { go sendMessages(stopProducer) subscribed := make(chan bool) go receiveMessages(subscribed, stopConsumer) <-subscribed <-stopProducer <-stopConsumer }
5. Jedna z možných implementací message brokera naprogramovaná v nativním Go a používající Redis jako storage
Hned v úvodní části seriálu o message brokerech jsme se zabývali popisem nástroje nazvaného Redis Queue, jenž je postaven – jak již ostatně jeho název velmi dobře napovídá – na databázi Redis [1] [2], která se používá pro ukládání jednotlivých zpráv do front. Podobný koncept používá i další dnes popisovaná knihovna, která se jmenuje rmq a nalezneme ji na GitHubu na adrese https://github.com/adjust/rmq.
Jedná se o balíček určený pro vývojáře využívající programovací jazyk Go (ostatně i samotné rmq je vytvořeno v čistém Go), kteří potřebují vytvářet klienty připojované k message brokerovi, přičemž cílem autorů rmq je dosáhnout toho, aby kód vytvořený v klientech byl co nejkratší a navíc i snadno pochopitelný. Samotný message broker je opět tvořen databází Redis a balíček rmq „pouze“ zabezpečuje posílání, výběr a ukládání zpráv do Redisu.
6. Instalace a nastavení Redisu i rmq
Nejprve si řekněme, jak se vlastně rmq instaluje.
V prvním kroku musíme nainstalovat a spustit samotný Redis, což je snadné, protože je balíček s Redisem součástí většiny Linuxových distribucí. Například na Fedoře může instalace vypadat následovně:
$ sudo dnf install redis Last metadata expiration check: 0:15:30 ago on Wed 24 Oct 2018, 22:50:11 CEST. Dependencies resolved. ================================================================================ Package Arch Version Repository Size ================================================================================ Installing: redis x86_64 4.0.9-1.fc27 updates 580 k Installing dependencies: jemalloc x86_64 4.5.0-5.fc27 updates 210 k Transaction Summary ================================================================================ Install 2 Packages Total download size: 790 k Installed size: 2.0 M Is this ok [y/N]:
Na systémech založených na Debianu (včetně Ubuntu) lze pro instalaci použít příkaz:
$ apt-get install redis-server
V případě, že budete potřebovat použít nejnovější verzi Redisu, můžete si ho sami přeložit. Postup je jednoduchý (mj. i díky minimálním závislostem na dalších knihovnách) a je podrobně popsán na stránce https://redis.io/topics/quickstart.
Pro vlastní databázi, konfigurační soubor, žurnál a logy Redisu použijeme samostatný adresář, který vytvoříme v domácím adresáři připojeného uživatele:
$ mkdir redis $ cd redis
Po instalaci se můžeme přesvědčit, že je skutečně k dispozici spustitelný soubor s implementací serveru i řádkového klienta:
$ whereis -b redis-cli redis-cli: /usr/bin/redis-cli
$ whereis -b redis-server redis-server: /usr/bin/redis-server
Následně přímo v tomto adresáři vytvoříme konfigurační soubor nazvaný redis.conf. Můžeme se přitom inspirovat souborem /etc/redis/redis.conf (Debian a systémy od něj odvozené), popř. /etc/redis.conf (Fedora, RHEL, CentOS), který je však poměrně rozsáhlý, protože kromě vlastních konfiguračních voleb obsahuje i podrobné informace o významu jednotlivých konfiguračních voleb. Tento soubor je taktéž dostupný na internetu na adrese https://raw.githubusercontent.com/antirez/redis/4.0/redis.conf.
Následuje výpis obsahu konfiguračního souboru, který je připraven pro lokální spuštění Redisu, bez nebezpečí, že se k běžícímu serveru připojí případný útočník. Důležité volby jsou zvýrazněny. Pokud se vám soubor nechce kopírovat, naleznete ho na adrese https://github.com/tisnik/presentations/blob/master/redis/redis.conf:
bind 127.0.0.1 protected-mode yes port 6379 tcp-backlog 511 timeout 0 tcp-keepalive 300 daemonize no supervised no pidfile /var/run/redis_6379.pid loglevel notice logfile redis.log databases 16 always-show-logo yes save 900 1 save 300 10 save 60 10000 stop-writes-on-bgsave-error yes rdbcompression yes rdbchecksum yes dbfilename dump.rdb dir . slave-serve-stale-data yes slave-read-only yes repl-diskless-sync no repl-diskless-sync-delay 5 repl-disable-tcp-nodelay no slave-priority 100 lazyfree-lazy-eviction no lazyfree-lazy-expire no lazyfree-lazy-server-del no slave-lazy-flush no appendonly yes appendfilename "appendonly.aof" appendfsync everysec no-appendfsync-on-rewrite no auto-aof-rewrite-percentage 100 auto-aof-rewrite-min-size 64mb aof-load-truncated yes aof-use-rdb-preamble no lua-time-limit 5000 slowlog-log-slower-than 10000 slowlog-max-len 128 latency-monitor-threshold 0 notify-keyspace-events "" hash-max-ziplist-entries 512 hash-max-ziplist-value 64 list-max-ziplist-size -2 list-compress-depth 0 set-max-intset-entries 512 zset-max-ziplist-entries 128 zset-max-ziplist-value 64 hll-sparse-max-bytes 3000 activerehashing yes client-output-buffer-limit normal 0 0 0 client-output-buffer-limit slave 256mb 64mb 60 client-output-buffer-limit pubsub 32mb 8mb 60 hz 10 aof-rewrite-incremental-fsync yes
Nyní již můžeme databázi Redis spustit, aniž by došlo k tomu, že bude její API viditelné ostatním počítačům připojeným do sítě:
$ redis-server redis.conf
Na druhém terminálu pak již můžeme (čistě pro otestování) spustit klienta Redisu, který uživatelům nabízí interaktivní příkazový řádek:
$ redis-cli 127.0.0.1:6379>
Příkazem „ping“ můžeme otestovat, jestli se klient připojí k serveru a zda od něj dokáže získávat odpovědi:
127.0.0.1:6379> ping PONG 127.0.0.1:6379> ping test "test"
Nyní by měl být systém Redis připraven pro připojení klientů, kteří s využitím balíčku rmq využijí tuto databázi ve funkci storage message brokera. Nastává tedy čas pro instalaci balíčku rmq. Nejprve přejdeme do adresáře ~/go (ten již byl připraven v rámci instalace jazyka Go, kterou jsme si popsali minule):
$ cd ~/go
Dále v tomto adresáři zadáme následující příkaz:
$ go get https://github.com/adjust/rmq
V adresáři by se měla objevit následující struktura (pod)adresářů:
. ├── pkg │ └── linux_amd64 │ └── github.com │ └── adjust └── src └── github.com ├── adjust │ ├── rmq │ │ └── example │ │ ├── batch_consumer │ │ ├── cleaner │ │ ├── consumer │ │ ├── handler │ │ ├── producer │ │ ├── purger │ │ └── returner │ └── uniuri └── go-redis └── redis ├── internal │ ├── consistenthash │ ├── hashtag │ ├── pool │ ├── proto │ └── util └── testdata
7. Jednoduchý producent posílající textové zprávy
Při implementaci producenta jednoduchých textových zpráv využijeme především funkci pojmenovanou OpenConnection, která se pokusí o připojení k Redisu:
func OpenConnection(tag, network, address string, db int) *redisConnection
Připojení může být realizováno následovně:
connection := rmq.OpenConnection("test_service_producer", "tcp", "localhost:6379", 1)
Získáme tak datovou strukturu, která mj. implementuje rozhraní nazvané Connection:
type Connection interface { OpenQueue(name string) Queue CollectStats(queueList []string) Stats GetOpenQueues() []string }
Dnes nás bude zajímat jen jediná metoda z tohoto rozhraní, a to konkrétně metoda pro otevření (či vytvoření) fronty se zadaným jménem:
func (connection *redisConnection) OpenQueue(name string) Queue
Konkrétní příklad použití:
connection := rmq.OpenConnection("test_service_producer", "tcp", "localhost:6379", 1) taskQueue := connection.OpenQueue("task_queue")
Jakmile je fronta získána či vytvořena, můžeme použít její metodu Publish pro publikování nějaké zprávy, tj. pro její poslání do fronty:
func (queue *redisQueue) Publish(payload string) bool
Opět si ukažme praktické použití:
connection := rmq.OpenConnection("test_service_producer", "tcp", "localhost:6379", 1) taskQueue := connection.OpenQueue("task_queue") delivery := "task payload 1" taskQueue.Publish(delivery)
Celý zdrojový kód klienta je nakonec velmi krátký a přehledný:
package main import "github.com/adjust/rmq" func main() { connection := rmq.OpenConnection("test_service_producer", "tcp", "localhost:6379", 1) println("Connection object: ", connection) taskQueue := connection.OpenQueue("task_queue") println("Queue: ", taskQueue) delivery := "task payload 1" taskQueue.Publish(delivery) delivery = "task payload 2" taskQueue.Publish(delivery) }
Objekt | Stručný popis |
---|---|
rmq::connections | množina aktivních připojení |
rmq::queues | množina front |
8. Konzument zpracovávající jednoduché textové zprávy
Konzument zpráv je nepatrně složitější, protože se zde používá callback metoda nazvaná Consume. Po připojení a vytvoření fronty nám již známým způsobem:
connection := rmq.OpenConnection("test_service_consumer", "tcp", "localhost:6379", 1) taskQueue := connection.OpenQueue("task_queue")
je nutné se přihlásit k odběru zpráv a zaregistrovat objekt typu Consumer, který bude zprávy odebírat:
taskQueue.StartConsuming(10, time.Second) taskQueue.AddConsumer("consumer", NewConsumer())
Samotný odběratel zpráv musí implementovat toto rozhraní s jedinou metodou:
type Consumer interface { Consume(delivery Delivery) }
Přičemž Delivery je rozhraní implementované datovou strukturou se zprávou:
type Delivery interface { Payload() string Ack() bool Reject() bool Push() bool }
Důležité jsou metody pro potvrzení zprávy či naopak pro její nepotvrzení:
func (delivery *wrapDelivery) Ack() bool
func (delivery *wrapDelivery) Reject() bool
Ty využijeme v našem konzumentovi/příjemci zpráv:
func (consumer *Consumer) Consume(delivery rmq.Delivery) { println("consume begin") println(delivery.Payload()) delivery.Ack() println("consume end") }
Úplná implementace klienta může vypadat následovně:
package main import ( "github.com/adjust/rmq" "time" ) type Consumer struct { } func NewConsumer() *Consumer { return &Consumer{} } func (consumer *Consumer) Consume(delivery rmq.Delivery) { println("consume begin") println(delivery.Payload()) delivery.Ack() println("consume end") } func main() { connection := rmq.OpenConnection("test_service_consumer", "tcp", "localhost:6379", 1) println("Connection object: ", connection) taskQueue := connection.OpenQueue("task_queue") println("Queue: ", taskQueue) taskQueue.StartConsuming(10, time.Second) taskQueue.AddConsumer("consumer", NewConsumer()) select {} }
9. Marshalling a unmarshalling datových struktur do formátu JSON pro posílání složitějších zpráv
Často se setkáme i s požadavkem, aby byly zprávy posílány ve formátu JSON. Problematice převodu datových struktur z jazyka Go do JSONu (marshalling) a samozřejmě i zpětného převodu (unmarshalling) jsme se nedávno věnovali v samostatném článku Vývoj síťových aplikací v programovacím jazyku Go (práce s JSONem a rastrovými obrázky), takže jen ve stručnosti:
Pro převod libovolného typu (přesněji řečeno hodnoty libovolného typu) do JSONu se používá funkce nazvaná Marshal, kterou nalezneme v balíčku encoding/json:
func Marshal(v interface{}) ([]byte, error)
Opačná operace spočívá v importu dat z formátu JSON do interních datových struktur programovacího jazyka Go. Pro tuto operaci, která se nazývá unmarshalling, slouží následující funkce:
func Unmarshal(data []byte, v interface{}) error
Vstupem je v tomto případě pole (řez) bajtů, výstup je vrácen přes ukazatel předaný ve druhém parametru (což znamená, že se musíme sami postarat o případnou alokaci paměti pro strukturu či pro mapu). Samozřejmě, že při unmarshallingu může dojít k nějaké chybě, která je vrácena volající funkci. Pokud k chybě nedošlo, je návratová hodnota rovna nil.
10. Úprava producenta takovým způsobem, aby posílal zprávy ve formátu JSON
Vyzkoušejme si nyní upravit producenta zpráv tak, aby posílal obsah datové struktury nazvané TaskPayload ve formátu JSON. Strukturu nejdříve převedeme do JSONu a následně výsledek takzvaného „marshallingu“ pošleme do message brokera:
bytes, err := json.Marshal(payload) if err != nil { println(err) return } taskQueue.PublishBytes(bytes)
Následuje výpis úplného zdrojového kódu tohoto příkladu:
package main import ( "encoding/json" "github.com/adjust/rmq" ) type TaskPayload struct { Id int32 Name string Param1 int32 Param2 int32 } func SendTask(taskQueue rmq.Queue, payload TaskPayload) { bytes, err := json.Marshal(payload) if err != nil { println(err) return } taskQueue.PublishBytes(bytes) } func main() { connection := rmq.OpenConnection("test_service_producer", "tcp", "localhost:6379", 1) println("Connection object: ", connection) taskQueue := connection.OpenQueue("task_queue") println("Queue: ", taskQueue) SendTask(taskQueue, TaskPayload{1, "test1", 0, 0}) SendTask(taskQueue, TaskPayload{2, "test2", 6, 7}) }
11. Konzument akceptující zprávy ve formátu JSON
Konzument zpráv posílaných ve formátu JSON je složitější. Nejdříve totiž musíme získat obsah zprávy (payload), ten převést z JSONu do datové struktury (unmarshalling) a výsledek použít:
var task TaskPayload if err := json.Unmarshal([]byte(delivery.Payload()), &task); err != nil { delivery.Reject() return } println("performing task", task.Id, "name", task.Name, "with parameters", task.Param1, task.Param2) delivery.Ack()
Výsledný zdrojový kód může vypadat následovně:
package main import ( "encoding/json" "github.com/adjust/rmq" "time" ) type TaskPayload struct { Id int32 Name string Param1 int32 Param2 int32 } type Consumer struct { } func NewConsumer() *Consumer { return &Consumer{} } func (consumer *Consumer) Consume(delivery rmq.Delivery) { println("consume begin") println(delivery.Payload()) var task TaskPayload if err := json.Unmarshal([]byte(delivery.Payload()), &task); err != nil { delivery.Reject() return } println("performing task", task.Id, "name", task.Name, "with parameters", task.Param1, task.Param2) delivery.Ack() println("consume end") } func main() { connection := rmq.OpenConnection("test_service_consumer", "tcp", "localhost:6379", 1) println("Connection object: ", connection) taskQueue := connection.OpenQueue("task_queue") println("Queue: ", taskQueue) taskQueue.StartConsuming(10, time.Second) taskQueue.AddConsumer("consumer", NewConsumer()) select {} }
12. Přenos binárních zpráv
Mnohdy se setkáme s požadavkem, aby klienti do message brokera posílali binární zprávy, samozřejmě s tím, že po vyzvednutí bude mít zpráva naprosto stejný obsah, jako originální data (tj. nesmí dojít například k porušení obsahu nejvyšších bitů každého bajtu atd.).
Při psaní klientů naprogramovaných v Pythonu, kteří spolu komunikovali přes message broker Apache ActiveMQ, jsme se posílání binárních zpráv vyhnuli díky zakódování binárních dat do sekvence tisknutelných znaků, například s využitím Base64, který má tu výhodu, že algoritmus kódování/dekódování lze vytvořit velmi snadno (pokud již pro daný programovací jazyk neexistuje příslušná knihovna – většinou je již k dispozici). Nevýhodou tohoto řešení jsou delší zprávy, které musí být v message brokerovi uloženy (ve frontách) a – což je většinou kritičtější – pomalejší posílání či příjem zpráv – pokud se ovšem zpracovávají relativně krátké zprávy s menší frekvencí (desítky za sekundu pro jednoho producenta), nemělo by být zpoždění vůbec patrné.
Pro otestování posílání a příjmu zpráv s binárními daty vytvoříme producenta, který do vybrané fronty pošle data reprezentující rastrový obrázek ve formátu GIF. Samotný obrázek získáme snadno (jedná se o ikonu Vimu, v němž ostatně celý tento článek vznikl):
wget https://www.vim.org/images/vim_editor.gif
Jedná se o následující obrázek – ikonu:
Obrázek 1: Ikona použitá jako příklad binárních dat.
13. Vytvoření a poslání binární zprávy
Pro posílání textové zprávy jsme v předchozích dvou příkladech využívali metodu Publish:
func (queue *redisQueue) Publish(payload string) bool
U binárních dat to není vhodné, ovšem existuje i alternativní metoda nazvaná příznačně PublishBytes:
func (queue *redisQueue) PublishBytes(payload []byte) bool
Ve zdroji zpráv ještě uděláme jednu změnu – vytvoříme funkci, která načte obsah binárního souboru a ten následně pošle do zvolené fronty:
bytes, err := ioutil.ReadFile(filename) if err == nil { println("Read", len(bytes), "bytes") SendBinaryMessage(binaryMessagesQueue, bytes) println("Sent") } else { println("Error opening file", err) }
Úplný zdrojový kód producenta binární zprávy může vypadat následovně:
package main import ( "github.com/adjust/rmq" "io/ioutil" ) func SendBinaryMessage(binaryMessagesQueue rmq.Queue, binaryMessage []byte) { binaryMessagesQueue.PublishBytes(binaryMessage) } func SendFileContent(binaryMessagesQueue rmq.Queue, filename string) { bytes, err := ioutil.ReadFile(filename) if err == nil { println("Read", len(bytes), "bytes") SendBinaryMessage(binaryMessagesQueue, bytes) println("Sent") } else { println("Error opening file", err) } } func main() { connection := rmq.OpenConnection("binary_message_app", "tcp", "localhost:6379", 1) println("Connection object: ", connection) binaryMessagesQueue := connection.OpenQueue("binary_messages_queue") println("Queue: ", binaryMessagesQueue) SendFileContent(binaryMessagesQueue, "vim_editor.gif") }
14. Příjem binární zprávy
Příjemce binární zprávy bude muset být také upraven. Zprávu přijmeme běžným způsobem, ovšem samotný její obsah (payload) bude uložen do souboru s využitím funkce WriteFile z balíčku ioutil. Povšimněte si, že této funkci je nutné předat i příznaky s přístupovými právy k souboru:
const FileFlags = 0664 payload := delivery.Payload() println("Received binary message", len(payload), "bytes") err := ioutil.WriteFile("received.gif", []byte(payload), FileFlags) if err == nil { println("Written") } else { println(err) } delivery.Ack()
Opět si samozřejmě ukážeme úplný zdrojový kód tohoto příkladu:
package main import ( "github.com/adjust/rmq" "io/ioutil" "time" ) type Consumer struct{} func NewConsumer() *Consumer { return &Consumer{} } func (consumer *Consumer) Consume(delivery rmq.Delivery) { const FileFlags = 0664 println("consume begin") payload := delivery.Payload() println("Received binary message", len(payload), "bytes") err := ioutil.WriteFile("received.gif", []byte(payload), FileFlags) if err == nil { println("Written") } else { println(err) } delivery.Ack() println("consume end") } func main() { connection := rmq.OpenConnection("binary_messages_queue", "tcp", "localhost:6379", 1) println("Connection object: ", connection) binaryMessagesQueue := connection.OpenQueue("binary_messages_queue") println("Queue: ", binaryMessagesQueue) binaryMessagesQueue.StartConsuming(10, time.Second) binaryMessagesQueue.AddConsumer("consumer", NewConsumer()) select {} }
15. Systém NATS aneb implementace systému pro doručování zpráv v jazyku Go
V závěrečné části dnešního článku se zmíníme o systému pro doručování, ukládání a (re)distribuci zpráv, který se jmenuje NATS. Jedná se o poměrně úspěšný projekt, jenž je vyvinut právě v programovacím jazyku Go, což mu do jisté míry zajišťuje stabilitu i škálovatelnost. To jsou vlastnosti, které u message brokerů většinou očekáváme.
Původní varianta NATSu byla vytvořena Derekem Collisonem; zajímavé je, že tato varianta nebyla naprogramována v jazyce Go, ale v programovacím jazyce Ruby. Dnes se ovšem budeme zabývat moderní (a jedinou podporovanou) verzí systému NATS, která byla přeportována do jazyka Go. Celý systém NATS se skládá z několika komponent:
- V první řadě se jedná o samotný server, jenž se spouští příkazem gnatsd. Server je naprogramovaný v Go a při jeho vývoji bylo dbáno na to, aby byla zaručena vysoká dostupnost celé služby a přitom byla samotná služba s běžícím serverem málo náročná na systémové zdroje, především na spotřebu operační paměti (to má v době Dockeru a podobných nástrojů poměrně velký význam).
- Dalším typem komponenty jsou programátorská rozhraní pro klienty, která v současnosti existují pro několik ekosystémů (což je většinou kombinace programovacího jazyka, knihoven a popř. jeho virtuálního stroje); viz též tabulky s podporovanými ekosystémy, které jsou zobrazeny pod tímto odstavcem.
- Třetí komponentou je NATS Streaming Server, který je opět naprogramován v Go a který si popíšeme v sedmnácté kapitole.
- Čtvrtým typem komponenty je takzvaný NATS Connector Framework zajišťující propojení systému NATS s dalšími technologiemi (XMPP, logování, notifikační služby aj.). Ten je naprogramovaný v Javě a v současnosti je podporován například konektor pro Redis (https://github.com/nats-io/nats-connector-redis).
Oficiálně jsou podporována rozhraní pro následující ekosystémy:
# | Programovací jazyk/ekosystém |
---|---|
1 | C |
2 | C# |
3 | Elixir |
4 | Go |
5 | Java |
6 | NGINX |
7 | Node.js |
8 | Python Asyncio |
9 | Python Tornado |
10 | Ruby |
11 | TypeScript |
Existují však i další rozhraní, která sice nejsou podporována oficiálně, ale většinou jsou dostatečně stabilní na to, aby byla reálně použitelná:
# | Programovací jazyk/ekosystém |
---|---|
1 | .NET |
2 | Arduino |
3 | Clojure |
4 | Elm |
5 | Erlang |
6 | Haskell |
7 | Lua |
8 | MicroPython |
9 | PHP |
10 | Perl |
11 | Python |
12 | Python Twisted |
13 | Qt5 C++ |
14 | Rust |
15 | Scala |
16 | Spring API |
17 | Swift |
16. Komunikační strategie a protokol použitý klienty systému NATS
NATS svým uživatelům nabízí klasickou komunikační strategii typu pub-sub (zprávy jsou doručovány na základě nastaveného tématu – topicu či v jiné terminologii subjectu), s níž jsme se již seznámili v předchozích článcích. Ovšem kromě toho je možné využít i strategie request-reply (ta je implementačně nejsložitější, protože vyžaduje určitou koordinaci obou klientů se zapamatováním kontextu) a taktéž strategii typu push-pull (zde se právě využívají fronty zpráv). Strategie request-reply se dále rozděluje na dvě podkategorie podle toho, zda se zpráva doručuje jen jedinému příjemci (point-to-point) nebo více příjemcům (one-to-many). Ve chvíli, kdy klient pošle žádost (request), vytvoří se pro ni takzvaný inbox, který mohou příjemci použít po odeslání odpovědi. S příklady použití této zajímavé strategie se seznámíme v navazujícím článku.
Samotný komunikační protokol používaný klienty při přístupu k systému NATS je textový a je navržen takovým způsobem, aby bylo možné samotnou komunikaci naprogramovat relativně snadno, popř. si komunikaci otestovat přes telnet či podobný nástroj (což může být velmi vhodné při hledání chyb v celém systému; ostatně ti administrátoři, kteří museli ladit nějaký systém využívající binární protokol jistě budou souhlasit). V samotném protokolu se rozeznává jen několik typů příkazů, které jsou vypsány v další tabulce:
Příkaz | Posílá | Stručný popis |
---|---|---|
INFO | server | posláno klientovi po jeho připojení k serveru |
CONNECT | klient | žádost o připojení se specifikací jeho parametrů |
PUB | klient | publikace zprávy klientem (formát si popíšeme příště) |
SUB | klient | přihlášení klienta k odebírání určitého tématu |
UNSUB | klient | odhlášení klienta od odebírání určitého tématu |
MSG | server | poslání zprávy serverem odebírateli |
PING | klient i server | klasický systém ping-pong pro ověření, zda druhá strana (ještě) komunikuje |
PONG | klient i server | klasický systém ping-pong pro ověření, zda druhá strana (ještě) komunikuje |
+OK | server | potvrzení zprávy či příkazu (+ je součástí příkazu) |
-ERR | server | informace o tom, že poslaná zpráva nebo příkaz nemá správný formát (- je opět součástí příkazu) |
17. NATS Streaming
Jedná se o rozšíření klasického message brokera o další užitečné vlastnosti, zejména o konfigurovatelnou persistenci zpráv, dále o systém zaručující doručení zprávy (jak na message brokera, tak i ke klientovi – odběrateli zpráv) a především pak systémem určujícím maximální množství zpráv čekajících na doručení (publisher rate limit, rate limiting per subscriber). Nesmíme zapomenout ani na další poměrně unikátní vlastnost: možnost opětovného „přehrátí“ zpráv podle zadaných kritérií (ke zprávám je přiřazen určitý subject a taktéž časové razítko, takže pořadí zpráv je možné zaručit – ostatně právě proto se také používá termín „streaming“). Mimochodem: časové razítko má přesnost v řádu nanosekund.
Oficiálně podporovaní klienti komponenty NATS Streaming:
# | Jazyk/ekosystém |
---|---|
1 | C |
2 | C# |
3 | Go |
4 | Java |
5 | Node.js |
6 | Python Asyncio |
7 | Ruby |
Podrobněji si tento systém vysvětlíme příště.
18. Repositář s demonstračními příklady
Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v programovacím jazyku Go byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.
19. Odkazy na předchozí části seriálu
V této kapitole jsou uvedeny odkazy na všech jedenáct předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií:
- Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/ - Celery: systém implementující asynchronní fronty úloh pro Python
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/ - Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/ - RabbitMQ: jedna z nejúspěšnějších implementací brokera
https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/ - Pokročilejší operace nabízené systémem RabbitMQ
https://www.root.cz/clanky/pokrocilejsi-operace-nabizene-systemem-rabbitmq/ - ØMQ: knihovna pro asynchronní předávání zpráv
https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/ - Další možnosti poskytované knihovnou ØMQ
https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/ - Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě
https://www.root.cz/clanky/dalsi-moznosti-nabizene-knihovnou-mq-implementace-protokolu-mq-v-ciste-jave/ - Apache ActiveMQ – další systém implementující message brokera
https://www.root.cz/clanky/apache-activemq-dalsi-system-implementujici-message-brokera/ - Použití Apache ActiveMQ s protokolem STOMP
https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-stomp/ - Použití Apache ActiveMQ s protokolem AMQP, jazyk Go a message brokeři
https://www.root.cz/clanky/pouziti-apache-activemq-s-protokolem-amqp-jazyk-go-a-message-brokeri/
20. Odkazy na Internetu
- NATS
https://nats.io/about/ - NATS Streaming Concepts
https://nats.io/documentation/streaming/nats-streaming-intro/ - NATS Streaming Server
https://nats.io/download/nats-io/nats-streaming-server/ - NATS Introduction
https://nats.io/documentation/ - NATS Client Protocol
https://nats.io/documentation/internals/nats-protocol/ - NATS Messaging (Wikipedia)
https://en.wikipedia.org/wiki/NATS_Messaging - Stránka Apache Software Foundation
http://www.apache.org/ - Informace o portu 5672
http://www.tcp-udp-ports.com/port-5672.htm - Třída MessagingHandler knihovny Qpid Proton
https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/proton._handlers.MessagingHandler-class.html - Třída Event knihovny Qpid Proton
https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/proton._events.Event-class.html - package stomp (Go)
https://godoc.org/github.com/go-stomp/stomp - Go language library for STOMP protocol
https://github.com/go-stomp/stomp - python-qpid-proton 0.26.0 na PyPi
https://pypi.org/project/python-qpid-proton/ - Qpid Proton
http://qpid.apache.org/proton/ - Using the AMQ Python Client
https://access.redhat.com/documentation/en-us/red_hat_amq/7.1/html-single/using_the_amq_python_client/ - Apache ActiveMQ
http://activemq.apache.org/ - Apache ActiveMQ Artemis
https://activemq.apache.org/artemis/ - Apache ActiveMQ Artemis User Manual
https://activemq.apache.org/artemis/docs/latest/index.html - KahaDB
http://activemq.apache.org/kahadb.html - Understanding the KahaDB Message Store
https://access.redhat.com/documentation/en-US/Fuse_MQ_Enterprise/7.1/html/Configuring_Broker_Persistence/files/KahaDBOverview.html - Command Line Tools (Apache ActiveMQ)
https://activemq.apache.org/activemq-command-line-tools-reference.html - stomp.py 4.1.21 na PyPi
https://pypi.org/project/stomp.py/ - Stomp Tutorial
https://access.redhat.com/documentation/en-US/Fuse_Message_Broker/5.5/html/Connectivity_Guide/files/FMBConnectivityStompTelnet.html - Heartbeat (computing)
https://en.wikipedia.org/wiki/Heartbeat_(computing) - Apache Camel
https://camel.apache.org/ - Red Hat Fuse
https://developers.redhat.com/products/fuse/overview/ - Confusion between ActiveMQ and ActiveMQ-Artemis?
https://serverfault.com/questions/873533/confusion-between-activemq-and-activemq-artemis - Staré stránky projektu HornetQ
http://hornetq.jboss.org/ - Snapshot JeroMQ verze 0.4.4
https://oss.sonatype.org/content/repositories/snapshots/org/zeromq/jeromq/0.4.4-SNAPSHOT/ - Difference between ActiveMQ vs Apache ActiveMQ Artemis
http://activemq.2283324.n4.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html - Microservices communications. Why you should switch to message queues
https://dev.to/matteojoliveau/microservices-communications-why-you-should-switch-to-message-queues–48ia - Stomp.py 4.1.19 documentation
https://stomppy.readthedocs.io/en/stable/ - Repositář knihovny JeroMQ
https://github.com/zeromq/jeromq/ - ØMQ – Distributed Messaging
http://zeromq.org/ - ØMQ Community
http://zeromq.org/community - Get The Software
http://zeromq.org/intro:get-the-software - PyZMQ Documentation
https://pyzmq.readthedocs.io/en/latest/ - Module: zmq.decorators
https://pyzmq.readthedocs.io/en/latest/api/zmq.decorators.html - ZeroMQ is the answer, by Ian Barber
https://vimeo.com/20605470 - ZeroMQ RFC
https://rfc.zeromq.org/ - ZeroMQ and Clojure, a brief introduction
https://antoniogarrote.wordpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/ - zeromq/czmq
https://github.com/zeromq/czmq - golang wrapper for CZMQ
https://github.com/zeromq/goczmq - ZeroMQ version reporting in Python
http://zguide.zeromq.org/py:version - A Go interface to ZeroMQ version 4
https://github.com/pebbe/zmq4 - Broker vs. Brokerless
http://zeromq.org/whitepapers:brokerless - Learning ØMQ with pyzmq
https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/ - Céčková funkce zmq_ctx_new
http://api.zeromq.org/4–2:zmq-ctx-new - Céčková funkce zmq_ctx_destroy
http://api.zeromq.org/4–2:zmq-ctx-destroy - Céčková funkce zmq_bind
http://api.zeromq.org/4–2:zmq-bind - Céčková funkce zmq_unbind
http://api.zeromq.org/4–2:zmq-unbind - Céčková C funkce zmq_connect
http://api.zeromq.org/4–2:zmq-connect - Céčková C funkce zmq_disconnect
http://api.zeromq.org/4–2:zmq-disconnect - Céčková C funkce zmq_send
http://api.zeromq.org/4–2:zmq-send - Céčková C funkce zmq_recv
http://api.zeromq.org/4–2:zmq-recv - Třída Context (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#context - Třída Socket (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket - Python binding
http://zeromq.org/bindings:python - Why should I have written ZeroMQ in C, not C++ (part I)
http://250bpm.com/blog:4 - Why should I have written ZeroMQ in C, not C++ (part II)
http://250bpm.com/blog:8 - About Nanomsg
https://nanomsg.org/ - Advanced Message Queuing Protocol
https://www.amqp.org/ - Advanced Message Queuing Protocol na Wikipedii
https://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol - Dokumentace k příkazu rabbitmqctl
https://www.rabbitmq.com/rabbitmqctl.8.html - RabbitMQ
https://www.rabbitmq.com/ - RabbitMQ Tutorials
https://www.rabbitmq.com/getstarted.html - RabbitMQ: Clients and Developer Tools
https://www.rabbitmq.com/devtools.html - RabbitMQ na Wikipedii
https://en.wikipedia.org/wiki/RabbitMQ - Streaming Text Oriented Messaging Protocol
https://en.wikipedia.org/wiki/Streaming_Text_Oriented_Messaging_Protocol - Message Queuing Telemetry Transport
https://en.wikipedia.org/wiki/MQTT - Erlang
http://www.erlang.org/ - pika 0.12.0 na PyPi
https://pypi.org/project/pika/ - Introduction to Pika
https://pika.readthedocs.io/en/stable/ - Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
http://clojurerabbitmq.info/ - AMQP 0–9–1 Model Explained
http://www.rabbitmq.com/tutorials/amqp-concepts.html - Part 1: RabbitMQ for beginners – What is RabbitMQ?
https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html - Downloading and Installing RabbitMQ
https://www.rabbitmq.com/download.html - celery na PyPi
https://pypi.org/project/celery/ - Databáze Redis (nejenom) pro vývojáře používající Python
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/ - Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/ - Redis Queue (RQ)
https://www.fullstackpython.com/redis-queue-rq.html - Python Celery & RabbitMQ Tutorial
https://tests4geeks.com/python-celery-rabbitmq-tutorial/ - Flower: Real-time Celery web-monitor
http://docs.celeryproject.org/en/latest/userguide/monitoring.html#flower-real-time-celery-web-monitor - Asynchronous Tasks With Django and Celery
https://realpython.com/asynchronous-tasks-with-django-and-celery/ - First Steps with Celery
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html - node-celery
https://github.com/mher/node-celery - Full Stack Python: web development
https://www.fullstackpython.com/web-development.html - Introducing RQ
https://nvie.com/posts/introducing-rq/ - Asynchronous Tasks with Flask and Redis Queue
https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue - rq-dashboard
https://github.com/eoranged/rq-dashboard - Stránky projektu Redis
https://redis.io/ - Introduction to Redis
https://redis.io/topics/introduction - Try Redis
http://try.redis.io/ - Redis tutorial, April 2010 (starší, ale pěkně udělaný)
https://static.simonwillison.net/static/2010/redis-tutorial/ - Python Redis
https://redislabs.com/lp/python-redis/ - Redis: key-value databáze v paměti i na disku
https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/ - Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
http://www.cloudsvet.cz/?p=253 - Praktický úvod do Redis (2): transakce
http://www.cloudsvet.cz/?p=256 - Praktický úvod do Redis (3): cluster
http://www.cloudsvet.cz/?p=258 - Connection pool
https://en.wikipedia.org/wiki/Connection_pool - Instant Redis Sentinel Setup
https://github.com/ServiceStack/redis-config - How to install REDIS in LInux
https://linuxtechlab.com/how-install-redis-server-linux/ - Redis RDB Dump File Format
https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format - Lempel–Ziv–Welch
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch - Redis Persistence
https://redis.io/topics/persistence - Redis persistence demystified
http://oldblog.antirez.com/post/redis-persistence-demystified.html - Redis reliable queues with Lua scripting
http://oldblog.antirez.com/post/250 - Ost (knihovna)
https://github.com/soveran/ost - NoSQL
https://en.wikipedia.org/wiki/NoSQL - Shard (database architecture)
https://en.wikipedia.org/wiki/Shard_%28database_architecture%29 - What is sharding and why is it important?
https://stackoverflow.com/questions/992988/what-is-sharding-and-why-is-it-important - What Is Sharding?
https://btcmanager.com/what-sharding/ - Redis clients
https://redis.io/clients - Category:Lua-scriptable software
https://en.wikipedia.org/wiki/Category:Lua-scriptable_software - Seriál Programovací jazyk Lua
https://www.root.cz/serialy/programovaci-jazyk-lua/ - Redis memory usage
http://nosql.mypopescu.com/post/1010844204/redis-memory-usage - Ukázka konfigurace Redisu pro lokální testování
https://github.com/tisnik/presentations/blob/master/redis/redis.conf - Resque
https://github.com/resque/resque - Nested transaction
https://en.wikipedia.org/wiki/Nested_transaction - Publish–subscribe pattern
https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern - Messaging pattern
https://en.wikipedia.org/wiki/Messaging_pattern - Using pipelining to speedup Redis queries
https://redis.io/topics/pipelining - Pub/Sub
https://redis.io/topics/pubsub - ZeroMQ distributed messaging
http://zeromq.org/ - ZeroMQ: Modern & Fast Networking Stack
https://www.igvita.com/2010/09/03/zeromq-modern-fast-networking-stack/ - Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
https://stackoverflow.com/questions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr - Python & Redis PUB/SUB
https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7 - Message broker
https://en.wikipedia.org/wiki/Message_broker - RESP Arrays
https://redis.io/topics/protocol#array-reply - Redis Protocol specification
https://redis.io/topics/protocol - Redis Pub/Sub: Intro Guide
https://www.redisgreen.net/blog/pubsub-intro/ - Redis Pub/Sub: Howto Guide
https://www.redisgreen.net/blog/pubsub-howto/ - Comparing Publish-Subscribe Messaging and Message Queuing
https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message - Apache Kafka
https://kafka.apache.org/ - Iron
http://www.iron.io/mq - kue (založeno na Redisu, určeno pro node.js)
https://github.com/Automattic/kue - Cloud Pub/Sub
https://cloud.google.com/pubsub/ - Introduction to Redis Streams
https://redis.io/topics/streams-intro - glob (programming)
https://en.wikipedia.org/wiki/Glob_(programming) - Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
https://www.slideshare.net/sylvinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2 - Enqueueing internals
http://python-rq.org/contrib/ - queue — A synchronized queue class
https://docs.python.org/3/library/queue.html - Queue – A thread-safe FIFO implementation
https://pymotw.com/2/Queue/ - Queues
http://queues.io/ - Windows Subsystem for Linux Documentation
https://docs.microsoft.com/en-us/windows/wsl/about - RestMQ
http://restmq.com/ - ActiveMQ
http://activemq.apache.org/ - Amazon MQ
https://aws.amazon.com/amazon-mq/ - Amazon Simple Queue Service
https://aws.amazon.com/sqs/ - Celery: Distributed Task Queue
http://www.celeryproject.org/ - Disque, an in-memory, distributed job queue
https://github.com/antirez/disque - rq-dashboard
https://github.com/eoranged/rq-dashboard - Projekt RQ na PyPi
https://pypi.org/project/rq/ - rq-dashboard 0.3.12
https://pypi.org/project/rq-dashboard/ - Job queue
https://en.wikipedia.org/wiki/Job_queue - Why we moved from Celery to RQ
https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq - Running multiple workers using Celery
https://serverfault.com/questions/655387/running-multiple-workers-using-celery - celery — Distributed processing
http://docs.celeryproject.org/en/latest/reference/celery.html - Chains
https://celery.readthedocs.io/en/latest/userguide/canvas.html#chains - Routing
http://docs.celeryproject.org/en/latest/userguide/routing.html#automatic-routing - Celery Distributed Task Queue in Go
https://github.com/gocelery/gocelery/ - Python Decorators
https://wiki.python.org/moin/PythonDecorators - Periodic Tasks
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html - celery.schedules
http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab - Pros and cons to use Celery vs. RQ
https://stackoverflow.com/questions/13440875/pros-and-cons-to-use-celery-vs-rq - Priority queue
https://en.wikipedia.org/wiki/Priority_queue - Jupyter
https://jupyter.org/ - How IPython and Jupyter Notebook work
https://jupyter.readthedocs.io/en/latest/architecture/how_jupyter_ipython_work.html - Context Managers
http://book.pythontips.com/en/latest/context_managers.html