Building an OpenSearch Index from DynamoDB

In this post we will demonstrate how to set up an OpenSearch domain with a DynamoDB source.

Introduction

We will look at how to build an OpenSearch index from a DynamoDB table. We assume you have some knowledge of DynamoDB and Lambda, and that you are familiar with using CDK to deploy infrastructure into AWS.

DynamoDB

First, let’s think about our DynamoDB table and how to set it up so that it is ready to be indexed. This is actually very straightforward and utilises a DynamoDB stream.

"A DynamoDB stream is an ordered flow of information about changes to items in a DynamoDB table." - AWS

Using CDK your table might look like:

const userTable = new dynamodb.Table(this, "UserTable", {
  tableName: "user-table",
  billingMode: BillingMode.PAY_PER_REQUEST,
  partitionKey: {name: "partitionKey", type: AttributeType.STRING},
  sortKey: {name: "sortKey", type: AttributeType.STRING},
  pointInTimeRecovery: true,
  stream: StreamViewType.NEW_IMAGE, // This is the important line!
});

If you are using the console instead of CDK, see this.

There are different types of streams:

KEYS_ONLY - Only the key attributes of the modified item are written to the stream.
NEW_IMAGE - The entire item, as it appears after it was modified, is written to the stream.
OLD_IMAGE - The entire item, as it appeared before it was modified, is written to the stream.
NEW_AND_OLD_IMAGES - Both new and old item images of the item are written to the stream.

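Here we have chosen NEW_IMAGE because we only need the new version of an item in order to index it. For deletes, the stream record still carries the item's keys, which is all we need to remove the document from the index.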
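To give a feel for what a NEW_IMAGE carries, here is a minimal sketch of turning an image in DynamoDB's typed attribute format into a plain object ready for indexing. The `AttributeValue` type below is a simplified local model covering only a handful of attribute types, and the example item and its attributes are hypothetical. In a real Lambda you would more likely use the SDK's own converter rather than a hand-rolled one.

```typescript
// Simplified local model of a DynamoDB attribute value (only a few types covered).
type AttributeValue =
  | { S: string }
  | { N: string }
  | { BOOL: boolean }
  | { NULL: true }
  | { L: AttributeValue[] }
  | { M: Record<string, AttributeValue> };

type PlainValue = string | number | boolean | null | PlainValue[] | { [key: string]: PlainValue };

// Convert a single typed attribute into a plain JavaScript value.
function toPlainValue(value: AttributeValue): PlainValue {
  if ("S" in value) return value.S;
  if ("N" in value) return Number(value.N); // numbers arrive as strings
  if ("BOOL" in value) return value.BOOL;
  if ("NULL" in value) return null;
  if ("L" in value) return value.L.map(toPlainValue);
  return toPlainImage(value.M);
}

// Convert a whole image (attribute name -> typed value) into a plain object.
function toPlainImage(image: Record<string, AttributeValue>): { [key: string]: PlainValue } {
  const result: { [key: string]: PlainValue } = {};
  for (const key of Object.keys(image)) {
    result[key] = toPlainValue(image[key]);
  }
  return result;
}

// Hypothetical NewImage for an item in the user table.
const exampleNewImage: Record<string, AttributeValue> = {
  partitionKey: { S: "USER#123" },
  sortKey: { S: "PROFILE" },
  name: { S: "Ada" },
  age: { N: "36" },
  active: { BOOL: true },
};

const exampleDocument = toPlainImage(exampleNewImage);
console.log(exampleDocument);
```

With KEYS_ONLY, only `partitionKey` and `sortKey` would be available, so the Lambda would have to read the item back from the table before indexing it.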

This will create a table with a DynamoDB stream, which means that every insert, update and delete is written to the stream as an event that a consumer of your choosing can process; we choose a Lambda.

Lambda

So, next up, we must think about the indexing Lambda. At the time of writing, there is no direct way to index your data from a stream into the OpenSearch domain, so we must add a middleman to do the work. The code for this Lambda and a few other helpful Lambdas can be found here on GitHub. This Lambda lives in the index-stream directory.

Here is a code snippet of this Lambda’s handler:

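import { DynamoDBStreamEvent } from "aws-lambda";
import { DynamoDB } from "aws-sdk";

// removeDocumentFromOpenSearch, indexDocumentInOpenSearch and the User type
// are defined elsewhere in the Lambda's code
export const handler = async (event: DynamoDBStreamEvent): Promise<void> => {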
  console.log("Received event from the user table");

  // In later setup we set the batchSize of this event source to be 1
  // so in reality this for loop will only run once each time
  // but if you wanted to increase the batch size this Lambda can handle that
  for (const record of event.Records) {
    if (!record.eventName || !record.dynamodb || !record.dynamodb.Keys) continue;

    const partitionKey = record.dynamodb.Keys.partitionKey.S;
    const sortKey = record.dynamodb.Keys.sortKey.S;
    // Note here that we are using a pk and sk
    // but maybe you are using only an id, this would look like:
    // const id = record.dynamodb.Keys.id.S;

    try {
      if (record.eventName === "REMOVE") {
        // removeDocumentFromOpenSearch will perform a DELETE request to your index
        // (no early return, so the remaining records in the batch are still processed)
        await removeDocumentFromOpenSearch(partitionKey, sortKey);
      } else {
        // There are 2 types of events left to handle, INSERT and MODIFY,
        // which will both contain a NewImage
        if (!record.dynamodb.NewImage) continue;

        const userDocument = DynamoDB.Converter.unmarshall(record.dynamodb.NewImage) as User;
        // indexDocumentInOpenSearch will perform a PUT request to your index
        await indexDocumentInOpenSearch(userDocument, partitionKey, sortKey);
      }
    } catch (error) {
      console.error("Error occurred updating OpenSearch domain", error);
      throw error;
    }
  }
};
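The two helpers called in the handler are not shown in this post. As a rough sketch of what they need to decide, the function below maps a stream event to the HTTP method and path of an OpenSearch document request. The names `buildDocumentId` and `buildOpenSearchRequest`, and the choice to join the two keys with `|` to form the document id, are assumptions made for illustration and may differ from the code in the repository. Signing and sending the request is left out.

```typescript
type StreamEventName = "INSERT" | "MODIFY" | "REMOVE";

interface OpenSearchRequest {
  method: "PUT" | "DELETE";
  path: string;
  body?: string;
}

// Index name matching the one the Lambda is granted access to later in this post.
const INDEX_NAME = "user-index";

// Assumption: the document id is derived from both keys so that it is unique
// per item; it is URL-encoded because keys may contain characters such as "#".
function buildDocumentId(partitionKey: string, sortKey: string): string {
  return encodeURIComponent(`${partitionKey}|${sortKey}`);
}

// Decide which document request a stream event translates into.
function buildOpenSearchRequest(
  eventName: StreamEventName,
  partitionKey: string,
  sortKey: string,
  document?: Record<string, unknown>,
): OpenSearchRequest {
  const path = `/${INDEX_NAME}/_doc/${buildDocumentId(partitionKey, sortKey)}`;
  if (eventName === "REMOVE") {
    return { method: "DELETE", path };
  }
  // INSERT and MODIFY both (re)index the full document under the same id.
  return { method: "PUT", path, body: JSON.stringify(document ?? {}) };
}

const removeRequest = buildOpenSearchRequest("REMOVE", "USER#123", "PROFILE");
const indexRequest = buildOpenSearchRequest("MODIFY", "USER#123", "PROFILE", { name: "Ada" });
console.log(removeRequest, indexRequest);
```

Because INSERT and MODIFY use the same id, re-processing an event after a retry simply overwrites the document with the same content, which keeps the Lambda safe to retry.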

In CDK you can create your Lambda as follows:

const userTableIndexingFunction = new Function(this, "UserTableIndexingFunction", {
  functionName: "UserTableIndexingFunction",
  code: Code.fromAsset("user-table-indexing-lambda-dist-folder"),
  runtime: Runtime.NODEJS_14_X,
  handler: "index.handler",
});

Then we can add the DynamoDB stream as an event source for this Lambda:

userTableIndexingFunction.addEventSource(
  new DynamoEventSource(userTable, {
    startingPosition: StartingPosition.TRIM_HORIZON,
    batchSize: 1,
    retryAttempts: 3,
  }),
);
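With `batchSize: 1`, a failed invocation only ever retries a single record. If you increase the batch size, one option is to have the Lambda report which record failed so that records before it are not retried. The sketch below assumes the event source is also configured with `reportBatchItemFailures: true`; the `processBatch` function and the `StreamRecordLike` type are hypothetical names, with the record type trimmed down to the fields used here.

```typescript
// Trimmed-down local model of a stream record (only the fields used below).
interface StreamRecordLike {
  eventName?: string;
  dynamodb?: { SequenceNumber?: string };
}

// Shape of the response Lambda expects when partial batch responses are enabled.
interface BatchResponse {
  batchItemFailures: { itemIdentifier: string }[];
}

// Process records in order and report the first failure by sequence number.
async function processBatch(
  records: StreamRecordLike[],
  processRecord: (record: StreamRecordLike) => Promise<void>,
): Promise<BatchResponse> {
  const batchItemFailures: { itemIdentifier: string }[] = [];

  for (const record of records) {
    try {
      await processRecord(record);
    } catch (error) {
      console.error("Failed to process record", error);
      const sequenceNumber = record.dynamodb?.SequenceNumber;
      if (sequenceNumber) {
        batchItemFailures.push({ itemIdentifier: sequenceNumber });
      }
      // Stop at the first failure so later records are not applied out of order;
      // the retry will start again from the failed record.
      break;
    }
  }

  return { batchItemFailures };
}
```

The handler would then return the result of `processBatch` instead of `void`. An empty `batchItemFailures` list tells Lambda the whole batch succeeded.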

There are 2 types of starting positions:

TRIM_HORIZON - Start reading at the last untrimmed record in the shard in the system,
               which is the oldest data record in the shard.
               In other words, the Lambda will receive every event still held in the stream,
               in chronological order (oldest event to most recent event).

      LATEST - Start reading just after the most recent record in the shard,
               so that you always read the most recent data in the shard.
               In other words, the Lambda will skip the events already in the stream and
               only receive events written after the event source is set up, still in chronological order.

For this example, we therefore use TRIM_HORIZON so that no event still held in the stream is skipped and the index reflects the data in its current state. Keep in mind that DynamoDB streams only retain records for 24 hours.
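To make the difference between the two starting positions concrete, here is a small simulation. It is a simplified model, not how Lambda is implemented: real sequence numbers are strings, and the `recordsDelivered` function and `ShardRecord` type are hypothetical names used only for this illustration.

```typescript
type StartingPositionName = "TRIM_HORIZON" | "LATEST";

// Simplified model of a record in a shard.
interface ShardRecord {
  sequenceNumber: number;
  eventName: string;
}

// Return the records a consumer would receive, given the starting position and
// the sequence number of the newest record at the time the event source was created.
function recordsDelivered(
  shard: ShardRecord[],
  startingPosition: StartingPositionName,
  newestSequenceAtCreation: number,
): ShardRecord[] {
  // Records are always delivered oldest first, whatever the starting position.
  const ordered = shard.slice().sort((a, b) => a.sequenceNumber - b.sequenceNumber);
  if (startingPosition === "TRIM_HORIZON") {
    return ordered; // everything still held in the shard
  }
  // LATEST: only records written after the event source was created.
  return ordered.filter((record) => record.sequenceNumber > newestSequenceAtCreation);
}

// Two records exist before the event source is created, one arrives afterwards.
const exampleShard: ShardRecord[] = [
  { sequenceNumber: 1, eventName: "INSERT" },
  { sequenceNumber: 2, eventName: "MODIFY" },
  { sequenceNumber: 3, eventName: "REMOVE" },
];

const fromTrimHorizon = recordsDelivered(exampleShard, "TRIM_HORIZON", 2).map((r) => r.sequenceNumber);
const fromLatest = recordsDelivered(exampleShard, "LATEST", 2).map((r) => r.sequenceNumber);
console.log(fromTrimHorizon); // [1, 2, 3]
console.log(fromLatest); // [3]
```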

OpenSearch

Now, let’s look at the actual OpenSearch domain setup. AWS suggests some substantial power (and therefore money) for a production-ready domain. You can find the best practices here. For this example we will use a very small setup with no redundancy; feel free to scale this up based on your needs:

const openSearchDomain = new Domain(this, "OpenSearchDomain", {
  version: EngineVersion.OPENSEARCH_1_0,
  capacity: {dataNodeInstanceType: "t3.small.search", dataNodes: 1, masterNodes: 0},
  ebs: {enabled: true, volumeSize: 50, volumeType: EbsDeviceVolumeType.GENERAL_PURPOSE_SSD},
  logging: {slowSearchLogEnabled: true, appLogEnabled: true, slowIndexLogEnabled: true},
});

This will deploy your OpenSearch domain. This can take some time, so be patient.

One final thing to think about is granting your Lambda the rights to read from and write to your domain:

openSearchDomain.grantIndexReadWrite("user-index", userTableIndexingFunction);

This will allow your Lambda to do its job.

That's it, you are all set up and ready to index any new data into your OpenSearch index. For indexing existing data, you can find a helpful Lambda under the index-data directory here on GitHub.


Article By

Emma Moinat

Senior Engineer & AWS Community Builder