prefect-sqlalchemy
The prefect-sqlalchemy library helps you connect to a database in your Prefect flows.
Getting Started
Prerequisites
Install prefect-sqlalchemy
Install or update to the latest version of the prefect-sqlalchemy library and dependencies.
pip install -U prefect-sqlalchemy
pip install -U --pre prefect-sqlalchemy
Register newly installed block types
Register the block types in the prefect-sqlalchemy module to make them available for use.
prefect block register -m prefect_sqlalchemy
Examples
Save credentials to a block
To use the load method on a block, you must first save the block, either through code or through the UI.
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=SyncDriver.POSTGRESQL_PSYCOPG2,
        username="USERNAME-PLACEHOLDER",
        password="PASSWORD-PLACEHOLDER",
        host="localhost",
        port=5432,
        database="DATABASE-PLACEHOLDER",
    )
)

connector.save("BLOCK_NAME-PLACEHOLDER")
Load the saved block that holds your credentials:
from prefect_sqlalchemy import SqlAlchemyConnector
SqlAlchemyConnector.load("BLOCK_NAME-PLACEHOLDER")
The required arguments depend on the driver you choose. For example, SQLite requires only the driver and database arguments:
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=SyncDriver.SQLITE_PYSQLITE,
        database="DATABASE-PLACEHOLDER.db",
    )
)

connector.save("BLOCK_NAME-PLACEHOLDER")
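To illustrate why the required arguments vary by driver, here is a minimal sketch of how connection components map onto a SQLAlchemy-style database URL of the form dialect+driver://username:password@host:port/database. The build_url helper is hypothetical and is not part of prefect-sqlalchemy; the driver strings "sqlite+pysqlite" and "postgresql+psycopg2" are assumed to correspond to the SyncDriver members used above.

```python
from typing import Optional
from urllib.parse import quote_plus


def build_url(
    driver: str,
    database: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
    host: Optional[str] = None,
    port: Optional[int] = None,
) -> str:
    """Hypothetical helper: assemble a SQLAlchemy-style URL string."""
    auth = ""
    if username is not None:
        # Percent-encode credentials so characters like "@" stay unambiguous.
        auth = quote_plus(username)
        if password is not None:
            auth += ":" + quote_plus(password)
        auth += "@"
    netloc = host or ""
    if port is not None:
        netloc += f":{port}"
    return f"{driver}://{auth}{netloc}/{database}"


# A file-based SQLite database has no server, so only driver and database apply.
sqlite_url = build_url("sqlite+pysqlite", database="example.db")

# A server-based database also needs credentials and a network location.
postgres_url = build_url(
    "postgresql+psycopg2",
    database="example_db",
    username="user",
    password="p@ss",
    host="localhost",
    port=5432,
)

print(sqlite_url)
print(postgres_url)
```

In practice the block builds the URL for you from ConnectionComponents; the sketch only shows which parts of the URL each argument fills.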
Work with databases in a flow
To set up a table, use the execute and execute_many methods.
Use the fetch_many method to retrieve data in a stream until there’s no more data.
Use the SqlAlchemyConnector as a context manager to ensure that the SQLAlchemy engine and any connected resources are closed properly after you’re done with them.
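The same three ideas (set up with execute calls, read in batches until nothing is left, and close resources with a context manager) can be sketched with Python's standard-library sqlite3 module. This is an analogy only, assuming an in-memory SQLite database; it does not use the SqlAlchemyConnector, and the load_customers function is a hypothetical name chosen for illustration.

```python
import sqlite3
from contextlib import closing


def load_customers(path: str = ":memory:", batch_size: int = 2) -> list:
    """Create a table, insert rows, then read them back in batches."""
    rows = []
    # closing() guarantees conn.close() runs even if a statement raises.
    with closing(sqlite3.connect(path)) as conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        # One statement, many parameter sets (compare execute_many).
        conn.executemany(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            [
                {"name": "Marvin", "address": "Highway 42"},
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )
        cursor = conn.execute("SELECT name, address FROM customers ORDER BY name")
        while True:
            # Each call returns the next batch; an empty list means no more data.
            batch = cursor.fetchmany(batch_size)
            if not batch:
                break
            rows.extend(batch)
    return rows


customers = load_customers()
print(customers)
```

Reading in fixed-size batches keeps memory use bounded for large result sets, which is the same reason to prefer fetch_many over loading everything at once.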
Async support
SqlAlchemyConnector supports async workflows. Just be sure to save, load, and use an async driver, as in the example below.
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, AsyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=AsyncDriver.SQLITE_AIOSQLITE,
        database="DATABASE-PLACEHOLDER.db",
    )
)

if __name__ == "__main__":
    connector.save("BLOCK_NAME-PLACEHOLDER")
from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector


@task
def setup_table(block_name: str) -> None:
    with SqlAlchemyConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        connector.execute(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            parameters={"name": "Marvin", "address": "Highway 42"},
        )
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )


@task
def fetch_data(block_name: str) -> list:
    all_rows = []
    with SqlAlchemyConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.append(new_rows)
    return all_rows


@flow
def sqlalchemy_flow(block_name: str) -> list:
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows


if __name__ == "__main__":
    # Use the same name the block was saved under.
    sqlalchemy_flow("BLOCK_NAME-PLACEHOLDER")
import asyncio

from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector


@task
async def setup_table(block_name: str) -> None:
    async with await SqlAlchemyConnector.load(block_name) as connector:
        await connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await connector.execute(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            parameters={"name": "Marvin", "address": "Highway 42"},
        )
        await connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )


@task
async def fetch_data(block_name: str) -> list:
    all_rows = []
    async with await SqlAlchemyConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = await connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.append(new_rows)
    return all_rows


@flow
async def sqlalchemy_flow(block_name: str) -> list:
    await setup_table(block_name)
    all_rows = await fetch_data(block_name)
    return all_rows


if __name__ == "__main__":
    # Use the same name the block was saved under.
    asyncio.run(sqlalchemy_flow("BLOCK_NAME-PLACEHOLDER"))
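Because fetch_data in the examples above appends each batch to all_rows, the flow returns a list of batches rather than a flat list of rows. If a flat list is more convenient, the batches can be flattened afterwards. The sketch below uses hypothetical sample data, with plain tuples standing in for whatever row objects the connector actually returns.

```python
from itertools import chain

# Hypothetical stand-in for the value returned by fetch_data: one inner list
# per fetch_many call (size=2), each row shown here as a plain tuple.
batches = [
    [("Marvin", "Highway 42"), ("Ford", "Highway 42")],
    [("Unknown", "Highway 42")],
]

# Flatten the batches into a single list of rows, preserving order.
rows = list(chain.from_iterable(batches))
print(rows)
```

Alternatively, calling all_rows.extend(new_rows) instead of all_rows.append(new_rows) inside the task would build the flat list directly.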
Resources
Refer to the prefect-sqlalchemy SDK documentation linked in the sidebar to explore all the capabilities of the prefect-sqlalchemy library.
For assistance using SQLAlchemy, consult the SQLAlchemy documentation.