Retrieve results

When calling flows or tasks, the result is returned directly:
from prefect import flow, task

@task
def my_task():
    return 1

@flow
def my_flow():
    task_result = my_task()
    return task_result + 1

result = my_flow()
assert result == 2
When working with flow and task states, retrieve the result with the State.result() method:
from prefect import flow, task

@task
def my_task():
    return 1

@flow
def my_flow():
    state = my_task(return_state=True)
    return state.result() + 1

state = my_flow(return_state=True)
assert state.result() == 2
When submitting tasks to a runner, the retrieve result with the Future.result() method:
from prefect import flow, task

@task
def my_task():
    return 1

@flow
def my_flow():
    future = my_task.submit()
    return future.result() + 1

result = my_flow()
assert result == 2

Handling failures

Prefect captures all exceptions to report states to the orchestrator, but we do not hide them from you (unless you ask us to) as your program needs to know if an unexpected error has occurred. When calling flows or tasks, the exceptions are raised as in normal Python:
from prefect import flow, task

@task
def my_task():
    raise ValueError()

@flow
def my_flow():
    try:
        my_task()
    except ValueError:
        print("Oh no! The task failed.")

    return True

my_flow()
If you prefer to check for a failed task without using try/except, you may ask Prefect to return the state:
from prefect import flow, task

@task
def my_task():
    raise ValueError()

@flow
def my_flow():
    state = my_task(return_state=True)

    if state.is_failed():
        print("Oh no! The task failed. Falling back to '1'.")
        result = 1
    else:
        result = state.result()

    return result + 1

result = my_flow()
assert result == 2
If you retrieve the result from a failed state, the exception is raised. For this reason, it’s often best to check if the state is failed first.
from prefect import flow, task

@task
def my_task():
    raise ValueError()

@flow
def my_flow():
    state = my_task(return_state=True)

    try:
        result = state.result()
    except ValueError:
        print("Oh no! The state raised the error!")

    return True

my_flow()
When retrieving the result from a state, you can ask Prefect not to raise exceptions:
from prefect import flow, task

@task
def my_task():
    raise ValueError()

@flow
def my_flow():
    state = my_task(return_state=True)

    maybe_result = state.result(raise_on_failure=False)
    if isinstance(maybe_result, ValueError):
        print("Oh no! The task failed. Falling back to '1'.")
        result = 1
    else:
        result = maybe_result

    return result + 1

result = my_flow()
assert result == 2
When submitting tasks to a runner, Future.result() works the same as State.result():
from prefect import flow, task

@task
def my_task():
    raise ValueError()

@flow
def my_flow():
    future = my_task.submit()

    try:
        future.result()
    except ValueError:
        print("Ah! Futures will raise the failure as well.")

    # You can ask it not to raise the exception too
    maybe_result = future.result(raise_on_failure=False)
    print(f"Got {type(maybe_result)}")

    return True

my_flow()

Working with async results

When calling flows or tasks, the result is returned directly:
import asyncio
from prefect import flow, task

@task
async def my_task():
    return 1

@flow
async def my_flow():
    task_result = await my_task()
    return task_result + 1

result = asyncio.run(my_flow())
assert result == 2
When working with flow and task states, you can retrieve the result with the State.result() method:
import asyncio
from prefect import flow, task

@task
async def my_task():
    return 1

@flow
async def my_flow():
    state = await my_task(return_state=True)
    result = await state.result(fetch=True)
    return result + 1

async def main():
    state = await my_flow(return_state=True)
    assert await state.result(fetch=True) == 2

asyncio.run(main())
Resolving resultsPrefect 2.6.0 added automatic retrieval of persisted results. Prior to this version, State.result() did not require an await. For backwards compatibility, when used from an asynchronous context, State.result() returns a raw result type.You may opt-in to the new behavior by passing fetch=True as shown in the example above. To use this behavior automatically, you may enable the PREFECT_ASYNC_FETCH_STATE_RESULT setting. If you do not opt-in to this behavior, you will see a warning.You may also opt out by setting fetch=False. This will silence the warning, but you will need to retrieve your result manually from the result type.
When submitting tasks to a runner, you can retrieve the result with the Future.result() method:
import asyncio
from prefect import flow, task

