State change hooks execute code in response to changes in flow or task run states, enabling you to define actions for specific state transitions in a workflow.

Available state change hooks

Type             Flow  Task  Description
on_completion    Yes   Yes   Executes when a flow or task run enters a Completed state.
on_failure       Yes   Yes   Executes when a flow or task run enters a Failed state.
on_cancellation  Yes   -     Executes when a flow run enters a Cancelling state.
on_crashed       Yes   -     Executes when a flow run enters a Crashed state.
on_running       Yes   -     Executes when a flow run enters a Running state.

Flow run state change hooks

from prefect import Flow, flow
from prefect.client.schemas.objects import FlowRun, State

def my_flow_hook(flow: Flow, flow_run: FlowRun, state: State):
    """This is the required signature for a flow run state
    change hook. This hook can only be passed into flows.
    """

# pass hook as a list of callables
@flow(on_completion=[my_flow_hook])
def my_flow():
    ...

Task run state change hooks

from prefect import Task, task
from prefect.client.schemas.objects import State, TaskRun

def my_task_hook(task: Task, task_run: TaskRun, state: State):
    """This is the required signature for a task run state change
    hook. This hook can only be passed into tasks.
    """

# pass hook as a list of callables
@task(on_failure=[my_task_hook])
def my_task():
    ...

Use multiple state change hooks

You can attach multiple state change hooks to the same state transition, or reuse one state change hook across different transitions:
from prefect import task

def my_success_hook(task, task_run, state):
    print("Task run succeeded!")

def my_failure_hook(task, task_run, state):
    print("Task run failed!")

def my_succeed_or_fail_hook(task, task_run, state):
    print("If the task run succeeds or fails, this hook runs.")

@task(
    on_completion=[my_success_hook, my_succeed_or_fail_hook],
    on_failure=[my_failure_hook, my_succeed_or_fail_hook],
)
def my_task():
    ...
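To make the sharing pattern concrete, here is a plain-Python stand-in that does not use Prefect at all. The `HOOKS` registry and the `run_hooks` helper are hypothetical names invented for this illustration, and calling hooks in list order is an assumption of the sketch, not a statement about Prefect's engine. It only shows how one hook can be registered under several transitions.

```python
from types import SimpleNamespace

calls = []

def my_success_hook(task, task_run, state):
    calls.append(("success", state.name))

def my_failure_hook(task, task_run, state):
    calls.append(("failure", state.name))

def my_succeed_or_fail_hook(task, task_run, state):
    calls.append(("either", state.name))

# Hypothetical registry keyed by state name (an assumption of this sketch).
HOOKS = {
    "Completed": [my_success_hook, my_succeed_or_fail_hook],
    "Failed": [my_failure_hook, my_succeed_or_fail_hook],
}

def run_hooks(state_name):
    """Call every hook registered for the given state name, in list order."""
    state = SimpleNamespace(name=state_name)
    for hook in HOOKS.get(state_name, []):
        # None stands in for the task and task run objects.
        hook(None, None, state)

run_hooks("Completed")
run_hooks("Failed")
print(calls)
```

The shared hook is recorded once per transition, while each dedicated hook is recorded only for its own state.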

Pass kwargs to state change hooks

The Prefect engine calls your hooks when the state changes, passing in the flow, flow run, and state objects. You can also define a hook with additional arguments that have default values:
from prefect import flow

data = {}

def my_hook(flow, flow_run, state, my_arg="custom_value"):
    data.update(my_arg=my_arg, state=state)

@flow(on_completion=[my_hook])
def lazy_flow():
    pass

state = lazy_flow(return_state=True)

assert data == {"my_arg": "custom_value", "state": state}
… or define your hook to accept arbitrary keyword arguments:
from functools import partial
from prefect import flow, task

data = {}

def my_hook(task, task_run, state, **kwargs):
    data.update(state=state, **kwargs)

@task
def bad_task():
    raise ValueError("meh")

@flow
def ok_with_failure_flow(x: str = "foo", y: int = 42):
    bad_task_with_a_hook = bad_task.with_options(
        on_failure=[partial(my_hook, x=x, y=y)]
    )
    # return a tuple of "bar" and the task run state
    # to avoid raising the task's exception
    return "bar", bad_task_with_a_hook(return_state=True)

_, task_run_state = ok_with_failure_flow()

assert data == {"x": "foo", "y": 42, "state": task_run_state}
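The binding step can be tried in isolation. The sketch below uses only the standard library's `functools.partial` and calls the hook directly with placeholder strings instead of real Prefect objects. The direct call stands in for the engine and is an assumption of this example.

```python
from functools import partial

data = {}

def my_hook(task, task_run, state, **kwargs):
    data.update(state=state, **kwargs)

# Bind the extra keyword arguments ahead of time; the resulting callable
# still accepts the three positional arguments a hook receives.
bound_hook = partial(my_hook, x="foo", y=42)

# Placeholder values stand in for the task, task run, and state objects.
bound_hook("task", "task_run", "Failed")

print(data)
```

Because `partial` stores the keywords on the returned object, the same hook function can be bound to different values for different tasks.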

Common uses of state change hooks

Below are two common real-world applications of state change hooks.

Send a notification when a flow run fails

State change hooks let you customize the messages sent when runs transition between states, such as notifications containing sensitive information when a run enters a Failed state. Here’s an example of a client-side hook that runs when a flow run enters a Failed state:
from prefect import flow
from prefect.blocks.core import Block
from prefect.settings import PREFECT_UI_URL

def notify_slack(flow, flow_run, state):
    slack_webhook_block = Block.load(
        "slack-webhook/my-slack-webhook"
    )

    # PREFECT_UI_URL already includes the scheme, so no "https://" prefix
    slack_webhook_block.notify(
        (
            f"Your job {flow_run.name} entered {state.name} "
            f"with message:\n\n{state.message}\n\n"
            f"See <{PREFECT_UI_URL.value()}/flow-runs/"
            f"flow-run/{flow_run.id}|the flow run in the UI>\n\n"
            f"Tags: {flow_run.tags}\n\n"
            f"Scheduled start: {flow_run.expected_start_time}"
        )
    )

@flow(on_failure=[notify_slack], retries=1)
def failing_flow():
    raise ValueError("oops!")

if __name__ == "__main__":
    failing_flow()
Note that retries are configured in this example, so the on_failure hook runs only after all retries are exhausted and the flow run enters a Failed state.
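One way to check the notification text without Slack or a Prefect server is to move the string formatting into its own function. The sketch below is a hedged illustration: `build_failure_message` and its `ui_url` parameter are hypothetical names, and the stand-in objects carry only the attributes the helper reads (`name`, `id`, `tags`, and a state `message`).

```python
from types import SimpleNamespace

def build_failure_message(flow_run, state, ui_url):
    """Return Slack-formatted text describing a flow run's state."""
    return (
        f"Your job {flow_run.name} entered {state.name} "
        f"with message:\n\n{state.message}\n\n"
        f"See <{ui_url}/flow-runs/flow-run/{flow_run.id}"
        f"|the flow run in the UI>\n\n"
        f"Tags: {flow_run.tags}"
    )

# Stand-in objects with only the attributes the helper reads.
fake_run = SimpleNamespace(name="quick-fox", id="1234", tags=["etl"])
fake_state = SimpleNamespace(name="Failed", message="oops!")

message = build_failure_message(fake_run, fake_state, "http://localhost:4200")
print(message)
```

A hook could then pass the returned string to the block's notify call, which keeps the formatting logic testable on its own.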

Delete a Cloud Run job when a flow run crashes

State change hooks help with infrastructure cleanup when tasks spin up individual infrastructure resources independently of Prefect. When a flow run crashes, tasks may exit abruptly and skip their own cleanup logic. A state change hook ensures the infrastructure is still cleaned up, even when the flow run enters a Crashed state. Here’s how to create a hook that deletes a Cloud Run job if the flow run crashes:
import os
from prefect import flow, task
from prefect.variables import Variable
from prefect.client.orchestration import get_client
import prefect.runtime

async def delete_cloud_run_job(flow, flow_run, state):
    """Flow run state change hook that deletes a Cloud Run Job if
    the flow run crashes."""

    # retrieve Cloud Run job name (must match the name set in the flow)
    cloud_run_job_name = await Variable.get(
        name="crashing_flow_cloud_run_job"
    )

    # delete Cloud Run job
    delete_job_command = (
        f"yes | gcloud beta run jobs delete "
        f"{cloud_run_job_name.value} --region us-central1"
    )
    os.system(delete_job_command)


@task
def my_task_that_crashes():
    raise SystemExit("Crashing on purpose!")

@flow(on_crashed=[delete_cloud_run_job])
def crashing_flow():
    """Save the flow run name (i.e. Cloud Run job name) as a
    Variable, then execute a task that ends up crashing."""
    flow_run_name = prefect.runtime.flow_run.name
    Variable.set(
        name="crashing_flow_cloud_run_job",
        value=flow_run_name,
        overwrite=True,
    )

    my_task_that_crashes()

if __name__ == "__main__":
    crashing_flow()
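The hook above builds a shell string and pipes `yes` into it. A possible alternative, sketched here under assumptions, passes an argument list to `subprocess.run` and uses gcloud's global `--quiet` flag to skip the confirmation prompt. The helper names `build_delete_command` and `delete_job` are hypothetical, and the sketch only prints the command; it does not run against a real project.

```python
import subprocess

def build_delete_command(job_name, region="us-central1"):
    """Return the gcloud argument list for deleting a Cloud Run job."""
    return [
        "gcloud", "beta", "run", "jobs", "delete", job_name,
        "--region", region,
        "--quiet",  # skip the confirmation prompt instead of piping `yes`
    ]

def delete_job(job_name, region="us-central1"):
    """Run the delete command; raises CalledProcessError on a non-zero exit."""
    subprocess.run(build_delete_command(job_name, region), check=True)

# Only build and show the command here; delete_job is not invoked.
command = build_delete_command("my-crashing-run")
print(" ".join(command))
```

Passing a list avoids shell interpretation of the job name, and `check=True` surfaces a failed deletion as an exception instead of a silently ignored exit code.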