Kafka Streams join not working when schema is changed


I know that Kafka Streams foreign-key joins use Murmur3 to hash the serialized value of the message in order to guard against race conditions (out-of-order join results). But this also means that if we change the structure of our message, the join fails, because the hash changes.

This happens when we add a primitive field or remove/rename fields in Java. With JSON, re-ordering the properties also results in a different hash, even though the payload is logically identical.
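
To make this concrete, here is a small, self-contained sketch (not Kafka Streams code) that hashes three serialized forms of the same logical Person record. Guava's murmur3_128 merely stands in for Kafka Streams' internal Murmur3 implementation, so the exact hash values will differ, but the point is the same: any change to the serialized bytes changes the hash.

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;

public class HashDemo {
  // Hash the raw serialized bytes, as the foreign-key join does with the record value.
  static String hash(String json) {
    return Hashing.murmur3_128()
        .hashBytes(json.getBytes(StandardCharsets.UTF_8))
        .toString();
  }

  public static void main(String[] args) {
    // The same logical record in three different serialized forms:
    String original  = "{\"id\":\"p1\",\"name\":\"Jane\",\"cityId\":\"c1\"}";
    String reordered = "{\"name\":\"Jane\",\"id\":\"p1\",\"cityId\":\"c1\"}";
    String newField  = "{\"id\":\"p1\",\"name\":\"Jane\",\"cityId\":\"c1\",\"deleted\":false}";

    System.out.println(hash(original));  // all three hashes differ, so a stored
    System.out.println(hash(reordered)); // hash no longer matches once the
    System.out.println(hash(newField));  // serialized form changes
  }
}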

For example, let's say we have a Person and a City object we want to join:

Person:

public class Person {
  private String id;
  private String name;
  private String cityId; // foreign-key

  public String getName() { return name; }
  public String getCityId() { return cityId; }
}

City:

public class City {
  private String id;
  private String name;

  public String getName() { return name; }
}

We can now join the two tables like below (a fuller runnable sketch follows):

persons.join(cities, person -> person.getCityId(), (person, city) -> /* join function */ ...);
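
For completeness, here is a hedged sketch of the full topology. It assumes a Jackson-based JsonSerde (for example Spring Kafka's org.springframework.kafka.support.serializer.JsonSerde; any JSON serde behaves the same here), the getters shown on the classes above, and made-up topic names:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.kafka.support.serializer.JsonSerde;

public class JoinTopology {
  public static void main(String[] args) {
    StreamsBuilder builder = new StreamsBuilder();

    KTable<String, Person> persons = builder.table("persons",
        Consumed.with(Serdes.String(), new JsonSerde<>(Person.class)));
    KTable<String, City> cities = builder.table("cities",
        Consumed.with(Serdes.String(), new JsonSerde<>(City.class)));

    // Foreign-key join: cityId on the Person side refers to the key of the cities table.
    KTable<String, String> joined = persons.join(
        cities,
        Person::getCityId,                                   // foreign-key extractor
        (person, city) -> person.getName() + " lives in " + city.getName(),
        Materialized.with(Serdes.String(), Serdes.String()));

    joined.toStream().to("persons-with-cities",
        Produced.with(Serdes.String(), Serdes.String()));

    // builder.build() would then be passed to new KafkaStreams(topology, props).
  }
}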

Next, publish a message with key p1 to the persons topic and a message with key c1 to the cities topic, and the join will work.
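
As a sketch, the two test messages can be published with the plain Java producer (the console producer works just as well); the broker address and the field values are assumptions:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class PublishTestData {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // Key = entity id; value = the JSON that the serde will deserialize.
      producer.send(new ProducerRecord<>("persons", "p1",
          "{\"id\":\"p1\",\"name\":\"Jane\",\"cityId\":\"c1\"}"));
      producer.send(new ProducerRecord<>("cities", "c1",
          "{\"id\":\"c1\",\"name\":\"Berlin\"}"));
    }
  }
}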

Now, stop the streams app and add a new primitive attribute to the Person class:

public class Person {
  private String id;
  private String name;
  private String cityId; // foreign-key
  private boolean deleted; // new primitive attribute

  public String getName() { return name; }
  public String getCityId() { return cityId; }
}

Run the streams app again and publish a message with key c1 to the cities topic: you will see that the join does not fire. However, if we publish a message with key p1 to the persons topic, the join works again.

Is there a way to fix this issue? We use a JSON serde for serialization/deserialization.

What you describe sounds like a known issue: https://issues.apache.org/jira/browse/KAFKA-15303

I am not aware of a workaround unfortunately.