@task
async def my_task():
    return 1

@flow
async def my_flow():
    future = await my_task.submit()
    result = await future.result()
    return result + 1

result = asyncio.run(my_flow())
assert result == 2

Persisting results

The Prefect API does not store your results except in special cases. Instead, the result is persisted to a storage location in your infrastructure and Prefect stores a reference to the result. The following Prefect features require results to be persisted:
  • Task cache keys
  • Flow run retries
If results are not persisted, these features may not be usable.

Configure persistence of results

Persistence of results requires a serializer and a storage location. Prefect sets defaults which you can adjust for custom behavior. You can configure results on the flow and task decorators with the following options:
  • persist_result: Whether the result should be persisted to storage.
  • result_storage: Where to store the result when persisted.
  • result_serializer: How to convert the result to a storable form.

Toggling persistence

Configure the persistence of the result of a task or flow with the persist_result option. The persist_result option defaults to a null value, which automatically enables persistence if it is needed for a Prefect feature used by the flow or task. Otherwise, persistence is disabled by default. For example, the following flow has retries enabled. Flow retries require that all task results are persisted, so the task’s result will be persisted:
from prefect import flow, task

@task
def my_task():
    return "hello world!"

@flow(retries=2)
def my_flow():
    # This task does not have persistence toggled off and it is needed for the flow feature,
    # so Prefect will persist its result at runtime
    my_task()
Flow retries do not require the flow’s result to be persisted. In this next example, one task has caching enabled. Task caching requires that the given task’s result is persisted:
from prefect import flow, task
from datetime import timedelta

@task(cache_key_fn=lambda: "always", cache_expiration=timedelta(seconds=20))
def my_task():
    # This task uses caching so its result will be persisted by default
    return "hello world!"


@task
def my_other_task():
    ...

@flow
def my_flow():
    # This task uses a feature that requires result persistence
    my_task()

    # This task does not use a feature that requires result persistence and the
    # flow does not use any features that require task result persistence so its
    # result will not be persisted by default
    my_other_task()
You can manually toggle on or off the persistence of results:
from prefect import flow, task

@flow(persist_result=True)
def my_flow():
    # This flow will persist its result even if not necessary for a feature.
    ...

@task(persist_result=False)
def my_task():
    # This task will never persist its result.
    # If persistence needed for a feature, an error will be raised.
    ...
Toggling persistence manually will always override any behavior that Prefect would infer. You may also change Prefect’s default persistence behavior with the PREFECT_RESULTS_PERSIST_BY_DEFAULT setting. To persist results by default, change the value to a truthy value:
prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
Task and flows with persist_result=False will not persist their results even if PREFECT_RESULTS_PERSIST_BY_DEFAULT is true.

Result storage location

Configure the result storage location with the result_storage option. The result_storage option defaults to a null value, which infers storage from the context. Generally, this means that tasks will use the result storage configured on the flow unless otherwise specified. If there is no context to load the storage from and results must be persisted, results will be stored in the path specified by the PREFECT_LOCAL_STORAGE_PATH setting (defaults to ~/.prefect/storage).
from prefect import flow, task
from prefect.filesystems import LocalFileSystem, S3

@flow(persist_result=True)
def my_flow():
    my_task()  # This task will use the flow's result storage

@task(persist_result=True)
def my_task():
    ...

my_flow()  # The flow has no result storage configured and no parent, the local file system will be used.


# Reconfigure the flow to use a different storage type
new_flow = my_flow.with_options(result_storage=S3(bucket_path="my-bucket"))

new_flow()  # The flow and task within it will use S3 for result storage.
Configure this to use a specific storage using one of the following:
  • A storage instance. For example, LocalFileSystem(basepath=".my-results")
  • A storage slug. For example, 's3/dev-s3-block'

Result storage key

Configure the path of the result file in the result storage with the result_storage_key. The result_storage_key option defaults to a null value, which generates a unique identifier for each result.
from prefect import flow, task
from prefect.filesystems import LocalFileSystem, S3

