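Because you are using a POJO listener, you do not get to handle the Message itself.
Your Foundation#importExchange should accept the message body (byte[] in your case) and return something suitable for the body of the reply message.
The replyHandler just needs to implement MessageListener.
The framework will take care of the correlation for you.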
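As a rough illustration of that handler shape, here is a minimal plain-Java sketch. It does not use Spring at all: `dispatch` is a hypothetical helper that only mimics the by-name, by-argument-type reflective lookup visible in the stack trace (`Class.getMethod` called with `[B`), and the `"processed ..."` reply text is made up for the example.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;

// Sketch only: plain Java, no Spring classes involved.
class PojoHandlerSketch {

    // POJO handler in the shape described above: message body in, reply body out.
    static class Foundation {
        public byte[] importExchange(byte[] body) {
            String command = new String(body, StandardCharsets.UTF_8);
            System.out.println("Command: " + command);
            // Hypothetical reply payload; the real reply content is up to you.
            return ("processed " + command).getBytes(StandardCharsets.UTF_8);
        }
    }

    // Hypothetical helper: looks the method up by name and byte[] argument type,
    // then invokes it, similar in spirit to what a listener adapter has to do.
    static byte[] dispatch(Object handler, String methodName, byte[] body) {
        try {
            Method method = handler.getClass().getMethod(methodName, byte[].class);
            return (byte[]) method.invoke(handler, (Object) body);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Failed to invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        byte[] reply = dispatch(new Foundation(), "importExchange",
                "item-1".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(reply, StandardCharsets.UTF_8));
    }
}
```

With a `byte[]` parameter the lookup succeeds, so the handler is invoked and its return value is available to be sent back as the reply body.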
I am using Spring AMQP with RabbitMQ, and I build the message with:

    Message message = MessageBuilder.withBody(item.toString().getBytes())
            .setReplyTo("importReply")
            .setCorrelationId(item.toString().getBytes())
            .build();
My message handler is:

    public class Foundation {
        public Message importExchange(Message exchange) {
            System.out.println("Command:" + exchange.getBody());
            Message message = MessageBuilder.withBody(exchange.getBody().toString().getBytes())
                    .setCorrelationId(exchange.getMessageProperties().getCorrelationId().toString().getBytes())
                    .build();
            return message;
        }
    }
I have wired it up with:

    <rabbit:listener-container connection-factory="rabbitConnectionFactory" concurrency="10">
        <rabbit:listener queues="${rabbitmq.import.queue}" ref="foundation" method="importExchange" />
        <rabbit:listener queues="${rabbitmq.import.reply.queue}" ref="importExchangeItemWriter" method="replyHandler" />
    </rabbit:listener-container>
But the listener fails to execute:
    Execution of Rabbit message listener failed, and no ErrorHandler has been set.
    org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Failed to invoke target method 'importExchange' with argument type = [class [B], value = [{[B@427829d8}]
        at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:483)
        at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:374)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:647)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:573)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:75)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:154)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1111)
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:556)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:904)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:888)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$500(SimpleMessageListenerContainer.java:75)
        at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:989)
        at java.lang.Thread.run(Thread.java:744)
    Caused by: java.lang.NoSuchMethodException: com.stockopedia.symfony.Foundation.importExchange([B)
        at java.lang.Class.getMethod(Class.java:1665)
        at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:178)
        at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:466)
        ... 12 more
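The `Caused by` line of this trace can be reproduced in isolation, which may help in reading it: `[B` is the JVM name for `byte[]`, and `Class.getMethod` only matches a method whose declared parameter type is exactly the one requested. The sketch below is plain Java with no Spring dependency; `Envelope` is a hypothetical stand-in for the `Message` type, and `acceptsByteArray` is a helper invented for this illustration.

```java
import java.lang.reflect.Method;

// Sketch only: reproduces a NoSuchMethodException from a byte[] lookup.
class LookupFailureSketch {

    // Hypothetical stand-in for org.springframework.amqp.core.Message.
    static class Envelope {
        final byte[] body;
        Envelope(byte[] body) { this.body = body; }
    }

    // Same shape as the handler above: it declares the wrapper type, not byte[].
    static class Foundation {
        public Envelope importExchange(Envelope exchange) {
            return exchange;
        }
    }

    // Returns true if the type has a public method with that name
    // whose single declared parameter is exactly byte[].
    static boolean acceptsByteArray(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName, byte[].class);
            return method != null;
        } catch (NoSuchMethodException e) {
            System.out.println("Lookup failed: " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        // The lookup asks for importExchange(byte[]), which Foundation does not declare.
        System.out.println(acceptsByteArray(Foundation.class, "importExchange"));
    }
}
```

Here the lookup fails because the only `importExchange` declared takes the wrapper type, which matches the mismatch reported in the trace between the `[B` argument and the handler signature.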
The replyHandler on the producer side has a similar problem:

    public void replyHandler(Message message) {
        System.out.println("In Reply Handler:" + message.getMessageProperties().getCorrelationId());
    }
Also, if importExchange throws an exception, how can I get that exception in the replyHandler?