Proudy (streams) podporované systémem Redis

26. 1. 2021
Doba čtení: 27 minut

Sdílet

 Autor: Redis
Systém Redis se používá v mnoha oblastech. Může sloužit ve funkci distribuované key-value databáze, vyrovnávací paměti atd. V Redisu 5 navíc byla představena nová technologie implementující proudy (streams).

Obsah

1. Proudy (streams) podporované systémem Redis

2. Překlad a instalace Redisu verze 6.0

3. Instalace podpůrné knihovny pro Python

4. Spuštění Redisu a kontrola připojení klienta

5. Základní operace se streamy přímo z klienta Redisu: vytvoření nové zprávy

6. Explicitní specifikace ID nových zpráv připojovaných do proudu

7. Automatické generování ID nových zpráv

8. Zjištění počtu zpráv v proudu

9. Režimy přístupu ke zprávám uloženým v proudu

10. Čtení zpráv bez čekaní konzumenta na nové zprávy: příkaz XREAD

11. Čekání na nové zprávy s využitím parametru BLOCK

12. Proud ve funkci databáze s časovými značkami: příkazy XRANGE a XREVRANGE

13. Připojení konzumenta k většímu množství proudů

14. Základní operace nad proudy z Pythonu

15. Připojení zprávy či zpráv na konec proudu

16. Přečtení všech zpráv z proudu

17. Blokující čekání na příchozí zprávu

18. Repositář s demonstračními příklady

19. Předchozí články o Redisu

20. Odkazy na Internetu

1. Proudy (streams) podporované systémem Redis

Dnes se opět po delším čase budeme zabývat popisem systému Redis neboli (v původním významu) Remote Dictionary Server. Navážeme tak na články Databáze Redis (nejenom) pro vývojáře používající Python a Databáze Redis (nejenom) pro vývojáře používající Python (dokončení). V rámci Redisu verze 5.0 byl představen nový komplexní datový typ pojmenovaný stream (proud), který je mj. určen pro ukládání logovacích informací (zpráv). Jedná se tedy o strukturu, ke které je možné data (zde nazývané zprávy) připojovat na konec operací typu append, přičemž jednotlivé připojované zprávy obsahují jednoznačný identifikátor z monotonně rostoucí posloupnosti.

Čtení dat z proudu může být provedeno několika způsoby. Buď konzument těchto dat čeká na nově vkládané zprávy, zprávy může číst od určitého ID nebo může zprávy přečíst pro zadaný rozsah ID. Navíc může zprávy číst větší množství konzumentů, kteří buď dostanou všechny zprávy z proudu (fan-out) nebo jsou jim zprávy rozdělovány. V Redis Streams nalezneme i podporu pro skupiny konzumentů (Consumer Groups), jejichž označení bylo inspirováno myšlenkou realizovanou v systému Apache Kafky. Ve skutečnosti se ovšem jedná o odlišné techniky, což si ostatně ukážeme v několika demonstračních příkladech popsaných v dalším článku. Dnes se však budeme zabývat tou nejjednodušší možnou konfigurací – producenty zpráv, konzumenty, kteří se připojí k jednomu proudu a konzumenty, kteří jsou současně připojeni k více proudům.

V navazujících kapitolách si popíšeme funkci následujících příkazů Redisu, které souvisí s proudy:

# Příkaz Stručný popis
1 XADD připojení zprávy na konec proudu
2 XLEN získání počtu zpráv v proudu
3 XREAD čtení (blokující i neblokující) zpráv z proudu či proudů
4 XRANGE získání zpráv pro daný rozsah jejich ID
5 XREVRANGE podobné XRANGE, ovšem zprávy jsou vráceny v opačném pořadí

2. Překlad a instalace Redisu verze 6.0

V úvodní kapitole jsme si řekli, že proudy (streamy) byly představeny až v Redisu verze 5. To, jaká verze serveru je aktuálně nainstalována, lze zjistit následujícím příkazem:

$ redis-server --version
Poznámka: alternativně je možné sledovat hlášení serveru při jeho spouštění – viz též čtvrtou kapitolu.

Na mnoha systémech nalezneme stále verzi 4.x (kterou v dnešním článku nelze využít) nebo 5.0.x, což je ostatně i případ mnou používaného systému Fedora 32:

Redis server v=5.0.9 sha=00000000:0 malloc=jemalloc-5.1.0 bits=64 build=699c550ace009f13

Pokud se v repositářích vaší distribuce z nějakého důvodu nenachází novější verze Redisu, lze ji přeložit a nainstalovat přímo ze zdrojových kódů. To ve skutečnosti není nic složitého, protože závislosti Redisu jsou pouze minimální: základní knihovna glibc a volitelně též knihovna jemalloc (její použití je však možné zakázat, což může mít vliv na rychlost práce s pamětí, popř. na požadavky na větší množství virtuální paměti).

Stažení zdrojových kódů Redisu 6.0:

$ wget https://download.redis.io/releases/redis-6.0.10.tar.gz
 
--2021-01-21 10:11:24--  https://download.redis.io/releases/redis-6.0.10.tar.gz
Resolving download.redis.io (download.redis.io)... 45.60.121.1
Connecting to download.redis.io (download.redis.io)|45.60.121.1|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2271970 (2.2M) [application/octet-stream]
Saving to: ‘redis-6.0.10.tar.gz’
 
redis-6.0.10.tar.gz 100%[===================>]   2.17M  --.-KB/s    in 0.1s
 
2021-01-21 10:11:27 (15.5 MB/s) - ‘redis-6.0.10.tar.gz’ saved [2271970/2271970]

Rozbalení tarballu:

$ tar xvfz redis-6.0.10.tar.gz

Překlad a instalace (pro jednoduchost se nepoužívá knihovna jmalloc):

$ cd redis-6.0.10
 
$ make distclean; make MALLOC=libc; make
 
$ make install

Kontrola, jaká verze Redisu je nyní k dispozici:

$ redis-server --version
 
Redis server v=6.0.10 sha=00000000:0 malloc=libc bits=64 build=76a7412d0d12cd5d