@flow(result_storage=S3(bucket_path="my-bucket"))
def my_flow():
    my_task()

@task(persist_result=True, result_storage_key="my_task.json")
def my_task():
    ...

my_flow()  # The task's result will be persisted to 's3://my-bucket/my_task.json'
Result storage keys are formatted with access to all of the modules in prefect.runtime and the run’s parameters. The following example runs a flow with three runs of the same task. Each task run will write its result to a unique file based on the name parameter.
from prefect import flow, task

@flow()
def my_flow():
    hello_world()
    hello_world(name="foo")
    hello_world(name="bar")

@task(persist_result=True, result_storage_key="hello-{parameters[name]}.json")
def hello_world(name: str = "world"):
    return f"hello {name}"

my_flow()
After running the flow, we can see three persisted result files in our storage directory:
$ ls ~/.prefect/storage | grep "hello-"
hello-bar.json
hello-foo.json
hello-world.json
In the next example, we include metadata about the flow run from the prefect.runtime.flow_run module:
from prefect import flow, task

@flow
def my_flow():
    hello_world()

@task(persist_result=True, result_storage_key="{flow_run.flow_name}_{flow_run.name}_hello.json")
def hello_world(name: str = "world"):
    return f"hello {name}"

my_flow()
After running this flow, you can see a result file templated with the name of the flow and the flow run:
❯ ls ~/.prefect/storage | grep "my-flow"    
my-flow_industrious-trout_hello.json
If a result exists at a given storage key in the storage location, it will be overwritten. You can only configure result storage keys on tasks at this time.

Result serializer

You can configure the result serializer with the result_serializer option. The result_serializer option defaults to a null value, which infers the serializer from the context. Generally, this means that tasks will use the result serializer configured on the flow unless otherwise specified. If there is no context to load the serializer from, the serializer defined by PREFECT_RESULTS_DEFAULT_SERIALIZER is used. This setting defaults to Prefect’s pickle serializer. You may configure the result serializer using:
  • A type name. For example, "json" or "pickle" which corresponds to an instance with default values
  • An instance. For example, JSONSerializer(jsonlib="orjson")

Compress results

Prefect provides a CompressedSerializer which can wrap other serializers to provide compression over the bytes they generate. The compressed serializer uses lzma compression by default. We test other compression schemes provided in the Python standard library such as bz2 and zlib, but any compression library that provides compress and decompress methods should work. Configure compression of results using:
  • A type name, prefixed with compressed/. For example, "compressed/json" or "compressed/pickle"
  • An instance. For example, CompressedSerializer(serializer="pickle", compressionlib="lzma")
Note that the "compressed/<serializer-type>" shortcut will only work for serializers provided by Prefect. If you are using custom serializers, you must pass a full instance.

Storage of results in Prefect

The Prefect API does not store your results in most cases for the following reasons:
  • Results can be large and slow to send to and from the API.
  • Results often contain private information or data.
  • Results would need to be stored in the database or complex logic implemented to hydrate from another source.
There are a few cases where Prefect will store your results directly in the database. This is an optimization to reduce the overhead of reading and writing to result storage. The following data types is stored by the API without persistence to storage:
  • booleans (True, False)
  • nulls (None)
If persist_result is set to False, these values will never be stored.

Track results

The Prefect API tracks metadata about your results. The value of your result is only stored in specific cases. View result metadata in the UI on the “Results” page for flows. Prefect tracks the following result metadata:
  • Data type
  • Storage location (if persisted)

Caching of results in memory

When running your workflows, Prefect keeps the results of all tasks and flows in memory so they can be passed downstream. In some cases, it is desirable to override this behavior. For example, if you are returning a large amount of data from a task, it can be costly to keep it in memory for the entire duration of the flow run. Flows and tasks both include an option to drop the result from memory with cache_result_in_memory:
@flow(cache_result_in_memory=False)
def foo():
    return "pretend this is large data"

@task(cache_result_in_memory=False)
def bar():
    return "pretend this is biiiig data"
