Služby v distribuovaných systémech: proč vlastně komunikovat

27. 4. 2021
Doba čtení: 22 minut

Sdílet

 Autor: Depositphotos
Tímto článkem bych rád otevřel novou sérii textů a ukázek zaměřených na služby v distribuovaných systémech. V předchozí sérii jsem ukazoval, jak se dá komunikovat mezi nezávislými systémy. Teď se zaměříme na otázku „proč“.

Navazuji na předchozí sérii článků Komunikace v distribuovaných systémech, ve které jsem se primárně zaměřoval na možnosti komunikace mezi různými systémy s využitím centrálního message brokeru. Zatímco v předchozí sérii článků jsem ukazoval možnosti komunikace, tedy „jak“ se dá komunikovat mezi nezávislými systémy, v této sérii bych se rád zaměřil na otázku „proč“ komunikovat.

Jen pro připomenutí, pokud mluvím o distribuovaných systémech, pak se mně jedná především o nezávisle běžící informační systémy. Těch mohou být desítky nebo stovky, provozované různými subjekty ve vlastních provozních prostředích. Bude se tedy primárně jednat o spolupráci mezi systémy, která by měla být výhodná pro všechny zúčastněné.

V mém případě je jediným společným pojítkem mezi systémy centrální message broker. Jeden centrální bod může být výhodou, ale také ohrožením pro fungování systému. Při výpadku message brokeru nefunguje žádná komunikace. Naštěstí v případě brokeru je jednoduchý způsob zapojení do clusteru, čímž toto riziko můžeme minimalizovat. A jaká je tedy ta výhoda? Pokud se do takovéto distribuované sítě zapojuje nový systém, pak postačuje, aby se napojil na message broker (vytvoří si jedno nebo více TCP spojení, které se permanentně udržuje a využívá pro veškerou komunikaci s brokerem). V ten okamžik může nový systém komunikovat se všemi ostatními systémy, které již dříve byly zapojeny. Současně také všechny stávající systémy mají ihned k dispozici nový systém bez potřeby vytváření nějakých další spojení či konfigurací.

Pro vývoj těchto systémů dlouhodobě užívám jazyk Java, proto jsem pro vlastní realizaci použil projekty:

  • Apacha Camel ver. 3.6, jako integrační nástroj na úrovni jednotlivých uzlů

  • Apache ActiveMQ ver. 5.16, v roli message broker

  • SpringBoot ver. 2.4, framework pro rychlý vývoj aplikace pro komunikační uzly

Články této série bych se chtěl podívat na role, jaké mohou hrát jednotlivé informační systémy v rámci distribuovaného systému. A jak je možné kombinací těchto rolí dosáhnout nové funkcionality. Komunikační uzly mohou poskytovat své služby ostatním uzlům, mohou využívat služby jiných uzlů. Nejčastěji to bude ale kombinace obou přístupů, tedy vzájemná kooperace.

Při prezentaci jednotlivých řešení v bych se rád přiblížil reálnému použití. Proto je výsledkem mého snažení aplikace ve Spring Boot, která po spuštění představuje jednu instanci informačního systému. Takových instancí je možné spustit více a simulovat tak interakci několika systémů. Každá ze spuštěných instancí má přidělenu roli či více rolí, které v distribuovaném systému hraje. Připravte se tedy na to, že prezentace budou zahrnovat spuštění několika instancí aplikace v různých terminálech.

Technický úvod do série článků

Všechny zdrojové kódy můžete najít na GitHub v projektu jv-distributed-services-guide.

Celou sérii článků připravuji ve virtualizovaném linuxovém prostředí.

Používám toto, i když na vlastní funkčnost projektu by to nemělo mít vliv:

  • VirtualBox 6.1 jako virtuální prostředí

  • Fedora 33

  • OpenJDK 11 z distribuce

  • Maven 3 z distribuce

Message broker Apache ActiveMQ

Ten mám nainstalován ve stejném prostředí. V této sérii článků bych se nechtěl explicitně zabývat vlastnostmi a nastavováním brokeru, takže používám implicitní konfiguraci.

K brokeru se tedy budu připojovat protokolem OpenWire na portu 61616, a nebude k tomu potřeba ani účet a heslo.

Pro kontrolu front zpráv je možné použít administrátorské rozhraní, které standardně běží na adrese http://localhost:8161.

Aplikace Spring Boot

Jedná se o standardní aplikaci napsanou pro framework Spring Boot. Její sestavení a základní komponenty jsou popsány v rámci Maven projektového souboru pom.xml.

Samotná aplikace po svém startu zahrnuje:

  • Webový server Jetty včetně podpory pro vytváření webových služeb. V mém případě jej budu používat pro uživatelský přístup prostřednictvím REST volání.

  • Camel kontext primárně určený pro implementaci komunikačních služeb mezi jednotlivými uzly

