Obsah
1. Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě
2. Pipeline využívající strategii PUSH-PULL
4. Připojení k zařízení typu Streamer
6. Základní rozdíly mezi zařízeními 0MQ a PyZMQ
7. PyZMQ zařízení typu Queue versus ØMQ Queue
8. Úprava fronty takovým způsobem, aby se použilo zařízení PyZMQ
9. Kooperace mezi 0MQ a aplikacemi naprogramovanými v Javě
11. Komunikace typu REQ-REP: klient naprogramovaný v Pythonu a server v Javě
12. Otestování komunikace klient-server
14. Komunikační strategie typu PUSH-PULL: přepis producenta zpráv do Javy
15. Komunikační strategie typu PUSH-PULL: přepis konzumenta zpráv do Javy
16. Rozdělení zátěže mezi větší počet konzumentů (workerů)
17. Repositář s demonstračními příklady
18. Odkazy na předchozí části seriálu
1. Další možnosti nabízené knihovnou ØMQ, implementace protokolů ØMQ v čisté Javě
Dnešní článek o knihovně ØMQ je rozdělen na tři části. V úvodní části dokončíme popis využití komunikační strategie typu PUSH-PULL, s níž jsme se již v tomto seriálu několikrát setkali, ovšem prozatím jsme si neukázali všechny možnosti, které nám tato strategie nabízí. Na tuto část navážeme popisem vlastností zařízení PyZMQ (PyZMQ devices), která se používají jednodušeji, než zařízení samotné knihovny ØMQ (ØMQ devices). A konečně bude závěrečná část věnována základním způsobům použití knihovny ØMQ, přesněji řečeno její varianty nazvané JeroMQ, z programovacího jazyka Java. Díky závěrečné části článku se nám tedy podaří propojit témata tohoto seriálu a poněkud nepravidelně vycházejících článků o Javě a o virtuálním stroji Javy.
2. Pipeline využívající strategii PUSH-PULL
V této kapitole se na chvíli vrátíme k architektuře používající komunikační strategii PUSH-PULL.
Obrázek 1: Jednosměrná komunikace využívající strategii PUSH-PULL.
Minule jsme si ukázali, jakým způsobem je možné implementovat rozložení zátěže (úkolů) mezi větší množství workerů pomocí fan-out (fanout) a následně všechny výsledky práce workerů sloučit a popř. dále zpracovat (uložit atd.) v jediném uzlu (fan-in).
Obrázek 2: Využití většího množství workerů pomocí fan-in a fan-out.
Ovšem s využitím strategie PUSH-PULL je možné realizovat i „kolonu“ (pipeline) složenou z většího množství workerů, kteří si navzájem posílají výsledky své práce. Pojmenování pipeline je v tomto případě příznačné, protože prakticky tu stejnou funkci nám poskytuje shell při použití rour a FIFO. Ukažme si nyní, jak je možné realizovat relativně jednoduchou a přímočarou pipeline, v níž existuje jeden zdroj úkolů, workery dvou typů a na konci sběratel (collector) výsledků:
Obrázek 3: Jednoduchá pipeline.
Ve výše zobrazeném schématu se prozatím nenachází žádné zařízení, což je obecně poněkud problematické, protože to znamená, že jeden z workerů musí vystupovat v roli serveru (a přitom víme, že workeři jsou obecně nestabilním prvkem celého systému, jelikož je typicky připojujeme a odpojujeme podle aktuální zátěže, workeři mohou být restartováni po pádu, po zjištění velké spotřeby paměti atd.).
Implementace uzlu, který vytváří úkoly (job, task) pro jednotlivé workery a používá ve svém zdrojovém kódu dekorátory, vypadá následovně:
import zmq from zmq.decorators import context, socket from time import sleep CONNECTION_TYPE = zmq.PUSH PORT = 5556 def send_message(socket, message): """Poslání zprávy.""" print("Sending message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_producer(context, socket): """Spuštění serveru.""" address = "tcp://*:{port}".format(port=PORT) # socket.set_hwm(1) socket.bind(address) print("Bound to address {a}".format(a=address)) for i in range(11): send_message(socket, "Message #{i}".format(i=i)) sleep(0.1) start_producer()
Implementace prvního workera, který je umístěn do levé části pipeline:
import zmq from zmq.decorators import context, socket from os import getpid IN_PORT = 5556 OUT_PORT = 5557 @context() @socket(zmq.PULL) @socket(zmq.PUSH) def start_worker(context, in_socket, out_socket): """Spuštění workera.""" address = "tcp://localhost:{port}".format(port=IN_PORT) in_socket.connect(address) print("Connected to {a}".format(a=address)) address = "tcp://*:{port}".format(port=OUT_PORT) out_socket.bind(address) print("And to {a}".format(a=address)) print("Waiting for message from producer...") pid = getpid() while True: message = in_socket.recv_string() out_message = "Message from {pid}: '{m}'".format(pid=pid, m=message) print(out_message) out_socket.send_string(out_message) start_worker()
Implementace druhého workera, který je umístěn do pravé části pipeline:
import zmq from zmq.decorators import context, socket from os import getpid IN_PORT = 5557 OUT_PORT = 5558 @context() @socket(zmq.PULL) @socket(zmq.PUSH) def start_worker(context, in_socket, out_socket): """Spuštění workera.""" address = "tcp://localhost:{port}".format(port=IN_PORT) in_socket.connect(address) print("Connected to {a}".format(a=address)) address = "tcp://localhost:{port}".format(port=OUT_PORT) out_socket.connect(address) print("And to {a}".format(a=address)) print("Waiting for message from worker #1...") pid = getpid() while True: message = in_socket.recv_string() out_message = "Message from {pid}: '{m}'".format(pid=pid, m=message) print(out_message) out_socket.send_string(out_message) start_worker()
A nakonec si ukažme implementaci sběratele výsledků:
import zmq from zmq.decorators import context, socket from time import sleep CONNECTION_TYPE = zmq.PULL PORT = 5558 @context() @socket(CONNECTION_TYPE) def start_collector(context, socket): """Spuštění sběratele.""" address = "tcp://*:{port}".format(port=PORT) socket.bind(address) print("Connected to {a}".format(a=address)) print("Waiting for message from worker #2 ...") while True: message = socket.recv_string() print("Collecting message: '{m}'".format(m=message)) sleep(0) start_collector()
- 5556: od producenta k prvnímu workeru
- 5557: od prvního workera ke druhému workeru
- 5558: od druhého workera k procesu, který výsledky zpracuje
3. Zařízení typu Streamer
Posledním typem zařízení, s nímž se v dnešním článku seznámíme, je zařízení nazvané ZMQ.STREAMER. Toto zařízení se používá ve chvíli, kdy spolu jednotlivé uzly komunikují s využitím strategie PUSH-PULL. Streamer se do celého procesu zpracování přidává jednoduše, protože se na přední straně (frontend) chová jako běžný příjemze zpráv (PULL) a na zadní straně (backend) jako jejich zdroj. Na rozdíl od zařízení Queue tedy nebylo nutné přidat nové typy socketů typu XREP a XREQ:
Obrázek 4: Použití zařízení typu Streamer.
Opět si ukažme princip použití tohoto zařízení. Nejprve vytvoříme skripty se zdrojem zpráv a s jejich cílovým zpracováním. V obou skriptech se bude používat metoda Socket.connect, nikoli Socket.bind. Navíc se budou používat odlišná čísla portů, protože mezi producentem a konzumentem bude umístěno právě zařízení typu Streamer.
Implementace producenta zpráv:
import zmq from zmq.decorators import context, socket import time CONNECTION_TYPE = zmq.PUSH PORT = 5550 def send_message(socket, message): """Poslání zprávy.""" print("Sending message '{m}'".format(m=message)) socket.send_string(message) @context() @socket(CONNECTION_TYPE) def start_producer(port, context, socket): """Spuštění serveru.""" address = "tcp://localhost:{port}".format(port=port) # socket.set_hwm(1) socket.connect(address) print("Connected to address {a}".format(a=address)) for i in range(100): send_message(socket, "Message #{i}".format(i=i)) time.sleep(0.2) start_producer(PORT)
Implementace konzumenta zpráv:
import zmq from zmq.decorators import context, socket import time CONNECTION_TYPE = zmq.PULL PORT = 5551 @context() @socket(CONNECTION_TYPE) def start_consumer(port, context, socket): """Spuštění konzumenta.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) print("Waiting for message...") cnt = 0 while True: message = socket.recv_string() cnt += 1 print("Received message {c} of 100: '{m}'".format(c=cnt, m=message)) time.sleep(0) start_consumer(PORT)
4. Připojení k zařízení typu Streamer
Vlastní implementace streameru je vlastně velmi jednoduchá a prakticky se neliší například od implementace zařízení typu Queue. Ostatně se můžeme podívat na zdrojový kód skriptu s implementací, v němž se na portu 5550 navazuje připojení typu PULL a na portu 5551 připojení typu PUSH, tj. přesně tak, jak to bylo naznačeno na čtvrtém obrázku. Na obou stranách se používá metoda Socket.bind, protože streamer bude ústředním a stabilním prvkem celé architektury:
import zmq from zmq.decorators import context, socket PULL_PORT = 5550 PUSH_PORT = 5551 @context() @socket(zmq.PULL) @socket(zmq.PUSH) def create_streamer(pull_port, push_port, context, frontend, backend): """Vytvoření streameru.""" context = zmq.Context() address = "tcp://*:{port}".format(port=pull_port) frontend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=pull_port)) address = "tcp://*:{port}".format(port=push_port) backend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=push_port)) zmq.device(zmq.STREAMER, frontend, backend) create_streamer(PULL_PORT, PUSH_PORT)
5. Zařízení PyZMQ
V předchozím článku jsme se mj. zabývali i popisem takzvaných zařízení ØMQ. Připomeňme si, že samotná knihovna ØMQ nabízí vývojářům následující zařízení, která se většinou používají při návrhu systémů se složitější architekturou a s požadavkem na jejich větší robustnost:
Zařízení | Stručný popis |
---|---|
ZMQ.QUEUE | prostředník používaný především v komunikaci REQ-REP (klasický klient, server) |
ZMQ.FORWARDER | používá se jako prostředník mezi zdroji zpráv a jejich příjemci PUB-SUB |
ZMQ.STREAMER | používá se v komunikační strategii PUSH-PULL |
Knihovna PyZMQ, která slouží jako rozhraní mezi nativní knihovnou ØMQ a programovacím jazykem Python, programátorům nabízí ještě jednu možnost – takzvaná zařízení PyZMQ (PyZMQ devices). Jejich základní význam je stejný, jako u zařízení ØMQ, ovšem zatímco u zařízení ØMQ jsme museli explicitně otevírat (popř. i zavírat) sockety, u zařízení PyZMQ se pracuje pouze s typy socketů. Všechny operace typu vytvoření socketu, vytvoření zařízení ØMQ atd., knihovna PyZMQ udělá za vývojáře sama, takže výsledný zdrojový kód je mnohem jednodušší a taktéž idiomatičtější (navíc se v naprosté většině případů vyhneme i použití dekorátorů, dokonce ani není nutné explicitně vytvářet instanci kontextu ØMQ).
Navíc je kód reprezentující zařízení spuštěn v samostatném (automaticky vytvořeném) vláknu, což znamená, že budeme moci velmi snadno v jednom skriptu implementovat jak příslušné zařízení, tak například i server či producenta. Z dalších pohledů však zařízení PyZMQ svými vlastnostmi plně odpovídají zařízením ØMQ:
Zařízení ØMQ | Zařízení PyZMQ |
---|---|
zmq.device(zmq.QUEUE, socket, socket) | ProcessDevice(zmq.QUEUE, typ_socketu, typ_socketu) |
zmq.device(zmq.FORWARDER, socket, socket) | ProcessDevice(zmq.FORWARDER, typ_socketu, typ_socketu) |
zmq.device(zmq.STREAMER, socket, socket) | ProcessDevice(zmq.STREAMER, typ_socketu, typ_socketu) |
6. Základní rozdíly mezi zařízeními 0MQ a PyZMQ
Podívejme se nyní na praktické rozdíly mezi zařízeními knihovny ØMQ a zařízeními rozhraní PyZMQ z pohledu běžného vývojáře.
Nejdříve si připomeňme, jakým způsobem jsme použili zařízení Streamer, které se používá při komunikaci typu PUSH-PULL. Toto zařízení bude vloženo mezi producenta zpráv a mezi jejich konzumenta.
Samotnou implementaci meziuzlu reprezentovaného zařízením Streamer můžeme přepsat s využitím zařízení PyZMQ. Nejprve musíme importovat třídu ProcessDevice z modulu zmq.devices.basedevice:
from zmq.devices.basedevice import ProcessDevice
Dále zkonstruujeme instanci třídy ProcessDevice, přičemž budeme muset specifikovat jak typ zařízení (Streamer), tak i typ socketu na frontendu i na backendu (skutečně se specifikují pouze typy socketů, nikoli samotné sockety – ty nemusíme explicitně vytvářet):
streamer_device = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
Jakmile je zařízení vytvořeno, musíme nastavit adresy, na nichž zařízení typu Streamer naslouchá. K tomu se používají metody nazvané bind_in a bind_out:
frontend_address = "tcp://*:{port}".format(port=pull_port) backend_address = "tcp://*:{port}".format(port=push_port) streamer_device.bind_in(frontend_address) streamer_device.bind_out(backend_address)
Nakonec pouze zařízení spustíme, takže se interně v novém vláknu (tedy z našeho pohledu na pozadí) spustí nekonečná smyčka zpracovávající zprávy přicházející na frontend a posílající zprávy na backend:
streamer_device.start()
Úplný zdrojový kód meziuzlu se zařízením typu Streamer a současně i s producentem zpráv vypadá takto:
import zmq from zmq.decorators import context, socket from zmq.devices.basedevice import ProcessDevice import time PULL_PORT = 5550 PUSH_PORT = 5551 PRODUCER_CONNECTION_TYPE = zmq.PUSH PRODUCER_PORT = 5550 def create_streamer(pull_port, push_port): """Vytvoření streameru.""" streamer_device = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH) frontend_address = "tcp://*:{port}".format(port=pull_port) backend_address = "tcp://*:{port}".format(port=push_port) streamer_device.bind_in(frontend_address) streamer_device.bind_out(backend_address) print("Bound to {a} on port {p}".format(a=frontend_address, p=pull_port)) print("Bound to {a} on port {p}".format(a=backend_address, p=push_port)) streamer_device.start() print("Device started in background") def send_message(socket, message): """Poslání zprávy.""" print("Sending message '{m}'".format(m=message)) socket.send_string(message) @socket(PRODUCER_CONNECTION_TYPE) def start_producer(port, context, socket): """Spuštění zdroje zprav.""" address = "tcp://localhost:{port}".format(port=port) # socket.set_hwm(1) socket.connect(address) print("Connected to address {a}".format(a=address)) for i in range(100): send_message(socket, "Message #{i}".format(i=i)) time.sleep(0.2) @context() def start_device_and_producer(context): create_streamer(PULL_PORT, PUSH_PORT) start_producer(PRODUCER_PORT, context) start_device_and_producer()
Kód konzumenta se žádným způsobem nezmění, takže jen pro úplnost:
import zmq from zmq.decorators import context, socket import time CONNECTION_TYPE = zmq.PULL PORT = 5551 @context() @socket(CONNECTION_TYPE) def start_consumer(port, context, socket): """Spuštění konzumenta.""" address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) print("Waiting for message...") cnt = 0 while True: message = socket.recv_string() cnt += 1 print("Received message {c} of 100: '{m}'".format(c=cnt, m=message)) time.sleep(0) start_consumer(PORT)
7. PyZMQ zařízení typu Queue versus ØMQ Queue
Druhým typem zařízení rozhraní PyZMQ, s nímž se dnes seznámíme, je zařízení typu Queue, které se využívá především v komunikační strategii REP-REQ, s níž jsme se již taktéž setkali. Opět si nejdříve ukažme implementaci jednoduchého serveru a klienta, mezi něž je vložena fronta vytvořená klasicky pomocí funkcí 0MQ:
Server vypadal takto:
import zmq from math import factorial PORT = 5557 def connect(port, connection_type): """Otevření socketu se specifikovaným typem spojení.""" context = zmq.Context() socket = context.socket(connection_type) address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) return socket def send_response(socket, response): """Odeslání odpovědi.""" print("Sending response '{r}'".format(r=response)) socket.send_string(response) def receive_request(socket): """Zpracování požadavku klienta.""" request = socket.recv_string() print("Received request from client: '{r}'".format(r=request)) return request def start_server(): """Spuštění serveru.""" socket = connect(PORT, zmq.REP) while True: request = receive_request(socket) try: n = int(request) fact = factorial(n) send_response(socket, "{n}! = {f}".format(n=n, f=fact)) except Exception as e: send_response(socket, "Wrong input") start_server()
import zmq PORT = 5556 def connect(port, connection_type): """Otevření socketu se specifikovaným typem spojení.""" context = zmq.Context() socket = context.socket(connection_type) address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) return socket def send_request(socket, request): """Poslání požadavku.""" print("Sending request '{r}'".format(r=request)) socket.send_string(request) def start_client(): """Spuštění klienta.""" socket = connect(PORT, zmq.REQ) send_request(socket, "1") print(socket.recv_string()) print() send_request(socket, "10") print(socket.recv_string()) print() send_request(socket, "xyzzy") print(socket.recv_string()) print() send_request(socket, "-10") print(socket.recv_string()) print() send_request(socket, "100") print(socket.recv_string()) print() start_client()
A konečně vlastní zařízení typu Queue:
import zmq XREP_PORT = 5556 XREQ_PORT = 5557 def create_queue(xrep_port, xreq_port): """Vytvoření fronty.""" context = zmq.Context() frontend = context.socket(zmq.XREP) address = "tcp://*:{port}".format(port=xrep_port) frontend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=xrep_port)) backend = context.socket(zmq.XREQ) address = "tcp://*:{port}".format(port=xreq_port) backend.bind(address) print("Bound to {a} on port {p}".format(a=address, p=xreq_port)) zmq.device(zmq.QUEUE, frontend, backend) create_queue(XREP_PORT, XREQ_PORT)
8. Úprava fronty takovým způsobem, aby se použilo zařízení PyZMQ
Úprava při použití zařízení PyZMQ bude spočívat se sloučení kódu klienta a fronty do následujícího skriptu:
import zmq from zmq.decorators import context from zmq.devices.basedevice import ProcessDevice from math import factorial XREP_PORT = 5556 XREQ_PORT = 5557 SERVER_PORT = 5557 def create_queue(xrep_port, xreq_port): """Vytvoření fronty.""" queue_device = ProcessDevice(zmq.QUEUE, zmq.XREP, zmq.XREQ) frontend_address = "tcp://*:{port}".format(port=xrep_port) backend_address = "tcp://*:{port}".format(port=xreq_port) queue_device.bind_in(frontend_address) queue_device.bind_out(backend_address) print("Bound to {a} on port {p}".format(a=frontend_address, p=xrep_port)) print("Bound to {a} on port {p}".format(a=backend_address, p=xreq_port)) queue_device.start() print("Queue device started in background") def connect(context, port, connection_type): """Otevření socketu se specifikovaným typem spojení.""" socket = context.socket(connection_type) address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) return socket def send_response(socket, response): """Odeslání odpovědi.""" print("Sending response '{r}'".format(r=response)) socket.send_string(response) def receive_request(socket): """Zpracování požadavku klienta.""" request = socket.recv_string() print("Received request from client: '{r}'".format(r=request)) return request def start_server(context, port): """Spuštění serveru.""" socket = connect(context, port, zmq.REP) while True: request = receive_request(socket) try: n = int(request) fact = factorial(n) send_response(socket, "{n}! = {f}".format(n=n, f=fact)) except Exception as e: send_response(socket, "Wrong input") @context() def start_device_and_server(context): create_queue(XREP_PORT, XREQ_PORT) start_server(context, SERVER_PORT) start_device_and_server()
9. Kooperace mezi 0MQ a aplikacemi naprogramovanými v Javě
V závěrečné části dnešního článku si ukážeme, jakým způsobem je možné ØMQ využít v Javě a (nepřímo) i v dalších programovacích jazycích, které jsou postaveny nad virtuálním strojem Javy (Groovy, Scale, Clojure, pravděpodobně i dnes již poněkud obstarožní Jython atd.). Připomeňme si, že ØMQ je naprogramována v jazyce C++ a existuje pro ni nativní rozhraní jak pro C, tak i pro C++. Díky tomu je relativně přímočaré použít rozhraní JNI (Java Native Interface) pro volání nativního kódu z Javy. Tento přístup je použit v knihovně JZMQ, jejíž zdrojové kódy naleznete v GitHub repositáři https://github.com/zeromq/jzmq. Předností tohoto způsobu je automaticky zaručená kompatibilita s ostatními aplikacemi a nástroji, které ØMQ používají. Nevýhodou pak především složitější překlad a instalace JZMQ (ostatně většinou se při kombinaci nativních knihoven a JVM dostaneme do podobné situace).
10. Knihovna JeroMQ
Pokud vám z nějakého důvodu knihovna JZMQ nebude vyhovovat, například při požadavku na to, že se může nasazovat jen bajtkód JVM, můžete využít alternativní knihovnu nazvanou JeroMQ. Tato knihovna, jejíž repositář je umístěn na adrese https://github.com/zeromq/jeromq, reimplementuje celou infrastrukturu ØMQ, ovšem na rozdíl od jazyka C++ je použita Java. Tím se podařilo obejít JNI, zjednodušit celý proces překladu a instalace knihovny atd. Možná poněkud paradoxní je, že výsledek nemusí být pomalejší, než nativní varianta ØMQ, a to z toho důvodu, že JNI je samo o sobě dosti problematické a pomalé, takže záleží především na tom, jak často se nativní funkce volají a jak velké datové struktury se jim předávají. Viz též několik porovnání dostupných na wiki https://github.com/zeromq/jeromq/wiki/Performance.
Knihovnu JeroMQ můžeme buď přidat do svého pom.xml (pokud používáte Maven popř. systém postavený na Mavenu), popř. si pro pouhé otestování můžete stáhnout již připravený java archiv s přeloženou knihovnou:
wget https://oss.sonatype.org/content/repositories/snapshots/org/zeromq/jeromq/0.4.4-SNAPSHOT/jeromq-0.4.4-20180323.141003-1.jar
Tento druhý způsob využijeme v dalších příkladech.
11. Komunikace typu REQ-REP: klient naprogramovaný v Pythonu a server v Javě
Ukažme si nyní, jak je možné komunikovat mezi klientem naprogramovaným v Pythonu a používajícím nativní ØMQ na jedné straně, se serverem naprogramovaným v Javě a používajícím JeroMQ na straně druhé. Využijeme základní komunikační strategii REQ-REP, s níž jsme již dobře seznámeni.
Jak jsme si již řekli v úvodním odstavci, je klient vytvořen v Pythonu a prozatím v něm nenajdeme žádnou novou funkcionalitu:
import zmq def connect(port, connection_type): """Otevření socketu se specifikovaným typem spojení.""" context = zmq.Context() socket = context.socket(connection_type) address = "tcp://localhost:{port}".format(port=port) socket.connect(address) print("Connected to {a}".format(a=address)) return socket def send_request(socket, request): """Poslání požadavku.""" print("Sending request '{r}'".format(r=request)) socket.send_string(request) def start_client(): """Spuštění klienta.""" socket = connect(5555, zmq.REQ) send_request(socket, "1") print(socket.recv_string()) print() send_request(socket, "10") print(socket.recv_string()) print() send_request(socket, "xyzzy") print(socket.recv_string()) print() start_client()
Naproti tomu pro implementaci serveru použijeme Javu. Server bude přijímat požadavky a pokud bude v požadavku předáno celé číslo (zapsané do řetězce), vypočte server faktoriál tohoto čísla a vrátí výsledek. V případě jakéhokoli problému se vrátí zpráva s informací o tom, proč k problému došlo. Zdrojový kód serveru může vypadat následovně:
import org.zeromq.ZMQ; import org.zeromq.ZContext; public class Server { public static long factorial(long n) { int i, result=1; for(i=2; i<=n; i++) { result *= i; } return result; } public static void main(String[] args) throws Exception { try (ZContext context = new ZContext()) { ZMQ.Socket socket = context.createSocket(ZMQ.REP); socket.bind("tcp://*:5555"); while (!Thread.currentThread().isInterrupted()) { byte[] raw_request = socket.recv(0); String request = new String(raw_request, ZMQ.CHARSET); String response = null; System.out.println("Received: " + request); try { long n = Integer.parseInt(request); if (n < 0) { response = "Invalid input!"; } else { response = n + "! = " + Server.factorial(n); } } catch (Exception e) { response = "Wrong input!"; } socket.send(response.getBytes(ZMQ.CHARSET), 0); } } } }
Oproti kódu naprogramovanému v Pythonu je zde provedeno několik změn, které se týkají kódování a dekódování řetězců. Musíme mít totiž na paměti, že v ØMQ se přenáší zprávy obsahující pouze sekvenci bajtů a o případnou interpretaci těchto bajtů se musíme postarat sami. Týká se to především interpretace konce řetězce a potom taktéž kódování, resp. přesněji řečeno mapování mezi bajty a znaky (typicky v Unicode). Proto můžeme v kódu výše vidět dekódování přijatého požadavku a na konci kódování řetězce se zprávou zpět na pole bajtů (byte[]).
12. Otestování komunikace klient-server
V případě, že jste si již v rámci předchozí kapitoly stáhli java archiv s přeloženou knihovnou JeroMQ, může překlad serveru proběhnout takto:
javac -cp jeromq-0.4.4-20180323.141003-1.jar Server.java
Spuštění serveru:
java -cp .:jeromq-0.4.4-20180323.141003-1.jar Server
Nyní můžeme spustit klienta a sledovat, že se serverem skutečně dokáže bez problémů komunikovat, i když se na každé komunikující straně používá zcela odlišná implementace ØMQ:
$ python3 client.py Connected to tcp://localhost:5555 Sending request '1' 1! = 1 Sending request '10' 10! = 3628800 Sending request 'xyzzy' Wrong input!
Logy na straně serveru:
Received: 1 Received: 10 Received: xyzzy
13. Portace klienta do Javy
Samozřejmě nám nic nebrání přepsat do Javy i samotného klienta. Jedna z možných reimplementací může vypadat následovně:
import org.zeromq.ZMQ; import org.zeromq.ZContext; public class Client { static void send_request(ZMQ.Socket socket, int number) { String request = String.valueOf(number); socket.send(request.getBytes(ZMQ.CHARSET), 0); System.out.println("Sent: " + request); byte[] raw_response = socket.recv(0); String response = new String(raw_response, ZMQ.CHARSET); System.out.println("Received from server: " + response); } public static void main(String[] args) throws Exception { try (ZContext context = new ZContext()) { ZMQ.Socket socket = context.createSocket(ZMQ.REQ); socket.connect("tcp://localhost:5555"); for (int i=0; i<10; i++) { send_request(socket, i); } send_request(socket, -10); } } }
Překlad klienta:
java -cp .:jeromq-0.4.4-20180323.141003-1.jar Client
Spuštění nové verze klienta:
java -cp .:jeromq-0.4.4-20180323.141003-1.jar Client
14. Komunikační strategie typu PUSH-PULL: přepis producenta zpráv do Javy
Další příklad, který přepíšeme z Pythonu do Javy, se skládá z producenta a konzumenta zpráv, kteří mezi sebou komunikují s využitím strategie PUSH-PULL, kterou jsme si připomněli mj. i v úvodních kapitolách. Javovská implementace producenta zpráv vznikla prakticky přímým přepisem původního kódu napsaného v Pythonu (samozřejmě až na větší „ukecanost“Javy a striktní typování):
import org.zeromq.ZMQ; import org.zeromq.ZContext; public class Producer { static final int PORT = 5555; static void send_message(ZMQ.Socket socket, String message) { socket.send(message.getBytes(ZMQ.CHARSET), 0); } public static void main(String[] args) throws Exception { try (ZContext context = new ZContext()) { ZMQ.Socket socket = context.createSocket(ZMQ.PUSH); String address = "tcp://*:" + PORT; socket.bind(address); System.out.println("Bound to address " + address); for (int i=0; i<100; i++) { String message = "Messsage #" + i; send_message(socket, message); try { Thread.sleep(50); } catch (InterruptedException e) { System.out.println("Interrupted"); return; } } } } }
Spuštění producenta se nebude nijak lišit od předchozích implementací serveru či klienta – pouze musíme zajistit, aby na classpath byl uložen mj. i Java archiv s knihovnou JeroMQ:
java -cp .:jeromq-0.4.4-20180323.141003-1.jar Producer
15. Komunikační strategie typu PUSH-PULL: přepis konzumenta zpráv do Javy
I při přepisu konzumenta zpráv jsme vycházeli z originálního skriptu vytvořeného v Pythonu. Povšimněte si, že se v javovské verzi konzumenta opět převádí pole bajtů na řetězec:
import org.zeromq.ZMQ; import org.zeromq.ZContext; public class Consumer { static final int PORT = 5555; static void receive_messages(ZMQ.Socket socket) { int cnt = 0; while (true) { byte[] raw_message = socket.recv(0); cnt++; String message = new String(raw_message, ZMQ.CHARSET); System.out.println("Received message: " + message); } } public static void main(String[] args) throws Exception { try (ZContext context = new ZContext()) { ZMQ.Socket socket = context.createSocket(ZMQ.PULL); String address = "tcp://localhost:" + PORT; socket.connect(address); System.out.println("Connected to " + address); System.out.println("Waiting for message..."); receive_messages(socket); } } }
Spuštění konzumenta:
java -cp .:jeromq-0.4.4-20180323.141003-1.jar Consumer
Ukázka vzájemného posílání zpráv od producenta ke konzumentovi:
$ ./run_producer.sh Bound to address tcp://*:5555
$ ./run_consumer.sh Connected to tcp://localhost:5555 Waiting for message... Received message: Messsage #0 Received message: Messsage #1 Received message: Messsage #2 ... ... ... Received message: Messsage #97 Received message: Messsage #98 Received message: Messsage #99
16. Rozdělení zátěže mezi větší počet konzumentů (workerů)
Nic nám samozřejmě nebrání spustit větší množství konzumentů zpráv (workerů) a tím pádem rozdělit zátěž – úkoly vytvářené producentem. Zkusme si například pustit tři workery a sledovat algoritmus round robin v praxi:
Worker #1:
$ ./run_consumer.sh Connected to tcp://localhost:5555 Waiting for message... Received message: Messsage #0 Received message: Messsage #1 Received message: Messsage #4 Received message: Messsage #7 ... ... ... Received message: Messsage #91 Received message: Messsage #94 Received message: Messsage #97
Worker #2:
$ ./run_consumer.sh Connected to tcp://localhost:5555 Waiting for message... Received message: Messsage #2 Received message: Messsage #5 Received message: Messsage #8 ... ... ... Received message: Messsage #92 Received message: Messsage #95 Received message: Messsage #98
Worker #3:
$ ./run_consumer.sh Connected to tcp://localhost:5555 Waiting for message... Received message: Messsage #3 Received message: Messsage #6 Received message: Messsage #9 ... ... ... Received message: Messsage #93 Received message: Messsage #96 Received message: Messsage #99
17. Repositář s demonstračními příklady
Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v Pythonu a taktéž v programovacím jazyku C byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce.
18. Odkazy na předchozí části seriálu
V této kapitole jsou uvedeny odkazy na všech osm předchozích částí seriálu, v němž se zabýváme různými způsoby implementace front zpráv a k nim přidružených technologií:
- Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/ - Celery: systém implementující asynchronní fronty úloh pro Python
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python/ - Celery: systém implementující asynchronní fronty úloh pro Python (dokončení)
https://www.root.cz/clanky/celery-system-implementujici-asynchronni-fronty-uloh-pro-python-dokonceni/ - RabbitMQ: jedna z nejúspěšnějších implementací brokera
https://www.root.cz/clanky/rabbitmq-jedna-z-nejuspesnejsich-implementaci-brokera/ - Pokročilejší operace nabízené systémem RabbitMQ
https://www.root.cz/clanky/pokrocilejsi-operace-nabizene-systemem-rabbitmq/ - ØMQ: knihovna pro asynchronní předávání zpráv
https://www.root.cz/clanky/0mq-knihovna-pro-asynchronni-predavani-zprav/ - Další možnosti poskytované knihovnou ØMQ
https://www.root.cz/clanky/dalsi-moznosti-poskytovane-knihovnou-mq/
19. Odkazy na Internetu
- Snapshot JeroMQ verze 0.4.4
https://oss.sonatype.org/content/repositories/snapshots/org/zeromq/jeromq/0.4.4-SNAPSHOT/ - Repositář knihovny JeroMQ
https://github.com/zeromq/jeromq/ - ØMQ – Distributed Messaging
http://zeromq.org/ - ØMQ Community
http://zeromq.org/community - Get The Software
http://zeromq.org/intro:get-the-software - PyZMQ Documentation
https://pyzmq.readthedocs.io/en/latest/ - Module: zmq.decorators
https://pyzmq.readthedocs.io/en/latest/api/zmq.decorators.html - ZeroMQ is the answer, by Ian Barber
https://vimeo.com/20605470 - ZeroMQ RFC
https://rfc.zeromq.org/ - ZeroMQ and Clojure, a brief introduction
https://antoniogarrote.wordpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/ - zeromq/czmq
https://github.com/zeromq/czmq - golang wrapper for CZMQ
https://github.com/zeromq/goczmq - ZeroMQ version reporting in Python
http://zguide.zeromq.org/py:version - A Go interface to ZeroMQ version 4
https://github.com/pebbe/zmq4 - Broker vs. Brokerless
http://zeromq.org/whitepapers:brokerless - Learning ØMQ with pyzmq
https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/ - Céčková funkce zmq_ctx_new
http://api.zeromq.org/4–2:zmq-ctx-new - Céčková funkce zmq_ctx_destroy
http://api.zeromq.org/4–2:zmq-ctx-destroy - Céčková funkce zmq_bind
http://api.zeromq.org/4–2:zmq-bind - Céčková funkce zmq_unbind
http://api.zeromq.org/4–2:zmq-unbind - Céčková C funkce zmq_connect
http://api.zeromq.org/4–2:zmq-connect - Céčková C funkce zmq_disconnect
http://api.zeromq.org/4–2:zmq-disconnect - Céčková C funkce zmq_send
http://api.zeromq.org/4–2:zmq-send - Céčková C funkce zmq_recv
http://api.zeromq.org/4–2:zmq-recv - Třída Context (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#context - Třída Socket (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket - Python binding
http://zeromq.org/bindings:python - Why should I have written ZeroMQ in C, not C++ (part I)
http://250bpm.com/blog:4 - Why should I have written ZeroMQ in C, not C++ (part II)
http://250bpm.com/blog:8 - About Nanomsg
https://nanomsg.org/ - Advanced Message Queuing Protocol
https://www.amqp.org/ - Advanced Message Queuing Protocol na Wikipedii
https://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol - Dokumentace k příkazu rabbitmqctl
https://www.rabbitmq.com/rabbitmqctl.8.html - RabbitMQ
https://www.rabbitmq.com/ - RabbitMQ Tutorials
https://www.rabbitmq.com/getstarted.html - RabbitMQ: Clients and Developer Tools
https://www.rabbitmq.com/devtools.html - RabbitMQ na Wikipedii
https://en.wikipedia.org/wiki/RabbitMQ - Streaming Text Oriented Messaging Protocol
https://en.wikipedia.org/wiki/Streaming_Text_Oriented_Messaging_Protocol - Message Queuing Telemetry Transport
https://en.wikipedia.org/wiki/MQTT - Erlang
http://www.erlang.org/ - pika 0.12.0 na PyPi
https://pypi.org/project/pika/ - Introduction to Pika
https://pika.readthedocs.io/en/stable/ - Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
http://clojurerabbitmq.info/ - AMQP 0–9–1 Model Explained
http://www.rabbitmq.com/tutorials/amqp-concepts.html - Part 1: RabbitMQ for beginners – What is RabbitMQ?
https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html - Downloading and Installing RabbitMQ
https://www.rabbitmq.com/download.html - celery na PyPi
https://pypi.org/project/celery/ - Databáze Redis (nejenom) pro vývojáře používající Python
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/ - Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/ - Redis Queue (RQ)
https://www.fullstackpython.com/redis-queue-rq.html - Python Celery & RabbitMQ Tutorial
https://tests4geeks.com/python-celery-rabbitmq-tutorial/ - Flower: Real-time Celery web-monitor
http://docs.celeryproject.org/en/latest/userguide/monitoring.html#flower-real-time-celery-web-monitor - Asynchronous Tasks With Django and Celery
https://realpython.com/asynchronous-tasks-with-django-and-celery/ - First Steps with Celery
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html - node-celery
https://github.com/mher/node-celery - Full Stack Python: web development
https://www.fullstackpython.com/web-development.html - Introducing RQ
https://nvie.com/posts/introducing-rq/ - Asynchronous Tasks with Flask and Redis Queue
https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue - rq-dashboard
https://github.com/eoranged/rq-dashboard - Stránky projektu Redis
https://redis.io/ - Introduction to Redis
https://redis.io/topics/introduction - Try Redis
http://try.redis.io/ - Redis tutorial, April 2010 (starší, ale pěkně udělaný)
https://static.simonwillison.net/static/2010/redis-tutorial/ - Python Redis
https://redislabs.com/lp/python-redis/ - Redis: key-value databáze v paměti i na disku
https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/ - Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
http://www.cloudsvet.cz/?p=253 - Praktický úvod do Redis (2): transakce
http://www.cloudsvet.cz/?p=256 - Praktický úvod do Redis (3): cluster
http://www.cloudsvet.cz/?p=258 - Connection pool
https://en.wikipedia.org/wiki/Connection_pool - Instant Redis Sentinel Setup
https://github.com/ServiceStack/redis-config - How to install REDIS in LInux
https://linuxtechlab.com/how-install-redis-server-linux/ - Redis RDB Dump File Format
https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format - Lempel–Ziv–Welch
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch - Redis Persistence
https://redis.io/topics/persistence - Redis persistence demystified
http://oldblog.antirez.com/post/redis-persistence-demystified.html - Redis reliable queues with Lua scripting
http://oldblog.antirez.com/post/250 - Ost (knihovna)
https://github.com/soveran/ost - NoSQL
https://en.wikipedia.org/wiki/NoSQL - Shard (database architecture)
https://en.wikipedia.org/wiki/Shard_%28database_architecture%29 - What is sharding and why is it important?
https://stackoverflow.com/questions/992988/what-is-sharding-and-why-is-it-important - What Is Sharding?
https://btcmanager.com/what-sharding/ - Redis clients
https://redis.io/clients - Category:Lua-scriptable software
https://en.wikipedia.org/wiki/Category:Lua-scriptable_software - Seriál Programovací jazyk Lua
https://www.root.cz/serialy/programovaci-jazyk-lua/ - Redis memory usage
http://nosql.mypopescu.com/post/1010844204/redis-memory-usage - Ukázka konfigurace Redisu pro lokální testování
https://github.com/tisnik/presentations/blob/master/redis/redis.conf - Resque
https://github.com/resque/resque - Nested transaction
https://en.wikipedia.org/wiki/Nested_transaction - Publish–subscribe pattern
https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern - Messaging pattern
https://en.wikipedia.org/wiki/Messaging_pattern - Using pipelining to speedup Redis queries
https://redis.io/topics/pipelining - Pub/Sub
https://redis.io/topics/pubsub - ZeroMQ distributed messaging
http://zeromq.org/ - ZeroMQ: Modern & Fast Networking Stack
https://www.igvita.com/2010/09/03/zeromq-modern-fast-networking-stack/ - Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
https://stackoverflow.com/questions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr - Python & Redis PUB/SUB
https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7 - Message broker
https://en.wikipedia.org/wiki/Message_broker - RESP Arrays
https://redis.io/topics/protocol#array-reply - Redis Protocol specification
https://redis.io/topics/protocol - Redis Pub/Sub: Intro Guide
https://www.redisgreen.net/blog/pubsub-intro/ - Redis Pub/Sub: Howto Guide
https://www.redisgreen.net/blog/pubsub-howto/ - Comparing Publish-Subscribe Messaging and Message Queuing
https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message - Apache Kafka
https://kafka.apache.org/ - Iron
http://www.iron.io/mq - kue (založeno na Redisu, určeno pro node.js)
https://github.com/Automattic/kue - Cloud Pub/Sub
https://cloud.google.com/pubsub/ - Introduction to Redis Streams
https://redis.io/topics/streams-intro - glob (programming)
https://en.wikipedia.org/wiki/Glob_(programming) - Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
https://www.slideshare.net/sylvinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2 - Enqueueing internals
http://python-rq.org/contrib/ - queue — A synchronized queue class
https://docs.python.org/3/library/queue.html - Queues
http://queues.io/ - Windows Subsystem for Linux Documentation
https://docs.microsoft.com/en-us/windows/wsl/about - RestMQ
http://restmq.com/ - ActiveMQ
http://activemq.apache.org/ - Amazon MQ
https://aws.amazon.com/amazon-mq/ - Amazon Simple Queue Service
https://aws.amazon.com/sqs/ - Celery: Distributed Task Queue
http://www.celeryproject.org/ - Disque, an in-memory, distributed job queue
https://github.com/antirez/disque - rq-dashboard
https://github.com/eoranged/rq-dashboard - Projekt RQ na PyPi
https://pypi.org/project/rq/ - rq-dashboard 0.3.12
https://pypi.org/project/rq-dashboard/ - Job queue
https://en.wikipedia.org/wiki/Job_queue - Why we moved from Celery to RQ
https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq - Running multiple workers using Celery
https://serverfault.com/questions/655387/running-multiple-workers-using-celery - celery — Distributed processing
http://docs.celeryproject.org/en/latest/reference/celery.html - Chains
https://celery.readthedocs.io/en/latest/userguide/canvas.html#chains - Routing
http://docs.celeryproject.org/en/latest/userguide/routing.html#automatic-routing - Celery Distributed Task Queue in Go
https://github.com/gocelery/gocelery/ - Python Decorators
https://wiki.python.org/moin/PythonDecorators - Periodic Tasks
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html - celery.schedules
http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab - Pros and cons to use Celery vs. RQ
https://stackoverflow.com/questions/13440875/pros-and-cons-to-use-celery-vs-rq - Priority queue
https://en.wikipedia.org/wiki/Priority_queue - Jupyter
https://jupyter.org/ - How IPython and Jupyter Notebook work
https://jupyter.readthedocs.io/en/latest/architecture/how_jupyter_ipython_work.html - Context Managers
http://book.pythontips.com/en/latest/context_managers.html