When cache_result_in_memory is disabled, the result of your flow or task will be persisted by default. The result is then pulled from storage when needed.
@flow
def foo():
    result = bar()
    state = bar(return_state=True)

    # The result will be retrieved from storage here
    state.result()

    future = bar.submit()
    # The result will be retrieved from storage here
    future.result()

@task(cache_result_in_memory=False)
def bar():
    # This result will persisted
    return "pretend this is biiiig data"
If both cache_result_in_memory and persistence are disabled, your results are not available downstream.

@task(persist_result=False, cache_result_in_memory=False)
def bar():
    return "pretend this is biiiig data"

@flow
def foo():
    # Raises an error
    result = bar()

    # This is oaky
    state = bar(return_state=True)

    # Raises an error
    state.result()

    # This is okay
    future = bar.submit()

    # Raises an error
    future.result()

Result storage types

Result storage is responsible for reading and writing serialized data to an external location. You can use any file system block for result storage.

Result serializer types

A result serializer is responsible for converting your Python object to and from bytes. This is necessary to store the object outside of Python and retrieve it later.

Pickle serializer

Pickle is a standard Python protocol for encoding arbitrary Python objects. We supply a custom pickle serializer at prefect.serializers.PickleSerializer. Prefect’s pickle serializer uses the cloudpickle project by default to support more object types. You can specify alternative pickle libraries, such as:
from prefect.serializers import PickleSerializer

PickleSerializer(picklelib="custompickle")
Benefits of the pickle serializer:
  • Many object types are supported.
  • Objects can define custom pickle support.
Drawbacks of the pickle serializer:
  • When nested attributes of an object cannot be pickled, it is hard to determine the cause.
  • When deserializing objects, your Python and pickle library versions must match the one used at serialization time.
  • Serialized objects cannot be easily shared across different programming languages.
  • Serialized objects are not human readable.

JSON serializer

We supply a custom JSON serializer at prefect.serializers.JSONSerializer. Prefect’s JSON serializer uses custom hooks by default to support more object types. Specifically, we add support for all types supported by Pydantic. By default, we use the standard Python json library. You can specify alternative JSON libraries, such as:
from prefect.serializers import JSONSerializer

JSONSerializer(jsonlib="orjson")
Benefits of the JSON serializer:
  • Serialized objects are human readable.
  • You can share serialized objects across different programming languages.
  • Deserialization of serialized objects is generally version agnostic.
Drawbacks of the JSON serializer:
  • Supported types are limited.
  • You must implement support for additional types at the serializer level.

Result types

Prefect uses internal result types to capture information about the result attached to a state. The following types are used:
  • UnpersistedResult: Stores result metadata but the value is only available when created.
  • LiteralResult: Stores simple values inline.
  • PersistedResult: Stores a reference to a result persisted to storage.
All result types include a get() method that can be called to return the value of the result. This is done behind the scenes when the result() method is used on states or futures.

Unpersisted results

Unpersisted results represent results that are not persisted beyond the current flow run. The value associated with the result is stored in memory, but is not available later. Result metadata is attached to this object for storage in the API and representation in the UI.

Literal results

Literal results represent results stored in the Prefect database. The values contained by these results must be JSON serializable. Example:
result = LiteralResult(value=None)
result.json()
# {"type": "result", "value": "null"}
Literal results reduce the overhead required to persist simple results.

Persisted results

The persisted result type contains all of the information needed to retrieve the result from storage. This includes:
  • Storage: A reference to the result storage to read the serialized result.
  • Key: Indicates where this specific result is in storage.
Persisted result types also contain metadata for inspection without retrieving the result: The get() method on result references retrieves the data from storage, deserializes it, and returns the original object. The get() operation caches the resolved object to reduce the overhead of subsequent calls.

Persisted result blob

When results are persisted to storage, they are always written as a JSON document. The schema for this is described by the PersistedResultBlob type. The document contains:
  • The serialized data of the result.
  • A full description of result serializer to deserialize the result data.
  • The Prefect version used to create the result.