Použití Apache ActiveMQ s protokolem STOMP

21. 2. 2019
Doba čtení: 41 minut

Sdílet

Dnes si ukážeme některé praktické příklady naprogramované v Pythonu, které komunikují s message brokerem Apache ActiveMQ (AMQ) s využitím protokolu STOMP. Vysvětlíme si potvrzování zpráv, použití transakcí i posílání binárních zpráv.

Obsah

1. Použití Apache ActiveMQ s protokolem STOMP

2. Zjednodušení producenta a konzumenta zpráv při použití strategie PUB-SUB

3. Totéž zjednodušení při použití komunikační strategie PUSH-PULL

4. Explicitní potvrzování zpráv (ACK) vybíraných z fronty

5. Záporné potvrzení zprávy (NACK)

6. Chování AMQ při záporném potvrzení

7. Použití vlastní fronty na straně příjemce při využití komunikace PUSH-PULL

8. Transakce

9. Komunikace typu PUB-SUB bez transakce

10. Komunikace typu PUB-SUB v transakci

11. Komunikace typu PUSH-PULL bez transakce

12. Komunikace typu PUSH-PULL v transakci

13. Zrušení celé transakce (rollback namísto commit)

14. Využití heartbeat pro ověření funkčnosti jednotlivých částí systému

15. Posílání binárních zpráv

16. Producent binární zprávy

17. Konzument binární zprávy

18. Repositář s demonstračními příklady

19. Odkazy na předchozí části seriálu

20. Odkazy na Internetu

1. Použití Apache ActiveMQ s protokolem STOMP

V předchozím článku seriálu o message brokerech jsme se seznámili s nástrojem Apache ActiveMQ (AMQ) a ukázali jsme si i dva krátké demonstrační příklady, které AMQ používaly jak pro komunikační strategii PUB-SUB, tak i pro strategii PUSH-PULL (tj. pro klasické fronty zpráv). Dnes si ukážeme další možnosti nabízené nástrojem AMQ při použití protokolu STOMP (Streaming Text Oriented Messaging Protocol), který sice nedokáže využít všechny možnosti AMQ, ovšem základní architekturu systému založeného na kombinaci AMQ+STOMP celkem bez problémů dokážeme postavit (ostatně největší problémy nás většinou čekají při návrhu subsystému pro persistenci zpráv, popř. pro clusterování a load balancing).

Poznámka: všechny možnosti AMQ je možné využít při použití protokolu AMQP 1.0, který je však mnohem složitější než STOMP. Navíc, pokud již postavíte systém na protokolu STOMP, je výměna message brokera relativně jednoduchá – použít je možné například již popsaný nástroj RabbitMQ, popř. projekt Apollo (následovníka AMQ).

V tabulce zobrazené pod tímto odstavcem jsou vypsány všechny příkazy podporované protokolem STOMP verze 1.2 (verze 1.1 se v tomto ohledu příliš neliší):

# Příkaz Stručný popis
1 SEND poslání zprávy
2 SUBSCRIBE přihlášení k odběru zpráv
3 UNSUBSCRIBE odhlášení od odběru zpráv
4 ACK potvrzení zprávy
5 NACK negativní potvrzení (vysvětlíme si příště)
6 BEGIN začátek transakce (vysvětlíme si příště)
7 COMMIT commit v transakci (provedení všech operací)
8 ABORT rollback transakce
9 DISCONNECT odpojení klienta

Ještě před popisem demonstračních příkladů se krátce zmiňme o persistenci zpráv. Ve výchozím nastavení jsou zprávy ukládány do databáze KahaDB, která nabízí mj. i žurnálování. Soubory s databází naleznete v podadresáři „data/kahadb“:

$ ls -1 data/kahadb/
 
db-1.log
db.data
db.free
db.redo
lock

Konfiguraci ukládání zpráv nalezneme v souboru activemq.xml:

<broker brokerName="broker">
   <persistenceAdapter>
     <kahaDB directory="activemq-data" journalMaxFileLength="32mb"/>
   </persistenceAdapter>
</broker>

Bližší informace o konfiguraci i o principu činnosti KahaDB naleznete na stránkách:

  1. KahaDB
    http://activemq.apache.or­g/kahadb.html
  2. Understanding the KahaDB Message Store
    https://access.redhat.com/do­cumentation/en-US/Fuse_MQ_Enterprise/7.1/html/Con­figuring_Broker_Persisten­ce/files/KahaDBOverview.html

Nastavení perzistence zpráv lze samozřejmě změnit, dokonce je možné i vyměnit typ databáze. Podrobnosti budou popsány příště v rámci konfigurace AMQ.

2. Zjednodušení producenta a konzumenta zpráv při použití strategie PUB-SUB

Nejprve si zopakujeme dva základní způsoby použití message brokera. Většinou jsou podporovány dvě rozdílně pracující komunikační strategie:

  1. PUB-SUB: zprávy jsou posílány producentem do message brokera, který má k dispozici dynamicky se měnící seznam příjemců (konzumentů) zpráv. Pokud příjemce daného tématu zpráv existuje, je mu zpráva přeposlána. Podobně se systém chová při existenci většího množství příjemců. Zpráva je od producenta přečtena i ve chvíli, kdy neexistuje žádný příjemce – v takovém případě je zahozena (pokud se interně používá fronta, je jen dočasná).
  2. PUSH-PULL: tato strategie je založena na použití fronty. Producent pošle zprávu message brokerovi, který ji uloží do fronty (každá fronta je pojmenovaná), konzumenti se mohou k frontě (frontám) připojit a zprávy přečíst. V tomto případě se zpráva doručí jen jedinému konzumentovi. Pokud se k frontě připojí větší množství konzumentů, je typicky použit algoritmus round-robin pro (polo)spravedlivé rozložení zátěže.

Ukažme si nejdříve implementaci producenta zpráv při použití strategie PUB-SUB. Jak jsme si již naznačili v předchozím textu, je každé zprávě přiřazeno téma (topic), které může být v případě použití protokolu STOMP specifikováno následovně:

"/topic/event"
"/topic/téma2"
"/topic/jiné_téma"
Poznámka: podobnost s cestou v rámci souborového systému není náhodná.

Producent zpráv se nejprve musí připojit k běžícímu message brokerovi. V demonstračním příkladu popsaném minule jsme pro připojení použili volání:

conn = stomp.Connection(host_and_ports=[("localhost", 61613)])
conn.start()
conn.connect(login="admin", passcode="admin")

Ve skutečnosti je ovšem možné využít i zjednodušeného zápisu při volání konstruktoru Connection:

conn = stomp.Connection([("localhost", 61613)])
conn.start()
conn.connect(login="admin", passcode="admin")

Zprávy se posílají metodou send, přičemž je nutné specifikovat jak téma, tak i vlastní zprávu:

conn.send("/topic/event", "Hello world!")

Podporovány jsou i další nepovinné parametry, například:

conn.send("/topic/event", "Hello world!", persistent='true')
conn.send("/topic/event", "Hello world!", transaction='id-transakce')
conn.send("/topic/event", "Hello world!", content_type='text/plain')

Po poslání všech zpráv je vhodné, aby se producent od message brokera odpojil:

conn.disconnect()

Podívejme se nyní na úplný zdrojový kód jednoduchého producenta zpráv, který použije dvě témata (topicy) se jmény „event“ a „event2“:

#!/usr/bin/env python
 
import time
import stomp
 
 
destination1 = "/topic/event"
destination2 = "/topic/event2"
 
MESSAGES = 10
 
conn = stomp.Connection([("localhost", 61613)])
conn.start()
conn.connect(login="admin", passcode="admin")
 
 
for i in range(0, MESSAGES):
    message = "Hello world #{i}!".format(i=i)
    conn.send(destination1, message, persistent='true')
    conn.send(destination2, message, persistent='true')
 
 
conn.disconnect()

Konzument (příjemce) zpráv je složitější, protože se samotný příjem zprávy (nebo vznik chyby) programuje s využitím callback metod. Samotný handler jednotlivých událostí může v nejjednodušším případě obsahovat pouze metody nazvané on_message (zavoláno při přijetí zprávy) a on_error (zavoláno v případě nějaké chyby). Existují ovšem i další callback metody, například on_disconnected a on_heartbeat, které si popíšeme později. Implementace handleru událostí vypadá takto:

class SimpleListener(object):
 
    def __init__(self, conn):
        self.conn = conn
 
    def on_message(self, headers, message):
        print("Received message: {m}".format(m=message))
 
    def on_error(self, headers, message):
        print("Received an error {e}".format(e=message))

