How to Implement Spring Boot Endpoint for Confluent Cloud-Like Batch Payloads?

  softwareengineering

I’m currently working on a Spring Boot application where I need to create an endpoint similar to the one mentioned in the Confluent Cloud documentation (link).

The cURL example provided in the documentation is as follows:

curl -X POST -H "Content-Type: application/json" \
  -H "Authorization: Basic <BASE64-encoded-key-and-secret>" \
  "https://<REST-endpoint>/kafka/v3/clusters/<cluster-id>/topics/<topic-name>/records" \
  -d '{"value": {"type": "BINARY", "data": "<base64-encoded-data>"}} {"value": {"type": "BINARY", "data": "<base64-encoded-data>"}}'

Could anyone guide me on how to design the Spring Boot controller to handle this type of batch payload and effectively access the data from within the controller?


I am expanding on my comment, just to add a few more pointers toward a possible solution. A nicer way would likely be to customize the Jackson mapper (assuming that is what is being used) so that the trick making this possible is hidden from the controller. But off the top of my head, this is all I could think of:

  // Inside your class, likely annotated with @RestController.
  // The method would carry annotations like @PostMapping ... or
  // maybe @Override, with all the annotations in an interface the
  // controller class is implementing...
  public AnAnswer someControllerMethod(@RequestBody String body) {
    List<Item> items = new ArrayList<>();
    // for this parsing technique, see https://stackoverflow.com/questions/51820871/parse-all-jsons-that-are-concatenated-in-one-string-in-java
    ObjectMapper mapper = new ObjectMapper();
    JsonFactory factory = mapper.getFactory();
    try (JsonParser parser = factory.createParser(body)) {
      Iterator<Item> iterItem = parser.readValuesAs(Item.class);
      // maybe "modern" Java allows for cooler ways to populate items;
      while (iterItem.hasNext()) {
        Item i = iterItem.next();
        items.add(i);
        // or process the item on the spot:
        //   service.doSomethingWithThis(i);
        // then you don't need items anymore, but you need a way
        // to collect the outcomes (if any) to build AnAnswer,
        // unless this method needs to return just a 2xx, with no
        // body...
      }
    } catch (IOException ex) {
      // Note: JsonParseException is a subclass of IOException, so a
      // single catch clause covers both (catching them together in a
      // multi-catch would not even compile).
      // Handle or rethrow; if you let this fall through, the service
      // method will be executed with whatever items were parsed so
      // far, and the error will silently be forgotten.
    }
    return service.doSomethingWithThese(items);
    // or just
    //   return; // or
    //   return new AnAnswer(something); // or
    //   ...
  }
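For completeness, here is a minimal sketch of what Item could look like for the payload shape shown in the question. The names Item and Value are my own invention, not anything from the Confluent docs; the real DTO should mirror whatever fields the OP actually needs. The base64 data can be decoded with the standard java.util.Base64:

```java
import java.util.Base64;

// Hypothetical DTO mirroring one JSON object of the batch payload:
//   {"value": {"type": "BINARY", "data": "<base64-encoded-data>"}}
// Recent Jackson versions (2.12+) can bind JSON to records out of
// the box; for older ones, use a plain class with getters/setters.
record Item(Value value) {

  record Value(String type, String data) {
    // Decode the base64 "data" field into the raw bytes it carries.
    byte[] decodedData() {
      return Base64.getDecoder().decode(data);
    }
  }
}
```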

Notes:

  • I am assuming that the sentence ‘I need to create an endpoint similar to the one mentioned in the Confluent Cloud documentation’ does not imply that the OP is actually working with Confluent Cloud (or Kafka, as a comment imagined). I am not building on unstated motivations and purposes of the OP; I am only considering this one goal: creating an endpoint similar to the cited one — assuming that the problematic part is the payload consisting of several JSON objects not wrapped in a JSON array.
  • I am assuming that the code from the other SO answer works — I did not test it. The code above was written directly in the web editor: not tested, not checked for typos or the like; consider it more or less pseudocode.
  • Maybe this code makes other assumptions which don’t hold for the OP’s case.
  • I am not saying this is a good way of doing it: it is just a lazy way that could work (that is, it solves the problem while there aren’t better answers yet).
  • Each JSON object in the payload must be unmarshallable into an Item.
  • This is not a streaming solution: the whole payload is loaded into the String, and then the unmarshalled Items fill the list, so a lot of memory could be used, depending on the size of the payload. Even if you consume each item on the spot, the whole payload is still loaded into the string first, which is not ideal.
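On that last point, here is a hedged sketch of one way to avoid buffering the whole body: have Spring hand you the request body as an InputStream (it supports `@RequestBody InputStream` / `HttpServletRequest`) and cut it into individual JSON documents as they arrive, by tracking brace depth and string literals. The splitting logic below is my own, stdlib-only illustration — not from the cited SO answer; feeding an InputStream to Jackson's JsonParser would achieve the same thing more robustly.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Splits a stream of concatenated top-level JSON objects
// ("{...} {...} {...}") into individual documents, invoking the
// consumer for each one as soon as its closing brace is read, so
// the full payload never has to sit in memory at once.
class ConcatenatedJsonSplitter {

  static void split(InputStream in, Consumer<String> perDocument) {
    StringBuilder current = new StringBuilder();
    int depth = 0;            // nesting level of { }
    boolean inString = false; // inside a JSON string literal?
    boolean escaped = false;  // previous char was a backslash?
    try {
      Reader reader = new InputStreamReader(in, StandardCharsets.UTF_8);
      int c;
      while ((c = reader.read()) != -1) {
        char ch = (char) c;
        if (depth == 0 && Character.isWhitespace(ch)) {
          continue; // skip whitespace between documents
        }
        current.append(ch);
        if (inString) {
          // braces inside string literals must not affect the depth
          if (escaped) {
            escaped = false;
          } else if (ch == '\\') {
            escaped = true;
          } else if (ch == '"') {
            inString = false;
          }
        } else if (ch == '"') {
          inString = true;
        } else if (ch == '{') {
          depth++;
        } else if (ch == '}') {
          depth--;
          if (depth == 0) {
            perDocument.accept(current.toString());
            current.setLength(0); // start collecting the next document
          }
        }
      }
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
  }
}
```

Each emitted chunk can then be passed to `mapper.readValue(chunk, Item.class)` individually, keeping at most one document in memory at a time (plus whatever the servlet container buffers on its own).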
