Proudy (streams) podporované systémem Redis (dokončení)

28. 1. 2021
Doba čtení: 25 minut

Sdílet

 Autor: Redis
Dnes navážeme popisem práce se skupinami konzumentů (consumer groups). V rámci skupiny konzumentů lze zprávy potvrzovat, popř. zjišťovat, které zprávy sice byly přečteny, ovšem bez explicitního potvrzení.

Obsah

1. Proudy (streams) podporované systémem Redis (dokončení)

2. Rychlost připojování zpráv na konec proudu

3. Rychlost čtení zpráv z proudu

4. Komunikační strategie podporované systémem Redis Streams

5. Jeden konzument zpracovávající zprávy z většího množství proudů

6. Fan-out: přečtení zprávy větším množstvím konzumentů

7. Deklarace skupiny konzumentů (Consumer Group)

8. Využití skupiny konzumentů pro čtení zpráv

9. Skupina konzumentů v Pythonu

10. Blokující čtení zpráv (čekání na nové zprávy)

11. Chování klauzule > při čtení zpráv ve skupině konzumentů

12. Příkazy XACK a XPENDING

13. Potvrzování zpráv příkazem XACK

14. Příkaz XPENDING

15. Potvrzování zpráv a příkaz XPENDING pro více skupin konzumentů

16. Mazání zpráv z proudu příkazem XDEL

17. Seznam všech příkazů používaných pro práci s proudy

18. Repositář s demonstračními příklady

19. Předchozí články o systému Redis

20. Odkazy na Internetu

1. Proudy (streams) podporované systémem Redis (dokončení)

Na úvodní článek o proudech (streams) podporovaných v systému Redis verze 5 a 6 dnes navážeme. Popíšeme si zejména některé způsoby použití takzvaných skupin konzumentů (consumer group), které mohou mít v praktickém nasazení Redis Streams velký význam. Demonstrační příklady budou založeny jak na standardním nástroji redis-cli, tak i (i když dnes v poněkud menší míře v porovnání s článkem předchozím) na programovacím jazyku Python a knihovně walrus.

Poznámka: před zkoušením dále popsaných demonstračních příkladů je nutné, aby (lokálně) běžel samotný Redis, resp. přesněji řečeno jeho serverová část. Tu lze po instalaci Redisu 5 nebo Redisu 6 spustit takto:
$ redis-server

Průběh spuštění i s informací, na kterém portu bude server naslouchat přicházejícím příkazům:

20676:C 21 Jan 2021 12:02:45.527 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
20676:C 21 Jan 2021 12:02:45.527 # Redis version=6.0.10, bits=64, commit=00000000, modified=0, pid=20676, just started
20676:C 21 Jan 2021 12:02:45.527 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
20676:M 21 Jan 2021 12:02:45.528 * Increased maximum number of open files to 10032 (it was originally set to 1024).
                _._
           _.-``__ ''-._
      _.-``    `.  `_.  ''-._           Redis 6.0.10 (00000000/0) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 20676
  `-._    `-._  `-./  _.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |           http://redis.io
  `-._    `-._`-.__.-'_.-'    _.-'
 |`-._`-._    `-.__.-'    _.-'_.-'|
 |    `-._`-._        _.-'_.-'    |
  `-._    `-._`-.__.-'_.-'    _.-'
      `-._    `-.__.-'    _.-'
          `-._        _.-'
              `-.__.-'

20676:M 21 Jan 2021 12:02:45.530 # Server initialized
20676:M 21 Jan 2021 12:02:45.530 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
20676:M 21 Jan 2021 12:02:45.530 * Loading RDB produced by version 6.0.10
20676:M 21 Jan 2021 12:02:45.531 * RDB age 5938 seconds
20676:M 21 Jan 2021 12:02:45.531 * RDB memory usage when created 0.84 Mb
20676:M 21 Jan 2021 12:02:45.531 * DB loaded from disk: 0.000 seconds
20676:M 21 Jan 2021 12:02:45.531 * Ready to accept connections

2. Rychlost připojování zpráv na konec proudu

Nejdříve si vyzkoušejme, jak rychlé může být připojování nových zpráv do proudu. Použijeme přitom programovací jazyk Python, přesněji řečeno jeho de facto standardní variantu CPython a vyzkoušíme si, jak rychlý je zápis zpráv vykonaný metodou Stream.add z knihovny walrus. Přitom musíme mít na paměti, že i když je samotný Redis velmi rychlým systémem (je naprogramován v C a může data udržovat přímo v operační paměti), interpret CPythonu naproti tomu patří mezi nejpomalejší současně používané mainstreamové programovací jazyky, takže čísla, k nimž dojdeme, mohou být při použití jiného jazyka mnohdy i mnohem vyšší:

import time
 
from walrus import Database
 
 
db = Database()
stream = db.Stream("streamY")
 
MESSAGES = 100000
 
start = time.time()
 
for i in range(0, MESSAGES):
    message_id = stream.add({"id": i,
                             "last": "y" if i == MESSAGES - 1 else "n"})
 
end = time.time()
print("Producent duration for {} messages: {} seconds".format(MESSAGES, (end - start)))
Poznámka: atribut „last“ je u poslední zprávy nastaven na „y“.

Tento demonstrační příklad po svém spuštění do proudu nazvaného „streamY“ připojí celkem 100000 zpráv a zjistí přitom celkovou dobu trvání těchto operací:

$ python 06_burst_message_writer.py 
 
Producent duration for 100000 messages: 8.259088277816772 seconds

Zápis všech 100000 zpráv trval přibližně osm sekund; naopak to tedy znamená, že je možné za jednu sekundu zapsat přibližně 12000 zpráv, což je na mikrobenchmark vytvořený v Pythonu poměrně slušné číslo.

Poznámka: samozřejmě je ještě vhodné otestovat dlouhodobou výkonnost a taktéž vliv delších zpráv na rychlost a propustnost producenta.

3. Rychlost čtení zpráv z proudu