Samotné připojení příjemce zpráv je řešeno shodným programovým kódem, jako u producenta, ovšem navíc musíme zaregistrovat výše zmíněný handler metodou set_listener:

conn = stomp.Connection([("localhost", 61613)])
conn.set_listener('', SimpleListener(conn))
conn.start()
 
conn.connect(login="admin", passcode="admin")

K odebírání zpráv pro zvolené téma se přihlásíme metodou subscribe:

conn.subscribe(id='simple_listener', destination="/topic/zvolené_téma", ack='auto')
Poznámka: nepovinný parametr ack určuje, jakým způsobem se potvrzuje zpracování zpráv. My jsme zvolili možnost „auto“. tj. zpráva je ihned po svém přijetí potvrzena, i když ve skutečnosti není zaručeno, že bude bez problémů i zpracována.

Opět si ukažme úplný zdrojový kód jednoduchého konzumenta zpráv:

#!/usr/bin/env python
 
import time
import stomp
 
 
class SimpleListener(object):
 
    def __init__(self, conn):
        self.conn = conn
 
    def on_message(self, headers, message):
        print("Received message: {m}".format(m=message))
 
    def on_error(self, headers, message):
        print("Received an error {e}".format(e=message))
 
 
destination = "/topic/event"
 
conn = stomp.Connection([("localhost", 61613)])
conn.set_listener('', SimpleListener(conn))
conn.start()
 
conn.connect(login="admin", passcode="admin")
conn.subscribe(id='simple_listener', destination=destination, ack='auto')
 
print("Waiting for messages...")
 
while True:
    time.sleep(10)

3. Totéž zjednodušení při použití komunikační strategie PUSH-PULL

Podobným způsobem můžeme implementovat producenta zpráv posílaných do front a tudíž používajících komunikační strategii PUSH-PULL a nikoli PUB-SUB. Na straně producenta se změní pouhé dva řádky, na nichž je specifikován cíl zpráv (z pohledu producenta je cílem nějaký bod message brokera). U strategie PUB-SUB byly cíle pojmenovány takto:

destination1 = "/topic/event"
destination2 = "/topic/event2"

kdežto v případě strategie PUSH-PULL použijeme:

destination1 = "/queue/test"
destination2 = "/queue/test2"
Poznámka: záleží samozřejmě na nás, kolik cílů zpráv budeme používat. V případě většiny producentů zpráv se s velkou pravděpodobnstí použije jen jediný cíl.

Nejjednodušší producent zpráv posílaných do dvojice front může vypadat takto:

#!/usr/bin/env python
 
import time
import stomp
 
 
destination1 = "/queue/test"
destination2 = "/queue/test2"
 
MESSAGES = 10
 
conn = stomp.Connection([("localhost", 61613)])
conn.start()
conn.connect(login="admin", passcode="admin")
 
 
for i in range(0, MESSAGES):
    message = "Hello world #{i}!".format(i=i)
    conn.send(destination1, message, persistent='true')
    conn.send(destination2, message, persistent='true')
 
 
conn.disconnect()

Konzument (příjemce) zpráv se změní jen nepatrně:

#!/usr/bin/env python
 
import time
import stomp
 
 
class SimpleListener(object):
 
    def __init__(self, conn):
        self.conn = conn
 
    def on_message(self, headers, message):
        print("Received message: {m}".format(m=message))
 
    def on_error(self, headers, message):
        print("Received an error {e}".format(e=message))
 
 
destination = "/queue/test"
 
conn = stomp.Connection([("localhost", 61613)])
conn.set_listener('', SimpleListener(conn))
conn.start()
 
conn.connect(login="admin", passcode="admin")
conn.subscribe(id='simple_listener', destination=destination, ack='auto')
 
print("Waiting for messages...")
 
while True:
    time.sleep(10)

4. Explicitní potvrzování zpráv (ACK) vybíraných z fronty

U všech prozatím zmíněných konzumentů zpráv jsme používali automatické potvrzení jejich zpracování. To spočívá v tom, že se zpráva ihned po svém přijetí označí za zpracovanou a je vyřazena z příslušné fronty (message broker na ní zcela „zapomene“). Nastavení automatického potvrzení se provádělo přímo při přihlašování konzumenta zpráv k nějaké frontě či k tématu:

conn.subscribe(id='simple_listener', destination=destination, ack='auto')

Můžeme ovšem specifikovat i odlišné chování – explicitní potvrzení o tom, že zpráva byla zpracována:

conn.subscribe(id='simple_listener', destination=destination, ack='client')

V tomto případě je konzument povinen explicitně potvrdit zpracování zprávy:

def on_message(self, headers, message):
    print("Received message: {m}".format(m=message))
    print(headers)
    message_id = headers['message-id']
    subscription = headers['subscription']
    self.conn.ack(message_id, subscription)

Nebo naopak zprávu nepotvrdit:

def on_message(self, headers, message):
    print("Received message: {m}".format(m=message))
    print(headers)
    message_id = headers['message-id']
    subscription = headers['subscription']
    self.conn.nack(message_id, subscription)

Existuje ještě třetí stav – nepošle se ani ACK ani NACK. Tato zpráva není považována za zpracovanou a záleží na dalším nastavení (životnost), co se s ní stane.

Povšimněte si, že message brokerovi musíme předat jednoznačný identifikátor zprávy – klientská knihovna si totiž nedrží (a ani nemusí držet) informace o kontextu či stavu fronty.
Poznámka: ve skutečnosti je možné, aby se potvrzení/nepotvrzení zprávy vykonalo i v rámci transakce. V tomto případě je nutné uvést i nepovinný parametr transaction (viz další text).

Konzumenta zpráv upravíme pro explicitní potvrzování zpráv takto:

#!/usr/bin/env python
 
import time
import stomp
 
 
class SimpleListener(object):
 
    def __init__(self, conn):
        self.conn = conn
 
    def on_message(self, headers, message):
        print("Received message: {m}".format(m=message))
        print(headers)
        message_id = headers['message-id']
        subscription = headers['subscription']
        self.conn.ack(message_id, subscription)
 
    def on_error(self, headers, message):
        print("Received an error {e}".format(e=message))
 
 
destination = "/queue/test"
 
conn = stomp.Connection(host_and_ports=[("localhost", 61613)])
conn.set_listener('', SimpleListener(conn))
conn.start()
 
conn.connect(login="admin", passcode="admin")
conn.subscribe(id='simple_listener', destination=destination, ack='client')
 
print("Waiting for messages...")
 
while True:
    time.sleep(10)

5. Záporné potvrzení zprávy (NACK)

Podívejme se nyní, jakým způsobem je možné realizovat konzumenta zpráv, který zprávy sice přečte ze zvolené fronty (zde konkrétně z fronty nazvané „test“), ovšem nepotvrdí korektní zpracování zprávy. Konzument tedy musí zprávu přečíst v rámci svého handleru a posléze zavolá metodu connection.nack(), které předá jednoznačné ID zprávy:

class SimpleListener:
 
    def __init__(self, conn):
        self.conn = conn
 
    def on_message(self, headers, message):
        print("Received message: {m}".format(m=message))
        print(headers)
        message_id = headers['message-id']
        subscription = headers['subscription']
        self.conn.nack(message_id, subscription)

Další změny ve zdrojovém kódu konzumenta nenastaly, jak je to ostatně patrné i při pohledu na jeho zdrojový kód:

#!/usr/bin/env python
 
import time
import stomp
 
 
class SimpleListener(object):
 
    def __init__(self, conn):
        self.conn = conn
 
    def on_message(self, headers, message):
        print("Received message: {m}".format(m=message))
        print(headers)
        message_id = headers['message-id']
        subscription = headers['subscription']
        self.conn.nack(message_id, subscription)
 
    def on_error(self, headers, message):
        print("Received an error {e}".format(e=message))
 
 
destination = "/queue/test"
 
conn = stomp.Connection(host_and_ports=[("localhost", 61613)])
conn.set_listener('', SimpleListener(conn))
conn.start()
 
conn.connect(login="admin", passcode="admin")
conn.subscribe(id='simple_listener', destination=destination, ack='client')
 
print("Waiting for messages...")
 
while True:
    time.sleep(10)

V navazující kapitole si ukážeme, co se vlastně stane se zprávami, u nichž nedošlo k potvrzení jejich zpracování. Message broker totiž musí v tomto případě předpokládat, že problém nastal již ve vlastním obsahu zprávy, tedy že se nutně nemusí jednat o špatně naimplementovaného klienta.

6. Chování AMQ při záporném potvrzení

Podívejme se nyní, co se stane se zprávami, které sice byly přijaty (přečteny z fronty), ovšem došlo k zápornému potvrzení jejich zpracování (NACK).

