Obsah
1. Proudy (streams) podporované systémem Redis
2. Překlad a instalace Redisu verze 6.0
3. Instalace podpůrné knihovny pro Python
4. Spuštění Redisu a kontrola připojení klienta
5. Základní operace se streamy přímo z klienta Redisu: vytvoření nové zprávy
6. Explicitní specifikace ID nových zpráv připojovaných do proudu
7. Automatické generování ID nových zpráv
8. Zjištění počtu zpráv v proudu
9. Režimy přístupu ke zprávám uloženým v proudu
10. Čtení zpráv bez čekaní konzumenta na nové zprávy: příkaz XREAD
11. Čekání na nové zprávy s využitím parametru BLOCK
12. Proud ve funkci databáze s časovými značkami: příkazy XRANGE a XREVRANGE
13. Připojení konzumenta k většímu množství proudů
14. Základní operace nad proudy z Pythonu
15. Připojení zprávy či zpráv na konec proudu
16. Přečtení všech zpráv z proudu
17. Blokující čekání na příchozí zprávu
18. Repositář s demonstračními příklady
1. Proudy (streams) podporované systémem Redis
Dnes se opět po delším čase budeme zabývat popisem systému Redis neboli (v původním významu) Remote Dictionary Server. Navážeme tak na články Databáze Redis (nejenom) pro vývojáře používající Python a Databáze Redis (nejenom) pro vývojáře používající Python (dokončení). V rámci Redisu verze 5.0 byl představen nový komplexní datový typ pojmenovaný stream (proud), který je mj. určen pro ukládání logovacích informací (zpráv). Jedná se tedy o strukturu, ke které je možné data (zde nazývané zprávy) připojovat na konec operací typu append, přičemž jednotlivé připojované zprávy obsahují jednoznačný identifikátor z monotonně rostoucí posloupnosti.
Čtení dat z proudu může být provedeno několika způsoby. Buď konzument těchto dat čeká na nově vkládané zprávy, zprávy může číst od určitého ID nebo může zprávy přečíst pro zadaný rozsah ID. Navíc může zprávy číst větší množství konzumentů, kteří buď dostanou všechny zprávy z proudu (fan-out) nebo jsou jim zprávy rozdělovány. V Redis Streams nalezneme i podporu pro skupiny konzumentů (Consumer Groups), jejichž označení bylo inspirováno myšlenkou realizovanou v systému Apache Kafky. Ve skutečnosti se ovšem jedná o odlišné techniky, což si ostatně ukážeme v několika demonstračních příkladech popsaných v dalším článku. Dnes se však budeme zabývat tou nejjednodušší možnou konfigurací – producenty zpráv, konzumenty, kteří se připojí k jednomu proudu a konzumenty, kteří jsou současně připojeni k více proudům.
V navazujících kapitolách si popíšeme funkci následujících příkazů Redisu, které souvisí s proudy:
# | Příkaz | Stručný popis |
---|---|---|
1 | XADD | připojení zprávy na konec proudu |
2 | XLEN | získání počtu zpráv v proudu |
3 | XREAD | čtení (blokující i neblokující) zpráv z proudu či proudů |
4 | XRANGE | získání zpráv pro daný rozsah jejich ID |
5 | XREVRANGE | podobné XRANGE, ovšem zprávy jsou vráceny v opačném pořadí |
2. Překlad a instalace Redisu verze 6.0
V úvodní kapitole jsme si řekli, že proudy (streamy) byly představeny až v Redisu verze 5. To, jaká verze serveru je aktuálně nainstalována, lze zjistit následujícím příkazem:
$ redis-server --version
Na mnoha systémech nalezneme stále verzi 4.x (kterou v dnešním článku nelze využít) nebo 5.0.x, což je ostatně i případ mnou používaného systému Fedora 32:
Redis server v=5.0.9 sha=00000000:0 malloc=jemalloc-5.1.0 bits=64 build=699c550ace009f13
Pokud se v repositářích vaší distribuce z nějakého důvodu nenachází novější verze Redisu, lze ji přeložit a nainstalovat přímo ze zdrojových kódů. To ve skutečnosti není nic složitého, protože závislosti Redisu jsou pouze minimální: základní knihovna glibc a volitelně též knihovna jemalloc (její použití je však možné zakázat, což může mít vliv na rychlost práce s pamětí, popř. na požadavky na větší množství virtuální paměti).
Stažení zdrojových kódů Redisu 6.0:
$ wget https://download.redis.io/releases/redis-6.0.10.tar.gz --2021-01-21 10:11:24-- https://download.redis.io/releases/redis-6.0.10.tar.gz Resolving download.redis.io (download.redis.io)... 45.60.121.1 Connecting to download.redis.io (download.redis.io)|45.60.121.1|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 2271970 (2.2M) [application/octet-stream] Saving to: ‘redis-6.0.10.tar.gz’ redis-6.0.10.tar.gz 100%[===================>] 2.17M --.-KB/s in 0.1s 2021-01-21 10:11:27 (15.5 MB/s) - ‘redis-6.0.10.tar.gz’ saved [2271970/2271970]
Rozbalení tarballu:
$ tar xvfz redis-6.0.10.tar.gz
Překlad a instalace (pro jednoduchost se nepoužívá knihovna jmalloc):
$ cd redis-6.0.10 $ make distclean; make MALLOC=libc; make $ make install
Kontrola, jaká verze Redisu je nyní k dispozici:
$ redis-server --version Redis server v=6.0.10 sha=00000000:0 malloc=libc bits=64 build=76a7412d0d12cd5d
3. Instalace podpůrné knihovny pro Python
Ve druhé části článku si ukážeme, jakým způsobem je možné s technologií proudů pracovat s využitím programovacího jazyka Python. K tomuto účelu použijeme knihovnu s implementací vhodného rozhraní. Jedná se o relativně malou a jednoduše použitelnou knihovnu nazvanou walrus, která je pochopitelně dostupná i přes PyPi. Instalace této knihovny je tedy velmi jednoduchá a lze ji provést (jak je obvyklé) příkazem pip:
# pip install walrus
resp.pouze pro právě přihlášeného uživatele:
$ pip install --user walrus
Průběh instalace se žádným podstatným způsobem neliší od instalace dalších balíčků dostupných na PyPi. Jedinou závislostí je balíček nazvaný redis, s jehož použitím jsme se již seznámili v předchozích dvou článcích o Redisu (viz též odkazy uvedené v devatenácté kapitole):
Collecting walrus Downloading https://files.pythonhosted.org/packages/50/15/27c9bde13eec0ac555987d643adb2b39ac0617e6b8d39c4a17256f677c73/walrus-0.8.1.tar.gz (80kB) |████████████████████████████████| 81kB 2.4MB/s Requirement already satisfied: redis>=3.0.0 in /usr/local/lib/python3.8/site-packages (from walrus) (3.5.3) Installing collected packages: walrus Running setup.py install for walrus ... done Successfully installed walrus-0.8.1
Po (doufejme že úspěšné) instalaci by se měly v podadresáři ~/.local/lib/python3.x/site-packages (lokální instalace), popř. v adresáři /usr/local/lib/python3.x/site-packages objevit následující podadresáře s výše zmíněnými knihovnami redis a walrus:
drwxr-xr-x. 3 tester tester 171 Jan 21 10:23 redis drwxr-xr-x. 2 tester tester 102 Jan 21 10:23 redis-3.5.3.dist-info drwxr-xr-x. 7 tester tester 4096 Jan 21 10:24 walrus drwxr-xr-x. 2 tester tester 137 Jan 21 10:24 walrus-0.8.1-py3.8.egg-info
Rychlá kontrola instalace přímo z Pythonu:
$ python Python 3.8.6 (default, Sep 25 2020, 00:00:00) [GCC 10.2.1 20200723 (Red Hat 10.2.1-1)] on linux Type "help", "copyright", "credits" or "license" for more information. >>> import redis >>> import walrus >>> help(walrus)
Po zadání posledního příkazu by se měla zobrazit nápověda k balíčku walrus:
Help on package walrus: NAME walrus - Lightweight Python utilities for working with Redis. PACKAGE CONTENTS autocomplete cache containers counter database fts graph lock models query rate_limit search (package) streams tests (package) tusks (package) utils ... ... ...
4. Spuštění Redisu a kontrola připojení klienta
V dalších kapitolách se předpokládá, že se klienti budou připojovat k běžícímu serveru Redisu, a to na standardním portu 6379. Chování serveru, volba úložiště dat, jeho dostupnost i mimo lokální síť atd. jsou pochopitelně plně konfigurovatelné. O několika důležitých konfiguračních volbách jsme se již zmínili v tomto textu (ovšem určeném ještě pro Redis 4.x). Samotný server se spouští příkazem redis-server:
$ redis-server
Po spuštění by se měla vypsat informace o verzi Redisu, použitém konfiguračním souboru (ve výpisu níže není konfigurační soubor specifikován) a především o portu, ke kterému je možné se připojit z klientů:
20676:C 21 Jan 2021 12:02:45.527 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo 20676:C 21 Jan 2021 12:02:45.527 # Redis version=6.0.10, bits=64, commit=00000000, modified=0, pid=20676, just started 20676:C 21 Jan 2021 12:02:45.527 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf 20676:M 21 Jan 2021 12:02:45.528 * Increased maximum number of open files to 10032 (it was originally set to 1024). _._ _.-``__ ''-._ _.-`` `. `_. ''-._ Redis 6.0.10 (00000000/0) 64 bit .-`` .-```. ```\/ _.,_ ''-._ ( ' , .-` | `, ) Running in standalone mode |`-._`-...-` __...-.``-._|'` _.-'| Port: 6379 | `-._ `._ / _.-' | PID: 20676 `-._ `-._ `-./ _.-' _.-' |`-._`-._ `-.__.-' _.-'_.-'| | `-._`-._ _.-'_.-' | http://redis.io `-._ `-._`-.__.-'_.-' _.-' |`-._`-._ `-.__.-' _.-'_.-'| | `-._`-._ _.-'_.-' | `-._ `-._`-.__.-'_.-' _.-' `-._ `-.__.-' _.-' `-._ _.-' `-.__.-' 20676:M 21 Jan 2021 12:02:45.530 # Server initialized 20676:M 21 Jan 2021 12:02:45.530 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect. 20676:M 21 Jan 2021 12:02:45.530 * Loading RDB produced by version 6.0.10 20676:M 21 Jan 2021 12:02:45.531 * RDB age 5938 seconds 20676:M 21 Jan 2021 12:02:45.531 * RDB memory usage when created 0.84 Mb 20676:M 21 Jan 2021 12:02:45.531 * DB loaded from disk: 0.000 seconds 20676:M 21 Jan 2021 12:02:45.531 * Ready to accept connections
Pokud je server spuštěn v samostatném terminálu, je ho možné ukončit buď klávesovou zkratkou Ctrl+C nebo (jak je to běžné) příkazem kill:
^C 20676:signal-handler (1611248833) Received SIGINT scheduling shutdown... 20676:M 21 Jan 2021 12:07:13.469 # User requested shutdown... 20676:M 21 Jan 2021 12:07:13.469 * Saving the final RDB snapshot before exiting. 20676:M 21 Jan 2021 12:07:13.471 * DB saved on disk 20676:M 21 Jan 2021 12:07:13.471 # Redis is now ready to exit, bye bye...
Nyní se k serveru zkusíme připojit ze standardního klienta:
$ redis-cli
Prakticky ihned by se měla objevit výzva:
127.0.0.1:6379>
Vyzkoušíme základní komunikaci příkazem PING. Server by měl odpovědět zprávou PONG:
127.0.0.1:6379> PING PONG
Vypsat si můžeme i konfiguraci serveru, a to konkrétně příkazem INFO:
127.0.0.1:6379> INFO # Server redis_version:6.0.10 redis_git_sha1:00000000 redis_git_dirty:0 ... ... ... # CPU used_cpu_sys:0.072013 used_cpu_user:0.033019 used_cpu_sys_children:0.000000 used_cpu_user_children:0.000000 # Modules # Cluster cluster_enabled:0 # Keyspace db0:keys=1,expires=0,avg_ttl=0 127.0.0.1:6379>
5. Základní operace se streamy přímo z klienta Redisu: vytvoření nové zprávy
Nyní, když je spuštěný server Redisu, si můžeme vyzkoušet zcela základní operace, které je možné s proudy (streamy) provádět. Prozatím využijeme možností nabízené přímo standardním klientem Redisu – ostatně tento klient jsme používali i v předchozích dvou článcích o Redisu. Proud (stream) je datová struktura určená pro připojování nových zpráv, tj. podporující především operaci typu append. Tato operace je realizována příkazem XADD:
127.0.0.1:6379> help xadd XADD key ID field value [field value ...] summary: Appends a new entry to a stream since: 5.0.0 group: stream
Při volání tohoto příkazu se musí uvést především jméno proudu (streamu), identifikátor zprávy (viz další text) a následně data, ze kterých se zpráva skládá. Ovšem pozor – na rozdíl od systému Apache Kafka mají zprávy v Redisu strukturu, nejedná se tedy o pouhou sekvenci bajtů (což je současně výhoda i nevýhoda). Strukturou jsou v tomto kontextu myšleny dvojice klíč+hodnota (což je ostatně základní struktura, na které je postaven celý Redis, resp. přesněji řečeno jeho původní architektura).
Následuje příklad vytvoření nové zprávy a její připojení do proudu nazvaného „stream1“ Identifikátor zprávy je nastaven na hodnotu „1“ a po této hodnotě následuje vlastní zpráva zapisovaná dvojicemi klíč+hodnota (oddělovačem je vždy bílý znak):
127.0.0.1:6379> xadd stream1 1 x 10 y 20 "1-0"
Pokud proud „stream1“ neexistuje, je příkazem vytvořen. Příkaz vrátí skutečné ID zprávy, které je odvozeno od celého čísla (většího než nula), které jsme explicitně předali. O formátu ID se podrobněji zmíníme v následujících dvou kapitolách.
6. Explicitní specifikace ID nových zpráv připojovaných do proudu
Identifikátory (ID) zpráv musí v každém proudu tvořit monotonně rostoucí číselnou sekvenci. Pokud se například do proudu pokusíme přidat zprávu s již existujícím ID, dojde k chybě:
127.0.0.1:6379> xadd stream1 1 x 10 y 20 (error) ERR The ID specified in XADD is equal or smaller than the target stream top item
Přidání dalších zpráv, přičemž mezi ID mohou být libovolně dlouhé kroky (nejenom 1):
127.0.0.1:6379> xadd stream1 2 x 10 y 20 "2-0" 127.0.0.1:6379> xadd stream1 4 x 10 y 20 "4-0"
Ovšem pokus o vložení zprávy „dovnitř“ proudu opět vede k chybě – tato operace není podporována:
127.0.0.1:6379> xadd stream1 3 x 10 y 20 (error) ERR The ID specified in XADD is equal or smaller than the target stream top item
Ve skutečnosti je ID zprávy (tedy číslo) rozděleno na dvě části oddělené pomlčkou. První hodnota typicky obsahuje čas, druhé hodnotě se říká pořadové číslo. Pokud jsou první hodnoty ID shodné, musí se zprávy odlišovat pořadovým číslem (a opět tvořit monotonně rostoucí řadu). Možné je měnit pouze pořadové číslo, a to následujícím způsobem (pro větší přehlednost použijeme jiný proud):
127.0.0.1:6379> xadd stream3 0-1 data prvni "0-1" 127.0.0.1:6379> xadd stream3 0-2 data druhy "0-2" 127.0.0.1:6379> xadd stream3 0-3 data treti "0-3"
Přeskočení jednoho pořadového čísla je opět možné:
127.0.0.1:6379> xadd stream3 0-5 data paty "0-5"
Ovšem k nepoužitému pořadovému číslu se již nelze vrátit:
127.0.0.1:6379> xadd stream3 0-4 data ctvrty (error) ERR The ID specified in XADD is equal or smaller than the target stream top item
A navíc musí být hodnoty první hodnota větší než nula (zatímco pořadové číslo nulové být může):
127.0.0.1:6379> xadd stream4 0-0 data 0 (error) ERR The ID specified in XADD must be greater than 0-0
7. Automatické generování ID nových zpráv
V naprosté většině případů se však nemusíme zabývat explicitním generováním jednoznačných identifikátorů pro nové zprávy, protože tuto činnost dokáže systém Redis provádět automaticky. V příkazu XADD je ovšem nutné namísto ID vložit znak * se speciálním významem „vygeneruj ID automaticky“:
127.0.0.1:6379> xadd stream1 * x 1 y 2 "1611328029392-0" 127.0.0.1:6379> xadd stream1 * x 1 y 3 "1611328035713-0" 127.0.0.1:6379> xadd stream1 * x 2 y 3 "1611328039051-0"
8. Zjištění počtu zpráv v proudu
Počet zpráv, které jsou v daném proudu uloženy, lze zjistit příkazem XLEN:
127.0.0.1:6379> help xlen XLEN key summary: Return the number of entires in a stream since: 5.0.0 group: stream
Počet zpráv v existujících (již vytvořených) proudech:
127.0.0.1:6379> xlen stream1 (integer) 6 127.0.0.1:6379> xlen stream3 (integer) 4
Počet zpráv v neexistujícím proudu:
127.0.0.1:6379> xlen stream2 (integer) 0
9. Režimy přístupu ke zprávám uloženým v proudu
Nyní již víme, jakým způsobem je možné na konec proudu připojit novou zprávu příkazem XADD. Současně se jedná o jedinou operaci určenou pro přidání nových zpráv – ty lze skutečně pouze připojovat na konec a nikoli přidávat na začátek či doprostřed proudu. Ovšem čtení zpráv je podporováno v několika režimech, z nichž každý se hodí v jiné situaci:
- Čekání na novou zprávu s její následnou konzumací (zpracováním). Pokud je připojeno větší množství konzumentů zpráv, dostanou novou zprávu všichni konzumenti. Tato operace, která je podporována i některými klasickými message brokery, se nazývá fan-out.
- Stejná operace, ovšem provedena současně pro více proudů.
- Přečtení zprávy či zpráv se zadaným ID, popř. od zadaného ID. Částečně se tento režim podobá specifikaci offsetu v systému Apache Kafka.
- Stejná operace, ovšem provedena současně pro více proudů.
- Čekání na nové zprávy, o něž se konzumenti budou dělit. To znamená, že každý konzument získá jiné zprávy. Přidáním dalších konzumentů se obecně zvýší počet zpráv zpracovatelných za jednotku času. Tento koncept můžeme znát ze systému Apache Kafka, kde se objevuje pod názvem skupiny konzumentů (Consumer Groups).
- Posledním režimem je získání zpráv na základě zadaného časového intervalu. K celému proudu se tedy chováme tak, jakoby se jednalo o databázi s časovými razítky (podobně ostatně pracuje například i Prometheus a Grafana).
10. Čtení zpráv bez čekání konzumenta na nové zprávy: příkaz XREAD
Pro přečtení zprávy z proudu se používá příkaz READ, jehož konkrétní chování je možné upravit několika parametry:
127.0.0.1:6379> help xread XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...] summary: Return never seen elements in multiple streams, with IDs greater than the ones reported by the caller for each stream. Can block. since: 5.0.0 group: stream
Při čtení zprávy můžeme zadat identifikátor zprávy, od které má čtení začít (přesněji řečeno se zadává identifikátor poslední zpracované zprávy, tedy takové zprávy, kterou již přečíst nepotřebujeme). V rámci předchozích pokusů jsme do proudu „stream3“ připojili čtyři zprávy, které přečteme následovně (od zprávy s ID=„0–0“, která nemůže existovat, takže se začne zprávou s ID=„0–1“):
127.0.0.1:6379> xread streams stream3 0 1) 1) "stream3" 2) 1) 1) "0-1" 2) 1) "data" 2) "prvni" 2) 1) "0-2" 2) 1) "data" 2) "druhy" 3) 1) "0-3" 2) 1) "data" 2) "treti" 4) 1) "0-5" 2) 1) "data" 2) "paty"
Nejprve se podívejme na příkaz XREAD. Tomu se předá parametr streams, za nímž následuje posloupnost jmen proudů, ze kterých chceme číst a za touto posloupností pak posloupnost identifikátorů. Počet zadaných jmen proudů a identifikátorů musí být shodný. Prozatím chceme přečíst zprávy z jediného proudu „stream3“, a to od samotného od začátku proudu, takže zadáme strema3 0, kde „0“ je expandováno na „0–0“.
Výsledkem čtení je relativně složitě strukturované pole (de facto se jedná o několik vnořených polí). Každý element tohoto pole obsahuje dvojici (taktéž pole), přičemž první element z této dvojice obsahuje jméno proudu (v našem případě „stream3“) a druhý element pak jednotlivé zprávy, které jsou opět tvořeny polem. Na této úrovni tedy pole vypadá takto:
1) 1) "stream3" 2) pole_se_zprávami
A jak je z výpisu patrné, obsahují tato vnitřní pole vždy v prvním prvku ID zprávy a v prvku druhém její obsah.
Druhé spuštění příkazu READ „přehraje“ ty samé zprávy (popř. i nově příchozí zprávy):
127.0.0.1:6379> xread streams stream3 0 1) 1) "stream3" 2) 1) 1) "0-1" 2) 1) "data" 2) "prvni" 2) 1) "0-2" 2) 1) "data" 2) "druhy" 3) 1) "0-3" 2) 1) "data" 2) "treti" 4) 1) "0-5" 2) 1) "data" 2) "paty"
11. Čekání na nové zprávy s využitím parametru BLOCK
Většinou konzumenti na nově příchozí zprávy musí čekat. I tato operace je příkazem XREAD podporována, a to konkrétně po použití parametru BLOCK, kterému se předá, kolik milisekund (maximálně) se má na zprávu čekat. Hodnota 0 značí čekání bez omezení. Navíc se v tomto kontextu setkáme se speciálním ID zprávy označeným dolarem – toto ID znamená poslední zprávu v proudu a tudíž bude operace XREAD čekat na další zprávu.
Čekání na další zprávu v proudu „stream“ bez časového omezení:
127.0.0.1:6379> xread BLOCK 0 streams stream3 $
V dalším terminálu přidáme do proudu „stream3“ novou zprávu:
127.0.0.1:6379> XADD stream3 * foo 1 bar 2 "1611329909780-0"
V prvním terminálu (s příkazem XREAD) by se ihned měl objevit výsledek:
1) 1) "stream3" 2) 1) 1) "1611329909780-0" 2) 1) "foo" 2) "1" 3) "bar" 4) "2" (93.15s) 127.0.0.1:6379>
Čekání po dobu maximálně jedné sekundy na novou zprávu:
127.0.0.1:6379> xread BLOCK 1000 streams stream3 $ (nil) (1.08s)
12. Proud ve funkci databáze s časovými značkami: XRANGE a XREVRANGE
K proudu je možné přistupovat taktéž jako k databázi, v níž jsou jednotlivé záznamy identifikovány časovými značkami, přesněji řečeno časovou značkou (Unix time v milisekundách) následovanou pořadím zprávy. Vrácení většího množství zpráv v zadaném časovém intervalu zajišťují funkce XRANGE a XREVRANGE:
127.0.0.1:6379> help xrange XRANGE key start end [COUNT count] summary: Return a range of elements in a stream, with IDs matching the specified IDs interval since: 5.0.0 group: stream
Pro specifikaci „od“ a „do“ je možné použít speciální identifikátory „-“ a „+“, které značí „minimální možný ID“ a „maximální možný ID“. Přečtení všech zpráv z proudu „stream1“ se tedy provede takto:
127.0.0.1:6379> xrange stream1 - + 1) 1) "1-0" 2) 1) "x" 2) "10" 3) "y" 4) "20" 2) 1) "2-0" 2) 1) "x" 2) "10" 3) "y" 4) "20" 3) 1) "4-0" 2) 1) "x" 2) "10" 3) "y" 4) "20" 4) 1) "1611328029392-0" 2) 1) "x" 2) "1" 3) "y" 4) "2" 5) 1) "1611328035713-0" 2) 1) "x" 2) "1" 3) "y" 4) "3" 6) 1) "1611328039051-0" 2) 1) "x" 2) "2" 3) "y" 4) "3"
Čtení z neexistujícího proudu je možné:
127.0.0.1:6379> xrange stream2 - + (empty array)
Čtení zpráv z jiného proudu:
127.0.0.1:6379> xrange stream3 - + 1) 1) "0-1" 2) 1) "data" 2) "prvni" 2) 1) "0-2" 2) 1) "data" 2) "druhy" 3) 1) "0-3" 2) 1) "data" 2) "treti" 4) 1) "0-5" 2) 1) "data" 2) "paty"
Určení konkrétních ID „od“ a „do“:
127.0.0.1:6379> xrange stream3 0-1 0-4 1) 1) "0-1" 2) 1) "data" 2) "prvni" 2) 1) "0-2" 2) 1) "data" 2) "druhy" 3) 1) "0-3" 2) 1) "data" 2) "treti"
ID ve skutečnosti nemusí existovat:
127.0.0.1:6379> xrange stream3 0-2 0-10000 1) 1) "0-2" 2) 1) "data" 2) "druhy" 2) 1) "0-3" 2) 1) "data" 2) "treti" 3) 1) "0-5" 2) 1) "data" 2) "paty"
Určení rozsahu ID, v němž se nenachází žádné zprávy:
127.0.0.1:6379> xrange stream3 0-100 0-10000 (empty array)
Omezení počtu vrácených zpráv parametrem COUNT:
127.0.0.1:6379> xrange stream3 - + count 3 1) 1) "0-1" 2) 1) "data" 2) "prvni" 2) 1) "0-2" 2) 1) "data" 2) "druhy" 3) 1) "0-3" 2) 1) "data" 2) "treti"
Vrácení zpráv v opačném pořadí příkazem XREVRANGE:
127.0.0.1:6379> XREVRANGE stream1 + - 1) 1) "1611328039051-0" 2) 1) "x" 2) "2" 3) "y" 4) "3" 2) 1) "1611328035713-0" 2) 1) "x" 2) "1" 3) "y" 4) "3" 3) 1) "1611328029392-0" 2) 1) "x" 2) "1" 3) "y" 4) "2" 4) 1) "4-0" 2) 1) "x" 2) "10" 3) "y" 4) "20" 5) 1) "2-0" 2) 1) "x" 2) "10" 3) "y" 4) "20" 6) 1) "1-0" 2) 1) "x" 2) "10" 3) "y" 4) "20"
13. Připojení konzumenta k většímu množství proudů
Jednoho konzumenta je možné připojit k více proudům. V takovém případě konzument přečte (operací READ) první zprávu z libovolného proudu, která odpovídá zadanému ID. Typicky takový konzument očekává nové zprávy s blokujícím čekáním na tyto zprávy. V popisovaném případě se zadají nejdříve jména proudů a posléze i speciální ID zapisované znakem dolaru, které znamenají, že se má přečíst nová zpráva (která ještě do proudu nebyla připojena):
127.0.0.1:6379> xread BLOCK 0 streams streamA streamB $ $
V novém terminálu nyní připojíme zprávu do proudu „streamA“:
127.0.0.1:6379> xadd streamA * description messageA1 "1611339744871-0"
V prvním terminálu uvidíme, že zpráva byla skutečně ihned přečtena:
1) 1) "streamA" 2) 1) 1) "1611339744871-0" 2) 1) "description" 2) "messageA1" (61.71s)
Nové spuštění konzumenta:
127.0.0.1:6379> xread BLOCK 0 streams streamA streamB $ $
Nyní připojíme zprávu do druhého proudu „streamB“:
127.0.0.1:6379> xadd streamB * description messageB1 "1611339766311-0"
Zpráva je konzumentem ihned přečtena:
1) 1) "streamB" 2) 1) 1) "1611339766311-0" 2) 1) "description" 2) "messageB1" (8.77s)
14. Základní operace nad proudy z Pythonu
Nyní si – prozatím alespoň ve stručnosti – popíšeme operace, které lze s proudy Redisu provádět v programovacího jazyka Python s využitím knihovny walrus, kterou jsme nainstalovali v rámci třetí kapitoly. Tato knihovna nabízí několik tříd, z nichž základní se jmenuje Database. Tato třída představuje rozhraní k Redisu a lze ji inicializovat následovně:
db = Database()
Následně je možné získat instanci třídy Stream, která reprezentuje rozhraní k jednomu konkrétnímu proudu. Ten je uveden jako parametr konstruktoru této třídy:
stream = db.Stream("streamX")
Celý příklad, který získá rozhraní k proudu „streamX“ v lokálně běžícím Redisu, může vypadat následovně:
from walrus import Database db = Database() stream = db.Stream("streamX") print(db) print(stream)
15. Připojení zprávy či zpráv na konec proudu
Pro připojení zprávy na konec proudu se v Pythonu používá metoda nazvaná add ze třídy Stream. Této metodě, která odpovídá příkazu XADD, lze v nepovinném parametru předat ID zprávy a zejména její obsah, který může být tvořen slovníkem s libovolným obsahem. Teoreticky může mít každá zpráva uložená do proudu odlišnou strukturu (ta se nekontroluje), v praxi to ovšem není příliš časté.
Připojení jediné zprávy do proudu „streamX“ metodou Stream.add. Připojení je neblokující operací:
from walrus import Database db = Database() stream = db.Stream("streamX") message_id = stream.add({"foo": 10, "bar": 20}) print(message_id)
V následujícím příkladu je do proudu nazvaného „streamX“ vloženo deset zpráv se stejnou strukturou, ovšem s odlišnými hodnotami uloženými pod jednotlivými klíči. Pokud proud neexistuje, tak je automaticky vytvořen. ID zpráv je vygenerováno automaticky na základě času jejich vložení do proudu:
from walrus import Database db = Database() stream = db.Stream("streamX") for i in range(0, 10): message_id = stream.add({"foo": i+1, "bar": i*2}) print(message_id)
Tento příklad po svém spuštění vypíše ID vytvořených (připojených) zpráv. Povšimněte si, že se vrací pole bajtů (tedy nikoli řetězec Pythonu 3) a taktéž toho, že skript je dostatečně rychlý na to, aby v jedné milisekundě uložil více zpráv, které se musí odlišovat svým pořadovým číslem:
b'1611340770619-0' b'1611340770620-0' b'1611340770620-1' b'1611340770620-2' b'1611340770620-3' b'1611340770620-4' b'1611340770621-0' b'1611340770621-1' b'1611340770621-2' b'1611340770621-3'
16. Přečtení všech zpráv z proudu
Nejčastěji prováděnou operací, kterou musí konzumenti (či workeři) provádět, je přečtení všech zpráv z vybraného proudu. Tato operace se s využitím knihovny walrus ve skutečnosti provede velmi jednoduše, protože dostupné zprávy získáme následujícím způsobem ve formě sekvence, kterou lze převést na seznam (pokud je k dispozici dostatek paměti) a následně s ní provádět všechny operace typu slicingu atd.:
messages = list(stream)
Tato operace je neblokující; případné čtení s čekáním na příchod zprávy se realizuje metodou Stream.read popsanou v navazující kapitole.
from walrus import Database db = Database() stream = db.Stream("streamX") messages = list(stream) print(messages)
Podívejme se nyní na to, jaký seznam vlastně získáme, pokud přečteme zprávy vložené do proudu „streamX“ v rámci předchozího demonstračního příkladu:
$ python3 04_read_messages.py
Vrácená datová struktura bude mít tento obsah:
[(b'1611340576679-0', {b'foo': b'10', b'bar': b'20'}), (b'1611340770619-0', {b'foo': b'1', b'bar': b'0'}), (b'1611340770620-0', {b'foo': b'2', b'bar': b'2'}), (b'1611340770620-1', {b'foo': b'3', b'bar': b'4'}), (b'1611340770620-2', {b'foo': b'4', b'bar': b'6'}), (b'1611340770620-3', {b'foo': b'5', b'bar': b'8'}), (b'1611340770620-4', {b'foo': b'6', b'bar': b'10'}), (b'1611340770621-0', {b'foo': b'7', b'bar': b'12'}), (b'1611340770621-1', {b'foo': b'8', b'bar': b'14'}), (b'1611340770621-2', {b'foo': b'9', b'bar': b'16'}), (b'1611340770621-3', {b'foo': b'10', b'bar': b'18'})]
17. Blokující čekání na příchozí zprávu
Posledním demonstračním příkladem, který si v dnešním článku ukážeme, je blokující čekání na příchozí zprávu. Tato operace je provedena metodou read třídy Stream. U čtení můžeme specifikovat několik parametrů, především ID již přečtené zprávy a taktéž příznak blokující operace s případným určením maximální doby čekání na příchod zprávy:
from walrus import Database db = Database() stream = db.Stream("streamX") message = stream.read(block=0, last_id="$") print(message)
18. Repositář s demonstračními příklady
Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v Pythonu byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/py-redis-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce:
# | Demonstrační příklad | Popis | Cesta |
---|---|---|---|
1 | streams01_create_stream.py | připojení k Redisu a konstrukce objektu typu stream | https://github.com/tisnik/py-redis-examples/blob/master/streams01_create_stream.py |
2 | streams02_add_message.py | přidání zprávy do proudu metodou add | https://github.com/tisnik/py-redis-examples/blob/master/streams02_add_message.py |
3 | streams03_add_messages.py | rychlé přidání několika zpráv do proudu metodou add | https://github.com/tisnik/py-redis-examples/blob/master/streams03_add_messages.py |
4 | streams04_read_messages.py | neblokující přečtení všech zpráv, které jsou uloženy ve vybraném proudu | https://github.com/tisnik/py-redis-examples/blob/master/streams04_read_messages.py |
5 | streams05_read_new_message.py | blokující čekání na příchod nové zprávy | https://github.com/tisnik/py-redis-examples/blob/master/streams05_read_new_message.py |
19. Předchozí články o Redisu
Se systémem Redis jsme se již na stránkách Rootu setkali, a to dokonce několikrát. Buď jsme si popisovali přímo přístup k Redisu z různých programovacích jazyků (což je konkrétně případ všech dále zmíněných článků zaměřených na jazyky Python a Go) nebo byl Redis použit ve funkci databáze, resp. perzistentního úložiště různými message brokery (Celery, RQ, apod.):
- Databáze Redis (nejenom) pro vývojáře používající Python
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/ - Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/ - Použití databáze Redis v aplikacích naprogramovaných v Go
https://www.root.cz/clanky/pouziti-databaze-redis-v-aplikacich-naprogramovanych-v-go/ - Použití databáze Redis v aplikacích naprogramovaných v Go (2)
https://www.root.cz/clanky/pouziti-databaze-redis-v-aplikacich-naprogramovanych-v-go-2/ - Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/
20. Odkazy na Internetu
- Repositář knihovny walrus na GitHubu
https://github.com/coleifer/walrus/ - Knihovna walrus na PyPi
https://pypi.org/project/walrus/ - Stránky projektu Redis
https://redis.io/ - Introduction to Redis
https://redis.io/topics/introduction - Try Redis
http://try.redis.io/ - Redis tutorial, April 2010 (starší, ale pěkně udělaný)
https://static.simonwillison.net/static/2010/redis-tutorial/ - Python Redis
https://redislabs.com/lp/python-redis/ - Redis: key-value databáze v paměti i na disku
https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/ - Scripting Redis with Lua
https://redislabs.com/ebook/part-3-next-steps/chapter-11-scripting-redis-with-lua/ - Redis Lua script for atomic operations and cache stampede
https://engineering.linecorp.com/en/blog/redis-lua-scripting-atomic-processing-cache/ - Redis Lua Scripts – Itamar Haber
https://www.youtube.com/watch?v=eReTl8NhHCs - Building Databases with Redis Tutorial: Lua Script | packtpub.com
https://www.youtube.com/watch?v=mMfGNsAr7Bg - Příkaz pro spuštění skriptu v jazyce Lua: EVAL script numkeys key [key …] arg [arg …]
https://redis.io/commands/eval - Redis Lua scripts debugger
https://redis.io/topics/ldb - Repositář projektu s Redis klientem pro jazyk Go
https://github.com/go-redis/redis - Stránky programovacího jazyka Lua
https://www.lua.org/ - Programovací jazyk Lua
https://www.palmknihy.cz/ucebnice-odborna-literatura/programovaci-jazyk-lua-12651 - Programming in Lua
https://www.lua.org/pil/ - Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
http://www.cloudsvet.cz/?p=253 - Praktický úvod do Redis (2): transakce
http://www.cloudsvet.cz/?p=256 - Praktický úvod do Redis (3): cluster
http://www.cloudsvet.cz/?p=258 - Connection pool
https://en.wikipedia.org/wiki/Connection_pool - Instant Redis Sentinel Setup
https://github.com/ServiceStack/redis-config - How to install REDIS in LInux
https://linuxtechlab.com/how-install-redis-server-linux/ - Redis RDB Dump File Format
https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format - Lempel–Ziv–Welch
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch - Redis Persistence
https://redis.io/topics/persistence - Redis persistence demystified
http://oldblog.antirez.com/post/redis-persistence-demystified.html - Redis reliable queues with Lua scripting
http://oldblog.antirez.com/post/250 - Ost (knihovna)
https://github.com/soveran/ost - NoSQL
https://en.wikipedia.org/wiki/NoSQL - Shard (database architecture)
https://en.wikipedia.org/wiki/Shard_%28database_architecture%29 - What is sharding and why is it important?
https://stackoverflow.com/questions/992988/what-is-sharding-and-why-is-it-important - What Is Sharding?
https://btcmanager.com/what-sharding/ - Redis clients
https://redis.io/clients - Category:Lua-scriptable software
https://en.wikipedia.org/wiki/Category:Lua-scriptable_software - Seriál Programovací jazyk Lua
https://www.root.cz/serialy/programovaci-jazyk-lua/ - Redis memory usage
http://nosql.mypopescu.com/post/1010844204/redis-memory-usage - Ukázka konfigurace Redisu pro lokální testování
https://github.com/tisnik/presentations/blob/master/redis/redis.conf - Resque
https://github.com/resque/resque - Nested transaction
https://en.wikipedia.org/wiki/Nested_transaction - Publish–subscribe pattern
https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern - Messaging pattern
https://en.wikipedia.org/wiki/Messaging_pattern - Using pipelining to speedup Redis queries
https://redis.io/topics/pipelining - Pub/Sub
https://redis.io/topics/pubsub - ZeroMQ distributed messaging
http://zeromq.org/ - Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
https://stackoverflow.com/questions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr - Python & Redis PUB/SUB
https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7 - Message broker
https://en.wikipedia.org/wiki/Message_broker - RESP Arrays
https://redis.io/topics/protocol#array-reply - Redis Protocol specification
https://redis.io/topics/protocol - Redis Pub/Sub: Intro Guide
https://www.redisgreen.net/blog/pubsub-intro/ - Redis Pub/Sub: Howto Guide
https://www.redisgreen.net/blog/pubsub-howto/ - Comparing Publish-Subscribe Messaging and Message Queuing
https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message - ActiveMQ
http://activemq.apache.org/activemq-website/index.html - Amazon Simple Queue Service
https://aws.amazon.com/sqs/ - Apache Kafka
https://kafka.apache.org/ - Cloud Pub/Sub
https://cloud.google.com/pubsub/ - Introduction to Redis Streams
https://redis.io/topics/streams-intro - Introduction to Redis streams with Python
http://charlesleifer.com/blog/redis-streams-with-python/ - glob (programming)
https://en.wikipedia.org/wiki/Glob_(programming) - Microservices: The Rise Of Kafka
https://movio.co/blog/microservices-rise-kafka/ - Building a Microservices Ecosystem with Kafka Streams and KSQL
https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/ - An introduction to Apache Kafka and microservices communication
https://medium.com/@ulymarins/an-introduction-to-apache-kafka-and-microservices-communication-bf0a0966d63 - kappa-architecture.com
http://milinda.pathirage.org/kappa-architecture.com/ - Questioning the Lambda Architecture
https://www.oreilly.com/ideas/questioning-the-lambda-architecture - Lambda architecture
https://en.wikipedia.org/wiki/Lambda_architecture - Kafka – ecosystem (Wiki)
https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem - The Kafka Ecosystem – Kafka Core, Kafka Streams, Kafka Connect, Kafka REST Proxy, and the Schema Registry
http://cloudurable.com/blog/kafka-ecosystem/index.html - A Kafka Operator for Kubernetes
https://github.com/krallistic/kafka-operator - Kafka Streams
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams - Kafka Streams
http://kafka.apache.org/documentation/streams/ - Kafka Streams (FAQ)
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Streams - What are some alternatives to Apache Kafka?
https://www.quora.com/What-are-some-alternatives-to-Apache-Kafka - What is the best alternative to Kafka?
https://www.slant.co/options/961/alternatives/~kafka-alternatives - A super quick comparison between Kafka and Message Queues
https://hackernoon.com/a-super-quick-comparison-between-kafka-and-message-queues-e69742d855a8?gi=e965191e72d0 - Kafka Queuing: Kafka as a Messaging System
https://dzone.com/articles/kafka-queuing-kafka-as-a-messaging-system