Skip to main content
Version: 2.0

GraphGrid Fuze

Platform Version 2.0

API Version 1.0

Intro to Fuze

GraphGrid Fuze provides integration services to distribute, route and transform transactional event data from ONgDB to trigger dynamic workers for graph processing, searching, indexing and machine learning processes. Fuze is comprised of 5 components: Distributor, ONgDB Writer/Reader, Trigger Manager, and Worker.

This Fuze API version is 1.0 and as such all endpoints are rooted at /1.0/. For example, http://localhost/1.0/fuze (requires auth) would be the base context for this Fuze API under the GraphGrid API deployed at http://localhost.

Environment

Fuze requires the following integrations:

  • ONgDB 1.0+
  • Redis 5.0.7+
  • Elasticsearch 6.3+
  • Kafka 2.0+
  • RabbitMQ 3.7+

Policy Management

Each component of Fuze (Distributor, ONgDB Writer, Trigger Manager, and Worker) uses policies to manage which data flows through it and how that data is processed. For details on how to create a policy for a particular Fuze component, see the section for that component. All Fuze components use the same API endpoints for basic interaction with policies.

Common Policy Field Types

There are several field types that appear in multiple types of policies and so are detailed below.

Metadata

Provides information about the policy. This information is purely for the user and is not used by Fuze in any way.

ParameterDescription
description stringA description of what the policy does Default: null
displayName stringA human parsable name for the policy. Default: null
Example
{
"displayName": "ongdbwriter-replication-policy",
"description": "Listen on broker endpoints for TxData, convert it to Geequel, and copy the changes to this ONgDB instance."
}

Broker Endpoint

Specifies an endpoint on a message broker to either send data to or receive data from.

The fields used when either sending a message or creating a listener depends on the value of the broker field.

ParameterDescription
broker stringThe message broker to use. Options are KAFKA, RABBITMQ, or SQS Default: null
topic stringthe Kafka topic Default: null
exchange stringThe RabbitMQ exchange Default: null
exchangeType stringThe type of the RabbitMQ exchange. Options are direct, topic, fanout, headers, or system Default: topic
routingKey stringThe RabbitMQ routing key. Default: null
durable booleanIs the RabbitMQ queue durable? Default: true
exclusive booleanIs the RabbitMQ queue exclusive? Default: false
autoDelete booleanDelete the RabbitMQ queue after the consumer is destroyed? Default: false
prefetchCount int(optional) The number of RabbitMQ messages to prefetch. Default: 250
queue stringThe RabbitMQ or SQS queue. Default: null
region stringThe AWS region in which the SQS queue exists. Default: null
bucket stringThe S3 bucket to use to temporarily hold large messages. Default: null
Examples

KAFKA

{
"broker": "KAFKA",
"topic": "com.graphgrid.fuze.topic"
}

RabbitMQ

{
"broker": "RABBITMQ",
"exchange": "com.graphgrid.fuze.exchange",
"routingKey": "com.graphgrid.fuze.routingKey",
"queue": "com.graphgrid.fuze.queue"
}

SQS

{
"broker": "SQS",
"region": "us-west-2",
"bucket": "graphgrid-fuze-bucket",
"queue": "graphgrid-fuze-queue"
}

ONgDB Credentials

Specifies how to connect to a ONgDB instance (or cluster).

ParameterDescription
uri stringThe bolt or bolt+routing endpoint for the ONgDB instance (including the protocol, hostname, and port number). Default: null
username stringA valid username for the ONgDB instance. Default: null
password stringThe password that goes with the username. Default: null
Example
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}

Get Status

Get the status of GraphGrid Fuze. Will return a 200 OK response if healthy.

Base URL: /1.0/fuze/status
Method: GET

Request

curl --location --request GET "${API_BASE}/1.0/fuze/status"

Response

{
"status": "OK"
}

Save Policy

Save a Policy for later use and returns the saved Policy.

Base URL: /1.0/fuze/{{cluster-name}}/savePolicy/{{policy-name}}
Method: POST

ParameterDescription
cluster-nameName of the cluster to save the policy on
policy-nameName of the policy

Request

This request saves a Distributor policy. For details on how to create a policy for a particular Fuze component, see the section for that component. All Fuze components use the same API endpoints for basic interaction with policies.

curl --location --request POST "${API_BASE}/1.0/fuze/default/savePolicy/example-distributor-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}" \
--data-raw '{
"metadata": {
"description": "Handle txData related to NLP.",
"displayName": "example-distributor-policy"
},
"listeningBrokerEndpoint": {
"broker": "RABBITMQ",
"exchange": "com.graphgrid.fuze.exchange",
"routingKey": "com.graphgrid.fuze.key",
"queue": "com.graphgrid.fuze.queue"
},
"forwardingRules": [
{
"cypher": "WITH {txData} AS txData WITH txData.createdRelationships AS rels WITH [k IN KEYS(rels) WHERE ANY(x IN rels[k] WHERE x.type = \"SUMMARY_SENTENCE\") | rels[k].uidOfStartNode] AS resultList UNWIND resultList as annotatedTextUid WITH annotatedTextUid, apoc.text.split(annotatedTextUid, \":`\", 3)[-1] AS tmp WITH annotatedTextUid, substring(tmp, 0, length(tmp)-1) AS annotatedTextGrn MATCH (a:Article)-[:HAS_ANNOTATED_TEXT]->(an:AnnotatedText {grn: annotatedTextGrn}) WITH COLLECT(DISTINCT \"``:``:`\" + a.grn + \"`\") AS articleGrns RETURN {numTries: 0, policyName: \"default-index-policy\", clusterName: \"default\", transactionId: timestamp(), indexName: \"article\", strategyUsed: \"defaultBroker\", idPartition: apoc.convert.toString(articleGrns)} AS results",
"resultKey": "results",
"multicast": {
"retry": 0,
"stopOnFailure": false,
"brokers": [
{
"broker": "RABBITMQ",
"exchange": "com.graphgrid.search.exchange",
"routingKey": "com.graphgrid.search.key",
"queue": "com.graphgrid.search.queue"
}
]
}
}
],
"neo4jCredentials": {
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "*****"
}
}'

Response

{
"policy": {
"metadata": {
"description": "Handle txData related to NLP.",
"displayName": "example-distributor-policy",
"createdAt": "2019-10-30T19:26:37+00:00",
"updatedAt": "2021-03-05T15:03:27.302Z",
"versions": []
},
"status": "ACTIVE",
"listeningBrokerEndpoint": {
"broker": "RABBITMQ",
"topic": null,
"exchange": "com.graphgrid.fuze.exchange",
"exchangeType": "topic",
"routingKey": "com.graphgrid.fuze.key",
"durable": true,
"exclusive": false,
"autoDelete": false,
"prefetchCount": 250,
"queue": "com.graphgrid.fuze.queue",
"region": null,
"bucket": null
},
"forwardingRules": [
{
"cypher": "WITH {txData} AS txData WITH txData.createdRelationships AS rels WITH [k IN KEYS(rels) WHERE ANY(x IN rels[k] WHERE x.type = \"SUMMARY_SENTENCE\") | rels[k].uidOfStartNode] AS resultList UNWIND resultList as annotatedTextUid WITH annotatedTextUid, apoc.text.split(annotatedTextUid, \":`\", 3)[-1] AS tmp WITH annotatedTextUid, substring(tmp, 0, length(tmp)-1) AS annotatedTextGrn MATCH (a:Article)-[:HAS_ANNOTATED_TEXT]->(an:AnnotatedText {grn: annotatedTextGrn}) WITH COLLECT(DISTINCT \"``:``:`\" + a.grn + \"`\") AS articleGrns RETURN {numTries: 0, policyName: \"default-index-policy\", clusterName: \"gg-dev-neo\", transactionId: timestamp(), indexName: \"article\", strategyUsed: \"defaultBroker\", idPartition: apoc.convert.toString(articleGrns)} AS results",
"resultKey": "results",
"multicast": {
"retry": 0,
"stopOnFailure": false,
"brokers": [
{
"broker": "RABBITMQ",
"topic": null,
"exchange": "com.graphgrid.search.exchange",
"exchangeType": "topic",
"routingKey": "com.graphgrid.search.key",
"durable": true,
"exclusive": false,
"autoDelete": false,
"prefetchCount": 250,
"queue": "com.graphgrid.search.queue",
"region": null,
"bucket": null
}
]
}
}
],
"neo4jCredentials": {
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "*****"
}
}
}

