Task run concurrency limits help prevent too many tasks from running simultaneously. For example, if many tasks across multiple flows are designed to interact with a database that only allows 10 connections. Task run concurrency limits use task tags. You can specify an optional concurrency limit as the maximum number of concurrent task runs in a Running state for tasks with a given tag. If a task has multiple tags, it will run only if all tags have available concurrency. Tags without explicit limits are considered to have unlimited concurrency.
0 concurrency limit aborts task runsIf the concurrency limit is set to 0 for a tag, any attempt to run a task with that tag will abort instead of delay.

Execution behavior

Task tag limits are checked whenever a task run attempts to enter a Running state. If there are no concurrency slots available for any one of your task’s tags, it delays the transition to a Running state and instructs the client to try entering a Running state again in 30 seconds (or the value specified by the PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS setting).

Configure concurrency limits

Flow run concurrency limits are set at a work pool and/or work queue level While task run concurrency limits are configured through tags (as shown below), flow run concurrency limits are configured through work pools and/or work queues.
You can set concurrency limits on as few or as many tags as you wish. You can set limits through:
  • Prefect CLI
  • Prefect API by using PrefectClient Python client
  • Prefect server UI or Prefect Cloud

CLI

You can create, list, and remove concurrency limits with Prefect CLI concurrency-limit commands.
prefect concurrency-limit [command] [arguments]
CommandDescription
createCreate a concurrency limit by specifying a tag and limit.
deleteDelete the concurrency limit set on the specified tag.
inspectView details about a concurrency limit set on the specified tag.
lsView all defined concurrency limits.
For example, to set a concurrency limit of 10 on the ‘small_instance’ tag:
prefect concurrency-limit create small_instance 10
To delete the concurrency limit on the ‘small_instance’ tag:
prefect concurrency-limit delete small_instance
To view details about the concurrency limit on the ‘small_instance’ tag:
prefect concurrency-limit inspect small_instance

Python client

To update your tag concurrency limits programmatically, use PrefectClient.orchestration.create_concurrency_limit. create_concurrency_limit takes two arguments:
  • tag specifies the task tag on which you’re setting a limit.
  • concurrency_limit specifies the maximum number of concurrent task runs for that tag.
For example, to set a concurrency limit of 10 on the ‘small_instance’ tag:
from prefect import get_client

async with get_client() as client:
    # set a concurrency limit of 10 on the 'small_instance' tag
    limit_id = await client.create_concurrency_limit(
        tag="small_instance", 
        concurrency_limit=10
        )
To remove all concurrency limits on a tag, use PrefectClient.delete_concurrency_limit_by_tag, passing the tag:
async with get_client() as client:
    # remove a concurrency limit on the 'small_instance' tag
    await client.delete_concurrency_limit_by_tag(tag="small_instance")
If you wish to query for the current limit on a tag, use PrefectClient.read_concurrency_limit_by_tag, passing the tag: To see all of your limits across all of your tags, use PrefectClient.read_concurrency_limits.
async with get_client() as client:
    # query the concurrency limit on the 'small_instance' tag
    limit = await client.read_concurrency_limit_by_tag(tag="small_instance")