How to create a Spring Reactor Flux from an ActiveMQ queue?

T. Nash · Mar 30, 2017

I am experimenting with Spring Reactor 3 and Spring Integration to create a reactive stream (a Reactor 3 Flux) from a JMS queue (ActiveMQ), so that clients receive the JMS messages asynchronously. I believe that I have everything hooked up correctly, but the client does not receive any of the JMS messages until the server is stopped. Then all of the messages get "pushed" to the client at once.

Any help would be appreciated.

Here is the configuration class that I am using to configure JMS, the Integration components, and the reactive publisher:

@Configuration
@EnableJms
@EnableIntegration
public class JmsConfiguration {

    @Value("${spring.activemq.broker-url:tcp://localhost:61616}")
    private String defaultBrokerUrl;

    @Value("${queues.patient:patient}")
    private String patientQueue;

    @Autowired
    MessageListenerAdapter messageListenerAdapter;

    @Bean
    public DefaultJmsListenerContainerFactory myFactory(
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, jmsConnectionFactory());
        return factory;
    }

    @Bean
    public Queue patientQueue() {
        return new ActiveMQQueue(patientQueue);

    }

    @Bean
    public ActiveMQConnectionFactory jmsConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(defaultBrokerUrl);
        connectionFactory.setTrustedPackages(Arrays.asList("com.sapinero"));
        return connectionFactory;
    }

    // Set the jackson message converter
    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(jmsConnectionFactory());
        template.setDefaultDestinationName(patientQueue);
        template.setMessageConverter(jacksonJmsMessageConverter());
        return template;
    }

    @Bean
    public MessageListenerAdapter messageListenerAdapter() {
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
        messageListenerAdapter.setMessageConverter(jacksonJmsMessageConverter());
        return messageListenerAdapter;
    }

    @Bean
    public AbstractMessageListenerContainer messageListenerContainer() {
        DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
        defaultMessageListenerContainer.setMessageConverter(jacksonJmsMessageConverter());
        defaultMessageListenerContainer.setConnectionFactory(jmsConnectionFactory());
        defaultMessageListenerContainer.setDestinationName(patientQueue);
        defaultMessageListenerContainer.setMessageListener(messageListenerAdapter());
        defaultMessageListenerContainer.setCacheLevel(100);
        defaultMessageListenerContainer.setErrorHandler(new ErrorHandler() {
            @Override
            public void handleError(Throwable t) {
                t.printStackTrace();
            }
        });

        return defaultMessageListenerContainer;
    }

    @Bean // Serialize message content to json using TextMessage
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }


    @Bean
    public MessageChannel jmsOutboundInboundReplyChannel() {
        return MessageChannels.queue().get();
    }

    @Bean
    public Publisher<Message<String>> pollableReactiveFlow() {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(messageListenerContainer()).get())
                .channel(MessageChannels.queue())
                .log(LoggingHandler.Level.DEBUG)
                .log()
                .toReactivePublisher();
    }

    @Bean
    public MessageChannel jmsChannel() {
        return new DirectChannel();
    }
}

The controller that creates the Flux is:

@RestController
@RequestMapping("patients")
public class PatientChangePushController {
    private LocalDateTime lastTimePatientDataRetrieved = LocalDateTime.now();
    private int durationInSeconds = 30;
    private Patient patient;
    AtomicReference<SignalType> checkFinally = new AtomicReference<>();

    @Autowired
    PatientService patientService;

    @Autowired
    @Qualifier("pollableReactiveFlow")
    private Publisher<Message<String>> pollableReactiveFlow;

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private Queue patientQueue;

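    /**
     * Subscribe to a Flux of messages about a patient that has been updated.
     *
     * @param id the patient ID from the request path (not yet used to filter messages)
     * @return the JMS messages as a server-sent event stream
     */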
    @GetMapping(value = "/{id}/alerts", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Message<String>> getPatientAlerts(@PathVariable Long id) {

        Flux<Message<String>> messageFlux = Flux.from(pollableReactiveFlow);
        return messageFlux;
    }

    @GetMapping(value = "/generate")
    public void generateJmsMessage() {
        for (long i = 0L; i < 100; i++) {
            Patient patient = new Patient();
            patient.setId(i);
            send(patient);
            System.out.println("Message was sent to the Queue");
        }

    }

    void send(Patient patient) {
        this.jmsTemplate.convertAndSend(this.patientQueue, patient);
    }

}

If anyone can tell me why the messages do not get sent to the client until after the server is killed, I would appreciate it.

Answer

Artem Bilan · Mar 30, 2017

This works well for me:

@SpringBootApplication
@RestController
public class SpringIntegrationSseDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringIntegrationSseDemoApplication.class, args);
    }

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private JmsTemplate jmsTemplate;

    @Bean
    public Publisher<Message<String>> jmsReactiveSource() {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
                        .destination("testQueue"))
                .channel(MessageChannels.queue())
                .log(LoggingHandler.Level.DEBUG)
                .log()
                .toReactivePublisher();
    }

    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> getPatientAlerts() {
        return Flux.from(jmsReactiveSource())
                .map(Message::getPayload);
    }

    @GetMapping(value = "/generate")
    public void generateJmsMessage() {
        for (int i = 0; i < 100; i++) {
            this.jmsTemplate.convertAndSend("testQueue", "testMessage #" + (i + 1));
        }
    }

}
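The flow above hands each incoming message to a Publisher, and the subscriber pulls items as it signals demand. As a rough, broker-free illustration of that hand-off, here is a minimal sketch using only the JDK's java.util.concurrent.Flow API. It is a stand-in for the idea, not the Spring Integration or Reactor implementation; the class name QueueToPublisherSketch and the method run are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical, JDK-only stand-in: no JMS broker, Spring Integration or Reactor involved.
final class QueueToPublisherSketch {

    // Publishes `count` messages and returns what a single subscriber received.
    static List<String> run(int count) throws InterruptedException {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // signal demand for the first item
                }

                @Override
                public void onNext(String item) {
                    received.add(item);      // the item arrives as soon as it is published
                    subscription.request(1); // ask for the next one
                }

                @Override
                public void onError(Throwable t) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });

            // Plays the role of the message listener pushing messages into the flow.
            for (int i = 1; i <= count; i++) {
                publisher.submit("testMessage #" + i);
            }
        } // close() signals onComplete once the submitted items have been delivered

        done.await(5, TimeUnit.SECONDS);
        return received;
    }
}
```

Calling QueueToPublisherSketch.run(3) should return the three messages in submission order. The subscriber sees each item while the publisher is still open, which is the behaviour the /events endpoint is expected to show.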

In one terminal I run curl http://localhost:8080/events, which waits for SSEs from that Flux.

In another terminal I run curl http://localhost:8080/generate, and the first terminal shows:

data:testMessage #1

data:testMessage #2

data:testMessage #3

data:testMessage #4
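Each event in that output is a single data: line followed by a blank line. For a client that reads the raw response body instead of using curl, a minimal sketch of extracting the payloads could look like the following. The helper SseDataLines is hypothetical and not part of Spring; it handles only single-line data: fields and ignores other SSE fields such as id:, event: and retry:.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spring: extracts payloads from a raw SSE body.
final class SseDataLines {

    // Collects the value of every "data:" line, in order of appearance.
    static List<String> payloads(String body) {
        List<String> result = new ArrayList<>();
        for (String line : body.split("\\R")) {
            if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // The SSE format drops one optional space after the colon.
                if (value.startsWith(" ")) {
                    value = value.substring(1);
                }
                result.add(value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String body = "data:testMessage #1\n\ndata:testMessage #2\n\n";
        System.out.println(payloads(body)); // prints [testMessage #1, testMessage #2]
    }
}
```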

I use Spring Boot 2.0.0.BUILD-SNAPSHOT.

Also see here: https://spring.io/blog/2017/03/08/spring-tips-server-sent-events-sse