3. Instalace podpůrné knihovny pro Python

Ve druhé části článku si ukážeme, jakým způsobem je možné s technologií proudů pracovat s využitím programovacího jazyka Python. K tomuto účelu použijeme knihovnu s implementací vhodného rozhraní. Jedná se o relativně malou a jednoduše použitelnou knihovnu nazvanou walrus, která je pochopitelně dostupná i přes PyPi. Instalace této knihovny je tedy velmi jednoduchá a lze ji provést (jak je obvyklé) příkazem pip:

# pip install walrus

resp.pouze pro právě přihlášeného uživatele:

$ pip install --user walrus

Průběh instalace se žádným podstatným způsobem neliší od instalace dalších balíčků dostupných na PyPi. Jedinou závislostí je balíček nazvaný redis, s jehož použitím jsme se již seznámili v předchozích dvou článcích o Redisu (viz též odkazy uvedené v devatenácté kapitole):

Collecting walrus
  Downloading https://files.pythonhosted.org/packages/50/15/27c9bde13eec0ac555987d643adb2b39ac0617e6b8d39c4a17256f677c73/walrus-0.8.1.tar.gz (80kB)
     |████████████████████████████████| 81kB 2.4MB/s
Requirement already satisfied: redis>=3.0.0 in /usr/local/lib/python3.8/site-packages (from walrus) (3.5.3)
Installing collected packages: walrus
    Running setup.py install for walrus ... done