Aby mohly jednotlivé instance plnit funkce více rolí, využil jsem systém profilů implementovaný ve Spring Boot (bližší informace najdete v dokumentaci: Profiles).

Konfigurace aplikace

Základní konfigurace platná pro každou spuštěnou instanci aplikace je v souboru resources/application.yml. Jedná se o konfigurační soubor ve formátu YAML, který se načítá jako první při spuštění aplikace. V tomto okamžiku je pro další popis podstatná tato část konfiguračního souboru:

spring:
  profiles:
    group:
      node00:
      node01:
        - "applicant"
      node02:
        - "provider"
      node03:
        - "provider"
  activemq:
    broker-url: tcp://localhost:61616
    packages:
      trustAll: True

Jsou zde definovány parametry pro připojení na message broker ActiveMQ, který je instalován lokálně. Pro komunikaci je využit protokol OpenWire poslouchající na portu 61616.

Zajímavější je ovšem ta první část parametrů, které popisují definici jednotlivých instancí programu a k nim přiřazených rolí.

Zde využívám další vlastnost Spring Boot a jejich profilů, a tou jsou skupiny profilů. V mém případě bude skupinou vždy jedna samostatně spuštěná instance programu (v konfiguračním souboru jsou instance označeny jako node00, node01, … nodeN). Ke každé skupině jsou přiřazeny profily představující role, které má spuštěná instance plnit.

Tak z výše uvedeného příkladu vyplývá, že node01 plní roli „žadatele o služby“. Naproti tomu uzly node02 a node03 plní roli „poskytovatele služeb“. Navíc je uvedena i instance isac00, která nemá přiřazenu roli žádnou. K čemu je to dobré? Uvidíte později, ale jedná se o to, že role mohu instancím přidávat i při jejich spuštění. Proto tuto instanci budu používat při experimentování s kombinováním různých rolí.

Systém skupin profilů podporuje ještě jednu zajímavou vlastnost, a tou jsou konfigurační soubory specifické pro danou skupinu. To využiji pro nastavení parametrů pro každou instanci aplikace zvláště.

Pokud se podíváte do resources projektu, pak zde kromě základního konfiguračního souboru aplikace uvidíte také konfigurační soubory pro skupiny profilů nazvané application-<název skupiny>.yml. Ty Spring Boot načítá následně pro hlavním konfiguračním souboru, a parametry zde nastavené mají přednost před parametry z hlavního souboru.

Například pro instanci node01 existuje konfigurační soubor application-node01.yml s tímto obsahem:

node:
  name: node01
  id: local:${node.name}

server:
  port: 8081

applicant:
  destinations: QUEUE-1, QUEUE-2, QUEUE-3

První dva parametry, tedy node.name a node.id, definují označení instance uzlu. Jedná se o uživatelsky čitelné jméno instance a technický identifikátor. Dále je definován parametr server.port, což je číslo HTTP portu, na kterém bude instance poslouchat. Jen pro vysvětlení, nastavení je nezbytné pro každou instanci zvláště, neboť si každá spustí vlastní Jetty server. Posledním parametrem je applicant.destinations. Ten obsahuje seznam front, které bude instance oslovovat při vyvolání žádosti o poskytnutí služeb. S těmi jsme se setkali již v rámci první série článků.

Hlavní třída aplikace

V rámci projektu je pouze jedna třída, která má statickou metodu main(), a sice Application. Ona obsahuje více funkcionality, ale pro náš začátek bude postačovat ukázka těch podstatných pro spuštění:

@SpringBootApplication
public class Application implements ApplicationRunner {

    private static final Logger logger = LoggerFactory.getLogger(Application.class);

    private static ConfigurableApplicationContext context;

    public static void main(String[] args) {
        context = SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        logger.info("*** Hello World, greetings from Dwarf ***");
    }
}

Toto je základní rámec pro spuštění aplikace. Později k němu budu přidávat novou funkcionalitu především v reakci na spuštění a ukončení aplikace.

Spuštění jedné instance

Jak jsem uvedl již dříve, mám pouze jednu aplikaci, kterou chci spouštět s různými rolemi. Na to používám samostatný terminál pro každou instanci. Jako parametr spuštění se zadává název skupiny profilů (tedy název instance), které má aplikace spustit.

Tak například, spuštění instance uzlu node01 by mohlo vypadat následovně:

java -jar target/distributed-services-guide-1.0.jar --spring.profiles.active=node01

Další vlastností, kterou budu často využívat, je možnost zadat profil (rozuměj v mém případě roli) rovnou při spuštění aplikace. V následujícím příkladu budu spouštět uzel node00, ale přidám mu ještě role applicant a provider:

java -jar target/distributed-services-guide-1.0.jar --spring.profiles.active=node00,applicant,provider --applicant.destinations=QUEUE-0 --provider.queue=QUEUE-0

