Front-End Web & Mobile

Connect Amplify DataStore with existing SQL datasources; adding offline and sync features in your application

This blog post was written by Brice Pellé – Principal Solution Architect – AWS AppSync.

Amplify DataStore is a library that provides a programming model for leveraging shared and distributed data without writing additional code for offline and online scenarios. With the Amplify CLI, you can easily set up a new application that leverages AWS AppSync and sets up Amazon DynamoDB to power your DataStore application. However, there are scenarios where developers want to use DataStore to build offline-first applications while leveraging existing data sources in their AWS account or in their on-prem datacenter. In this article, I will show you how you can use the Amplify CLI to build an AppSync API, for your DataStore application, that connects to an existing Aurora MySql database using an AppSync Lambda resolver and an Amazon RDS Proxy.DataStore + SQL data source architecture diagram

Starting point

I am going to build a backend that accesses two tables (Posts and Comments) in an existing Aurora MySQL database. These tables are used to power a blog and already contain data. The database is only accessible from a VPC, and does not allow public access. There is a RDS Proxy, with IAM database authentication, configured that a Lambda function can use to connect to the database.

CREATE TABLE Posts (
  id BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
  title TEXT NOT NULL,
  content LONGTEXT,
  post_status VARCHAR(20),
  createdAt DATETIME(3) NOT NULL,
  updatedAt DATETIME(3) NOT NULL,
  PRIMARY KEY (id)
);

CREATE TABLE Comments (
  id BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
  postID BIGINT(20) UNSIGNED NOT NULL, 
  content TEXT,
  createdAt DATETIME(3) NOT NULL,
  updatedAt DATETIME(3) NOT NULL,
  PRIMARY KEY (id),
  KEY (postID),
  CONSTRAINT `comments_posts_ibfk_1` FOREIGN KEY (`postID`) REFERENCES `Posts`(`id`)
);

Preparing the database

I start by adding metadata columns to the tables to turn them into versioned data sources. I introduce a _datastore_uuid column that stores the ID provided by DataStore when a new item is created. This column will be mapped back to the id field when data is returned to the client. It is also used to retrieve and lock a row during update operations. The tables’ own id field is not sent back to the client.

You can find more information about the added _version, _deleted, _lastChangedAt, and _ttl columns in the Conflict Detection and Sync documentation. Notice that I update the createdAt and updatedAt columns to automatically set the current time on creation and update.

ALTER TABLE Posts
ADD `_datastore_uuid` VARCHAR(36) NOT NULL DEFAULT '0',
ADD `_version` INT(11) UNSIGNED DEFAULT 1,
ADD `_deleted` TINYINT(1) DEFAULT 0,
ADD `_lastChangedAt` DATETIME(3) NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
ADD `_ttl` DATETIME(3) NULL DEFAULT NULL,

ADD INDEX (`_datastore_uuid`),

CHANGE createdAt createdAt TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
CHANGE updatedAt updatedAt TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3);

ALTER TABLE Comments
ADD `_datastore_uuid` VARCHAR(36) NOT NULL DEFAULT '0',
ADD `_version` INT(11) UNSIGNED DEFAULT 1,
ADD `_deleted` TINYINT(1) DEFAULT 0,
ADD `_lastChangedAt` DATETIME(3) NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
ADD `_ttl` DATETIME(3) NULL DEFAULT NULL,

ADD INDEX (`_datastore_uuid`),

CHANGE createdAt createdAt TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
CHANGE updatedAt updatedAt TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3);

I introduce two new Delta Sync tables, one for each of our model tables. A Delta Sync table tracks changes to a versioned data source and optimizes incremental updates. When our client comes online, it will query the AppSync API to synchronize any changes that may have occurred while it was offline. AppSync will then call the Lambda resolver to fetch the data. The Lambda function will determine whether to sync from the base tables, or to return the records of change from the Delta Sync table for an efficient and incremental update.