Successfully installed walrus-0.8.1
Poznámka: nenechte se zmást tím, že se instaluje balíček redis ve verzi 3.0.0 nebo vyšší. Toto je verze rozhraní pro Python (tedy konkrétně balíčku https://pypi.org/project/redis/, jenž v současnosti existuje ve verzi 3.5.3), nikoli verze samotného Redisu.

Po (doufejme že úspěšné) instalaci by se měly v podadresáři ~/.local/lib/python3.x/site-packages (lokální instalace), popř. v adresáři /usr/local/lib/python3.x/site-packages objevit následující podadresáře s výše zmíněnými knihovnami redis a walrus:

drwxr-xr-x. 3 tester tester  171 Jan 21 10:23 redis
drwxr-xr-x. 2 tester tester  102 Jan 21 10:23 redis-3.5.3.dist-info
drwxr-xr-x. 7 tester tester 4096 Jan 21 10:24 walrus
drwxr-xr-x. 2 tester tester  137 Jan 21 10:24 walrus-0.8.1-py3.8.egg-info

Rychlá kontrola instalace přímo z Pythonu:

$ python
 
Python 3.8.6 (default, Sep 25 2020, 00:00:00)
[GCC 10.2.1 20200723 (Red Hat 10.2.1-1)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import redis
>>> import walrus
>>> help(walrus)

Po zadání posledního příkazu by se měla zobrazit nápověda k balíčku walrus:

Help on package walrus:
 
NAME
    walrus - Lightweight Python utilities for working with Redis.
 
PACKAGE CONTENTS
    autocomplete
    cache
    containers
    counter
    database
    fts
    graph
    lock
    models
    query
    rate_limit
    search (package)
    streams
    tests (package)
    tusks (package)
    utils
 
   ...
   ...
   ...

4. Spuštění Redisu a kontrola připojení klienta

V dalších kapitolách se předpokládá, že se klienti budou připojovat k běžícímu serveru Redisu, a to na standardním portu 6379. Chování serveru, volba úložiště dat, jeho dostupnost i mimo lokální síť atd. jsou pochopitelně plně konfigurovatelné. O několika důležitých konfiguračních volbách jsme se již zmínili v tomto textu (ovšem určeném ještě pro Redis 4.x). Samotný server se spouští příkazem redis-server:

$ redis-server

Po spuštění by se měla vypsat informace o verzi Redisu, použitém konfiguračním souboru (ve výpisu níže není konfigurační soubor specifikován) a především o portu, ke kterému je možné se připojit z klientů:

20676:C 21 Jan 2021 12:02:45.527 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
20676:C 21 Jan 2021 12:02:45.527 # Redis version=6.0.10, bits=64, commit=00000000, modified=0, pid=20676, just started
20676:C 21 Jan 2021 12:02:45.527 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
20676:M 21 Jan 2021 12:02:45.528 * Increased maximum number of open files to 10032 (it was originally set to 1024).
                _._
           _.-``__ ''-._
      _.-``    `.  `_.  ''-._           Redis 6.0.10 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 20676
  `-._    `-._  `-./  _.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |           http://redis.io
  `-._    `-._`-.__.-'_.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |
  `-._    `-._`-.__.-'_.-'    _.-'
      `-._    `-.__.-'    _.-'
          `-._        _.-'
              `-.__.-'

20676:M 21 Jan 2021 12:02:45.530 # Server initialized
20676:M 21 Jan 2021 12:02:45.530 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
20676:M 21 Jan 2021 12:02:45.530 * Loading RDB produced by version 6.0.10
20676:M 21 Jan 2021 12:02:45.531 * RDB age 5938 seconds
20676:M 21 Jan 2021 12:02:45.531 * RDB memory usage when created 0.84 Mb
20676:M 21 Jan 2021 12:02:45.531 * DB loaded from disk: 0.000 seconds
20676:M 21 Jan 2021 12:02:45.531 * Ready to accept connections

Pokud je server spuštěn v samostatném terminálu, je ho možné ukončit buď klávesovou zkratkou Ctrl+C nebo (jak je to běžné) příkazem kill:

^C
20676:signal-handler (1611248833) Received SIGINT scheduling shutdown...
20676:M 21 Jan 2021 12:07:13.469 # User requested shutdown...
20676:M 21 Jan 2021 12:07:13.469 * Saving the final RDB snapshot before exiting.
20676:M 21 Jan 2021 12:07:13.471 * DB saved on disk
20676:M 21 Jan 2021 12:07:13.471 # Redis is now ready to exit, bye bye...

Nyní se k serveru zkusíme připojit ze standardního klienta:

$ redis-cli

Prakticky ihned by se měla objevit výzva:

127.0.0.1:6379>

Vyzkoušíme základní komunikaci příkazem PING. Server by měl odpovědět zprávou PONG:

127.0.0.1:6379> PING
 
PONG

Vypsat si můžeme i konfiguraci serveru, a to konkrétně příkazem INFO:

127.0.0.1:6379> INFO
 
# Server
redis_version:6.0.10
redis_git_sha1:00000000
redis_git_dirty:0
 
...
...
...
 
# CPU
used_cpu_sys:0.072013
used_cpu_user:0.033019
used_cpu_sys_children:0.000000
used_cpu_user_children:0.000000
 
# Modules
 
# Cluster
cluster_enabled:0
 
# Keyspace
db0:keys=1,expires=0,avg_ttl=0
 
127.0.0.1:6379>
Poznámka: pokud předchozí příkazy nepracují korektně, je nutné se ujistit, že server běží na zadaném portu, že není blokováno připojení atd.

5. Základní operace se streamy přímo z klienta Redisu: vytvoření nové zprávy

Nyní, když je spuštěný server Redisu, si můžeme vyzkoušet zcela základní operace, které je možné s proudy (streamy) provádět. Prozatím využijeme možností nabízené přímo standardním klientem Redisu – ostatně tento klient jsme používali i v předchozích dvou článcích o Redisu. Proud (stream) je datová struktura určená pro připojování nových zpráv, tj. podporující především operaci typu append. Tato operace je realizována příkazem XADD:

127.0.0.1:6379> help xadd
 
  XADD key ID field value [field value ...]
  summary: Appends a new entry to a stream
  since: 5.0.0
  group: stream

Při volání tohoto příkazu se musí uvést především jméno proudu (streamu), identifikátor zprávy (viz další text) a následně data, ze kterých se zpráva skládá. Ovšem pozor – na rozdíl od systému Apache Kafka mají zprávy v Redisu strukturu, nejedná se tedy o pouhou sekvenci bajtů (což je současně výhoda i nevýhoda). Strukturou jsou v tomto kontextu myšleny dvojice klíč+hodnota (což je ostatně základní struktura, na které je postaven celý Redis, resp. přesněji řečeno jeho původní architektura).

Následuje příklad vytvoření nové zprávy a její připojení do proudu nazvaného „stream1“ Identifikátor zprávy je nastaven na hodnotu „1“ a po této hodnotě následuje vlastní zpráva zapisovaná dvojicemi klíč+hodnota (oddělovačem je vždy bílý znak):

127.0.0.1:6379> xadd stream1 1 x 10 y 20
 
"1-0"

Pokud proud „stream1“ neexistuje, je příkazem vytvořen. Příkaz vrátí skutečné ID zprávy, které je odvozeno od celého čísla (většího než nula), které jsme explicitně předali. O formátu ID se podrobněji zmíníme v následujících dvou kapitolách.

6. Explicitní specifikace ID nových zpráv připojovaných do proudu

Identifikátory (ID) zpráv musí v každém proudu tvořit monotonně rostoucí číselnou sekvenci. Pokud se například do proudu pokusíme přidat zprávu s již existujícím ID, dojde k chybě:

127.0.0.1:6379> xadd stream1 1 x 10 y 20
 
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Přidání dalších zpráv, přičemž mezi ID mohou být libovolně dlouhé kroky (nejenom 1):

127.0.0.1:6379> xadd stream1 2 x 10 y 20
"2-0"
 
127.0.0.1:6379> xadd stream1 4 x 10 y 20
"4-0"

Ovšem pokus o vložení zprávy „dovnitř“ proudu opět vede k chybě – tato operace není podporována:

127.0.0.1:6379> xadd stream1 3 x 10 y 20
 
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

Ve skutečnosti je ID zprávy (tedy číslo) rozděleno na dvě části oddělené pomlčkou. První hodnota typicky obsahuje čas, druhé hodnotě se říká pořadové číslo. Pokud jsou první hodnoty ID shodné, musí se zprávy odlišovat pořadovým číslem (a opět tvořit monotonně rostoucí řadu). Možné je měnit pouze pořadové číslo, a to následujícím způsobem (pro větší přehlednost použijeme jiný proud):

127.0.0.1:6379> xadd stream3 0-1 data prvni
"0-1"
 
127.0.0.1:6379> xadd stream3 0-2 data druhy
"0-2"
 
127.0.0.1:6379> xadd stream3 0-3 data treti
"0-3"

Přeskočení jednoho pořadového čísla je opět možné:

127.0.0.1:6379> xadd stream3 0-5 data paty
 
"0-5"

Ovšem k nepoužitému pořadovému číslu se již nelze vrátit:

127.0.0.1:6379> xadd stream3 0-4 data ctvrty
 
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

A navíc musí být hodnoty první hodnota větší než nula (zatímco pořadové číslo nulové být může):

127.0.0.1:6379> xadd stream4 0-0 data 0
 
(error) ERR The ID specified in XADD must be greater than 0-0
Poznámka: důvodem pro rozdělení ID na dvě části je možnost snadnějšího vyhledávání zpráv v časové posloupnosti.

7. Automatické generování ID nových zpráv

V naprosté většině případů se však nemusíme zabývat explicitním generováním jednoznačných identifikátorů pro nové zprávy, protože tuto činnost dokáže systém Redis provádět automaticky. V příkazu XADD je ovšem nutné namísto ID vložit znak * se speciálním významem „vygeneruj ID automaticky“:

127.0.0.1:6379> xadd stream1 * x 1 y 2
"1611328029392-0"
 
127.0.0.1:6379> xadd stream1 * x 1 y 3
"1611328035713-0"
 
127.0.0.1:6379> xadd stream1 * x 2 y 3
"1611328039051-0"
Poznámka: pokud vás zaráží vysoké číslo ID a vlastně i jeho formát s pomlčkou zmíněný výše, pak vězte, že je odvozeno od časového razítka a konkrétně reprezentuje počet milisekund (nikoli pouze sekund), které uběhly od začátku Unixové epochy, tedy od 1.1.1970. Za pomlčkou pak následuje pořadové číslo zprávy pro daný časový okamžik. To je zvýšeno v případě, že se ve stejné milisekundě zapíše větší množství zpráv do stejného proudu. Interně je číslo sekvence uloženo v 64 bitech, takže celkový počet zpráv zapisovatelných ve stejné milisekundě je prakticky neomezený. Ručně se nám pochopitelně nepodaří vložit dvě zprávy ve stejné milisekundě, ale programově je to pochopitelně možné – Redis je dostatečně rychlým systémem, aby zpracovat jednotky až desítky zpráv za 1ms.

8. Zjištění počtu zpráv v proudu

Počet zpráv, které jsou v daném proudu uloženy, lze zjistit příkazem XLEN:

127.0.0.1:6379> help xlen
 
  XLEN key
  summary: Return the number of entires in a stream
  since: 5.0.0
  group: stream

Počet zpráv v existujících (již vytvořených) proudech:

127.0.0.1:6379> xlen stream1
(integer) 6
 
127.0.0.1:6379> xlen stream3
(integer) 4

Počet zpráv v neexistujícím proudu:

127.0.0.1:6379> xlen stream2
(integer) 0

9. Režimy přístupu ke zprávám uloženým v proudu

Nyní již víme, jakým způsobem je možné na konec proudu připojit novou zprávu příkazem XADD. Současně se jedná o jedinou operaci určenou pro přidání nových zpráv – ty lze skutečně pouze připojovat na konec a nikoli přidávat na začátek či doprostřed proudu. Ovšem čtení zpráv je podporováno v několika režimech, z nichž každý se hodí v jiné situaci:

  1. Čekání na novou zprávu s její následnou konzumací (zpracováním). Pokud je připojeno větší množství konzumentů zpráv, dostanou novou zprávu všichni konzumenti. Tato operace, která je podporována i některými klasickými message brokery, se nazývá fan-out.
  2. Stejná operace, ovšem provedena současně pro více proudů.
  3. Přečtení zprávy či zpráv se zadaným ID, popř. od zadaného ID. Částečně se tento režim podobá specifikaci offsetu v systému Apache Kafka.
  4. Stejná operace, ovšem provedena současně pro více proudů.
  5. Čekání na nové zprávy, o něž se konzumenti budou dělit. To znamená, že každý konzument získá jiné zprávy. Přidáním dalších konzumentů se obecně zvýší počet zpráv zpracovatelných za jednotku času. Tento koncept můžeme znát ze systému Apache Kafka, kde se objevuje pod názvem skupiny konzumentů (Consumer Groups).
  6. Posledním režimem je získání zpráv na základě zadaného časového intervalu. K celému proudu se tedy chováme tak, jakoby se jednalo o databázi s časovými razítky (podobně ostatně pracuje například i Prometheus a Grafana).
Poznámka: přístup ke zprávě se zadaným ID (operace seek) má složitost O(log(N)) a operace čtení zpráv se zadaným intervalem (po seeku) má složitost O(M), kde M je počet zpráv v intervalu.

10. Čtení zpráv bez čekání konzumenta na nové zprávy: příkaz XREAD

Pro přečtení zprávy z proudu se používá příkaz READ, jehož konkrétní chování je možné upravit několika parametry:

127.0.0.1:6379> help xread
 
  XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
  summary: Return never seen elements in multiple streams, with IDs greater than the ones reported by the caller for each stream. Can block.
  since: 5.0.0
  group: stream

Při čtení zprávy můžeme zadat identifikátor zprávy, od které má čtení začít (přesněji řečeno se zadává identifikátor poslední zpracované zprávy, tedy takové zprávy, kterou již přečíst nepotřebujeme). V rámci předchozích pokusů jsme do proudu „stream3“ připojili čtyři zprávy, které přečteme následovně (od zprávy s ID=„0–0“, která nemůže existovat, takže se začne zprávou s ID=„0–1“):

127.0.0.1:6379> xread streams stream3 0
1) 1) "stream3"
   2) 1) 1) "0-1"
         2) 1) "data"
            2) "prvni"
      2) 1) "0-2"
         2) 1) "data"
            2) "druhy"
      3) 1) "0-3"
         2) 1) "data"
            2) "treti"
      4) 1) "0-5"
         2) 1) "data"
            2) "paty"

Nejprve se podívejme na příkaz XREAD. Tomu se předá parametr streams, za nímž následuje posloupnost jmen proudů, ze kterých chceme číst a za touto posloupností pak posloupnost identifikátorů. Počet zadaných jmen proudů a identifikátorů musí být shodný. Prozatím chceme přečíst zprávy z jediného proudu „stream3“, a to od samotného od začátku proudu, takže zadáme strema3 0, kde „0“ je expandováno na „0–0“.

Výsledkem čtení je relativně složitě strukturované pole (de facto se jedná o několik vnořených polí). Každý element tohoto pole obsahuje dvojici (taktéž pole), přičemž první element z této dvojice obsahuje jméno proudu (v našem případě „stream3“) a druhý element pak jednotlivé zprávy, které jsou opět tvořeny polem. Na této úrovni tedy pole vypadá takto:

1) 1) "stream3"
   2) pole_se_zprávami

A jak je z výpisu patrné, obsahují tato vnitřní pole vždy v prvním prvku ID zprávy a v prvku druhém její obsah.

Poznámka: jak uvidíme dále, bude interpretace této struktury v Pythonu skryta, takže ji většinou ani nebudeme potřebovat znát do podrobností.

Druhé spuštění příkazu READ „přehraje“ ty samé zprávy (popř. i nově příchozí zprávy):

127.0.0.1:6379> xread streams stream3 0
1) 1) "stream3"
   2) 1) 1) "0-1"
         2) 1) "data"
            2) "prvni"
      2) 1) "0-2"
         2) 1) "data"
            2) "druhy"
      3) 1) "0-3"
         2) 1) "data"
            2) "treti"
      4) 1) "0-5"
         2) 1) "data"
            2) "paty"

11. Čekání na nové zprávy s využitím parametru BLOCK

Většinou konzumenti na nově příchozí zprávy musí čekat. I tato operace je příkazem XREAD podporována, a to konkrétně po použití parametru BLOCK, kterému se předá, kolik milisekund (maximálně) se má na zprávu čekat. Hodnota 0 značí čekání bez omezení. Navíc se v tomto kontextu setkáme se speciálním ID zprávy označeným dolarem – toto ID znamená poslední zprávu v proudu a tudíž bude operace XREAD čekat na další zprávu.

Čekání na další zprávu v proudu „stream“ bez časového omezení:

127.0.0.1:6379> xread BLOCK 0 streams stream3 $

V dalším terminálu přidáme do proudu „stream3“ novou zprávu:

127.0.0.1:6379> XADD stream3 * foo 1 bar 2
 
"1611329909780-0"

V prvním terminálu (s příkazem XREAD) by se ihned měl objevit výsledek:

1) 1) "stream3"
   2) 1) 1) "1611329909780-0"
         2) 1) "foo"
            2) "1"
            3) "bar"
            4) "2"
(93.15s)
127.0.0.1:6379>

Čekání po dobu maximálně jedné sekundy na novou zprávu:

127.0.0.1:6379> xread BLOCK 1000 streams stream3 $
 
(nil)
(1.08s)
Poznámka: v tomto případě nová zpráva nepřišla, takže se vrátila hodnota nil po cca oné jedné sekundě.

12. Proud ve funkci databáze s časovými značkami: XRANGE a XREVRANGE

K proudu je možné přistupovat taktéž jako k databázi, v níž jsou jednotlivé záznamy identifikovány časovými značkami, přesněji řečeno časovou značkou (Unix time v milisekundách) následovanou pořadím zprávy. Vrácení většího množství zpráv v zadaném časovém intervalu zajišťují funkce XRANGE a XREVRANGE:

127.0.0.1:6379> help xrange
 
  XRANGE key start end [COUNT count]
  summary: Return a range of elements in a stream, with IDs matching the specified IDs interval
  since: 5.0.0
  group: stream

Pro specifikaci „od“ a „do“ je možné použít speciální identifikátory „-“ a „+“, které značí „minimální možný ID“ a „maximální možný ID“. Přečtení všech zpráv z proudu „stream1“ se tedy provede takto:

127.0.0.1:6379> xrange stream1 - +
 
1) 1) "1-0"
   2) 1) "x"
      2) "10"
      3) "y"
      4) "20"
2) 1) "2-0"
   2) 1) "x"
      2) "10"
      3) "y"
      4) "20"
3) 1) "4-0"
   2) 1) "x"
      2) "10"
      3) "y"
      4) "20"
4) 1) "1611328029392-0"
   2) 1) "x"
      2) "1"
      3) "y"
      4) "2"
5) 1) "1611328035713-0"
   2) 1) "x"
      2) "1"
      3) "y"
      4) "3"
6) 1) "1611328039051-0"
   2) 1) "x"
      2) "2"
      3) "y"
      4) "3"

Čtení z neexistujícího proudu je možné:

127.0.0.1:6379> xrange stream2 - +
 
(empty array)

Čtení zpráv z jiného proudu:

127.0.0.1:6379> xrange stream3 - +
 
1) 1) "0-1"
   2) 1) "data"
      2) "prvni"
2) 1) "0-2"
   2) 1) "data"
      2) "druhy"
3) 1) "0-3"
   2) 1) "data"
      2) "treti"
4) 1) "0-5"
   2) 1) "data"
      2) "paty"

Určení konkrétních ID „od“ a „do“:

127.0.0.1:6379> xrange stream3 0-1 0-4
 
1) 1) "0-1"
   2) 1) "data"
      2) "prvni"
2) 1) "0-2"
   2) 1) "data"
      2) "druhy"
3) 1) "0-3"
   2) 1) "data"
      2) "treti"

ID ve skutečnosti nemusí existovat:

127.0.0.1:6379> xrange stream3 0-2 0-10000
 
1) 1) "0-2"
   2) 1) "data"
      2) "druhy"
2) 1) "0-3"
   2) 1) "data"
      2) "treti"
3) 1) "0-5"
   2) 1) "data"
      2) "paty"

Určení rozsahu ID, v němž se nenachází žádné zprávy:

127.0.0.1:6379> xrange stream3 0-100 0-10000
 
(empty array)

Omezení počtu vrácených zpráv parametrem COUNT:

127.0.0.1:6379> xrange stream3 - + count 3
 
1) 1) "0-1"
   2) 1) "data"
      2) "prvni"
2) 1) "0-2"
   2) 1) "data"
      2) "druhy"
3) 1) "0-3"
   2) 1) "data"
      2) "treti"

Vrácení zpráv v opačném pořadí příkazem XREVRANGE:

127.0.0.1:6379> XREVRANGE stream1 + -
 
1) 1) "1611328039051-0"
   2) 1) "x"
      2) "2"
      3) "y"
      4) "3"
2) 1) "1611328035713-0"
   2) 1) "x"
      2) "1"
      3) "y"
      4) "3"
3) 1) "1611328029392-0"
   2) 1) "x"
      2) "1"
      3) "y"
      4) "2"
4) 1) "4-0"
   2) 1) "x"
      2) "10"
      3) "y"
      4) "20"
5) 1) "2-0"
   2) 1) "x"
      2) "10"
      3) "y"
      4) "20"
6) 1) "1-0"
   2) 1) "x"
      2) "10"
      3) "y"
      4) "20"

13. Připojení konzumenta k většímu množství proudů

Jednoho konzumenta je možné připojit k více proudům. V takovém případě konzument přečte (operací READ) první zprávu z libovolného proudu, která odpovídá zadanému ID. Typicky takový konzument očekává nové zprávy s blokujícím čekáním na tyto zprávy. V popisovaném případě se zadají nejdříve jména proudů a posléze i speciální ID zapisované znakem dolaru, které znamenají, že se má přečíst nová zpráva (která ještě do proudu nebyla připojena):

127.0.0.1:6379> xread BLOCK 0 streams streamA streamB $ $

V novém terminálu nyní připojíme zprávu do proudu „streamA“:

127.0.0.1:6379> xadd streamA * description messageA1
 
"1611339744871-0"

V prvním terminálu uvidíme, že zpráva byla skutečně ihned přečtena:

1) 1) "streamA"
   2) 1) 1) "1611339744871-0"
         2) 1) "description"
            2) "messageA1"
(61.71s)

Nové spuštění konzumenta:

127.0.0.1:6379> xread BLOCK 0 streams streamA streamB $ $

Nyní připojíme zprávu do druhého proudu „streamB“:

127.0.0.1:6379> xadd streamB * description messageB1
 
"1611339766311-0"

Zpráva je konzumentem ihned přečtena:

1) 1) "streamB"
   2) 1) 1) "1611339766311-0"
         2) 1) "description"
            2) "messageB1"
(8.77s)
Poznámka: nyní je zřejmé, proč jsou zprávy přečtené operací READ vráceny v poli, v němž se vždy vyskytuje jméno proudu, ze kterého byla zpráva získána.

14. Základní operace nad proudy z Pythonu

Nyní si – prozatím alespoň ve stručnosti – popíšeme operace, které lze s proudy Redisu provádět v programovacího jazyka Python s využitím knihovny walrus, kterou jsme nainstalovali v rámci třetí kapitoly. Tato knihovna nabízí několik tříd, z nichž základní se jmenuje Database. Tato třída představuje rozhraní k Redisu a lze ji inicializovat následovně:

db = Database()
Poznámka: pokud nejsou uvedeny žádné parametry, je provedeno připojení k lokálně dostupnému Redisu na portu 6379.

Následně je možné získat instanci třídy Stream, která reprezentuje rozhraní k jednomu konkrétnímu proudu. Ten je uveden jako parametr konstruktoru této třídy:

stream = db.Stream("streamX")

Celý příklad, který získá rozhraní k proudu „streamX“ v lokálně běžícím Redisu, může vypadat následovně:

from walrus import Database
 
db = Database()
stream = db.Stream("streamX")
 
print(db)
print(stream)
Poznámka: zdrojový kód tohoto příkladu naleznete na adrese https://github.com/tisnik/py-redis-examples/blob/master/stre­ams01_create_stream.py.

15. Připojení zprávy či zpráv na konec proudu

Pro připojení zprávy na konec proudu se v Pythonu používá metoda nazvaná add ze třídy Stream. Této metodě, která odpovídá příkazu XADD, lze v nepovinném parametru předat ID zprávy a zejména její obsah, který může být tvořen slovníkem s libovolným obsahem. Teoreticky může mít každá zpráva uložená do proudu odlišnou strukturu (ta se nekontroluje), v praxi to ovšem není příliš časté.

Připojení jediné zprávy do proudu „streamX“ metodou Stream.add. Připojení je neblokující operací:

from walrus import Database
 
db = Database()
stream = db.Stream("streamX")
 
message_id = stream.add({"foo": 10,
                         "bar": 20})
print(message_id)
Poznámka: zdrojový kód tohoto příkladu naleznete na adrese https://github.com/tisnik/py-redis-examples/blob/master/stre­ams02_add_message.py.

V následujícím příkladu je do proudu nazvaného „streamX“ vloženo deset zpráv se stejnou strukturou, ovšem s odlišnými hodnotami uloženými pod jednotlivými klíči. Pokud proud neexistuje, tak je automaticky vytvořen. ID zpráv je vygenerováno automaticky na základě času jejich vložení do proudu:

from walrus import Database
 
db = Database()
stream = db.Stream("streamX")
 
for i in range(0, 10):
    message_id = stream.add({"foo": i+1,
                             "bar": i*2})
    print(message_id)

Tento příklad po svém spuštění vypíše ID vytvořených (připojených) zpráv. Povšimněte si, že se vrací pole bajtů (tedy nikoli řetězec Pythonu 3) a taktéž toho, že skript je dostatečně rychlý na to, aby v jedné milisekundě uložil více zpráv, které se musí odlišovat svým pořadovým číslem:

b'1611340770619-0'
b'1611340770620-0'
b'1611340770620-1'
b'1611340770620-2'
b'1611340770620-3'
b'1611340770620-4'
b'1611340770621-0'
b'1611340770621-1'
b'1611340770621-2'
b'1611340770621-3'
Poznámka: zdrojový kód tohoto příkladu naleznete na adrese https://github.com/tisnik/py-redis-examples/blob/master/stre­ams03_add_messages.py.

16. Přečtení všech zpráv z proudu

Nejčastěji prováděnou operací, kterou musí konzumenti (či workeři) provádět, je přečtení všech zpráv z vybraného proudu. Tato operace se s využitím knihovny walrus ve skutečnosti provede velmi jednoduše, protože dostupné zprávy získáme následujícím způsobem ve formě sekvence, kterou lze převést na seznam (pokud je k dispozici dostatek paměti) a následně s ní provádět všechny operace typu slicingu atd.:

messages = list(stream)

Tato operace je neblokující; případné čtení s čekáním na příchod zprávy se realizuje metodou Stream.read popsanou v navazující kapitole.

from walrus import Database
 
db = Database()
stream = db.Stream("streamX")
 
messages = list(stream)
print(messages)
Poznámka: zdrojový kód tohoto příkladu naleznete na adrese https://github.com/tisnik/py-redis-examples/blob/master/stre­ams04_read_messages.py.

Podívejme se nyní na to, jaký seznam vlastně získáme, pokud přečteme zprávy vložené do proudu „streamX“ v rámci předchozího demonstračního příkladu:

$ python3 04_read_messages.py

Vrácená datová struktura bude mít tento obsah:

[(b'1611340576679-0', {b'foo': b'10', b'bar': b'20'}),
 (b'1611340770619-0', {b'foo': b'1', b'bar': b'0'}),
 (b'1611340770620-0', {b'foo': b'2', b'bar': b'2'}),
 (b'1611340770620-1', {b'foo': b'3', b'bar': b'4'}),
 (b'1611340770620-2', {b'foo': b'4', b'bar': b'6'}),
 (b'1611340770620-3', {b'foo': b'5', b'bar': b'8'}),
 (b'1611340770620-4', {b'foo': b'6', b'bar': b'10'}),
 (b'1611340770621-0', {b'foo': b'7', b'bar': b'12'}),
 (b'1611340770621-1', {b'foo': b'8', b'bar': b'14'}),
 (b'1611340770621-2', {b'foo': b'9', b'bar': b'16'}),
 (b'1611340770621-3', {b'foo': b'10', b'bar': b'18'})]
Poznámka: povšimněte si, že jsme ve skutečnosti nezískali zprávy v původní podobě, protože celočíselné hodnoty i řetězce byly převedeny na pole bajtů. Serializace a deserializace zpráv tedy může obsahovat „pasti“, na které by měl být vývojář připraven. Na druhou stranu nám ale nic nebrání použít pro serializaci a deserializaci zpráv knihovnu pickle, což je téma, kterému se budeme věnovat příště.

17. Blokující čekání na příchozí zprávu

Posledním demonstračním příkladem, který si v dnešním článku ukážeme, je blokující čekání na příchozí zprávu. Tato operace je provedena metodou read třídy Stream. U čtení můžeme specifikovat několik parametrů, především ID již přečtené zprávy a taktéž příznak blokující operace s případným určením maximální doby čekání na příchod zprávy:

bitcoin školení listopad 24

from walrus import Database
 
db = Database()
stream = db.Stream("streamX")
 
message = stream.read(block=0, last_id="$")
print(message)
Poznámka: zdrojový kód tohoto příkladu naleznete na adrese https://github.com/tisnik/py-redis-examples/blob/master/stre­ams05_read_new_message.py.

18. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v Pythonu byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/py-redis-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce:

# Demonstrační příklad Popis Cesta
1 streams01_create_stream.py připojení k Redisu a konstrukce objektu typu stream https://github.com/tisnik/py-redis-examples/blob/master/stre­ams01_create_stream.py
2 streams02_add_message.py přidání zprávy do proudu metodou add https://github.com/tisnik/py-redis-examples/blob/master/stre­ams02_add_message.py
3 streams03_add_messages.py rychlé přidání několika zpráv do proudu metodou add https://github.com/tisnik/py-redis-examples/blob/master/stre­ams03_add_messages.py
4 streams04_read_messages.py neblokující přečtení všech zpráv, které jsou uloženy ve vybraném proudu https://github.com/tisnik/py-redis-examples/blob/master/stre­ams04_read_messages.py
5 streams05_read_new_message.py blokující čekání na příchod nové zprávy https://github.com/tisnik/py-redis-examples/blob/master/stre­ams05_read_new_message.py

19. Předchozí články o Redisu

Se systémem Redis jsme se již na stránkách Rootu setkali, a to dokonce několikrát. Buď jsme si popisovali přímo přístup k Redisu z různých programovacích jazyků (což je konkrétně případ všech dále zmíněných článků zaměřených na jazyky Python a Go) nebo byl Redis použit ve funkci databáze, resp. perzistentního úložiště různými message brokery (Celery, RQ, apod.):

  1. Databáze Redis (nejenom) pro vývojáře používající Python
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/
  2. Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/
  3. Použití databáze Redis v aplikacích naprogramovaných v Go
    https://www.root.cz/clanky/pouziti-databaze-redis-v-aplikacich-naprogramovanych-v-go/
  4. Použití databáze Redis v aplikacích naprogramovaných v Go (2)
    https://www.root.cz/clanky/pouziti-databaze-redis-v-aplikacich-naprogramovanych-v-go-2/
  5. Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
    https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/

20. Odkazy na Internetu

  1. Repositář knihovny walrus na GitHubu
    https://github.com/coleifer/walrus/
  2. Knihovna walrus na PyPi
    https://pypi.org/project/walrus/
  3. Stránky projektu Redis
    https://redis.io/
  4. Introduction to Redis
    https://redis.io/topics/introduction
  5. Try Redis
    http://try.redis.io/
  6. Redis tutorial, April 2010 (starší, ale pěkně udělaný)
    https://static.simonwilli­son.net/static/2010/redis-tutorial/
  7. Python Redis
    https://redislabs.com/lp/python-redis/
  8. Redis: key-value databáze v paměti i na disku
    https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/
  9. Scripting Redis with Lua
    https://redislabs.com/ebook/part-3-next-steps/chapter-11-scripting-redis-with-lua/
  10. Redis Lua script for atomic operations and cache stampede
    https://engineering.linecor­p.com/en/blog/redis-lua-scripting-atomic-processing-cache/
  11. Redis Lua Scripts – Itamar Haber
    https://www.youtube.com/wat­ch?v=eReTl8NhHCs
  12. Building Databases with Redis Tutorial: Lua Script | packtpub.com
    https://www.youtube.com/wat­ch?v=mMfGNsAr7Bg
  13. Příkaz pro spuštění skriptu v jazyce Lua: EVAL script numkeys key [key …] arg [arg …]
    https://redis.io/commands/eval
  14. Redis Lua scripts debugger
    https://redis.io/topics/ldb
  15. Repositář projektu s Redis klientem pro jazyk Go
    https://github.com/go-redis/redis
  16. Stránky programovacího jazyka Lua
    https://www.lua.org/
  17. Programovací jazyk Lua
    https://www.palmknihy.cz/ucebnice-odborna-literatura/programovaci-jazyk-lua-12651
  18. Programming in Lua
    https://www.lua.org/pil/
  19. Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
    http://www.cloudsvet.cz/?p=253
  20. Praktický úvod do Redis (2): transakce
    http://www.cloudsvet.cz/?p=256
  21. Praktický úvod do Redis (3): cluster
    http://www.cloudsvet.cz/?p=258
  22. Connection pool
    https://en.wikipedia.org/wi­ki/Connection_pool
  23. Instant Redis Sentinel Setup
    https://github.com/ServiceStack/redis-config
  24. How to install REDIS in LInux
    https://linuxtechlab.com/how-install-redis-server-linux/
  25. Redis RDB Dump File Format
    https://github.com/sripat­hikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format
  26. Lempel–Ziv–Welch
    https://en.wikipedia.org/wi­ki/Lempel%E2%80%93Ziv%E2%80%93­Welch
  27. Redis Persistence
    https://redis.io/topics/persistence
  28. Redis persistence demystified
    http://oldblog.antirez.com/post/redis-persistence-demystified.html
  29. Redis reliable queues with Lua scripting
    http://oldblog.antirez.com/post/250
  30. Ost (knihovna)
    https://github.com/soveran/ost
  31. NoSQL
    https://en.wikipedia.org/wiki/NoSQL
  32. Shard (database architecture)
    https://en.wikipedia.org/wi­ki/Shard_%28database_archi­tecture%29
  33. What is sharding and why is it important?
    https://stackoverflow.com/qu­estions/992988/what-is-sharding-and-why-is-it-important
  34. What Is Sharding?
    https://btcmanager.com/what-sharding/
  35. Redis clients
    https://redis.io/clients
  36. Category:Lua-scriptable software
    https://en.wikipedia.org/wi­ki/Category:Lua-scriptable_software
  37. Seriál Programovací jazyk Lua
    https://www.root.cz/seria­ly/programovaci-jazyk-lua/
  38. Redis memory usage
    http://nosql.mypopescu.com/pos­t/1010844204/redis-memory-usage
  39. Ukázka konfigurace Redisu pro lokální testování
    https://github.com/tisnik/pre­sentations/blob/master/re­dis/redis.conf
  40. Resque
    https://github.com/resque/resque
  41. Nested transaction
    https://en.wikipedia.org/wi­ki/Nested_transaction
  42. Publish–subscribe pattern
    https://en.wikipedia.org/wi­ki/Publish%E2%80%93subscri­be_pattern
  43. Messaging pattern
    https://en.wikipedia.org/wi­ki/Messaging_pattern
  44. Using pipelining to speedup Redis queries
    https://redis.io/topics/pipelining
  45. Pub/Sub
    https://redis.io/topics/pubsub
  46. ZeroMQ distributed messaging
    http://zeromq.org/
  47. Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
    https://stackoverflow.com/qu­estions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr
  48. Python & Redis PUB/SUB
    https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7
  49. Message broker
    https://en.wikipedia.org/wi­ki/Message_broker
  50. RESP Arrays
    https://redis.io/topics/protocol#array-reply
  51. Redis Protocol specification
    https://redis.io/topics/protocol
  52. Redis Pub/Sub: Intro Guide
    https://www.redisgreen.net/blog/pubsub-intro/
  53. Redis Pub/Sub: Howto Guide
    https://www.redisgreen.net/blog/pubsub-howto/
  54. Comparing Publish-Subscribe Messaging and Message Queuing
    https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message
  55. ActiveMQ
    http://activemq.apache.org/activemq-website/index.html
  56. Amazon Simple Queue Service
    https://aws.amazon.com/sqs/
  57. Apache Kafka
    https://kafka.apache.org/
  58. Cloud Pub/Sub
    https://cloud.google.com/pubsub/
  59. Introduction to Redis Streams
    https://redis.io/topics/streams-intro
  60. Introduction to Redis streams with Python
    http://charlesleifer.com/blog/redis-streams-with-python/
  61. glob (programming)
    https://en.wikipedia.org/wi­ki/Glob_(programming)
  62. Microservices: The Rise Of Kafka
    https://movio.co/blog/microservices-rise-kafka/
  63. Building a Microservices Ecosystem with Kafka Streams and KSQL
    https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/
  64. An introduction to Apache Kafka and microservices communication
    https://medium.com/@ulymarins/an-introduction-to-apache-kafka-and-microservices-communication-bf0a0966d63
  65. kappa-architecture.com
    http://milinda.pathirage.org/kappa-architecture.com/
  66. Questioning the Lambda Architecture
    https://www.oreilly.com/i­deas/questioning-the-lambda-architecture
  67. Lambda architecture
    https://en.wikipedia.org/wi­ki/Lambda_architecture
  68. Kafka – ecosystem (Wiki)
    https://cwiki.apache.org/con­fluence/display/KAFKA/Eco­system
  69. The Kafka Ecosystem – Kafka Core, Kafka Streams, Kafka Connect, Kafka REST Proxy, and the Schema Registry
    http://cloudurable.com/blog/kafka-ecosystem/index.html
  70. A Kafka Operator for Kubernetes
    https://github.com/krallistic/kafka-operator
  71. Kafka Streams
    https://cwiki.apache.org/con­fluence/display/KAFKA/Kaf­ka+Streams
  72. Kafka Streams
    http://kafka.apache.org/do­cumentation/streams/
  73. Kafka Streams (FAQ)
    https://cwiki.apache.org/con­fluence/display/KAFKA/FAQ#FAQ-Streams
  74. What are some alternatives to Apache Kafka?
    https://www.quora.com/What-are-some-alternatives-to-Apache-Kafka
  75. What is the best alternative to Kafka?
    https://www.slant.co/opti­ons/961/alternatives/~kaf­ka-alternatives
  76. A super quick comparison between Kafka and Message Queues
    https://hackernoon.com/a-super-quick-comparison-between-kafka-and-message-queues-e69742d855a8?gi=e965191e72d0
  77. Kafka Queuing: Kafka as a Messaging System
    https://dzone.com/articles/kafka-queuing-kafka-as-a-messaging-system

Autor článku

Vystudoval VUT FIT a v současné době pracuje na projektech vytvářených v jazycích Python a Go.