Obsah
1. Proudy (streams) podporované systémem Redis (dokončení)
2. Rychlost připojování zpráv na konec proudu
3. Rychlost čtení zpráv z proudu
4. Komunikační strategie podporované systémem Redis Streams
5. Jeden konzument zpracovávající zprávy z většího množství proudů
6. Fan-out: přečtení zprávy větším množstvím konzumentů
7. Deklarace skupiny konzumentů (Consumer Group)
8. Využití skupiny konzumentů pro čtení zpráv
9. Skupina konzumentů v Pythonu
10. Blokující čtení zpráv (čekání na nové zprávy)
11. Chování klauzule > při čtení zpráv ve skupině konzumentů
13. Potvrzování zpráv příkazem XACK
15. Potvrzování zpráv a příkaz XPENDING pro více skupin konzumentů
16. Mazání zpráv z proudu příkazem XDEL
17. Seznam všech příkazů používaných pro práci s proudy
18. Repositář s demonstračními příklady
19. Předchozí články o systému Redis
1. Proudy (streams) podporované systémem Redis (dokončení)
Na úvodní článek o proudech (streams) podporovaných v systému Redis verze 5 a 6 dnes navážeme. Popíšeme si zejména některé způsoby použití takzvaných skupin konzumentů (consumer group), které mohou mít v praktickém nasazení Redis Streams velký význam. Demonstrační příklady budou založeny jak na standardním nástroji redis-cli, tak i (i když dnes v poněkud menší míře v porovnání s článkem předchozím) na programovacím jazyku Python a knihovně walrus.
$ redis-server
Průběh spuštění i s informací, na kterém portu bude server naslouchat přicházejícím příkazům:
20676:C 21 Jan 2021 12:02:45.527 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo 20676:C 21 Jan 2021 12:02:45.527 # Redis version=6.0.10, bits=64, commit=00000000, modified=0, pid=20676, just started 20676:C 21 Jan 2021 12:02:45.527 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf 20676:M 21 Jan 2021 12:02:45.528 * Increased maximum number of open files to 10032 (it was originally set to 1024). _._ _.-``__ ''-._ _.-`` `. `_. ''-._ Redis 6.0.10 (00000000/0) 64 bit .-`` .-```. ```\/ _.,_ ''-._ ( ' , .-` | `, ) Running in standalone mode |`-._`-...-` __...-.``-._|'` _.-'| Port: 6379 | `-._ `._ / _.-' | PID: 20676 `-._ `-._ `-./ _.-' _.-' |`-._`-._ `-.__.-' _.-'_.-'| | `-._`-._ _.-'_.-' | http://redis.io `-._ `-._`-.__.-'_.-' _.-' |`-._`-._ `-.__.-' _.-'_.-'| | `-._`-._ _.-'_.-' | `-._ `-._`-.__.-'_.-' _.-' `-._ `-.__.-' _.-' `-._ _.-' `-.__.-' 20676:M 21 Jan 2021 12:02:45.530 # Server initialized 20676:M 21 Jan 2021 12:02:45.530 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect. 20676:M 21 Jan 2021 12:02:45.530 * Loading RDB produced by version 6.0.10 20676:M 21 Jan 2021 12:02:45.531 * RDB age 5938 seconds 20676:M 21 Jan 2021 12:02:45.531 * RDB memory usage when created 0.84 Mb 20676:M 21 Jan 2021 12:02:45.531 * DB loaded from disk: 0.000 seconds 20676:M 21 Jan 2021 12:02:45.531 * Ready to accept connections
2. Rychlost připojování zpráv na konec proudu
Nejdříve si vyzkoušejme, jak rychlé může být připojování nových zpráv do proudu. Použijeme přitom programovací jazyk Python, přesněji řečeno jeho de facto standardní variantu CPython a vyzkoušíme si, jak rychlý je zápis zpráv vykonaný metodou Stream.add z knihovny walrus. Přitom musíme mít na paměti, že i když je samotný Redis velmi rychlým systémem (je naprogramován v C a může data udržovat přímo v operační paměti), interpret CPythonu naproti tomu patří mezi nejpomalejší současně používané mainstreamové programovací jazyky, takže čísla, k nimž dojdeme, mohou být při použití jiného jazyka mnohdy i mnohem vyšší:
import time from walrus import Database db = Database() stream = db.Stream("streamY") MESSAGES = 100000 start = time.time() for i in range(0, MESSAGES): message_id = stream.add({"id": i, "last": "y" if i == MESSAGES - 1 else "n"}) end = time.time() print("Producent duration for {} messages: {} seconds".format(MESSAGES, (end - start)))
Tento demonstrační příklad po svém spuštění do proudu nazvaného „streamY“ připojí celkem 100000 zpráv a zjistí přitom celkovou dobu trvání těchto operací:
$ python 06_burst_message_writer.py Producent duration for 100000 messages: 8.259088277816772 seconds
Zápis všech 100000 zpráv trval přibližně osm sekund; naopak to tedy znamená, že je možné za jednu sekundu zapsat přibližně 12000 zpráv, což je na mikrobenchmark vytvořený v Pythonu poměrně slušné číslo.
3. Rychlost čtení zpráv z proudu
Čtení zpráv lze provádět rozličnými způsoby (ideálně s využitím skupin konzumentů, což je podrobněji popsáno v navazujících kapitolách). Naivní konzument zpráv, který sám zjišťuje ID právě přečtené zprávy, které posléze předává metodě read pro přečtení zprávy následující, může vypadat následovně:
import time from walrus import Database db = Database() stream = db.Stream("streamY") counter = 0 start = time.time() last_id = "0" while True: messages = stream.read(block=0, last_id=last_id, count=1) message = messages[0] last_id = message[0] content = message[1] counter += 1 if b"last" in content and content[b"last"] == b"y": break end = time.time() print("Consumer duration for {} messages: {} seconds".format(counter, (end - start)))
Konzumenta spustíme s tím, že by měl přečíst zprávy vytvořené v rámci předchozí kapitoly, tedy celkem 100000 zpráv:
$ python 07_message_consumer.py Consumer duration for 100000 messages: 12.752091884613037 seconds
Vidíme, že čtení zpráv je v tomto případě pomalejší než zápis, ale to jen z toho důvodu, že se zprávy čtou po jedné a z předchozího článku již víme, že vyhledání zprávy pro zadané ID (tedy operace seek) má složitost O(log(N)) a nikoli konstantní složitost. Čtení lze ovšem provést i blokově:
import time from walrus import Database db = Database() stream = db.Stream("streamY") start = time.time() messages = stream.read(last_id=0, count=100000) counter = len(messages) end = time.time() print("Consumer duration for {} messages: {} seconds".format(counter, (end - start)))
Tato operace je již v porovnání s předchozím příkladem mnohem rychlejší, celkem lze takto přečíst přibližně 46000 zpráv za sekundu:
$ python 08_message_consumer.py Consumer duration for 100000 messages: 2.1339304447174072 seconds
4. Komunikační strategie podporované systémem Redis Streams
V navazujícím textu si popíšeme některé pokročilejší operace s proudy zpráv. Tyto operace se někdy nazývají (v tomto případě ovšem nepřesně) komunikační strategie, což je název převzatý z oblasti klasických message brokerů:
- Využití jediného konzumenta, který zpracovává (konzumuje) zprávy z většího množství proudů.
- Konzumace stejné zprávy (ze shodného proudu či proudů) větším množstvím konzumentů.
- Čtení zpráv v rámci skupiny konzumentů.
- Kombinace předchozích dvou možností.
- Potvrzování zpráv v rámci skupiny konzumentů.
- Mazání vybraných zpráv z proudu.
5. Jeden konzument zpracovávající zprávy z většího množství proudů
Systém Redis umožňuje, aby jeden konzument četl (jinými slovy konzumoval) zprávy z většího množství proudů, což je téma, s nímž jsme se ve stručnosti seznámili minule. Můžeme si to snadno vyzkoušet pokusem o přečtení zprávy z proudu nazvaného „streamA“ nebo „streamB“ – přečte se jediná zpráva systémem „kdo dřív přijde, ten dřív mele“ (dopředu tedy nelze říct, ze kterého proudu bude čtení zprávy provedeno). U obou proudů specifikujeme speciální ID $, které značí, že se má přečíst nová (ještě neexistující) zpráva; čtení je navíc blokující (s čekáním):
127.0.0.1:6379> xread BLOCK 0 streams streamA streamB $ $
V novém terminálu nyní připojíme zprávu na konec proudu nazvaného „streamA“:
127.0.0.1:6379> xadd streamA * description messageA1 "1611339744871-0"
V prvním terminálu uvidíme, že zpráva byla skutečně ihned přečtena a poté se příkaz read ukončí:
1) 1) "streamA" 2) 1) 1) "1611339744871-0" 2) 1) "description" 2) "messageA1" (61.71s)
Nové spuštění konzumenta, opět pro dvojici proudů „streamA“ a „streamB“:
127.0.0.1:6379> xread BLOCK 0 streams streamA streamB $ $
Nyní připojíme zprávu do druhého proudu pojmenovaného „streamB“:
127.0.0.1:6379> xadd streamB * description messageB1 "1611339766311-0"
Zpráva je konzumentem ihned přečtena a opět dojde k ukončení příkazu read:
1) 1) "streamB" 2) 1) 1) "1611339766311-0" 2) 1) "description" 2) "messageB1" (8.77s)
6. Fan-out: přečtení zprávy větším množstvím konzumentů
Další možností nabízenou systémem Redis je přečtení zprávy větším množstvím konzumentů, což je operace, která se v oblasti message brokerů většinou označuje termínem fan-out. Nezáleží přitom na počtu konzumentů ani na tom, kdy si konzument zprávu či zprávy vyžádá (většinou je ovšem připojen v reálném čase a čeká na nové zprávy). Tuto možnost si můžeme velmi snadno otestovat.
Nejprve spustíme prvního konzumenta, a to v režimu blokujícího čtení (čekání na novou zprávu):
127.0.0.1:6379> xread BLOCK 0 streams stream3 $
V dalším terminálu spustíme druhého konzumenta, a to v naprosto stejném režimu:
127.0.0.1:6379> xread BLOCK 0 streams stream3 $
Nyní (v pořadí již třetím terminálu) pošleme zprávu do proudu nazvaného „stream3“, tedy do stejného proudu, jaký je používán oběma konzumenty:
127.0.0.1:6379> xadd stream3 * foo bar "1611586562835-0"
V příslušných terminálech bychom nyní měli vidět, že oba konzumenti získali stejnou zprávu.
Terminál prvního konzumenta:
1) 1) "stream3" 2) 1) 1) "1611586562835-0" 2) 1) "foo" 2) "bar" (39.25s)
Terminál druhého konzumenta:
1) 1) "stream3" 2) 1) 1) "1611586562835-0" 2) 1) "foo" 2) "bar" (34.69s)
7. Deklarace skupiny konzumentů (Consumer Group)
V úvodní kapitole jsme si řekli, že konzumenti zpráv se mohou sdružovat do skupin konzumentů neboli consumer group(s), což je termín převzatý ze systému Apache Kafka. Pro danou skupinu konzumentů existují nové příkazy i nové možnosti, například možnost potvrzování zpráv atd. Nová skupina konzumentů se vytváří příkazem xgroup:
127.0.0.1:6379> help xgroup XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername] summary: Create, destroy, and manage consumer groups. since: 5.0.0 group: stream
Následuje příklad vytvoření skupiny pro proud nazvaný „streamC“. Ten by měl existovat, takže v něm nejdříve vytvoříme nějakou zprávu:
127.0.0.1:6379> xadd streamC * description messageB1 "1611340061876-0"
Nyní již můžeme vytvořit novou skupinu a pojmenovat ji „groupC“ (nebo libovolně jinak). Navíc určíme, jaká zpráva z proudu „streamC“ se má považovat za „poslední doručenou zprávu“. Můžeme zde použít ID zprávy nebo i metaznak $:
127.0.0.1:6379> XGROUP CREATE streamC groupC $ OK
Alternativně lze vytvořit skupinu i pro neexistující proud, a to následovně:
127.0.0.1:6379> XGROUP CREATE streamD groupD $ MKSTREAM OK
Čtení zpráv v rámci skupiny konzumentů zajišťuje nový příkaz XREADGROUP (tedy nikoli XREAD):
127.0.0.1:6379> help xreadgroup XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...] summary: Return new entries from a stream using a consumer group, or access the history of the pending entries for a given consumer. Can block. since: 5.0.0 group: stream
Praktickou ukázku si uvedeme v navazující kapitole.
8. Využití skupiny konzumentů pro čtení zpráv
Podívejme se nyní na způsob využití skupiny konzumentů pro čtení zpráv z vybraných proudů. Připomeňme si, že běžné přečtení zprávy z proudu (bez ohledu na skupiny) se provádí příkazem XREAD:
127.0.0.1:6379> xread streams streamX 0 1) 1) "streamX" 2) 1) 1) "1611340576679-0" 2) 1) "foo" 2) "10" 3) "bar" 4) "20"
V případě použití consumer group se namísto příkazu XREAD používá příkaz XREADGROUP, který má navíc poněkud odlišné parametry. Specifikovat je nutné jméno skupiny, jméno konzumenta (to může být libovolný řetězec), seznam proudů (typicky jediného proudu) pro čtení a taktéž ID zprávy. Namísto tohoto ID lze použít speciální znak <, který naznačuje, že ID zprávy se má získat z interní datové struktury vytvořené pro danou skupiny (je tedy podobné konceptu offsetů ze systému Apache Kafka, které jsou taktéž pro skupinu ukládány):
127.0.0.1:6379> XREADGROUP BLOCK 0 GROUP groupC consumer1 STREAMS streamC <
Čtení můžeme spustit i pro dalšího konzumenta ze stejné skupiny:
128.0.0.1:6379> XREADGROUP BLOCK 0 GROUP groupC consumer2 STREAMS streamC <
Pokud nyní do proudu nazvaného „streamC“ přidáme dvě zprávy, budou přečteny prvním či druhým konzumentem:
127.0.0.1:6379> xadd streamC * description message1 "1611340273067-0" 127.0.0.1:6379> xadd streamC * description message2 "1611340275822-0"
Výsledek (zpráva) vypsaná prvním konzumentem:
1) 1) "streamC" 2) 1) 1) "1611340273067-0" 2) 1) "description" 2) "message1" (30.36s)
Výsledek (zpráva) vypsaná konzumentem druhým:
1) 1) "streamC" 2) 1) 1) "1611340275822-0" 2) 1) "description" 2) "message2" (14.38s)
9. Skupina konzumentů v Pythonu
Podobně jako tomu bylo v předchozím článku si i dnes ukažme, jakým způsobem je možné vytvořit konzumenta zpráv aktivního v rámci nějaké skupiny konzumentů. Opět použijeme knihovnu walrus. Nejprve zkonstruujeme objekt představující skupinu konzumentů. Povšimněte si, že při konstrukci lze specifikovat seznam (či n-tici) s názvy proudů – skupina konzumentů tedy může číst data z většího množství proudů:
cg = db.consumer_group("a-group", ["streamX"])
Dále skupinu vytvoříme (pokud ještě neexistuje):
cg.create()
Ve třetím kroku je nutné specifikovat, která zpráva bude pro skupinu konzumentů považována ve výchozím stavu za přečtenou. Pokud uvedeme $, znamená to, že skupina konzumentů bude čekat a konzumovat pouze nové zprávy, nezávisle na tom, jestli již nějaké zprávy v proudu existují:
cg.set_id('$')
Dále je již možné zprávy číst a nějakým způsobem zpracovávat:
while True: messages = cg.read(block=0, count=1) print(messages[0])
Následuje úplný zdrojový kód tohoto příkladu:
from walrus import Database db = Database() cg = db.consumer_group("a-group", ["streamX"]) cg.create() cg.set_id('$') while True: messages = cg.read(block=0, count=1) print(messages[0])
Nyní spustíme dva konzumenty, každý v samostatném terminálu:
$ python streams_09_read_new_messages_group.py
$ python streams_09_read_new_messages_group.py
Ve třetím terminálu spustíme producenta zpráv:
$ python 03_add_messages.py b'1611587949338-0' b'1611587949339-0' b'1611587949339-1' b'1611587949340-0' b'1611587949340-1' b'1611587949340-2' b'1611587949341-0' b'1611587949341-1' b'1611587949341-2' b'1611587949342-0'
První konzument může zpracovat liché zprávy:
[b'streamX', [(b'1611587949338-0', {b'foo': b'1', b'bar': b'0'})]] [b'streamX', [(b'1611587949339-1', {b'foo': b'3', b'bar': b'4'})]] [b'streamX', [(b'1611587949340-1', {b'foo': b'5', b'bar': b'8'})]] [b'streamX', [(b'1611587949341-0', {b'foo': b'7', b'bar': b'12'})]] [b'streamX', [(b'1611587949341-2', {b'foo': b'9', b'bar': b'16'})]]
A druhý zprávy sudé:
[b'streamX', [(b'1611587949339-0', {b'foo': b'2', b'bar': b'2'})]] [b'streamX', [(b'1611587949340-0', {b'foo': b'4', b'bar': b'6'})]] [b'streamX', [(b'1611587949340-2', {b'foo': b'6', b'bar': b'10'})]] [b'streamX', [(b'1611587949341-1', {b'foo': b'8', b'bar': b'14'})]] [b'streamX', [(b'1611587949342-0', {b'foo': b'10', b'bar': b'18'})]]
10. Blokující čtení zpráv (čekání na nové zprávy) v Pythonu
V předchozím demonstračním příkladu jsme zprávy četli po jedné, tedy s využitím následující nekonečné smyčky:
while True: messages = cg.read(block=0, count=1) print(messages[0])
Ovšem zajímavější a rychlejší je neomezit se pouze na čtení jediné zprávy a provést čtení n-zpráv, které byly do proudu přidány od předchozího čtení (víme již, že ID naposledy přečtené zprávy je uloženo v samotném Redisu). To v důsledku vede k dvojici vnořených programových smyček:
while True: messages = cg.read(block=0) for message in messages: print(message)
Úplný zdrojový kód konzumenta zpráv patřící do skupiny „a-group“ může ve své vylepšené podobě vypadat následovně:
from walrus import Database db = Database() cg = db.consumer_group("a-group", ["streamX"]) cg.create() cg.set_id('$') while True: messages = cg.read(block=0) for message in messages: print(message)
11. Chování klauzule > při čtení zpráv ve skupině konzumentů
Vraťme se ještě ke klauzuli > (specifikace zprávy čtené ve skupině konzumentů). Jedná se o náhradu konkrétního ID zprávy za ID automaticky uložené v Redisu pro zvolenou skupinu konzumentů.
Připravíme si nový proud „streamZ“ i skupinu „groupZ“ postupem, který již dobře známe:
127.0.0.1:6379> XADD streamZ * foo 42 "1611601803117-0" 127.0.0.1:6379> XGROUP CREATE streamZ groupZ $ OK
Dále do proudu pojmenovaného „streamZ“ vložíme šestici zpráv:
127.0.0.1:6379> XADD streamZ * m 1 "1611602123499-0" 127.0.0.1:6379> XADD streamZ * m 2 "1611602124883-0" 127.0.0.1:6379> XADD streamZ * m 3 "1611602125635-0" 127.0.0.1:6379> XADD streamZ * m 4 "1611602126323-0" 127.0.0.1:6379> XADD streamZ * m 5 "1611602127019-0" 127.0.0.1:6379> XADD streamZ * m 6 "1611602127955-0"
První pokus o přečtení zprávy ve skupině „groupZ“ vrátí první zprávu (používáme již klauzuli >):
127.0.0.1:6379> XREADGROUP GROUP groupZ consumer1 COUNT 1 STREAMS streamZ > 1) 1) "streamZ" 2) 1) 1) "1611602123499-0" 2) 1) "m" 2) "1"
Zcela totožný příkaz zavolaný o chvíli později přečte druhou zprávu, protože se ID naposledy přečtené zprávy automaticky zapamatovalo:
127.0.0.1:6379> XREADGROUP GROUP groupZ consumer1 COUNT 1 STREAMS streamZ > 1) 1) "streamZ" 2) 1) 1) "1611602124883-0" 2) 1) "m" 2) "2"
Přečtení zbylých čtyř zpráv:
127.0.0.1:6379> XREADGROUP GROUP groupZ consumer1 STREAMS streamZ > 1) 1) "streamZ" 2) 1) 1) "1611602125635-0" 2) 1) "m" 2) "3" 2) 1) "1611602126323-0" 2) 1) "m" 2) "4" 3) 1) "1611602127019-0" 2) 1) "m" 2) "5" 4) 1) "1611602127955-0" 2) 1) "m" 2) "6"
Pokus o další čtení již nevrátí žádné zprávy:
127.0.0.1:6379> XREADGROUP GROUP groupZ consumer1 STREAMS streamZ > (nil)
A to ani když změníme jméno konzumenta:
127.0.0.1:6379> XREADGROUP GROUP groupZ consumer2 STREAMS streamZ > (nil)
Nic nám však nebrání kdykoli provést „přetočení“ na libovolnou zprávu s ID (zde na první zprávu v proudu):
127.0.0.1:6379> XREADGROUP GROUP groupZ consumer1 STREAMS streamZ 0 1) 1) "streamZ" 2) 1) 1) "1611602123499-0" 2) 1) "m" 2) "1" 2) 1) "1611602124883-0" 2) 1) "m" 2) "2" 3) 1) "1611602125635-0" 2) 1) "m" 2) "3" 4) 1) "1611602126323-0" 2) 1) "m" 2) "4" 5) 1) "1611602127019-0" 2) 1) "m" 2) "5" 6) 1) "1611602127955-0" 2) 1) "m" 2) "6"
Jakmile byly zprávy přečteny jedním konzumentem, druhý konzument ve stejné skupině je nezíská:
127.0.0.1:6379> XREADGROUP GROUP groupZ consumer2 STREAMS streamZ 0 1) 1) "streamZ" 2) (empty array)
12. Příkazy XACK a XPENDING
V případě, že jsou zprávy pouze přečteny příkazem XREADGROUP, nachází se ve stavu „pending“. Jedná se o zprávy již poslané nějakému konzumentu, který však jejich zpracování nepotvrdil. Pro potvrzování zpráv se používá příkaz XACK:
127.0.0.1:6379> help xack XACK key group ID [ID ...] summary: Marks a pending message as correctly processed, effectively removing it from the pending entries list of the consumer group. Return value of the command is the number of messages successfully acknowledged, that is, the IDs we were actually able to resolve in the PEL. since: 5.0.0 group: stream
Pro zjištění stavu potvrzení zpráv se používá příkaz XPENDING:
127.0.0.1:6379> help xpending XPENDING key group [start end count] [consumer] summary: Return information and entries from a stream consumer group pending entries list, that are messages fetched but never acknowledged. since: 5.0.0 group: stream
Příklady použití těchto dvou příkazů si ukážeme v navazujících dvou kapitolách.
13. Potvrzování zpráv příkazem XACK
Ukažme si nejdříve příklad použití příkazu XACK pro potvrzování zpracování zpráv. Pro tento účel vytvoříme další proud „streamW“ a skupinu „groupW“:
127.0.0.1:6379> XGROUP CREATE streamW groupW $ MKSTREAM OK
Do proudu přidáme šestici zpráv:
127.0.0.1:6379> XADD streamW * w 0 "1611603108014-0" 127.0.0.1:6379> XADD streamW * w 1 "1611603110062-0" 127.0.0.1:6379> XADD streamW * w 2 "1611603111094-0" 127.0.0.1:6379> XADD streamW * w 3 "1611603112133-0" 127.0.0.1:6379> XADD streamW * w 4 "1611603113032-0" 127.0.0.1:6379> XADD streamW * w 5 "1611603113901-0"
Zprávy pochopitelně můžeme přečíst, prozatím bez potvrzení:
127.0.0.1:6379> XREADGROUP GROUP groupW c1 STREAMS streamW 0 1) 1) "streamW" 2) 1) 1) "1611603108014-0" 2) 1) "w" 2) "0" 2) 1) "1611603110062-0" 2) 1) "w" 2) "1" 3) 1) "1611603111094-0" 2) 1) "w" 2) "2" 4) 1) "1611603112133-0" 2) 1) "w" 2) "3" 5) 1) "1611603113032-0" 2) 1) "w" 2) "4" 6) 1) "1611603113901-0" 2) 1) "w" 2) "5"
Druhé čtení vrátí ty samé zprávy, tedy původní šestici:
127.0.0.1:6379> XREADGROUP GROUP groupW c1 STREAMS streamW 0 1) 1) "streamW" 2) 1) 1) "1611603108014-0" 2) 1) "w" 2) "0" 2) 1) "1611603110062-0" 2) 1) "w" 2) "1" 3) 1) "1611603111094-0" 2) 1) "w" 2) "2" 4) 1) "1611603112133-0" 2) 1) "w" 2) "3" 5) 1) "1611603113032-0" 2) 1) "w" 2) "4" 6) 1) "1611603113901-0" 2) 1) "w" 2) "5"
Potvrdíme zpracování první zprávy, a to uvedením proudu, skupiny i ID zprávy:
127.0.0.1:6379> XACK streamW groupW 1611603108014-0 (integer) 1
Nyní pokus o přečtení všech zpráv vrátí pouze pětici zpráv a nikoli šestici (první zpráva byla potvrzena a pro skupinu zapomenuta):
127.0.0.1:6379> XREADGROUP GROUP groupW c1 STREAMS streamW 0 1) 1) "streamW" 2) 1) 1) "1611603110062-0" 2) 1) "w" 2) "1" 2) 1) "1611603111094-0" 2) 1) "w" 2) "2" 3) 1) "1611603112133-0" 2) 1) "w" 2) "3" 4) 1) "1611603113032-0" 2) 1) "w" 2) "4" 5) 1) "1611603113901-0" 2) 1) "w" 2) "5"
Potvrzení dalších zpráv (zde konkrétně čtvrté):
127.0.0.1:6379> XACK streamW groupW 1611603113032-0 (integer) 1
Zpětné potvrzení dřívější zprávy lze provést:
127.0.0.1:6379> XACK streamW groupW 1611603110062-0 (integer) 1
Potvrzeny byly tři zprávy ze šesti, takže by nás výsledek dalšího čtení již neměl překvapit:
127.0.0.1:6379> XREADGROUP GROUP groupW c1 STREAMS streamW 0 1) 1) "streamW" 2) 1) 1) "1611603111094-0" 2) 1) "w" 2) "2" 2) 1) "1611603112133-0" 2) 1) "w" 2) "3" 3) 1) "1611603113901-0" 2) 1) "w" 2) "5"
14. Příkaz XPENDING
Příkaz XPENDING vrátí pro danou skupinu a proud informaci o tom, jaké zprávy nebyly schváleny:
127.0.0.1:6379> XPENDING streamC groupC 1) (integer) 2 2) "1611340273067-0" 3) "1611340275822-0" 4) 1) 1) "consumer1" 2) "2"
V našem konkrétním případě nebyly schváleny dvě přečtené zprávy. Navíc se vrátila dvojice ID (nejmenší a největší) neschválených zpráv a dále počet neschválených zpráv pro každého konzumenta (zde je pouze jediný konzument).
Totéž můžeme provést pro proud „streamW“ a skupinu „groupW“ vytvořené v rámci předchozí kapitoly:
127.0.0.1:6379> XPENDING streamW groupW 1) (integer) 3 2) "1611603111094-0" 3) "1611603113901-0" 4) 1) 1) "c1" 2) "3"
Zkusíme nyní zprávy přečíst (a neschválit), ale odlišným konzumentem:
127.0.0.1:6379> XREADGROUP GROUP groupW c2 STREAMS streamW > 1) 1) "streamW" 2) 1) 1) "1611603448428-0" 2) 1) "w" 2) "6" 2) 1) "1611603449340-0" 2) 1) "w" 2) "7" 3) 1) "1611603450140-0" 2) 1) "w" 2) "8"
Nyní bude výsledek příkazu XPENDING odlišný, protože se v posledním prvku pole vrátí informace o dvou konzumentech:
127.0.0.1:6379> XPENDING streamW groupW 1) (integer) 6 2) "1611603111094-0" 3) "1611603450140-0" 4) 1) 1) "c1" 2) "3" 2) 1) "c2" 2) "3"
15. Potvrzování zpráv a příkaz XPENDING pro více skupin konzumentů
Kombinace potvrzování zpráv a příkazu XPENDING tvoří společně se skupinami konzumentů nejdůležitější část práce s proudy v praxi, takže se podívejme ještě na jeden příklad, který je nepatrně komplikovanější, než příklady předchozí.
Vytvoříme dvě skupiny konzumentů, ovšem nad stejným (novým) proudem nazvaným „streamQ“:
127.0.0.1:6379> XGROUP CREATE streamQ group1 $ MKSTREAM OK 127.0.0.1:6379> XGROUP CREATE streamQ group2 $ MKSTREAM OK
Do proudu sdíleného oběma skupinami přidáme pětici zpráv:
127.0.0.1:6379> xadd streamQ * m 1 "1611603635755-0" 127.0.0.1:6379> xadd streamQ * m 2 "1611603636610-0" 127.0.0.1:6379> xadd streamQ * m 3 "1611603637331-0" 127.0.0.1:6379> xadd streamQ * m 4 "1611603638563-0" 127.0.0.1:6379> xadd streamQ * m 5 "1611603639379-0"
Informace o nepotvrzených zprávách musí být pro obě skupiny stejné – žádná taková zpráva neexistuje:
127.0.0.1:6379> XPENDING streamQ group1 1) (integer) 0 2) (nil) 3) (nil) 4) (nil)
a:
127.0.0.1:6379> XPENDING streamQ group2 1) (integer) 0 2) (nil) 3) (nil) 4) (nil)
Přečtení všech zpráv v rámci první skupiny konzumentů:
127.0.0.1:6379> xreadgroup group group1 c1 streams streamQ > 1) 1) "streamQ" 2) 1) 1) "1611603635755-0" 2) 1) "m" 2) "1" 2) 1) "1611603636610-0" 2) 1) "m" 2) "2" 3) 1) "1611603637331-0" 2) 1) "m" 2) "3" 4) 1) "1611603638563-0" 2) 1) "m" 2) "4" 5) 1) "1611603639379-0" 2) 1) "m" 2) "5"
Všech pět zpráv je nepotvrzených:
127.0.0.1:6379> XPENDING streamQ group1 1) (integer) 5 2) "1611603635755-0" 3) "1611603639379-0" 4) 1) 1) "c1" 2) "5"
Přečtení jedné zprávy v rámci druhé skupiny konzumentů:
127.0.0.1:6379> xreadgroup group group2 c2 count 1 streams streamQ > 1) 1) "streamQ" 2) 1) 1) "1611603635755-0" 2) 1) "m" 2) "1"
Pro tuto skupinu nyní existuje jen jedna nepotvrzená zpráva:
127.0.0.1:6379> XPENDING streamQ group2 1) (integer) 1 2) "1611603635755-0" 3) "1611603635755-0" 4) 1) 1) "c2" 2) "1"
Potvrzení zprávy ve druhé skupině:
127.0.0.1:6379> xack streamQ group2 1611603635755-0 (integer) 1
Přečtení první nepřečtené a neschválené zprávy:
127.0.0.1:6379> xreadgroup group group2 c2 count 1 streams streamQ > 1) 1) "streamQ" 2) 1) 1) "1611603636610-0" 2) 1) "m" 2) "2"
16. Mazání zpráv z proudu příkazem XDEL
Posledním příkazem, o němž se v dnešním článku zmíníme, je příkaz XDEL, který dokáže z proudu odstranit vybrané zprávy. Tomuto příkazu se předává jméno proudu a seznam ID zpráv, které se mají smazat (přitom některé z těchto zpráv nemusí existovat, například kvůli souběžnému přístupu):
127.0.0.1:6379> help xdel XDEL key ID [ID ...] summary: Removes the specified entries from the stream. Returns the number of items actually deleted, that may be different from the number of IDs passed in case certain IDs do not exist. since: 5.0.0 group: stream
17. Seznam všech příkazů používaných pro práci s proudy
V předchozím i dnešním článku byly popsány následující příkazy určené pro práci s proudy v systému Redis:
# | Příkaz | Stručný popis |
---|---|---|
1 | XADD | připojení zprávy na konec proudu |
2 | XLEN | získání počtu zpráv v proudu |
3 | XREAD | čtení (blokující i neblokující) zpráv z proudu či proudů |
4 | XRANGE | získání zpráv pro daný rozsah jejich ID |
5 | XREVRANGE | podobné XRANGE, ovšem zprávy jsou vráceny v opačném pořadí |
6 | XGROUP | vytvoření skupiny konzumentů |
7 | XREADGROUP | (blokující) čtení zpráv v rámci skupiny konzumentů |
8 | XACK | potvrzení přijetí a zpracování zprávy |
9 | XPENDING | získání informací o nepotvrzených zprávách |
10 | XDEL | smazání zprávy či zpráv z proudu |
18. Repositář s demonstračními příklady
Zdrojové kódy všech dnes popsaných demonstračních příkladů naprogramovaných v Pythonu byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/py-redis-examples (stále na GitHubu :-). V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má doslova několik kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce:
19. Předchozí články systému Redis
Se systémem Redis jsme se již na stránkách Rootu setkali, a to dokonce několikrát. Buď jsme si popisovali přímo přístup k Redisu z různých programovacích jazyků (což je konkrétně případ všech dále zmíněných článků zaměřených na jazyky Python a Go) nebo byl Redis použit ve funkci databáze, resp. perzistentního úložiště různými message brokery (Celery, RQ, apod.). Poslední v seznamu je článek, na který dnes navazujeme:
- Databáze Redis (nejenom) pro vývojáře používající Python
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python/ - Databáze Redis (nejenom) pro vývojáře používající Python (dokončení)
https://www.root.cz/clanky/databaze-redis-nejenom-pro-vyvojare-pouzivajici-python-dokonceni/ - Použití databáze Redis v aplikacích naprogramovaných v Go
https://www.root.cz/clanky/pouziti-databaze-redis-v-aplikacich-naprogramovanych-v-go/ - Použití databáze Redis v aplikacích naprogramovaných v Go (2)
https://www.root.cz/clanky/pouziti-databaze-redis-v-aplikacich-naprogramovanych-v-go-2/ - Použití nástroje RQ (Redis Queue) pro správu úloh zpracovávaných na pozadí
https://www.root.cz/clanky/pouziti-nastroje-rq-redis-queue-pro-spravu-uloh-zpracovavanych-na-pozadi/ - Proudy (streams) podporované systémem Redis
https://www.root.cz/clanky/proudy-streams-podporovane-systemem-redis/
20. Odkazy na Internetu
- Repositář knihovny walrus na GitHubu
https://github.com/coleifer/walrus/ - Knihovna walrus na PyPi
https://pypi.org/project/walrus/ - Stránky projektu Redis
https://redis.io/ - Introduction to Redis
https://redis.io/topics/introduction - Try Redis
http://try.redis.io/ - Redis tutorial, April 2010 (starší, ale pěkně udělaný)
https://static.simonwillison.net/static/2010/redis-tutorial/ - Python Redis
https://redislabs.com/lp/python-redis/ - Redis: key-value databáze v paměti i na disku
https://www.zdrojak.cz/clanky/redis-key-value-databaze-v-pameti-i-na-disku/ - Scripting Redis with Lua
https://redislabs.com/ebook/part-3-next-steps/chapter-11-scripting-redis-with-lua/ - Redis Lua script for atomic operations and cache stampede
https://engineering.linecorp.com/en/blog/redis-lua-scripting-atomic-processing-cache/ - Redis Lua Scripts – Itamar Haber
https://www.youtube.com/watch?v=eReTl8NhHCs - Building Databases with Redis Tutorial: Lua Script | packtpub.com
https://www.youtube.com/watch?v=mMfGNsAr7Bg - Příkaz pro spuštění skriptu v jazyce Lua: EVAL script numkeys key [key …] arg [arg …]
https://redis.io/commands/eval - Redis Lua scripts debugger
https://redis.io/topics/ldb - Repositář projektu s Redis klientem pro jazyk Go
https://github.com/go-redis/redis - Stránky programovacího jazyka Lua
https://www.lua.org/ - Programovací jazyk Lua
https://www.palmknihy.cz/ucebnice-odborna-literatura/programovaci-jazyk-lua-12651 - Programming in Lua
https://www.lua.org/pil/ - Praktický úvod do Redis (1): vaše distribuovaná NoSQL cache
http://www.cloudsvet.cz/?p=253 - Praktický úvod do Redis (2): transakce
http://www.cloudsvet.cz/?p=256 - Praktický úvod do Redis (3): cluster
http://www.cloudsvet.cz/?p=258 - Connection pool
https://en.wikipedia.org/wiki/Connection_pool - Instant Redis Sentinel Setup
https://github.com/ServiceStack/redis-config - How to install REDIS in LInux
https://linuxtechlab.com/how-install-redis-server-linux/ - Redis RDB Dump File Format
https://github.com/sripathikrishnan/redis-rdb-tools/wiki/Redis-RDB-Dump-File-Format - Lempel–Ziv–Welch
https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Welch - Redis Persistence
https://redis.io/topics/persistence - Redis persistence demystified
http://oldblog.antirez.com/post/redis-persistence-demystified.html - Redis reliable queues with Lua scripting
http://oldblog.antirez.com/post/250 - Ost (knihovna)
https://github.com/soveran/ost - NoSQL
https://en.wikipedia.org/wiki/NoSQL - Shard (database architecture)
https://en.wikipedia.org/wiki/Shard_%28database_architecture%29 - What is sharding and why is it important?
https://stackoverflow.com/questions/992988/what-is-sharding-and-why-is-it-important - What Is Sharding?
https://btcmanager.com/what-sharding/ - Redis clients
https://redis.io/clients - Category:Lua-scriptable software
https://en.wikipedia.org/wiki/Category:Lua-scriptable_software - Seriál Programovací jazyk Lua
https://www.root.cz/serialy/programovaci-jazyk-lua/ - Redis memory usage
http://nosql.mypopescu.com/post/1010844204/redis-memory-usage - Ukázka konfigurace Redisu pro lokální testování
https://github.com/tisnik/presentations/blob/master/redis/redis.conf - Resque
https://github.com/resque/resque - Nested transaction
https://en.wikipedia.org/wiki/Nested_transaction - Publish–subscribe pattern
https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern - Messaging pattern
https://en.wikipedia.org/wiki/Messaging_pattern - Using pipelining to speedup Redis queries
https://redis.io/topics/pipelining - Pub/Sub
https://redis.io/topics/pubsub - ZeroMQ distributed messaging
http://zeromq.org/ - Publish/Subscribe paradigm: Why must message classes not know about their subscribers?
https://stackoverflow.com/questions/2908872/publish-subscribe-paradigm-why-must-message-classes-not-know-about-their-subscr - Python & Redis PUB/SUB
https://medium.com/@johngrant/python-redis-pub-sub-6e26b483b3f7 - Message broker
https://en.wikipedia.org/wiki/Message_broker - RESP Arrays
https://redis.io/topics/protocol#array-reply - Redis Protocol specification
https://redis.io/topics/protocol - Redis Pub/Sub: Intro Guide
https://www.redisgreen.net/blog/pubsub-intro/ - Redis Pub/Sub: Howto Guide
https://www.redisgreen.net/blog/pubsub-howto/ - Comparing Publish-Subscribe Messaging and Message Queuing
https://dzone.com/articles/comparing-publish-subscribe-messaging-and-message - ActiveMQ
http://activemq.apache.org/activemq-website/index.html - Amazon Simple Queue Service
https://aws.amazon.com/sqs/ - Apache Kafka
https://kafka.apache.org/ - Cloud Pub/Sub
https://cloud.google.com/pubsub/ - Introduction to Redis Streams
https://redis.io/topics/streams-intro - Introduction to Redis streams with Python
http://charlesleifer.com/blog/redis-streams-with-python/ - glob (programming)
https://en.wikipedia.org/wiki/Glob_(programming) - Microservices: The Rise Of Kafka
https://movio.co/blog/microservices-rise-kafka/ - Building a Microservices Ecosystem with Kafka Streams and KSQL
https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/ - An introduction to Apache Kafka and microservices communication
https://medium.com/@ulymarins/an-introduction-to-apache-kafka-and-microservices-communication-bf0a0966d63 - kappa-architecture.com
http://milinda.pathirage.org/kappa-architecture.com/ - Questioning the Lambda Architecture
https://www.oreilly.com/ideas/questioning-the-lambda-architecture - Lambda architecture
https://en.wikipedia.org/wiki/Lambda_architecture - Kafka – ecosystem (Wiki)
https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem - The Kafka Ecosystem – Kafka Core, Kafka Streams, Kafka Connect, Kafka REST Proxy, and the Schema Registry
http://cloudurable.com/blog/kafka-ecosystem/index.html - A Kafka Operator for Kubernetes
https://github.com/krallistic/kafka-operator - Kafka Streams
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams - Kafka Streams
http://kafka.apache.org/documentation/streams/ - Kafka Streams (FAQ)
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Streams - What are some alternatives to Apache Kafka?
https://www.quora.com/What-are-some-alternatives-to-Apache-Kafka - What is the best alternative to Kafka?
https://www.slant.co/options/961/alternatives/~kafka-alternatives - A super quick comparison between Kafka and Message Queues
https://hackernoon.com/a-super-quick-comparison-between-kafka-and-message-queues-e69742d855a8?gi=e965191e72d0 - Kafka Queuing: Kafka as a Messaging System
https://dzone.com/articles/kafka-queuing-kafka-as-a-messaging-system