Using deferred tasks
Prefect tasks are Python functions that can be run immediately or deferred for background execution. Define a task by adding the@task
decorator to a Python function, and use the delay
method to
run the task in the background.
If you schedule the task for background execution, you can run a task worker in a separate process or container to execute
the task. This process is similar to a Celery worker or an arq worker.
Defining a task
Add the@task
decorator to a Python function to define a Prefect task:
Calling tasks
You can call a task to run it immediately, or you can defer the task by scheduling it for background execution withTask.delay
.
You can submit tasks to a task runner such as Ray or Dask within
a workflow, which in Prefect is called a flow. However, this guide focuses on deferring task execution outside of
workflows. For example, by calling
my_task.delay()
within a web application.delay
:
Executing deferred tasks with a task worker
To run tasks in a separate process or container, start a task worker. The task worker continually receives instructions to execute deferred tasks from Prefect’s API, executes them, and reports the results back to the API.Task workers only run deferred tasks, not tasks you call directly as normal Python functions.
prefect.task_worker.serve()
method:
tasks.py
prefect task serve
to start a task worker:
Guided exploration of deferred tasks and task workers in Prefect
Here are some examples of using deferred tasks and task workers in Prefect. You will:- run a Prefect task in the foreground by calling it
- start a task worker and defer tasks so that they run in the background
- create a basic FastAPI application that defers tasks when you hit an endpoint
- use Docker in two examples that mimic real use cases
Set up
Step 1: Activate a virtual environment
This example uses conda, but any virtual environment manager will work.Step 2: Install Python dependencies
Step 3: Connect to Prefect Cloud or a local Prefect server instance
You can use either Prefect Cloud or a local Prefect server instance for these examples (if not set already). You must havePREFECT_API_URL
set to send tasks to task workers.
If you’re using a local Prefect server instance with a SQLite backing database (the default database),
save this value to your active Prefect Profile with the following command:
PREFECT_API_URL
value to the Prefect Cloud API URL and add your
API key.
The examples that use docker (examples 4 and 5) use a local Prefect server instance by default.
You can switch to Prefect Cloud by changing the PREFECT_API_URL
and adding a variable for your API key in the
docker-compose.yaml
.
Or use a local server instance backed by a PostgreSQL database by setting the PREFECT_API_DATABASE_CONNECTION_URL
.
If using a local Prefect server instance instead of Prefect Cloud, start your server by running the following command:
Step 4: Clone the repository (optional)
Clone the repository to get the code files for the examples:Example 1: Run a Prefect task in the foreground by calling it
Add the@task
decorator to any Python function to define a Prefect task.
Step 1: Create a file named greeter.py
Create a file named greeter.py
and save the following code in it, or run the existing file in the
basic-examples directory.
Step 2: Run the script in the terminal
Optional
You can see the task run in the UI. If you’re using a self-hosted Prefect Server instance, you can also see the task runs in the database. If you want to inspect the SQLite database, use your favorite interface. DB Browser for SQLite is explained below. Download it here, if needed. Install it and open it. Click Connect. Then navigate to your SQLite DB file. It’s located in~/.prefect
directory by default.
Go to the task_run
table to see all your task runs there.
Scroll down to see your most recent task runs or filter for them.
Hit the refresh button for updates, if needed.
Example 2: Start a task worker and run deferred tasks in the background
This example starts a task worker and runs deferred tasks in the background. To run tasks in a separate process or container, you need to start a task worker, similar to how you would run a Celery worker or an arq worker. The task worker continually receives scheduled tasks to execute from Prefect’s API, executes them, and reports the results back to the API. Run a task worker by passing tasks into theprefect.task_worker.serve()
method.
Step 1: Define the task and task worker in the file task_worker.py
Step 2: Start the task worker by running the script in the terminal
my_background_task
task.
Step 3: Create a file named task_scheduler.py
and save the following code in it:
delay
method. You can use this object to wait for the task to complete with
wait()
and to retrieve its result with result()
.
You can also see the task run’s UUID and other information about the task run.
Step 5: See the task run in the UI
Use the task run UUID to see the task run in the UI. The URL will look like this:http://127.0.0.1:4200/task-runs/task-run/my_task_run_uuid_goes_here
Substitute your UUID at the end of the URL.
Step 6: Use multiple task workers to run tasks in parallel.
Start another instance of the task worker. In another terminal run:Step 7: Send multiple tasks to the task worker
Modify thetask_scheduler.py
file to send multiple tasks to the task worker with different inputs: