Obsah
1. Podpora programovacího paradigmatu založeného na tocích (dat) knihovnou glow
2. Krátké zopakování: programovací paradigma založené na tocích (dat)
5. Ukázka toku Reduce→Map (nejjednodušší možný praktický příklad)
6. Realizace celého příkladu; výsledek výpočtu
7. Ukázka toku Reduce→Map, využití anonymních funkcí
8. Nepatrně složitější graf toku: Map→Reduce→Map
10. Využití operace Partition zajišťující distribuci vstupních dat
11. Import takzvaného driveru; podpora pro vizualizaci grafu toku
12. Bezproblémové použití odlišných typů dat v jednotlivých hranách grafu toku
13. Vliv parametru předaného operaci Partition na průběh výpočtu
14. Načtení a zpracování dat uložených v textovém souboru
15. Načítání strukturovaných souborů
16. Použití uživatelsky definovaných datových typů v celém řetězci zpracování
18. Problematika operace Reduce při agregaci výsledků
19. Repositář s demonstračními příklady
1. Podpora programovacího paradigmatu založeného na tocích (dat) knihovnou glow
V dnešním článku, jenž nepřímo navazuje na článek Programovací paradigma založené na tocích (dat) a knihovna goflow, si popíšeme knihovnu nazvanou glow. Jedná se o knihovnu podporující FBP neboli Flow-Based Programming v programovacím jazyku Go. Toto paradigma se poněkud odlišuje od dnes již klasického paradigmatu dataflow a zcela se odlišuje od imperativního programování. Některé společné vlastnosti ale i rozdíly mezi jednotlivými paradigmaty jsou shrnuty v následující tabulce:
Vlastnost | Flow-based | Dataflow | Imperativní |
---|---|---|---|
Základní myšlenka | data proudí mezi komponentami, které data transformují specifikovanými operacemi | data proudí mezi komponentami, které data transformují specifikovanými operacemi | operace jsou spouštěny nad daty v určitém přesně uvedeném pořadí a za určitých podmínek |
Programovací model | graf s propojenými komponentami | graf s propojenými komponentami | sekvence instrukcí popisujících jednotlivé operace i jejich návaznost |
Vizualizace modelu | graf toku dat | graf toku dat | vývojový diagram |
Reprezentace modelu | vizuální (graf) + zdrojový kód komponent | typicky vizuální | prakticky vždy formou zdrojového kódu |
Paralelizace | implicitní (všechny operace mohou běžet paralelně) |
implicitní nebo explicitní | explicitní (paralelní běh musí být korektně popsán a naprogramován) |
Model řízení | aktivní (komponenty si řídí svůj životní cyklus) |
reaktivní (komponenty jsou aktivovány externími událostmi) |
aktivní (komponenty si řídí svůj životní cyklus) |
Obrázek 1: Příklad jednoduchého grafu s několika uzly, které si předávají data.
2. Krátké zopakování: programovací paradigma založené na tocích (dat)
„The real world is asynchronous: don't try to force everything into a synchronous framework derived from the architecture of early computers.“
J. Paul Morrison, inventor/discoverer of Flow-Based Programming
Zopakujme si nyní v krátkosti základní informace o programovacím paradigmatu založenému na tocích dat, které se v angličtině označuje zkratkou FBP znamenající Flow-based programming. Jedná se o paradigma, v němž se aplikace definují jako sítě procesů – „černé skříňky“ –, které si mezi sebou posílají data předáváním zpráv předem definovanými cestami (tedy nikoli „náhodně“ či libovolně, jako je tomu například v klasickém OOP). Tyto procesy neboli černé skříňky lze prakticky libovolně propojovat a vytvářet tak různé aplikace, aniž by bylo nutné je (tedy ony procesy) interně nějakým způsobem modifikovat. FBP je tedy paradigma, které zcela přirozeně vede k tvorbě více či méně samostatně pracujících komponent.
V FBP se upouští od klasického (řekněme von Neumannovského) pohledu na aplikace jako na sekvenční proces. Namísto toho se aplikace modelují jako síť asynchronně běžících procesů, které si předávají data přes buffery (resp. přes fronty). Každý proces v FBP provádí činnost ve chvíli, kdy dostává vstupní data a díky oddělení procesů a jejich nezávislosti na ostatních procesech vlastně zcela zadarmo získáme plnou podporu pro paralelní (a samozřejmě i souběžný) běh aplikace jako celku – o to bez nutnosti explicitní práce s mutexy či s dalšími synchronizačními mechanismy.
Základními stavebními prvky při návrhu aplikace při použití paradigmatu FBP jsou komponenty. Interně se typicky jedná o třídy, struktury+metody (Go), uzávěry (closure) či někdy o obyčejné funkce popř. o funkce se statickými proměnnými (pokud tento koncept daný programovací jazyk vůbec podporuje). Instance komponent umístěné do grafu/sítě se nazývají procesy. Z jedné komponenty lze pochopitelně odvodit větší množství procesů.
Jednotlivé procesy spolu nekomunikují přímo (voláním), ale příjmem dat na vstupních portech popř. posíláním dat na porty výstupní. Jednotlivá spojení (connection) mohou obsahovat buffery resp. přesněji řečeno fronty (FIFO, queue), typicky s předem nastavenou kapacitou (tedy maximálním počtem v dané chvíli čekajících zpráv). Některá propojení mohou mít kapacitu nastavenou na nulu; zde na sebe budou procesy čekat a navzájem se tak do jisté míry synchronizovat – posílání či příjem dat tedy může být blokující operací.
3. Knihovna glow
Knihovna glow, kterou se budeme v dnešním článku zabývat, celý koncept nepatrně upravuje, a to takovým způsobem, že komponentami jsou běžné anonymní nebo pojmenované funkce (popř. uzávěry – closures), namísto portů se přímo pracuje s parametry funkcí (vstupní porty) a návratovými hodnotami funkcí (výstupní porty). Navíc glow, na rozdíl od mnoha dalších podobně koncipovaných knihoven, umožňuje automatické odvození typů předávaných dat na základě typů parametrů a návratových hodnot. Výsledné programy tedy vypadají dosti odlišně od klasických FBP programů, jak ostatně uvidíme v navazujících kapitolách.
4. Instalace knihovny glow
Instalace této knihovny je jednoduchá. Postačuje si příkazem:
$ go mod init glow-test-1
vytvořit nový projektový soubor, který by měl vypadat následovně:
module glow-test-1 go 1.18
Dále ve stejném adresáři vytvoříme kostru programu:
package main import ( "flag" "fmt" "github.com/chrislusf/glow/flow" ) func main() { flag.Parse() flow.New() }
Po zadání příkazu:
$ go get .
by se měly nainstalovat všechny potřebné balíčky:
go: downloading github.com/chrislusf/glow v0.0.0-20181102060906-4c40a2717eee go: downloading github.com/psilva261/timsort v1.0.0 go: added github.com/chrislusf/glow v0.0.0-20181102060906-4c40a2717eee go: added github.com/psilva261/timsort v1.0.0
A projektový soubor by se měl změnit do podoby:
module glow-test-1 go 1.18 require ( github.com/chrislusf/glow v0.0.0-20181102060906-4c40a2717eee // indirect github.com/psilva261/timsort v1.0.0 // indirect )
module glow-test-1 go 1.18 require ( github.com/chrislusf/glow v0.0.0-20181102060906-4c40a2717eee // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/psilva261/timsort v1.0.0 // indirect google.golang.org/protobuf v1.26.0 // indirect )
5. Ukázka toku Reduce→Map (nejjednodušší možný praktický příklad)
Ukažme si nyní nějaký reálněji pojatý příklad, v němž využijeme alespoň základní možnosti nabízené knihovnou Glow. Naprogramujeme (jinými slovy vytvoříme graf) následující operace:
- Vstupem bude sekvence celočíselných hodnot od 1 do 10.
- Prvky z této sekvence budou postupně sečteny.
- Výsledek součtu bude vypsán na standardní výstup.
Graf tedy bude obsahovat trojici uzlů. Prvním z těchto uzlů je sekvence celočíselných hodnot od 1 do 10. Ta se v knihovně Glow tvoří metodou Slice:
Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
V dalším uzlu je tato sekvence postupně redukována operací Reduce. Ta nejprve zpracuje první dvě hodnoty a vypočítá mezivýsledek. Tento postupně akumulovaný mezivýsledek je použit společně s dalším prvkem ve vstupní sekvenci pro další výpočet atd. atd. až je celá vstupní sekvence (resp. v našem případě řez) zredukován na jedinou výslednou hodnotu:
Reduce(sum)
A konečně v posledním uzlu je výsledek součtu vypsán na standardní výstup:
Map(printSum)
Sestrojení celého grafu bude vypadat následovně – povšimněte si, že za New se volají metody objektu představujícího celý graf:
flow. New(). Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}). Reduce(sum). Map(printSum)
Graf můžeme nejen vytvořit, ale i „spustit“, a to zavoláním metody Run:
flow. New(). Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}). Reduce(sum). Map(printSum).Run()
6. Realizace celého příkladu; výsledek výpočtu
V definici grafu jsme použili dvojici funkcí sum a printSum. Tyto dvě funkce samozřejmě musíme implementovat, a to konkrétně následujícím způsobem:
func sum(x int, y int) int { return x + y } func printSum(x int) { fmt.Println("Sum:", x) }
Povšimněte si, že se jedná o jedinou část celého programu (resp. definice grafu), v níž pracujeme s datovými typy. Kontrolu datových typů, tedy zda jsou funkci sum skutečně předána celá čísla a návratová hodnota této funkce odpovídá typu parametru funkce printSum, automaticky provádí knihovně Glow.
Úplný zdrojový kód tohoto demonstračního příkladu vypadá následovně:
package main import ( "flag" "fmt" "github.com/chrislusf/glow/flow" ) func sum(x int, y int) int { return x + y } func printSum(x int) { fmt.Println("Sum:", x) } func main() { flag.Parse() flow. New(). Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}). Reduce(sum). Map(printSum).Run() }
Po překladu a spuštění tohoto demonstračního příkladu by se měl na standardní výstup vypsat výsledek součtu všech prvků ze vstupního řezu:
$ go run main.go Sum: 55
7. Ukázka toku Reduce→Map, využití anonymních funkcí
Mnohdy je programový kód, který se v rámci jednotlivých uzlů grafu spouští, velmi krátký. To je ostatně i náš případ, protože těla funkcí sum a printSum obsahují jediný příkaz. V takovém případě může být výhodnější namísto pojmenovaných funkcí použít funkce anonymní, jejichž těla budou zapsána přímo v definici grafu:
New(). Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}). Reduce(func(x int, y int) int { return x + y }). Map(func(x int) { println("sum:", x) })
Podívejme se nyní na kód takto upraveného příkladu, v němž jsou opět knihovnou Glow kontrolovány datové typy parametrů funkcí i návratových hodnot (tedy vstupů a výstupů uzlů grafu):
package main import ( "flag" "github.com/chrislusf/glow/flow" ) func main() { flag.Parse() flow. New(). Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}). Reduce(func(x int, y int) int { return x + y }). Map(func(x int) { println("sum:", x) }). Run() }
Výsledek získaný po překladu a spuštění by měl být totožný s předchozím příkladem, o čemž se můžeme velmi snadno přesvědčit:
$ go run main.go sum: 55
8. Nepatrně složitější graf toku: Map→Reduce→Map
Velká síla a potenciální užitečnost FBP (a vlastně i knihovny Glow) spočívá v tom, že funkce vyššího řádu Map může být aplikována souběžně (a v ideálním případě i paralelně) na velký počet vstupních prvků. Tuto vlastnost jsme prozatím nevyužili, protože se v Map pouze vytiskla jediná hodnota na standardní výstup. Ovšem nic nám nebrání v tom upravit si původní příklad tak, že se všechny prvky ze vstupního řezu souběžně zdvojnásobí, tj. interně vlastně vznikne nový řez dále zpracovaný operací Reduce. Nová podoba definice grafu bude vypadat následovně (první funkce Map je aplikována paralelně):
Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}). Map(func(value int) int { return value * 2 }). Reduce(func(x int, y int) int { return x + y }). Map(func(x int) { println("sum:", x) }).
Podívejme se nyní na takto upravený kód demonstračního příkladu:
package main import ( "flag" "github.com/chrislusf/glow/flow" ) func main() { flag.Parse() flow. New(). Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}). Map(func(value int) int { return value * 2 }). Reduce(func(x int, y int) int { return x + y }). Map(func(x int) { println("sum:", x) }). Run() }
V případě, že přeložíme a spustíme tento demonstrační příklad, měla by být výsledkem hodnota 110 a nikoli 55. Je to logické, protože se nesčítají hodnoty z číselné řady 1, 2, 3, … 10, ale z řady 2, 4, 6, … 20:
$ go run main.go sum: 110
9. Metoda Partition
Operace Map, s níž jsme se setkali v předchozích kapitolách, může být na vstupní data aplikována paralelně. Nejdříve je ovšem nutné tato vstupní data vhodným způsobem rozdělit do více skupin a následně každou skupinu skutečně paralelně zpracovat. K tomuto účelu slouží metoda Partition, které se předá počet oddílů, do kterých se vstupní data rozdělí. Pokud například zadáme Partition(3), budou vstupní data rozdělena do tří oddílů atd.
Jak jsou však vstupní data rozdělena? Existují dvě možnosti – buď se rozdělí na základě klíče nebo na základě hešovacího kódu. Rozdělení podle klíče se provádí ve chvíli, kdy prvky ve vstupních datech mají podobu struktury, řezu nebo pole (tedy ne, že prvky jsou uloženy v řezu či poli, ale skutečně je každý prvek řezem či polem). V tomto případě je klíčem první prvek ve struktuře/prvku poli. Pokud vstupní prvky mají odlišný datový typ (typicky celé číslo, řetězec apod.), vypočítá se hešovací hodnota těchto prvků a na základě této hodnoty se prvky rozdělí do oddílů.
10. Využití operace Partition zajišťující distribuci vstupních dat
Operaci Partition, která byla popsána v předchozí kapitole, je možné v praxi použít velmi snadno. Například můžeme vstupní data (v našem případě představovaná řezem hodnot) rozdělit do tří oddílů zavoláním metody Partition(3). To mj. znamená, že navazující operace Map bude běžet souběžně (a většinou i paralelně), a to bez nutnosti explicitní práce s vlákny či gorutinami:
New(). Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}). Partition(3). Map(func(value int) int { return value * 2 }). Reduce(func(x int, y int) int { return x + y }). Map(func(x int) { println("sum:", x) }).
Úplný program, který před zpracováním dat operací Map rozdělí data do tří oddílů, vypadá takto:
package main import ( "flag" "github.com/chrislusf/glow/flow" ) func main() { flag.Parse() flow. New(). Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}). Partition(3). Map(func(value int) int { return value * 2 }). Reduce(func(x int, y int) int { return x + y }). Map(func(x int) { println("sum:", x) }). Run() }
Výsledek získaný tímto programem by se neměl lišit od předchozích výsledků, což si snadno otestujeme:
$ go run main.go sum: 110
11. Import takzvaného driveru; podpora pro vizualizaci grafu toku
Graf, který je knihovnou Glow zkonstruován ještě předtím, než proběhnou výpočty, může být kvůli operacím Partition a Reduce mnohdy i velmi složitý. Proto by bylo vhodné dokázat si takový graf vizualizovat. I tuto funkcionalitu knihovna Glow programátorům nabízí. V samotném zdrojovém kódu je nejprve nutné provést nepatrnou změnu – musí se naimportovat balíček glow/driver, a to následujícím způsobem:
imported and not used: "github.com/chrislusf/glow/driver"
Vzhledem k tomu, že se používá jmenný alias „_“, není možné přistupovat k symbolům (konstantám, proměnným, funkcím) definovaným v tomto balíčku, což ovšem ani nebudeme potřebovat. Důležitější však je, že nám překladač povolí provést import tohoto balíčku, aniž by nahlásil chybu:
imported and not used: "fmt"
Výsledný zdrojový kód by měl vypadat následovně:
package main import ( "flag" _ "github.com/chrislusf/glow/driver" "github.com/chrislusf/glow/flow" ) func main() { flag.Parse() flow. New(). Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}). Partition(3). Map(func(value int) int { return value * 2 }). Reduce(func(x int, y int) int { return x + y }). Map(func(x int) { println("sum:", x) }). Run() }
Tento program přeložíme příkazem:
$ go build main.go
Díky tomu, že je importován balíček glow/driver, začne přeložený program nabízet velké množství nových (dříve nedostupných) voleb, které lze zobrazit přepínačem –help:
$ ./glow-test-5 --help
Mezi nově dostupnými volbami se mj. nachází i volba -glow.flow.plot, která je na výpisu zvýrazněna:
Usage of ./main: -ca.file string A PEM eoncoded CA's certificate file -cert.file string A PEM eoncoded certificate file -glow start in driver mode -glow.agent.address string agent hostname:port -glow.channel.bufferSize int channel buffer size for reading inputs -glow.dataCenter string preferred data center name -glow.driver.host string driver runs on this host address. Required in 2-way SSL mode. -glow.driver.port int driver listens on this port to copy files to agents. Required to specify and open this port. -glow.exe.hash string hash of executable binary file -glow.flow.bid float total bid price in a flow to compete for resources (default 100) -glow.flow.id int flow id (default -1) -glow.flow.plot print out task group flow in graphviz dot format -glow.flow.stat show flow details at the end of execution -glow.leader string leader server (default "localhost:8930") -glow.module string a name to group related jobs together on agent -glow.rack string preferred rack name -glow.related.files string ':' separated list of files -glow.request.id uint request id received from agent -glow.task.memoryMB int request one task memory size in MB (default 64) -glow.task.name string name of first task in the task group -glow.taskGroup.id int task group id (default -1) -glow.taskGroup.inputs string comma and @ seperated input locations -key.file string
Volbou –glow.flow.plot je možné si nechat vyexportovat popis grafu ve formátu kompatibilním s nástrojem dot (viz též článek Tvorba grafů a diagramů s využitím doménově specifického jazyka nástroje Graphviz:
$ ./glow-test-5 --glow --glow.flow.plot > plot.gv
Vygenerovaný soubor s popisem grafu je plně čitelný:
digraph glow { subgraph group_0{ node [style=filled,color=white]; style=filled; color=lightgrey; Input0 -> Partition_scatter1; label = "group_0"; } subgraph group_1{ node [style=filled,color=white]; style=filled; color=lightgrey; Partition_collect2_0_3 -> Map3_0_3 -> LocalReduce4_0_3; label = "group_1"; } subgraph group_2{ node [style=filled,color=white]; style=filled; color=lightgrey; Partition_collect2_1_3 -> Map3_1_3 -> LocalReduce4_1_3; label = "group_2"; } subgraph group_3{ node [style=filled,color=white]; style=filled; color=lightgrey; Partition_collect2_2_3 -> Map3_2_3 -> LocalReduce4_2_3; label = "group_3"; } subgraph group_4{ node [style=filled,color=white]; style=filled; color=lightgrey; MergeReduce5 -> Map6; label = "group_4"; } input0 [shape=doublecircle]; input0 -> Input0; Partition_scatter1 -> d1_0; Partition_scatter1 -> d1_1; Partition_scatter1 -> d1_2; d1_0 -> Partition_collect2_0_3; LocalReduce4_0_3 -> d4_0; d1_1 -> Partition_collect2_1_3; LocalReduce4_1_3 -> d4_1; d1_2 -> Partition_collect2_2_3; LocalReduce4_2_3 -> d4_2; d4_0 -> MergeReduce5; d4_1 -> MergeReduce5; d4_2 -> MergeReduce5; Map6 -> end; center=true; compound=true; end [shape=Msquare]; }
Následně si necháme graf vytisknout do rastrového formátu PNG:
$ dot -Tpng plot.gv > plot.png
S tímto výsledkem:
Obrázek 2: Vizualizace grafu popsaného v pátém demonstračním příkladu.
12. Bezproblémové použití odlišných typů dat v jednotlivých hranách grafu toku
Pravděpodobně nejdůležitější vlastností knihovny Glow oproti konkurenčním knihovnám je automatické odvození a kontrola datových typů prvků, které „proudí“ grafem. Podívejme se na tento popis grafu, z něhož bude vše zřejmé:
Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}). Partition(3). Map(func(value int) string { return strconv.Itoa(value) }). Reduce(func(x string, y string) string { return x + "," + y }). Map(func(x string) { println("joined:", x) }).
Z tohoto popisu grafu je patrné, že operace Map je realizována funkcí akceptující celé číslo a produkující řetězec. Následuje operace Reduce, která naopak akceptuje dvojici řetězců a produkuje další řetězec. A konečně poslední operace Map akceptuje řetězec. Tok mezi Map, Reduce a dalším Map je kontrolován při konstrukci grafu, tj. například není možné změnit typ návratové hodnoty u realizace první operace Map, aniž by se adekvátním způsobem změnila hlavička funkce realizující operaci Reduce atd.
Celý zdrojový kód tohoto demonstračního příkladu vypadá následovně:
package main import ( "flag" "strconv" _ "github.com/chrislusf/glow/driver" "github.com/chrislusf/glow/flow" ) func main() { flag.Parse() flow. New(). Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}). Partition(3). Map(func(value int) string { return strconv.Itoa(value) }). Reduce(func(x string, y string) string { return x + "," + y }). Map(func(x string) { println("joined:", x) }). Run() }
Z výsledků získaných po spuštění tohoto příkladu můžeme odvodit, jak se vlastně prvky ze vstupu rozdělily do tří oddílů:
joined: 2,5,8,3,6,9,1,4,7,10
První oddíl obsahoval prvky 2, 5 a 8, druhý oddíl prvky 3, 6 a 9 a třetí oddíl prvky 1, 4, 7 a 10.
Obrázek 3: Vizualizace grafu popsaného v šestém demonstračním příkladu.
Co se ovšem stane ve chvíli, kdy datové typy v grafu „nesedí“, tj. když jeden uzel posílá jinému uzlu data v neočekávaném formátu? Můžeme si to snadno vyzkoušet. Povšimněte si označené části kódu s nesprávným typem vstupního parametru:
flow. New(). Slice([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}). Partition(3). Map(func(value int) string { return strconv.Itoa(value) }). Reduce(func(x int, y string) string { return strconv.Itoa(x) + "," + y }). Map(func(x string) { println("joined:", x) }). Run()
Tato chyba bude detekována, ale nikoli při překladu (překladač jazyka Go totiž nemá možnosti, jak odvozené typy kontrolovat), ale při spuštění a konstrukci grafu:
panic: reflect: Call using string as type int goroutine 32 [running]: reflect.Value.call({0x8214a0?, 0x8d9588?, 0x13?}, {0x8a1976, 0x4}, {0xc00020ef08, 0x2, 0x2?}) /opt/go/src/reflect/value.go:411 +0x19ff reflect.Value.Call({0x8214a0?, 0x8d9588?, 0x0?}, {0xc0001ccf08, 0x2, 0x2}) /opt/go/src/reflect/value.go:339 +0xbf github.com/chrislusf/glow/flow.(*Dataset).LocalReduce.func1(0x0?) /home/ptisnovs/go/pkg/mod/github.com/chrislusf/glow@v0.0.0-20181102060906-4c40a2717eee/flow/dataset_reduce.go:28 +0x20b github.com/chrislusf/glow/flow.(*Task).RunTask(0xc00012ea80) /home/ptisnovs/go/pkg/mod/github.com/chrislusf/glow@v0.0.0-20181102060906-4c40a2717eee/flow/step_task.go:33 +0x2a github.com/chrislusf/glow/flow.(*Step).RunStep.func1(0x0?, 0x0?) /home/ptisnovs/go/pkg/mod/github.com/chrislusf/glow@v0.0.0-20181102060906-4c40a2717eee/flow/step.go:22 +0x57 created by github.com/chrislusf/glow/flow.(*Step).RunStep /home/ptisnovs/go/pkg/mod/github.com/chrislusf/glow@v0.0.0-20181102060906-4c40a2717eee/flow/step.go:20 +0x4e panic: reflect: Call using string as type int
13. Vliv parametru předaného operaci Partition na průběh výpočtu
Zdrojový kód z předchozí kapitoly dokáže díky použití řetězců (které se spojují zcela odlišným způsobem, než se sčítají čísla – operace spojení například není komutativní) naznačit, jakým způsobem je vlastně vstupní řez s hodnotami rozdělen do oddílů na základě parametru předaného operaci Partition. V dalším demonstračním příkladu postupně zpracujeme 24 prvků, přičemž dojde k rozdělení těchto prvků na 1, 2, 4, 6, 8, 12 a 24 oddílů:
package main import ( "flag" "strconv" _ "github.com/chrislusf/glow/driver" "github.com/chrislusf/glow/flow" ) func newRangeSlice(sliceLength int) []int { result := make([]int, sliceLength) for i := 0; i < sliceLength; i++ { result[i] = i + 1 } return result } func compute(sliceLength int, partitions int) { flow. New(). Slice(newRangeSlice(sliceLength)). Partition(partitions). Map(func(value int) string { return strconv.Itoa(value) }). Reduce(func(x string, y string) string { return x + "," + y }). Map(func(x string) { println("joined:", x) }). Run() } func main() { flag.Parse() compute(24, 1) compute(24, 2) compute(24, 3) compute(24, 4) compute(24, 6) compute(24, 8) compute(24, 12) }
V získaných výsledcích můžeme vypozorovat způsob rozdělení prvků ze vstupu od oddílů. Každý lichý oddíl je zvýrazněn:
joined: 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24 joined: 2,4,6,8,10,12,14,16,18,20,22,24,1,3,5,7,9,11,13,15,17,19,21,23 joined: 2,5,8,11,14,17,20,23,1,4,7,10,13,16,19,22,3,6,9,12,15,18,21,24 joined: 3,7,11,15,19,23,2,6,10,14,18,22,1,5,9,13,17,21,4,8,12,16,20,24 joined: 2,8,14,20,3,9,15,21,5,11,17,23,6,12,18,24,1,7,13,19,4,10,16,22 joined: 7,15,23,8,16,24,6,14,22,1,9,17,3,11,19,2,10,18,5,13,21,4,12,20 joined: 12,24,11,23,5,17,8,20,6,18,3,15,2,14,4,16,7,19,1,13,9,21,10,22
14. Načtení a zpracování dat uložených v textovém souboru
Prozatím jsme v předchozích demonstračních příkladech získávali vstupní prvky pro graf z řezu (slice) obsahujícího celočíselné hodnoty. V praxi se však setkáme s tím, že vstupní data jsou uložena v databázích, čteny z Apache Kafka, nebo jsou načítány ze souborů. Knihovna Glow některé výše zmíněné datové zdroje přímo podporuje, což samozřejmě zjednodušuje návrh grafů a taktéž se uživatel-programátor nemusí starat například o prealokaci paměti (což u objemnějších dat není možné) atd.
Pravděpodobně nejjednodušší je načítání dat z textových souborů, protože pro tento účel slouží metoda TextFile. Této metodě se předává jméno souboru se vstupními daty (každý záznam je uložen na samostatném řádku) a taktéž počet oddílů, do kterých se vstupní data rozdělí:
package main import ( "flag" "strconv" _ "github.com/chrislusf/glow/driver" "github.com/chrislusf/glow/flow" ) func main() { flag.Parse() flow. New(). TextFile("data.txt", 1). Map(func(line string) int { value, _ := strconv.Atoi(line) return value }). Reduce(func(x int, y int) int { return x + y }). Map(func(x int) { println("sum:", x) }). Run() }
Obrázek 4: Vizualizace grafu popsaného v osmém demonstračním příkladu.
V případě, že bude vstupní datový soubor obsahovat tyto záznamy:
1 10 100 1000 10000 100000 1000000 10000000
Bude výsledkem výpočtu hodnota:
$ go run main.go sum: 11111111
Vstupní data můžeme samozřejmě rozdělit do většího množství oddílů:
package main import ( "flag" "strconv" _ "github.com/chrislusf/glow/driver" "github.com/chrislusf/glow/flow" ) func main() { flag.Parse() flow. New(). TextFile("data.txt", 3). Map(func(line string) int { value, _ := strconv.Atoi(line) return value }). Reduce(func(x int, y int) int { return x + y }). Map(func(x int) { println("sum:", x) }). Run() }
Výpočet bude totožný, ovšem lišit se bude graf i průběh výpočtu:
Obrázek 5: Vizualizace grafu popsaného v osmém demonstračním příkladu.
15. Načítání strukturovaných souborů
Soubor obsahující na každém řádku jedinou číselnou hodnotu samozřejmě není typickým příkladem toho, jak datové soubory v praxi vypadají. Zkusme si tedy zpracovat soubor s nepatrně složitější strukturou. Bude se jednat o textový soubor, který na každém řádku obsahuje jednoho vítěze známé Turingovy ceny v daném roce (z mnoha vítězů jsem vybral své favority):
1983 Ken Thompson 1983 Dennis Ritchie 1988 Ivan Sutherland 1979 Kenneth Iverson 1989 William Kahan 1977 John Backus
16. Použití uživatelsky definovaných datových typů v celém řetězci
Každý záznam z textového souboru popsaného v předchozí kapitole budeme reprezentovat datovou strukturou, která vypadá takto:
type Recipient struct { Year int Name string Surname string }
To znamená, že bude nutné provést dvojí transformaci:
- Převod řetězce (například „1983 Ken Thompson“) na řez třech řetězců ([„1983“, „Ken“, „Thompson“]).
- Převod prvního řetězce z řezu na celé číslo následované konstrukcí datové struktury Recipient
První transformaci lze v celém řetězci realizovat snadno:
Map(func(line string) []string { return strings.Fields(line) }).
Povšimněte si typu vstupního parametru i typu návratové hodnoty.
Podobně můžeme druhou transformaci dat z řezu na hodnotu typu Recipient popsat:
Map(func(input []string) Recipient { year, _ := strconv.Atoi(input[0]) return Recipient{ year, input[1], input[2]} }).
Typ vstupní hodnoty tedy odpovídá typu návratové hodnoty předchozí operace.
A konečně operace pro vytištění výsledku musí akceptovat parametr typu Recipient:
Map(func(recipient Recipient) { fmt.Printf("%4d %-12s %-12s\n", recipient.Year, recipient.Name, recipient.Surname) }).
Popis celého grafu tedy může vypadat takto:
TextFile("data.txt", 1). Map(func(line string) []string { return strings.Fields(line) }). Map(func(input []string) Recipient { year, _ := strconv.Atoi(input[0]) return Recipient{ year, input[1], input[2]} }). Map(func(recipient Recipient) { fmt.Printf("%4d %-12s %-12s\n", recipient.Year, recipient.Name, recipient.Surname) }).
Úplný zdrojový kód v pořadí již devátého demonstračního příkladu:
package main import ( "flag" "fmt" "strconv" "strings" _ "github.com/chrislusf/glow/driver" "github.com/chrislusf/glow/flow" ) type Recipient struct { Year int Name string Surname string } func main() { flag.Parse() flow. New(). TextFile("data.txt", 1). Map(func(line string) []string { return strings.Fields(line) }). Map(func(input []string) Recipient { year, _ := strconv.Atoi(input[0]) return Recipient{ year, input[1], input[2]} }). Map(func(recipient Recipient) { fmt.Printf("%4d %-12s %-12s\n", recipient.Year, recipient.Name, recipient.Surname) }). Run() }
Tento demonstrační příklad po svém spuštění vypíše naformátované hodnoty reprezentované strukturami typu Recipient:
1983 Ken Thompson 1983 Dennis Ritchie 1988 Ivan Sutherland 1979 Kenneth Iverson 1989 William Kahan 1977 John Backus
17. Operace typu Sort
Do grafu s popisem toku a zpracování dat lze přidat i operaci typu Sort. Tato operace na vstupu akceptuje dvojici hodnot a vrací pravdivostní hodnotu true/false na základě toho, zda má být první prvek zařazen za prvek druhý či naopak. Tímto způsobem lze zajistit seřazení prvků jakéhokoli typu, a to například i prvků typu Recipient, pokud explicitně zvolíme, která položka nebo položky se mají porovnávat:
Sort(func(recipient1 Recipient, recipient2 Recipient) bool { return recipient1.Year < recipient2.Year }).
Úplný zdrojový kód dnešního předposledního demonstračního příkladu vypadá takto:
package main import ( "flag" "fmt" "strconv" "strings" _ "github.com/chrislusf/glow/driver" "github.com/chrislusf/glow/flow" ) type Recipient struct { Year int Name string Surname string } func main() { flag.Parse() flow. New(). TextFile("data.txt", 1). Map(func(line string) []string { return strings.Fields(line) }). Map(func(input []string) Recipient { year, _ := strconv.Atoi(input[0]) return Recipient{ year, input[1], input[2]} }). Sort(func(recipient1 Recipient, recipient2 Recipient) bool { return recipient1.Year < recipient2.Year }). Map(func(recipient Recipient) { fmt.Printf("%4d %-12s %-12s\n", recipient.Year, recipient.Name, recipient.Surname) }). Run() }
Po překladu a spuštění získáme výherce Turing Award seřazené podle roku získání ceny:
1977 John Backus 1979 Kenneth Iverson 1983 Ken Thompson 1983 Dennis Ritchie 1988 Ivan Sutherland 1989 William Kahan
Obrázek 6: Vizualizace grafu popsaného v desátém demonstračním příkladu.
Triviálním způsobem lze zajistit seřazení podle příjmení:
Sort(func(recipient1 Recipient, recipient2 Recipient) bool { return recipient1.Surnam < recipient2.Surnam }).
S výsledkem:
1977 John Backus 1979 Kenneth Iverson 1989 William Kahan 1983 Dennis Ritchie 1988 Ivan Sutherland 1983 Ken Thompson
18. Problematika operace Reduce při agregaci výsledků
V dnešním posledním demonstračním příkladu si ukážeme způsob agregace výsledků přes operaci typu Reduce. Tuto operaci již známe, ovšem prozatím jsme měli „štěstí“, protože jsme počítali sumu prvků. A samotná suma je představována celým číslem, tj. stejným typem, jako samotné prvky, takže jsme vlastně nemuseli řešit typy parametrů vstupujících do operace Reduce.
Nyní však budeme chtít prvky typu Recipient sloučit do jediného řezu. Mohlo by se tedy zdát, že postačuje nadefinovat tento graf:
Map(func(input []string) Recipient { year, _ := strconv.Atoi(input[0]) return Recipient{ year, input[1], input[2]} }). Reduce(func(recipients []Recipient, recipient Recipient) []Recipient { x := append(recipients, recipient) return x }).
Toto zdánlivě logické řešení však není korektní, protože operace Reduce očekává, že oba vstupy budou stejného typu. Budeme si tedy muset pomoci malým trikem – oba vstupy budou shodného typu „řez Recipienty“ s tím, že do celého řetězce ještě přidáme přemapování jediné hodnoty typu Recipient na řez. Na konci zpracování vytvoříme z řezů tabulku, kterou vytiskneme na standardní výstup:
Map(func(input []string) Recipient { year, _ := strconv.Atoi(input[0]) return Recipient{ year, input[1], input[2]} }). Map(func(recipient Recipient) []Recipient { var x []Recipient = []Recipient{recipient} return x }). Reduce(func(recipients []Recipient, recipient []Recipient) []Recipient { x := append(recipients, recipient...) return x }). Map(func(recipients []Recipient) { fmt.Println(" # Year First name Surname") for i, recipient := range recipients { fmt.Printf("%2d %4d %-12s %-12s\n", i, recipient.year, recipient.name, recipient.surname) } }).
Úplný zdrojový kód tohoto demonstračního příkladu vypadá následovně:
package main import ( "flag" "fmt" "strconv" "strings" _ "github.com/chrislusf/glow/driver" "github.com/chrislusf/glow/flow" ) type Recipient struct { year int name string surname string } func main() { flag.Parse() flow. New(). TextFile("data.txt", 1). Map(func(line string) []string { return strings.Fields(line) }). Map(func(input []string) Recipient { year, _ := strconv.Atoi(input[0]) return Recipient{ year, input[1], input[2]} }). Map(func(recipient Recipient) []Recipient { var x []Recipient = []Recipient{recipient} return x }). Reduce(func(recipients []Recipient, recipient []Recipient) []Recipient { x := append(recipients, recipient...) return x }). Map(func(recipients []Recipient) { fmt.Println(" # Year First name Surname") for i, recipient := range recipients { fmt.Printf("%2d %4d %-12s %-12s\n", i, recipient.year, recipient.name, recipient.surname) } }). Run() }
Podívejme se nyní, jaký výsledek získáme po překladu a spuštění dnešního posledního demonstračního příkladu:
# Year First name Surname 0 1983 Ken Thompson 1 1983 Dennis Ritchie 2 1988 Ivan Sutherland 3 1979 Kenneth Iverson 4 1989 William Kahan 5 1977 John Backus
Obrázek 7: Vizualizace grafu popsaného v jedenáctém demonstračním příkladu.
19. Repositář s demonstračními příklady
Zdrojové kódy všech dnes použitých demonstračních příkladů byly uloženy do Git repositáře, který je dostupný na adrese https://github.com/tisnik/go-root. V případě, že nebudete chtít klonovat celý repositář (ten je ovšem – alespoň prozatím – velmi malý, dnes má přibližně stovku kilobajtů), můžete namísto toho použít odkazy na jednotlivé demonstrační příklady, které naleznete v následující tabulce:
20. Odkazy na Internetu
- glow
https://github.com/chrislusf/glow - Glow – wiki
https://github.com/chrislusf/glow/wiki#glow-apis - Gleam
https://github.com/chrislusf/gleam - Get into the flow
https://appliedgo.net/flow/ - Flow-based and dataflow programming library for Go programming language
https://github.com/trustmaster/goflow - Flow-based programming (Wikipedia)
https://en.wikipedia.org/wiki/Flow-based_programming - FlowBasedProgramming (Python wiki)
https://wiki.python.org/moin/FlowBasedProgramming - Flow Based Programming
https://github.com/flowbased/flowbased.org/wiki - Concepts
https://github.com/flowbased/flowbased.org/wiki/Concepts - Circular buffer
https://en.wikipedia.org/wiki/Circular_buffer - Circular Buffers in Linux kernel
https://www.kernel.org/doc/html/latest/core-api/circular-buffers.html - Flow-based Programming
https://jpaulm.github.io/fbp/ - DrawFBP
https://github.com/jpaulm/drawfbp - Panta Rhei
https://blogs.bu.edu/marsh-vocation/2016/09/29/panta-rhei/ - Hérakleitos
https://cs.wikipedia.org/wiki/H%C3%A9rakleitos - FlowBasedProgramming (Wiki)
https://www.jpaulmorrison.com/cgi-bin/wiki.pl - FBP Network Protocol
https://flowbased.github.io/fbp-protocol/ - Flow-based programming specification wiki
https://flow-based.org/ - Flow Based Programming
http://wiki.c2.com/?FlowBasedProgramming - FlowBasedProgramming
http://www.jpaulmorrison.com/cgi-bin/wiki.pl - BrokerageApplication
http://www.jpaulmorrison.com/cgi-bin/wiki.pl?BrokerageApplication - What the Hell Is Flow-Based Programming?
https://medium.com/bitspark/what-the-hell-is-flow-based-programming-d9e88a6a7265 - Flow-based visual scripting for Python
https://ryven.org/ - PyFlow
https://github.com/gangtao/pyflow - Flow-based Programming
https://pypi.org/project/flowpipe/ - The state of Flow-based Programming
https://blog.kodigy.com/post/state-of-flow-based-programming/