CREATE TABLE `DeltaSyncPosts` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `fromId` bigint(20) unsigned NOT NULL,
  `title` text NOT NULL,
  `content` longtext,
  `post_status` varchar(20) DEFAULT NULL,
  `createdAt` timestamp(3) NOT NULL,
  `updatedAt` timestamp(3) NOT NULL,
  
  `_datastore_uuid` varchar(36) NOT NULL DEFAULT '0',
  `_version` int(11) unsigned DEFAULT '1',
  `_deleted` tinyint(1) DEFAULT '0',
  `_lastChangedAt` datetime(3) NOT NULL,
  `_ttl` datetime(3) NOT NULL,
  
  PRIMARY KEY (`id`),
  KEY `_datastore_uuid` (`_datastore_uuid`),
  KEY `fromId` (`fromId`,`_lastChangedAt`,`_version`)
);

CREATE TABLE `DeltaSyncComments` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `fromId` bigint(20) unsigned NOT NULL,
  `postID` bigint(20) unsigned NOT NULL,
  `content` text,
  `createdAt` timestamp(3) NOT NULL,
  `updatedAt` timestamp(3) NOT NULL,
  
  `_datastore_uuid` varchar(36) NOT NULL DEFAULT '0',
  `_version` int(11) unsigned DEFAULT '1',
  `_deleted` tinyint(1) DEFAULT '0',
  `_lastChangedAt` datetime(3) NOT NULL,
  `_ttl` datetime(3) NOT NULL,
  
  PRIMARY KEY (`id`),
  KEY `_datastore_uuid` (`_datastore_uuid`),
  KEY `fromId` (`fromId`,`_lastChangedAt`,`_version`),
  KEY `comments_posts_ibfk_2` (`postID`),
  CONSTRAINT `comments_posts_ibfk_2` FOREIGN KEY (`postID`) REFERENCES `Posts` (`id`)
);

Finally, I set up an event to remove deleted items on a daily schedule. You can optionally turn this off or adjust the schedule frequency. For your Aurora MySQL database, make sure you set the event_scheduler to ON in your parameter group.

delimiter |

CREATE EVENT process_deleted_items
    ON SCHEDULE
      EVERY 1 DAY
    COMMENT 'purge deleted items'
    DO
      BEGIN
        DELETE FROM Comments
        WHERE
            _deleted = TRUE
            AND _ttl <= CURRENT_TIMESTAMP(3);
        DELETE FROM DeltaSyncComments 
        WHERE
            _ttl <= CURRENT_TIMESTAMP(3);
        DELETE FROM Posts 
        WHERE
            _deleted = TRUE
            AND _ttl <= CURRENT_TIMESTAMP(3);
        DELETE FROM DeltaSyncPosts 
        WHERE
            _ttl <= CURRENT_TIMESTAMP(3);
    END |

delimiter ;

Configuring the backend with the Amplify CLI

The current version of the Amplify CLI makes it easy to configure a DynamoDB-based Delta Sync data source that works with the Amplify DataStore client. I use this functionality to set up a GraphQL schema that represents my data model, and that implements the same GraphQL operations that the DataStore client expects: the sync queries, and the create, update, and delete mutations. I will then connect these fields to my database tables using an AppSync Lambda resolver created with the amplify @function directive.

I start by creating a new amplify project in my app directory with amplify init, and walk through the prompts. I then set up my functions. I will be using mysql2 to connect to the database. I’ll configure that in a layer that my Lambda function will use.

> amplify add function
? Select which capability you want to add: Lambda layer (shared code & resource used across functions)
? Provide a name for your Lambda layer: mysql2
? Select up to 2 compatible runtimes: NodeJS
? The current AWS account will always have access to this layer.
Optionally, configure who else can access this layer. (Hit <Enter> to skip)
✅ Lambda layer folders & files created:
amplify/backend/function/mysql2

> cd amplify/backend/function/mysql2/lib/nodejs
> npm install mysql2

I then create the lambda function dataStoreLink, and configure the attached layer.

