@flow
decorator.
When a function becomes a flow, its behavior changes, giving it the following advantages:
- All runs of the flow have persistent state. Transitions between states are recorded, allowing you to observe and act on flow execution.
- Input arguments can be type validated as workflow parameters.
- Retries can be performed on failure.
- Timeouts can be enforced to prevent unintentional, long-running workflows.
- Metadata about flow runs, such as run time and final state, is automatically tracked.
- A flow can be elevated to a deployment, which exposes a remote API for interacting with it.
Run your first flow
PrerequisitesThis section requires that you:
- Have installed Prefect
- Connected to Prefect Cloud or a self-hosted server instance
@flow
decorator.
The script below fetches statistics about the main Prefect repository.
(Note that httpx is an HTTP client library and a dependency of Prefect.)
Turn this function into a Prefect flow and run the script:
repo_info.py
Flows can contain arbitrary PythonAs shown above, flow definitions can contain arbitrary Python logic.
Supported functions
Almost any standard Python function can be turned into a Prefect flow by adding the@flow
decorator.
Flows are always executed in the main thread by default to facilitate native Python debugging and profiling.
Synchronous functions
The simplest Prefect flow is a synchronous Python function. Here’s an example of a synchronous flow that prints a message:Asynchronous functions
Prefect also supports asynchronous functions. The resulting flows are coroutines that can be awaited or run concurrently, following the standard rules of async Python.Class Methods
Prefect supports synchronous and asynchronous class methods as flows, including instance methods, class methods, and static methods. For class methods and static methods, you must apply the appropriate method decorator above the@flow
decorator:
Generators
Prefect supports synchronous and asynchronous generators as flows. The flow is considered to beRunning
as long as the generator is yielding values. When the generator is exhausted, the flow is considered Completed
. Any values yielded by the generator can be consumed by other flows or tasks.
Generator functions are consumed when returned from flowsThe result of a completed flow must be serializable, but generators cannot be serialized.
Therefore, if you return a generator from a flow, the generator will be fully consumed and its yielded values will be returned as a list.
This can lead to unexpected behavior or blocking if the generator is infinite or very large.Here is an example of proactive generator consumption:If you need to return a generator without consuming it, you can
yield
it instead of using return
.
Values yielded from generator flows are not considered final results and do not face the same serialization constraints:Parameters
As with any Python function, you can pass arguments to a flow including both positional and keyword arguments. These arguments defined on your flow function are called parameters. They are stored by the Prefect orchestration engine on the flow run object. Prefect automatically performs type conversion of inputs using any provided type hints. Type hints provide an easy way to enforce typing on your flow parameters and can be greatly enhanced with Pydantic. Prefect supports any Pydantic model as a type hint within a flow is coerced automatically into the relevant object type:Prefect API requires keyword argumentsWhen creating flow runs from the Prefect API, you must specify parameter names when overriding defaults.
They cannot be positional.
Failed
state.
If a flow run for a deployment receives invalid parameters, it moves from a Pending
state to Failed
without entering a Running
state.
Flow run parameters cannot exceed
512kb
in size.Flow runs
A flow run represents a single execution of the flow. You can create a flow run by calling the flow manually. For example, by running a Python script or importing the flow into an interactive session and calling it. You can also create a flow run by:- Using external schedulers such as
cron
to invoke a flow function - Creating a deployment on Prefect Cloud or a locally run Prefect server
- Creating a flow run for the deployment through a schedule, the Prefect UI, or the Prefect API
LoggingPrefect enables you to log a variety of useful information about your flow and task runs.
You can capture information about your workflows for purposes such as monitoring, troubleshooting, and auditing.
Check out Logging for more information.

