Global concurrency limits allow you to manage execution efficiently, controlling how many tasks, flows, or other operations can run simultaneously. They are ideal for optimizing resource usage, preventing bottlenecks, and customizing task execution.
Clarification on use of the term ‘tasks’In the context of global concurrency and rate limits, “tasks” doesn’t specifically refer to Prefect tasks, but to concurrent units of work in general — such as those managed by an event loop or TaskGroup in asynchronous programming. These general “tasks” could include Prefect tasks when they are part of an asynchronous execution environment.
Rate Limits ensure system stability by governing the frequency of requests or operations. They are suitable for preventing overuse, ensuring fairness, and handling errors gracefully. When selecting between Concurrency and Rate Limits, consider your primary goal. Choose Concurrency Limits for resource optimization and task management. Choose Rate Limits to maintain system stability and fair access to services. The core difference between a rate limit and a concurrency limit is the way slots are released. With a rate limit, slots are released at a controlled rate, controlled by slot_decay_per_second. With a concurrency limit, slots are released when the concurrency manager exits.

Manage Global concurrency limits and rate limits

Create, read, edit, and delete concurrency limits through the Prefect UI. When creating a concurrency limit, you can specify the following parameters:
  • Name: The name of the concurrency limit. This name is also how you’ll reference the concurrency limit in your code. Special characters, such as /, %, &, >, <, are not allowed.
  • Concurrency Limit: The maximum number of slots that can be occupied on this concurrency limit.
  • Slot Decay Per Second: Controls the rate at which slots are released when the concurrency limit is used as a rate limit. You must configure this value when using the rate_limit function.
  • Active: Whether or not the concurrency limit is in an active state.

Active vs. inactive limits

Global concurrency limits can be in an active or inactive state:
  • Active: In this state, slots can be occupied, and code execution is blocked when slots are unable to be acquired.
  • Inactive: In this state, slots are not occupied, and code execution is not blocked. Concurrency enforcement occurs only when you activate the limit.

Slot decay

You can configure global concurrency limits with slot decay. Use this when you want a rate limit. It will govern the pace at which slots are released or become available for reuse after being occupied. These slots represent the concurrency capacity within a specific concurrency limit. This is the rate at which these slots “decay” or refresh. To configure slot decay, set the slot_decay_per_second parameter when defining or adjusting a concurrency limit. For practical use, consider the following:
  • Higher values: Setting slot_decay_per_second to a higher value, such as 5.0, results in slots becoming available relatively quickly. In this scenario, a slot that was occupied by a task will free up after just 0.2 (1.0 / 5.0) seconds.
  • Lower values: Conversely, setting slot_decay_per_second to a lower value, like 0.1, causes slots to become available more slowly. In this scenario it takes 10 (1.0 / 0.1) seconds for a slot to become available again after occupancy.
Slot decay provides fine-grained control over the availability of slots, enabling you to optimize the rate of your workflow based on your specific requirements.

Through the UI

You can create, read, edit, and delete concurrency limits through the Prefect UI. When creating a concurrency limit, you can specify the following parameters:
  • Name: The name of the concurrency limit. This name is also how to reference the concurrency limit in your code. Special characters, such as /, %, &, >, <, are not allowed.
  • Concurrency Limit: The maximum number of slots that can be occupied on this concurrency limit.
  • Slot Decay Per Second: Controls the rate at which slots are released when the concurrency limit is used as a rate limit. You must configure this value when using the rate_limit function.
  • Active: Whether or not the concurrency limit is in an active state.

Through the CLI

You can create, read, edit, and delete global concurrency limits through the Prefect CLI. To create a new concurrency limit, use the prefect gcl create command. You must specify a --limit argument, and can optionally specify a --slot-decay-per-second and --disable argument.
prefect gcl create my-concurrency-limit --limit 5 --slot-decay-per-second 1.0
Inspect the details of a concurrency limit using the prefect gcl inspect command:
prefect gcl inspect my-concurrency-limit
To update a concurrency limit, use the prefect gcl update command. You can update the --limit, --slot-decay-per-second, --enable, and --disable arguments:
prefect gcl update my-concurrency-limit --limit 10
prefect gcl update my-concurrency-limit --disable
To delete a concurrency limit, use the prefect gcl delete command:
prefect gcl delete my-concurrency-limit
Are you sure you want to delete global concurrency limit 'my-concurrency-limit'? [y/N]: y
Deleted global concurrency limit with name 'my-concurrency-limit'.
See all available commands and options by running prefect gcl --help.

Using the concurrency context manager

The concurrencycontext manager allows control over the maximum number of concurrent operations. Select either the synchronous (sync) or asynchronous (async) version, depending on your use case. Here’s how to use it:
Concurrency limits are implicitly created When using the concurrency context manager, it creates the concurrency limit in an inactive state (if it does not already exist).
Sync
from prefect import flow, task
from prefect.concurrency.sync import concurrency


@task
def process_data(x, y):
    with concurrency("database", occupy=1):
        return x + y


@flow
def my_flow():
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        process_data.submit(x, y)


if __name__ == "__main__":
    my_flow()
