In Prefect, a task fails if its Python function raises an exception. To enable retries, pass the retries and retry_delay_seconds parameters to your task. If the task fails, Prefect retries it up to retries times, waiting retry_delay_seconds seconds between attempts. If the task fails on the final retry, Prefect marks the task as failed.
Retries do not create new task runs. When a task is retried, Prefect adds a new state to the state history of the original task run instead of creating a new task run.

Example: making an API request

Consider the real-world problem of making an API request. This example uses the httpx library to make an HTTP request.
import httpx

from prefect import flow, task


@task(retries=2, retry_delay_seconds=5)
def get_data_task(
    url: str = "https://api.brittle-service.com/endpoint"
) -> dict:
    response = httpx.get(url)
    
    # If the response status code is anything but a 2xx, httpx will raise
    # an exception. This task doesn't handle the exception, so Prefect will
    # catch the exception and will consider the task run failed.
    response.raise_for_status()
    
    return response.json()
    

@flow
def get_data_flow():
    get_data_task()
In this task, if the HTTP request to the brittle API receives any status code other than a 2xx, Prefect will retry the task a maximum of two times, waiting five seconds in between retries.
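
To make the counting concrete, here is a minimal plain-Python sketch of the retry semantics described above: one initial attempt plus up to retries further attempts, with a fixed wait between them. It is an illustration only, not how Prefect implements retries; run_with_retries, flaky, and the injected sleep argument are hypothetical names introduced for this sketch.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def run_with_retries(
    fn: Callable[[], T],
    retries: int,
    retry_delay_seconds: float,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn once, then retry up to `retries` more times on any exception."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                # The final retry failed: re-raise so the caller sees the failure.
                raise
            sleep(retry_delay_seconds)
    raise AssertionError("unreachable")


calls: List[int] = []


def flaky() -> str:
    """Hypothetical function that fails twice, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("service unavailable")
    return "ok"


waits: List[float] = []
# Record the waits instead of actually sleeping.
result = run_with_retries(flaky, retries=2, retry_delay_seconds=5, sleep=waits.append)
```

With retries=2, the function runs three times in total and waits twice, which mirrors the "maximum of two retries, five seconds apart" behavior of get_data_task.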

Custom retry behavior

The retry_delay_seconds option accepts a list of delays for more custom retry behavior. The following task waits for successively increasing intervals of 1, 10, and 100 seconds, respectively, before the next attempt starts:
from prefect import task

@task(retries=3, retry_delay_seconds=[1, 10, 100])
def some_task_with_manual_backoff_retries():
    ...
The retry_condition_fn option accepts a callable that returns a boolean. If the callable returns True, the task is retried. If the callable returns False, the task is not retried. The callable accepts three arguments: the task, the task run, and the state of the task run. The following task retries on HTTP status codes other than 401 or 404:
import httpx
from prefect import flow, task

def retry_handler(task, task_run, state) -> bool:
    """This is a custom retry handler to handle when we want to retry a task"""
    try:
        # Attempt to get the result of the task
        state.result()
    except httpx.HTTPStatusError as exc:
        # Retry on any HTTP status code that is not 401 or 404
        do_not_retry_on_these_codes = [401, 404]
        return exc.response.status_code not in do_not_retry_on_these_codes
    except httpx.ConnectError:
        # Do not retry
        return False
    except Exception:
        # For any other exception, retry
        return True

@task(retries=1, retry_condition_fn=retry_handler)
def my_api_call_task(url):
    response = httpx.get(url)
    response.raise_for_status()
    return response.json()

@flow
def get_data_flow(url):
    my_api_call_task(url=url)

if __name__ == "__main__":
    get_data_flow(url="https://httpbin.org/status/503")
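
Because a retry condition is an ordinary three-argument callable, its decision logic can be checked without running a flow. The sketch below assumes only what the example above relies on, namely that calling result() on a failed state re-raises the task's exception. FakeStatusError and FakeState are hypothetical stand-ins (for httpx.HTTPStatusError and a Prefect state, respectively) so that the block runs with the standard library alone.

```python
class FakeStatusError(Exception):
    """Hypothetical stand-in for httpx.HTTPStatusError."""

    def __init__(self, status_code: int) -> None:
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


class FakeState:
    """Hypothetical stand-in for a failed state: result() re-raises the error."""

    def __init__(self, exc: Exception) -> None:
        self._exc = exc

    def result(self):
        raise self._exc


def retry_unless_auth_or_missing(task, task_run, state) -> bool:
    """Retry on any error except HTTP 401 and 404."""
    try:
        state.result()
    except FakeStatusError as exc:
        return exc.status_code not in (401, 404)
    except Exception:
        return True
    # No exception was raised, so there is nothing to retry.
    return False


# The task and task_run arguments are unused by this handler, so None suffices.
decisions = {
    code: retry_unless_auth_or_missing(None, None, FakeState(FakeStatusError(code)))
    for code in (401, 404, 503)
}
other_error = retry_unless_auth_or_missing(None, None, FakeState(ValueError("boom")))
```

Here decisions maps 401 and 404 to False and 503 to True, and other_error is True, matching the rules the handler is meant to encode.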
Additionally, retry_delay_seconds accepts a callable that takes the number of retries as an argument and returns a list of delays. Prefect includes an exponential_backoff utility that generates a list of retry delays corresponding to an exponential backoff strategy. The following task waits for 10, 20, then 40 seconds before each retry:
from prefect import task
from prefect.tasks import exponential_backoff

@task(retries=3, retry_delay_seconds=exponential_backoff(backoff_factor=10))
def some_task_with_exponential_backoff_retries():
    ...
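
The shape of such a callable can be sketched in plain Python. The block below is an illustration under the assumption that each delay doubles from backoff_factor, which reproduces the 10, 20, 40 schedule described above; it is not Prefect's own implementation. exponential_delays and linear_delays are hypothetical helpers written for this example.

```python
from typing import Callable, List


def exponential_delays(backoff_factor: float) -> Callable[[int], List[float]]:
    """Return a callable mapping a retry count to doubling delays."""

    def delays(retries: int) -> List[float]:
        return [backoff_factor * 2**attempt for attempt in range(retries)]

    return delays


def linear_delays(step_seconds: float) -> Callable[[int], List[float]]:
    """Return a callable mapping a retry count to linearly growing delays."""

    def delays(retries: int) -> List[float]:
        return [step_seconds * (attempt + 1) for attempt in range(retries)]

    return delays


backoff_schedule = exponential_delays(backoff_factor=10)(3)  # [10, 20, 40]
linear_schedule = linear_delays(step_seconds=5)(3)  # [5, 10, 15]
```

Any callable with this shape, for example linear_delays(step_seconds=5), should be usable as the retry_delay_seconds argument in the same way exponential_backoff is used above.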

Advanced topic: adding “jitter”

With exponential backoff, you may want to add jitter to the delay times. Jitter is a random amount of time added to retry periods that helps prevent “thundering herd” scenarios, in which many tasks retry at the exact same time and potentially overwhelm systems. Use the retry_jitter_factor option to add variance to the base delay. For example, a retry delay of 10 seconds with a retry_jitter_factor of 0.5 allows a delay of up to 15 seconds. Larger values of retry_jitter_factor provide more protection against thundering herds while keeping the average retry delay constant. For example, the following task adds jitter to its exponential backoff, so the retry delays vary up to maximums of 20, 40, and 80 seconds, respectively:
from prefect import task
from prefect.tasks import exponential_backoff

@task(
    retries=3,
    retry_delay_seconds=exponential_backoff(backoff_factor=10),
    retry_jitter_factor=1,
)
def some_task_with_exponential_backoff_retries():
    ...
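
The maximum delays quoted above follow from simple arithmetic: each base delay is scaled by at most (1 + retry_jitter_factor). The sketch below computes only that upper bound; it does not model how the random delay is actually sampled, which this section does not describe. max_jittered_delays is a hypothetical helper introduced for illustration.

```python
from typing import List


def max_jittered_delays(
    base_delays: List[float], retry_jitter_factor: float
) -> List[float]:
    """Upper bound on each retry delay once jitter is applied."""
    return [delay * (1 + retry_jitter_factor) for delay in base_delays]


# Base delays of 10, 20, 40 seconds with a jitter factor of 1.
upper_bounds = max_jittered_delays([10, 20, 40], retry_jitter_factor=1)  # [20, 40, 80]

# A single 10-second delay with a jitter factor of 0.5.
single_bound = max_jittered_delays([10], retry_jitter_factor=0.5)  # [15.0]
```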

Configure retry behavior globally with settings

You can set default retries and retry delays with the following global settings. These settings do not override the retries or retry_delay_seconds values set in the flow or task decorator.
prefect config set PREFECT_FLOW_DEFAULT_RETRIES=2
prefect config set PREFECT_TASK_DEFAULT_RETRIES=2
prefect config set PREFECT_FLOW_DEFAULT_RETRY_DELAY_SECONDS="[1, 10, 100]"
prefect config set PREFECT_TASK_DEFAULT_RETRY_DELAY_SECONDS="[1, 10, 100]"