Následující výpis byl vytvořen s využitím příkazu activemq dstat, který vypíše informace o všech frontách a taktéž o všech tématech.

Name                                  Queue Size  Producer #  Consumer #   Enqueue #   Dequeue #   Forward #    Memory %
ActiveMQ.Advisory.Connection                   0           0           0           6           0           0           0
ActiveMQ.Advisory.Consumer.Queue.test          0           0           0          10           0           0           0
ActiveMQ.Advisory.MasterBroker                 0           0           0           1           0           0           0
ActiveMQ.Advisory.Queue                        0           0           0           9           0           0           0
ActiveMQ.Advisory.Topic                        0           0           0           2           0           0           0
ActiveMQ.DLQ                                   0           0           0           0           0           0           0
event                                          0           0           0           0           0           0           0
event2                                         0           0           0           0           0           0           0
test                                           0           0           0           1           1           0           0
test2                                          0           0           0           0          20           0           0

Nás ovšem budou zajímat pouze informace o frontách (queue), takže použijeme filtr představovaný příkazem activemq dstat queues. Nyní bude výsledek vypadat odlišně:

Name                Queue Size  Producer #  Consumer #   Enqueue #   Dequeue #   Forward #    Memory %
ActiveMQ.DLQ                 0           0           0           0           0           0           0
test                         0           0           0           1           1           0           0
test2                        0           0           0           0          20           0           0

Z výpisu je patrné, že v systému AMQ existují v tomto okamžiku tři fronty, z nichž dvě test a test2 jsme vytvořili našimi demonstračními příklady a fronta ActiveMQ.DLQ byla vytvořena automaticky. Jméno poslední fronty je odvozeno z plného názvu „Dead Letter Queue“.

Poznámka: fronty byly před spuštěním příkazu vymazány, proto mají velikost rovnou 0. Vymazání se provádí například přes webové rozhraní volbou (odkazem/tlačítkem) „purge“.

Pokud nyní spustíme výše popsaného producenta zpráv, mělo by se vygenerovat celkem dvacet zpráv, z nichž deset bude uloženo do fronty nazvané test a dalších deset do fronty nazvané test2:

Name                Queue Size  Producer #  Consumer #   Enqueue #   Dequeue #   Forward #    Memory %
ActiveMQ.DLQ                 0           0           0           0           0           0           0
test                        10           0           0          11           1           0           0
test2                       10           0           0          10          20           0           0

Po spuštění konzumenta (příjemce) zpráv, který odpovídá stavovým kódem NACK se sice přečte všech deset zpráv z fronty test, ovšem tyto zprávy se ihned poté, co message broker získá stav NACK, uloží do speciální fronty ActiveMQ.DLQ:

Name                Queue Size  Producer #  Consumer #   Enqueue #   Dequeue #   Forward #    Memory %
ActiveMQ.DLQ                10           0           0          10           0           0           0
test                         0           0           0          11          11           0           0
test2                       10           0           0          10          20           0           0

Informace o těchto zprávách si můžeme relativně snadno zobrazit tímto příkazem:

$ ./activemq browse ActiveMQ.DLQ

Výsledky by mohly vypadat následovně:

JMS_HEADER_FIELD:JMSDestination = ActiveMQ.DLQ
JMS_CUSTOM_FIELD:originalExpiration = 0
JMS_HEADER_FIELD:JMSDeliveryMode = persistent
JMS_HEADER_FIELD:JMSMessageID = ID:localhost.localdomain-33489-1550598531973-3:3:-1:1:1
JMS_BODY_FIELD:JMSBytes:1 =
JMS_CUSTOM_FIELD:OriginalDestination = test
JMS_HEADER_FIELD:JMSExpiration = 0
JMS_HEADER_FIELD:JMSPriority = 4
JMS_HEADER_FIELD:JMSRedelivered = false
JMS_HEADER_FIELD:JMSTimestamp = 1550607287050
 
JMS_HEADER_FIELD:JMSDestination = ActiveMQ.DLQ
JMS_CUSTOM_FIELD:originalExpiration = 0
JMS_HEADER_FIELD:JMSDeliveryMode = persistent
JMS_HEADER_FIELD:JMSMessageID = ID:localhost.localdomain-33489-1550598531973-3:3:-1:1:3
JMS_BODY_FIELD:JMSBytes:1 =
JMS_CUSTOM_FIELD:OriginalDestination = test
JMS_HEADER_FIELD:JMSExpiration = 0
JMS_HEADER_FIELD:JMSPriority = 4
JMS_HEADER_FIELD:JMSRedelivered = false
JMS_HEADER_FIELD:JMSTimestamp = 1550607287052
...
...
...

Povšimněte si, že je možné získat mj. i informaci o tom, ve které frontě byla zpráva původně uložena (hodnota atributu OriginalDestination).

V případě, že nám existence jediné fronty typu „DLQ“ pro všechny nepotvrzené zprávy nevyhovuje, můžeme si vytvořit vlastní pravidla editací souboru activemq.xml, který naleznete v podadresáři conf. Příkladem může být pravidlo pro zprávy, které byly původně poslány do fronty „test2“:

<broker>
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry queue="test2">
          <deadLetterStrategy>
            <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
          </deadLetterStrategy>
        </policyEntry>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
</broker>

Toto nastavení můžeme provést pro všechny fronty, protože v názvu fronty je možné použít žolíkové znaky. Podporovány jsou znaky „.“ (oddělovač), „*“ (jakékoli jméno/slovo v cestě) a „>“ (musí být uvedeno na konci, rekurzivní výskyty začínající daným řetězcem). To znamená, že všechny fronty mohou být reprezentovány pouze znakem „>“:

<broker>
  <destinationPolicy>
    <policyMap>
      <policyEntries>
        <policyEntry queue=">">
          <deadLetterStrategy>
            <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
          </deadLetterStrategy>
        </policyEntry>
      </policyEntries>
    </policyMap>
  </destinationPolicy>
</broker>
Poznámka: v aplikacích se složitější architekturou se se změnou fronty typu „DLG“ setkáme poměrně často, protože si úpravy vyžaduje samotná business logika.

7. Použití vlastní fronty na straně příjemce při využití komunikace PUSH-PULL

V dalším demonstračním příkladu si ukážeme, jak je možné upravit konzumenta zpráv takovým způsobem, aby konzument akceptoval všechny zprávy bez blokování (prakticky okamžitě). Řešení spočívá v tom, že v konzumentovi použijeme jeho vlastní lokální frontu, do které se zprávy budou ukládat (enqueue) v jednom vláknu, zatímco v hlavním vláknu se budou zprávy z této fronty postupně vybírat a zpracovávat, nezávisle na tom, jak rychle či naopak pomalu jsou zprávy generovány. Pro implementaci vlastní lokální fronty použijeme standardní třídu Queue, která umožňuje provádění operací s frontou z většího množství vláken (operace jsou synchronizovány). Bližší informace o této užitečné třídě naleznete například na stránce https://docs.python.org/3/li­brary/queue.html, popř. přímo z interaktivní konzole Pythonu:

$ python3
 