Async
import asyncio
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency


@task
async def process_data(x, y):
    async with concurrency("database", occupy=1):
        return x + y


@flow
async def my_flow():
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        await process_data.submit(x, y)


if __name__ == "__main__":
    asyncio.run(my_flow())
  1. The code imports the necessary modules and the concurrency context manager. Use the prefect.concurrency.sync module for sync usage and the prefect.concurrency.asyncio module for async usage.
  2. It defines a process_data task, taking x and y as input arguments. Inside this task, the concurrency context manager controls concurrency, using the database concurrency limit and occupying one slot. If another task attempts to run with the same limit and no slots are available, that task is blocked until a slot becomes available.
  3. A flow named my_flow is defined. Within this flow, it iterates through a list of tuples, each containing pairs of x and y values. For each pair, the process_data task is submitted with the corresponding x and y values for processing.

Using rate_limit

The Rate Limit feature provides control over the frequency of requests or operations, ensuring responsible usage and system stability. Depending on your requirements, you can use rate_limit to govern both synchronous (sync) and asynchronous (async) operations. Here’s how to make the most of it:
Slot decay When using the rate_limit function, the concurrency limit must have a slot decay configured.
Sync
from prefect import flow, task
from prefect.concurrency.sync import rate_limit


@task
def make_http_request():
    rate_limit("rate-limited-api")
    print("Making an HTTP request...")


@flow
def my_flow():
    for _ in range(10):
        make_http_request.submit()


if __name__ == "__main__":
    my_flow()
Async
import asyncio

from prefect import flow, task
from prefect.concurrency.asyncio import rate_limit


@task
async def make_http_request():
    await rate_limit("rate-limited-api")
    print("Making an HTTP request...")


@flow
async def my_flow():
    for _ in range(10):
        await make_http_request.submit()


if __name__ == "__main__":
    asyncio.run(my_flow())
  1. The code imports the necessary modules and the rate_limit function. Use the prefect.concurrency.sync module for sync usage and the prefect.concurrency.asyncio module for async usage.
  2. It defines a make_http_request task. Inside this task, the rate_limit function ensures that the requests are made at a controlled pace.
  3. A flow named my_flow is defined. Within this flow the make_http_request task is submitted 10 times.

Use concurrency and rate_limit outside of a flow

Useconcurrency and rate_limit outside of a flow to control concurrency and rate limits for any operation.
import asyncio

from prefect.concurrency.asyncio import rate_limit


async def main():
    for _ in range(10):
        await rate_limit("rate-limited-api")
        print("Making an HTTP request...")



if __name__ == "__main__":
    asyncio.run(main())

Use cases

Throttling task submission

Throttling task submission helps avoid overloading resources, complying with external rate limits, or ensuring a steady, controlled flow of work. In this scenario the rate_limit function throttles the submission of tasks. The rate limit acts as a bottleneck, ensuring that tasks are submitted at a controlled rate, governed by the slot_decay_per_second setting on the associated concurrency limit.
from prefect import flow, task
from prefect.concurrency.sync import rate_limit


@task
def my_task(i):
    return i


@flow
def my_flow():
    for _ in range(100):
        rate_limit("slow-my-flow", occupy=1)
        my_task.submit(1)


if __name__ == "__main__":
    my_flow()

Manage database connections

Manage the maximum number of concurrent database connections to avoid exhausting database resources. This scenario uses a concurrency limit named database. It has a maximum concurrency limit that matches the maximum number of database connections. The concurrency context manager controls the number of database connections allowed at any one time.
notest
from prefect import flow, task, concurrency
import psycopg2

@task
def database_query(query):
    # Here we request a single slot on the 'database' concurrency limit. This
    # will block in the case that all of the database connections are in use
    # ensuring that we never exceed the maximum number of database connections.
    with concurrency("database", occupy=1):
        connection = psycopg2.connect("<connection_string>")
        cursor = connection.cursor()
        cursor.execute(query)
        result = cursor.fetchall()
        connection.close()
        return result

@flow
def my_flow():
    queries = ["SELECT * FROM table1", "SELECT * FROM table2", "SELECT * FROM table3"]

    for query in queries:
        database_query.submit(query)

if __name__ == "__main__":
    my_flow()

Parallel data processing

Limit the maximum number of parallel processing tasks. This scenario limits the number of process_data tasks to five at any one time. The concurrency context manager requests five slots on the data-processing concurrency limit. This blocks until five slots are free and then submits five more tasks, ensuring that the maximum number of parallel processing tasks is never exceeded.
import asyncio
from prefect.concurrency.sync import concurrency


async def process_data(data):
    print(f"Processing: {data}")
    await asyncio.sleep(1)
    return f"Processed: {data}"


async def main():
    data_items = list(range(100))
    processed_data = []

    while data_items:
        with concurrency("data-processing", occupy=5):
            chunk = [data_items.pop() for _ in range(5)]
            processed_data += await asyncio.gather(
                *[process_data(item) for item in chunk]
            )

    print(processed_data)


if __name__ == "__main__":
    asyncio.run(main())