GraphGrid Machine Learning (ML)
2.0
API Version 1.0
Introduction
GraphGrid ML provides machine learning operations across a graph database by integrating ONgDB with Spark. Users create policies with Geequel that define their data for full machine learning pipeline management. GraphGrid ML is designed to address the following machine learning workflow:
- Transform data
- Train models
- Evaluate models
- Deploy models
- Make predictions
Prerequisite Information
Before diving in, be sure to familiarize yourself with this general information about machine learning models, as it pertains to GraphGrid ML.
- Transformation
- Preprocessing your data by extracting and formatting features for ML tasks; taking your raw data and transforming it into individual training points.
- Training
- Execute training on the transformed (pre-processed) data.
- Inference
- Make predictions by running one or more datapoints through the trained model.
- Spark/MLlib
- Spark is an analytics engine for data-processing at scale. MLlib is Spark's machine learning library.
- Spark and MLlib's "training framework"
- common learning algorithms such as classification, regression, clustering, and collaborative filtering
- offers common Transformers and Estimators (like OneHotEncoding and Word2Vec)
API
This ML API version is 1.0 and as such all endpoints are rooted at /1.0/. For example, http://localhost/1.0/ml (requires auth) would be the base context for this ML API under the GraphGrid API deployed at http://localhost.
Environment
ML requires the following integrations:
- ONgDB 1.0+
- Spark 2.4.0+
Check Status
Check the status of GraphGrid ML. Returns a 200 OK response if healthy.
Base URL: /1.0/ml/status
Method: GET
Request
curl --location --request GET "${API_BASE}/1.0/ml/status"
Response
{
"status": "OK"
}
Data Briefing
Throughout this documentation, a graph with the following elements will be used as an example.
- Nodes:
  - Individual - stores a person's record.
    - sex - m or f
    - birthday - e.g. 1964-09-02
  - Name - stores a name's information.
    - givenName - e.g. Keanu
    - surname - e.g. Reeves
- Relationships:
  - HAS_NAME
  - IS_MATCH
    - value - match or not_match
- Connections:
  - (:Individual)-[:HAS_NAME]->(:Name)
  - (:Individual)-[:IS_MATCH]->(:Individual)
The goal is to develop a binary classification model that predicts whether two Individual nodes are match or not_match.
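To make the data shape concrete, below is a minimal Geequel sketch (illustrative only, not the official sample data set) that creates one matching pair of individuals. In a real deployment each node would also carry a grn property, which the transformation examples later in this document rely on.
CREATE (i1:Individual {sex: "m", birthday: "1964-09-02"})
CREATE (i2:Individual {sex: "m", birthday: "1964-09-02"})
CREATE (n1:Name {givenName: "Keanu", surname: "Reeves"})
CREATE (n2:Name {givenName: "Keanu", surname: "Reeves"})
CREATE (i1)-[:HAS_NAME]->(n1)
CREATE (i2)-[:HAS_NAME]->(n2)
CREATE (i1)-[:IS_MATCH {value: "match"}]->(i2)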
Policy
Each step in the machine learning workflow can be defined and managed in a policy that is received by the corresponding endpoint. All policies are JSON documents. GraphGrid ML supports two types of policies: transformation policy and training policy. The common parameters in these policies are:
Parameter | Description |
---|---|
policyName string | Name of the policy. |
savePolicy boolean | Whether to save the policy or not. Default: true |
overwrite boolean | Whether to allow the existing policy to be overwritten or not. Default: false |
metadata object | The metadata of the policy, including: description, creationAt, updateAt |
A transformation policy defines the transformation workflow: how a raw data point should be processed and transformed into features. This is done by preprocessing your data (a.k.a. feature generation), and extracting and formatting features for ML tasks.
A training policy defines the training workflow: how the features should be used and manipulated to train the model. In many incarnations this is literal code, but here it is policy driven. This is done by executing training on the pre-processed data.
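For orientation, here is a minimal sketch of the common policy envelope; the values are illustrative, and the type-specific fields (such as source, assignments or pipelineStages) covered in the following sections are omitted.
{
    "policyName": "sample-policy",
    "savePolicy": true,
    "overwrite": false,
    "metadata": {
        "description": "An example policy description."
    }
}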
Graph Structure
The following is a breakdown of the graph structure created by the ML pipeline.
- GraphGridMLFeature (GGMLF) node
  - created by a transformation policy
  - represents the stored features for a single training data point, compiled from transformation of the raw data
  - connects to all source nodes used through a GG_ML_FROM relationship
- GraphGridMLTransformation (GGMLT) node
  - represents the transformation policy being used
  - connects to all GGMLF nodes created from the represented policy, also connecting them to GGMLT nodes
  - by default saved as a node in the graph (can be turned off with savePolicy: false, but this incurs a performance hit when collecting GGMLF nodes for training)
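As an illustration, the following hedged Geequel query (assuming the node labels, the nodeLabels property and the relationship types described above and in the transformation examples below) lists the feature nodes produced by a transformation policy together with the labels of their source nodes.
MATCH (t:GraphGridMLTransformation {policyName: "transformation-policy"})-[:GG_ML_PRODUCES]->(f:GraphGridMLFeature)
OPTIONAL MATCH (f)-[:GG_ML_FROM]-(src)
RETURN t.policyName AS policy, f.nodeLabels AS nodeLabels, collect(labels(src)) AS sourceLabels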
Transformation
Training machine learning models relies on having data in a specific format. Raw graph data often needs some preprocessing before it can be used for training. GraphGrid ML uses JSON documents called policies to preprocess and transform graph data so that it is ready for ML tasks. These policies are part of a generic tooling for transforming raw graph data into training data with full access to Geequel queries. GraphGrid ML provides endpoints for extracting, transforming and selecting features from raw data stored in the graph. The transformation endpoint is at /transformations.
Transformation Policy
A transformation policy is a JSON document that defines how to transform raw graph data into feature nodes ready for training. The extracted features are then stored on a GraphGridMLFeature node. Transformation policies allow the user to get their source data directly from the graph. Other than the common elements described in Policy, a transformation policy has the following level 1 parameters:
Parameter | Description |
---|---|
option optional object | Includes the configuration for the apoc.periodic.iterate procedure call that is used to execute the cypher query generated by the policy |
source object | Defines the input data for transformation. |
destination object | Defines the label on the GraphGridMLFeature nodes and connections between the feature node and source data node. |
assignment optional array of object | The intermediate steps of generating the final feature set. |
feature object | The final feature set to be stored on the GraphGridMLFeature nodes. |
If savePolicy is true
, then a GraphGridMLTransformation node will be created that connects to all GraphGridMLFeature nodes
generated by the policy via GG_ML_PRODUCES relationships. Otherwise, only the GraphGridMLFeature nodes are created.
Option
Once a transformation policy is sent to the transformation endpoint, the transformation service analyzes and processes the policy and generates a set of cypher queries. These queries are executed using the apoc.periodic.iterate procedure call. The procedure call supports six parameters (see link). The option field allows the user to set the values for five of them:
Parameter | Description |
---|---|
batchSize integer | "that many inner statements are run within a single tx params: {_count,_batch}" Default: 1000 |
parallel boolean | "run inner statement in parallel, note that statements might deadlock" Default: false |
retries integer | "if the inner statement fails with an error, sleep 100ms and retry until retries-count is reached, param {_retry}" Default: 0 |
iterateList boolean | "the inner statement is only executed once but the whole batchSize list is passed in as parameter {_batch}" Default: false |
concurrency integer | "How many concurrent tasks are generated when using parallel:true" Default: 50 |
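A hedged sketch of how the option block might look inside a transformation policy; the values are arbitrary, and any omitted field falls back to the default listed above.
{
    "option": {
        "batchSize": 500,
        "parallel": false,
        "retries": 3,
        "iterateList": true,
        "concurrency": 10
    }
}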
Source
Parameter | Description |
---|---|
cypher string | The cypher query to look up nodes, relationships and their properties to be processed. |
parameters key-value pairs | Values for cypher query with parameters. |
variables string list | Select the variables to be processed from the cypher query. |
Example
{
"source": {
"cypher": "MATCH (n1:Person)-[r1:HAS_NAME]->(name1:Name), (n2:Person)-[r2:HAS_NAME]->(name2:Name), (n1)-[l:IS_MATCH]-(n2) RETURN n1.grn AS n1_grn, n2.grn AS n2_grn, n1.birthday AS n1_birthday, n2.birthday AS n2_birthday, n1.sex AS n1_sex, n2.sex AS n2_sex, name1.givenName AS n1_givenName, name2.givenName AS n2_givenName, name1.surname AS n1_surname, name2.surname AS n2_surname,l.value AS label LIMIT 5",
"variables": [
"n1_grn",
"n2_grn",
"n1_birthday",
"n2_birthday",
"n1_sex",
"n2_sex",
"n1_givenName",
"n2_givenName",
"n1_surname",
"n2_surname",
"label"
]
}
}
The cypher above generates 5 rows and 11 columns of data. All the column names are included in variables, meaning all of them will be used to generate the final feature set.
Destination
It defines the node labels on the GraphGridMLFeature nodes and the grns of the source nodes to connect to from the GraphGridMLFeature nodes.
Parameter | Description |
---|---|
nodeLabels string list | The node labels of the source nodes. It is mainly used for lookup. |
sourceGRNs string list | The grns of the source nodes. It is used for creating connections between source nodes and feature nodes. |
Example
{
"destination": {
"nodeLabels": ["Individual", "Individual"],
"sourceGRNs": ["n1_grn", "n2_grn"]
}
}
The result is shown in the graph. The red node at the center is the GraphGridMLFeature node generated for one row from the source data. The nodeLabels property of that node is [Individual, Individual]. Since the feature set is extracted between two source nodes, two GG_ML_FROM relationships are created between the feature node and the source nodes.
Assignments
It contains a list of assignments. Each assignment represents a piece of cypher syntax, a function or a procedure call. It takes some input variables and produces an output variable. Order matters.
Parameter | Description |
---|---|
inputs string list | The input variables for the assignment. The input variables are either defined in the variables parameter of the source field or by a previous assignment. |
assignment string | A piece of cypher syntax, a function or a procedure call. For procedure calls, the YIELD keyword must be provided. |
output string | The variable that stores the result of the assignment. It can be an existing variable in the scope (the value gets overwritten), or a new variable. |
Example
{
"assignments": [
{
"inputs": ["n1_birthday"],
"assignment": "split(n1_birthday, '-')",
"output": "n1_birthday_tokens"
},
{
"inputs": ["n2_birthday"],
"assignment": "split(n2_birthday, '-')",
"output": "n2_birthday_tokens"
},
{
"inputs": ["n1_birthday_tokens"],
"assignment": "n1_birthday_tokens[0]",
"output": "n1_birth_year"
},
{
"inputs": ["n1_birthday_tokens"],
"assignment": "n1_birthday_tokens[1]",
"output": "n1_birth_month"
},
{
"inputs": ["n1_birthday_tokens"],
"assignment": "n1_birthday_tokens[2]",
"output": "n1_birth_day"
},
{
"inputs": ["n2_birthday_tokens"],
"assignment": "n2_birthday_tokens[0]",
"output": "n2_birth_year"
},
{
"inputs": ["n2_birthday_tokens"],
"assignment": "n2_birthday_tokens[1]",
"output": "n2_birth_month"
},
{
"inputs": ["n2_birthday_tokens"],
"assignment": "n2_birthday_tokens[2]",
"output": "n2_birth_day"
},
{
"inputs": ["n1_birth_year", "n2_birth_year"],
"assignment": "apoc.text.hammingDistance(n1_birth_year, n2_birth_year)",
"output": "dist_by"
},
{
"inputs": ["n1_birth_month", "n2_birth_month"],
"assignment": "apoc.text.hammingDistance(n1_birth_month, n2_birth_month)",
"output": "dist_bm"
},
{
"inputs": ["n1_birth_day", "n2_birth_day"],
"assignment": "apoc.text.hammingDistance(n1_birth_day, n2_birth_day)",
"output": "dist_bd"
},
{
"inputs": ["n1_sex", "n2_sex"],
"assignment": "CASE WHEN n1_sex = n2_sex THEN 1 ELSE 0 END",
"output": "cmp_sex"
},
{
"inputs": ["n1_givenName", "n2_givenName"],
"assignment": "apoc.text.jaroWinklerDistance(n1_givenName, n2_givenName)",
"output": "dist_fname"
},
{
"inputs": ["n1_givenName"],
"assignment": "CALL apoc.text.doubleMetaphone(n1_givenName) YIELD value",
"output": "name1_double_metaphone_encoded_first_name"
},
{
"inputs": ["n1_surname"],
"assignment": "CALL apoc.text.doubleMetaphone(n1_surname) YIELD value",
"output": "name1_double_metaphone_encoded_last_name"
},
{
"inputs": ["n2_givenName"],
"assignment": "CALL apoc.text.doubleMetaphone(n2_givenName) YIELD value",
"output": "name2_double_metaphone_encoded_first_name"
},
{
"inputs": ["n2_surname"],
"assignment": "CALL apoc.text.doubleMetaphone(n2_surname) YIELD value",
"output": "name2_double_metaphone_encoded_last_name"
},
{
"inputs": [
"name1_double_metaphone_encoded_first_name",
"name2_double_metaphone_encoded_first_name"
],
"assignment": "apoc.text.levenshteinSimilarity(name1_double_metaphone_encoded_first_name, name2_double_metaphone_encoded_first_name)",
"output": "phnt_fname_sim"
},
{
"inputs": ["n1_surname", "n2_surname"],
"assignment": "apoc.text.jaroWinklerDistance(n1_surname, n2_surname)",
"output": "dist_lname"
},
{
"inputs": [
"name1_double_metaphone_encoded_last_name",
"name2_double_metaphone_encoded_last_name"
],
"assignment": "apoc.text.levenshteinSimilarity(name1_double_metaphone_encoded_last_name, name2_double_metaphone_encoded_last_name)",
"output": "phnt_lname_sim"
}
]
}
This example covers all scenarios of an assignment. There are built-in cypher functions like split(n1_birthday, '-'), pure cypher syntax like n1_birthday_tokens[0], APOC functions like apoc.text.hammingDistance(n1_birth_year, n2_birth_year), and APOC procedure calls like CALL apoc.text.doubleMetaphone(n2_givenName) YIELD value. The assignments split the birthday property into year, month and day, and then compute the text similarity for each of them. For names, they compute the text similarity between the original values and the phonetically encoded values.
Feature
It defines the final feature set to be stored on the GraphGridMLFeature nodes. Each feature is a key-value pair: the key is the feature name on the node, and the value is an object with a source field referencing a variable from the source or assignments fields, plus an optional string field description for a comment.
Example
{
"feature": {
"dist_fname": {
"source": "dist_fname",
"description": "Jaro-Winkler distance of two first names."
},
"phnt_fname_sim": {
"source": "phnt_fname_sim",
"description": "Levenshtein similarity of two phonetic encoding first names."
},
"dist_lname": {
"source": "dist_lname",
"description": "Jaro-Winkler distance of two last names."
},
"phnt_lname_sim": {
"source": "phnt_lname_sim",
"description": "Levenshtein similarity of two phonetic encoding last names."
},
"cmp_sex": {
"source": "cmp_sex",
"description": "Whether two individuals have the same sex."
},
"dist_by": {
"source": "dist_by",
"description": "Hamming distance of two birth years."
},
"dist_bm": {
"source": "dist_bm",
"description": "Hamming distance of two birth months."
},
"dist_bd": {
"source": "dist_bd",
"description": "Hamming distance of two birth days."
},
"is_match": {
"source": "label",
"description": "Whether two individuals are the same."
}
}
}
There are 9 features in the final feature set. All of the variables can be found in the source or assignments examples above. These features will be stored on the feature node as feature properties (e.g. features.cmp_sex).
Get Feature
Returns a feature.
Base URL: /1.0/ml/{{clustername}}/features/?nodeLabels={{nodeLabel}},{{nodeLabel}}
Method: GET
Request
curl --location --request GET "${var.api.shellBase}/1.0/ml/default/features/?nodeLabels=Individual,Individual&limit=" \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"
Create Transformation
Base URL: /1.0/ml/{{cluster-name}}/transformations/{{policy-name}}
Method: POST
Accept the transformation policy and execute the transformation workflow.
Request
curl --location --request POST "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/transformations/transformation-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${var.api.auth.shellBearer}" \
--data-raw '{
"savePolicy": true,
"overwrite": true,
"metadata": {
"description": "Features between Individual node and Individual node"
},
"source": {
"cypher": "MATCH (n1:TestIndividual)-[r1:HAS_NAME]->(name1:TestPersonalName), (n2:TestIndividual)-[r2:HAS_NAME]->(name2:TestPersonalName), (n1)-[l:IS_MATCH]-(n2) RETURN n1.grn AS n1_grn, n2.grn AS n2_grn, n1.birthday AS n1_birthday, n2.birthday AS n2_birthday, n1.sex AS n1_sex, n2.sex AS n2_sex, name1.givenName AS n1_givenName, name2.givenName AS n2_givenName, name1.surname AS n1_surname, name2.surname AS n2_surname,l.value AS label LIMIT 5",
"variables": [
"n1_grn",
"n2_grn",
"n1_birthday",
"n2_birthday",
"n1_sex",
"n2_sex",
"n1_givenName",
"n2_givenName",
"n1_surname",
"n2_surname",
"label"
]
},
"destination": {
"nodeLabels": [
"Individual",
"Individual"
],
"sourceGRNs": [
"n1_grn",
"n2_grn"
]
},
"assignments": [
{
"inputs": ["n1_birthday"],
"assignment": "split(n1_birthday, '\''-'\'')",
"output": "n1_birthday_tokens"
},
{
"inputs": ["n2_birthday"],
"assignment": "split(n2_birthday, '\''-'\'')",
"output": "n2_birthday_tokens"
},
{
"inputs": ["n1_birthday_tokens"],
"assignment": "n1_birthday_tokens[0]",
"output": "n1_birth_year"
},
{
"inputs": ["n1_birthday_tokens"],
"assignment": "n1_birthday_tokens[1]",
"output": "n1_birth_month"
},
{
"inputs": ["n1_birthday_tokens"],
"assignment": "n1_birthday_tokens[2]",
"output": "n1_birth_day"
},
{
"inputs": ["n2_birthday_tokens"],
"assignment": "n2_birthday_tokens[0]",
"output": "n2_birth_year"
},
{
"inputs": ["n2_birthday_tokens"],
"assignment": "n2_birthday_tokens[1]",
"output": "n2_birth_month"
},
{
"inputs": ["n2_birthday_tokens"],
"assignment": "n2_birthday_tokens[2]",
"output": "n2_birth_day"
},
{
"inputs": ["n1_birth_year", "n2_birth_year"],
"assignment": "apoc.text.hammingDistance(n1_birth_year, n2_birth_year)",
"output": "dist_by"
},
{
"inputs": ["n1_birth_month", "n2_birth_month"],
"assignment": "apoc.text.hammingDistance(n1_birth_month, n2_birth_month)",
"output": "dist_bm"
},
{
"inputs": ["n1_birth_day", "n2_birth_day"],
"assignment": "apoc.text.hammingDistance(n1_birth_day, n2_birth_day)",
"output": "dist_bd"
},
{
"inputs": ["n1_sex", "n2_sex"],
"assignment": "CASE WHEN n1_sex = n2_sex THEN 1 ELSE 0 END",
"output": "cmp_sex"
},
{
"inputs": ["n1_givenName", "n2_givenName"],
"assignment": "apoc.text.jaroWinklerDistance(n1_givenName, n2_givenName)",
"output": "dist_fname"
},
{
"inputs": ["n1_givenName"],
"assignment": "CALL apoc.text.doubleMetaphone(n1_givenName) YIELD value",
"output": "name1_double_metaphone_encoded_first_name"
},
{
"inputs": ["n1_surname"],
"assignment": "CALL apoc.text.doubleMetaphone(n1_surname) YIELD value",
"output": "name1_double_metaphone_encoded_last_name"
},
{
"inputs": ["n2_givenName"],
"assignment": "CALL apoc.text.doubleMetaphone(n2_givenName) YIELD value",
"output": "name2_double_metaphone_encoded_first_name"
},
{
"inputs": ["n2_surname"],
"assignment": "CALL apoc.text.doubleMetaphone(n2_surname) YIELD value",
"output": "name2_double_metaphone_encoded_last_name"
},
{
"inputs": ["name1_double_metaphone_encoded_first_name", "name2_double_metaphone_encoded_first_name"],
"assignment": "apoc.text.levenshteinSimilarity(name1_double_metaphone_encoded_first_name, name2_double_metaphone_encoded_first_name)",
"output": "phnt_fname_sim"
},
{
"inputs": ["n1_surname", "n2_surname"],
"assignment": "apoc.text.jaroWinklerDistance(n1_surname, n2_surname)",
"output": "dist_lname"
},
{
"inputs": ["name1_double_metaphone_encoded_last_name", "name2_double_metaphone_encoded_last_name"],
"assignment": "apoc.text.levenshteinSimilarity(name1_double_metaphone_encoded_last_name, name2_double_metaphone_encoded_last_name)",
"output": "phnt_lname_sim"
}
],
"feature": {
"dist_fname": {
"source": "dist_fname",
"description": "Jaro-Winkler distance of two first names."
},
"phnt_fname_sim": {
"source": "phnt_fname_sim",
"description": "Levenshtein similarity of two phonetic encoding first names."
},
"dist_lname": {
"source": "dist_lname",
"description": "Jaro-Winkler distance of two last names."
},
"phnt_lname_sim": {
"source": "phnt_lname_sim",
"description": "Levenshtein similarity of two phonetic encoding last names."
},
"cmp_sex": {
"source": "cmp_sex",
"description": "Whether two individuals have the same sex."
},
"dist_by": {
"source": "dist_by",
"description": "Hamming distance of two birth years."
},
"dist_bm": {
"source": "dist_bm",
"description": "Hamming distance of two birth months."
},
"dist_bd": {
"source": "dist_bd",
"description": "Hamming distance of two birth days."
},
"is_match": {
"source": "label",
"description": "Whether two individuals are the same."
}
}
}'
Response
{
"response": null,
"exception": null,
"statusText": null,
"statusCode": 0,
"policyName": "transformation-policy",
"featureNodeCreation": {
"batches": 1,
"total": 5,
"timeTaken": 0,
"committedOperations": 5,
"failedOperations": 0,
"failedBatches": 0,
"retries": 0,
"errorMessages": {},
"batch": {
"total": 1,
"committed": 1,
"failed": 0,
"errors": {}
},
"operations": {
"total": 5,
"committed": 5,
"failed": 0,
"errors": {}
},
"wasTerminated": false,
"failedParams": {}
},
"featureComputation": {
"batches": 1,
"total": 5,
"timeTaken": 0,
"committedOperations": 5,
"failedOperations": 0,
"failedBatches": 0,
"retries": 0,
"errorMessages": {},
"batch": {
"total": 1,
"committed": 1,
"failed": 0,
"errors": {}
},
"operations": {
"total": 5,
"committed": 5,
"failed": 0,
"errors": {}
},
"wasTerminated": false,
"failedParams": {}
}
}
Get Transformation Policy
Base URL: /1.0/ml/{{clusterName}}/transformations?policyName={{policyName}}
Method: GET
Fetch the uploaded transformation policy.
Parameters | Description |
---|---|
policyName | Name of the transformation policy. |
Request
curl --location --request GET "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/transformations?policyName=transformation-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"
Get Transformation Status
Base URL: https://api.graphgrid.com/1.0/ml/{{cluster-name}}/transformations/status?policyName={{mlPolicyName}}
Method: GET
Show the policy name and feature node metadata associated with the transformation node.
Parameters | Description |
---|---|
policyName | Name of the transformation policy. |
Request
curl --location --request GET "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/transformations/status?policyName=transformation-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"
Response
{
"statusCode": 0,
"policyName": "transformation-policy",
"featureNodeTypes": [
{
"nodeLabels": ["Individual", "Individual"],
"count": 4,
"features": [
"cmp_sex",
"dist_bd",
"dist_bm",
"dist_by",
"dist_fname",
"dist_lname",
"is_match",
"phnt_fname_sim",
"phnt_lname_sim"
]
}
]
}
Get Inner Transformation Status
Base URL: /1.0/ml/{{cluster-name}}/transformations/status/inner?task={{mlTask}}&policyName={{mlPolicyName}}
Method: GET
Show the task, policy name and feature node metadata associated with the inner transformation node.
Parameters | Description |
---|---|
task | Task of the training. |
policyName | Name of the training policy. |
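A hedged example request, following the curl conventions used elsewhere in this documentation; the cluster name graphgrid-ml-example and the task and policy names sample-task and sample-policy are illustrative assumptions.
Request
curl --location --request GET "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/transformations/status/inner?task=sample-task&policyName=sample-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"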
Delete Transformation
Base URL: /1.0/ml/{{cluster-name}}/transformations?policyName={{policyName}}
Method: DELETE
Delete the transformation policy, transformation node and the relationships to feature nodes.
Parameters | Description |
---|---|
policyName | Name of the transformation policy. |
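A hedged example request, reusing the cluster and policy names from the transformation examples above.
Request
curl --location --request DELETE "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/transformations?policyName=transformation-policy" \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"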
Feature
GraphGrid ML provides an endpoint that returns the features on feature nodes with a given node labels value as a JSON document. The feature endpoint is at /features.
Show Feature
Base URL: /1.0/ml/{{cluster-name}}/features?nodeLabels={{nodeLabel}}
Method: GET
Return the features stored on feature nodes with provided node labels.
Parameters | Description |
---|---|
nodeLabels | Node labels. |
limit | Maximum number of feature sets. |
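A hedged example request; the limit value of 10 is arbitrary and the cluster name graphgrid-ml-example is assumed from earlier examples.
Example Request
curl --location --request GET "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/features?nodeLabels=Individual,Individual&limit=10" \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"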
Example Response
{
"features": [
{
"cmp_sex": 1,
"dist_bd": 0,
"dist_bm": 0,
"dist_by": 0,
"dist_fname": 1,
"dist_lname": 1,
"is_match": "match",
"phnt_fname_sim": 1,
"phnt_lname_sim": 1
},
{
"cmp_sex": 1,
"dist_bd": 2,
"dist_bm": 2,
"dist_by": 2,
"dist_fname": 0.5777777777777778,
"dist_lname": 1,
"is_match": "not_match",
"phnt_fname_sim": 0.6666666666666666,
"phnt_lname_sim": 1
}
]
}
Training
GraphGrid ML uses Spark MLlib as the training framework. The training endpoint is at /trainings. The training policy defines the source data, the training workflow and metrics for evaluation. The training data is imported using a cypher query. After the training process is completed, a GraphGridMLModel node is created that stores the following information:
- PipelineStages used
- Training and test data count
- Evaluation metric values
- Required input and output column schema
Train Model
Base URL: https://api.graphgrid.com/1.0/ml/{{cluster-name}}/trainings/{{task}}/{{policy-name}}
Method: POST
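A hedged example request following the curl conventions used elsewhere in this documentation. The body is a complete training policy assembled from the dataFrame, pipelineStages, trainingData, trainingPipelines and evaluators sections below; here it is supplied through a hypothetical shell variable TRAINING_POLICY holding that JSON document, and the cluster, task and policy names are illustrative.
Request
curl --location --request POST "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/trainings/sample-task/sample-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${var.api.auth.shellBearer}" \
--data-raw "${TRAINING_POLICY}"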
Training Policy
A JSON document that defines the workflow for training a machine learning model. Since the training framework is Spark ML, a lot of Spark SQL and Spark MLlib concepts are used. Other than the common elements described in Policy, a training policy has the following level 1 parameters:
Parameter | Description |
---|---|
dataFrame object | Includes the cypher query and schema to construct the input DataFrame. |
pipelineStages object | Defines a series of Pipeline stages to be used during the training process. |
trainingData object | Specify the DataFrame for training. |
trainingPipelines array of strings | Specify the pipeline stages to be included in the final ML pipeline. |
evaluators optional array of objects | Specify one or more evaluators for the trained model. |
The savePolicy field is forced to be true for a training policy.
The dataFrame object and every Pipeline stage object in pipelineStages have a common parameter name. It is used as a reference to the current element that can be called by later elements via {{reference}}.
DataFrame
In Spark SQL, a DataFrame is conceptually equivalent to a table in a relational database or a data frame in R/Python. In this training policy, dataFrame contains the parameters for constructing a DataFrame that is going to be used throughout the training process.
Parameter | Description |
---|---|
name string | The reference of the DataFrame. |
transformation optional object | Contains cypher, variables, destination, assignments and feature elements from Transformation Policy. It is an inner transformation policy that gets executed to prepare data for the DataFrame. After an inner transformation policy gets executed, a GraphGridMLInnerTransformation node is created and connected to the associated GraphGridMLFeature nodes and the GraphGridMLModel node. |
cypher string | The cypher query to fetch columns of data from the graph. |
parameters optional map | Values for cypher query with parameters. |
schema object | The schema of each column in the DataFrame, including the name and data type. See Schema for more information |
Example
{
"dataFrame": {
"name": "sourceData",
"cypher": "MATCH (n:GraphGridMLTransformation {policyName:'sample'})-[:GG_ML_PRODUCES]->(m:GraphGridMLFeature) RETURN apoc.convert.toInteger(m.`features.cmp_sex`) AS cmp_sex,apoc.convert.toInteger(m.`features.dist_bd`) AS dist_bd,apoc.convert.toInteger(m.`features.dist_bm`) AS dist_bm,apoc.convert.toInteger(m.`features.dist_by`) AS dist_by,m.`features.dist_fname` AS dist_fname,m.`features.dist_lname` AS dist_lname,m.`features.is_match` AS is_match,apoc.convert.toInteger(m.`features.phnt_fname_sim`) AS phnt_fname_sim,apoc.convert.toInteger(m.`features.phnt_lname_sim`) AS phnt_lname_sim;",
"schema": [
{
"name": "cmp_sex",
"type": "integer"
},
{
"name": "dist_bd",
"type": "integer"
},
{
"name": "dist_bm",
"type": "integer"
},
{
"name": "dist_by",
"type": "integer"
},
{
"name": "dist_fname",
"type": "double"
},
{
"name": "dist_lname",
"type": "double"
},
{
"name": "is_match",
"type": "string"
},
{
"name": "phnt_fname_sim",
"type": "integer"
},
{
"name": "phnt_lname_sim",
"type": "integer"
}
]
}
}
In this example, the cypher query looks up the feature nodes associated with a transformation node created earlier (from Transformation) and selects properties on those feature nodes to be used as the columns of the DataFrame. The schema defines the names and data types of the DataFrame columns.
PipelineStage
The pipelineStages element contains a series of PipelineStages. Pipeline is a concept from Spark MLlib (see ML Pipelines). A PipelineStage is a Pipeline component, either a Transformer or an Estimator. The training policy allows users to define the pipeline stages with JSON syntax. The core parameters for applying a Pipeline component from Spark MLlib are as follows:
Parameter | Description |
---|---|
name string | The reference of the PipelineStage. |
type string | The class name of the Pipeline component from Spark MLlib. |
parameters map | Parameters of the Pipeline component. |
outputs optional map | Create references for the return values of getter methods of the Pipeline component. |
Depending on the type, the PipelineStage can be a Transformer or an Estimator. The parameters are different for each of them.
Transformer
A Transformer transforms a DataFrame into another DataFrame. The parameter for applying a Transformer is transform and it has three fields:
Parameter | Description |
---|---|
name string | The reference of the transformed DataFrame. |
dataFrame string | The reference of a DataFrame already defined in the scope. |
outputs optional map | Create references for the return values of getter methods of the transformed DataFrame. |
Estimator
An Estimator takes in a DataFrame and outputs a Transformer. The parameter for applying an Estimator is fit and it has four fields; one of them is a nested transform, which allows users to apply the generated Transformer immediately after applying the Estimator, in one block of the PipelineStage definition. The fields of fit are:
Parameter | Description |
---|---|
name string | The reference of the generated Transformer. |
dataFrame string | The reference of a DataFrame already defined in the scope. |
outputs optional map | Create references for the return values of getter methods of the generated Transformer. |
transform optional object | The object used to apply the generated Transformer. See Transformer. |
Example
{
"pipelineStages": [
{
"name": "stringIndexer",
"type": "StringIndexer",
"parameters": {
"inputCol": "is_match",
"outputCol": "label"
},
"fit": {
"name": "stringIndexerModel",
"dataFrame": "{{sourceData}}",
"outputs": {
"indexerLabels": "labels"
},
"transform": {
"name": "training",
"dataFrame": "{{sourceData}}"
}
}
},
{
"name": "assembler",
"type": "VectorAssembler",
"parameters": {
"inputCols": [
"cmp_sex",
"dist_bd",
"dist_bm",
"dist_by",
"dist_fname",
"dist_lname",
"phnt_fname_sim",
"phnt_lname_sim"
],
"outputCol": "features"
}
},
{
"name": "lr",
"type": "LogisticRegression",
"parameters": {
"predictionCol": "predicted_label"
}
},
{
"name": "indexToString",
"type": "IndexToString",
"parameters": {
"inputCol": "predicted_label",
"outputCol": "predicted_label_string",
"labels": "{{indexerLabels}}"
}
}
]
}
In this example, four PipelineStages are defined, each corresponding to a Transformer or Estimator from Spark MLlib: StringIndexer, VectorAssembler, LogisticRegression and IndexToString.
The first PipelineStage, StringIndexer, is an Estimator, so its definition has a fit parameter. It is used to create a numeric column "label" from the string column "is_match", because the LogisticRegression model requires the label column to be in numeric format. The dataFrame field of fit is {{sourceData}}, which refers to the DataFrame defined in the dataFrame section above. The fit generates a StringIndexerModel object, a Transformer. We want to use the labels value carried by this Transformer later when creating the IndexToString stage, so in the outputs field inside fit we create a reference to the return value of the StringIndexerModel.labels() method. Further, we want to append the numeric column to the final DataFrame, so we apply the generated Transformer to the sourceData DataFrame in the transform field and create a reference for the transformed DataFrame.
The second PipelineStage is VectorAssembler, which collects the feature columns into a vector column. The next one is LogisticRegression, which is the core of this training pipeline, generating the predicted value in the "predicted_label" column. The last PipelineStage is IndexToString, which converts the predicted value (a number) back to its corresponding string value. In its parameters field, the value of the labels parameter is a reference to the labels array named indexerLabels, which we defined in the first PipelineStage.
Training Data
Specify the DataFrame and training / test ratio for training data.
Parameter | Description |
---|---|
dataFrame string | The reference of a DataFrame. |
trainingRatio float | The percentage of DataFrame to be used as training data. |
Example
{
"trainingData": {
"dataFrame": "{{training}}",
"trainingRatio": 0.7
}
}
We are using the DataFrame with the numeric column "label" appended as the source of training data, so the value of dataFrame is {{training}}.
70% of the DataFrame will be used for training and 30% will be used for evaluation, so the value of trainingRatio is 0.7.
Training Pipelines
Specify the final PipelineStage set for training.
Example
{
"trainingPipelines": ["assembler", "lr", "indexToString"]
}
Three of the PipelineStages defined earlier are included in the final PipelineStage set. They will form the ML Pipeline and apply on the training data.
Evaluator
Evaluator is another concept from Spark MLlib, just like the Pipeline components. Evaluators are used to evaluate the performance of a model, computing metrics that reflect the quality of a model. There are four Evaluators supported by GraphGrid ML.
Evaluator | Metrics |
---|---|
BinaryClassificationEvaluator | areaUnderROC: Area under a Receiver Operating Characteristic curve. areaUnderPR: Area under a Precision-Recall curve. |
MulticlassClassificationEvaluator | f1: F1 score. weightedPrecision: Weighted average precision score. accuracy: Accuracy score. |
RegressionEvaluator | rmse: Root mean squared error. mse: Mean squared error. r2: R2 score. |
ClusteringEvaluator | silhouette: Mean Silhouette Coefficient of all samples |
To apply one or more Evaluators on a model, fill the evaluators parameter of the training policy. The syntax of this parameter is similar to the one of pipelineStages.
Parameter | Description |
---|---|
type string | The class name of the Evaluator from Spark MLlib. |
parameters map | Parameters of the Evaluator. |
metrics list | Metrics to use, provided by the Evaluator. |
Example
{
"evaluators": [
{
"type": "BinaryClassificationEvaluator",
"metrics": ["areaUnderROC", "areaUnderPR"]
}
]
}
In this example, we apply a BinaryClassificationEvaluator for our binary classification model. The metrics are areaUnderROC and areaUnderPR.
Graph
After executing the sample policy above, the following elements are created in the graph.
The blue node is the GraphGridMLModel node; it is connected to the purple GraphGridMLInnerTransformation node via a GG_ML_PRODUCES relationship. The GraphGridMLInnerTransformation node is defined in the dataFrame parameter and connects to all the GraphGridMLFeature nodes it produces via GG_ML_PRODUCES relationships.
The model node carries properties about:
- Task and policy name
- Training and test data count
- PipelineStages used
  - E.g. VectorAssembler, LogisticRegression and IndexToString, which are the ones specified in the trainingPipelines parameter.
- Metrics: metric scores
  - E.g. metrics.areaUnderPR: 1, which means the areaUnderPR of this model is 1.
- Input and output schema: basic data type and structure data type of each column.
  - E.g. inputSchema.dist_bm: double,basic, which means the basic data type and structure data type of the input column named "dist_bm" are double and basic.
Get Training Policy
Base URL: /1.0/ml/{{cluster-name}}/trainings?task={{mlTask}}&policyName={{mlPolicyName}}
Method: GET
Fetch the uploaded training policy.
Parameter | Description |
---|---|
task | Task of the training. |
policyName | Name of the training policy. |
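A hedged example request, assuming the illustrative cluster, task and policy names used elsewhere in this documentation.
Request
curl --location --request GET "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/trainings?task=sample-task&policyName=sample-policy" \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"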
Model
The model endpoint is at /models. It accepts two requests, one for displaying the information of a trained model, and one for removing a trained model.
Delete Model
Base URL: /1.0/ml/{{cluster-name}}/models?task={{mlTask}}&policyName={{mlPolicyName}}
Method: DELETE
Parameter | Description |
---|---|
task | Task of the training. |
policyName | Name of the training policy. |
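A hedged example request, with the same assumed task and policy names.
Request
curl --location --request DELETE "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/models?task=sample-task&policyName=sample-policy" \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"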
Inference
Once a model is trained, it can be used to predict the outcome for a given set of input data. The input and output schema must be acceptable by the trained model, otherwise an error will occur. There are two types of inference: real time and batch inference. Real time inference handles a small amount of input data and returns the predictions in a short amount of time in a response. Batch inference accepts a large amount of input data and stores the prediction results in the graph. The inference endpoint is at /inference.
Realtime Inference
Load Model
Base URL: /1.0/ml/{{cluster-name}}/inference/model
Method: POST
Load the trained model specified by the task and policy name.
{
"task": "sample-task",
"policyName": "sample-policy"
}
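The body above can be posted with a curl request following the documentation's conventions; the cluster name graphgrid-ml-example is an assumption carried over from earlier examples.
Request
curl --location --request POST "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/inference/model" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${var.api.auth.shellBearer}" \
--data-raw '{
    "task": "sample-task",
    "policyName": "sample-policy"
}'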
Unload Model
Base URL: /1.0/ml/{{cluster-name}}/model
Method: DELETE
Unload the model that is currently loaded.
Parameters | Description |
---|---|
task | Task of the training. |
policyName | Name of the training policy. |
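A hedged example request, using the Base URL above and the query parameters from the table; the task and policy names are assumptions carried over from the Load Model example.
Request
curl --location --request DELETE "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/model?task=sample-task&policyName=sample-policy" \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"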
Schema
It specifies the column schema for a data frame. Each column schema contains three fields. Note that during the training phase, only the first two parameters are effective; in other words, the struct parameter is ignored in the training policy. In inference requests, all three are used.
Parameter | Description |
---|---|
name string | The name of the column. |
type string | The basic type of the column. |
struct optional string | The structure data type of the column. - basic : basic data types like int, double, string, etc. - vector : variable length structure of basic data types. - array : fixed length structure of basic data types. Default: basic |
The schema for inference requests must be acceptable by the target model. To confirm the validity of the schema, check the model information for valid input and output column schema.
Example
{
"schema": {
"input": [
{
"name": "cmp_sex",
"type": "integer"
},
{
"name": "dist_bd",
"type": "integer"
},
{
"name": "dist_bm",
"type": "integer"
},
{
"name": "dist_by",
"type": "integer"
},
{
"name": "dist_fname",
"type": "double"
},
{
"name": "dist_lname",
"type": "double"
},
{
"name": "phnt_fname_sim",
"type": "integer"
},
{
"name": "phnt_lname_sim",
"type": "integer"
}
],
"output": [
{
"name": "predicted_label_string",
"type": "string"
},
{
"name": "probability",
"type": "double",
"struct": "vector"
}
]
}
}
There are 8 columns in the input data for the trained binary classification model. We are interested in 2 columns in the output data: a string column indicating whether the result is match or not_match, and a vector-of-double column showing the probabilities of getting either result.
Create JSON Data Inference
Base URL: https://api.graphgrid.com/1.0/ml/{{cluster-name}}/inference/json
Method: POST
Accept a JSON document request in which one row of input data is included and return the prediction.
Headers > Content-Type: application/json
Body > JSON Data Request
JSON Data Inference
The JSON data inference allows only one row of data to be sent. The input data is in the list parameter named data. The entries must follow the order and data type of the input schema.
Example Request
{
"schema": {
...
},
"data": [
1,
1,
1,
1,
0.45,
0.7,
1,
1
]
}
Example Response
{
"result": {
"predicted_label_string": "not_match",
"probability": [0.9862161796038926, 0.013783820396107364]
}
}
Create Cypher Data Inference
Base URL: https://api.graphgrid.com/1.0/ml/{{cluster-name}}/inference/cypher
Method: POST
Cypher Data Inference
The Cypher data inference allows the user to enter a cypher query with parameters to fetch one or more rows of data to be sent. The data must follow the order, name, and data type of the input schema.
Example Request
{
"schema": {
...
},
"data": {
"cypher": "WITH [$list1,$list2] AS colls UNWIND colls AS coll RETURN coll[0] AS cmp_sex, coll[1] AS dist_bd, coll[2] AS dist_bm, coll[3] AS dist_by, coll[4] AS dist_fname, coll[5] AS dist_lname, coll[6] AS phnt_fname_sim, coll[7] AS phnt_lname_sim;",
"parameters": {
"list1": [
1,
0,
1,
0,
0.8,
0.5,
1,
1
],
"list2": [
1,
1,
2,
1,
0.31,
0.43,
0,
0
]
}
}
}
Example Response
{
"results": [
{
"predicted_label_string": "match",
"probability": [0.00001481113003280617, 0.9999851888699672]
},
{
"predicted_label_string": "not_match",
"probability": [0.9999999999997948, 2.052527119198712e-13]
}
]
}
Batch Inference
Batch inference endpoints allow the user to create and remove a batch inference job. A batch inference job handles a large data input and exports the prediction results back to the graph. Unlike real time inference, which produces results in a short amount of time, a batch inference job usually takes a long time as the input is large. Once a batch inference job is created, a GraphGridMLBatchInference node will be generated in the graph that contains the metadata of the batch inference job:
Parameter | Description |
---|---|
task string | The task of the target trained model. |
policyName string | The policy name of the target trained model. |
job id string | The job id of the batch inference job. Example: job_20190314102200_0000 |
status string | The status of the batch inference job. - RUNNING - SUCCEEDED - FAILED |
state int | A number code indicating the last executed state of the batch inference job. - 0: Job initialized - 1: Model loaded - 2: Input data loaded - 4: Inference results generated - 5: Inference results uploaded - 6: Inference results written to the graph |
totalPartitions long | Total partitions of the inference results generated. |
processedPartitions long | Number of partitions written to the graph. |
Create Batch Inference
Base URL: /1.0/ml/{{cluster-name}}/inference/batch
Method: POST
Accept a JSON document request which includes cypher query that defines the input and output data.
Headers > Content-Type: application/json
Body > Batch Inference Policy
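A hedged example request; the body is a batch inference policy such as the example below, supplied here through a hypothetical shell variable BATCH_INFERENCE_POLICY, with an assumed cluster name.
Request
curl --location --request POST "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/inference/batch" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${var.api.auth.shellBearer}" \
--data-raw "${BATCH_INFERENCE_POLICY}"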
Batch Inference Policy
The batch inference policy is essentially a cypher data request with the following additional elements:
Parameter | Description |
---|---|
task string | The task of the target trained model. |
policyName string | The policy name of the target trained model. |
schema object | The input and output column schema. See Schema. |
inputQuery object | Cypher query and optional parameters to load input data. |
outputQuery object | Cypher query and optional parameters to write output data. The output fields specified in the output schema will be stored on a collection of map object named ggMLResults . Users can decide what they want with this object. Each entry of this object contains a row of output data. |
Example Policy
{
"task": "local",
"policyName": "test-local",
"schema": {
"input": [
{
"name": "n1_grn",
"type": "string"
},
{
"name": "n2_grn",
"type": "string"
},
{
"name": "cmp_sex",
"type": "integer"
},
{
"name": "dist_bd",
"type": "integer"
},
{
"name": "dist_bm",
"type": "integer"
},
{
"name": "dist_by",
"type": "integer"
},
{
"name": "dist_fname",
"type": "double"
},
{
"name": "dist_lname",
"type": "double"
},
{
"name": "phnt_fname_sim",
"type": "integer"
},
{
"name": "phnt_lname_sim",
"type": "integer"
}
],
"output": [
{
"name": "n1_grn"
},
{
"name": "n2_grn"
},
{
"name": "predicted_label_string"
}
]
},
"inputQuery": {
"cypher": "MATCH (n:GraphGridMLTransformation)-[:GG_ML_PRODUCES]->(m:GraphGridMLFeature) RETURN m.`features.n1_grn` AS n1_grn, m.`features.n2_grn` AS n2_grn, apoc.convert.toInteger(m.`features.cmp_sex`) AS cmp_sex,apoc.convert.toInteger(m.`features.dist_bd`) AS dist_bd,apoc.convert.toInteger(m.`features.dist_bm`) AS dist_bm,apoc.convert.toInteger(m.`features.dist_by`) AS dist_by,m.`features.dist_fname` AS dist_fname,m.`features.dist_lname` AS dist_lname,m.`features.is_match` AS is_match,apoc.convert.toInteger(m.`features.phnt_fname_sim`) AS phnt_fname_sim,apoc.convert.toInteger(m.`features.phnt_lname_sim`) AS phnt_lname_sim; "
},
"outputQuery": {
"cypher": "UNWIND ggMLResults AS ggMLResult MERGE (n:IndividualDeduplicationResult {individualA:ggMLResult.n1_grn,individualB:ggMLResult.n2_grn}) SET n.result=ggMLResult.predicted_label_string;"
}
}
In this policy, we are passing in all individual pairs in the graph that are not connected via an IS_MATCH relationship. In addition to the required input fields, there are n1_grn and n2_grn, which are mainly used for creating the output nodes; that is why they are included in the output schema. In the output query, we create nodes with the label IndividualDeduplicationResult to store the prediction results. Each node is identified by n1_grn and n2_grn and stores the string label indicating whether the corresponding individual pair is a match or not.
Retry Batch Inference
Base URL: /1.0/ml/{{cluster-name}}/inference/batch/{{job-id}}
Method: PUT
If the batch inference job did not complete successfully and the user does not wish to start it over from the beginning, this endpoint can satisfy that need. By including the job id in the URL path, the batch service will try its best to start from where it left off. The request body is a revised batch inference policy.
Headers > Content-Type: application/json
Body > Batch Inference Policy
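A hedged example request; the job id is the illustrative value from the metadata table above, and the body is again supplied through a hypothetical BATCH_INFERENCE_POLICY shell variable holding the revised policy.
Request
curl --location --request PUT "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/inference/batch/job_20190314102200_0000" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${var.api.auth.shellBearer}" \
--data-raw "${BATCH_INFERENCE_POLICY}"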
Remove Batch Inference
This endpoint is used to remove the objects generated by a batch inference process, i.e. the GraphGridMLBatchInference node and the prediction results.
Base URL: /1.0/ml/{{cluster-name}}/inference/batch?task={{mlTask}}&policyName={{policyName}}&jobId={{jobId}}
Method: DELETE
Remove the resources generated by a batch inference job.
Parameter | Description |
---|---|
task | Task of the model. |
policyName | Policy name of the model. |
jobId | Job id of the batch inference. |
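A hedged example request, reusing the task and policy name from the example batch inference policy above and the illustrative job id from the metadata table.
Request
curl --location --request DELETE "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/inference/batch?task=local&policyName=test-local&jobId=job_20190314102200_0000" \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"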
Event
The event endpoint provides a feature which allows users to create an event-triggered action. The event is a conditional cypher query against the graph. Once that condition is satisfied, the defined action will be executed. There are three actions supported: Transformation, Training and Batch Inference.
Create Event Action
Base URL: /1.0/ml/{{cluster-name}}/events
Method: POST
Accept a JSON document request which includes a conditional cypher query and action definition.
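A hedged example request; the body is an event action policy such as the example below, supplied here through a hypothetical shell variable EVENT_ACTION_POLICY, with an assumed cluster name.
Request
curl --location --request POST "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/events" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${var.api.auth.shellBearer}" \
--data-raw "${EVENT_ACTION_POLICY}"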
Event Action Policy
The event action policy is essentially a cypher data request with several additional elements.
Parameter | Description |
---|---|
cypherCondition string | A conditional cypher query that blocks / starts the action. |
brokerType string | The type of message broker to use. - kafka - rabbitMQ - SQS |
actionType string | The type of action to execute. - transformation - training - batchInference |
policy object | The policy for the corresponding action type. I.e. a Transformation Policy, Training Policy or Batch Inference Policy. |
Example Policy
{
"cypherCondition": "MATCH (n:TestEvent) RETURN COUNT(n) >= 5",
"brokerType": "sqs",
"actionType": "transformation",
"policy": {
"policyName": "testEvent",
"overwrite": true,
"source": {
"cypher": "MATCH (n1:TestEvent), (n2:TestEvent) WHERE ID(n1) < ID(n2) RETURN n1.grn AS n1_grn, n2.grn AS n2_grn, n1.sex AS n1_sex, n2.sex AS n2_sex;",
"variables": ["n1_grn", "n2_grn", "n1_sex", "n2_sex"]
},
"destination": {
"nodeLabels": ["TestEvent", "TestEvent"],
"sourceGRNs": ["n1_grn", "n2_grn"]
},
"assignments": [
{
"inputs": ["n1_sex", "n2_sex"],
"assignment": "CASE WHEN n1_sex = n2_sex THEN 1 ELSE 0 END",
"output": "cmp_sex"
}
],
"feature": {
"cmp_sex": {
"source": "cmp_sex",
"description": "Whether two individuals have the same sex."
}
}
}
}
This policy creates a transformation action that gets executed as long as the cypher condition is satisfied.