Jen poznámka na konec. Aby se instance korektně spustila, bylo potřeba doplnit parametr název fronty, který není součástí žádného konfiguračního souboru (načítal jsem application.yml a application-node00.yml). Naštěstí Spring Boot podporuje zadávání parametrů z příkazové řádky, jak to je v tomto případě.

Zastavení instance

Vzhledem k tomu, že mám každou instanci spuštěnou v samostatném terminálu, pro zastavení postačuje Ctrl-C.

Předávané zprávy

Veškerá komunikace mezi uzly je realizována tak, že jsou předávány Java Bean. Je vhodné sdílet vlastnosti těchto objektů s využitím dědičnosti.

Takto vypadá základní struktura zpráv (využiji UML pro lepší přehlednost):

Jediným společným předkem pro všechny typy vyměňovaných zpráv je abstraktní třída Token. Atributy definované v rámci této třídy mají následující význam:

  • nid [URI] – technický identifikátor instance uzlu; používá se pro jednoznačnou identifikaci původce zprávy

  • name [String] – uživatelsky čitelný název instance uzlu; nemusí být nutně jednoznačný, neboť se nepoužívá pro identifikaci

  • tid [URI] – jednoznačný identifikátor transakce; používám generovaný identifikátor UUID ver.4; v případě komunikace typu request/response slouží k provázání žádosti s odpověďmi

  • ts [Date] – časová značka vzniku zprávy

Dále mám definovány další tři následovníky, kteří odrážejí různé způsoby komunikace:

  • Message (abstraktní) – předek pro všechny zprávy, kdy bude použita jednosměrná asynchronní komunikace, tzv. předání zpráv.

  • Request (abstraktní) – předek pro všechny typy zpráv vystupující jako dotaz v synchronní komunikaci typu požadavek/odpověď

  • Response – předek pro typy zpráv vystupující jako odpověď na dotaz v synchronní komunikaci typu požadavek/odpověď

    • code [ResponseCodeType] – návratový kód odpovědi

Všechny definice tříd implementujících předávané zprávy jsou součástí package entity.

Komunikační funkce v Camel

Tam, kde to půjde, budu využívat projekt Apache Camel pro implementaci komunikačních funkcí. Cesty jsou definovány zápisem Java kódu ve třídě rozšiřující třídu RouteBuilder. Všechny implementace jsou součástí package route.

Cesty jsou vždy spouštěny jako podpora nějaké komunikační funkce role, kterou uzel zastává. Proto jejich spuštění musí být řízeno systémem profilů. Toho se jednoduše dosáhne doplněním anotace na úrovni implementační třídy.

Takto nějak vypadá definice cest v Camel pro roli applicant (implementace není uvedena celá, v tomto okamžiku mně jde pouze o anotaci @Profile na začátku):

@Component
@Profile(value = "applicant")
public class ApplicantCamelRoutes extends RouteBuilder {

    private static final Logger logger = LoggerFactory.getLogger(ApplicantCamelRoutes.class);

    @Override
    public void configure() throws Exception {
        from("direct:applicant").routeId("applicant")
            ...
            .end();
    }
}

Uživatelské rozhraní REST

V případech, kdy je potřeba uživatelská interakce s uzlem, budu používat REST rozhraní. Bude se jednat o webové služby zařazené do Jetty serveru, který je součástí každé spuštěné instance aplikace.

I webové služby jsou spouštěny jako podpora nějaké role, kterou uzel zastává. Proto i jejich spuštění musí být řízeno systémem profilů. Opět využiji podporu anotací na úrovni implementační třídy webové služby.

Takto vypadá příklad implementační třídy REST služby (implementace není uvedena celá, podstatná je anotace třídy @Profile):

@RestController
@Profile(value = "applicant")
public class ApplicantServiceController {

    private static final Logger logger = LoggerFactory.getLogger(ApplicantServiceController.class);

    @RequestMapping(value = "/rest/appl01")
    public ResponseEntity<List<ResponseA>> restApplicant01(
            @RequestParam(value = "value") int value,
            @RequestParam(value = "expire", required = false, defaultValue = "5000") Long expire) throws Exception {
        ...
    }
}

Všechny implementace tříd pro REST služby můžete najít v package rest.

Podrobnější informace pro nastavení testovacího prostředí

Pro vás, kteří potřebujete pomoci s nastavením prostředí a spouštěním jednotlivých příkladů, připojuji detailnější postup.

Ověřte si, že máte dostupné JDK:

[raska@localhost ~]$ java -version
openjdk version "11.0.9.1" 2020-11-04
OpenJDK Runtime Environment 18.9 (build 11.0.9.1+11)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.9.1+11, mixed mode, sharing)

A také nainstalovaný Maven:

