Komunikace v distribuovaných systémech: komprese zpráv

16. 3. 2021
Doba čtení: 6 minut

Sdílet

 Autor: Depositphotos
Pokud budu provádět serializaci zpráv do textového formátu, např. XML nebo JSON, mohu se dostat do problémů s velikostí zpráv. Textová reprezentace binárních dat může být významně větší než jejich přirozená podoba.

V mém případě malých testovacích zpráv tomu tak není, ale pokud by součástí požadavku nebo odpovědi na službu byl nějaký velký objekt, např. obrázek, pak by situace byla výrazně jinačí. Co tedy s tím?

Asi máme dvě možné cesty, kam se vydat. 

První možností je zvolit nějaký vhodnější formát, asi binární, pro výměnu zpráv mezi komunikujícími uzly. Možností je opět více, např. BSON, když už jsem se dříve přihlásil k jeho strýci JSONovi. Tím se ale dnes zabývat nebudu.

Druhou možností je zkusit udělat kompresi zprávy před odesláním. Při příjmu pak nejdříve dekomprimovat data a následně je zpracovat. Možných komprimačních formátů je opět několik. Pro ukázku jsem si vybral jednoduchou variantu s podporou přímo v Java, a sice GZIP.

V příkladech je ukázána serializace do JSON (ta se dělá vždy; nebudu jí ovlivňovat nějakými parametry), a následně komprese do GZIP těsně před odesláním. 

Při příjmu je postup opačný, takže nejdříve dekomprese GZIP, následně reverzní serializace JSON, a nakonec vlastní zpracování.

Příklady k tomuto článku je možné najít v package: example07

Předávané zprávy

Všechny vydefinované typy zpráv jsou Java Bean, jejíž definice jsou v package entity.

Definované Camel cesty

Takto vypadají všechny definované cesty v Camel:

@Component
public class CamelRoutes extends RouteBuilder {

    private static final Logger logger = LoggerFactory.getLogger(CamelRoutes.class);

    private static final String HEADER_CLASS_NAME = "ObjectClassName";

    @Autowired
    private ObjectMapper jsonMapper;

    @Autowired
    private ProducerTemplate producerTemplate;

