Koncepce
Základní architektura je velice podobná všem „velkým“ ETL nástrojům (například Informatica). Práce s daty probíhá prostřednictvím sestavení tzv. transformačního grafu (orientovaný, acyklický), který obsahuje jednotlivé transformační komponenty. Tyto komponenty jsou navzájem provázány prostřednictvím datového potrubí. Datovým potrubím data pouze protékají, nic se s nimi neděje. Jakákoliv práce s daty probíhá v rámci specializovaných komponent, které mohou například data filtrovat, měnit jejich formát, zapisovat na disk či propojovat na základě definovaného klíče.
Výhodou tohoto přístupu je to, že změnu chování transformačního grafu lze ovlivnit přidáním další komponenty do cesty datům či pouhou úpravou parametrů komponenty, která již v grafu je zapojena. Sestavení nové či úprava stávající datové transformace je tak otázkou velice krátkého času. Samozřejmě se zde ideálně nabízí možnost sestavovat transformace vizuálně pomocí návrháře, kde máme paletu možných komponent, ze kterých si vybíráme a které různě spojujeme do více či méně komplikovaných struktur.
Systém Clover.ETL.
Systém je napsán v jazyce Java a je licencován pod LGPL licencí. Už slyším nářky pravověrných zastánců strojového kódu či alespoň jazyka C na rychlost celého balíku. Kupodivu to s rychlostí není až tak špatné, a i když má systém k dnešnímu dni desítky stálých uživatelů, nikdo si zatím na rychlost nestěžoval. Uživatele spíše zajímá určitá funkcionalita či nové plánované vlastnosti.
Clover.ETL se na data dívá jako na sekvence jednotlivých záznamů s víceméně pevnou strukturou. Každý záznam je pak sestaven z položek ruzných typů – řetězec, číslo, datum atd. V rámci Clover.ETL jsou různé typy záznamů (a tudíž i datových toků) popsány ruznými sekvencemi metadat. Ta popisují pro každý typ jeho vnitřní strukturu. Kromě klasické možnosti sestavit takový popis struktury pomocí vytvoření určitých instancí příslušných tříd Javy se nabízí také možnost popsat strukturu pomocí XML dokumentu, který pak může být automaticky načten a příslušná metadata z něj budou extrahována.
Následující příklad ukazuje jednoduchý popis datové struktury záznamu o zaměstnanci nějaké hypotetické firmy:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Automatically generated from database null -->
<Record name="EMPLOYEE" type="delimited">
<Field name="EMP_NO" type="numeric" delimiter="," format="#"/>
<Field name="FIRST_NAME" type="string" delimiter="," />
<Field name="LAST_NAME" type="string" delimiter="," />
<Field name="PHONE_EXT" type="string" nullable="yes" delimiter="," />
<Field name="HIRE_DATE" type="date" delimiter="," format="dd/MM/yyyy" />
<Field name="DEPT_NO" type="string" delimiter="," />
<Field name="JOB_CODE" type="string" delimiter="," />
<Field name="JOB_GRADE" type="numeric" delimiter="," />
<Field name="JOB_COUNTRY" type="string" delimiter="," />
<Field name="SALARY" type="numeric" delimiter="," />
<Field name="FULL_NAME" type="string" nullable="yes" delimiter="\n" />
</Record>
Kromě názvu celého záznamu a jednotlivých položek můžeme vidět také datové typy, informaci o tom, zda jsou přípustné NULL hodnoty pro jednotlivé položky, a další údaje týkající se formátování obsahu položek či celého záznamu. Takto sestavený popis záznamu lze v transformačním grafu použít pro komponentu, která načítá jednotlivé záznamy z textového souboru a převádí je na interní reprezentaci. Interně jsou všechny záznamy uchovávány v binární podobě s proměnnou délkou. Pokud takový záznam potřebujeme zapsat do datového souboru s pevnou strukturou, prostě použijeme metadata definující záznam se shodnými položkami, ale pevnou délkou formátování – viz následující popis struktury:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Automatically generated from database null -->
<Record name="EMPLOYEE_FIX" type="fixed">
<Field name="EMP_NO" type="numeric" size="5" />
<Field name="FIRST_NAME" type="string" size="20" />
<Field name="LAST_NAME" type="string" size="20" />
<Field name="PHONE_EXT" type="string" size="10" />
<Field name="HIRE_DATE" type="date" size="10" format="dd/MM/yyyy" />
<Field name="DEPT_NO" type="string" size="3" />
<Field name="JOB_CODE" type="string" size="4" />
<Field name="JOB_GRADE" type="numeric" size="1" />
<Field name="JOB_COUNTRY" type="string" size="15" />
<Field name="SALARY" type="numeric" size="10" />
<Field name="FULL_NAME" type="string" size="40" />
</Record>
Chceme-li například pouze změnit formát uložení dat – z formátu s oddělovačem na pevnou délku, můžeme k tomu sestavit následující transformační graf v prostředí Clover.ETL:
1: <?xml version="1.0" encoding="UTF-8"?>
2: <Graph name="Simple Reformat">
3: <Global>
4: <Metadata id="InMetadata" fileURL="metadata/employee.fmt"/>
5: <Metadata id="OutMetadata" fileURL="metadata/employee_fix.fmt"/>
6: </Global>
7: <Phase number="0">
8: <Node id="INPUT" type="DELIMITED_DATA_READER_NIO" fileURL="data/employee.dat" />
9: <Node is="COPY" type="SIMPLE_COPY"/>
10:<Node id="OUTPUT" type="FIXED_DATA_WRITER_NIO" append="false" fileURL="output/employee.fix.dat"/>
11:<Edge id="INEDGE" fromNode="INPUT:0" toNode="COPY:0" metadata="InMetadata"/>
12:<Edge id="OUTEDGE" fromNode="COPY:0" toNode="OUTPUT:0" metadata="OutMetadata"/>
13:</Phase>
12:</Graph>
Při definování transformace vytváříme topologii grafu s uzly a hranami (Node, Edge). Uzly zde představují dělníky pracující na datech (provádějí konkrétní operaci), hrany jsou pouze určitými transportéry dat z místa na místo (dopravují data mezi jednotlivými uzly). Hrany mají ješte jednu důležitou vlastnost, určují strukturu dat, která jimi tečou.
Jak je z příkladu vidět, metadata popisující strukturu jednotlivých záznamů jsou přiřazena ke hranám (viz řádek 11,12 atribut metadata). Podoba metadata je definována obsahem XML souborů, které jsou načítány na řádcích 4 a 5.
Všechny hrany jsou orientované, ke každé hraně je specifikováno, ze kterého uzlu vychází a do kterého směřuje (atributy fromNode a toNode). Pro jednotlivé uzly se pak hrany jeví buď jako vstupní, nebo výstupní porty – podle toho, zda hrana z uzlu vychází, nebo do něj naopak směřuje. Základním úkolem každé komponenty je číst data ze všech připojených vstupních portů, udělat s nimi nějakou užitečnou operaci/transformaci a poté je zapsat na výstupní port(y). Samozřejmě jinak se chová parser textového souboru, jinak komponenta třídící data a jinou úlohu má komponenta pro zápis dat do databáze.
Běh transformací
Když máme celou transformaci hezky navrženu v podobě transformačního grafu, je načase ji vyzkoušet – tedy spustit a pokusit se zpracovat nějaká data.
Transformace je vykonávána prostřednictvím běhu jednotlivých svých komponent (uzlů / Node). Co běžící komponenta, to jedno vlákno (thread). Všechny běžící komponenty jsou řízeny prostřednictvím speciálního vlákna zvaného WatchDog. WatchDog provádí spoustu užitecných funkcí, jednou z hlavních je sledování počtu běžících komponent a jejich zdraví. Pokud nějaká komponenta zhavaruje, provede WatchDog potřebné uklizení – zejména ukončení všech ostatních komponent. Kromě jiného WatchDog také pravidelně reportuje o tom, co se v rámci grafu právě děje – které komponenty běží a které již byly ukončeny, kolik záznamů proteklo jednotlivými hranami (Edge) spojující komponenty.
Jednotlivé komponenty spolu vzájemně komunikují tak, že si posílají jednotlivé datové záznamy. Jinak o sobě v podstatě nevědí – a ani to nepotřebují. Běh transformace probíhá tak, že jsou nejdříve spušteny všechny komponenty nemající žádné datové vstupy – tedy takové, které by měly nějaká data samy generovat (například číst ze souboru či databáze). Potom se čeká tak dlouho, dokud běží komponenty, které nemají žádné datové výstupy – jsou to tzv. terminátory, které data někam zapisují, ukládají či prostě jenom vyhazují. Pokud v průběhu zpracování dojde k nějaké závažné chybě, která znemožní standardní „doběhnutí“ transformace, je tu WatchDog, který se o problém postará (násilně ukončí všechny dosud běžící komponenty). Standardní ukončení transformace probíhá tak, že komponenty generující data pošlou zprávu navazujícím komponentám (které od nich data přebírají), že žádná další data už k dispozici nejsou, a poté se samy ukončí. Tato zpráva se lavinovitě šíří až do okamžiku, kdy i poslední komponenta zjistí, že už žádná další data nemá očekávat, a také se sama ukončí. Jakmile WatchDog zjistí, že již není žádná běžící komponenta, provede standardní ukončení své činnosti, což způsobí zastavení celého programu.
Fáze v rámci transformací
Každý transformační graf je rozdělen na fáze – viz Phase v ukázce grafu na řádku 7. Alespoň jedna fáze musí vždy existovat. Provádění transformace probíhá tak, že nejdríve jsou spuštěny všechny komponenty v rámci jedné fáze – s nejnižším číslem fáze. Když všechny ukončí zpracování, jsou spušteny všechny komponenty z fáze s nejnižším vyšším číslem. V tomto duchu se pokračuje, dokud jsou nějaké další fáze čekající na spuštění. V případě, že zpracování fáze skončí neúspěchem, je běh transformace zastaven a další fáze již nejsou spouštěny.
Pokud existuje propojení mezi dvěma či více komponentami napříč fázemi, jsou tato propojení automaticky „obohacena“ o vyrovnávací paměť. Tím je umožněno komponentě v nižší fázi dokončit svoji práci předtím, než je spuštěna na ni navazující komponenta z fáze vyšší.
Poznámka: Obecně spolu dvě komponenty komunikují synchronně – tedy jedna vyprodukuje datový záznam a pošle jej ke zpracování navazující. Pak čeká, než je tato druhá komponenta připravena zpracovat další data. Samozřejmě je i zde použita určitá vyrovnávací paměť, ta je však poměrně malá a rozhodně nepojme více než několik záznamů.
Spuštení transformace – jak na to
Existují v zásadě dvě možnosti, jak spustit transformaci v rámci Clover.ETL. První je využít třídy runGraph z package org.jetel.main, která očekává název XML souboru s topologií grafu. Spuštení z příkazové řádky pak vypadá takto:
cmd> java -cp CloverETL.zip org.jetel.main.runGraph
graphJoinHashInline.grf
Výsledkem běhu transformace je zhruba následující výpis na konzoli:
*** CloverETL framework/transformation graph runner, (c) 2002-04 D.Pavlis, released under GNU Public Licence ***
Graph definition file: graphJoinHashInline.grf
output/joined_data_hash.out
[Clover] starting WatchDog thread ...
[WatchDog] Thread started.
[WatchDog] Running on 1 CPU(s) max available memory for JVM 1512 KB
[Clover] Initializing phase: 0
all edges initialized successfully...
initializing nodes:
INPUT1 ...OK
INPUT2 ...OK
JOIN (compiling dynamic source) ...OK
WRITER ...OK
[Clover] phase: 0 initialized successfully.
[WatchDog] Starting up all nodes in phase [0]
[WatchDog] INPUT1 ... started
[WatchDog] INPUT2 ... started
[WatchDog] JOIN ... started
[WatchDog] WRITER ... started
[WatchDog] Sucessfully started all nodes in phase!
[WatchDog] Execution of phase [0] successfully finished - elapsed time(sec): 1
---------------------** Start of tracking Log for phase [0] **-------------------
Time: 21/07/04 23:25:04
Node Status Port #Records
-------------------- ---------------------- ---------------------- --------------
INPUT1 OK
Out:0 49
INPUT2 OK
Out:0 12
JOIN OK
In:0 49
In:1 12
Out:0 49
WRITER OK
In:0 49
---------------------------------** End of Log **--------------------------------
[WatchDog] Forcing garbage collection ...
-----------------------** Summary of Phases execution **---------------------
Phase# Finished Status RunTime(sec) MemoryAllocation(KB)
0 0 1 111
------------------------------** End of Summary **---------------------------
[Clover] WatchDog thread finished - total execution time: 1 (sec)
[Clover] Graph execution finished successfully
Execution of graph finished !
Druhou možností je začlenit Clover.ETL do vaší existující Java aplikace. A to buď tak, že využijete třídu TransformationGraph, která má metodu run() umožňující spuštění transformace, nebo řízení celého běhu napíšete sami.
Topologii grafu lze načíst buď z XML souboru pomocí třídy TransformationGraphXMLReaderWriter, nebo lze graf sestavit přímo vytvořením jednotlivých prvků – Phase, Node, Edge, Metadata atd. Následující ukázka představuje jednoduchou aplikaci pro třídění dat uložených v textovém souboru s oddělovači:
package org.jetel.test;
import java.io.*;
import org.jetel.metadata.*;
import org.jetel.data.*;
import org.jetel.graph.*;
import org.jetel.component.*;
public class testGraphSort {
private static final int _PHASE_1=0;
private static final int _PHASE_2=1;
public static void main(String args[]){
if (args.length!=4){
System.out.println("Example graph which sorts input data accordingt o specified key.");
System.out.println("The key must be a name of field(or comma delimited fields) from input data.");
System.out.println("Usage: testGraphSort <input data filename> <output sorted filename> <metadata filename> <key>");
System.exit(1);
}
System.out.println("**************** Input parameters: ***************");
System.out.println("Input file: "+args[0]);
System.out.println("Output file: "+args[1]);
System.out.println("Input Metadata: "+args[2]);
System.out.println("Key: "+args[3]);
System.out.println("************************** ************************");
DataRecordMetadata metadataIn;
DataRecordMetadataXMLReaderWriter reader=new DataRecordMetadataXMLReaderWriter();
try{
metadataIn=reader.read(new FileInputStream(args[2]));
}catch(IOException ex){
System.err.println("Error when reading metadata!!");
throw new RuntimeException(ex);
}
if (metadataIn==null){
throw new RuntimeException("No INPUT metadata");
}
// create Graph + Node + 2 connections (edges)
TransformationGraph graph= TransformationGraph.getReference();
Edge inEdge=new Edge("In Edge", metadataIn);
Edge outEdge=new Edge("Out Edge", metadataIn);
Node nodeRead=new DelimitedDataReaderNIO("Data Parser", args[0]);
String[] sortKeys=args[3].split(",");
Node nodeSort=new Sort("Sorter", sortKeys, true);
Node nodeWrite=new DelimitedDataWriterNIO("Data Writer", args[1], false);
// add Edges & Nodes & Phases to graph
graph.addEdge(inEdge);
graph.addEdge(outEdge);
graph.addPhase(new Phase(_PHASE_1));
graph.addNode(nodeRead,_PHASE_1);
graph.addNode(nodeSort,_PHASE_1);
graph.addPhase(new Phase(_PHASE_2));
graph.addNode(nodeWrite,_PHASE_2);
// assign ports (input & output)
nodeRead.addOutputPort(0,inEdge);
nodeSort.addInputPort(0,inEdge);
nodeSort.addOutputPort(0,outEdge);
nodeWrite.addInputPort(0,outEdge);
if(!graph.init(System.out)){
System.err.println("Graph initialization failed !");
return;
}
// output graph layout - only for debugging
graph.dumpGraphConfiguration();
if (!graph.run()){ // start all Nodes (each node is one thread)
System.out.println("Failed starting all nodes!");
return;
}
}
}
Hlavním objektem, který udržuje přehled o všech komponentách a vzájemných propojení komponent, je TransformationGraph. Je to tzv. singleton, takže existuje právě jeden a jeho referenci získáme voláním TransformationGraph.getReference(). Pomocí metod addNode() a addPhase() do tohoto objektu přidáváme jednotlivé prvky grafu (Node, Edge, Phase,…). Komponenty je potřeba vzájemně propojit. Toho dosáhneme voláním metod addInputPort() a addOutputPort(), kterým jako parametr předáme hranu (Edge), která bude zprostředkovávat přenos dat mezi oběmi komponentami.
V ukázce je také vidět vytvoření dvou separátních fází, ke kterým pak jednotlivé komponenty přiřazujeme. Běh grafu bude vypadat tak, že nejdříve se spustí komponenty zajišťující čtení dat v textovém souboru a jejich třídění. Až v okamžiku, kdy budou veškerá data setříděna, bude spuštěna komponenta pro zápis setříděných dat do souboru.
V okamžiku, kdy máme topologii grafu hotovu, je třeba zavolat metodu init() třídy TransformationGraph. Jejím úkolem je provést nezbytné přípravné kroky (například přidat ke hranám spojujícím komponenty v rozdílných fázích vyrovnávací paměť nebo ověřit, že námi sestavený transformační graf neobsahuje cykly).
Posledním krokem na cestě k úspěšnému spuštění transformace je zavolání metody run() třídy TransformationGraph. Tato metoda zajistí vypuštění „kontrolního chrousta“ (WatchDog), který pak nastartuje jednotlivé komponenty (co komponenta, to vlákno) a dále celý běh transformace řídí. Metoda run() je ukončena v okamžiku ukončení běhu celé transformace (ukončení běhu WatchDog) a vrací logickou hodnotuTrue, pokud nedošlo k chybě.
Připravené komponenty
Pro přehled uvádím ještě seznam dosud vytvořených komponent.Jejich názvy snad napoví, k čemu slouží. Pokud ne, najdete jejich podrobný popis na domovské stránce projektu v sekci věnované dokumentaci.
- Concatenate
- DBExecute
- DBInputTable
- DBOutputTable
- DBFReader
- Dedup
- DelimitedDataReader
- DelimitedDataWriter
- Filter
- FixLenDataReader
- FixLenDataWriter
- HashJoin
- CheckForeignKey
- Merge
- MergeJoin
- Reformat
- SimpleCopy
- SimpleGather
- Sort
- Trash
Uživatelské komponenty
Vytvoření nové komponenty není problémem a na domovské stránce projektu je krátký tutorial (je to jednoduché, takže krátký), který zájemce rychle seznámí s podstatnými náležitostmi. Důkazem, že se nejedná o nic komplikovaného, je fakt, že několik komponent již bylo do projektu věnováno samotnými uživateli.
Závěr
Tento článek si kladl za cíl představit projekt Clover.ETL „odborné“ veřejnosti. Historie projektu sahá zhruba dva roky zpět a vývoj je zatím spíše náladový (když je čas a nálada, tak se vyvíjí). Za dobu své existence si Clover.ETL našel komunitu uživatelů a dokonce i několik komerčních firem, které se rozhodly jej „zabudovat“ do svých rešení. V rámci projektu je stále ještě dost práce, a pokud by měl někdo chuť se do projektu zapojit, nechť neváhá a kontaktuje mne prostřednictvím e-mailu na dpavlis<at>berlios.de.
Pro detailní seznámení se s vlastnostmi Clover.ETL doporučuji navštívit domovskou stránku projektu.