Příklady k tomuto článku je možné najít v balíčku example02.
Předávané zprávy
Stejně jako tomu bylo v případě předávání zpráv, potřebuji mít vydefinované typy zpráv.
V mém případě se bude jednat o Java Bean, které budou představovat požadavek na službu a její výsledek. Jejich definice je v balíčku entity.
Je tam abstraktní třída Token, kterou budu používat jako předchůdce pro všechny vyměňované zprávy. Ta vypadá následovně:
public abstract class Token implements Serializable { private String name; private Date ts; public Token(String name, Date ts) { this.name = name; this.ts = ts; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Date getTs() { return ts; } public void setTs(Date ts) { this.ts = ts; } @Override public String toString() { return "Token{" + "name='" + name + '\'' + ", ts=" + ts + '}'; } }
Dále je tam třída Request, která reprezentuje požadavek na službu:
public class Request extends Token { private long value; public Request(String name, Date ts, long value) { super(name, ts); this.value = value; } public long getValue() { return value; } public void setValue(long value) { this.value = value; } @Override public String toString() { return "Request{value=" + value + ", " + super.toString() + "}"; } }
A nakonec je zde třída Response, která reprezentuje odpověď na náš dotaz:
public class Response extends Token { private long result; public Response(String name, Date ts, long result) { super(name, ts); this.result = result; } public long getResult() { return result; } public void setResult(long result) { this.result = result; } @Override public String toString() { return "Response{result=" + result + ", " + super.toString() + "}"; } }
Vyvolání služby
V této části se budu zabývat dvěma způsoby vzdáleného volání služby. Vzájemně se liší v tom, jestli oslovuji jednoho nebo více poskytovatelů služby.
Nejdříve ukázka definovaných cest a pak se podívám na jednotlivé způsoby samostatně:
@Component public class CamelRoutes extends RouteBuilder { private static final Logger logger = LoggerFactory.getLogger(CamelRoutes.class); @Override public void configure() { // Applicant Route definitions ... from("timer://applicant01?fixedRate=true&delay=0&repeatCount=1").routeId("applicant01") .process(exchange -> { exchange.getMessage().setBody(new Request("applicant01", new Date(), 10)); logger.info(">>> {}", exchange.getMessage().getBody(Request.class)); }) .setExchangePattern(ExchangePattern.InOut) .to("activemq:queue:QUEUE-1") .process(exchange -> logger.info("<<< {}", exchange.getMessage().getBody(Response.class))); from("timer://applicant02?fixedRate=true&delay=10000&repeatCount=1").routeId("applicant02") .process(exchange -> { exchange.getMessage().setBody(new Request("applicant02", new Date(), 20)); logger.info(">>> {}", exchange.getMessage().getBody(Request.class)); }) .setExchangePattern(ExchangePattern.InOut) .multicast() .aggregationStrategy(new GroupedBodyAggregationStrategy()) .to("activemq:queue:QUEUE-1", "activemq:queue:QUEUE-2", "activemq:queue:QUEUE-3") .end() .process(exchange -> exchange.getMessage().getBody(List.class).forEach(o -> logger.info("<<< {}", o.toString()))); // Provider Route definitions ... from("activemq:queue:QUEUE-1").routeId("provider01") .process(exchange -> { Request request = exchange.getMessage().getBody(Request.class); exchange.getMessage().setBody(new Response("provider01", new Date(), request.getValue() + 10)); }); from("activemq:queue:QUEUE-2").routeId("provider02") .process(exchange -> { Request request = exchange.getMessage().getBody(Request.class); exchange.getMessage().setBody(new Response("provider02", new Date(), (request.getValue() + 10) * 2)); }); from("activemq:queue:QUEUE-3").routeId("provider03") .process(exchange -> { Request request = exchange.getMessage().getBody(Request.class); exchange.getMessage().setBody(new Response("provider03", new Date(), (request.getValue() + 50) * request.getValue())); }); } }
Aby se mně žadatelé nastartovali, jsou jejich cesty iniciovány pomocí komponenty timer. Je to jenom berlička pro tyhle příklady, v dalších článcích přejdu na inicializaci z REST rozhraní.
Mám vytvořeny tři poskytovatele služby, kteří poslouchají na frontě QUEUE-1, QUEUE-2 a QUEUE-3. Všichni jako vstup očekávají objekt třídy Request. Na základě v něm zadané hodnoty pak vytvoří odpověď ve formě objektu třídy Response. Ten pak odešlou zpět žadateli.
Vyvolání služby jednoho poskytovatele – metoda request/response
Asi ta nejjednodušší obdoba lokálního volání procedury, ale v distribuované podobě. Žadatel a poskytovatel mohou běžet na jakémkoliv uzlu, propojeni pouze prostřednictvím fronty zpráv.
Žadatel, v tomto případě applicant01, předá žádost poskytovateli provider01 prostřednictvím fronty QUEUE-1.
Odesilatel vytvoří bean třídy Request a předá je do fronty. Rozdíl proti předávání zpráv je v nastavení vzoru komunikace.
Tímto nastavením komunikačního vzoru říkám, že od příjemce očekávám odpověď:
.setExchangePattern(ExchangePattern.InOut)
Poté, co mně poskytovatel odpoví, výsledek pouze zapíšu do logu.
Takto by to mělo vypadat v logu po spuštění:
2021-01-01 18:40:24.005 INFO [r://applicant01]: >>> Request{value=10, Token{name='applicant01', ts=Fri Jan 01 18:40:24 CET 2021}} 2021-01-01 18:40:24.156 INFO [anager[QUEUE-1]]: <<< Response{result=20, Token{name='provider01', ts=Fri Jan 01 18:40:24 CET 2021}}
Vyvolání služby více poskytovatelů – metoda multicast/response
V tomto případě je situace o něco komplikovanější, ale ne zase o tolik.
Jako žadatel chci oslovit více poskytovatelů a od každého dostat nějakou odpověď.
V mém příkladu je to applicant02, který nejdříve vytvoří žádost typu Request. Nastaví vzor komunikace na request/response. Doposud je to stejné jako v předchozím případě.
Dále ale požadavek rozešle třem poskytovatelům provider01, provider02 a provider03, adresovaným prostřednictvím jejich front.
Každý provider vytvoří nějakou odpověď dle svých pravidel, a odešle ji zpět žadateli.
V tomto okamžiku ale žadatel dostane více než jednu odpověď. Musí si je nějak spojit do jednoho výsledku.
Ve výše uvedeném příkladu se používá strategie spojení více odpovědí do seznamu (pochopitelně si můžete napsat i vlastní, ale to teď řešit nebudu):
.aggregationStrategy(new GroupedBodyAggregationStrategy())
Tato strategie zajistí, že všechny odpovědi dostanu jako seznam, tedy v Java je to List<Response>.
A pak již následuje pouze zápis do logu.
Výsledek v logu by pak měl vypadat nějak takto:
2021-01-01 18:40:33.980 INFO [r://applicant02]: >>> Request{value=20, Token{name='applicant02', ts=Fri Jan 01 18:40:33 CET 2021}} 2021-01-01 18:40:34.098 INFO [anager[QUEUE-3]]: <<< Response{result=30, Token{name='provider01', ts=Fri Jan 01 18:40:33 CET 2021}} 2021-01-01 18:40:34.099 INFO [anager[QUEUE-3]]: <<< Response{result=60, Token{name='provider02', ts=Fri Jan 01 18:40:34 CET 2021}} 2021-01-01 18:40:34.100 INFO [anager[QUEUE-3]]: <<< Response{result=1400, Token{name='provider03', ts=Fri Jan 01 18:40:34 CET 2021}}
Proč není příklad na metodu broadcast/response
V předchozím díle o předávání zpráv jsem měl prezentovánu metodu broadcast. Proč tedy není i tady varianta, která by umožnila broadcast požadavku a posbírání všech odpovědí?
No, taková varianta existuje. Pokud o ní bude mít čtenář zájem, může se podívat třeba na vzor EIP Scatter-Gather.
Nicméně jedná se o vzor použití dost komplikovaný, v reálu přinášející hodně problémů se zajištěním synchronního způsobu volání služby.