Čtení zpráv lze provádět rozličnými způsoby (ideálně s využitím skupin konzumentů, což je podrobněji popsáno v navazujících kapitolách). Naivní konzument zpráv, který sám zjišťuje ID právě přečtené zprávy, které posléze předává metodě read pro přečtení zprávy následující, může vypadat následovně:

import time
 
from walrus import Database
 
db = Database()
stream = db.Stream("streamY")
 
counter = 0
 
start = time.time()
 
last_id = "0"
 
while True:
    messages = stream.read(block=0, last_id=last_id, count=1)
    message = messages[0]
    last_id = message[0]
    content = message[1]
    counter += 1
    if b"last" in content and content[b"last"] == b"y":
        break
 
end = time.time()
print("Consumer duration for {} messages: {} seconds".format(counter, (end - start)))
Poznámka: povšimněte si, jak na základě obsahu zprávy zjišťujeme, zda se má konzument ukončit či zda má přečíst následující zprávu. Tento test je založen na atributu „last“, jehož jméno i obsah jsou typu byte array (a nikoli běžný řetězec!).

Konzumenta spustíme s tím, že by měl přečíst zprávy vytvořené v rámci předchozí kapitoly, tedy celkem 100000 zpráv:

$ python 07_message_consumer.py 
 
Consumer duration for 100000 messages: 12.752091884613037 seconds

Vidíme, že čtení zpráv je v tomto případě pomalejší než zápis, ale to jen z toho důvodu, že se zprávy čtou po jedné a z předchozího článku již víme, že vyhledání zprávy pro zadané ID (tedy operace seek) má složitost O(log(N)) a nikoli konstantní složitost. Čtení lze ovšem provést i blokově:

import time
 
from walrus import Database
 
db = Database()
stream = db.Stream("streamY")
 
start = time.time()
 
messages = stream.read(last_id=0, count=100000)
counter = len(messages)
 
end = time.time()
print("Consumer duration for {} messages: {} seconds".format(counter, (end - start)))

Tato operace je již v porovnání s předchozím příkladem mnohem rychlejší, celkem lze takto přečíst přibližně 46000 zpráv za sekundu:

$ python 08_message_consumer.py 
 
Consumer duration for 100000 messages: 2.1339304447174072 seconds

4. Komunikační strategie podporované systémem Redis Streams

V navazujícím textu si popíšeme některé pokročilejší operace s proudy zpráv. Tyto operace se někdy nazývají (v tomto případě ovšem nepřesně) komunikační strategie, což je název převzatý z oblasti klasických message brokerů:

  1. Využití jediného konzumenta, který zpracovává (konzumuje) zprávy z většího množství proudů.
  2. Konzumace stejné zprávy (ze shodného proudu či proudů) větším množstvím konzumentů.
  3. Čtení zpráv v rámci skupiny konzumentů.
  4. Kombinace předchozích dvou možností.
  5. Potvrzování zpráv v rámci skupiny konzumentů.
  6. Mazání vybraných zpráv z proudu.

5. Jeden konzument zpracovávající zprávy z většího množství proudů

Systém Redis umožňuje, aby jeden konzument četl (jinými slovy konzumoval) zprávy z většího množství proudů, což je téma, s nímž jsme se ve stručnosti seznámili minule. Můžeme si to snadno vyzkoušet pokusem o přečtení zprávy z proudu nazvaného „streamA“ nebo „streamB“ – přečte se jediná zpráva systémem „kdo dřív přijde, ten dřív mele“ (dopředu tedy nelze říct, ze kterého proudu bude čtení zprávy provedeno). U obou proudů specifikujeme speciální ID $, které značí, že se má přečíst nová (ještě neexistující) zpráva; čtení je navíc blokující (s čekáním):

127.0.0.1:6379> xread BLOCK 0 streams streamA streamB $ $

V novém terminálu nyní připojíme zprávu na konec proudu nazvaného „streamA“:

127.0.0.1:6379> xadd streamA * description messageA1
 
"1611339744871-0"

V prvním terminálu uvidíme, že zpráva byla skutečně ihned přečtena a poté se příkaz read ukončí:

1) 1) "streamA"
   2) 1) 1) "1611339744871-0"
         2) 1) "description"
            2) "messageA1"
(61.71s)

Nové spuštění konzumenta, opět pro dvojici proudů „streamA“ a „streamB“:

127.0.0.1:6379> xread BLOCK 0 streams streamA streamB $ $

Nyní připojíme zprávu do druhého proudu pojmenovaného „streamB“:

127.0.0.1:6379> xadd streamB * description messageB1
 
"1611339766311-0"

Zpráva je konzumentem ihned přečtena a opět dojde k ukončení příkazu read:

1) 1) "streamB"
   2) 1) 1) "1611339766311-0"
         2) 1) "description"
            2) "messageB1"
(8.77s)
Poznámka: pokud budete vyžadovat přečtení většího množství zpráv, postačuje použít nepovinný parametr count.

6. Fan-out: přečtení zprávy větším množstvím konzumentů

Další možností nabízenou systémem Redis je přečtení zprávy větším množstvím konzumentů, což je operace, která se v oblasti message brokerů většinou označuje termínem fan-out. Nezáleží přitom na počtu konzumentů ani na tom, kdy si konzument zprávu či zprávy vyžádá (většinou je ovšem připojen v reálném čase a čeká na nové zprávy). Tuto možnost si můžeme velmi snadno otestovat.

Nejprve spustíme prvního konzumenta, a to v režimu blokujícího čtení (čekání na novou zprávu):

127.0.0.1:6379> xread BLOCK 0 streams stream3 $

V dalším terminálu spustíme druhého konzumenta, a to v naprosto stejném režimu:

127.0.0.1:6379> xread BLOCK 0 streams stream3 $

Nyní (v pořadí již třetím terminálu) pošleme zprávu do proudu nazvaného „stream3“, tedy do stejného proudu, jaký je používán oběma konzumenty:

127.0.0.1:6379> xadd stream3 * foo bar
 
"1611586562835-0"