[raska@localhost ~]$ mvn -version
Apache Maven 3.6.3 (Red Hat 3.6.3-5)
Maven home: /usr/share/maven
Java version: 11.0.9.1, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-11-openjdk-11.0.9.11-9.fc33.x86_64
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "5.9.16-200.fc33.x86_64", arch: "amd64", family: "unix"

Toto je postup krok za krokem, jak stáhnout a rozběhnout AciveMQ v implicitním nastavení:

[raska@localhost]$ mkdir opt
[raska@localhost]$ cd opt
[raska@localhost opt]$ wget https://downloads.apache.org/activemq/5.16.0/apache-activemq-5.16.0-bin.tar.gz
[raska@localhost opt]$ tar xzf apache-activemq-5.16.0-bin.tar.gz
[raska@localhost opt]$ ln -s apache-activemq-5.16.0 activemq
[raska@localhost opt]$ ./activemq/bin/activemq start

Funkčnost si můžete ověřit na webovém administrátorském rozhraní http://localhost:8161.

Příklady vyvíjím a testuji v IntelliJ IDEA. Do tohoto prostředí si můžete stáhnout projekt rovnou z GitHub a sestavit projekt.

Pokud se nechcete zabývat vývojovým prostředím, pak si jej můžete sestavit z příkazové řádky.

Nejjednodušší postup je následující:

[raska@localhost]$ cd opt
[raska@localhost opt]$ git clone https://github.com/jraska1/jv-distributed-services-guide.git
[raska@localhost opt]$ cd jv-distributed-services-guide/
[raska@localhost jv-distributed-services-guide]$ mvn package

Spustit aplikaci můžete tak, že zavoláte rovnou sestavený JAR:

[raska@localhost jv-distributed-services-guide]$ java -jar target/distributed-services-guide-1.0.jar --spring.profiles.active=node00

Role žadatel a poskytovatel

V této části úzce navazuji na předchozí sérii článků o komunikaci v distribuovaných systémech. Podstatou byla vždy nějaká interakce mezi žadatelem o službu (to je ta role applicant) a jedním nebo více poskytovateli služeb (to je role provider). Pro vlastní výklad není až tak podstatné, co ty služby vlastně dělají. Také jsem v předchozí sérii končil s tím, že jsem měl vydefinované dva typy služeb ServiceA a ServiceB. Žadatel žádal o jednu z těchto služeb. Poskytovatel takovou službu buď uměl poskytnout, nebo požadavek odmítl.

Pokud bych tyto dvě role promítl do aktuálních definic uzlů, pak by se dala interakce mezi uzly zachytit následovně:

Ruku v ruce jde s definicí služeb také návrh struktury vyměňovaných zpráv. Opět vyjdu z návrhu předchozí série, i když mírně modifikovaného pro aktuální potřeby.

Struktury zpráv opět raději uvedu jako diagram, což je dle mého názoru přehlednější:

Dále se detailněji podívám na implementaci obou rolí.

Role <applicant>

Role umožňuje vyvolání dvou služeb, a to ServiceA a ServiceB.

Které uzly budou osloveny se specifikuje parametrem applicant.destinations uvedeným v konfiguračním souboru uzlu nebo při spuštění aplikace. Jedná se vždy o seznam názvů front oddělených čárkou.

Komunikační funkce

Takto vypadá implementace komunikačních funkcí pro tuto roli ve třídě ApplicantCamelRoutes:

@Component
@Profile(value = "applicant")
public class ApplicantCamelRoutes extends RouteBuilder {

    private static final Logger logger = LoggerFactory.getLogger(ApplicantCamelRoutes.class);

    public static final String RECIPIENT_LIST = "Recipients";

    @Value("${routes.applicants.checkerInterval:100}")
    private long checkerInterval;

    @Autowired(required = false)
    private ApplicationStateComponent applicationState;

    @Override
    public void configure() throws Exception {
        from("direct:applicant").routeId("applicant")
            .onException(ExchangeTimedOutException.class)
                .continued(true)
                .process(exchange -> exchange.getMessage().setBody(null))
            .end()
            .process(exchange -> {
                List<String> recipients = exchange.getMessage().getHeader(RECIPIENT_LIST, List.class);
                exchange.setProperty("Request", exchange.getMessage().getBody(Request.class));
                exchange.setProperty(RECIPIENT_LIST, recipients);
                exchange.getMessage().setHeader("RecipientsURL", recipients.stream()
                        .map(s -> "activemq:queue:" + s + "?explicitQosEnabled=true&preserveMessageQos=true&requestTimeoutCheckerInterval=" + checkerInterval)
                        .collect(Collectors.toList()));
            })
            .recipientList()
                .header("RecipientsURL")
                .parallelProcessing()
                .streaming()
                .aggregationStrategy((oldExchange, newExchange) -> {
                    Exchange result;
                    List<Response> list;
                    if (oldExchange != null) {
                        list = oldExchange.getIn().getBody(List.class);
                        result = oldExchange;
                    }
                    else {
                        list = new ArrayList<>();
                        result = newExchange;
                    }
                    Response resp = newExchange.getMessage().getBody(Response.class);
                    if (resp != null && resp.getCode() == ResponseCodeType.OK) {
                        list.add(newExchange.getMessage().getBody(Response.class));
                    }
                    result.getMessage().setBody(list, List.class);
                    return result;
                })
            .end()
            .process(exchange -> { if (applicationState != null) applicationState.requestMade(); })
            .choice()
                .when(simple("${camelContext.hasEndpoint(direct:applicant-audit)} != null"))
                    .wireTap("direct:applicant-audit")
            .end();
    }
}

