Všeobecný úvod k sérii článků
Zabývám se návrhem a vývojem distribuovaných komunikačních systémů již řadu let. Tak jsem si řekl, že by nebylo špatné shrnout některé své zkušenosti a nabídnout je k širší diskuzi a použití.
Moje představa je taková, že postupně vytvořím sérii článků, ve které ukážu návrh a postup realizace některých základních funkcí distribuovaného informačního systému. O jaký systém se bude tedy jednat?
Jsem dlouhodobým příznivcem komunikačních systémů, které mají jen minimum centrálních služeb (nějaké ty centrální služby vždycky potřebujeme, jen je otázkou, jak moc jich v tom centru musí být). Proto jsem si vybral jako centrální spojovací bod message broker (dále taky MB), přes který budou všechny komponenty komunikovat. Jedná se tedy o hvězdicovou architekturu, kdy každý komunikující IS (dále také uzel nebo node) je připojen pouze a jenom na jeden centrální message broker. Komunikující IS se spolu baví tak, že si posílají zprávy.
Takto to vypadá, že úzkým místem celého řešení je centrální message broker. V případě jeho výpadku pak nekomunikuje nikdo. To je pravda, nicméně v případě message broker se velice jednoduše realizuje clustering, který nám pomůže tento problém velice efektivně a jednoduše vyřešit.
Pro vývoj těchto systémů dlouhodobě užívám jazyk Java, proto jsem pro vlastní realizaci použil projekty:
- Apacha Camel ver. 3.6, jako integrační nástroj na úrovni jednotlivých uzlů
- Apache ActiveMQ ver. 5.16, v roli message broker
- SpringBoot ver. 2.4, framework pro rychlý vývoj aplikace pro komunikační uzly
Plánovaný obsah seriálu
V rámci následujících článků bych se rád ponořil do těchto témat:
- úvod do výměny zpráv přes message broker, messaging a request-response
- vyvolán služeb z REST rozhraní
- práce s timeout v rámci výměny zpráv
- dynamické směrování
- serializace zpráv
- efektivní serializace zpráv a komprese
- ověření identity původce zpráv
- směrování dle obsahu
Technický úvod do série článků
Všechny zdrojové kódy můžete najít: jraska1/jv-distributed-systems-guide
Celou sérii článků připravuji ve virtuálním linuxovém prostředí.
Používám toto, i když na vlastní funkčnost projektu by to nemělo mít vliv:
- VirtualBox 6.1 jako virtuální prostředí
- Fedora 33
- OpenJDK 11 z distribuce
- Maven 3 z distribuce
Message broker ActiveMQ
Ten mám nainstalován ve stejném prostředí. V této sérii článků bych se nechtěl explicitně zabývat vlastnostmi a nastavováním brokeru, takže používám implicitní konfiguraci.
K brokeru se tedy budu připojovat protokolem OpenWire na portu 61616, a nebude k tomu potřeba ani účet a heslo.
Pro kontrolu front zpráv je možné použít administrátorské rozhraní, které standardně běží na adrese http://localhost:8161.
Aplikace SpringBoot
Pro zjednodušení vývoje aplikace jsem použil framework SpringBoot.
Jeho nastavení je v rámci pom.xml, který najdete v projektu na GitHub.
Příklady pro konkrétní článek jsou zahrnuty do jednoho package, v jehož kořenu je také vždy hlavní třída pro spuštění. Ta obvykle vypadá nějak takto:
@SpringBootApplication public class Example01Application { public static void main(String[] args) { SpringApplication.run(Example01Application.class, args); } }
Komunikační funkce v Camel
Tam, kde to půjde, budu využívat projekt Camel pro implementaci komunikačních funkcí. Cesty jsou definovány zápisem Java kódu ve třídě rozšiřující třídu RouteBuilder. Tato implementace je obvykle v package route.
Takto nějak vypadá definice cest v Camel:
@Component public class CamelRoutes extends RouteBuilder { private static final Logger logger = LoggerFactory.getLogger(CamelRoutes.class); @Override public void configure() { from("timer://applicant01?repeatCount=1").routeId("applicant01") .process(exchange -> exchange.getMessage().setBody(new Request("applicant01", new Date(), 10))) .to("activemq:queue:QUEUE-1"); } }
Podrobnější informace pro nastavení testovacího prostředí
Pro vás, kteří potřebujete pomoci s nastavením prostředí a spouštěním jednotlivých příkladů, připojuji detailnější postup.
Ověřte si, že máte dostupné JDK:
[raska@localhost ~]$ java -version openjdk version "11.0.9.1" 2020-11-04 OpenJDK Runtime Environment 18.9 (build 11.0.9.1+11) OpenJDK 64-Bit Server VM 18.9 (build 11.0.9.1+11, mixed mode, sharing)
A také nainstalovaný Maven:
[raska@localhost ~]$ mvn -version Apache Maven 3.6.3 (Red Hat 3.6.3-5) Maven home: /usr/share/maven Java version: 11.0.9.1, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-11-openjdk-11.0.9.11-9.fc33.x86_64 Default locale: en_US, platform encoding: UTF-8 OS name: "linux", version: "5.9.16-200.fc33.x86_64", arch: "amd64", family: "unix"
Toto je postup krok za krokem, jak stáhnout a rozběhnout AciveMQ v implicitním nastavení:
[raska@localhost]$ mkdir opt [raska@localhost]$ cd opt [raska@localhost opt]$ wget https://downloads.apache.org/activemq/5.16.0/apache-activemq-5.16.0-bin.tar.gz [raska@localhost opt]$ tar xzf apache-activemq-5.16.0-bin.tar.gz [raska@localhost opt]$ ln -s apache-activemq-5.16.0 activemq [raska@localhost opt]$ ./activemq/bin/activemq start
Funkčnost si můžete ověřit na webovém administrátorském rozhraní http://localhost:8161.
Příklady vyvíjím a testuji v IntelliJ IDEA. Do tohoto prostředí si můžete stáhnout projekt rovnou z GitHub a pouštět jednotlivé příklady.
Pokud se nechcete zabývat vývojovým prostředím, pak si je můžete pustit i z příkazové řádky.
Nejjednodušší postup je následující:
[raska@localhost]$ cd opt [raska@localhost opt]$ git clone https://github.com/jraska1/jv-distributed-systems-guide.git [raska@localhost opt]$ cd jv-distributed-systems-guide/ raska@localhost jv-distributed-systems-guide]$ mvn compile exec:java -Dexec.mainClass="cz.dsw.distribguide.example01.Example01Application"
Spustí se vám aplikace příkladů z první lekce.
Zasílání zpráv
Odesilatel zprávu odešle příjemci a už se dále nepídí po tom, zda příjemce zprávu dostal, a případně jak s ní naložil.
Příklady k tomuto článku je možné najít v package: example01
Předávané zprávy
Nejdříve musím něco mít, abych to mohl předat někomu jinému, tedy nějakou zprávu.
V mém případě se bude jednat o Java Bean, které budou takovou zprávu představovat. Jejich definice je v package entity.
Je tam abstraktní třída Token, kterou budu používat jako předchůdce pro všechny vyměňované zprávy. Ta vypadá následovně:
public abstract class Token implements Serializable { private String name; private Date ts; public Token(String name, Date ts) { this.name = name; this.ts = ts; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Date getTs() { return ts; } public void setTs(Date ts) { this.ts = ts; } @Override public String toString() { return "Token{" + "name='" + name + '\'' + ", ts=" + ts + '}'; } }
V každé zprávě bude nějaké jméno uzlu, který zprávu vytvořil a timestamp, kdy se tak stalo.
No a dále tam je další třída Request, která bude představovat vlastní obsah předávané zprávy od odesilatele k příjemci:
public class Request extends Token { private long value; public Request(String name, Date ts, long value) { super(name, ts); this.value = value; } public long getValue() { return value; } public void setValue(long value) { this.value = value; } @Override public String toString() { return "Request{value=" + value + ", " + super.toString() + "}"; } }
Výměna zpráv
Tady se již budu zabývat jednotlivými způsoby výměny zpráv. Budou konkrétně tři.
Nejdříve ukázka definovaných cest a pak se podívám na jednotlivé způsoby samostatně:
@Component public class CamelRoutes extends RouteBuilder { private static final Logger logger = LoggerFactory.getLogger(CamelRoutes.class); @Override public void configure() { // Applicant Route definitions ... from("timer://applicant01?fixedRate=true&delay=0&repeatCount=1").routeId("applicant01") .process(exchange -> exchange.getMessage().setBody(new Request("applicant01", new Date(), 10))) .to("activemq:queue:QUEUE-1"); from("timer://applicant02?fixedRate=true&delay=10000&repeatCount=1").routeId("applicant02") .process(exchange -> exchange.getMessage().setBody(new Request("applicant02", new Date(), 20))) .multicast() .to("activemq:queue:QUEUE-1", "activemq:queue:QUEUE-2", "activemq:queue:QUEUE-3") .end(); from("timer://applicant03?fixedRate=true&delay=20000&repeatCount=1").routeId("applicant03") .process(exchange -> exchange.getMessage().setBody(new Request("applicant03", new Date(), 30))) .to("activemq:topic:TOPIC"); // Provider Route definitions ... from("activemq:queue:QUEUE-1").routeId("provider01") .process(exchange -> logger.info("... {}", exchange.getMessage().getBody(Request.class))); from("activemq:queue:QUEUE-2").routeId("provider02") .process(exchange -> logger.info("... {}", exchange.getMessage().getBody(Request.class))); from("activemq:queue:QUEUE-3").routeId("provider03") .process(exchange -> logger.info("... {}", exchange.getMessage().getBody(Request.class))); from("activemq:topic:TOPIC").routeId("provider04") .process(exchange -> logger.info("... {}", exchange.getMessage().getBody(Request.class))); from("activemq:topic:TOPIC").routeId("provider05") .process(exchange -> logger.info("... {}", exchange.getMessage().getBody(Request.class))); from("activemq:topic:TOPIC").routeId("provider06") .process(exchange -> logger.info("... {}", exchange.getMessage().getBody(Request.class))); } }
Aby se mně odesilatelé nastartovali, jsou jejich cesty iniciovány pomocí komponenty timer. Je to jenom berlička pro tyhle příklady. V dalších článcích přejdu na inicializaci z REST rozhraní.
Předání zprávy jednomu příjemci – metoda send
Je to ta nejjednodušší podoba, kdy jeden odesilatel applicant01 předá zprávu jednomu příjemci provider01 reprezentovanému frontou zpráv QUEUE-1.
Odesilatel nejdříve vytvoří bean třídy Request, kterou následně předá do fronty QUEUE-1. O výsledek se již nestará. Přesněji řečeno, jeho role končí v okamžiku, kdy se zpráva zapíše do fronty na message brokeru.
Příjemce pak vyčítá zprávy ze své fronty a pouze zapíše jejich obsah do logu. No a výsledek by se měl projevit v logu takto:
2021-01-01 18:10:40.766 INFO [nsumer[QUEUE-1]]: ... Request{value=10, Token{name='applicant01', ts=Fri Jan 01 18:10:40 CET 2021}}
Předání zprávy více příjemcům – metoda multicast
O něco komplikovanější, ale ne o moc, je předání jedné zprávy více příjemcům. To dělá odesilatel applicant02. Ten vytvoří zprávu a pak jí procesorem multicast rozešle příjemcům provider01, provider02 a provider03, a to prostřednictvím front QUEUE-1, QUEUE-2 a QUEUE-3.
Role odesilatele končí v okamžiku předání zpráv do patřičných front v message brokeru. Dále je to již úloha pro příjemce, aby obsah svých front načetli a zapsali do logu.
Takhle vypadá výsledek v logu:
2021-01-01 18:10:50.678 INFO [nsumer[QUEUE-1]]: ... Request{value=20, Token{name='applicant02', ts=Fri Jan 01 18:10:50 CET 2021}} 2021-01-01 18:10:50.726 INFO [nsumer[QUEUE-2]]: ... Request{value=20, Token{name='applicant02', ts=Fri Jan 01 18:10:50 CET 2021}} 2021-01-01 18:10:50.784 INFO [nsumer[QUEUE-3]]: ... Request{value=20, Token{name='applicant02', ts=Fri Jan 01 18:10:50 CET 2021}}
Předání zprávy neznámým příjemcům – metoda broadcast
Poslední metodou, kterou v tomto článku ukážu, je předání zprávy s využitím metody Public-Subscribe.
Odesilatel neví, komu bude zprávu předávat. On ji předá každému, kdo se přihlásí k odběru tématu TOPIC.
Ve výše uvedeném příkladu je to applicant03, který vytvoří zprávu a předá ji do TOPIC. K odběru jsou přihlášeni provider04, provider05 a provider06, kteří obdrží každý stejnou kopii zprávy.
Výsledek v logu pak vypadá nějak takto:
2021-01-01 18:11:00.685 INFO [Consumer[TOPIC]]: ... Request{value=30, Token{name='applicant03', ts=Fri Jan 01 18:11:00 CET 2021}} 2021-01-01 18:11:00.685 INFO [Consumer[TOPIC]]: ... Request{value=30, Token{name='applicant03', ts=Fri Jan 01 18:11:00 CET 2021}} 2021-01-01 18:11:00.686 INFO [Consumer[TOPIC]]: ... Request{value=30, Token{name='applicant03', ts=Fri Jan 01 18:11:00 CET 2021}}