Connection Pooling
Connection pooling is a technique for creating and managing a pool of ready-to-use connections, which can greatly increase the performance of your applications by reducing the time spent creating new connections.
To use connection pooling in Connector/Python with the X Protocol, call the mysqlx.get_client() function as follows:
import mysqlx
connection_str = 'mysqlx://mike:s3cr3t!@localhost:33060'
options_string = '{}' # An empty document
client = mysqlx.get_client(connection_str, options_string)
session = client.get_session()
# (...)
session.close()
client.close()
The connection settings and options can also be given as a dict:
import mysqlx
connection_dict = {
    'host': 'localhost',
    'port': 33060,
    'user': 'mike',
    'password': 's3cr3t!'
}
options_dict = {}
client = mysqlx.get_client(connection_dict, options_dict)
session = client.get_session()
# (...)
session.close()
client.close()
All sessions created by mysqlx.Client.get_session() use a pooled connection, which is returned to the pool of connections when closed by mysqlx.Session.close(), so it can be reused.
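For instance, in the following minimal sketch (assuming the same mike@localhost:33060 credentials used above), a pool limited to a single connection can only serve the second session because the first session returned its connection to the pool when it was closed:
import mysqlx

# Illustrative settings; reuse your own credentials
client = mysqlx.get_client(
    {'host': 'localhost', 'port': 33060, 'user': 'mike', 'password': 's3cr3t!'},
    {'pooling': {'max_size': 1}})

session = client.get_session()   # takes the only connection from the pool
session.close()                  # returns the connection to the pool

# With 'max_size': 1 this call can only succeed because the previous
# connection went back to the pool and is reused here
session = client.get_session()
session.close()
client.close()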
So far we have not supplied any configuration for mysqlx.Client. We can set the pooling options by passing a dict or a JSON document string as the second parameter. The available options for mysqlx.Client are:
options = {
    'pooling': {
        'enabled': (bool),       # [True | False], True by default
        'max_size': (int),       # Maximum connections per pool
        'max_idle_time': (int),  # Milliseconds a connection remains active while
                                 # not in use. The default 0 means infinite.
        'queue_timeout': (int),  # Milliseconds a request waits for a connection
                                 # to become available. The default 0 means infinite.
    }
}
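Because the second parameter also accepts a JSON document string, the same pooling settings can be passed as a string instead of a dict. A short sketch reusing the connection string from the first example (the specific option values are only illustrative):
import mysqlx

connection_str = 'mysqlx://mike:s3cr3t!@localhost:33060'
# The pooling options expressed as a JSON document string
options_string = '{"pooling": {"max_size": 10, "max_idle_time": 30000, "queue_timeout": 2000}}'

client = mysqlx.get_client(connection_str, options_string)
session = client.get_session()
# (...)
session.close()
client.close()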
To disable pooling in the client, set the enabled option to False:
client = mysqlx.get_client(connection_str, {'pooling':{'enabled': False}})
To define the maximum pool size, set max_size in the pooling options. In the following example, 'max_size': 5 sets 5 as the maximum number of connections allowed in the pool.
connection_dict = {
    'host': 'localhost',
    'port': 33060,
    'user': 'mike',
    'password': 's3cr3t!'
}
options_dict = {'pooling': {'max_size': 5, 'queue_timeout': 1000}}

client = mysqlx.get_client(connection_dict, options_dict)

for _ in range(5):
    client.get_session()

# This will raise a pool error:
# mysqlx.errors.PoolError: pool max size has been reached
client.get_session()
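Because the options above also set 'queue_timeout': 1000, the failing get_session() call waits up to one second for a connection to be returned to the pool before raising the error. A hedged sketch of how that error might be handled:
try:
    session = client.get_session()
except mysqlx.errors.PoolError as err:
    # No connection was returned to the pool within queue_timeout
    print("No connection available: {}".format(err))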
The queue_timeout option sets the maximum number of milliseconds a request will wait for a connection to become available. The default value is 0 (zero), which means an infinite wait.
The following example uses threads that may have to wait for a session to become available:
import mysqlx
import time
import random

from threading import Thread

connection_dict = {
    'host': 'localhost',
    'port': 33060,
    'user': 'mike',
    'password': 's3cr3t!'
}
options_dict = {'pooling': {'max_size': 6, 'queue_timeout': 5000}}

schema_name = 'test'
collection_name = 'collection_test04'


def job(client, worker_number):
    """This method holds the tasks for a thread.

    Args:
        client (Client): used to get the sessions to interact with the server.
        worker_number (int): the id number of the worker.
    """
    rand = random.Random()
    worker_name = "Worker_{}".format(worker_number)
    print("starting Worker: {} \n".format(worker_name))
    # Take a nap before doing the job (gives the other threads a chance to start)
    time.sleep(rand.randint(0, 9) / 10)
    # Get a session from the client
    session1 = client.get_session()
    # Get the schema to work on
    schema = session1.get_schema(schema_name)
    # Get the collection to put some documents in
    collection = schema.get_collection(collection_name)
    # Add 10 documents to the collection
    for _ in range(10):
        collection.add({'name': worker_name}).execute()
    # Close the session to return the connection to the pool
    session1.close()
    print("Worker: {} finished\n".format(worker_name))


def call_workers(client, job_thread, workers):
    """Create threads and start them.

    Args:
        client (Client): used to get the sessions.
        job_thread (method): the method each thread runs.
        workers (int): the number of threads to create.
    """
    workers_list = []
    for n in range(workers):
        workers_list.append(Thread(target=job_thread, args=[client, n]))
    for worker in workers_list:
        worker.start()


# Get a client to manage the sessions
client = mysqlx.get_client(connection_dict, options_dict)

# Get a session to create a schema and a collection
session1 = client.get_session()
schema = session1.create_schema(schema_name)
collection = schema.create_collection(collection_name)
# Close the session so that another free connection is available
session1.close()

# Invoke call_workers with the client, the method each thread runs and
# the number of threads, in this example 18 workers
call_workers(client, job, 18)

# Give the workers some time to do the job
time.sleep(10)

session1 = client.get_session()
schema = session1.get_schema(schema_name)
collection = schema.get_collection(collection_name)
print(collection.find().execute().fetch_all())
The output of the last print will look like the following:
[{'_id': '00005b770c7f0000000000000389', 'name': 'Worker_2'}, \
{'_id': '00005b770c7f000000000000038a', 'name': 'Worker_2'}, \
{'_id': '00005b770c7f000000000000038b', 'name': 'Worker_2'}, \
{'_id': '00005b770c7f000000000000038c', 'name': 'Worker_2'}, \
{'_id': '00005b770c7f000000000000038d', 'name': 'Worker_2'}, \
{'_id': '00005b770c7f000000000000038e', 'name': 'Worker_2'}, \
{'_id': '00005b770c7f000000000000038f', 'name': 'Worker_2'}, \
{'_id': '00005b770c7f0000000000000390', 'name': 'Worker_2'}, \
{'_id': '00005b770c7f0000000000000391', 'name': 'Worker_2'}, \
{'_id': '00005b770c7f0000000000000392', 'name': 'Worker_2'}, \
{'_id': '00005b770c7f0000000000000393', 'name': 'Worker_1'}, \
{'_id': '00005b770c7f0000000000000394', 'name': 'Worker_4'}, \
{'_id': '00005b770c7f0000000000000395', 'name': 'Worker_1'}, \
{'_id': '00005b770c7f0000000000000396', 'name': 'Worker_4'}, \
{'_id': '00005b770c7f0000000000000397', 'name': 'Worker_7'}, \
{'_id': '00005b770c7f0000000000000398', 'name': 'Worker_1'}, \
{'_id': '00005b770c7f0000000000000399', 'name': 'Worker_4'}, \
{'_id': '00005b770c7f000000000000039a', 'name': 'Worker_7'}, \
{'_id': '00005b770c7f000000000000039b', 'name': 'Worker_1'}, \
{'_id': '00005b770c7f000000000000039c', 'name': 'Worker_4'}, \
{'_id': '00005b770c7f000000000000039d', 'name': 'Worker_7'}, \
{'_id': '00005b770c7f000000000000039e', 'name': 'Worker_1'}, \
{'_id': '00005b770c7f000000000000039f', 'name': 'Worker_8'}, \
{'_id': '00005b770c7f00000000000003a0', 'name': 'Worker_4'}, \
{'_id': '00005b770c7f00000000000003a1', 'name': 'Worker_7'}, \
... \
{'_id': '00005b770c7f000000000000043c', 'name': 'Worker_9'}]
The 18 workers took random turns adding their documents to the collection, sharing only the 6 active connections allowed by 'max_size': 6 in the options_dict that was passed when the client was created with mysqlx.get_client(connection_dict, options_dict).
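Once the work is done, the client can be closed to release its pooled connections, and the schema created for the example can be dropped. A small cleanup sketch, assuming the session1 and client objects from the listing above are still in scope:
# Drop the example schema and release all pooled connections
session1.drop_schema(schema_name)
session1.close()
client.close()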