Ta implementace by vám měla být povědomá. V podstatě odpovídá tomu, co jsem používal již v první sérii článků.

Navíc je zde ten „ocásek“ na konci cesty obsahující dva procesory. Jedná se o rozšíření, kterými se budu zabývat v dalších pokračováních, proto je prosím v tomto okamžiku ignorujte. Na probíranou funkcionalitu nyní nemají žádný vliv.

Aplikační rozhraní

A nyní se podívám na webové služby, kterými se vyvolává komunikační rozhraní uvedené výše. Implementace je ve třídě ApplicantServiceController:

@RestController
@Profile(value = "applicant")
public class ApplicantServiceController {

    private static final Logger logger = LoggerFactory.getLogger(ApplicantServiceController.class);

    private static final String JMS_EXPIRATION = "JMSExpiration";

    @Autowired
    private ProducerTemplate producerTemplate;

    @Value("${applicant.destinations:#{null}}")
    List<String> destinations;

    @Autowired(required = false)
    private Map<URI, NodeConfiguration> configurations;

    @Autowired
    TokenFactory factory;

    @RequestMapping(value = "/rest/appl01")
    public ResponseEntity<List<ResponseA>> restApplicant01(
            @RequestParam(value = "value") int value,
            @RequestParam(value = "expire", required = false, defaultValue = "3000") Long expire) {

        RequestA request = factory.tokenInstance(RequestA.class);
        request.setValue(value);
        Map<String, Object> headers = new HashMap<>();
        headers.put(JmsConstants.JMS_REQUEST_TIMEOUT, expire);
        headers.put(JMS_EXPIRATION, System.currentTimeMillis() + expire);

        List<String> recipients = null;
        if (configurations != null)
            recipients = configurations.values().stream().filter(x -> x.getQueue() != null).map(NodeConfiguration::getQueue).collect(Collectors.toList());
        else if (destinations != null)
            recipients = destinations;

        if (recipients != null && recipients.size() > 0)
            headers.put(ApplicantCamelRoutes.RECIPIENT_LIST, recipients);
        else
            return ResponseEntity.notFound().build();

        List<ResponseA> response = producerTemplate.requestBodyAndHeaders("direct:applicant", request, headers, List.class);
        if (response != null) {
            return ResponseEntity.ok(response);
        } else {
            return ResponseEntity.notFound().build();
        }
    }

    @RequestMapping(value = "/rest/appl02")
    public ResponseEntity<List<ResponseB>> restApplicant02(
            @RequestParam(value = "text") String text,
            @RequestParam(value = "expire", required = false, defaultValue = "3000") Long expire) {

        RequestB request = factory.tokenInstance(RequestB.class);
        request.setText(text);
        Map<String, Object> headers = new HashMap<>();
        headers.put(JmsConstants.JMS_REQUEST_TIMEOUT, expire);
        headers.put(JMS_EXPIRATION, System.currentTimeMillis() + expire);

        List<String> recipients = null;
        if (configurations != null)
            recipients = configurations.values().stream().filter(x -> x.getQueue() != null).map(NodeConfiguration::getQueue).collect(Collectors.toList());
        else if (destinations != null)
            recipients = destinations;

        if (recipients != null && recipients.size() > 0)
            headers.put(ApplicantCamelRoutes.RECIPIENT_LIST, recipients);
        else
            return ResponseEntity.notFound().build();

        List<ResponseB> response = producerTemplate.requestBodyAndHeaders("direct:applicant", request, headers, List.class);
        if (response != null) {
            return ResponseEntity.ok(response);
        } else {
            return ResponseEntity.notFound().build();
        }
    }
}

Pro každý typ služby je jedno REST volání. Liší se vzájemně typem zadaného parametru. Výsledkem volání je vždy JSON pole s odpověďmi od všech oslovených poskytovatelů (tedy pokud skutečně odpověděli).

Touto anotací se zajistí načtení seznamu cílových front z konfigurace:

    @Value("${applicant.destinations:#{null}}")
    List<String> destinations;

Další části asi není třeba diskutovat, neb byly dostatečně detailně debatovány v první sérii článků.

Role <provider>

