Obsah
1. Souběžné a paralelně běžící úlohy naprogramované v Pythonu
2. Balíček threading – vytvoření a spuštění několika vláken
3. Předání parametrů funkcím spouštěným v nových vláknech
4. Explicitní čekání na dokončení běhu vláken
5. Vlákna s příznakem „daemon“
6. Čekaní na dokončení vlákna po zvolený časový interval, test, zda vlákno stále běží
8. Standardní synchronizované datové struktury z balíčku queue
9. Fronta ve funkci komunikačního i synchronizačního mechanismu
10. Klasický vzor producent–konzument
12. Jazyk Python a multiprocessing
14. Spuštění většího množství procesů, čekání na dokončení těchto procesů
15. Zjednodušení předchozího demonstračního příkladu
16. Komunikace mezi procesy přes multiprocessing.Queue
17. Komunikace mezi procesy přes obousměrnou rouru (multiprocessing.Pipe)
18. Spouštění a řízení paralelně běžících úloh – concurrent.futures
19. Repositář s demonstračními příklady
1. Souběžné a paralelně běžící úlohy naprogramované v Pythonu
V dnešním článku o programovacím jazyku Python se seznámíme s několika balíčky, které je možné nalézt přímo ve standardní knihovně. Tyto balíčky jsou určeny pro spouštění souběžných či dokonce paralelně běžících úloh. Konkrétně se jedná o balíčky threading, multiprocessing a taktéž o balíček concurrent.futures. Zmíníme se i o vybraných způsobech komunikace mezi těmito úlohami, protože (alespoň většinou) je vhodnější zajistit komunikaci s využitím k tomu určených prostředků a nikoli přes sdílené objekty (což ani u úloh běžících v samostatných procesech není jednoduše možné).
Na tomto místě je vhodné upozornit na fakt, že v Pythonu sice můžeme používat všechny dále popisované balíčky, ovšem skutečný paralelní (a nezávislý) běh několika vláken je ve standardním CPythonu do značné míry omezen kvůli existenci techniky zvané GIL neboli Global Interpreter Lock (viz též příslušnou stránku). Existuje poměrně velké množství návrhů na odstranění GILu, ovšem prozatím je tato technika v CPythonu stále používána. Naproti tomu IronPython ani Jython tuto techniku nepoužívají; na druhou stranu je však například vývoj Jythonu prakticky pozastaven (viz též poznámku pod odstavcem). V případě klasického CPythonu je tedy nutné chápat především modul threading, o němž se zmíníme v dalším textu, jako řešení algoritmů, které mají běžet souběžně, nikoli nutně paralelně (paralelně poběží většinou vstupně-výstupní operace, resp. v tuto chvíli dojde k přepnutí vláken). Skutečně paralelní běh lze pochopitelně dosáhnout využitím většího množství procesů popř. některých technik uvedených v navazujícím článku.
2. Balíček threading – vytvoření a spuštění několika vláken
V první části dnešního článku si ukážeme základní techniky podporované standardním balíčkem nazvaným threading. Dokumentaci k tomuto balíčku lze najít na stránce https://docs.python.org/3/library/threading.html, kde jsou vypsány i některé postupy, které dnes nebudou popsány. Jedná se především o použití různých synchronizačních mechanismů a taktéž systémů událostí (příkladem může být, že jedno vlákno vytvoří událost, na kterou další vlákno čeká atd.). Tyto již nepatrně komplikovanější postupy budou popsány v navazujícím článku.
Podívejme se nyní na velmi jednoduchý program, v němž je spuštěno větší množství vláken. Nejprve vytvoříme zcela běžnou funkci, která v programové smyčce desetkrát vypíše jméno vlákna, ve kterém je tato funkce spuštěna, dále vypíše aktuální hodnotu počitadla smyčky a taktéž čas, kdy k tomuto výpisu došlo. Mezi jednotlivými iteracemi smyčky je vložena přibližně jednosekundová prodleva, kterou simulujeme reálnou práci (například nějaký výpočet, komunikaci s databází, provedení HTTP dotazu atd.):
def worker(): threadName = threading.current_thread().name delay = 1 n = 10 for counter in range(1, n+1): time.sleep(delay) print("{}: {}/{} - {}".format(threadName, counter, n, time.ctime(time.time())))
>>> import threading >>> print(threading.current_thread().name) MainThread
Právě definovanou funkci worker ovšem nebudeme spouštět přímo. Nejprve vytvoříme nové vlákno, specifikujeme, že se v rámci tohoto vlákna má vykonat právě funkce worker a teprve poté toto vlákno spustíme. Popsanou operaci zopakujeme několikrát, takže se spustí několik vláken provádějících podobnou činnost (lišit se bude pouze jméno vlákna vypisované funkcí worker a popř. i časy výpisu zpráv):
threading.Thread(target=worker).start() threading.Thread(target=worker).start() threading.Thread(target=worker).start()
Po provedení těchto tří příkazů se kromě hlavního vlákna spustí ještě další tři vlákna, která budou prováděna souběžně (ovšem nikoli zcela paralelně, a to kvůli výše zmíněné existenci GILu). Hlavní vlákno bude čekat na dokončení ostatních tří vláken, protože tato vlákna nemají nastaven atribut daemon. Více informací o rozlišení vláken se dozvíme v navazujícím textu. Ve výpisu si povšimněte, jak jsou vlákna automaticky pojmenována. Dále je zřejmé, že vlákna skutečně pracují souběžně:
Thread-1: 1/10 - Sat Feb 19 09:14:51 2022 Thread-3: 1/10 - Sat Feb 19 09:14:51 2022 Thread-2: 1/10 - Sat Feb 19 09:14:51 2022 Thread-1: 2/10 - Sat Feb 19 09:14:52 2022 Thread-2: 2/10 - Sat Feb 19 09:14:52 2022 Thread-3: 2/10 - Sat Feb 19 09:14:52 2022 Thread-1: 3/10 - Sat Feb 19 09:14:53 2022 Thread-3: 3/10 - Sat Feb 19 09:14:53 2022 Thread-2: 3/10 - Sat Feb 19 09:14:53 2022 Thread-1: 4/10 - Sat Feb 19 09:14:54 2022 Thread-2: 4/10 - Sat Feb 19 09:14:54 2022 Thread-3: 4/10 - Sat Feb 19 09:14:54 2022 Thread-1: 5/10 - Sat Feb 19 09:14:55 2022 Thread-2: 5/10 - Sat Feb 19 09:14:55 2022 Thread-3: 5/10 - Sat Feb 19 09:14:55 2022 Thread-1: 6/10 - Sat Feb 19 09:14:56 2022 Thread-2: 6/10 - Sat Feb 19 09:14:56 2022 Thread-3: 6/10 - Sat Feb 19 09:14:56 2022 Thread-1: 7/10 - Sat Feb 19 09:14:57 2022 Thread-2: 7/10 - Sat Feb 19 09:14:57 2022 Thread-3: 7/10 - Sat Feb 19 09:14:57 2022 Thread-1: 8/10 - Sat Feb 19 09:14:58 2022 Thread-2: 8/10 - Sat Feb 19 09:14:58 2022 Thread-3: 8/10 - Sat Feb 19 09:14:58 2022 Thread-2: 9/10 - Sat Feb 19 09:14:59 2022 Thread-1: 9/10 - Sat Feb 19 09:14:59 2022 Thread-3: 9/10 - Sat Feb 19 09:14:59 2022 Thread-1: 10/10 - Sat Feb 19 09:15:00 2022 Thread-2: 10/10 - Sat Feb 19 09:15:00 2022 Thread-3: 10/10 - Sat Feb 19 09:15:00 2022
Úplný zdrojový kód dnešního prvního demonstračního příkladu vypadá následovně:
#!/usr/bin/env python3 """Multithreading.""" import threading import time def worker(): threadName = threading.current_thread().name delay = 1 n = 10 for counter in range(1, n+1): time.sleep(delay) print("{}: {}/{} - {}".format(threadName, counter, n, time.ctime(time.time()))) # vytvoření a spuštění trojice vláken threading.Thread(target=worker).start() threading.Thread(target=worker).start() threading.Thread(target=worker).start()
3. Předání parametrů funkcím spouštěným v nových vláknech
Mnohdy je nutné funkci, která má být zavolána v nově spuštěném vláknu, předat nějaké parametry. To nelze provést přímo (funkci totiž nevolá přímo programátor, ale modul threading), nicméně parametry do volané funkce je možné specifikovat, i když nepřímým způsobem. Funkci nejprve upravíme takovým způsobem, aby akceptovala tři parametry, konkrétně explicitně nastavené jméno vlákna, čas prodlevy a počet opakování programové smyčky ve vláknu:
def worker(threadName, delay, n): for counter in range(1, n+1): time.sleep(delay) print("{}: {}/{} - {}".format(threadName, counter, n, time.ctime(time.time())))
Parametry, které se mají funkci worker po jejím zavolání v novém vláknu předat, se specifikují v parametru args konstruktoru threading.Thread, a to (v tomto případě) formou n-tice (tuple):
threading.Thread(target=worker, args=("Thread-1", 0.5, 10)).start() threading.Thread(target=worker, args=("Thread-2", 1.0, 10)).start() threading.Thread(target=worker, args=("Thread-3", 1.5, 10)).start()
Po spuštění takto upraveného programu je patrné, že kvůli různým časovým prodlevám mají výpočty ve vláknech různou rychlost a konkrétně to znamená, že celý výpočet v prvním vláknu doběhne nejrychleji a výpočet ve třetím vláknu nejpomaleji. Poslední iterace programových smyček ve všech třech vláknech jsou na následujícím výpisu zvýrazněny:
Thread-1: 1/10 - Sat Feb 19 09:21:14 2022 Thread-2: 1/10 - Sat Feb 19 09:21:14 2022 Thread-1: 2/10 - Sat Feb 19 09:21:14 2022 Thread-1: 3/10 - Sat Feb 19 09:21:15 2022 Thread-3: 1/10 - Sat Feb 19 09:21:15 2022 Thread-2: 2/10 - Sat Feb 19 09:21:15 2022 Thread-1: 4/10 - Sat Feb 19 09:21:15 2022 Thread-1: 5/10 - Sat Feb 19 09:21:16 2022 Thread-3: 2/10 - Sat Feb 19 09:21:16 2022 Thread-2: 3/10 - Sat Feb 19 09:21:16 2022 Thread-1: 6/10 - Sat Feb 19 09:21:16 2022 Thread-1: 7/10 - Sat Feb 19 09:21:17 2022 Thread-2: 4/10 - Sat Feb 19 09:21:17 2022 Thread-1: 8/10 - Sat Feb 19 09:21:17 2022 Thread-3: 3/10 - Sat Feb 19 09:21:18 2022 Thread-1: 9/10 - Sat Feb 19 09:21:18 2022 Thread-1: 10/10 - Sat Feb 19 09:21:18 2022 Thread-2: 5/10 - Sat Feb 19 09:21:18 2022 Thread-3: 4/10 - Sat Feb 19 09:21:19 2022 Thread-2: 6/10 - Sat Feb 19 09:21:19 2022 Thread-2: 7/10 - Sat Feb 19 09:21:20 2022 Thread-3: 5/10 - Sat Feb 19 09:21:21 2022 Thread-2: 8/10 - Sat Feb 19 09:21:21 2022 Thread-3: 6/10 - Sat Feb 19 09:21:22 2022 Thread-2: 9/10 - Sat Feb 19 09:21:22 2022 Thread-2: 10/10 - Sat Feb 19 09:21:23 2022 Thread-3: 7/10 - Sat Feb 19 09:21:24 2022 Thread-3: 8/10 - Sat Feb 19 09:21:25 2022 Thread-3: 9/10 - Sat Feb 19 09:21:27 2022 Thread-3: 10/10 - Sat Feb 19 09:21:28 2022
Úplný zdrojový kód dnešního druhého demonstračního příkladu vypadá následovně:
#!/usr/bin/env python3 """Multithreading.""" import threading import time def worker(threadName, delay, n): for counter in range(1, n+1): time.sleep(delay) print("{}: {}/{} - {}".format(threadName, counter, n, time.ctime(time.time()))) # vytvoření a spuštění trojice vláken threading.Thread(target=worker, args=("Thread-1", 0.5, 10)).start() threading.Thread(target=worker, args=("Thread-2", 1.0, 10)).start() threading.Thread(target=worker, args=("Thread-3", 1.5, 10)).start()
4. Explicitní čekání na dokončení běhu vláken
V předchozí dvojici demonstračních příkladů byla z hlavního vlákna spuštěna tři další vlákna. Ovšem hlavní vlákno bylo ihned po provedení těchto tří operací ukončeno, protože již neobsahovalo žádný další programový kód. Nicméně i přesto interpret Pythonu počkal na dokončení běhu ostatních vláken, protože se jedná o běžná „nedémonická“ vlákna. V případě, že nebudeme chtít, aby se hlavní vlákno automaticky ukončilo, je nutné explicitně počkat na ostatní vlákna. K tomuto účelu slouží metoda Thread.join():
join(timeout=None) method of threading.Thread instance Wait until the thread terminates. This blocks the calling thread until the thread whose join() method is called terminates -- either normally or through an unhandled exception or until the optional timeout occurs. When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened -- if the thread is still alive, the join() call timed out. When the timeout argument is not present or None, the operation will block until the thread terminates. A thread can be join()ed many times. join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.
Způsob volání metody join je jednoduchý, ovšem musíme mít k dispozici instance třídy threading.Thread, což si vyžádá nepatrnou úpravu zdrojového kódu:
# vytvoření trojice vláken t1 = threading.Thread(target=worker, args=("Thread-1", 0.5, 10)) t2 = threading.Thread(target=worker, args=("Thread-2", 1.0, 10)) t3 = threading.Thread(target=worker, args=("Thread-3", 1.5, 10)) # spuštění všech vláken t1.start() t2.start() t3.start() # čekání na dokončení všech vláken t1.join() t2.join() t3.join()
Po spuštění dostaneme velmi podobné výsledky, jako tomu bylo v předchozím příkladu:
Thread-1: 1/10 - Sat Feb 19 09:55:59 2022 Thread-2: 1/10 - Sat Feb 19 09:55:59 2022 Thread-1: 2/10 - Sat Feb 19 09:55:59 2022 Thread-3: 1/10 - Sat Feb 19 09:56:00 2022 Thread-1: 3/10 - Sat Feb 19 09:56:00 2022 Thread-1: 4/10 - Sat Feb 19 09:56:00 2022 Thread-2: 2/10 - Sat Feb 19 09:56:00 2022 Thread-1: 5/10 - Sat Feb 19 09:56:01 2022 Thread-3: 2/10 - Sat Feb 19 09:56:01 2022 Thread-2: 3/10 - Sat Feb 19 09:56:01 2022 Thread-1: 6/10 - Sat Feb 19 09:56:01 2022 Thread-1: 7/10 - Sat Feb 19 09:56:02 2022 Thread-2: 4/10 - Sat Feb 19 09:56:02 2022 Thread-1: 8/10 - Sat Feb 19 09:56:02 2022 Thread-3: 3/10 - Sat Feb 19 09:56:03 2022 Thread-1: 9/10 - Sat Feb 19 09:56:03 2022 Thread-2: 5/10 - Sat Feb 19 09:56:03 2022 Thread-1: 10/10 - Sat Feb 19 09:56:03 2022 Thread-3: 4/10 - Sat Feb 19 09:56:04 2022 Thread-2: 6/10 - Sat Feb 19 09:56:04 2022 Thread-2: 7/10 - Sat Feb 19 09:56:05 2022 Thread-3: 5/10 - Sat Feb 19 09:56:06 2022 Thread-2: 8/10 - Sat Feb 19 09:56:06 2022 Thread-3: 6/10 - Sat Feb 19 09:56:07 2022 Thread-2: 9/10 - Sat Feb 19 09:56:07 2022 Thread-2: 10/10 - Sat Feb 19 09:56:08 2022 Thread-3: 7/10 - Sat Feb 19 09:56:09 2022 Thread-3: 8/10 - Sat Feb 19 09:56:10 2022 Thread-3: 9/10 - Sat Feb 19 09:56:12 2022 Thread-3: 10/10 - Sat Feb 19 09:56:13 2022 Done!
Úplný zdrojový kód dnešního třetího demonstračního příkladu vypadá následovně:
#!/usr/bin/env python3 """Multithreading.""" import threading import time def worker(threadName, delay, n): for counter in range(1, n+1): time.sleep(delay) print("{}: {}/{} - {}".format(threadName, counter, n, time.ctime(time.time()))) # vytvoření trojice vláken t1 = threading.Thread(target=worker, args=("Thread-1", 0.5, 10)) t2 = threading.Thread(target=worker, args=("Thread-2", 1.0, 10)) t3 = threading.Thread(target=worker, args=("Thread-3", 1.5, 10)) # spuštění všech vláken t1.start() t2.start() t3.start() # čekání na dokončení všech vláken t1.join() t2.join() t3.join() print("Done!")
Objekty jsou však uvolněny až po dokončení všech vláken:
#!/usr/bin/env python3 """Multithreading.""" import threading import time class X(): def __init__(self): print("X constructed") def __del__(self): print("X destructed") def worker(): print("thread started") time.sleep(10) print("thread finished") print("main started") x = X() # vytvoření a spuštění trojice vláken threading.Thread(target=worker).start() threading.Thread(target=worker).start() threading.Thread(target=worker).start() print("main finished")
Dobré je sledovat, kdy se zavolá destruktor objektu x:
main started X constructed thread started thread started thread started main finished thread finished thread finished thread finished X destructed
5. Vlákna s příznakem „daemon“
Vlákna, která jsou vytvářená přes knihovnu threading, mohou mít nastaven příznak „daemon“. Tato vlákna se liší od standardních vláken pouze v jediném ohledu – virtuální stroj Pythonu je ukončen až ve chvíli, kdy svoji práci dokončí všechna běžná vlákna, zatímco vlákna s příznakem „daemon“ nejsou do této podmínky zahrnuta. Toto chování si můžeme snadno otestovat spuštěním následujícího skriptu. Ten vytvoří tři vlákna s příznakem „daemon“ (viz též podtrženou část kódu) a poté hlavní vlákno ihned skončí. V tomto okamžiku je ukončen i běh virtuálního stroje Pythonu, protože už neběží žádná „ne-démonická“ vlákna:
import threading import time def worker(): threadName = threading.current_thread().name delay = 1 n = 10 for counter in range(1, n+1): time.sleep(delay) print("{}: {}/{} - {}".format(threadName, counter, n, time.ctime(time.time()))) # vytvoření a spuštění trojice vláken v režimu daemon threading.Thread(target=worker, daemon=True).start() threading.Thread(target=worker, daemon=True).start() threading.Thread(target=worker, daemon=True).start() # na dokončení vláken se nečeká!
Tento skript (většinou) nic nevypíše, protože než se vlákna stihnou skutečně spustit, je činnost skriptu ukončena.
Na dokončení „démonických“ vláken je nutné explicitně čekat, pochopitelně za předpokladu, že to odpovídá logice implementované aplikace:
import threading import time def worker(): threadName = threading.current_thread().name delay = 1 n = 10 for counter in range(1, n+1): time.sleep(delay) print("{}: {}/{} - {}".format(threadName, counter, n, time.ctime(time.time()))) # vytvoření a spuštění trojice vláken v režimu daemon t1 = threading.Thread(target=worker, daemon=True) t2 = threading.Thread(target=worker, daemon=True) t3 = threading.Thread(target=worker, daemon=True) t1.start() t2.start() t3.start() t1.join() t2.join() t3.join()
Výsledek činnosti tohoto skriptu:
Thread-1: 1/10 - Sat Feb 19 09:58:02 2022 Thread-2: 1/10 - Sat Feb 19 09:58:02 2022 Thread-3: 1/10 - Sat Feb 19 09:58:02 2022 Thread-2: 2/10 - Sat Feb 19 09:58:03 2022 Thread-1: 2/10 - Sat Feb 19 09:58:03 2022 Thread-3: 2/10 - Sat Feb 19 09:58:03 2022 Thread-3: 3/10 - Sat Feb 19 09:58:04 2022 Thread-1: 3/10 - Sat Feb 19 09:58:04 2022 Thread-2: 3/10 - Sat Feb 19 09:58:04 2022 Thread-3: 4/10 - Sat Feb 19 09:58:05 2022 Thread-1: 4/10 - Sat Feb 19 09:58:05 2022 Thread-2: 4/10 - Sat Feb 19 09:58:05 2022 Thread-3: 5/10 - Sat Feb 19 09:58:06 2022 Thread-1: 5/10 - Sat Feb 19 09:58:06 2022 Thread-2: 5/10 - Sat Feb 19 09:58:06 2022 Thread-3: 6/10 - Sat Feb 19 09:58:07 2022 Thread-1: 6/10 - Sat Feb 19 09:58:07 2022 Thread-2: 6/10 - Sat Feb 19 09:58:07 2022 Thread-2: 7/10 - Sat Feb 19 09:58:08 2022 Thread-1: 7/10 - Sat Feb 19 09:58:08 2022 Thread-3: 7/10 - Sat Feb 19 09:58:08 2022 Thread-3: 8/10 - Sat Feb 19 09:58:09 2022 Thread-2: 8/10 - Sat Feb 19 09:58:09 2022 Thread-1: 8/10 - Sat Feb 19 09:58:09 2022 Thread-3: 9/10 - Sat Feb 19 09:58:10 2022 Thread-1: 9/10 - Sat Feb 19 09:58:10 2022 Thread-2: 9/10 - Sat Feb 19 09:58:10 2022 Thread-1: 10/10 - Sat Feb 19 09:58:11 2022 Thread-3: 10/10 - Sat Feb 19 09:58:11 2022 Thread-2: 10/10 - Sat Feb 19 09:58:11 2022
6. Čekání na dokončení vlákna po zvolený časový interval, test, zda vlákno stále běží
V některých případech není možné či žádoucí čekat na dokončení nějakého vlákna po (alespoň teoreticky) neomezený čas. Příkladem mohou být vlákna, která mají obsluhovat připojení různých klientů – ve chvíli, kdy se klient odpojí (resp. přestane reagovat), trvá většinou nějaký čas, než se vlákno skutečně ukončí. Pro specifikaci maximální doby čekání na ukončení vlákna se používá metoda thread.Join, kterou již známe, pouze jí je nutné navíc předat parametr timeout:
t3.join(timeout=5)
Tato metoda ovšem programátory neinformuje, zda bylo vlákno ukončeno běžným způsobem, nebo zda stále běží. V případě, že je tato informace důležitá (už jen pro potřeby logování), je nutné se dotázat na stav vlákna metodou thread.is_alive:
if t3.is_alive(): print("wait timeout") else: print("t3 has finished")
Opět se podívejme na nepatrně upravený demonstrační příklad:
import threading import time def worker(threadName, delay, n): for counter in range(1, n+1): time.sleep(delay) print("{}: {}/{} - {}".format(threadName, counter, n, time.ctime(time.time()))) # vytvoření trojice vláken t1 = threading.Thread(target=worker, args=("Thread-1", 0.5, 10)) t2 = threading.Thread(target=worker, args=("Thread-2", 1.0, 10)) t3 = threading.Thread(target=worker, args=("Thread-3", 1.5, 10)) # spuštění všech vláken t1.start() t2.start() t3.start() # čekání na dokončení všech vláken t3.join(timeout=5) if t3.is_alive(): print("wait timeout") else: print("t3 has finished") t2.join() print("t2 has finished") t1.join() print("t1 has finished") print("Done!")
Příklad chování tohoto skriptu po jeho spuštění. Důležité zprávy jsou zvýrazněny:
Thread-1: 1/10 - Sun Feb 20 16:29:40 2022 Thread-2: 1/10 - Sun Feb 20 16:29:40 2022 Thread-1: 2/10 - Sun Feb 20 16:29:40 2022 Thread-3: 1/10 - Sun Feb 20 16:29:41 2022 Thread-1: 3/10 - Sun Feb 20 16:29:41 2022 Thread-2: 2/10 - Sun Feb 20 16:29:41 2022 Thread-1: 4/10 - Sun Feb 20 16:29:41 2022 Thread-1: 5/10 - Sun Feb 20 16:29:42 2022 Thread-2: 3/10 - Sun Feb 20 16:29:42 2022 Thread-3: 2/10 - Sun Feb 20 16:29:42 2022 Thread-1: 6/10 - Sun Feb 20 16:29:42 2022 Thread-1: 7/10 - Sun Feb 20 16:29:43 2022 Thread-2: 4/10 - Sun Feb 20 16:29:43 2022 Thread-1: 8/10 - Sun Feb 20 16:29:43 2022 Thread-3: 3/10 - Sun Feb 20 16:29:44 2022 Thread-1: 9/10 - Sun Feb 20 16:29:44 2022 wait timeout Thread-2: 5/10 - Sun Feb 20 16:29:44 2022 Thread-1: 10/10 - Sun Feb 20 16:29:44 2022 Thread-2: 6/10 - Sun Feb 20 16:29:45 2022 Thread-3: 4/10 - Sun Feb 20 16:29:45 2022 Thread-2: 7/10 - Sun Feb 20 16:29:46 2022 Thread-3: 5/10 - Sun Feb 20 16:29:47 2022 Thread-2: 8/10 - Sun Feb 20 16:29:47 2022 Thread-2: 9/10 - Sun Feb 20 16:29:48 2022 Thread-3: 6/10 - Sun Feb 20 16:29:48 2022 Thread-2: 10/10 - Sun Feb 20 16:29:49 2022 t2 has finished t1 has finished Done! Thread-3: 7/10 - Sun Feb 20 16:29:50 2022 Thread-3: 8/10 - Sun Feb 20 16:29:51 2022 Thread-3: 9/10 - Sun Feb 20 16:29:53 2022 Thread-3: 10/10 - Sun Feb 20 16:29:54 2022 (až nyní došlo k ukončení třetího vlákna)
7. Komunikace mezi vlákny
„Don't communicate by sharing memory; share memory by communicating.“
Vlákna spouštěná přes standardní balíček threading jsou provozována v rámci jediného virtuálního stroje Pythonu. Z tohoto důvodu je teoreticky možné pro komunikaci mezi vlákny použít nějaké objekty (nebo například slovník), jejichž reference se jednotlivým vláknům předá v parametrech. Nejedná se však ani zdaleka o nejrozumnější řešení, a to například i kvůli nutnosti zamykání přístupů k těmto objektům, NEatomičnosti změn atd. Navíc se taková ad-hoc komunikace poměrně špatně ladí. Výhodnější může být napodobení přístupu, který v současnosti zpopularizoval programovací jazyk Go – komunikovat s využitím kanálů. Kanály bez omezení kapacity, s omezenou kapacitou (popř. kapacitou nastavenou na jediný prvek) lze realizovat například s využitím synchronizovaných datových struktur z balíčku queue, které budou zmíněny v navazující kapitole.
8. Standardní synchronizované datové struktury z balíčku queue
V balíčku queue nalezneme následující datové struktury, které je možné použít pro zajištění komunikace mezi vlákny:
Struktura | Stručný popis |
---|---|
Queue | FIFO fronta s nastavitelnou kapacitou |
LifoQueue | nenechte se zmýlit názvem, jedná se o klasický zásobník |
PriorityQueue | prioritní fronta |
SimpleQueue | klasická FIFO fronta s neomezenou kapacitou |
Mezi základní podporované operace patří vložení prvku do fronty metodou put a získání prvku z fronty metodou get. Tyto operace jsou obecně blokující, tj. například pokud je fronta prázdná, bude operace get čekat na její naplnění alespoň jedním prvkem. Toho lze využít pro posílání úloh workerovi běžícímu v samostatném vláknu:
while True: job = q.get() print(f'Starting consuming {job}') ... ... ... q.task_done()
Vytváření úloh:
for job in range(10): print(f'Producing {job}') q.put(job)
9. Fronta ve funkci komunikačního i synchronizačního mechanismu
V následujícím demonstračním příkladu je ukázán známý systém producer-consumer, kde jak producent, tak i konzument každý běží v samostatném vláknu a komunikují spolu pouze přes sdílenou frontu q:
import time import threading import queue # vytvoření fronty q = queue.Queue() # simulace konzumenta def consumer(): while True: job = q.get() print(f'Starting consuming {job}') time.sleep(0.4) print(f'Consumed {job}') q.task_done() # spuštění konzumenta threading.Thread(target=consumer, daemon=True, name="první").start() # vytvoření úloh v producentovi for job in range(10): print(f'Producing {job}') q.put(job) # čekání na zpracování všech zpráv ve frontě q.join() print('Done')
Výsledek:
Producing 0 Producing 1 Producing 2 Producing 3 Producing 4 Producing 5 Producing 6 Producing 7 Producing 8 Starting consuming 0 Producing 9 Consumed 0 Starting consuming 1 Consumed 1 Starting consuming 2 Consumed 2 Starting consuming 3 Consumed 3 Starting consuming 4 Consumed 4 Starting consuming 5 Consumed 5 Starting consuming 6 Consumed 6 Starting consuming 7 Consumed 7 Starting consuming 8 Consumed 8 Starting consuming 9 Consumed 9 Done
Nic nám ovšem nebrání v tom, abychom spustili větší množství workerů, kteří se budou o práci dělit:
import time import threading import queue # vytvoření fronty q = queue.Queue() # simulace konzumenta def consumer(): name = threading.current_thread().name while True: job = q.get() print(f'{name} thread: Starting consuming {job}') time.sleep(0.4) print(f'{name} thread: Consumed {job}') q.task_done() # spuštění konzumentů threading.Thread(target=consumer, daemon=True, name="1st").start() threading.Thread(target=consumer, daemon=True, name="2nd").start() threading.Thread(target=consumer, daemon=True, name="3rd").start() # vytvoření úloh v producentovi for job in range(10): print(f'Producing {job}') q.put(job) # čekání na zpracování všech zpráv ve frontě q.join() print('Done')
Výsledek:
Producing 0 Producing 1 Producing 2 Producing 3 Producing 4 Producing 5 3rd thread: Starting consuming 0 Producing 6 1st thread: Starting consuming 2 Producing 7 2nd thread: Starting consuming 1 Producing 8 Producing 9 3rd thread: Consumed 0 1st thread: Consumed 2 3rd thread: Starting consuming 3 2nd thread: Consumed 1 1st thread: Starting consuming 4 2nd thread: Starting consuming 5 3rd thread: Consumed 3 3rd thread: Starting consuming 6 1st thread: Consumed 4 2nd thread: Consumed 5 1st thread: Starting consuming 7 2nd thread: Starting consuming 8 3rd thread: Consumed 6 3rd thread: Starting consuming 9 2nd thread: Consumed 8 1st thread: Consumed 7 3rd thread: Consumed 9
10. Klasický vzor producent–konzument
Producentů i konzumentů může být (prakticky) libovolné množství a navíc je možné je přidávat nebo ubírat na základě požadavků aplikace. V následujícím demonstračním příkladu spolu komunikují tři producenti a čtyři konzumenti:
import time import threading import queue # vytvoření fronty q = queue.Queue() # simulace producenta def producer(): name = threading.current_thread().name for job in range(10): print(f'{name} thread: Starting producing {job}') q.put(job) time.sleep(0.3) print(f'{name} thread: Produced {job}') # simulace konzumenta def consumer(): name = threading.current_thread().name while True: job = q.get() print(f'{name} thread: Starting consuming {job}') time.sleep(0.4) print(f'{name} thread: Consumed {job}') q.task_done() # spuštění konzumentů threading.Thread(target=consumer, daemon=True, name="1st").start() threading.Thread(target=consumer, daemon=True, name="2nd").start() threading.Thread(target=consumer, daemon=True, name="3rd").start() # spuštění producentů threading.Thread(target=producer, daemon=True, name="1st").start() threading.Thread(target=producer, daemon=True, name="2nd").start() threading.Thread(target=producer, daemon=True, name="3rd").start() threading.Thread(target=producer, daemon=True, name="3rd").start() # čekání na zpracování všech zpráv ve frontě q.join() print('Done')
S výsledkem:
1st thread: Starting producing 0 1st thread: Starting consuming 0 2nd thread: Starting producing 0 3rd thread: Starting producing 0 2nd thread: Starting consuming 0 3rd thread: Starting producing 0 3rd thread: Starting consuming 0 1st thread: Produced 0 1st thread: Starting producing 1 2nd thread: Produced 0 2nd thread: Starting producing 1 3rd thread: Produced 0 ... ... ... 1st thread: Starting consuming 8 2nd thread: Starting consuming 8 3rd thread: Consumed 8 3rd thread: Starting consuming 9 1st thread: Consumed 8 1st thread: Starting consuming 9 2nd thread: Consumed 8 2nd thread: Starting consuming 9 3rd thread: Consumed 9 3rd thread: Starting consuming 9 1st thread: Consumed 9 2nd thread: Consumed 9 3rd thread: Consumed 9 Done
11. Prioritní fronta
V některých situacích může být užitečné použít prioritní frontu realizovanou třídou queue.PriorityQueue. Prvky vkládané do takové fronty se přitom při výběru (logicky) řadí stejně, jako to dělá funkce sorted. V praxi to znamená, že do fronty lze vkládat libovolné prvky, ovšem typicky se jedná o dvojice (priorita,hodnota) (dvojice je základním typem Pythonu – tuple). Funkce sorted řadí dvojice nejdříve podle jejich prvního prvku, čímž je automaticky zajištěno řazení podle priority:
#!/usr/bin/env python3 import queue import random q = queue.PriorityQueue(40) for item in range(30): print("Size", q.qsize()) print("Empty?", q.empty()) print("Full?", q.full()) value = random.randint(1, 20) print(value) q.put("prvek # {:2d}".format(value)) while not q.empty(): print("Read item:", q.get())
Prvky budou vybrány v tomto pořadí:
Read item: prvek # 1 Read item: prvek # 3 Read item: prvek # 3 Read item: prvek # 4 Read item: prvek # 5 ... ... ... Read item: prvek # 20 Read item: prvek # 20 Read item: prvek # 20 Read item: prvek # 20
12. Jazyk Python a multiprocessing
V předchozích kapitolách jsme si popsali základní způsoby rozdělení výpočtů do většího množství vláken s využitím standardního balíčku nazvaného threading. Ovšem existují i další možnosti tvorby aplikací, jejichž části mají běžet buď „pouze“ souběžně nebo které využijí plnou paralelnost nabízenou moderními počítači. Vzhledem k existenci GILu ve standardním CPythonu je problematické zajistit paralelní běh v rámci jednoho procesu (tedy vlastně jednoho virtuálního stroje Pythonu), proto je dalším logickým krokem rozdělení (fork) tohoto procesu na větší množství plnohodnotných systémových procesů. Ty budou moci běžet nezávisle na sobě a navíc toto řešení programátora donutí k tomu, aby explicitně zajistil korektní komunikaci mezi těmito procesy – zde již nemůže dojít k chybám typu „přístup do objektu vlastněného jiným vláknem“ atd. Toto řešení přináší i některé nevýhody – větší systémové nároky, pomalejší spouštění procesů (v porovnání se spouštěním vláken) a v některých případech se projeví i delší doba přepínání mezi procesy (ovšem to se podle mého názoru u interpretovaného Pythonu ztratí) a někdy by se skutečně hodilo mít snadný přístup ke sdíleným objektům.
V případě, že se rozhodnete si vyzkoušet spouštění jednotlivých částí algoritmu v samostatných procesech, můžete využít další standardní modul (balíček), který se jmenuje příznačně multiprocessing. Tento modul vývojáře do značné míry odstiňuje od nízkoúrovňových operací, tedy od samotného rozvětvení procesu (fork), spuštění nového interpretru a specifikace, jaký kód má tento interpret použít. Z pohledu vývojáře je totiž použití modulu multiprocessing velmi přímočaré – pouze se zvolí, jaká funkce se má zavolat v novém procesu a jaké mají být této funkci předány argumenty. Navíc modul multiprocessing programátorům nabízí mechanismy umožňující komunikaci mezi procesy. Zejména se jedná o frontu (queue) (ovšem jinou frontu, než jsme doposud používali, i když její API je v některých ohledech prakticky totožné) a taktéž o oboustrannou rouru (pipe).
13. Spuštění nového procesu
Podívejme se nyní na velmi jednoduchý demonstrační příklad, na němž jsou ukázány základní mechanismy nabízené modulem multiprocessing. Ve skriptu je definována následující funkce, která se má spustit v samostatném procesu:
def worker(name): print("hello", name)
Nový proces se spustí následovně:
p = Process(target=worker, args=("foo",)) p.start()
Implementovat je možné i čekání na dokončení tohoto procesu:
p.join()
p = Process(target=worker, args=("foo",)).start()
Úplný zdrojový kód takto vytvořeného příkladu vypadá následovně:
from multiprocessing import Process def worker(name): print("hello", name) def main(): p = Process(target=worker, args=("foo",)) p.start() p.join() if __name__ == '__main__': print("Running main") main()
14. Spuštění většího množství procesů, čekání na dokončení těchto procesů
Samozřejmě můžeme spustit větší množství procesů a následně si (například nástrojem top nebo htop) tyto procesy zobrazit. Na dokončení procesů lze počkat metodou join – jedná se tedy o prakticky stejný koncept, jaký jsme viděli při práci s větším množstvím vláken:
from multiprocessing import Process import time def worker(name): print("hello", name) time.sleep(5) print("done", name) def main(): ps = [] for name in ("foo", "bar", "baz", "other"): p = Process(target=worker, args=(name,)) p.start() ps.append(p) for p in ps: p.join() if __name__ == '__main__': print("Running main") main()
Výpis procesů (včetně dvou nerelevantních procesů):
$ ps ax |grep python 767 ? Ssl 0:00 /usr/bin/python3 -Es /usr/sbin/firewalld --nofork --nopid 10864 pts/4 S+ 0:00 python3 multiprocessing2.py 10865 pts/4 S+ 0:00 python3 multiprocessing2.py 10866 pts/4 S+ 0:00 python3 multiprocessing2.py 10867 pts/4 S+ 0:00 python3 multiprocessing2.py 10868 pts/4 S+ 0:00 python3 multiprocessing2.py 10947 pts/6 S+ 0:00 grep --color=auto python
Zvýraznění forku:
$ pstree -c 10864 python3─┬─python3 ├─python3 ├─python3 └─python3
15. Zjednodušení předchozího demonstračního příkladu
Jen pro úplnost si ukažme, jak je možné předchozí demonstrační příklad nepatrně zjednodušit použitím generátorové notace seznamu:
from multiprocessing import Process import time def worker(name): print("hello", name) time.sleep(5) print("done", name) def main(): ps = [Process(target=worker, args=(name,)) for name in ("foo", "bar", "baz", "other")] for p in ps: p.start() for p in ps: p.join() if __name__ == '__main__': print("Running main") main()
#!/usr/bin/env python3 from multiprocessing import Process import time def worker(name, dictionary): dictionary[name] = "ok" print("hello", name) time.sleep(5) print("done", name) def main(): d = {} ps = [] for name in ("foo", "bar", "baz", "other"): p = Process(target=worker, args=(name, d)) p.start() ps.append(p) for p in ps: p.join() print(d) if __name__ == "__main__": print("Running main") main()
Tento skript na závěr vypíše prázdný slovník, i když by „měl“ obsahovat čtyři záznamy:
Running main hello foo hello bar hello baz hello other done foo done bar done baz done other {}
16. Komunikace mezi procesy přes multiprocessing.Queue
I pro komunikaci mezi procesy, podobně jako pro komunikaci mezi vlákny, lze použít frontu. V tomto případě je ovšem její interní reprezentace zcela odlišná, protože musí zajistit meziprocesovou komunikaci, a to s využitím prostředků poskytovaných operačním systémem (například rourami). Namísto standardní struktury queue.Queue (ta je funkční jen v rámci jediného virtuálního stroje Pythonu) je tedy nutné použít strukturu multiprocessing.Queue, která se z pohledu programátora chová velmi podobně.
V následujícím demonstračním příkladu posíláme workerům, které jsou spuštěny v samostatných procesech, úlohy/příkazy, mezi jinými i příkaz pro jejich ukončení (tento příkaz je nutné poslat třikrát, aby dokázal zareagovat každý worker):
from multiprocessing import Process, Queue import time def worker(name, q): while True: cmd = q.get() print(name, cmd) if cmd == "quit": print("Quitting") return time.sleep(1) def main(): q = Queue() ps = [Process(target=worker, args=(name, q)) for name in ("foo", "bar", "baz")] for p in ps: p.start() for i in range(10): q.put("command {}".format(i)) for i in range(3): q.put("quit") for p in ps: p.join() if __name__ == '__main__': print("Running main") main()
17. Komunikace mezi procesy přes obousměrnou rouru (multiprocessing.Pipe)
Alternativně lze pro meziprocesovou komunikaci použít obousměrnou routu (pipe) s metodami send a recv. V následujícím demonstračním příkladu je spuštěn pouze jeden další proces a proto si vystačíme s jedinou rourou. Druhému procesu posíláme příkazy/úlohy, na které odpovídá (simulace práce) a taktéž příkaz quit, kterým se tento proces ukončí:
from multiprocessing import Process, Pipe import time def worker(name, conn): while True: cmd = conn.recv() print("{} received {}".format(name, cmd)) if cmd == "quit": return else: conn.send("{} accepted {}".format(name, cmd)) time.sleep(1) def main(): parent_conn, child_conn = Pipe() p = Process(target=worker, args=("Worker", child_conn)) p.start() for i in range(10): parent_conn.send("command {}".format(i)) print(parent_conn.recv()) parent_conn.send("quit") p.join() if __name__ == '__main__': main()
18. Spouštění a řízení paralelně běžících úloh – concurrent.futures
V navazujícím článku si mj. popíšeme i třídu ThreadPoolExecutor, kterou lze s výhodou využít ve chvíli, kdy se mají často spouštět různé asynchronně běžící úlohy:
from concurrent.futures.thread import ThreadPoolExecutor import time def worker(threadName, delay, n): for counter in range(1, n + 1): time.sleep(delay) print("{}: {}/{} - {}".format(threadName, counter, n, time.ctime(time.time()))) with ThreadPoolExecutor(max_workers=3) as executor: executor.submit(worker, "Thread-1", 0.5, 10) executor.submit(worker, "Thread-2", 1.0, 10) executor.submit(worker, "Thread-3", 1.5, 10) print("Done!")
Naplánování deseti úloh, z nichž ovšem souběžně poběží maximálně pouze tři:
from concurrent.futures.thread import ThreadPoolExecutor import time def worker(threadName, delay, n): for counter in range(1, n + 1): time.sleep(delay) print("{}: {}/{} - {}".format(threadName, counter, n, time.ctime(time.time()))) print("{}: DONE!".format(threadName)) workers = 10 with ThreadPoolExecutor(max_workers=3) as executor: for w in range(workers): executor.submit(worker, "Thread-{}".format(w + 1), 0.5 + w / 10.0, 10) print("Done!")
19. Repositář s demonstračními příklady
Zdrojové kódy všech dnes popsaných demonstračních příkladů určených pro programovací jazyk Python 3 byly uloženy do Git repositáře dostupného na adrese https://github.com/tisnik/most-popular-python-libs. V případě, že nebudete chtít klonovat celý repositář (ten je ovšem stále velmi malý, dnes má velikost zhruba několik desítek kilobajtů), můžete namísto toho použít odkazy na jednotlivé příklady, které naleznete v následující tabulce:
20. Odkazy na Internetu
- Dokumentace Pythonu: balíček queue
https://docs.python.org/3/library/queue.html - Dokumentace Pythonu: balíček threading
https://docs.python.org/3/library/threading.html? - Dokumentace Pythonu: balíček multiprocessing
https://docs.python.org/3/library/multiprocessing.html - Dokumentace Pythonu: balíček asyncio
https://docs.python.org/3/library/asyncio.html - Synchronization Primitives
https://docs.python.org/3/library/asyncio-sync.html - Coroutines
https://docs.python.org/3/library/asyncio-task.html - Queues
https://docs.python.org/3/library/asyncio-queue.html - python-csp
https://python-csp.readthedocs.io/en/latest/ - TrellisSTM
http://peak.telecommunity.com/DevCenter/TrellisSTM - Python Multithreading and Multiprocessing Tutorial
https://www.toptal.com/python/beginners-guide-to-concurrency-and-parallelism-in-python