Create real-time applications via serverless WebSockets with new AWS AppSync GraphQL subscriptions filtering capabilities

With AWS AppSync you can create serverless GraphQL APIs that simplify application development by providing a single endpoint to securely query or update data from multiple data sources, and leverage GraphQL subscriptions to implement engaging real-time application experiences by automatically publishing data updates to subscribed API clients via serverless WebSockets connections.

Taking advantage of GraphQL subscriptions to perform real-time operations, AppSync pushes data to clients that choose to listen to specific events from the backend. This means that you can make any supported data source in AppSync real-time with little effort. Connection management between clients and your API endpoint is handled automatically. With AppSync, any backend service can broadcast data to connected clients, or clients can send data to other clients, depending on the use case. Real-time data, connections, scalability, fan-out and broadcasting are all handled by AppSync, allowing you to focus on your business use cases and requirements instead of dealing with the complex infrastructure required to manage WebSockets connections at scale.

GraphQL subscriptions are invoked in response to a mutation or change in data. In other words, when data is modified via a GraphQL mutation operation, AppSync notifies subscribers of that data change on successful completion of the mutation. In short, a mutation publishes data which is sent to clients subscribed to it following a publish-subscribe (pub/sub) pattern.

When it comes to real-time with GraphQL, subscription filtering is an important capability, as there are use cases that require restricting or filtering the data that specific groups of subscribed clients receive. Basic filtering can be achieved in GraphQL using arguments defined on the subscription query itself when invoking a subscription operation from an API client. For example, with arguments clients can subscribe and listen for data related to just a particular identifier (e.g. orderID, userID, groupId, roomId, deviceId, eventId, or gameId) or a combination of fields with AND logic (title X AND location Y).
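
As a rough illustration of argument-based filtering, the following JavaScript sketch models subscriptions as in-memory records and delivers a published item only to subscribers whose arguments all equal the corresponding fields (AND logic). The subscribe and publish helpers and the sample values are hypothetical; this is a local model of the behavior described above, not AppSync's implementation.

```javascript
// Local model of basic, argument-based subscription filtering.
// All names here are illustrative; none of this is an AppSync API.
const subscribers = [];

// Register a subscriber with the arguments it passed to the subscription.
function subscribe(name, args) {
  const inbox = [];
  subscribers.push({ name, args, inbox });
  return inbox;
}

// Deliver a mutation result to every subscriber whose arguments all match.
function publish(item) {
  for (const sub of subscribers) {
    const matches = Object.entries(sub.args).every(
      ([field, expected]) => item[field] === expected
    );
    if (matches) sub.inbox.push(item);
  }
}

const roomInbox = subscribe("roomOnly", { roomId: "r1" });
const comboInbox = subscribe("titleAndLocation", { title: "X", location: "Y" });
const allInbox = subscribe("noArguments", {});

publish({ roomId: "r1", title: "X", location: "Z" });
publish({ roomId: "r2", title: "X", location: "Y" });

console.log(roomInbox.length, comboInbox.length, allInbox.length); // 1 1 2
```

A subscriber with no arguments receives every published item, which is why the third inbox holds both.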

While such client-side filtering capabilities are widely used by AppSync customers, there are certain use cases that require more complex filtering or authorization logic that is controlled and enforced centrally by filters defined in the GraphQL API backend itself. In addition to providing enhanced control over what data is sent to subscribed clients, backend-defined filters simplify application code and reduce the amount of data sent to clients.

With the launch of enhanced subscriptions filtering capabilities in AppSync, developers can now create a broader range of real-time experiences in their applications by leveraging new operators, server-side filtering, and the ability to trigger subscription invalidations. The following table lists the new filter operators available with enhanced, server-side filtering:

Operator      Description
eq            Equal
ne            Not equal
le            Less than or equal
lt            Less than
ge            Greater than or equal
gt            Greater than
contains      Checks for a subsequence, or value in a set
notContains   Checks for absence of a subsequence, or absence of a value in a set
beginsWith    Checks for a prefix
in            Checks for matching elements in a list
notIn         Checks for matching elements not in a list
between       Between two values
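
To make the operator descriptions concrete, here is a small JavaScript sketch that models each operator as a predicate and evaluates a single rule of the form { fieldName, operator, value } against an item. It is a local approximation for reasoning about filters, not AppSync's implementation. Treating between as inclusive of both bounds is an assumption, and the sample task is made up.

```javascript
// Local model of the operators in the table above; not AppSync's implementation.
// Assumption: "between" is treated as inclusive of both bounds.
const operators = {
  eq: (actual, expected) => actual === expected,
  ne: (actual, expected) => actual !== expected,
  le: (actual, expected) => actual <= expected,
  lt: (actual, expected) => actual < expected,
  ge: (actual, expected) => actual >= expected,
  gt: (actual, expected) => actual > expected,
  // includes() covers both strings (substring) and arrays (membership).
  contains: (actual, expected) => actual.includes(expected),
  notContains: (actual, expected) => !actual.includes(expected),
  beginsWith: (actual, expected) => actual.startsWith(expected),
  in: (actual, list) => list.includes(actual),
  notIn: (actual, list) => !list.includes(actual),
  between: (actual, [low, high]) => actual >= low && actual <= high,
};

// Evaluate one rule of the form { fieldName, operator, value } against an item.
function matchesRule(rule, item) {
  const test = operators[rule.operator];
  if (!test) throw new Error(`Unknown operator: ${rule.operator}`);
  return test(item[rule.fieldName], rule.value);
}

// Hypothetical item used only for this illustration.
const task = { priority: "high", classification: 8, title: "Fix login bug" };

console.log(matchesRule({ fieldName: "classification", operator: "ge", value: 7 }, task)); // true
console.log(matchesRule({ fieldName: "priority", operator: "in", value: ["low", "medium"] }, task)); // false
console.log(matchesRule({ fieldName: "title", operator: "beginsWith", value: "Fix" }, task)); // true
console.log(matchesRule({ fieldName: "classification", operator: "between", value: [1, 5] }, task)); // false
```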

Enhanced filters are written in JSON in the response mapping template of subscription resolvers. A filter group defines a list or group of filters. Each filter can have one or more rules, each with a field, an operator and a value. Multiple rules in a filter are evaluated with AND logic, and multiple filters in a filter group are evaluated with OR logic.
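
The AND/OR evaluation can be sketched locally in JavaScript. The snippet below assumes the filterGroup / filters / { fieldName, operator, value } layout used by the resolver templates later in this post, and models only three operators. The severity and team fields are hypothetical, and the code illustrates the semantics rather than how AppSync implements them.

```javascript
// Local model of filter group evaluation; not AppSync's implementation.
// Only a few operators are modeled to keep the sketch short.
const sketchOperators = {
  eq: (actual, expected) => actual === expected,
  ge: (actual, expected) => actual >= expected,
  lt: (actual, expected) => actual < expected,
};

function matchesFilterGroup(filterGroup, item) {
  // OR across the filters in the group...
  return filterGroup.some((filter) =>
    // ...AND across the rules inside each filter.
    filter.filters.every((rule) =>
      sketchOperators[rule.operator](item[rule.fieldName], rule.value)
    )
  );
}

// Hypothetical filter: (severity >= 7 AND team == "ops") OR (severity < 2)
const exampleFilter = {
  filterGroup: [
    {
      filters: [
        { fieldName: "severity", operator: "ge", value: 7 },
        { fieldName: "team", operator: "eq", value: "ops" },
      ],
    },
    { filters: [{ fieldName: "severity", operator: "lt", value: 2 }] },
  ],
};

const check = (item) => matchesFilterGroup(exampleFilter.filterGroup, item);

console.log(check({ severity: 8, team: "ops" }));   // true  (first filter matches)
console.log(check({ severity: 8, team: "sales" })); // false (neither filter matches)
console.log(check({ severity: 1, team: "sales" })); // true  (second filter matches)
```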

In addition to more logical operators, enhanced filtering in AppSync also enables subscription invalidation. With subscription invalidation, AppSync can automatically unsubscribe clients from the service side based on an event triggered by a GraphQL mutation and a specific invalidation filter, effectively closing a given WebSocket connection. Invalidation provides more control over WebSocket connections from the GraphQL API backend. Without the new invalidation capabilities, clients need to either trigger or be aware of certain backend events (e.g. a user removed from a group chat by an owner or admin, or a user who was unfollowed and whose events should be removed from a feed), then respond to these events with additional client logic that unsubscribes the user and closes the WebSocket connection to stop data flowing (e.g. from the removed group chat or from the unfollowed user).

How it works

Let’s see how we can easily implement the new enhanced filtering capabilities in AWS AppSync with a practical example.

Go to the AWS AppSync Console and create a new API with the console wizard using the Tasks App sample project. Give your API a name such as Tasks API and click Create. Wait a couple of minutes until the AWS CloudFormation template is deployed in your account with a pre-configured AppSync GraphQL API, an Amazon Cognito User Pool associated with it and a linked Amazon DynamoDB table to store data:

Next go to Cognito Console and select the newly deployed user pool named AppSyncGroupsAuthExamplePool. Click on the App Integration option and, in the App client list section, take note of the Client ID for the user pool:

Using the AWS CLI, create two new users jdoe and janed with the following commands. Replace the password with a better and more secure option:

$ aws cognito-idp sign-up --client-id <YOUR_CLIENT_ID_HERE> --username jdoe --password P@ssw0rd --region us-west-2 --user-attributes '[{"Name":"given_name","Value":"John"},{"Name":"family_name","Value":"Doe"},{"Name":"email","Value":"jdoe@myemail.com"},{"Name":"gender","Value":"Male"},{"Name":"phone_number","Value":"+19999999999"}]'

$ aws cognito-idp sign-up --client-id <YOUR_CLIENT_ID_HERE> --username janed --password P@ssw0rd --region us-west-2 --user-attributes '[{"Name":"given_name","Value":"Jane"},{"Name":"family_name","Value":"Doe"},{"Name":"email","Value":"janed@myemail.com"},{"Name":"gender","Value":"Female"},{"Name":"phone_number","Value":"+19999999999"}]'

Back in the Cognito console, find the users and confirm their accounts.

The AppSync API is pre-configured to only allow data to be created by members of the ManagerGroup. Let’s add the user janed to the group as a manager:

Now we need to make a few modifications to the GraphQL API to test the new filtering capabilities. Back at the AppSync Console, select the Schema section on the left navigation area and replace the existing Tasks API schema with the following SDL. Click on Save Schema to commit the changes:

type Mutation {
    # In this example, only users in the ManagerGroup can create tasks
    createTask(
        owner: String,
        title: String,
        taskStatus: String,
        description: String,
        priority: Priority,
        department: String,
        classification: Int
    ): Task
        @aws_auth(cognito_groups: ["ManagerGroup"])
    # Both Employees and Managers can update a task's status
    updateTaskStatus(id: ID!, taskStatus: String!): Task
        @aws_auth(cognito_groups: ["EmployeeGroup","ManagerGroup"])
    updateTaskBody(id: ID!, title: String!, description: String!): Task
        @aws_auth(cognito_groups: ["ManagerGroup"])
    unsubscribe(email: AWSEmail): User 
        @aws_auth(cognito_groups: ["ManagerGroup"])
}

enum Priority {
    none
    lowest
    low
    medium
    high
    highest
}

type Query {
    # Users belonging to either EmployeeGroup or ManagerGroup can read a particular task
    getTask(id: ID!): Task
        @aws_auth(cognito_groups: ["EmployeeGroup","ManagerGroup"])
    # Only Managers can list all the Tasks
    allTasks(nextToken: String): TaskConnection
        @aws_auth(cognito_groups: ["ManagerGroup"])
}

type Subscription {
    onHightPriorityEngineeringTaskCreated: Task
        @aws_subscribe(mutations: ["createTask"])
}

type Task {
    id: ID
    owner: String
    title: String
    description: String
    taskStatus: String
    priority: Priority
    department: String
    classification: Int
}

type TaskConnection {
    items: [Task]
    nextToken: String
}

type User {
    email: AWSEmail
}

schema {
    query: Query
    mutation: Mutation
    subscription: Subscription
}

In the same screen, click on the resolver associated with the createTask mutation. In the Configure the request mapping template box, replace the existing template and use the following code:

## Request Mapping Template - createTask mutation

{
    "version" : "2017-02-28",
    "operation" : "PutItem",
    "key" : {
        "id": $util.dynamodb.toDynamoDBJson($util.autoId())
    },
    "attributeValues" : $util.dynamodb.toMapValuesJson($ctx.args)
}

Go to the Data Sources section and create a new Local/None data source:

Back to the Schema section, click the Attach button to create a resolver for the new onHightPriorityEngineeringTaskCreated subscription. In the Create a new Resolver screen, select the Local data source created previously. Leave the default Request Mapping Template as is and replace the Response Mapping Template with:

## Response Mapping Template - onHightPriorityEngineeringTaskCreated subscription

$extensions.setSubscriptionFilter({
    "filterGroup": [
        {
           "filters" : [
                {
                    "fieldName" : "priority",
                    "operator" : "eq",
                    "value" : "high"
                },
                {
                    "fieldName" : "classification",
                    "operator" : "ge",
                    "value" : 7
                },
                {
                    "fieldName" : "department",
                    "operator" : "eq",
                    "value" : "engineering"
                }
           ]
           
        }
    ]
})

$util.toJson($context.result)

We’re adding an enhanced filter that only publishes data to subscribed clients if the created task is high priority, has a classification greater than or equal to 7, and belongs to the engineering department. We’re now ready to test.

Go to the Queries section of your API. The AppSync Query Explorer lets you create, test and prototype GraphQL queries and mutations. It also has a built-in WebSockets client to test GraphQL subscriptions.

Open a second browser window to the exact same link. Both browser windows should have the Query Explorer opened to prototype and test GraphQL operations for our API. In the first window, click on the Login with User Pools button, sign in with jdoe credentials and execute the following subscription. A WebSocket connection is established and the user listens for tasks created with the associated createTask mutation (a gray spinning wheel on the top right side of the screen confirms the secure wss:// connection is established and the authorized client is listening for data published by the related mutation):

subscription MySubscription {
  onHightPriorityEngineeringTaskCreated {
    classification
    department
    description
    id
    owner
    priority
    taskStatus
    title
  }
}

On the second browser window, click on the Login with User Pools button, sign in with janed credentials and execute the following mutation. As the user is part of the ManagerGroup she can invoke the mutation accordingly:

mutation MyMutation {
  createTask(description: "Low Priority Task", owner: "janed", taskStatus: "WIP", priority: low, department: "HR", classification: 2, title: "Low Priority Task") {
    id
    classification
    department
    description
    owner
    priority
    taskStatus
    title
  }
}

We notice the user jdoe doesn’t receive any data. The filter we configured blocked it because the filtering criteria were not met.

Back in the window where janed is logged in, let’s invoke a different mutation. This time the filtering criteria are met:

mutation MyMutation {
  createTask(description: "High Priority Task", owner: "janed", taskStatus: "WIP", priority: high, department: "engineering", classification: 8, title: "High Priority Task") {
    id
    classification
    department
    description
    owner
    priority
    taskStatus
    title
  }
}

The data is successfully pushed to jdoe, confirming enhanced filtering in AppSync is implemented and working as expected:
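
To see why the first mutation was filtered out while the second was delivered, the following JavaScript sketch re-creates the three rules from the subscription resolver and reports which ones a given task fails. It is a local debugging aid under the assumption that enum values such as priority reach the filter as plain strings. The failedRules helper is hypothetical and not part of AppSync.

```javascript
// Local re-creation of the tutorial's filter rules; not AppSync's implementation.
const ruleChecks = {
  eq: (actual, expected) => actual === expected,
  ge: (actual, expected) => actual >= expected,
};

// The three AND-ed rules from the subscription's response mapping template.
const rules = [
  { fieldName: "priority", operator: "eq", value: "high" },
  { fieldName: "classification", operator: "ge", value: 7 },
  { fieldName: "department", operator: "eq", value: "engineering" },
];

// Return the names of the fields whose rule a task does not satisfy.
function failedRules(task) {
  return rules
    .filter((rule) => !ruleChecks[rule.operator](task[rule.fieldName], rule.value))
    .map((rule) => rule.fieldName);
}

// Relevant arguments of the two createTask mutations shown above.
const lowPriorityTask = { priority: "low", department: "HR", classification: 2 };
const highPriorityTask = { priority: "high", department: "engineering", classification: 8 };

console.log(failedRules(lowPriorityTask));  // [ 'priority', 'classification', 'department' ]
console.log(failedRules(highPriorityTask)); // []
```

An empty list means every rule passed, so the task would be published to subscribers.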

Defining enhanced filters from the client side

As mentioned previously, basic filtering can be achieved in GraphQL leveraging subscriptions arguments. The arguments are defined by the client making the subscription call. When enhanced filters are enabled in an AppSync subscription resolver with the new filtering $extensions, these subscriptions arguments are ignored as far as filtering is concerned and backend filters defined in the resolver take precedence and priority. If there are no filtering $extensions defined in the subscription resolver, arguments defined on the client are used for basic filtering.

While defining filters from a central location at the API backend is useful, there might be a requirement for clients to define their own filters dynamically, similar to what basic filtering with subscription arguments accomplishes today, while taking advantage of the enhanced filtering logic and additional operators now available in AppSync.

Dynamic client-defined enhanced filters can be configured using a filter argument in the subscription. The GraphQL schema needs to be updated to reflect the new argument:

...
type Subscription {
    onHightPriorityEngineeringTaskCreated(filter: String): Task
        @aws_subscribe(mutations: ["createTask"])
}
...

A client would then execute the following query to subscribe to data changes based on the filter criteria defined in the query itself:

subscription MySubscription {
  onHightPriorityEngineeringTaskCreated (filter: "{\"classification\":{\"le\":2}}") {
    classification
    department
    description
    id
    owner
    priority
    taskStatus
    title
  }
}

A new resolver utility $util.transform.toSubscriptionFilter() generates dynamic enhanced filters based on the filter definition passed in the filter argument. In our Tasks API, the response mapping template for the onHightPriorityEngineeringTaskCreated subscription could be replaced with the following template in order to enable client-defined dynamic enhanced filters:

## Response Mapping Template - onHightPriorityEngineeringTaskCreated subscription

$extensions.setSubscriptionFilter($util.transform.toSubscriptionFilter($util.parseJson($ctx.args.filter)))

$util.toJson($context.result)
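
For intuition about what the transform step does, here is a rough JavaScript approximation that expands the simplified filter string into the filterGroup layout used earlier in this post. The toFilterGroupSketch function is hypothetical and handles only flat { field: { operator: value } } input. The real $util.transform.toSubscriptionFilter() utility runs inside the resolver and may accept more than this sketch covers, so treat the AppSync documentation as the source of truth.

```javascript
// Approximation of how a simplified filter could expand into a filter group.
// Assumption: only flat { field: { operator: value } } input is handled here.
function toFilterGroupSketch(simplified) {
  const rules = [];
  for (const [fieldName, conditions] of Object.entries(simplified)) {
    for (const [operator, value] of Object.entries(conditions)) {
      rules.push({ fieldName, operator, value });
    }
  }
  // All rules land in a single filter, so they are combined with AND logic.
  return { filterGroup: [{ filters: rules }] };
}

// The filter argument arrives as a JSON string, as in the subscription above.
const filterArgument = "{\"classification\":{\"le\":2}}";
const expanded = toFilterGroupSketch(JSON.parse(filterArgument));

console.log(JSON.stringify(expanded));
// {"filterGroup":[{"filters":[{"fieldName":"classification","operator":"le","value":2}]}]}
```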

Here’s a JavaScript client code snippet based on the Amplify GraphQL library which would generate a similar dynamic enhanced filter in AppSync:

...
// Plain object, matching the shape of the filter used in the subscription query above
const syncExpressionObject = {
  classification: { le: 2 }
};
const syncExpressionStr = JSON.stringify(syncExpressionObject); 
const variables = { filter: syncExpressionStr };

const query = `
  subscription onHightPriorityEngineeringTaskCreated ($filter: String) {
  onHightPriorityEngineeringTaskCreated (filter: $filter) {
    classification
    department
    description
    id
    owner
    priority
    taskStatus
    title
  }
}
`;

API.graphql({
  query: query,
  variables: variables
}).subscribe();
...

The additional filter argument, together with the new resolver utility $util.transform.toSubscriptionFilter(), makes it straightforward to implement client-defined dynamic enhanced filtering logic for real-time subscriptions in AppSync. For more information about the transform utility for enhanced filtering, including the simplified format of the filter query variable payload, refer to the AppSync documentation.

Unsubscribing users and closing client connections with Invalidation

Our sample Tasks API is successfully filtering data to subscribed clients from a central location in the API backend. What happens if our user jdoe leaves the engineering department and no longer needs to receive messages about high priority engineering tasks? How can we close the WebSocket connections that his currently authorized clients have open in AppSync?

Subscription invalidation in AppSync is triggered in response to a special payload or event defined in a mutation. When the mutation is invoked, the invalidation payload defined in the resolver is forwarded to a linked subscription that has an invalidation filter configured to unsubscribe the connection. If the invalidation payload sent as argument(s) in the mutation matches the invalidation filter criteria in the subscription, invalidation takes place and the WebSocket connection is closed. For our use case, we configure the user e-mail as the mutation invalidation payload. If the e-mail sent in the mutation request matches the e-mail defined in the invalidation filter (retrieved as a claim from the JWT token which authorized the subscription connection in the first place), the WebSocket connection is invalidated. We could go one step further and leverage AppSync’s support for multiple authorization modes to restrict the invalidation mutation so it is authorized by AWS IAM, limiting this action to backend services or functions; however, this is beyond the scope of this article.

In our API console Schema section, click the Attach button to create a resolver for the unsubscribe mutation. In the Create a new Resolver screen, select the Local data source created previously. Add the following request mapping template:

## Request Mapping Template - unsubscribe mutation

{
    "version": "2017-02-28",
    "payload": {
        "email": "${context.arguments.email}"
    }
}

We add the new invalidateSubscriptions() extension linking it with our onHightPriorityEngineeringTaskCreated subscription, replacing the default response mapping template with:

## Response Mapping Template - unsubscribe mutation

$extensions.invalidateSubscriptions({
        "subscriptionField": "onHightPriorityEngineeringTaskCreated",
        "payload": {
                "email": $context.arguments.email
        }
    })    
$util.toJson($context.result)

Back in the resolver for the onHightPriorityEngineeringTaskCreated subscription, we append an invalidation filter to the existing Response Mapping Template:

## Response Mapping Template - onHightPriorityEngineeringTaskCreated subscription

$extensions.setSubscriptionFilter({
    "filterGroup": [
        {
           "filters" : [
                {
                    "fieldName" : "priority",
                    "operator" : "eq",
                    "value" : "high"
                },
                {
                    "fieldName" : "classification",
                    "operator" : "ge",
                    "value" : 7
                },
                {
                    "fieldName" : "department",
                    "operator" : "eq",
                    "value" : "engineering"
                }
           ]
           
        }
    ]
})

$extensions.setSubscriptionInvalidationFilter({
    "filterGroup": [
        {
           "filters" : [
                {
                    "fieldName" : "email",
                    "operator" : "eq",
                    "value" : $context.identity.claims.email
                }
           ]
        }
    ]
})

$util.toJson($context.result)

When a Cognito user authenticates to access the API and subscribe to real-time data, a JWT token is validated; it contains claims with user attributes such as their e-mail. The invalidation filter defined in $extensions.setSubscriptionInvalidationFilter() checks if the e-mail address defined in the payload of the mutation’s invalidateSubscriptions() extension matches the e-mail address retrieved from the JWT token in $context.identity.claims.email, triggering the invalidation if there’s a match.
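
The matching step can be modeled locally with a short JavaScript sketch. Each simulated connection keeps the e-mail claim captured when it subscribed, and an invalidation payload closes every open connection whose claim equals the payload's e-mail. The connection IDs and helper names are hypothetical, and the code only illustrates the matching logic, not how AppSync manages WebSocket connections.

```javascript
// Local model of subscription invalidation; not AppSync's implementation.
// Each connection stores the e-mail claim captured when it subscribed.
const connections = [
  { id: "conn-1", claims: { email: "jdoe@myemail.com" }, open: true },
  { id: "conn-2", claims: { email: "jdoe@myemail.com" }, open: true },
  { id: "conn-3", claims: { email: "janed@myemail.com" }, open: true },
];

// The invalidation rule from the resolver, resolved for one connection.
function invalidationRuleFor(connection) {
  return { fieldName: "email", operator: "eq", value: connection.claims.email };
}

// Close every open connection whose invalidation rule matches the payload.
function invalidateSubscriptions(payload) {
  const closed = [];
  for (const connection of connections) {
    const rule = invalidationRuleFor(connection);
    if (connection.open && payload[rule.fieldName] === rule.value) {
      connection.open = false;
      closed.push(connection.id);
    }
  }
  return closed;
}

const closedIds = invalidateSubscriptions({ email: "jdoe@myemail.com" });
const stillOpen = connections.filter((c) => c.open).map((c) => c.id);

console.log(closedIds); // [ 'conn-1', 'conn-2' ]
console.log(stillOpen); // [ 'conn-3' ]
```

Because the rule is resolved per connection, a user connected from several clients has all matching connections closed by a single payload.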

We test again with the same strategy, using two browser windows and different logged-in users. As a member of ManagerGroup, which is allowed to invoke the mutation that invalidates users’ WebSocket connections, janed sets the invalidation payload:

mutation MyMutation {
  unsubscribe(email: "jdoe@myemail.com") {
    email
  }
}

The user jdoe, who was previously subscribed to the onHightPriorityEngineeringTaskCreated subscription, has his e-mail address matched by the invalidation filter, causing the WebSocket connection to be forcibly closed. The gray spinning wheel stops, showing the subscription was closed, and John won’t receive any further real-time updates from it.

What if our user is connected and authenticated on multiple clients? We can simulate more connected clients authenticated with the same jdoe user in additional browser windows and confirm all subscriptions are successfully invalidated simultaneously, even if the user is still authenticated in the client.

Conclusion

Enhanced filtering in AppSync allows for more flexibility when developing real-time applications and building engaging digital experiences for your users, such as gaming leaderboards, e-learning, social media, live streaming, sports, media, audience engagement, interactive chat rooms, IoT dashboards for connected cars, healthcare or smart homes, and more. The new filter operators give developers more control over the data specific clients need to receive. Invalidation makes it easier to close connections from the AppSync backend, providing more options when it comes to authorization use cases. Best of all, AppSync is fully serverless and scales with your application’s demand. AppSync helps you improve your real-time user experience with no need to worry about managing WebSockets connections and the infrastructure required to provide real-time data at scale. For more information on enhanced GraphQL subscriptions filtering, refer to the AppSync documentation.