V příslušných terminálech bychom nyní měli vidět, že oba konzumenti získali stejnou zprávu.

Terminál prvního konzumenta:

1) 1) "stream3"
   2) 1) 1) "1611586562835-0"
         2) 1) "foo"
            2) "bar"
(39.25s)

Terminál druhého konzumenta:

1) 1) "stream3"
   2) 1) 1) "1611586562835-0"
         2) 1) "foo"
            2) "bar"
(34.69s)
Poznámka: přijetí zprávy se v tomto případě žádným způsobem nepotvrzuje – každý konzument má naprostou volnost v určení, kterou zprávu či které zprávy potřebuje přečíst.

7. Deklarace skupiny konzumentů (Consumer Group)

V úvodní kapitole jsme si řekli, že konzumenti zpráv se mohou sdružovat do skupin konzumentů neboli consumer group(s), což je termín převzatý ze systému Apache Kafka. Pro danou skupinu konzumentů existují nové příkazy i nové možnosti, například možnost potvrzování zpráv atd. Nová skupina konzumentů se vytváří příkazem xgroup:

127.0.0.1:6379> help xgroup
 
  XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
  summary: Create, destroy, and manage consumer groups.
  since: 5.0.0
  group: stream

Následuje příklad vytvoření skupiny pro proud nazvaný „streamC“. Ten by měl existovat, takže v něm nejdříve vytvoříme nějakou zprávu:

127.0.0.1:6379> xadd streamC * description messageB1
 
"1611340061876-0"

Nyní již můžeme vytvořit novou skupinu a pojmenovat ji „groupC“ (nebo libovolně jinak). Navíc určíme, jaká zpráva z proudu „streamC“ se má považovat za „poslední doručenou zprávu“. Můžeme zde použít ID zprávy nebo i metaznak $:

127.0.0.1:6379> XGROUP CREATE streamC groupC $
 
OK

Alternativně lze vytvořit skupinu i pro neexistující proud, a to následovně:

127.0.0.1:6379> XGROUP CREATE streamD groupD $ MKSTREAM
 
OK

Čtení zpráv v rámci skupiny konzumentů zajišťuje nový příkaz XREADGROUP (tedy nikoli XREAD):

127.0.0.1:6379> help xreadgroup
 
  XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  summary: Return new entries from a stream using a consumer group, or access the history of the pending entries for a given consumer. Can block.
  since: 5.0.0
  group: stream

Praktickou ukázku si uvedeme v navazující kapitole.

8. Využití skupiny konzumentů pro čtení zpráv

Podívejme se nyní na způsob využití skupiny konzumentů pro čtení zpráv z vybraných proudů. Připomeňme si, že běžné přečtení zprávy z proudu (bez ohledu na skupiny) se provádí příkazem XREAD:

127.0.0.1:6379> xread streams streamX 0
1) 1) "streamX"
   2) 1) 1) "1611340576679-0"
         2) 1) "foo"
            2) "10"
            3) "bar"
            4) "20"

V případě použití consumer group se namísto příkazu XREAD používá příkaz XREADGROUP, který má navíc poněkud odlišné parametry. Specifikovat je nutné jméno skupiny, jméno konzumenta (to může být libovolný řetězec), seznam proudů (typicky jediného proudu) pro čtení a taktéž ID zprávy. Namísto tohoto ID lze použít speciální znak <, který naznačuje, že ID zprávy se má získat z interní datové struktury vytvořené pro danou skupiny (je tedy podobné konceptu offsetů ze systému Apache Kafka, které jsou taktéž pro skupinu ukládány):

127.0.0.1:6379> XREADGROUP BLOCK 0 GROUP groupC consumer1 STREAMS streamC <

Čtení můžeme spustit i pro dalšího konzumenta ze stejné skupiny:

128.0.0.1:6379> XREADGROUP BLOCK 0 GROUP groupC consumer2 STREAMS streamC <

Pokud nyní do proudu nazvaného „streamC“ přidáme dvě zprávy, budou přečteny prvním či druhým konzumentem:

127.0.0.1:6379> xadd streamC * description message1
"1611340273067-0"
 
127.0.0.1:6379> xadd streamC * description message2
"1611340275822-0"

Výsledek (zpráva) vypsaná prvním konzumentem:

1) 1) "streamC"
   2) 1) 1) "1611340273067-0"
         2) 1) "description"
            2) "message1"
(30.36s)

Výsledek (zpráva) vypsaná konzumentem druhým:

1) 1) "streamC"
   2) 1) 1) "1611340275822-0"
         2) 1) "description"
            2) "message2"
(14.38s)

9. Skupina konzumentů v Pythonu

Podobně jako tomu bylo v předchozím článku si i dnes ukažme, jakým způsobem je možné vytvořit konzumenta zpráv aktivního v rámci nějaké skupiny konzumentů. Opět použijeme knihovnu walrus. Nejprve zkonstruujeme objekt představující skupinu konzumentů. Povšimněte si, že při konstrukci lze specifikovat seznam (či n-tici) s názvy proudů – skupina konzumentů tedy může číst data z většího množství proudů:

cg = db.consumer_group("a-group", ["streamX"])

Dále skupinu vytvoříme (pokud ještě neexistuje):

cg.create()

Ve třetím kroku je nutné specifikovat, která zpráva bude pro skupinu konzumentů považována ve výchozím stavu za přečtenou. Pokud uvedeme $, znamená to, že skupina konzumentů bude čekat a konzumovat pouze nové zprávy, nezávisle na tom, jestli již nějaké zprávy v proudu existují:

cg.set_id('$')
Poznámka: ID této zprávy se uloží do interní struktury udržující informace o stavu skupiny konzumentů.

Dále je již možné zprávy číst a nějakým způsobem zpracovávat:

while True:
    messages = cg.read(block=0, count=1)
    print(messages[0])

Následuje úplný zdrojový kód tohoto příkladu:

from walrus import Database
 
db = Database()
 
cg = db.consumer_group("a-group", ["streamX"])
cg.create()
cg.set_id('$')
 
