9.3 Asynchronous Connectivity

Installing Connector/Python also installs the mysql.connector.aio package, which integrates asyncio with the connector so that applications can interact with MySQL asynchronously.

Here are code examples that demonstrate mysql.connector.aio functionality:

Basic Usage:

from mysql.connector.aio import connect

# Connect to a MySQL server and get a cursor
cnx = await connect(user="myuser", password="mypass")
cur = await cnx.cursor()

# Execute a non-blocking query
await cur.execute("SELECT version()")

# Retrieve the results of the query asynchronously
results = await cur.fetchall()
print(results)

# Close cursor and connection
await cur.close()
await cnx.close()

Usage with context managers:

from mysql.connector.aio import connect

# Connect to a MySQL server and get a cursor
async with await connect(user="myuser", password="mypass") as cnx:
    async with await cnx.cursor() as cur:
        # Execute a non-blocking query
        await cur.execute("SELECT version()")

        # Retrieve the results of the query asynchronously
        results = await cur.fetchall()
        print(results)
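The snippets above use await at top level, which only works in an asyncio-aware environment such as the asyncio REPL (python -m asyncio); in an ordinary script, the code must live inside a coroutine driven by asyncio.run(). A minimal sketch of that scaffold follows, with a placeholder coroutine standing in for the connect/cursor code so the sketch runs without a MySQL server:

```python
import asyncio


async def main():
    # In a real script, the connect/cursor code from the examples above
    # would go here; this placeholder await stands in for it.
    await asyncio.sleep(0)
    return "done"


# `asyncio.run()` creates an event loop, runs the coroutine
# to completion, and returns its result.
result = asyncio.run(main())
print(result)
```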

Running Multiple Tasks Asynchronously

This example showcases how to run tasks asynchronously, and the use of to_thread, which makes it possible to run blocking functions asynchronously:

Note

The synchronous version of this example implements coroutines instead of following a common synchronous approach; this is to demonstrate explicitly that merely awaiting coroutines does not make the code run asynchronously. The functions provided by the asyncio API must be used to achieve asynchronicity.

import asyncio
import os
import time

from mysql.connector.aio import connect

# Global variable that helps format the job-sequence output.
# DISCLAIMER: this is for showcasing/demo purposes only;
# avoid global variables in production code.
indent = 0

# MySQL Connection arguments
config = {
    "host": "127.0.0.1",
    "user": "root",
    "password": os.environ.get("MYPASS", ":("),
    "use_pure": True,
    "port": 3306,
}


async def job_sleep(n):
    """Take a nap for n seconds.

    This job represents any generic task - it may be or not an IO task.
    """
    # Increment indent
    global indent
    offset = "\t" * indent
    indent += 1

    # Emulating a generic job/task
    print(f"{offset}START_SLEEP")
    await asyncio.sleep(n)
    print(f"{offset}END_SLEEP")

    return f"I slept for {n} seconds"


async def job_mysql():
    """Connect to a MySQL Server and do some operations.

    Run queries, run procedures, insert data, etc.
    """
    # Increment indent
    global indent
    offset = "\t" * indent
    indent += 1

    # MySQL operations
    print(f"{offset}START_MYSQL_OPS")
    async with await connect(**config) as cnx:
        async with await cnx.cursor() as cur:
            await cur.execute("SELECT @@version")
            res = await cur.fetchone()
            time.sleep(1)  # for simulating that the fetch isn't immediate
    print(f"{offset}END_MYSQL_OPS")

    # return server version
    return res


async def job_io():
    """Emulate an IO operation.

    `to_thread` allows running a blocking function asynchronously.

    References:
        [asyncio.to_thread]: https://docs.python.org/3/library/asyncio-task.html#asyncio.to_thread
    """

    # Emulating a native blocking IO procedure
    def io():
        """Blocking IO operation."""
        time.sleep(5)

    # Increment indent
    global indent
    offset = "\t" * indent
    indent += 1

    # Showcasing how a native blocking IO procedure can be awaited,
    print(f"{offset}START_IO")
    await asyncio.to_thread(io)
    print(f"{offset}END_IO")

    return "I am an IO operation"


async def main_asynchronous():
    """Running tasks asynchronously.

    References:
        [asyncio.gather]: https://docs.python.org/3/library/asyncio-task.html#asyncio.gather
    """
    print("-------------------- ASYNCHRONOUS --------------------")

    # reset indent
    global indent
    indent = 0

    clock = time.time()

    # `asyncio.gather()` allows running awaitable objects
    # in the `aws` sequence asynchronously.

    # If all awaitables are completed successfully,
    # the result is an aggregate list of returned values.
    aws = (job_io(), job_mysql(), job_sleep(4))
    returned_vals = await asyncio.gather(*aws)

    print(f"Elapsed time: {time.time() - clock:0.2f}")

    # The order of result values corresponds to the
    # order of awaitables in aws.
    print(returned_vals, end="\n" * 2)

    # Example expected output
    # -------------------- ASYNCHRONOUS --------------------
    # START_IO
    #         START_MYSQL_OPS
    #                 START_SLEEP
    #         END_MYSQL_OPS
    #                 END_SLEEP
    # END_IO
    # Elapsed time: 5.01
    # ['I am an IO operation', ('8.3.0-commercial',), 'I slept for 4 seconds']


async def main_non_asynchronous():
    """Running tasks non-asynchronously"""
    print("------------------- NON-ASYNCHRONOUS -------------------")

    # reset indent
    global indent
    indent = 0

    clock = time.time()

    # Sequence of awaitable objects
    aws = (job_io(), job_mysql(), job_sleep(4))

    # The line below is the short version of:
    #     coro1, coro2, coro3 = aws
    #     res1 = await coro1
    #     res2 = await coro2
    #     res3 = await coro3
    #     returned_vals = [res1, res2, res3]
    # NOTE: Simply awaiting a coro does not make the code run asynchronously!
    returned_vals = [await coro for coro in aws]  # this will run synchronously

    print(f"Elapsed time: {time.time() - clock:0.2f}")

    print(returned_vals, end="\n")

    # Example expected output
    # ------------------- NON-ASYNCHRONOUS -------------------
    # START_IO
    # END_IO
    #         START_MYSQL_OPS
    #         END_MYSQL_OPS
    #                 START_SLEEP
    #                 END_SLEEP
    # Elapsed time: 10.07
    # ['I am an IO operation', ('8.3.0-commercial',), 'I slept for 4 seconds']


if __name__ == "__main__":
    # `asyncio.run()` executes a coroutine (`coro`) and returns the result.
    # It is the standard entry point for running asyncio programs.

    # References:
    #     [asyncio.run]: https://docs.python.org/3/library/asyncio-runner.html#asyncio.run
    asyncio.run(main_asynchronous())
    asyncio.run(main_non_asynchronous())

It shows these three jobs running asynchronously:

  • job_io: Emulates an I/O operation; uses to_thread to run a blocking function asynchronously.

    Starts first, and takes five seconds to complete so is the last job to finish.

  • job_mysql: Connects to a MySQL server to perform operations such as queries and stored procedures.

    Starts second, and takes one second to complete so is the first job to finish.

  • job_sleep: Sleeps for n seconds to represent a generic task.

    Starts last, and takes four seconds to complete so is the second job to finish.
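The timing pattern above comes from asyncio.gather() overlapping the waits: the total elapsed time approaches the longest job rather than the sum of all jobs. A small self-contained sketch with shortened sleeps (and no MySQL dependency) illustrates the same effect:

```python
import asyncio
import time


async def short_sleep(n):
    """A generic awaitable job."""
    await asyncio.sleep(n)
    return n


async def blocking_io():
    """A blocking call moved off the event loop with to_thread."""
    await asyncio.to_thread(time.sleep, 0.2)
    return 0.2


async def run_all():
    clock = time.time()
    # gather() runs all three jobs concurrently and preserves
    # the order of results.
    results = await asyncio.gather(blocking_io(), short_sleep(0.1), short_sleep(0.15))
    return results, time.time() - clock


results, elapsed = asyncio.run(run_all())
# The jobs overlap, so elapsed time is close to the longest job (0.2s),
# not the sum of all jobs (0.45s).
print(results, f"{elapsed:0.2f}")
```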

Note

A lock/mutex isn't needed for the indent variable because multithreading isn't used; a single active thread executes all of the jobs. Asynchronous execution is about making progress on other jobs while waiting for the result of an I/O operation.
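The note above can be checked with a small sketch: coroutines on one event loop interleave only at await points, so a plain read-modify-write with no await in between cannot be interrupted by another coroutine, and the exact final count is guaranteed without a lock:

```python
import asyncio

counter = 0


async def bump():
    global counter
    await asyncio.sleep(0)  # yield control to the event loop
    counter += 1            # no await here, so no other coroutine can interleave


async def main():
    # Run 100 increments concurrently on the single event-loop thread.
    await asyncio.gather(*(bump() for _ in range(100)))


asyncio.run(main())
print(counter)
```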

Asynchronous MySQL Queries

This is a similar example that uses MySQL queries instead of generic jobs.

Note

While cursors are not used in these examples, the principles and workflow apply to cursors as well: let every connection object create a cursor to operate from.

Synchronous code to create and populate hundreds of tables:

import os
import time
from typing import TYPE_CHECKING, Callable, List, Tuple

from mysql.connector import connect

if TYPE_CHECKING:
    from mysql.connector.abstracts import (
        MySQLConnectionAbstract,
    )


# MySQL Connection arguments
config = {
    "host": "127.0.0.1",
    "user": "root",
    "password": os.environ.get("MYPASS", ":("),
    "use_pure": True,
    "port": 3306,
}

exec_sequence = []


def create_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """Creates a table."""
    if i >= len(table_names):
        return False

    exec_seq.append(f"start_{i}")
    stmt = f"""
    CREATE TABLE IF NOT EXISTS {table_names[i]} (
        dish_id INT(11) UNSIGNED AUTO_INCREMENT UNIQUE KEY,
        category TEXT,
        dish_name TEXT,
        price FLOAT,
        servings INT,
        order_time TIME
    )
    """
    cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    cnx.cmd_query(stmt)
    exec_seq.append(f"end_{i}")
    return True


def drop_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """Drops a table."""
    if i >= len(table_names):
        return False

    exec_seq.append(f"start_{i}")
    cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    exec_seq.append(f"end_{i}")
    return True


def main(
    kernel: Callable[[List[str], List[str], "MySQLConnectionAbstract", int], None],
    table_names: List[str],
) -> Tuple[List, List]:

    exec_seq = []
    database_name = "TABLE_CREATOR"

    with connect(**config) as cnx:
        # Create/Setup database
        cnx.cmd_query(f"CREATE DATABASE IF NOT EXISTS {database_name}")
        cnx.cmd_query(f"USE {database_name}")

        # Execute Kernel: Create or Delete tables
        for i in range(len(table_names)):
            kernel(exec_seq, table_names, cnx, i)

        # Show tables
        cnx.cmd_query("SHOW tables")
        show_tables = cnx.get_rows()[0]

    # Return execution sequence and table names retrieved with `SHOW tables;`.
    return exec_seq, show_tables


if __name__ == "__main__":
    # with num_tables=511 -> Elapsed time ~ 25.86
    clock = time.time()
    print_exec_seq = False
    num_tables = 511
    table_names = [f"table_sync_{n}" for n in range(num_tables)]

    print("-------------------- SYNC CREATOR --------------------")
    exec_seq, show_tables = main(kernel=create_table, table_names=table_names)
    assert len(show_tables) == num_tables
    if print_exec_seq:
        print(exec_seq)

    print("-------------------- SYNC DROPPER --------------------")
    exec_seq, show_tables = main(kernel=drop_table, table_names=table_names)
    assert len(show_tables) == 0
    if print_exec_seq:
        print(exec_seq)

    print(f"Elapsed time: {time.time() - clock:0.2f}")

    # Expected output with num_tables = 11:
    # -------------------- SYNC CREATOR --------------------
    # [
    #     "start_0",
    #     "end_0",
    #     "start_1",
    #     "end_1",
    #     "start_2",
    #     "end_2",
    #     "start_3",
    #     "end_3",
    #     "start_4",
    #     "end_4",
    #     "start_5",
    #     "end_5",
    #     "start_6",
    #     "end_6",
    #     "start_7",
    #     "end_7",
    #     "start_8",
    #     "end_8",
    #     "start_9",
    #     "end_9",
    #     "start_10",
    #     "end_10",
    # ]
    # -------------------- SYNC DROPPER --------------------
    # [
    #     "start_0",
    #     "end_0",
    #     "start_1",
    #     "end_1",
    #     "start_2",
    #     "end_2",
    #     "start_3",
    #     "end_3",
    #     "start_4",
    #     "end_4",
    #     "start_5",
    #     "end_5",
    #     "start_6",
    #     "end_6",
    #     "start_7",
    #     "end_7",
    #     "start_8",
    #     "end_8",
    #     "start_9",
    #     "end_9",
    #     "start_10",
    #     "end_10",
    # ]

That script creates and then deletes num_tables tables, and is fully sequential: each table is created (or deleted) before moving on to the next.

An asynchronous code example for the same task:

import asyncio
import os
import time
from typing import TYPE_CHECKING, Callable, List, Tuple

from mysql.connector.aio import connect

if TYPE_CHECKING:
    from mysql.connector.aio.abstracts import (
        MySQLConnectionAbstract,
    )


# MySQL Connection arguments
config = {
    "host": "127.0.0.1",
    "user": "root",
    "password": os.environ.get("MYPASS", ":("),
    "use_pure": True,
    "port": 3306,
}

exec_sequence = []


async def create_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """Creates a table."""
    if i >= len(table_names):
        return False

    exec_seq.append(f"start_{i}")
    stmt = f"""
    CREATE TABLE IF NOT EXISTS {table_names[i]} (
        dish_id INT(11) UNSIGNED AUTO_INCREMENT UNIQUE KEY,
        category TEXT,
        dish_name TEXT,
        price FLOAT,
        servings INT,
        order_time TIME
    )
    """
    await cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    await cnx.cmd_query(stmt)
    exec_seq.append(f"end_{i}")
    return True


async def drop_table(
    exec_seq: List[str], table_names: List[str], cnx: "MySQLConnectionAbstract", i: int
) -> bool:
    """Drops a table."""
    if i >= len(table_names):
        return False

    exec_seq.append(f"start_{i}")
    await cnx.cmd_query(f"DROP TABLE IF EXISTS {table_names[i]}")
    exec_seq.append(f"end_{i}")
    return True


async def main_async(
    kernel: Callable[[List[str], List[str], "MySQLConnectionAbstract", int], None],
    table_names: List[str],
    num_jobs: int = 2,
) -> Tuple[List, List]:
    """The asynchronous tables creator...
    Reference:
        [as_completed]: https://docs.python.org/3/library/asyncio-task.html#asyncio.as_completed
    """
    exec_seq = []
    database_name = "TABLE_CREATOR"

    # Create/Setup database
    # ---------------------
    # No asynchronous execution is done here.
    # NOTE: observe usage WITH context manager.
    async with await connect(**config) as cnx:
        await cnx.cmd_query(f"CREATE DATABASE IF NOT EXISTS {database_name}")
        await cnx.cmd_query(f"USE {database_name}")
    config["database"] = database_name

    # Open connections
    # ----------------
    # `as_completed` allows running the awaitable objects in the `aws` iterable asynchronously.
    # NOTE: observe usage WITHOUT context manager.
    aws = [connect(**config) for _ in range(num_jobs)]
    cnxs: List["MySQLConnectionAbstract"] = [
        await coro for coro in asyncio.as_completed(aws)
    ]

    # Execute Kernel: Create or Delete tables
    # -------------
    # N tables must be created/deleted and we can run up to `num_jobs` jobs asynchronously,
    # therefore we execute jobs in batches of size `num_jobs`.
    returned_values, i = [True], 0
    while any(returned_values):  # Keep running until i >= len(table_names) for all jobs
        # Prepare coros: map connections/cursors and table-name IDs to jobs.
        aws = [
            kernel(exec_seq, table_names, cnx, i + idx) for idx, cnx in enumerate(cnxs)
        ]
        # When i >= len(table_names) coro simply returns False, else True.
        returned_values = [await coro for coro in asyncio.as_completed(aws)]
        # Update table-name ID offset based on the number of jobs
        i += num_jobs

    # Close connections
    # -----------------
    # `as_completed` allows running the `close()` coroutines asynchronously.
    for coro in asyncio.as_completed([cnx.close() for cnx in cnxs]):
        await coro

    # Load table names
    # ----------------
    # No asynchronous execution is done here.
    async with await connect(**config) as cnx:
        # Show tables
        await cnx.cmd_query("SHOW tables")
        show_tables = (await cnx.get_rows())[0]

    # Return execution sequence and table names retrieved with `SHOW tables;`.
    return exec_seq, show_tables


if __name__ == "__main__":
    # `asyncio.run()` executes a coroutine (`coro`) and returns the result.
    # It is the standard entry point for running asyncio programs.

    # References:
    #     [asyncio.run]: https://docs.python.org/3/library/asyncio-runner.html#asyncio.run

    # with num_tables=511 and num_jobs=3 -> Elapsed time ~ 19.09
    # with num_tables=511 and num_jobs=12 -> Elapsed time ~ 13.15
    clock = time.time()
    print_exec_seq = False
    num_tables = 511
    num_jobs = 12
    table_names = [f"table_async_{n}" for n in range(num_tables)]

    print("-------------------- ASYNC CREATOR --------------------")
    exec_seq, show_tables = asyncio.run(
        main_async(kernel=create_table, table_names=table_names, num_jobs=num_jobs)
    )
    assert len(show_tables) == num_tables
    if print_exec_seq:
        print(exec_seq)

    print("-------------------- ASYNC DROPPER --------------------")
    exec_seq, show_tables = asyncio.run(
        main_async(kernel=drop_table, table_names=table_names, num_jobs=num_jobs)
    )
    assert len(show_tables) == 0
    if print_exec_seq:
        print(exec_seq)

    print(f"Elapsed time: {time.time() - clock:0.2f}")

    # Expected output with num_tables = 11 and num_jobs = 3:
    # -------------------- ASYNC CREATOR --------------------
    # [
    #     "start_2",
    #     "start_1",
    #     "start_0",
    #     "end_2",
    #     "end_0",
    #     "end_1",
    #     "start_5",
    #     "start_3",
    #     "start_4",
    #     "end_3",
    #     "end_5",
    #     "end_4",
    #     "start_8",
    #     "start_7",
    #     "start_6",
    #     "end_7",
    #     "end_8",
    #     "end_6",
    #     "start_10",
    #     "start_9",
    #     "end_9",
    #     "end_10",
    # ]
    # -------------------- ASYNC DROPPER --------------------
    # [
    #     "start_1",
    #     "start_2",
    #     "start_0",
    #     "end_1",
    #     "end_2",
    #     "end_0",
    #     "start_3",
    #     "start_5",
    #     "start_4",
    #     "end_4",
    #     "end_5",
    #     "end_3",
    #     "start_6",
    #     "start_8",
    #     "start_7",
    #     "end_7",
    #     "end_6",
    #     "end_8",
    #     "start_10",
    #     "start_9",
    #     "end_9",
    #     "end_10",
    # ]

This output shows that the job flow isn't sequential: up to num_jobs jobs can execute asynchronously. The jobs run in batches of size num_jobs; each batch waits until all of its jobs terminate before the next batch launches, and the loop ends once no tables remain to create (or drop).
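The batch approach is simple, but workers sit idle at the end of each batch while the slowest job finishes. An alternative worth considering (not used in the example above) is to bound concurrency with asyncio.Semaphore, which keeps up to num_jobs jobs in flight continuously. A sketch with a stand-in job in place of the MySQL work:

```python
import asyncio

CONCURRENCY = 3  # plays the role of num_jobs
peak = 0         # highest number of jobs observed running at once
running = 0


async def job(sem: asyncio.Semaphore, i: int) -> int:
    global peak, running
    async with sem:  # at most CONCURRENCY jobs hold the semaphore at a time
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # stand-in for the create/drop table work
        running -= 1
    return i


async def main() -> list:
    sem = asyncio.Semaphore(CONCURRENCY)
    # All jobs are submitted at once; the semaphore throttles them,
    # so a finished job is immediately replaced by a waiting one.
    return await asyncio.gather(*(job(sem, i) for i in range(11)))


results = asyncio.run(main())
print(results, peak)
```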

Performance comparison for these examples: the asynchronous implementation is about 26% faster with 3 jobs and 49% faster with 12 jobs. Note that increasing the number of jobs also adds job-management overhead, which at some point erodes the initial speed-up. The optimal number of jobs is problem-dependent and is best determined empirically.

As demonstrated, the asynchronous version requires more code than the non-asynchronous variant. Is it worth the effort? It depends on the goal: asynchronous code makes better use of time spent waiting on I/O by overlapping it with other work, whereas standard synchronous code is simpler to write.

For additional information about the asyncio module, see the official Asynchronous I/O Python Documentation.