V mém případě malých testovacích zpráv tomu tak není, ale pokud by součástí požadavku nebo odpovědi na službu byl nějaký velký objekt, např. obrázek, pak by situace byla výrazně jinačí. Co tedy s tím?
Asi máme dvě možné cesty, kam se vydat.
První možností je zvolit nějaký vhodnější formát, asi binární, pro výměnu zpráv mezi komunikujícími uzly. Možností je opět více, např. BSON, když už jsem se dříve přihlásil k jeho strýci JSONovi. Tím se ale dnes zabývat nebudu.
Druhou možností je zkusit udělat kompresi zprávy před odesláním. Při příjmu pak nejdříve dekomprimovat data a následně je zpracovat. Možných komprimačních formátů je opět několik. Pro ukázku jsem si vybral jednoduchou variantu s podporou přímo v Java, a sice GZIP.
V příkladech je ukázána serializace do JSON (ta se dělá vždy; nebudu jí ovlivňovat nějakými parametry), a následně komprese do GZIP těsně před odesláním.
Při příjmu je postup opačný, takže nejdříve dekomprese GZIP, následně reverzní serializace JSON, a nakonec vlastní zpracování.
Příklady k tomuto článku je možné najít v package: example07
Předávané zprávy
Všechny vydefinované typy zpráv jsou Java Bean, jejíž definice jsou v package entity.
Definované Camel cesty
Takto vypadají všechny definované cesty v Camel:
@Component public class CamelRoutes extends RouteBuilder { private static final Logger logger = LoggerFactory.getLogger(CamelRoutes.class); private static final String HEADER_CLASS_NAME = "ObjectClassName"; @Autowired private ObjectMapper jsonMapper; @Autowired private ProducerTemplate producerTemplate; @Override public void configure() { // Compression Route definitions ... from("direct:object-compression").routeId("object-compression") .choice() .when(simple("${header.ObjectCompression?.toLowerCase()} == 'gzip'")) .process(exchange -> { ByteArrayOutputStream baos = new ByteArrayOutputStream(); try (GZIPOutputStream gzipOut = new GZIPOutputStream(baos)) { gzipOut.write(exchange.getMessage().getBody(String.class).getBytes()); } exchange.getMessage().setBody(baos.toByteArray(), byte[].class); logger.info("COMPRESSED OBJECT to GZIP"); }) .otherwise() .process(exchange -> logger.info("No object compression applied on the message.")) .end(); from("direct:reverse-compression").routeId("reverse-compression") .choice() .when(simple("${header.ObjectCompression?.toLowerCase()} == 'gzip'")) .process(exchange -> { GZIPInputStream gzipIn = new GZIPInputStream(new ByteArrayInputStream(exchange.getMessage().getBody(byte[].class))); exchange.getMessage().setBody(new String(gzipIn.readAllBytes())); logger.info("DECOMPRESSED OBJECT from GZIP"); }) .otherwise() .process(exchange -> logger.debug("No reverse compression applied on the message.")) .end(); // Serialization Route definitions ... from("direct:object-mapping").routeId("object-mapping") .process(exchange -> { Token token = exchange.getMessage().getBody(Token.class); exchange.getMessage().setHeader(HEADER_CLASS_NAME, token.getClass().getCanonicalName()); exchange.getMessage().setBody(jsonMapper.writeValueAsString(exchange.getMessage().getBody()), String.class); }); from("direct:reverse-mapping").routeId("reverse-mapping") .process(exchange -> { String className = exchange.getMessage().getHeader(HEADER_CLASS_NAME, String.class); Token token = (Token) jsonMapper.readValue(exchange.getMessage().getBody(String.class), Class.forName(className)); exchange.getMessage().setBody(token); }); // Applicant Route definitions ... from("direct:applicant01").routeId("applicant01") .to("direct:object-mapping") .to("direct:object-compression") .multicast() .aggregationStrategy((oldExchange, newExchange) -> { Exchange result; producerTemplate.send("direct:reverse-compression", newExchange); producerTemplate.send("direct:reverse-mapping", newExchange); List<Response> list; if (oldExchange != null) { list = oldExchange.getIn().getBody(List.class); result = oldExchange; } else { list = new ArrayList<>(); result = newExchange; } Response resp = newExchange.getMessage().getBody(Response.class); list.add(newExchange.getMessage().getBody(Response.class)); result.getMessage().setBody(list, List.class); return result; }) .to("activemq:queue:QUEUE-1", "activemq:queue:QUEUE-2", "activemq:queue:QUEUE-3") .end(); // Provider Route definitions ... from("activemq:queue:QUEUE-1").routeId("provider01") .to("direct:reverse-compression") .to("direct:reverse-mapping") .process(exchange -> { Request request = exchange.getMessage().getBody(Request.class); Response response = new Response("provider01", new Date(), request.getValue() + 10); exchange.getMessage().setBody(response); }) .to("direct:object-mapping") .to("direct:object-compression"); from("activemq:queue:QUEUE-2").routeId("provider02") .to("direct:reverse-compression") .to("direct:reverse-mapping") .process(exchange -> { Request request = exchange.getMessage().getBody(Request.class); Response response = new Response("provider02", new Date(), (request.getValue() + 10) * 2); exchange.getMessage().setBody(response); }) .to("direct:object-mapping") .to("direct:object-compression"); from("activemq:queue:QUEUE-3").routeId("provider03") .to("direct:reverse-compression") .to("direct:reverse-mapping") .process(exchange -> { Request request = exchange.getMessage().getBody(Request.class); Response response = new Response("provider03", new Date(), (request.getValue() + 50) * request.getValue()); exchange.getMessage().setBody(response); }) .to("direct:object-mapping") .to("direct:object-compression"); } }
V podstatě se jedná o stejné řešení jako v případě ukázky serializace.
Přibyly mně dvě cesty:
- direct:object-compression – provede kompresi obsahu zprávy
- direct:reverse-compression – provede zpětnou dekompresi obsahu zprávy
Pokud se podíváte na pořadí volání jednotlivých cest pro kompresi a serializaci, pak bude asi zřejmé jejich použití jak u žadatele o službu, tak u poskytovatele.
To, zda se bude obsah komprimovat/dekomprimovat, se opět řídí pomocí hlavičky. Tentokrát je to hlavička s názvem ObjectCompression. Pokud je definována, pak obsahuje název použitého algoritmu komprese.
V případě dekomprese odpovědí se musím opět popasovat s problémem spojení do jednoho seznamu. Použiji vlastní implementaci agregační strategie, kdy při příjmu odpovědi nejdříve provedu dekompresi a převod do Java bean. Následuje pak spojení do seznamu.
REST API aplikace
V tomto případě se jedná o jednoduchou službu. Nic překvapivého neočekávejte:
@RestController public class ServiceController { private static final String HEADER_OBJECT_COMPRESSION = "ObjectCompression"; @Autowired private ProducerTemplate producerTemplate; @RequestMapping(value = "/rest/appl01") public ResponseEntity<List<Response>> restApplicant01( @RequestBody Request request, @RequestParam(value = "compression", required = false) String objectCompression) { if (request.getName() == null) request.setName("rest-applicant01"); request.setTs(new Date()); Map<String, Object> headers = new HashMap<>(); if (objectCompression != null && objectCompression.length() > 0) headers.put(HEADER_OBJECT_COMPRESSION, objectCompression.toLowerCase()); List<Response> response = producerTemplate.requestBodyAndHeaders("direct:applicant01", request, headers, List.class); if (response != null) { return ResponseEntity.ok(response); } else { return ResponseEntity.notFound().build(); } } }
Služba má jeden volitelný parametr, kterým říkám, zda chci komprimovat nebo ne. Ten se použije k nastavení hlavičky ObjectCompression.
Jak si to vyzkoušet
Takto to vypadá, pokud zavolám službu s požadovanou kompresí GZIP:
[raska@localhost ~]$ curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' 'http://localhost:8080/rest/appl01?compression=gzip' | jq . [ { "name": "provider01", "ts": "2021-01-03T09:43:39.577+00:00", "result": 1244 }, { "name": "provider02", "ts": "2021-01-03T09:43:39.621+00:00", "result": 2488 }, { "name": "provider03", "ts": "2021-01-03T09:43:39.852+00:00", "result": 1584456 } ]
No a v logu bych měl vidět, že se provedla serializace a komprese zpráv. Nějak takto by to mělo vypadat:
2021-01-03 10:43:39.476 INFO: COMPRESSED OBJECT to GZIP 2021-01-03 10:43:39.573 INFO: DECOMPRESSED OBJECT from GZIP 2021-01-03 10:43:39.580 INFO: COMPRESSED OBJECT to GZIP 2021-01-03 10:43:39.602 INFO: DECOMPRESSED OBJECT from GZIP 2021-01-03 10:43:39.620 INFO: DECOMPRESSED OBJECT from GZIP 2021-01-03 10:43:39.625 INFO: COMPRESSED OBJECT to GZIP 2021-01-03 10:43:39.668 INFO: DECOMPRESSED OBJECT from GZIP 2021-01-03 10:43:39.851 INFO: DECOMPRESSED OBJECT from GZIP 2021-01-03 10:43:39.854 INFO: COMPRESSED OBJECT to GZIP 2021-01-03 10:43:39.876 INFO: DECOMPRESSED OBJECT from GZIP
Tady je volání bez požadované komprese:
[raska@localhost ~]$ curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' 'http://localhost:8080/rest/appl01' | jq .
[
{
"name": "provider01",
"ts": "2021-01-03T09:45:48.674+00:00",
"result": 1244
},
{
"name": "provider02",
"ts": "2021-01-03T09:45:48.695+00:00",
"result": 2488
},
{
"name": "provider03",
"ts": "2021-01-03T09:45:48.736+00:00",
"result": 1584456
}
]
V logu bych měl vidět, že se v tomto případě komprese nedělala. Asi nějak takto:
2021-01-03 10:45:48.651 INFO: No object compression applied on the message. 2021-01-03 10:45:48.677 INFO: No object compression applied on the message. 2021-01-03 10:45:48.695 INFO: No object compression applied on the message. 2021-01-03 10:45:48.736 INFO: No object compression applied on the message.