Version: 2.0

Airflow

Platform 2.0

API Version N/A

Introduction

GraphGrid Airflow is responsible for orchestrating tasks defined through Directed Acyclic Graphs (DAGs). Per Airflow's official documentation, "A Task is the basic unit of execution in Airflow. Tasks are arranged into DAGs". One of the key types of Tasks is the Operator, "a template for a predefined Task, that you can just define declaratively inside your DAG." For more conceptual information regarding Tasks, Operators, or DAGs see 1, 2, and 3 respectively.

DAGs

DAGs are the foundational building block of Airflow. For implementation specifics refer to the Airflow docs here. Currently, Airflow wraps the NLP model training module's training pipeline as an executable DAG. The following section covers the implementation specifics for that DAG.

NLP Model Training DAG

The NLP model training DAG is made up of the following tasks (a rough wiring sketch follows the list):

  • check_request_body - check the input request body/Configuration JSON (see here)
  • create_unique_session_path - creates the DAG run specific volume to store and propagate training resources
  • retrieve_resources - retrieve training resources (corpora, models, etc.) (see here)
  • determine_cpu_or_gpu - determine whether the current training session is using a GPU or CPU
  • gpu_training_sensor - sensor to determine whether all training sessions are in a valid state and running
  • gpu_preflight_configuration - blocking heuristic task to determine whether the current environment can support the current training session
  • gpu_pool_setup - modify the GPU training pool
  • train_model_gpu - train a model via GPU (see here)
  • train_model_cpu - train a model via CPU (see here)
  • save_model - save a model (see here)
  • eval_model - evaluate a model's performance (see here)
  • update_model_metadata_registry - update the model metadata registry (see here)
  • cleanup - remove all generated and retrieved resources
  • delete_volume - delete the dynamically created volume for the current DAG run
  • check_DAG_success - verify that all tasks succeeded; otherwise, fail the DAG run
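
The sketch below shows how such a pipeline could be expressed in Airflow, using placeholder DummyOperator tasks named after the steps above. The branching logic, trigger rules, and dependency order are assumptions for illustration, not the packaged GraphGrid DAG definition.

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule


def _choose_training_path(dag_run, **_):
    # Branch to the GPU subgraph only when the request body asks for a GPU.
    return "gpu_training_sensor" if (dag_run.conf or {}).get("gpu") else "train_model_cpu"


with DAG(
    dag_id="nlp_model_training_sketch",
    start_date=days_ago(1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Placeholder operators named after the real pipeline's tasks.
    check_request_body = DummyOperator(task_id="check_request_body")
    create_unique_session_path = DummyOperator(task_id="create_unique_session_path")
    retrieve_resources = DummyOperator(task_id="retrieve_resources")
    determine_cpu_or_gpu = BranchPythonOperator(
        task_id="determine_cpu_or_gpu", python_callable=_choose_training_path
    )
    gpu_training_sensor = DummyOperator(task_id="gpu_training_sensor")
    gpu_preflight_configuration = DummyOperator(task_id="gpu_preflight_configuration")
    gpu_pool_setup = DummyOperator(task_id="gpu_pool_setup")
    train_model_gpu = DummyOperator(task_id="train_model_gpu")
    train_model_cpu = DummyOperator(task_id="train_model_cpu")
    # Runs after whichever training branch was taken.
    save_model = DummyOperator(
        task_id="save_model", trigger_rule=TriggerRule.NONE_FAILED_OR_SKIPPED
    )
    eval_model = DummyOperator(task_id="eval_model")
    update_model_metadata_registry = DummyOperator(task_id="update_model_metadata_registry")
    cleanup = DummyOperator(task_id="cleanup", trigger_rule=TriggerRule.ALL_DONE)
    delete_volume = DummyOperator(task_id="delete_volume")
    check_DAG_success = DummyOperator(task_id="check_DAG_success")

    check_request_body >> create_unique_session_path >> retrieve_resources >> determine_cpu_or_gpu
    determine_cpu_or_gpu >> gpu_training_sensor >> gpu_preflight_configuration >> gpu_pool_setup >> train_model_gpu
    determine_cpu_or_gpu >> train_model_cpu
    [train_model_gpu, train_model_cpu] >> save_model >> eval_model >> update_model_metadata_registry
    update_model_metadata_registry >> cleanup >> delete_volume >> check_DAG_success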

GraphgridDockerOperator

The NLP model training DAG uses the GraphgridDockerOperator, an extended implementation of the DockerOperator from Airflow's Docker provider. The GraphgridDockerOperator is part of the airflow-provider-graphgrid package. It adds extra configuration to the spawned containers, enabling logging customization, GPU passthrough and support, and much more.

As the GraphgridDockerOperator extends the DockerOperator, refer to the DockerOperator documentation here for more information on the base parameters.

New parameters specific to GraphgridDockerOperator:

  • labels (dict) - Labels to bind to the container (see here)
  • gpu (bool) - Flag to allow GPU passthrough to allow for GPU acceleration
  • include_credentials (bool) - Flag to include the requisite SDK credentials
  • gpu_label (bool) - Flag to label the container as using a GPU
  • gpu_healthcheck (bool) - Flag to enable a GPU healthcheck to determine when tensorflow has started

The current template fields are as follows (a usage sketch appears after the list):

  • command
  • environment
  • container_name
  • image
  • mounts
  • gpu
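
As a rough illustration of how these parameters and template fields come together, a GPU training task might be declared like the sketch below. The import path, image name, command, and label values are assumptions for illustration, not the packaged implementation.

# Hypothetical import path: the actual module layout of airflow-provider-graphgrid may differ.
from airflow_provider_graphgrid.operators.graphgrid_docker import GraphgridDockerOperator

train_model_gpu = GraphgridDockerOperator(
    task_id="train_model_gpu",
    image="graphgrid/nlp-model-training:latest",  # hypothetical image name
    # 'command' and 'environment' are template fields, so Jinja is rendered per DAG run.
    command="python -m train --model {{ dag_run.conf['model'] }}",
    labels={"graphgrid.training": "gpu"},  # labels bound to the spawned container
    gpu=True,                  # GPU passthrough ('gpu' is itself a template field)
    include_credentials=True,  # inject the requisite SDK credentials
    gpu_label=True,            # label the container as using a GPU
    gpu_healthcheck=True,      # healthcheck that waits for TensorFlow to start
)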

GraphgridMount

The GraphgridMount wraps the Python Docker SDK's docker.types.Mount, allowing its fields to use Jinja templates so that volumes can be dynamically named and mounted. The GraphgridMount is also part of the airflow-provider-graphgrid package.

The current template fields are as follows (a short sketch appears after the list):

  • source
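
Because source is a template field, the mount can be rendered per DAG run. The snippet below is a sketch under the same import-path caveat as above; the volume naming scheme and target path are assumptions.

# Hypothetical import paths, as above.
from airflow_provider_graphgrid.types import GraphgridMount
from airflow_provider_graphgrid.operators.graphgrid_docker import GraphgridDockerOperator

# 'source' is templated, so each DAG run mounts its own dynamically named volume.
session_mount = GraphgridMount(
    source="nlp-training-{{ run_id }}",  # rendered at runtime per DAG run
    target="/training",                  # illustrative container path
    type="volume",
)

train_model_cpu = GraphgridDockerOperator(
    task_id="train_model_cpu",
    image="graphgrid/nlp-model-training:latest",  # hypothetical image name
    mounts=[session_mount],  # 'mounts' is also a template field on the operator
)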

Usage

Airflow is accessible at localhost:8081, which will prompt you to log in.

Default username: airflow
Default password: airflow

If you have generated random credentials using the passwords generate GraphGrid CLI command (./bin/graphgrid passwords generate), your new password can be found in ${GRAPHGRID_DATA}/env/airflow.env.

Following authentication, you will be greeted with the following home screen.

Screenshot

If you select the nlp_model_training DAG within the DAGs list, you will get the tree view of the DAG.

Screenshot

To kick off the DAG and begin the NLP model training pipeline, select the Trigger DAG w/ config option under the play button UI element on the upper right side of the webpage.

Screenshot

Within the Configuration JSON input box, you can input a request body in order to specify how the training pipeline should run (see here for information about request bodies). After inputting a request body, the Trigger button will trigger the DAG run.

Screenshot

After triggering the DAG you will be redirected to the tree view of the DAG, where you can monitor the progress and status of the current DAG run.

Uploading custom DAGs

If you have a custom DAG you wish to use in place of the NLP model training DAG, it may be uploaded via the GraphGrid CLI using the airflow upload command.

./bin/graphgrid airflow upload <path/to/custom/dag/file>

In order to ensure your custom DAG can be uploaded and run without error, it must be a single Python file without any local module dependencies.
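
For reference, a DAG that satisfies these constraints is simply one self-contained Python file that uses only installed packages. The DAG id and task below are illustrative placeholders.

# custom_dag.py -- a single, self-contained file with no local module imports
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


def say_hello():
    print("Hello from a custom GraphGrid Airflow DAG")


with DAG(
    dag_id="my_custom_dag",
    start_date=days_ago(1),
    schedule_interval=None,
    catchup=False,
) as dag:
    PythonOperator(task_id="say_hello", python_callable=say_hello)

This file can then be uploaded with ./bin/graphgrid airflow upload custom_dag.py.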

Request Bodies / Configuration JSON

Request bodies, also referred to as Configuration JSON, dictate how DAGs run. For the NLP model training DAG this includes which model the pipeline should train as well as which datasets and corpora to use.

{
  "model": "named_entity_recognition",
  "datasets": {
    "sdk": {
      "train": "SBRRPqE4LV28aTTXLWmPqzgG7cKD2MVQgT9sLQeRheQW/SBRRPqE4LV28aTTXLWmPqzgG7cKD2MVQgT9sLQeRheQW.jsonl",
      "eval": null
    }
  },
  "gpu": true
}

The fields are as follows:

  • model (string) - the name of the model to train
  • datasets (Map[String, Map[String, String]]) - the datasets/corpora to use for training and evaluation
  • gpu (bool, optional, default: false) - whether to enable GPU passthrough and acceleration
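
The same request body can also be supplied when triggering the DAG from the standard Airflow CLI, assuming you have shell access to the Airflow container; the Trigger DAG w/ config flow described above remains the documented path. The dataset key below is a placeholder.

airflow dags trigger nlp_model_training \
    --conf '{"model": "named_entity_recognition", "datasets": {"sdk": {"train": "<dataset-key>.jsonl", "eval": null}}, "gpu": true}'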

GPU Limitations

Currently, only a handful of Linux distributions support GPU acceleration via Airflow. Additionally, only one GPU-accelerated training session can run at a time, while multiple CPU-based training sessions can run concurrently. These can also be mixed, e.g. one GPU-accelerated training session alongside multiple CPU training sessions. Finally, not every GPU is currently supported for training; most GPUs released in 2020 or later are not yet compatible with GPU acceleration. Use the GraphGrid CLI to check whether your environment is compatible.

GPU Concurrency

The nlp_model_training DAG has a branching path for training depending on whether a GPU is being employed for the training session. In the case where a GPU is being used, there is a basic subgraph responsible for job scheduling and management which controls the concurrency of GPU training sessions.

The first important aspect of this management system is two new Airflow pools, gpu_preflight_pool and gpu_pool. For context, every running task within a DAG run is placed within a pool. By default, tasks are placed in the default_pool, which is statically defined by Airflow and comes with 128 slots. Tasks can only run if they occupy a slot within a pool, which means task concurrency can be controlled by placing specific tasks into pools. Hence the creation of the two aforementioned pools.
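
For illustration, placing a task in a pool only requires the standard pool argument on the operator. The operators below are placeholders; only the pool assignments reflect the scheme described here.

from airflow.operators.dummy import DummyOperator

# Placeholder tasks: only the 'pool' argument matters for this sketch.
gpu_preflight_configuration = DummyOperator(
    task_id="gpu_preflight_configuration",
    pool="gpu_preflight_pool",  # single-slot pool: one preflight task at a time, across all sessions
)
train_model_gpu = DummyOperator(
    task_id="train_model_gpu",
    pool="gpu_pool",  # slot count is resized dynamically by gpu_pool_setup
)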

gpu_preflight_pool is statically defined and has only one slot. Every task within the "GPU subgraph" besides train_model_gpu is placed within this pool, which means only one preflight task can run at any one time across all training sessions. This ensures that every heuristic check performed during the gpu_preflight_configuration task is accurate and not subject to potential race conditions or unpredictable variability.

gpu_pool is a dynamically defined pool whose size changes based on gpu_preflight_configuration. This allows the pool to grow in scenarios where gpu_preflight_configuration determines that there is enough headroom for another concurrent GPU training session.

The next part of this system requires the introduction of the GraphGridTrainingRunningSensor, which inherits from the BaseSensorOperator. Sensors are a special type of operator that detects when a specific condition occurs. For our purposes, this sensor detects when all GPU training tasks are in a valid state (e.g. running or no state at all) and all running tasks have healthy Docker containers. For more information about sensors, refer to the Airflow documentation here. This is an important and necessary step since it allows us to block any further training sessions and preflight calculations until we know all the training sessions are actually running.

The GraphgridDockerOperator has a togglable GPU healthcheck which runs a query to determine when a GPU training session has actually begun. The sensor makes a basic Docker request to get the statuses of all the GPU training containers.
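
A rough sketch of what such a sensor's poke method could look like is shown below. The label used to find GPU training containers and the notion of "healthy" are assumptions for illustration, not the actual GraphGridTrainingRunningSensor implementation.

import docker
from airflow.sensors.base import BaseSensorOperator


class TrainingRunningSensorSketch(BaseSensorOperator):
    """Illustrative only: succeeds once every GPU training container is healthy."""

    def poke(self, context):
        client = docker.from_env()
        # The label filter is an assumption; the real operator applies its own labels.
        containers = client.containers.list(
            all=True, filters={"label": "graphgrid.training=gpu"}
        )
        for container in containers:
            health = container.attrs.get("State", {}).get("Health", {}).get("Status")
            # Block until every GPU training container reports a passing GPU healthcheck.
            if health != "healthy":
                return False
        return True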

After the training sensor task succeeds, the gpu_preflight_configuration task begins. This task performs multiple heuristics to get an accurate snapshot of the current environment in terms of GPU usage, and uses that snapshot to determine whether another training session can be run. When it cannot, the process sleeps and then recursively calls the calculation again. The task continues to block/sleep and recursively call itself until it reaches Python's default maximum recursion depth of 1000, which gives an approximate timeout of ~8-9 hours.
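
As a back-of-the-envelope check on that timeout: with Python's default recursion limit of 1000 and a sleep of roughly 30 seconds per attempt (the interval is an assumption), the task blocks for about 1000 × 30 s ≈ 8.3 hours before hitting the limit. A stripped-down sketch of the block-and-recurse pattern:

import time


def _available_gpu_slots() -> int:
    """Hypothetical stand-in for the real heuristics (GPU memory, utilization, etc.)."""
    return 0


def preflight_configuration_sketch() -> int:
    # Illustrative block-and-recurse pattern: keep sleeping and re-checking until
    # there is headroom, or until Python's default recursion limit (1000) aborts the task.
    slots = _available_gpu_slots()
    if slots > 0:
        return slots  # enough headroom for another GPU training session
    time.sleep(30)  # assumed interval: 1000 recursions * 30 s ~= 8.3 hours of blocking
    return preflight_configuration_sketch()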

Assuming the gpu_preflight_configuration task succeeds, gpu_pool_setup is then scheduled and run. gpu_pool_setup takes the pool size that the gpu_preflight_configuration task returned and either creates the pool with that number of slots or modifies the existing pool to match the newly calculated number of slots.
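
For reference, the same create-or-resize operation can be performed manually with the standard Airflow pools CLI; the slot count and description below are illustrative.

airflow pools set gpu_pool 2 "Pool sized by gpu_preflight_configuration"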

Finally, this subgraph/subDAG concludes with train_model_gpu which actually trains the requested model within the dynamically changing gpu_pool.

Air traffic controller analogy

The role of the GPU subDAG is analogous to an air traffic controller. This concurrency system monitors all training sessions (airplanes) and an environment's bandwidth for these training sessions (airport runway availability). Air traffic controllers attempt to schedule planes to land on runways while avoiding collisions and errors. This is exactly the goal of this concurrency management system: we want all training tasks (planes) to run without overexerting the local environment and crashing.

The training sensor tells us where all planes currently are (i.e. the states of all the current training sessions). For planes, these states include "at the terminal", "approaching the runway", "on the runway", "taking off", and "in the air". Our training tasks can be in states such as "queued", "running", "failed", "no state", etc. The training sensor queries the backend Airflow database, which gives us every state other than "in the air": while Airflow may report a task as "running", we do not know whether the GPU process and TensorFlow have actually begun and will be queryable for our heuristic calculations. This is where the Docker healthcheck comes in, since it only marks containers as healthy when they have actually started and are using a GPU. For instance, one task may be queued for "take off" while another is "in the air" with a passing healthcheck; the concurrency system's job is to ensure there are no "flight" delays. With these two queries we can determine every state of the plane and accurately know when we should consider landing a plane or scheduling another plane for takeoff.

The gpu_preflight_configuration task is analogous to the air traffic controller checking whether there are any available runways. It looks at all the current planes and gets a snapshot of which runways are currently occupied and which will be occupied in the future by other planes. Overall, the goal of this system is to "automagically" manage concurrency for GPU training sessions in order to avoid all possible crashes. If we instead opted for a "hands off" approach and let the planes and pilots decide for themselves where and when to land, we would undoubtedly see many crashes and/or inefficiencies, with pilots being too conservative about deciding when to land or take off.