Using resources in assets
Now that you’ve defined a resource, let’s refactor the `taxi_trips` asset to use it.
Let’s start by looking at the before and after.
Before adding a resource
The following code shows what the `taxi_trips` asset currently looks like, without a resource:
# assets/trips.py
import os

import dagster as dg
import duckdb
import requests
from dagster._utils.backoff import backoff

from dagster_essentials.assets import constants
... # other assets
# Builds the `taxi_trips` DuckDB table from the raw 2023-03 trips parquet
# file. Declared downstream of the `taxi_trips_file` asset via `deps`.
@dg.asset(
    deps=["taxi_trips_file"],
)
def taxi_trips() -> None:
    # `create or replace` rebuilds the table on every materialization; the
    # select renames the raw columns to snake_case. The source path is
    # hard-coded to the 2023-03 file.
    query = """
create or replace table taxi_trips as (
select
VendorID as vendor_id,
PULocationID as pickup_zone_id,
DOLocationID as dropoff_zone_id,
RatecodeID as rate_code_id,
payment_type as payment_type,
tpep_dropoff_datetime as dropoff_datetime,
tpep_pickup_datetime as pickup_datetime,
trip_distance as trip_distance,
passenger_count as passenger_count,
total_amount as total_amount
from 'data/raw/taxi_trips_2023-03.parquet'
);
"""
    # Open the DuckDB database whose path is read from the DUCKDB_DATABASE
    # environment variable, retrying on RuntimeError / duckdb.IOException
    # (max_retries=10). NOTE(review): presumably this guards against another
    # process holding the DuckDB file lock — confirm.
    # NOTE(review): `backoff` must be in scope for this to run; presumably
    # `from dagster._utils.backoff import backoff` — confirm.
    conn = backoff(
        fn=duckdb.connect,
        retry_on=(RuntimeError, duckdb.IOException),
        kwargs={
            "database": os.getenv("DUCKDB_DATABASE"),
        },
        max_retries=10,
    )
    # The connection is never explicitly closed in this version.
    conn.execute(query)
After adding a resource
And now, after adding a resource, the `taxi_trips` asset looks like the following code:
# assets/trips.py
import requests
from dagster_duckdb import DuckDBResource
from dagster_essentials.assets import constants
import dagster as dg
... # other assets
# Builds the `taxi_trips` DuckDB table from the raw 2023-03 trips parquet
# file. Declared downstream of the `taxi_trips_file` asset via `deps`.
@dg.asset(
    deps=["taxi_trips_file"],
)
def taxi_trips(database: DuckDBResource) -> None:
    # `database` is a resource, not an upstream asset: the DuckDBResource
    # type hint is what tells Dagster to supply it.
    # `create or replace` rebuilds the table on every materialization; the
    # select renames the raw columns to snake_case. The source path is
    # hard-coded to the 2023-03 file.
    query = """
create or replace table taxi_trips as (
select
VendorID as vendor_id,
PULocationID as pickup_zone_id,
DOLocationID as dropoff_zone_id,
RatecodeID as rate_code_id,
payment_type as payment_type,
tpep_dropoff_datetime as dropoff_datetime,
tpep_pickup_datetime as pickup_datetime,
trip_distance as trip_distance,
passenger_count as passenger_count,
total_amount as total_amount
from 'data/raw/taxi_trips_2023-03.parquet'
);
"""
    # The connection comes from the resource instead of being opened here,
    # and the `with` block scopes its use. NOTE(review): retry and close
    # behavior live inside DuckDBResource.get_connection — confirm in the
    # dagster-duckdb docs.
    with database.get_connection() as conn:
        conn.execute(query)
To refactor `taxi_trips` to use the `database` resource, we had to:
1. Replace the `duckdb` import (and the now-unused `os` import) with `from dagster_duckdb import DuckDBResource`, which we use to type-hint the asset's new `database` parameter.
2. Update the `taxi_trips` asset’s function definition to include `database: DuckDBResource`. This type hint is required to tell Dagster that the dependency is a resource and not an asset.
3. Replace the lines that connect to DuckDB and execute a query:
```python
conn = backoff(
    fn=duckdb.connect,
    retry_on=(RuntimeError, duckdb.IOException),
    kwargs={
        "database": os.getenv("DUCKDB_DATABASE"),
    },
    max_retries=10,
)
conn.execute(query)
```
With these lines, which use the `database` resource:

```python
with database.get_connection() as conn:
    conn.execute(query)
```
Notice that we no longer need to use the `backoff` function. The Dagster `DuckDBResource` handles this functionality for us.
Before you continue
Before continuing, make sure you:
- Update `assets/trips.py` with the refactored `taxi_trips` asset code
- Reload the definitions in the Dagster UI
- Rematerialize the `taxi_trips` asset