Tato role nabízí odpovědi na služby ServiceA a ServiceB. Jedná se tedy o protistranu pro žadatele. Ve všech případech, kdy bude uzel zastávat roli provider, bude nabízet stejnou implementaci pro obě služby. V reálném životě je to ale obvykle opačně. Různé uzly poskytují různé modifikace odpovědi na stejnou službu (prostě ji dělají každá jinak dle svých podmínek). To ale nebudu v tomto článku prezentovat, asi si umíte představit, jak byste to dělali vy sami.

Komunikační funkce

Implementace obou služeb jsou realizovány jako Camel cesty. Jejich definice najdete ve třídě ProviderCamelRoutes.

@Component
@Profile(value = "provider")
public class ProviderCamelRoutes extends RouteBuilder {

    private static final Logger logger = LoggerFactory.getLogger(ProviderCamelRoutes.class);

    @Autowired
    TokenFactory factory;

    @Autowired(required = false)
    private ApplicationStateComponent applicationState;

    @Override
    public void configure() throws Exception {

        from("activemq:queue:{{provider.queue}}").routeId("provider")
            .process(exchange -> exchange.setProperty("Request", exchange.getMessage().getBody(Request.class)))
            .choice()
                .when(body().isInstanceOf(RequestA.class))
                    .process(exchange -> {
                        RequestA request = exchange.getMessage().getBody(RequestA.class);
                        ResponseA response = factory.tokenInstance(request.getTid(), ResponseA.class);
                        response.setCode(ResponseCodeType.OK);
                        response.setResult(request.getValue() + new Random().nextInt((int) request.getValue() / 2));
                        exchange.getMessage().setBody(response);
                    })
                .when(body().isInstanceOf(RequestB.class))
                    .process(exchange -> {
                        RequestB request = exchange.getMessage().getBody(RequestB.class);
                        ResponseB response = factory.tokenInstance(request.getTid(), ResponseB.class);
                        response.setCode(ResponseCodeType.OK);
                        response.setText("text length: " + request.getText().length());
                        exchange.getMessage().setBody(response);
                    })
                .otherwise()
                    .process(exchange -> {
                        Request request = exchange.getMessage().getBody(Request.class);
                        Response response = factory.tokenInstance(request.getTid(), Response.class);
                        response.setCode(ResponseCodeType.REFUSED);
                        exchange.getMessage().setBody(response);
                    })
            .end()
            .process(exchange -> { if (applicationState != null) applicationState.responseMade(); })
            .choice()
                .when(simple("${camelContext.hasEndpoint(direct:provider-audit)} != null"))
                    .wireTap("direct:provider-audit")
            .end();
    }
}

Opět by to pro vás nemělo být žádné překvapení. Tohle jsme již vše diskutovali dříve. Je zde opět ten „ocásek“ na konci cesty. Opět se jedná o rozšíření, které nemá nyní vliv na funkcionalitu a můžete jej s klidem ignorovat.

Aplikační rozhraní

Tady nastává změna. Pro roli poskytovatel nemám vytvořené žádné uživatelské rozhraní ve formě REST služby. Ono by vlastně ani k ničemu nebylo, neb není co prezentovat.

Nastal čas na vyzkoušení

Předpokladem pro úspěšné vyzkoušení výše diskutované funkcionality je stažení zdrojových kódů projektu a jejich úspěšné sestavení. Měli byste tedy mít dostupný jeden JAR v adresáři ./target projektu.

Dále budu pro prezentaci používat několik současně spuštěných terminálů. Jako aktuální adresář je vždy nastaven kořenový adresář projektu. Můžete být i jinde, ale pak musíte správně nastavit cestu na výše zmíněný JAR soubor. Jiné soubory nebude potřeba, vše včetně implicitní konfigurace je zabaleno v tomto souboru.

No a ještě jedna poznámka. Nezapomeňte si spustit message broker. Bez něho by nám nic nefungovalo.

První spuštění uzlu node01

V terminálu spusťte (pro první spuštění uvedu celý výpis logů, což v dalších příkladech již vypustím):

[raska@localhost distributed-services-guide]$ java -jar target/distributed-services-guide-1.0.jar --spring.profiles.active=node01

  .   ____          _            __ _ _
 /\\ / ___‘_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | ‚_ | ‚_| | ‚_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  ‚  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.0)