while True:
    messages = cg.read(block=0, count=1)
    print(messages[0])

Nyní spustíme dva konzumenty, každý v samostatném terminálu:

$ python streams_09_read_new_messages_group.py
$ python streams_09_read_new_messages_group.py

Ve třetím terminálu spustíme producenta zpráv:

$ python 03_add_messages.py
 
b'1611587949338-0'
b'1611587949339-0'
b'1611587949339-1'
b'1611587949340-0'
b'1611587949340-1'
b'1611587949340-2'
b'1611587949341-0'
b'1611587949341-1'
b'1611587949341-2'
b'1611587949342-0'

První konzument může zpracovat liché zprávy:

[b'streamX', [(b'1611587949338-0', {b'foo': b'1', b'bar': b'0'})]]
[b'streamX', [(b'1611587949339-1', {b'foo': b'3', b'bar': b'4'})]]
[b'streamX', [(b'1611587949340-1', {b'foo': b'5', b'bar': b'8'})]]
[b'streamX', [(b'1611587949341-0', {b'foo': b'7', b'bar': b'12'})]]
[b'streamX', [(b'1611587949341-2', {b'foo': b'9', b'bar': b'16'})]]

A druhý zprávy sudé:

[b'streamX', [(b'1611587949339-0', {b'foo': b'2', b'bar': b'2'})]]
[b'streamX', [(b'1611587949340-0', {b'foo': b'4', b'bar': b'6'})]]
[b'streamX', [(b'1611587949340-2', {b'foo': b'6', b'bar': b'10'})]]
[b'streamX', [(b'1611587949341-1', {b'foo': b'8', b'bar': b'14'})]]
[b'streamX', [(b'1611587949342-0', {b'foo': b'10', b'bar': b'18'})]]
Poznámka: ve skutečnosti je náhoda, že se zprávy rozdělily mezi oba konzumenty takto spravedlivě. Při dalším spuštění můžeme dostat odlišné výsledky, ovšem obecná zásada rozdělování zpráv mezi konzumenty (round-robin) bude zachována.

10. Blokující čtení zpráv (čekání na nové zprávy) v Pythonu

V předchozím demonstračním příkladu jsme zprávy četli po jedné, tedy s využitím následující nekonečné smyčky:

while True:
    messages = cg.read(block=0, count=1)
    print(messages[0])

Ovšem zajímavější a rychlejší je neomezit se pouze na čtení jediné zprávy a provést čtení n-zpráv, které byly do proudu přidány od předchozího čtení (víme již, že ID naposledy přečtené zprávy je uloženo v samotném Redisu). To v důsledku vede k dvojici vnořených programových smyček:

while True:
    messages = cg.read(block=0)
    for message in messages:
        print(message)

Úplný zdrojový kód konzumenta zpráv patřící do skupiny „a-group“ může ve své vylepšené podobě vypadat následovně:

from walrus import Database
 
db = Database()
 
cg = db.consumer_group("a-group", ["streamX"])
cg.create()
cg.set_id('$')
 
while True:
    messages = cg.read(block=0)
    for message in messages:
        print(message)

11. Chování klauzule > při čtení zpráv ve skupině konzumentů

Vraťme se ještě ke klauzuli > (specifikace zprávy čtené ve skupině konzumentů). Jedná se o náhradu konkrétního ID zprávy za ID automaticky uložené v Redisu pro zvolenou skupinu konzumentů.

Připravíme si nový proud „streamZ“ i skupinu „groupZ“ postupem, který již dobře známe:

127.0.0.1:6379> XADD streamZ * foo 42
"1611601803117-0"
 
127.0.0.1:6379> XGROUP CREATE streamZ groupZ $
OK

Dále do proudu pojmenovaného „streamZ“ vložíme šestici zpráv:

127.0.0.1:6379> XADD streamZ * m 1
"1611602123499-0"
 
127.0.0.1:6379> XADD streamZ * m 2
"1611602124883-0"
 
127.0.0.1:6379> XADD streamZ * m 3
"1611602125635-0"
 
127.0.0.1:6379> XADD streamZ * m 4
"1611602126323-0"
 
127.0.0.1:6379> XADD streamZ * m 5
"1611602127019-0"
 
127.0.0.1:6379> XADD streamZ * m 6
"1611602127955-0"

První pokus o přečtení zprávy ve skupině „groupZ“ vrátí první zprávu (používáme již klauzuli >):

127.0.0.1:6379> XREADGROUP GROUP groupZ consumer1 COUNT 1 STREAMS streamZ >
1) 1) "streamZ"
   2) 1) 1) "1611602123499-0"
         2) 1) "m"
            2) "1"

Zcela totožný příkaz zavolaný o chvíli později přečte druhou zprávu, protože se ID naposledy přečtené zprávy automaticky zapamatovalo:

127.0.0.1:6379> XREADGROUP GROUP groupZ consumer1 COUNT 1 STREAMS streamZ >
1) 1) "streamZ"
   2) 1) 1) "1611602124883-0"
         2) 1) "m"
            2) "2"

Přečtení zbylých čtyř zpráv:

127.0.0.1:6379> XREADGROUP GROUP groupZ consumer1 STREAMS streamZ >
1) 1) "streamZ"
   2) 1) 1) "1611602125635-0"
         2) 1) "m"
            2) "3"
      2) 1) "1611602126323-0"
         2) 1) "m"
            2) "4"
      3) 1) "1611602127019-0"
         2) 1) "m"
            2) "5"
      4) 1) "1611602127955-0"
         2) 1) "m"
            2) "6"

Pokus o další čtení již nevrátí žádné zprávy:

127.0.0.1:6379> XREADGROUP GROUP groupZ consumer1 STREAMS streamZ >
(nil)

A to ani když změníme jméno konzumenta:

127.0.0.1:6379> XREADGROUP GROUP groupZ consumer2 STREAMS streamZ >
(nil)

Nic nám však nebrání kdykoli provést „přetočení“ na libovolnou zprávu s ID (zde na první zprávu v proudu):

