
Sending Protocol Buffers messages over SQS

Published: January 15, 2022  •  go, java, aws

SQS

Amazon Simple Queue Service (Amazon SQS) is a managed message queuing service that is part of Amazon Web Services (AWS). SQS allows applications to send and receive messages of up to 256KB. Unlike with similar technologies, there is no need to set up any server infrastructure. Amazon manages everything and sells SQS at a per-use rate.

Check out the pricing page for more information:
https://aws.amazon.com/sqs/pricing/

Sending a few hundred messages a month should fall into the free tier, but this also depends on the number of consumers, because every polling request counts as well.

To use SQS, we first have to create queues. You can do this either via the AWS web console, with the AWS CLI, with an Infrastructure as Code tool (Terraform, Pulumi, ...) or programmatically from an application.
After creating the queues, an application can communicate with SQS either with the AWS SDK or any HTTP client.

A complete SQS architecture consists of producers who create and send messages to SQS and consumers who receive and consume messages. Producers and consumers are decoupled and don't have to run at the same time. SQS stores the messages in the queue for a configurable amount of time.

Figure: producer-consumer

This architecture scales easily. Based on the number of messages in the queue, an auto-scaling component can automatically start more consumers. Multiple producers can also send messages into the same queue.

Note that SQS implements only the point-to-point messaging pattern. A message is delivered to one consumer and is not broadcast to all of them (standard queues guarantee at-least-once delivery, so a consumer may occasionally see a duplicate). SQS does not support the publish/subscribe pattern, where one message can be sent to multiple receivers (see AWS SNS for this use case).
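The point-to-point pattern can be illustrated without any AWS infrastructure. The following is only an in-process analogy, not SQS itself: a Go channel stands in for the queue, and the function name consumeAll is made up for this sketch. Several consumers read from the same channel, and each message ends up with exactly one of them.

```go
package main

import (
	"fmt"
	"sync"
)

// consumeAll starts the given number of consumers on one shared channel and
// returns how often each message ID was received. The channel is a stand-in
// for a queue (an analogy only, this does not talk to SQS).
func consumeAll(messages []int, consumers int) map[int]int {
	queue := make(chan int)
	received := make(map[int]int)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for c := 0; c < consumers; c++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each value sent on the channel is handed to exactly one receiver.
			for id := range queue {
				mu.Lock()
				received[id]++
				mu.Unlock()
			}
		}()
	}

	// A single producer; more producers could write to the same channel.
	for _, id := range messages {
		queue <- id
	}
	close(queue)
	wg.Wait()
	return received
}

func main() {
	counts := consumeAll([]int{1, 2, 3, 4, 5, 6}, 3)
	fmt.Println(counts) // map[1:1 2:1 3:1 4:1 5:1 6:1]
}
```

Which consumer gets which message is not deterministic, but every message is counted once. A publish/subscribe system would instead hand every message to every subscriber.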

Protocol Buffers

Protocol Buffers is a data format developed by Google. It uses a compact, binary wire format that is not self-describing: field names and full data types are not encoded into the wire format. Protocol Buffers therefore requires an external specification, a text file that usually has the extension .proto. This specification needs to be compiled with the Protocol Buffers compiler (protoc) into code for the target language. Protocol Buffers supports many popular programming languages.
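To make "not self-describing" more concrete, here is a minimal hand-written sketch of how a single integer field ends up on the wire. It is not how you would normally produce Protocol Buffers messages (the generated code does that), and the field number 1 is just an assumption for illustration. The wire format stores a tag, built from the field number and a wire type, followed by the value as a varint. The field name never appears.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// wireTypeVarint is the Protocol Buffers wire type for variable-length integers.
const wireTypeVarint = 0

// appendUvarint appends v in base-128 varint encoding, the same integer
// encoding the Protocol Buffers wire format uses.
func appendUvarint(buf []byte, v uint64) []byte {
	var tmp [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(tmp[:], v)
	return append(buf, tmp[:n]...)
}

// encodeVarintField encodes one integer field: the tag
// (field number << 3 | wire type) followed by the value.
func encodeVarintField(fieldNumber int, value uint64) []byte {
	var buf []byte
	buf = appendUvarint(buf, uint64(fieldNumber)<<3|wireTypeVarint)
	buf = appendUvarint(buf, value)
	return buf
}

func main() {
	// Hypothetical field number 1 with the value 150.
	fmt.Printf("% x\n", encodeVarintField(1, 150)) // 08 96 01
}
```

Three bytes carry the whole field. A receiver that does not know the .proto file only sees "field 1, varint, 150" and cannot tell what the field is called or whether it is an int32, an enum, or a bool.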

For the following examples, I use this Protocol Buffers definition.

For compiling the specification into Java code, I use this Maven plugin:
https://github.com/os72/protoc-jar-maven-plugin

The plugin automatically downloads the protoc compiler and runs the compilation during the generate-sources Maven lifecycle phase. The compiled Protocol Buffers classes are stored in the target folder.

See here how to configure the plugin in the pom.xml. The plugin reads all .proto files from the configured directory and generates Java code.


For Go applications, I use this Docker image to compile the .proto files into Go code:

In a Makefile, I added the following target, which lets me start the compilation with make gen-proto.

gen-proto:
  docker run --rm -v $(shell pwd):/work/output -v $(shell pwd)/../../protobuf/addressbook.proto:/work/addressbook.proto  goprotoc --proto_path=/work --go_out=/work/output addressbook.proto

Makefile

Another approach is to install the protoc compiler locally on the development machine. Visit this page for more information.

The Docker approach is slower but has the advantage that nothing except Docker needs to be installed, which is especially convenient when working in a team of developers. Each developer uses the same protoc version, and upgrades only require changing the Dockerfile.

Infrastructure

Before coding, we need to create an SQS queue and an S3 bucket. You can do this manually in the web console or with the AWS CLI.

For this article, I wrote a Pulumi script in TypeScript. You can find the code here. Running pulumi up creates the SQS queue and the S3 bucket. The Pulumi script prints out the name of the S3 bucket and the queue ARN. These are the two pieces of information we need for the following examples.


I will show you how to send Protocol Buffers messages over SQS with Java and Go in the following examples. The first example demonstrates sending messages that fit into the SQS message size limit of 256KB and the second example shows a way to deal with messages larger than 256KB.

Protocol Buffers message over SQS

The first example demonstrates a producer in Java and a consumer in Go. I also implemented the opposite workflow (producer in Go and consumer in Java). You can find the code here. The concepts discussed in the following section also apply to this code.

Producer

To work with SQS and Protocol Buffers in Java, add these two dependencies to the project.

  <dependencies>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>3.19.3</version>
    </dependency>
    <dependency>
      <groupId>software.amazon.awssdk</groupId>
      <artifactId>sqs</artifactId>
    </dependency>
  </dependencies>

pom.xml

To access SQS, an application needs to create an instance of software.amazon.awssdk.services.sqs.SqsClient. I'm running these examples from my local computer, and I have a home profile configured in ~/.aws/config. It's important to note that you must not do this when your application runs on AWS. Instead, grant the permissions to the container with a service role and instantiate the SqsClient with SqsClient.create(). Do not use access and secret keys in that scenario.

    // try-with-resources closes the credentials provider and the client at the end of the block
    try (ProfileCredentialsProvider awsCredentials = ProfileCredentialsProvider
            .create("home");
        SqsClient client = SqsClient.builder().credentialsProvider(awsCredentials)
            .region(Region.US_EAST_1).build()) {

Producer.java

The application then creates the Protocol Buffers messages.

    PhoneNumber pn = PhoneNumber.newBuilder().setType(PhoneType.MOBILE).setNumber("1111")
        .build();
    Person person = Person.newBuilder().setId(1).setEmail("john@test.com").setName("John")
        .addPhones(pn).build();
    AddressBook book = AddressBook.newBuilder().addPeople(person).build();

Producer.java


Now we face a problem. The body of an SQS message must be a string, but Protocol Buffers uses a binary wire format.

Encoding the binary message to a string with Base64 or ASCII85 is a solution, but it increases the message size (by 1/4 for ASCII85 and 1/3 for Base64).

We don't have to worry about this because there is a more convenient way to send binary data over SQS. An SQS message consists not only of a body but also of headers (SQS calls them message attributes). Headers are part of the message and count towards the size limit of 256KB. They support string, number, and binary data. So instead of sending the Protocol Buffers message in the SQS body, we send it as a binary header. You can choose an arbitrary name for the header, but it must follow the restrictions listed here.
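The two overhead figures can be checked with the encoders in the Go standard library. This is a small sketch, the function name encodedSizes is made up, and for ASCII85 it uses the upper bound that Go reports (the encoding can be shorter when the data contains runs of zero bytes).

```go
package main

import (
	"encoding/ascii85"
	"encoding/base64"
	"fmt"
)

// encodedSizes returns the encoded length of n raw bytes for Base64
// (standard alphabet with padding) and the maximum length for ASCII85.
func encodedSizes(n int) (b64, a85 int) {
	return base64.StdEncoding.EncodedLen(n), ascii85.MaxEncodedLen(n)
}

func main() {
	b64, a85 := encodedSizes(1200)
	// 1200 raw bytes grow by 1/3 with Base64 and by 1/4 with ASCII85.
	fmt.Println(b64, a85) // 1600 1500
}
```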

For this example, I chose the header name Body.

      SendMessageRequest request = SendMessageRequest.builder().queueUrl(queueURL)
          .messageBody(" ")
          .messageAttributes(Map.of("Body",
              MessageAttributeValue.builder().dataType("Binary")
                  .binaryValue(SdkBytes.fromByteArray(book.toByteArray())).build()))
          .build();

      SendMessageResponse response = client.sendMessage(request);
      System.out.println("Message sent: " + response.messageId());

Producer.java

The body of an SQS message can't be empty. Sending a space (" ") solves that problem.

It is worth noting that the AWS SDK internally converts the binary header into a Base64-encoded string before sending the message over the wire to AWS. So it looks like we gain nothing with this approach, but there is one difference: what counts towards the message size. A Base64-encoded string in the message body counts with its full encoded length, which is about 1/3 larger than the original data.
A binary header, on the other hand, is decoded back to binary when SQS receives the message, and SQS counts only the number of raw bytes.

I tested this with a byte array of 261,987 bytes. The Base64-encoded string had a length of 349,316 characters, so it is too large to send as an SQS message body (max size 262,144 bytes). But SQS accepts the message when the byte array is sent as a binary message header.


Consumer

To access SQS from Go, add the following dependencies.

go get github.com/aws/aws-sdk-go-v2/aws
go get github.com/aws/aws-sdk-go-v2/config
go get github.com/aws/aws-sdk-go-v2/service/sqs

For all the following Go examples, I use this error checking helper function to keep the code concise.

func check(e error) {
  if e != nil {
    log.Panicln(e)
  }
}

main.go

As in Java, the application first has to create an SQS client.

  cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithSharedConfigProfile("home"))
  check(err)

  sqsClient := sqs.NewFromConfig(cfg)

main.go

As mentioned above, I'm using a profile here. Do not use this code when running the application on AWS. Instead, create the client with this code:

    cfg, err := config.LoadDefaultConfig(context.TODO())
    check(err)
    sqsClient := sqs.NewFromConfig(cfg)

To receive SQS messages, a consumer has to poll SQS with the ReceiveMessage method periodically. This example does this inside an endless loop.

  for {
    fmt.Println(time.Now(), ": poll for message")
    receiveMessageInput := &sqs.ReceiveMessageInput{
      QueueUrl:              aws.String(queueURL),
      MaxNumberOfMessages:   10,
      WaitTimeSeconds:       20,
      MessageAttributeNames: []string{"Body"},
    }
    receiveMessageOutput, err := sqsClient.ReceiveMessage(context.TODO(), receiveMessageInput)
    check(err)

main.go

It is important that the code uses the MessageAttributeNames option to instruct SQS to return the "Body" header, which contains the Protocol Buffers message. By default, SQS does not return any custom headers, so an application always has to request them with this option.

When implementing an SQS consumer, ensure that the WaitTimeSeconds option is configured correctly. This option configures the duration for which the call waits for a message to arrive in the queue. The call returns sooner than WaitTimeSeconds if a message is available. The call returns after WaitTimeSeconds with an empty array if no messages are available.

If you omit this option, the call falls back to the value configured on the queue (ReceiveMessageWaitTimeSeconds). If it is not configured there either, the default is 0 seconds (short polling), and the call returns immediately even when the queue is empty. This option is essential because AWS charges you for the number of requests.

One consumer polling every second results in 2,628,288 requests per month. The first million requests each month are free. After that, it costs $0.40 (standard queues) and $0.50 (FIFO queues) per million requests. I always set this option to the maximum value of 20 seconds (about 131,414 requests per month).
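The request numbers above are simple arithmetic. Here is a small sketch that reproduces them, assuming an empty queue, one ReceiveMessage request per interval, and the prices quoted above. The function names are made up for this example.

```go
package main

import "fmt"

const (
	secondsPerMonth = 2_628_288 // 30.42 days, the figure used in the article
	freeRequests    = 1_000_000 // free requests per month
)

// monthlyRequests returns the number of polling requests one consumer issues
// per month when every request takes pollIntervalSeconds.
func monthlyRequests(pollIntervalSeconds int) int {
	return secondsPerMonth / pollIntervalSeconds
}

// monthlyCost returns the cost in dollars for the requests beyond the free tier.
func monthlyCost(requests int, pricePerMillion float64) float64 {
	billable := requests - freeRequests
	if billable < 0 {
		billable = 0
	}
	return float64(billable) / 1_000_000 * pricePerMillion
}

func main() {
	for _, interval := range []int{1, 20} {
		r := monthlyRequests(interval)
		fmt.Printf("interval %2ds: %d requests, $%.2f (standard queue)\n",
			interval, r, monthlyCost(r, 0.40))
	}
}
```

A single consumer is cheap in both cases, but the request count multiplies with every additional consumer, so the difference between a 1-second and a 20-second wait time adds up quickly.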

The last step in the workflow is processing the message. Because I set the MaxNumberOfMessages option to 10, the ReceiveMessage method could return up to 10 messages at once. Therefore, this consumer has to process the messages in a loop.

The consumer unmarshals the binary message into the structs protoc generated and prints them out.

    for _, message := range receiveMessageOutput.Messages {
      body := message.MessageAttributes["Body"]

      ab := &shared.AddressBook{}
      err := proto.Unmarshal(body.BinaryValue, ab)
      check(err)

      for _, person := range ab.People {
        fmt.Println(person.Id)
        fmt.Println(person.Name)
        fmt.Println(person.Email)
        for _, phone := range person.Phones {
          fmt.Print(phone.Type)
          fmt.Print(" : ")
          fmt.Println(phone.Number)
        }
        fmt.Println()
      }

      deleteMessageInput := &sqs.DeleteMessageInput{
        QueueUrl:      aws.String(queueURL),
        ReceiptHandle: message.ReceiptHandle,
      }
      _, err = sqsClient.DeleteMessage(context.TODO(), deleteMessageInput)
      check(err)
    }

main.go

After processing the message, it's important to delete it from the SQS queue with DeleteMessage.

ReceiveMessage does not delete messages from the queue. Instead, it marks them as invisible, so other consumers no longer see these messages. SQS then internally starts a timer, and when this timer reaches zero, the messages become visible again. With this workflow, SQS makes sure that a consumer has processed the message. If a consumer fails while processing a message, another consumer can take over.

The visibilityTimeoutSeconds option configures the duration of this timer. Always ensure that the duration is longer than the time a consumer needs to process a message (the time between ReceiveMessage and DeleteMessage). Another important option is messageRetentionSeconds, which specifies the duration after which SQS deletes messages from the queue when no consumer processes them. Both options are set on the queue.

Large message

In this example, I will show you a way to send messages that are larger than the SQS message size limit of 256KB. The producer for this example is written in Go and the consumer in Java. If you are interested in the opposite workflow (producer in Java and consumer in Go), check out this repository.

We need to find another location to store the messages because applications can't send them over SQS directly. In this example, the producer stores the encoded Protocol Buffers message in an S3 bucket (1) and only sends the S3 object key over SQS (2). The consumer receives the object key (3) and downloads the message from S3 (4).

Figure: producer-consumer-s3
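Before looking at the real implementation, here is the four-step flow reduced to its core. This sketch uses hypothetical in-memory stand-ins (a map for the S3 bucket, a slice for the SQS queue) and made-up function names, so it only shows the order of operations, not the AWS calls.

```go
package main

import (
	"errors"
	"fmt"
)

// blobStore and queue are hypothetical in-memory stand-ins for S3 and SQS.
type blobStore map[string][]byte

type queue struct{ keys []string }

// produce stores the payload (1) and sends only its key over the queue (2).
func produce(store blobStore, q *queue, key string, payload []byte) {
	store[key] = payload
	q.keys = append(q.keys, key)
}

// consume receives a key (3), loads the payload (4), and then removes both
// the stored object and the queue entry.
func consume(store blobStore, q *queue) ([]byte, error) {
	if len(q.keys) == 0 {
		return nil, errors.New("queue is empty")
	}
	key := q.keys[0]
	payload, ok := store[key]
	if !ok {
		return nil, fmt.Errorf("no object for key %q", key)
	}
	delete(store, key)
	q.keys = q.keys[1:]
	return payload, nil
}

func main() {
	store, q := blobStore{}, &queue{}
	large := make([]byte, 300_000) // larger than the 256KB SQS limit
	produce(store, q, "example-key", large)

	payload, err := consume(store, q)
	fmt.Println(len(payload), err, len(store), len(q.keys)) // 300000 <nil> 0 0
}
```

The queue only ever carries the short key, so the size of the payload no longer matters for SQS.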


Producer

Our application needs one additional dependency because we added S3 to our infrastructure.

go get github.com/aws/aws-sdk-go-v2/service/s3

The producer first creates the SQS and S3 client.

  cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithSharedConfigProfile("home"))
  check(err)

  sqsClient := sqs.NewFromConfig(cfg)
  s3Client := s3.NewFromConfig(cfg)

main.go

Then it creates a large Protocol Buffers message.

  var people []*shared.Person
  for i := 1; i < 10_000; i++ {
    p := &shared.Person{
      Id:    int32(i),
      Name:  faker.Name(),
      Email: faker.Email(),
      Phones: []*shared.Person_PhoneNumber{
        {Number: faker.Phonenumber(), Type: shared.Person_MOBILE},
      }}

    people = append(people, p)
  }
  book := &shared.AddressBook{People: people}

  out, err := proto.Marshal(book)
  check(err)

main.go

Next, it creates a unique S3 object key from a UUID and uploads the binary Protocol Buffers message to S3. We don't have to worry about string encoding because S3 stores objects as raw bytes, so we can upload the binary data directly.

  s3Key, err := uuid.NewUUID()
  check(err)

  _, err = s3Client.PutObject(context.TODO(), &s3.PutObjectInput{
    Bucket: aws.String(messageBucket),
    Key:    aws.String(s3Key.String()),
    Body:   bytes.NewReader(out),
  })
  check(err)

main.go

As the last step, the producer creates an SQS message, puts the S3 object key into the body, and sends it to SQS.

  msg := &sqs.SendMessageInput{
    MessageBody: aws.String(s3Key.String()),
    QueueUrl:    aws.String(queueURL),
  }
  message, err := sqsClient.SendMessage(context.TODO(), msg)
  check(err)

main.go


Consumer

To access S3, we add the following dependency to our Java consumer.

    <dependency>
      <groupId>software.amazon.awssdk</groupId>
      <artifactId>s3</artifactId>
    </dependency>

pom.xml

The consumer first instantiates the SQS and S3 clients.

    // try-with-resources closes the credentials provider and both clients at the end of the block
    try (ProfileCredentialsProvider awsCredentials = ProfileCredentialsProvider
            .create("home");
        S3Client s3Client = S3Client.builder().credentialsProvider(awsCredentials)
            .region(Region.US_EAST_1).build();
        SqsClient sqsClient = SqsClient.builder().credentialsProvider(awsCredentials)
            .region(Region.US_EAST_1).build()) {

Consumer.java

Inside an endless loop, it polls for new messages.

      while (true) {
        System.out.println("polling for message");
        ReceiveMessageRequest request = ReceiveMessageRequest.builder().queueUrl(queueURL)
            .maxNumberOfMessages(10).waitTimeSeconds(20).build();
        ReceiveMessageResponse response = sqsClient.receiveMessage(request);

Consumer.java

The consumer then processes the messages inside a loop. Because of maxNumberOfMessages(10), receiveMessage can return up to 10 messages in one call.

In the processing phase, the consumer extracts the S3 object key from the SQS message body, downloads the object from S3, decodes the binary object into the Protocol Buffers objects, and prints out the content.

        for (Message message : response.messages()) {
          String s3Key = message.body();
          try (ResponseInputStream<GetObjectResponse> ris = s3Client.getObject(
              GetObjectRequest.builder().bucket(messageBucket).key(s3Key).build())) {
            AddressBook ab = AddressBook.parseFrom(ris);

            for (Person person : ab.getPeopleList()) {
              System.out.println(person.getId());
              System.out.println(person.getName());
              System.out.println(person.getEmail());
              for (PhoneNumber phone : person.getPhonesList()) {
                System.out.print(phone.getType());
                System.out.print(" : ");
                System.out.println(phone.getNumber());
              }
            }
          }

Consumer.java

After processing the message, the consumer deletes the message on SQS and S3.

          s3Client.deleteObject(
              DeleteObjectRequest.builder().bucket(messageBucket).key(s3Key).build());
          sqsClient.deleteMessage(DeleteMessageRequest.builder().queueUrl(queueURL)
              .receiptHandle(message.receiptHandle()).build());

Consumer.java

It is important to note that there is no coupling between SQS and S3. For example, the messageRetentionSeconds option on the queue instructs SQS to delete messages if no consumer fetches them within a certain amount of time. But this does not delete the object on S3.

It is possible to configure lifecycle rules on S3 that delete objects after a certain amount of time. For this article, I use this Pulumi code.

const bucket = new aws.s3.Bucket("messages", {
  acl: "private",
  lifecycleRules: [{
    enabled: true,
    expiration: {
      days: 1
    }
  }]
});

index.ts

The downside of this solution is that the expiration can't be set to less than one day. The example in this blog post sets the SQS messageRetentionSeconds to 180 seconds, but the object on S3 will only be deleted after 24 hours. If that is a problem, you could deploy a Lambda function that periodically checks the age of the S3 objects and deletes old ones.


This concludes my tutorial about sending Protocol Buffers messages over SQS. You have learned that sending binary messages over SQS is very convenient when using a binary header, and that SQS can also be used to exchange large messages with the help of an additional storage service.