🌀 Prefect + Makoto Integration Concept

Flow + task state hooks turn Prefect into a DBOM publisher.

Note: This page explores how Makoto Levels could be implemented on Prefect. It is a conceptual integration proposal — illustrative, not a shipped library. The patterns shown use real Prefect APIs; the Makoto pieces are sketches you (or we) could build out.

What is Prefect?

Prefect is a Python workflow orchestrator built around flows and tasks. Its task and flow state-change hooks are a natural attachment point for Makoto: on every successful task completion, a hook could compute content hashes of the returned artifacts, sign the result, and write an attestation to the artifact store.

State Hooks: `on_completion`, `on_failure` per task and flow
Artifacts API: Persistent artifacts gain DBOM siblings
Result Persistence: Persisted results give the hook stable bytes to hash, alongside the cache keys Prefect already computes
Deployment Tags: Issuer identity flows from deployment metadata
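
As a rough illustration of the hook idea, the sketch below hashes a task's return value and records an attestation. It uses only the standard library and stub objects rather than a live Prefect run; the `(task, task_run, state)` argument shape matches what Prefect passes to task state hooks, while `ATTESTATIONS`, `content_hash`, and `attest_on_completion` are hypothetical names, and signing is left out for brevity.

```python
import hashlib
import json
from datetime import datetime, timezone
from types import SimpleNamespace

ATTESTATIONS = []  # hypothetical stand-in for the artifact store


def content_hash(value) -> str:
    """SHA-256 over a canonical JSON encoding of the returned value."""
    canonical = json.dumps(value, sort_keys=True, separators=(",", ":")).encode()
    return "sha256:" + hashlib.sha256(canonical).hexdigest()


def attest_on_completion(task, task_run, state):
    """Hook taking the (task, task_run, state) arguments of a task state hook."""
    record = {
        "task": task.name,
        "task_run_id": str(task_run.id),
        "state": state.name,
        "digest": content_hash(state.result()),
        "attested_at": datetime.now(timezone.utc).isoformat(),
    }
    ATTESTATIONS.append(record)
    return record


# Exercise the hook with stub objects instead of a real Prefect run.
task = SimpleNamespace(name="extract_orders")
task_run = SimpleNamespace(id="run-001")
state = SimpleNamespace(
    name="Completed",
    result=lambda: [{"order_id": 1, "status": "paid"}],
)
record = attest_on_completion(task, task_run, state)
```

Canonical JSON keeps the digest stable across key orderings; a real integration would more likely hash the serialized result bytes that Prefect persists.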

Integration Approach

Primary pattern: State hooks + Result serializer + custom Artifact type. Below are the integration options ordered by lift required.

How Makoto attaches to Prefect

  • makoto-prefect library: A proposed package (`pip install makoto-prefect` once published) that drops in as `@flow(on_completion=[makoto.attest])`.
  • MakotoResult serializer: Custom result serializer that hashes and signs as it writes to S3/GCS.
  • State hook callback: Universal `on_completion` callback that you attach in the `@flow` or `@task` decorator, or through `with_options(...)`.
  • Artifact type: A `MakotoArtifact` wrapper could publish the signed DBOM as a Markdown or table artifact so that it appears in the Prefect UI.

Conceptual Code Example

Concept: Prefect flow with Makoto attestation hooks

One decorator, one hook — every task result gets a signed receipt

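import hashlib

from prefect import flow, task
from makoto_prefect import attest, MakotoResult  # proposed package, not yet published

# read_postgres and publish_to_s3 are placeholder I/O helpers, not Prefect APIs.

def sha256_hex(value):
    """Hex SHA-256 digest of a string value."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()

@task(result_serializer=MakotoResult(level=2))
def extract_orders():
    """Pull from operational store."""
    return read_postgres(
        "SELECT * FROM orders WHERE updated_at > now() - interval '1 day'"
    )

@task(
    result_serializer=MakotoResult(level=2),
    on_completion=[attest.transform(signing_key="kms://...")],
)
def hash_pii(orders):
    """Hash emails, keep paid orders only."""
    return (
        orders.assign(email_hash=orders.email.apply(sha256_hex))
              .query("status == 'paid'")
              .drop(columns=["email"])
    )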

@flow(
    name="sales-etl",
    on_completion=[attest.flow_dbom(signing_key="kms://...")],
)
def sales_etl():
    raw = extract_orders()
    clean = hash_pii(raw)
    publish_to_s3(clean)

# On flow completion:
#   - per-task Transform attestations written via MakotoResult
#   - flow-level DBOM aggregates all task attestations
#   - DSSE envelope signed via configured key
#   - visible in Prefect UI as a MakotoArtifact
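
The flow-level step in the comments above (aggregate task attestations, then sign a DSSE envelope) might look roughly like this. The envelope fields and the pre-authentication encoding follow the DSSE specification; the payload type string, the DBOM layout, and the function names are assumptions made for illustration, and HMAC-SHA256 again stands in for a KMS signature.

```python
import base64
import hashlib
import hmac
import json

PAYLOAD_TYPE = "application/vnd.makoto.dbom+json"  # hypothetical media type


def pae(payload_type: str, payload: bytes) -> bytes:
    """DSSE pre-authentication encoding: the bytes that actually get signed."""
    type_bytes = payload_type.encode()
    return b"DSSEv1 %d %s %d %s" % (len(type_bytes), type_bytes, len(payload), payload)


def build_flow_dbom(flow_name: str, task_attestations: list) -> dict:
    """Aggregate per-task attestations into one flow-level document."""
    return {"flow": flow_name, "tasks": sorted(task_attestations, key=lambda a: a["task"])}


def sign_envelope(dbom: dict, key: bytes, keyid: str) -> dict:
    payload = json.dumps(dbom, sort_keys=True).encode()
    # HMAC-SHA256 stands in for the signature a KMS key would produce.
    sig = hmac.new(key, pae(PAYLOAD_TYPE, payload), hashlib.sha256).digest()
    return {
        "payload": base64.b64encode(payload).decode(),
        "payloadType": PAYLOAD_TYPE,
        "signatures": [{"keyid": keyid, "sig": base64.b64encode(sig).decode()}],
    }


def verify_envelope(envelope: dict, key: bytes) -> bool:
    payload = base64.b64decode(envelope["payload"])
    expected = hmac.new(key, pae(envelope["payloadType"], payload), hashlib.sha256).digest()
    actual = base64.b64decode(envelope["signatures"][0]["sig"])
    return hmac.compare_digest(expected, actual)


dbom = build_flow_dbom("sales-etl", [
    {"task": "hash_pii", "digest": "sha256:bbbb"},
    {"task": "extract_orders", "digest": "sha256:aaaa"},
])
envelope = sign_envelope(dbom, key=b"demo-key", keyid="demo")
```

Signing the PAE bytes rather than the raw payload binds the payload type into the signature, so an envelope cannot be reinterpreted under a different type without invalidating it.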

Potential Use Cases

Hybrid-cloud Pipelines

When Prefect workers run across clouds, DBOMs let a downstream consumer verify that data arrived unmodified after crossing a cloud boundary.

Per-task Retry Audit

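Each attempt of a task could generate its own attestation, giving a per-attempt debugging trail. Prefect's `on_failure` hook fires only once retries are exhausted, so attestations for intermediate attempts would need to be written from inside the task or the result serializer.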

Subflow Composition

Subflow DBOMs could nest into the parent flow's DBOM by reference, so a single signed document covers the whole run.
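
One way the nesting could work is for the parent DBOM to reference each subflow DBOM by digest, as in the sketch below. The DBOM layout, the `subflows` field, and the `dbom_digest` and `nest` helpers are all assumptions for illustration; nothing here is defined by Prefect or a published Makoto schema.

```python
import hashlib
import json


def dbom_digest(dbom: dict) -> str:
    """SHA-256 over a canonical JSON encoding of a DBOM document."""
    canonical = json.dumps(dbom, sort_keys=True, separators=(",", ":")).encode()
    return "sha256:" + hashlib.sha256(canonical).hexdigest()


def nest(parent: dict, child: dict) -> dict:
    """Return a copy of the parent that references the child by name and digest."""
    ref = {"flow": child["flow"], "digest": dbom_digest(child)}
    return {**parent, "subflows": [*parent.get("subflows", []), ref]}


child = {
    "flow": "hash-pii-subflow",
    "tasks": [{"task": "hash_pii", "digest": "sha256:bbbb"}],
}
parent = nest({"flow": "sales-etl", "tasks": []}, child)
```

Because the parent stores only the child's digest, any later change to the subflow DBOM changes that digest and no longer matches the reference held by the parent.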

Compliance Dashboards

Prefect UI shows attestations alongside the runs that produced them.

Interested in Prefect + Makoto?

This is a conceptual integration. If you're shipping Prefect pipelines and want to add Makoto attestations, open an issue or reach out — we'd love to scope a real implementation.
