
SDK Overview and Usage

SDK Usage Introduction

The GraphGrid Python SDK is a Python-based software development kit for programmatically interacting with GraphGrid services. Its primary purpose is currently to provide a flexible way to train NLP models on a variety of tasks.

This page contains usage reference and examples for the GraphGrid SDK.

The first example goes over using a local SDK Python script to train an individual model, while the second goes over setting up and using a custom job (DAG) to automate model training.

Details about specific SDK methods, their parameters, and response types can be found in the SDK Reference documentation.

The code for our examples can be pulled from our GraphGrid SDK Examples repository.

Start GraphGrid AI Edition

Before moving on to the SDK NLP Model Training or SDK Custom Job sections, be sure to start a local CDP deployment.

Download the CDP ai-edition version 2.0 from graphgrid.com/cdp-downloads and visit https://docs.graphgrid.com/2.0/#/ for more information about CDP.

Your CDP deployment needs to be running in order to properly train NLP models.

SDK NLP Model Training Example and Usage

In this example we use SDK code to train a named entity recognition (NER) model. You'll need a running ai-edition CDP deployment before running this Python script locally.

Below is the entire Python script for training a model. In the sections that follow we break this script up and explain each part.

import time

from graphgrid_sdk.ggcore.config import SdkBootstrapConfig
from graphgrid_sdk.ggcore.sdk_messages import NMTStatusResponse, \
    NMTTrainResponse, SaveDatasetResponse, PromoteModelResponse
from graphgrid_sdk.ggcore.training_request_body import TrainRequestBody
from graphgrid_sdk.ggsdk.sdk import GraphGridSdk


# Stream dataset in
def read_by_line():
    infile = open(
        "dataset_example.jsonl",
        'r', encoding='utf8')
    for line in infile:
        yield line.encode()


# Setup bootstrap config
bootstrap_conf = SdkBootstrapConfig(
    access_key='a3847750f486bd931de26c6e683b1dc4',
    secret_key='81a62cea53883f4a163a96355d47656e',
    url_base='localhost',
    is_docker_context=False)

# Initialize the SDK
sdk = GraphGridSdk(bootstrap_conf)

# Save training dataset (streamed)
dataset_response: SaveDatasetResponse = sdk.save_dataset(read_by_line(),
"sample-dataset",
overwrite=True)

# Train a new model
training_request_body: TrainRequestBody = TrainRequestBody(model="named_entity_recognition",
                                                            dataset_id="sample-dataset.jsonl",
                                                            no_cache=False, gpu=False)
train_response: NMTTrainResponse = sdk.nmt_train(training_request_body)

# Track training status
nmt_status: NMTStatusResponse = sdk.nmt_status(train_response.dagRunId)
while nmt_status.state != "success" and nmt_status.state != "failed":
    print("...running dag...")
    time.sleep(10)
    nmt_status: NMTStatusResponse = sdk.nmt_status(train_response.dagRunId)

if nmt_status.state == "failed":
    raise Exception("Dag failed: ", nmt_status.exception)

# Training has finished
print("Dag training/eval/model upload has finished.")

# Promote updated model
promote_model_response: PromoteModelResponse = \
    sdk.promote_model(nmt_status.savedModelName, "named_entity_recognition")

# Promotion is complete
print("Model has been promoted.")

Setup the SDK object

The first step in training a new model is setting up the GraphGridSdk Python object.

# Setup bootstrap config
bootstrap_conf = SdkBootstrapConfig(
    access_key='a3847750f486bd931de26c6e683b1dc4',
    secret_key='81a62cea53883f4a163a96355d47656e',
    url_base='localhost',
    is_docker_context=False)

# Initialize the SDK
sdk = GraphGridSdk(bootstrap_conf)

You create an SdkBootstrapConfig object that provides the basic configuration the SDK needs. This example uses the default access_key and secret_key associated with CDP.

Then you can initialize your GraphGridSdk object with that configuration.

Reading in and Saving your Dataset

The next step in training a new model is getting your data into the necessary location to support training. For CDP to access your training data it must be stored under the right bucket in MinIO, and while you could place it there manually it is easier and possibly more efficient to stream it in through the SDK directly.

# Stream dataset in
def read_by_line():
    infile = open(
        "dataset_example.jsonl",
        'r', encoding='utf8')
    for line in infile:
        yield line.encode()

# Save training dataset (streamed)
dataset_response: SaveDatasetResponse = sdk.save_dataset(read_by_line(),
"sample-dataset",
overwrite=True)

We set up a read_by_line function that yields individual dataset lines. Each line in dataset_example.jsonl is a training sample; the sample format is described in the SDK Dataset Structure documentation.

We use our save_dataset method to stream in our dataset and save it under the name sample-dataset. Lastly we specify overwrite=True to ensure that any previous sample-dataset already present is overwritten.

You can check the SaveDatasetResponse object to confirm a 200 was returned and your dataset was properly saved to backend storage.
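
For example, a minimal check might look like the following. The status_code and exception fields used here are the same ones the custom job example later on this page relies on:

# Confirm the dataset was saved before moving on to training
if dataset_response.status_code != 200:
    raise Exception("Failed to save dataset: ", dataset_response.exception)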

Kick off model training

With your dataset saved you are ready to train a new named_entity_recognition (or NER) model!

To kick off NLP model training we use the SDK method nmt_train with a crafted TrainRequestBody.

# Train a new model
training_request_body: TrainRequestBody = TrainRequestBody(model="named_entity_recognition",
                                                            dataset_id="sample-dataset.jsonl",
                                                            no_cache=False, gpu=False)
train_response: NMTTrainResponse = sdk.nmt_train(training_request_body)

Calling the nmt_train SDK method kicks off the predefined nlp-model-training job.

This training request body is for training a named_entity_recognition model using the sample-dataset.jsonl you saved in the previous step. Depending on your local setup, you may be able to use a GPU with gpu=True or may need to disable caching with no_cache=True. In this example we have caching enabled and train on a CPU rather than a GPU.
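
For instance, if you do have a GPU available and want to force a fresh run without cached artifacts, the same request could be built with those flags flipped (only no_cache and gpu change):

# Same request body, but skipping the cache and training on a GPU
training_request_body = TrainRequestBody(model="named_entity_recognition",
                                         dataset_id="sample-dataset.jsonl",
                                         no_cache=True, gpu=True)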

Track your model training

Training can be programmatically monitored using the nmt_status method.

The dagRunId needed to call nmt_status is retrieved from the NMTTrainResponse returned by nmt_train.

# Track training status
nmt_status: NMTStatusResponse = sdk.nmt_status(train_response.dagRunId)
while nmt_status.state != "success" and nmt_status.state != "failed":
    print("...running dag...")
    time.sleep(10)
    nmt_status: NMTStatusResponse = sdk.nmt_status(train_response.dagRunId)

if nmt_status.state == "failed":
    raise Exception("Dag failed: ", nmt_status.exception)

This shows some simple logic for polling the job status until it reaches a success or failed state.
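
If you would rather not wait indefinitely, the same polling loop can be wrapped with a simple timeout. This is only a sketch that reuses the nmt_status call from above; the 30-minute limit is an arbitrary choice:

# Poll as before, but give up after a fixed amount of time
TIMEOUT_SECONDS = 30 * 60
start = time.time()
nmt_status: NMTStatusResponse = sdk.nmt_status(train_response.dagRunId)
while nmt_status.state != "success" and nmt_status.state != "failed":
    if time.time() - start > TIMEOUT_SECONDS:
        raise TimeoutError("Gave up waiting for the training dag to finish.")
    time.sleep(10)
    nmt_status: NMTStatusResponse = sdk.nmt_status(train_response.dagRunId)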

Compare models and promote

Once training has completed and the job reaches a success state the NMTStatusResponse returns some metadata and metrics about your trained model.

This response includes information about the training/eval accuracy and loss, information you can use to determine if you want to promote your newly trained model.

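For example, you might gate promotion on the reported evaluation accuracy. The exact metric fields exposed by NMTStatusResponse depend on your SDK version, so treat eval_accuracy below as a hypothetical placeholder for whichever metric your response carries:

# Hypothetical promotion gate; eval_accuracy is a placeholder field name
MIN_EVAL_ACCURACY = 0.80
eval_accuracy = getattr(nmt_status, "eval_accuracy", None)
if eval_accuracy is not None and eval_accuracy < MIN_EVAL_ACCURACY:
    raise Exception("Newly trained model did not meet the accuracy threshold.")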

# Promote updated model
promote_model_response: PromoteModelResponse = \
    sdk.promote_model(nmt_status.savedModelName, "named_entity_recognition")

Once you have verified that the newly trained model performs better than the currently loaded model, you can use the SDK promote_model method to load it into CDP's NLP module for use.

This SDK method loads the newly trained model, and the NLP module begins using it immediately.

SDK Custom Job Reference

This section goes over the parts of writing, creating, and deploying a custom job. A custom job is a flexible framework that can be used for various operations, such as training NLP models and ingesting data.

Please see the GraphGrid SDK Samples GitHub page to download a working custom job example to follow along.

Install the GraphGrid SDK and provider package

To use the GraphGrid SDK, add it as a dependency to your Python environment. The GraphGrid SDK is publicly hosted on PyPI and can be installed through pip:

pip install graphgrid-sdk

You'll also need the GraphGrid provider package (airflow-provider-graphgrid) to run a custom job.

pip install airflow-provider-graphgrid

Setup your directory

The setup process includes multiple files; to download the code discussed here, please see the GraphGrid SDK Samples GitHub repository.

These files include:

example_dag.py: The example DAG custom job that uses the GraphGridDockerOperator
graphgrid-sdk-python-example.dockerfile: Dockerfile used to build the graphgrid-sdk-python-examples docker image
sdk_calls.py: File executed in the docker image
requirements.txt: Requirements file for sdk_calls.py
dataset_example.jsonl: Example dataset file

Write a Job in Python

Now we write our own custom job in Python. This really means creating our own custom Python DAG, using the GraphGridDockerOperator as its base.

This is the provided example_dag.py file.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from docker import APIClient
from graphgrid_provider.operators.graphgrid_docker import \
GraphGridDockerOperator, GraphGridMount

DOCKER_URL = "tcp://socat:2375"
SOURCE = "{{ ti.xcom_pull(task_ids='create_volume') }}"
dataset_filepath = 'dataset_example.jsonl'
filename = 'sample_dataset'
models_to_train = '["NAMED_ENTITY_RECOGNITION", "PART_OF_SPEECH_TAGGING"]'


def read_by_line():
    infile = open(
        dataset_filepath,
        'r', encoding='utf8')
    for line in infile:
        yield line.encode()


default_args = {
    'owner': 'GraphGrid',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'start_date': datetime(2015, 6, 1),
}

dag = DAG('train_model_with_sdk',
          default_args=default_args,
          description='train a model using sdk calls',
          schedule_interval="@daily",
          catchup=False,
          user_defined_macros=dict()
          )


def create_volume() -> str:
"""Create volume to propagate data between tasks

:return: The name of the created persistent volume claim

"""
client = APIClient(base_url=DOCKER_URL)
volume = client.create_volume()
return volume.get("Name")


def delete_volume(claim_name: str) -> None:
"""

Delete the volume that propagated data between tasks

:param claim_name: The name of the claim to delete

"""
client = APIClient(base_url=DOCKER_URL)
client.remove_volume(name=claim_name)


t_create_volume = PythonOperator(python_callable=create_volume,
                                 task_id='create_volume', dag=dag)

t_1 = GraphGridDockerOperator(task_id='save_dataset',
                              dag=dag,
                              mounts=[GraphGridMount(target="/volumes/",
                                                     source=SOURCE)],
                              image="graphgrid-sdk-python-examples",
                              command=["save_dataset",
                                       "--dataset_filepath", dataset_filepath,
                                       "--filename", filename],
                              auto_remove=True,
                              )

t_2 = GraphGridDockerOperator(task_id='train_and_promote',
                              dag=dag,
                              mounts=[GraphGridMount(target="/volumes/",
                                                     source=SOURCE)],
                              image="graphgrid-sdk-python-examples",
                              command=["train_and_promote",
                                       "--models_to_train", models_to_train,
                                       "--dataset_id",
                                       "{{ ti.xcom_pull(task_ids='save_dataset') }}",
                                       "--no_cache", 'false',
                                       "--gpu", 'false',
                                       "--autopromote", 'true'],
                              auto_remove=True,
                              )

t_delete_volume = PythonOperator(python_callable=delete_volume,
                                 task_id='delete_volume',
                                 dag=dag, op_kwargs={"claim_name": SOURCE},
                                 trigger_rule="all_done")

t_1.set_upstream(t_create_volume)
t_2.set_upstream(t_1)
t_delete_volume.set_upstream(t_2)

Note that the GraphGridDockerOperators call the methods provided by sdk_calls.py, which serves as an executable that makes the SDK calls.

SDK Calls Python File and Sample Dataset

SDK Calls

We use sdk_calls.py to define two methods, save_dataset and train_and_promote. This Python file is executed by the DAG and kicks off the dataset saving, training, and promotion, all of which are done through SDK calls.

These methods set up a GraphGridSdk object and make the corresponding calls to the GraphGrid SDK, which can be read about in the SDK Method Reference.

We leverage Fire, a Python library that turns any Python component into a command line interface (CLI). This works together with the dockerfile entrypoint and the command fields in example_dag.py.

import typing
import fire
from graphgrid_sdk.ggcore.sdk_messages import SaveDatasetResponse
from graphgrid_sdk.ggcore.utils import NlpModel
from graphgrid_sdk.ggsdk.sdk import GraphGridSdk


class Pipeline:

    def save_dataset(self, dataset_filepath: str, filename: str):
        sdk = GraphGridSdk()

        def read_by_line(dataset_filepath):
            infile = open(
                dataset_filepath,
                'r', encoding='utf8')
            for line in infile:
                yield line.encode()

        yield_function = read_by_line(dataset_filepath)
        dataset_response: SaveDatasetResponse = sdk.save_dataset(
            yield_function, filename)
        if dataset_response.status_code != 200:
            raise Exception("Failed to save dataset: ",
                            dataset_response.exception)

        return dataset_response.dataset_id

    def train_and_promote(self, models_to_train: list,
                          dataset_id: str,
                          no_cache: typing.Optional[bool],
                          gpu: typing.Optional[bool], autopromote: bool):
        sdk = GraphGridSdk()

        def success_handler(args):
            return

        def failed_handler(args):
            return

        models_to_train = [getattr(NlpModel, model) for model in
                           models_to_train]

        sdk.nmt_train_pipeline(models_to_train, dataset_id, no_cache, gpu,
                               autopromote, success_handler, failed_handler)


def main():
    fire.Fire(Pipeline)


if __name__ == '__main__':
    main()
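
Because sdk_calls.py is the image's entrypoint (see the dockerfile below) and Fire exposes the Pipeline methods as subcommands, the save_dataset task in example_dag.py effectively runs a command like the following inside its container:

python3 sdk_calls.py save_dataset --dataset_filepath dataset_example.jsonl --filename sample_dataset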

Sample Dataset

In the GitHub project we provide a sample dataset dataset_example.jsonl to use for training.

There are only 22 samples, so models trained with this dataset won't be performant. This file is for the example project; users will need to provide their own dataset. Please see the SDK Dataset page for details.

Set up a dockerfile

You'll need to create an image that Airflow can use. To do this we use a dockerfile.

The following docker image provides all the basic functionality in order to convert our python script into a complete, continuously scheduled, custom job.

FROM python:3.8-slim

RUN apt update && apt -y upgrade && apt -y install bash
RUN pip install --upgrade pip
RUN apt-get -y install git

WORKDIR /graphgrid-sdk-python-examples
COPY sdk_calls.py sdk_calls.py
COPY requirements.txt requirements.txt
COPY dataset_example.jsonl dataset_example.jsonl

RUN python3 -m pip install -r requirements.txt

ENTRYPOINT ["python3", "sdk_calls.py"]
CMD ["--help"]

This dockerfile sets up a Python image that copies over the sdk_calls.py, requirements.txt, and dataset_example.jsonl files and installs their requirements. It then sets sdk_calls.py as the entrypoint for the image (this is how the GraphGridDockerOperators we set up in example_dag.py make their calls).

Build Job docker image

We need to build a docker image based on our dockerfile. This image is what Airflow uses when the DAG is triggered.

docker build -t graphgrid-sdk-python-examples -f graphgrid-sdk-python-example.dockerfile .

Upload your DAG

With our DAG image built, we now need to upload the DAG file itself to Airflow.

From within your CDP directory run the graphgrid command:

./bin/graphgrid airflow upload <PATH/TO/example_dag.py>

Be sure to replace <PATH/TO/example_dag.py> with the actual path to your DAG python file.

Airflow may take up to 1 minute to add new DAGs to the Webserver UI.

Kick off your Job

We can now start training NLP models! We'll show two ways to trigger our DAG: directly from the Airflow Webserver UI in the browser, or with the GraphGrid SDK.

Trigger your Job through the Airflow Webserver

You can trigger a job by going to the Airflow Webserver in your browser (CDP defaults this to localhost:8081) and signing in with the username/password airflow.

From the home screen you should see your custom DAG and the nlp-model-training DAG. You can manually trigger your custom DAG by hitting the arrow under the Actions column.

Trigger your Job through the SDK

You can also trigger a job directly through the GraphGrid SDK.

sdk = GraphGridSdk(SdkBootstrapConfig(
    access_key='a3847750f486bd931de26c6e683b1dc4',
    secret_key='81a62cea53883f4a163a96355d47656e',
    url_base="localhost")
)

sdk.job_run(dag_id="train_model_with_sdk", request_body={})