127.0.0.1:6379> XREADGROUP GROUP groupZ consumer1 STREAMS streamZ 0
1) 1) "streamZ"
   2) 1) 1) "1611602123499-0"
         2) 1) "m"
            2) "1"
      2) 1) "1611602124883-0"
         2) 1) "m"
            2) "2"
      3) 1) "1611602125635-0"
         2) 1) "m"
            2) "3"
      4) 1) "1611602126323-0"
         2) 1) "m"
            2) "4"
      5) 1) "1611602127019-0"
         2) 1) "m"
            2) "5"
      6) 1) "1611602127955-0"
         2) 1) "m"
            2) "6"

Jakmile byly zprávy přečteny jedním konzumentem, druhý konzument ve stejné skupině je nezíská:

127.0.0.1:6379> XREADGROUP GROUP groupZ consumer2 STREAMS streamZ 0
1) 1) "streamZ"
   2) (empty array)

12. Příkazy XACK a XPENDING

V případě, že jsou zprávy pouze přečteny příkazem XREADGROUP, nachází se ve stavu „pending“. Jedná se o zprávy již poslané nějakému konzumentu, který však jejich zpracování nepotvrdil. Pro potvrzování zpráv se používá příkaz XACK:

127.0.0.1:6379> help xack
 
  XACK key group ID [ID ...]
  summary: Marks a pending message as correctly processed, effectively removing it from the pending entries list of the consumer group. Return value of the command is the number of messages successfully acknowledged, that is, the IDs we were actually able to resolve in the PEL.
  since: 5.0.0
  group: stream

Pro zjištění stavu potvrzení zpráv se používá příkaz XPENDING:

127.0.0.1:6379> help xpending
 
  XPENDING key group [start end count] [consumer]
  summary: Return information and entries from a stream consumer group pending entries list, that are messages fetched but never acknowledged.
  since: 5.0.0
  group: stream

Příklady použití těchto dvou příkazů si ukážeme v navazujících dvou kapitolách.

13. Potvrzování zpráv příkazem XACK

Ukažme si nejdříve příklad použití příkazu XACK pro potvrzování zpracování zpráv. Pro tento účel vytvoříme další proud „streamW“ a skupinu „groupW“:

127.0.0.1:6379> XGROUP CREATE streamW groupW $ MKSTREAM
OK

Do proudu přidáme šestici zpráv:

127.0.0.1:6379> XADD streamW * w 0
"1611603108014-0"
 
127.0.0.1:6379> XADD streamW * w 1
"1611603110062-0"
 
127.0.0.1:6379> XADD streamW * w 2
"1611603111094-0"
 
127.0.0.1:6379> XADD streamW * w 3
"1611603112133-0"
 
127.0.0.1:6379> XADD streamW * w 4
"1611603113032-0"
 
127.0.0.1:6379> XADD streamW * w 5
"1611603113901-0"

Zprávy pochopitelně můžeme přečíst, prozatím bez potvrzení:

127.0.0.1:6379> XREADGROUP GROUP groupW c1 STREAMS streamW 0
1) 1) "streamW"
   2) 1) 1) "1611603108014-0"
         2) 1) "w"
            2) "0"
      2) 1) "1611603110062-0"
         2) 1) "w"
            2) "1"
      3) 1) "1611603111094-0"
         2) 1) "w"
            2) "2"
      4) 1) "1611603112133-0"
         2) 1) "w"
            2) "3"
      5) 1) "1611603113032-0"
         2) 1) "w"
            2) "4"
      6) 1) "1611603113901-0"
         2) 1) "w"
            2) "5"

Druhé čtení vrátí ty samé zprávy, tedy původní šestici:

127.0.0.1:6379> XREADGROUP GROUP groupW c1 STREAMS streamW 0
1) 1) "streamW"
   2) 1) 1) "1611603108014-0"
         2) 1) "w"
            2) "0"
      2) 1) "1611603110062-0"
         2) 1) "w"
            2) "1"
      3) 1) "1611603111094-0"
         2) 1) "w"
            2) "2"
      4) 1) "1611603112133-0"
         2) 1) "w"
            2) "3"
      5) 1) "1611603113032-0"
         2) 1) "w"
            2) "4"
      6) 1) "1611603113901-0"
         2) 1) "w"
            2) "5"

Potvrdíme zpracování první zprávy, a to uvedením proudu, skupiny i ID zprávy:

127.0.0.1:6379> XACK streamW groupW 1611603108014-0
(integer) 1

Nyní pokus o přečtení všech zpráv vrátí pouze pětici zpráv a nikoli šestici (první zpráva byla potvrzena a pro skupinu zapomenuta):

127.0.0.1:6379> XREADGROUP GROUP groupW c1 STREAMS streamW 0
1) 1) "streamW"
   2) 1) 1) "1611603110062-0"
         2) 1) "w"
            2) "1"
      2) 1) "1611603111094-0"
         2) 1) "w"
            2) "2"
      3) 1) "1611603112133-0"
         2) 1) "w"
            2) "3"
      4) 1) "1611603113032-0"
         2) 1) "w"
            2) "4"
      5) 1) "1611603113901-0"
         2) 1) "w"
            2) "5"

Potvrzení dalších zpráv (zde konkrétně čtvrté):

127.0.0.1:6379> XACK streamW groupW 1611603113032-0
(integer) 1

Zpětné potvrzení dřívější zprávy lze provést:

127.0.0.1:6379> XACK streamW groupW 1611603110062-0
(integer) 1

Potvrzeny byly tři zprávy ze šesti, takže by nás výsledek dalšího čtení již neměl překvapit:

127.0.0.1:6379> XREADGROUP GROUP groupW c1 STREAMS streamW 0
1) 1) "streamW"
   2) 1) 1) "1611603111094-0"
         2) 1) "w"
            2) "2"
      2) 1) "1611603112133-0"
         2) 1) "w"
            2) "3"
      3) 1) "1611603113901-0"
         2) 1) "w"
            2) "5"

