Spring for Apache Kafka 2.1.0 and 1.3.2 Released

Published: 2017-12-02 09:06:25 | Source: 红联 | Author: baihuo
Spring for Apache Kafka 2.1.0 has been released, together with the 1.3.2 and 2.0.2 maintenance releases, which contain important bug fixes.

The main change in 2.1.0 is the upgrade of the kafka-clients library to 1.0.0, along with several improvements:

Sometimes, when a message can’t be processed, you may wish to stop the container so the condition can be corrected and the message re-delivered. The framework now provides the ContainerStoppingErrorHandler for record listeners and ContainerStoppingBatchErrorHandler for batch listeners.
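The stop-and-redeliver behaviour described above can be sketched with plain Java. This is an illustrative model only, not the framework's ContainerStoppingErrorHandler: the class StoppableContainer, its in-memory log, and the run() method are all hypothetical names introduced for this example. The point it shows is that the offset of a failed record is never committed, so the same record is delivered again once the container is restarted.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a listener container that stops on the first failure.
final class StoppableContainer {
    private final List<String> log;           // stands in for a topic partition
    private final Consumer<String> listener;  // stands in for a record listener
    private int committedOffset = 0;          // offset of the next record to deliver

    StoppableContainer(List<String> log, Consumer<String> listener) {
        this.log = log;
        this.listener = listener;
    }

    /** Returns true if the log was drained, false if the container stopped on a failure. */
    boolean run() {
        while (committedOffset < log.size()) {
            try {
                listener.accept(log.get(committedOffset));
            } catch (RuntimeException e) {
                return false;                 // stop; the failed offset stays uncommitted
            }
            committedOffset++;                // commit only after successful processing
        }
        return true;
    }

    int committedOffset() {
        return committedOffset;
    }
}
```

Calling run() again after the underlying problem is fixed resumes at the record that failed, which mirrors the re-delivery described in the release notes.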

The KafkaAdmin now supports increasing partitions when a NewTopic bean is detected with a larger number of partitions than currently exist on the topic.
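The decision described above can be sketched as a small pure function. This is a hedged illustration of the rule, not KafkaAdmin's actual code: TopicReconciler, Action, and decide are hypothetical names, and the existing partition count is passed in directly rather than looked up on a broker.

```java
// Hypothetical model of the create / increase / leave-alone decision.
final class TopicReconciler {
    enum Action { CREATE, INCREASE_PARTITIONS, NONE }

    /**
     * @param existing partition count currently on the broker, or null if the topic does not exist
     * @param declared partition count declared by the application (for example in a NewTopic bean)
     */
    static Action decide(Integer existing, int declared) {
        if (existing == null) {
            return Action.CREATE;               // topic is missing, so create it
        }
        if (declared > existing) {
            return Action.INCREASE_PARTITIONS;  // the case added in 2.1.0
        }
        return Action.NONE;                     // Kafka cannot reduce a topic's partition count
    }
}
```

One design point to keep in mind: with the default hash-based partitioning, adding partitions changes which partition a given key maps to, so ordering guarantees for existing keys may not carry over.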

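StringJsonMessageConverter and JsonSerializer/JsonDeserializer now pass and consume type information in headers. This allows multiple types to be easily sent and received on the same topic:

[code]import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class Kafka21Application {

    public static void main(String[] args) {
        // Run the application, then close the context once the runner returns.
        SpringApplication.run(Kafka21Application.class, args)
                .close();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate template) {
        return args -> {
            // Integer payload
            template.send(MessageBuilder.withPayload(42)
                    .setHeader(KafkaHeaders.TOPIC, "blog")
                    .build());
            // String payload, sent to the same topic
            template.send(MessageBuilder.withPayload("43")
                    .setHeader(KafkaHeaders.TOPIC, "blog")
                    .build());
            // Give the listener time to receive both records before shutdown.
            Thread.sleep(5_000);
        };
    }

    @Bean
    public StringJsonMessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    // Class-level listener: each record is routed to the handler matching its payload type.
    @Component
    @KafkaListener(id = "multi", topics = "blog")
    public static class Listener {

        @KafkaHandler
        public void intListener(Integer in) {
            System.out.println("Got an int: " + in);
        }

        @KafkaHandler
        public void stringListener(String in) {
            System.out.println("Got a string: " + in);
        }

    }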
}[/code]

[code]Got an int: 42
Got a string: 43[/code]

The JsonSerializer and JsonDeserializer can be configured using Kafka properties for the producer/consumer.
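As a rough sketch of what such property-based configuration might look like, the following builds producer and consumer property maps by hand. Several things here are assumptions rather than details from the release notes: the class name JsonKafkaProps, the broker address localhost:9092, the group id blog-group, the package com.example.blog, and the trusted-packages key, which should be checked against the reference documentation for the version in use.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that assembles client properties as plain key/value pairs.
final class JsonKafkaProps {

    static Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.springframework.kafka.support.serializer.JsonSerializer");
        return props;
    }

    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "blog-group");              // assumed group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // Assumed key: restricts which packages may be deserialized from type headers.
        props.put("spring.json.trusted.packages", "com.example.blog");
        return props;
    }
}
```

Maps like these would typically be handed to a producer or consumer factory; in a Spring Boot application the same keys can usually be set in application.properties instead.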

Release details: https://spring.io/blog/2017/12/01/spring-for-apache-kafka-2-1-0-release-and-1-3-2-2-0-2-available

Via: OSCHINA (开源中国社区)