Quick Start

In this section, we will construct an ETL pipeline that:
  1. Detects the presence of data files and manifests on some remote system

  2. Transfers those manifests and data files to a local system

  3. Performs sentiment analysis on the text content of the data files and generates a result file for each analysis

  4. Transfers the results files to a remote system for archiving

To get started, we will create the following prerequisite Tapis resources:

  1. One group (Workflows API) - A collection of Tapis users that collectively own workflow resources

  2. Two (or more) systems (Systems API) - Tapis Systems that will serve as the data and manifests systems of your pipeline. We will create one Globus-type system for data transfers and storage, and one Linux-type system for manifests and compute.

Creating a Group (createGroup)

Create a file in your current working directory called group.json with the following JSON definition:

{
  "id": "<group_id>",
  "users": [
      {
        "username":"<user_id>",
        "is_admin": false
      }
  ]
}

Note

You do not need to add your own Tapis id to the users list. The owner of the Group is added by default.

Replace <group_id> with your desired group id and <user_id> in the user objects with the ids of any other Tapis users that you want to have access to your workflow resources.

Warning

Users with the is_admin flag set to true can perform every action on all Workflow resources in a Group except deleting the Group itself (only the Group owner has that permission).

Submit the definition.

curl -X POST -H "content-type: application/json" -H "X-Tapis-Token: $JWT" https://tacc.tapis.io/v3/workflows/groups -d @group.json
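
If you prefer to work in Python, here is a minimal tapipy sketch that submits the same group.json; it assumes the operation name createGroup shown in the heading above:

import json
from tapipy.tapis import Tapis

# Authenticate with Tapis and fetch an access token
t = Tapis(base_url='https://tacc.tapis.io', username='<userid>', password='<password>')
t.tokens.get_tokens()

with open('group.json', 'r') as file:
    group = json.load(file)

# Assumed tapipy operation mirroring the createGroup endpoint named above
t.workflows.createGroup(**group)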

Creating a Globus System for an ETL Pipeline

Create a file named onsite-globus-system.json in your current working directory that contains the JSON definition below.

{
  "id": "etl.userguide.systema.<user_id>",
  "description": "Tapis ETL Globus System on LS6 for data transfer and storage",
  "canExec": false,
  "systemType": "GLOBUS",
  "host": "24bc4499-6a3a-47a4-b6fd-a642532fd949",
  "defaultAuthnMethod": "TOKEN",
  "rootDir": "HOST_EVAL($SCRATCH)"
}

Use one of the following methods to submit a request to the Systems API to create the Tapis System.

import json
from tapipy.tapis import Tapis


# Authenticate with Tapis and fetch an access token
t = Tapis(base_url='https://tacc.tapis.io', username='<userid>', password='<password>')
t.tokens.get_tokens()

# Load the system definition and submit it to the Systems API
with open('onsite-globus-system.json', 'r') as file:
    system = json.load(file)

t.systems.createSystem(**system)
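
Alternatively, you can submit the same definition with curl, reusing the token from before; this assumes the standard Systems API endpoint:

curl -X POST -H "content-type: application/json" -H "X-Tapis-Token: $JWT" https://tacc.tapis.io/v3/systems -d @onsite-globus-system.json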

Register Globus Credentials for the System

Once you have successfully created the system, you must then register credentials for the system in order for Tapis to access it on your behalf. Follow the instructions found in the Registering Globus Credentials for a System section.

Once you have successfully registered credentials for the system, you should be able to list files on the system.
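
For example, a quick check with tapipy using the authenticated client t from above; listing files against the system root should succeed once credentials are registered:

# List the files at the root of the Globus system
t.files.listFiles(systemId='etl.userguide.systema.<user_id>', path='/')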

Creating a Linux System for an ETL Pipeline

Create a file named onsite-linux-system.json in your current working directory that contains the JSON definition below.

{
  "id": "etl.userguide.systemb.<user_id>",
  "description": "Tapis ETL Linux System on LS6 for manifests and compute",
  "systemType": "LINUX",
  "host": "ls6.tacc.utexas.edu",
  "defaultAuthnMethod": "PKI_KEYS",
  "rootDir": "HOST_EVAL($SCRATCH)",
  "canExec": true,
  "canRunBatch": true,
  "jobRuntimes": [
    {
      "runtimeType": "SINGULARITY",
      "version": null
    }
  ],
  "batchLogicalQueues": [
    {
      "name": "ls6-normal",
      "hpcQueueName": "normal",
      "maxJobs": 1,
      "maxJobsPerUser": 1,
      "minNodeCount": 1,
      "maxNodeCount": 2,
      "minCoresPerNode": 1,
      "maxCoresPerNode": 2,
      "minMemoryMB": 0,
      "maxMemoryMB": 4096,
      "minMinutes": 10,
      "maxMinutes": 100
    },
    {
      "name": "ls6-development",
      "hpcQueueName": "development",
      "maxJobs": 1,
      "maxJobsPerUser": 1,
      "minNodeCount": 1,
      "maxNodeCount": 2,
      "minCoresPerNode": 1,
      "maxCoresPerNode": 2,
      "minMemoryMB": 0,
      "maxMemoryMB": 4096,
      "minMinutes": 10,
      "maxMinutes": 100
    }
  ],
  "batchDefaultLogicalQueue": "ls6-development",
  "batchScheduler": "SLURM",
  "batchSchedulerProfile": "tacc-apptainer"
}

Then submit the definition to the Systems API to create the Tapis System, reusing the authenticated tapipy client from before.

# Load the Linux system definition and create the system
with open('onsite-linux-system.json', 'r') as file:
    system = json.load(file)

t.systems.createSystem(**system)

Register Credentials for the System

Once you have successfully created the system, you must then register credentials for the system in order for Tapis to access it on your behalf. Follow the instructions found in the Registering Credentials for a System section.

Once you have successfully registered credentials for the system, you should be able to list files on the system.
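
The pipeline definition below references a fixed set of inbox and outbox directories on these systems. If they do not already exist, here is a sketch that creates them with tapipy; the paths are assumptions that simply mirror the pipeline definition that follows:

# Data directories live on the Globus system
for path in ['/ETL/REMOTE-OUTBOX/DATA', '/ETL/LOCAL-INBOX/DATA',
             '/ETL/LOCAL-OUTBOX/DATA', '/ETL/REMOTE-INBOX/DATA']:
    t.files.mkdir(systemId='etl.userguide.systema.<user_id>', path=path)

# Manifests and control directories live on the Linux system
for path in ['/ETL/REMOTE-OUTBOX/MANIFESTS', '/ETL/LOCAL-INBOX/CONTROL',
             '/ETL/LOCAL-INBOX/MANIFESTS', '/ETL/LOCAL-OUTBOX/MANIFESTS',
             '/ETL/REMOTE-INBOX/MANIFESTS']:
    t.files.mkdir(systemId='etl.userguide.systemb.<user_id>', path=path)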

Create an ETL Pipeline (createETLPipeline)

Save the following ETL pipeline definition to a file named etl-pipeline.json in your current working directory.

{
    "id": "etl-userguide-pipeline-<user_id>",
    "before": null,
    "remote_outbox": {
        "data": {
            "system_id": "etl.userguide.systema.<user_id>",
            "path": "/ETL/REMOTE-OUTBOX/DATA",
        },
        "manifests": {
            "system_id": "etl.userguide.systemb.<user_id>",
            "generation_policy": "auto_one_per_file",
            "path": "/ETL/REMOTE-OUTBOX/MANIFESTS"
        }
    },
    "local_inbox": {
        "control": {
            "system_id": "etl.userguide.systemb.<user_id>",
            "path": "/ETL/LOCAL-INBOX/CONTROL"
        },
        "data": {
            "system_id": "etl.userguide.systema.<user_id>",
            "path": "/ETL/LOCAL-INBOX/DATA"
        },
        "manifests": {
            "system_id": "etl.userguide.systemb.<user_id>",
            "path": "/ETL/LOCAL-INBOX/MANIFESTS"
        }
    },
    "jobs": [
        {
            "name": "sentiment-analysis",
            "appId": "etl-sentiment-analysis-test",
            "appVersion": "dev",
            "nodeCount": 1,
            "coresPerNode": 1,
            "maxMinutes": 10,
            "execSystemId": "test.etl.ls6.local.inbox",
            "execSystemInputDir": "${JobWorkingDir}/jobs/${JobUUID}/input",
            "archiveSystemId": "test.etl.ls6.local.inbox",
            "archiveSystemDir": "ETL/LOCAL-OUTBOX/DATA",
            "parameterSet": {
                "schedulerOptions": [
                    {
                        "name": "allocation",
                        "arg": "-A TACC-ACI"
                    },
                    {
                        "name": "profile",
                        "arg": "--tapis-profile tacc-apptainer"
                    }
                ],
                "containerArgs": [
                    {
                        "name": "input-mount",
                        "arg": "--bind $(pwd)/input:/src/input:ro,$(pwd)/output:/src/output:rw"
                    }
                ],
                "archiveFilter": {
                    "includes": [],
                    "excludes": ["tapisjob.out"],
                    "includeLaunchFiles": false
                }
            }
        }
    ],
    "local_outbox": {
        "data": {
            "system_id": "etl.userguide.systema.<user_id>",
            "path": "/ETL/LOCAL-OUTBOX/DATA"
        },
        "manifests": {
            "system_id": "etl.userguide.systemb.<user_id>",
            "path": "/ETL/LOCAL-OUTBOX/MANIFESTS"
        }
    },
    "remote_inbox": {
        "data": {
            "system_id": "etl.userguide.systema.<user_id>",
            "path": "/ETL/REMOTE-INBOX/DATA"
        },
        "manifests": {
            "system_id": "etl.userguide.systemb.<user_id>",
            "path": "/ETL/REMOTE-INBOX/MANIFESTS"
        }
    },
    "after": null
}

Then submit the definition.

with open('etl-pipeline.json', 'r') as file:
    pipeline = json.load(file)

t.workflows.createETLPipeline(group_id='<group_id>', **pipeline)

Once created, you can fetch and run the pipeline.

t.workflows.getPipeline(group_id='<group_id>', pipeline_id='<pipeline_id>')

Run an ETL Pipeline (runPipeline)

To run a pipeline, you must:
  1. Provide the id of the group that owns the pipeline

  2. Provide the id of the pipeline

  3. Belong to the group that owns the pipeline

Pipeline Arguments

When running an ETL Pipeline with Tapis ETL, you must provide the following arguments:
  • TAPIS_USERNAME - Username of the Tapis user running the pipeline

  • TAPIS_PASSWORD - Password of the user

  • TAPIS_BASE_URL - The base URL of the Tapis tenant in which your pipeline should run (this should be the same base URL you used to create your pipeline)

Save the following Pipeline Args definition to a file named etl-pipeline-args.json in your current working directory.

{
    "args": {
        "TAPIS_USERNAME": {
            "type": "string",
            "value": "<tapis_username>"
        },
        "TAPIS_PASSWORD": {
            "type": "string",
            "value": "<tapis_password>"
        },
        "TAPIS_BASE_URL": {
            "type": "string",
            "value": "https://tacc.tapis.io"
        }
    }
}
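
If you would rather not write your password to disk, here is a minimal sketch that builds the same arguments in Python instead; getpass is just one way to prompt for the credential, and if you use it you can skip the file load below:

import getpass

# Prompt for the password rather than committing it to a file
args = {
    'args': {
        'TAPIS_USERNAME': {'type': 'string', 'value': '<tapis_username>'},
        'TAPIS_PASSWORD': {'type': 'string', 'value': getpass.getpass()},
        'TAPIS_BASE_URL': {'type': 'string', 'value': 'https://tacc.tapis.io'}
    }
}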

Then submit the definition.

with open('etl-pipeline-args.json', 'r') as file:
    args = json.load(file)

t.workflows.runPipeline(group_id='<group_id>', pipeline_id='<pipeline_id>', **args)
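
You will need the run's UUID to fetch its details in the next step. If you did not capture it from the runPipeline response, the Workflows API also lets you list a pipeline's runs; a sketch, assuming the tapipy operation is named listPipelineRuns:

# List every run for this pipeline; each run entry should include its UUID
t.workflows.listPipelineRuns(group_id='<group_id>', pipeline_id='<pipeline_id>')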

Fetch Pipeline Run Details (getPipelineRun)

Fetch the details of a single pipeline run. You will need the following data:
  • group_id - id of the group that owns the pipeline

  • pipeline_id - id of the pipeline

  • pipeline_run_uuid - UUID of the pipeline run

t.workflows.getPipelineRun(group_id='<group_id>', pipeline_id='<pipeline_id>', pipeline_run_uuid='<pipeline_run_uuid>')