Retrieve Policy

Returns a saved Policy.

Base URL: /1.0/fuze/{{cluster-name}}/retrievePolicy/{{policy-name}}
Method: GET

ParameterDescription
cluster-nameName of the cluster on which the policy is saved
policy-nameName of the policy

Delete Policy

Deactivate and delete the policy.

Base URL: /1.0/fuze/{{cluster-name}}/deletePolicy/{{policy-name}}
Method: DELETE

ParameterDescription
cluster-nameName of the cluster on which the policy is saved
policy-nameName of the policy

Activate Policy

Sets the status field on the Policy to ACTIVE. Fuze will then start using the Policy when it next polls for active policies.

Base URL: /1.0/fuze/{{cluster-name}}/activatePolicy/{{policy-name}}
Method: POST

ParameterDescription
cluster-nameName of the cluster on which the policy is saved
policy-nameName of the policy

Deactivate Policy

Sets the status field on the Policy to INACTIVE. Fuze will then stop using the Policy and cleanup any resources associated with it the next time it polls for active policies.

Base URL: /1.0/fuze/{{cluster-name}}/deactivatePolicy/{{policy-name}}
Method: POST

ParameterDescription
cluster-nameName of the cluster on which the policy is saved
policy-nameName of the policy

Distributor

Sometimes a message needs to be sent on a message broker, or sent to multiple endpoints on one or more different brokers. The Distributor simplifies this by providing a single listening endpoint that forwards its received messages to any number of other broker endpoints.

The Distributor uses Geequel to filter which messages are forwarded, so any data transformation that can happen in Geequel can be performed as well.

Setting up the Distributor

A few steps are needed before message forwarding can begin.

  1. Create an S3 bucket for Fuze, if one does not already exist in the target environment
  2. Set the AWS param store value for /1.4/fuze/<env>/spring.aws.bucket to the bucket name created in step 1
  3. Create a distribution policy (see Distribution Policy))
  4. Set up the listening broker endpoint from the policy (RabbitMQ requires an exchange, queue, and routing key; SQS requires a queue; Kafka will create the topic automatically)
  5. Deploy Fuze to the target environment
  6. Upload the distribution policy created in step 3 (see Save Policy)
  7. Activate the distribution policy (see Activate Policy)
  8. If incoming data will be coming from ONgDB, install a trigger to send transaction data to the listening broker. This can also be set up by a Trigger Policy.

An example trigger:

CALL apoc.trigger.add("gg-dev-fuze-transactionData", "CALL apoc.broker.send(\"rabbitmq\", {txData}, {exchangeName: \"com.graphgrid.fuze.exchange\", routingKey:
\"com.graphgrid.fuze.key\", queueName: \"com.graphgrid.fuze.queue\"}) YIELD connectionName, message, configuration RETURN 1", {phase: "after"}, {uidKey: "grn"})

Distribution Policy

A JSON document that defines the broker endpoint to listen to for TransactionData coming from the graph and a number of forwarding rules that define where to send the data. Each of these forwarding rules may also contain some Geequel to transform the data.

A distribution policy has the following elements:

  • metadata - Information to uniquely identify the policy.
  • listeningBrokerEndpoint - Defines the message broker to use.
  • neo4jCredentials - The URI and credentials for a ONgDB instance.
  • forwardingRules - The broker endpoints where received messages should be sent.

Forwarding Rules

A List of all broker endpoints to forward the received message to and Geequel to filter and/or transform the message before sending it to each endpoint.

ParameterDescription
cypher stringThe Geequel that filters and/or transforms the received message. If {txData} is included in the query it is assumed that the query begins with WITH {txData} AS <varName> and {txData} will be replaced by apoc.convert.fromJsonMap("<txData>"). Otherwise WITH apoc.convert.fromJsonMap("<txData>") AS txData will be prepended to the query.
resultKey string(optional) The key that results from the Geequel query will be wrapped in. Allows the forwarded message to just be the data of interest, rather than wrapped in an object.
multicast objectContains a list of broker endpoints (brokers) to forward the transformed message to, if the message passes the filter. Each item in this list requires the broker. For KAFKA, it additionally requires a topic. For RABBITMQ it requires an exchange and routingKey. For SQS it requires a region, queue, and bucket.
Forwarding Rules Example
{
"forwardingRules": [
{
"cypher": "WITH {txData} AS txData WITH txData.assignedNodeProperties.byLabel.Movie AS assignedMovieProperties, txData WITH [k IN KEYS(assignedMovieProperties) WHERE ANY(x IN assignedMovieProperties[k] WHERE x.key=\"released\" AND x.value>=2000)] AS grnsWeWant UNWIND grnsWeWant AS result RETURN result",
"resultKey": "result",
"multicast": {
"brokers": [
{
"broker": "KAFKA",
"topic": "com.graphgrid.ml.movieTopic"
},
{
"broker": "RABBITMQ",
"exchange": "com.graphgrid.ml.exchange",
"routingKey": "com.graphgrid.ml.movieRoutingKey"
},
{
"broker": "SQS",
"region": "us-west-2",
"queue": "graphrid-ml-movieQueue",
"bucket": "graphgrid-ml-fuze"
}
]
}
},
{
"cypher": "WITH txData.assignedNodeProperties.byLabel.Person AS assignedPersonProperties, txData WITH [k IN KEYS(assignedPersonProperties) WHERE ANY(x IN assignedPersonProperties[k] WHERE x.key=\"born\" AND x.value<1980)] AS grnsWeWant UNWIND grnsWeWant AS result RETURN result",
"multicast": {
"brokers": [
{
"broker": "KAFKA",
"topic": "com.graphgrid.ml.personTopic"
},
{
"broker": "RABBITMQ",
"exchange": "com.graphgrid.ml.exchange",
"routingKey": "com.graphgrid.ml.personRoutingKey"
},
{
"broker": "SQS",
"region": "us-west-2",
"queue": "graphrid-ml-personQueue",
"bucket": "graphgrid-ml-fuze"
}
]
}
}
]
}

This example defines two forwarding rules. The first filters for movies released in the 21st century and forwards a subset of the TransactionData to each of a Kafka, a RabbitMQ, and an SQS endpoint. The second rule filters for people born before 1980 and forwards a subset of the TransactionData to each of a Kafka, a RabbitMQ, and an SQS endpoint.