RetriesUnexpected errors may occur. For example the GitHub API may be temporarily unavailable or rate limited.
Check out Retries to learn how to make your flows more resilient.
Writing flows
The@flow
decorator is used to designate a flow:
name
parameter value for the flow.
If you don’t provide a name, Prefect uses the flow function name.
Flows and tasksThere’s nothing stopping you from putting all of your code in a single flow function.However, organizing your workflow code into smaller flow and task units lets you take advantage of Prefect features like retries,
more granular visibility into runtime state, the ability to determine final state regardless of individual task state, and more.In addition, if you put all of your workflow logic in a single flow function and any line of code fails, the entire flow fails
and must be retried from the beginning.
You can avoid this by breaking up the code into multiple tasks.You may call any number of other tasks, subflows, and even regular Python functions within your flow.
You can pass parameters to your flow function to use elsewhere in the workflow, and Prefect will report on the progress
and final state of any invocation.Prefect encourages “small tasks.” Each one should represent a single logical step of your workflow.
This allows Prefect to better contain task failures.
Subflows
In addition to calling tasks within a flow, you can also call other flows. Child flows are called subflows and allow you to efficiently manage, track, and version common multi-task logic. Subflows are a great way to organize your workflows and offer more visibility within the UI. Add aflow
decorator to the get_open_issues
function:
Flow settings
Flows allow a great deal of configuration by passing arguments to the decorator. Flows accept the following optional settings.Argument | Description |
---|---|
description | An optional string description for the flow. If not provided, the description is pulled from the docstring for the decorated function. |
name | An optional name for the flow. If not provided, the name is inferred from the function. |
retries | An optional number of times to retry on flow run failure. |
retry_delay_seconds | An optional number of seconds to wait before retrying the flow after failure. This is only applicable if retries is nonzero. |
flow_run_name | An optional name to distinguish runs of this flow; this name can be provided as a string template with the flow’s parameters as variables; you can also provide this name as a function that returns a string. |
task_runner | An optional task runner to use for task execution within the flow when you .submit() tasks. If not provided and you .submit() tasks, the ThreadPoolTaskRunner is used. |
timeout_seconds | An optional number of seconds indicating a maximum runtime for the flow. If the flow exceeds this runtime, it is marked as failed. Flow execution may continue until the next task is called. |
validate_parameters | Boolean indicating whether parameters passed to flows are validated by Pydantic. Default is True . |
version | An optional version string for the flow. If not provided, we will attempt to create a version string as a hash of the file containing the wrapped function. If the file cannot be located, the version will be null. |
name
value for the flow. Here is the optional description
argument
and a non-default task runner.
flow_run_name
.
This setting accepts a string that can optionally contain templated references to the parameters of your flow.
The name is formatted using Python’s standard string formatting syntax:
prefect.runtime
module. For example:
validate_parameters
check that input values conform to the annotated types on the function.
Where possible, values are coerced into the correct type. For example, if a parameter is defined as x: int
and “5” is passed,
it resolves to 5
.
If set to False
, no validation is performed on flow parameters.
Composing flows
A subflow run is created when a flow function is called inside the execution of another flow. The primary flow is the “parent” flow. The flow created within the parent is the “child” flow or “subflow.” Subflow runs behave like normal flow runs. There is a full representation of the flow run in the backend as if it had been called separately. When a subflow starts, it creates a new task runner for tasks within the subflow. When the subflow completes, the task runner shuts down. Subflows block execution of the parent flow until completion. However, asynchronous subflows can run concurrently with AnyIO task groups or asyncio.gather. Subflows differ from normal flows in that they resolve any passed task futures into data. This allows data to be passed from the parent flow to the child easily. The relationship between a child and parent flow is tracked by creating a special task run in the parent flow. This task run mirrors the state of the child flow run. A task that represents a subflow is annotated in itsstate_details
with the presence of a child_flow_run_id
field.
A subflow is identified with the presence of a parent_task_run_id
on state_details
.
You can define multiple flows within the same file.
Whether running locally or through a deployment, you must indicate which flow is the entrypoint for a flow run.
Cancelling subflow runsInline subflow runs, specifically those created without
run_deployment
, cannot be cancelled without cancelling their parent flow run.
If you may need to cancel a subflow run independent of its parent flow run, we recommend deploying it separately and starting it using
the run_deployment function.my_subflow()
as a subflow:
hello_world()
flow (in this example from the file hello.py
) creates a flow run like this:
Subflows or tasks?In Prefect you can call tasks or subflows to do work within your workflow, including passing results from other tasks to your subflow.A common question is: “When should I use a subflow instead of a task?”We recommend writing tasks that do a discrete, specific piece of work in your workflow. For example, calling an API, performing a database operation,
analyzing or transforming a data point.Prefect tasks are well suited for parallel or distributed execution using distributed computation frameworks such as Dask or Ray.
For troubleshooting, the more granular you make your tasks, the easier it is to find and fix issues if a task fails.Subflows enable you to group related tasks within your workflow.
Here are some scenarios where you should choose a subflow rather than calling tasks individually:
- Observability: Subflows, like any other flow run, have first-class observability within the Prefect UI and Prefect Cloud. You’ll see subflow status in the Flow Runs dashboard rather than having to dig down into the tasks within a specific flow run. See Final state determination for examples of using task state within flows.
- Conditional flows: If you have a group of tasks that run only under certain conditions, you can group them within a subflow and conditionally run the subflow rather than each task individually.
- Parameters: Flows have first-class support for parameterization, making it easy to run the same group of tasks in different use cases by simply passing different parameters to the subflow in which they run.
- Task runners: Subflows enable you to specify the task runner used for tasks within the flow. For example, to optimize parallel execution of certain tasks with Dask, group them in a subflow that uses the Dask task runner. You can use a different task runner for each subflow.
Final state determination
Read the documentation about states before proceeding with this section.
- If an exception is raised directly in the flow function, the flow run is marked as failed.
- If the flow does not return a value (or returns
None
), its state is determined by the states of all of the tasks and subflows within it.- If any task run or subflow run failed, then the final flow run state is marked as
FAILED
. - If any task run was cancelled, then the final flow run state is marked as
CANCELLED
.
- If any task run or subflow run failed, then the final flow run state is marked as
- If a flow returns a manually created state, it is used as the state of the final flow run. This allows for manual determination of final state.
- If the flow run returns any other object, then it is marked as completed.
Raise an exception
If an exception is raised within the flow function, the flow is immediately marked as failed.Return none
A flow with no return statement is determined by the state of all of its task runs.
Return a future
If a flow returns one or more futures, the final state is determined based on the underlying states.Return multiple states or futures
If a flow returns a mix of futures and states, the final state is determined by resolving all futures to states, then determining if any of the states are notCOMPLETED
.
Failed
, but the states of each of the returned futures is included in the flow state:
Returning multiple statesWhen returning multiple states, they must be contained in a
set
, list
, or tuple
.
If using other collection types, the result of the contained states are checked.