Obsah
1. Klasická úloha typu producent-konzument podruhé
2. Malá odbočka: producent-konzument realizovaný v jazyce Go gorutinami a kanálem
3. Chování Tria v případě, že existuje jen producent zpráv u kanálu s nulovou kapacitou
4. Detekce neexistence konzumenta implementovaná v jazyce Go
5. Chování kanálu s bufferem o zadané kapacitě
6. Zápis zprávy do kanálu synchronní operací send_nowait
7. Pokus o zápis do kanálu se specifikovanou maximální dobou čekání
8. Chování Tria v případě, že existuje jen konzument zpráv u kanálu s nulovou kapacitou
9. Úprava předchozího příkladu – náhrada smyčky async for za while
10. Detekce neexistence producenta implementovaná v jazyce Go
11. Synchronní příjem zpráv z kanálu (bez čekání)
12. Pokus o příjem zprávy z kanálu se specifikovanou maximální dobou čekání
13. Větší množství producentů a konzumentů sdílejících společný kanál
14. Realizace a vylepšení předchozího příkladu v jazyce Go
15. Obsah závěrečné části článku o knihovně Trio
16. Repositář s demonstračními příklady
1. Klasická úloha typu producent-konzument podruhé
V předchozím článku o knihovně Trio jsme si mj. ukázali i klasickou úlohu typu producent-konzument s jediným producentem a jediným konzumentem. Připomeňme si ve stručnosti, že celé řešení bylo založeno na korutinách (pochopitelně) a na kanálu, který je z pohledu programátora realizován dvojicí objektů – vstupem a výstupem. Producent použije „vysílací“ část kanálu a producent část „přijímací“:
send_channel, receive_channel = trio.open_memory_channel(0)
Jak producent, tak i konzument jsou vytvořeny společně s kanálem v bloku async with, čímž je zajištěno automatické uzavření všech prostředků. Tento demonstrační příklad lze později rozšířit, například tak, aby se použil kanál s kapacitou, větší množství producentů a/nebo větší množství konzumentů:
import trio async def producer(send_channel): for i in range(1, 10): message = f"message {i}" print(f"Producer: {message}") await send_channel.send(message) async def consumer(receive_channel): async for value in receive_channel: print(f"Consumer: received{value!r}") await trio.sleep(1) async def main(): async with trio.open_nursery() as nursery: send_channel, receive_channel = trio.open_memory_channel(0) nursery.start_soon(producer, send_channel) nursery.start_soon(consumer, receive_channel) trio.run(main)
Nyní se pokusme skript spustit, aby bylo možné sledovat jeho chování:
$ python3 trio_14.py Producer: message 1 Producer: message 2 Consumer: received'message 1' Producer: message 3 Consumer: received'message 2' Producer: message 4 Consumer: received'message 3' Producer: message 5 Consumer: received'message 4' Producer: message 6 Consumer: received'message 5' Producer: message 7 Consumer: received'message 6' Producer: message 8 Consumer: received'message 7' Consumer: received'message 8' Producer: message 9 Consumer: received'message 9'
V této chvíli je kanál vyprázdněn konzumentem, ovšem producent žádné další zprávy neposílá. Celý program se tedy „zasekne“, protože korutina s realizací konzumenta čeká na zprávy v nekonečné smyčce a prozatím nemáme k dispozici mechanismus pro detekování tohoto (nekonečného) čekání. Skript můžeme zastavit externě, například stlačením Ctrl+C, což vede k vyhození výjimky typu KeyboardInterrupt:
^CTraceback (most recent call last): File "trio_14.py", line 24, in <module> trio.run(main) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 1946, in run raise runner.main_task_outcome.error File "trio_14.py", line 21, in main nursery.start_soon(consumer, receive_channel) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 813, in __aexit__ raise combined_error_from_nursery File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 1184, in raise_cancel raise KeyboardInterrupt KeyboardInterrupt
2. Malá odbočka: producent-konzument realizovaný v jazyce Go gorutinami a kanálem
Pro zajímavost a taktéž pro ukázku rozdílného chování v runtime se podívejme na realizaci též klasické úlohy typu producent-konzument, tentokrát ovšem v programovacím jazyku Go. Namísto korutin budou použity gorutiny (které běží nejenom souběžně, ale mohou běžet i paralelně) a pro komunikaci mezi producentem a konzumentem se použije kanál. Ten je v jazyce Go reprezentován jediným objektem, do kterého je možné číst i zapisovat. Jedná se sice o univerzálnější řešení, které je ale méně přehledné (lze ovšem alespoň specifikovat směr toku dat při definici typu kanálu v hlavičce funkce). Navíc je nutné nějakým způsobem čekat na dokončení běhu gorutin. Prozatím zcela primitivním způsobem budeme číst z dalšího kanálu nazvaného done, čímž hlavní gorutinu de facto zastavíme:
package main import ( "fmt" ) func producer(channel chan string) { for i := 1; i <= 10; i++ { message := fmt.Sprintf("message %d", i) fmt.Printf("Producer: %s\n", message) channel <- message } } func consumer(channel chan string) { for { message := <-channel fmt.Printf("Consumer: received %s\n", message) } } func main() { var channel = make(chan string) var done = make(chan interface{}) go producer(channel) go consumer(channel) <-done }
Po spuštění tohoto příkladu dojde k zajímavé situaci, protože je (zcela korektně) detekován stav, kdy žádná gorutina neběží, protože čeká na zprávy z nějakého kanálu:
- Gorutina producenta byla ukončena
- Gorutina konzumenta čeká na zprávu poslanou někým do kanáluchannel
- Hlavní gorutina čeká na zprávu poslanou někým do kanáludone
Toto čekání je korektně detekováno runtimem jazyka Go, což je patrné z následujících zpráv získaných po spuštění demonstračního příkladu:
Producer: message 1 Producer: message 2 Consumer: received message 1 Consumer: received message 2 Producer: message 3 Producer: message 4 Consumer: received message 3 Consumer: received message 4 Producer: message 5 Producer: message 6 Consumer: received message 5 Consumer: received message 6 Producer: message 7 Producer: message 8 Consumer: received message 7 Consumer: received message 8 Producer: message 9 Producer: message 10 Consumer: received message 9 Consumer: received message 10 fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]: main.main() /home/ptisnovs/src/most-popular-python-libs/concurrent/producer_consumer.go:27 +0xc5 goroutine 19 [chan receive]: main.consumer(0x0?) /home/ptisnovs/src/most-popular-python-libs/concurrent/producer_consumer.go:17 +0x29 created by main.main /home/ptisnovs/src/most-popular-python-libs/concurrent/producer_consumer.go:26 +0xb8 exit status 2
Tomuto chování lze v reálných programech předejít, například korektním použitím kanálu, který se většinou jmenuje done, tedy stejně, jako v našem kódu. Producent jednoduše zápisem jakékoli hodnoty do tohoto kanálu oznámí, že jeho práce je dokončena, čímž dojde i k dokončení hlavní gorutiny, tedy funkce main. Úprava zdrojového kódu je jednoduchá (viz podtržené části):
package main import ( "fmt" ) func producer(channel chan string, done chan interface{}) { for i := 1; i <= 10; i++ { message := fmt.Sprintf("message %d", i) fmt.Printf("Producer: %s\n", message) channel <- message } done <- struct{}{} } func consumer(channel chan string) { for { message := <-channel fmt.Printf("Consumer: received %s\n", message) } } func main() { var channel = make(chan string) var done = make(chan interface{}) go producer(channel, done) go consumer(channel) <-done }
Po spuštění takto upraveného příkladu již bude program ukončen bez chyby, protože po opuštění hlavní gorutiny se ukončí i ostatní gorutiny, tedy včetně gorutiny konzumenta:
Producer: message 1 Producer: message 2 Consumer: received message 1 Consumer: received message 2 Producer: message 3 Producer: message 4 Consumer: received message 3 Consumer: received message 4 Producer: message 5 Producer: message 6 Consumer: received message 5 Consumer: received message 6 Producer: message 7 Producer: message 8 Consumer: received message 7 Consumer: received message 8 Producer: message 9 Producer: message 10 Consumer: received message 9 Consumer: received message 10
3. Chování Tria v případě, že existuje jen producent zpráv u kanálu s nulovou kapacitou
Vraťme se nyní k Pythonu a ke knihovně Trio. Ukážeme si, jak se bude chovat program, který v korutině zapisuje zprávy do kanálu s nulovou kapacitou, přičemž tyto zapisované zprávy nejsou nikde čteny (neexistuje tedy žádný běžící konzument). Realizace takového programu může vypadat následovně:
import trio async def producer(send_channel): for i in range(1, 1000): message = f"message {i}" print(f"Producer: {message}") await send_channel.send(message) async def main(): async with trio.open_nursery() as nursery: send_channel, receive_channel = trio.open_memory_channel(0) nursery.start_soon(producer, send_channel) trio.run(main)
Tento program po svém spuštění pouze vypíše informaci o tom, že do kanálu zapsal první zprávu. Poté se „zasekne“, protože byl kanál naplněn a žádná další korutina z něj nečte:
$ python3 trio_17_no_consumer.py Producer: message 1
Tato skutečnost prozatím není Triem detekována, takže opět budeme muset použít Ctrl+C:
^CTraceback (most recent call last): File "trio_17_no_consumer.py", line 17, in <module> trio.run(main) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 1946, in run raise runner.main_task_outcome.error File "trio_17_no_consumer.py", line 14, in main nursery.start_soon(producer, send_channel) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 813, in __aexit__ raise combined_error_from_nursery File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 1184, in raise_cancel raise KeyboardInterrupt KeyboardInterrupt
4. Detekce neexistence konzumenta implementovaná v jazyce Go
Jak se bude chovat obdobný příklad, ovšem napsaný v Go, v němž se použijí gorutiny (souběžné a běžící paralelně) namísto korutin (souběžné)? Můžeme si to snadno otestovat:
package main import ( "fmt" ) func producer(channel chan string) { for i := 1; i <= 10; i++ { message := fmt.Sprintf("message %d", i) fmt.Printf("Producer: %s\n", message) channel <- message } } func main() { var channel = make(chan string) var done = make(chan interface{}) go producer(channel) <-done }
Po produkci první zprávy se obě gorutiny zastaví – hlavní gorutina totiž čeká na zprávu v kanálu done, producent naopak čeká na uvolnění kanálu channel. Tento stav je detekován runtime jazyka Go:
Producer: message 1 fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]: main.main() /home/ptisnovs/src/most-popular-python-libs/concurrent/producer_only.go:19 +0x87 goroutine 6 [chan send]: main.producer(0x0?) /home/ptisnovs/src/most-popular-python-libs/concurrent/producer_only.go:11 +0xf0 created by main.main /home/ptisnovs/src/most-popular-python-libs/concurrent/producer_only.go:18 +0x7b exit status 2
5. Chování kanálu s bufferem o zadané kapacitě
Jak již víme z předchozího článku, lze v knihovně Trio vytvořit kanál s bufferem o zadané kapacitě. Zápis do takového kanálu producentem je zpočátku neblokující, protože do kanálu je možné zapsat n prvků bez jejich čtení, aniž by zápis byl blokující operací. K zablokování kanálu dojde až při pokusu o zápis další (n+11) zprávy, pochopitelně za předpokladu, že mezitím nedošlo k přečtení zprávy (zpráv) nějakým konzumentem. V následujícím demonstračním příkladu je vytvořen kanál s bufferem o kapacitě deseti prvků a (opět) se nepoužívá konzument:
import trio async def producer(send_channel): for i in range(1, 1000): message = f"message {i}" print(f"Producer: {message}") await send_channel.send(message) async def main(): async with trio.open_nursery() as nursery: send_channel, receive_channel = trio.open_memory_channel(10) nursery.start_soon(producer, send_channel) trio.run(main)
Po spuštění se do kanálu zapíše deset prvních zpráv a k zablokování korutiny s producentem dojde až při zápisu jedenácté zprávy:
$ python3 trio_18_no_consumer_capacity.py Producer: message 1 Producer: message 2 Producer: message 3 Producer: message 4 Producer: message 5 Producer: message 6 Producer: message 7 Producer: message 8 Producer: message 9 Producer: message 10 Producer: message 11
K ukončení skriptu opět musíme použít externí signál, například Ctrl+C:
^CTraceback (most recent call last): File "trio_18_no_consumer_capacity.py", line 17, in <module> trio.run(main) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 1946, in run raise runner.main_task_outcome.error File "trio_18_no_consumer_capacity.py", line 14, in main nursery.start_soon(producer, send_channel) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 813, in __aexit__ raise combined_error_from_nursery File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 1184, in raise_cancel raise KeyboardInterrupt KeyboardInterrupt
6. Zápis zprávy do kanálu synchronní operací send_nowait
Alternativně je možné se pokusit zprávu poslat do kanálu synchronní operací nazvanou send_nowait. Název této operace hovoří jasně – zpráva se buď ihned zapíše (předpokládá se tedy, že kanál není blokován nebo má volnou kapacitu) nebo dojde k vyhození výjimky. Předchozí příklad tedy můžeme jednoduše upravit a použít tuto novou operaci:
import trio async def producer(send_channel): for i in range(1, 1000): message = f"message {i}" print(f"Producer: {message}") send_channel.send_nowait(message) async def main(): async with trio.open_nursery() as nursery: send_channel, receive_channel = trio.open_memory_channel(10) nursery.start_soon(producer, send_channel) trio.run(main)
Sledujme chování takto upraveného příkladu po jeho spuštění:
Producer: message 1 Producer: message 2 Producer: message 3 Producer: message 4 Producer: message 5 Producer: message 6 Producer: message 7 Producer: message 8 Producer: message 9 Producer: message 10 Producer: message 11 Traceback (most recent call last): File "trio_19_no_consumer_no_wait.py", line 17, in <module> trio.run(main) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 1946, in run raise runner.main_task_outcome.error File "trio_19_no_consumer_no_wait.py", line 14, in main nursery.start_soon(producer, send_channel) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 813, in __aexit__ raise combined_error_from_nursery File "trio_19_no_consumer_no_wait.py", line 8, in producer send_channel.send_nowait(message) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_ki.py", line 159, in wrapper return fn(*args, **kwargs) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_channel.py", line 150, in send_nowait raise trio.WouldBlock trio.WouldBlock
Vidíme, že namísto čekání na (neexistujícího) konzumenta zpráv dojde k vyhození výjimky typu WouldBlock, která jasně napovídá, že operace poslání zprávy by byla v dané chvíli blokující.
7. Pokus o zápis do kanálu se specifikovanou maximální dobou čekání
V některých aplikacích může být výhodnější namísto nekonečného čekání na zápis do kanálu použít operaci zápisu se specifikovanou maximální dobou čekání. Tuto operaci sice nelze zapsat přímo (neexistuje zde metoda typu send_with_timeout), ale knihovna Trio nám nabízí další programovou konstrukci založenou na bloku with s manažerem kontextu fail_after, kterému se předá maximální doba čekání na operaci (či více operací) umístěných v tomto bloku. Jedná se o zajímavý koncept, kde jedinou konstrukcí můžeme nahradit mnoho variant funkcí s „timeoutem“. Podívejme se nyní na příklad, v němž se na dokončení poslání zprávy do kanálu čeká maximálně jednu sekundu. Po této době dojde k vyhození výjimky, kterou je pochopitelně možné v případě potřeby zachytit a vhodným způsobem na ni reagovat:
import trio async def producer(send_channel): for i in range(1, 1000): message = f"message {i}" print(f"Producer: {message}") with trio.fail_after(1): await send_channel.send(message) async def main(): async with trio.open_nursery() as nursery: send_channel, receive_channel = trio.open_memory_channel(10) nursery.start_soon(producer, send_channel) trio.run(main)
Po spuštění skriptu se podle očekávání nejdříve do kanálu (resp. do jeho bufferu) zapíše deset zpráv:
Producer: message 1 Producer: message 2 Producer: message 3 Producer: message 4 Producer: message 5 Producer: message 6 Producer: message 7 Producer: message 8 Producer: message 9 Producer: message 10
Pokus o zápis jedenácté zprávy skončí s chybou po uběhnutí jedné sekundy:
Producer: message 11 Traceback (most recent call last): File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_timeouts.py", line 106, in fail_at yield scope File "trio_20_no_consumer_fail_after.py", line 9, in producer await send_channel.send(123) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_channel.py", line 178, in send await trio.lowlevel.wait_task_rescheduled(abort_fn) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap() File "/home/ptisnovs/.local/lib/python3.8/site-packages/outcome/_impl.py", line 138, in unwrap raise captured_error File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 1173, in raise_cancel raise Cancelled._create() trio.Cancelled: Cancelled
A vznikne ještě jedna výjimka:
During handling of the above exception, another exception occurred: Traceback (most recent call last): File "trio_20_no_consumer_fail_after.py", line 18, in <module> trio.run(main) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 1946, in run raise runner.main_task_outcome.error File "trio_20_no_consumer_fail_after.py", line 15, in main nursery.start_soon(producer, send_channel) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 813, in __aexit__ raise combined_error_from_nursery File "trio_20_no_consumer_fail_after.py", line 9, in producer await send_channel.send(123) File "/usr/lib/python3.8/contextlib.py", line 131, in __exit__ self.gen.throw(type, value, traceback) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_timeouts.py", line 108, in fail_at raise TooSlowError trio.TooSlowError
8. Chování Tria v případě, že existuje jen konzument zpráv u kanálu s nulovou kapacitou
Opět si upravme náš demonstrační příklad. Tentokrát ovšem takovým způsobem, že bude existovat pouze konzument zpráv, nikoli jejich producent. Prozatím použijeme kanál s nulovou kapacitou. Upravená verze příkladu může vypadat následovně:
import trio async def consumer(receive_channel): async for value in receive_channel: print(f"Consumer: received{value!r}") await trio.sleep(1) async def main(): async with trio.open_nursery() as nursery: send_channel, receive_channel = trio.open_memory_channel(0) nursery.start_soon(consumer, receive_channel) trio.run(main)
Po spuštění se – podle očekávání – nezobrazí žádná zpráva, protože konzument čeká ve smyčce async for a vlastně vůbec nedojde k dalším příkazům uvedeným uvnitř této smyčky. K ukončení tohoto skriptu opět použijeme oblíbenou zkratku Ctrl+C:
^CTraceback (most recent call last): File "trio_21_no_producer.py", line 16, in <module> trio.run(main) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 1946, in run raise runner.main_task_outcome.error File "trio_21_no_producer.py", line 13, in main nursery.start_soon(consumer, receive_channel) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 813, in __aexit__ raise combined_error_from_nursery File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 1184, in raise_cancel raise KeyboardInterrupt KeyboardInterrupt
9. Úprava předchozího příkladu – náhrada smyčky async for za while
Předchozí příklad se smyčkou async for byl sice zapsán (pro Trio) idiomaticky, ovšem nevýhodou je, že async for pro kontinuální čtení zpráv z kanálu se těžko přímočaře nahrazuje za jinou operaci (čtení s čekáním, čtení bez blokace atd.). Proto provedeme náhradu – použijeme nekonečnou smyčku while a zprávy budeme z kanálu číst metodou receive. Jedná se o korutinu a tudíž musíme použít await:
import trio async def consumer(receive_channel): while True: value = await receive_channel.receive() print(f"Consumer: received{value!r}") await trio.sleep(1) async def main(): async with trio.open_nursery() as nursery: send_channel, receive_channel = trio.open_memory_channel(0) nursery.start_soon(consumer, receive_channel) trio.run(main)
Chování je přitom totožné, jako tomu bylo i v předchozím příkladu – blokující čtení je nutné ukončit pomocí Ctrl+C:
^CTraceback (most recent call last): File "trio_22_no_producer_B.py", line 17, in <module> trio.run(main) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 1946, in run raise runner.main_task_outcome.error File "trio_22_no_producer_B.py", line 14, in main nursery.start_soon(consumer, receive_channel) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 813, in __aexit__ raise combined_error_from_nursery File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 1184, in raise_cancel raise KeyboardInterrupt KeyboardInterrupt
10. Detekce neexistence producenta implementovaná v jazyce Go
Opět si otestujme, jak se bude chovat obdobný příklad, ovšem napsaný v jazyce Go, v němž se použijí gorutiny (souběžné a běžící paralelně) namísto korutin (souběžné):
package main import ( "fmt" ) func consumer(channel chan string) { for { message := <-channel fmt.Printf("Consumer: received %s\n", message) } } func main() { var channel = make(chan string) var done = make(chan interface{}) go consumer(channel) <-done }
Runtime jazyka Go opět zjistí, že žádná z gorutin (ani hlavní gorutina) nemůže pokračovat (je blokována čtením z kanálu) a proto je aplikace ukončena běhovou chybou:
$ go run consumer_only.go fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan receive]: main.main() /home/ptisnovs/src/most-popular-python-libs/concurrent/consumer_only.go:18 +0x87 goroutine 6 [chan receive]: main.consumer(0x0?) /home/ptisnovs/src/most-popular-python-libs/concurrent/consumer_only.go:9 +0x29 created by main.main /home/ptisnovs/src/most-popular-python-libs/concurrent/consumer_only.go:17 +0x7b exit status 2
11. Synchronní příjem zpráv z kanálu (bez čekání)
V případě, že je nutné zajistit okamžité příjetí zprávy z kanálu (bez jakéhokoli čekání), použije se namísto korutiny channel.receive běžná metoda nazvaná channel.receive_nowait. Tato metoda buď ihned vrátí zprávu získanou z kanálu (samozřejmě za předpokladu, že kanál zprávu obsahuje) nebo se vyhodí výjimka – nikdy se tedy nebude čekat na příjem zprávy. Předchozí demonstrační příklad tedy můžeme triviálním způsobem upravit do následující podoby:
import trio async def consumer(receive_channel): while True: value = receive_channel.receive_nowait() print(f"Consumer: received{value!r}") await trio.sleep(1) async def main(): async with trio.open_nursery() as nursery: send_channel, receive_channel = trio.open_memory_channel(0) nursery.start_soon(consumer, receive_channel) trio.run(main)
Chování příkladu bez producenta zpráv bude následující – při prvním pokusu o přečtení zprávy dojde k vyhození výjimky a celý program je následně ukončen, protože tato výjimka není zachycena:
$ python3 trio_23_no_producer_no_wait.py Traceback (most recent call last): File "trio_23_no_producer_no_wait.py", line 17, in <module> trio.run(main) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 1946, in run raise runner.main_task_outcome.error File "trio_23_no_producer_no_wait.py", line 14, in main nursery.start_soon(consumer, receive_channel) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 813, in __aexit__ raise combined_error_from_nursery File "trio_23_no_producer_no_wait.py", line 6, in consumer value = receive_channel.receive_nowait() File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_ki.py", line 159, in wrapper return fn(*args, **kwargs) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_channel.py", line 284, in receive_nowait raise trio.WouldBlock trio.WouldBlock
12. Pokus o příjem zprávy z kanálu se specifikovanou maximální dobou čekání
Podobně, jako jsme v sedmé kapitole vytvořili producenta, u něhož se čekalo na uvolnění kanálu při posílání zprávy pouze po specifikovaný časový úsek (timeout), je možné stejný způsob použít i pro konzumenta zpráv pro zajištění, že se na příjem zprávy bude čekat po maximální zadanou dobu. Opět se použije manažer kontextu vytvořený pomocí trio.fail_after:
import trio async def consumer(receive_channel): while True: with trio.fail_after(1): value = await receive_channel.receive() print(f"Consumer: received{value!r}") await trio.sleep(1) async def main(): async with trio.open_nursery() as nursery: send_channel, receive_channel = trio.open_memory_channel(0) nursery.start_soon(consumer, receive_channel) trio.run(main)
S výsledkem:
$ python3 trio_24_no_producer_fail_after.py Traceback (most recent call last): File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_timeouts.py", line 106, in fail_at yield scope File "trio_24_no_producer_fail_after.py", line 7, in consumer value = await receive_channel.receive() File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_channel.py", line 314, in receive return await trio.lowlevel.wait_task_rescheduled(abort_fn) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap() File "/home/ptisnovs/.local/lib/python3.8/site-packages/outcome/_impl.py", line 138, in unwrap raise captured_error File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 1173, in raise_cancel raise Cancelled._create() trio.Cancelled: Cancelled During handling of the above exception, another exception occurred: Traceback (most recent call last): File "trio_24_no_producer_fail_after.py", line 18, in trio.run(main) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 1946, in run raise runner.main_task_outcome.error File "trio_24_no_producer_fail_after.py", line 15, in main nursery.start_soon(consumer, receive_channel) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_core/_run.py", line 813, in __aexit__ raise combined_error_from_nursery File "trio_24_no_producer_fail_after.py", line 9, in consumer await trio.sleep(1) File "/usr/lib/python3.8/contextlib.py", line 131, in __exit__ self.gen.throw(type, value, traceback) File "/home/ptisnovs/.local/lib/python3.8/site-packages/trio/_timeouts.py", line 108, in fail_at raise TooSlowError trio.TooSlowError
13. Větší množství producentů a konzumentů sdílejících společný kanál
Vykoušejme si nyní vytvořit nepatrně komplikovanější demonstrační příklad, v němž se namísto jediného producenta zpráv a jediného konzumenta vyskytuje obecný počet producentů a konzumentů. Pro komunikaci mezi všemi korutinami se (prozatím) používá pouze jediný kanál se dvěma konci (vysílací a přijímací část), přičemž každý konec je sdílen několika korutinami běžícími souběžně. Díky použití await je zajištěn koordinovaný přístup ke kanálu:
import trio num_producers = 5 num_consumers = 20 async def producer(id, send_channel): for i in range(1, 10): message = f"message {i}" print(f"Producer #{id}: {message}") await send_channel.send(message) async def consumer(id, receive_channel): async for value in receive_channel: print(f"Consumer #{id}: received{value!r}") await trio.sleep(1) async def main(): async with trio.open_nursery() as nursery: send_channel, receive_channel = trio.open_memory_channel(0) for id in range(num_producers): nursery.start_soon(producer, id, send_channel) for id in range(num_consumers): nursery.start_soon(consumer, id, receive_channel) trio.run(main)
Zdálo by se, že úkol byl splněn, ovšem ve skutečnosti chybí doplnit celou řadu praktických kroků:
- Zajištění, že všichni konzumenti skončí ve chvíli, kdy je již zřejmé, že žádná zpráva nedojde.
- Zajištění korektní funkčnosti i při vzniku výjimky (nepatrná chyba v jediné korutině by neměla vést k pádu celého programu).
- Zajištění předčasného ukončení korutiny či korutin.
S řešeními těchto problémů se seznámíme v závěrečném článku o knihovně Trio.
14. Realizace a vylepšení předchozího příkladu v jazyce Go
Pro zajímavost se ještě podívejme, jak by byl podobný příklad řešen v programovacím jazyce Go. Zde ovšem došlo k vylepšení, protože kanál done je zde použit pro čekání na dokončení všech producentů – každý producent musí do kanálu zapsat jedinou hodnotu a v hlavní gorutině se čeká na zápis tolika hodnot, kolik existuje producentů:
package main import ( "fmt" ) const ( num_producers = 5 num_consumers = 20 ) func producer(id int, channel chan string, done chan interface{}) { for i := 1; i <= 10; i++ { message := fmt.Sprintf("message %d", i) fmt.Printf("Producer %d: %s\n", id, message) channel <- message } done <- struct{}{} } func consumer(id int, channel chan string) { for { message := <-channel fmt.Printf("Consumer %d: received %s\n", id, message) } } func main() { var channel = make(chan string) var done = make(chan interface{}) for i := 0; i < num_producers; i++ { go producer(i, channel, done) } for i := 0; i < num_consumers; i++ { go consumer(i, channel) } for i := 0; i < num_producers; i++ { <-done } }
Podívejme se na důkaz, že jsou skutečně zpracovány všechny zprávy:
Producer 0: message 1 Producer 0: message 2 Producer 0: message 3 Consumer 7: received message 2 Consumer 7: received message 3 Producer 2: message 1 Producer 2: message 2 Consumer 19: received message 1 Producer 1: message 1 Producer 1: message 2 Producer 2: message 3 Producer 0: message 4 Consumer 3: received message 3 Consumer 7: received message 1 Producer 3: message 1 Producer 3: message 2 Producer 3: message 3 Producer 3: message 4 Producer 3: message 5 Producer 3: message 6 Producer 3: message 7 Producer 3: message 8 Producer 3: message 9 Producer 3: message 10 Consumer 2: received message 2 Consumer 5: received message 1 Consumer 6: received message 2 Consumer 0: received message 2 Consumer 14: received message 3 Producer 2: message 4 Producer 2: message 5 Producer 2: message 6 Producer 2: message 7 Producer 2: message 8 Producer 2: message 9 Producer 2: message 10 Consumer 2: received message 10 Producer 4: message 1 Producer 4: message 2 Producer 4: message 3 Producer 4: message 4 Producer 4: message 5 Producer 4: message 6 Consumer 15: received message 4 Consumer 15: received message 6 Consumer 17: received message 5 Consumer 9: received message 5 Consumer 19: received message 6 Consumer 4: received message 4 Consumer 18: received message 7 Consumer 10: received message 6 Consumer 3: received message 8 Consumer 1: received message 1 Consumer 7: received message 9 Consumer 11: received message 7 Consumer 12: received message 8 Consumer 5: received message 1 Producer 1: message 3 Producer 1: message 4 Producer 1: message 5 Producer 1: message 6 Producer 1: message 7 Producer 1: message 8 Producer 1: message 9 Producer 1: message 10 Consumer 10: received message 9 Producer 4: message 7 Producer 4: message 8 Producer 4: message 9 Producer 4: message 10 Consumer 9: received message 5 Consumer 4: received message 7 Consumer 16: received message 10 Consumer 17: received message 4 Consumer 0: received message 3 Consumer 19: received message 6 Consumer 11: received message 9 Consumer 12: received message 10 Consumer 8: received message 4 Consumer 2: received message 5 Producer 0: message 5 Producer 0: message 6 Producer 0: message 7 Producer 0: message 8 Producer 0: message 9 Producer 0: message 10
15. Obsah závěrečné části článku o knihovně Trio
V předchozích dvou kapitolách jsme částečně narazili na zajímavý problém – jak zajistit kooperaci mezi větším množstvím korutin, tedy jak mj. zajistit, že se počká na jejich dokončení nebo jak naopak „donutit“ korutiny k jejich předčasnému ukončení. Nejedná se ani zdaleka o akademické problémy, ale naopak o prakticky každodenní úlohy, s nimiž se při tvorbě aplikací se souběžně spuštěným kódem velmi často setkáme a musíme je umět řešit. Zde již pouhé abstraktní teoretické koncepty typu korutina+kanál nemusí dostačovat, popř. nemusí být dostatečně přehledné. Proto si příště (což bude s velkou pravděpodobností již poslední článek na dané téma) ukážeme, jak se takové problémy dají s knihovnou Trio poměrně uspokojivě a především idiomaticky vyřešit (idiomaticky proto, aby již ze stylu zápisu bylo patrné, jaký problém je řešen – což lze považovat za prazáklad návrhových vzorů).
16. Repositář s demonstračními příklady
Zdrojové kódy všech prozatím popsaných demonstračních příkladů určených pro programovací jazyk Python 3 byly uloženy do Git repositáře dostupného na adrese https://github.com/tisnik/most-popular-python-libs. V případě, že nebudete chtít klonovat celý repositář (ten je ovšem stále velmi malý, dnes má velikost zhruba několik desítek kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce:
17. Odkazy na Internetu
- Dokumentace Pythonu: balíček queue
https://docs.python.org/3/library/queue.html - Dokumentace Pythonu: balíček threading
https://docs.python.org/3/library/threading.html? - Dokumentace Pythonu: balíček multiprocessing
https://docs.python.org/3/library/multiprocessing.html - Dokumentace Pythonu: balíček asyncio
https://docs.python.org/3/library/asyncio.html - Synchronization Primitives
https://docs.python.org/3/library/asyncio-sync.html - Coroutines
https://docs.python.org/3/library/asyncio-task.html - Queues
https://docs.python.org/3/library/asyncio-queue.html - python-csp
https://python-csp.readthedocs.io/en/latest/ - TrellisSTM
http://peak.telecommunity.com/DevCenter/TrellisSTM - Python Multithreading and Multiprocessing Tutorial
https://www.toptal.com/python/beginners-guide-to-concurrency-and-parallelism-in-python - ThreadPoolExecutor
https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor - ProcessPoolExecutor
https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor - asyncio — Asynchronous I/O
https://docs.python.org/3/library/asyncio.html - Threads vs Async: Has Asyncio Solved Concurrency?
https://www.youtube.com/watch?v=NZq31Sg8R9E - Python Asynchronous Programming – AsyncIO & Async/Await
https://www.youtube.com/watch?v=t5Bo1Je9EmE - AsyncIO & Asynchronous Programming in Python
https://www.youtube.com/watch?v=6RbJYN7SoRs - Coroutines and Tasks
https://docs.python.org/3/library/asyncio-task.html - Python async/await Tutorial
https://stackabuse.com/python-async-await-tutorial/ - Demystifying Python's Async and Await Keywords
https://www.youtube.com/watch?v=F19R_M4Nay4 - Curio
https://curio.readthedocs.io/en/latest/ - Trio: a friendly Python library for async concurrency and I/O
https://trio.readthedocs.io/en/stable/ - Curio – A Tutorial Introduction
https://curio.readthedocs.io/en/latest/tutorial.html - unsync
https://github.com/alex-sherman/unsync - David Beazley – Die Threads
https://www.youtube.com/watch?v=xOyJiN3yGfU - Miguel Grinberg Asynchronous Python for the Complete Beginner PyCon 2017
https://www.youtube.com/watch?v=iG6fr81×HKA - Build Your Own Async
https://www.youtube.com/watch?v=Y4Gt3Xjd7G8 - The Other Async (Threads + Async = ❤️)
https://www.youtube.com/watch?v=x1ndXuw7S0s - Fear and Awaiting in Async: A Savage Journey to the Heart of the Coroutine Dream
https://www.youtube.com/watch?v=E-1Y4kSsAFc - Keynote David Beazley – Topics of Interest (Python Asyncio)
https://www.youtube.com/watch?v=ZzfHjytDceU - David Beazley – Python Concurrency From the Ground Up: LIVE! – PyCon 2015
https://www.youtube.com/watch?v=MCs5OvhV9S4 - Python Async basics video (100 million HTTP requests)
https://www.youtube.com/watch?v=Mj-Pyg4gsPs - Nathaniel J. Smith – Trio: Async concurrency for mere mortals – PyCon 2018
https://www.youtube.com/watch?v=oLkfnc_UMcE - Timeouts and cancellation for humans
https://vorpus.org/blog/timeouts-and-cancellation-for-humans/ - What is the core difference between asyncio and trio?
https://stackoverflow.com/questions/49482969/what-is-the-core-difference-between-asyncio-and-trio - Some thoughts on asynchronous API design in a post-async/await world
https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/#the-curious-effectiveness-of-curio - Companion post for my PyCon 2018 talk on async concurrency using Trio
https://vorpus.org/blog/companion-post-for-my-pycon-2018-talk-on-async-concurrency-using-trio/ - Control-C handling in Python and Trio
https://vorpus.org/blog/control-c-handling-in-python-and-trio/ - Context Managers and Python's with Statement
https://realpython.com/python-with-statement/ - Notes on structured concurrency, or: Go statement considered harmful
https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/ - Structured concurrency explained – Part 1: Introduction
https://www.thedevtavern.com/blog/posts/structured-concurrency-explained/ - Structured concurrency
https://en.wikipedia.org/wiki/Structured_concurrency - Structured Concurrency
https://250bpm.com/blog:71/ - Python and Trio, where producers are consumers, how to exit gracefully when the job is done?
https://stackoverflow.com/questions/65304775/python-and-trio-where-producers-are-consumers-how-to-exit-gracefully-when-the