Example Distribution Policy

This policy listens for messages on the RabbitMQ exchange/queue com.graphgrid.fuze.exchange/com.graphgrid.fuze.queue. The messages in this case are coming from ONgDB in the form of transaction data sent with a trigger after changes are made to the database. This data is then run through 3 rules, each filtering for different changes in the graph (one for created relationships of a certain type, and the other two for changes to certain types of nodes). The filtered results are returned as a JSON object that is then sent to the RabbitMQ queue for Search.

{
"metadata": {
"description": "Send new NLP related nodes and relationships to Search for indexing.",
"displayName": "example-distributor-policy"
},
"listeningBrokerEndpoint": {
"broker": "RABBITMQ",
"exchange": "com.graphgrid.fuze.exchange",
"routingKey": "com.graphgrid.fuze.key",
"queue": "com.graphgrid.fuze.queue"
},
"neo4jCredentials": {
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
},
"forwardingRules": [
{
"cypher": "WITH {txData} AS txData WITH txData.createdRelationships AS rels WITH [k IN KEYS(rels) WHERE ANY(x IN rels[k] WHERE x.type = \"SUMMARY_SENTENCE\") | rels[k].uidOfStartNode] AS resultList UNWIND resultList as annotatedTextUid WITH annotatedTextUid, apoc.text.split(annotatedTextUid, \":`\", 3)[-1] AS tmp WITH annotatedTextUid, substring(tmp, 0, length(tmp)-1) AS annotatedTextGrn MATCH (a:Article)-[:HAS_ANNOTATED_TEXT]->(an:AnnotatedText {grn: annotatedTextGrn}) WITH COLLECT(DISTINCT \"``:``:`\" + a.grn + \"`\") AS articleGrns RETURN {numTries: 0, policyName: \"default-index-policy\", clusterName: \"gg-dev-neo\", transactionId: timestamp(), indexName: \"article\", strategyUsed: \"defaultBroker\", idPartition: apoc.convert.toString(articleGrns)} AS results",
"resultKey": "results",
"multicast": {
"brokers": [
{
"broker": "RABBITMQ",
"exchange": "com.graphgrid.search.exchange",
"routingKey": "com.graphgrid.search.key",
"queue": "com.graphgrid.search.queue"
}
]
}
}
]
}

Distribution Policy Management

All Fuze components use the same API endpoints for basic interaction with policies. Please refer to the policy management section to save, retrieve, activate, deactivate, and/or delete Distribution policies.

Get Active Distribution Policies

Returns a list of all active distribution policies.

Base URL: /1.0/fuze/{{cluster-name}}/activeDistributionPolicies
Method: GET

ParameterDescription
cluster-nameName of the cluster on which the policy is saved
Request
curl --location --request GET "${API_BASE}/1.0/fuze/default/activeDistributionPolicies" \
--header "Authorization: Bearer ${BEARER_TOKEN}"
Response
{
"activePolicies": [
{
"metadata": {
"description": "Send new NLP related nodes and relationships to Search for indexing.",
"displayName": "example-distributor-policy",
"createdAt": "2019-10-30T19:26:37+00:00",
"updatedAt": "2021-03-05T15:03:27.302Z",
"versions": []
},
"status": "ACTIVE",
"listeningBrokerEndpoint": {
"broker": "RABBITMQ",
"topic": null,
"exchange": "com.graphgrid.fuze.exchange",
"exchangeType": "topic",
"routingKey": "com.graphgrid.fuze.key",
"durable": true,
"exclusive": false,
"autoDelete": false,
"prefetchCount": 250,
"queue": "com.graphgrid.fuze.queue",
"region": null,
"bucket": null
},
"forwardingRules": [
{
"cypher": "WITH {txData} AS txData WITH txData.createdRelationships AS rels WITH [k IN KEYS(rels) WHERE ANY(x IN rels[k] WHERE x.type = \"SUMMARY_SENTENCE\") | rels[k].uidOfStartNode] AS resultList UNWIND resultList as annotatedTextUid WITH annotatedTextUid, apoc.text.split(annotatedTextUid, \":`\", 3)[-1] AS tmp WITH annotatedTextUid, substring(tmp, 0, length(tmp)-1) AS annotatedTextGrn MATCH (a:Article)-[:HAS_ANNOTATED_TEXT]->(an:AnnotatedText {grn: annotatedTextGrn}) WITH COLLECT(DISTINCT \"``:``:`\" + a.grn + \"`\") AS articleGrns RETURN {numTries: 0, policyName: \"default-index-policy\", clusterName: \"gg-dev-neo\", transactionId: timestamp(), indexName: \"article\", strategyUsed: \"defaultBroker\", idPartition: apoc.convert.toString(articleGrns)} AS results",
"resultKey": "results",
"multicast": {
"retry": 0,
"stopOnFailure": false,
"brokers": [
{
"broker": "RABBITMQ",
"topic": null,
"exchange": "com.graphgrid.search.exchange",
"exchangeType": "topic",
"routingKey": "com.graphgrid.search.key",
"durable": true,
"exclusive": false,
"autoDelete": false,
"prefetchCount": 250,
"queue": "com.graphgrid.search.queue",
"region": null,
"bucket": null
}
]
}
}
],
"neo4jCredentials": {
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "*****"
}
}
]
}

ONgDB Writer

Fuze provides a service for writing to ONgDB that offers robust failure handling and retry mechanisms. It will attempt to execute any Geequel statements it receives at least once, retrying on transient failures, splitting requests around failed statements, and quarantining requests that could not commit to the graph, even after retry.

info

Requests may be made either through the REST API or via the message brokers. Both TransactionRequests and TransactionResults are also persisted to Elasticsearch.

ONgDB Writer Policy

A JSON document that defines the settings to use when executing a TransactionRequest and/or a list of message broker endpoints that listen for TransactionRequests to execute. A ONgDB Writer policy has the following elements:

  • metadata - Information to uniquely identify the policy.
  • defaultNeo4jCredentials - The Neo4jCredentials to use if a request does not specify any.
  • listeningBrokerEndpoints - A list of broker endpoints on which to listen for requests.
  • maxReadAttempts - The maximum number of times to attempt executing the statements in a read request before quarantining the request.
  • maxWriteAttempts - The maximum number of times to attempt executing a write request before quarantining the request.
  • minBatchSize - the minimum number of TransactionStatements that must be gathered into a batch request before executing.
  • maxBatchSize - (currently unavailable) the maximum number of TransactionStatements that may be present in a batch request.

Example ONgDB Writer Policy

{
"metadata": {
"description": "The settings to use when making requests to ONgDB",
"displayName": "ongdbwriter-policy"
},
"listeningBrokerEndpoints": [
{
"broker": "KAFKA",
"topic": "com.graphgrid.fuze.neo4jWriter.topic"
}
],
"defaultNeo4jCredentials": {
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
},
"maxReadAttempts": 5,
"maxWriteAttempts": 5,
"minBatchSize": 20,
"maxBatchSize": 100
}

ONgED Writer Policy Management

All Fuze components use the same API endpoints for basic interaction with policies. Please refer to the policy management section to save, retrieve, activate, deactivate, and/or delete Distribution policies.

Get Active ONgDB Writer Policies

Returns a list of all active ONgDB Writer policies.

