Obsah
1. Dramatiq – další užitečná knihovna pro práci s frontami úloh v Pythonu
2. Koncept workerů a skriptů pro plánování úloh
3. Instalace nástroje Dramatiq
4. Implementace prvního skriptu pro naplánování úlohy
6. Naplánování úlohy a její zpracování workerem
7. Druhý demonstrační příklad – předání parametrů workerům
8. Spuštění druhého demonstračního příkladu
9. Třetí demonstrační příklad – worker vyhazující výjimku
10. Spuštění třetího demonstračního příkladu
11. Čtvrtý příklad – omezení počtu znovuposlaných zpráv do workera
12. Pátý příklad – nastavení minimálního a maximálního časového intervalu mezi znovuposláním zprávy
13. Konfigurace aplikace v případě použití většího množství workerů a front
14. Šestý demonstrační příklad – tři na sobě nezávislí workeři
15. Úlohy spouštěné ve skupině
16. Uložení výsledků práce workerů
18. Nástroj dramatiq-dashboard pro sledování front
19. Repositář s demonstračními příklady
1. Dramatiq – další užitečná knihovna pro práci s frontami úloh v Pythonu
V seriálu o message brokerech a na nich založených technologiích jsme se již setkali s několika projekty, které zprostředkovávají vysokoúrovňové rozhraní k message brokerům pro vývojáře používající programovací jazyk Python. Připomeňme si, že se jedná především o projekty RQ (Redis Queue) a Celery, ovšem existují i další podobné nástroje, například příště popsaný projekt s názvem Huey. Dnes se seznámíme s dalším podobně koncipovaným projektem, který se jmenuje Dramatiq a který je určen pro plánování úloh (job, task) pro samostatně běžící workery. V případě, že worker nějakou úlohu nezpracuje (tj. vyhodí výjimku), je možné přesně specifikovat, co se má s úlohou dále udělat; může se například přeposlat znovu, uložit do takzvané DLQ apod. To však není vše, protože lze vytvářet skupiny (groups) workerů, čekat na jejich výsledky, tvořit kolony (pipeline) workerů s automatickým předáváním mezivýsledků atd.
Sám autor projektu Dramatiq píše, že je dlouholetým uživatelem systému Celery a že je frustrován některými jeho vlastnostmi. A právě z těchto důvodů vytvořil dnes popisovaný projekt Dramatiq, který by měl být oproti Celery v mnoha směrech vylepšen a/nebo zjednodušen. Do jaké míry se to podařilo, by mohlo být patrné z demonstračních příkladů, které budou popsány v navazujících kapitolách.
V následující tabulce jsou pro přehlednost porovnány čtyři systémy s implementací front zpráv pro Python, samozřejmě včetně projektu Dramatiq:
# | Vlastnost | Dramatiq | Celery | Huey | RQ |
---|---|---|---|---|---|
1 | podpora Pythonu 2 | ❌ | ✓ | ✓ | ✓ |
2 | jednoduchá implementace | ✓ | ❌ | ✓ | ✓ |
3 | automatické přeposlání zhavarovaných úloh | ✓ | ❌ | ✓ | ❌ |
4 | zajištění doručení úlohy | ✓ | ❌ | ❌ | ❌ |
5 | omezení počtu zpráv | ✓ | ❌ | ✓ | ❌ |
6 | specifikace priority úlohy | ✓ | ❌ | ❌ | ❌ |
7 | úlohy naplánované na pozdější dobu | ✓ | ✓ | ✓ | ❌ |
8 | plánování úloh ve stylu cronu | ❌ | ✓ | ✓ | ❌ |
9 | podpora pro kolony (pipeline) | ✓ | ✓ | ✓ | ❌ |
10 | možnost uložení výsledků do databáze (Redis…) | ✓ | ✓ | ✓ | ✓ |
11 | automatické znovunačtení kódu workera při změně | ✓ | ❌ | ❌ | ❌ |
12 | podpora RabbitMQ jako brokera | ✓ | ✓ | ✓ | ❌ |
13 | podpora Redisu jako brokera | ✓ | ✓ | ✓ | ✓ |
14 | podpora brokera umístěného v paměti | ✓ | ❌ | ✓ | ❌ |
15 | podpora greenletů | ✓ | ✓ | ✓ | ❌ |
2. Koncept workerů a skriptů pro plánování úloh
Zprávy (či přesněji řečeno úlohy), s nimiž projekt Dramatiq pracuje, jsou interně spravovány nějakým obecným brokerem. Nativně jsou podporováni dva brokeři. První z nich je založen na databázi Redis, s níž jsme se seznámili v článku Databáze Redis (nejenom) pro vývojáře používající Python. Druhým standardně podporovaným brokerem je RabbitMQ. I ten již známe, viz též články RabbitMQ: jedna z nejúspěšnějších implementací brokera a Pokročilejší operace nabízené systémem RabbitMQ. To však není vše, protože v případě potřeby lze použít i další brokery a dokonce i služby s implementací front zpráv, zejména pak SQS, což je služba nabízená jako součást systému Amazon Web Services (AWS).
3. Instalace nástroje Dramatiq
Instalace celého systému Dramatiq se skládá ze dvou kroků. V prvním kroku je nutné nainstalovat zvoleného message brokera, který bude interně používán jak pro správu front, tak i pro případné ukládání výsledků práce jednotlivých workerů. V kroku druhém se nainstaluje samotný Dramatiq, a to s využitím nástroje pip nebo pip3.
Nejprve si popišme první krok, a to konkrétně instalaci Redisu. To je většinou snadné, protože balíček s Redisem je součástí většiny linuxových distribucí. Například na Fedoře může instalace vypadat následovně:
$ sudo dnf install redis Last metadata expiration check: 0:15:30 ago on Wed 24 Oct 2018, 22:50:11 CEST. Dependencies resolved. ================================================================================ Package Arch Version Repository Size ================================================================================ Installing: redis x86_64 4.0.9-1.fc27 updates 580 k Installing dependencies: jemalloc x86_64 4.5.0-5.fc27 updates 210 k Transaction Summary ================================================================================ Install 2 Packages Total download size: 790 k Installed size: 2.0 M Is this ok [y/N]:
Na systémech založených na Debianu (včetně Ubuntu) lze pro instalaci použít příkaz:
$ apt-get install redis-server
V případě, že budete potřebovat použít nejnovější verzi Redisu, můžete si ho sami přeložit. Postup je jednoduchý (mj. i díky minimálním závislostem na dalších knihovnách) a je podrobně popsán na stránce https://redis.io/topics/quickstart.
Pro vlastní databázi, konfigurační soubor, žurnál a logy Redisu použijeme samostatný adresář, který vytvoříme v domácím adresáři připojeného uživatele:
$ mkdir redis $ cd redis
Po instalaci se můžeme přesvědčit, že je skutečně k dispozici spustitelný soubor s implementací serveru i řádkového klienta:
$ whereis -b redis-cli redis-cli: /usr/bin/redis-cli
$ whereis -b redis-server redis-server: /usr/bin/redis-server
Následně přímo v tomto adresáři vytvoříme konfigurační soubor nazvaný redis.conf. Můžeme se přitom inspirovat souborem /etc/redis/redis.conf (Debian a systémy od něj odvozené), popř. /etc/redis.conf (Fedora, RHEL, CentOS), který je však poměrně rozsáhlý, protože kromě vlastních konfiguračních voleb obsahuje i podrobné informace o významu jednotlivých konfiguračních voleb. Tento soubor je taktéž dostupný na internetu na adrese https://raw.githubusercontent.com/antirez/redis/4.0/redis.conf.
Následuje výpis obsahu konfiguračního souboru, který je připraven pro lokální spuštění Redisu, bez nebezpečí, že se k běžícímu serveru připojí případný útočník. Důležité volby jsou zvýrazněny. Pokud se vám soubor nechce kopírovat, naleznete ho na adrese https://github.com/tisnik/presentations/blob/master/redis/redis.conf:
bind 127.0.0.1 protected-mode yes port 6379 tcp-backlog 511 timeout 0 tcp-keepalive 300 daemonize no supervised no pidfile /var/run/redis_6379.pid loglevel notice logfile redis.log databases 16 always-show-logo yes save 900 1 save 300 10 save 60 10000 stop-writes-on-bgsave-error yes rdbcompression yes rdbchecksum yes dbfilename dump.rdb dir . slave-serve-stale-data yes slave-read-only yes repl-diskless-sync no repl-diskless-sync-delay 5 repl-disable-tcp-nodelay no slave-priority 100 lazyfree-lazy-eviction no lazyfree-lazy-expire no lazyfree-lazy-server-del no slave-lazy-flush no appendonly yes appendfilename "appendonly.aof" appendfsync everysec no-appendfsync-on-rewrite no auto-aof-rewrite-percentage 100 auto-aof-rewrite-min-size 64mb aof-load-truncated yes aof-use-rdb-preamble no lua-time-limit 5000 slowlog-log-slower-than 10000 slowlog-max-len 128 latency-monitor-threshold 0 notify-keyspace-events "" hash-max-ziplist-entries 512 hash-max-ziplist-value 64 list-max-ziplist-size -2 list-compress-depth 0 set-max-intset-entries 512 zset-max-ziplist-entries 128 zset-max-ziplist-value 64 hll-sparse-max-bytes 3000 activerehashing yes client-output-buffer-limit normal 0 0 0 client-output-buffer-limit slave 256mb 64mb 60 client-output-buffer-limit pubsub 32mb 8mb 60 hz 10 aof-rewrite-incremental-fsync yes
Nyní již můžeme databázi Redis spustit, aniž by došlo k tomu, že bude její API viditelné ostatním počítačům připojeným do sítě:
$ redis-server redis.conf
Na druhém terminálu pak již můžeme (čistě pro otestování) spustit klienta Redisu, který uživatelům nabízí interaktivní příkazový řádek:
$ redis-cli 127.0.0.1:6379>
Příkazem „ping“ můžeme otestovat, jestli se klient připojí k serveru a zda od něj dokáže získávat odpovědi:
127.0.0.1:6379> ping PONG 127.0.0.1:6379> ping test "test"
Jak již víme z předchozího textu, bude následovat instalace samotného systému Dramatiq. Samotná instalace se provede s využitím nástroje pip resp. pip3, protože samotný Dramatiq je pochopitelně dostupný jako balíček na PyPi. Musíme si ovšem dát pozor na to, že instalaci nelze provést pouze takto:
$ pip3 install --user dramatiq
Musíme totiž specifikovat i to, na jakém brokerovi bude nástroj Dramatiq založen. Pokud použijeme RabbitMQ, mohl by příkaz pro instalaci vypadat takto:
$ pip3 install --user 'dramatiq[rabbitmq, watch]'
My ovšem použijeme Redis, takže příkaz pro instalaci bude nepatrně odlišný:
$ pip3 install --user 'dramatiq[redis, watch]'
Samotná instalace proběhne velmi rychle (v mém případě ještě rychleji, protože jsem již mnoho závislostí měl nainstalovaných):
Requirement already satisfied: dramatiq[redis,watch] in ./.local/lib/python3.6/site-packages Requirement already satisfied: prometheus-client<0.3,>=0.2 in ./.local/lib/python3.6/site-packages (from dramatiq[redis,watch]) Requirement already satisfied: redis<4.0,>=2.0; extra == "redis" in ./.local/lib/python3.6/site-packages (from dramatiq[redis,watch]) Collecting watchdog<0.9,>=0.8; extra == "watch" (from dramatiq[redis,watch]) Collecting watchdog-gevent==0.1; extra == "watch" (from dramatiq[redis,watch]) Using cached https://files.pythonhosted.org/packages/c1/85/14264c65d46c3e15d201dcf86722838a1d9d8b8443de8bfd8b19299f429b/watchdog_gevent-0.1.0-py3-none-any.whl Collecting argh>=0.24.1 (from watchdog<0.9,>=0.8; extra == "watch"->dramatiq[redis,watch]) Using cached https://files.pythonhosted.org/packages/06/1c/e667a7126f0b84aaa1c56844337bf0ac12445d1beb9c8a6199a7314944bf/argh-0.26.2-py2.py3-none-any.whl Collecting pathtools>=0.1.1 (from watchdog<0.9,>=0.8; extra == "watch"->dramatiq[redis,watch]) Requirement already satisfied: PyYAML>=3.10 in /usr/lib64/python3.6/site-packages (from watchdog<0.9,>=0.8; extra == "watch"->dramatiq[redis,watch]) Collecting gevent>=1.1 (from watchdog-gevent==0.1; extra == "watch"->dramatiq[redis,watch]) Using cached https://files.pythonhosted.org/packages/f2/ca/5b5962361ed832847b6b2f9a2d0452c8c2f29a93baef850bb8ad067c7bf9/gevent-1.4.0-cp36-cp36m-manylinux1_x86_64.whl Requirement already satisfied: greenlet>=0.4.14; platform_python_implementation == "CPython" in ./.local/lib/python3.6/site-packages (from gevent>=1.1->watchdog-gevent==0.1; extra == "watch"->dramatiq[redis,watch]) Installing collected packages: argh, pathtools, watchdog, gevent, watchdog-gevent Successfully installed argh-0.26.2 gevent-1.4.0 pathtools-0.1.2 watchdog-0.8.3 watchdog-gevent-0.1.0
4. Implementace prvního skriptu pro naplánování úlohy
Nyní již tedy máme nainstalovány a nakonfigurovány všechny potřebné balíčky a můžeme se pokusit vytvořit příklad, v němž bude implementován jak worker, tak i skript, který workerovi přiřadí nějakou úlohu (job, task). Příklady budou nepatrně zkomplikovány tím, že budeme používat Redis namísto výchozího RabbitMQ, takže bude nutné explicitně specifikovat, který broker bude použit (v případě RabbitMQ to není bezpodmínečně nutné).
Celá činnost dnešního prvního demonstračního příkladu se dá shrnout do několika bodů:
- Skript naplánuje úlohu. Ta se přenese a uloží do brokera, v našem případě do Redisu.
- Systém Dramatiq (ve chvíli, kdy bude spuštěn) postupně vybírá úlohy/zprávy z brokera, které jsou určeny zvolenému workeru.
- Pokud zprávu přijme, zavolá kód brokera a předá mu příslušný parametr/parametry uložené ve zprávě.
- Worker může v případě potřeby uložit výsledky do Redisu.
- V případě, že worker zhavaruje (což se může velmi snadno stát, ostatně si to i ukážeme), může být zpráva/úloha zopakována popř. uložena do fronty nazvané DLQ (Dead Letter Queue), kde implicitně vydrží týden.
Podívejme se, jak může vypadat skript, který workerovi zadá (naplánuje) úlohu. Nejprve si ukažme celý kód tohoto skriptu:
import time import dramatiq from dramatiq.brokers.redis import RedisBroker from test_worker_1 import test_worker, setup_broker setup_broker() test_worker.send()
Povšimněte si, že musíme importovat samotného workera, aby bylo možné pracovat s identifikátorem test_worker (to je název funkce s implementací workera – viz další kapitolu). Zadání úlohy workerovi je snadné – vše se provede na tomto řádku:
test_worker.send()
5. Implementace workera
Dále se podívejme na implementaci workera. Celý skript s workerem může vypadat následovně:
import time import dramatiq from dramatiq.brokers.redis import RedisBroker def setup_broker(): redis_broker = RedisBroker(host="localhost", port=6379) dramatiq.set_broker(redis_broker) return redis_broker setup_broker() @dramatiq.actor def test_worker(): print("Working") time.sleep(1) print("Done")
Povšimněte si především pomocné funkce setup_broker sloužící pro konfiguraci message brokera. V této funkci specifikujeme, na jaké adrese a na jakém portu je dostupné API message brokera; následně je broker nastaven jako výchozí message broker pro všechny dále specifikované workery:
def setup_broker(): redis_broker = RedisBroker(host="localhost", port=6379) dramatiq.set_broker(redis_broker) return redis_broker
Výše popsanou funkci setup_broker je nutné zavolat ještě předtím, než se bude interpretovat samotná implementace workera. Ta je prozatím velmi jednoduchá, protože se jedná o funkci s dekorátorem @dramatiq.actor. Vzhledem k tomu, že našemu prvnímu workeru nebudeme předávat žádné parametry, bude i funkce s jeho implementací bez argumentů:
@dramatiq.actor def test_worker(): print("Working") time.sleep(1) print("Done")
6. Naplánování úlohy a její zpracování workerem
Workeři, kterým jsou předávány jednotlivé úlohy, pochopitelně nejsou spouštěni přímo zavoláním funkce, v níž jsou implementováni (zde konkrétně funkce test_worker). Namísto toho je předávání úloh a plánování práce workerů ponecháno na nástroji dramatiq ovládaném z příkazového řádku. Tento nástroj se spustí jednoduše – přesuneme se do adresáře se skriptem obsahujícím kód workera a napíšeme:
$ dramatiq test_worker_1
Povšimněte si, že se spustí celkem osm procesů s čekajícím workerem. Je tomu tak z toho důvodu, že na mém počítači je dostupných osm jader a nástroj dramatiq se snaží využít všechna dostupná jádra (toto nastavení je pochopitelně možné ovlivnit z příkazové řádky):
[2019-07-23 14:01:57,290] [PID 3229] [MainThread] [dramatiq.MainProcess] [INFO] Dramatiq '1.6.0' is booting up. [2019-07-23 14:01:57,323] [PID 3235] [MainThread] [dramatiq.WorkerProcess(0)] [INFO] Worker process is ready for action. [2019-07-23 14:01:57,324] [PID 3237] [MainThread] [dramatiq.WorkerProcess(1)] [INFO] Worker process is ready for action. [2019-07-23 14:01:57,324] [PID 3238] [MainThread] [dramatiq.WorkerProcess(2)] [INFO] Worker process is ready for action. [2019-07-23 14:01:57,330] [PID 3239] [MainThread] [dramatiq.WorkerProcess(3)] [INFO] Worker process is ready for action. [2019-07-23 14:01:57,332] [PID 3240] [MainThread] [dramatiq.WorkerProcess(4)] [INFO] Worker process is ready for action. [2019-07-23 14:01:57,332] [PID 3242] [MainThread] [dramatiq.WorkerProcess(6)] [INFO] Worker process is ready for action. [2019-07-23 14:01:57,334] [PID 3241] [MainThread] [dramatiq.WorkerProcess(5)] [INFO] Worker process is ready for action. [2019-07-23 14:01:57,348] [PID 3243] [MainThread] [dramatiq.WorkerProcess(7)] [INFO] Worker process is ready for action.
Náš worker by se měl spustit a vypsat na terminál informaci o tom, že začal zpracovávat (jedinou) úlohu a následně ji i ukončil po cca jedné sekundě „práce“:
Working Done
Jakmile byla úloha zpracována, můžeme všechny procesy i samotný nástroj dramatiq ukončit, například klávesovou zkratkou Ctrl+C:
[2019-07-23 14:02:51,893] [PID 3414] [MainThread] [dramatiq.MainProcess] [INFO] Sending signal 'SIGTERM' to worker processes... [2019-07-23 14:02:51,895] [PID 3419] [MainThread] [dramatiq.WorkerProcess(0)] [INFO] Stopping worker process... [2019-07-23 14:02:51,895] [PID 3420] [MainThread] [dramatiq.WorkerProcess(1)] [INFO] Stopping worker process... [2019-07-23 14:02:51,895] [PID 3421] [MainThread] [dramatiq.WorkerProcess(2)] [INFO] Stopping worker process... [2019-07-23 14:02:51,895] [PID 3422] [MainThread] [dramatiq.WorkerProcess(3)] [INFO] Stopping worker process... [2019-07-23 14:02:51,895] [PID 3423] [MainThread] [dramatiq.WorkerProcess(4)] [INFO] Stopping worker process... [2019-07-23 14:02:51,895] [PID 3424] [MainThread] [dramatiq.WorkerProcess(5)] [INFO] Stopping worker process... [2019-07-23 14:02:51,895] [PID 3425] [MainThread] [dramatiq.WorkerProcess(6)] [INFO] Stopping worker process... [2019-07-23 14:02:51,895] [PID 3426] [MainThread] [dramatiq.WorkerProcess(7)] [INFO] Stopping worker process... [2019-07-23 14:02:52,741] [PID 3420] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down... [2019-07-23 14:02:52,746] [PID 3421] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down... [2019-07-23 14:02:52,750] [PID 3419] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down... [2019-07-23 14:02:52,750] [PID 3422] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down... [2019-07-23 14:02:52,750] [PID 3423] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down... [2019-07-23 14:02:52,751] [PID 3424] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down... [2019-07-23 14:02:52,751] [PID 3425] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down... [2019-07-23 14:02:52,761] [PID 3426] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down... [2019-07-23 14:02:53,990] [PID 3424] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down. [2019-07-23 14:02:54,052] [PID 3421] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down. [2019-07-23 14:02:54,082] [PID 3423] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down. [2019-07-23 14:02:54,114] [PID 3426] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down. [2019-07-23 14:02:54,154] [PID 3422] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down. [2019-07-23 14:02:54,202] [PID 3419] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down. [2019-07-23 14:02:54,283] [PID 3425] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down. [2019-07-23 14:02:54,744] [PID 3420] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down.
7. Druhý demonstrační příklad – předání parametrů workerům
První worker byl ve skutečnosti velmi primitivní, protože neakceptoval žádné parametry. Teoreticky by tedy měl při každém zavolání vykonat tu samou činnost – jinými slovy by měl být idempotentní. Můžeme se ovšem pochopitelně pokusit vytvořit workera, který akceptuje nějaký parametr či parametry. Prozatím se předaný parametr pouze vypíše na standardní výstup. Nová implementace workera může vypadat následovně:
@dramatiq.actor def test_worker(parameter): print("Working, received parameter: {param}".format(param=parameter)) time.sleep(1) print("Done")
Uveďme si celý skript s workerem:
import time import dramatiq from dramatiq.brokers.redis import RedisBroker def setup_broker(): redis_broker = RedisBroker(host="localhost", port=6379) dramatiq.set_broker(redis_broker) return redis_broker setup_broker() @dramatiq.actor def test_worker(parameter): print("Working, received parameter: {param}".format(param=parameter)) time.sleep(1) print("Done")
Pochopitelně se změní i skript, který plánuje jednotlivé úlohy. Nyní tento skript musí generovat i parametry předané workeru. Celkem vytvoříme deset úloh:
for i in range(1, 11): test_worker.send(i)
8. Spuštění druhého demonstračního příkladu
Ihned po spuštění druhého demonstračního příkladu (tedy jak skriptu pro naplánování úloh, tak i nástroje dramatiq) je zajímavé se podívat do logů. Můžeme zde vidět, že se nejprve spustilo osm úloh (protože na testovacím počítači je k dispozici osm jader), a další dvě úlohy mohly být spuštěny až ve chvíli, kdy byly předchozí úlohy dokončeny:
Working, received parameter: 10 Working, received parameter: 3 Working, received parameter: 4 Working, received parameter: 1 Working, received parameter: 5 Working, received parameter: 2 Working, received parameter: 8 Working, received parameter: 7 Done Done Done Done Done Working, received parameter: 6 Done Working, received parameter: 9 Done Done Done Done
9. Třetí demonstrační příklad – worker vyhazující výjimku
Třetí demonstrační příklad se liší od druhého příkladu v tom, že worker pro některý vstupní parametr (konkrétně pro celočíselný parametr dělitelný třemi) vyhodí výjimku:
@dramatiq.actor def test_worker(parameter): print("Working, received parameter: {param}".format(param=parameter)) if parameter % 3 == 0: raise Exception("I don't like this parameter!") time.sleep(1) print("Done")
Tímto způsobem se budeme snažit simulovat reálné chyby a výjimky, které mohou při práci workerů nastat.
10. Spuštění třetího demonstračního příkladu
Po spuštění workerů příkazem dramatiq test_worker3 uvidíme, jak workeři přijali jednotlivé úlohy, ovšem worker, který přijal úlohu s parametrem nastaveným na trojku, posléze zhavaroval:
Working, received parameter: 10 Working, received parameter: 3 Working, received parameter: 4 Working, received parameter: 1 Working, received parameter: 5 Working, received parameter: 2 Working, received parameter: 8 Working, received parameter: 7 [2019-07-23 14:09:40,209] [PID 4347] [Thread-8] [dramatiq.worker.WorkerThread] [WARNING] Failed to process message test_worker(3) with unhandled exception. Traceback (most recent call last): File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message res = actor(*message.args, **message.kwargs) File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__ return self.fn(*args, **kwargs) File "./test_worker_3.py", line 20, in test_worker raise Exception("I don't like this parameter!") Exception: I don't like this parameter!
Podobně pochopitelně zhavarují i workery, kterým se předal parametr 6 a 9.
Nástroj Dramatiq se havárii úlohy snaží vyřešit tím, že po určitém čase naplánuje tu samou úlohu, přičemž se časy mezi jednotlivými úlohami postupně zvětšují (protože v naprosté většině případů nechceme dojít do okamžiku, kdy počítač stále dokola opakuje ty samé kroky vedoucí k chybě):
[2019-07-23 14:09:40,216] [PID 4347] [Thread-8] [dramatiq.middleware.retries.Retries] [INFO] Retrying message '73c0f344-b1ae-4601-852e-bab6dc77affb' in 13434 milliseconds. Working, received parameter: 6 [2019-07-23 14:09:40,221] [PID 4347] [Thread-8] [dramatiq.worker.WorkerThread] [WARNING] Failed to process message test_worker(3) with unhandled exception. Traceback (most recent call last): File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message res = actor(*message.args, **message.kwargs) File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__ return self.fn(*args, **kwargs) File "./test_worker_3.py", line 20, in test_worker raise Exception("I don't like this parameter!") Exception: I don't like this parameter!
11. Čtvrtý příklad – omezení počtu znovuposlaných zpráv do workera
V mnoha případech je výchozí chování systému Dramatiq rozumné – pokud dojde k chybě, bude zpráva obsahující informaci o naplánované úloze znovu vložena do fronty a worker ji později zopakuje s tím, že doufáme, že zopakování již proběhne v pořádku. Ovšem toto chování můžeme ovlivnit, a to již při plánování úlohy. Můžeme totiž určit maximální počet pokusů o spuštění úlohy. Pro tento účel slouží nepovinný parametr max_retries dekorátoru dramatiq.actor:
@dramatiq.actor(max_retries=3) def test_worker(parameter): print("Working, received parameter: {param}".format(param=parameter)) if parameter % 3 == 0: raise Exception("I don't like this parameter!") time.sleep(1) print("Done")
Nyní se chování systému Dramatiq při práci s havarovanými úlohami změní. První pády vedou k zopakování úlohy:
Working, received parameter: 6 [2019-07-23 14:15:05,061] [PID 4946] [Thread-12] [dramatiq.worker.WorkerThread] [WARNING] Failed to process message test_worker(6) with unhandled exception. Traceback (most recent call last): File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message res = actor(*message.args, **message.kwargs) File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__ return self.fn(*args, **kwargs) File "./test_worker_4.py", line 20, in test_worker raise Exception("I don't like this parameter!") Exception: I don't like this parameter!
Posléze (po třech opakováních) se však již jen vypíše poslední varování, že úloha nebyla ani napotřetí dokončena a již nebude znovu provedena:
[2019-07-23 14:15:05,065] [PID 4946] [Thread-12] [dramatiq.middleware.retries.Retries] [WARNING] Retries exceeded for message 'c59cf6e4-f226-4f91-9067-666d0ccd51a1'. Working, received parameter: 3 [2019-07-23 14:15:14,667] [PID 4949] [Thread-5] [dramatiq.worker.WorkerThread] [WARNING] Failed to process message test_worker(3) with unhandled exception. Traceback (most recent call last): File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message res = actor(*message.args, **message.kwargs) File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__ return self.fn(*args, **kwargs) File "./test_worker_4.py", line 20, in test_worker raise Exception("I don't like this parameter!") Exception: I don't like this parameter! [2019-07-23 14:15:14,668] [PID 4949] [Thread-5] [dramatiq.middleware.retries.Retries] [WARNING] Retries exceeded for message 'c83ee481-47c1-4dd1-9bbb-c7a1e3083233'. Working, received parameter: 9 [2019-07-23 14:15:24,905] [PID 4948] [Thread-5] [dramatiq.worker.WorkerThread] [WARNING] Failed to process message test_worker(9) with unhandled exception. Traceback (most recent call last): File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message res = actor(*message.args, **message.kwargs) File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__ return self.fn(*args, **kwargs) File "./test_worker_4.py", line 20, in test_worker raise Exception("I don't like this parameter!") Exception: I don't like this parameter! [2019-07-23 14:15:24,908] [PID 4948] [Thread-5] [dramatiq.middleware.retries.Retries] [WARNING] Retries exceeded for message 'e32d6343-5af0-4ab8-a7a7-b5ef6cd31ad7'.
12. Pátý příklad – nastavení minimálního a maximálního časového intervalu mezi znovuposláním zprávy
Existuje ještě jeden způsob, jakým můžeme omezit opětovnému naplánování úloh, které zhavarovaly. Je totiž možné použít parametry min_backoff a max_backoff, kterými se řídí časy mezi havárií úlohy a jejím novým naplánovaným časem. Oba časy jsou uvedeny v milisekundách a časová prodleva se postupně zvětšuje (násobí konstantou větší než 1, většinou dvojkou). Jakmile je překročena druhá hodnota (maximální), nebude havarující úloha znovu naplánována:
@dramatiq.actor(min_backoff=100, max_backoff=2000) def test_worker(parameter): print("Working, received parameter: {param}".format(param=parameter)) raise Exception("I don't like this parameter!") time.sleep(1) print("Done")
Opět se podívejme na konkrétní chování systému. První pády úlohy vedou k jejímu opětovnému naplánování:
Working, received parameter: 42 [2019-07-23 14:18:11,229] [PID 5604] [Thread-5] [dramatiq.worker.WorkerThread] [WARNING] Failed to process message test_worker(42) with unhandled exception. Traceback (most recent call last): File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message res = actor(*message.args, **message.kwargs) File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__ return self.fn(*args, **kwargs) File "./test_worker_5.py", line 19, in test_worker raise Exception("I don't like this parameter!") Exception: I don't like this parameter! [2019-07-23 14:18:11,233] [PID 5604] [Thread-5] [dramatiq.middleware.retries.Retries] [INFO] Retrying message '257d6c66-b798-47a6-bc44-2373d374eb14' in 70 milliseconds. Working, received parameter: 42 [2019-07-23 14:18:11,333] [PID 5598] [Thread-5] [dramatiq.worker.WorkerThread] [WARNING] Failed to process message test_worker(42) with unhandled exception. Traceback (most recent call last): File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message res = actor(*message.args, **message.kwargs) File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__ return self.fn(*args, **kwargs) File "./test_worker_5.py", line 19, in test_worker raise Exception("I don't like this parameter!") Exception: I don't like this parameter!
Nakonec se však oznámí, že již úloha nebude znovu naplánována:
Working, received parameter: 42 [2019-07-23 14:18:41,865] [PID 5601] [Thread-7] [dramatiq.worker.WorkerThread] [WARNING] Failed to process message test_worker(42) with unhandled exception. Traceback (most recent call last): File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message res = actor(*message.args, **message.kwargs) File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__ return self.fn(*args, **kwargs) File "./test_worker_5.py", line 19, in test_worker raise Exception("I don't like this parameter!") Exception: I don't like this parameter! [2019-07-23 14:18:41,867] [PID 5601] [Thread-7] [dramatiq.middleware.retries.Retries] [INFO] Retrying message '257d6c66-b798-47a6-bc44-2373d374eb14' in 1562 milliseconds. Traceback (most recent call last): File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/worker.py", line 470, in process_message res = actor(*message.args, **message.kwargs) File "/home/tester/.local/lib/python3.6/site-packages/dramatiq/actor.py", line 145, in __call__ return self.fn(*args, **kwargs) File "./test_worker_5.py", line 19, in test_worker raise Exception("I don't like this parameter!") Exception: I don't like this parameter! [2019-07-23 14:18:43,741] [PID 5604] [Thread-7] [dramatiq.middleware.retries.Retries] [WARNING] Retries exceeded for message '257d6c66-b798-47a6-bc44-2373d374eb14'.
13. Konfigurace aplikace v případě použití většího množství workerů a front
V dalších kapitolách si ukážeme, jak lze nakonfigurovat větší množství workerů a front. Většinou se každému typu workeru přiřadí vlastní fronta, která je po workeru pojmenována. Ovšem jak se jednotlivé naplánované úlohy budou spouštět? Existuje několik možností:
- Workeři jsou na sobě nezávislí a nepředávají si výsledky. Mohou tedy běžet naprosto nezávisle na sobě, což pochopitelně platí i pro naplánované úlohy.
- Workeři tvoří skupiny a můžeme požadovat, aby se počkalo na všechny výsledky workerů ve skupině. Tito workeři mohou běžet paralelně, ovšem bude se vždy čekat na dokončení úloh zpracovaných workery ve skupině (to musíme explicitně specifikovat).
- A naopak, workeři si mohou postupně předávat výsledky a mohou tak tvořitkolonu (pipeline). Teprve výsledek posledního workera je dále použit.
Jednotlivé možnosti jsou ukázány v navazujícím textu.
14. Šestý demonstrační příklad – tři na sobě nezávislí workeři
V šestém příkladu implementujeme tři workery, kteří jsou na sobě nezávislí (to, že volají společnou funkci, je jen výsledek malého refaktoringu):
def worker(name, parameter): print("Worker {w}: working, received parameter: {param}".format(w=name, param=parameter)) time.sleep(1) print("Worker {w}: done".format(w=name)) @dramatiq.actor def test_worker_A(parameter): worker("A", parameter) @dramatiq.actor def test_worker_B(parameter): worker("B", parameter) @dramatiq.actor def test_worker_C(parameter): worker("C", parameter)
Naplánování úloh pro tyto workery může vypadat takto – pro každý worker pět úloh:
for i in range(1, 6): test_worker_A.send(i) test_worker_B.send(i) test_worker_C.send(i)
Po spuštění workerů a naplánování úloh můžeme vidět, že jednotlivé úlohy jsou spouštěny „náhodně“ a nezávisle na ostatních úlohách (čekání na „done“ je způsobeno použitím pouhých osmi procesů):
Worker C: working, received parameter: 2 Worker B: working, received parameter: 1 Worker C: working, received parameter: 1 Worker A: working, received parameter: 1 Worker B: working, received parameter: 2 Worker A: working, received parameter: 2 Worker C: working, received parameter: 3 Worker B: working, received parameter: 3 Worker C: done Worker B: done Worker A: done Worker C: done Worker C: done Worker B: done Worker B: done Worker A: done Worker C: working, received parameter: 5 Worker A: working, received parameter: 5 Worker A: working, received parameter: 4 Worker B: working, received parameter: 4 Worker A: working, received parameter: 3 Worker B: working, received parameter: 5 Worker C: working, received parameter: 4 Worker C: done Worker A: done Worker B: done Worker A: done Worker A: done Worker B: done Worker C: done
15. Úlohy spouštěné ve skupině
Úlohy ovšem můžeme sdružit do skupiny, a to s využitím konstruktoru group, kterému se předá sekvence (seznam, n-tice) workerů s naplánovanou úlohou. Ovšem pozor – zde se nepoužívá metoda send, ale message:
for i in range(1, 6): g = dramatiq.group([ test_worker_A.message(i), test_worker_B.message(i), test_worker_C.message(i) ]).run()
Ani při použití skupiny však nemusí být zaručeno, že jsou úlohy spouštěny v nějakém pevném pořadí. Je tomu tak proto, že nečekáme na výsledky úloh zařazených do skupiny:
Worker A: working, received parameter: 3 Worker A: working, received parameter: 1 Worker C: working, received parameter: 2 Worker B: working, received parameter: 1 Worker C: working, received parameter: 1 Worker B: working, received parameter: 1 Worker B: working, received parameter: 2 Worker C: working, received parameter: 1 Worker C: working, received parameter: 5 Worker C: working, received parameter: 4 Worker A: working, received parameter: 4 Worker C: working, received parameter: 3 Worker C: working, received parameter: 4 Worker A: working, received parameter: 4 Worker B: working, received parameter: 4 Worker B: working, received parameter: 3
16. Uložení výsledků práce workerů
Pokud ovšem systém nastavíme tak, aby se výsledky práce workerů (výsledky jednotlivých úloh) ukládaly zpět do Redisu, budeme moci čekat na výsledky celé skupiny úloh. Nastavení se provede takto:
def setup_broker_and_backend(): redis_broker = RedisBroker(host="localhost", port=6379) result_backend = RedisBackend() dramatiq.set_broker(redis_broker) redis_broker.add_middleware(Results(backend=result_backend)) return redis_broker
Změní se i jednotliví workeři, protože u nich budeme vyžadovat uložení výsledků:
@dramatiq.actor(store_results=True) def test_worker_A(parameter): worker("A", parameter) return parameter + "A" @dramatiq.actor(store_results=True) def test_worker_B(parameter): worker("B", parameter) return parameter + "B" @dramatiq.actor(store_results=True) def test_worker_C(parameter): worker("C", parameter) return parameter + "C"
Naplánování úloh s jejich rozřazením do skupiny a s čekáním na výsledek (maximálně však dvacet sekund):
for i in range(1, 6): print(i) g = dramatiq.group([ test_worker_A.message(str(i)), test_worker_B.message(str(i)), test_worker_C.message(str(i)) ]).run() g.wait(timeout=20000)
Ještě se podívejme na rozdíl mezi použitím metody send a message. Pokud ve skupině workerů použijeme metodu send, bude ve skutečnosti každý worker zavolán dvakrát s tím stejným parametrem, což většinou je chování, které nám nebude vyhovovat (i když by workeři měli být idempotentní):
for i in range(1, 6): print(i) g = dramatiq.group([ test_worker_A.send(str(i)), test_worker_B.send(str(i)), test_worker_C.send(str(i)) ]).run() g.wait(timeout=20000)
S výsledky:
Worker C: working, received parameter: 1 Worker B: working, received parameter: 1 Worker B: done Worker C: done Worker C: working, received parameter: 1 Worker C: done Worker A: working, received parameter: 1 Worker A: done Worker B: working, received parameter: 1 Worker A: working, received parameter: 1 Worker A: done Worker B: done Worker C: working, received parameter: 2 Worker C: done Worker A: working, received parameter: 2 Worker A: done Worker C: working, received parameter: 2 Worker C: done Worker A: working, received parameter: 2 Worker A: done Worker B: working, received parameter: 2 Worker B: done Worker B: working, received parameter: 2 Worker B: done Worker C: working, received parameter: 3 Worker C: done Worker B: working, received parameter: 3 Worker C: working, received parameter: 3 Worker A: working, received parameter: 3
Ovšem ve chvíli, kdy se zdrojový kód příkladu změní tak, že se namísto send použije message:
for i in range(1, 6): print(i) g = dramatiq.group([ test_worker_A.message(str(i)), test_worker_B.message(str(i)), test_worker_C.message(str(i)) ]).run() g.wait(timeout=20000)
Bude rozdílné i chování – nyní se vždy spustí všechny tři workery ve skupině a teprve poté se spustí další skupina workerů:
Worker C: working, received parameter: 1 Worker C: done Worker B: working, received parameter: 1 Worker B: done Worker A: working, received parameter: 1 Worker A: done Worker C: working, received parameter: 2 Worker B: working, received parameter: 2 Worker B: done Worker C: done Worker A: working, received parameter: 2 Worker A: done Worker C: working, received parameter: 3 Worker C: done Worker B: working, received parameter: 3 Worker A: working, received parameter: 3 Worker A: done Worker B: done Worker C: working, received parameter: 4 Worker B: working, received parameter: 4 Worker B: done Worker A: working, received parameter: 4 Worker C: done Worker A: done Worker C: working, received parameter: 5 Worker B: working, received parameter: 5 Worker B: done Worker A: working, received parameter: 5 Worker A: done Worker C: done
17. Kolona (pipeline) workerů
V předposledním demonstračním příkladu je ukázáno, jak lze vytvořit takzvanou kolonu (pipeline) složenou z více workerů. Nejprve je spuštěn první worker a jeho výsledek je předán workerovi druhému, výsledek druhého workera je předán workerovi třetímu atd. – zcela stejným způsobem, jako v klasické Unixové koloně (povšimněte si, že workeři nemusí vědět, kdo jim parametry předal a kdo zpracuje jejich výsledek, naprosto stejně, jako je tomu v Unixové pipeline):
p = dramatiq.pipeline([ test_worker_A.message("!"), test_worker_B.message(), test_worker_C.message() ]).run()
Implementace všech tří workerů je snadná; každý worker přečte parametr úlohy, přidá k němu jedno písmeno a vrátí jako výsledek:
import time import dramatiq from dramatiq.brokers.redis import RedisBroker from dramatiq.results.backends import RedisBackend from dramatiq.results import Results def setup_broker_and_backend(): redis_broker = RedisBroker(host="localhost", port=6379) result_backend = RedisBackend() dramatiq.set_broker(redis_broker) redis_broker.add_middleware(Results(backend=result_backend)) return redis_broker setup_broker_and_backend() def worker(name, parameter): print("Worker {w}: working, received parameter: {param}".format(w=name, param=parameter)) print("Worker {w}: done".format(w=name)) @dramatiq.actor(store_results=True) def test_worker_A(parameter): worker("A", parameter) return parameter + "A" @dramatiq.actor(store_results=True) def test_worker_B(parameter): worker("B", parameter) return parameter + "B" @dramatiq.actor(store_results=True) def test_worker_C(parameter): worker("C", parameter) return parameter + "C"
Úplný zdrojový kód skriptu, který spustí kolonu workerů, vypadá následovně:
import time import dramatiq from dramatiq.brokers.redis import RedisBroker from test_worker_9 import test_worker_A, test_worker_B, test_worker_C, setup_broker_and_backend setup_broker_and_backend() p = dramatiq.pipeline([ test_worker_A.message("!"), test_worker_B.message(), test_worker_C.message() ]).run() print(p.get_result(block=True, timeout=5000))
Podívejme se nyní na to, jak bude vypadat výsledek spojení tří workerů do kolony:
Worker A: working, received parameter: ! Worker A: done Worker B: working, received parameter: !A Worker B: done Worker C: working, received parameter: !AB Worker C: done
V dnešním posledním demonstračním příkladu opět použijeme kolonu (pipeline) složenou ze tří workerů, tentokrát ovšem bude každá kolona spuštěna třikrát – pokaždé pro jiný vstup. Pro jednoduchost budou vstupy znaky „X“, „Y“ a „Z“ a samotná kolona bude delší (bude sestávat ze šestice workerů):
for parameter in "XYZ": p = dramatiq.pipeline([ test_worker_A.message(parameter), test_worker_B.message(), test_worker_C.message(), test_worker_A.message(), test_worker_B.message(), test_worker_C.message() ]).run()
Úplný tvar skriptu, který úlohy naplánuje, bude vypadat takto:
import time import dramatiq from dramatiq.brokers.redis import RedisBroker from test_worker_10 import test_worker_A, test_worker_B, test_worker_C, setup_broker_and_backend setup_broker_and_backend() for parameter in "XYZ": p = dramatiq.pipeline([ test_worker_A.message(parameter), test_worker_B.message(), test_worker_C.message(), test_worker_A.message(), test_worker_B.message(), test_worker_C.message() ]).run() print(p.get_result(block=True, timeout=5000))
Výsledky získané z jednotlivých kolon by tedy měly vypadat následovně:
XABCABC YABCABC ZABCABC
Pochopitelně můžeme sledovat i chování jednotlivých workerů při zpracovávání vstupů a generování výsledků:
Worker A: working, received parameter: X Worker A: done Worker B: working, received parameter: XA Worker B: done Worker C: working, received parameter: XAB Worker C: done Worker A: working, received parameter: XABC Worker A: done Worker B: working, received parameter: XABCA Worker B: done Worker C: working, received parameter: XABCAB Worker C: done Worker A: working, received parameter: Y Worker A: done Worker B: working, received parameter: YA Worker B: done Worker C: working, received parameter: YAB Worker C: done Worker A: working, received parameter: YABC Worker A: done Worker B: working, received parameter: YABCA Worker B: done Worker C: working, received parameter: YABCAB Worker C: done Worker A: working, received parameter: Z Worker A: done Worker B: working, received parameter: ZA Worker B: done Worker C: working, received parameter: ZAB Worker C: done Worker A: working, received parameter: ZABC Worker A: done Worker B: working, received parameter: ZABCA Worker B: done Worker C: working, received parameter: ZABCAB Worker C: done
18. Nástroj dramatiq-dashboard pro sledování front
Jedním z pomocných nástrojů, o němž se v dnešním článku ve stručnosti zmíníme, je nástroj nazvaný Dramatiq Dashboard. Jedná se o obdobu nástroje rq-dashboard, který byl vyvinut pro konkurenční systém Redis Queue. Nástroj Dramatiq Dashboard je určen pro zobrazení webových stránek s informacemi o použitých frontách, připojených workerech atd. Zajímavé je, že Dramatiq Dashboard je možné zabudovat do libovolné WSGI aplikace či WSGI webového serveru, nemusí se tedy jednat o další samostatně běžící webový server.
Instalaci nástroje (či možná lépe řečeno knihovny) dramatiq_dashboard opět provedeme pomocí pip nebo pip3:
$ pip3 install --user dramatiq_dashboard Collecting dramatiq_dashboard Downloading https://files.pythonhosted.org/packages/d9/f6/89bbc958546f18ab3207db3b52fef235528a1f87cc680ceea9001868941a/dramatiq_dashboard-0.2.2-py3-none-any.whl Requirement already satisfied: redis<4.0,>=2.0 in ./.local/lib/python3.6/site-packages (from dramatiq_dashboard) Requirement already satisfied: jinja2<3,>=2 in ./.local/lib/python3.6/site-packages (from dramatiq_dashboard) Requirement already satisfied: dramatiq[redis]<2.0,>=1.6 in ./.local/lib/python3.6/site-packages (from dramatiq_dashboard) Requirement already satisfied: MarkupSafe>=0.23 in ./.local/lib/python3.6/site-packages (from jinja2<3,>=2->dramatiq_dashboard) Requirement already satisfied: prometheus-client<0.3,>=0.2 in ./.local/lib/python3.6/site-packages (from dramatiq[redis]<2.0,>=1.6->dramatiq_dashboard) Installing collected packages: dramatiq-dashboard Successfully installed dramatiq-dashboard-0.2.2
Jak již bylo řečeno v předchozím odstavci, je možné knihovnu dramatiq_dashboard integrovat do WSGI aplikace. Samotná integrace je popsána přímo na stránkách tohoto projektu a může vypadat následovně:
import bjoern import dramatiq from dramatiq.brokers.redis import RedisBroker from dramatiq_dashboard import DashboardApp broker = RedisBroker(host="127.0.0.1", port=6379) dramatiq.set_broker(broker) app = DashboardApp(broker=broker, prefix="") bjoern.run(app, "127.0.0.1", 8080)
19. Repositář s demonstračními příklady
Zdrojové kódy všech dnes popsaných demonstračních příkladů vyvinutých v programovacím jazyku Python byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/message-queues-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má stále ještě doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce. Každý příklad obsahuje implementaci workera či workerů a taktéž skript pro naplánování úloh:
20. Odkazy na Internetu
- Dramatiq: simple task processing
https://dramatiq.io/ - Cookbook (for Dramatiq)
https://dramatiq.io/cookbook.html - Balíček dramatiq na PyPi
https://pypi.org/project/dramatiq/ - Dramatiq dashboard
https://github.com/Bogdanp/dramatiq_dashboard - Dramatiq na Redditu
https://www.reddit.com/r/dramatiq/ - A Dramatiq broker that can be used with Amazon SQS
https://github.com/Bogdanp/dramatiq_sqs - nanomsg na GitHubu
https://github.com/nanomsg/nanomsg - Referenční příručka knihovny nanomsg
https://nanomsg.org/v1.1.5/nanomsg.html - nng (nanomsg-next-generation)
https://github.com/nanomsg/nng - Differences between nanomsg and ZeroMQ
https://nanomsg.org/documentation-zeromq.html - NATS
https://nats.io/about/ - NATS Streaming Concepts
https://nats.io/documentation/streaming/nats-streaming-intro/ - NATS Streaming Server
https://nats.io/download/nats-io/nats-streaming-server/ - NATS Introduction
https://nats.io/documentation/ - NATS Client Protocol
https://nats.io/documentation/internals/nats-protocol/ - NATS Messaging (Wikipedia)
https://en.wikipedia.org/wiki/NATS_Messaging - Stránka Apache Software Foundation
http://www.apache.org/ - Informace o portu 5672
http://www.tcp-udp-ports.com/port-5672.htm - Třída MessagingHandler knihovny Qpid Proton
https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/proton._handlers.MessagingHandler-class.html - Třída Event knihovny Qpid Proton
https://qpid.apache.org/releases/qpid-proton-0.27.0/proton/python/api/proton._events.Event-class.html - package stomp (Go)
https://godoc.org/github.com/go-stomp/stomp - Go language library for STOMP protocol
https://github.com/go-stomp/stomp - python-qpid-proton 0.26.0 na PyPi
https://pypi.org/project/python-qpid-proton/ - Qpid Proton
http://qpid.apache.org/proton/ - Using the AMQ Python Client
https://access.redhat.com/documentation/en-us/red_hat_amq/7.1/html-single/using_the_amq_python_client/ - Apache ActiveMQ
http://activemq.apache.org/ - Apache ActiveMQ Artemis
https://activemq.apache.org/artemis/ - Apache ActiveMQ Artemis User Manual
https://activemq.apache.org/artemis/docs/latest/index.html - KahaDB
http://activemq.apache.org/kahadb.html - Understanding the KahaDB Message Store
https://access.redhat.com/documentation/en-US/Fuse_MQ_Enterprise/7.1/html/Configuring_Broker_Persistence/files/KahaDBOverview.html - Command Line Tools (Apache ActiveMQ)
https://activemq.apache.org/activemq-command-line-tools-reference.html - stomp.py 4.1.21 na PyPi
https://pypi.org/project/stomp.py/ - Stomp Tutorial
https://access.redhat.com/documentation/en-US/Fuse_Message_Broker/5.5/html/Connectivity_Guide/files/FMBConnectivityStompTelnet.html - Heartbeat (computing)
https://en.wikipedia.org/wiki/Heartbeat_(computing) - Apache Camel
https://camel.apache.org/ - Red Hat Fuse
https://developers.redhat.com/products/fuse/overview/ - Confusion between ActiveMQ and ActiveMQ-Artemis?
https://serverfault.com/questions/873533/confusion-between-activemq-and-activemq-artemis - Staré stránky projektu HornetQ
http://hornetq.jboss.org/ - Snapshot JeroMQ verze 0.4.4
https://oss.sonatype.org/content/repositories/snapshots/org/zeromq/jeromq/0.4.4-SNAPSHOT/ - Difference between ActiveMQ vs Apache ActiveMQ Artemis
http://activemq.2283324.n4.nabble.com/Difference-between-ActiveMQ-vs-Apache-ActiveMQ-Artemis-td4703828.html - Microservices communications. Why you should switch to message queues
https://dev.to/matteojoliveau/microservices-communications-why-you-should-switch-to-message-queues–48ia - Stomp.py 4.1.19 documentation
https://stomppy.readthedocs.io/en/stable/ - Repositář knihovny JeroMQ
https://github.com/zeromq/jeromq/ - ØMQ – Distributed Messaging
http://zeromq.org/ - ØMQ Community
http://zeromq.org/community - Get The Software
http://zeromq.org/intro:get-the-software - PyZMQ Documentation
https://pyzmq.readthedocs.io/en/latest/ - Module: zmq.decorators
https://pyzmq.readthedocs.io/en/latest/api/zmq.decorators.html - ZeroMQ is the answer, by Ian Barber
https://vimeo.com/20605470 - ZeroMQ RFC
https://rfc.zeromq.org/ - ZeroMQ and Clojure, a brief introduction
https://antoniogarrote.wordpress.com/2010/09/08/zeromq-and-clojure-a-brief-introduction/ - zeromq/czmq
https://github.com/zeromq/czmq - golang wrapper for CZMQ
https://github.com/zeromq/goczmq - ZeroMQ version reporting in Python
http://zguide.zeromq.org/py:version - A Go interface to ZeroMQ version 4
https://github.com/pebbe/zmq4 - Broker vs. Brokerless
http://zeromq.org/whitepapers:brokerless - Learning ØMQ with pyzmq
https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/ - Céčková funkce zmq_ctx_new
http://api.zeromq.org/4–2:zmq-ctx-new - Céčková funkce zmq_ctx_destroy
http://api.zeromq.org/4–2:zmq-ctx-destroy - Céčková funkce zmq_bind
http://api.zeromq.org/4–2:zmq-bind - Céčková funkce zmq_unbind
http://api.zeromq.org/4–2:zmq-unbind - Céčková C funkce zmq_connect
http://api.zeromq.org/4–2:zmq-connect - Céčková C funkce zmq_disconnect
http://api.zeromq.org/4–2:zmq-disconnect - Céčková C funkce zmq_send
http://api.zeromq.org/4–2:zmq-send - Céčková C funkce zmq_recv
http://api.zeromq.org/4–2:zmq-recv - Třída Context (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#context - Třída Socket (Python)
https://pyzmq.readthedocs.io/en/latest/api/zmq.html#socket - Python binding
http://zeromq.org/bindings:python - Why should I have written ZeroMQ in C, not C++ (part I)
http://250bpm.com/blog:4 - Why should I have written ZeroMQ in C, not C++ (part II)
http://250bpm.com/blog:8 - About Nanomsg
https://nanomsg.org/ - Advanced Message Queuing Protocol
https://www.amqp.org/ - Advanced Message Queuing Protocol na Wikipedii
https://en.wikipedia.org/wiki/Advanced_Message_Queuing_Protocol - Dokumentace k příkazu rabbitmqctl
https://www.rabbitmq.com/rabbitmqctl.8.html - RabbitMQ
https://www.rabbitmq.com/ - RabbitMQ Tutorials
https://www.rabbitmq.com/getstarted.html - RabbitMQ: Clients and Developer Tools
https://www.rabbitmq.com/devtools.html - RabbitMQ na Wikipedii
https://en.wikipedia.org/wiki/RabbitMQ - Streaming Text Oriented Messaging Protocol
https://en.wikipedia.org/wiki/Streaming_Text_Oriented_Messaging_Protocol - Message Queuing Telemetry Transport
https://en.wikipedia.org/wiki/MQTT - Erlang
http://www.erlang.org/ - pika 0.12.0 na PyPi
https://pypi.org/project/pika/ - Introduction to Pika
https://pika.readthedocs.io/en/stable/ - Langohr: An idiomatic Clojure client for RabbitMQ that embraces the AMQP 0.9.1 model
http://clojurerabbitmq.info/ - AMQP 0–9–1 Model Explained
http://www.rabbitmq.com/tutorials/amqp-concepts.html - Part 1: RabbitMQ for beginners – What is RabbitMQ?
https://www.cloudamqp.com/blog/2015–05–18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html - Downloading and Installing RabbitMQ
https://www.rabbitmq.com/download.html - celery na PyPi
https://pypi.org/project/celery/ - Databáze Redis (nejenom) pro vývojáře používající Python
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/ - Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/ - Redis Queue (RQ)
https://www.fullstackpython.com/redis-queue-rq.html - Python Celery & RabbitMQ Tutorial
https://tests4geeks.com/python-celery-rabbitmq-tutorial/ - Flower: Real-time Celery web-monitor
http://docs.celeryproject.org/en/latest/userguide/monitoring.html#flower-real-time-celery-web-monitor - Asynchronous Tasks With Django and Celery
https://realpython.com/asynchronous-tasks-with-django-and-celery/ - First Steps with Celery
http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html - node-celery
https://github.com/mher/node-celery - Full Stack Python: web development
https://www.fullstackpython.com/web-development.html - Introducing RQ
https://nvie.com/posts/introducing-rq/ - Asynchronous Tasks with Flask and Redis Queue
https://testdriven.io/asynchronous-tasks-with-flask-and-redis-queue - rq-dashboard
https://github.com/eoranged/rq-dashboard - Stránky projektu Redis
https://redis.io/ - Introduction to Redis
https://redis.io/topics/introduction - Try Redis
http://try.redis.io/ - Redis tutorial, April 2010 (starší, ale pěkně udělaný)
https://static.simonwillison.net/static/2010/redis-tutorial/ - Python Redis
https://redislabs.com/lp/python-redis/ - Redis: key-value databáze v paměti i na disku
https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/ - Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
http://www.cloudsvet.cz/?p=253 - Praktický úvod do Redis (2): transakce
http://www.cloudsvet.cz/?p=256 - Praktický úvod do Redis (3): cluster
http://www.cloudsvet.cz/?p=258 - Connection pool
https://en.wikipedia.org/wiki/Connection_pool - Instant Redis Sentinel Setup
https://github.com/ServiceStack/redis-config - How to install REDIS in LInux
https://linuxtechlab.com/how-install-redis-server-linux/ - Redis RDB Dump File Format
https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format - Lempel–Ziv–Welch
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch - Redis Persistence
https://redis.io/topics/persistence - Redis persistence demystified
http://oldblog.antirez.com/post/redis-persistence-demystified.html - Redis reliable queues with Lua scripting
http://oldblog.antirez.com/post/250 - Ost (knihovna)
https://github.com/soveran/ost - NoSQL
https://en.wikipedia.org/wiki/NoSQL - Shard (database architecture)
https://en.wikipedia.org/wiki/Shard_%28database_architecture%29 - What is sharding and why is it important?
https://stackoverflow.com/questions/992988/what-is-sharding-and-why-is-it-important - What Is Sharding?
https://btcmanager.com/what-sharding/ - Redis clients
https://redis.io/clients - Category:Lua-scriptable software
https://en.wikipedia.org/wiki/Category:Lua-scriptable_software - Seriál Programovací jazyk Lua
https://www.root.cz/serialy/programovaci-jazyk-lua/ - Redis memory usage
http://nosql.mypopescu.com/post/1010844204/redis-memory-usage - Ukázka konfigurace Redisu pro lokální testování
https://github.com/tisnik/presentations/blob/master/redis/redis.conf - Resque
https://github.com/resque/resque - Nested transaction
https://en.wikipedia.org/wiki/Nested_transaction - Publish–subscribe pattern
https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern - Messaging pattern
https://en.wikipedia.org/wiki/Messaging_pattern - Using pipelining to speedup Redis queries
https://redis.io/topics/pipelining - Pub/Sub
https://redis.io/topics/pubsub - ZeroMQ distributed messaging
http://zeromq.org/ - ZeroMQ: Modern & Fast Networking Stack
https://www.igvita.com/2010/09/03/zeromq-modern-fast-networking-stack/ - Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
https://stackoverflow.com/questions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr - Python & Redis PUB/SUB
https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7 - Message broker
https://en.wikipedia.org/wiki/Message_broker - RESP Arrays
https://redis.io/topics/protocol#array-reply - Redis Protocol specification
https://redis.io/topics/protocol - Redis Pub/Sub: Intro Guide
https://www.redisgreen.net/blog/pubsub-intro/ - Redis Pub/Sub: Howto Guide
https://www.redisgreen.net/blog/pubsub-howto/ - Comparing Publish-Subscribe Messaging and Message Queuing
https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message - Apache Kafka
https://kafka.apache.org/ - Iron
http://www.iron.io/mq - kue (založeno na Redisu, určeno pro node.js)
https://github.com/Automattic/kue - Cloud Pub/Sub
https://cloud.google.com/pubsub/ - Introduction to Redis Streams
https://redis.io/topics/streams-intro - glob (programming)
https://en.wikipedia.org/wiki/Glob_(programming) - Why and how Pricing Assistant migrated from Celery to RQ – Paris.py
https://www.slideshare.net/sylvinus/why-and-how-pricing-assistant-migrated-from-celery-to-rq-parispy-2 - Enqueueing internals
http://python-rq.org/contrib/ - queue — A synchronized queue class
https://docs.python.org/3/library/queue.html - Queue – A thread-safe FIFO implementation
https://pymotw.com/2/Queue/ - Queues
http://queues.io/ - Windows Subsystem for Linux Documentation
https://docs.microsoft.com/en-us/windows/wsl/about - RestMQ
http://restmq.com/ - ActiveMQ
http://activemq.apache.org/ - Amazon MQ
https://aws.amazon.com/amazon-mq/ - Amazon Simple Queue Service
https://aws.amazon.com/sqs/ - Celery: Distributed Task Queue
http://www.celeryproject.org/ - Disque, an in-memory, distributed job queue
https://github.com/antirez/disque - rq-dashboard
https://github.com/eoranged/rq-dashboard - Projekt RQ na PyPi
https://pypi.org/project/rq/ - rq-dashboard 0.3.12
https://pypi.org/project/rq-dashboard/ - Job queue
https://en.wikipedia.org/wiki/Job_queue - Why we moved from Celery to RQ
https://frappe.io/blog/technology/why-we-moved-from-celery-to-rq - Running multiple workers using Celery
https://serverfault.com/questions/655387/running-multiple-workers-using-celery - celery — Distributed processing
http://docs.celeryproject.org/en/latest/reference/celery.html - Chains
https://celery.readthedocs.io/en/latest/userguide/canvas.html#chains - Routing
http://docs.celeryproject.org/en/latest/userguide/routing.html#automatic-routing - Celery Distributed Task Queue in Go
https://github.com/gocelery/gocelery/ - Python Decorators
https://wiki.python.org/moin/PythonDecorators - Periodic Tasks
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html - celery.schedules
http://docs.celeryproject.org/en/latest/reference/celery.schedules.html#celery.schedules.crontab - Pros and cons to use Celery vs. RQ
https://stackoverflow.com/questions/13440875/pros-and-cons-to-use-celery-vs-rq - Priority queue
https://en.wikipedia.org/wiki/Priority_queue - Jupyter
https://jupyter.org/ - Context Managers
http://book.pythontips.com/en/latest/context_managers.html