SDK Overview and Usage
SDK Usage Introduction
The GraphGrid Python SDK is a Python-based software development kit for programmatically interacting with GraphGrid services. Currently its primary purpose is to provide a flexible way to train NLP models on a variety of tasks.
This page contains usage reference and two examples for the GraphGrid SDK.
The first example goes over using a local SDK Python script to train an individual model, while the second goes over setting up and using a custom job (DAG) to automate model training.
Details about specific SDK methods, their parameters, and response types can be found in the SDK Reference documentation.
The code for our examples can be pulled from our GraphGrid SDK Examples repository.
Start GraphGrid AI Edition
Before moving on to the SDK NLP Model Training or SDK Custom Job sections, be sure to start running a local CDP deployment.
Download the CDP ai-edition version 2.0 from graphgrid.com/cdp-downloads and visit https://docs.graphgrid.com/2.0/#/ for more information about CDP.
Your CDP deployment needs to be running in order to properly train NLP models.
SDK NLP Model Training Example and Usage
In this example we use SDK code to train a named entity recognition (or NER) model. You'll need a running ai-edition CDP deployment before running this Python script locally.
Below is the entire Python script for training a model. In the sections that follow we break this script into pieces and explain each step.
import time
from graphgrid_sdk.ggcore.config import SdkBootstrapConfig
from graphgrid_sdk.ggcore.sdk_messages import NMTStatusResponse, \
NMTTrainResponse, SaveDatasetResponse, PromoteModelResponse
from graphgrid_sdk.ggcore.training_request_body import TrainRequestBody
from graphgrid_sdk.ggsdk.sdk import GraphGridSdk
# Stream dataset in
def read_by_line():
    infile = open(
        "dataset_example.jsonl",
        'r', encoding='utf8')
    for line in infile:
        yield line.encode()
# Setup bootstrap config
bootstrap_conf = SdkBootstrapConfig(
access_key='a3847750f486bd931de26c6e683b1dc4',
secret_key='81a62cea53883f4a163a96355d47656e',
url_base='localhost',
is_docker_context=False)
# Initialize the SDK
sdk = GraphGridSdk(bootstrap_conf)
# Save training dataset (streamed)
dataset_response: SaveDatasetResponse = sdk.save_dataset(read_by_line(),
"sample-dataset",
overwrite=True)
# Train a new model
training_request_body: TrainRequestBody = TrainRequestBody(model="named_entity_recognition",
dataset_id="sample-dataset.jsonl",
no_cache=False, gpu=False)
train_response: NMTTrainResponse = sdk.nmt_train(training_request_body)
# Track training status
nmt_status: NMTStatusResponse = sdk.nmt_status(train_response.dagRunId)
while nmt_status.state != "success" and nmt_status.state != "failed":
    print("...running dag...")
    time.sleep(10)
    nmt_status: NMTStatusResponse = sdk.nmt_status(train_response.dagRunId)
if nmt_status.state == "failed":
    raise Exception("Dag failed: ", nmt_status.exception)
# Training has finished
print("Dag training/eval/model upload has finished.")
# Promote updated model
promote_model_response: PromoteModelResponse = \
sdk.promote_model(nmt_status.savedModelName, "named_entity_recognition")
# Promotion is complete
print("Model has been promoted.")
Set up the SDK object
The first step in training a new model is setting up the GraphGridSdk Python object.
# Setup bootstrap config
bootstrap_conf = SdkBootstrapConfig(
access_key='a3847750f486bd931de26c6e683b1dc4',
secret_key='81a62cea53883f4a163a96355d47656e',
url_base='localhost',
is_docker_context=False)
# Initialize the SDK
sdk = GraphGridSdk(bootstrap_conf)
You create a SdkBootstrapConfig object that provides the basic configuration the SDK needs. This example uses the default access_key and secret_key associated with CDP. Then you can initialize your GraphGridSdk object with that configuration.
Reading in and Saving your Dataset
The next step in training a new model is getting your data into the necessary location to support training. For CDP to access your training data, it must be stored under the right bucket in MinIO; while you could place it there manually, it is easier and possibly more efficient to stream it in through the SDK directly.
# Stream dataset in
def read_by_line():
    infile = open(
        "dataset_example.jsonl",
        'r', encoding='utf8')
    for line in infile:
        yield line.encode()
# Save training dataset (streamed)
dataset_response: SaveDatasetResponse = sdk.save_dataset(read_by_line(),
"sample-dataset",
overwrite=True)
We set up a read_by_line function that yields individual dataset lines. Each line in our dataset_example.jsonl is a training sample; the sample format is described in the SDK Dataset Structure documentation.
We use our save_dataset method to stream in our dataset and save it under the name sample-dataset. Lastly we specify overwrite=True to ensure that any previous sample-dataset already present is overwritten.
You can check the SaveDatasetResponse object to confirm a 200 was returned and your dataset was properly saved to backend storage.
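If you want the script to fail fast when the save does not succeed, you can add a small check. This is a minimal sketch using the status_code and exception fields on SaveDatasetResponse, the same fields the sdk_calls.py example later on this page checks:

# Optional: verify the dataset was saved before kicking off training
if dataset_response.status_code != 200:
    raise Exception("Failed to save dataset: ", dataset_response.exception)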
Kick off model training
With your dataset saved you are ready to train a new named_entity_recognition (or NER) model!
To kick off NLP model training we use the SDK method nmt_train with a crafted TrainRequestBody.
# Train a new model
training_request_body: TrainRequestBody = TrainRequestBody(model="named_entity_recognition",
dataset_id="sample-dataset.jsonl",
no_cache=False, gpu=False)
train_response: NMTTrainResponse = sdk.nmt_train(training_request_body)
Calling the nmt_train SDK method kicks off the predefined nlp-model-training job. This training request body is for training a named_entity_recognition model using the sample-dataset.jsonl you saved in the previous step.
Depending on your local setup you may be able to use a GPU with gpu=True, or you may need to disable caching with no_cache=True. In this example we have caching enabled and are training on a CPU rather than a GPU.
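For instance, if you do have a GPU available locally, the same request can be built with gpu=True. This is just a sketch reusing the parameters shown above:

# Variant request: train on a GPU while keeping the cache enabled
gpu_request_body = TrainRequestBody(model="named_entity_recognition",
                                    dataset_id="sample-dataset.jsonl",
                                    no_cache=False, gpu=True)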
Track your model training
Training can be programmatically monitored through the nmt_status method. The dagRunId needed to call nmt_status is retrieved from the NMTTrainResponse returned by nmt_train.
# Track training status
nmt_status: NMTStatusResponse = sdk.nmt_status(train_response.dagRunId)
while nmt_status.state != "success" and nmt_status.state != "failed":
    print("...running dag...")
    time.sleep(10)
    nmt_status: NMTStatusResponse = sdk.nmt_status(train_response.dagRunId)
if nmt_status.state == "failed":
    raise Exception("Dag failed: ", nmt_status.exception)
This shows some simple logic for tracking the job status and waiting until the job reaches a success or failed state.
Compare models and promote
Once training has completed and the job reaches a success state, the NMTStatusResponse returns some metadata and metrics about your trained model. This response includes information about the training/eval accuracy and loss, which you can use to decide whether to promote your newly trained model.
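For example, a minimal sketch of such a promotion gate might look like the following. The metric field name (evalAccuracy) and the threshold value are assumptions for illustration only; check the SDK Reference for the actual field names returned by your version.

# Hypothetical promotion gate -- field name and threshold are illustrative only
MIN_EVAL_ACCURACY = 0.80  # example threshold, not an SDK default
eval_accuracy = getattr(nmt_status, "evalAccuracy", None)  # assumed field name
if eval_accuracy is not None and eval_accuracy < MIN_EVAL_ACCURACY:
    raise Exception("Trained model did not meet the accuracy threshold; "
                    "skipping promotion.")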
# Promote updated model
promote_model_response: PromoteModelResponse = \
sdk.promote_model(nmt_status.savedModelName, "named_entity_recognition")
Once you have verified the newly trained model performs better than the currently loaded model, you can use the SDK promote_model method to load it into CDP's NLP module for use. This SDK method loads the newly trained model and starts using it immediately.
SDK Custom Job Reference
This section goes over writing, creating, and deploying a custom job. A custom job is a flexible framework that can be used for various operations, such as training NLP models and ingesting data.
Please see the GraphGrid SDK Samples GitHub page to download a working custom job example to follow along.
Install the GraphGrid SDK and provider package
To use the GraphGrid SDK, add it as a dependency to your Python environment. The GraphGrid SDK is publicly hosted on PyPI and can be installed through pip:
pip install graphgrid-sdk
You'll also need the GraphGrid provider package (airflow-provider-graphgrid) to run a custom job.
pip install airflow-provider-graphgrid
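As a quick, optional sanity check you can confirm both packages are importable in your environment. The module paths below are taken verbatim from the example files later on this page:

# Verify the SDK and the Airflow provider package are importable
from graphgrid_sdk.ggsdk.sdk import GraphGridSdk
from graphgrid_provider.operators.graphgrid_docker import GraphGridDockerOperator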
Set up your directory
The setup process includes multiple files. To download the code discussed here, please see the GraphGrid SDK Samples GitHub.
These files include:
| File | Description |
|---|---|
| example_dag.py | The example DAG custom job that uses the GraphGridDockerOperator |
| graphgrid_python_sdk_example.dockerfile | Dockerfile used to build the graphgrid-sdk-python-examples docker image |
| sdk_calls.py | File executed in the docker image |
| requirements.txt | Requirements file for sdk_calls.py |
| dataset_example.jsonl | Example dataset file |
Write a Job in Python
Now we write our own custom job in Python. This really means creating our own custom Python DAG, using the GraphGridDockerOperator as the base of our DAG.
This is the provided example_dag.py file.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from docker import APIClient
from graphgrid_provider.operators.graphgrid_docker import \
GraphGridDockerOperator, GraphGridMount
DOCKER_URL = "tcp://socat:2375"
SOURCE = "{{ ti.xcom_pull(task_ids='create_volume') }}"
dataset_filepath = 'dataset_example.jsonl'
filename = 'sample_dataset'
models_to_train = '["NAMED_ENTITY_RECOGNITION", "PART_OF_SPEECH_TAGGING"]'
def read_by_line():
    infile = open(
        dataset_filepath,
        'r', encoding='utf8')
    for line in infile:
        yield line.encode()
default_args = {
'owner': 'GraphGrid',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
'start_date': datetime(2015, 6, 1),
}
dag = DAG('train_model_with_sdk',
default_args=default_args,
description='train a model using sdk calls',
schedule_interval="@daily",
catchup=False,
user_defined_macros=dict()
)
def create_volume() -> str:
    """Create volume to propagate data between tasks
    :return: The name of the created persistent volume claim
    """
    client = APIClient(base_url=DOCKER_URL)
    volume = client.create_volume()
    return volume.get("Name")

def delete_volume(claim_name: str) -> None:
    """
    Delete the volume that propagated data between tasks
    :param claim_name: The name of the claim to delete
    """
    client = APIClient(base_url=DOCKER_URL)
    client.remove_volume(name=claim_name)
t_create_volume = PythonOperator(python_callable=create_volume,
task_id='create_volume', dag=dag)
t_1 = GraphGridDockerOperator(task_id='save_dataset',
dag=dag,
mounts=[GraphGridMount(target="/volumes/",
source=SOURCE)],
image="graphgrid-sdk-python-examples",
command=["save_dataset",
"--dataset_filepath", dataset_filepath,
"--filename", filename],
auto_remove=True,
)
t_2 = GraphGridDockerOperator(task_id='train_and_promote',
dag=dag,
mounts=[GraphGridMount(target="/volumes/",
source=SOURCE)],
image="graphgrid-sdk-python-examples",
command=["train_and_promote",
"--models_to_train", models_to_train,
"--dataset_id",
"{{ ti.xcom_pull(task_ids='save_dataset') }}",
"--no_cache", 'false',
"--gpu", 'false',
"--autopromote", 'true'],
auto_remove=True,
)
t_delete_volume = PythonOperator(python_callable=delete_volume,
task_id='delete_volume',
dag=dag, op_kwargs={"claim_name": SOURCE},
trigger_rule="all_done")
t_1.set_upstream(t_create_volume)
t_2.set_upstream(t_1)
t_delete_volume.set_upstream(t_2)
Note that the GraphGridDockerOperators are calling the methods provided by sdk_calls.py, which serves as an executable that can call the SDK. Task results are passed between operators via XCom: the create_volume task's volume name is pulled into SOURCE, and the dataset_id returned by save_dataset is pulled into the train_and_promote command.
SDK Calls Python File and Sample Dataset
SDK Calls
We use sdk_calls.py to define two methods, save_dataset and train_and_promote. This Python file is executed by the DAG and kicks off operations such as saving the dataset, training, and promoting, all of which are done through SDK calls. These methods set up a GraphGridSdk object and make corresponding calls to the GraphGrid SDK, which can be read about in the SDK Method Reference.
We leverage Fire, a Python library that converts any Python component into a command line interface (CLI). This works together with the dockerfile entrypoint and the example_dag.py command fields: for example, the save_dataset task effectively runs python3 sdk_calls.py save_dataset --dataset_filepath dataset_example.jsonl --filename sample_dataset inside the container.
import typing
import fire
from graphgrid_sdk.ggcore.sdk_messages import SaveDatasetResponse
from graphgrid_sdk.ggcore.utils import NlpModel
from graphgrid_sdk.ggsdk.sdk import GraphGridSdk
class Pipeline:
    def save_dataset(self, dataset_filepath: str, filename: str):
        sdk = GraphGridSdk()

        # Stream the dataset file line by line
        def read_by_line(dataset_filepath):
            infile = open(
                dataset_filepath,
                'r', encoding='utf8')
            for line in infile:
                yield line.encode()

        yield_function = read_by_line(dataset_filepath)
        dataset_response: SaveDatasetResponse = sdk.save_dataset(
            yield_function, filename)
        if dataset_response.status_code != 200:
            raise Exception("Failed to save dataset: ",
                            dataset_response.exception)
        return dataset_response.dataset_id

    def train_and_promote(self, models_to_train: list,
                          dataset_id: str,
                          no_cache: typing.Optional[bool],
                          gpu: typing.Optional[bool], autopromote: bool):
        sdk = GraphGridSdk()

        def success_handler(args):
            return

        def failed_handler(args):
            return

        # Map model name strings to NlpModel enum values
        models_to_train = [getattr(NlpModel, model) for model in
                           models_to_train]
        sdk.nmt_train_pipeline(models_to_train, dataset_id, no_cache, gpu,
                               autopromote, success_handler, failed_handler)

def main():
    fire.Fire(Pipeline)

if __name__ == '__main__':
    main()
Sample Dataset
In the GitHub project we provide a sample dataset, dataset_example.jsonl, to use for training. There are only 22 training samples, so models trained with this dataset won't be performant. This file is for the example project only; users will need to provide their own dataset. Please see the SDK Dataset page for details.
Set up a dockerfile
You'll need to create an image that Airflow can use; to do this we use a dockerfile. The following docker image provides all the basic functionality needed to convert our Python script into a complete, continuously scheduled custom job.
FROM python:3.8-slim
RUN apt update && apt -y install && apt -y upgrade \
bash
RUN pip install --upgrade pip
RUN apt-get -y install git
WORKDIR /graphgrid-sdk-python-examples
COPY sdk_calls.py sdk_calls.py
COPY requirements.txt requirements.txt
COPY dataset_example.jsonl dataset_example.jsonl
RUN python3 -m pip install -r requirements.txt
ENTRYPOINT ["python3", "sdk_calls.py"]
CMD ["--help"]
This dockerfile sets up a Python image that upgrades pip and copies over the sdk_calls.py, requirements.txt, and dataset_example.jsonl files. It then sets sdk_calls.py as the entrypoint for the image (this is how the GraphGridDockerOperators we set up in example_dag.py make their calls).
Build Job docker image
We need to build a docker image based on our dockerfile. This image is what Airflow uses when the DAG is triggered. Note that the tag must match the image name referenced in example_dag.py (graphgrid-sdk-python-examples).
docker build -t graphgrid-sdk-python-examples -f graphgrid-sdk-python-example.dockerfile .
Upload your DAG
With our DAG image built, we now need to upload the DAG file itself to Airflow.
From within your CDP directory run the graphgrid command:
./bin/graphgrid airflow upload <PATH/TO/example_dag.py>
Be sure to replace <PATH/TO/example_dag.py> with the actual path to your DAG Python file.
Airflow may take up to 1 minute to add new DAGs to the Webserver UI.
Kick off your Job
Now we can start training NLP models! We'll show two ways to trigger our DAG: directly through the Airflow Webserver UI in the browser, or with the GraphGrid SDK.
Trigger your Job through the Airflow Webserver
You can trigger a job by going to the Airflow Webserver in your browser (CDP defaults this to localhost:8081) and signing in with the username/password airflow.
From the home screen you should see your custom DAG and the nlp-model-training DAG. You can manually trigger your custom DAG by hitting the arrow under the Actions column.
Trigger your Job through the SDK
You can also trigger a job directly through the GraphGrid SDK.
sdk = GraphGridSdk(SdkBootstrapConfig(
    access_key='a3847750f486bd931de26c6e683b1dc4',
    secret_key='81a62cea53883f4a163a96355d47656e',
    url_base="localhost")
)
sdk.job_run(dag_id="train_model_with_sdk", request_body={})