Base URL: /1.0/fuze/{{cluster-name}}/activeNeo4jWriterPolicies/{{policy-name}}
Method: GET

ParameterDescription
cluster-nameName of the cluster on which the policy is saved
Request
curl --location --request GET "${API_BASE}/1.0/fuze/default/activeNeo4jWriterPolicies" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}" \
Response
{
"activePolicies": [
{
"metadata": {
"description": "The settings to use when making requests to ONgDB",
"displayName": "ongdbwriter-policy",
"createdAt": "2021-03-05T15:53:45.646Z",
"updatedAt": "2021-03-05T15:53:50.615Z"
},
"status": "ACTIVE",
"defaultNeo4jCredentials": {
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
},
"txRequestBrokerEndpoints": null,
"txDataBrokerEndpoints": null,
"lastTransactionId": null,
"maxWriteAttempts": 5,
"maxReadAttempts": 5,
"minBatchSize": 20,
"maxBatchSize": 100
}
]
}

Write

Returns a TransactionResult object. Ignores the requestType field of the request and executes the request in write mode.

Base URL: /1.0/fuze/write
Method: POST

The write endpoint requires a json body called a Transaction Request which is detailed below. For an example of a full Write request see Write with Policy

Transaction Request

A JSON document that defines a request for the ONgDB Writer to execute.

ParameterDescription
statements TransactionStatement listA list of TransactionStatement objects containing the Geequel to execute this request. Default: null
metadata objectA map of metadata for the client to use. Default: null
requestType enumThe execution mode under which this request should be run. Options are READ and WRITE. Default: WRITE
route object listA list of message broker endpoints where the TransactionResult object created by executing this request should be sent. Default: null
requiredBookmarks string listA list of ONgDB bookmarks that must be present in the graph before this request's statements may execute. Default: null
splitAllowed booleanIndicates whether or not the request may be split into child requests in the event of a failure while executing a statement. Set to false if all statements in the request must succeed as one unit. Default: true
Neo4jCredentials Neo4jCredentials listA list of Neo4jCredentials for the ONgDB instances on which the statements should be run. Default: null
caution

requiredBookmarks is not currently available as an option when making batchable requests and any provided value will be ignored. Note also that using the read and write (and readAsync and writeAsync) API endpoints overrides the requestType value. When a request is sent through one of the message brokers, the requestType field is used to determine the appropriate access mode to run under.

Transaction Statement

The statements field specifies the Geequel queries to execute. It consists of a list of TransactionStatements, each of which contain the following fields:

ParameterDescription
statement stringThe Geequel statement to execute. Default: null
parameters objectA map of parameter names to values to use in the statement. Default: null
metadata objectA map of metadata for the client to use. Default: null
actions objectA set of TransactionStatements to execute after a request either successfully executes or fails and is sent to quarantine. Default: null
Actions
ParameterDescription
onSuccess TransactionStatement listA list of TransactionStatements to run after this statement succeeds. An example use might be sending a broker message with APOC indicating that it succeeded. These statements are run after the request succeeds and any potential changes have been committed to the graph. Default: null
onFailure TransactionStatement listA list of TransactionStatements to run after this statement fails. An example use might be sending a broker message with APOC indicating that it failed. These statements are run after the request has failed and is quarantined. Default: null

TransactionResult

A JSON document containing a list of values returned by executing the request and the input request with some additional values to assist in profiling and debugging requests.

FieldMeaning
records object listThe list of values returned by running the statements of a request. Default: []
txRequest TransactionRequestThe request that caused the creation of this result object. Contains some additional fields Default: null

Additional Transaction Request Fields

In addition to the fields provided when creating the request, the returned request contains some additional fields that may be useful when profiling the request or debugging failures.

FieldMeaning
fuzeId stringThe UUID assigned to this request.
batchId objectThe UUID of the batch request this one was a part of. Default: null
parentId objectThe UUID of this request's parent. Default: null
status enumThe current status of this request.
returned bookmark stringThe last ONgDB bookmark created after running the statements in this request. Default: null
queryAttemptNumber integerThe number of attempts made so far to run the statements in this request.
creationTime longThe time, in milliseconds, when this request was created.
startTime longThe time, in milliseconds, when execution of this request started. Default: 0
endTime longThe time, in milliseconds, when execution of this request finished. Default: 0
executionDuration longThe number of milliseconds spent executing this request. Default: -1
children TransactionRequest listA list of child requests resulting from splitting the request. Splits may occur either due to errors when running a statement or because multiple Neo4jCredentials were provided (in this case each child contains the same statements but only one set of credentials). Default: null
errorIndex integerThe index of the statement that caused an error to occur. Default: null
errorCode integerThe ONgDB code of the error that occurred. Default: null
errorMessage stringThe error message of the error that occurred. Default: null

Transaction Request Status