14. Příkaz XPENDING

Příkaz XPENDING vrátí pro danou skupinu a proud informaci o tom, jaké zprávy nebyly schváleny:

127.0.0.1:6379> XPENDING streamC groupC
1) (integer) 2
2) "1611340273067-0"
3) "1611340275822-0"
4) 1) 1) "consumer1"
      2) "2"

V našem konkrétním případě nebyly schváleny dvě přečtené zprávy. Navíc se vrátila dvojice ID (nejmenší a největší) neschválených zpráv a dále počet neschválených zpráv pro každého konzumenta (zde je pouze jediný konzument).

Totéž můžeme provést pro proud „streamW“ a skupinu „groupW“ vytvořené v rámci předchozí kapitoly:

127.0.0.1:6379> XPENDING streamW groupW
1) (integer) 3
2) "1611603111094-0"
3) "1611603113901-0"
4) 1) 1) "c1"
      2) "3"

Zkusíme nyní zprávy přečíst (a neschválit), ale odlišným konzumentem:

127.0.0.1:6379> XREADGROUP GROUP groupW c2 STREAMS streamW >
1) 1) "streamW"
   2) 1) 1) "1611603448428-0"
         2) 1) "w"
            2) "6"
      2) 1) "1611603449340-0"
         2) 1) "w"
            2) "7"
      3) 1) "1611603450140-0"
         2) 1) "w"
            2) "8"

Nyní bude výsledek příkazu XPENDING odlišný, protože se v posledním prvku pole vrátí informace o dvou konzumentech:

127.0.0.1:6379> XPENDING streamW groupW
1) (integer) 6
2) "1611603111094-0"
3) "1611603450140-0"
4) 1) 1) "c1"
      2) "3"
   2) 1) "c2"
      2) "3"

15. Potvrzování zpráv a příkaz XPENDING pro více skupin konzumentů

Kombinace potvrzování zpráv a příkazu XPENDING tvoří společně se skupinami konzumentů nejdůležitější část práce s proudy v praxi, takže se podívejme ještě na jeden příklad, který je nepatrně komplikovanější, než příklady předchozí.

Vytvoříme dvě skupiny konzumentů, ovšem nad stejným (novým) proudem nazvaným „streamQ“:

127.0.0.1:6379> XGROUP CREATE streamQ group1 $ MKSTREAM
OK
 
127.0.0.1:6379> XGROUP CREATE streamQ group2 $ MKSTREAM
OK

Do proudu sdíleného oběma skupinami přidáme pětici zpráv:

127.0.0.1:6379> xadd streamQ * m 1
"1611603635755-0"
 
127.0.0.1:6379> xadd streamQ * m 2
"1611603636610-0"
 
127.0.0.1:6379> xadd streamQ * m 3
"1611603637331-0"
 
127.0.0.1:6379> xadd streamQ * m 4
"1611603638563-0"
 
127.0.0.1:6379> xadd streamQ * m 5
"1611603639379-0"

Informace o nepotvrzených zprávách musí být pro obě skupiny stejné – žádná taková zpráva neexistuje:

127.0.0.1:6379> XPENDING streamQ group1
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)

a:

127.0.0.1:6379> XPENDING streamQ group2
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)

Přečtení všech zpráv v rámci první skupiny konzumentů:

127.0.0.1:6379> xreadgroup group group1 c1 streams streamQ >
1) 1) "streamQ"
   2) 1) 1) "1611603635755-0"
         2) 1) "m"
            2) "1"
      2) 1) "1611603636610-0"
         2) 1) "m"
            2) "2"
      3) 1) "1611603637331-0"
         2) 1) "m"
            2) "3"
      4) 1) "1611603638563-0"
         2) 1) "m"
            2) "4"
      5) 1) "1611603639379-0"
         2) 1) "m"
            2) "5"

Všech pět zpráv je nepotvrzených:

127.0.0.1:6379> XPENDING streamQ group1
1) (integer) 5
2) "1611603635755-0"
3) "1611603639379-0"
4) 1) 1) "c1"
      2) "5"

Přečtení jedné zprávy v rámci druhé skupiny konzumentů:

127.0.0.1:6379> xreadgroup group group2 c2 count 1 streams streamQ >
1) 1) "streamQ"
   2) 1) 1) "1611603635755-0"
         2) 1) "m"
            2) "1"

Pro tuto skupinu nyní existuje jen jedna nepotvrzená zpráva:

127.0.0.1:6379> XPENDING streamQ group2
1) (integer) 1
2) "1611603635755-0"
3) "1611603635755-0"
4) 1) 1) "c2"
      2) "1"

Potvrzení zprávy ve druhé skupině:

127.0.0.1:6379> xack streamQ group2 1611603635755-0
(integer) 1

Přečtení první nepřečtené a neschválené zprávy:

127.0.0.1:6379> xreadgroup group group2 c2 count 1 streams streamQ >
1) 1) "streamQ"
   2) 1) 1) "1611603636610-0"
         2) 1) "m"
            2) "2"

16. Mazání zpráv z proudu příkazem XDEL

Posledním příkazem, o němž se v dnešním článku zmíníme, je příkaz XDEL, který dokáže z proudu odstranit vybrané zprávy. Tomuto příkazu se předává jméno proudu a seznam ID zpráv, které se mají smazat (přitom některé z těchto zpráv nemusí existovat, například kvůli souběžnému přístupu):

127.0.0.1:6379> help xdel
 
  XDEL key ID [ID ...]
  summary: Removes the specified entries from the stream. Returns the number of items actually deleted, that may be different from the number of IDs passed in case certain IDs do not exist.
  since: 5.0.0
  group: stream
Poznámka: tímto příkazem lze alespoň do jisté míry implementovat retention policy, která je v případě systému Apache Kafka prováděna zcela automaticky na základě zadaných kritérií (počtu zpráv, celkové velikosti tématu, platnosti zpráv atd.).

17. Seznam všech příkazů používaných pro práci s proudy

