Obsah
1. Souběžné a paralelně běžící úlohy naprogramované v Pythonu – Curio a Trio
2. async a await: „strukturované“ řešení problematiky souběžnosti
3. Instalace knihoven Curio a Trio
4. Základní konstrukce nabízené knihovnou Curio
5. Předání parametrů synchronně či asynchronně volané korutině
6. Chování programu při spuštění několika korutin funkcí curio.run
7. Asynchronní spuštění korutin
8. Sledování činnosti korutin s využitím monitoringu
9. Připojení k běžícímu programu a sledování činnosti korutin
10. Komunikace mezi korutinami s využitím fronty
12. Vyřešení úlohy typu producent-konzument
13. Korutiny vracející hodnoty
14. Čekání na dokončení korutiny a timeout
15. Reakce v korutině na timeout
16. Reakce na vyhozenou výjimku v korutině
18. Způsob překladu konstrukcí s async a await do bajtkódu
19. Repositář s demonstračními příklady
1. Souběžné a paralelně běžící úlohy naprogramované v Pythonu – Curio a Trio
„Everything runs parallel except your code“
Felix Geisendörfer.
V dnešním článku, který navazuje na předchozí dvojici článků o řešení paralelních či souběžně běžících úloh v Pythonu [1] a [2], si ve stručnosti popíšeme knihovny nazvané Curio a Trio. Jedná se o knihovny postavené nad relativní novinkou v Pythonu – klíčovými slovy async a await a jejich cílem je zjednodušit tvorbu aplikací, v nichž jednotlivé části mohou běžet souběžně. Typicky se jedná o části s I/O operacemi (včetně komunikací po síti) popř. o GUI aplikace. Knihovny Curio a Trio do značné míry nahrazují standardní knihovnu asyncio, s jejímiž základními vlastnostmi jsme se seznámili minule.
2. async a await: „strukturované“ řešení problematiky souběžnosti
Před vlastním popisem knihoven Curio a Trio je vhodné si připomenout, z jakého důvodu tyto knihovny vlastně vůbec vznikly a proč se do poměrně velkého množství mainstreamových jazyků postupně dostávají klíčová slova async a await. V této přednášce (věnované nejenom Triu) byla uvedena velmi dobrá analogie, kterou se zde pokusím reprodukovat.
Mnohé starší programovací jazyky, mezi nimiž figuruje zejména FORTRAN, ale i klasický BASIC, obsahovaly nestrukturované konstrukce určené pro řízení běhu programů. Tyto konstrukce byly založeny na číslech řádků popř. na návěštích (labels), na které se bylo možné odkazovat například příkazem GOTO. To se ovšem ukázalo být velmi nevýhodné při konstrukci složitějších algoritmů, protože GOTO do určité míry znemožňovalo používání lokálních proměnných, ale nepřímo taktéž zpracování výjimek atd. Řešení se ovšem našlo a spočívalo v náhradě globálních skoků za funkce/procedury a konstrukce IF-THEN-ELSE, REPEAT-UNTIL a WHILE-END (ať již se v konkrétním programovacím jazyce zapisovaly různým způsobem).
Podobná situace ovšem panovala i v oblasti algoritmů, v nichž měly určité části běžet souběžně. Tento problém se typicky řešil (a vlastně doposud řeší) knihovními funkcemi, popř. alespoň zavedením nějakých synchronizačních mechanismů do jazyka (klíčové slovo synchronized v Javě). To znamenalo nejenom složitý přechod mezi jednotlivými jazyky, ale mj. i podobné problémy, jako GOTO – například velmi problematické řešení výjimek, které byly vyhozeny v souběžně vykonávaném kódu. Ale nejenom to – problematické bylo vlastně i řízení tohoto kódu (tedy klasický problém, jak zastavit vlákno atd.). I současné řešení tohoto problému je založeno na nových řídicích strukturách, zde konkrétně na ASYNC a AWAIT. Takže se možná dočkáme článku s titulkem „thread.new() considered harmful“ :-)
V analogiích však můžeme pokračovat. Například automatický správce paměti (garbage collector) sice interně musí alokovat a dealokovat paměť s využitím technologií nabízených operačním systémem, ale z pohledu aplikačního programátora je situace mnohem jednodušší. Podobně async/await interně využívá paralelního běhu I/O operací, opět s využitím technologií nabízených operačním systémem, ale (opět) je situace z pohledu aplikačního programátora mnohem přehlednější a méně náchylná k chybám.
>>> def foo(x): ... print("foo") ... >>> async def bar(y): ... print("bar") ... >>> foo(1) foo >>> bar(2) <coroutine object bar at 0x7f9e5b1786c0>
3. Instalace knihoven Curio a Trio
Obě knihovny, jimiž se budeme v dnešním článku zabývat, je nejprve nutné nainstalovat, protože na rozdíl od minule popsané knihovny asyncio nejsou součástí standardních balíčků Pythonu. Jak curio tak i trio jsou dostupné na PyPI, takže je jejich instalace velmi snadná. Pro tento účel můžeme použít například nástroj pip.
Instalace knihovny curio (povšimněte si, že nemá žádné další závislosti):
$ pip3 install --user curio Collecting curio Downloading curio-1.5.tar.gz (234 kB) |████████████████████████████████| 234 kB 1.4 MB/s Preparing metadata (setup.py) ... done Building wheels for collected packages: curio Building wheel for curio (setup.py) ... done Created wheel for curio: filename=curio-1.5-py3-none-any.whl size=63766 sha256=042c3ba7cb8086cb4778a131bf73d9c35f82f12ff8e94e8d8c2fa673d683fcd1 Stored in directory: /home/ptisnovs/.cache/pip/wheels/93/ac/0e/8cb82ba7d5f527f27d0b8e06d031a5f7899ef30c4dbf47a313 Successfully built curio Installing collected packages: curio Successfully installed curio-1.5
A instalace knihovny trio:
$ pip3 install --user trio Collecting trio Downloading trio-0.20.0-py3-none-any.whl (359 kB) |████████████████████████████████| 359 kB 1.4 MB/s Collecting sniffio Downloading sniffio-1.2.0-py3-none-any.whl (10 kB) Collecting outcome Downloading outcome-1.1.0-py2.py3-none-any.whl (9.7 kB) Requirement already satisfied: attrs>=19.2.0 in ./.local/lib/python3.8/site-packages (from trio) (21.4.0) Collecting async-generator>=1.9 Downloading async_generator-1.10-py3-none-any.whl (18 kB) Collecting sortedcontainers Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl (29 kB) Requirement already satisfied: idna in /usr/lib/python3/dist-packages (from trio) (2.8) Installing collected packages: sortedcontainers, sniffio, outcome, async-generator, trio Successfully installed async-generator-1.10 outcome-1.1.0 sniffio-1.2.0 sortedcontainers-2.4.0 trio-0.20.0
Velmi zběžná kontrola, zda jsou obě knihovny dostupné:
$ python3 Python 3.8.10 (default, Mar 15 2022, 12:22:08) [GCC 9.4.0] on linux Type "help", "copyright", "credits" or "license" for more information. >>> import curio >>> import trio
4. Základní konstrukce nabízené knihovnou Curio
Připomeňme si nejprve náš úplně první pokus o vytvoření úlohy, která bude reprezentována korutinou. Minule jsme pro tento účel použili knihovnu asyncio a řešení vypadalo následovně: úloha je získána transformací funkce task do plnohodnotné korutiny (přes přímé volání této funkce), kterou posléze voláme z funkce main, a to konkrétně s využitím programové konstrukce await:
import asyncio import time async def task(): print("task started") await asyncio.sleep(5) print("task finished") def main(): task1 = asyncio.create_task(task()) print("task created") await task1 print("done") main()
Přepis tohoto programu založeného na korutinách tak, aby se použila knihovna curio, je poměrně přímočarý. Opět použijeme funkci nazvanou sleep, ovšem tentokrát získanou z balíčku curio. Povšimněte si, že se skutečně jedná o korutinu, protože před jménem funkce je uvedeno klíčové slovo async a zavoláním této funkce získáme objekt představující korutinu:
async sleep(seconds) Sleep for a specified number of seconds. Sleeping for 0 seconds makes a task immediately switch to the next ready task (if any). Returns the value of the kernel clock when awakened.
Odlišné bude i volání korutiny, protože se pro tento účel použije funkce nazvaná run (opět importovaná z balíčku curio). Povšimněte si, že prvním (a jediným povinným) parametrem této funkce je korutina, která se má spustit. To mj. znamená, že se můžeme obejít bez explicitního vytvoření úlohy a jejího následného volání:
run(corofunc, *args, with_monitor=False, selector=None, debug=None, activations=None, **kernel_extra) Run the curio kernel with an initial task and execute until all tasks terminate. Returns the task's final result (if any). This is a convenience function that should primarily be used for launching the top-level task of a curio-based application. It creates an entirely new kernel, runs the given task to completion, and concludes by shutting down the kernel, releasing all resources used. Don't use this function if you're repeatedly launching a lot of new tasks to run in curio. Instead, create a Kernel instance and use its run() method instead.
Celý skript se nepatrně zkrátil – jedná se o odlišný zápis ve funkci main:
import curio async def task(): print("task started") await curio.sleep(5) print("task finished") def main(): print("main started") curio.run(task) print("done") main()
Po spuštění tohoto prozatím velmi jednoduchého programu by se měly na terminál vypsat následující zprávy:
main started task started task finished done
5. Předání parametrů synchronně či asynchronně volané korutině
Korutinám je v naprosté většině případů nutné předávat nějaké parametry. V případě minule popsané knihovny asyncio se parametry mohly korutině předat v rámci volání funkce asyncio.create_task, takže program, který vytvořil, zavolal a počkal na dokončení dvou korutin (s parametry) mohl vypadat následovně:
import asyncio import time async def task(name): print(f"{name} task started") await asyncio.sleep(5) print(f"{name} task finished") async def main(): task1 = asyncio.create_task(task("first")) print("first task created") task2 = asyncio.create_task(task("second")) print("second task created") await task1 await task2 print("done") asyncio.run(main())
V případě, že je namísto knihovny asyncio použita knihovna curio, jsou parametry korutině předány přímo v rámci funkce curio.run. Povšimněte si podstatného rozdílu – nevoláme zde přímo korutinu task (ve skutečnosti ani v případě asyncio nedochází k jejímu přímému volání), ale reference na (synchronně či asynchronně) volanou korutinu se společně s jejími parametry předává funkci curio.run:
import curio async def task(n, s): print("task started") for i in range(n): print(f"{i+1}/{n}") await curio.sleep(s) print("task finished") def main(): print("main started") curio.run(task, 10, 1) print("done") main()
Po spuštění tohoto příkladu opět dojde k asynchronnímu spuštění korutiny s čekáním na její dokončení:
main started task started 1/10 2/10 3/10 4/10 5/10 6/10 7/10 8/10 9/10 10/10 task finished done
6. Chování programu při spuštění několika korutin funkcí curio.run
Zkusme si nyní spustit následující skript, v němž jsou vytvořeny a spuštěny celkem tři korutiny. Pro jejich spuštění se používá funkce nazvaná curio.run, s níž jsme se již seznámili v rámci předchozích dvou kapitol:
import curio async def task(name, n, s): print(f"{name} task started") for i in range(n): print(f"{name} {i+1}/{n}") await curio.sleep(s) print(f"{name} task finished") def main(): print("main started") curio.run(task, "1st", 10, 1) curio.run(task, "2nd", 10, 1) curio.run(task, "3rd", 10, 1) print("done") main()
Po spuštění tohoto skriptu získáme následující výstup, který jasně ukazuje, že i přes volání await curio.sleep v korutině nedojde k přepnutí na další korutinu – další korutina totiž ještě ani nebyla vytvořena. To vlastně znamená, že curio.run zajišťuje synchronní volání korutiny (protože se skutečně jedná o korutinu získanou transformací funkce s využitím konstrukce async):
main started 1st task started 1st 1/10 1st 2/10 1st 3/10 1st 4/10 1st 5/10 1st 6/10 1st 7/10 1st 8/10 1st 9/10 1st 10/10 1st task finished 2nd task started 2nd 1/10 2nd 2/10 2nd 3/10 2nd 4/10 2nd 5/10 2nd 6/10 2nd 7/10 2nd 8/10 2nd 9/10 2nd 10/10 2nd task finished 3rd task started 3rd 1/10 3rd 2/10 3rd 3/10 3rd 4/10 3rd 5/10 3rd 6/10 3rd 7/10 3rd 8/10 3rd 9/10 3rd 10/10 3rd task finished done
7. Asynchronní spuštění korutin
Pro skutečně asynchronní spuštění korutin se namísto funkce curio.run použité v předchozích demonstračních příkladech musí použít funkce nazvaná curio.spawn, jejíž hlavička a (stručný) popis vypadají následovně:
async spawn(corofunc, *args, daemon=False) Create a new task, running corofunc(*args). Use the daemon=True option if the task runs forever as a background task.
Vyzkoušejme si tedy, co se stane, pokud v příkladu uděláme nepatrnou změnu – použijeme curio.spawn a nikoli curio.run:
import curio async def task(name, n, s): print(f"{name} task started") for i in range(n): print(f"{name} {i+1}/{n}") await curio.sleep(s) print(f"{name} task finished") def main(): print("main started") task1 = curio.spawn(task, "1st", 10, 1) task2 = curio.spawn(task, "2nd", 10, 1) task3 = curio.spawn(task, "3rd", 10, 1) print("done") main()
Výsledkem bude následující varování, které nám říká, že jsme sice vytvořili a spustili asynchronní kód, ale nikde jsme nečekali na jeho ukončení:
main started done curio_04.py:22: RuntimeWarning: coroutine 'spawn' was never awaited main() RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Zdrojový kód demonstračního příkladu tedy upravíme alespoň do podoby, která nezpůsobí běhovou chybu:
import curio async def task(name, n, s): print(f"{name} task started") for i in range(n): print(f"{name} {i+1}/{n}") await curio.sleep(s) print(f"{name} task finished") async def main(): print("main started") task1 = await curio.spawn(task, "1st", 10, 1) task2 = await curio.spawn(task, "2nd", 10, 1) task3 = await curio.spawn(task, "3rd", 10, 1) print("done") curio.run(main())
Nyní je patrné, že se korutiny skutečně spustily (začaly tedy běžet asynchronně), ovšem prakticky ihned poté se ukončila i funkce main a kvůli tomu, že jsme nečekali na dokončení korutin, byl ukončen i běh celého skriptu:
main started done 1st task started 1st 1/10 2nd task started 2nd 1/10 3rd task started 3rd 1/10
Musíme tedy provést ještě jednu úpravu, která konkrétně spočívá v tom, že budeme čekat na dokončení jednotlivých korutin zavoláním metody join:
import curio async def task(name, n, s): print(f"{name} task started") for i in range(n): print(f"{name} {i+1}/{n}") await curio.sleep(s) print(f"{name} task finished") async def main(): print("main started") task1 = await curio.spawn(task, "1st", 10, 1) task2 = await curio.spawn(task, "2nd", 10, 1) task3 = await curio.spawn(task, "3rd", 10, 1) await task1.join() await task2.join() await task3.join() print("done") curio.run(main())
Nyní by již mělo dojít ke skutečnému souběžnému běhu korutin, mezi nimiž se přepíná během „spánku“ vyvolanému zavoláním curio.sleep:
main started 1st task started 1st 1/10 2nd task started 2nd 1/10 3rd task started 3rd 1/10 1st 2/10 2nd 2/10 3rd 2/10 1st 3/10 2nd 3/10 3rd 3/10 1st 4/10 2nd 4/10 3rd 4/10 1st 5/10 2nd 5/10 3rd 5/10 1st 6/10 2nd 6/10 3rd 6/10 1st 7/10 2nd 7/10 3rd 7/10 1st 8/10 2nd 8/10 3rd 8/10 1st 9/10 2nd 9/10 3rd 9/10 1st 10/10 2nd 10/10 3rd 10/10 1st task finished 2nd task finished 3rd task finished done
8. Sledování činnosti korutin s využitím monitoringu
Knihovna Curio obsahuje jednu velmi užitečnou funkcionalitu. Tou je možnost sledování činnosti korutin s využitím takzvaného monitoringu. Ten spočívá v tom, že se při spuštění funkce main (ta je asynchronní) přes curio.run použije nepovinný parametr with_monitor nastavený na True, jenž umožní, aby bylo možné dále spuštěné korutiny sledovat externím nástrojem. Upravený zdrojový kód demonstračního příkladu bude vypadat následovně:
import curio async def task(name, n, s): print(f"{name} task started") for i in range(n): print(f"{name} {i+1}/{n}") await curio.sleep(s) print(f"{name} task finished") async def main(): print("main started") task1 = await curio.spawn(task, "1st", 10, 5) task2 = await curio.spawn(task, "2nd", 10, 5) task3 = await curio.spawn(task, "3rd", 10, 5) await task1.join() await task2.join() await task3.join() print("done") curio.run(main(), with_monitor=True)
9. Připojení k běžícímu programu a sledování činnosti korutin
Nyní si ukážeme použití monitoringu v praxi. V jednom terminálu spustíme demonstrační příklad uvedený v předchozí kapitole:
$ python3 curio_06.py
Ve druhém terminálu pak spustíme vlastní monitor příkazem:
$ python3 -m curio.monitor
Měla by se zobrazit zpráva o všech aktuálně běžících korutinách a následně výzva (prompt) nástroje curio.monitor:
Curio Monitor: 6 tasks running Type help for commands curio >
Podporována je i stručná nápověda:
curio > help Commands: ps : Show task table where taskid : Show stack frames for a task cancel taskid : Cancel an indicated task signal signame : Send a Unix signal parents taskid : List task parents quit : Leave the monitor
Nás ovšem bude zajímat především výpis všech běžících korutin. Pro tento účel slouží příkaz ps:
curio > ps Task State Cycles Timeout Sleep Task ------ ------------ ---------- ------- ------- -------------------------------------------------- 1 READ_WAIT 1 None None Kernel._make_kernel_runtime.<locals>._kernel_task 3 FUTURE_WAIT 1 None None Monitor.monitor_task 4 TASK_JOIN 1 None None main 5 TIME_SLEEP 6 None 3.65660 task 6 TIME_SLEEP 6 None 3.65671 task 7 TIME_SLEEP 6 None 3.65674 task
Můžeme vidět, že kromě prvních dvou „systémových“ korutin jsou další korutiny vytvořené naším skriptem. Nyní se pokusíme o zobrazení podrobnějších informací o korutině číslo 6, a to příkazem w:
curio > w 6 Stack for Task(id=6, name='task', state='TIME_SLEEP') (most recent call last): File "curio_06.py", line 9, in task await curio.sleep(s) curio >
Informace jsou poměrně přesné – ukazují místo v programovém kódu s korutinou i to, zda (a kde) došlo k přepnutí kontextu na jinou korutinu.
10. Komunikace mezi korutinami s využitím fronty
Již minule jsme si řekli, že i pro předávání dat mezi korutinami lze využít frontu. Konkrétně jsme si ukázali instanci třídy asyncio.Queue, ovšem v knihovně Curio je k dispozici jiná implementace fronty nazvaná curio.Queue. Všechny operace s touto frontou (tedy zejména operace put a get) se musí volat v konstrukci await, což je ukázáno v dalším (prozatím velmi jednoduchém) demonstračním příkladu (do jisté míry se jedná o převzatý zdrojový kód z předchozího článku):
import curio async def task(name, queue): while not queue.empty(): param = await queue.get() print(f"Task named {name} started with parameter {param}") await curio.sleep(5) print(f"{name} task finished") async def main(): print("main started") queue = curio.Queue() for i in range(20): await queue.put(i) task1 = await curio.spawn(task, "1st", queue) task2 = await curio.spawn(task, "2nd", queue) task3 = await curio.spawn(task, "3rd", queue) task4 = await curio.spawn(task, "4th", queue) await task1.join() await task2.join() await task3.join() await task4.join() print("done") curio.run(main(), with_monitor=True)
Činnost běžícího programu (tedy procesu) můžeme opět sledovat monitorem:
$ python3 -m curio.monitor
Korutiny v procesu:
curio > ps Task State Cycles Timeout Sleep Task ------ ------------ ---------- ------- ------- -------------------------------------------------- 1 READ_WAIT 1 None None Kernel._make_kernel_runtime.<locals>._kernel_task 3 FUTURE_WAIT 1 None None Monitor.monitor_task 4 TASK_JOIN 1 None None main 5 TIME_SLEEP 1 None 1.47796 task 6 TIME_SLEEP 1 None 1.47797 task 7 TIME_SLEEP 1 None 1.47798 task 8 TIME_SLEEP 1 None 1.47798 task
Získání informací o jedné konkrétní korutině:
curio > parents 5 5 TIME_SLEEP task 4 TASK_JOIN main
11. Třída UniversalQueue
Potenciálně velmi užitečná je třída nazvaná UniversalQueue. Jedná se o implementaci fronty, kterou je možné použít jak pro komunikaci mezi korutinami, tak i – což zní zajímavě – pro komunikaci mezi vlákny (threads). Jedná se tedy o datovou strukturu, která může být synchronizována (resp. přístupy k ní jsou synchronizovány), popř. podporuje volání přes await (tedy chová se jako korutina). Díky této třídě lze tedy vytvářet programy s větším množstvím vláken a současně i větším množstvím korutin.
Podívejme se na použití této třídy v nepatrně upraveném demonstračním příkladu:
import curio async def task(name, queue): while not queue.empty(): param = await queue.get() print(f"Task named {name} started with parameter {param}") await curio.sleep(1) print(f"{name} task finished") async def main(): print("main started") queue = curio.UniversalQueue() for i in range(20): await queue.put(i) task1 = await curio.spawn(task, "1st", queue) task2 = await curio.spawn(task, "2nd", queue) task3 = await curio.spawn(task, "3rd", queue) task4 = await curio.spawn(task, "4th", queue) await task1.join() await task2.join() await task3.join() await task4.join() print("done") curio.run(main(), with_monitor=True)
Výsledek získaný po spuštění tohoto příkladu:
main started Task named 1st started with parameter 0 Task named 2nd started with parameter 1 Task named 3rd started with parameter 2 Task named 4th started with parameter 3 1st task finished Task named 1st started with parameter 4 2nd task finished Task named 2nd started with parameter 5 3rd task finished Task named 3rd started with parameter 6 4th task finished Task named 4th started with parameter 7 1st task finished Task named 1st started with parameter 8 2nd task finished Task named 2nd started with parameter 9 3rd task finished Task named 3rd started with parameter 10 4th task finished Task named 4th started with parameter 11 1st task finished Task named 1st started with parameter 12 2nd task finished Task named 2nd started with parameter 13 3rd task finished Task named 3rd started with parameter 14 4th task finished Task named 4th started with parameter 15 1st task finished Task named 1st started with parameter 16 2nd task finished Task named 2nd started with parameter 17 3rd task finished Task named 3rd started with parameter 18 4th task finished Task named 4th started with parameter 19 1st task finished 2nd task finished 3rd task finished 4th task finished done
12. Vyřešení úlohy typu producent-konzument
S využitím fronty sdílené mezi korutinami lze snadno vyřešit i problém (resp. přesněji řečeno úlohu) typu producent-konzument. Jedno z možných řešení je ukázáno níže. Oproti předchozím příkladům se zde spouští větší množství korutin, které jsou sdruženy do skupiny s využitím konstrukce async with. Díky této konstrukci je možné v případě potřeby se všemi korutinami pracovat současně, například čekat na jejich dokončení atd.:
async with curio.TaskGroup() as g: await g.spawn(producer, 1, queue, 10) await g.spawn(producer, 2, queue, 10) await g.spawn(producer, 3, queue, 10) await g.spawn(consumer, 1, queue) await g.spawn(consumer, 2, queue) await g.spawn(consumer, 3, queue)
Úplný zdrojový kód tohoto demonstračního příkladu vypadá následovně:
import curio async def producer(id, queue, n): for i in range(n): message = f"message #{i} from producer {id}" await queue.put(message) await curio.sleep(0.3) async def consumer(id, queue): print(f"consumer {id} started") while True: message = await queue.get() print(f"consumer {id} received {message}") await curio.sleep(0.4) async def main(): print("main started") queue = curio.Queue() async with curio.TaskGroup() as g: await g.spawn(producer, 1, queue, 10) await g.spawn(producer, 2, queue, 10) await g.spawn(producer, 3, queue, 10) await g.spawn(consumer, 1, queue) await g.spawn(consumer, 2, queue) await g.spawn(consumer, 3, queue) print("done") curio.run(main(), with_monitor=True)
Chování tohoto programu po jeho spuštění:
main started consumer 1 started consumer 1 received message #0 from producer 1 consumer 2 started consumer 2 received message #0 from producer 2 consumer 3 started consumer 3 received message #0 from producer 3 consumer 1 received message #1 from producer 1 consumer 2 received message #1 from producer 2 consumer 3 received message #1 from producer 3 consumer 1 received message #2 from producer 1 consumer 2 received message #2 from producer 2 consumer 3 received message #2 from producer 3 consumer 1 received message #3 from producer 1 consumer 2 received message #3 from producer 2 consumer 3 received message #3 from producer 3 consumer 1 received message #4 from producer 1 consumer 2 received message #4 from producer 2 consumer 3 received message #4 from producer 3 consumer 1 received message #5 from producer 1 consumer 2 received message #5 from producer 2 consumer 3 received message #5 from producer 3 consumer 1 received message #6 from producer 1 consumer 2 received message #6 from producer 2 consumer 3 received message #6 from producer 3 consumer 1 received message #7 from producer 1 consumer 2 received message #7 from producer 2 consumer 3 received message #7 from producer 3 consumer 1 received message #8 from producer 1 consumer 2 received message #8 from producer 2 consumer 3 received message #8 from producer 3 consumer 1 received message #9 from producer 1 consumer 2 received message #9 from producer 2 consumer 3 received message #9 from producer 3
13. Korutiny vracející hodnoty
Korutiny mohou vracet hodnoty, které se (v tom nejjednodušším případě) získají příkazem await korutina.join(). Tento příkaz zajistí čekaní na dokončení korutiny s vrácením případné návratové hodnoty (popř. hodnoty None, ve chvíli, když není žádná hodnota explicitně vrácena). Podívejme se na velmi jednoduchý příklad:
import curio async def task(name, a, b): print(f"{name} task started") result = a * b print(f"{name} task finished") return result async def main(): print("main started") task1 = await curio.spawn(task, "1st", 2, 3) task2 = await curio.spawn(task, "2nd", 4, 5) task3 = await curio.spawn(task, "3rd", 6, 7) print(await task1.join()) print(await task2.join()) print(await task3.join()) print("done") curio.run(main())
14. Čekání na dokončení korutiny a timeout
Často se setkáme s požadavkem, aby se návratová hodnota z korutiny vrátila v rámci určitého časového intervalu – jinými slovy nechceme na dokončení korutiny čekat neomezeně dlouhou dobu. Tento požadavek je logický, protože korutiny většinou znamenají volání nějaké I/O operace a mnohdy je nutné umět přerušit například „zaseklé“ síťové připojení atd.
Příklad simulace déletrvající I/O operace s využitím curio.sleep:
import curio async def task(name, a, b): print(f"{name} task started") result = a * b await curio.sleep(100) print(f"{name} task finished") return result async def main(): print("main started") task1 = await curio.spawn(task, "1st", 2, 3) task2 = await curio.spawn(task, "2nd", 4, 5) task3 = await curio.spawn(task, "3rd", 6, 7) print(await task1.join()) print(await task2.join()) print(await task3.join()) print("done") curio.run(main())
import curio async def task(name, a, b): print(f"{name} task started") result = a * b await curio.sleep(100) print(f"{name} task finished") return result async def main(): print("main started") task1 = await curio.spawn(task, "1st", 2, 3) task2 = await curio.spawn(task, "2nd", 4, 5) task3 = await curio.spawn(task, "3rd", 6, 7) print(await task1.join()) print(await task2.join()) print(await task3.join()) print("done") curio.run(main())
Namísto „nekonečného“ čekání na dokončení korutiny metodou task.join je možné použít funkci curio.timeout_after, které se předá doba čekání (v sekundách) a korutina, na jejíž dokončení se čeká. Pokud korutina není dokončena ve stanoveném časovém intervalu, dojde k vyhození výjimky typu curio.TaskTimeout:
import curio async def task(name, a, b): print(f"{name} task started") result = a * b await curio.sleep(100) print(f"{name} task finished") return result async def main(): print("main started") task1 = await curio.spawn(task, "1st", 2, 3) try: result = await curio.timeout_after(1, task1.join) print(result) except curio.TaskTimeout as e: print("Timeout!") await task1.cancel() print("done") curio.run(main())
Chování tohoto programu po spuštění:
main started 1st task started Timeout! done
15. Reakce v korutině na timeout
V předchozí kapitole jsme si ukázali, jak je možné přerušit déletrvající korutinu. Ovšem ve skutečnosti může na pokus o své ukončení reagovat i samotná korutina, a to konkrétně zachycením výjimky typu curio.CancelledError. Z tohoto příkladu je patrné, jak je možné s využitím korutin prakticky řešit souběžné úlohy:
import curio async def task(name, a, b): try: print(f"{name} task started") result = a * b await curio.sleep(100) print(f"{name} task finished") return result except curio.CancelledError: print("Timeouting...") raise async def main(): print("main started") task1 = await curio.spawn(task, "1st", 2, 3) try: result = await curio.timeout_after(1, task1.join) print(result) except curio.TaskTimeout as e: print("Timeout!") await task1.cancel() print("done") curio.run(main())
Výsledek po spuštění tohoto příkladu:
main started 1st task started Timeout! Timeouting... done
16. Reakce na vyhozenou výjimku v korutině
Jak při použití vláken, tak i korutin je nutné nějakým způsobem reagovat na výjimky, které mohou ve vláknu nebo korutině nastat. Ukažme si nejdříve jednoduchý demonstrační příklad, v němž korutina kontroluje, jakého typu jsou její dva parametry nazvané a a b:
import curio async def task(name, a, b): try: print(f"{name} task started") assert isinstance(a, int) assert isinstance(b, int) result = a * b print(f"{name} task finished") return result except curio.CancelledError: print("Timeouting...") raise async def main(): print("main started") task1 = await curio.spawn(task, "1st", 2, "foo") try: result = await curio.timeout_after(1, task1.join) print(result) except curio.TaskTimeout as e: print("Timeout!") await task1.cancel() print("done") curio.run(main())
Tento příklad (podle očekávání) zhavaruje, ovšem zajímavým způsobem:
Task Crash: Task(id=3, name='task', state='TERMINATED') Traceback (most recent call last): File "/home/ptisnovs/.local/lib/python3.8/site-packages/curio/kernel.py", line 736, in kernel_run trap = current.send(current._trap_result) File "/home/ptisnovs/.local/lib/python3.8/site-packages/curio/task.py", line 167, in send return self._send(value) File "/home/ptisnovs/.local/lib/python3.8/site-packages/curio/task.py", line 171, in _task_runner return await coro File "curio_15.py", line 9, in task assert isinstance(b, int) AssertionError main started 1st task started Traceback (most recent call last): File "/home/ptisnovs/.local/lib/python3.8/site-packages/curio/kernel.py", line 736, in kernel_run trap = current.send(current._trap_result) File "/home/ptisnovs/.local/lib/python3.8/site-packages/curio/task.py", line 167, in send return self._send(value) File "/home/ptisnovs/.local/lib/python3.8/site-packages/curio/task.py", line 171, in _task_runner return await coro File "curio_15.py", line 9, in task assert isinstance(b, int) AssertionError The above exception was the direct cause of the following exception: Traceback (most recent call last): File "curio_15.py", line 35, in <module> curio.run(main()) File "/home/ptisnovs/.local/lib/python3.8/site-packages/curio/kernel.py", line 822, in run return kernel.run(corofunc, *args) File "/home/ptisnovs/.local/lib/python3.8/site-packages/curio/kernel.py", line 172, in run raise ret_exc File "/home/ptisnovs/.local/lib/python3.8/site-packages/curio/kernel.py", line 736, in kernel_run trap = current.send(current._trap_result) File "/home/ptisnovs/.local/lib/python3.8/site-packages/curio/task.py", line 167, in send return self._send(value) File "/home/ptisnovs/.local/lib/python3.8/site-packages/curio/task.py", line 171, in _task_runner return await coro File "curio_15.py", line 26, in main result = await curio.timeout_after(1, task1.join) File "/home/ptisnovs/.local/lib/python3.8/site-packages/curio/time.py", line 139, in _timeout_after_func return await coro File "/home/ptisnovs/.local/lib/python3.8/site-packages/curio/task.py", line 187, in join raise TaskError('Task crash') from self.exception curio.errors.TaskError: Task crash
Ve skutečnosti můžeme tuto výjimku zachytit i ve volajícím kódu (tedy z funkce, odkud je korutina volána). Nutné je však nezachytávat původní výjimku, ale výjimku typu curio.TaskError tak, jak je to ukázáno v dalším příkladu:
import curio async def task(name, a, b): try: print(f"{name} task started") assert isinstance(a, int) assert isinstance(b, int) result = a * b print(f"{name} task finished") return result except curio.CancelledError: print("Timeouting...") raise async def main(): print("main started") task1 = await curio.spawn(task, "1st", 2, "foo") try: result = await curio.timeout_after(1, task1.join) print(result) except curio.TaskTimeout as e: print("Timeout!") await task1.cancel() except curio.TaskError as e: print("Task error!", e) print("done") curio.run(main())
První řádky jsou vypsány v korutině, ovšem z označeného řádku je patrné, že výjimka byla skutečně korektně zachycena i ve funkci main:
Task Crash: Task(id=3, name='task', state='TERMINATED') Traceback (most recent call last): File "/home/ptisnovs/.local/lib/python3.8/site-packages/curio/kernel.py", line 736, in kernel_run trap = current.send(current._trap_result) File "/home/ptisnovs/.local/lib/python3.8/site-packages/curio/task.py", line 167, in send return self._send(value) File "/home/ptisnovs/.local/lib/python3.8/site-packages/curio/task.py", line 171, in _task_runner return await coro File "curio_16.py", line 9, in task assert isinstance(b, int) AssertionError main started 1st task started Task error! Task crash done
17. Knihovna Trio
Podobný koncept práce s korutinami, který jsme viděli v knihovně Curio, je implementován i v knihovně nazvané Trio, která z Curia částečně vychází. Cílem této knihovny je nejenom nabídnout programátorům souběžný běh korutin (a plně paralelní běh I/O operací – což je úkol pro operační systém), ale navíc i zajistit korektnost výsledného programu, což vůbec není jednoduchý úkol. Podrobnější informace o této velmi užitečné knihovně si řekneme v navazujícím článku.
18. Způsob překladu konstrukcí s async a await do bajtkódu
V předchozím textu jsme se seznámili s tím, že v Pythonu je nyní možné v programovém kódu používat nová klíčová slova async a await. Bude tedy zajímavé zjistit, jak se vlastně existence nových klíčových slov projevila ve vygenerovaném bajtkódu (protože skripty psané v Pythonu jsou nejdříve převedeny do bajtkódu a teprve poté vykonány virtuálním strojem tohoto jazyka).
Nejdříve si připravíme jednoduchý skript, který obsahuje dvojici funkcí, které se následně zavolají. Ovšem nejenom to – taktéž se s využitím standardní knihovny dis zobrazí bajtkód volaných funkcí:
import dis def say_hello(): print("Hello world") def main(): say_hello() main() print("say_hello") dis.dis(say_hello) print("main") dis.dis(main)
Výsledek by měl vypadat následovně. Povšimněte si, že obě funkce se volají jednoduše – s využitím instrukce CALL_FUNCTION, které předchází uložení parametrů na zásobník a za nímž následuje odstranění hodnoty ze zásobníku instrukcí POP_TOP:
Hello world say_hello 5 0 LOAD_GLOBAL 0 (print) 2 LOAD_CONST 1 ('Hello world') 4 CALL_FUNCTION 1 6 POP_TOP 8 LOAD_CONST 0 (None) 10 RETURN_VALUE main 9 0 LOAD_GLOBAL 0 (say_hello) 2 CALL_FUNCTION 0 4 POP_TOP 6 LOAD_CONST 0 (None) 8 RETURN_VALUE
Nyní obě funkce upravíme, a to tak, aby se z nich transformací staly korutiny. Použijeme tedy klíčové slovo async a při volání využijeme await:
import dis import asyncio async def say_hello(): print("Hello world") async def main(): t = asyncio.create_task(say_hello()) await t asyncio.run(main()) print("say_hello") dis.dis(say_hello) print("main") dis.dis(main)
Výsledný bajtkód bude v tomto případě (podle očekávání) odlišný. Samotná funkce say_hello má sice stejný bajtkód, jako její synchronní obdoba, ovšem volá se zcela odlišným způsobem. Taktéž pro konstrukci await je použita nová instrukce bajtkódu GET_AWAITABLE určená pro získání reference na korutinu (a její uložení na zásobník). Samotné přečtení hodnoty z korutiny (nebo z generátoru) je řešeno instrukcí YIELD_FROM:
Hello world say_hello 6 0 LOAD_GLOBAL 0 (print) 2 LOAD_CONST 1 ('Hello world') 4 CALL_FUNCTION 1 6 POP_TOP 8 LOAD_CONST 0 (None) 10 RETURN_VALUE main 10 0 LOAD_GLOBAL 0 (asyncio) 2 LOAD_METHOD 1 (create_task) 4 LOAD_GLOBAL 2 (say_hello) 6 CALL_FUNCTION 0 8 CALL_METHOD 1 10 STORE_FAST 0 (t) 11 12 LOAD_FAST 0 (t) 14 GET_AWAITABLE 16 LOAD_CONST 0 (None) 18 YIELD_FROM 20 POP_TOP 22 LOAD_CONST 0 (None) 24 RETURN_VALUE
19. Repositář s demonstračními příklady
Zdrojové kódy všech prozatím popsaných demonstračních příkladů určených pro programovací jazyk Python 3 byly uloženy do Git repositáře dostupného na adrese https://github.com/tisnik/most-popular-python-libs. V případě, že nebudete chtít klonovat celý repositář (ten je ovšem stále velmi malý, dnes má velikost zhruba několik desítek kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce:
20. Odkazy na Internetu
- Dokumentace Pythonu: balíček queue
https://docs.python.org/3/library/queue.html - Dokumentace Pythonu: balíček threading
https://docs.python.org/3/library/threading.html? - Dokumentace Pythonu: balíček multiprocessing
https://docs.python.org/3/library/multiprocessing.html - Dokumentace Pythonu: balíček asyncio
https://docs.python.org/3/library/asyncio.html - Synchronization Primitives
https://docs.python.org/3/library/asyncio-sync.html - Coroutines
https://docs.python.org/3/library/asyncio-task.html - Queues
https://docs.python.org/3/library/asyncio-queue.html - python-csp
https://python-csp.readthedocs.io/en/latest/ - TrellisSTM
http://peak.telecommunity.com/DevCenter/TrellisSTM - Python Multithreading and Multiprocessing Tutorial
https://www.toptal.com/python/beginners-guide-to-concurrency-and-parallelism-in-python - ThreadPoolExecutor
https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor - ProcessPoolExecutor
https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor - asyncio — Asynchronous I/O
https://docs.python.org/3/library/asyncio.html - Threads vs Async: Has Asyncio Solved Concurrency?
https://www.youtube.com/watch?v=NZq31Sg8R9E - Python Asynchronous Programming – AsyncIO & Async/Await
https://www.youtube.com/watch?v=t5Bo1Je9EmE - AsyncIO & Asynchronous Programming in Python
https://www.youtube.com/watch?v=6RbJYN7SoRs - Coroutines and Tasks
https://docs.python.org/3/library/asyncio-task.html - Python async/await Tutorial
https://stackabuse.com/python-async-await-tutorial/ - Demystifying Python's Async and Await Keywords
https://www.youtube.com/watch?v=F19R_M4Nay4 - Curio
https://curio.readthedocs.io/en/latest/ - Trio: a friendly Python library for async concurrency and I/O
https://trio.readthedocs.io/en/stable/ - Curio – A Tutorial Introduction
https://curio.readthedocs.io/en/latest/tutorial.html - unsync
https://github.com/alex-sherman/unsync - David Beazley – Die Threads
https://www.youtube.com/watch?v=xOyJiN3yGfU - Miguel Grinberg Asynchronous Python for the Complete Beginner PyCon 2017
https://www.youtube.com/watch?v=iG6fr81×HKA - Build Your Own Async
https://www.youtube.com/watch?v=Y4Gt3Xjd7G8 - The Other Async (Threads + Async = ❤️)
https://www.youtube.com/watch?v=x1ndXuw7S0s - Fear and Awaiting in Async: A Savage Journey to the Heart of the Coroutine Dream
https://www.youtube.com/watch?v=E-1Y4kSsAFc - Keynote David Beazley – Topics of Interest (Python Asyncio)
https://www.youtube.com/watch?v=ZzfHjytDceU - David Beazley – Python Concurrency From the Ground Up: LIVE! – PyCon 2015
https://www.youtube.com/watch?v=MCs5OvhV9S4 - Python Async basics video (100 million HTTP requests)
https://www.youtube.com/watch?v=Mj-Pyg4gsPs - Nathaniel J. Smith – Trio: Async concurrency for mere mortals – PyCon 2018
https://www.youtube.com/watch?v=oLkfnc_UMcE