Python 3.6.3 (default, Oct  9 2017, 12:11:29)
[GCC 7.2.1 20170915 (Red Hat 7.2.1-2)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> help("queue")

S výsledkem:

Help on module queue:
 
NAME
    queue - A multi-producer, multi-consumer queue.
 
CLASSES
    builtins.Exception(builtins.BaseException)
        Empty
        Full
    builtins.object
        Queue
            LifoQueue
            PriorityQueue
...
...
...

V naší implementaci lokální fronty použijeme pouze metody Queue.put() a Queue.get():

>>> help("queue.Queue.put")
 
queue.Queue.put = put(self, item, block=True, timeout=None)
    Put an item into the queue.
 
    If optional args 'block' is true and 'timeout' is None (the default),
    block if necessary until a free slot is available. If 'timeout' is
    a non-negative number, it blocks at most 'timeout' seconds and raises
    the Full exception if no free slot was available within that time.
    Otherwise ('block' is false), put an item on the queue if a free slot
    is immediately available, else raise the Full exception ('timeout'
    is ignored in that case).

Povšimněte si, že metoda Queue.get() je implicitně blokující, tj. pokud nebudou k dispozici žádné zprávy, bude hlavní vlákno pasivně čekat na jejich příchod:

>>> help("queue.Queue.get")
 
Help on function get in queue.Queue:
 
queue.Queue.get = get(self, block=True, timeout=None)
    Remove and return an item from the queue.
 
    If optional args 'block' is true and 'timeout' is None (the default),
    block if necessary until an item is available. If 'timeout' is
    a non-negative number, it blocks at most 'timeout' seconds and raises
    the Empty exception if no item was available within that time.
    Otherwise ('block' is false), return an item if one is immediately
    available, else raise the Empty exception ('timeout' is ignored
    in that case).

Nyní si ukažme vlastní implementaci. Přijetí zprávy je realizováno v handleru, jehož jedním atributem je právě reference na lokální frontu:

class SimpleListener(object):
 
    def __init__(self, queue):
        self.queue = queue
 
    def on_message(self, headers, message):
        print("Received message: {m}, putting it into local queue".format(m=message))
        self.queue.put(message)
 
    def on_error(self, headers, message):
        print("Received an error {e}".format(e=message))

Naproti tomu zpracování zpráv z lokální fronty je snadné – využijeme zde blokující operaci Queue.get():

while True:
    item = q.get()
    print("Begin working on " + item)
    if item == "exit":
        break
 
    time.sleep(1)
    q.task_done()
    print("Finished working on " + item)

Úplný zdrojový kód konzumenta (příjemce) zprávy může vypadat takto:

#!/usr/bin/env python
 
import time
import stomp
 
from queue import Queue
 
 
class SimpleListener(object):
 
    def __init__(self, queue):
        self.queue = queue
 
    def on_message(self, headers, message):
        print("Received message: {m}, putting it into local queue".format(m=message))
        self.queue.put(message)
 
    def on_error(self, headers, message):
        print("Received an error {e}".format(e=message))
 
 
q = Queue()
 
destination = "/queue/test"
 
conn = stomp.Connection([("localhost", 61613)])
conn.set_listener('', SimpleListener(q))
conn.start()
 
conn.connect(login="admin", passcode="admin")
conn.subscribe(id='simple_listener', destination=destination, ack='auto')
 
print("Waiting for messages...")
 
while True:
    item = q.get()
    print("Begin working on " + item)
    if item == "exit":
        break
 
    time.sleep(1)
    q.task_done()
    print("Finished working on " + item)
 
conn.disconnect()

Samotný producent zpráv je prakticky totožný s předchozími producenty, ovšem poslední zprávou bude „exit“, kterým automaticky ukončíme konzumenta (příjemce) zprávy:

#!/usr/bin/env python
 
import time
import stomp
 
 
destination1 = "/queue/test"
destination2 = "/queue/test2"
 
MESSAGES = 10
 
conn = stomp.Connection([("localhost", 61613)])
conn.start()
conn.connect(login="admin", passcode="admin")
 
 
for i in range(0, MESSAGES):
    message = "Hello world #{i}!".format(i=i)
    conn.send(destination1, message, persistent='true')
    conn.send(destination2, message, persistent='true')
 
 
conn.send(destination1, "exit", persistent='true')
conn.send(destination2, "exit", persistent='true')
 
conn.disconnect()
 
print("Done")
Poznámka: tato implementace má jednu závažnou nevýhodu (která souvisí s tím, proč se vlastně message brokery zavádějí) – ve chvíli, kdy konzument z nějakého důvodu zhavaruje, budou všechny zprávy ztraceny. Jedno z řešení spočívá v tom, že zvolíme manuální potvrzování zpráv, čímž zajistíme, že prozatím nepotvrzené zprávy zůstanou ve frontě message brokera, kde je možná jejich snadná a bezpečná perzistence.

8. Transakce

Jednou z důležitých a mnohdy velmi užitečných vlastností protokolu STOMP, která je samozřejmě podporovaná i v systému AMQ, je možnost zpracovávat zprávy v transakci. Samotné transakce se specifikují prakticky stejným způsobem, jako například v databázových systémech – transakce je spuštěna metodou Connection.begin() a končí buď zavoláním metody Connection.commit() (dokončení transakce) nebo naopak zavoláním metody Connection.abort() (odvolání celé transakce, rollback).

Pro podporu transakcí existují v protokolu STOMP tři příkazy:

# Příkaz Stručný popis
1 BEGIN začátek transakce (vysvětlíme si příště)
2 COMMIT commit v transakci (provedení všech operací)
3 ABORT rollback transakce

Použití těchto příkazů si můžeme vyzkoušet z klienta protokolu STOMP ovládaného z příkazového řádku:

$ stomp -H 127.0.0.1 -P 61613 -U admin -W admin

Příkazy zadávané uživatelem jsou zvýrazněny tučně:

> begin
Transaction id: 72c39904-080c-478f-a540-550b2deb9d99
 
> send /queue/test x
> send /queue/test y
> send /queue/test z
> commit
Committing 72c39904-080c-478f-a540-550b2deb9d99
 
> begin
Transaction id: 37d70eb2-5e91-4988-932b-e2acfe1bc275
 
> send /queue/test u
> send /queue/test v
> send /queue/test w
>
> rollback

Na straně příjemce zpráv se objeví pouze informace o přijetí prvních tří zpráv „x“, „y“ a „z“, zatímco zprávy „u“, „v“ a „w“ nejsou do fronty vloženy a tudíž je ani příjemce nemůže získat:

Waiting for messages...
Received message: x
Received message: y
Received message: z
Poznámka: povšimněte si, že po zadání příkazu begin (začátek transakce) se vypíše řetězec s jednoznačným identifikátorem transakce. Ten budeme potřebovat v dalších demonstračních příkladech, protože se jedná o jeden z atributů zprávy.

Rozdíl mezi použitím a nepoužitím transakcí si ukážeme na dvou příkladech:

  1. Producent-konzument používající PUB-SUB
  2. Producent-konzument používající PUSH-PULL

9. Komunikace typu PUB-SUB bez transakce

Zdrojový kód producenta zpráv:

#!/usr/bin/env python
 
import time
import stomp
 
 
destination1 = "/topic/event"
destination2 = "/topic/event2"
 
MESSAGES = 10
 
conn = stomp.Connection([("localhost", 61613)])
conn.start()
conn.connect(login="admin", passcode="admin")
 
 
for i in range(0, MESSAGES):
    message = "Hello world #{i}!".format(i=i)
    print("Publishing message: " + message)
    conn.send(destination1, message, persistent='true')
    conn.send(destination2, message, persistent='true')
    time.sleep(1)
    print("Done")
 
 
conn.disconnect()

Zdrojový kód konzumenta zpráv:

#!/usr/bin/env python
 
import time
import stomp
 
 
class SimpleListener:
 
    def __init__(self, conn):
        self.conn = conn
 
    def on_message(self, headers, message):
        print("Received message: {m}".format(m=message))
 
    def on_error(self, headers, message):
        print("Received an error {e}".format(e=message))
 
 
destination = "/topic/event"
 
conn = stomp.Connection([("localhost", 61613)])
conn.set_listener('', SimpleListener(conn))
conn.start()
 
conn.connect(login="admin", passcode="admin")
conn.subscribe(id='simple_listener', destination=destination, ack='auto')
 
print("Waiting for messages...")
 
while True:
    time.sleep(10)

Chování takto nakonfigurovaného systému bude následující (vedle sebe jsou zobrazeny terminály producenta a konzumenta):

                                     |   Waiting for messages...
Publishing message: Hello world #0!  |
Done                                 |   Received message: Hello world #0!
Publishing message: Hello world #1!  |
Done                                 |   Received message: Hello world #1!
Publishing message: Hello world #2!  |
Done                                 |   Received message: Hello world #2!
Publishing message: Hello world #3!  |
Done                                 |   Received message: Hello world #3!
Publishing message: Hello world #4!  |
Done                                 |   Received message: Hello world #4!
Publishing message: Hello world #5!  |
Done                                 |   Received message: Hello world #5!
Publishing message: Hello world #6!  |
Done                                 |   Received message: Hello world #6!
Publishing message: Hello world #7!  |
Done                                 |   Received message: Hello world #7!
Publishing message: Hello world #8!  |
Done                                 |   Received message: Hello world #8!
Publishing message: Hello world #9!  |
Done                                 |   Received message: Hello world #9!

10. Komunikace typu PUB-SUB v transakci

Zdrojový kód producenta zpráv:

#!/usr/bin/env python
 
import time
import stomp
 
 
destination1 = "/topic/event"
destination2 = "/topic/event2"
 
MESSAGES = 10
 
conn = stomp.Connection([("localhost", 61613)])
conn.start()
conn.connect(login="admin", passcode="admin")
 
 
transaction=conn.begin()
print(transaction)
 
for i in range(0, MESSAGES):
    message = "Hello world #{i}!".format(i=i)
    print("Publishing message: " + message)
    conn.send(destination1, message, persistent='true', transaction=transaction)
    conn.send(destination2, message, persistent='true', transaction=transaction)
    time.sleep(1)
    print("Done")
 
conn.commit(transaction=transaction)
 
 
conn.disconnect()

Zdrojový kód konzumenta zpráv:

#!/usr/bin/env python
 
import time
import stomp
 
 
class SimpleListener:
 
    def __init__(self, conn):
        self.conn = conn
 
    def on_message(self, headers, message):
        print("Received message: {m}".format(m=message))
 
    def on_error(self, headers, message):
        print("Received an error {e}".format(e=message))
 
 
destination = "/topic/event"
 
conn = stomp.Connection([("localhost", 61613)])
conn.set_listener('', SimpleListener(conn))
conn.start()
 
conn.connect(login="admin", passcode="admin")
conn.subscribe(id='simple_listener', destination=destination, ack='auto')
 
print("Waiting for messages...")
 
while True:
    time.sleep(10)

Chování takto nakonfigurovaného systému bude následující (vedle sebe jsou zobrazeny terminály producenta a konzumenta):

                                     |   Waiting for messages...
Publishing message: Hello world #0!  |
Done                                 |
Publishing message: Hello world #1!  |
Done                                 |
Publishing message: Hello world #2!  |
Done                                 |
Publishing message: Hello world #3!  |
Done                                 |
Publishing message: Hello world #4!  |
Done                                 |
Publishing message: Hello world #5!  |
Done                                 |
Publishing message: Hello world #6!  |
Done                                 |
Publishing message: Hello world #7!  |
Done                                 |
Publishing message: Hello world #8!  |
Done                                 |
Publishing message: Hello world #9!  |
Done                                 | Received message: Hello world #0!
                                     | Received message: Hello world #1!
                                     | Received message: Hello world #2!
                                     | Received message: Hello world #3!
                                     | Received message: Hello world #4!
                                     | Received message: Hello world #5!
                                     | Received message: Hello world #6!
                                     | Received message: Hello world #7!
                                     | Received message: Hello world #8!
                                     | Received message: Hello world #9!

11. Komunikace typu PUSH-PULL bez transakce

Zdrojový kód producenta zpráv:

#!/usr/bin/env python
 
import time
import stomp
 
 
destination1 = "/queue/test"
destination2 = "/queue/test2"
 
MESSAGES = 10
 
conn = stomp.Connection([("localhost", 61613)])
conn.start()
conn.connect(login="admin", passcode="admin")
 
 
for i in range(0, MESSAGES):
    message = "Hello world #{i}!".format(i=i)
    print("Publishing message: " + message)
    conn.send(destination1, message, persistent='true')
    conn.send(destination2, message, persistent='true')
    time.sleep(2)
    print("Done")
 
 
conn.disconnect()

Chování systému:

                                     |   Waiting for messages...
Publishing message: Hello world #0!  |
Done                                 |   Received message: Hello world #0!
Publishing message: Hello world #1!  |
Done                                 |   Received message: Hello world #1!
Publishing message: Hello world #2!  |
Done                                 |   Received message: Hello world #2!
Publishing message: Hello world #3!  |
Done                                 |   Received message: Hello world #3!
Publishing message: Hello world #4!  |
Done                                 |   Received message: Hello world #4!
Publishing message: Hello world #5!  |
Done                                 |   Received message: Hello world #5!
Publishing message: Hello world #6!  |
Done                                 |   Received message: Hello world #6!
Publishing message: Hello world #7!  |
Done                                 |   Received message: Hello world #7!
Publishing message: Hello world #8!  |
Done                                 |   Received message: Hello world #8!
Publishing message: Hello world #9!  |
Done                                 |   Received message: Hello world #9!

12. Komunikace typu PUSH-PULL v transakci

Zdrojový kód producenta zpráv:

#!/usr/bin/env python
 
import time
import stomp
 
 
destination1 = "/queue/test"
destination2 = "/queue/test2"
 
MESSAGES = 10
 
conn = stomp.Connection([("localhost", 61613)])
conn.start()
conn.connect(login="admin", passcode="admin")
 
 
transaction=conn.begin()
print(transaction)
 
for i in range(0, MESSAGES):
    message = "Hello world #{i}!".format(i=i)
    print("Publishing message: " + message)
    conn.send(destination1, message, persistent='true', transaction=transaction)
    conn.send(destination2, message, persistent='true', transaction=transaction)
    time.sleep(1)
    print("Done")
 
conn.commit(transaction=transaction)
 
 
conn.disconnect()

Chování systému:

                                     |   Waiting for messages...
Publishing message: Hello world #0!  |
Done                                 |
Publishing message: Hello world #1!  |
Done                                 |
Publishing message: Hello world #2!  |
Done                                 |
Publishing message: Hello world #3!  |
Done                                 |
Publishing message: Hello world #4!  |
Done                                 |
Publishing message: Hello world #5!  |
Done                                 |
Publishing message: Hello world #6!  |
Done                                 |
Publishing message: Hello world #7!  |
Done                                 |
Publishing message: Hello world #8!  |
Done                                 |
Publishing message: Hello world #9!  |
Done                                 | Received message: Hello world #0!
                                     | Received message: Hello world #1!
                                     | Received message: Hello world #2!
                                     | Received message: Hello world #3!
                                     | Received message: Hello world #4!
                                     | Received message: Hello world #5!
                                     | Received message: Hello world #6!
                                     | Received message: Hello world #7!
                                     | Received message: Hello world #8!
                                     | Received message: Hello world #9!

13. Zrušení celé transakce (rollback namísto commit)

Zdrojový kód producenta zpráv:

#!/usr/bin/env python
 
import time
import stomp
 
 
destination1 = "/queue/test"
destination2 = "/queue/test2"
 
MESSAGES = 10
 
conn = stomp.Connection([("localhost", 61613)])
conn.start()
conn.connect(login="admin", passcode="admin")
 
 
transaction=conn.begin()
print(transaction)
 
for i in range(0, MESSAGES):
    message = "Hello world #{i}!".format(i=i)
    print("Publishing message: " + message)
    conn.send(destination1, message, persistent='true', transaction=transaction)
    conn.send(destination2, message, persistent='true', transaction=transaction)
    time.sleep(1)
    print("Done")
 
conn.abort(transaction=transaction)
 
 
conn.disconnect()

Chování systému:

                                     |   Waiting for messages...
Publishing message: Hello world #0!  |
Done                                 |
Publishing message: Hello world #1!  |
Done                                 |
Publishing message: Hello world #2!  |
Done                                 |
Publishing message: Hello world #3!  |
Done                                 |
Publishing message: Hello world #4!  |
Done                                 |
Publishing message: Hello world #5!  |
Done                                 |
Publishing message: Hello world #6!  |
Done                                 |
Publishing message: Hello world #7!  |
Done                                 |
Publishing message: Hello world #8!  |
Done                                 |
Publishing message: Hello world #9!  |
Done                                 |

14. Využití heartbeat pro ověření funkčnosti jednotlivých částí systému

Při připojení klientů k message brokerovi je možné specifikovat parametr heartbeats, kterým se specifikuje perioda „tepů“ posílaných klientem a taktéž perioda „tepů“, které klient očekává:

#!/usr/bin/env python
 
import time
import stomp
 
 
destination1 = "/topic/event"
destination2 = "/topic/event2"
 
MESSAGES = 10
 
conn = stomp.Connection([("localhost", 61613)], heartbeats=(0, 0))
conn.start()
conn.connect(login="admin", passcode="admin")
 
 
for i in range(0, MESSAGES):
    message = "Hello world #{i}!".format(i=i)
    print("Publishing message: " + message)
    conn.send(destination1, message, persistent='true')
    conn.send(destination2, message, persistent='true')
    time.sleep(0.5)
    print("Done")
 
 
conn.disconnect()

V druhém klientovi je nastaveno, že „tep“ je očekáván každou sekundu. Pokud není detekován, je klient odpojen:

#!/usr/bin/env python
 
import time
import stomp
 
 
class SimpleListener:
 
    def __init__(self, conn):
        self.conn = conn
 
    def on_message(self, headers, message):
        print("Received message: {m}".format(m=message))
 
    def on_error(self, headers, message):
        print("Received an error {e}".format(e=message))
 
    def on_disconnected(self):
        print("Disconnected")
 
    def on_heartbeat(self):
        print("Heartbeat")
 
    def on_heartbeat_timeout(self):
        print("Heartbeat timeout")
 
 
destination = "/topic/event"
 
conn = stomp.Connection([("localhost", 61613)], heartbeats=(0,1000))
conn.set_listener('', SimpleListener(conn))
conn.start()
 
conn.connect(login="admin", passcode="admin")
conn.subscribe(id='simple_listener', destination=destination, ack='auto')
 
print("Waiting for messages...")
 
while True:
    time.sleep(10)
 
print("Done...")

Na straně příjemce zpráv získáme jak informace o přijatých zprávách, tak i o „heartbeat“:

Received message: Hello world #7!
Heartbeat
Heartbeat
Heartbeat
Heartbeat
Heartbeat
Received message: Hello world #0!
Received message: Hello world #1!

Pokud „heartbeat“ nepřijmeme ve specifikovaném intervalu, bude klient odpojen:

Heartbeat timeout: diff_receive=1.5015023681335151, time=2968372.729240428, lastrec=2968371.22773806
Disconnected
Disconnected
Heartbeat timeout

15. Posílání binárních zpráv

V posledním příkladu si ukážeme jednu z možností posílání zpráv obsahujících binární data. Protokol STOMP sice přímo podporuje práci s binárními daty (zprávy nemusí být čistě textové), ovšem na problémy narazíme na straně producentů a konzumentů, zejména ve chvíli, kdy je producent realizován v jiném programovacím jazyce než konzument (popř. v odlišné verzi interpretru – typicky Python 2 vs. Python 3). Taktéž případné ladění, popř. využití telnetu a podobných nástrojů při hledání problémů v systému je v případě přímého posílání binárních dat komplikovanější.

Z tohoto důvodu může být výhodnější binární data nejdříve nějakým způsobem zakódovat do sekvence binárních znaků, například s využitím Base64, který má tu výhodu, že algoritmus kódování/dekódování lze vytvořit velmi snadno (pokud již pro daný programovací jazyk neexistuje příslušná knihovna – většinou je již k dispozici). Nevýhodou jsou delší zprávy, které musí být v message brokerovi uloženy (ve frontách) a pomalejší posílání či příjem zpráv – pokud se ovšem zpracovávají relativně krátké zprávy s menší frekvencí (desítky za sekundu pro jednoho producenta), nemělo by být zpoždění vůbec patrné.

Pro otestování posílání a příjmu zpráv s binárními daty vytvoříme producenta, který do fronty pošle data s rastrovým obrázkem. Samotný obrázek získáme snadno (jedná se o ikonu Vimu, v němž tento článek vznikl):

wget https://www.vim.org/images/vim_editor.gif

Obrázek 1: Ikona použitá jako příklad binárních dat.

16. Producent binární zprávy

Na straně producenta (zdroje) zpráv jsou provedeny tyto operace:

  1. Načtení obsahu binárního souboru s rastrovým obrázkem.
  2. Převod na sekvenci čitelných znaků (podmnožina ASCII) pomocí Base64, konkrétně funkcí base64.b64encode().
  3. Výsledek bude poslán do message brokera jako běžná zpráva.

Realizace:

with open("vim_editor.gif", mode="rb") as file:
    binaryData = file.read()
    print("Read {l} bytes from binary file".format(l=len(binaryData)))
 
    textData = base64.b64encode(binaryData)
    conn.send(destination, textData, persistent='true')
    print("Sent {l} characters".format(l=len(textData)))

Producent tedy může vypadat takto:

#!/usr/bin/env python
 
import time
import stomp
import base64
 
 
destination = "/queue/test"
 
MESSAGES = 10
 
conn = stomp.Connection(host_and_ports=[("localhost", 61613)])
conn.start()
conn.connect(login="admin", passcode="admin")
 
 
with open("vim_editor.gif", mode="rb") as file:
    binaryData = file.read()
    print("Read {l} bytes from binary file".format(l=len(binaryData)))
 
    textData = base64.b64encode(binaryData)
    conn.send(destination, textData, persistent='true')
    print("Sent {l} characters".format(l=len(textData)))
 
 
conn.disconnect()

Povšimněte si, že délka zprávy (ve znacích) je větší, než počet načtených bajtů z binárního souboru:

Read 1091 bytes from binary file
Sent 1456 characters

Pokud se nyní podíváme na fyzický tvar zprávy uložené do fronty přes webové rozhraní Active MQ, uvidíme, že zpráva byla skutečně převedena do BASE64:

Obrázek 2: Takto vypadá zpráva uložená do fronty nazvané „test“. Vlastní obsah zprávy je červeně orámován.

17. Konzument binární zprávy

Na straně konzumenta se provedou opačné kroky:

  1. Přečtení zprávy, o které víme, že používá kódování Base64.
  2. Zpětný převod na objekt typu bytes.
  3. Uložení získané sekvence bajtů do nového binárního souboru.

Realizace:

    def on_message(self, headers, message):
        binaryData = base64.b64decode(message)
        print("Received {l} characters".format(l=len(message)))
        print("   converted into {l} bytes".format(l=len(binaryData)))
 
        with open("output.gif", "wb") as f:
            f.write(binaryData)

Celý zdrojový kód producenta zpráv:

#!/usr/bin/env python
 
import time
import stomp
import base64
 
 
class SimpleListener:
 
    def __init__(self, conn):
        self.conn = conn
 
    def on_message(self, headers, message):
        binaryData = base64.b64decode(message)
        print("Received {l} characters".format(l=len(message)))
        print("   converted into {l} bytes".format(l=len(binaryData)))
 
        with open("output.gif", "wb") as f:
            f.write(binaryData)
 
    def on_error(self, headers, message):
        print("Received an error {e}".format(e=message))
 
 
destination = "/queue/test"
 
conn = stomp.Connection(host_and_ports=[("localhost", 61613)])
conn.set_listener('', SimpleListener(conn))
conn.start()
 
conn.connect(login="admin", passcode="admin")
conn.subscribe(id='simple_listener', destination=destination, ack='auto')
 
print("Waiting for messages...")
 
while True:
    time.sleep(10)

Po spuštění konzumenta by se měla zpráva načíst, převést na binární tvar a uložit do souboru „output.gif“:

bitcoin_skoleni

Waiting for messages...
Received 1456 characters
   converted into 1091 bytes
Poznámka: pokud porovnáte originální soubor „vim_editor.gif“ s nově vytvořeným souborem „output.gif“, měly by mít tyto soubory naprosto stejný obsah:
$ cmp -l vim_editor.gif output.gif
 
$ echo $?
0

18. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v Pythonu byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.

Příklad Skript/kód Popis Cesta
1 publisher.py zdroj zpráv posílaných se strategií PUB-SUB https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample01_publisher_listener/pu­blisher.py
1 subscriber.py příjemce zpráv posílaných se strategií PUB-SUB https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample01_publisher_listener/sub­scriber.py
       
2 publisher.py zdroj zpráv posílaných se strategií PUSH-PULL https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample02_publisher_listener_qu­eue/publisher.py
2 subscriber.py příjemce zpráv posílaných se strategií PUSH-PULL https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample02_publisher_listener_qu­eue/subscriber.py
       
3 publisher.py obdoba prvního příkladu, ovšem samotné připojení k message brokerovi se realizuje jednodušeji https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample03_simple_connection_ca­ll/publisher.py
3 subscriber.py obdoba prvního příkladu, ovšem samotné připojení k message brokerovi se realizuje jednodušeji https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample03_simple_connection_ca­ll/subscriber.py
       
4 publisher.py obdoba druhého příkladu, ovšem samotné připojení k message brokerovi se realizuje jednodušeji https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample04_publisher_listener_qu­eue_simple_connection_call/pu­blisher.py
4 subscriber.py obdoba druhého příkladu, ovšem samotné připojení k message brokerovi se realizuje jednodušeji https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample04_publisher_listener_qu­eue_simple_connection_call/sub­scriber.py
       
5 publisher.py běžný producent zpráv https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample05_ack/publisher.py
5 subscriber.py konzument zpráv potvrzujících jejich zpracování https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample05_ack/subscriber.py
       
6 publisher.py běžný producent zpráv https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample06_nack/publisher.py
6 subscriber.py konzument zpráv NEpotvrzujících jejich zpracování https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample06_nack/subscriber.py
       
7 publisher.py producent zpráv, na konci vyšle zprávu „exit“ https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample07_queue/publisher.py
7 subscriber.py konzument zpráv s vlastní lokální frontou https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample07_queue/subscriber­.py
       
8 publisher.py strategie PUB-SUB, transakce se nepoužívají https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample08_pub_sub_no_transac­tions/publisher.py
8 subscriber.py strategie PUB-SUB, transakce se nepoužívají https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample08_pub_sub_no_transac­tions/subscriber.py
       
9 publisher.py strategie PUB-SUB s využitím transakcí https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample09_pub_sub_transacti­ons/publisher.py
9 subscriber.py strategie PUB-SUB s využitím transakcí https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample09_pub_sub_transacti­ons/subscriber.py
       
10 publisher.py strategie PUSH-PULL, transakce se nepoužívají https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample10_push_pull_no_tran­sactions/publisher.py
10 subscriber.py strategie PUSH-PULL, transakce se nepoužívají https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample10_push_pull_no_tran­sactions/subscriber.py
       
11 publisher.py strategie PUSH-PULL s využitím transakcí https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample11_push_pull_transac­tions/publisher.py
11 subscriber.py strategie PUSH-PULL s využitím transakcí https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample11_push_pull_transac­tions/subscriber.py
       
12 publisher.py zrušení transakce https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample12_transaction_abor­t/publisher.py
12 subscriber.py zrušení transakce https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample12_transaction_abor­t/subscriber.py
       
13 publisher.py využití „heartbeat“ https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample13_heartbeat/publisher­.py
13 subscriber.py využití „heartbeat“ https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample13_heartbeat/subscri­ber.py
       
14 publisher.py producent zpráv s binárním obsahem https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample14_binary_messages/pu­blisher.py
14 subscriber.py konzument zpráv s binárním obsahem https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample14_binary_messages/sub­scriber.py
14 get_image skript pro stažení testovacího rastrového obrázku https://github.com/tisnik/message-queues-examples/blob/master/amq/e­xample14_binary_messages/get_i­mage
Poznámka: v tabulce jsou pro úplnost vypsány i dva příklady, s nimiž jsme se seznámili v předchozí části tohoto seriálu.

19. Odkazy na předchozí části seriálu

V této kapitole jsou uvedeny odkazy na všech devět předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií:

  1. Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
    https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/
  2. Celery: systém implementující asynchronní fronty úloh pro Python
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/
  3. Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
    https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/
  4. RabbitMQ: jedna z nejúspěšnějších implementací brokera
    https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/
  5. Pokročilejší operace nabízené systémem RabbitMQ
    https://www.root.cz/clanky/po­krocilejsi-operace-nabizene-systemem-rabbitmq/
  6. ØMQ: knihovna pro asynchronní předávání zpráv
    https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/
  7. Další možnosti poskytované knihovnou ØMQ
    https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/
  8. Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě
    https://www.root.cz/clanky/dalsi-moznosti-nabizene-knihovnou-mq-implementace-protokolu-mq-v-ciste-jave/
  9. Apache ActiveMQ – další systém implementující message brokera
    https://www.root.cz/clanky/apache-activemq-dalsi-system-implementujici-message-brokera/

20. Odkazy na Internetu

  1. Apache ActiveMQ
    http://activemq.apache.org/
  2. Apache ActiveMQ Artemis
    https://activemq.apache.org/artemis/
  3. Apache ActiveMQ Artemis User Manual
    https://activemq.apache.or­g/artemis/docs/latest/index­.html
  4. KahaDB
    http://activemq.apache.or­g/kahadb.html
  5. Understanding the KahaDB Message Store
    https://access.redhat.com/do­cumentation/en-US/Fuse_MQ_Enterprise/7.1/html/Con­figuring_Broker_Persisten­ce/files/KahaDBOverview.html
  6. Command Line Tools (Apache ActiveMQ)
    https://activemq.apache.org/activemq-command-line-tools-reference.html
  7. stomp.py 4.1.21 na PyPi
    https://pypi.org/project/stomp.py/
  8. Stomp Tutorial
    https://access.redhat.com/do­cumentation/en-US/Fuse_Message_Broker/5.5/html/Con­nectivity_Guide/files/FMBCon­nectivityStompTelnet.html
  9. Heartbeat (computing)
    https://en.wikipedia.org/wi­ki/Heartbeat_(computing)
  10. Apache Camel
    https://camel.apache.org/
  11. Red Hat Fuse
    https://developers.redhat­.com/products/fuse/overvi­ew/
  12. Confusion between ActiveMQ and ActiveMQ-Artemis?
    https://serverfault.com/qu­estions/873533/confusion-between-activemq-and-activemq-artemis
  13. Staré stránky projektu HornetQ
    http://hornetq.jboss.org/
  14. Snapshot JeroMQ verze 0.4.4
    https://oss.sonatype.org/con­tent/repositories/snapshot­s/org/zeromq/jeromq/0.4.4-SNAPSHOT/
  15. Difference between ActiveMQ vs Apache ActiveMQ Artemis
    http://activemq.2283324.n4­.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html
  16. Microservices communications. Why you should switch to message queues
    https://dev.to/matteojoli­veau/microservices-communications-why-you-should-switch-to-message-queues–48ia
  17. Stomp.py 4.1.19 documentation
    https://stomppy.readthedoc­s.io/en/stable/
  18. Repositář knihovny JeroMQ
    https://github.com/zeromq/jeromq/
  19. ØMQ – Distributed Messaging
    http://zeromq.org/
  20. ØMQ Community
    http://zeromq.org/community
  21. Get The Software
    http://zeromq.org/intro:get-the-software
  22. PyZMQ Documentation
    https://pyzmq.readthedocs­.io/en/latest/
  23. Module: zmq.decorators
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.deco­rators.html
  24. ZeroMQ is the answer, by Ian Barber
    https://vimeo.com/20605470
  25. ZeroMQ RFC
    https://rfc.zeromq.org/
  26. ZeroMQ and Clojure, a brief introduction
    https://antoniogarrote.wor­dpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/
  27. zeromq/czmq
    https://github.com/zeromq/czmq
  28. golang wrapper for CZMQ
    https://github.com/zeromq/goczmq
  29. ZeroMQ version reporting in Python
    http://zguide.zeromq.org/py:version
  30. A Go interface to ZeroMQ version 4
    https://github.com/pebbe/zmq4
  31. Broker vs. Brokerless
    http://zeromq.org/whitepa­pers:brokerless
  32. Learning ØMQ with pyzmq
    https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/
  33. Céčková funkce zmq_ctx_new
    http://api.zeromq.org/4–2:zmq-ctx-new
  34. Céčková funkce zmq_ctx_destroy
    http://api.zeromq.org/4–2:zmq-ctx-destroy
  35. Céčková funkce zmq_bind
    http://api.zeromq.org/4–2:zmq-bind
  36. Céčková funkce zmq_unbind
    http://api.zeromq.org/4–2:zmq-unbind
  37. Céčková C funkce zmq_connect
    http://api.zeromq.org/4–2:zmq-connect
  38. Céčková C funkce zmq_disconnect
    http://api.zeromq.org/4–2:zmq-disconnect
  39. Céčková C funkce zmq_send
    http://api.zeromq.org/4–2:zmq-send
  40. Céčková C funkce zmq_recv
    http://api.zeromq.org/4–2:zmq-recv
  41. Třída Context (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#con­text
  42. Třída Socket (Python)
    https://pyzmq.readthedocs­.io/en/latest/api/zmq.html#soc­ket
  43. Python binding
    http://zeromq.org/bindings:python
  44. Why should I have written ZeroMQ in C, not C++ (part I)
    http://250bpm.com/blog:4
  45. Why should I have written ZeroMQ in C, not C++ (part II)
    http://250bpm.com/blog:8
  46. About Nanomsg
    https://nanomsg.org/
  47. Advanced Message Queuing Protocol
    https://www.amqp.org/
  48. Advanced Message Queuing Protocol na Wikipedii
    https://en.wikipedia.org/wi­ki/Advanced_Message_Queuin­g_Protocol
  49. Dokumentace k příkazu rabbitmqctl
    https://www.rabbitmq.com/rab­bitmqctl.8.html
  50. RabbitMQ
    https://www.rabbitmq.com/
  51. RabbitMQ Tutorials
    https://www.rabbitmq.com/get­started.html
  52. RabbitMQ: Clients and Developer Tools
    https://www.rabbitmq.com/dev­tools.html
  53. RabbitMQ na Wikipedii
    https://en.wikipedia.org/wi­ki/RabbitMQ
  54. Streaming Text Oriented Messaging Protocol
    https://en.wikipedia.org/wi­ki/Streaming_Text_Oriented_Mes­saging_Protocol
  55. Message Queuing Telemetry Transport
    https://en.wikipedia.org/wiki/MQTT
  56. Erlang
    http://www.erlang.org/
  57. pika 0.12.0 na PyPi
    https://pypi.org/project/pika/
  58. Introduction to Pika
    https://pika.readthedocs.i­o/en/stable/
  59. Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
    http://clojurerabbitmq.info/
  60. AMQP 0–9–1 Model Explained
    http://www.rabbitmq.com/tutorials/amqp-concepts.html
  61. Part 1: RabbitMQ for beginners – What is RabbitMQ?
    https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html
  62. Downloading and Installing RabbitMQ
    https://www.rabbitmq.com/dow­nload.html
  63. celery na PyPi
    https://pypi.org/project/celery/
  64. Databáze Redis (nejenom) pro vývojáře používající Python
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/
  65. Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/
  66. Redis Queue (RQ)
    https://www.fullstackpython.com/redis-queue-rq.html
  67. Python Celery & RabbitMQ Tutorial
    https://tests4geeks.com/python-celery-rabbitmq-tutorial/
  68. Flower: Real-time Celery web-monitor
    http://docs.celeryproject­.org/en/latest/userguide/mo­nitoring.html#flower-real-time-celery-web-monitor
  69. Asynchronous Tasks With Django and Celery
    https://realpython.com/asynchronous-tasks-with-django-and-celery/
  70. First Steps with Celery
    http://docs.celeryproject­.org/en/latest/getting-started/first-steps-with-celery.html
  71. node-celery
    https://github.com/mher/node-celery
  72. Full Stack Python: web development
    https://www.fullstackpython.com/web-development.html
  73. Introducing RQ
    https://nvie.com/posts/introducing-rq/
  74. Asynchronous Tasks with Flask and Redis Queue
    https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue
  75. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  76. Stránky projektu Redis
    https://redis.io/
  77. Introduction to Redis
    https://redis.io/topics/introduction
  78. Try Redis
    http://try.redis.io/
  79. Redis tutorial, April 2010 (starší, ale pěkně udělaný)
    https://static.simonwilli­son.net/static/2010/redis-tutorial/
  80. Python Redis
    https://redislabs.com/lp/python-redis/
  81. Redis: key-value databáze v paměti i na disku
    https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/
  82. Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
    http://www.cloudsvet.cz/?p=253
  83. Praktický úvod do Redis (2): transakce
    http://www.cloudsvet.cz/?p=256
  84. Praktický úvod do Redis (3): cluster
    http://www.cloudsvet.cz/?p=258
  85. Connection pool
    https://en.wikipedia.org/wi­ki/Connection_pool
  86. Instant Redis Sentinel Setup
    https://github.com/ServiceStack/redis-config
  87. How to install REDIS in LInux
    https://linuxtechlab.com/how-install-redis-server-linux/
  88. Redis RDB Dump File Format
    https://github.com/sripat­hikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format
  89. Lempel–Ziv–Welch
    https://en.wikipedia.org/wi­ki/Lempel%E2%80%93Ziv%E2%80%93­Welch
  90. Redis Persistence
    https://redis.io/topics/persistence
  91. Redis persistence demystified
    http://oldblog.antirez.com/post/redis-persistence-demystified.html
  92. Redis reliable queues with Lua scripting
    http://oldblog.antirez.com/post/250
  93. Ost (knihovna)
    https://github.com/soveran/ost
  94. NoSQL
    https://en.wikipedia.org/wiki/NoSQL
  95. Shard (database architecture)
    https://en.wikipedia.org/wi­ki/Shard_%28database_archi­tecture%29
  96. What is sharding and why is it important?
    https://stackoverflow.com/qu­estions/992988/what-is-sharding-and-why-is-it-important
  97. What Is Sharding?
    https://btcmanager.com/what-sharding/
  98. Redis clients
    https://redis.io/clients
  99. Category:Lua-scriptable software
    https://en.wikipedia.org/wi­ki/Category:Lua-scriptable_software
  100. Seriál Programovací jazyk Lua
    https://www.root.cz/seria­ly/programovaci-jazyk-lua/
  101. Redis memory usage
    http://nosql.mypopescu.com/pos­t/1010844204/redis-memory-usage
  102. Ukázka konfigurace Redisu pro lokální testování
    https://github.com/tisnik/pre­sentations/blob/master/re­dis/redis.conf
  103. Resque
    https://github.com/resque/resque
  104. Nested transaction
    https://en.wikipedia.org/wi­ki/Nested_transaction
  105. Publish–subscribe pattern
    https://en.wikipedia.org/wi­ki/Publish%E2%80%93subscri­be_pattern
  106. Messaging pattern
    https://en.wikipedia.org/wi­ki/Messaging_pattern
  107. Using pipelining to speedup Redis queries
    https://redis.io/topics/pipelining
  108. Pub/Sub
    https://redis.io/topics/pubsub
  109. ZeroMQ distributed messaging
    http://zeromq.org/
  110. ZeroMQ: Modern & Fast Networking Stack
    https://www.igvita.com/2010/09/03/ze­romq-modern-fast-networking-stack/
  111. Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
    https://stackoverflow.com/qu­estions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr
  112. Python & Redis PUB/SUB
    https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7
  113. Message broker
    https://en.wikipedia.org/wi­ki/Message_broker
  114. RESP Arrays
    https://redis.io/topics/protocol#array-reply
  115. Redis Protocol specification
    https://redis.io/topics/protocol
  116. Redis Pub/Sub: Intro Guide
    https://www.redisgreen.net/blog/pubsub-intro/
  117. Redis Pub/Sub: Howto Guide
    https://www.redisgreen.net/blog/pubsub-howto/
  118. Comparing Publish-Subscribe Messaging and Message Queuing
    https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message
  119. Apache Kafka
    https://kafka.apache.org/
  120. Iron
    http://www.iron.io/mq
  121. kue (založeno na Redisu, určeno pro node.js)
    https://github.com/Automattic/kue
  122. Cloud Pub/Sub
    https://cloud.google.com/pubsub/
  123. Introduction to Redis Streams
    https://redis.io/topics/streams-intro
  124. glob (programming)
    https://en.wikipedia.org/wi­ki/Glob_(programming)
  125. Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
    https://www.slideshare.net/syl­vinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2
  126. Enqueueing internals
    http://python-rq.org/contrib/
  127. queue — A synchronized queue class
    https://docs.python.org/3/li­brary/queue.html
  128. Queue – A thread-safe FIFO implementation
    https://pymotw.com/2/Queue/
  129. Queues
    http://queues.io/
  130. Windows Subsystem for Linux Documentation
    https://docs.microsoft.com/en-us/windows/wsl/about
  131. RestMQ
    http://restmq.com/
  132. ActiveMQ
    http://activemq.apache.org/
  133. Amazon MQ
    https://aws.amazon.com/amazon-mq/
  134. Amazon Simple Queue Service
    https://aws.amazon.com/sqs/
  135. Celery: Distributed Task Queue
    http://www.celeryproject.org/
  136. Disque, an in-memory, distributed job queue
    https://github.com/antirez/disque
  137. rq-dashboard
    https://github.com/eoranged/rq-dashboard
  138. Projekt RQ na PyPi
    https://pypi.org/project/rq/
  139. rq-dashboard 0.3.12
    https://pypi.org/project/rq-dashboard/
  140. Job queue
    https://en.wikipedia.org/wi­ki/Job_queue
  141. Why we moved from Celery to RQ
    https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq
  142. Running multiple workers using Celery
    https://serverfault.com/qu­estions/655387/running-multiple-workers-using-celery
  143. celery — Distributed processing
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.html
  144. Chains
    https://celery.readthedoc­s.io/en/latest/userguide/can­vas.html#chains
  145. Routing
    http://docs.celeryproject­.org/en/latest/userguide/rou­ting.html#automatic-routing
  146. Celery Distributed Task Queue in Go
    https://github.com/gocelery/gocelery/
  147. Python Decorators
    https://wiki.python.org/mo­in/PythonDecorators
  148. Periodic Tasks
    http://docs.celeryproject­.org/en/latest/userguide/pe­riodic-tasks.html
  149. celery.schedules
    http://docs.celeryproject­.org/en/latest/reference/ce­lery.schedules.html#celery­.schedules.crontab
  150. Pros and cons to use Celery vs. RQ
    https://stackoverflow.com/qu­estions/13440875/pros-and-cons-to-use-celery-vs-rq
  151. Priority queue
    https://en.wikipedia.org/wi­ki/Priority_queue
  152. Jupyter
    https://jupyter.org/
  153. How IPython and Jupyter Notebook work
    https://jupyter.readthedoc­s.io/en/latest/architectu­re/how_jupyter_ipython_wor­k.html
  154. Context Managers
    http://book.pythontips.com/en/la­test/context_managers.html

Autor článku

Vystudoval VUT FIT a v současné době pracuje na projektech vytvářených v jazycích Python a Go.