A request may be in one of several states:

  • DOES_NOT_EXIST - A request with this fuzeId has not been created.
  • NOT_RUN - The request has been created, but execution has not yet started.
  • BATCH - The request contains several child requests whose statements will be run under the same ONgDB session.
  • SPLIT - The request was broken into child requests before execution was attempted. This usually occurs when the request contains more than one set of Neo4jCredentials
  • IN_PROGRESS - Execution of this request is ongoing.
  • SUCCEEDED - Execution of this request succeeded without any problems.
  • FAILED_WITH_RETRY - Execution of this request failed, but it was retried. See the children for subsequent attempts. See the errorIndex, errorCode, and errorMessage for details about the failure.
  • FAILED - Execution of this request failed with an error that cannot be retried. See the errorIndex, errorCode, and `errorMessage for details about the failure.

Additional Transaction Statemenet Fields

Like the TransactionRequest containing the statements, each TransactionStatement in a returned request also contains some additional fields.

FieldMeaning
executed booleanThe request that caused the creation of this result object. Contains some additional fields. Default: null
startTime longThe time, in milliseconds, when execution of this statement started. Default: 0
endTime longThe time, in milliseconds, when execution of this statement finished. Default: 0
executionDuration longThe number of milliseconds spent executing this statement. Default: -1
status enumThe current state of this statement. Possible values are NOT_RUN, IN_PROGRESS, SUCCEEDED, and FAILED.
Example

This is a sample of the TransactionResult from a request to retrieve the properties of every Person node in the graph:

{
"txRequest": {
"statements": [
{
"statement": "MATCH (p:Person) RETURN PROPERTIES(p)",
"parameters": null,
"metadata": null,
"actions": null,
"executed": true,
"startTime": 1553265673299,
"endTime": 1553265673359,
"status": "SUCCEEDED",
"executionDuration": 60
}
],
"fuzeId": "6523221b-61d0-4695-875c-70d565a2ea12",
"batchId": null,
"parentId": null,
"metadata": {
"id": "abc123"
},
"requestType": "WRITE",
"sendResultsTo": null,
"status": "SUCCEEDED",
"requiredBookmarks": null,
"returnedBookmark": null,
"queryAttemptNumber": 1,
"creationTime": 1553265670537,
"startTime": 1553265671272,
"endTime": 1553265674624,
"splitAllowed": true,
"children": null,
"errorIndex": null,
"errorCode": null,
"errorMessage": null,
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
],
"executionDuration": 3352
},
"records": [
{
"PROPERTIES(p)": {
"name": "David"
}
},
{
"PROPERTIES(p)": {
"name": "Keanu Reeves",
"born": 1964
}
},
{
"PROPERTIES(p)": {
"name": "Edgar"
}
}
]
}

Write With Policy

Returns a TransactionRequest object.

Uses the ONgDB Writer Policy {{policy-name}} stored on cluster {{cluster-name}} to determine settings for the maximum number of write attempts and the default Neo4jCredentials to use if the TransactionRequest does not contain any.

Base URL: /1.0/fuze/{{cluster-name}}/write/{{policy-name}}
Method: POST

note

Ignores the requestType field of the request and executes the request in write mode.

Request

This request simply writes a single node.

curl --location --request POST "${API_BASE}/1.0/fuze/default/write/example-ongdbwriter-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}" \
--data-raw '{
"metadata": {
"statementId": 42
},
"statement": "MERGE (:Person {name: $name, born: $born})",
"parameters": {
"name": "Keanu Reeves",
"born": 1964
},
"actions": {
"onSuccess": [
{
"statement": "CALL apoc.broker.send(\"rabbitmq\", {msg: \"Successfully wrote statement $id\"}, {exchangeName: \"com.graphgrid.fuze.exchange\", routingKey: \"com.graphgrid.fuze.successKey\", queueName: \"com.graphgrid.fuze.successQueue\"}) YIELD connectionName, message, configuration RETURN 1",
"parameters": {
"id": 42
}
}
],
"onFailure": [
{
"statement": "CALL apoc.broker.send(\"rabbitmq\", {msg: \"Failed to write statement $id\"}, {exchangeName: \"com.graphgrid.fuze.exchange\", routingKey: \"com.graphgrid.fuze.failureKey\", queueName: \"com.graphgrid.fuze.failureQueue\"}) YIELD connectionName, message, configuration RETURN 1",
"parameters": {
"id": 42
}
}
]
}
}
'
Response
{
"transactionResult": {
"transactionRequest": {
"statements": [],
"fuzeId": "c7b891c3-8abc-4cfa-af13-635da90e5cf6",
"batchId": null,
"parentId": null,
"metadata": {
"statementId": 42
},
"requestType": "WRITE",
"sendResultsTo": null,
"status": "SUCCEEDED",
"requiredBookmarks": null,
"returnedBookmark": "neo4j:bookmark:v1:tx581",
"queryAttemptNumber": 1,
"creationTime": "2021-02-24T18:02:04.828Z",
"timeToCreate": null,
"startTime": "2021-02-24T18:02:04.829Z",
"endTime": "2021-02-24T18:02:04.831Z",
"splitAllowed": true,
"children": null,
"errorIndex": null,
"errorCode": null,
"errorMessage": null,
"neo4jCredentials": null,
"executionDuration": 2
},
"records": []
}
}

Write Async

Returns the fuzeId of the provided request. This value may be used to check the status of the request and, when results are available, to retrieve the TransactionResult generated from executing the request.

note

Ignores the requestType field of the request and executes the request in write mode.

Base URL: /1.0/fuze/writeAsync
Method: POST

For an example of a Write Async request see Write Async With Policy.

Write Async With Policy

Returns the fuzeId of the provided request. This value may be used to check the status of the request and, when results are available, to retrieve the TransactionResult generated from executing the request.

Uses the ONgDB Writer Policy {{policy-name}} stored on cluster {{cluster-name}} to determine settings for the maximum number of write attempts and the default Neo4jCredentials to use if the TransactionRequest does not contain any.

note

Ignores the requestType field of the request and executes the request in write mode.

Base URL: /1.0/fuze/{{cluster-name}}/writeAsync/{{policy-name}}
Method: POST

Request
curl --location --request POST "${API_BASE}/1.0/fuze/default/writeAsync/example-ongdbwriter-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}" \
--data-raw '{
"metadata": {
"id": "24601"
},
"statements": [
{
"statement": "CREATE (:Person {name: $name})",
"parameters": {
"name": "Edgar"
}
}
],
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
]
}'
Response
{
"fuzeId": "4b5fe3f4-d27d-4f89-a8ea-bac14b390b1d"
}

Read

Return a TransactionResult object.

note

Ignores the requestType field of the request and executes the request in read mode.

Base URL: /1.0/fuze/read
Method: POST

For an example of a Read request see Read With Policy.

Read With Policy

Return a TransactionResult object.

Uses the ONgDB Writer Policy {{policy-name}} stored on cluster {{cluster-name}} to determine settings for the maximum number of read attempts and the default Neo4jCredentials to use if the TransactionRequest does not contain any.

note

Ignores the requestType field of the request and executes the request in read mode.

Base URL: /1.0/fuze/{{cluster-name}}/read/{{policy-name}}
Method: POST

Request
curl --location --request GET "${API_BASE}/1.0/fuze/default/read/example-ongdbwriter-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}" \
--data-raw '{
"metadata": {
"id": "abc123"
},
"statements": [
{
"statement": "MATCH (p:Person) RETURN PROPERTIES(p)"
}
],
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
]
}'
Response
{
"transactionResult": {
"transactionRequest": {
"statements": [
{
"statement": "MATCH (p:Person) RETURN PROPERTIES(p)",
"parameters": null,
"startTime": "2021-02-24T18:09:31.353Z",
"endTime": "2021-02-24T18:09:31.357Z",
"executionDuration": 4,
"metadata": null,
"actions": null,
"status": "SUCCEEDED",
"executed": true
}
],
"fuzeId": "780b2c3b-2821-4e91-a287-711e30cf8e6f",
"batchId": null,
"parentId": null,
"metadata": {
"id": "abc123"
},
"requestType": "READ",
"sendResultsTo": null,
"status": "SUCCEEDED",
"requiredBookmarks": null,
"returnedBookmark": null,
"queryAttemptNumber": 1,
"creationTime": "2021-02-24T18:09:31.351Z",
"timeToCreate": null,
"startTime": "2021-02-24T18:09:31.352Z",
"endTime": "2021-02-24T18:09:31.358Z",
"splitAllowed": true,
"children": null,
"errorIndex": null,
"errorCode": null,
"errorMessage": null,
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
],
"executionDuration": 6
},
"records": [
{
"PROPERTIES(p)": {
"name": "Edgar",
"createdAt": "2021-02-24T16:25:19+00:00",
"grn": "grn:gg:person:iY4Ehm0Bb3Jccu42S8rmjvvKkUqz3lc9qmZtBTFkrLeV",
"lastSearchIndexedAt": "0",
"updatedAt": "2021-02-24T16:25:19+00:00"
}
},
{
"PROPERTIES(p)": {
"createdAt": "2021-02-11T16:21:22+00:00",
"grn": "grn:gg:person:pyXToRhZEh5iIZq5GLGNvrTiXmSMEEAjcdHEdL3jIcTq",
"lastSearchIndexedAt": "0",
"born": 1964,
"name": "Keanu Reeves",
"index": "Person",
"updatedAt": "2021-02-11T16:21:22+00:00"
}
}
]
}

Read Async

Returns the fuzeId of the provided request. This value may be used to check the status of the request and, when results are available, to retrieve the TransactionResult generated from executing the request.

note

Ignores the requestType field of the request and executes the request in read mode.

Base URL: /1.0/fuze/readAsync
Method: POST

For an example of a Read Async request see Read Async With Policy.

Read Async With Policy

Returns the fuzeId of the provided request. This value may be used to check the status of the request and, when results are available, to retrieve the TransactionResult generated from executing the request.

Uses the ONgDB Writer Policy {{policy-name}} stored on cluster {{cluster-name}} to determine settings for the maximum number of read attempts and the default Neo4jCredentials to use if the TransactionRequest does not contain any.

note

Ignores the requestType field of the request and executes the request in read mode.

Base URL: /1.0/fuze/{{cluster-name}}/readAsync/{{policy-name}}
Method: POST

Request
curl --location --request GET "${API_BASE}/1.0/fuze/default/readAsync/example-ongdbwriter-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}" \
--data-raw '{
"metadata": {
"id": "abc123"
},
"statements": [
{
"statement": "MATCH (p:Person) RETURN PROPERTIES(p)"
}
],
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
]
}'
Response
{
"fuzeId": "389627ad-129a-4888-9048-ae9487b21ee8"
}

Check Request Status

Accepts the fuzeId string as a query parameter. Returns the current status of the request. Possible values are defined and explained here.

Base URL: /1.0/fuze/checkTransactionRequestStatus/{{fuzeId}}
Method: GET

Request
note

We will be using the fuzeId from the Read Async with Policy response for the following transaction examples.

curl --location --request GET "${API_BASE}/1.0/fuze/checkTransactionRequestStatus/389627ad-129a-4888-9048-ae9487b21ee8" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}"
Response
{
"status": "SUCCEEDED"
}

Get Transaction Request

Base URL: /1.0/fuze/getTransactionRequest/{{fuzeId}}
Method: GET

Accepts the fuzeId string as a query parameter. Returns the TransactionRequest with that fuzeId, if it exists.

Request
curl --location --request GET "${API_BASE}/1.0/fuze/getTransactionRequest/389627ad-129a-4888-9048-ae9487b21ee8" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}"
Response
{
"transactionRequest": {
"statements": [
{
"statement": "MATCH (p:Person) RETURN PROPERTIES(p)",
"parameters": null,
"startTime": "2021-02-25T16:48:40.583Z",
"endTime": "2021-02-25T16:48:40.585Z",
"executionDuration": 2,
"metadata": null,
"actions": null,
"status": "SUCCEEDED",
"executed": true
}
],
"fuzeId": "389627ad-129a-4888-9048-ae9487b21ee8",
"batchId": null,
"parentId": null,
"metadata": {
"id": "abc123"
},
"requestType": "READ",
"sendResultsTo": null,
"status": "SUCCEEDED",
"requiredBookmarks": null,
"returnedBookmark": null,
"queryAttemptNumber": 1,
"creationTime": "2021-02-25T16:48:40.580Z",
"timeToCreate": null,
"startTime": "2021-02-25T16:48:40.582Z",
"endTime": "2021-02-25T16:48:40.586Z",
"splitAllowed": true,
"children": null,
"errorIndex": null,
"errorCode": null,
"errorMessage": null,
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
],
"executionDuration": 4
}
}

Get Transaction Result

Base URL: 1.0/fuze/getTransactionResult/{{fuzeId}}
Method: GET

Accepts the fuzeId string as a query parameter. Returns the TransactionRequest generated by executing the request with that id, if it is available.

Request
curl --location --request GET "${API_BASE}/1.0/fuze/getTransactionResult/389627ad-129a-4888-9048-ae9487b21ee8" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}"
Response
{
"transactionResult": {
"transactionRequest": {
"statements": [
{
"statement": "MATCH (p:Person) RETURN PROPERTIES(p)",
"parameters": null,
"startTime": "2021-02-25T16:48:40.583Z",
"endTime": "2021-02-25T16:48:40.585Z",
"executionDuration": 2,
"metadata": null,
"actions": null,
"status": "SUCCEEDED",
"executed": true
}
],
"fuzeId": "389627ad-129a-4888-9048-ae9487b21ee8",
"batchId": null,
"parentId": null,
"metadata": {
"id": "abc123"
},
"requestType": "READ",
"sendResultsTo": null,
"status": "SUCCEEDED",
"requiredBookmarks": null,
"returnedBookmark": null,
"queryAttemptNumber": 1,
"creationTime": "2021-02-25T16:48:40.580Z",
"timeToCreate": null,
"startTime": "2021-02-25T16:48:40.582Z",
"endTime": "2021-02-25T16:48:40.586Z",
"splitAllowed": true,
"children": null,
"errorIndex": null,
"errorCode": null,
"errorMessage": null,
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
],
"executionDuration": 4
},
"records": [
{
"PROPERTIES(p)": {
"name": "Edgar",
"createdAt": "2021-02-24T16:25:19+00:00",
"grn": "grn:gg:person:iY4Ehm0Bb3Jccu42S8rmjvvKkUqz3lc9qmZtBTFkrLeV",
"lastSearchIndexedAt": "0",
"updatedAt": "2021-02-24T16:25:19+00:00"
}
}
]
}
}

View Quarantine

Returns a list of TransactionRequests that have failed in a way such that they are no longer being retried.

Base URL: /1.0/fuze/viewQuarantine
Method: GET

Before we jump into this request let's set up an example. Let's say we accidently made a single statement write request with a snytax error. Our json body could like this:

{
"statements": [
{
"statement": "CREAT (:Person {name: $name})",
"parameters": {
"name": "Freda"
}
}
],
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
]
}

Notice that our statement says CREAT instead of CREATE. This request will fail and show up in our quarantine queue.

Request
curl --location --request GET "${API_BASE}/1.0/fuze/viewQuarantine" \
--header "Authorization: Bearer ${BEARER_TOKEN}"
Response
{
"quarantine": [
{
"statements": [
{
"statement": "CREAT (:Person {name: $name})",
"parameters": {
"name": "Freda"
},
"startTime": "2021-02-25T17:05:42.582Z",
"endTime": "2021-02-25T17:05:42.583Z",
"executionDuration": 1,
"metadata": null,
"actions": null,
"status": "FAILED",
"executed": true
}
],
"fuzeId": "9e9d8b4c-53ab-47f4-b951-6a947a516622",
"batchId": null,
"parentId": null,
"metadata": null,
"requestType": "WRITE",
"sendResultsTo": null,
"status": "FAILED",
"requiredBookmarks": null,
"returnedBookmark": null,
"queryAttemptNumber": 1,
"creationTime": "2021-02-25T17:05:42.584Z",
"timeToCreate": null,
"startTime": "1970-01-01T00:00:00Z",
"endTime": "1970-01-01T00:00:00Z",
"splitAllowed": true,
"children": null,
"errorIndex": null,
"errorCode": "Neo.ClientError.Statement.SyntaxError",
"errorMessage": "Invalid input ' ': expected 'e/E' (line 1, column 6 (offset: 5))\n\"CREAT (:Person {name: $name})\"\n ^",
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
],
"executionDuration": -1
}
]
}

Clear Quarantine

Clears the quarantine from memory and from Elasticsearch. This endpoint does not return anything.

Base URL: /1.0/fuze/clearQuarantine
Method: DELETE

Request
curl --location --request DELETE "${API_BASE}/1.0/fuze/clearQuarantine" \
--header "Authorization: Bearer ${BEARER_TOKEN}"
Response

200 OK

Queue Request for Batching

Returns the fuzeId of the provided request. This value may be used to check the status of the request and, when results are available, to retrieve the TransactionResult generated from executing the request.

Requests sent to this endpoint will not be executed until the batchAndExecuteQueuedRequests endpoint is called.

Base URL: /1.0/fuze/{{cluster-name}}/queueRequestForBatching
Method: POST

Request
curl --location --request POST "${API_BASE}/1.0/fuze/queueRequestForBatching" \
--header "Authorization: Bearer ${BEARER_TOKEN}" \
--header 'Content-Type: application/json' \
--data-raw '{
"metadata": {
"id": "24601"
},
"statements": [
{
"statement": "CREATE (:Person {name: $name})",
"parameters": {
"name": "David"
}
}
],
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
]
}'
{
"fuzeId": "a12721c7-4126-4085-86d6-1ab8532fa61d"
}

Batch and Execute Queued Requests

Runs the queued requests in batches. Requests are grouped for batching by their neo4jCredentials. Basically, all requests to write to a particular Neo4j instance will be executed together. This endpoint does not return anything.

Base URL: /1.0/fuze/{{cluster-name}}/batchAndExecuteQueuedRequests
Method: POST

curl --location --request POST "${API_BASE}/1.0/fuze/batchAndExecuteQueuedRequests" \
--header "Authorization: Bearer ${BEARER_TOKEN}" \
--header 'Content-Type: application/json'
Response

200 OK

Trigger Manager

Ensure that each APOC trigger exists on the specified ONgDB instances.

Trigger Policy

ParameterDescription
metadata metadataA map of metadata for the client to use Default: null
instances Map< string, Neo4jCredentials >The collection of ONgDB instances that each trigger may reference. Default: null
triggers List< Trigger >The list of triggers to add/remove when de/activating the policy Default: null

Trigger

ParameterDescription
triggerName stringName of the triggers as it should appear in ONgDB Default: null
triggerCypher stringGeequel to execute when the trigger runs. Default: null
phase stringThe phase to run the trigger in. Options are before and after Default: null
uidKeys List< string >(optional) ordered set of properties to prioritize when creating uids (used for txData) Default: []
uidLabels List< string >(optional) ordered set of labels to prioritize when creating uids (used for txData) Defaul: []
includedInstances List < string >The list of keys from the instances field in the policy specifying which instances the trigger applies to. Default: null
excludedInstances List < string >The list of keys from the instances field in the policy specifying which instances the trigger does not apply to. Default: null
caution

A Trigger cannot have both includedInstances and excludedInstances set, and if neither includedInstances and excludedInstances are set, then the trigger will apply to all database instances in the instances Map.

Example Trigger Policies

Triggers to apply GRNs to new nodes and relationships:

{
"metadata": {
"description": "Ensure that the txData trigger exists on ongdb",
"displayName": "trigger-policy"
},
"instances": {
"db1": {
"uri": "bolt://someHost:7687",
"username": "ongdb",
"password": "***"
},
"db2": {
"uri": "bolt://someOtherHost:7687",
"username": "ongdb",
"password": "***"
}
},
"triggers": [
{
"triggerName": "addGrnToCreatedNodes",
"triggerCypher": "UNWIND {createdNodes} AS n WITH n WHERE NOT EXISTS (n.grn) CALL apoc.do.when( size(labels(n)) > 1, \"SET n.grn = 'grn:gg:' + lower(filter(l IN labels(n)WHERE l <> 'Resource')[0]) + ':' + apoc.text.random(44)\" , \"SET n.grn = 'grn:gg:' + toLower(labels(n)[0]) + ':' + apoc.text.random(44)\", {n : n } ) yield value RETURN n",
"phase": "before"
},
{
"triggerName": "addGrnToCreatedRelationships",
"triggerCypher": "UNWIND {createdRelationships} AS r WITH r WHERE NOT exists(r.grn) SET r.grn = 'grn:gg:' + toLower(type(r)) + ':' + apoc.text.random(44)",
"phase": "before"
}
]
}

Trigger Policy Management

All Fuze components use the same API endpoints for basic interaction with policies. Please refer to the policy management section to save, retrieve, activate, deactivate, and/or delete Distribution policies.

Get Active Trigger Policies

Returns a list of all active trigger policies.

Base URL: /1.0/fuze/activeTriggerPolicies
Method: GET

Request

curl

curl --location --request GET "${API_BASE}/1.0/fuze/activeTriggerPolicies" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}"
Response
{
"activePolicies": [
{
"metadata": {
"description": "Ensure that the txData trigger exists on ongdb",
"displayName": "trigger-policy",
"createdAt": "2021-03-05T15:13:56.65Z",
"updatedAt": "2021-03-05T15:14:08.937Z",
"versions": []
},
"status": "ACTIVE",
"instances": {
"db1": {
"uri": "bolt://someHost:7687",
"username": "ongdb",
"password": "***"
},
"db2": {
"uri": "bolt://someOtherHost:7687",
"username": "ongdb",
"password": "***"
}
},
"triggers": [
{
"triggerName": "addGrnToCreatedNodes",
"triggerCypher": "UNWIND {createdNodes} AS n WITH n WHERE NOT EXISTS (n.grn) CALL apoc.do.when( size(labels(n)) > 1, \"SET n.grn = 'grn:gg:' + lower(filter(l IN labels(n)WHERE l <> 'Resource')[0]) + ':' + apoc.text.random(44)\" , \"SET n.grn = 'grn:gg:' + toLower(labels(n)[0]) + ':' + apoc.text.random(44)\", {n : n } ) yield value RETURN n",
"phase": "before",
"uidKeys": null,
"uidLabels": null,
"includedInstances": null,
"excludedInstances": null
},
{
"triggerName": "addGrnToCreatedRelationships",
"triggerCypher": "UNWIND {createdRelationships} AS r WITH r WHERE NOT exists(r.grn) SET r.grn = 'grn:gg:' + toLower(type(r)) + ':' + apoc.text.random(44)",
"phase": "before",
"uidKeys": null,
"uidLabels": null,
"includedInstances": null,
"excludedInstances": null
}
]
}
]
}

Create a Notification Trigger

A trigger policy is capable of setting up event listeners that will send transaction data to the com.graphgrid.ingest queue in RabbitMQ. This data is used to create a Notification node which connects to the user's UserInbox node.

Example Fuze Notification Trigger Policy

This trigger policy forwards a query to look for any new movies that are added to the graph that Keanu Reeves has acted in. Notice that the query is unwinding the txData, (transaction data), and is looking to see if a relationship of ACTED_IN has been created for the Person with the property name equal to "Keanu Reeves". If this query finds a match then a notification will be created.

In this case it is important to note that a property change on an existing node (the "Keanu Reeves" node) is necessary for the transaction data to properly pick up. For example, if we were to only create a new relationship of ACTED_IN between Keanu and a new movie, the transaction data would not get picked up. If we added a new relationship and a new property (for example properties like role or updatedAt) to the "Keanu Reeves" Person node the transaction data will be properly recognized by the trigger.

{
"metadata": {
"description": "On RabbitMQ, forward messages to rabbitmq publish queue for notifications.",
"displayName": "{{fuzePolicyName}}",
"createdAt": "2019-10-23T18:09:52+00:00",
"updatedAt": "2019-10-23T18:09:53+00:00",
"versions": null
},
"listeningBrokerEndpoint": {
"broker": "RABBITMQ",
"topic": null,
"exchange": "com.graphgrid.fuze.exchange",
"exchangeType": "topic",
"routingKey": "com.graphgrid.fuze.key",
"durable": true,
"exclusive": false,
"autoDelete": false,
"queue": "com.graphgrid.fuze.queue",
"region": null,
"bucket": null
},
"forwardingRules": [
{
"cypher": "WITH {txData} AS txData WITH txData.createdRelationships AS event, txData UNWIND KEYS(event) AS key WITH key,event[key] AS changes WHERE changes.type = 'ACTED_IN' MATCH (p:Person {name: 'Keanu Reeves', grn:replace(apoc.text.split(changes.uidOfStartNode, ':', 3)[-1],'`','')}) MERGE (n:NotificationType {name:'in-app'}) ON CREATE SET n.grn = 'grn:gg:' + toLower(head(labels(n))) + ':' + apoc.create.uuid() WITH n MATCH (u:User) WITH DISTINCT {userGrns: COLLECT(u.grn),message: 'New Notification!',title: 'New Notification', notificationTypeGrn: n.grn} AS results RETURN results",
"resultKey": "results",
"multicast": {
"retry": 0,
"stopOnFailure": false,
"brokers": [
{
"broker": "RABBITMQ",
"topic": null,
"exchange": "com.graphgrid.ingest",
"exchangeType": "topic",
"routingKey": "publish.key",
"durable": true,
"exclusive": false,
"autoDelete": false,
"queue": "com.graphgrid.ingest",
"region": null,
"bucket": null
}
]
}
}
],
"neo4jCredentials": {
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "admin"
}
}

