GraphGrid Fuze
2.0
API Version 1.0
Intro to Fuze
GraphGrid Fuze provides integration services to distribute, route and transform transactional event data from ONgDB to trigger dynamic workers for graph processing, searching, indexing and machine learning processes. Fuze is comprised of 5 components: Distributor, ONgDB Writer/Reader, Trigger Manager, and Worker.
This Fuze API version is 1.0 and as such all endpoints are rooted at /1.0/
.
For example, http://localhost/1.0/fuze
(requires auth) would be the base context for this Fuze API under the GraphGrid API deployed at
http://localhost
.
Environment
Fuze requires the following integrations:
- ONgDB 1.0+
- Redis 5.0.7+
- Elasticsearch 6.3+
- Kafka 2.0+
- RabbitMQ 3.7+
Policy Management
Each component of Fuze (Distributor, ONgDB Writer, Trigger Manager, and Worker) uses policies to manage which data flows through it and how that data is processed. For details on how to create a policy for a particular Fuze component, see the section for that component. All Fuze components use the same API endpoints for basic interaction with policies.
Common Policy Field Types
There are several field types that appear in multiple types of policies and so are detailed below.
Metadata
Provides information about the policy. This information is purely for the user and is not used by Fuze in any way.
Parameter | Description |
---|---|
description string | A description of what the policy does Default: null |
displayName string | A human parsable name for the policy. Default: null |
Example
{
"displayName": "ongdbwriter-replication-policy",
"description": "Listen on broker endpoints for TxData, convert it to Geequel, and copy the changes to this ONgDB instance."
}
Broker Endpoint
Specifies an endpoint on a message broker to either send data to or receive data from.
The fields used when either sending a message or creating a listener depends on the value of the broker field.
Parameter | Description |
---|---|
broker string | The message broker to use. Options are KAFKA , RABBITMQ , or SQS Default: null |
topic string | the Kafka topic Default: null |
exchange string | The RabbitMQ exchange Default: null |
exchangeType string | The type of the RabbitMQ exchange. Options are direct , topic , fanout , headers , or system Default: topic |
routingKey string | The RabbitMQ routing key. Default: null |
durable boolean | Is the RabbitMQ queue durable? Default: true |
exclusive boolean | Is the RabbitMQ queue exclusive? Default: false |
autoDelete boolean | Delete the RabbitMQ queue after the consumer is destroyed? Default: false |
prefetchCount int | (optional) The number of RabbitMQ messages to prefetch. Default: 250 |
queue string | The RabbitMQ or SQS queue. Default: null |
region string | The AWS region in which the SQS queue exists. Default: null |
bucket string | The S3 bucket to use to temporarily hold large messages. Default: null |
Examples
KAFKA
{
"broker": "KAFKA",
"topic": "com.graphgrid.fuze.topic"
}
RabbitMQ
{
"broker": "RABBITMQ",
"exchange": "com.graphgrid.fuze.exchange",
"routingKey": "com.graphgrid.fuze.routingKey",
"queue": "com.graphgrid.fuze.queue"
}
SQS
{
"broker": "SQS",
"region": "us-west-2",
"bucket": "graphgrid-fuze-bucket",
"queue": "graphgrid-fuze-queue"
}
ONgDB Credentials
Specifies how to connect to a ONgDB instance (or cluster).
Parameter | Description |
---|---|
uri string | The bolt or bolt+routing endpoint for the ONgDB instance (including the protocol, hostname, and port number). Default: null |
username string | A valid username for the ONgDB instance. Default: null |
password string | The password that goes with the username. Default: null |
Example
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
Get Status
Get the status of GraphGrid Fuze. Will return a 200 OK
response if healthy.
Base URL: /1.0/fuze/status
Method: GET
Request
curl --location --request GET "${API_BASE}/1.0/fuze/status"
Response
{
"status": "OK"
}
Save Policy
Save a Policy for later use and returns the saved Policy.
Base URL: /1.0/fuze/{{cluster-name}}/savePolicy/{{policy-name}}
Method: POST
Parameter | Description |
---|---|
cluster-name | Name of the cluster to save the policy on |
policy-name | Name of the policy |
Request
This request saves a Distributor policy. For details on how to create a policy for a particular Fuze component, see the section for that component. All Fuze components use the same API endpoints for basic interaction with policies.
curl --location --request POST "${API_BASE}/1.0/fuze/default/savePolicy/example-distributor-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}" \
--data-raw '{
"metadata": {
"description": "Handle txData related to NLP.",
"displayName": "example-distributor-policy"
},
"listeningBrokerEndpoint": {
"broker": "RABBITMQ",
"exchange": "com.graphgrid.fuze.exchange",
"routingKey": "com.graphgrid.fuze.key",
"queue": "com.graphgrid.fuze.queue"
},
"forwardingRules": [
{
"cypher": "WITH {txData} AS txData WITH txData.createdRelationships AS rels WITH [k IN KEYS(rels) WHERE ANY(x IN rels[k] WHERE x.type = \"SUMMARY_SENTENCE\") | rels[k].uidOfStartNode] AS resultList UNWIND resultList as annotatedTextUid WITH annotatedTextUid, apoc.text.split(annotatedTextUid, \":`\", 3)[-1] AS tmp WITH annotatedTextUid, substring(tmp, 0, length(tmp)-1) AS annotatedTextGrn MATCH (a:Article)-[:HAS_ANNOTATED_TEXT]->(an:AnnotatedText {grn: annotatedTextGrn}) WITH COLLECT(DISTINCT \"``:``:`\" + a.grn + \"`\") AS articleGrns RETURN {numTries: 0, policyName: \"default-index-policy\", clusterName: \"default\", transactionId: timestamp(), indexName: \"article\", strategyUsed: \"defaultBroker\", idPartition: apoc.convert.toString(articleGrns)} AS results",
"resultKey": "results",
"multicast": {
"retry": 0,
"stopOnFailure": false,
"brokers": [
{
"broker": "RABBITMQ",
"exchange": "com.graphgrid.search.exchange",
"routingKey": "com.graphgrid.search.key",
"queue": "com.graphgrid.search.queue"
}
]
}
}
],
"neo4jCredentials": {
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "*****"
}
}'
Response
{
"policy": {
"metadata": {
"description": "Handle txData related to NLP.",
"displayName": "example-distributor-policy",
"createdAt": "2019-10-30T19:26:37+00:00",
"updatedAt": "2021-03-05T15:03:27.302Z",
"versions": []
},
"status": "ACTIVE",
"listeningBrokerEndpoint": {
"broker": "RABBITMQ",
"topic": null,
"exchange": "com.graphgrid.fuze.exchange",
"exchangeType": "topic",
"routingKey": "com.graphgrid.fuze.key",
"durable": true,
"exclusive": false,
"autoDelete": false,
"prefetchCount": 250,
"queue": "com.graphgrid.fuze.queue",
"region": null,
"bucket": null
},
"forwardingRules": [
{
"cypher": "WITH {txData} AS txData WITH txData.createdRelationships AS rels WITH [k IN KEYS(rels) WHERE ANY(x IN rels[k] WHERE x.type = \"SUMMARY_SENTENCE\") | rels[k].uidOfStartNode] AS resultList UNWIND resultList as annotatedTextUid WITH annotatedTextUid, apoc.text.split(annotatedTextUid, \":`\", 3)[-1] AS tmp WITH annotatedTextUid, substring(tmp, 0, length(tmp)-1) AS annotatedTextGrn MATCH (a:Article)-[:HAS_ANNOTATED_TEXT]->(an:AnnotatedText {grn: annotatedTextGrn}) WITH COLLECT(DISTINCT \"``:``:`\" + a.grn + \"`\") AS articleGrns RETURN {numTries: 0, policyName: \"default-index-policy\", clusterName: \"gg-dev-neo\", transactionId: timestamp(), indexName: \"article\", strategyUsed: \"defaultBroker\", idPartition: apoc.convert.toString(articleGrns)} AS results",
"resultKey": "results",
"multicast": {
"retry": 0,
"stopOnFailure": false,
"brokers": [
{
"broker": "RABBITMQ",
"topic": null,
"exchange": "com.graphgrid.search.exchange",
"exchangeType": "topic",
"routingKey": "com.graphgrid.search.key",
"durable": true,
"exclusive": false,
"autoDelete": false,
"prefetchCount": 250,
"queue": "com.graphgrid.search.queue",
"region": null,
"bucket": null
}
]
}
}
],
"neo4jCredentials": {
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "*****"
}
}
}
Retrieve Policy
Returns a saved Policy.
Base URL: /1.0/fuze/{{cluster-name}}/retrievePolicy/{{policy-name}}
Method: GET
Parameter | Description |
---|---|
cluster-name | Name of the cluster on which the policy is saved |
policy-name | Name of the policy |
Delete Policy
Deactivate and delete the policy.
Base URL: /1.0/fuze/{{cluster-name}}/deletePolicy/{{policy-name}}
Method: DELETE
Parameter | Description |
---|---|
cluster-name | Name of the cluster on which the policy is saved |
policy-name | Name of the policy |
Activate Policy
Sets the status field on the Policy to ACTIVE
. Fuze will then start using the Policy when it next polls for active policies.
Base URL: /1.0/fuze/{{cluster-name}}/activatePolicy/{{policy-name}}
Method: POST
Parameter | Description |
---|---|
cluster-name | Name of the cluster on which the policy is saved |
policy-name | Name of the policy |
Deactivate Policy
Sets the status field on the Policy to INACTIVE
. Fuze will then stop using the Policy and cleanup any resources associated with
it the next time it polls for active policies.
Base URL: /1.0/fuze/{{cluster-name}}/deactivatePolicy/{{policy-name}}
Method: POST
Parameter | Description |
---|---|
cluster-name | Name of the cluster on which the policy is saved |
policy-name | Name of the policy |
Distributor
Sometimes a message needs to be sent on a message broker, or sent to multiple endpoints on one or more different brokers. The Distributor simplifies this by providing a single listening endpoint that forwards its received messages to any number of other broker endpoints.
The Distributor uses Geequel to filter which messages are forwarded, so any data transformation that can happen in Geequel can be performed as well.
Setting up the Distributor
A few steps are needed before message forwarding can begin.
- Create an S3 bucket for Fuze, if one does not already exist in the target environment
- Set the AWS param store value for
/1.4/fuze/<env>/spring.aws.bucket
to the bucket name created in step 1 - Create a distribution policy (see Distribution Policy))
- Set up the listening broker endpoint from the policy (RabbitMQ requires an exchange, queue, and routing key; SQS requires a queue; Kafka will create the topic automatically)
- Deploy Fuze to the target environment
- Upload the distribution policy created in step 3 (see Save Policy)
- Activate the distribution policy (see Activate Policy)
- If incoming data will be coming from ONgDB, install a trigger to send transaction data to the listening broker. This can also be set up by a Trigger Policy.
An example trigger:
CALL apoc.trigger.add("gg-dev-fuze-transactionData", "CALL apoc.broker.send(\"rabbitmq\", {txData}, {exchangeName: \"com.graphgrid.fuze.exchange\", routingKey:
\"com.graphgrid.fuze.key\", queueName: \"com.graphgrid.fuze.queue\"}) YIELD connectionName, message, configuration RETURN 1", {phase: "after"}, {uidKey: "grn"})
Distribution Policy
A JSON document that defines the broker endpoint to listen to for TransactionData
coming from the graph and a number of forwarding rules that define where to
send the data. Each of these forwarding rules may also contain some Geequel to transform the data.
A distribution policy has the following elements:
- metadata - Information to uniquely identify the policy.
- listeningBrokerEndpoint - Defines the message broker to use.
- neo4jCredentials - The URI and credentials for a ONgDB instance.
- forwardingRules - The broker endpoints where received messages should be sent.
Forwarding Rules
A List of all broker endpoints to forward the received message to and Geequel to filter and/or transform the message before sending it to each endpoint.
Parameter | Description |
---|---|
cypher string | The Geequel that filters and/or transforms the received message. If {txData} is included in the query it is assumed that the query begins with WITH {txData} AS <varName> and {txData} will be replaced by apoc.convert.fromJsonMap("<txData>") . Otherwise WITH apoc.convert.fromJsonMap("<txData>") AS txData will be prepended to the query. |
resultKey string | (optional) The key that results from the Geequel query will be wrapped in. Allows the forwarded message to just be the data of interest, rather than wrapped in an object. |
multicast object | Contains a list of broker endpoints (brokers) to forward the transformed message to, if the message passes the filter. Each item in this list requires the broker. For KAFKA, it additionally requires a topic. For RABBITMQ it requires an exchange and routingKey. For SQS it requires a region, queue, and bucket. |
Forwarding Rules Example
{
"forwardingRules": [
{
"cypher": "WITH {txData} AS txData WITH txData.assignedNodeProperties.byLabel.Movie AS assignedMovieProperties, txData WITH [k IN KEYS(assignedMovieProperties) WHERE ANY(x IN assignedMovieProperties[k] WHERE x.key=\"released\" AND x.value>=2000)] AS grnsWeWant UNWIND grnsWeWant AS result RETURN result",
"resultKey": "result",
"multicast": {
"brokers": [
{
"broker": "KAFKA",
"topic": "com.graphgrid.ml.movieTopic"
},
{
"broker": "RABBITMQ",
"exchange": "com.graphgrid.ml.exchange",
"routingKey": "com.graphgrid.ml.movieRoutingKey"
},
{
"broker": "SQS",
"region": "us-west-2",
"queue": "graphrid-ml-movieQueue",
"bucket": "graphgrid-ml-fuze"
}
]
}
},
{
"cypher": "WITH txData.assignedNodeProperties.byLabel.Person AS assignedPersonProperties, txData WITH [k IN KEYS(assignedPersonProperties) WHERE ANY(x IN assignedPersonProperties[k] WHERE x.key=\"born\" AND x.value<1980)] AS grnsWeWant UNWIND grnsWeWant AS result RETURN result",
"multicast": {
"brokers": [
{
"broker": "KAFKA",
"topic": "com.graphgrid.ml.personTopic"
},
{
"broker": "RABBITMQ",
"exchange": "com.graphgrid.ml.exchange",
"routingKey": "com.graphgrid.ml.personRoutingKey"
},
{
"broker": "SQS",
"region": "us-west-2",
"queue": "graphrid-ml-personQueue",
"bucket": "graphgrid-ml-fuze"
}
]
}
}
]
}
This example defines two forwarding rules. The first filters for movies released in the 21st century and forwards a subset of the TransactionData
to each of a
Kafka, a RabbitMQ, and an SQS endpoint. The second rule filters for people born before 1980 and forwards a subset of the TransactionData
to each of a Kafka, a
RabbitMQ, and an SQS endpoint.
Example Distribution Policy
This policy listens for messages on the RabbitMQ exchange/queue com.graphgrid.fuze.exchange/com.graphgrid.fuze.queue. The messages in this case are coming from ONgDB in the form of transaction data sent with a trigger after changes are made to the database. This data is then run through 3 rules, each filtering for different changes in the graph (one for created relationships of a certain type, and the other two for changes to certain types of nodes). The filtered results are returned as a JSON object that is then sent to the RabbitMQ queue for Search.
{
"metadata": {
"description": "Send new NLP related nodes and relationships to Search for indexing.",
"displayName": "example-distributor-policy"
},
"listeningBrokerEndpoint": {
"broker": "RABBITMQ",
"exchange": "com.graphgrid.fuze.exchange",
"routingKey": "com.graphgrid.fuze.key",
"queue": "com.graphgrid.fuze.queue"
},
"neo4jCredentials": {
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
},
"forwardingRules": [
{
"cypher": "WITH {txData} AS txData WITH txData.createdRelationships AS rels WITH [k IN KEYS(rels) WHERE ANY(x IN rels[k] WHERE x.type = \"SUMMARY_SENTENCE\") | rels[k].uidOfStartNode] AS resultList UNWIND resultList as annotatedTextUid WITH annotatedTextUid, apoc.text.split(annotatedTextUid, \":`\", 3)[-1] AS tmp WITH annotatedTextUid, substring(tmp, 0, length(tmp)-1) AS annotatedTextGrn MATCH (a:Article)-[:HAS_ANNOTATED_TEXT]->(an:AnnotatedText {grn: annotatedTextGrn}) WITH COLLECT(DISTINCT \"``:``:`\" + a.grn + \"`\") AS articleGrns RETURN {numTries: 0, policyName: \"default-index-policy\", clusterName: \"gg-dev-neo\", transactionId: timestamp(), indexName: \"article\", strategyUsed: \"defaultBroker\", idPartition: apoc.convert.toString(articleGrns)} AS results",
"resultKey": "results",
"multicast": {
"brokers": [
{
"broker": "RABBITMQ",
"exchange": "com.graphgrid.search.exchange",
"routingKey": "com.graphgrid.search.key",
"queue": "com.graphgrid.search.queue"
}
]
}
}
]
}
Distribution Policy Management
All Fuze components use the same API endpoints for basic interaction with policies. Please refer to the policy management section to save, retrieve, activate, deactivate, and/or delete Distribution policies.
Get Active Distribution Policies
Returns a list of all active distribution policies.
Base URL: /1.0/fuze/{{cluster-name}}/activeDistributionPolicies
Method: GET
Parameter | Description |
---|---|
cluster-name | Name of the cluster on which the policy is saved |
Request
curl --location --request GET "${API_BASE}/1.0/fuze/default/activeDistributionPolicies" \
--header "Authorization: Bearer ${BEARER_TOKEN}"
Response
{
"activePolicies": [
{
"metadata": {
"description": "Send new NLP related nodes and relationships to Search for indexing.",
"displayName": "example-distributor-policy",
"createdAt": "2019-10-30T19:26:37+00:00",
"updatedAt": "2021-03-05T15:03:27.302Z",
"versions": []
},
"status": "ACTIVE",
"listeningBrokerEndpoint": {
"broker": "RABBITMQ",
"topic": null,
"exchange": "com.graphgrid.fuze.exchange",
"exchangeType": "topic",
"routingKey": "com.graphgrid.fuze.key",
"durable": true,
"exclusive": false,
"autoDelete": false,
"prefetchCount": 250,
"queue": "com.graphgrid.fuze.queue",
"region": null,
"bucket": null
},
"forwardingRules": [
{
"cypher": "WITH {txData} AS txData WITH txData.createdRelationships AS rels WITH [k IN KEYS(rels) WHERE ANY(x IN rels[k] WHERE x.type = \"SUMMARY_SENTENCE\") | rels[k].uidOfStartNode] AS resultList UNWIND resultList as annotatedTextUid WITH annotatedTextUid, apoc.text.split(annotatedTextUid, \":`\", 3)[-1] AS tmp WITH annotatedTextUid, substring(tmp, 0, length(tmp)-1) AS annotatedTextGrn MATCH (a:Article)-[:HAS_ANNOTATED_TEXT]->(an:AnnotatedText {grn: annotatedTextGrn}) WITH COLLECT(DISTINCT \"``:``:`\" + a.grn + \"`\") AS articleGrns RETURN {numTries: 0, policyName: \"default-index-policy\", clusterName: \"gg-dev-neo\", transactionId: timestamp(), indexName: \"article\", strategyUsed: \"defaultBroker\", idPartition: apoc.convert.toString(articleGrns)} AS results",
"resultKey": "results",
"multicast": {
"retry": 0,
"stopOnFailure": false,
"brokers": [
{
"broker": "RABBITMQ",
"topic": null,
"exchange": "com.graphgrid.search.exchange",
"exchangeType": "topic",
"routingKey": "com.graphgrid.search.key",
"durable": true,
"exclusive": false,
"autoDelete": false,
"prefetchCount": 250,
"queue": "com.graphgrid.search.queue",
"region": null,
"bucket": null
}
]
}
}
],
"neo4jCredentials": {
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "*****"
}
}
]
}
ONgDB Writer
Fuze provides a service for writing to ONgDB that offers robust failure handling and retry mechanisms. It will attempt to execute any Geequel statements it receives at least once, retrying on transient failures, splitting requests around failed statements, and quarantining requests that could not commit to the graph, even after retry.
Requests may be made either through the REST API or via the message brokers. Both TransactionRequests and TransactionResults are also persisted to Elasticsearch.
ONgDB Writer Policy
A JSON document that defines the settings to use when executing a TransactionRequest and/or a list of message broker endpoints that listen for TransactionRequests to execute. A ONgDB Writer policy has the following elements:
- metadata - Information to uniquely identify the policy.
- defaultNeo4jCredentials - The Neo4jCredentials to use if a request does not specify any.
- listeningBrokerEndpoints - A list of broker endpoints on which to listen for requests.
- maxReadAttempts - The maximum number of times to attempt executing the statements in a read request before quarantining the request.
- maxWriteAttempts - The maximum number of times to attempt executing a write request before quarantining the request.
- minBatchSize - the minimum number of TransactionStatements that must be gathered into a batch request before executing.
- maxBatchSize - (currently unavailable) the maximum number of TransactionStatements that may be present in a batch request.
Example ONgDB Writer Policy
{
"metadata": {
"description": "The settings to use when making requests to ONgDB",
"displayName": "ongdbwriter-policy"
},
"listeningBrokerEndpoints": [
{
"broker": "KAFKA",
"topic": "com.graphgrid.fuze.neo4jWriter.topic"
}
],
"defaultNeo4jCredentials": {
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
},
"maxReadAttempts": 5,
"maxWriteAttempts": 5,
"minBatchSize": 20,
"maxBatchSize": 100
}
ONgED Writer Policy Management
All Fuze components use the same API endpoints for basic interaction with policies. Please refer to the policy management section to save, retrieve, activate, deactivate, and/or delete Distribution policies.
Get Active ONgDB Writer Policies
Returns a list of all active ONgDB Writer policies.
Base URL: /1.0/fuze/{{cluster-name}}/activeNeo4jWriterPolicies/{{policy-name}}
Method: GET
Parameter | Description |
---|---|
cluster-name | Name of the cluster on which the policy is saved |
Request
curl --location --request GET "${API_BASE}/1.0/fuze/default/activeNeo4jWriterPolicies" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}" \
Response
{
"activePolicies": [
{
"metadata": {
"description": "The settings to use when making requests to ONgDB",
"displayName": "ongdbwriter-policy",
"createdAt": "2021-03-05T15:53:45.646Z",
"updatedAt": "2021-03-05T15:53:50.615Z"
},
"status": "ACTIVE",
"defaultNeo4jCredentials": {
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
},
"txRequestBrokerEndpoints": null,
"txDataBrokerEndpoints": null,
"lastTransactionId": null,
"maxWriteAttempts": 5,
"maxReadAttempts": 5,
"minBatchSize": 20,
"maxBatchSize": 100
}
]
}
Write
Returns a TransactionResult object. Ignores the requestType field of the request and executes the request in write mode.
Base URL: /1.0/fuze/write
Method: POST
The write endpoint requires a json body called a Transaction Request which is detailed below. For an example of a full Write request see Write with Policy
Transaction Request
A JSON document that defines a request for the ONgDB Writer to execute.
Parameter | Description |
---|---|
statements TransactionStatement list | A list of TransactionStatement objects containing the Geequel to execute this request. Default: null |
metadata object | A map of metadata for the client to use. Default: null |
requestType enum | The execution mode under which this request should be run. Options are READ and WRITE . Default: WRITE |
route object list | A list of message broker endpoints where the TransactionResult object created by executing this request should be sent. Default: null |
requiredBookmarks string list | A list of ONgDB bookmarks that must be present in the graph before this request's statements may execute. Default: null |
splitAllowed boolean | Indicates whether or not the request may be split into child requests in the event of a failure while executing a statement. Set to false if all statements in the request must succeed as one unit. Default: true |
Neo4jCredentials Neo4jCredentials list | A list of Neo4jCredentials for the ONgDB instances on which the statements should be run. Default: null |
requiredBookmarks is not currently available as an option when making batchable requests and any provided value will be ignored. Note also that using the read and write (and readAsync and writeAsync) API endpoints overrides the requestType value. When a request is sent through one of the message brokers, the requestType field is used to determine the appropriate access mode to run under.
Transaction Statement
The statements field specifies the Geequel queries to execute. It consists of a list of TransactionStatements, each of which contain the following fields:
Parameter | Description |
---|---|
statement string | The Geequel statement to execute. Default: null |
parameters object | A map of parameter names to values to use in the statement. Default: null |
metadata object | A map of metadata for the client to use. Default: null |
actions object | A set of TransactionStatements to execute after a request either successfully executes or fails and is sent to quarantine. Default: null |
Actions
Parameter | Description |
---|---|
onSuccess TransactionStatement list | A list of TransactionStatements to run after this statement succeeds. An example use might be sending a broker message with APOC indicating that it succeeded. These statements are run after the request succeeds and any potential changes have been committed to the graph. Default: null |
onFailure TransactionStatement list | A list of TransactionStatements to run after this statement fails. An example use might be sending a broker message with APOC indicating that it failed. These statements are run after the request has failed and is quarantined. Default: null |
TransactionResult
A JSON document containing a list of values returned by executing the request and the input request with some additional values to assist in profiling and debugging requests.
Field | Meaning |
---|---|
records object list | The list of values returned by running the statements of a request. Default: [] |
txRequest TransactionRequest | The request that caused the creation of this result object. Contains some additional fields Default: null |
Additional Transaction Request Fields
In addition to the fields provided when creating the request, the returned request contains some additional fields that may be useful when profiling the request or debugging failures.
Field | Meaning |
---|---|
fuzeId string | The UUID assigned to this request. |
batchId object | The UUID of the batch request this one was a part of. Default: null |
parentId object | The UUID of this request's parent. Default: null |
status enum | The current status of this request. |
returned bookmark string | The last ONgDB bookmark created after running the statements in this request. Default: null |
queryAttemptNumber integer | The number of attempts made so far to run the statements in this request. |
creationTime long | The time, in milliseconds, when this request was created. |
startTime long | The time, in milliseconds, when execution of this request started. Default: 0 |
endTime long | The time, in milliseconds, when execution of this request finished. Default: 0 |
executionDuration long | The number of milliseconds spent executing this request. Default: -1 |
children TransactionRequest list | A list of child requests resulting from splitting the request. Splits may occur either due to errors when running a statement or because multiple Neo4jCredentials were provided (in this case each child contains the same statements but only one set of credentials). Default: null |
errorIndex integer | The index of the statement that caused an error to occur. Default: null |
errorCode integer | The ONgDB code of the error that occurred. Default: null |
errorMessage string | The error message of the error that occurred. Default: null |
Transaction Request Status
A request may be in one of several states:
- DOES_NOT_EXIST - A request with this fuzeId has not been created.
- NOT_RUN - The request has been created, but execution has not yet started.
- BATCH - The request contains several child requests whose statements will be run under the same ONgDB session.
- SPLIT - The request was broken into child requests before execution was attempted. This usually occurs when the request contains more than one set of
Neo4jCredentials
- IN_PROGRESS - Execution of this request is ongoing.
- SUCCEEDED - Execution of this request succeeded without any problems.
- FAILED_WITH_RETRY - Execution of this request failed, but it was retried. See the children for subsequent attempts. See the
errorIndex
,errorCode
, anderrorMessage
for details about the failure. - FAILED - Execution of this request failed with an error that cannot be retried. See the
errorIndex
,errorCode
, and `errorMessage for details about the failure.
Additional Transaction Statemenet Fields
Like the TransactionRequest containing the statements, each TransactionStatement in a returned request also contains some additional fields.
Field | Meaning |
---|---|
executed boolean | The request that caused the creation of this result object. Contains some additional fields. Default: null |
startTime long | The time, in milliseconds, when execution of this statement started. Default: 0 |
endTime long | The time, in milliseconds, when execution of this statement finished. Default: 0 |
executionDuration long | The number of milliseconds spent executing this statement. Default: -1 |
status enum | The current state of this statement. Possible values are NOT_RUN , IN_PROGRESS , SUCCEEDED , and FAILED . |
Example
This is a sample of the TransactionResult from a request to retrieve the properties of every Person node in the graph:
{
"txRequest": {
"statements": [
{
"statement": "MATCH (p:Person) RETURN PROPERTIES(p)",
"parameters": null,
"metadata": null,
"actions": null,
"executed": true,
"startTime": 1553265673299,
"endTime": 1553265673359,
"status": "SUCCEEDED",
"executionDuration": 60
}
],
"fuzeId": "6523221b-61d0-4695-875c-70d565a2ea12",
"batchId": null,
"parentId": null,
"metadata": {
"id": "abc123"
},
"requestType": "WRITE",
"sendResultsTo": null,
"status": "SUCCEEDED",
"requiredBookmarks": null,
"returnedBookmark": null,
"queryAttemptNumber": 1,
"creationTime": 1553265670537,
"startTime": 1553265671272,
"endTime": 1553265674624,
"splitAllowed": true,
"children": null,
"errorIndex": null,
"errorCode": null,
"errorMessage": null,
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
],
"executionDuration": 3352
},
"records": [
{
"PROPERTIES(p)": {
"name": "David"
}
},
{
"PROPERTIES(p)": {
"name": "Keanu Reeves",
"born": 1964
}
},
{
"PROPERTIES(p)": {
"name": "Edgar"
}
}
]
}
Write With Policy
Returns a TransactionRequest object.
Uses the ONgDB Writer Policy {{policy-name}}
stored on cluster {{cluster-name}}
to determine settings for the maximum number of write attempts and the
default Neo4jCredentials to use if the TransactionRequest does not contain any.
Base URL: /1.0/fuze/{{cluster-name}}/write/{{policy-name}}
Method: POST
Ignores the requestType field of the request and executes the request in write mode.
Request
This request simply writes a single node.
curl --location --request POST "${API_BASE}/1.0/fuze/default/write/example-ongdbwriter-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}" \
--data-raw '{
"metadata": {
"statementId": 42
},
"statement": "MERGE (:Person {name: $name, born: $born})",
"parameters": {
"name": "Keanu Reeves",
"born": 1964
},
"actions": {
"onSuccess": [
{
"statement": "CALL apoc.broker.send(\"rabbitmq\", {msg: \"Successfully wrote statement $id\"}, {exchangeName: \"com.graphgrid.fuze.exchange\", routingKey: \"com.graphgrid.fuze.successKey\", queueName: \"com.graphgrid.fuze.successQueue\"}) YIELD connectionName, message, configuration RETURN 1",
"parameters": {
"id": 42
}
}
],
"onFailure": [
{
"statement": "CALL apoc.broker.send(\"rabbitmq\", {msg: \"Failed to write statement $id\"}, {exchangeName: \"com.graphgrid.fuze.exchange\", routingKey: \"com.graphgrid.fuze.failureKey\", queueName: \"com.graphgrid.fuze.failureQueue\"}) YIELD connectionName, message, configuration RETURN 1",
"parameters": {
"id": 42
}
}
]
}
}
'
Response
{
"transactionResult": {
"transactionRequest": {
"statements": [],
"fuzeId": "c7b891c3-8abc-4cfa-af13-635da90e5cf6",
"batchId": null,
"parentId": null,
"metadata": {
"statementId": 42
},
"requestType": "WRITE",
"sendResultsTo": null,
"status": "SUCCEEDED",
"requiredBookmarks": null,
"returnedBookmark": "neo4j:bookmark:v1:tx581",
"queryAttemptNumber": 1,
"creationTime": "2021-02-24T18:02:04.828Z",
"timeToCreate": null,
"startTime": "2021-02-24T18:02:04.829Z",
"endTime": "2021-02-24T18:02:04.831Z",
"splitAllowed": true,
"children": null,
"errorIndex": null,
"errorCode": null,
"errorMessage": null,
"neo4jCredentials": null,
"executionDuration": 2
},
"records": []
}
}
Write Async
Returns the fuzeId of the provided request. This value may be used to check the status of the request and, when results are available, to retrieve the TransactionResult generated from executing the request.
Ignores the requestType field of the request and executes the request in write mode.
Base URL: /1.0/fuze/writeAsync
Method: POST
For an example of a Write Async request see Write Async With Policy.
Write Async With Policy
Returns the fuzeId of the provided request. This value may be used to check the status of the request and, when results are available, to retrieve the TransactionResult generated from executing the request.
Uses the ONgDB Writer Policy {{policy-name}}
stored on cluster {{cluster-name}}
to determine settings for the maximum number of write attempts and the
default Neo4jCredentials to use if the TransactionRequest does not contain any.
Ignores the requestType field of the request and executes the request in write mode.
Base URL: /1.0/fuze/{{cluster-name}}/writeAsync/{{policy-name}}
Method: POST
Request
curl --location --request POST "${API_BASE}/1.0/fuze/default/writeAsync/example-ongdbwriter-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}" \
--data-raw '{
"metadata": {
"id": "24601"
},
"statements": [
{
"statement": "CREATE (:Person {name: $name})",
"parameters": {
"name": "Edgar"
}
}
],
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
]
}'
Response
{
"fuzeId": "4b5fe3f4-d27d-4f89-a8ea-bac14b390b1d"
}
Read
Return a TransactionResult object.
Ignores the requestType field of the request and executes the request in read mode.
Base URL: /1.0/fuze/read
Method: POST
For an example of a Read request see Read With Policy.
Read With Policy
Return a TransactionResult object.
Uses the ONgDB Writer Policy {{policy-name}}
stored on cluster {{cluster-name}}
to determine settings for the maximum number of read
attempts and the default Neo4jCredentials to use if the
TransactionRequest does not contain any.
Ignores the requestType
field of the request and executes the request in read mode.
Base URL: /1.0/fuze/{{cluster-name}}/read/{{policy-name}}
Method: POST
Request
curl --location --request GET "${API_BASE}/1.0/fuze/default/read/example-ongdbwriter-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}" \
--data-raw '{
"metadata": {
"id": "abc123"
},
"statements": [
{
"statement": "MATCH (p:Person) RETURN PROPERTIES(p)"
}
],
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
]
}'
Response
{
"transactionResult": {
"transactionRequest": {
"statements": [
{
"statement": "MATCH (p:Person) RETURN PROPERTIES(p)",
"parameters": null,
"startTime": "2021-02-24T18:09:31.353Z",
"endTime": "2021-02-24T18:09:31.357Z",
"executionDuration": 4,
"metadata": null,
"actions": null,
"status": "SUCCEEDED",
"executed": true
}
],
"fuzeId": "780b2c3b-2821-4e91-a287-711e30cf8e6f",
"batchId": null,
"parentId": null,
"metadata": {
"id": "abc123"
},
"requestType": "READ",
"sendResultsTo": null,
"status": "SUCCEEDED",
"requiredBookmarks": null,
"returnedBookmark": null,
"queryAttemptNumber": 1,
"creationTime": "2021-02-24T18:09:31.351Z",
"timeToCreate": null,
"startTime": "2021-02-24T18:09:31.352Z",
"endTime": "2021-02-24T18:09:31.358Z",
"splitAllowed": true,
"children": null,
"errorIndex": null,
"errorCode": null,
"errorMessage": null,
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
],
"executionDuration": 6
},
"records": [
{
"PROPERTIES(p)": {
"name": "Edgar",
"createdAt": "2021-02-24T16:25:19+00:00",
"grn": "grn:gg:person:iY4Ehm0Bb3Jccu42S8rmjvvKkUqz3lc9qmZtBTFkrLeV",
"lastSearchIndexedAt": "0",
"updatedAt": "2021-02-24T16:25:19+00:00"
}
},
{
"PROPERTIES(p)": {
"createdAt": "2021-02-11T16:21:22+00:00",
"grn": "grn:gg:person:pyXToRhZEh5iIZq5GLGNvrTiXmSMEEAjcdHEdL3jIcTq",
"lastSearchIndexedAt": "0",
"born": 1964,
"name": "Keanu Reeves",
"index": "Person",
"updatedAt": "2021-02-11T16:21:22+00:00"
}
}
]
}
Read Async
Returns the fuzeId of the provided request. This value may be used to check the status of the request and, when results are available, to retrieve the TransactionResult generated from executing the request.
Ignores the requestType field of the request and executes the request in read mode.
Base URL: /1.0/fuze/readAsync
Method: POST
For an example of a Read Async request see Read Async With Policy.
Read Async With Policy
Returns the fuzeId of the provided request. This value may be used to check the status of the request and, when results are available, to retrieve the TransactionResult generated from executing the request.
Uses the ONgDB Writer Policy {{policy-name}}
stored on cluster {{cluster-name}}
to determine settings for the maximum number of read attempts and the
default Neo4jCredentials to use if the TransactionRequest does not contain any.
Ignores the requestType field of the request and executes the request in read mode.
Base URL: /1.0/fuze/{{cluster-name}}/readAsync/{{policy-name}}
Method: POST
Request
curl --location --request GET "${API_BASE}/1.0/fuze/default/readAsync/example-ongdbwriter-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}" \
--data-raw '{
"metadata": {
"id": "abc123"
},
"statements": [
{
"statement": "MATCH (p:Person) RETURN PROPERTIES(p)"
}
],
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
]
}'
Response
{
"fuzeId": "389627ad-129a-4888-9048-ae9487b21ee8"
}
Check Request Status
Accepts the fuzeId string as a query parameter. Returns the current status of the request. Possible values are defined and explained here.
Base URL: /1.0/fuze/checkTransactionRequestStatus/{{fuzeId}}
Method: GET
Request
We will be using the fuzeId
from the Read Async with Policy response for the following transaction examples.
curl --location --request GET "${API_BASE}/1.0/fuze/checkTransactionRequestStatus/389627ad-129a-4888-9048-ae9487b21ee8" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}"
Response
{
"status": "SUCCEEDED"
}
Get Transaction Request
Base URL: /1.0/fuze/getTransactionRequest/{{fuzeId}}
Method: GET
Accepts the fuzeId string as a query parameter. Returns the TransactionRequest with that fuzeId, if it exists.
Request
curl --location --request GET "${API_BASE}/1.0/fuze/getTransactionRequest/389627ad-129a-4888-9048-ae9487b21ee8" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}"
Response
{
"transactionRequest": {
"statements": [
{
"statement": "MATCH (p:Person) RETURN PROPERTIES(p)",
"parameters": null,
"startTime": "2021-02-25T16:48:40.583Z",
"endTime": "2021-02-25T16:48:40.585Z",
"executionDuration": 2,
"metadata": null,
"actions": null,
"status": "SUCCEEDED",
"executed": true
}
],
"fuzeId": "389627ad-129a-4888-9048-ae9487b21ee8",
"batchId": null,
"parentId": null,
"metadata": {
"id": "abc123"
},
"requestType": "READ",
"sendResultsTo": null,
"status": "SUCCEEDED",
"requiredBookmarks": null,
"returnedBookmark": null,
"queryAttemptNumber": 1,
"creationTime": "2021-02-25T16:48:40.580Z",
"timeToCreate": null,
"startTime": "2021-02-25T16:48:40.582Z",
"endTime": "2021-02-25T16:48:40.586Z",
"splitAllowed": true,
"children": null,
"errorIndex": null,
"errorCode": null,
"errorMessage": null,
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
],
"executionDuration": 4
}
}
Get Transaction Result
Base URL: 1.0/fuze/getTransactionResult/{{fuzeId}}
Method: GET
Accepts the fuzeId string as a query parameter. Returns the TransactionRequest generated by executing the request with that id, if it is available.
Request
curl --location --request GET "${API_BASE}/1.0/fuze/getTransactionResult/389627ad-129a-4888-9048-ae9487b21ee8" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}"
Response
{
"transactionResult": {
"transactionRequest": {
"statements": [
{
"statement": "MATCH (p:Person) RETURN PROPERTIES(p)",
"parameters": null,
"startTime": "2021-02-25T16:48:40.583Z",
"endTime": "2021-02-25T16:48:40.585Z",
"executionDuration": 2,
"metadata": null,
"actions": null,
"status": "SUCCEEDED",
"executed": true
}
],
"fuzeId": "389627ad-129a-4888-9048-ae9487b21ee8",
"batchId": null,
"parentId": null,
"metadata": {
"id": "abc123"
},
"requestType": "READ",
"sendResultsTo": null,
"status": "SUCCEEDED",
"requiredBookmarks": null,
"returnedBookmark": null,
"queryAttemptNumber": 1,
"creationTime": "2021-02-25T16:48:40.580Z",
"timeToCreate": null,
"startTime": "2021-02-25T16:48:40.582Z",
"endTime": "2021-02-25T16:48:40.586Z",
"splitAllowed": true,
"children": null,
"errorIndex": null,
"errorCode": null,
"errorMessage": null,
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
],
"executionDuration": 4
},
"records": [
{
"PROPERTIES(p)": {
"name": "Edgar",
"createdAt": "2021-02-24T16:25:19+00:00",
"grn": "grn:gg:person:iY4Ehm0Bb3Jccu42S8rmjvvKkUqz3lc9qmZtBTFkrLeV",
"lastSearchIndexedAt": "0",
"updatedAt": "2021-02-24T16:25:19+00:00"
}
}
]
}
}
View Quarantine
Returns a list of TransactionRequests that have failed in a way such that they are no longer being retried.
Base URL: /1.0/fuze/viewQuarantine
Method: GET
Before we jump into this request let's set up an example. Let's say we accidently made a single statement write request with a snytax error. Our json body could like this:
{
"statements": [
{
"statement": "CREAT (:Person {name: $name})",
"parameters": {
"name": "Freda"
}
}
],
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
]
}
Notice that our statement says CREAT
instead of CREATE
. This request will fail and show up in our quarantine queue.
Request
curl --location --request GET "${API_BASE}/1.0/fuze/viewQuarantine" \
--header "Authorization: Bearer ${BEARER_TOKEN}"
Response
{
"quarantine": [
{
"statements": [
{
"statement": "CREAT (:Person {name: $name})",
"parameters": {
"name": "Freda"
},
"startTime": "2021-02-25T17:05:42.582Z",
"endTime": "2021-02-25T17:05:42.583Z",
"executionDuration": 1,
"metadata": null,
"actions": null,
"status": "FAILED",
"executed": true
}
],
"fuzeId": "9e9d8b4c-53ab-47f4-b951-6a947a516622",
"batchId": null,
"parentId": null,
"metadata": null,
"requestType": "WRITE",
"sendResultsTo": null,
"status": "FAILED",
"requiredBookmarks": null,
"returnedBookmark": null,
"queryAttemptNumber": 1,
"creationTime": "2021-02-25T17:05:42.584Z",
"timeToCreate": null,
"startTime": "1970-01-01T00:00:00Z",
"endTime": "1970-01-01T00:00:00Z",
"splitAllowed": true,
"children": null,
"errorIndex": null,
"errorCode": "Neo.ClientError.Statement.SyntaxError",
"errorMessage": "Invalid input ' ': expected 'e/E' (line 1, column 6 (offset: 5))\n\"CREAT (:Person {name: $name})\"\n ^",
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
],
"executionDuration": -1
}
]
}
Clear Quarantine
Clears the quarantine from memory and from Elasticsearch. This endpoint does not return anything.
Base URL: /1.0/fuze/clearQuarantine
Method: DELETE
Request
curl --location --request DELETE "${API_BASE}/1.0/fuze/clearQuarantine" \
--header "Authorization: Bearer ${BEARER_TOKEN}"
Response
200 OK
Queue Request for Batching
Returns the fuzeId of the provided request. This value may be used to check the status of the request and, when results are available, to retrieve the TransactionResult generated from executing the request.
Requests sent to this endpoint will not be executed until the batchAndExecuteQueuedRequests endpoint is called.
Base URL: /1.0/fuze/{{cluster-name}}/queueRequestForBatching
Method: POST
Request
curl --location --request POST "${API_BASE}/1.0/fuze/queueRequestForBatching" \
--header "Authorization: Bearer ${BEARER_TOKEN}" \
--header 'Content-Type: application/json' \
--data-raw '{
"metadata": {
"id": "24601"
},
"statements": [
{
"statement": "CREATE (:Person {name: $name})",
"parameters": {
"name": "David"
}
}
],
"neo4jCredentials": [
{
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "xxxxx"
}
]
}'
{
"fuzeId": "a12721c7-4126-4085-86d6-1ab8532fa61d"
}
Batch and Execute Queued Requests
Runs the queued requests in batches. Requests are grouped for batching by their neo4jCredentials. Basically, all requests to write to a particular Neo4j instance will be executed together. This endpoint does not return anything.
Base URL: /1.0/fuze/{{cluster-name}}/batchAndExecuteQueuedRequests
Method: POST
curl --location --request POST "${API_BASE}/1.0/fuze/batchAndExecuteQueuedRequests" \
--header "Authorization: Bearer ${BEARER_TOKEN}" \
--header 'Content-Type: application/json'
Response
200 OK
Trigger Manager
Ensure that each APOC trigger exists on the specified ONgDB instances.
Trigger Policy
Parameter | Description |
---|---|
metadata metadata | A map of metadata for the client to use Default: null |
instances Map< string, Neo4jCredentials > | The collection of ONgDB instances that each trigger may reference. Default: null |
triggers List< Trigger > | The list of triggers to add/remove when de/activating the policy Default: null |
Trigger
Parameter | Description |
---|---|
triggerName string | Name of the triggers as it should appear in ONgDB Default: null |
triggerCypher string | Geequel to execute when the trigger runs. Default: null |
phase string | The phase to run the trigger in. Options are before and after Default: null |
uidKeys List< string > | (optional) ordered set of properties to prioritize when creating uids (used for txData) Default: [] |
uidLabels List< string > | (optional) ordered set of labels to prioritize when creating uids (used for txData) Defaul: [] |
includedInstances List < string > | The list of keys from the instances field in the policy specifying which instances the trigger applies to. Default: null |
excludedInstances List < string > | The list of keys from the instances field in the policy specifying which instances the trigger does not apply to. Default: null |
A Trigger cannot have both includedInstances
and excludedInstances
set, and if neither includedInstances
and excludedInstances
are set, then the
trigger will apply to all database instances in the instances
Map.
Example Trigger Policies
Triggers to apply GRNs to new nodes and relationships:
{
"metadata": {
"description": "Ensure that the txData trigger exists on ongdb",
"displayName": "trigger-policy"
},
"instances": {
"db1": {
"uri": "bolt://someHost:7687",
"username": "ongdb",
"password": "***"
},
"db2": {
"uri": "bolt://someOtherHost:7687",
"username": "ongdb",
"password": "***"
}
},
"triggers": [
{
"triggerName": "addGrnToCreatedNodes",
"triggerCypher": "UNWIND {createdNodes} AS n WITH n WHERE NOT EXISTS (n.grn) CALL apoc.do.when( size(labels(n)) > 1, \"SET n.grn = 'grn:gg:' + lower(filter(l IN labels(n)WHERE l <> 'Resource')[0]) + ':' + apoc.text.random(44)\" , \"SET n.grn = 'grn:gg:' + toLower(labels(n)[0]) + ':' + apoc.text.random(44)\", {n : n } ) yield value RETURN n",
"phase": "before"
},
{
"triggerName": "addGrnToCreatedRelationships",
"triggerCypher": "UNWIND {createdRelationships} AS r WITH r WHERE NOT exists(r.grn) SET r.grn = 'grn:gg:' + toLower(type(r)) + ':' + apoc.text.random(44)",
"phase": "before"
}
]
}
Trigger Policy Management
All Fuze components use the same API endpoints for basic interaction with policies. Please refer to the policy management section to save, retrieve, activate, deactivate, and/or delete Distribution policies.
Get Active Trigger Policies
Returns a list of all active trigger policies.
Base URL: /1.0/fuze/activeTriggerPolicies
Method: GET
Request
curl
curl --location --request GET "${API_BASE}/1.0/fuze/activeTriggerPolicies" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}"
Response
{
"activePolicies": [
{
"metadata": {
"description": "Ensure that the txData trigger exists on ongdb",
"displayName": "trigger-policy",
"createdAt": "2021-03-05T15:13:56.65Z",
"updatedAt": "2021-03-05T15:14:08.937Z",
"versions": []
},
"status": "ACTIVE",
"instances": {
"db1": {
"uri": "bolt://someHost:7687",
"username": "ongdb",
"password": "***"
},
"db2": {
"uri": "bolt://someOtherHost:7687",
"username": "ongdb",
"password": "***"
}
},
"triggers": [
{
"triggerName": "addGrnToCreatedNodes",
"triggerCypher": "UNWIND {createdNodes} AS n WITH n WHERE NOT EXISTS (n.grn) CALL apoc.do.when( size(labels(n)) > 1, \"SET n.grn = 'grn:gg:' + lower(filter(l IN labels(n)WHERE l <> 'Resource')[0]) + ':' + apoc.text.random(44)\" , \"SET n.grn = 'grn:gg:' + toLower(labels(n)[0]) + ':' + apoc.text.random(44)\", {n : n } ) yield value RETURN n",
"phase": "before",
"uidKeys": null,
"uidLabels": null,
"includedInstances": null,
"excludedInstances": null
},
{
"triggerName": "addGrnToCreatedRelationships",
"triggerCypher": "UNWIND {createdRelationships} AS r WITH r WHERE NOT exists(r.grn) SET r.grn = 'grn:gg:' + toLower(type(r)) + ':' + apoc.text.random(44)",
"phase": "before",
"uidKeys": null,
"uidLabels": null,
"includedInstances": null,
"excludedInstances": null
}
]
}
]
}
Create a Notification Trigger
A trigger policy is capable of setting up event listeners that will send transaction data to the com.graphgrid.ingest
queue in RabbitMQ. This data is used
to create a Notification
node which connects to the user's UserInbox
node.
Example Fuze Notification Trigger Policy
This trigger policy forwards a query to look for any new movies that are added to the graph that Keanu Reeves has acted in. Notice that the query is unwinding
the txData
, (transaction data), and is looking to see if a relationship of ACTED_IN
has been created for the Person
with the property name
equal to "Keanu
Reeves". If this query finds a match then a notification will be created.
In this case it is important to note that a property change on an existing node (the "Keanu Reeves" node)
is necessary for the transaction data to properly pick up. For example, if we were to only create a new relationship of ACTED_IN
between Keanu and a new movie,
the transaction data would not get picked up. If we added a new relationship and a new property (for example properties like role
or updatedAt
) to
the "Keanu Reeves"
Person
node the transaction data will be properly recognized by the trigger.
{
"metadata": {
"description": "On RabbitMQ, forward messages to rabbitmq publish queue for notifications.",
"displayName": "{{fuzePolicyName}}",
"createdAt": "2019-10-23T18:09:52+00:00",
"updatedAt": "2019-10-23T18:09:53+00:00",
"versions": null
},
"listeningBrokerEndpoint": {
"broker": "RABBITMQ",
"topic": null,
"exchange": "com.graphgrid.fuze.exchange",
"exchangeType": "topic",
"routingKey": "com.graphgrid.fuze.key",
"durable": true,
"exclusive": false,
"autoDelete": false,
"queue": "com.graphgrid.fuze.queue",
"region": null,
"bucket": null
},
"forwardingRules": [
{
"cypher": "WITH {txData} AS txData WITH txData.createdRelationships AS event, txData UNWIND KEYS(event) AS key WITH key,event[key] AS changes WHERE changes.type = 'ACTED_IN' MATCH (p:Person {name: 'Keanu Reeves', grn:replace(apoc.text.split(changes.uidOfStartNode, ':', 3)[-1],'`','')}) MERGE (n:NotificationType {name:'in-app'}) ON CREATE SET n.grn = 'grn:gg:' + toLower(head(labels(n))) + ':' + apoc.create.uuid() WITH n MATCH (u:User) WITH DISTINCT {userGrns: COLLECT(u.grn),message: 'New Notification!',title: 'New Notification', notificationTypeGrn: n.grn} AS results RETURN results",
"resultKey": "results",
"multicast": {
"retry": 0,
"stopOnFailure": false,
"brokers": [
{
"broker": "RABBITMQ",
"topic": null,
"exchange": "com.graphgrid.ingest",
"exchangeType": "topic",
"routingKey": "publish.key",
"durable": true,
"exclusive": false,
"autoDelete": false,
"queue": "com.graphgrid.ingest",
"region": null,
"bucket": null
}
]
}
}
],
"neo4jCredentials": {
"uri": "bolt://ongdb:7687",
"username": "ongdb",
"password": "admin"
}
}
Notification Object
The JSON used while creating a notification policy triggers the creation of a notification object. This object allows users to create customizable notifications with alert sounds, messages, and more.
Example of a Notification Object
{notificationTypeGrn: push.grn,userGrns: COLLECT(u.grn),message: '{\\\"APNS\\\":\\\"{\\\\\\\"aps\\\\\\\":{\\\\\\\"sound\\\\\\\":\\\\\\\"default\\\\\\\",\\\\\\\"badge\\\\\\\":9,\\\\\\\"alert\\\\\\\":\\\\\\\"A new movie starring Tom Hanks is showing near you!\\\\\\\"}}\\\"}
Below are all the possible properties of a notification object:
Property | Description |
---|---|
notificationTypeGrn string | The grn of the notificationType node. Valid types are push-notification ,voip-push-notification or in-app . |
userGrns list, string | A list of user grns that the user will send the notification/message to. |
groupGrns list, string | A list of group grns that the user will send the notification/message to. |
deviceType string | The type of device that the notification will be sent to. |
eventTrigger string | The event that will trigger the notification's creation. |
userCreatorGrn string | The user grn of the notification/message sender. |
fileNodeGrns list, string | A list of file node grns to be attached to the notificaiton/message. |
notificationDate string | The date the notification was sent/created. |
notificationTitle string | The title of the notification. |
showPopup boolean | Option (true /false ) to show a pop up upon notify. |
message string | A message delivered with the notification. The AWS metadata goes inside here, it can be either a string or a configured json. |
url string | A url delivered with the notification. |
iconCategory string | An icon delivered with the notification. |
doNotDisplay boolean | Option (true /false ) to display notification. |
internalMessage string | An internal message delivered with the notification. |
attributes list, string | Attributes assigned to the notification via creation of a MessageAttributeNode . |
messageStructure string | The message structure attributed to the notification. |
userDetailGrn string | The user detail grn. |
Information on Apple Push Notification service (APNs), and Firebase Cloud Messaging (FCM) for Android.
Fuze Worker
Receive data on some message broker endpoints and run Geequel either on a schedule or when data is received on a message broker.
Worker Policy
A JSON document containing a list of tasks that form a directed graph modeling the flow of data through the system.
Parameter | Description |
---|---|
metadata object | A map of metadata for the client to use Default: null |
workers worker list | A list of Workers to (de)activate as a group. Default: null |
defaultNeo4jCredentials | The default Neo4jCredentials to use if a Worker does not specify the ONgDB instance it runs on. Default: null |
status | The status of the Worker. |
Worker
A JSON document containing a list of tasks that form a directed graph modeling the flow of data through the system.
Parameter | Description |
---|---|
cypher string | A read-only Geequel snippet that performs some operation on any messages the worker receives. Default: null |
listeningEndpoints BrokerEndpoint list | A list of broker endpoints that listen for messages and execute the Geequel with the received message as input. Default: null |
neo4jCredentials Neo4jCredentials | The ONgDB instance to run the Geequel on. Default: null |
triggerType string | The type of trigger. Options are BROKER , FIXED_DELAY , and FIXED_RATE . Default: BROKER |
fixedDelay int | The time interval the Geequel should be repeatedly executed at. Default: 1000ms |
fixedRate int | A time interval for the delay between each run of the Geequel. Default: 1000ms |
Example Worker Policy
All Fuze components use the same API endpoints for basic interaction with policies. Please refer to save policy for more information.
{
"metadata": {
"displayName": "example-fuze-worker-policy",
"description": "Example Worker policy"
},
"workers": {
"periodicPeopleCount": {
"metadata": {
"name": "periodic-people-count",
"description": "Sends a count of Person nodes to a RabbitMQ queue every 60 seconds"
},
"cypher": "MATCH (p:Person) WITH COUNT(p) AS personCount CALL apoc.broker.send('rabbitmq', {personCount: personCount}, {queueName: 'com.graphgrid.queue.personCountResults', exchangeName: 'com.graphgrid.exchange', routingKey: 'com.graphgrid.key.personCountResults'}) YIELD connectionName, message, configuration RETURN 1",
"triggerType": "FIXED_RATE",
"fixedRate": "60s",
"neo4jCredentials": {
"uri": "bolt://ongdb:7688",
"username": "ongdb",
"password": "xxxxx"
}
}
}
}
Worker Policy Management
All Fuze components use the same API endpoints for basic interaction with policies. Please refer to the policy management section to save, retrieve, activate, deactivate, and/or delete Distribution policies.
Get Active Worker Policies
Returns a list of all active worker policies.
Base URL: /1.0/fuze/{{cluster-name}}/activeWorkerPolicies
Method: GET
Parameter | Description |
---|---|
cluster-name | Name of the cluster on which the policy is saved |
Request
curl --location --request GET "${API_BASE}/1.0/fuze/default/activeWorkerPolicies" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}"
Response
{
"activePolicies": [
{
"metadata": {
"description": "Example Worker policy",
"displayName": "example-fuze-worker-policy",
"createdAt": "2021-03-10T14:18:32.109Z",
"updatedAt": "2021-03-10T14:18:32.11Z"
},
"status": "ACTIVE",
"workers": {
"periodicPeopleCount": {
"metadata": {
"name": "periodic-people-count",
"description": "Sends a count of Person nodes to a RabbitMQ queue every 60 seconds"
},
"cypher": "MATCH (p:Person) WITH COUNT(p) AS personCount CALL apoc.broker.send('rabbitmq', {personCount: personCount}, {queueName: 'com.graphgrid.queue.personCountResults', exchangeName: 'com.graphgrid.exchange', routingKey: 'com.graphgrid.key.personCountResults'}) YIELD connectionName, message, configuration RETURN 1",
"triggerType": "FIXED_RATE",
"listeningEndpoints": null,
"fixedDelay": "1000ms",
"fixedRate": "60s",
"route": null,
"neo4jCredentials": {
"uri": "bolt://ongdb:7688",
"username": "ongdb",
"password": "xxxxx"
}
}
},
"defaultNeo4jCredentials": null
}
]
}