Jun 28, 2023

TileDB Cloud: Task Graphs

Data Science
14 min read
Seth Shelnutt

CTO, TileDB

One of the key features of TileDB Cloud is its fully serverless, distributed computing framework, which allows users to easily implement sophisticated workloads (from ETL pipelines to SQL queries and scientific computations), leveraging the power of the cloud with extreme cost savings.

This computing framework is architected using user-defined functions (UDFs) that were covered in a previous blog post, and “task graphs”, which are built on top of UDFs and are the topic of this blog post. Think of TileDB Cloud’s task graphs as a serverless version of Dask’s delayed package, augmented with additional functionality such as multi-language support and heterogeneous computing.

Task graphs are exclusive to TileDB Cloud (there is currently no open-source version of this functionality), so the contents of this tutorial can be reproduced only inside TileDB Cloud (sign up today and you will receive free credits). On the upside, with TileDB Cloud, you never need to worry about setting up or sizing clusters. Before embarking on this tutorial, we strongly recommend you read our blog post on UDFs first.

Task graphs at a high level

Task graphs can run in two modes: real-time and batch. TileDB Cloud provides these modes so you can optimize the user experience for low-latency analysis, or optimize costs for long-running batch jobs like data ingestion and heavy query workloads. Task graphs are designed to scale to thousands of tasks within a single task graph, and to millions of concurrent tasks across all users in the system.

Task graphs use the same default environments as TileDB Cloud UDFs, providing a variety of Python and R versions and packages.

Real-time

Real-time task graphs are built on the foundations of UDFs. This provides the capability to run arbitrary computations and get the results back in the same request. For real-time task graphs, TileDB maintains warm Kubernetes pods to handle and serve requests expeditiously. Building a task graph in Python or R involves using one of our APIs, either the Delayed API or the DAG API, both of which automatically handle the mapping of input and output parameters. Real-time task graphs are driven by the client and require maintaining a constant connection for the duration of the task graph.

Real-time task graphs offer the same computational environments as UDFs. The default configuration provides 2 CPUs and 2 GB of RAM; the large configuration provides 8 CPUs and 8 GB of RAM.

Batch

Batch task graphs are built on Argo Workflows, which provide the orchestration layer for running asynchronous batch jobs. Batch task graphs let you run large pipelines that need a heterogeneous compute environment and might run for hours. Resources are allocated and launched when the batch graph is submitted to TileDB Cloud. Depending on the type of resources requested, the task graph might be queued while waiting for resources to become available. Results are not returned in real time, but they can be stored and retrieved asynchronously. Building a batch task graph uses the same APIs as a real-time task graph, but with the mode set to batch. This makes it easy to start experimenting with small real-time workflows, and then scale up and out to massive asynchronous workflows with batch.

Batch task graphs also let you specify the CPU and memory requirements for each task in the graph. You can also specify GPUs, mixing and matching resource configurations at each stage to optimize processing.

Your first real-time task graph

If you've ever used Python's concurrent.futures or Dask's delayed APIs, TileDB Cloud's delayed task graph API has similar semantics. You can implement a simple one-function graph like so:

# Import numpy and the Delayed API
import numpy
from tiledb.cloud.compute import Delayed

# Wrap numpy median in a delayed object
x = Delayed(numpy.median)

# It can be called like a normal function to set the parameters.
# Note at this point the function does not get executed since it
# is of "delayed" type
x([1,2,3,4,5])

# To initiate execution and get the result call `compute()`
print(x.compute())

Heterogeneous task graphs

You can create graphs that mix different resource configurations, as well as different programming languages. Let's build a more involved graph than the one above, using generic UDFs, array UDFs and even serverless SQL.

# Import numpy and the delayed APIs
import numpy as np
from tiledb.cloud.compute import Delayed, DelayedArrayUDF, DelayedSQL

# Build several delayed objects to define a graph
# Note that package numpy is aliased as np in the UDFs
l_func = Delayed(lambda x: x * 2, name="l_func")(100)
array_apply = DelayedArrayUDF("tiledb://TileDB-Inc/quickstart_sparse",
        lambda x: np.sum(x["a"]), name="array_apply")([(1, 4), (1, 4)])
sql = DelayedSQL("select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`", name="sql")

# Custom function for averaging all the results we are passing in
def mean(l_func, array_apply, sql):
    # The SQL result is a dataframe; take the value in its first cell
    return np.mean([l_func, array_apply, sql.iloc[0, 0]])

# This is essentially a task graph that looks like
#                mean
#          /      |    \
#         /       |     \    
#   l_func  array_apply  sql
#
# The `l_func`, `array_apply` and `sql` tasks will be computed first,
# and once all three are finished, `mean` will be computed on their results
res = Delayed(func_exec=mean, name="node_exec")(l_func, array_apply, sql)
print(res.compute())

Visualizing execution

There is a handy JupyterLab plugin that shows a visual representation of the graph and also provides live monitoring and details.

If we visualize the previous graph, here is what it looks like:

# View the graph representation right in the notebook environment
res.visualize()

DAG API

In addition to the delayed API, TileDB Cloud offers a lower-level DAG API that provides additional flexibility for constructing task graphs. Let's revisit the previous example task graph.

# This is the same implementation that backs `Delayed`, but this interface
# is better suited to more advanced use cases where full control is desired.
import numpy as np
from tiledb.cloud import dag

graph = dag.DAG()

# Define a graph
# Note that package numpy is aliased as np in the UDFs
l_func = graph.submit(lambda x: x * 2, 100, name="l_func")
array_apply = graph.submit_array_udf("tiledb://TileDB-Inc/quickstart_sparse",
    lambda x: np.sum(x["a"]),
    name="array_apply", ranges=[(1, 4), (1, 4)])
sql = graph.submit_sql("select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`", name="sql")

# Custom function for averaging all the results we are passing in
def mean(l_func, array_apply, sql):
    return np.mean([l_func, array_apply, sql.iloc[0, 0]])

# This is essentially a task graph that looks like
#                mean
#          /      |    \
#         /       |     \    
#   l_func  array_apply  sql
#
# The `l_func`, `array_apply` and `sql` tasks will be computed first,
# and once all three are finished, `mean` will be computed on their results
res = graph.submit(func=mean, name="node_exec", l_func=l_func, array_apply=array_apply, sql=sql)
graph.compute()
graph.wait()

print(res.result())

You'll notice there are equivalent functions between the delayed and DAG APIs.

+-----------------+------------------------+
|   Delayed API   |        DAG API         |
+-----------------+------------------------+
| Delayed         | graph.submit           |
| DelayedArrayUDF | graph.submit_array_udf |
| DelayedSQL      | graph.submit_sql       |
+-----------------+------------------------+

Batch task graph

By default, task graphs run in real-time mode. Switching a task graph to run in batch mode is a single parameter change: mode.

Let's take the initial example of a simple delayed wrapper on numpy.median. Using the Delayed API, we pass the mode parameter to the Delayed function:

# Wrap numpy median in a delayed object
x = Delayed(numpy.median, mode=dag.Mode.BATCH)

# It can be called like a normal function to set the parameters
# Note at this point the function does not get executed since it
# is of "delayed" type
x([1,2,3,4,5])

# To initiate execution and get the result call `compute()`
print(x.compute())

Using the DAG API, we just need to set the mode when constructing the DAG object:

# Create graph
graph = dag.DAG(mode=dag.Mode.BATCH)

# Wrap numpy median in a delayed object
x = graph.submit(np.median, [1,2,3,4,5])

# To initiate execution and get the result call `compute()`
graph.compute()
graph.wait()
print(x.result())

Specifying resources

Task graphs let you specify resources for each task independently. This lets you use GPUs only for the tasks that require them, reducing the cost of the workflow. Likewise, you can request more memory or CPUs only where needed, and smaller resources for lighter tasks.

Each API has a simple parameter for specifying resources. For real-time graphs it is the same as for UDFs: resource_class.

# Wrap numpy median in a delayed object
x = Delayed(numpy.median, resource_class='large')

# It can be called like a normal function to set the parameters
# Note at this point the function does not get executed since it
# is of "delayed" type
x([1,2,3,4,5])

# To initiate execution and get the result call `compute()`
print(x.compute())

In batch mode, you can request custom resources with the resources parameter:

# Wrap numpy median in a delayed object
x = Delayed(numpy.median, mode=tiledb.cloud.dag.Mode.BATCH, resources={'cpu': '8', 'memory': '8Gi'})

# It can be called like a normal function to set the parameters
# Note at this point the function does not get executed since it
# is of "delayed" type
x([1,2,3,4,5])

# To initiate execution and get the result call `compute()`
print(x.compute())
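Because resources are specified per task, you can mix configurations within a single batch graph, e.g. a light parsing step feeding a heavier transform. Here is a minimal sketch; the node functions, bucket path and resource values are placeholders:

# Mix per-task resource configurations in one batch graph
from tiledb.cloud import dag

def parse(path):
    # Placeholder for a real parsing step
    return [1, 2, 3]

def transform(rows):
    # Placeholder for a heavier, memory-hungry transform
    return [r * 2 for r in rows]

graph = dag.DAG(mode=dag.Mode.BATCH)
raw = graph.submit(parse, "s3://my-bucket/data.csv", name="parse",
                   resources={'cpu': '2', 'memory': '2Gi'})
out = graph.submit(transform, raw, name="transform",
                   resources={'cpu': '16', 'memory': '32Gi'})
graph.compute()
graph.wait()
print(out.result())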

GPUs

Batch task graphs allow GPUs to be used in any task. For TileDB Cloud SaaS, we currently offer Nvidia K80 GPUs. We plan to expand GPU support to more types in the near future and would love to hear your input. On-prem deployments of TileDB Cloud have full flexibility in defining different GPU configurations.

One or more GPUs can be requested through the same resources parameter used for CPUs and memory: the gpu key sets the number you'd like attached to the task. Up to 16 GPUs can be used on a single task.

# Create graph
graph = dag.DAG(mode=dag.Mode.BATCH)

# Wrap numpy median in a delayed object
x = graph.submit(np.median, [1,2,3,4,5], resources={'cpu': '8', 'memory': '8Gi', 'gpu': 1})

# To initiate execution and get the result call `compute()`
graph.compute()
graph.wait()
print(x.result())

Accessing storage for ingestion

Batch task graphs let you access storage, such as Amazon S3, if you have configured an IAM role (or equivalent) and marked it as "Allow to run tasks and code" in the TileDB Cloud credential settings. You simply pass the parameter access_credentials_name, and then you'll be able to access S3. This is helpful and often used when you are ingesting data from S3 into TileDB arrays.

def my_ingestor():
    # Verify the attached credentials by asking STS who we are
    import boto3
    sts_client = boto3.client('sts')
    account = sts_client.get_caller_identity()
    return account

# Create graph
graph = dag.DAG(mode=dag.Mode.BATCH)

# Submit the ingestion function, attaching the stored credential
x = graph.submit(my_ingestor, access_credentials_name="my_role")

# To initiate execution and get the result call `compute()`
graph.compute()
graph.wait()
print(x.result())

Details and logs

Task graph activity is logged in TileDB Cloud, and you can retrieve all the details about each graph, including the individual task logs and the code or SQL that was run.

# Get ID of graph that was launched
print(graph.server_graph_uuid)

# Get status
print(graph.status)

# Get completion details
print(graph.stats())
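You can also drill into individual tasks from the client. As a short sketch (assuming the task-lookup helpers exported by the tiledb.cloud package; check the client docs for the exact names and fields):

import tiledb.cloud

# Details for the most recent UDF task in this session
t = tiledb.cloud.last_udf_task()
print(t.id, t.name, t.status)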

All the information is also logged directly in the TileDB Cloud UI, and can be accessed by anyone with proper permissions. This makes it easy to share and collaborate as you are building out pipelines and running production workflows.

[Figure: example task graph details and logs in the TileDB Cloud UI]

Registering a task graph

You can register a task graph and call it later for execution, or share it with other collaborators via access policies. This works similarly to registering a user-defined function. Let's take the same example from above and register it.

# The TaskGraphBuilder constructs a graph for later registration, rather
# than immediate execution.
import numpy as np
import tiledb.cloud
from tiledb.cloud.taskgraphs import builder, types

graph = builder.TaskGraphBuilder(name="Registration Example")

# Define a graph
# Note that package numpy is aliased as np in the UDFs
l_func = graph.udf(lambda x: x * 2, types.args(100), name="l_func")
array_apply = graph.array_udf("tiledb://TileDB-Inc/quickstart_sparse",
    lambda x: np.sum(x["a"]),
    name="array_apply", ranges=[(1, 4), (1, 4)])
sql = graph.sql("select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`", name="sql")

# Custom function for averaging all the results we are passing in
def mean(l_func, array_apply, sql):
    return np.mean([l_func, array_apply, sql.iloc[0, 0]])

# This is essentially a task graph that looks like
#                mean
#          /      |    \
#         /       |     \    
#   l_func  array_apply  sql
#
# The `l_func`, `array_apply` and `sql` tasks will be computed first,
# and once all three are finished, `mean` will be computed on their results
res = graph.udf(mean, types.args(l_func=l_func, array_apply=array_apply, sql=sql), name="node_exec")

# Now let's register the graph instead of running it
tiledb.cloud.taskgraphs.register(graph, name="registration-example")

# To call the registered graph, we simply load it, then execute
new_tgb = tiledb.cloud.taskgraphs.registration.load("registration-example", namespace="TileDB-Inc")
results = tiledb.cloud.taskgraphs.execute(new_tgb)

You can register any complex graph, with any number of input nodes and parameters. For instance, here is a genomics workflow where you can specify the input gene and cohort (country and sex).

[Figure: genomics workflow with gene and cohort inputs in the TileDB Cloud UI]
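To give a flavor of what a parameterized graph can look like, here is a rough sketch of a registered graph with two input nodes. The input() method and its signature are an assumption on my part, so consult the task graph API docs for the exact interface:

import tiledb.cloud
from tiledb.cloud import taskgraphs
from tiledb.cloud.taskgraphs import builder, types

grf = builder.TaskGraphBuilder(name="Cohort Example")

# Assumed API: input nodes declare parameters (with optional defaults)
# that are bound when the registered graph is executed
gene = grf.input("gene", "BRCA2")
country = grf.input("country", "GBR")

def query_cohort(gene, country):
    # Placeholder for a real genomics query
    return f"querying {gene} for cohort {country}"

grf.udf(query_cohort, types.args(gene=gene, country=country), name="query")

# Register, then load and execute as in the example above
taskgraphs.register(grf, name="cohort-example")
loaded = taskgraphs.registration.load("cohort-example")
results = taskgraphs.execute(loaded)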

Large-scale examples

NYC Yellow Taxi Cab data ingestion

This example notebook showcases distributed CSV ingestion of the NYC Yellow Taxi Cab data. We ingested 69 CSVs spanning June 2016 through February 2022.

The ingestion takes about 3 minutes to load the 36 GB of raw uncompressed CSV data. Of this time, about 2 minutes is spent with pandas to parse and manipulate the various formats of the CSVs to normalize them for insertion. The total cost of this ingestion on TileDB Cloud SaaS is $0.419!

See the full notebook on TileDB Cloud: NYC Taxi Data Ingestion.

[Figure: NYC taxi ingestion task graph]
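The fan-out pattern behind this ingestion is simple: one batch task per CSV, each writing its slice into the array. Here is a minimal sketch; the file list, array URI, resource values and use of tiledb.from_pandas are illustrative placeholders, not the notebook's actual code:

from tiledb.cloud import dag

csv_uris = [
    "s3://my-bucket/yellow_tripdata_2016-06.csv",
    "s3://my-bucket/yellow_tripdata_2016-07.csv",
    # ... one entry per monthly file
]

def ingest_csv(csv_uri, array_uri):
    import pandas as pd
    import tiledb
    # Parse and normalize the month's data, then append it to the array
    df = pd.read_csv(csv_uri)
    tiledb.from_pandas(array_uri, df, mode="append")
    return len(df)

graph = dag.DAG(mode=dag.Mode.BATCH)
nodes = [
    graph.submit(ingest_csv, uri, "tiledb://my-namespace/taxi",
                 name=f"ingest-{i}", resources={'cpu': '4', 'memory': '8Gi'})
    for i, uri in enumerate(csv_uris)
]
graph.compute()
graph.wait()
print(sum(n.result() for n in nodes), "rows ingested")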

NYC taxi aggregation

This notebook shows running large-scale distributed aggregations against the NYC taxi data.

Performing a distributed average, we are able to scan over 840 million records in 22 seconds, yielding 38 million records per second! The whole query cost $0.0493 on TileDB Cloud SaaS.

See the full notebook on TileDB Cloud: NYC Taxi Data Large Aggregations.

[Figure: distributed average over the NYC taxi data]
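The pattern behind the distributed average is classic map-reduce: each task returns a partial (sum, count) over one slice of the array, and a final node combines them. A rough sketch using the Delayed API (the array URI, attribute name and slice boundaries are placeholders, not the notebook's actual values):

import numpy as np
from tiledb.cloud.compute import Delayed, DelayedArrayUDF

def partial(x):
    # Partial aggregate over one slice: (sum, count)
    import numpy as np
    v = x["fare_amount"]
    return (float(np.sum(v)), len(v))

def combine(*partials):
    # Reduce the partials into the global average
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count

# One partial per 10M-row slice of a hypothetical 1D taxi array
slices = [[(lo, lo + 9_999_999)] for lo in range(0, 40_000_000, 10_000_000)]
parts = [
    DelayedArrayUDF("tiledb://my-namespace/taxi", partial, name=f"part-{i}")(s)
    for i, s in enumerate(slices)
]
avg = Delayed(combine, name="combine")(*parts)
print(avg.compute())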

What’s Next?

In this blog post, I covered the fundamentals of TileDB Cloud’s serverless distributed computing framework based on task graphs. There are many more possible combinations and use cases for task graphs, including large-scale ingestion of data, massive analysis on petabyte-scale data, running genomics pipelines, ML training and more. There are also more APIs and parameters that provide additional flexibility for building out your workloads.

If you’d like to learn more, I recommend checking out some of our real-world task graph use cases, such as the notebooks above.

Run them for yourself, and let us know what you think! Also follow us on Twitter and LinkedIn, join our Slack community, or contact us with your exciting use cases.
