提问者:小点点

使用RabbitMQ插件在Grails中创建队列运行时


我有一个系统,外部系统可以订阅我的系统生成的事件。该系统是用Grails 2编写的,使用RabbitMQ插件进行内部消息传递。外部系统的事件通过HTTP进行通信。

我想为每个订阅者创建一个队列,以防止缓慢的订阅者endpoint减慢发送给其他订阅者的消息。订阅可以在运行时发生,这就是为什么不希望在应用程序配置中定义队列的原因。

如何使用Grails RabbitMQ插件创建具有主题绑定运行时的队列?

由于从RabbitMQ队列读取消息直接耦合到服务,因此创建队列运行时的一个附带问题可能是该Grails服务的多个实例。有什么想法吗?


共2个答案

匿名用户

我没有现成的解决方案,但是如果你遵循RabbitmqGrailsPlugin描述符中的代码,尤其是doWellSpring部分,你应该能够重新创建必要的步骤,在运行时动态初始化新的队列和关联的监听器

这一切都归结为传递所需的参数,注册必要的Spring豆并启动侦听器。

为了回答您的第二个问题,我认为您可以提出一些命名约定并为每个队列创建一个新的队列处理程序。如何动态创建Springbean的示例可以在这里找到:动态声明bean

只是一个简短的例子,我将如何快速注册一个队列,它需要更多的布线等……

def createQ(queueName) {
    def queuesConfig = {
        "${queueName}"(durable: true, autoDelete: false,)
    }
    def queueBuilder = new RabbitQueueBuilder()
    queuesConfig.delegate = queueBuilder
    queuesConfig.resolveStrategy = Closure.DELEGATE_FIRST
    queuesConfig()

    queueBuilder.queues?.each { queue ->
        if (log.debugEnabled) {
            log.debug "Registering queue '${queue.name}'"
        }
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(Queue.class);
        builder.addConstructorArgValue(queue.name)
        builder.addConstructorArgValue(Boolean.valueOf(queue.durable))
        builder.addConstructorArgValue(Boolean.valueOf(queue.exclusive))
        builder.addConstructorArgValue(Boolean.valueOf(queue.autoDelete))
        builder.addConstructorArgValue(queue.arguments)
        DefaultListableBeanFactory factory = (DefaultListableBeanFactory) grailsApplication.mainContext.getBeanFactory();
        factory.registerBeanDefinition("grails.rabbit.queue.${queue.name}", builder.getBeanDefinition());
    }
}

匿名用户

我最终使用了Grails RabbitMQ插件使用的Spring AMQP。删除了一些方法/参数,因为它们与示例无关:

class MyUpdater {
  void handleMessage(Object message) {
    String content = new String(message)

    // do whatever you need with the message
  }
}


import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.TopicExchange
import org.springframework.amqp.rabbit.core.RabbitAdmin
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
import org.springframework.amqp.support.converter.SimpleMessageConverter
import org.springframework.amqp.rabbit.connection.ConnectionFactory

class ListenerInitiator {

  // autowired
  ConnectionFactory   rabbitMQConnectionFactory

  protected void initiateListener() {
    RabbitAdmin admin = new RabbitAdmin(rabbitMQConnectionFactory)

    // normally passed to this method, moved to local vars for simplicity
    String queueName = "myQueueName"
    String routingKey = "#"
    String exchange = "myExchange"

    Queue queue = new Queue(queueName)
    admin.declareQueue(queue)
    TopicExchange exchange = new TopicExchange(exchange)
    admin.declareExchange(exchange)

    admin.declareBinding( BindingBuilder.bind(queue).to(exchange).with(routingKey) )

    // normally passed to this method, moved to local var for simplicity
    MyUpdater listener = new MyUpdater()
    SimpleMessageListenerContainer container =
        new SimpleMessageListenerContainer(rabbitMQConnectionFactory)
    MessageListenerAdapter adapter = new MessageListenerAdapter(listener)
    adapter.setMessageConverter(new SimpleMessageConverter())

    container.setMessageListener(adapter)
    container.setQueueNames(queueName)
    container.start()
}