Home | Send Feedback

Access Amazon S3 objects with SQL SELECT

Published: January 22, 2022  •  go, java, aws

Amazon S3 is the oldest Amazon Web Service. It is a managed binary object storage that allows an application to store and retrieve any binary data.

In this blog post, I want to show you a feature of Amazon S3 that transforms the service into a database. Not into a full-fledged transactional database, just into a read-only database. Still, a helpful feature that can simplify application code and reduce network transfer time.

Imagine an application that needs to process a file from S3 periodically. First, it has to download the whole file, extract the required information, and process it. If this is a huge file that contains several GB of data, but the application only needs a small fraction of the data, it's a waste of bandwidth and also adds complexity to the application because it needs code that extracts the necessary data from the file.

With S3 Select, an application can delegate the process of extracting the data to S3 and then download only the extracted data. It saves bandwidth, reduces the transfer time, and simplifies the application's code because it receives only the data it needs.
To extract the data, an application sends SELECT statements to S3. The SELECT statements look like regular statements you would send to a relational database, but S3 Select only supports a subset of the features.

S3 Select only supports CSV (comma-separated value), JSON and Parquet files. They must be UTF-8 encoded, and they can be compressed using gzip or bzip (CSV, JSON). S3 Select supports columnar compression (but not whole-object compression) for Parquet using gzip or Snappy.


You can access S3 Select with the AWS SDK, the REST API, or the AWS CLI. In this blog post, I will show you two examples with the AWS SDK. The first example is a Go application that reads a CSV file, and the second example is a Java application that retrieves data from a JSON file.

Setup

To demonstrate the functionality of S3 Select, I use a CSV file from USGS (United States Geological Survey):

An up-to-date list of all earthquakes that occurred during the last 30 days:
https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_month.csv


For the JSON demo, I use this Pokémon Pokédex: https://github.com/Biuni/PokemonGO-Pokedex
It contains a list of 151 Pokémons and their attributes.


To set up the demo environment, I run the following script. It creates an S3 bucket, downloads the files, compresses and uploads them to S3.

demo-up:
  aws-vault exec home -- aws s3 mb s3://select-demo
  curl https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_month.csv -o all_month.csv
  tar czf all_month.csv.tar.gz all_month.csv
  aws-vault exec home -- aws s3 cp all_month.csv s3://select-demo
  aws-vault exec home -- aws s3 cp all_month.csv.tar.gz s3://select-demo
  curl https://raw.githubusercontent.com/Biuni/PokemonGO-Pokedex/master/pokedex.json -o pokedex.json
  aws-vault exec home -- aws s3 cp pokedex.json s3://select-demo
  bzip2 pokedex.json
  aws-vault exec home -- aws s3 cp pokedex.json.bz2 s3://select-demo
  rm all_month*
  rm pokedex*

Makefile

Go, CSV

To access S3 from Go, add the following dependency to an application.

go get github.com/aws/aws-sdk-go-v2/aws
go get github.com/aws/aws-sdk-go-v2/config
go get github.com/aws/aws-sdk-go-v2/service/s3

To access S3 Select, we don't need a special client. The functionality is available from the standard S3 client.

  cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithSharedConfigProfile("home"))
  check(err)
  s3Client := s3.NewFromConfig(cfg)

main.go

I developed these examples on my local machine, which has an AWS profile configured (home). If your application runs on AWS omit the profile configuration: cfg, err := config.LoadDefaultConfig(context.TODO())


An S3 Select request consists of the bucket and object name, the input and output format description, and the SELECT query.