    @Override
    public void configure() {

//      Compression Route definitions ...
        from("direct:object-compression").routeId("object-compression")
            .choice()
                .when(simple("${header.ObjectCompression?.toLowerCase()} == 'gzip'"))
                    .process(exchange -> {
                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
                        try (GZIPOutputStream gzipOut = new GZIPOutputStream(baos)) {
                            gzipOut.write(exchange.getMessage().getBody(String.class).getBytes());
                        }
                        exchange.getMessage().setBody(baos.toByteArray(), byte[].class);
                        logger.info("COMPRESSED OBJECT to GZIP");
                    })
                .otherwise()
                    .process(exchange -> logger.info("No object compression applied on the message."))
            .end();

        from("direct:reverse-compression").routeId("reverse-compression")
            .choice()
                .when(simple("${header.ObjectCompression?.toLowerCase()} == 'gzip'"))
                    .process(exchange -> {
                        GZIPInputStream gzipIn = new GZIPInputStream(new ByteArrayInputStream(exchange.getMessage().getBody(byte[].class)));
                        exchange.getMessage().setBody(new String(gzipIn.readAllBytes()));
                        logger.info("DECOMPRESSED OBJECT from GZIP");
                    })
                .otherwise()
                    .process(exchange -> logger.debug("No reverse compression applied on the message."))
            .end();

//      Serialization Route definitions ...
        from("direct:object-mapping").routeId("object-mapping")
            .process(exchange -> {
                Token token = exchange.getMessage().getBody(Token.class);
                exchange.getMessage().setHeader(HEADER_CLASS_NAME, token.getClass().getCanonicalName());
                exchange.getMessage().setBody(jsonMapper.writeValueAsString(exchange.getMessage().getBody()), String.class);
            });

        from("direct:reverse-mapping").routeId("reverse-mapping")
            .process(exchange -> {
                String className = exchange.getMessage().getHeader(HEADER_CLASS_NAME, String.class);
                Token token = (Token) jsonMapper.readValue(exchange.getMessage().getBody(String.class), Class.forName(className));
                exchange.getMessage().setBody(token);
            });

//      Applicant Route definitions ...
        from("direct:applicant01").routeId("applicant01")
            .to("direct:object-mapping")
            .to("direct:object-compression")
            .multicast()
                .aggregationStrategy((oldExchange, newExchange) -> {
                    Exchange result;

                    producerTemplate.send("direct:reverse-compression", newExchange);
                    producerTemplate.send("direct:reverse-mapping", newExchange);

                    List<Response> list;
                    if (oldExchange != null) {
                        list = oldExchange.getIn().getBody(List.class);
                        result = oldExchange;
                    }
                    else {
                        list = new ArrayList<>();
                        result = newExchange;
                    }
                    Response resp = newExchange.getMessage().getBody(Response.class);
                    list.add(newExchange.getMessage().getBody(Response.class));
                    result.getMessage().setBody(list, List.class);
                    return result;
                })
                .to("activemq:queue:QUEUE-1", "activemq:queue:QUEUE-2", "activemq:queue:QUEUE-3")
            .end();

//      Provider Route definitions ...
        from("activemq:queue:QUEUE-1").routeId("provider01")
                .to("direct:reverse-compression")
                .to("direct:reverse-mapping")
                .process(exchange -> {
                    Request request = exchange.getMessage().getBody(Request.class);
                    Response response = new Response("provider01", new Date(), request.getValue() + 10);
                    exchange.getMessage().setBody(response);
                })
                .to("direct:object-mapping")
                .to("direct:object-compression");

        from("activemq:queue:QUEUE-2").routeId("provider02")
                .to("direct:reverse-compression")
                .to("direct:reverse-mapping")
                .process(exchange -> {
                    Request request = exchange.getMessage().getBody(Request.class);
                    Response response = new Response("provider02", new Date(), (request.getValue() + 10) * 2);
                    exchange.getMessage().setBody(response);
                })
                .to("direct:object-mapping")
                .to("direct:object-compression");

        from("activemq:queue:QUEUE-3").routeId("provider03")
                .to("direct:reverse-compression")
                .to("direct:reverse-mapping")
                .process(exchange -> {
                    Request request = exchange.getMessage().getBody(Request.class);
                    Response response = new Response("provider03", new Date(), (request.getValue() + 50) * request.getValue());
                    exchange.getMessage().setBody(response);
                })
                .to("direct:object-mapping")
                .to("direct:object-compression");
    }
}

V podstatě se jedná o stejné řešení jako v případě ukázky serializace. 

Přibyly mně dvě cesty:

  • direct:object-compression   – provede kompresi obsahu zprávy 
  • direct:reverse-compression – provede zpětnou dekompresi obsahu zprávy

Pokud se podíváte na pořadí volání jednotlivých cest pro kompresi a serializaci, pak bude asi zřejmé jejich použití jak u žadatele o službu, tak u poskytovatele.

To, zda se bude obsah komprimovat/dekomprimovat, se opět řídí pomocí hlavičky. Tentokrát je to hlavička s názvem ObjectCompression. Pokud je definována, pak obsahuje název použitého algoritmu komprese.

V případě dekomprese odpovědí se musím opět popasovat s problémem spojení do jednoho seznamu. Použiji vlastní implementaci agregační strategie, kdy při příjmu odpovědi nejdříve provedu dekompresi a převod do Java bean. Následuje pak spojení do seznamu.

REST API aplikace

V tomto případě se jedná o jednoduchou službu. Nic překvapivého neočekávejte:

@RestController
public class ServiceController {

    private static final String HEADER_OBJECT_COMPRESSION = "ObjectCompression";

    @Autowired
    private ProducerTemplate producerTemplate;