Notification Object

The JSON used while creating a notification policy triggers the creation of a notification object. This object allows users to create customizable notifications with alert sounds, messages, and more.

Example of a Notification Object
{notificationTypeGrn: push.grn,userGrns: COLLECT(u.grn),message: '{\\\"APNS\\\":\\\"{\\\\\\\"aps\\\\\\\":{\\\\\\\"sound\\\\\\\":\\\\\\\"default\\\\\\\",\\\\\\\"badge\\\\\\\":9,\\\\\\\"alert\\\\\\\":\\\\\\\"A new movie starring Tom Hanks is showing near you!\\\\\\\"}}\\\"}

Below are all the possible properties of a notification object:

PropertyDescription
notificationTypeGrn stringThe grn of the notificationType node. Valid types are push-notification,voip-push-notification or in-app.
userGrns list, stringA list of user grns that the user will send the notification/message to.
groupGrns list, stringA list of group grns that the user will send the notification/message to.
deviceType stringThe type of device that the notification will be sent to.
eventTrigger stringThe event that will trigger the notification's creation.
userCreatorGrn stringThe user grn of the notification/message sender.
fileNodeGrns list, stringA list of file node grns to be attached to the notificaiton/message.
notificationDate stringThe date the notification was sent/created.
notificationTitle stringThe title of the notification.
showPopup booleanOption (true/false) to show a pop up upon notify.
message stringA message delivered with the notification. The AWS metadata goes inside here, it can be either a string or a configured json.
url stringA url delivered with the notification.
iconCategory stringAn icon delivered with the notification.
doNotDisplay booleanOption (true/false) to display notification.
internalMessage stringAn internal message delivered with the notification.
attributes list, stringAttributes assigned to the notification via creation of a MessageAttributeNode.
messageStructure stringThe message structure attributed to the notification.
userDetailGrn stringThe user detail grn.

Information on Apple Push Notification service (APNs), and Firebase Cloud Messaging (FCM) for Android.

Fuze Worker

Receive data on some message broker endpoints and run Geequel either on a schedule or when data is received on a message broker.

Worker Policy

A JSON document containing a list of tasks that form a directed graph modeling the flow of data through the system.

ParameterDescription
metadata objectA map of metadata for the client to use Default: null
workers worker listA list of Workers to (de)activate as a group. Default: null
defaultNeo4jCredentialsThe default Neo4jCredentials to use if a Worker does not specify the ONgDB instance it runs on. Default: null
statusThe status of the Worker.

Worker

A JSON document containing a list of tasks that form a directed graph modeling the flow of data through the system.

ParameterDescription
cypher stringA read-only Geequel snippet that performs some operation on any messages the worker receives. Default: null
listeningEndpoints BrokerEndpoint listA list of broker endpoints that listen for messages and execute the Geequel with the received message as input. Default: null
neo4jCredentials Neo4jCredentialsThe ONgDB instance to run the Geequel on. Default: null
triggerType stringThe type of trigger. Options are BROKER, FIXED_DELAY, and FIXED_RATE. Default: BROKER
fixedDelay intThe time interval the Geequel should be repeatedly executed at. Default: 1000ms
fixedRate intA time interval for the delay between each run of the Geequel. Default: 1000ms

Example Worker Policy

All Fuze components use the same API endpoints for basic interaction with policies. Please refer to save policy for more information.

{
"metadata": {
"displayName": "example-fuze-worker-policy",
"description": "Example Worker policy"
},
"workers": {
"periodicPeopleCount": {
"metadata": {
"name": "periodic-people-count",
"description": "Sends a count of Person nodes to a RabbitMQ queue every 60 seconds"
},
"cypher": "MATCH (p:Person) WITH COUNT(p) AS personCount CALL apoc.broker.send('rabbitmq', {personCount: personCount}, {queueName: 'com.graphgrid.queue.personCountResults', exchangeName: 'com.graphgrid.exchange', routingKey: 'com.graphgrid.key.personCountResults'}) YIELD connectionName, message, configuration RETURN 1",
"triggerType": "FIXED_RATE",
"fixedRate": "60s",
"neo4jCredentials": {
"uri": "bolt://ongdb:7688",
"username": "ongdb",
"password": "xxxxx"
}
}
}
}

Worker Policy Management

All Fuze components use the same API endpoints for basic interaction with policies. Please refer to the policy management section to save, retrieve, activate, deactivate, and/or delete Distribution policies.

Get Active Worker Policies

Returns a list of all active worker policies.

Base URL: /1.0/fuze/{{cluster-name}}/activeWorkerPolicies
Method: GET

ParameterDescription
cluster-nameName of the cluster on which the policy is saved
Request
curl --location --request GET "${API_BASE}/1.0/fuze/default/activeWorkerPolicies" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}"
Response
{
"activePolicies": [
{
"metadata": {
"description": "Example Worker policy",
"displayName": "example-fuze-worker-policy",
"createdAt": "2021-03-10T14:18:32.109Z",
"updatedAt": "2021-03-10T14:18:32.11Z"
},
"status": "ACTIVE",
"workers": {
"periodicPeopleCount": {
"metadata": {
"name": "periodic-people-count",
"description": "Sends a count of Person nodes to a RabbitMQ queue every 60 seconds"
},
"cypher": "MATCH (p:Person) WITH COUNT(p) AS personCount CALL apoc.broker.send('rabbitmq', {personCount: personCount}, {queueName: 'com.graphgrid.queue.personCountResults', exchangeName: 'com.graphgrid.exchange', routingKey: 'com.graphgrid.key.personCountResults'}) YIELD connectionName, message, configuration RETURN 1",
"triggerType": "FIXED_RATE",
"listeningEndpoints": null,
"fixedDelay": "1000ms",
"fixedRate": "60s",
"route": null,
"neo4jCredentials": {
"uri": "bolt://ongdb:7688",
"username": "ongdb",
"password": "xxxxx"
}
}
},
"defaultNeo4jCredentials": null
}
]
}