はじめに
- DynamoDB StreamとLambdaを使ってDynamoDBのデータに変更があった場合(追加、編集、削除)に、その変更内容を別アプリに自動で反映する処理を実装しました。
 - 自動化した処理をCDK化しました。
 
DynamoDB Streamとは
DynamoDB Streamは、Amazon Web ServicesのDynamoDBサービスに含まれる機能の一つで、DynamoDBテーブルの変更をリアルタイムでキャプチャし、変更を処理するアプリケーションとの間でデータを配信するためのサービスです。DynamoDB Streamを使用することで、アプリケーション開発者は、DynamoDBテーブルの変更を非同期的に受信し、変更を処理することができます。この機能は、サーバーレスアーキテクチャを使用する場合に特に有用であり、システムの拡張性と柔軟性を高めるのに役立ちます。
やりたいこと(イメージ図)
1.DynamoDBに対する項目の追加、編集、削除をDynamoDB StreamのイベントとしてLambdaに渡す
2.イベントを受け取ったLambdaは、更新されたDynamoDBの値を別アプリのレコードに反映させる
CDK環境を構築
CDK環境を構築します。
参考:AWS CDK
ストリームを有効化する
DynamoDBの「エクスポートおよびストリーム」タブから、Streamを有効化します。新旧イメージを選択すると、項目の変更時にLambdaが変更前と変更後の値を受け取ることができます。
CDKのstackを定義
依頼実行Lambdaを定義
Lambdaを定義します
参考:class NodejsFunction (construct)
NodeJsFunctionのビルド時に必要な依存関係を解決できず苦戦しました。nodeModules にライブラリを指定し、depsLockFilePath に明示的にバージョンを指定することで、エラーが解消されました。
参考:NodeJsFunctionのビルド時に依存関係を解決する3つの方法
const lambdaFunction = new NodejsFunction(
      this,
      "example",
      {
        bundling: {
          nodeModules: ["@kintone/rest-api-client"],
          externalModules: ["aws-sdk"],
          preCompilation: false,
          target: "es2020",
          minify: false,
          charset: Charset.UTF8,
        },
        architecture: Architecture.X86_64,
        runtime: Runtime.NODEJS_18_X,
        entry: "./lib/src/index.mjs",
        depsLockFilePath: "./lib/src/package-lock.json",
        handler: "handler",
        environment: environment,
        description:
          "ユーザー情報を別アプリ反映する",
      }
    );
dynamoDBのテーブルをインポート
テーブル名だけでなく、ストリームのArnも指定する必要がありました。
const table = dynamodb.Table.fromTableAttributes(
      this,
      tableEnvironment.tableName,
      {
        tableName: tableEnvironment.tableName,
        tableStreamArn: tableEnvironment.tableArn,
      }
    );
Streamの読み取り権限を追加
ここも苦戦しましたが、table.grantStreamRead(lambdaFunction)でStreamの読み取り権限を追加することができました。
参考:AWS CDK
 table.grantStreamRead(lambdaFunction);
    lambdaFunction.addEventSource(
      new DynamoEventSource(table, {
        startingPosition: lambda.StartingPosition.LATEST,
      })
    );
Lambdaの処理
このプログラムでは、event.Recordsが配列としてイベントを受け取っており、for文を使って一つ一つの処理を行っています。その中で、eventNameという変数に「INSERT」、「MODIFY」、「REMOVE」のいずれかが格納され、それぞれの変更の種類を教えてくれます。このようにして、event.Recordsの中身を順番に処理することで、データベースの変更を正確に把握することができます。
参考:DynamoDB StreamをトリガーにしてLambdaを実行する
export const handler = async (event) => {};
  for (const record of event.Records) {
    console.log("イベント:", record.eventName);
    console.log("Record:", record.dynamodb);
    if (record.eventName === "INSERT" || record.eventName === "MODIFY") {
      //レコードがアップサートされた時の処理
    } else if (record.eventName === "REMOVE") {
      //レコードが削除された時の処理
    }
  }
};
CDKデプロイ
CDKをデプロイします。
以下の記事を参考にしてください。
参考:Create a new CDK project