V předchozím i dnešním článku byly popsány následující příkazy určené pro práci s proudy v systému Redis:

bitcoin_skoleni

# Příkaz Stručný popis
1 XADD připojení zprávy na konec proudu
2 XLEN získání počtu zpráv v proudu
3 XREAD čtení (blokující i neblokující) zpráv z proudu či proudů
4 XRANGE získání zpráv pro daný rozsah jejich ID
5 XREVRANGE podobné XRANGE, ovšem zprávy jsou vráceny v opačném pořadí
6 XGROUP vytvoření skupiny konzumentů
7 XREADGROUP (blokující) čtení zpráv v rámci skupiny konzumentů
8 XACK potvrzení přijetí a zpracování zprávy
9 XPENDING získání informací o nepotvrzených zprávách
10 XDEL smazání zprávy či zpráv z proudu

18. Repositář s demonstračními příklady

Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v Pythonu byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/py-redis-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce:

# Demonstrační příklad Popis Cesta
1 streams01_create_stream.py připojení k Redisu a konstrukce objektu typu stream https://github.com/tisnik/py-redis-examples/blob/master/stre­ams01_create_stream.py
2 streams02_add_message.py přidání zprávy do proudu metodou add https://github.com/tisnik/py-redis-examples/blob/master/stre­ams02_add_message.py
3 streams03_add_messages.py rychlé přidání několika zpráv do proudu metodou add https://github.com/tisnik/py-redis-examples/blob/master/stre­ams03_add_messages.py
4 streams04_read_messages.py neblokující přečtení všech zpráv, které jsou uloženy ve vybraném proudu https://github.com/tisnik/py-redis-examples/blob/master/stre­ams04_read_messages.py
5 streams05_read_new_message.py blokující čekání na příchod nové zprávy https://github.com/tisnik/py-redis-examples/blob/master/stre­ams05_read_new_message.py
       
6 streams06_burst_message_producer.py vytvoření 100000 zpráv s měřením času jejich připojení do proudu https://github.com/tisnik/py-redis-examples/blob/master/stre­ams06_burst_message_produ­cer.py
7 streams07_message_consumer.py postupná konzumace 100000 zpráv https://github.com/tisnik/py-redis-examples/blob/master/stre­ams07_message_consumer.py
8 streams08_message_consumer.py konzumace 100000 jedinou blokovou operací https://github.com/tisnik/py-redis-examples/blob/master/stre­ams08_message_consumer.py
9 streams09_read_new_messages_group.py základní použití skupin konzumentů https://github.com/tisnik/py-redis-examples/blob/master/stre­ams09_read_new_messages_grou­p.py
10 streams10_read_new_messages_group.py blokující čekání na nové zprávy ve skupině konzumentů https://github.com/tisnik/py-redis-examples/blob/master/stre­ams10_read_new_messages_grou­p.py

19. Předchozí články systému Redis

Se systémem Redis jsme se již na stránkách Rootu setkali, a to dokonce několikrát. Buď jsme si popisovali přímo přístup k Redisu z různých programovacích jazyků (což je konkrétně případ všech dále zmíněných článků zaměřených na jazyky Python a Go) nebo byl Redis použit ve funkci databáze, resp. perzistentního úložiště různými message brokery (Celery, RQ, apod.). Poslední v seznamu je článek, na který dnes navazujeme:

  1. Databáze Redis (nejenom) pro vývojáře používající Python
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/
  2. Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
    https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/
  3. Použití databáze Redis v aplikacích naprogramovaných v Go
    https://www.root.cz/clanky/pouziti-databaze-redis-v-aplikacich-naprogramovanych-v-go/
  4. Použití databáze Redis v aplikacích naprogramovaných v Go (2)
    https://www.root.cz/clanky/pouziti-databaze-redis-v-aplikacich-naprogramovanych-v-go-2/
  5. Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
    https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/
  6. Proudy (streams) podporované systémem Redis
    https://www.root.cz/clanky/proudy-streams-podporovane-systemem-redis/

