How to implement an MQTT server using Spring Integration?

Alexander Farber · Oct 11, 2016

When I run the Outbound Channel Adapter example for MQTT, it fails with the following error:

Executing command line: /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java -classpath /Users/afarber/src/spring-newbie/MqttOutbound/target/classes:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot/1.4.1.RELEASE/spring-boot-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot-starter/1.4.1.RELEASE/spring-boot-starter-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/1.4.1.RELEASE/spring-boot-autoconfigure-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot-starter-logging/1.4.1.RELEASE/spring-boot-starter-logging-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/ch/qos/logback/logback-classic/1.1.7/logback-classic-1.1.7.jar:/Users/afarber/.m2/repository/ch/qos/logback/logback-core/1.1.7/logback-core-1.1.7.jar:/Users/afarber/.m2/repository/org/slf4j/jcl-over-slf4j/1.7.21/jcl-over-slf4j-1.7.21.jar:/Users/afarber/.m2/repository/org/slf4j/jul-to-slf4j/1.7.21/jul-to-slf4j-1.7.21.jar:/Users/afarber/.m2/repository/org/slf4j/log4j-over-slf4j/1.7.21/log4j-over-slf4j-1.7.21.jar:/Users/afarber/.m2/repository/org/yaml/snakeyaml/1.17/snakeyaml-1.17.jar:/Users/afarber/.m2/repository/org/slf4j/slf4j-api/1.7.16/slf4j-api-1.7.16.jar:/Users/afarber/.m2/repository/org/springframework/spring-context/4.3.2.RELEASE/spring-context-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-aop/4.3.2.RELEASE/spring-aop-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-beans/4.3.2.RELEASE/spring-beans-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-expression/4.3.2.RELEASE/spring-expression-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-core/4.3.2.RELEASE/spring-core-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/commons-logging/commons-logging/1.2/commons-logging-1.2.jar:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-core/4.3.2.RELEASE/spring-integration-core-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-messaging/4.3.3.RELEASE/spring-messaging-4.3.3.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-tx/4.3.3.RELEASE/spring-tx-4.3.3.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/retry/spring-retry/1.1.3.RELEASE/spring-retry-1.1.3.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-mqtt/4.3.2.RELEASE/spring-integration-mqtt-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.0.2/org.eclipse.paho.client.mqttv3-1.0.2.jar de.afarber.mqttoutbound.MqttJavaApplication

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.4.1.RELEASE)

2016-10-11 21:53:36.811  INFO 2102 --- [           main] d.a.mqttoutbound.MqttJavaApplication     : Starting MqttJavaApplication on mba.local with PID 2102 (/Users/afarber/src/spring-newbie/MqttOutbound/target/classes started by afarber in /Users/afarber/src/spring-newbie/MqttOutbound)
2016-10-11 21:53:36.816  INFO 2102 --- [           main] d.a.mqttoutbound.MqttJavaApplication     : No active profile set, falling back to default profiles: default
2016-10-11 21:53:36.960  INFO 2102 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@35a50a4c: startup date [Tue Oct 11 21:53:36 CEST 2016]; root of context hierarchy
2016-10-11 21:53:37.724  INFO 2102 --- [           main] o.s.b.f.config.PropertiesFactoryBean     : Loading properties file from URL [jar:file:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-core/4.3.2.RELEASE/spring-integration-core-4.3.2.RELEASE.jar!/META-INF/spring.integration.default.properties]
2016-10-11 21:53:37.729  INFO 2102 --- [           main] o.s.i.config.IntegrationRegistrar        : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2016-10-11 21:53:37.933  INFO 2102 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2016-10-11 21:53:37.947  INFO 2102 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2016-10-11 21:53:38.143  INFO 2102 --- [           main] o.s.b.f.config.PropertiesFactoryBean     : Loading properties file from URL [jar:file:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-core/4.3.2.RELEASE/spring-integration-core-4.3.2.RELEASE.jar!/META-INF/spring.integration.default.properties]
2016-10-11 21:53:38.148  INFO 2102 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationGlobalProperties' of type [class org.springframework.beans.factory.config.PropertiesFactoryBean] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2016-10-11 21:53:38.177  INFO 2102 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationGlobalProperties' of type [class java.util.Properties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2016-10-11 21:53:38.592  INFO 2102 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService  'taskScheduler'
2016-10-11 21:53:39.064  INFO 2102 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2016-10-11 21:53:39.077  INFO 2102 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase -2147483648
2016-10-11 21:53:39.078  INFO 2102 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler:mqttJavaApplication.mqttOutbound.serviceActivator} as a subscriber to the 'mqttOutboundChannel' channel
2016-10-11 21:53:39.078  INFO 2102 --- [           main] o.s.integration.channel.DirectChannel    : Channel 'application.mqttOutboundChannel' has 1 subscriber(s).
2016-10-11 21:53:39.079  INFO 2102 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started mqttJavaApplication.mqttOutbound.serviceActivator
2016-10-11 21:53:39.079  INFO 2102 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2016-10-11 21:53:39.079  INFO 2102 --- [           main] ProxyFactoryBean$MethodInvocationGateway : started mqttJavaApplication$MyGateway
2016-10-11 21:53:39.079  INFO 2102 --- [           main] GatewayCompletableFutureProxyFactoryBean : started mqttJavaApplication$MyGateway
2016-10-11 21:53:39.080  INFO 2102 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2016-10-11 21:53:39.080  INFO 2102 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2016-10-11 21:53:39.080  INFO 2102 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started _org.springframework.integration.errorLogger
2016-10-11 21:53:39.093  INFO 2102 --- [           main] d.a.mqttoutbound.MqttJavaApplication     : Started MqttJavaApplication in 2.962 seconds (JVM running for 3.669)
Exception in thread "main" org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.MessagingException: Failed to connect; nested exception is Unable to connect to server (32103) - java.net.ConnectException: Connection refused
    at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:135)
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:375)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:477)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:429)
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:420)
    at org.springframework.integration.gateway.GatewayCompletableFutureProxyFactoryBean.invoke(GatewayCompletableFutureProxyFactoryBean.java:65)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
    at com.sun.proxy.$Proxy40.sendToMqtt(Unknown Source)
    at de.afarber.mqttoutbound.MqttJavaApplication.main(MqttJavaApplication.java:27)
Caused by: org.springframework.messaging.MessagingException: Failed to connect; nested exception is Unable to connect to server (32103) - java.net.ConnectException: Connection refused
    at org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler.checkConnection(MqttPahoMessageHandler.java:180)
    at org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler.publish(MqttPahoMessageHandler.java:189)
    at org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler.handleMessageInternal(AbstractMqttMessageHandler.java:150)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.config.annotation.ServiceActivatorAnnotationPostProcessor$ReplyProducingMessageHandlerWrapper.handleRequestMessage(ServiceActivatorAnnotationPostProcessor.java:98)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    ... 19 more
Caused by: Unable to connect to server (32103) - java.net.ConnectException: Connection refused
    at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79)
    at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70)
    ... 2 more
2016-10-11 21:53:39.203  INFO 2102 --- [       Thread-1] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@35a50a4c: startup date [Tue Oct 11 21:53:36 CEST 2016]; root of context hierarchy
2016-10-11 21:53:39.207  INFO 2102 --- [       Thread-1] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 0
2016-10-11 21:53:39.209  INFO 2102 --- [       Thread-1] ProxyFactoryBean$MethodInvocationGateway : stopped mqttJavaApplication$MyGateway
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] GatewayCompletableFutureProxyFactoryBean : stopped mqttJavaApplication$MyGateway
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] o.s.i.endpoint.EventDrivenConsumer       : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 0 subscriber(s).
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] o.s.i.endpoint.EventDrivenConsumer       : stopped _org.springframework.integration.errorLogger
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase -2147483648
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] o.s.i.endpoint.EventDrivenConsumer       : Removing {message-handler:mqttJavaApplication.mqttOutbound.serviceActivator} as a subscriber to the 'mqttOutboundChannel' channel
2016-10-11 21:53:39.210  INFO 2102 --- [       Thread-1] o.s.integration.channel.DirectChannel    : Channel 'application.mqttOutboundChannel' has 0 subscriber(s).
2016-10-11 21:53:39.211  INFO 2102 --- [       Thread-1] o.s.i.endpoint.EventDrivenConsumer       : stopped mqttJavaApplication.mqttOutbound.serviceActivator
2016-10-11 21:53:39.211  INFO 2102 --- [       Thread-1] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans on shutdown
2016-10-11 21:53:39.212  INFO 2102 --- [       Thread-1] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'
------------------------------------------------------------------------
BUILD FAILURE

The failing code from MqttJavaApplication.java is copied below:

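package de.afarber.mqttoutbound;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication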
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo"); // THROWS THE ABOVE EXCEPTION
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs("tcp://localhost:1883");
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {
        void sendToMqtt(String data);
    }
}

Besides, this code seems to be an MQTT client anyway.

How can I implement an MQTT server (broker) as a bean in Java with Spring Integration? Where should I start?

Answer

Gary Russell · Oct 11, 2016

Spring Integration does not provide a broker; it provides clients for sending/receiving messages.
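
The "Connection refused" at the bottom of the stack trace is consistent with this: the client factory points at tcp://localhost:1883, and the TCP connect itself fails, which usually means nothing is listening on that port. As a rough way to confirm that before starting the application, here is a minimal sketch using only the JDK. BrokerProbe and its methods are hypothetical names made up for illustration; they are not part of Spring Integration or Paho, and a successful connect only shows that some process accepts TCP connections there, not that it speaks MQTT.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

/** Hypothetical helper for illustration; not part of Spring Integration or Paho. */
final class BrokerProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers java.net.ConnectException ("Connection refused"),
            // connect timeouts and unresolvable host names.
            return false;
        }
    }

    /** Same check for a server URI in the form used in the question, e.g. tcp://localhost:1883. */
    static boolean isListening(String serverUri, int timeoutMillis) {
        URI uri = URI.create(serverUri);
        return isListening(uri.getHost(), uri.getPort(), timeoutMillis);
    }

    public static void main(String[] args) {
        String serverUri = "tcp://localhost:1883"; // the URI configured in mqttClientFactory()
        if (isListening(serverUri, 1000)) {
            System.out.println(serverUri + " accepts TCP connections");
        } else {
            System.out.println(serverUri + " is not reachable; is a broker running there?");
        }
    }
}
```

If the probe reports that the port is not reachable, the gateway call in main() can be expected to fail the same way until a separate MQTT broker process is started on that host and port.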