The simplest way to create a deployment for your
flow is by calling its serve method.
This method creates a deployment for the flow and starts a long-running process
that monitors for work from the Prefect server.
When work is found, it is executed within its own isolated subprocess.
from prefect import flow

@flow(log_prints=True)
def hello_world(name: str = "world", goodbye: bool = False):
    print(f"Hello {name} from Prefect! 🤗")

    if goodbye:
        print(f"Goodbye {name}!")

if __name__ == "__main__":
    # creates a deployment and stays running to monitor for work instructions
    # generated on the server
    hello_world.serve(
        name="my-first-deployment",
        tags=["onboarding"],
        parameters={"goodbye": True},
        interval=60,
    )
This interface provides the configuration for a deployment (with no
strong infrastructure requirements), such as:
- schedules
- event triggers
- metadata such as tags and description
- default parameter values
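As a rough mental model of the last item in the list above, a deployment's default parameter values sit between the flow function's own defaults and any values supplied for an individual run. The sketch below is plain Python rather than Prefect code: `resolve_parameters` is a hypothetical helper used only for illustration, `hello_world` is an undecorated stand-in for the flow, and the precedence shown (function defaults, then deployment defaults, then per-run values) is an assumption about how the values combine.

```python
import inspect
from typing import Any, Callable, Optional


def hello_world(name: str = "world", goodbye: bool = False) -> None:
    """Plain-function stand-in for the flow defined earlier (no Prefect needed)."""
    print(f"Hello {name} from Prefect!")
    if goodbye:
        print(f"Goodbye {name}!")


def resolve_parameters(
    fn: Callable[..., Any],
    deployment_defaults: dict[str, Any],
    run_overrides: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Hypothetical helper: merge parameter sources, lowest precedence first."""
    # Start from the defaults declared in the function signature.
    resolved = {
        name: param.default
        for name, param in inspect.signature(fn).parameters.items()
        if param.default is not inspect.Parameter.empty
    }
    # Deployment-level defaults (the `parameters=` argument) override those.
    resolved.update(deployment_defaults)
    # Values supplied for a single run (assumed here) override both.
    resolved.update(run_overrides or {})
    return resolved


scheduled = resolve_parameters(hello_world, {"goodbye": True})
custom = resolve_parameters(hello_world, {"goodbye": True}, {"name": "Marvin"})
hello_world(**scheduled)
```

Under this model, a scheduled run of the deployment above would say goodbye even though the function's own default for `goodbye` is `False`.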
Schedules are auto-paused on shutdown
By default, stopping the process running flow.serve will pause the schedule
for the deployment (if it has one).
When running this in environments where restarts are expected, use the
pause_on_shutdown=False flag to prevent this behavior:
if __name__ == "__main__":
    hello_world.serve(
        name="my-first-deployment",
        tags=["onboarding"],
        parameters={"goodbye": True},
        pause_on_shutdown=False,
        interval=60,
    )
Serve multiple flows at once
Serve multiple flows with the same process using
the serve utility along
with the to_deployment method of flows:
import time
from prefect import flow, serve

@flow
def slow_flow(sleep: int = 60):
    "Sleepy flow - sleeps the provided amount of time (in seconds)."
    time.sleep(sleep)

@flow
def fast_flow():
    "Fastest flow this side of the Mississippi."
    return

if __name__ == "__main__":
    slow_deploy = slow_flow.to_deployment(name="sleeper", interval=45)
    fast_deploy = fast_flow.to_deployment(name="fast")
    serve(slow_deploy, fast_deploy)
The behavior and interfaces are identical to the single flow case.
Retrieve a flow from remote storage
You can retrieve flows from remote storage with the
flow.from_source
method.
flow.from_source accepts a git repository URL and an entrypoint pointing to the
flow to load from the repository:
from prefect import flow

my_flow = flow.from_source(
    source="https://github.com/PrefectHQ/prefect.git",
    entrypoint="flows/hello_world.py:hello"
)

if __name__ == "__main__":
    my_flow()
16:40:33.818 | INFO | prefect.engine - Created flow run 'muscular-perch' for flow 'hello'
16:40:34.048 | INFO | Flow run 'muscular-perch' - Hello world!
16:40:34.706 | INFO | Flow run 'muscular-perch' - Finished in state Completed()
A flow entrypoint is the path to the file containing the flow, followed by a colon
and the name of the flow function.
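To make the entrypoint format concrete, here is a minimal sketch that splits an entrypoint string into its two parts. `split_entrypoint` is a hypothetical helper written for this illustration; it is not part of Prefect and does not claim to mirror how Prefect parses entrypoints internally.

```python
def split_entrypoint(entrypoint: str) -> tuple[str, str]:
    """Hypothetical helper: split "path/to/file.py:function_name" into its parts."""
    # Split on the last colon so a file path that itself contains a colon survives.
    path, separator, function_name = entrypoint.rpartition(":")
    if not separator or not path or not function_name:
        raise ValueError(
            f"expected '<file path>:<function name>', got {entrypoint!r}"
        )
    return path, function_name


path, function_name = split_entrypoint("flows/hello_world.py:hello")
print(path)           # file that defines the flow
print(function_name)  # name of the flow function inside that file
```

The file path is relative to the root of the source (here, the repository), so `flows/hello_world.py:hello` refers to a function named `hello` in `flows/hello_world.py`.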
For additional configuration, such as specifying a private repository,
provide a GitRepository
instead of a URL:
from prefect import flow
from prefect.runner.storage import GitRepository
from prefect.blocks.system import Secret

my_flow = flow.from_source(
    source=GitRepository(
        url="https://github.com/org/private-repo.git",
        branch="dev",
        credentials={
            "access_token": Secret.load("github-access-token").get()
        }
    ),
    entrypoint="flows.py:my_flow"
)

if __name__ == "__main__":
    my_flow()
You can serve loaded flows
You can serve flows loaded from remote storage with the same
serve method as local flows:
from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/org/repo.git",
        entrypoint="flows.py:my_flow"
    ).serve(name="my-deployment")
When you serve a flow loaded from remote storage, the serving process
periodically polls your remote storage for updates to the flow’s code.
This pattern allows you to update your flow code without restarting the serving
process.
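If the polling cadence matters for your setup, one possible way to adjust it is through the storage object. The sketch below assumes that `GitRepository` accepts a `pull_interval` argument given in seconds; verify this against the signature in your installed Prefect version. The URL and entrypoint are the placeholders from the example above, and the value of 30 is arbitrary.

```python
# Placeholder settings; pull_interval is assumed to be the number of seconds
# between pulls from the remote repository.
STORAGE_SETTINGS = {
    "url": "https://github.com/org/repo.git",
    "pull_interval": 30,
}


def main() -> None:
    # Imported inside the function so the settings above can be inspected
    # without Prefect installed.
    from prefect import flow
    from prefect.runner.storage import GitRepository

    flow.from_source(
        source=GitRepository(**STORAGE_SETTINGS),
        entrypoint="flows.py:my_flow",
    ).serve(name="my-deployment")
```

Call `main()` from an `if __name__ == "__main__":` guard, as in the earlier examples. Like any `serve` call, it keeps running until the process is stopped.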