Asynchronous Interprocess Communication with Spring Boot and Amazon SQS
Very frequently microservices need to talk to each other. For performance reasons, this communication is done asynchronously. There are two major models of asynchronous interprocess communication(IPC).
This post is focused on implementing point to point communication between two microservices using Spring Boot, Java Messaging System(JMS) and Amazon SQS.
Point-to-point communication
Communication is from a particular microservice to another. The client sends a message to the consumer, the consumer process the message and may send a message back to the client if a response is expected.Publish-subscriber
A single microservice broadcasts a message which is then consumed by zero or more microsevices which are listening for this particular message type of message.This post is focused on implementing point to point communication between two microservices using Spring Boot, Java Messaging System(JMS) and Amazon SQS.
SQS Queues
A message queue hold all the messages of a particular type for a microservice to process. Microservices communicate by sending messages to each other's queue.Maven dependencies
Three dependencies are required, Spring JMS library, Amazon Java SDK and SQS Library.<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.10.77</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-sqs-java-messaging-lib</artifactId>
<version>1.0.1</version>
</dependency>
Configuring Spring Boot
Create a configuration Class to take care of initializing the AWS connection and messaging queuesimport com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@Configuration
@Profile("!test") //no connection to AWS during unit testing
public class MessagingConfig {
@Bean
public ConnectionFactory connectionFactory() {
SQSConnectionFactory connectionFactory
= SQSConnectionFactory.builder()
.withRegion(Region.getRegion(Regions.EU_WEST_1))
.withAWSCredentialsProvider(new DefaultAWSCredentialsProviderChain())
.build();
return connectionFactory;
}
@Bean
public Session session(Connection connection) throws JMSException {
return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
@Bean
public Connection connection(ConnectionFactory connectionFactory) throws JMSException {
Connection connection = connectionFactory.createConnection();
connection.start();
return connection;
}
@Bean
public Queue inbox(Session session) throws JMSException {
return session.createQueue("sqs_inbox_queue");
}
}
Sending and receiving messages.
A convenient way to send and receive messages is to create a Spring component to do this.
import com.fasterxml.jackson.core.JsonProcessingException;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import ng.shoppi.delivery.service.JmsMessageHandler;
import ng.shoppi.delivery.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class JmsMessageHandler implements JmsMessageHandler {
private static Logger log = LoggerFactory.getLogger(JmsMessageHandler.class);
@Autowired
private Session session;
@Autowired
private Connection connection;
private ObjectMapper mapper = new ObjectMapper();
@Override
public void sendMessage(Object body, Queue destination) {
try {
MessageProducer producer = session.createProducer(destination);
String json = mapper.writeValueAsString(body);
TextMessage message = session.createTextMessage(json);
producer.send(message);
} catch (JMSException | JsonProcessingException ex) {
log.error("Error sending message", ex);
}
}
@Override
public void listen(Queue destination, MessageListener listener) {
try {
log.info("Starting listener for: " + destination);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(listener);
connection.start();
} catch (JMSException ex) {
log.error("Error starting listener for " + destination, ex);
}
}
}
Listening for incoming messages on a queue
Usually you would want to create a listener for a queue during Spring Boot initialization. For example,@Service
public class MailListener {
@Autowired
private Queue mailQueue;
@Autowired
private JmsMessageHandler handler;
@PostConstruct
public void init() {
handler.listen(mailQueue, message-> {
//logic to process incoming message
});
}
}
Sending an outgoing message
Using our messaging component, we can now send a message as follows
@Autowired
private Queue mailQueue;
@Autowired
private JmsMessageHandler handler;
public void send(Object payload) {
handler.sendMessage(payload, mailQueue);
}
Comments
Post a Comment