AWS Open Source Blog

New cluster-mode support in redis-py

What is Redis?

Redis is an incredibly popular open source (BSD licensed) in-memory data store, generally used as a database, cache, or message broker. Developers regularly rank Redis as their most loved database, and you have likely used Redis without knowing it on Twitter, GitHub, Instagram, Airbnb, and many other products.

Redis Clusters enable you to scale Redis as your usage of Redis grows. To connect to your Redis Cluster, there are a number of available clients supporting many different programming languages. In this blog post, we take a look at one of those clients, redis-py, and how you can now use it to access the data in your Redis Clusters.

This blog post is the first in a series of posts related to AWS contributions to open source Redis clients.

What is Redis Cluster?

Redis can run in one of two modes: cluster-mode enabled and cluster-mode disabled. With cluster-mode disabled, Redis has a single primary node that holds the entire dataset, whereas with cluster-mode enabled, the dataset is split across multiple shards. Each shard consists of a primary and zero or more replicas. Using multiple shards lets you store a dataset much larger than a single machine can hold, and increases throughput by spreading the load across the CPUs of multiple machines.

Redis distributes its data by assigning each key a hash slot, an integer in the range 0 to 16383 (16384 slots in total), which is calculated by taking the CRC16 of the key modulo 16384. Every shard is responsible for handling a subset of the hash slots. If we have two shards, for example, the first one might handle hash slots 0-8191 and the second one might handle hash slots 8192-16383. More shards can be added, with the restriction that every hash slot can only be handled by one shard.
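As a rough illustration of the slot calculation, here is a minimal pure-Python sketch of the CRC16 variant described in the Redis Cluster specification (polynomial 0x1021, initial value 0, commonly called XMODEM). It ignores hash tags and is not redis-py's own code; on a live cluster, the CLUSTER KEYSLOT command gives you the authoritative answer.

```python
HASH_SLOTS = 16384  # Redis Cluster always has 16384 hash slots


def crc16_xmodem(data: bytes) -> int:
    """Bitwise CRC16: polynomial 0x1021, initial value 0, no reflection (XMODEM)."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_slot(key: bytes) -> int:
    """Hash slot of a key, ignoring hash tags."""
    return crc16_xmodem(key) % HASH_SLOTS


# The cluster specification lists 0x31C3 as the CRC16 of "123456789".
print(hex(crc16_xmodem(b"123456789")))  # 0x31c3
print(key_slot(b"123456789"))  # 12739
```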

As your application usage grows, you often need Redis to handle more load. You have two options: switch to a more powerful machine, or add more shards to your setup. Using a more powerful machine is known as vertical scaling, and adding more shards is known as horizontal scaling. At first you might choose to scale vertically, but there is a limit to how powerful a single machine can be. Also, since Redis is primarily single-threaded, there is limited benefit in using additional CPUs. If you need significant scale, you will eventually need to add more shards to your setup.

For more information about Redis Cluster we recommend reading the official documentation.

Redis-py and cluster-mode support

In order to use Redis, you need to connect to a Redis server using a Redis client. There are many open source clients to choose from, available in a range of programming languages: C, C++, Python, PHP, Node.js, Java, and many more, and new clients keep being added. One of the most popular Python clients in use today is redis-py, an open source project started by Andy McCurdy. Redis-py is an active project: new features are added and bugs are fixed on a regular basis.

However, although redis-py is well maintained, until recently it lacked support for cluster mode. This meant that you could only use redis-py if all your data was sitting on one shard.

Those who wish to use redis-py with cluster support can use a spin-off project called redis-py-cluster, which was developed by Grokzen and is based on antirez’s redis-rb-cluster. Redis-py-cluster is a great project and it is well maintained, but many redis-py users have requested an all-in-one solution that doesn’t depend on 3rd party libraries.

To remedy this situation, AWS has teamed up with the open-source community that supports redis-py to add cluster-mode support to this popular client. Redis-py now supports cluster-mode natively, which means that you can use redis-py to interact with Redis Cluster without any 3rd party libraries or processes. Much of this code is based on redis-py-cluster, and we are indebted to Grokzen for the great work he has done on that project.

We will now walk you through setting up a Redis Cluster and show you how to use the cluster mode that is native in redis-py.

Creating a Redis Cluster

Option 1: OSS Redis

If you don’t already have a Redis Cluster running, you can create one on your local machine using a utility provided with the Redis open-source code. You will need to clone the git repository, compile it, and then run the utility:

git clone https://github.com/redis/redis
[Output omitted]
cd redis
make
[Output omitted]
cd utils/create-cluster/
./create-cluster start
Starting 30001
Starting 30002
Starting 30003
Starting 30004
Starting 30005
Starting 30006
./create-cluster create
[Output omitted]

By default, this utility creates a cluster of six nodes: three primaries with one replica each. The nodes listen on ports 30001 to 30006. The create step issues the CLUSTER MEET commands and slot assignments that Redis Cluster requires, so after running it you don't need to do anything else to set up the cluster.

Option 2: Amazon ElastiCache cluster

To create a Redis Cluster using Amazon ElastiCache, please follow these instructions from the documentation.

Creating the client

First, let us look at how clients are created when cluster-mode is disabled:

>>> import redis
>>> r = redis.Redis(host='localhost', port=6379, db=9)

Here we created a client that connects to a server listening on port 6379 of localhost. We also chose to connect to database number 9.

In order to create a client that supports cluster-mode you need to create an instance of the class RedisCluster instead of Redis:

>>> import redis
>>> r = redis.RedisCluster(host='localhost', port=30001)

There are three differences to take note of:

  1. We created an instance of type RedisCluster and not Redis.
  2. We did not specify a database (db) number, because Redis Cluster only supports one database, which is always assigned the number 0.
  3. We passed the IP/port details of one of the nodes in the cluster to the constructor of RedisCluster. You can choose any node in the cluster. The initialization routine of the client will auto-discover all the other nodes in the cluster, and the client will know which nodes are primaries and which are replicas. In this example we created a connection to port 30001, because the create-cluster utility creates a cluster that listens on ports 30001-30006, but this doesn’t have to be the case.

Setup client logging

The following example can be added to your own code to enable ‘DEBUG’ logging inside the library during development.

import logging

# Print log records to stderr with the default format
logging.basicConfig()
# Enable DEBUG output for the 'redis' logger and its child loggers
logger = logging.getLogger('redis')
logger.setLevel(logging.DEBUG)
logger.propagate = True

Note that we do not recommend enabling this logging in production, as it can slow down your client.

Using the client

After creating the client, we can start doing something useful, like setting the value of “foo” to “bar”:

>>> import redis
>>> r = redis.RedisCluster(host='localhost', port=30001)
>>> r.set('foo', 'bar')
True
>>> r.get('foo')
b'bar'

So how does the client know which node to send the get and set commands to? The client keeps track of which node handles each hash slot, so it can compute the slot of a key and route the command to the node that owns that slot.

Cluster management commands

A RedisCluster instance can be directly used to execute Redis commands. When a command is being executed through the cluster instance, the target node(s) will be internally determined. When using a key-based command, the target node will be the node that holds the key’s slot.

Cluster management commands and other commands that are not key-based have a parameter called ‘target_nodes’ where you can specify which nodes to execute the command on. In the absence of target_nodes, the command will be executed on the default cluster node.

As part of cluster instance initialization, the cluster’s default node is randomly selected from the cluster’s primaries, and will be updated upon re-initialization. Using r.get_default_node(), you can get the cluster’s default node, or you can change it using the ‘set_default_node’ method.

>>> # target node: the node that holds the key slot of 'foo1'
>>> r.set('foo1', 'bar1')
>>> # target node: the node that holds the key slot of 'foo2'
>>> r.set('foo2', 'bar2')
>>> # target node: the node that holds the key slot of 'foo1'
>>> print(r.get('foo1'))
b'bar1'
>>> # target node: default node
>>> print(r.keys())
[b'foo1']
>>> # target node: default node
>>> r.ping()

Specifying Target Nodes

As mentioned above, all non key-based RedisCluster commands accept the keyword argument ‘target_nodes’ that specifies the node(s) that the command should be executed on.

The best practice is to specify target nodes using RedisCluster's node flags: PRIMARIES, REPLICAS, ALL_NODES, and RANDOM.

When a node flag is passed along with a command, it is internally resolved to the relevant node(s). If the cluster topology changes during the execution of a command, the client can resolve the node flag again against the new topology and retry the command.

>>> from redis import RedisCluster
>>> # run cluster-meet command on all of the cluster's nodes
>>> r.cluster_meet('127.0.0.1', 30001, target_nodes=RedisCluster.ALL_NODES)
>>> # ping all replicas
>>> r.ping(target_nodes=RedisCluster.REPLICAS)
>>> # ping a random node
>>> r.ping(target_nodes=RedisCluster.RANDOM)
>>> # get the keys from all primaries
>>> r.keys(target_nodes=RedisCluster.PRIMARIES)
[b'foo1', b'foo2']
>>> # execute bgsave on all primaries
>>> r.bgsave(target_nodes=RedisCluster.PRIMARIES)
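To make the idea of resolving a node flag concrete, the following sketch expands a flag into concrete nodes from whatever topology the client currently knows. The Node class, the flag string values, and resolve_flag are hypothetical stand-ins written for illustration, not redis-py's API. The point is that because the flag is resolved at call time, a retry after a topology refresh naturally picks up the new nodes.

```python
import random
from dataclasses import dataclass

# Hypothetical flag values for this sketch only.
PRIMARIES, REPLICAS, ALL_NODES, RANDOM = "primaries", "replicas", "all", "random"


@dataclass(frozen=True)
class Node:
    host: str
    port: int
    server_type: str  # "primary" or "replica"


def resolve_flag(flag, nodes):
    """Expand a node flag into concrete nodes from the currently known topology."""
    if flag == PRIMARIES:
        return [n for n in nodes if n.server_type == "primary"]
    if flag == REPLICAS:
        return [n for n in nodes if n.server_type == "replica"]
    if flag == ALL_NODES:
        return list(nodes)
    if flag == RANDOM:
        return [random.choice(nodes)]
    raise ValueError(f"unknown node flag: {flag!r}")


topology = [
    Node("127.0.0.1", 30001, "primary"),
    Node("127.0.0.1", 30002, "primary"),
    Node("127.0.0.1", 30004, "replica"),
]
print([n.port for n in resolve_flag(PRIMARIES, topology)])  # [30001, 30002]
```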

You can also pass ClusterNode objects directly if you want to execute a command on a specific node or group of nodes that isn't covered by the node flags. However, if the command fails because of cluster topology changes, no retry is attempted, since the passed target node(s) may no longer be valid, and the relevant cluster or connection error is raised.

>>> node = r.get_node('localhost', 30001)
>>> # Get the keys only for that specific node
>>> r.keys(target_nodes=node)
>>> # get Redis info from a subset of primaries
>>> subset_primaries = [node for node in r.get_primaries() if node.port > 30003]
>>> r.info(target_nodes=subset_primaries)

In addition, you can get the Redis client of a specific node from the RedisCluster instance and execute commands on that node directly. That Redis client, however, does not handle cluster failures and retries.

>>> cluster_node = r.get_node(host='localhost', port=30001)
>>> print(cluster_node)
[host=127.0.0.1,port=30001,name=127.0.0.1:30001,server_type=primary,redis_connection=Redis<ConnectionPool<Connection<host=127.0.0.1,port=30001,db=0>>>]
>>> redis_node = cluster_node.redis_connection
>>> redis_node.client_list()
[{'id': '276', 'addr': '127.0.0.1:64108', 'fd': '16', 'name': '', 'age': '0', 'idle': '0', 'flags': 'N', 'db': '0', 'sub': '0', 'psub': '0', 'multi': '-1', 'qbuf': '26', 'qbuf-free': '32742', 'argv-mem': '10', 'obl': '0', 'oll': '0', 'omem': '0', 'tot-mem': '54298', 'events': 'r', 'cmd': 'client', 'user': 'default'}]
>>> # Get the keys only for that specific node
>>> redis_node.keys()
[b'foo1']

Multi-key commands

Redis supports multi-key commands in Cluster Mode, such as Set type unions or intersections, mset and mget, as long as the keys all hash to the same slot.

By using the RedisCluster client, you can use the familiar functions (e.g. mget, mset) to perform an atomic multi-key operation. However, you must ensure that all keys are mapped to the same slot, otherwise a RedisClusterException will be raised. Redis Cluster implements a concept called hash tags that can be used to force certain keys to be stored in the same hash slot (see Keys hash tag).
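The hash tag rule from the Redis Cluster specification can be sketched as follows: if a key contains a '{' followed later by a '}' with at least one character between them, only the text between the first '{' and the first '}' after it is hashed. The hash_tag function is a hypothetical helper written for this post, not part of redis-py.

```python
def hash_tag(key: str) -> str:
    """Return the part of the key that Redis Cluster would hash."""
    start = key.find("{")
    if start == -1:
        return key  # no '{' at all: hash the whole key
    end = key.find("}", start + 1)
    if end == -1 or end == start + 1:
        return key  # no closing '}' or an empty tag "{}": hash the whole key
    return key[start + 1:end]


# '{foo}1' and '{foo}2' both hash "foo", so they land in the same slot.
print(hash_tag("{foo}1"), hash_tag("{foo}2"))  # foo foo
```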

You can also use non-atomic variants of some of the multi-key operations and pass keys that aren't mapped to the same slot. The client then maps the keys to their slots and sends the commands to the nodes that own those slots: the keys are batched according to their hash slot, and each batch is sent separately to the slot's owner.

>>> from redis import RedisCluster
>>> rc = RedisCluster(host='localhost', port=30001)
>>> # Atomic operations can be used when all keys are mapped to the same slot
>>> rc.mset({'{foo}1': 'bar1', '{foo}2': 'bar2'})
>>> rc.mget('{foo}1', '{foo}2')
[b'bar1', b'bar2']
>>> # Non-atomic multi-key operations split the keys into different slots
>>> rc.mset_nonatomic({'foo': 'value1', 'bar': 'value2', 'zzz': 'value3'})
>>> rc.mget_nonatomic('foo', 'bar', 'zzz')
[b'value1', b'value2', b'value3']
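The batching performed by the non-atomic variants can be pictured with the following simplified model. It groups key positions by slot, issues one MGET per slot, and writes the values back in request order. The slot function and the per-slot MGET are passed in as plain callables (both hypothetical), so the sketch runs without a cluster; redis-py's actual mget_nonatomic implementation differs in its details.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Sequence


def batch_by_slot(keys: Sequence[str], slot_of: Callable[[str], int]) -> Dict[int, List[int]]:
    """Group key positions by slot so that each batch can go to a single node."""
    batches: Dict[int, List[int]] = defaultdict(list)
    for pos, key in enumerate(keys):
        batches[slot_of(key)].append(pos)
    return dict(batches)


def mget_nonatomic_sketch(keys, slot_of, mget_on_slot):
    """Run one MGET per slot batch and put the values back in request order."""
    results = [None] * len(keys)
    for slot, positions in batch_by_slot(keys, slot_of).items():
        values = mget_on_slot(slot, [keys[p] for p in positions])
        for pos, value in zip(positions, values):
            results[pos] = value
    return results


store = {"foo": b"value1", "bar": b"value2", "zzz": b"value3"}
made_up_slots = {"foo": 1, "bar": 2, "zzz": 2}  # hypothetical slot numbers
print(batch_by_slot(["foo", "bar", "zzz"], made_up_slots.__getitem__))  # {1: [0], 2: [1, 2]}
print(mget_nonatomic_sketch(["foo", "bar", "zzz"], made_up_slots.__getitem__,
                            lambda slot, ks: [store[k] for k in ks]))
# [b'value1', b'value2', b'value3']
```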

Read Only Mode

By default, a replica node in Redis Cluster answers key-based commands with a MOVED redirection to the primary that owns the key's slot. You can overcome this limitation and scale read commands by enabling READONLY mode.

To enable READONLY mode, pass read_from_replicas=True to the RedisCluster constructor. When it is set to True, read commands are distributed among the primary and its replicas in a round-robin manner.
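The round-robin distribution described above can be modeled with itertools.cycle. RoundRobinReader is a hypothetical toy class for a single slot, assuming writes always go to the primary; it is not how redis-py implements its load balancing internally.

```python
from itertools import cycle


class RoundRobinReader:
    """Toy model of one slot: reads rotate over the primary and its replicas."""

    def __init__(self, primary, replicas):
        self.primary = primary
        self._readers = cycle([primary, *replicas])

    def node_for(self, is_read: bool):
        # Writes always go to the primary; reads take the next node in the cycle.
        return next(self._readers) if is_read else self.primary


slot_nodes = RoundRobinReader("127.0.0.1:30001", ["127.0.0.1:30004"])
print([slot_nodes.node_for(is_read=True) for _ in range(4)])
# ['127.0.0.1:30001', '127.0.0.1:30004', '127.0.0.1:30001', '127.0.0.1:30004']
print(slot_nodes.node_for(is_read=False))  # 127.0.0.1:30001
```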

READONLY mode can be set at runtime by calling the readonly() method with target_nodes='replicas', and read-write access can be restored by calling the readwrite() method.

Make sure you set up your client logging to 'DEBUG' before running the example below, so that you can see which node executes each command.

>>> from redis import RedisCluster
>>> from redis.cluster import ClusterNode
>>> # Any reachable node of the cluster can serve as a startup node
>>> startup_nodes = [ClusterNode('localhost', 30001)]
>>> # Use 'debug' log level to print the node that the command is executed on
>>> rc_readonly = RedisCluster(startup_nodes=startup_nodes,
...                            read_from_replicas=True)
>>> rc_readonly.set('{foo}1', 'bar1')
>>> for i in range(0, 4):
...     # Assigns read command to the slot's hosts in a Round-Robin manner
...     rc_readonly.get('{foo}1')
...
>>> # set command would be directed only to the slot's primary node
>>> rc_readonly.set('{foo}2', 'bar2')
>>> # reset READONLY flag
>>> rc_readonly.readwrite(target_nodes='replicas')
>>> # now the get command would be directed only to the slot's primary node
>>> rc_readonly.get('{foo}1')

Pub-sub

ClusterPubSub can be created with RedisCluster’s pubsub() method. When pubsub() is called without specifying a node, a single node will be transparently chosen for the pub-sub connection on the first command execution. The node will be determined by:

  • Hashing the channel name in the request to find its key-slot
  • Selecting a node that handles the key-slot: if read_from_replicas is set to True, a replica can be selected.

Here’s an example of how to create a pub-sub instance, either with or without specifying a target node:

>>> from redis import RedisCluster
>>> rc = RedisCluster(host='localhost', port=30001)
>>> p1 = rc.pubsub()
>>> # p1 connection will be set to the node that holds 'foo' keyslot
>>> p1.subscribe('foo')
>>> # p2 connection will be set to node 'localhost:30001'
>>> p2 = rc.pubsub(rc.get_node('localhost', 30001))

Known limitations with pub-sub:

Pattern subscribe and publish do not currently work properly in cluster mode because of key slots. If we hash a pattern like fo*, we get a key slot for that string, but there are endless possible channel names matching the pattern, and they cannot be known in advance. The pattern commands are not disabled, but we do not currently recommend using them. This issue is expected to be addressed in Redis 7 (https://github.com/redis/redis/pull/8621).

See redis-py-cluster documentation for more information.

Pipelines

Why use pipelines?

Whenever we send a command to a Redis server, we need to wait for the command to reach the server and for the response to come back to us. This is known as the Round Trip Time (RTT). If we have many commands to execute, ideally we don't want to wait for one command to return before sending the next one. When a protocol supports this, it is known as pipelining. Luckily, Redis has supported pipelining since the beginning.

The basic idea is simple: You concatenate a list of commands that you would like to execute, you send them all at once, and you receive a list of responses back where the order of responses corresponds to the order of requests. By doing this, we only incur one RTT for the whole list of commands.
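To make the "concatenate and send at once" idea concrete, here is a sketch that encodes commands in RESP (the Redis serialization protocol), where each command is an array of bulk strings, and joins several of them into one buffer that could be written to a socket in a single call. The function names are hypothetical, and this is not how redis-py structures its connection code.

```python
def encode_command(*args: str) -> bytes:
    """Encode one command as a RESP array of bulk strings."""
    out = [f"*{len(args)}\r\n".encode()]
    for arg in args:
        data = arg.encode()
        # Bulk string length is counted in bytes, not characters.
        out.append(f"${len(data)}\r\n".encode() + data + b"\r\n")
    return b"".join(out)


def encode_pipeline(commands) -> bytes:
    """Concatenate several encoded commands into a single buffer for one write."""
    return b"".join(encode_command(*cmd) for cmd in commands)


payload = encode_pipeline([("SET", "foo", "bar"), ("GET", "foo")])
print(payload)
# b'*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n'
```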

Basic usage of pipelines

Using a pipeline in cluster-mode is very similar to using one with cluster-mode disabled. It involves three steps:

  • Call the pipeline() method to return a pipeline instance.
  • Add commands to the pipeline.
  • Call the execute() method of the pipeline to send all the buffered commands.

Here’s an example:

>>> import redis
>>> r = redis.RedisCluster(host='localhost', port=30001)
>>> p = r.pipeline()
>>> p.set('foo','bar')
ClusterPipeline
>>> p.get('foo')
ClusterPipeline
>>> p.execute()
[True, b'bar']

You may have noticed that when you buffer a pipeline command, a ClusterPipeline object is returned. This allows you to chain commands together:

>>> p.set('foo','bar').set('spam','eggs').get('foo').get('spam')
ClusterPipeline
>>> p.execute()
[True, True, b'bar', b'eggs']
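Chaining works simply because each buffering method returns the pipeline object itself. The following toy class, ChainablePipeline (hypothetical, and backed by a plain dictionary instead of a Redis server), shows the pattern in isolation.

```python
class ChainablePipeline:
    """Minimal buffering pipeline whose command methods return self."""

    def __init__(self, store):
        self._store = store  # a dict standing in for the server
        self._buffer = []

    def set(self, key, value):
        self._buffer.append(("SET", key, value))
        return self  # returning self is what makes chaining possible

    def get(self, key):
        self._buffer.append(("GET", key))
        return self

    def execute(self):
        # Replay the buffered commands in order and collect one reply per command.
        results = []
        for cmd, key, *rest in self._buffer:
            if cmd == "SET":
                self._store[key] = rest[0].encode()
                results.append(True)
            else:
                results.append(self._store.get(key))
        self._buffer.clear()
        return results


p = ChainablePipeline({})
print(p.set("foo", "bar").set("spam", "eggs").get("foo").get("spam").execute())
# [True, True, b'bar', b'eggs']
```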

Pipeline Internals

So how does RedisCluster execute the pipeline when you call execute()?

This is what happens under the hood:

  • Each buffered pipeline command is assigned to the node to which it needs to be sent.
  • The client sends each node the buffered commands that need to run on that node. The commands for all the nodes are sent in parallel, meaning we don’t wait for one node to respond before sending the commands of the next node.
  • The client waits for responses from all the nodes.
  • The responses are sorted to correspond to the original order of the requests and returned to the user.
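The four steps above can be sketched in plain Python. Here node_for and send_batch are hypothetical callables standing in for slot lookup and network I/O, and a thread pool is used only as a simple way to overlap the per-node requests; this is a simplified model of the flow, not redis-py's ClusterPipeline code.

```python
from concurrent.futures import ThreadPoolExecutor


def execute_cluster_pipeline(commands, node_for, send_batch):
    """
    commands:   command tuples, in the order the user buffered them
    node_for:   maps a command to the node that should run it
    send_batch: sends a list of commands to one node, returns replies in order
    """
    # 1. Assign each buffered command (by its position) to a node.
    by_node = {}
    for pos, cmd in enumerate(commands):
        by_node.setdefault(node_for(cmd), []).append(pos)

    # 2 and 3. Send every node its batch concurrently, then wait for all replies.
    with ThreadPoolExecutor() as pool:
        futures = {
            node: pool.submit(send_batch, node, [commands[p] for p in positions])
            for node, positions in by_node.items()
        }
        replies = {node: f.result() for node, f in futures.items()}

    # 4. Put the replies back in the original request order.
    results = [None] * len(commands)
    for node, positions in by_node.items():
        for pos, reply in zip(positions, replies[node]):
            results[pos] = reply
    return results


cmds = [("GET", "a1"), ("GET", "b1"), ("GET", "a2")]
print(execute_cluster_pipeline(
    cmds,
    node_for=lambda c: c[1][0],  # hypothetical: route by first letter of the key
    send_batch=lambda node, batch: [f"{node}:{c[1]}" for c in batch],
))
# ['a:a1', 'b:b1', 'a:a2']
```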

Currently, only commands that are key-based are allowed in pipelines. This means that you can’t buffer any cluster management commands in a pipeline.

Best practices

Redis-py uses connection pools (see Redis-py internals below): connections to every shard are kept open and reused when needed, so we don't have to incur the cost of establishing a new connection every time we send a command.

To make use of the connection pools it is very important not to create a new instance of RedisCluster for every command, but to create one instance and reuse it instead.

Redis-py internals

Connection pools: RedisCluster internally holds a Redis client instance to communicate with each of the shards in the cluster. Each of these Redis client instances maintains a pool of connections to its shard, which allows it to reuse connections when communicating with the shard. This saves the performance penalty of establishing a new connection for each command.

Slot management: When RedisCluster is initialized with a startup node, it sends a CLUSTER SLOTS command to that node. The command returns the hash slot ranges handled by the cluster, together with the primary and replicas that handle each range. RedisCluster saves this information in a dictionary and uses it whenever a key-based command needs to be sent to a shard: it first calculates the hash slot of the key, and then looks up that slot in the dictionary to retrieve the primary and replicas. The command is sent to the primary, unless it is a read command and READONLY mode is enabled (read_from_replicas=True), in which case the command is load balanced between the primary and its replicas.
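As a sketch of the slot dictionary, the code below expands slot ranges shaped loosely like a CLUSTER SLOTS reply into a mapping from each slot to its primary and replicas. The example layout (three primaries splitting slots 0-5460, 5461-10922, and 10923-16383, with a made-up pairing of replica ports) is an assumption for illustration; your cluster's ranges come from its own CLUSTER SLOTS reply.

```python
from typing import Dict, List, Tuple

Address = Tuple[str, int]

# Hypothetical layout, loosely shaped like a CLUSTER SLOTS reply:
# (first slot, last slot, primary, [replicas])
EXAMPLE_RANGES = [
    (0, 5460, ("127.0.0.1", 30001), [("127.0.0.1", 30005)]),
    (5461, 10922, ("127.0.0.1", 30002), [("127.0.0.1", 30006)]),
    (10923, 16383, ("127.0.0.1", 30003), [("127.0.0.1", 30004)]),
]


def build_slot_map(ranges) -> Dict[int, List[Address]]:
    """Expand slot ranges into {slot: [primary, replica, ...]}."""
    slot_map: Dict[int, List[Address]] = {}
    for first, last, primary, replicas in ranges:
        nodes = [primary, *replicas]
        for slot in range(first, last + 1):  # both ends of a range are inclusive
            slot_map[slot] = nodes
    return slot_map


def primary_for(slot_map: Dict[int, List[Address]], slot: int) -> Address:
    """The first entry for a slot is its primary."""
    return slot_map[slot][0]


slot_map = build_slot_map(EXAMPLE_RANGES)
print(primary_for(slot_map, 12739))  # ('127.0.0.1', 30003)
```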

MOVED responses: A Redis server responds with a MOVED error whenever a client tries to access a key whose hash slot is handled by a different shard. This usually means that a topology change has taken place. The client handles the required redirection, and no action is required on the part of the user. The MOVED response contains the IP and port of the node that handles the key, and the client updates that hash slot in its dictionary to point to the new node. When the number of MOVED responses reaches a certain threshold (10 by default), a complete refresh of the cluster topology is triggered by calling CLUSTER SLOTS. You can change the number of MOVED responses that triggers a full topology refresh by passing reinitialize_steps to the constructor. To make every MOVED response refresh the cluster topology, set reinitialize_steps to 1. If you never want MOVED errors to trigger a cluster topology refresh, set reinitialize_steps to 0.
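The MOVED bookkeeping described above, including the reinitialize_steps threshold, can be modeled like this. MovedTracker is a hypothetical class written for this post: it patches a single slot on each MOVED reply (in the "MOVED <slot> <host>:<port>" format from the Redis Cluster specification) and calls a caller-supplied refresh function once the threshold is reached. We are not claiming it mirrors redis-py's internal counter exactly.

```python
class MovedTracker:
    """Toy model of MOVED handling with a reinitialize_steps threshold."""

    def __init__(self, slot_table, refresh, reinitialize_steps=10):
        self.slot_table = slot_table  # slot -> (host, port) of the owning primary
        self.refresh = refresh  # callable returning a freshly loaded slot table
        self.reinitialize_steps = reinitialize_steps
        self.moved_count = 0

    def on_moved(self, error):
        # A MOVED error reads like: "MOVED 3999 127.0.0.1:6381"
        _, slot, address = error.split()
        host, port = address.rsplit(":", 1)
        # Patch only the slot named in the error.
        self.slot_table[int(slot)] = (host, int(port))
        if self.reinitialize_steps == 0:
            return  # 0 means MOVED never triggers a full refresh
        self.moved_count += 1
        if self.moved_count >= self.reinitialize_steps:
            # Threshold reached: reload the whole topology (CLUSTER SLOTS in a real client).
            self.moved_count = 0
            self.slot_table = self.refresh()


# Usage: with reinitialize_steps=3, the third MOVED triggers a full refresh.
refreshes = []


def fake_refresh():
    refreshes.append("refresh")
    return {}


tracker = MovedTracker({}, fake_refresh, reinitialize_steps=3)
tracker.on_moved("MOVED 3999 127.0.0.1:30002")
tracker.on_moved("MOVED 4000 127.0.0.1:30002")
print(tracker.slot_table)  # {3999: ('127.0.0.1', 30002), 4000: ('127.0.0.1', 30002)}
tracker.on_moved("MOVED 4001 127.0.0.1:30002")
print(len(refreshes))  # 1
```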

ASK responses: Redis has a mechanism for migrating slots between shards for rebalancing. During this process, all the keys of a certain slot are migrated from a source node to a destination node. Only when the migration is complete will the client receive a MOVED error when trying to access keys of the migrated slot on the source node. During the migration, however, if a requested key is no longer present on the source node, the node responds with an ASK error containing the IP and port of the destination node. The request then needs to be resent to the destination node, preceded by an ASKING command. RedisCluster handles this process transparently, and it does not require any action on the part of the user.
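An ASK redirection differs from MOVED in that it applies to a single request: the client sends ASKING and then the original command to the destination node, and it does not update its slot mapping. The sketch below models this with a hypothetical send callable and represents error replies as plain strings, which is a simplification of how a real client sees them.

```python
def send_with_ask(send, node, command):
    """
    send(node, args) returns a reply, or an "ASK <slot> <host>:<port>" string.
    Follows a single ASK redirection; no slot mapping is updated here.
    """
    reply = send(node, command)
    if isinstance(reply, str) and reply.startswith("ASK "):
        _, _, address = reply.split()
        host, port = address.rsplit(":", 1)
        target = (host, int(port))
        send(target, ("ASKING",))  # applies only to the next command on this connection
        reply = send(target, command)
    return reply


# Usage with a fake transport: the source node (30001) answers with ASK.
calls = []


def fake_send(node, args):
    calls.append((node, args))
    if node == ("127.0.0.1", 30001):
        return "ASK 3999 127.0.0.1:30002"
    return "OK" if args == ("ASKING",) else b"bar"


result = send_with_ask(fake_send, ("127.0.0.1", 30001), ("GET", "foo"))
print(result)  # b'bar'
```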

Summary

We have seen how to use redis-py with cluster-mode enabled. We have also looked at multi-key commands, pipelines, pub-sub, and cluster management commands. We encourage you to download the redis-py source code from GitHub, set up a Redis Cluster, and start experimenting with this exciting new feature!

Asher Yermiyahu

SDE in AWS ElastiCache from Israel. Loves piano, history, languages, and Blackadder