Asynchronous Interprocess Communication with Spring Boot and Amazon SQS


Microservices frequently need to talk to each other. For performance reasons, so that the sender is not blocked while the receiver does its work, this communication is often done asynchronously. There are two major models of asynchronous interprocess communication (IPC).

Point-to-point communication

Communication goes from one particular microservice to another. The client sends a message to the consumer; the consumer processes the message and may send a message back to the client if a response is expected.
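
The exchange described above can be sketched in plain Java with two in-memory queues standing in for the two services' inboxes. This is only an illustration: the class name, the queue names and the "ack:" reply format are all made up for the example, and no JMS or SQS is involved.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for two services that each own one inbox queue.
// All names here are hypothetical; this is not JMS or SQS code.
final class PointToPointSketch {

    // Messages addressed to the consumer and to the client, respectively.
    static final BlockingQueue<String> consumerInbox = new LinkedBlockingQueue<>();
    static final BlockingQueue<String> clientInbox = new LinkedBlockingQueue<>();

    // The consumer: take one request from its inbox and reply to the client's inbox.
    static void serveOne() {
        try {
            String request = consumerInbox.take();
            clientInbox.put("ack:" + request);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // The client: send a request, then wait up to five seconds for the reply.
    static String requestReply(String request) {
        Thread consumer = new Thread(PointToPointSketch::serveOne);
        consumer.start();
        try {
            consumerInbox.put(request);
            String reply = clientInbox.poll(5, TimeUnit.SECONDS);
            consumer.join();
            return reply;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for reply", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(requestReply("order-42")); // prints ack:order-42
    }
}
```

Each message has exactly one intended receiver, which is what distinguishes this model from a broadcast.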

Publish-subscribe

A single microservice broadcasts a message, which is then consumed by zero or more microservices that are listening for that particular type of message.
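
For contrast, a broadcast can be sketched as a topic that hands every published message to each registered subscriber. Again this is a hypothetical in-memory illustration (the `TopicSketch` class and its methods are invented for the example), not the API of JMS or of any AWS service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// In-memory stand-in for a topic; hypothetical, not part of JMS or SQS.
final class TopicSketch {

    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // Register a subscriber that will receive every message published from now on.
    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Deliver one message to every current subscriber and
    // return how many subscribers received it (possibly zero).
    int publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
        return subscribers.size();
    }
}
```

Publishing with no subscribers is not an error here; the message is simply delivered to nobody, which matches the "zero or more" wording above.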

This post focuses on implementing point-to-point communication between two microservices using Spring Boot, the Java Message Service (JMS) and Amazon SQS.

SQS Queues

A message queue holds all the messages of a particular type for a microservice to process. Microservices communicate by sending messages to each other's queues.

Maven dependencies

Three dependencies are required: the Spring JMS library, the AWS SDK for Java and the Amazon SQS Java Messaging Library.
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk</artifactId>
    <version>1.10.77</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-sqs-java-messaging-lib</artifactId>
    <version>1.0.1</version>
</dependency> 

Configuring Spring Boot

Create a configuration class to take care of initializing the AWS connection and the messaging queues:
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

@Configuration
@Profile("!test") //no connection to AWS during unit testing
public class MessagingConfig {


    @Bean
    public ConnectionFactory connectionFactory() {
        SQSConnectionFactory connectionFactory
                = SQSConnectionFactory.builder()
                        .withRegion(Region.getRegion(Regions.EU_WEST_1))
                        .withAWSCredentialsProvider(new DefaultAWSCredentialsProviderChain())
                        .build();
        return connectionFactory;
    }

    @Bean
    public Session session(Connection connection) throws JMSException {
        return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    @Bean
    public Connection connection(ConnectionFactory connectionFactory) throws JMSException {
        Connection connection = connectionFactory.createConnection();
        connection.start();
        return connection;
    }

    @Bean
    public Queue inbox(Session session) throws JMSException {
        return session.createQueue("sqs_inbox_queue");
    }

}

Sending and receiving messages

A convenient way to send and receive messages is to create a Spring component to do this.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import ng.shoppi.delivery.service.JmsMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// The implementation needs a name different from the JmsMessageHandler
// interface it implements; SqsMessageHandler is an illustrative choice.
@Service
public class SqsMessageHandler implements JmsMessageHandler {

    private static final Logger log = LoggerFactory.getLogger(SqsMessageHandler.class);

    @Autowired
    private Session session;

    @Autowired
    private Connection connection;

    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public void sendMessage(Object body, Queue destination) {
        try {
            MessageProducer producer = session.createProducer(destination);
            String json = mapper.writeValueAsString(body);
            TextMessage message = session.createTextMessage(json);
            producer.send(message);
        } catch (JMSException | JsonProcessingException ex) {
            log.error("Error sending message", ex);
        }

    }

    @Override
    public void listen(Queue destination, MessageListener listener) {
        try {
            log.info("Starting listener for: " + destination);
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(listener);
            // Has no effect if the connection was already started in MessagingConfig.
            connection.start();
        } catch (JMSException ex) {
            log.error("Error starting listener for " + destination, ex);
        }
    }

}

Listening for incoming messages on a queue

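You will usually want to register a listener for a queue while Spring Boot is initializing. For example:
import javax.annotation.PostConstruct;
import javax.jms.Queue;
import ng.shoppi.delivery.service.JmsMessageHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service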
public class MailListener {

    @Autowired
    private Queue mailQueue;

    @Autowired
    private JmsMessageHandler handler;

    @PostConstruct
    public void init() {
        handler.listen(mailQueue, message-> {
            //logic to process incoming message
        });
    }

}

Sending an outgoing message

Using our messaging component, we can now send a message as follows:

    @Autowired
    private Queue mailQueue;

    @Autowired
    private JmsMessageHandler handler;

    public void send(Object payload) {
        handler.sendMessage(payload, mailQueue);
    }
