Workflows

The Workflows Service is an API and Workflow Engine with which Tapis users can create and execute their research computing workflows.


Overview

Before getting into the details of how to create and run a workflow, here are some important concepts you must understand in order to use Tapis Workflows properly.

Important Concepts

  • Group: A group is a collection of Tapis users that own or have access to workflow resources. In order to create your first workflow, you must first belong to, or create your own group.

  • Pipeline: A pipeline is a collection of tasks and a set of rules governing how those tasks are to be executed.

  • Archive: An archive is the storage medium for the results created during a pipeline run. By default, the results produced by each task are deleted at the end of a pipeline run.

  • Task: Tasks are discrete units of work performed during the execution of a workflow. They can be represented as nodes on a directed acyclic graph (DAG), with the order of their execution determined by their dependencies, and where all tasks without dependencies are executed first. There are different types of tasks that users can leverage to perform different types of work. These are called task primitives and they will be discussed in detail later (see the sketch after the table below):

    Type | Example | Supported
    image_build | Builds Docker and Singularity images from recipe files and pushes them to repositories | yes
    request | Sends requests using various protocols to resources external to the workflow (only the HTTP protocol and the GET method are currently fully supported) | partial
    tapis_job | Submits a Tapis job | yes
    tapis_actor | Executes a Tapis actor | no
    container_run | Runs a container based on the provided image and tag | no
    function | Runs user-defined code in the language and runtime of their choice | yes
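
As a minimal sketch of how dependencies order execution (the task ids are illustrative and type-specific properties are omitted), the pipeline fragment below uses the depends_on property, covered in the Tasks section, so my-image-build runs first and notify runs only after it completes:

{
  "tasks": [
    { "id": "my-image-build", "type": "image_build" },
    { "id": "notify", "type": "request", "depends_on": [ { "id": "my-image-build" } ] }
  ]
}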


Security Note

In order to create and run workflows that are automated and reproducible, the Workflow Executor must sometimes be furnished with secrets (passwords, access keys, access secrets, SSH keys, etc.) that enable it to access restricted resources.

To ensure the safe storage and retrieval of this sensitive data, the Workflows service integrates with Tapis SK, a built-in secrets management service backed by HashiCorp Vault.

It is also important to note that, when a user creates a task that accesses some restricted resource, the Workflow Executor will execute that task on behalf of that user, with the credentials they provided, for every run of the pipeline. If those credentials expire, or the user has their access revoked for those resources, the pipeline run will fail on that task.


Quick start

Warning

This Quick Start is deprecated. There is a new one coming soon!

We will be creating a pipeline that
  • pulls code from a private GitHub repository

  • builds an image from a Dockerfile located in that source code

  • then pushes the resultant image to a Docker Hub image registry

In the examples below we assume you are using the TACC tenant with a base URL of tacc.tapis.io, and that you have either authenticated using the PySDK or obtained an authorization token and stored it in the environment variable JWT, or perhaps both.
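
If you still need a token, one way to obtain it is with a password grant against the Tapis OAuth2 tokens endpoint. The sketch below assumes the standard response layout (result.access_token.access_token) and uses jq to extract the token:

export JWT=$(curl -s -H "content-type: application/json" \
  -d '{"username": "<user_id>", "password": "<password>", "grant_type": "password"}' \
  https://tacc.tapis.io/v3/oauth2/tokens \
  | jq -r '.result.access_token.access_token')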

Summary of Steps

  1. Create a Group

  2. Create an Archive to which the results of the pipeline run will be persisted

  3. Create the Pipeline and its Tasks which act as instructions to the workflow executor

Creating a Group

Create a local file named group.json with JSON similar to the following:

{
  "id": "<group_id>",
  "users": [
      {
        "username":"<user_id>",
        "is_admin": true
      }
  ]
}

Note

You do not need to add your own Tapis id to the users list. The owner of the Group is added by default.

Replace <group_id> with your desired group id and <user_id> in the user objects with the Tapis user ids of the other users that you want to grant access to this group’s workflow resources.

Submit the definition.

curl -X POST -H "content-type: application/json" -H "X-Tapis-Token: $JWT" https://tacc.tapis.io/v3/workflows/groups -d @group.json

Creating Identities

We will be creating two identity mappings: one for GitHub and one for Docker Hub. After creating the identities, we will need to retrieve the UUIDs of the newly created identities. You can do this in a separate call (an example follows the second identity below), or simply grab the UUID from the URL in the result after each operation.

Warning

Do NOT commit these files to source control!

Create the first file named github-identity.json with the following JSON:

{
  "type": "github",
  "name": "my-github-identity",
  "description": "My github identity",
  "credentials": {
    "username": "<github_username>",
    "personal_access_token": "<github_personal_access_token>"
  }
}

Then submit the definition:

curl -X POST -H "content-type: application/json" -H "X-Tapis-Token: $JWT" https://tacc.tapis.io/v3/workflows/identities -d @github-identity.json

Create the second file named dockerhub-identity.json with the following JSON:

{
    "type": "dockerhub",
    "name": "my-dockerhub-identity",
    "description": "My Dockerhub identity",
    "credentials": {
      "username": "<docerkhub_username>",
      "token": "<dockerhub_access_token>"
    }
}

Then submit the definition:

curl -X POST -H "content-type: application/json" -H "X-Tapis-Token: $JWT" https://tacc.tapis.io/v3/workflows/identities -d @dockerhub-identity.json
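
To retrieve the UUIDs in a separate call, you can list your identities. The sketch below assumes the identities collection supports GET requests and uses jq to pull out just the names and UUIDs of the results:

curl -s -H "X-Tapis-Token: $JWT" https://tacc.tapis.io/v3/workflows/identities \
  | jq '.result[] | {name, uuid}'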

Creating an Archive

In this step, we create the Archive. The results of the pipeline run will be persisted to the archive.

Note

This step requires that you have “MODIFY” permissions on some Tapis System. If you do not have access to one, you can create it following the instructions in the “Systems” section.

Create a local file named archive.json with JSON similar to the following:

{
  "id": "my-sample-archive",
  "type": "system",
  "system_id": "<your-tapis-system-id>",
  "archive_dir": "/workflows/archive/"
}

Note

The archive_dir is relative to your system’s rootDir. You can change this value to whatever you like.

curl -X POST -H "content-type: application/json" -H "X-Tapis-Token: $JWT" https://tacc.tapis.io/v3/workflows/groups/<group_id>/archives -d @archive.json

Creating a Pipeline

In this step, we define the pipeline. There are many more properties that can be defined at both the pipeline and task level, but for simplicity, we will be leaving them out.

Create a local file named pipeline.json with JSON similar to the following:

{
  "id": "my-sample-workflow",
  "archives": [ "<archive_id>" ]
  "tasks": [
    {
      "id": "my-image-build",
      "type": "image_build",
      "builder": "kaniko",
      "context": {
          "branch": "main",
          "build_file_path": "<path/to>/Dockerfile",
          "sub_path": null,
          "type": "github",
          "url": "<account>/<repo>",
          "visibility": "private",
          "identity_uuid": "<github_identity_uuid>"
      },
      "destination": {
          "tag": "<some_image_tag>",
          "type": "dockerhub",
          "url": "<account>/<registry>",
          "identity_uuid": "<dockerhub_identity_uuid>"
      }
    }
  ]
}

Go through the definition above and replace all of the placeholders with the correct values.

curl -X POST -H "content-type: application/json" -H "X-Tapis-Token: $JWT" https://tacc.tapis.io/v3/workflows/groups/<group_id>/pipelines -d @pipeline.json

Now it’s time to run the pipeline.

Run a Pipeline (runPipeline)

curl -X POST -H "content-type: application/json" -H "X-Tapis-Token: $JWT" https://tacc.tapis.io/v3/workflows/groups/<group_id>/pipelines/<pipeline_id>/run -d "{}"

After the pipeline has finished running, take a look in your Docker Hub image repository and you will find your newly pushed image.

If you SSH into the Tapis System that you selected as your archive, you will also find that you have some new directories and files in your rootDir:

/workflows/archive/<UUID of the pipeline run>/my-image-build/output/.stdout.

If you want to find the output for any task for a given pipeline run, simply navigate to the archive directory, cd into the directory with the pipeline run UUID, then cd into the directory with that task’s name. Inside the output/ directory, you will find all of the data created by that task.
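
For example, to inspect the stdout of the image build task from a particular run (a sketch; replace the placeholder with the UUID of your own pipeline run):

# On the archive system, starting from its rootDir
cd /workflows/archive/<UUID of the pipeline run>/my-image-build/output/
cat .stdout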


Groups

A group is a collection of Tapis users that own or have access to workflow resources. In order to create your first workflow, you must first belong to, or create, a group. A group and all of its resources are tied to a particular tenant, which is resolved from the URL of the request to create the group.

Groups have an id which must be unique within the tenant to which it belongs. Groups also have users. This is a simple list of user objects where user.username is the Tapis username/id and is_admin is a Boolean. Users with admin permissions (i.e. is_admin == true) are able to add and remove users from a group. Only the owner of a group may add or delete other admin users.

Group Attributes Table

Attribute | Type | Example | Notes
id | String | my.group | Must be unique within the tenant.
owner | String | someuser | Cannot be removed from the group unless ownership is transferred.
tenant_id | String | tacc | Automatically set at create-time. Determined from the URL of the request. Does not need to be specified.
uuid | String(UUIDv4) | e48ada7a-56b4-4d48-974c-7574d51a8789 | Automatically set at create-time.
users | Array(Users) | | See the GroupUser Attributes Table below.

GroupUser Attributes Table

Attribute | Type | Example | Notes
username | String | jsmith | Must be unique within the group.
is_admin | Boolean | true | Must be an admin or the group owner to add a user as an admin.
uuid | String(UUIDv4) | e48ada7a-56b4-4d48-974c-7574d51a8789 | Automatically set at create-time.
group | String(UUIDv4) | e48ada7a-56b4-4d48-974c-7574d51a8789 | Automatically set at create-time.

Retrieval (getGroup)

Retrieve details for a specific group

curl -H "X-Tapis-Token: $JWT" https://tacc.tapis.io/v3/workflows/groups/<group_id>
{
  "success": true,
  "status": 200,
  "message": "Success",
  "result": {
    "id": "my.group",
    "owner": "someuser",
    "tenant_id": "tacc",
    "uuid": "f60fdf8a-4ceb-4273-b49f-4c0dd94111c3",
    "users": [
      {
        "group": "f60fdf8a-4ceb-4273-b49f-4c0dd94111c3",
        "username": "someuser",
        "is_admin": true,
        "uuid": "c6b7acfd-da4b-4a1d-acbd-adbfa6aa4057"
      },
      {
        "group": "f60fdf8a-4ceb-4273-b49f-4c0dd94111c3",
        "username": "anotheruser",
        "is_admin": false,
        "uuid": "d6ca476a-2c19-4168-8054-264bcaaa70e7"
      }
    ]
  }
}

Deletion

Groups may only be deleted by the group owner. Upon deletion of a group, every workflow object owned by the group will also be deleted: pipelines, tasks, archives, etc.
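
For instance, the owner could delete a group with a request like the following (a sketch assuming the group resource supports the DELETE method):

curl -X DELETE -H "X-Tapis-Token: $JWT" https://tacc.tapis.io/v3/workflows/groups/<group_id>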


Pipelines

A pipeline is a collection of tasks and a set of rules governing how those tasks are to be executed.

Pipeline Attributes Table

Attribute | Type | Example | Notes
id | String | my.pipeline | Must be unique within the group.
uuid | String(UUIDv4) | e48ada7a-56b4-4d48-974c-7574d51a8789 | Globally unique identifier for the pipeline.
owner | String | jsmith | The only user that can delete the pipeline.
group | String(UUIDv4) | e48ada7a-56b4-4d48-974c-7574d51a8789 | The uuid of the group that owns this pipeline.
last_run | String(UUIDv4) | e48ada7a-56b4-4d48-974c-7574d51a8789 | The UUID of the previous pipeline run.
current_run | String(UUIDv4) | e48ada7a-56b4-4d48-974c-7574d51a8789 | The UUID of the current pipeline run.
tasks | Array[Task] | | See the Tasks section for the Task object.
execution_profile | Object | | See the Execution Profile Attributes Table below.
env | Object | | Environment variables to be used by tasks in a pipeline run. See the Pipeline Environment section below.

Execution Profile Attributes Table

Overrides the default behavior of the Workflow Executor regarding task retries, backoff algorithm, max lifetime of the pipeline, etc. All Execution Profile properties of the pipeline are inherited by the tasks that belong to the pipeline unless otherwise specified in the task definition.

Attribute | Type | Example | Notes
max_retries | Int | 0, 3, 10 | The number of times that the Workflow Executor will try to rerun the task if it fails. Defaults to 0.
invocation_mode | Enum | async, sync | Default is “async”. When “async” is selected, all tasks will be executed concurrently. Currently, async is the only supported option.
retry_policy | Enum | exponential_backoff | Dictates which policy to employ when restarting tasks. Default (and only supported option) is exponential_backoff.
max_exec_time | Int | 60, 3600, 10800 | The maximum amount of time in seconds that a pipeline (or task) is permitted to run. As soon as the sum of all task runs reaches this limit, the pipeline (or task) is terminated. Defaults to 3600 seconds (1 hour).
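
As a sketch, a pipeline definition could override these defaults with an execution_profile object like the following (the values are illustrative):

{
  "execution_profile": {
    "max_retries": 3,
    "invocation_mode": "async",
    "retry_policy": "exponential_backoff",
    "max_exec_time": 7200
  }
}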

Pipeline Environment

The Pipeline Environment (the env property of a pipeline definition) is a mechanism for exposing static global data to task inputs. The Pipeline Environment is an object in which the keys are the names of the variables and the value is either scalar data (string, number, etc.) or an object with a value_from property which references data from a source external to the workflow (ex. Tapis Security Kernel).

{
  "env": {
    "TAPIS_SYSTEM_ID": "tapisv3-exec",
    "MANIFEST_FILES_DIR": "/path/to/manifest/files",
    "TAPIS_USERNAME": "someusername",
    "TAPIS_PASSWORD": {
      "value_from": {
        "tapis-security-kernel": "some+sk+id"
      }
    }
  }
}

Retrieval

Retrieve details for a specific pipeline

curl -H "X-Tapis-Token: $JWT" https://tacc.tapis.io/v3/workflows/groups/<group_id>/pipelines/<pipeline_id>

The response should look similar to the following:

{
   "success": true,
   "status": 200,
   "message": "Success",
   "result": {
     "id": "some_pipeline_id",
     "group": "c487c25f-6c6e-457d-a781-85120df9f10b",
     "invocation_mode": "async",
     "max_exec_time": 10800,
     "max_retries": 0,
     "owner": "testuser2",
     "retry_policy": "exponential_backoff",
     "uuid": "e48ada7a-56b4-4d48-974c-7574d51a8789",
     "current_run": null,
     "last_run": null,
     "tasks": [
       {
         "id": "build",
         "cache": false,
         "depends_on": [],
         "description": "Build an image from a repository and push it to an image registry",
         "input": null,
         "invocation_mode": "async",
         "max_exec_time": 3600,
         "max_retries": 0,
         "output": null,
         "pipeline": "e48ada7a-56b4-4d48-974c-7574d51a8789",
         "poll": null,
         "retry_policy": "exponential_backoff",
         "type": "image_build",
         "uuid": "e442b5df-8a9e-4d55-b4da-c51b7241a79f",
         "builder": "singularity",
         "context": "5bd771ab-8df5-43cd-a059-fbaa2323841b",
         "destination": "b34d1439-d2c9-4238-ab74-13b5fd7f3b1f",
         "auth": null,
         "data": null,
         "headers": null,
         "http_method": null,
         "protocol": null,
         "query_params": null,
         "url": null,
         "image": null,
         "tapis_job_def": null,
         "tapis_actor_id": null
       }
     ]
   }
 }

Deletion

Deleting a Pipeline will delete all of its tasks. This operation can only be performed by the owner of the pipeline.


Tasks

Tasks are discrete units of work performed during the execution of a workflow. They can be represented as nodes on a directed acyclic graph (DAG), with the order of their execution determined by their dependencies, and where all tasks without dependencies are executed first.

Tasks can be defined when creating a pipeline, or after the pipeline’s creation. Every task must have an id that is unique within the pipeline.

Tasks may also specify their dependencies in a number of ways. The first way is by declaring the dependency explicitly in the depends_on property. This is an Array of TaskDependency objects, which have only two attributes: id, the id of the task that this task depends on, and can_fail (Boolean), which specifies whether this task is allowed to run if that TaskDependency fails.
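
A minimal sketch of a task declaring a single dependency (the task ids and type are illustrative):

{
  "id": "my-task",
  "type": "tapis_job",
  "depends_on": [
    { "id": "my-image-build", "can_fail": false }
  ]
}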

Task Attributes Table

This table contains all of the properties that are shared by all tasks. Different types of tasks will have other unique properties in addition to all of the properties in the table below.

Attribute | Type | Example | Notes
id | String | my-task, my.task, my_task | Must be unique within the pipeline that it belongs to.
type | Enum | image_build, tapis_job, tapis_actor, request, container_run, function | See the task types table in the Overview section for the support status of each type.
depends_on | Array[TaskDependency] | see the description above | Explicitly declares this task’s dependencies. A task with the specified id must exist or the pipeline will not run.
execution_profile | Object | see the Execution Profile table in the Pipelines section | Inherits the execution_profile set in the pipeline definition.
description | String | My task description |
input | Object | see the Input Object table below |
output | Object | |
pipeline | String(UUIDv4) | 5bd771ab-8df5-43cd-a059-fbaa2323841b | UUID of the pipeline that this task is a part of.
uuid | String(UUIDv4) | 5bd771ab-8df5-43cd-a059-fbaa2323841b | A globally unique identifier for this task.

Input Object

Input is an object in which the keys are the id of the input (String) and the value is an object that defines the type of the data, and either the literal value of the data via the value property or a reference to where that data can be found via value_from.

Key-Val | Type | Example | Notes
key | String | TAPIS_USERNAME, PATH_TO_FILES, _SOME_INPUT, etc. | Must be an alphanumeric string that conforms to the following regex pattern: ^[_]?[a-zA-Z]+[a-zA-Z0-9_]*
value | InputValue Object | see the InputValue Object table below |

InputValue Object

Attribute | Type | Example | Notes
type | Enum | string, number, tapis_file_input, tapis_file_input_array |
value | String, Int, Float | 123, “hello world”, 1.23 |
value_from | ValueFrom Object | see the ValueFrom Object table below | Used to reference data from a workflow definition (env, params, task_output).

ValueFrom Object

The ValueFrom object is a key-value pair in which the key is an enum denoting the source of the value (pipeline environment, workflow submission parameters, or task outputs) and the value is the key on that source where the value data can be found.

Key-Val | Type | Example | Notes
key | Enum | env, params, task_output |
value | String or TaskOutput Object (see table below) | TAPIS_USERNAME, PATH_TO_FILES |

TaskOutput Object

Attribute | Type | Example | Notes
task_id | String | task_1, my-task | The id of a previously run task.
output_id | String | manifestFilePath, fileObject0 | The id of the output for the specified task.
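
Putting these objects together, here is a sketch of a task input whose value is pulled from the output of a previously run task (the nesting follows the ValueFrom and TaskOutput tables above; the task_id and output_id values are illustrative, borrowed from the example definition below):

{
  "input": {
    "MANIFEST_FILE_PATH": {
      "type": "string",
      "value_from": {
        "task_output": {
          "task_id": "generate-manifests-and-process-next",
          "output_id": "manifestFilePath"
        }
      }
    }
  }
}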

Example Task Definition with Inputs

Note

The example below is in YAML format for readability, but the Tapis Workflows API only accepts JSON

tasks:
- id: generate-manifests-and-process-next
  type: function
  description: |
    Python script that creates manifest files and outputs a
    tapis file inputs array to stdout
  runtime: python3.9
  installer: pip # poetry
  command: ""
  packages:
    - tapipy==1.2.20
  code: "<base64 encoded user-defined code here>"
  input:
    TAPIS_USERNAME:
      type: string
      value_from:
        env: TAPIS_USERNAME
    TAPIS_PASSWORD:
      type: string
      value_from:
        env: TAPIS_PASSWORD
    TAPIS_SYSTEM_ID:
      type: string
      value: "some-system-id"
    TARGET_DIR:
      type: string
      value: target/dir/for/file-input-array
    INBOX:
      type: string
      value: "/scratch/08294/jplneid/tacc_ep/INBOX"
    MANIFESTS:
      type: string
      value: "/scratch/08294/jplneid/tacc_ep/MANIFESTS"
    TAPIS_BASE_URL:
      type: string
      value_from:
        params: base_url
  output:
    fileInputArrays:
      type: "tapis_file_input_arrays"
    manifestFilePath:
      type: "string"

Task Types

There are different types of tasks that users can leverage to perform different types of work. These are called task types or primitives. The task types are image_build, request, tapis_job, tapis_actor, container_run, and function.

When defining tasks on a pipeline, the type must be present in the task definition along with all other attributes specific to the task type.


Function

Functions enable users to execute arbitrary code in select languages and runtimes as a part of their workflow. Functions have access to the Execution Context (see the section below on the Execution Context), which enables programmatic access to the state of the current Task Execution and the results of previous tasks.

Function Task Attributes Table

Attribute | Type | Example | Notes
code | Base64 String | | The user-defined code to be run during this task.
runtime | Enum | python3.9 | The runtime environment in which the user-defined code will be run.
packages | Array[String] | [“tapipy”, “numpy”] | The packages to be installed prior to running the user-defined code.
installer | Enum | pip | The installer used to install the packages.
command | String | mkdir -p /some/dir && apt-get install package | Bash command to be run before package installation.

Accessing the Workflow Execution Context within a Function Task

The Execution Context is a set of functions used to query and mutate the state of the Task Execution. The Execution Context is available for use in user-defined code via the Open Workflow Engine SDK. It can be imported into all functions and used to get task input values, fetch outputs of previously run tasks, set task outputs, and control the termination of the task (stdout, stderr).

Here is an example of user-defined Python code that imports the Execution Context, performs work with it, then terminates the task.

import json

from tapipy.tapis import Tapis
from owe_python_sdk.runtime import execution_context as ctx

# Fetch this task's input values from the Execution Context
system_id = ctx.get_input("TAPIS_SYSTEM_ID")
username = ctx.get_input("TAPIS_USERNAME")
password = ctx.get_input("TAPIS_PASSWORD")
manifest_files_path = ctx.get_input("MANIFEST_FILES_PATH")
base_url = ctx.get_input("TAPIS_BASE_URL")

try:
    # Authenticate with Tapis and list the files at the manifest path
    t = Tapis(
        base_url=base_url,
        username=username,
        password=password
    )

    t.get_tokens()

    files = t.files.listFiles(systemId=system_id, path=manifest_files_path)
except Exception as e:
    # Terminate the task with a non-zero exit code and an error message
    ctx.stderr(1, f"There was an error listing files: {e}")

# Expose each file object as a task output for downstream tasks
# (default=str because the listed file objects are not directly JSON-serializable)
for i, file in enumerate(files):
    ctx.set_output(f"fileObject{i}", json.dumps(file, default=str))

# Terminate the task successfully
ctx.stdout("Hello stdout")



Retrieval

Retrieve details for a specific task in a pipeline

curl -H "X-Tapis-Token: $JWT" https://tacc.tapis.io/v3/workflows/groups/<group_id>/pipelines/<pipeline_id>/tasks/<task_id>

The response should look similar to the following:

{
   "success": true,
   "status": 200,
   "message": "Success",
   "result": {
     "id": "build",
     "cache": false,
     "depends_on": [],
     "description": "Build an image from a repository and push it to an image registry",
     "input": null,
     "invocation_mode": "async",
     "max_exec_time": 3600,
     "max_retries": 0,
     "output": null,
     "pipeline": "ececc546-3ee0-437e-ae50-5882ec03356a",
     "poll": null,
     "retry_policy": "exponential_backoff",
     "type": "image_build",
     "uuid": "01eac121-19bf-4d8e-957e-faa27bdaa1f8",
     "builder": "singularity",
     "context": "ea58c3ef-7175-41b0-9671-e50700a33c77",
     "destination": "6eac73da-5799-4e74-957c-03b5cee97149",
     "auth": null,
     "data": null,
     "headers": null,
     "http_method": null,
     "protocol": null,
     "query_params": null,
     "url": null,
     "image": null,
     "tapis_job_def": null,
     "tapis_actor_id": null
   }
 }

Deletion

Deleting a task can only be done by a pipeline administrator. If any tasks depend on the deleted task, the pipeline will fail when run.


Archives

Archives are the storage mechanisms for pipeline results. Without an archive, the results produced by each task will be permanently deleted at the end of each pipeline run. By default, archiving occurs at the end of a pipeline run when all tasks have reached a terminal state.

Archive Attributes Table

This table contains all of the properties that are shared by all archives. Different types of archives will have other unique properties in addition to all of the properties in the table below.

Attribute | Type | Example | Notes
id | String | my-archive, my.archive, my_archive | Must be unique within the group that it belongs to.
archive_dir | String | path/to/archive/dir | Relative to the “root directory” of the archive’s file system.
type | Enum | system, S3 |

Archive Types

Tapis System

Store the results of a pipeline run on a specific system. The owner of the archive must have MODIFY permissions on the system. Permissions will be checked at the time the archive is created and every time before archiving.

Note

The archiving process does NOT interfere with the task execution process. If archiving fails, the pipeline run can still complete successfully.

Tapis System Archive Attributes Table

Attribute | Type | Example | Notes
system_id | String | my.system | The archive owner must have MODIFY permissions on this system. Also, by default, the system is assumed to be in the same tenant as the group to which the archive belongs.

Tapis System Archive Example:

{
  "id": "my.archive",
  "type": "system",
  "system_id": "my.system",
  "archive_dir": "workflows/archive/"
}
