Version: 2.0

GraphGrid Machine Learning (ML)

Platform Version 2.0

API Version 1.0

Introduction

GraphGrid ML provides machine learning operations across a graph database by integrating ONgDB with Spark. Users create policies with Geequel that define their data for full machine learning pipeline management. GraphGrid ML is designed to address the following machine learning workflow:

  1. Transform data
  2. Train models
  3. Evaluate models
  4. Deploy models
  5. Make predictions

Prerequisite Information

Before diving in, be sure to familiarize yourself with this general information about machine learning models, as it pertains to GraphGrid ML.

  • Transformation
    • Preprocessing your data by extracting and formatting features for ML tasks; taking your raw data and transforming it into individual training points.
  • Training
    • Execute training on the transformed (pre-processed) data.
  • Inference
    • Make predictions by running one or more datapoints through the trained model.
  • Spark/MLlib
    • Spark is an analytics engine for data processing at scale. MLlib is Spark's machine learning library.
    • Spark and MLlib serve as GraphGrid ML's training framework.
    • MLlib provides common learning algorithms such as classification, regression, clustering, and collaborative filtering.
    • It also offers common Transformers and Estimators (such as OneHotEncoder and Word2Vec).

API

This ML API version is 1.0 and as such all endpoints are rooted at /1.0/. For example, http://localhost/1.0/ml (requires auth) would be the base context for this ML API under the GraphGrid API deployed at http://localhost.

Environment

ML requires the following integrations:

  • ONgDB 1.0+
  • Spark 2.4.0+

Check Status

Check the status of GraphGrid ML. Returns a 200 OK response if healthy.

Base URL: /1.0/ml/status
Method: GET

Request

curl --location --request GET "${API_BASE}/1.0/ml/status"

Response

{
"status": "OK"
}

Data Briefing

Throughout this documentation, a graph with the following elements will be used as an example.

  • Nodes:
    • Individual - stores a person's record.
      • sex - m or f
      • birthday - e.g. 1964-09-02
    • Name - stores a name's information.
      • givenName - e.g. Keanu
      • surname - e.g. Reeves
  • Relationships
    • HAS_NAME
    • IS_MATCH
      • value - match or not_match
  • Connections
    • (:Individual)-[:HAS_NAME]->(:Name)
    • (:Individual)-[:IS_MATCH]->(:Individual)

The goal is to develop a binary classification model that predicts whether two Individual nodes are a match or not_match.

Policy

Each step in the machine learning workflow can be defined and managed in a policy that is received by the corresponding endpoint. All policies are JSON documents. GraphGrid ML supports two types of policies: transformation policy and training policy. The common parameters in these policies are:

| Parameter | Description |
|---|---|
| policyName (string) | Name of the policy. |
| savePolicy (boolean) | Whether to save the policy or not. Default: true |
| overwrite (boolean) | Whether to allow the existing policy to be overwritten or not. Default: false |
| metadata (object) | The metadata of the policy, including description, creationAt, and updateAt. |
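
For reference, a minimal policy skeleton showing only these common fields might look like the following (the values are illustrative; the type-specific fields described in the sections below would sit alongside them):

{
  "policyName": "sample-policy",
  "savePolicy": true,
  "overwrite": false,
  "metadata": {
    "description": "An example policy"
  }
}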

A transformation policy defines the transformation workflow: how a raw data point should be processed and transformed into features. This is done by preprocessing your data (a.k.a. feature generation) and extracting and formatting features for ML tasks.

A training policy defines the training workflow: how the features should be used and manipulated to train the model. In many systems this is literal code, but here it is policy driven. This is done by executing training on the pre-processed data.

Graph Structure

The following is a breakdown of the graph structure created by the ML pipeline:

  • GraphGridMLFeature (GGMLF) node
    • created by a transformation policy
    • stores the features for a single training data point, compiled from the transformation of the raw data
    • connects to all source nodes used through a GG_ML_FROM relationship
  • GraphGridMLTransformation (GGMLT) node
    • node representing the transformation policy being used
    • connects to all GGMLF nodes created from the represented policy, and also connects to other GGMLT nodes
    • by default saved as a node in the graph (this can be turned off with savePolicy: false, but collecting GGMLF nodes for training will then take a performance hit)

Transformation

Training machine learning models relies on having data in a specific format. Raw graph data often needs some preprocessing before it can be used for training. GraphGrid ML uses JSON documents called policies to preprocess and transform graph data so that it is ready for ML tasks. These policies are generic tooling for transforming raw graph data into training data, with full access to Geequel queries. GraphGrid ML provides endpoints for extracting, transforming, and selecting features from raw data stored in the graph. The transformation endpoint is at /transformations.

Transformation Policy

A transformation policy is a JSON document that defines how to transform raw graph data into feature nodes ready for training. The extracted features are then stored on a GraphGridMLFeature node. Transformation policies allow the user to get their source data directly from the graph. Other than the common elements described in Policy, a transformation policy has the following level 1 parameters:

| Parameter | Description |
|---|---|
| option (optional object) | Includes the configuration for the apoc.periodic.iterate procedure call that is used to execute the Cypher query generated by the policy. |
| source (object) | Defines the input data for transformation. |
| destination (object) | Defines the label on the GraphGridMLFeature nodes and the connections between the feature node and the source data nodes. |
| assignments (optional array of objects) | The intermediate steps of generating the final feature set. |
| feature (object) | The final feature set to be stored on the GraphGridMLFeature nodes. |
Note: If savePolicy is true, then a GraphGridMLTransformation node will be created that connects to all GraphGridMLFeature nodes generated by the policy via GG_ML_PRODUCES relationships. Otherwise, only the GraphGridMLFeature nodes are created.

Option

Once a transformation policy is sent to the transformation endpoint, the transformation service will analyze and process the policy and generate a set of Cypher queries. These queries are executed using the apoc.periodic.iterate procedure call. The procedure call supports six parameters (see the APOC documentation). The option field allows the user to set the values for five of them:

| Parameter | Description |
|---|---|
| batchSize (integer) | "that many inner statements are run within a single tx params: {_count, _batch}". Default: 1000 |
| parallel (boolean) | "run inner statement in parallel, note that statements might deadlock". Default: false |
| retries (integer) | "if the inner statement fails with an error, sleep 100ms and retry until retries-count is reached, param {_retry}". Default: 0 |
| iterateList (boolean) | "the inner statement is only executed once but the whole batchSize list is passed in as parameter {_batch}". Default: false |
| concurrency (integer) | "how many concurrent tasks are generated when using parallel:true". Default: 50 |
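
As a sketch, an option block tuning these settings might look like the following (the values are illustrative, not recommendations):

{
  "option": {
    "batchSize": 500,
    "parallel": false,
    "retries": 2,
    "iterateList": false,
    "concurrency": 10
  }
}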

Source

| Parameter | Description |
|---|---|
| cypher (string) | The Cypher query to look up nodes, relationships, and their properties to be processed. |
| parameters (key-value pairs) | Values for a Cypher query with parameters. |
| variables (string list) | Select the variables to be processed from the Cypher query. |

Example
{
"source": {
"cypher": "MATCH (n1:Person)-[r1:HAS_NAME]->(name1:Name), (n2:Person)-[r2:HAS_NAME]->(name2:Name), (n1)-[l:IS_MATCH]-(n2) RETURN n1.grn AS n1_grn, n2.grn AS n2_grn, n1.birthday AS n1_birthday, n2.birthday AS n2_birthday, n1.sex AS n1_sex, n2.sex AS n2_sex, name1.givenName AS n1_givenName, name2.givenName AS n2_givenName, name1.surname AS n1_surname, name2.surname AS n2_surname,l.value AS label LIMIT 5",
"variables": [
"n1_grn",
"n2_grn",
"n1_birthday",
"n2_birthday",
"n1_sex",
"n2_sex",
"n1_givenName",
"n2_givenName",
"n1_surname",
"n2_surname",
"label"
]
}
}

The cypher above generates 5 rows and 11 columns of data. All the column names are included in variables, meaning all of them will be used to generate the final feature set.

Destination

The destination field defines the node labels on the GraphGridMLFeature nodes and the grns of the source nodes to connect to from the GraphGridMLFeature nodes.

| Parameter | Description |
|---|---|
| nodeLabels (string list) | The node labels of the source nodes. Mainly used for lookup. |
| sourceGRNs (string list) | The grns of the source nodes. Used for creating connections between source nodes and feature nodes. |
Example
{
"destination": {
"nodeLabels": ["Individual", "Individual"],
"sourceGRNs": ["n1_grn", "n2_grn"]
}
}

The resulting graph places the GraphGridMLFeature node generated for one row of the source data at the center. The nodeLabels property of that node is [Individual, Individual]. Since the feature set is extracted between two source nodes, two GG_ML_FROM relationships are created between the feature node and the source nodes.

Assignments

The assignments field contains a list of assignments. Each assignment is a piece of Cypher syntax, a function, or a procedure call. It takes some input variables and produces an output variable. Order matters.

| Parameter | Description |
|---|---|
| inputs (string list) | The input variables for the assignment. The input variables are either defined in the variables parameter of the source field or by a previous assignment. |
| assignment (string) | A piece of Cypher syntax, a function, or a procedure call. For procedure calls, the YIELD keyword must be provided. |
| output (string) | The variable that stores the result of the assignment. It can be an existing variable in the scope (the value gets overwritten) or a new variable. |
Example
{
"assignments": [
{
"inputs": ["n1_birthday"],
"assignment": "split(n1_birthday, '-')",
"output": "n1_birthday_tokens"
},
{
"inputs": ["n2_birthday"],
"assignment": "split(n2_birthday, '-')",
"output": "n2_birthday_tokens"
},
{
"inputs": ["n1_birthday_tokens"],
"assignment": "n1_birthday_tokens[0]",
"output": "n1_birth_year"
},
{
"inputs": ["n1_birthday_tokens"],
"assignment": "n1_birthday_tokens[1]",
"output": "n1_birth_month"
},
{
"inputs": ["n1_birthday_tokens"],
"assignment": "n1_birthday_tokens[2]",
"output": "n1_birth_day"
},
{
"inputs": ["n2_birthday_tokens"],
"assignment": "n2_birthday_tokens[0]",
"output": "n2_birth_year"
},
{
"inputs": ["n2_birthday_tokens"],
"assignment": "n2_birthday_tokens[1]",
"output": "n2_birth_month"
},
{
"inputs": ["n2_birthday_tokens"],
"assignment": "n2_birthday_tokens[2]",
"output": "n2_birth_day"
},
{
"inputs": ["n1_birth_year", "n2_birth_year"],
"assignment": "apoc.text.hammingDistance(n1_birth_year, n2_birth_year)",
"output": "dist_by"
},
{
"inputs": ["n1_birth_month", "n2_birth_month"],
"assignment": "apoc.text.hammingDistance(n1_birth_month, n2_birth_month)",
"output": "dist_bm"
},
{
"inputs": ["n1_birth_day", "n2_birth_day"],
"assignment": "apoc.text.hammingDistance(n1_birth_day, n2_birth_day)",
"output": "dist_bd"
},
{
"inputs": ["n1_sex", "n2_sex"],
"assignment": "CASE WHEN n1_sex = n2_sex THEN 1 ELSE 0 END",
"output": "cmp_sex"
},
{
"inputs": ["n1_givenName", "n2_givenName"],
"assignment": "apoc.text.jaroWinklerDistance(n1_givenName, n2_givenName)",
"output": "dist_fname"
},
{
"inputs": ["n1_givenName"],
"assignment": "CALL apoc.text.doubleMetaphone(n1_givenName) YIELD value",
"output": "name1_double_metaphone_encoded_first_name"
},
{
"inputs": ["n1_surname"],
"assignment": "CALL apoc.text.doubleMetaphone(n1_surname) YIELD value",
"output": "name1_double_metaphone_encoded_last_name"
},
{
"inputs": ["n2_givenName"],
"assignment": "CALL apoc.text.doubleMetaphone(n2_givenName) YIELD value",
"output": "name2_double_metaphone_encoded_first_name"
},
{
"inputs": ["n2_surname"],
"assignment": "CALL apoc.text.doubleMetaphone(n2_surname) YIELD value",
"output": "name2_double_metaphone_encoded_last_name"
},
{
"inputs": [
"name1_double_metaphone_encoded_first_name",
"name2_double_metaphone_encoded_first_name"
],
"assignment": "apoc.text.levenshteinSimilarity(name1_double_metaphone_encoded_first_name, name2_double_metaphone_encoded_first_name)",
"output": "phnt_fname_sim"
},
{
"inputs": ["n1_surname", "n2_surname"],
"assignment": "apoc.text.jaroWinklerDistance(n1_surname, n2_surname)",
"output": "dist_lname"
},
{
"inputs": [
"name1_double_metaphone_encoded_last_name",
"name2_double_metaphone_encoded_last_name"
],
"assignment": "apoc.text.levenshteinSimilarity(name1_double_metaphone_encoded_last_name, name2_double_metaphone_encoded_last_name)",
"output": "phnt_lname_sim"
}
]
}

This example covers every kind of assignment: built-in Cypher functions like split(n1_birthday, '-'), pure Cypher syntax like n1_birthday_tokens[0], APOC functions like apoc.text.hammingDistance(n1_birth_year, n2_birth_year), and APOC procedure calls like CALL apoc.text.doubleMetaphone(n2_givenName) YIELD value. The assignments split the birthday property into year, month, and day, and then compute the text similarity for each of them. For names, they compute the text similarity between the original values and the phonetically encoded values.

Feature

The feature field defines the final feature set to be stored on the GraphGridMLFeature nodes. Each feature is a key-value pair: the key is the feature name on the node, and the value is an object with a source field naming a variable from the source or assignments fields and an optional string field description for comments.

Example
{
"feature": {
"dist_fname": {
"source": "dist_fname",
"description": "Jaro-Winkler distance of two first names."
},
"phnt_fname_sim": {
"source": "phnt_fname_sim",
"description": "Levenshtein similarity of two phonetic encoding first names."
},
"dist_lname": {
"source": "dist_lname",
"description": "Jaro-Winkler distance of two last names."
},
"phnt_lname_sim": {
"source": "phnt_lname_sim",
"description": "Levenshtein similarity of two phonetic encoding last names."
},
"cmp_sex": {
"source": "cmp_sex",
"description": "Whether two individuals have the same sex."
},
"dist_by": {
"source": "dist_by",
"description": "Hamming distance of two birth years."
},
"dist_bm": {
"source": "dist_bm",
"description": "Hamming distance of two birth months."
},
"dist_bd": {
"source": "dist_bd",
"description": "Hamming distance of two birth days."
},
"is_match": {
"source": "label",
"description": "Whether two individuals are the same."
}
}
}

There are 9 features in the final feature set. All of the variables can be found in the source or assignments examples above. These features will be stored as properties on the feature node.

Get Feature

Return the features stored on feature nodes with the given node labels.

Base URL: /1.0/ml/{{cluster-name}}/features/?nodeLabels={{nodeLabel}},{{nodeLabel}}
Method: GET

Request
curl --location --request GET "${var.api.shellBase}/1.0/ml/default/features/?nodeLabels=Individual,Individual&limit=" \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"

Create Transformation

Base URL: /1.0/ml/{{cluster-name}}/transformations/{{policy-name}}
Method: POST

Accept the transformation policy and execute the transformation workflow.

Request

curl --location --request POST "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/transformations/transformation-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${var.api.auth.shellBearer}" \
--data-raw '{
"savePolicy": true,
"overwrite": true,
"metadata": {
"description": "Features between Individual node and Individual node"
},
"source": {
"cypher": "MATCH (n1:TestIndividual)-[r1:HAS_NAME]->(name1:TestPersonalName), (n2:TestIndividual)-[r2:HAS_NAME]->(name2:TestPersonalName), (n1)-[l:IS_MATCH]-(n2) RETURN n1.grn AS n1_grn, n2.grn AS n2_grn, n1.birthday AS n1_birthday, n2.birthday AS n2_birthday, n1.sex AS n1_sex, n2.sex AS n2_sex, name1.givenName AS n1_givenName, name2.givenName AS n2_givenName, name1.surname AS n1_surname, name2.surname AS n2_surname,l.value AS label LIMIT 5",
"variables": [
"n1_grn",
"n2_grn",
"n1_birthday",
"n2_birthday",
"n1_sex",
"n2_sex",
"n1_givenName",
"n2_givenName",
"n1_surname",
"n2_surname",
"label"
]
},
"destination": {
"nodeLabels": [
"Individual",
"Individual"
],
"sourceGRNs": [
"n1_grn",
"n2_grn"
]
},
"assignments": [
{
"inputs": ["n1_birthday"],
"assignment": "split(n1_birthday, '\''-'\'')",
"output": "n1_birthday_tokens"
},
{
"inputs": ["n2_birthday"],
"assignment": "split(n2_birthday, '\''-'\'')",
"output": "n2_birthday_tokens"
},
{
"inputs": ["n1_birthday_tokens"],
"assignment": "n1_birthday_tokens[0]",
"output": "n1_birth_year"
},
{
"inputs": ["n1_birthday_tokens"],
"assignment": "n1_birthday_tokens[1]",
"output": "n1_birth_month"
},
{
"inputs": ["n1_birthday_tokens"],
"assignment": "n1_birthday_tokens[2]",
"output": "n1_birth_day"
},
{
"inputs": ["n2_birthday_tokens"],
"assignment": "n2_birthday_tokens[0]",
"output": "n2_birth_year"
},
{
"inputs": ["n2_birthday_tokens"],
"assignment": "n2_birthday_tokens[1]",
"output": "n2_birth_month"
},
{
"inputs": ["n2_birthday_tokens"],
"assignment": "n2_birthday_tokens[2]",
"output": "n2_birth_day"
},
{
"inputs": ["n1_birth_year", "n2_birth_year"],
"assignment": "apoc.text.hammingDistance(n1_birth_year, n2_birth_year)",
"output": "dist_by"
},
{
"inputs": ["n1_birth_month", "n2_birth_month"],
"assignment": "apoc.text.hammingDistance(n1_birth_month, n2_birth_month)",
"output": "dist_bm"
},
{
"inputs": ["n1_birth_day", "n2_birth_day"],
"assignment": "apoc.text.hammingDistance(n1_birth_day, n2_birth_day)",
"output": "dist_bd"
},
{
"inputs": ["n1_sex", "n2_sex"],
"assignment": "CASE WHEN n1_sex = n2_sex THEN 1 ELSE 0 END",
"output": "cmp_sex"
},
{
"inputs": ["n1_givenName", "n2_givenName"],
"assignment": "apoc.text.jaroWinklerDistance(n1_givenName, n2_givenName)",
"output": "dist_fname"
},
{
"inputs": ["n1_givenName"],
"assignment": "CALL apoc.text.doubleMetaphone(n1_givenName) YIELD value",
"output": "name1_double_metaphone_encoded_first_name"
},
{
"inputs": ["n1_surname"],
"assignment": "CALL apoc.text.doubleMetaphone(n1_surname) YIELD value",
"output": "name1_double_metaphone_encoded_last_name"
},
{
"inputs": ["n2_givenName"],
"assignment": "CALL apoc.text.doubleMetaphone(n2_givenName) YIELD value",
"output": "name2_double_metaphone_encoded_first_name"
},
{
"inputs": ["n2_surname"],
"assignment": "CALL apoc.text.doubleMetaphone(n2_surname) YIELD value",
"output": "name2_double_metaphone_encoded_last_name"
},
{
"inputs": ["name1_double_metaphone_encoded_first_name", "name2_double_metaphone_encoded_first_name"],
"assignment": "apoc.text.levenshteinSimilarity(name1_double_metaphone_encoded_first_name, name2_double_metaphone_encoded_first_name)",
"output": "phnt_fname_sim"
},
{
"inputs": ["n1_surname", "n2_surname"],
"assignment": "apoc.text.jaroWinklerDistance(n1_surname, n2_surname)",
"output": "dist_lname"
},
{
"inputs": ["name1_double_metaphone_encoded_last_name", "name2_double_metaphone_encoded_last_name"],
"assignment": "apoc.text.levenshteinSimilarity(name1_double_metaphone_encoded_last_name, name2_double_metaphone_encoded_last_name)",
"output": "phnt_lname_sim"
}
],
"feature": {
"dist_fname": {
"source": "dist_fname",
"description": "Jaro-Winkler distance of two first names."
},
"phnt_fname_sim": {
"source": "phnt_fname_sim",
"description": "Levenshtein similarity of two phonetic encoding first names."
},
"dist_lname": {
"source": "dist_lname",
"description": "Jaro-Winkler distance of two last names."
},
"phnt_lname_sim": {
"source": "phnt_lname_sim",
"description": "Levenshtein similarity of two phonetic encoding last names."
},
"cmp_sex": {
"source": "cmp_sex",
"description": "Whether two individuals have the same sex."
},
"dist_by": {
"source": "dist_by",
"description": "Hamming distance of two birth years."
},
"dist_bm": {
"source": "dist_bm",
"description": "Hamming distance of two birth months."
},
"dist_bd": {
"source": "dist_bd",
"description": "Hamming distance of two birth days."
},
"is_match": {
"source": "label",
"description": "Whether two individuals are the same."
}
}
}'

Response

{
"response": null,
"exception": null,
"statusText": null,
"statusCode": 0,
"policyName": "transformation-policy",
"featureNodeCreation": {
"batches": 1,
"total": 5,
"timeTaken": 0,
"committedOperations": 5,
"failedOperations": 0,
"failedBatches": 0,
"retries": 0,
"errorMessages": {},
"batch": {
"total": 1,
"committed": 1,
"failed": 0,
"errors": {}
},
"operations": {
"total": 5,
"committed": 5,
"failed": 0,
"errors": {}
},
"wasTerminated": false,
"failedParams": {}
},
"featureComputation": {
"batches": 1,
"total": 5,
"timeTaken": 0,
"committedOperations": 5,
"failedOperations": 0,
"failedBatches": 0,
"retries": 0,
"errorMessages": {},
"batch": {
"total": 1,
"committed": 1,
"failed": 0,
"errors": {}
},
"operations": {
"total": 5,
"committed": 5,
"failed": 0,
"errors": {}
},
"wasTerminated": false,
"failedParams": {}
}
}

Get Transformation Policy

Base URL: /1.0/ml/{{clusterName}}/transformations?policyName={{policyName}}
Method: GET

Fetch the uploaded transformation policy.

| Parameter | Description |
|---|---|
| policyName | Name of the transformation policy. |

Request

curl --location --request GET "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/transformations?policyName=transformation-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"

Get Transformation Status

Base URL: https://api.graphgrid.com/1.0/ml/{{cluster-name}}/transformations/status?policyName={{mlPolicyName}}
Method: GET

Show the policy name and feature node metadata associated with the transformation node.

| Parameter | Description |
|---|---|
| policyName | Name of the transformation policy. |

Request

curl --location --request GET "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/transformations/status?policyName=transformation-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"

Response

{
"statusCode": 0,
"policyName": "transformation-policy",
"featureNodeTypes": [
{
"nodeLabels": ["Individual", "Individual"],
"count": 4,
"features": [
"cmp_sex",
"dist_bd",
"dist_bm",
"dist_by",
"dist_fname",
"dist_lname",
"is_match",
"phnt_fname_sim",
"phnt_lname_sim"
]
}
]
}

Get Inner Transformation Status

Base URL: /1.0/ml/{{cluster-name}}/transformations/status/inner?task={{mlTask}}&policyName={{mlPolicyName}}
Method: GET

Show the task, policy name and feature node metadata associated with the inner transformation node.

| Parameter | Description |
|---|---|
| task | Task of the training. |
| policyName | Name of the training policy. |
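
A request sketch for this endpoint; the cluster name graphgrid-ml-example and the task and policy name values (sample-task, sample-policy) are illustrative placeholders:

curl --location --request GET "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/transformations/status/inner?task=sample-task&policyName=sample-policy" \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"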

Delete Transformation

Base URL: /1.0/ml/{{cluster-name}}/transformations?policyName={{policyName}}
Method: DELETE

Delete the transformation policy, transformation node and the relationships to feature nodes.

| Parameter | Description |
|---|---|
| policyName | Name of the transformation policy. |
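
A request sketch reusing the transformation-policy name from the earlier example (the cluster name graphgrid-ml-example is illustrative):

curl --location --request DELETE "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/transformations?policyName=transformation-policy" \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"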

Feature

GraphGrid ML provides an endpoint that returns the features stored on feature nodes with a given node labels value as a JSON document. The feature endpoint is at /features.

Show Feature

Base URL: /1.0/ml/{{cluster-name}}/features?nodeLabels={{nodeLabel}}
Method: GET

Return the features stored on feature nodes with provided node labels.

| Parameter | Description |
|---|---|
| nodeLabels | Node labels. |
| limit | Maximum number of feature sets. |
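
A request sketch that would return feature sets like the example response below (the cluster name default and the limit value of 2 are illustrative):

curl --location --request GET "${var.api.shellBase}/1.0/ml/default/features?nodeLabels=Individual,Individual&limit=2" \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"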

Example Response

{
"features": [
{
"cmp_sex": 1,
"dist_bd": 0,
"dist_bm": 0,
"dist_by": 0,
"dist_fname": 1,
"dist_lname": 1,
"is_match": "match",
"phnt_fname_sim": 1,
"phnt_lname_sim": 1
},
{
"cmp_sex": 1,
"dist_bd": 2,
"dist_bm": 2,
"dist_by": 2,
"dist_fname": 0.5777777777777778,
"dist_lname": 1,
"is_match": "not_match",
"phnt_fname_sim": 0.6666666666666666,
"phnt_lname_sim": 1
}
]
}

Training

GraphGrid ML uses Spark MLlib as the training framework. The training endpoint is at /trainings. The training policy defines the source data, the training workflow, and the metrics for evaluation. The training data is imported using a Cypher query. After the training process completes, a GraphGridMLModel node is created that stores the following information:

  • PipelineStages used
  • Training and test data count
  • Evaluation metric values
  • Required input and output column schema


Train Model

Base URL: https://api.graphgrid.com/1.0/ml/{{cluster-name}}/trainings/{{task}}/{{policy-name}}
Method: POST
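
This endpoint accepts a training policy (described below) as the request body. A request sketch, where the cluster name graphgrid-ml-example, the task binary-classification, and the policy name training-policy are illustrative, and the body is a full training policy assembled from the sections that follow:

curl --location --request POST "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/trainings/binary-classification/training-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${var.api.auth.shellBearer}" \
--data-raw '{ ... }'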

Training Policy

A JSON document that defines the workflow for training a machine learning model. Since the training framework is Spark MLlib, many Spark SQL and Spark MLlib concepts are used. Other than the common elements described in Policy, a training policy has the following level 1 parameters:

| Parameter | Description |
|---|---|
| dataFrame (object) | Includes the Cypher query and schema to construct the input DataFrame. |
| pipelineStages (array of objects) | Defines a series of Pipeline stages to be used during the training process. |
| trainingData (object) | Specifies the DataFrame for training. |
| trainingPipelines (array of strings) | Specifies the pipeline stages to be included in the final ML pipeline. |
| evaluators (optional array of objects) | Specifies one or more evaluators for the trained model. |
Note: The savePolicy field is forced to be true for training policies.

The dataFrame element and every Pipeline stage object in pipelineStages share a common parameter, name. It serves as a reference to the current element, which later elements can use via {{reference}}.

DataFrame

In Spark SQL, a DataFrame is conceptually equivalent to a table in a relational database or a data frame in R/Python. In this training policy, dataFrame contains the parameters for constructing a DataFrame that is going to be used throughout the training process.

| Parameter | Description |
|---|---|
| name (string) | The reference name of the DataFrame. |
| transformation (optional object) | Contains the cypher, variables, destination, assignments, and feature elements from the Transformation Policy. It is an inner transformation policy that gets executed to prepare data for the DataFrame. After an inner transformation policy is executed, a GraphGridMLInnerTransformation node is created and connected to the associated GraphGridMLFeature nodes and the GraphGridMLModel node. |
| cypher (string) | The Cypher query to fetch columns of data from the graph. |
| parameters (optional map) | Values for a Cypher query with parameters. |
| schema (object) | The schema of each column in the DataFrame, including the name and data type. See Schema for more information. |
Example
{
"dataFrame": {
"name": "sourceData",
"cypher": "MATCH (n:GraphGridMLTransformation {policyName:'sample'})-[:GG_ML_PRODUCES]->(m:GraphGridMLFeature) RETURN apoc.convert.toInteger(m.`features.cmp_sex`) AS cmp_sex,apoc.convert.toInteger(m.`features.dist_bd`) AS dist_bd,apoc.convert.toInteger(m.`features.dist_bm`) AS dist_bm,apoc.convert.toInteger(m.`features.dist_by`) AS dist_by,m.`features.dist_fname` AS dist_fname,m.`features.dist_lname` AS dist_lname,m.`features.is_match` AS is_match,apoc.convert.toInteger(m.`features.phnt_fname_sim`) AS phnt_fname_sim,apoc.convert.toInteger(m.`features.phnt_lname_sim`) AS phnt_lname_sim;",
"schema": [
{
"name": "cmp_sex",
"type": "integer"
},
{
"name": "dist_bd",
"type": "integer"
},
{
"name": "dist_bm",
"type": "integer"
},
{
"name": "dist_by",
"type": "integer"
},
{
"name": "dist_fname",
"type": "double"
},
{
"name": "dist_lname",
"type": "double"
},
{
"name": "is_match",
"type": "string"
},
{
"name": "phnt_fname_sim",
"type": "integer"
},
{
"name": "phnt_lname_sim",
"type": "integer"
}
]
}
}

In this example, the Cypher query looks up feature nodes associated with a transformation node created earlier (from Transformation) and selects properties on those feature nodes to be used as the columns of the DataFrame. The schema defines the names and data types of the DataFrame columns.

PipelineStage

The pipelineStages element contains a series of PipelineStages. Pipeline is a concept from Spark MLlib (see ML Pipelines). A PipelineStage is a Pipeline component, either a Transformer or an Estimator. The training policy allows users to define the pipeline stages with JSON syntax. The core parameters for applying a Pipeline component from Spark MLlib are as follows:

| Parameter | Description |
|---|---|
| name (string) | The reference name of the PipelineStage. |
| type (string) | The class name of the Pipeline component from Spark MLlib. |
| parameters (map) | Parameters of the Pipeline component. |
| outputs (optional map) | Creates references for the return values of getter methods of the Pipeline component. |

Depending on the type, the PipelineStage can be a Transformer or an Estimator. The parameters for applying each of them differ.

Transformer

A Transformer transforms a DataFrame into another DataFrame. The parameter for applying a Transformer is transform and it has three fields:

| Parameter | Description |
|---|---|
| name (string) | The reference name of the transformed DataFrame. |
| dataFrame (string) | The reference to a DataFrame already defined in the scope. |
| outputs (optional map) | Creates references for the return values of getter methods of the transformed DataFrame. |

Estimator

An Estimator takes in a DataFrame and outputs a Transformer. The parameter for applying an Estimator is fit, and it has four fields. One of them is a nested transform, which allows users to apply the generated Transformer immediately after applying the Estimator, within a single PipelineStage definition. The fields of fit are:

| Parameter | Description |
|---|---|
| name (string) | The reference name of the generated Transformer. |
| dataFrame (string) | The reference to a DataFrame already defined in the scope. |
| outputs (optional map) | Creates references for the return values of getter methods of the generated Transformer. |
| transform (optional object) | The object used to apply the generated Transformer. See Transformer. |

Example
{
"pipelineStages": [
{
"name": "stringIndexer",
"type": "StringIndexer",
"parameters": {
"inputCol": "is_match",
"outputCol": "label"
},
"fit": {
"name": "stringIndexerModel",
"dataFrame": "{{sourceData}}",
"outputs": {
"indexerLabels": "labels"
},
"transform": {
"name": "training",
"dataFrame": "{{sourceData}}"
}
}
},
{
"name": "assembler",
"type": "VectorAssembler",
"parameters": {
"inputCols": [
"cmp_sex",
"dist_bd",
"dist_bm",
"dist_by",
"dist_fname",
"dist_lname",
"phnt_fname_sim",
"phnt_lname_sim"
],
"outputCol": "features"
}
},
{
"name": "lr",
"type": "LogisticRegression",
"parameters": {
"predictionCol": "predicted_label"
}
},
{
"name": "indexToString",
"type": "IndexToString",
"parameters": {
"inputCol": "predicted_label",
"outputCol": "predicted_label_string",
"labels": "{{indexerLabels}}"
}
}
]
}

In this example, four PipelineStages are defined, each corresponding to a Transformer or Estimator from Spark MLlib: StringIndexer, VectorAssembler, LogisticRegression, and IndexToString.

The first PipelineStage, StringIndexer, is an Estimator, so its definition has a fit parameter. It is used to create a numeric column "label" from the string column "is_match", since the LogisticRegression model requires the label column to be numeric. The dataFrame field of fit is {{sourceData}}, which refers to the DataFrame defined in dataFrame in the previous section. The fit generates a StringIndexerModel object, a Transformer. We want to use the labels value carried by this Transformer later for creating the IndexToString stage, so in the outputs field inside fit we create a reference to the return value of the StringIndexerModel.labels() method. We also want to append the numeric column to the final DataFrame, so we apply the generated Transformer on the sourceData DataFrame in the transform field and create a reference for the transformed DataFrame.

The second PipelineStage is VectorAssembler, which collects the feature columns into a single vector column. The next one is LogisticRegression, the core of this training pipeline, which generates the predicted value in the "predicted_label" column. The last PipelineStage is IndexToString, which converts the predicted value (a number) back to its corresponding string value. In its parameters field, the value of the labels parameter is a reference to the labels array named indexerLabels, which we defined in the first PipelineStage.

Training Data

Specify the DataFrame and training/test split ratio for the training data.

| Parameter | Description |
|---|---|
| dataFrame (string) | The reference to a DataFrame. |
| trainingRatio (float) | The fraction of the DataFrame to be used as training data. |
Example
{
"trainingData": {
"dataFrame": "{{training}}",
"trainingRatio": 0.7
}
}

We are using the DataFrame with the numeric "label" column appended as the source of training data, so the value of dataFrame is {{training}}. 70% of the DataFrame will be used for training and 30% will be used for evaluation, so the value of trainingRatio is 0.7.

Training Pipelines

Specify the final PipelineStage set for training.

Example
{
"trainingPipelines": ["assembler", "lr", "indexToString"]
}

Three of the PipelineStages defined earlier are included in the final PipelineStage set. They form the ML Pipeline that is applied to the training data.

Evaluator

Evaluator is another concept from Spark MLlib, just like the Pipeline components. Evaluators are used to evaluate the performance of a model, computing metrics that reflect its quality. GraphGrid ML supports four Evaluators.

| Evaluator | Metrics |
|---|---|
| BinaryClassificationEvaluator | areaUnderROC: area under a Receiver Operating Characteristic curve. areaUnderPR: area under a Precision-Recall curve. |
| MulticlassClassificationEvaluator | f1: F1 score. weightedPrecision: weighted average precision score. accuracy: accuracy score. |
| RegressionEvaluator | rmse: root mean squared error. mse: mean squared error. r2: R2 score. |
| ClusteringEvaluator | silhouette: mean Silhouette Coefficient of all samples. |

To apply one or more Evaluators on a model, fill the evaluators parameter of the training policy. The syntax of this parameter is similar to the one of pipelineStages.

| Parameter | Description |
|---|---|
| type (string) | The class name of the Evaluator from Spark MLlib. |
| parameters (map) | Parameters of the Evaluator. |
| metrics (list) | The metrics provided by the Evaluator to use. |
Example
{
"evaluators": [
{
"type": "BinaryClassificationEvaluator",
"metrics": ["areaUnderROC", "areaUnderPR"]
}
]
}

In this example, we apply a BinaryClassificationEvaluator to our binary classification model. The metrics are areaUnderROC and areaUnderPR.

Graph

After executing the sample policy above, the following elements are created in the graph.

The GraphGridMLModel node is connected to the GraphGridMLInnerTransformation node via a GG_ML_PRODUCES relationship. The GraphGridMLInnerTransformation node is defined by the dataFrame parameter and connects to all the GraphGridMLFeature nodes it produces via GG_ML_PRODUCES relationships.

The model node carries properties about:

  • Task and policy name
  • Training and test data count
  • PipelineStages used
    • E.g. VectorAssembler, LogisticRegression, IndexToString, which are the ones specified in the trainingPipelines parameter.
  • Metrics: metric scores
    • E.g. metrics.areaUnderPR: 1 which means the areaUnderPR of this model is 1.
  • Input and output schema: basic data type and structure data type of each column.
    • E.g. inputSchema.dist_bm: double,basic which means the basic data type and structure data type of the input column named "dist_bm" are double and basic.

Get Training Policy

Base URL: /1.0/ml/{{cluster-name}}/trainings?task={{mlTask}}&policyName={{mlPolicyName}}
Method: GET

Fetch the uploaded training policy.

| Parameter | Description |
|---|---|
| task | Task of the training. |
| policyName | Name of the training policy. |
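
A request sketch, assuming the same illustrative task (binary-classification) and policy name (training-policy) used in the Train Model example above:

curl --location --request GET "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/trainings?task=binary-classification&policyName=training-policy" \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"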

Model

The model endpoint is at /models. It accepts two requests, one for displaying the information of a trained model, and one for removing a trained model.

Delete Model

Base URL: /1.0/ml/{{cluster-name}}/models?task={{mlTask}}&policyName={{mlPolicyName}}
Method: DELETE

| Parameter | Description |
|---|---|
| task | Task of the training. |
| policyName | Name of the training policy. |
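
A request sketch for removing a trained model (the cluster name, task, and policy name values are illustrative):

curl --location --request DELETE "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/models?task=binary-classification&policyName=training-policy" \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"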

Inference

Once a model is trained, it can be used to predict the outcome for a given set of input data. The input and output schema must be accepted by the trained model, otherwise an error will occur. There are two types of inference: real-time and batch inference. Real-time inference handles a small amount of input data and returns the predictions within a short amount of time in a response. Batch inference accepts a large amount of input data and stores the prediction results in the graph. The inference endpoint is at /inference.

Realtime Inference

Load Model

Base URL: /1.0/ml/{{cluster-name}}/inference/model
Method: POST

Load the trained model specified by the task and policy name.

{
"task": "sample-task",
"policyName": "sample-policy"
}
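
Submitted with curl, a load request might look like the following sketch (the cluster name graphgrid-ml-example is illustrative; the body is the JSON document above):

curl --location --request POST "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/inference/model" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${var.api.auth.shellBearer}" \
--data-raw '{
"task": "sample-task",
"policyName": "sample-policy"
}'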

Unload Model

Base URL: /1.0/ml/{{cluster-name}}/model
Method: DELETE

Unload the model that is currently loaded.

| Parameter | Description |
|---|---|
| task | Task of the training. |
| policyName | Name of the training policy. |
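
A request sketch, assuming the task and policyName parameters from the table above are passed as query parameters (the cluster name and values are illustrative):

curl --location --request DELETE "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/model?task=sample-task&policyName=sample-policy" \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"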

Schema

The schema element specifies the column schema for a DataFrame. Each column schema contains three fields. Note that during the training phase, only the first two parameters take effect; in other words, the struct parameter is ignored in the training policy. In inference requests, all three are used.

| Parameter | Description |
|---|---|
| name (string) | The name of the column. |
| type (string) | The basic data type of the column. |
| struct (optional string) | The structure data type of the column: basic (basic data types like int, double, string, etc.), vector (variable-length structure of basic data types), or array (fixed-length structure of basic data types). Default: basic |

The schema for inference requests must be accepted by the target model. To confirm the validity of the schema, check the model information for the valid input and output column schema.

Example

{
"schema": {
"input": [
{
"name": "cmp_sex",
"type": "integer"
},
{
"name": "dist_bd",
"type": "integer"
},
{
"name": "dist_bm",
"type": "integer"
},
{
"name": "dist_by",
"type": "integer"
},
{
"name": "dist_fname",
"type": "double"
},
{
"name": "dist_lname",
"type": "double"
},
{
"name": "phnt_fname_sim",
"type": "integer"
},
{
"name": "phnt_lname_sim",
"type": "integer"
}
],
"output": [
{
"name": "predicted_label_string",
"type": "string"
},
{
"name": "probability",
"type": "double",
"struct": "vector"
}
]
}
}

There are 8 columns in the input data for the trained binary classification model. We are interested in 2 columns in the output data: a string column indicating whether the result is match or not_match, and a vector of double column showing the probabilities of getting either result.

Create JSON Data Inference

Base URL: https://api.graphgrid.com/1.0/ml/{{cluster-name}}/inference/json
Method: POST

Accept a JSON document request in which one row of input data is included and return the prediction.

Headers > Content-Type: application/json
Body > JSON Data Request
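
A request sketch for this endpoint (the cluster name graphgrid-ml-example is illustrative; the body is a JSON data request as described in the next section):

curl --location --request POST "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/inference/json" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${var.api.auth.shellBearer}" \
--data-raw '{ ... }'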

JSON Data Inference

The JSON data inference allows only one row of data to be sent. The input data is in the list parameter named data. The entries must follow the order and data type of the input schema.

Example Request
{
"schema": {
...
},
"data": [
1,
1,
1,
1,
0.45,
0.7,
1,
1
]
}
Example Response
{
"result": {
"predicted_label_string": "not_match",
"probability": [0.9862161796038926, 0.013783820396107364]
}
}

Create Cypher Data Inference

Base URL: https://api.graphgrid.com/1.0/ml/{{cluster-name}}/inference/cypher
Method: POST

Cypher Data Inference

The Cypher data inference allows the user to enter a Cypher query with parameters to fetch one or more rows of data to be sent. The data must follow the order, name, and data type of the input schema.

Example Request
{
"schema": {
...
},
"data": {
"cypher": "WITH [$list1,$list2] AS colls UNWIND colls AS coll RETURN coll[0] AS cmp_sex, coll[1] AS dist_bd, coll[2] AS dist_bm, coll[3] AS dist_by, coll[4] AS dist_fname, coll[5] AS dist_lname, coll[6] AS phnt_fname_sim, coll[7] AS phnt_lname_sim;",
"parameters": {
"list1": [
1,
0,
1,
0,
0.8,
0.5,
1,
1
],
"list2": [
1,
1,
2,
1,
0.31,
0.43,
0,
0
]
}
}
}
Example Response
{
"results": [
{
"predicted_label_string": "match",
"probability": [0.00001481113003280617, 0.9999851888699672]
},
{
"predicted_label_string": "not_match",
"probability": [0.9999999999997948, 2.052527119198712e-13]
}
]
}

Batch Inference

Batch inference endpoints allow the user to create and remove a batch inference job. A batch inference job handles a large data input and exports the prediction results back into the graph. Unlike real-time inference, which produces results in a short amount of time, a batch inference job usually takes longer because the input is large. Once a batch inference job is created, a GraphGridMLBatchInference node is generated in the graph containing the metadata of the batch inference job:

| Parameter | Description |
|---|---|
| task (string) | The task of the target trained model. |
| policyName (string) | The policy name of the target trained model. |
| job id (string) | The job id of the batch inference job. Example: job_20190314102200_0000 |
| status (string) | The status of the batch inference job: RUNNING, SUCCEEDED, or FAILED. |
| state (int) | A number code indicating the last executed state of the batch inference job: 0 - job initialized, 1 - model loaded, 2 - input data loaded, 4 - inference results generated, 5 - inference results uploaded, 6 - inference results written to the graph. |
| totalPartitions (long) | Total partitions of the inference results generated. |
| processedPartitions (long) | Number of partitions written to the graph. |

Create Batch Inference

Base URL: /1.0/ml/{{cluster-name}}/inference/batch
Method: POST

Accept a JSON document request which includes the Cypher queries that define the input and output data.

Headers > Content-Type: application/json
Body > Batch Inference Policy

Batch Inference Policy

The batch inference policy is essentially a Cypher data request with the following additional elements:

| Parameter | Description |
|---|---|
| task (string) | The task of the target trained model. |
| policyName (string) | The policy name of the target trained model. |
| schema (object) | The input and output column schema. See Schema. |
| inputQuery (object) | Cypher query and optional parameters to load input data. |
| outputQuery (object) | Cypher query and optional parameters to write output data. The output fields specified in the output schema will be stored in a collection of map objects named ggMLResults. Users can decide what they want to do with this object. Each entry of this object contains a row of output data. |
Example Policy
{
"task": "local",
"policyName": "test-local",
"schema": {
"input": [
{
"name": "n1_grn",
"type": "string"
},
{
"name": "n2_grn",
"type": "string"
},
{
"name": "cmp_sex",
"type": "integer"
},
{
"name": "dist_bd",
"type": "integer"
},
{
"name": "dist_bm",
"type": "integer"
},
{
"name": "dist_by",
"type": "integer"
},
{
"name": "dist_fname",
"type": "double"
},
{
"name": "dist_lname",
"type": "double"
},
{
"name": "phnt_fname_sim",
"type": "integer"
},
{
"name": "phnt_lname_sim",
"type": "integer"
}
],
"output": [
{
"name": "n1_grn"
},
{
"name": "n2_grn"
},
{
"name": "predicted_label_string"
}
]
},
"inputQuery": {
"cypher": "MATCH (n:GraphGridMLTransformation)-[:GG_ML_PRODUCES]->(m:GraphGridMLFeature) RETURN m.`features.n1_grn` AS n1_grn, m.`features.n2_grn` AS n2_grn, apoc.convert.toInteger(m.`features.cmp_sex`) AS cmp_sex,apoc.convert.toInteger(m.`features.dist_bd`) AS dist_bd,apoc.convert.toInteger(m.`features.dist_bm`) AS dist_bm,apoc.convert.toInteger(m.`features.dist_by`) AS dist_by,m.`features.dist_fname` AS dist_fname,m.`features.dist_lname` AS dist_lname,m.`features.is_match` AS is_match,apoc.convert.toInteger(m.`features.phnt_fname_sim`) AS phnt_fname_sim,apoc.convert.toInteger(m.`features.phnt_lname_sim`) AS phnt_lname_sim; "
},
"outputQuery": {
"cypher": "UNWIND ggMLResults AS ggMLResult MERGE (n:IndividualDeduplicationResult {individualA:ggMLResult.n1_grn,individualB:ggMLResult.n2_grn}) SET n.result=ggMLResult.predicted_label_string;"
}
}

In this policy, we are passing in all individual pairs in the graph that are not connected via an IS_MATCH relationship. In addition to the required input fields, there are n1_grn and n2_grn, which are mainly used for creating the output nodes; that is why they are included in the output schema. In the output query, we create nodes with the label IndividualDeduplicationResult to store the prediction results. Each node is identified by n1_grn and n2_grn and stores the string label indicating whether the corresponding individual pair is a match.
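
To submit the example policy above, a request along these lines could be used (a sketch; the cluster name graphgrid-ml-example is illustrative and the body is the batch inference policy shown above):

curl --location --request POST "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/inference/batch" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${var.api.auth.shellBearer}" \
--data-raw '{ ... }'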

Retry Batch Inference

Base URL: /1.0/ml/{{cluster-name}}/inference/batch/{{job-id}}
Method: PUT

If the batch inference job did not complete successfully and the user does not wish to start over from the beginning, this endpoint can be used instead. By including the job id in the URL path, the batch service will attempt to resume from where it left off. The request body is a revised batch inference policy.

Headers > Content-Type: application/json
Body > Batch Inference Policy
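
A request sketch that retries the job with the id shown in the metadata example above, passing a revised batch inference policy as the body (the cluster name is illustrative):

curl --location --request PUT "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/inference/batch/job_20190314102200_0000" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${var.api.auth.shellBearer}" \
--data-raw '{ ... }'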

Remove Batch Inference

This endpoint is used to remove the objects generated by a batch inference process, i.e. the GraphGridMLBatchInference node and the prediction results.

Base URL: /1.0/ml/{{cluster-name}}/inference/batch?task={{mlTask}}&policyName={{policyName}}&jobId={{jobId}}
Method: DELETE

Remove the resources generated by a batch inference job.

| Parameter | Description |
|---|---|
| task | Task of the model. |
| policyName | Policy name of the model. |
| jobId | Job id of the batch inference. |
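
A request sketch that removes the resources of the example job (the task, policy name, and job id values are taken from the earlier examples and are illustrative):

curl --location --request DELETE "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/inference/batch?task=local&policyName=test-local&jobId=job_20190314102200_0000" \
--header "Authorization: Bearer ${var.api.auth.shellBearer}"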

Event

The event endpoint provides a feature that allows users to create an event-triggered action. The event is a conditional Cypher query against the graph. Once that condition is satisfied, the defined action is executed. Three actions are supported: Transformation, Training, and Batch Inference.

Create Event Action

Base URL: /1.0/ml/{{cluster-name}}/events
Method: POST

Accept a JSON document request which includes a conditional Cypher query and an action definition.

Event Action Policy

The event action policy is essentially a Cypher data request with several additional elements:

| Parameter | Description |
|---|---|
| cypherCondition (string) | A conditional Cypher query that blocks or starts the action. |
| brokerType (string) | The type of message broker to use: kafka, rabbitMQ, or SQS. |
| actionType (string) | The type of action to execute: transformation, training, or batchInference. |
| policy (object) | The policy for the corresponding action type, i.e. a Transformation Policy, Training Policy, or Batch Inference Policy. |
Example Policy
{
"cypherCondition": "MATCH (n:TestEvent) RETURN COUNT(n) >= 5",
"brokerType": "sqs",
"actionType": "transformation",
"policy": {
"policyName": "testEvent",
"overwrite": true,
"source": {
"cypher": "MATCH (n1:TestEvent), (n2:TestEvent) WHERE ID(n1) < ID(n2) RETURN n1.grn AS n1_grn, n2.grn AS n2_grn, n1.sex AS n1_sex, n2.sex AS n2_sex;",
"variables": ["n1_grn", "n2_grn", "n1_sex", "n2_sex"]
},
"destination": {
"nodeLabels": ["TestEvent", "TestEvent"],
"sourceGRNs": ["n1_grn", "n2_grn"]
},
"assignments": [
{
"inputs": ["n1_sex", "n2_sex"],
"assignment": "CASE WHEN n1_sex = n2_sex THEN 1 ELSE 0 END",
"output": "cmp_sex"
}
],
"feature": {
"cmp_sex": {
"source": "cmp_sex",
"description": "Whether two individuals have the same sex."
}
}
}
}

This policy creates a transformation action that is executed once the Cypher condition is satisfied.
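
To register this event action, a request along these lines could be used (a sketch; the cluster name graphgrid-ml-example is illustrative and the body is the event action policy shown above):

curl --location --request POST "${var.api.shellBase}/1.0/ml/graphgrid-ml-example/events" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${var.api.auth.shellBearer}" \
--data-raw '{ ... }'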