    @RequestMapping(value = "/rest/appl01")
    public ResponseEntity<List<Response>> restApplicant01(
            @RequestBody Request request,
            @RequestParam(value = "compression", required = false) String objectCompression) {
        if (request.getName() == null)
            request.setName("rest-applicant01");
        request.setTs(new Date());

        Map<String, Object> headers = new HashMap<>();
        if (objectCompression != null && objectCompression.length() > 0)
            headers.put(HEADER_OBJECT_COMPRESSION, objectCompression.toLowerCase());

        List<Response> response = producerTemplate.requestBodyAndHeaders("direct:applicant01", request, headers, List.class);
        if (response != null) {
            return ResponseEntity.ok(response);
        } else {
            return ResponseEntity.notFound().build();
        }
    }
}

Služba má jeden volitelný parametr, kterým říkám, zda chci komprimovat nebo ne. Ten se použije k nastavení hlavičky ObjectCompression.

Jak si to vyzkoušet

Takto to vypadá, pokud zavolám službu s požadovanou kompresí GZIP:

[raska@localhost ~]$ curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' 'http://localhost:8080/rest/appl01?compression=gzip' | jq .
[
  {
    "name": "provider01",
    "ts": "2021-01-03T09:43:39.577+00:00",
    "result": 1244
  },
  {
    "name": "provider02",
    "ts": "2021-01-03T09:43:39.621+00:00",
    "result": 2488
  },
  {
    "name": "provider03",
    "ts": "2021-01-03T09:43:39.852+00:00",
    "result": 1584456
  }
]

No a v logu bych měl vidět, že se provedla serializace a komprese zpráv. Nějak takto by to mělo vypadat:

bitcoin školení listopad 24

2021-01-03 10:43:39.476  INFO: COMPRESSED OBJECT to GZIP
2021-01-03 10:43:39.573  INFO: DECOMPRESSED OBJECT from GZIP
2021-01-03 10:43:39.580  INFO: COMPRESSED OBJECT to GZIP
2021-01-03 10:43:39.602  INFO: DECOMPRESSED OBJECT from GZIP
2021-01-03 10:43:39.620  INFO: DECOMPRESSED OBJECT from GZIP
2021-01-03 10:43:39.625  INFO: COMPRESSED OBJECT to GZIP
2021-01-03 10:43:39.668  INFO: DECOMPRESSED OBJECT from GZIP
2021-01-03 10:43:39.851  INFO: DECOMPRESSED OBJECT from GZIP
2021-01-03 10:43:39.854  INFO: COMPRESSED OBJECT to GZIP
2021-01-03 10:43:39.876  INFO: DECOMPRESSED OBJECT from GZIP

Tady je volání bez požadované komprese:

[raska@localhost ~]$ curl -s -d '{ "value": "1234", "name": "REQUESTED by TRPASLIK" }' -H 'Content-Type: application/json' 'http://localhost:8080/rest/appl01' | jq .
[
  {
    "name": "provider01",
    "ts": "2021-01-03T09:45:48.674+00:00",
    "result": 1244
  },
  {
    "name": "provider02",
    "ts": "2021-01-03T09:45:48.695+00:00",
    "result": 2488
  },
  {
    "name": "provider03",
    "ts": "2021-01-03T09:45:48.736+00:00",
    "result": 1584456
  }
]

V logu bych měl vidět, že se v tomto případě komprese nedělala. Asi nějak takto:

2021-01-03 10:45:48.651  INFO: No object compression applied on the message.
2021-01-03 10:45:48.677  INFO: No object compression applied on the message.
2021-01-03 10:45:48.695  INFO: No object compression applied on the message.
2021-01-03 10:45:48.736  INFO: No object compression applied on the message.

Autor článku

Jiří Raška pracuje na pozici IT architekta. Poslední roky se zaměřuje na integrační a komunikační projekty ve zdravotnictví. Mezi jeho koníčky patří také paragliding a jízda na horském kole.