> amplify add function
? Select which capability you want to add: Lambda function (serverless function)
? Provide an AWS Lambda function name: dataStoreLink
? Choose the runtime that you want to use: NodeJS
? Choose the function template that you want to use: Hello World

? Do you want to configure advanced settings? Yes
? Do you want to access other resources in this project from your Lambda function? No
? Do you want to invoke this function on a recurring schedule? No
? Do you want to configure Lambda layers for this function? Yes
? Provide existing layers or select layers in this project to access from this function (pick up to 5): mysql2
? Select a version for mysql2: 1
? Do you want to edit the local lambda function now? No
Successfully added resource dataStoreLink locally.

The function needs to be configured to access the Database Proxy in my VPC. I update the generated cloudformation JSON template at ./amplify/backend/function/dataStoreLink/datastoreLink-cloudformation-template.json to add the proper permissions and environment variables. To the Lambda execution role, I add a managed policy ARN to allow the function to access ENIs (read more about configuring Lambda for VPC access here, and examples with AWS AppSync here):

"LambdaExecutionRole": {
  "Type": "AWS::IAM::Role",
  "Properties": {
    ...
    "ManagedPolicyArns": [
      "arn:aws:iam::aws:policy/service-role/AWSLambdaENIManagementAccess"
    ]
  }
}

To the Lambda function definition, I add my VPC configuration and the environment variables my function will use to connect:

"LambdaFunction": {
  "Type": "AWS::Lambda::Function",
  "Properties": {
    ...
    "VpcConfig": {
      "SubnetIds": [
        "<YOUR_SUBNETS>"
      ],
      "SecurityGroupIds": [
        "<YOUR_SECURITY_GROUPS>"
      ]
    },
    "Environment": {
      ...
      "Variables": {
        ...
        "RDS_PROXY_URL": "<YOUR_PROXY_URL>",
        "USERNAME": "<YOUR_USERNAME>",
        "DATABASE": "<YOUR_DATABASE>"
      }
    }
  }
}

I add an inline policy resource that allows the function to use my database proxy:

"Resources": {
  ...
  "rdsConnectExecutionpolicy": {
    "DependsOn": [
      "LambdaExecutionRole"
    ],
    "Type": "AWS::IAM::Policy",
    "Properties": {
      "PolicyName": "rds-lambda-execution-policy",
      "Roles": [
        {
          "Ref": "LambdaExecutionRole"
        }
      ],
      "PolicyDocument": {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": "rds-db:connect",
            "Resource": {
              "Fn::Sub": "arn:aws:rds-db:${AWS::Region}:${AWS::AccountId}:dbuser:prx-051f6c29bcbf9ffdd/*"
            }
          }
        ]
      }
    }
  }
}

Next, I set up a new AppSync API, and make sure to enable conflict detection with the auto merge resolution strategy.

> amplify add api
? Please select from one of the below mentioned services: GraphQL
? Provide API name: datastoreApi
? Choose the default authorization type for the API API key
? Enter a description for the API key:
? After how many days from now the API key should expire (1-365): 7
? Do you want to configure advanced settings for the GraphQL API Yes, I want to make some additional changes.
? Configure additional auth types? No
? Enable conflict detection? Yes
? Select the default resolution strategy Auto Merge
? Do you have an annotated GraphQL schema? Yes
? Provide your schema file path: ./base-schema.graphql

I model my database tables and relationships in my GraphQL schema using the @model directive. I do this in a file called ./base-schema.graphql. Setting queries to null in the @model directive turns off all query fields except for the synch operations. Those are the only ones we need.

type Post @model(queries: null) {
  id: ID!
  title: String!
  content: String
  post_status: POST_STATUS!
  comments: [Comment] @connection(keyName: "byPost", fields: ["id"])
}

type Comment @model(queries: null) @key(name: "byPost", fields: ["postID"]) {
  id: ID!
  content: String!
  postID: ID!
  post: Post @connection(fields: ["postID"])
}

