I am experimenting with the Spring Reactor 3 components and Spring Integration to create a reactive stream (Flux) from a JMS queue.
I am attempting to create a reactive stream (Spring Reactor 3 Flux) from a JMS queue (ActiveMQ using Spring Integration) for clients to get the JMS messages asynchronously. I believe that I have everything hooked up correctly but the client does not receive any of the JMS messages until the server is stopped. Then all of the messages get "pushed" to the client a once.
Any help would be appreciated.
Here is the configuration file that I am using to configure the JMS, Integration components and the reactive publisher:
@Configuration
@EnableJms
@EnableIntegration
public class JmsConfiguration {
@Value("${spring.activemq.broker-url:tcp://localhost:61616}")
private String defaultBrokerUrl;
@Value("${queues.patient:patient}")
private String patientQueue;
@Autowired
MessageListenerAdapter messageListenerAdapter;
@Bean
public DefaultJmsListenerContainerFactory myFactory(
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
configurer.configure(factory, jmsConnectionFactory());
return factory;
}
@Bean
public Queue patientQueue() {
return new ActiveMQQueue(patientQueue);
}
@Bean
public ActiveMQConnectionFactory jmsConnectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(defaultBrokerUrl);
connectionFactory.setTrustedPackages(Arrays.asList("com.sapinero"));
return connectionFactory;
}
// Set the jackson message converter
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(jmsConnectionFactory());
template.setDefaultDestinationName(patientQueue);
template.setMessageConverter(jacksonJmsMessageConverter());
return template;
}
@Bean
public MessageListenerAdapter messageListenerAdapter() {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter();
messageListenerAdapter.setMessageConverter(jacksonJmsMessageConverter());
return messageListenerAdapter;
}
@Bean
public AbstractMessageListenerContainer messageListenerContainer() {
DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
defaultMessageListenerContainer.setMessageConverter(jacksonJmsMessageConverter());
defaultMessageListenerContainer.setConnectionFactory(jmsConnectionFactory());
defaultMessageListenerContainer.setDestinationName(patientQueue);
defaultMessageListenerContainer.setMessageListener(messageListenerAdapter());
defaultMessageListenerContainer.setCacheLevel(100);
defaultMessageListenerContainer.setErrorHandler(new ErrorHandler() {
@Override
public void handleError(Throwable t) {
t.printStackTrace();
}
});
return defaultMessageListenerContainer;
}
@Bean // Serialize message content to json using TextMessage
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
@Bean
public MessageChannel jmsOutboundInboundReplyChannel() {
return MessageChannels.queue().get();
}
@Bean
public Publisher<Message<String>> pollableReactiveFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(messageListenerContainer()).get())
.channel(MessageChannels.queue())
.log(LoggingHandler.Level.DEBUG)
.log()
.toReactivePublisher();
}
@Bean
public MessageChannel jmsChannel() {
return new DirectChannel();
}
The controller that creates the Flux is:
@RestController
@RequestMapping("patients")
public class PatientChangePushController {
private LocalDateTime lastTimePatientDataRetrieved = LocalDateTime.now();
private int durationInSeconds = 30;
private Patient patient;
AtomicReference<SignalType> checkFinally = new AtomicReference<>();
@Autowired
PatientService patientService;
@Autowired
@Qualifier("pollableReactiveFlow")
private
Publisher<Message<String>> pollableReactiveFlow;
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Queue patientQueue;
/**
* Subscribe to a Flux of a patient that has been updated.
*
* @param id
* @return
*/
@GetMapping(value = "/{id}/alerts", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Message<String>> getPatientAlerts(@PathVariable Long id) {
Flux<Message<String>> messageFlux = Flux.from(pollableReactiveFlow);
return messageFlux;
}
@GetMapping(value = "/generate")
public void generateJmsMessage() {
for (long i = 0L; i < 100; i++) {
Patient patient = new Patient();
patient.setId(i);
send(patient);
System.out.println("Message was sent to the Queue");
}
}
void send(Patient patient) {
this.jmsTemplate.convertAndSend(this.patientQueue, patient);
}
}
If anyone can tell me why the messages do not get sent to the client until after the server is killed, I would appreciate it.
Works well for me:
@SpringBootApplication
@RestController
public class SpringIntegrationSseDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringIntegrationSseDemoApplication.class, args);
}
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private JmsTemplate jmsTemplate;
@Bean
public Publisher<Message<String>> jmsReactiveSource() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
.destination("testQueue"))
.channel(MessageChannels.queue())
.log(LoggingHandler.Level.DEBUG)
.log()
.toReactivePublisher();
}
@GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getPatientAlerts() {
return Flux.from(jmsReactiveSource())
.map(Message::getPayload);
}
@GetMapping(value = "/generate")
public void generateJmsMessage() {
for (int i = 0; i < 100; i++) {
this.jmsTemplate.convertAndSend("testQueue", "testMessage #" + (i + 1));
}
}
}
In one terminal I have curl http://localhost:8080/events
which waits for SSEs from that Flux
.
In other terminal I perform curl http://localhost:8080/generate
and see in the first one:
data:testMessage #1
data:testMessage #2
data:testMessage #3
data:testMessage #4
I use Spring Boot 2.0.0.BUILD-SNAPSHOT.
Also see here: https://spring.io/blog/2017/03/08/spring-tips-server-sent-events-sse