Version: 2.0

GraphGrid Training from the Graph via Airflow and NLP-model-training

Overview

  1. Set up a basic python environment
    • create a venv
    • activate environment
    • install dependencies
  2. Develop a custom ETL pipeline
    • create a transformation script using python to read and transform graph data to the Wikigold format
  3. Train with the newly transformed data
    • trigger the DAG with JSON configuration
    • kick off training session for the NLP-model-training DAG to train a new NER model
  4. Convert the transformation script into a scheduled DAG that extracts from ONgDB hourly
    • create the ETL pipeline docker image
    • create the DAG and update the script within the project directory
    • upload the DAG for use
    • use the newly trained model with the NLP module

Python Library Requirements

It's recommended to use python>=3.7. This tutorial will leverage the following python dependencies:

apache-airflow==2.2.2
apache-airflow-providers-docker==2.3.0
neo4j==1.7.6
neobolt==1.7.17
neotime==1.7.4
pytz==2021.1
six==1.15.0
boto3==1.17.27
fire==0.4.0

Setup environment

For the purposes of this tutorial we will leverage the local CDP directory; however, any directory can be used without affecting the tutorial. The directory in which we will house all the resources and assets of this tutorial is ${var.misc.graphgrid_data}/airflow_tutorial. We can start by creating that directory by running the following commands in a terminal while inside the local CDP directory:

export $(xargs <.env)
mkdir ${var.misc.graphgrid_data}/airflow_tutorial

Virtual environments (or venvs) provide dependency and library isolation, and their documentation can be found here.

In order to create a virtual environment, the following command can be run in a terminal.

python3 -m venv ${var.misc.graphgrid_data}/airflow_tutorial/venv

Common errors include:

The virtual environment was not created successfully because ensurepip is not
available. On Debian/Ubuntu systems, you need to install the python3-venv
package using the following command.

apt install python3.8-venv

You may need to use `sudo` with that command. After installing the python3-venv
package, recreate your virtual environment.

which, as prompted, can be remedied with apt install python3.8-venv.

note

sudo may need to be used for the above commands.

Then, in order to use the virtual environment, we must activate it, which depends on the platform and shell. See the following table for a list of ways to activate the venv.

| Platform | Shell | Command to activate virtual environment |
| --- | --- | --- |
| POSIX | bash/zsh | $ source ${var.misc.graphgrid_data}/airflow_tutorial/venv/bin/activate |
| POSIX | fish | $ source ${var.misc.graphgrid_data}/airflow_tutorial/venv/bin/activate.fish |
| POSIX | csh/tcsh | $ source ${var.misc.graphgrid_data}/airflow_tutorial/venv/bin/activate.csh |
| POSIX | PowerShell Core | $ ${var.misc.graphgrid_data}/airflow_tutorial/venv/bin/Activate.ps1 |
| Windows | cmd.exe | C:\> ${var.misc.graphgrid_data}\airflow_tutorial\venv\Scripts\activate.bat |
| Windows | PowerShell | PS C:\> ${var.misc.graphgrid_data}\airflow_tutorial\venv\Scripts\Activate.ps1 |

With the newly activated virtual environment, we now need to install our dependencies. In a terminal, execute:

python3 -m pip install apache-airflow==2.2.2 apache-airflow-providers-docker==2.3.0 neo4j==1.7.6 neobolt==1.7.17 neotime==1.7.4 pytz==2021.1 six==1.15.0 boto3==1.17.27 fire==0.4.0
note

sudo may need to be used for the above command.

This should display output similar to:

...
Collecting pytz==2021.1
Using cached pytz-2021.1-py2.py3-none-any.whl (510 kB)
Collecting six==1.15.0
Using cached six-1.15.0-py2.py3-none-any.whl (10 kB)
Collecting boto3==1.17.27
Using cached boto3-1.17.27-py2.py3-none-any.whl (130 kB)
Collecting jmespath<1.0.0,>=0.7.1
Using cached jmespath-0.10.0-py2.py3-none-any.whl (24 kB)
Collecting s3transfer<0.4.0,>=0.3.0
Using cached s3transfer-0.3.7-py2.py3-none-any.whl (73 kB)
Collecting botocore<1.21.0,>=1.20.27
Using cached botocore-1.20.112-py2.py3-none-any.whl (7.7 MB)
Collecting python-dateutil<3.0.0,>=2.1
Using cached python_dateutil-2.8.2-py2.py3-none-any.whl (247 kB)
Collecting urllib3<1.27,>=1.25.4
Using cached urllib3-1.26.7-py2.py3-none-any.whl (138 kB)
Installing collected packages: six, pytz, neotime, neobolt, neo4j, jmespath, python-dateutil, urllib3, botocore, s3transfer, boto3
Successfully installed boto3-1.17.27 botocore-1.20.112 jmespath-0.10.0 neo4j-1.7.6 neobolt-1.7.17 neotime-1.7.4 python-dateutil-2.8.2 pytz-2021.1 s3transfer-0.3.7 six-1.15.0 urllib3-1.26.7

Intro

Airflow and NLP-model-training can be employed to kick off training pipelines and DAGs from any data source. However, data must be transformed into one of the supported formats in order to be trainable within our pipelines. For a complete list of all the supported dataset formats, please refer to the NLP-model-training documentation here.

For the purposes of this tutorial we will leverage data stored within ONgDB in order to train a new model. Our data takes the form of Sentence nodes which are connected to Mention nodes through the HAS_MENTION relationship. Sentence nodes contain the property sentence, while Mention nodes contain the property value, which holds the actual mention, and the property ne, which lists the mention's named-entity type. Below are examples of these nodes, which include a movie review sentence for the movie (Mention) Blindspotting.

{
  "identity": 271,
  "labels": [
    "GraphGridResource",
    "Resource",
    "Sentence"
  ],
  "properties": {
    "sentence": "A work of love from the actors and writers, Blindspotting was a personal and heartfelt story about a gentrifying town they lived in. ",
    "createdAt": "2021-12-17T22:22:50+00:00",
    "grn": "grn:gg:sentence:G2N3TVedJzTx0qTYtMudNT4zzMzThWan3Cjqx8SaXoSb",
    "lastSearchIndexedAt": "0",
    "sentenceNumber": 60,
    "updatedAt": "2021-12-17T22:22:50+00:00"
  }
}

{
  "identity": 77,
  "labels": [
    "GraphGridResource",
    "Mention",
    "NER_Miscellaneous",
    "Resource"
  ],
  "properties": {
    "mentionId": "blindspotting_en",
    "multiplicity": 1,
    "createdAt": "2021-12-17T22:22:50+00:00",
    "grn": "grn:gg:mention:e6oc2Jzn35oGzTsNcAAJGAz8dLRQuIc3wKgAi9XbMNZO",
    "lastSearchIndexedAt": "0",
    "pos": [
      "VBG"
    ],
    "ne": [
      "MISCELLANEOUS"
    ],
    "language": "en",
    "value": "Blindspotting",
    "updatedAt": "2021-12-17T22:22:50+00:00"
  }
}

Diagram

For this tutorial, we will use these nodes in order to train a named_entity_recognition (NER) model. The first step for using this custom data in a new training session is to determine which dataset/corpora format to convert our data into. The Wikigold dataset has a simple formatting scheme wherein each line contains a word and its associated named-entity tag/label. The simplicity in its formatting scheme makes it a good choice.

Therefore, using python, we can read in data from the graph, transform it into the Wikigold format, then save it out to MinIO for use with Airflow.

Sample Data

Copy and paste the contents of this file to add text nodes to your graph!

Creating a Transformation Script with Python

We'll be creating a transformation script with python in order to read and transform our graph data into the Wikigold format. The following sections break down the transformation script that we'll be saving as a file within the package directory.

Querying ONgDB with Neo4j Python Driver

Using the Neo4j Python Driver, we can connect to ONgDB in order to query and retrieve data stored in the graph to train upon it.

import neo4j

def run_ongdb_query():
    # Geequel query returning each Sentence together with one of its Mentions
    get_sent_mention_gql = 'MATCH (sentence:Sentence)-[hm:HAS_MENTION]->(mention:Mention) RETURN sentence, mention'

    # Connect with the default ONgDB credentials and open a session
    with neo4j.GraphDatabase.driver('bolt://localhost:7687', auth=('ongdb', 'admin')) as local_driver, local_driver.session() as local_session:

        results: neo4j.BoltStatementResult = local_session.run(
            get_sent_mention_gql)
        nodes = results.data('sentence', 'mention')
        return nodes

First, we have to craft a Geequel query that retrieves the data from the graph. Using neo4j.GraphDatabase.driver we can then connect to an ONgDB database with the default credentials.

After creating the local session, we can run our query to get our training data.

note

It appears that all pre-1.7 Neo4j Python driver documentation has been taken down from the Neo4j site.

Transforming graph data into Wikigold formatted data

After retrieving data from the graph, we can now iterate through each sentence and transform it into the expected Wikigold format.

def transform_data(nodes):
    training_file = ""
    for datapoint in nodes:
        sentence: str = datapoint.get('sentence').get('sentence')
        mention: str = datapoint.get('mention').get('value')
        mention_label: str = datapoint.get('mention').get('ne')[0]
        # Newlines become spaces so that words on either side stay separate tokens
        split_sentence: list = [
            f"{word} O\n" if word != mention else f"{mention} {convert_labels(mention_label)}\n"
            for word in sentence.replace("\n", " ").strip().split(" ")]
        # Close each sentence with a period token and a blank separator line
        split_sentence += [". O\n", "\n"]
        training_file += "".join(split_sentence)
    return training_file

We transform our data via a simple list comprehension where we sanitize the sentence, split it based on spaces to separate individual words, then construct the proper WORD WORD_LABEL line syntax.
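To make the WORD WORD_LABEL syntax concrete, here is a minimal sketch of the same comprehension applied to a single hand-written record. The helper name to_wikigold and the sample sentence are assumptions made for illustration; they are not part of the tutorial's script.

```python
# Illustrative only: a one-entry label map and a hypothetical helper that
# mirrors the list comprehension used by transform_data.
LABEL_MAP = {"MISCELLANEOUS": "I-MISC"}


def to_wikigold(sentence: str, mention: str, label: str) -> str:
    """Return one 'WORD LABEL' line per space-separated token."""
    lines = [
        f"{word} {LABEL_MAP[label]}" if word == mention else f"{word} O"
        for word in sentence.replace("\n", " ").strip().split(" ")
    ]
    return "\n".join(lines)


sample = to_wikigold("I loved Blindspotting a lot", "Blindspotting", "MISCELLANEOUS")
print(sample)
# I O
# loved O
# Blindspotting I-MISC
# a O
# lot O
```

Because the comparison is an exact match on whole tokens, a mention followed directly by punctuation (for example "Blindspotting,") or a mention spanning several words is tagged O by this approach.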

Converting Native Labels to Wikigold Formatted Labels

Our data differs in the labeling schema from that of Wikigold. Therefore, we have to define a simple map that transforms our data's labeling scheme into the Wikigold formatting scheme.

LABEL_MAP = {
    "ORGANIZATION": "I-ORG",
    "LOCATION": "I-LOC",
    "PERSON": "I-PER",
    "MISCELLANEOUS": "I-MISC"
}


def convert_labels(label: str):
    return LABEL_MAP[label]
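
A direct dictionary lookup raises a KeyError for any label that is missing from the map. If the graph may contain other named-entity types, one possible variant, sketched here under the assumption that unknown types should simply be treated as untagged, falls back to the O tag. The function name convert_labels_or_outside is hypothetical.

```python
LABEL_MAP = {
    "ORGANIZATION": "I-ORG",
    "LOCATION": "I-LOC",
    "PERSON": "I-PER",
    "MISCELLANEOUS": "I-MISC",
}


def convert_labels_or_outside(label: str) -> str:
    # Hypothetical variant: labels absent from LABEL_MAP map to "O"
    # instead of raising KeyError.
    return LABEL_MAP.get(label, "O")


print(convert_labels_or_outside("PERSON"))  # I-PER
print(convert_labels_or_outside("DATE"))    # O
```

Whether a silent fallback is preferable to a hard failure depends on the data; failing loudly makes unmapped labels easier to notice.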

Saving to MinIO

After transforming our data into the expected format, we now need to save it out to MinIO in order for it to be usable for training with Airflow and NLP-model-training. Therefore, using the boto3 library, we can encode our data and save it out to the com-graphgrid-datasets bucket as graph_training_data.txt.

import boto3
from botocore.config import Config

def save_to_minio(training_file: str):
    # MinIO speaks the S3 API, so boto3 only needs the endpoint and credentials
    resource = boto3.resource('s3', aws_access_key_id="minio",
                              aws_secret_access_key="minio123",
                              endpoint_url="http://localhost:9000",
                              config=Config(signature_version='s3v4'))

    object = resource.Object("com-graphgrid-datasets", "graph_training_data.txt")

    object.put(Body=(bytes(training_file.encode('UTF-8'))))

The complete extraction file

We can save the complete script at ${var.misc.graphgrid_data}/airflow_tutorial/script.py. Then, in order to execute our script, we can run the following command in a terminal:

python3 ${var.misc.graphgrid_data}/airflow_tutorial/script.py

The complete script is:
import neo4j
import boto3
from botocore.config import Config


LABEL_MAP = {
    "ORGANIZATION": "I-ORG",
    "LOCATION": "I-LOC",
    "PERSON": "I-PER",
    "MISCELLANEOUS": "I-MISC"
}


def convert_labels(label: str):
    return LABEL_MAP[label]


def run_ongdb_query():
    get_sent_mention_gql = 'MATCH (sentence:Sentence)-[hm:HAS_MENTION]->(mention:Mention) RETURN sentence, mention'

    with neo4j.GraphDatabase.driver('bolt://localhost:7687', auth=('ongdb', 'admin')) as local_driver, local_driver.session() as local_session:

        results: neo4j.BoltStatementResult = local_session.run(
            get_sent_mention_gql)
        nodes = results.data('sentence', 'mention')
        return nodes


def transform_data(nodes):
    training_file = ""
    for datapoint in nodes:
        sentence: str = datapoint.get('sentence').get('sentence')
        mention: str = datapoint.get('mention').get('value')
        mention_label: str = datapoint.get('mention').get('ne')[0]
        split_sentence: list = [
            f"{word} O\n" if word != mention else f"{mention} {convert_labels(mention_label)}\n"
            for word in sentence.replace("\n", " ").strip().split(" ")]
        split_sentence += [". O\n", "\n"]
        training_file += "".join(split_sentence)
    return training_file


def save_to_minio(training_file: str):
    resource = boto3.resource('s3', aws_access_key_id="minio",
                              aws_secret_access_key="minio123",
                              endpoint_url="http://localhost:9000",
                              config=Config(signature_version='s3v4'))

    object = resource.Object("com-graphgrid-datasets", "graph_training_data.txt")

    object.put(Body=(bytes(training_file.encode('UTF-8'))))


def main():
    nodes = run_ongdb_query()
    training_file = transform_data(nodes)
    save_to_minio(training_file)


if __name__ == '__main__':
    main()

We can inspect our newly generated training file within a terminal with the AWS CLI with the following command:

AWS_ACCESS_KEY_ID=minio AWS_SECRET_ACCESS_KEY=minio123 aws --endpoint-url http://${var.api.shellBase}:9000 s3 cp s3://com-graphgrid-datasets/graph_training_data.txt -
note

This command uses the default MinIO credentials; substitute your own if you are not using the defaults.

It is also possible to inspect the file contents without the AWS CLI by going to ${var.api.shellBase}:9000. If using the default credentials, you can log in using:

Default username: minio
Default password: minio123

otherwise, use the randomly generated credentials. From the homepage, select the bucket com-graphgrid-datasets, which contains datasets and other data which can be used to train NLP models.

Screenshot

From here, we can select the graph_training_data.txt file and click the Download Object UI element at the top right. The file can then be opened and inspected using whichever method you prefer.

Screenshot

This should yield output similar to (albeit not truncated):

Becoming O
The O
Fighting O
Frenchman O
While O
Daveed O
Diggs O
was O
becoming O
a O
prolific O
rapper O
in O
niche O
circles, O
it O
wasn't O
until O
he O
performed O
in O
Hamilton I-PER
that O
he O
became O
a O
very O
big O
deal. O
. O

With O
the O
incredible O
success O
of O
Hamilton I-PER
both O
on O
and O
Off-Broadway, O
as O
well O
as O
touring O
around O
the O
country, O
including O
Puerto O
Rico, O
Lin-Manuel O
Miranda O
has O
created O
one O
of O
the O
most O
beloved O
musicals O
of O
the O
2010s. O
. O

We can see how our data is now in the expected Wikigold format.
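
As a quick sanity check of a generated file, the label distribution can be tallied with a few lines of Python. This is a minimal sketch, not part of the tutorial's pipeline; the function name count_labels and the short excerpt are assumptions made for illustration.

```python
from collections import Counter


def count_labels(wikigold_text: str) -> Counter:
    """Count the label on each 'WORD LABEL' line, skipping blank separators."""
    counts = Counter()
    for line_number, line in enumerate(wikigold_text.splitlines(), start=1):
        if not line.strip():
            continue  # blank lines separate sentences
        parts = line.split(" ")
        if len(parts) != 2:
            raise ValueError(f"line {line_number} is not 'WORD LABEL': {line!r}")
        counts[parts[1]] += 1
    return counts


# A short hand-written excerpt in the same shape as the output above
excerpt = "in O\nHamilton I-PER\nthat O\n. O\n\nWith O\n"
print(count_labels(excerpt))
# Counter({'O': 4, 'I-PER': 1})
```

A file in which every line carries the O label would suggest that no mention matched a token exactly, which is worth checking before starting a training session.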

Training with the new data

Now that our data is saved to MinIO, we can open the Airflow Webserver at $API_BASE:8081 in order to kick off a training session. In order to access the nlp_model_training DAG, you will need to log in with the following credentials:

Default username: airflow
Default password: airflow

Screenshot

Following the login screen, you will be greeted with the following homescreen.

Screenshot

From here, you can select the nlp_model_training DAG under the DAGs list. This will present you with the entire DAG and show all the tasks that comprise it. In order to kick off a new training session for a named_entity_recognition model, we need to first craft a request body containing all of the relevant information.

{
  "model": "named_entity_recognition",
  "datasets": "{\"wikigold\":{\"train\":\"graph_training_data.txt\",\"eval\":\"None\"}}"
}

The datasets field within the request body defines where Airflow and NLP-model-training will attempt to download our training file. This field is a string-encoded JSON object. The pertinent field within it is train, which we specify as graph_training_data.txt because we saved our training file under that name at the top level of the bucket. Had we saved the file within sub-folders, we would have specified the train field as some/path/with/folders/to/graph_training_data.txt.
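
Escaping the nested object by hand is error-prone, so the request body can also be generated. The following is a minimal sketch using only the standard library's json module; the variable names are illustrative.

```python
import json

# The nested object that describes where the training file lives
datasets = {"wikigold": {"train": "graph_training_data.txt", "eval": "None"}}

request_body = {
    "model": "named_entity_recognition",
    # Serialize the nested object to a string before embedding it
    "datasets": json.dumps(datasets, separators=(",", ":")),
}

# Dumping the outer object escapes the inner quotes automatically
print(json.dumps(request_body, indent=2))
```

The printed text can be pasted into the Configuration JSON input field as is.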

note

Depending on your local environment's configuration, "gpu": true and "no_cache": true can also be added to the request body in order to enable GPU acceleration and no caching. However, refer to the Airflow reference documentation for limitations on these fields.

We can now use this request body by selecting the Play Icon UI element located at the top right side of the page. Once clicked, you will want to select Trigger DAG w/ config in order to use our request body.

Screenshot

We can now paste our request body into the Configuration JSON input field and click Trigger to kick off training.

Screenshot

This will initiate a training session for our named_entity_recognition model with our data from the graph.

Screenshot

Converting our transformation script to a DAG to continually ingest

We can take this a step further and convert our script to a DAG, rather than having our script exist statically as something we manually have to run in order to train new models upon our graph data. This tutorial sets up a basic ETL pipeline to extract data from ONgDB, transform it into the Wikigold format, and save it to MinIO in order to be loaded and trained upon by our named_entity_recognition model. The following sections break down each part of the process of converting our script into a DAG.

Switching towards a docker-encapsulated execution paradigm

The simplest development paradigm for custom DAGs is creating a docker image which includes callable modules and/or functions that will be invoked as tasks within the Airflow DAG. This avoids any dependency alterations, inclusions, or issues on the running Airflow instances as docker allows for complete dependency packaging and isolation.

Creating the ETL pipeline docker image

The following docker image provides all the basic functionality in order to convert our python script into a full-fledged continuously scheduled Airflow DAG, which we can save at ${var.misc.graphgrid_data}/airflow_tutorial/continuous_ingest.dockerfile.

FROM python:3.7-slim

RUN apt update && apt -y upgrade && apt -y install \
    bash
RUN pip install --upgrade pip

WORKDIR /airflow_tutorial
COPY script.py script.py
COPY requirements.txt requirements.txt

RUN python3 -m pip install -r requirements.txt

ENTRYPOINT ["python3", "script.py"]
CMD ["--help"]

This image emulates much of the work we did in the first part of this tutorial wherein we set up our local environment in order to develop our ETL pipeline. However, this image is still missing some key components. First, we're missing a requirements.txt, which defines all the python libraries needed to run our ETL pipeline. Therefore, we can create a requirements.txt file at ${var.misc.graphgrid_data}/airflow_tutorial/requirements.txt with the following contents:

apache-airflow==2.2.2
apache-airflow-providers-docker==2.3.0
neo4j==1.7.6
neobolt==1.7.17
neotime==1.7.4
pytz==2021.1
six==1.15.0
boto3==1.17.27
fire==0.4.0

Before we can build this image we need to create our actual Airflow DAG as well as modify script.py in order to facilitate its employment with Airflow as a DAG.

Creating the DAG

We first need to create our DAG object, which houses all of the tasks that make up our continually executing ETL pipeline. The parameters we will use to get our DAG functional are listed below, with their descriptions from the Airflow documentation.

  • dag_id: The id of the DAG; must consist exclusively of alphanumeric characters, dashes, dots and underscores (all ASCII)
  • default_args: A dictionary of default parameters to be used as constructor keyword parameters when initialising operators. Note that operators have the same hook, and precede those defined here, meaning that if your dict contains 'depends_on_past': True here and 'depends_on_past': False in the operator's call default_args, the actual value will be False.
  • start_date: The timestamp from which the scheduler will attempt to backfill
  • schedule_interval: Defines how often that DAG runs, this timedelta object gets added to your latest task instance's execution_date to figure out the next schedule
  • catchup: Perform scheduler catchup (or only run latest)? Defaults to True
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.docker.operators.docker import DockerOperator


default_args = {
    'owner': 'GraphGrid',
}


dag = DAG(
    dag_id='continuous_ingest',
    default_args=default_args,
    start_date=days_ago(1),
    catchup=False,
    schedule_interval="@hourly",
)

We set our schedule_interval as @hourly in order to have our DAG re-create the training samples each hour. This emulates the scenario wherein data is constantly changing or being ingested and models need to be retrained to reflect that.

Converting our python script to Airflow tasks

As we're leveraging Docker encapsulation, all of our tasks will take on the following basic outline:

tX = DockerOperator(
    task_id="<method_name>",
    dag=dag,
    retries=0,
    image="airflow-tutorial",
    auto_remove=True,
    command=["<method_name>", "[<method_args>]"],
    network_mode="graphgrid",
    do_xcom_push=True,
    docker_url="tcp://socat:2375",
)

The DockerOperator allows us to spin up our airflow-tutorial docker image to run one of the predefined ETL pipeline methods. To highlight some basic DockerOperator features: auto_remove cleans up the docker container after execution, command allows us to specify which method we want to run within script.py, network_mode is defined as graphgrid as that is the network which CDP creates and runs on, and docker_url defines the Docker daemon's URL which is served at tcp://socat:2375. For more information regarding the DockerOperator visit the official Airflow documentation here.

Therefore, we define each task within our DAG in the following way:

t0 = DockerOperator(
    task_id="run_ongdb_query",
    dag=dag,
    retries=0,
    image="airflow-tutorial",
    auto_remove=True,
    command=["run_ongdb_query"],
    network_mode="graphgrid",
    do_xcom_push=True,
    docker_url="tcp://socat:2375",
)


t1 = DockerOperator(
    task_id="transform_data",
    dag=dag,
    retries=0,
    image="airflow-tutorial",
    auto_remove=True,
    command=["transform_data",
             "--nodes", "{{ ti.xcom_pull(task_ids='run_ongdb_query') }}"],
    network_mode="graphgrid",
    do_xcom_push=True,
    docker_url="tcp://socat:2375",
)


t2 = DockerOperator(
    task_id="save_to_minio",
    dag=dag,
    retries=0,
    image="airflow-tutorial",
    auto_remove=True,
    command=["save_to_minio",
             "--training_file", "{{ ti.xcom_pull(task_ids='transform_data') }}"],
    network_mode="graphgrid",
    do_xcom_push=False,
    docker_url="tcp://socat:2375",
)

A couple of highlights to note: XCOMs allow us to easily propagate data between our tasks. Our transformation task pulls the output from its upstream extraction task, while our load task pulls the output from its upstream transformation task. The XCOMs are referenced through Jinja templates, which allow Airflow to pass dynamic information into task instances. Additionally, we disable do_xcom_push on our save_to_minio task, as it does not have any downstream tasks.

Defining DAG task execution order

The final step is to define upstream and downstream tasks so that our tasks execute in their logical order: the query runs first, followed by the transformation step, and finally the save step.

t1.set_upstream(t0)
t2.set_upstream(t1)

The complete DAG

Combining all of these steps together, we arrive at our python module containing our completed continuous ingestion DAG. We can save this under ${var.misc.graphgrid_data}/airflow_tutorial/dag.py.

from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.docker.operators.docker import DockerOperator


default_args = {
    'owner': 'GraphGrid',
}


dag = DAG(
    dag_id='continuous_ingest',
    default_args=default_args,
    start_date=days_ago(1),
    catchup=False,
    schedule_interval="@hourly",
)


t0 = DockerOperator(
    task_id="run_ongdb_query",
    dag=dag,
    retries=0,
    image="airflow-tutorial",
    auto_remove=True,
    command=["run_ongdb_query"],
    network_mode="graphgrid",
    do_xcom_push=True,
    docker_url="tcp://socat:2375",
)


t1 = DockerOperator(
    task_id="transform_data",
    dag=dag,
    retries=0,
    image="airflow-tutorial",
    auto_remove=True,
    command=["transform_data",
             "--nodes", "{{ ti.xcom_pull(task_ids='run_ongdb_query') }}"],
    network_mode="graphgrid",
    do_xcom_push=True,
    docker_url="tcp://socat:2375",
)


t2 = DockerOperator(
    task_id="save_to_minio",
    dag=dag,
    retries=0,
    image="airflow-tutorial",
    auto_remove=True,
    command=["save_to_minio",
             "--training_file", "{{ ti.xcom_pull(task_ids='transform_data') }}"],
    network_mode="graphgrid",
    do_xcom_push=False,
    docker_url="tcp://socat:2375",
)

t1.set_upstream(t0)
t2.set_upstream(t1)

Modifying script.py

The first alteration needed in order to convert our script into an Airflow DAG is making each ETL task easily callable by Airflow. For that we leverage fire, a Python library that converts any Python component into a command line interface (CLI). Because fire "wraps" a Python component, we'll create a Pipeline class which will house all of our individual ETL pipeline tasks. See the pseudocode below:

import fire


class Pipeline:
    def run_ongdb_query(self):
        ...

    def transform_data(self, nodes):
        ...

    def save_to_minio(self, training_file):
        ...

def main():
    # Expose every Pipeline method as a CLI sub-command
    fire.Fire(Pipeline)

This scheme allows us to execute our individual methods in the following manner:

python3 script.py save_to_minio --training_file "Blindspotting I-MISC\n..."
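
Conceptually, the first command-line token selects a method on the class and the flags become keyword arguments. The following standard-library sketch illustrates that idea only; it is not how fire is implemented (fire also handles type parsing, help output, and much more), and the dispatch helper and the stand-in Pipeline body are hypothetical.

```python
class Pipeline:
    def save_to_minio(self, training_file):
        # Stand-in body: the real method uploads the text to MinIO
        return f"would upload {len(training_file)} characters"


def dispatch(component, argv):
    """Map ['method', '--name', 'value', ...] onto component().method(name=value)."""
    method_name, *rest = argv
    kwargs = {}
    # Pair each '--flag' with the value that follows it
    for flag, value in zip(rest[0::2], rest[1::2]):
        kwargs[flag.lstrip("-")] = value
    return getattr(component(), method_name)(**kwargs)


result = dispatch(Pipeline, ["save_to_minio", "--training_file", "Blindspotting I-MISC"])
print(result)  # would upload 20 characters
```

This mirrors the shape of the command list passed to each DockerOperator: a method name followed by flag and value pairs.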

The next modification required is altering the run_ongdb_query task in order to make the returned nodes serializable since, as per the docs, XCOMs "can have any (serializable) value". This is important because the neo4j python driver's query returns Nodes, which are not serializable objects. For this we'll define the method extract_node_properties, which will iterate through our query results, extract each node's properties, and store them in a serializable list of dicts.

from typing import List, Dict


def extract_node_properties(data: List[Dict]) -> List[Dict]:
    nodes = []
    for datapoint in data:
        # Replace each driver Node with a plain dict of its properties
        intermediary_dict = {}
        for key, node in datapoint.items():
            intermediary_dict[key] = node._properties
        nodes.append(intermediary_dict)

    return nodes
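
The effect of this step can be tried without a database. In the sketch below, FakeNode is a hypothetical stand-in that models only the _properties attribute of a driver Node, and json.dumps is used as a convenient check that the result is made of plain, serializable values.

```python
import json
from typing import Dict, List


class FakeNode:
    """Hypothetical stand-in for a driver Node; only _properties is modeled."""

    def __init__(self, **properties):
        self._properties = properties


def extract_node_properties(data: List[Dict]) -> List[Dict]:
    nodes = []
    for datapoint in data:
        nodes.append({key: node._properties for key, node in datapoint.items()})
    return nodes


records = [{
    "sentence": FakeNode(sentence="I loved Blindspotting"),
    "mention": FakeNode(value="Blindspotting", ne=["MISCELLANEOUS"]),
}]

plain = extract_node_properties(records)
print(json.dumps(plain))  # succeeds: only dicts, lists and strings remain
```

Calling json.dumps(records) on the unconverted records raises a TypeError, which is the kind of failure the conversion is meant to avoid.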

The final alteration is changing the return of each method within our ETL pipeline to instead be a simple print statement. This is in order to propagate our data between each task: with the Airflow BaseOperator's do_xcom_push enabled and the DockerOperator's xcom_all left disabled, the last line of the container's stdout is pushed as an XCOM for downstream tasks to use. This means that the last line of output from our container will be the query results, or the transformed query results, depending on the task. See the pseudocode below:

class Pipeline:
    def run_ongdb_query(self):
        ...
        print(self.extract_node_properties(nodes), end='')

    def transform_data(self, nodes):
        ...
        print(training_file)

    def save_to_minio(self, training_file):
        ...
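
The print-based hand-off can be simulated locally. The sketch below captures stdout, keeps only its last line, and turns that text back into Python objects. ast.literal_eval is used here purely as a stand-in for whatever argument parsing the downstream command-line layer performs, and the sample data is made up for illustration.

```python
import ast
import contextlib
import io

nodes = [{"sentence": {"sentence": "I loved Blindspotting"},
          "mention": {"value": "Blindspotting", "ne": ["MISCELLANEOUS"]}}]

# Upstream task: earlier log output, then the result as the final line
buffer = io.StringIO()
with contextlib.redirect_stdout(buffer):
    print("connecting to database")
    print(nodes, end="")

# Only the last line of stdout is handed to the downstream task
last_line = buffer.getvalue().splitlines()[-1]

# Downstream task: rebuild the list of dicts from its textual form
recovered = ast.literal_eval(last_line)
print(recovered == nodes)  # True
```

The round trip works here because the printed representation of the list fits on a single line. A value printed raw with embedded newlines spans several lines, so under a last-line rule only its final line would be captured.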

All of these culminate in the final fully modified script.py below:

from typing import List, Dict
import neo4j
import boto3
from botocore.config import Config
import fire


LABEL_MAP = {
    "ORGANIZATION": "I-ORG",
    "LOCATION": "I-LOC",
    "PERSON": "I-PER",
    "MISCELLANEOUS": "I-MISC"
}


class Pipeline:
    @staticmethod
    def convert_labels(label: str):
        return LABEL_MAP[label]

    @staticmethod
    def extract_node_properties(data: List[Dict]) -> List[Dict]:
        nodes = []
        for datapoint in data:
            intermediary_dict = {}
            for key, node in datapoint.items():
                intermediary_dict[key] = node._properties
            nodes.append(intermediary_dict)

        return nodes

    def run_ongdb_query(self):
        get_sent_mention_gql = 'MATCH (sentence:Sentence)-[hm:HAS_MENTION]->(mention:Mention) RETURN sentence, mention'

        with neo4j.GraphDatabase.driver('bolt://ongdb:7687', auth=(
                'ongdb',
                'admin')) as local_driver, local_driver.session() as local_session:
            results: neo4j.BoltStatementResult = local_session.run(
                get_sent_mention_gql)
            nodes = results.data('sentence', 'mention')
            print(self.extract_node_properties(nodes), end='')

    def transform_data(self, nodes):
        training_file = ""
        for datapoint in nodes:
            sentence: str = datapoint.get('sentence').get('sentence')
            mention: str = datapoint.get('mention').get('value')
            mention_label: str = datapoint.get('mention').get('ne')[0]
            split_sentence: list = [
                f"{word} O\n" if word != mention else f"{mention} {self.convert_labels(mention_label)}\n"
                for word in sentence.replace("\n", " ").strip().split(" ")]
            split_sentence += [". O\n", "\n"]
            training_file += "".join(split_sentence)
        print(training_file)

    @staticmethod
    def save_to_minio(training_file):
        resource = boto3.resource('s3', aws_access_key_id="minio",
                                  aws_secret_access_key="minio123",
                                  endpoint_url="http://minio:9000",
                                  config=Config(signature_version='s3v4'))

        object = resource.Object("com-graphgrid-datasets",
                                 "graph_training_data.txt")

        object.put(Body=(bytes(training_file.encode('UTF-8'))))


def main():
    fire.Fire(Pipeline)


if __name__ == '__main__':
    main()

Building the airflow-tutorial docker image

Now that we've created the continuous ingest dockerfile, the ETL DAG, and modified script.py, we can finally build the docker image. First, we referenced our docker image within our DAG as:

t2 = DockerOperator(
    ...
    image="airflow-tutorial",
    ...
)

This means that we need to tag our docker image with the same airflow-tutorial tag. Therefore, the following command can be run in a terminal to create our docker image.

cd ${var.misc.graphgrid_data}/airflow_tutorial
docker build -t airflow-tutorial -f continuous_ingest.dockerfile .
cd ${var.misc.graphgrid_data}/..

Uploading our DAG for use

Finally, in order to upload a custom DAG into Airflow's DagBag, we must use graphgrid airflow upload <path/to/DAG.py>. If the DAG was saved under the tutorial directory, that means running ./bin/graphgrid airflow upload ${var.misc.graphgrid_data}/airflow_tutorial/dag.py from the top level directory within the local CDP install. Then we can open Airflow at $API_BASE:8081 and either Trigger the DAG or toggle the DAG into the On state, which will automatically schedule it to run.

note

Airflow may take up to 1 minute to add new DAGs to the Webserver UI, as it scans for them every minute.

Once the automatic ingest has run, we can yet again kick off a new training session with our newly ingested data via the same request body as before:

{
  "model": "named_entity_recognition",
  "datasets": "{\"wikigold\":{\"train\":\"graph_training_data.txt\",\"eval\":\"None\"}}"
}

Screenshot

Screenshot

Using the newly trained model with the NLP module

Now that we've trained a new NER model, we can load this model for immediate use with the NLP module. First, we need to head to $API_BASE:9000 in order to access our running MinIO server. If using the default credentials, you can log in using:

Default username: minio
Default password: minio123

otherwise, use the randomly generated credentials. From the homepage, select the bucket com-graphgrid-nlp, which houses all NLP resources and newly trained NLP models. Models will be stored under a CDP version specific folder, e.g. 2.0.0. This is for use with the model metadata registry in order to track models by CDP release.

Screenshot

Traversing within the 2.0.0 subfolder, we can see our newly trained NER model, which was named 20220104T220538-nerModel.

Screenshot

Using this information, i.e. the name of the model, we can utilize NLP's loadModel endpoint and specify 20220104T220538-nerModel, in order to use our newly trained model!

curl --location --request GET "${var.api.shellBase}/1.0/nlp/loadModel/20220104T220538-nerModel" \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"

Conclusion

In this tutorial we outlined how to set up a basic python environment to develop a means of extracting data from ONgDB. We developed a custom ETL pipeline that takes our personalized data from ONgDB, converts it to the Wikigold format, and saves it to MinIO. This then allowed us to kick off a new training session for the NLP-model-training DAG to train a new named_entity_recognition model. Finally, we took this a step further and converted our script into a scheduled DAG that would extract data from ONgDB hourly.

This tutorial demonstrates one of the many ways in which we can take new data sources and generate new NLP models based upon their data. All models can be retrained with the same or similar steps.