func runSelect(query string, s3Client *s3.Client, useCompressedFile bool) {
  key := "all_month.csv"
  if useCompressedFile {
    key = "all_month.csv.tar.gz"
  }
  compressionType := types.CompressionTypeNone
  if useCompressedFile {
    compressionType = types.CompressionTypeGzip
  }

  selectObjectInput := &s3.SelectObjectContentInput{
    Bucket:         aws.String(bucketName),
    Key:            &key,
    ExpressionType: types.ExpressionTypeSql,
    Expression:     aws.String(query),
    InputSerialization: &types.InputSerialization{
      CSV: &types.CSVInput{
        FileHeaderInfo: types.FileHeaderInfoUse,
      },
      CompressionType: compressionType,
    },
    OutputSerialization: &types.OutputSerialization{
      CSV: &types.CSVOutput{},
    },
  }

main.go

In the input description, we tell S3 Select in what format the file is stored and if it's compressed. The CSV file we will read in this example contains a header row with the field names.

time,latitude,longitude,depth,mag,magType,nst,gap,dmin,rms,net,id,updated,place,type,horizontalError,depthError,magError,magNst,status,locationSource,magSource
2022-01-18T01:12:54.160Z,38.7921677,-122.7884979,0.24,0.35,md,5,121,0.001107,0.01,nc,nc73679951,2022-01-18T01:14:31.889Z,"3km WNW of The Geysers, CA",earthquake,0.4,0.56,,1,automatic,nc,nc
...

Because the file contains a header row I set the FileHeaderInfo to types.FileHeaderInfoUse. This option allows the application to use the filed name in the SELECT query.

In the output format description, we tell S3 Select in what format we want the response back. In this example, I set the response format to CSV.


To access S3 Select the application sends the input configuration with the method SelectObjectContent to S3.

  resp, err := s3Client.SelectObjectContent(context.Background(), selectObjectInput)
  check(err)
  defer resp.GetStream().Close()

  for event := range resp.GetStream().Events() {
    switch v := event.(type) {
    case *types.SelectObjectContentEventStreamMemberRecords:
      fmt.Println(string(v.Value.Payload))
    case *types.SelectObjectContentEventStreamMemberStats:
      fmt.Println("Processed", v.Value.Details.BytesProcessed, "bytes")
    case *types.SelectObjectContentEventStreamMemberEnd:
      fmt.Println("SelectObjectContent completed")
    }
  }

  if err := resp.GetStream().Err(); err != nil {
    check(err)
  }

main.go

Because S3 Select does not know the response size in advance, it streams the response as a series of events back to the calling application. The SelectObjectContentEventStreamMemberRecords event is emitted for each row in the response. v.Value.Payload contains the data as a byte array.

SelectObjectContentEventStreamMemberStats is emitted at the end of the processing and returns the number of processed bytes (v.Value.Details.BytesProcessed), the number of scanned bytes (v.Value.Details.BytesScanned) and the number of returned bytes (v.Value.Details.BytesReturned).

At the end of the stream the SDK emits the SelectObjectContentEventStreamMemberEnd event.


An S3 Select query looks like a normal SELECT query from a relational database.

The query starts with the keyword SELECT, then a comma-separated list of columns or an asterisk (*) to select all columns. The FROM clause is always S3Object for a CSV file. After that follows an optional WHERE and LIMIT clause.

S3 Select currently does not support the ORDER BY clause, subqueries, or joins.


In the first example, we list latitude, longitude, magnitude, time, and place of the first five rows of the CSV file.

  query := "SELECT latitude,longitude,mag,\"time\",place FROM S3Object LIMIT 5"
  runSelect(query, s3Client, false)

main.go

This is what we get back. Note that you can customize the output with the OutputSerialization configuration.

38.8218346,-122.8050003,0.88,2022-01-22T05:43:24.090Z,"6km NW of The Geysers, CA"
35.9286652,-119.8608322,2.01,2022-01-22T05:37:02.350Z,"13km SE of Kettleman City, CA"        
33.2495,-117.4351667,1.12,2022-01-22T05:33:31.330Z,"6km WNW of Camp Pendleton South, CA"     
33.645,-116.703,1.25,2022-01-22T05:32:30.740Z,"10km NNW of Anza, CA"
19.2258338928223,-155.417663574219,2.55,2022-01-22T05:26:06.730Z,"6 km ENE of Pāhala, Hawaii"

time is a reserved keyword on S3 Select and needs to be escaped with double quotation marks. See here a list of all reserved keywords:
https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-glacier-select-sql-reference-keyword-list.html

Column names are, by default, case-insensitive. However, you can use double quotation marks to indicate that the column names are case-sensitive.

Because the CSV contains a header row, we can set FileHeaderInfo: types.FileHeaderInfoUse in the request and then use the column name in the query. If you need to read a CSV file without a header row, identify the columns using positional headers:

SELECT _1, _2 FROM S3Object WHERE _3 > 100

The column numbering starts with 1 (not 0).


By default, S3 Select treats all data as a string. If you want to compare the data to a value that is not a string (number, boolean, timestamp), you have to convert the value into the desired data type with the CAST function.

The following example selects all earthquakes with a magnitude greater than 6. The input file contains rows with an empty magnitude. I had to filter them out because the cast would otherwise fail.

  query = "SELECT \"time\",place,mag FROM S3Object WHERE mag <> '' AND cast(mag as float) > 6"
  runSelect(query, s3Client, false)

main.go

Output:

2022-01-22T05:17:06.947Z,"63 km SSW of Unalaska, Alaska",6.2
2022-01-21T16:08:37.361Z,"24 km SSE of Saiki, Japan",6.3
2022-01-16T12:52:10.596Z,"74 km W of Panguna, Papua New Guinea",6.1
...

Check out this link for a list of all supported data types:
https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-glacier-select-sql-reference-data-types.html

Here you find a list of all supported operators:
https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-glacier-select-sql-reference-operators.html


S3 Select also supports aggregate functions. Currently only AVG, COUNT, MIN, MAX and SUM.

The first query returns the number of records, and the second returns the maximum and minimum magnitude.

  query = "SELECT COUNT(*) FROM S3Object"
  runSelect(query, s3Client, true)

  query = "SELECT MAX(cast(mag as float)), MIN(cast(mag as float)) FROM S3Object where mag <> ''"
  runSelect(query, s3Client, true)

main.go

Output:

9226

7.3,-1.31

Visit this link for a description of all supported aggregate functions:
https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-glacier-select-sql-reference-aggregate.html

In addition S3 Select supports conditional functions, date functions and string functions.

Java, JSON

Unfortunately, S3 Select is currently (January 2022) not yet implemented in version 2 of the Java SDK. Instead, we have to use version 1 of the SDK. (pom.xml)

First, instantiate the S3 client.

    AmazonS3 s3Client = AmazonS3Client.builder().withRegion(Regions.US_EAST_1)
        .withCredentials(new AWSStaticCredentialsProvider(
            new BasicSessionCredentials(accessKey, secretKey, sessionToken)))
        .build();

Main.java

For this example I use a temporary access session key. If your application runs on AWS create the S3 client with

AmazonS3 s3Client = AmazonS3Client.builder().build()

Like in the previous Go example, our application first has to create the S3 Select request, with the bucket and object name, the input and output specification, and the SELECT query.

  private static void selectObject(AmazonS3 s3Client, String query, boolean compressed)
      throws IOException {
    String bucket = "select-demo";
    String keyUncompressed = "pokedex.json";
    String keyCompressed = "pokedex.json.bz2";
    SelectObjectContentRequest request = new SelectObjectContentRequest();
    request.setBucketName(bucket);
    if (compressed) {
      request.setKey(keyCompressed);
    }
    else {
      request.setKey(keyUncompressed);
    }
    request.setExpression(query);
    request.setExpressionType(ExpressionType.SQL);

    InputSerialization inputSerialization = new InputSerialization();
    inputSerialization.setJson(new JSONInput().withType(JSONType.DOCUMENT));
    if (compressed) {
      inputSerialization.setCompressionType(CompressionType.BZIP2);
    }
    else {
      inputSerialization.setCompressionType(CompressionType.NONE);
    }
    request.setInputSerialization(inputSerialization);

    OutputSerialization outputSerialization = new OutputSerialization();
    outputSerialization.setJson(new JSONOutput());
    request.setOutputSerialization(outputSerialization);

Main.java

In this example, we will read a JSON file, and we also want the result back in JSON format. An important configuration is the JSON type we set here to JSONType.DOCUMENT. DOCUMENT means that a single JSON object can span multiple lines in the input. The other supported value is LINES, which means that each line in the input data contains a single JSON object.

The JSON contains a root object with the name pokemon, which is an array of objects. Each of the objects contains information about one Pokémon.

{
  "pokemon": [{
    "id": 1,
    "num": "001",
    "name": "Bulbasaur",
    "img": "http://www.serebii.net/pokemongo/pokemon/001.png",
    "type": [
      "Grass",
      "Poison"
    ],

S3 Select sends back the response in a streaming fashion. Optionally we can create a visitor that is called each time an event is emitted.

    SelectObjectContentEventVisitor listener = new SelectObjectContentEventVisitor() {
      @Override
      public void visit(RecordsEvent event) {
        System.out.println("record event");
      }

      @Override
      public void visit(SelectObjectContentEvent.StatsEvent event) {
        System.out.println("stats event: ");
        System.out.println(
            "uncompressed bytes processed: " + event.getDetails().getBytesProcessed());
      }

      @Override
      public void visit(SelectObjectContentEvent.EndEvent event) {
        System.out.println("end event");
      }
    };

Main.java

visit(RecordsEvent event) is called for each object, visit(SelectObjectContentEvent.StatsEvent event) is called at the end with information about the number of processed bytes and visit(SelectObjectContentEvent.EndEvent event) is also called at the end to signal the end of the processing.

The application calls the selectObjectContent method from the S3 client to send the request to S3 Select. This call returns an instance of SelectObjectContentEventStream, which provides the method getRecordsInputStream that returns the InputStream with the result. Optionally an application can pass a visitor instance mentioned above.

    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
        SelectObjectContentResult result = s3Client.selectObjectContent(request);
        SelectObjectContentEventStream payload = result.getPayload();
        InputStream is = payload.getRecordsInputStream(listener)) {
      is.transferTo(baos);
      System.out.println(new String(baos.toByteArray()));
    }

    // without visitor
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
        SelectObjectContentResult result = s3Client.selectObjectContent(request);
        SelectObjectContentEventStream payload = result.getPayload();
        InputStream is = payload.getRecordsInputStream()) {
      is.transferTo(baos);
      System.out.println(new String(baos.toByteArray()));
    }

Main.java


When we access JSON with S3 Select, the structure of the SELECT query stays the same compared to a CSV query but the FROM clause changes. S3 Select always treats a JSON document as an array of root-level values, even if the JSON document only has one root element. Therefore, the FROM clause must always begin with S3Object[*]. However, for compatibility reasons, S3 Select allows you to omit the wildcard if you don't include a path. So FROM S3Object and FROM S3Object[*].path are both valid clauses.

Our JSON test data contains one root element with the name pokemon, which is an array of objects. To select data from this array we select all array elements with the asterisk and assign an alias p, which we can reference in the SELECT clause.

    String query = "select p.id,p.name from S3Object[*].pokemon[*] p";
    selectObject(s3Client, query, false);

Main.java

Output:

{"id":1,"name":"Bulbasaur"}
{"id":2,"name":"Ivysaur"}
{"id":3,"name":"Venusaur"}
{"id":4,"name":"Charmander"}
...

In this example, we select the Charmander Pokémon and request every object field.

    query = "select p from S3Object[*].pokemon[*] p where p.name = 'Charmander'";
    selectObject(s3Client, query, false);

Main.java

Output:

{"p":{"id":4,"num":"004","name":"Charmander","img":"http://www.serebii.net/pokemongo/pokemon/004.png","type":["Fire"],
 "height":"0.61 m","weight":"8.5 kg","candy":"Charmander Candy","candy_count":25,"egg":"2 km","spawn_chance":0.253e0,
  "avg_spawns":25.3e0,"spawn_time":"08:45","multipliers":[1.65e0],"weaknesses":["Water","Ground","Rock"],
  "next_evolution":[{"num":"005","name":"Charmeleon"},{"num":"006","name":"Charizard"}]}
}

The following example selects all the fire Pokémons. The field type is an array containing one or two elements.

    query = "select p.id,p.name,p.type from S3Object[*].pokemon[*] p where p.type[0] = 'Fire' or p.type[1] = 'Fire'";
    selectObject(s3Client, query, false);

Main.java

Output:

{"id":4,"name":"Charmander","type":["Fire"]}
{"id":5,"name":"Charmeleon","type":["Fire"]}
{"id":6,"name":"Charizard","type":["Fire","Flying"]}
{"id":37,"name":"Vulpix","type":["Fire"]}
{"id":38,"name":"Ninetales","type":["Fire"]}
{"id":58,"name":"Growlithe","type":["Fire"]}
{"id":59,"name":"Arcanine","type":["Fire"]}
{"id":77,"name":"Ponyta","type":["Fire"]}
{"id":78,"name":"Rapidash","type":["Fire"]}
{"id":126,"name":"Magmar","type":["Fire"]}
{"id":136,"name":"Flareon","type":["Fire"]}
{"id":146,"name":"Moltres","type":["Fire","Flying"]}

We can also use aggregate functions. This example returns the total number of objects.

    query = "select count(*) from S3Object[*].pokemon[*] p";
    selectObject(s3Client, query, true);

Main.java

Output:

{"_1":151}

Check out this documentation page for more information about SELECT with JSON:
https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-glacier-select-sql-reference-select.html

S3 Glacier Select

You can also access objects stored on S3 Glacier with the Select feature. Amazon calls this service S3 Glacier Select.

S3 Glacier Select does not support aggregate functions and the LIMIT clause, and it also can only process uncompressed CSV files.

Because S3 needs to restore Glacier objects first, it cannot respond immediately. Instead, an application submits a Glacier job and specifies an S3 bucket as an output location. Then, S3 restores the object from Glacier, executes the query, and stores the result in the given bucket.


To access S3 Glacier Select from a Go application, add the Glacier SDK.

go get github.com/aws/aws-sdk-go-v2/service/glacier

An application needs to create a Glacier client and the job input request. Then it submits the job request with InitiateJob. This job can run for several minutes or several hours, depending on the storage class.

func selectWithGlacier(cfg aws.Config) {
  glacierClient := glacier.NewFromConfig(cfg)
  key := "all_month.csv"
  query := "SELECT \"time\",place,mag FROM archive WHERE mag <> '' AND cast(mag as float) > 6"

  job := &glacier.InitiateJobInput{
    AccountId: aws.String("-"),
    VaultName: aws.String(bucketName),
    JobParameters: &glacierTypes.JobParameters{
      ArchiveId: &key,
      Tier:      aws.String("Expedited"), // or "Standard" or "Bulk"
      Type:      aws.String("select"),
      OutputLocation: &glacierTypes.OutputLocation{S3: &glacierTypes.S3Location{
        BucketName: aws.String("output-s3-bucket"),
        Prefix:     aws.String("1"),
      }},
      SelectParameters: &glacierTypes.SelectParameters{
        Expression:     aws.String(query),
        ExpressionType: glacierTypes.ExpressionTypeSql,
        InputSerialization: &glacierTypes.InputSerialization{
          Csv: &glacierTypes.CSVInput{
            FileHeaderInfo: glacierTypes.FileHeaderInfoUse,
          },
        },
        OutputSerialization: &glacierTypes.OutputSerialization{Csv: &glacierTypes.CSVOutput{}},
      },
    },
  }

  _, err := glacierClient.InitiateJob(context.Background(), job)
  check(err)
}

main.go

Like S3 Select, we have to describe the input and output format (currently, only uncompressed CSV files are supported) and specify the query. The job type must be set to select, and an S3 bucket must be specified as the output location.

Note that the name of the objects in the FROM clause must be archive (FROM archive). This references the object specified with the ArchiveId option.

Check out the following documentation pages for more information about S3 Glacier Select:
https://docs.aws.amazon.com/AmazonS3/latest/userguide/querying-glacier-archives.html
https://docs.aws.amazon.com/amazonglacier/latest/dev/glacier-select.html


This concludes this tutorial about S3 Select, a helpful addition to S3, which allows an application to extract and download only the data it needs. It is also worth noting that Minio, a self-hosted storage solution with an S3 compatible API, also supports the SELECT feature. This could be useful for local development, testing, or setting up a self-hosted object storage service.