Tapis ETL

Tapis ETL is a convenience layer on top of Tapis Workflows that enables users to create automated ETL pipelines with a single request. Tapis ETL leverages Tapis Workflows resources (pipelines and tasks) to manage the entire ETL process: from ingesting data files from the configured source system, to tracking their status as they are processed by user-defined batch computing jobs, to transferring the results to the configured destination system.


Overview

Introduction

An ETL pipeline (Extract, Transform, Load) is a workflow pattern commonly used in scientific computing in which data is extracted from one or more sources, transformed by some set of processes, and the output data produced by those processes is archived to some final destination.

Tapis ETL is a framework built on top of Tapis Workflows that enables users to create fully automated and parallel ETL Pipelines in the Tapis ecosystem. Once a pipeline is configured, simply drop off data and manifest files on the source system. Tapis ETL will run the HPC jobs according to the manifests and archive the results to the configured destination system. For an example of how to get started, jump to the Quick Start section.

Glossary

Here we introduce the standard Tapis ETL terminology for ETL pipelines.

Note

Local denotes any resource (systems, files, etc.) situated at TACC. Remote denotes any resource (systems, files, etc.) NOT situated at TACC.

  • Manifests - Files that track the progress of one or more data files through the various phases of an ETL Pipeline. They are responsible for managing and maintaining the state between pipeline runs.

  • Remote Outbox - The source system that contains the data files you want to process. This system is commonly a Globus endpoint or S3 bucket.

  • Local Inbox - The system where data is processed.

  • ETL Jobs - Batch computing jobs that will be run sequentially on HPC/HTC systems against the data files in a manifest.

  • Local Outbox - The system where the output will be staged for transfer to the destination system.

  • Remote Inbox - The destination system where output data will be archived.

  • ETL System Configuration - A collection of systems responsible for storage of data and manifest files. It is the general term for remote and local inboxes and outboxes.

  • Data System - Component system of an ETL System Configuration responsible for the storage of data.

  • Manifests System - Component system of an ETL System Configuration responsible for the storage of manifests.

  • Phase - A distinct stage of a single Tapis ETL Pipeline run: ingress, transform, egress.

  • Resubmission - Rerunning a specific phase or phases of an ETL pipeline for a given manifest.

  • Pipeline Run - A single run of an ETL pipeline.

How it works

Tapis ETL diagram
Step 0 must be completed by a user or another external workflow, independently of Tapis ETL.
  0. Data files to be processed are placed in the data directory of the configured Remote Outbox. A manifest is generated for those files (by a user or a script) and placed in the Remote Outbox’s manifest directory.

Steps 1-4 are managed by Tapis ETL:
  1. Tapis ETL checks the Remote Outbox manifests directory for new manifests and transfers them to the manifests directory of the Local Inbox. The files enumerated in those manifests are then transferred to the data directory of the Local Inbox.

  2. A single unprocessed manifest is chosen by the ETL Pipeline (according to the manifest priority) and the files therein are then staged as inputs to the first batch computing job. Each batch computing job defined in the ETL Pipeline definition will then be submitted to the Tapis Jobs API, the first of which processes the data files in the manifest.

  3. After all jobs run to completion, a manifest is generated for each of the output files found in the data directory of the Local Outbox.

  4. All data files enumerated in that manifest are then transferred to the data directory of the Remote Inbox.

Phases

Tapis ETL Pipelines run in 3 separate phases: the Ingress, Transform, and Egress Phases.
  1. Ingress Phase - Tapis ETL inspects the Remote Outbox for new data files and transfers them over to the Local Inbox.

  2. Transform Phase - A single data file or batch of data files is processed by a series of user-defined ETL Jobs.

  3. Egress Phase - The output data files from the ETL Jobs are tracked and transferred from the Local Outbox to the Remote Inbox.


User Guide

This section serves as the primary technical reference and a comprehensive, step-by-step guide for setting up, creating, and managing ETL Pipelines with Tapis ETL.

0. Prerequisites

If you have already satisfied the prerequisites, you can skip to the ETL Systems Configuration section.

0.1 Data Center Storage and Compute Allocations

Your project may need a storage allocation on some storage resource (for example at TACC: Corral, Stockyard, or one of our Cloud-based storage resources) to serve as the project’s Local Inbox and Outbox. The size of the required allocation greatly depends on the size of the files that will be processed in the pipeline.

Your project may also need one or more allocations on a compute system (for example at TACC: Frontera, Stampede2, Lonestar5, or one of the cloud computing systems). ETL Jobs run by Tapis ETL will use the allocations specified in the ETL Job definitions.

0.2 Users and Allocations

When Tapis ETL runs an ETL Job, it does so as the owner of the ETL Pipeline. This Tapis user must be mapped to a valid user with a valid allocation on the underlying compute system or the Transform Phase of the ETL Pipeline will always fail.

0.3 TapisV3 Client Setup

Throughout the User Guide, we will be creating TapisV3 objects. For all TapisV3 requests, both cURL and tapipy instructions will be available. Tapipy usage and installation instructions can be found at https://pypi.org/project/tapipy/.

curl -H "Content-type: application/json" -d '{"username": "your_tacc_username", "password": "your_tacc_password", "grant_type": "password" }' https://tacc.tapis.io/v3/oauth2/tokens
export JWT=your_access_token_string
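
The equivalent authentication with tapipy (a minimal sketch, assuming the tacc tenant and your TACC credentials) looks like this:

from tapipy.tapis import Tapis

# Authenticate against the tacc tenant and fetch an access token
t = Tapis(base_url='https://tacc.tapis.io', username='your_tacc_username', password='your_tacc_password')
t.tokens.get_tokens()

The client object t is reused in the tapipy examples throughout the rest of this guide.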

0.4 Tapis Workflows Group

Tapis ETL uses Tapis Workflows’ Groups to manage access (creating, deleting, running) to ETL Pipelines. If you do not own or belong to a group, create one following the instructions below.

Creating a Group (createGroup)

Create a file in your current working directory called group.json with the following json schema:

{
  "id": "<group_id>",
  "users": [
      {
        "username":"<user_id>",
        "is_admin": false
      }
  ]
}

Note

You do not need to add your own Tapis id to the users list. The owner of the Group is added by default.

Replace <group_id> with your desired group id and <user_id> in the user objects with the ids of any other Tapis users that you want to have access to your workflows resources.

Warning

Users with is_admin flag set to true can perform every action on all Workflow resources in a Group except for deleting the Group itself (only the Group owner has those permissions)

Submit the definition.

curl -X POST -H "content-type: application/json" -H "X-Tapis-Token: $JWT" https://tacc.tapis.io/v3/workflows/groups -d @group.json
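
The same request can also be made with tapipy. The sketch below assumes the client t from section 0.3 and the createGroup operation named in the heading above:

import json

with open('group.json', 'r') as file:
    group = json.load(file)

t.workflows.createGroup(**group)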

Note

Section 0 Punch List
  • Valid allocation for your project on the compute and storage resources

  • Valid user with a valid allocation on the compute resources

  • Set up a Tapis client (Bash or Python)

  • Create or belong to a valid Tapis Workflows Group


1. ETL Systems Configurations

In this section, we discuss ETL System Configurations. By the end of this section, you will know how to create and configure all Remote and Local ETL System Configurations for an ETL Pipeline.

Note

ETL System Configurations are by far the most complex part of creating an ETL Pipeline. Do not be discouraged! The initial complexity is a small price to pay for the ease with which ETL Pipelines are operated with Tapis ETL.

An ETL System Configuration is a collection of systems that are responsible for the storage of data files (Data System) and the storage of manifest files (Manifest System).

There are 4 ETL System Configurations for every ETL Pipeline:
  • The Remote Outbox - Where input data files to be processed are staged

  • The Local Inbox - Where input data files are processed

  • The Local Outbox - Where output data files are staged

  • The Remote Inbox - Where output data files are transferred

Every ETL System Configuration is composed of 2 systems: a Data System and a Manifests System. These systems generally have a one-to-one correspondence with Tapis Systems.

Note

There are many different possible ETL System Configuration setups for an ETL Pipeline. It is entirely possible to have a single Tapis System representing all systems in an ETL Pipeline. How you set up your ETL System Configurations depends entirely upon where the data is and how you want to process it.

Common ETL System Configurations Setup

Tapis ETL System Configurations diagram

Above is an illustration of the most common ETL System Configuration setup. You will notice that there are four distinct Tapis Systems: Systems (A) and (C) are Globus endpoints, System (B) is a Linux system, and System (D) is the ETL Job execution system (also Linux) that shares a file system with System (C). It is worth noting that the Data and Manifests Systems of some ETL System Configurations (Remote Inbox, Remote Outbox, etc.) have corresponding Tapis Systems that are located at entirely different data centers.

1.1 Create Tapis Systems for the ETL System Configurations

Before we discuss ETL System Configurations in depth, we will first create all of the Tapis Systems that will be used by Tapis ETL to run our ETL Pipeline. We will be using a simplified single-data-center model for this pipeline. In this setup, the Local Inbox, Local Outbox, and compute system will use Lonestar6 (LS6) as the host, and the Remote Inbox and Remote Outbox will use the LS6 Globus Endpoint as the host.

Simplified ETL System Configurations Setup

Simplified Tapis ETL System Configurations diagram

Follow the instructions below to create the necessary Tapis Systems for your ETL Pipeline.

This Globus Tapis System will be used as the Data System for both of the Remote ETL System Configurations (Remote Inbox, Remote Outbox) in our pipeline.

Creating a Globus System for an ETL Pipeline

Create a file named onsite-globus-system.json in your current working directory that contains the json schema below.

{
  "id": "etl.userguide.systema.<user_id>",
  "description": "Tapis ETL Globus System on LS6 for data transfer and storage",
  "canExec": false,
  "systemType": "GLOBUS",
  "host": "24bc4499-6a3a-47a4-b6fd-a642532fd949",
  "defaultAuthnMethod": "TOKEN",
  "rootDir": "HOST_EVAL($SCRATCH)"
}
import json
from tapipy.tapis import Tapis


t = Tapis(base_url='https://tacc.tapis.io', username='<userid>', password='************')
t.tokens.get_tokens()

with open('onsite-globus-system.json', 'r') as file:
    system = json.load(file)

t.systems.createSystem(**system)
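
If you prefer cURL, the equivalent request to the Systems API (assuming the $JWT from section 0.3) should look like the following:

curl -X POST -H "content-type: application/json" -H "X-Tapis-Token: $JWT" https://tacc.tapis.io/v3/systems -d @onsite-globus-system.json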

Register Globus Credentials for the System

Once you have successfully created the system, you must then register credentials for the system in order for Tapis to access it on your behalf. Follow the instructions found in the Registering Globus Credentials for a System section.

Once you have successfully registered credentials for the system, you should be able to list files on the system.
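
A quick way to verify access is a file listing with tapipy (a sketch, assuming the client t from section 0.3 and the system id above):

t.files.listFiles(systemId='etl.userguide.systema.<user_id>', path='/')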

Once you have successfully listed files for this system, move on to the instructions for System (B).

1.2 ETL System Configuration Components

Every ETL System Configuration is composed of two parts: the Data System and the Manifests System.

1.2.1 Data System

The Data System is a collection of properties that define where data files are located (or where data files are supposed to be transferred) as well as how to perform integrity checks on that data. Below is an example of the Data System of a Remote Outbox ETL System Configuration (Manifests System omitted to simplify the schema).

{
    "remote_outbox": {
        "data": {
            "system_id": "etl.userguide.systema.<user_id>",
            "path": "/ETL/REMOTE-OUTBOX/DATA",
            "integrity_profile": {
                "type": "done_file",
                "done_files_path": "/ETL/REMOTE-OUTBOX/DATA",
                "include_patterns": ["*.md5"],
                "exclude_patterns": []
            },
            "include_patterns": ["*.txt"],
            "exclude_patterns": []
        },
        "manifests": {}
    }
}
  • system_id - The ID of the Tapis System on which:

    • data files are stored (Outbox systems)

    • data files are to be transferred (Inbox systems)

  • path - The path to the directory on the Tapis System where data files are staged

  • include_patterns - An array of glob patterns that are used to filter filenames. All files matching the glob patterns are considered by Tapis ETL to be data files.

  • exclude_patterns - An array of glob patterns that are used to filter filenames. All files matching the glob patterns are considered by Tapis ETL to be non-data files. (See the filtering sketch after this list.)
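
The include and exclude patterns behave like shell globs. The snippet below is purely illustrative (it is not Tapis ETL's internal code) and shows which filenames would be treated as data files for include_patterns of ["*.txt"] and exclude_patterns of ["*.md5"]:

from fnmatch import fnmatch

filenames = ['data1.txt', 'data2.txt', 'data1.txt.md5', 'notes.log']
include_patterns = ['*.txt']
exclude_patterns = ['*.md5']

# A file counts as a data file if it matches at least one include
# pattern and no exclude pattern
data_files = [
    name for name in filenames
    if any(fnmatch(name, p) for p in include_patterns)
    and not any(fnmatch(name, p) for p in exclude_patterns)
]
print(data_files)  # ['data1.txt', 'data2.txt']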

1.2.1.1 Data Integrity Profile

Every Data System has a Data Integrity Profile. The Data Integrity Profile is a set of instructions that informs Tapis ETL on how it should check the validity of all data files in the data path. There are 3 ways in which Tapis ETL can perform data integrity checks:

  • byte_check - Checks the actual size (in bytes) of a data file against its recorded size in a manifest.

  • checksum - Takes the hash of a data file according to the specified hashing algorithm and checks that value against the value of the checksum in a manifest. This is an expensive process for large data files. This data integrity check type should only be used if data files are small or if it is absolutely necessary.

  • done_files - Checks that a data file has a corresponding done file in which:

    • the done file matches a set of glob patterns

    • the done file’s filename contains the corresponding data file’s filename as a substring

Below is an example schema of a Data System with a byte_check Data Integrity Profile.

{
    "remote_outbox": {
        "data": {
            "system_id": "etl.userguide.systema.<user_id>",
            "path": "/ETL/REMOTE-OUTBOX/DATA",
            "integrity_profile": {
                "type": "byte_check"
            },
            "include_patterns": ["*.txt"],
            "exclude_patterns": []
        },
        "manifests": {}
    }
}

1.2.2 Manifests System

The Manifests System is a collection of properties that define where manifests for a given system are located as well as how to generate manifests. Below is an example of the Manifests System of a Remote Outbox ETL System Configuration (Data System omitted to simplify the schema).

{
    "remote_outbox": {
        "data": {}
        "manifests": {
            "system_id": "etl.userguide.systema.<user_id>",
            "generation_policy": "auto_one_per_file",
            "priority": "oldest",
            "path": "/ETL/REMOTE-OUTBOX/MANIFESTS",
            "include_patterns": [],
            "exclude_patterns": []
        }
    }
}
  • system_id - The ID of the Tapis System on which manifest files are stored or created

  • path - The path to the directory on the Tapis System where the manifest files are stored or created

  • include_patterns - An array of glob patterns that are used to filter filenames. All files matching the glob patterns are considered by Tapis ETL to be manifest files.

  • exclude_patterns - An array of glob patterns that are used to filter filenames. All files matching the glob patterns are considered by Tapis ETL to be non-manifest files.

  • generation_policy - Indicates how manifests will be generated for data files on the Manifests System. Must be one of the following values:

    • manual - A user must manually generate the manifests for the data files that they want their ETL Pipeline to process

    • auto_one_per_file - Generates one manifest per data file found in the data directory

    • auto_one_for_all - Generates one manifest for all data files found in the data directory

  • priority - The order in which manifests should be processed. Must be one of the following values:

    • oldest - Tapis ETL will process the oldest manifest in the manifests path

    • newest - Tapis ETL will process the newest manifest in the manifests path

    • any - Tapis ETL determines which manifests in the manifests path to process first

Note

Section 1 Punch List
  • Created 1 Globus system with Tapis

  • Registered credentials with Tapis for the Globus system

  • Successfully performed a file listing with Tapis on the Globus system

  • Created 1 Linux system with Tapis

  • Registered credentials with Tapis for the Linux system

  • Successfully performed a file listing with Tapis on the Linux system


2. ETL Pipeline Creation

In this section we create our ETL Pipeline. Once it’s created, we will discuss each part of the ETL Pipeline schema in detail.

Note

This schema is built using data from resources that were created during the earlier sections of this User Guide.

2.1 Create the ETL Pipeline

Create an ETL Pipeline (createETLPipeline)

Save the following ETL pipeline definition to a file named etl-pipeline.json in your current working directory.

{
    "id": "etl-userguide-pipeline-<user_id>",
    "before": null,
    "remote_outbox": {
        "data": {
            "system_id": "etl.userguide.systema.<user_id>",
            "path": "/ETL/REMOTE-OUTBOX/DATA",
        },
        "manifests": {
            "system_id": "etl.userguide.systemb.<user_id>",
            "generation_policy": "auto_one_per_file",
            "path": "/ETL/REMOTE-OUTBOX/MANIFESTS"
        }
    },
    "local_inbox": {
        "control": {
            "system_id": "etl.userguide.systemb.<user_id>",
            "path": "/ETL/LOCAL-INBOX/CONTROL"
        },
        "data": {
            "system_id": "etl.userguide.systema.<user_id>",
            "path": "/ETL/LOCAL-INBOX/DATA"
        },
        "manifests": {
            "system_id": "etl.userguide.systemb.<user_id>",
            "path": "/ETL/LOCAL-INBOX/MANIFESTS"
        }
    },
    "jobs": [
        {
            "name": "sentiment-analysis",
            "appId": "etl-sentiment-analysis-test",
            "appVersion": "dev",
            "nodeCount": 1,
            "coresPerNode": 1,
            "maxMinutes": 10,
            "execSystemId": "test.etl.ls6.local.inbox",
            "execSystemInputDir": "${JobWorkingDir}/jobs/${JobUUID}/input",
            "archiveSystemId": "test.etl.ls6.local.inbox",
            "archiveSystemDir": "ETL/LOCAL-OUTBOX/DATA",
            "parameterSet": {
                "schedulerOptions": [
                    {
                        "name": "allocation",
                        "arg": "-A TACC-ACI"
                    },
                    {
                        "name": "profile",
                        "arg": "--tapis-profile tacc-apptainer"
                    }
                ],
                "containerArgs": [
                    {
                        "name": "input-mount",
                        "arg": "--bind $(pwd)/input:/src/input:ro,$(pwd)/output:/src/output:rw"
                    }
                ],
                "archiveFilter": {
                    "includes": [],
                    "excludes": ["tapisjob.out"],
                    "includeLaunchFiles": false
                }
            }
        }
    ],
    "local_outbox": {
        "data": {
            "system_id": "etl.userguide.systema.<user_id>",
            "path": "/ETL/LOCAL-OUTBOX/DATA"
        },
        "manifests": {
            "system_id": "etl.userguide.systemb.<user_id>",
            "path": "/ETL/LOCAL-OUTBOX/MANIFESTS"
        }
    },
    "remote_inbox": {
        "data": {
            "system_id": "etl.userguide.systema.<user_id>",
            "path": "/ETL/REMOTE-INBOX/DATA"
        },
        "manifests": {
            "system_id": "etl.userguide.systemb.<user_id>",
            "path": "/ETL/REMOTE-INBOX/MANIFESTS"
        }
    },
    "after": null
}

Then submit the definition.

with open('etl-pipeline.json', 'r') as file:
    pipeline = json.load(file)

t.workflows.createETLPipeline(group_id=<group_id>, **pipeline)

Once created, you can now fetch and run the pipeline.

t.workflows.getPipeline(group_id=<group_id>, pipeline_id=<pipeline_id>)
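
If you prefer cURL, fetching the pipeline should look like the following (a sketch, assuming the standard Tapis Workflows URL pattern and the $JWT from section 0.3):

curl -H "X-Tapis-Token: $JWT" https://tacc.tapis.io/v3/workflows/groups/<group_id>/pipelines/<pipeline_id>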

2.2 Remote Outbox

The Remote Outbox is the ETL System Configuration that tells Tapis ETL where data files to be processed are staged. Any data files placed in the data directory of the Data System (after applying the include and exclude pattern filters) will be processed during pipeline runs.

How many get processed and in what order is determined by the manifests generated for those data files. Consider the schema below, specifically the Manifest Configuration.

{
    "data": {
        "system_id": "etl.userguide.systema.<user_id>",
        "path": "/ETL/REMOTE-OUTBOX/DATA"
    },
    "manifests": {
        "system_id": "etl.userguide.systemb.<user_id>",
        "generation_policy": "auto_one_per_file",
        "path": "/ETL/REMOTE-OUTBOX/MANIFESTS"
    }
}

This Manifest Configuration tells Tapis ETL that the user wants Tapis ETL to handle manifest generation. According to the manifest generation policy of auto_one_per_file, Tapis ETL will generate one manifest for each data file that is not currently being tracked in another manifest.

Warning

Using automatic manifest generation policies without specifying a data integrity profile can break a pipeline. The preferred manifest generation policy for the Remote Outbox is manual. Instructions for generating manifests will come in the following sections. For more info on other possible configurations, see the Manifests Configuration section.

Tapis ETL will perform this operation for every untracked data file for every run of the pipeline. Whether this step runs or not has no effect on the actual data processing phase of the pipeline run (the phase where the ETL Jobs are run). If there are no data files to be tracked, Tapis ETL will simply move on to the next step; looking for an unprocessed manifest with a status of pending and submitting it for processing.

2.3 Local Inbox

Tapis ETL will transfer all of the data files from the Remote Outbox to the data directory of the Local Inbox for processing.

Note

When configuring your Local Inbox, consider that your ETL Jobs should run on a system that has a shared file system with the Local Inbox. The data path in the Local Inbox’s Data Configuration should be accessible to the first ETL Job.

Notice that this ETL System Configuration has an additional property, control. This is simply a location to which Tapis ETL writes accounting files to ensure the pipeline runs as expected. It is recommended that you use the same system as the Local Inbox’s Manifests System; however, any system to which Tapis ETL can write files would work.

{
    "control": {
        "system_id": "etl.userguide.systemb.<user_id>",
        "path": "/ETL/LOCAL-INBOX/CONTROL"
    },
    "data": {
        "system_id": "etl.userguide.systema.<user_id>",
        "path": "/ETL/LOCAL-INBOX/DATA"
    },
    "manifests": {
        "system_id": "etl.userguide.systemb.<user_id>",
        "path": "/ETL/LOCAL-INBOX/MANIFESTS"
    }
}

2.4 ETL Jobs

ETL Jobs are an ordered list of Tapis Job definitions that are dispatched and run serially during a pipeline’s Transform Phase. Tapis ETL dispatches these Tapis Jobs and uses the data files in a manifest as the inputs for the job. Once all of the ETL Jobs complete, the Transform Phase ends.

[
    {
        "name": "sentiment-analysis",
        "appId": "etl-sentiment-analysis-test",
        "appVersion": "dev",
        "nodeCount": 1,
        "coresPerNode": 1,
        "maxMinutes": 10,
        "execSystemId": "test.etl.ls6.local.inbox",
        "execSystemInputDir": "${JobWorkingDir}/jobs/${JobUUID}/input",
        "archiveSystemId": "test.etl.ls6.local.inbox",
        "archiveSystemDir": "ETL/LOCAL-OUTBOX/DATA",
        "parameterSet": {
            "schedulerOptions": [
                {
                    "name": "allocation",
                    "arg": "-A TACC-ACI"
                },
                {
                    "name": "profile",
                    "arg": "--tapis-profile tacc-apptainer"
                }
            ],
            "containerArgs": [
                {
                    "name": "input-mount",
                    "arg": "--bind $(pwd)/input:/src/input:ro,$(pwd)/output:/src/output:rw"
                }
            ],
            "archiveFilter": {
                "includes": [],
                "excludes": ["tapisjob.out"],
                "includeLaunchFiles": false
            }
        }
    }
]
Every ETL Job is furnished with the following environment variables, which can be accessed at runtime by your Tapis Job:
  • TAPIS_WORKFLOWS_TASK_ID - The ID of the Tapis Workflows task in the pipeline that is currently being executed

  • TAPIS_WORKFLOWS_PIPELINE_ID - The ID of the Tapis Workflows Pipeline that is currently running

  • TAPIS_WORKFLOWS_PIPELINE_RUN_UUID - A UUID given to this specific run of the pipeline

  • TAPIS_ETL_HOST_DATA_INPUT_DIR - The directory that contains the data files for the initial ETL Jobs

  • TAPIS_ETL_HOST_DATA_OUTPUT_DIR - The directory to which output data files should be persisted

  • TAPIS_ETL_MANIFEST_FILENAME - The name of the file that contains the manifest

  • TAPIS_ETL_MANIFEST_PATH - The full path (including the filename) to the manifest file

  • TAPIS_ETL_MANIFEST_MIME_TYPE - The MIME type of the manifest file (always application/json)

In addition to the environment variables, a fileInput for the manifest file is added to the job definition to ensure that it is available to the ETL Job’s runtime. In your application code, you can use the environment variables above to locate the manifest file and inspect its contents. This is useful if your application code does not know where to find its input data. The local_files array property of the manifest contains the list of input data files. The files are represented as objects in the local_files array and take the following form.

{
    "mimeType": null,
    "type": "file",
    "owner": "876245",
    "group": "815499",
    "nativePermissions": "rwxr-xr-x",
    "url": "tapis://etl.userguide.systemb.<user_id>/ETL/LOCAL-INBOX",
    "lastModified": "2024-02-26T22:51:31Z",
    "name": "data1.txt",
    "path": "ETL/LOCAL-INBOX/DATA/data1.txt",
    "size": 66
}
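
As an illustration of how application code might consume these variables, here is a minimal sketch (not part of Tapis ETL; the exact directory layout on your system may differ) that loads the manifest, iterates over the entries in local_files, and writes one result file per input to the output directory:

import json
import os

# Locate the manifest and the input/output directories provided by Tapis ETL
manifest_path = os.environ["TAPIS_ETL_MANIFEST_PATH"]
input_dir = os.environ["TAPIS_ETL_HOST_DATA_INPUT_DIR"]
output_dir = os.environ["TAPIS_ETL_HOST_DATA_OUTPUT_DIR"]

with open(manifest_path) as f:
    manifest = json.load(f)

# Process each input data file tracked by the manifest
for entry in manifest["local_files"]:
    input_path = os.path.join(input_dir, entry["name"])
    with open(input_path) as data_file:
        text = data_file.read()
    # ... run your analysis on `text` here ...
    result_path = os.path.join(output_dir, entry["name"] + ".result")
    with open(result_path, "w") as result_file:
        result_file.write(f"processed {entry['name']} ({len(text)} characters)\n")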

2.5 Local Outbox

Once the Transform Phase ends, all of the output data files should be in the data directory of the Local Outbox. These files are then tracked in manifests to be transferred in the Egress Phase of the pipeline.

{
    "data": {
        "system_id": "etl.userguide.systema.<user_id>",
        "path": "/ETL/LOCAL-OUTBOX/DATA"
    },
    "manifests": {
        "system_id": "etl.userguide.systemb.<user_id>",
        "path": "/ETL/LOCAL-OUTBOX/MANIFESTS"
    }
}

2.6 Remote Inbox

During the Egress Phase, all of the output data produced by the ETL Jobs is transferred from the Local Outbox to the Remote Inbox. Once all of the data files from an Egress Manifest are successfully transferred over, Tapis ETL will then transfer that manifest to the manifests directory of the Remote Inbox system to indicate that the file transfers are complete.

{
    "data": {
        "system_id": "etl.userguide.systema.<user_id>",
        "path": "/ETL/REMOTE-INBOX/DATA"
    },
    "manifests": {
        "system_id": "etl.userguide.systemb.<user_id>",
        "path": "/ETL/REMOTE-INBOX/MANIFESTS"
    }
}

Note

Section 2 Punch List
  • Created an ETL Pipeline

  • Can fetch the pipeline’s details from Tapis Workflows


3. Running an ETL Pipeline

In this section we will make our final preparations and perform our first pipeline run. This ETL Pipeline is configured to run HPC jobs that perform sentiment analysis on our data files. It is safe to trigger multiple pipeline runs with Tapis ETL. Tapis ETL has a system for locking ETL resources (such as manifests) to prevent race conditions for data files. In other words, multiple pipeline runs can execute in parallel, each guaranteed to be processing entirely different data files.

3.1 Staging data files to the Remote Outbox

Before running an ETL Pipeline, you must first ensure that there are data files in the data directory of the Remote Outbox. Running a pipeline before there is any data staged to it would result in a “no-op” for each of the 3 phases of the ETL pipeline.
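
If your Remote Outbox Data System supports direct uploads through the Tapis Files API (for a Globus-only endpoint you would typically stage the file with a Globus transfer instead), one way to drop off a small test file is sketched below. The system id and path come from the pipeline definition in section 2; the file contents are arbitrary:

echo "Tapis ETL is a wonderful tool" > data1.txt
curl -X POST -H "X-Tapis-Token: $JWT" -F "file=@data1.txt" https://tacc.tapis.io/v3/files/ops/etl.userguide.systema.<user_id>/ETL/REMOTE-OUTBOX/DATA/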

3.2 Manifests

Manifests are JSON files that track one or more data files on a Data System as well as those data files’ progress through the various ETL Pipeline Phases. These manifest files contain a single manifest object that conforms to the schema below.

{
    "status": "pending",
    "phase": "ingress",
    "local_files": [],
    "transfers": [],
    "remote_files": [
        {
            "mimeType": null,
            "type": "file",
            "owner": "876245",
            "group": "815499",
            "nativePermissions": "rwxr-xr-x",
            "url": "tapis://etl.userguide.systemb.<user_id>/ETL/LOCAL-INBOX",
            "lastModified": "2024-02-26T22:51:31Z",
            "name": "data1.txt",
            "path": "ETL/LOCAL-INBOX/DATA/data1.txt",
            "size": 66
        },
        {
            "mimeType": null,
            "type": "dir",
            "owner": "876245",
            "group": "815499",
            "nativePermissions": "rwxr-xr-x",
            "url": "tapis://etl.userguide.systemb.<user_id>/ETL/LOCAL-INBOX/data2.txt",
            "lastModified": "2024-02-26T22:51:38Z",
            "name": "data2.txt",
            "path": "ETL/LOCAL-INBOX/DATA/data2.txt",
            "size": 47
        }
    ],
    "jobs": [],
    "logs": [
        "2024-04-01 19:16:49.378611 Created"
    ],
    "created_at": 1711999009.378607,
    "last_modified": 1711999009.378607,
    "metadata": {}
}

3.3 Generating Manifests

For each data file or set of data files in the Remote Outbox that you want to run an ETL Job against, you will need to generate a manifest. Each manifest in the Remote Outbox corresponds to a single Transform (ETL Job) run. Each data file in a single manifest will be used as an input file for a single ETL Job.

There are two ways to generate manifest files: manually and automatically. Automatic manifest generation is the simplest manifest generation policy to use but the least flexible. With automatic manifest generation, you can use the auto_one_for_all policy to generate a manifest for all untracked files in the data directory, or the auto_one_per_file policy to generate one manifest per file in the data directory.

For all situations in which you need to generate manifests for arbitrary files in the Remote Outbox, you must use the manual manifest generation policy.

Note

Coming Soon - Manual manifest generation

If you are following the tutorial in the user guide, we previously configured our Remote Outbox to have a manifest generation policy of auto_one_per_file, so Tapis ETL will generate the manifest for the data1.txt data file that we created in the previous step.

3.4 Run the Pipeline

Now that we have a data file (data1.txt) and a manifest tracking that data file, we can trigger the first run of our ETL Pipeline.

Run an ETL Pipeline (runPipeline)

To run a pipeline, you must:
  1. Provide the id of the group that owns the pipeline

  2. Provide the id of the pipeline

  3. Belong to the group that owns the pipeline

Pipeline Arguments

When running an ETL Pipeline with Tapis ETL, you must provide the following arguments:
  • TAPIS_USERNAME - Username of the Tapis user running the pipeline

  • TAPIS_PASSWORD - Password of the user

  • TAPIS_BASE_URL - The URL of the Tapis Tenant in which your pipeline should run (should be the same as the base URL you used to create your pipeline)

Save the following Pipeline Args definition to a file named etl-pipeline-args.json in your current working directory.

{
    "args": {
        "TAPIS_USERNAME": {
            "type": "string",
            "value": "<tapis_username>"
        },
        "TAPIS_PASSWORD": {
            "type": "string",
            "value": "<tapis_password>"
        },
        "TAPIS_BASE_URL": {
            "type": "string",
            "value": "https://tacc.tapis.io"
        }
    }
}

Then submit the definition.

with open('etl-pipeline-args.json', 'r') as file:
    args = json.load(file)

t.workflows.runPipeline(group_id=<group_id>, pipeline_id=<pipeline_id>, args=args)

3.5 Check the status of the Pipeline Run

Fetch Pipeline Run Details (getPipelineRun)

Fetch the details of a single pipeline run. You will need the following data:
  • group_id - The id of the group that owns the pipeline

  • pipeline_id - The id of the pipeline

  • pipeline_run_uuid - The UUID of the pipeline run

t.workflows.getPipelineRun(group_id=<group_id>, pipeline_id=<pipeline_id>, pipeline_run_uuid=<pipeline_run_uuid>)

3.6 Pipeline Run Complete

Once a pipeline run has gone through the 3 phases, it will enter a terminal state of either completed or failed.
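
One simple way to wait for a terminal state is to poll getPipelineRun until the run reports one of the statuses above (a sketch, assuming the tapipy client from section 0.3 and that the returned run object exposes its status as shown; field and status names may differ slightly in your tenant):

import time

terminal_statuses = {"completed", "failed"}

while True:
    run = t.workflows.getPipelineRun(
        group_id=<group_id>,
        pipeline_id=<pipeline_id>,
        pipeline_run_uuid=<pipeline_run_uuid>
    )
    if run.status in terminal_statuses:
        print(f"Pipeline run finished with status: {run.status}")
        break
    time.sleep(30)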

Here are the most common reasons why an ETL Pipeline may fail.
  • Ingress transfer failed (Remote Outbox to Local Inbox)

  • Egress transfer failed (Local Outbox to Remote Inbox)

  • Malformed manifest - This is a critical and unrecoverable error

  • One of the batch compute jobs (ETL Jobs) exited with a non-zero exit code


4. Resubmitting a Pipeline Run

Coming Soon


Quick Start

In this section we will be constructing an ETL pipeline that:
  1. Detects the presence of data files and manifests on some remote system

  2. Transfers those manifests and data files to a local system

  3. Performs sentiment analysis on the text content of the data files and generates a result file for each analysis

  4. Transfers the results files to a remote system for archiving

In this step, we will create the following prerequisite Tapis resources:

  1. One group (Workflows API) - A collection of Tapis users that collectively own workflow resources

  2. Two (or more) systems (Systems API) - Tapis Systems for the Data and Manifests Configuration for the ETL Systems of your pipeline. We will create 1 Globus-type system for data transfers and storage, and 1 Linux-type system that we will use for manifests and compute.

Creating a Group (createGroup)

Create a file in your current working directory called group.json with the following json schema:

{
  "id": "<group_id>",
  "users": [
      {
        "username":"<user_id>",
        "is_admin": false
      }
  ]
}

Note

You do not need to add your own Tapis id to the users list. The owner of the Group is added by default.

Replace <group_id> with your desired group id and <user_id> in the user objects with the ids of any other Tapis users that you want to have access to your workflows resources.

Warning

Users with is_admin flag set to true can perform every action on all Workflow resources in a Group except for deleting the Group itself (only the Group owner has those permissions)

Submit the definition.

curl -X POST -H "content-type: application/json" -H "X-Tapis-Token: $JWT" https://tacc.tapis.io/v3/workflows/groups -d @group.json

Creating a Globus System for an ETL Pipeline

Create a file named onsite-globus-system.json in your current working directory that contains the json schema below.

{
  "id": "etl.userguide.systema.<user_id>",
  "description": "Tapis ETL Globus System on LS6 for data transfer and storage",
  "canExec": false,
  "systemType": "GLOBUS",
  "host": "24bc4499-6a3a-47a4-b6fd-a642532fd949",
  "defaultAuthnMethod": "TOKEN",
  "rootDir": "HOST_EVAL($SCRATCH)"
}
import json
from tapipy.tapis import Tapis


t = Tapis(base_url='https://tacc.tapis.io', username='<userid>', password='************')
t.tokens.get_tokens()

with open('onsite-globus-system.json', 'r') as file:
    system = json.load(file)

t.systems.createSystem(**system)

Register Globus Credentials for the System

Once you have successfully created the system, you must then register credentials for the system in order for Tapis to access it on your behalf. Follow the instructions found in the Registering Globus Credentials for a System section.

Once you have successfully registered credentials for the system, you should be able to list files on the system.

Creating a Linux System for an ETL Pipeline

Create a file named onsite-linux-system.json in your current working directory that contains the json schema below.

{
  "id": "etl.userguide.systemb.<user_id>",
  "description": "Tapis ETL Linux System on LS6 for manifests and compute",
  "systemType": "LINUX",
  "host": "ls6.tacc.utexas.edu",
  "defaultAuthnMethod": "PKI_KEYS",
  "rootDir": "HOST_EVAL($SCRATCH)",
  "canExec": true,
  "canRunBatch": true,
  "jobRuntimes": [
    {
      "runtimeType": "SINGULARITY",
      "version": null
    }
  ],
  "batchLogicalQueues": [
    {
      "name": "ls6-normal",
      "hpcQueueName": "normal",
      "maxJobs": 1,
      "maxJobsPerUser": 1,
      "minNodeCount": 1,
      "maxNodeCount": 2,
      "minCoresPerNode": 1,
      "maxCoresPerNode": 2,
      "minMemoryMB": 0,
      "maxMemoryMB": 4096,
      "minMinutes": 10,
      "maxMinutes": 100
    },
    {
      "name": "ls6-development",
      "hpcQueueName": "development",
      "maxJobs": 1,
      "maxJobsPerUser": 1,
      "minNodeCount": 1,
      "maxNodeCount": 2,
      "minCoresPerNode": 1,
      "maxCoresPerNode": 2,
      "minMemoryMB": 0,
      "maxMemoryMB": 4096,
      "minMinutes": 10,
      "maxMinutes": 100
    }
  ],
  "batchDefaultLogicalQueue": "ls6-development",
  "batchScheduler": "SLURM",
  "batchSchedulerProfile": "tacc-apptainer"
}

Use one of the following methods to submit a request to the Systems API to create the Tapis System.

with open('onsite-linux-system.json', 'r') as file:
    system = json.load(file)

t.systems.createSystem(**system)
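
Alternatively, the same request can be made with cURL (assuming the Tapis access token stored in $JWT, as in the group creation request above):

curl -X POST -H "content-type: application/json" -H "X-Tapis-Token: $JWT" https://tacc.tapis.io/v3/systems -d @onsite-linux-system.json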

Register Credentials for the System

Once you have successfully created the system, you must then register credentials for the system in order for Tapis to access it on your behalf. Follow the instructions found in the Registering Credentials for a System section.

Once you have successfully registered credentials for the system, you should be able to list files on the system.

Create an ETL Pipeline (createETLPipeline)

Save the following ETL pipeline definition to a file named etl-pipeline.json in your current working directory.

{
    "id": "etl-userguide-pipeline-<user_id>",
    "before": null,
    "remote_outbox": {
        "data": {
            "system_id": "etl.userguide.systema.<user_id>",
            "path": "/ETL/REMOTE-OUTBOX/DATA",
        },
        "manifests": {
            "system_id": "etl.userguide.systemb.<user_id>",
            "generation_policy": "auto_one_per_file",
            "path": "/ETL/REMOTE-OUTBOX/MANIFESTS"
        }
    },
    "local_inbox": {
        "control": {
            "system_id": "etl.userguide.systemb.<user_id>",
            "path": "/ETL/LOCAL-INBOX/CONTROL"
        },
        "data": {
            "system_id": "etl.userguide.systema.<user_id>",
            "path": "/ETL/LOCAL-INBOX/DATA"
        },
        "manifests": {
            "system_id": "etl.userguide.systemb.<user_id>",
            "path": "/ETL/LOCAL-INBOX/MANIFESTS"
        }
    },
    "jobs": [
        {
            "name": "sentiment-analysis",
            "appId": "etl-sentiment-analysis-test",
            "appVersion": "dev",
            "nodeCount": 1,
            "coresPerNode": 1,
            "maxMinutes": 10,
            "execSystemId": "test.etl.ls6.local.inbox",
            "execSystemInputDir": "${JobWorkingDir}/jobs/${JobUUID}/input",
            "archiveSystemId": "test.etl.ls6.local.inbox",
            "archiveSystemDir": "ETL/LOCAL-OUTBOX/DATA",
            "parameterSet": {
                "schedulerOptions": [
                    {
                        "name": "allocation",
                        "arg": "-A TACC-ACI"
                    },
                    {
                        "name": "profile",
                        "arg": "--tapis-profile tacc-apptainer"
                    }
                ],
                "containerArgs": [
                    {
                        "name": "input-mount",
                        "arg": "--bind $(pwd)/input:/src/input:ro,$(pwd)/output:/src/output:rw"
                    }
                ],
                "archiveFilter": {
                    "includes": [],
                    "excludes": ["tapisjob.out"],
                    "includeLaunchFiles": false
                }
            }
        }
    ],
    "local_outbox": {
        "data": {
            "system_id": "etl.userguide.systema.<user_id>",
            "path": "/ETL/LOCAL-OUTBOX/DATA"
        },
        "manifests": {
            "system_id": "etl.userguide.systemb.<user_id>",
            "path": "/ETL/LOCAL-OUTBOX/MANIFESTS"
        }
    },
    "remote_inbox": {
        "data": {
            "system_id": "etl.userguide.systema.<user_id>",
            "path": "/ETL/REMOTE-INBOX/DATA"
        },
        "manifests": {
            "system_id": "etl.userguide.systemb.<user_id>",
            "path": "/ETL/REMOTE-INBOX/MANIFESTS"
        }
    },
    "after": null
}

Then submit the definition.

with open('etl-pipeline.json', 'r') as file:
    pipeline = json.load(file)

t.workflows.createETLPipeline(group_id=<group_id>, **pipeline)

Once created, you can now fetch and run the pipeline.

t.workflows.getPipeline(group_id=<group_id>, pipeline_id=<pipeline_id>)

Run an ETL Pipeline (runPipeline)

To run a pipeline, you must:
  1. Provide the id of the group that owns the pipeline

  2. Provide the id of the pipeline

  3. Belong to the group that owns the pipeline

Pipeline Arguments

When running an ETL Pipeline with Tapis ETL, you must provide the following arguments:
  • TAPIS_USERNAME - Username of the Tapis user running the pipeline

  • TAPIS_PASSWORD - Password of the user

  • TAPIS_BASE_URL - The URL of the Tapis Tenant in which your pipeline should run (should be the same as the base URL you used to create your pipeline)

Save the following Pipeline Args definition to a file named etl-pipeline-args.json in your current working directory.

{
    "args": {
        "TAPIS_USERNAME": {
            "type": "string",
            "value": "<tapis_username>"
        },
        "TAPIS_PASSWORD": {
            "type": "string",
            "value": "<tapis_password>"
        },
        "TAPIS_BASE_URL": {
            "type": "string",
            "value": "https://tacc.tapis.io"
        }
    }
}

Then submit the definition.

with open('etl-pipeline-args.json', 'r') as file:
    args = json.load(file)

t.workflows.runPipeline(group_id=<group_id>, pipeline_id=<pipeline_id>, args=args)

Fetch Pipeline Run Details (getPipelineRun)

Fetch the details of a single pipeline run. You will need the following data:
  • group_id - The id of the group that owns the pipeline

  • pipeline_id - The id of the pipeline

  • pipeline_run_uuid - The UUID of the pipeline run

t.workflows.getPipelineRun(group_id=<group_id>, pipeline_id=<pipeline_id>, pipeline_run_uuid=<pipeline_run_uuid>)