enum POST_STATUS {
  draft
  published
}

I then execute the following script to generate my actual model and final artifacts.

#!/bin/sh

# 0. set API_DIR variable to the value of your API folder
API_DIR='<YOUR_API_FOLDER>'

# 1. copy the base schema to the amplify API input schema
cp ./base-schema.graphql amplify/backend/api/$API_DIR/schema.graphql

# 2. compile the input schema to generate the build artifacts
amplify api gql-compile

# 3. build the application models;
#    add them to a base-model folder for safe-keeping
amplify models codegen
mkdir -p ./base-models
cp ./src/models/* ./base-models/

cd amplify/backend/api/$API_DIR/

# 4. remove the resolvers generated by the first gql-compile pass
/bin/rm ./build/resolvers/*
/bin/rm ./resolvers/*

# 5. Move the build schema to the API input schema
cp ./build/schema.graphql ./schema.graphql

# 6. update the schema query and mutation fields with the @function 
#    directive specying to use the dataStoreLink Lambda function
echo "\nUpdating schema with dataStoreLink function..."
sed -i 's/\(sync.*(filter:.*$\)/\1 @function(name: "dataStoreLink-${env}")/' schema.graphql
sed -i 's/\(create.*(input:.*$\)/\1 @function(name: "dataStoreLink-${env}")/' schema.graphql
sed -i 's/\(update.*(input:.*$\)/\1 @function(name: "dataStoreLink-${env}")/' schema.graphql
sed -i 's/\(delete.*(input:.*$\)/\1 @function(name: "dataStoreLink-${env}")/' schema.graphql

# 7. recompile the schema to generate the final assets
amplify api gql-compile

# 8. copy the response resolvers to the input resolvers folder,
#    and update these files with a custom template
echo "\nUpdating response resolvers...\n"
cp ./build/resolvers/*res.vtl ./resolvers/
do_cat () {
  echo "Updating $1"
  cat << EOF > $1
  #if(\$context.prev.result && \$context.prev.result.errorMessage )
    \$utils.error(\$context.prev.result.errorMessage, \$context.prev.result.errorType,
    \$context.prev.result.data)
  #else
    \$utils.toJson(\$context.prev.result.data)
  #end
EOF
}
export -f do_cat
find ./resolvers -type f -exec sh -c 'do_cat "{}"' \;

At step 6, I update all the query and mutation fields with the @function directive that specifies the dataStoreLink Lambda function. Compiling the schema again at step 7 generates the required resolvers, AppSync functions, and defines an IAM role that allows AppSync to call the Lambda function. At step 8, I update the resolver response mapping templates. This new custom template checks to see if the Lambda function returned errors. If so, the resolver will trigger an AppSync error that is sent back to the application. This allows the DataStore client to handle errors such as unhandled conflict.

#if($context.prev.result && $context.prev.result.errorMessage )
    $utils.error($context.prev.result.errorMessage, $context.prev.result.errorType,
    $context.prev.result.data)
#else
    $utils.toJson($context.prev.result.data)
#end

Note: after pushing your changes to the cloud with amplify push, your models may get overwritten. Simply restore them with this command:

cp ./base-models/* .src/models/

Reviewing the Lambda function code

With this done, I have all the assets needed to deploy my API. Let’s take a closer look at the dataStoreLink function implementation at ./amplify/backend/function/dataStoreLink/src/index.js. Keep in mind that I’m using the database proxy with the AWS signer to connect to my database, but you can use any method that is appropriate for your use case.

const AWS = require('aws-sdk')
const mysql = require('mysql2/promise')

const { RDS_PROXY_URL, DATABASE, USERNAME, REGION } = process.env
const DeltaSyncConfig = {
  DeltaSyncTableTTL: 30, // 30 minutes
  BaseTableTTL: 30 * 24 * 60, // => 43200, 30 days in minutes
}
const MIN_TO_MILLI = 60 * 1_000
const DELTA_SYNC_PREFIX = 'DeltaSync'

const signer = new AWS.RDS.Signer({
  region: REGION,
  port: 3306,
  username: USERNAME,
  hostname: RDS_PROXY_URL,
})

const initConn = () => {
  const connectionConfig = {
    host: RDS_PROXY_URL,
    database: DATABASE,
    user: USERNAME,
    ssl: 'Amazon RDS',
    authPlugins: { mysql_clear_password: () => () => signer.getAuthToken() },
  }
  return mysql.createConnection(connectionConfig)
}

const tableName = (belongsTo) =>
  belongsTo[0].toUpperCase() + belongsTo.slice(1) + 's'

const deltaSyncTable = (baseTable) => DELTA_SYNC_PREFIX + baseTable

const toModel = (row, belongsTo) => {
  const mysql_id = row.id // include in log
  let pid, _deleted
  const id = row._datastore_uuid || `datastore-uuid-${row.id}`
  if (belongsTo) {
    pid = row.parentUUID
    _deleted = row.parentDeleted
  }
  return {
    ...row,
    mysql_id,
    id,
    _lastChangedAt: parseInt(new Date(row._lastChangedAt).getTime()),
    ...(belongsTo && pid && _deleted !== undefined
      ? { [belongsTo]: { id: pid, _deleted } }
      : null),
  }
}

const _runQuery = async (conn, sql, values) => {
  console.log(`execute sql >`)
  console.log(sql.trim().replace(/\s+/g, ' '))
  console.log(`with values >`)
  console.log(JSON.stringify(values, null, 2))
  const [result] = await conn.query(sql, values)
  console.log(`result >`)
  console.log(JSON.stringify(result, null, 2))
  return result
}

const _selectRow = async ({ table, lookupId, belongsTo, connection }) => {
  let sql = null
  if (belongsTo) {
    const parentTable = tableName(belongsTo)
    sql = `
    SELECT ${table}.*, ${parentTable}._datastore_uuid as parentUUID, ${parentTable}._deleted as parentDeleted
    FROM ${table}
    LEFT JOIN ${parentTable} ON ${table}.${belongsTo}ID = ${parentTable}.id
    WHERE ${table}.id = ?`
  } else {
    sql = `SELECT * FROM ${table} WHERE id = ?`
  }
  const values = [lookupId]

  // RETRIEVE the row and potential parent
  const [row] = await _runQuery(connection, sql, values)
  return row
}

const _writeToDeltaSyncTable = async ({ row, table, connection }) => {
  const ds = Object.assign({}, row)
  delete ds.id
  delete ds._ttl
  delete ds.parentUUID
  delete ds.parentDeleted
  const keys = Object.keys(ds)
  const sql = `INSERT INTO ${deltaSyncTable(table)} (${keys.join(
    ','
  )}, _ttl) VALUES(${keys
    .map((k) => '?')
    .join(',')}, TIMESTAMPADD(MINUTE, ?, CURRENT_TIMESTAMP(3)))`
  const values = keys.map((k) => ds[k])
  values.push(DeltaSyncConfig.DeltaSyncTableTTL)

  return await _runQuery(connection, sql, values)
}

const _doUpdateTransactionWithRowLock = async ({
  sql,
  values,
  uuid,
  table,
  connection,
  belongsTo,
}) => {
  // START TRANSACTION to lock the row
  await connection.query(`START TRANSACTION`)

  // TRY to lock the row for update
  const locksql = `SELECT id FROM ${table} WHERE _datastore_uuid=? LOCK IN SHARE MODE;`
  const [existing] = await _runQuery(connection, locksql, [uuid])

  // UPDATE the row - op specific
  const result = await _runQuery(connection, sql, values)

  const row = await _selectRow({
    table,
    lookupId: existing.id,
    belongsTo,
    connection,
  })

  // FINALLY COMMIT
  await connection.query('COMMIT;')

  if (result.affectedRows !== 1) {
    // INITIAL operation did not update a row, return unhandled mismatch
    console.error('Error: version mismatch on item')
    return {
      data: toModel(row, belongsTo),
      errorMessage: 'Conflict',
      errorType: 'ConflictUnhandled',
    }
  }

  // WRITE record to the DeltaSync table if row was created
  if (row && row.id) {
    await _writeToDeltaSyncTable({ row, table, connection })
  }

  return { data: toModel(row, belongsTo) }
}

const _query = async ({
  args: { limit = 1_000, lastSync, nextToken: inNextToken },
  table,
  connection,
  belongsTo,
}) => {
  const startedAt = Date.now()
  const moment = startedAt - DeltaSyncConfig.DeltaSyncTableTTL * MIN_TO_MILLI
  let sql
  let values = []
  let offset = 0
  if (inNextToken) {
    const tokenInfo = JSON.parse(Buffer.from(inNextToken, 'base64').toString())
    offset = tokenInfo.offset
  }

  if (belongsTo) {
    const parentTable = tableName(belongsTo)
    sql = `
    SELECT ${table}.*, ${parentTable}._datastore_uuid as parentUUID, ${parentTable}._deleted as parentDeleted
    FROM ${table}
    LEFT JOIN ${parentTable} ON ${table}.${belongsTo}ID = ${parentTable}.id`
  } else {
    sql = `SELECT * FROM ${table}`
  }

  if (lastSync === undefined) {
    // If the lastSync field is not specified, a Scan on the Base table is performed.
    sql += ` ORDER BY ${table}.id LIMIT ?, ?`
    values = [offset, limit]
  } else if (lastSync < moment) {
    // the value is before the current moment - DeltaSyncTTL, a Scan on the Base table is performed.
    sql += ` WHERE ${table}._lastChangedAt > FROM_UNIXTIME(?/1000) ORDER BY ${table}.id LIMIT ?, ?`
    values = [lastSync, offset, limit]
  } else {
    // the value is on or after the current moment - DeltaSyncTTL, a Query on the Delta table is performed.
    const dsTable = deltaSyncTable(table)
    if (belongsTo) {
      const parentTable = tableName(belongsTo)
      sql = `
      SELECT ${dsTable}.*, ${parentTable}._datastore_uuid as parentUUID, ${parentTable}._deleted as parentDeleted
      FROM ${dsTable}
      LEFT JOIN ${parentTable} ON ${dsTable}.${belongsTo}ID = ${parentTable}.id`
    } else {
      sql = `SELECT ${dsTable}.* FROM ${dsTable}`
    }
    sql += ` WHERE ${dsTable}._lastChangedAt > FROM_UNIXTIME(?/1000) ORDER BY ${dsTable}.id LIMIT ?, ?`
    values = [lastSync, offset, limit]
  }

  // FETCH the rows
  const rows = await _runQuery(connection, sql, values)

  // EVALUATE next token
  let nextToken = null
  if (rows.length >= limit) {
    nextToken = Buffer.from(
      JSON.stringify({ offset: offset + rows.length })
    ).toString('base64')
  }
  const items = rows.map((row) => toModel(row, belongsTo))

  return { data: { items, startedAt, nextToken } }
}

const _create = async ({ args: { input }, table, connection, belongsTo }) => {
  const { id, ...rest } = input
  const item = { ...rest, _datastore_uuid: id }

  if (belongsTo) {
    // fetch id of belongsTo item
    const sql = `select id from ${tableName(
      belongsTo
    )} where _datastore_uuid = ?`
    const values = [item[`${belongsTo}ID`]]
    const [row] = await _runQuery(connection, sql, values)
    item[`${belongsTo}ID`] = row.id
  }

  const keys = Object.keys(item)

  const sql = `INSERT INTO ${table} (${keys.join(',')}) VALUES(${keys
    .map((k) => '?')
    .join(',')})`
  const values = keys.map((k) => item[k])

  // INSERT the new row
  const result = await _runQuery(connection, sql, values)

  const row = await _selectRow({
    table,
    lookupId: result.insertId,
    belongsTo,
    connection,
  })

  // UPDATE the DeltaSync table if row was created
  if (row && row.id) {
    await _writeToDeltaSyncTable({ row, table, connection })
  }

  return { data: toModel(row, belongsTo) }
}

const _update = async ({ args: { input }, table, connection, belongsTo }) => {
  const { id: uuid, _version = 0, ...item } = input
  const keys = Object.keys(item)

  const sql = `UPDATE ${table} SET ${keys
    .map((k) => k + ' = ?')
    .join(
      ', '
    )}, _version=_version+1 WHERE _datastore_uuid = ? AND _version = ?`
  const values = keys.map((k) => item[k])
  values.push(uuid)
  values.push(_version)

  return await _doUpdateTransactionWithRowLock({
    sql,
    values,
    uuid,
    table,
    connection,
    belongsTo,
  })
}

const _delete = async ({ args: { input }, table, connection, belongsTo }) => {
  const { id: uuid, _version = 0 } = input
  const sql = `
  UPDATE ${table} SET _deleted=true, _version=_version+1, _ttl = TIMESTAMPADD(MINUTE, ?, CURRENT_TIMESTAMP(3))
  WHERE _datastore_uuid = ? AND _version = ?`
  const values = [DeltaSyncConfig.BaseTableTTL, uuid, _version]

  return await _doUpdateTransactionWithRowLock({
    sql,
    values,
    uuid,
    table,
    connection,
    belongsTo,
  })
}

const operations = {
  syncPosts: { fn: _query, table: 'Posts' },
  createPost: { fn: _create, table: 'Posts' },
  updatePost: { fn: _update, table: 'Posts' },
  deletePost: { fn: _delete, table: 'Posts' },

  syncComments: { fn: _query, table: 'Comments', belongsTo: 'post' },
  createComment: { fn: _create, table: 'Comments', belongsTo: 'post' },
  updateComment: { fn: _update, table: 'Comments', belongsTo: 'post' },
  deleteComment: { fn: _delete, table: 'Comments', belongsTo: 'post' },
}

exports.handler = async (event) => {
  try {
    console.log(`passed event >`, JSON.stringify(event, null, 2))
    const { fieldName: operation, arguments: args } = event

    if (operation in operations) {
      const connection = await initConn()
      const { fn, table, belongsTo } = operations[operation]
      const result = await fn.apply(undefined, [
        { table, args, connection, belongsTo },
      ])
      await connection.end()
      return result
    }
  } catch (error) {
    console.log(`Error: unhandled error >`, JSON.stringify(error, null, 2))
    return {
      data: null,
      errorMessage: error.message || JSON.stringify(error),
      errorType: 'InternalFailure',
    }
  }
}

When the function is triggered, it looks at the fieldName that it is resolving, and calls the appropriate function with the specific table for the model. If you take a closer look at the _query function, you see that it applies the steps outlined in Sync Operations to query either the base tables or the Delta Sync tables. The _update and _delete functions try to update a row with the exact same id and _version as specified in the operation. If not matching row is found, a ConflictUnhandled error is returned. Each mutation stores the new version of a row in the corresponding Delta Sync table.

Conclusion

In this article, I showed you how to leverage the Amplify CLI to build an AppSync powered backend for your DataStore application that connects to an existing relational database. Customizing your DataStore datasource is possible by implementing the query and mutation fields that the DataStore client uses, following the design as outlined in Conflict Detection and Sync, and using a Lambda function. This allows you to implement offline-first applications that connect to a wide variety of resources in your VPC, in your AWS account, or even in your on-prem datacenter. For example, you could connect to an Amazon Timestream table to power a read-only mobile app that allows users to review their latest logs at any time. Or, you could connect to an on-prem PostgreSQL database to power an offline-first field service application that allows agents to work anywhere and synch data when connectivity is available.