我有一个场景,比如。
有4个主题处理主题,异常主题,重试主题和拒绝主题。我有一个spring cloud stream应用程序,它有一个使用Kstream的处理器。该处理器从异常主题中读取消息,并基于每个消息中可用的标志,为重试主题和拒绝主题创建两个kstream分支。现在需要做的是,retry主题中出现的任何消息都必须等待一段特定的时间,然后才能将其推回处理主题。有没有人能帮我在spring cloud stream的一个kafka stream应用里做的最好的设计或者解决方案是什么?可以用flux和Mono的异步机制来设计吗?任何资源或指导将是一个很大的帮助。谢谢
我们不能在Spring Cloud Stream Kafka Streams应用程序中混合反应类型。基本上,您只能将Kafka流类型(如< code>KStream或< code>KTable)作为输入/输出绑定。如果您想在将它发送到处理主题之前引入一个延迟,为什么不能使用类似< code > ScheduleExecutorService 的东西,然后只在初始延迟之后调用服务呢?下面是一个潜在解决方案的示例伪代码。
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
...
...
.branch(
(k, v) -> {
if (reject flag found) {
return true;
}
}
(k, v) -> executorService.schedule(() -> true, initialDelay, TimeUnit.SECONDS));
在第一个分支中,我们检查记录是否绑定到拒绝状态,如果是,立即将其发送到拒绝主题。否则,它是针对重试主题的,在返回布尔标志之前延迟某个值(我猜这是在您的应用程序中外部配置的)。延迟完成后,布尔值< code>true将返回,到达分支的记录将被发送到重试主题。
请记住,我还没有尝试过这段代码,所以请注意第二个过滤器中的任何竞争条件,在这种情况下,我们以异步方式将事情交给调度器,但该线程上下文仍应保留当前的<code>(k,v)</code>对。请尝试使用一些测试数据,看看它是否符合您的要求。