2021-02-13 13:01:12.636  INFO 3120 --- [           main] c.d.distrib_services_guide.Application   : Starting Application v1.0 using Java 11.0.10 on localhost.localdomain with PID 3120 (/home/raska/IdeaProjects/distributed-services-guide/target/distributed-services-guide-1.0.jar started by raska in /home/raska/IdeaProjects/distributed-services-guide)
2021-02-13 13:01:12.643  INFO 3120 --- [           main] c.d.distrib_services_guide.Application   : The following profiles are active: node01,applicant
2021-02-13 13:01:15.925  INFO 3120 --- [           main] org.eclipse.jetty.util.log               : Logging initialized @6594ms to org.eclipse.jetty.util.log.Slf4jLog
2021-02-13 13:01:16.256  INFO 3120 --- [           main] o.s.b.w.e.j.JettyServletWebServerFactory : Server initialized with port: 8081
2021-02-13 13:01:16.261  INFO 3120 --- [           main] org.eclipse.jetty.server.Server          : jetty-9.4.34.v20201102; built: 2020-11-02T14:15:39.302Z; git: e46af88704a893fc12cb0e3bf46e2c7b48a009e7; jvm 11.0.10+9
2021-02-13 13:01:16.359  INFO 3120 --- [           main] o.e.j.s.h.ContextHandler.application     : Initializing Spring embedded WebApplicationContext
2021-02-13 13:01:16.359  INFO 3120 --- [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 3512 ms
2021-02-13 13:01:16.684  INFO 3120 --- [           main] org.eclipse.jetty.server.session         : DefaultSessionIdManager workerName=node0
2021-02-13 13:01:16.684  INFO 3120 --- [           main] org.eclipse.jetty.server.session         : No SessionScavenger set, using defaults
2021-02-13 13:01:16.687  INFO 3120 --- [           main] org.eclipse.jetty.server.session         : node0 Scavenging every 600000ms
2021-02-13 13:01:16.739  INFO 3120 --- [           main] o.e.jetty.server.handler.ContextHandler  : Started o.s.b.w.e.j.JettyEmbeddedWebAppContext@40238dd0{application,/,[file:///tmp/jetty-docbase.8081.11044523953630318635/],AVAILABLE}
2021-02-13 13:01:16.740  INFO 3120 --- [           main] org.eclipse.jetty.server.Server          : Started @7411ms
2021-02-13 13:01:17.036  INFO 3120 --- [           main] o.apache.camel.support.LRUCacheFactory   : Detected and using LRUCacheFactory: camel-caffeine-lrucache
2021-02-13 13:01:18.138  INFO 3120 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService &apos;applicationTaskExecutor&apos;
2021-02-13 13:01:19.397  INFO 3120 --- [           main] o.e.j.s.h.ContextHandler.application     : Initializing Spring DispatcherServlet &apos;dispatcherServlet&apos;
2021-02-13 13:01:19.397  INFO 3120 --- [           main] o.s.web.servlet.DispatcherServlet        : Initializing Servlet &apos;dispatcherServlet&apos;
2021-02-13 13:01:19.399  INFO 3120 --- [           main] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
2021-02-13 13:01:19.442  INFO 3120 --- [           main] o.e.jetty.server.AbstractConnector       : Started ServerConnector@20435c40{HTTP/1.1, (http/1.1)}{0.0.0.0:8081}
2021-02-13 13:01:19.445  INFO 3120 --- [           main] o.s.b.web.embedded.jetty.JettyWebServer  : Jetty started on port(s) 8081 (http/1.1) with context path &apos;/&apos;
2021-02-13 13:01:19.590  INFO 3120 --- [           main] o.a.c.s.boot.SpringBootRoutesCollector   : Loading additional Camel XML routes from: classpath:camel/*.xml
2021-02-13 13:01:19.597  INFO 3120 --- [           main] o.a.c.s.boot.SpringBootRoutesCollector   : Loading additional Camel XML route templates from: classpath:camel-template/*.xml
2021-02-13 13:01:19.597  INFO 3120 --- [           main] o.a.c.s.boot.SpringBootRoutesCollector   : Loading additional Camel XML rests from: classpath:camel-rest/*.xml
2021-02-13 13:01:21.814  INFO 3120 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : Apache Camel 3.6.0 (camel-1) is starting
2021-02-13 13:01:21.820  INFO 3120 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2021-02-13 13:01:21.820  INFO 3120 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : Using HealthCheck: camel-health
2021-02-13 13:01:21.868  INFO 3120 --- [           main] o.a.c.i.e.InternalRouteStartupManager    : Route: applicant started and consuming from: direct://applicant
2021-02-13 13:01:21.899  INFO 3120 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : Total 1 routes, of which 1 are started
2021-02-13 13:01:21.899  INFO 3120 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : Apache Camel 3.6.0 (camel-1) started in 0.081 seconds
2021-02-13 13:01:21.916  INFO 3120 --- [           main] c.d.distrib_services_guide.Application   : Started Application in 10.972 seconds (JVM running for 12.587)
2021-02-13 13:01:21.925  INFO 3120 --- [           main] c.d.distrib_services_guide.Application   : *** Hello World, greetings from Dwarf ***

Z vypsaných logovacích záznamů byste měli vidět, že se vám spustila aplikace s profilem node01 a applicant. Dále se vám spustil Jetty webový server na portu 8081, a také Camel kontext s jednou cestou applicant. Aplikace by měla stále běžet a čekat na požadavky z REST rozhraní. Zastavit jí můžete přes Ctrl-C.

Zkusím vyvolat jednu službu (jiný terminál):

[raska@localhost ~]$ time curl -s http://localhost:8081/rest/appl01?value=1234 | jq .
[]

real    0m3.200s
user    0m0.039s
sys     0m0.010s

Výsledek je prázdné JSON pole odpovědí. To je v pořádku, protože jsem doposud žádného poskytovatele služeb nespustil. Ale proč to trvalo tak dlouho? Protože služba čekala, než vyprší čas poskytnutý na odpovědi pro všechny oslovené (implicitně mají služby nastaveny 3 sekundy).

Další uzly node02 a node03

Přidám tedy uzly, které mají nastavenu roli poskytovatele služby. V mém případě mluvím o o uzlech node02 a node03.

Každý spustím v samostatném terminálu:

[raska@localhost distributed-services-guide]$ java -jar target/distributed-services-guide-1.0.jar --spring.profiles.active=node02
[raska@localhost distributed-services-guide]$ java -jar target/distributed-services-guide-1.0.jar --spring.profiles.active=node03

Opět byste měli v logovacích záznamech vidět, že se vám spustila aplikace s profilem node02 nebo node03 a provider. U každého se pustil i webový server, ale každý na jiném portu. A nesmím zapomenout na Camel kontext s cestou provider.

Tak, a nyní vyzkouším, zda bude node01 dávat jiné výsledky:

[raska@localhost ~]$ time curl -s http://localhost:8081/rest/appl01?value=1234 | jq .
[
  {
    "nid": "local:node02",
    "name": "node02",
    "tid": "uuid:6f8c2220-c2bc-43bf-b2f4-bdbf86a400ff",
    "ts": "2021-02-13T12:18:43.385+00:00",
    "code": "OK",
    "result": 1521
  },
  {
    "nid": "local:node03",
    "name": "node03",
    "tid": "uuid:6f8c2220-c2bc-43bf-b2f4-bdbf86a400ff",
    "ts": "2021-02-13T12:18:43.464+00:00",
    "code": "OK",
    "result": 1395
  }
]

real    0m3.341s
user    0m0.041s
sys     0m0.012s

Dostal jsem odpovědi od obou poskytovatelů. Proč to ale stále trvá tak dlouho? No protože v konfiguraci uzlu node01 mám uvedeno, že má oslovit fronty QUEUE-1, QUEUE-2 a QUEUE-3. Implementace služba stále čeká na odpověď z fronty QUEUE-1, která ale nepřijde.

Přidání role při startu uzlu

Zkusím tedy přidat roli provider při spuštění uzlu node01. Nejdříve jej zastavte, a následně opět spusťte s doplněným profilem:

[raska@localhost distributed-services-guide]$ java -jar target/distributed-services-guide-1.0.jar --spring.profiles.active=node01,provider --provider.queue=QUEUE-1

Spustily se vám profily node01, applicant a provider.

A jaký bude výsledek vyvolání služby nyní?

ict ve školství 24

[raska@localhost ~]$ time curl -s http://localhost:8081/rest/appl01?value=1234 | jq .
[
  {
    "nid": "local:node01",
    "name": "node01",
    "tid": "uuid:2f25290a-9014-4b71-9f1c-1f489f61e292",
    "ts": "2021-02-13T12:31:11.523+00:00",
    "code": "OK",
    "result": 1445
  },
  {
    "nid": "local:node02",
    "name": "node02",
    "tid": "uuid:2f25290a-9014-4b71-9f1c-1f489f61e292",
    "ts": "2021-02-13T12:31:11.540+00:00",
    "code": "OK",
    "result": 1245
  },
  {
    "nid": "local:node03",
    "name": "node03",
    "tid": "uuid:2f25290a-9014-4b71-9f1c-1f489f61e292",
    "ts": "2021-02-13T12:31:11.571+00:00",
    "code": "OK",
    "result": 1595
  }
]

real    0m0.286s
user    0m0.044s
sys     0m0.004s

Tak to už je výrazně lepší. Dostal jsem odpovědi od všech poskytovatelů, a navíc to bylo značně svižnější.

Náměty na další experimentování

A teď si s uzly můžete libovolně hrát, například:

  • přidat role applicant pro uzly node02 a node03, a vyzkoušet si zavolat služby na těchto uzlech

  • přidat si další uzel na což můžete využít předdefinovaný uzel node00 bez přidělených rolí

  • vydefinovat si další uzly a spustit je v různých rolích

  • případně co vás ještě napadne

Autor článku

Jiří Raška pracuje na pozici IT architekta. Poslední roky se zaměřuje na integrační a komunikační projekty ve zdravotnictví. Mezi jeho koníčky patří také paragliding a jízda na horském kole.