20. Odkazy na Internetu

  1. Repositář knihovny walrus na GitHubu
    https://github.com/coleifer/walrus/
  2. Knihovna walrus na PyPi
    https://pypi.org/project/walrus/
  3. Stránky projektu Redis
    https://redis.io/
  4. Introduction to Redis
    https://redis.io/topics/introduction
  5. Try Redis
    http://try.redis.io/
  6. Redis tutorial, April 2010 (starší, ale pěkně udělaný)
    https://static.simonwilli­son.net/static/2010/redis-tutorial/
  7. Python Redis
    https://redislabs.com/lp/python-redis/
  8. Redis: key-value databáze v paměti i na disku
    https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/
  9. Scripting Redis with Lua
    https://redislabs.com/ebook/part-3-next-steps/chapter-11-scripting-redis-with-lua/
  10. Redis Lua script for atomic operations and cache stampede
    https://engineering.linecor­p.com/en/blog/redis-lua-scripting-atomic-processing-cache/
  11. Redis Lua Scripts – Itamar Haber
    https://www.youtube.com/wat­ch?v=eReTl8NhHCs
  12. Building Databases with Redis Tutorial: Lua Script | packtpub.com
    https://www.youtube.com/wat­ch?v=mMfGNsAr7Bg
  13. Příkaz pro spuštění skriptu v jazyce Lua: EVAL script numkeys key [key …] arg [arg …]
    https://redis.io/commands/eval
  14. Redis Lua scripts debugger
    https://redis.io/topics/ldb
  15. Repositář projektu s Redis klientem pro jazyk Go
    https://github.com/go-redis/redis
  16. Stránky programovacího jazyka Lua
    https://www.lua.org/
  17. Programovací jazyk Lua
    https://www.palmknihy.cz/ucebnice-odborna-literatura/programovaci-jazyk-lua-12651
  18. Programming in Lua
    https://www.lua.org/pil/
  19. Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
    http://www.cloudsvet.cz/?p=253
  20. Praktický úvod do Redis (2): transakce
    http://www.cloudsvet.cz/?p=256
  21. Praktický úvod do Redis (3): cluster
    http://www.cloudsvet.cz/?p=258
  22. Connection pool
    https://en.wikipedia.org/wi­ki/Connection_pool
  23. Instant Redis Sentinel Setup
    https://github.com/ServiceStack/redis-config
  24. How to install REDIS in LInux
    https://linuxtechlab.com/how-install-redis-server-linux/
  25. Redis RDB Dump File Format
    https://github.com/sripat­hikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format
  26. Lempel–Ziv–Welch
    https://en.wikipedia.org/wi­ki/Lempel%E2%80%93Ziv%E2%80%93­Welch
  27. Redis Persistence
    https://redis.io/topics/persistence
  28. Redis persistence demystified
    http://oldblog.antirez.com/post/redis-persistence-demystified.html
  29. Redis reliable queues with Lua scripting
    http://oldblog.antirez.com/post/250
  30. Ost (knihovna)
    https://github.com/soveran/ost
  31. NoSQL
    https://en.wikipedia.org/wiki/NoSQL
  32. Shard (database architecture)
    https://en.wikipedia.org/wi­ki/Shard_%28database_archi­tecture%29
  33. What is sharding and why is it important?
    https://stackoverflow.com/qu­estions/992988/what-is-sharding-and-why-is-it-important
  34. What Is Sharding?
    https://btcmanager.com/what-sharding/
  35. Redis clients
    https://redis.io/clients
  36. Category:Lua-scriptable software
    https://en.wikipedia.org/wi­ki/Category:Lua-scriptable_software
  37. Seriál Programovací jazyk Lua
    https://www.root.cz/seria­ly/programovaci-jazyk-lua/
  38. Redis memory usage
    http://nosql.mypopescu.com/pos­t/1010844204/redis-memory-usage
  39. Ukázka konfigurace Redisu pro lokální testování
    https://github.com/tisnik/pre­sentations/blob/master/re­dis/redis.conf
  40. Resque
    https://github.com/resque/resque
  41. Nested transaction
    https://en.wikipedia.org/wi­ki/Nested_transaction
  42. Publish–subscribe pattern
    https://en.wikipedia.org/wi­ki/Publish%E2%80%93subscri­be_pattern
  43. Messaging pattern
    https://en.wikipedia.org/wi­ki/Messaging_pattern
  44. Using pipelining to speedup Redis queries
    https://redis.io/topics/pipelining
  45. Pub/Sub
    https://redis.io/topics/pubsub
  46. ZeroMQ distributed messaging
    http://zeromq.org/
  47. Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
    https://stackoverflow.com/qu­estions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr
  48. Python & Redis PUB/SUB
    https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7
  49. Message broker
    https://en.wikipedia.org/wi­ki/Message_broker
  50. RESP Arrays
    https://redis.io/topics/protocol#array-reply
  51. Redis Protocol specification
    https://redis.io/topics/protocol
  52. Redis Pub/Sub: Intro Guide
    https://www.redisgreen.net/blog/pubsub-intro/
  53. Redis Pub/Sub: Howto Guide
    https://www.redisgreen.net/blog/pubsub-howto/
  54. Comparing Publish-Subscribe Messaging and Message Queuing
    https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message
  55. ActiveMQ
    http://activemq.apache.org/activemq-website/index.html
  56. Amazon Simple Queue Service
    https://aws.amazon.com/sqs/
  57. Apache Kafka
    https://kafka.apache.org/
  58. Cloud Pub/Sub
    https://cloud.google.com/pubsub/
  59. Introduction to Redis Streams
    https://redis.io/topics/streams-intro
  60. Introduction to Redis streams with Python
    http://charlesleifer.com/blog/redis-streams-with-python/
  61. glob (programming)
    https://en.wikipedia.org/wi­ki/Glob_(programming)
  62. Microservices: The Rise Of Kafka
    https://movio.co/blog/microservices-rise-kafka/
  63. Building a Microservices Ecosystem with Kafka Streams and KSQL
    https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/
  64. An introduction to Apache Kafka and microservices communication
    https://medium.com/@ulymarins/an-introduction-to-apache-kafka-and-microservices-communication-bf0a0966d63
  65. kappa-architecture.com
    http://milinda.pathirage.org/kappa-architecture.com/
  66. Questioning the Lambda Architecture
    https://www.oreilly.com/i­deas/questioning-the-lambda-architecture
  67. Lambda architecture
    https://en.wikipedia.org/wi­ki/Lambda_architecture
  68. Kafka – ecosystem (Wiki)
    https://cwiki.apache.org/con­fluence/display/KAFKA/Eco­system
  69. The Kafka Ecosystem – Kafka Core, Kafka Streams, Kafka Connect, Kafka REST Proxy, and the Schema Registry
    http://cloudurable.com/blog/kafka-ecosystem/index.html
  70. A Kafka Operator for Kubernetes
    https://github.com/krallistic/kafka-operator
  71. Kafka Streams
    https://cwiki.apache.org/con­fluence/display/KAFKA/Kaf­ka+Streams
  72. Kafka Streams
    http://kafka.apache.org/do­cumentation/streams/
  73. Kafka Streams (FAQ)
    https://cwiki.apache.org/con­fluence/display/KAFKA/FAQ#FAQ-Streams
  74. What are some alternatives to Apache Kafka?
    https://www.quora.com/What-are-some-alternatives-to-Apache-Kafka
  75. What is the best alternative to Kafka?
    https://www.slant.co/opti­ons/961/alternatives/~kaf­ka-alternatives
  76. A super quick comparison between Kafka and Message Queues
    https://hackernoon.com/a-super-quick-comparison-between-kafka-and-message-queues-e69742d855a8?gi=e965191e72d0
  77. Kafka Queuing: Kafka as a Messaging System
    https://dzone.com/articles/kafka-queuing-kafka-as-a-messaging-system

Autor článku

Vystudoval VUT FIT a v současné době pracuje na projektech vytvářených v jazycích Python a Go.