Version: 2.0

Fuze Distribution

The Distribution component of Fuze is a single listening endpoint that forwards its received messages to a number of other broker endpoints. This guide will help you gain a better understanding of what Fuze Distribution does and includes a tutorial that walks through how to set one up.

Use Case

Fuze listens for incoming messages in the form of transaction data sent via a trigger after changes are made to the database. In other words, a distributor will pick up any changes to the graph and send that information to other message brokers which will update the appropriate modules.

Setting up a distributor is important if you're indexing text data that is being processed by NLP. Because NLP data extraction takes a variable amount of time to complete, the Search index policy won't pick up the resulting changes to the graph unless a distributor is set up.

In this tutorial we'll walk through how to set up a Fuze distributor that listens for the NLP data extraction process to finish, so that Search indexes the complete data associated with NLP-processed article nodes.

danger

If you haven't done the Search Basics tutorial yet, turn back now! We'll only go over how to set up an index policy very briefly. For a much more detailed walkthrough of setting up an index policy, and of Search in general, see that tutorial.

Prepare the Graph

To follow along, make sure that the continuousProcessing and continuousIndexing triggers are running. To check, run this query in ONgDB: CALL apoc.trigger.list. You should see both in the name column of the results. If you are unsure what either of these continuous triggers does, we will briefly go over how to set them up; for a more in-depth understanding, we recommend working through the NLP Basics and Search tutorials before continuing.
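
The check can also be made explicit in code. The sketch below is only an illustration: it assumes you have already fetched the rows returned by CALL apoc.trigger.list with whatever ONgDB client you use, and that each row is a mapping exposing the trigger name under a "name" key, as the name column suggests. The function name and the row shape are hypothetical and not part of any GraphGrid API.

```python
# Trigger names this tutorial expects to find in the `name` column.
REQUIRED_TRIGGERS = ("continuousProcessing", "continuousIndexing")


def missing_triggers(rows):
    """Return the required trigger names that are absent from `rows`.

    `rows` is assumed to be an iterable of dict-like records, one per row of
    `CALL apoc.trigger.list`, each exposing the trigger name under "name".
    """
    present = {row["name"] for row in rows}
    return [name for name in REQUIRED_TRIGGERS if name not in present]


if __name__ == "__main__":
    # Hypothetical rows: only continuousIndexing has been installed here.
    sample_rows = [{"name": "continuousIndexing"}]
    print(missing_triggers(sample_rows))
```

An empty list means both triggers are present and you can skip straight to the distribution policy.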

Set up Continuous Processing

Continuous Processing (CP) automatically starts the data extraction process for any new article nodes that are added to the graph.

To set CP up, simply use the Continuous Processing Trigger endpoint.

You will need the name of the document policy you created and the corpus grn. To easily find the corpus grn run this query in ONgDB:

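MATCH (n:Corpus) RETURN n.grn

Then call the endpoint with your document policy name in the path and the corpus grn in the request body:
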
curl --location --request POST "${API_BASE}/1.0/nlp/default/continuousProcessingTrigger/distributor-document-policy" \
--header 'Content-Type: application/json' \
--header "Authorization: Bearer ${BEARER_TOKEN}" \
--data-raw '{
"corpusGrn": "grn:gg:corpus:xjSR1yr4u08ERPxxcBKpgQpeZF1wvEbm3JYb3UeAkUcV",
"processor": "bert"
}'

If you don't have any article data of your own, you can use the example file below. Copy and paste the contents of the "Example Movie Articles" file into ONgDB to create several article nodes based on movie summaries.

Example Movie Articles

Set up Continuous Indexing

Our next step is to set up continuous indexing for Search. Continuous indexing indexes new nodes on the graph as they are added and stores that data in Elasticsearch.

Continuous indexing requires a message broker. In our example we're using the default broker, RabbitMQ.

To set up continuous indexing with RabbitMQ, use this broker endpoint:

curl --location --request POST "${API_BASE}/1.0/search/default/rabbitmqContinuousIndexingTrigger/default-search-index-policy/defaultBroker" \
--header "Authorization: Bearer ${BEARER_TOKEN}"

Use a Distribution to Prevent Incomplete Indexing

These are the results from searching "Green Mile" before a distributor is in place. Notice that the annotated_text data from NLP processing is missing:

{
"report": {
"summary": {
"totalCount": 1,
"countDistribution": {
"article": 1
}
}
},
"data": {
"article": {
"documentCount": 1,
"maxScore": 0.9362215,
"documentData": [
{
"id": "grn:gg:article:69XSbXP8vhKkjnuL5P4gzvaOdmQ06nzV6fj1t3DaUUYx",
"score": 0.9362215,
"caption": "The Green Mile",
"source": {
"grn": "grn:gg:article:69XSbXP8vhKkjnuL5P4gzvaOdmQ06nzV6fj1t3DaUUYx",
"lastSearchIndexedAt": "2021-08-16T16:51:56+00:00",
"mentions": [],
"link": "https://www.imdb.com/title/tt0120689/?ref_=nv_sr_srsg_0",
"description": "The lives of guards on Death Row are affected by one of their charges: a black man accused of child murder and rape, yet who has a mysterious gift.",
"source": "IMDb",
"title": "The Green Mile",
"pubDate": null,
"formattedPublishDate": null,
"content": "The Green Mile is a 1999 American fantasy drama film written and directed by Frank Darabont and based on Stephen King's 1996 novel of the same name. It stars Tom Hanks as a death row corrections officer during the Great Depression who witnesses supernatural events that occur after an enigmatic inmate (Michael Clarke Duncan) is brought to his facility...",
"updatedAt": "2021-08-16T16:51:56+00:00"
}
}
]
}
},
"suggest": {},
"collapse": {},
"searchParams": {}
}

This occurs because the continuousIndexing trigger indexes what is on the graph while NLP is still processing the article nodes. This leads to incomplete indexing and incomplete search results. How do we fix this? With a distributor policy!

Set up a Distribution Policy

A Fuze Distribution policy defines the broker endpoint to listen on for TransactionData coming from the graph. The TransactionData contains the changes to the graph, which are filtered by the definitions in forwardingRules. Each forwarding rule defines where to send the data and can also use Geequel to filter or transform the data before it is sent to that rule's broker endpoints.

A distribution policy has the following elements:

  • metadata - Information to uniquely identify the policy.
  • listeningBrokerEndpoint - Defines the message broker to use.
  • neo4jCredentials - The URI and credentials for an ONgDB instance.
  • forwardingRules - The broker endpoints where received messages should be sent.
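
As a quick sanity check before sending a policy, the four elements listed above can be verified in a few lines. This is a minimal sketch that only looks at the top-level keys; it does not assume anything about how the Fuze service itself validates a policy, and the helper name is hypothetical.

```python
# Top-level elements of a distribution policy, as listed above.
POLICY_ELEMENTS = (
    "metadata",
    "listeningBrokerEndpoint",
    "neo4jCredentials",
    "forwardingRules",
)


def missing_policy_elements(policy):
    """Return the names of top-level elements missing from a policy dict."""
    return [key for key in POLICY_ELEMENTS if key not in policy]


if __name__ == "__main__":
    # A deliberately incomplete draft: neo4jCredentials has been left out.
    draft = {
        "metadata": {"displayName": "example-fuze-distributor-policy"},
        "listeningBrokerEndpoint": {"broker": "RABBITMQ"},
        "forwardingRules": [],
    }
    print(missing_policy_elements(draft))
```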

Forwarding Rules

A list of all broker endpoints to forward the received message to, along with the Geequel used to filter and/or transform the message before it is sent to each endpoint.

The distributor policy that we'll use for this tutorial listens for messages on the RabbitMQ exchange/queue com.graphgrid.fuze.exchange/com.graphgrid.fuze.queue. For more information about how to set up message forwarding with S3, see the steps listed here.

The messages in this case come from ONgDB in the form of transaction data sent by a trigger after changes are made to the database. This data is then run through the forwarding rules, each of which can filter for different changes in the graph. The policy in this tutorial uses a single rule that filters for created relationships of type SUMMARY_SENTENCE. The filtered results are returned as a JSON object that is then sent to the RabbitMQ queue for Search.

Our forwardingRules filter for changes to the text nodes in the graph. As annotated_text nodes are created by NLP for each article, the distributor listens for those data processing additions and updates the article index stored in Elasticsearch.

See the full distributor policy request below. Notice that listeningBrokerEndpoint is set to RABBITMQ with the required exchange, queue, and routing key for our target environment that we set up here. Next come the forwardingRules, which filter the changes we want to capture and send to the message broker. Finally, neo4jCredentials connects our policy to our ONgDB URI, along with the username and password that allow access.

curl --location --request POST "${API_BASE}/1.0/fuze/default/savePolicy/distributor-document-policy" \
--header "Authorization: Bearer ${BEARER_TOKEN}" \
--header 'Content-Type: application/json' \
--data-raw '{
"metadata": {
"description": "Send new NLP nodes and relationships to Search to be indexed.",
"displayName": "example-fuze-distributor-policy"
},
"listeningBrokerEndpoint": {
"broker": "RABBITMQ",
"exchange": "com.graphgrid.fuze.exchange",
"routingKey": "com.graphgrid.fuze.key",
"queue": "com.graphgrid.fuze.queue"
},
"forwardingRules": [
{
"cypher": "WITH {txData} AS txData WITH txData.createdRelationships AS rels WITH [k IN KEYS(rels) WHERE ANY(x IN rels[k] WHERE x.type = \"SUMMARY_SENTENCE\") | rels[k].uidOfStartNode] AS resultList UNWIND resultList as annotatedTextUid WITH annotatedTextUid, apoc.text.split(annotatedTextUid, \":`\", 3)[-1] AS tmp WITH annotatedTextUid, substring(tmp, 0, length(tmp)-1) AS annotatedTextGrn MATCH (a:Article)-[:HAS_ANNOTATED_TEXT]->(an:AnnotatedText {grn: annotatedTextGrn}) WITH COLLECT(DISTINCT \"``:``:`\" + a.grn + \"`\") AS articleGrns RETURN {numTries: 0, policyName: \"default-search-index-policy\", clusterName: \"default\", transactionId: timestamp(), indexName: \"article\", strategyUsed: \"defaultBroker\", idPartition: apoc.convert.toString(articleGrns)} AS results",
"resultKey": "results",
"multicast": {
"retry": 0,
"stopOnFailure": false,
"brokers": [
{
"broker": "RABBITMQ",
"exchange": "com.graphgrid.search.exchange",
"routingKey": "com.graphgrid.search.key",
"queue": "com.graphgrid.search.queue"
}
]
}
}
],
"neo4jCredentials": {
"uri": "bolt://localhost:7687",
"username": "ongdb",
"password": "admin"
}
}'
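
The string handling inside the forwarding rule's Geequel can be hard to read in its escaped form, so here is a rough Python rendering of two of its steps: turning a uid of the form ``:``:`<grn>` back into a plain grn, and assembling the map the rule returns. This is a sketch under assumptions, not the Fuze implementation. It assumes the uidOfStartNode values use the same format the rule builds for idPartition, it keeps idPartition as a list rather than reproducing the output of apoc.convert.toString, and all function names are hypothetical.

```python
import time

UID_PREFIX = "``:``:`"  # prefix the rule prepends to each article grn
UID_SUFFIX = "`"        # closing backtick the rule appends


def uid_to_grn(uid):
    """Mirror the rule's split/substring steps.

    apoc.text.split(uid, ":`", 3)[-1] keeps the text after the second ":`"
    separator; the substring call then drops the trailing backtick.
    """
    tail = uid.split(":`", 2)[-1]
    return tail[:-1]


def grn_to_uid(grn):
    """Build the uid form the rule emits for each article grn."""
    return UID_PREFIX + grn + UID_SUFFIX


def build_index_message(article_grns, transaction_id):
    """Assemble the same keys as the map returned by the rule."""
    unique = list(dict.fromkeys(article_grns))  # stands in for COLLECT(DISTINCT ...)
    return {
        "numTries": 0,
        "policyName": "default-search-index-policy",
        "clusterName": "default",
        "transactionId": transaction_id,
        "indexName": "article",
        "strategyUsed": "defaultBroker",
        # The rule stringifies this list with apoc.convert.toString;
        # that exact string format is not reproduced here.
        "idPartition": [grn_to_uid(grn) for grn in unique],
    }


if __name__ == "__main__":
    grn = uid_to_grn("``:``:`grn:gg:annotatedtext:EMx7JOH5REIxJEYxZ8GZkcNVDdKY9UbmcnZQF6696yPU`")
    print(grn)
    # timestamp() in the rule is milliseconds since the epoch.
    print(build_index_message(["grn:gg:article:example"], int(time.time() * 1000)))
```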

Once the distributor policy has been saved, we need to activate it. Do so by hitting the activate policy endpoint with the same policy name that was used in the savePolicy request:

curl --location --request POST "${API_BASE}/1.0/fuze/default/activatePolicy/distributor-document-policy" \
--header "Authorization: Bearer ${BEARER_TOKEN}"
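
Because the save and activate endpoints both carry the policy name in the URL path, it can help to derive both URLs from a single variable so the two requests cannot drift apart. The sketch below only builds the URLs and sends nothing. It assumes the path segment after /fuze/ is the cluster name (it is "default" throughout this tutorial); the helper name and the example base URL are hypothetical.

```python
from urllib.parse import quote


def fuze_policy_urls(api_base, policy_name, cluster="default"):
    """Return the save and activate URLs for one Fuze policy name."""
    base = f"{api_base.rstrip('/')}/1.0/fuze/{cluster}"
    name = quote(policy_name, safe="")  # keep the name a single path segment
    return {
        "save": f"{base}/savePolicy/{name}",
        "activate": f"{base}/activatePolicy/{name}",
    }


if __name__ == "__main__":
    # Placeholder base URL; substitute the value of API_BASE for your environment.
    urls = fuze_policy_urls("https://api.example.test", "distributor-document-policy")
    print(urls["save"])
    print(urls["activate"])
```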

Complete Search Results

Now if we search for "Green Mile" again, our distributor policy will have updated the article index with the annotated text data created by NLP processing. These results return much more data about the article, including summaries, mentions, and keyphrases, all collected from the annotated_text data created during NLP data extraction. Search had already indexed the "Green Mile" article and stored the basic information that was on the graph while data extraction was still running. Once extraction finished, our distributor policy picked up the changes in the graph with the filters defined in forwardingRules and sent that information to the search indexing endpoint.

{
"report": {
"summary": {
"totalCount": 10,
"countDistribution": {
"mentionoccurrence": 2,
"sentence": 2,
"mention": 2,
"annotatedtext": 1,
"ner_miscellaneous": 2,
"article": 1
}
}
},
"data": {
"mentionoccurrence": {
"documentCount": 2,
"maxScore": 3.7310498,
"documentData": [
{
"id": "grn:gg:mentionoccurrence:WokHjAfCQS1IwgY8B8UVY6RQOx3FMwPbm2J4C7LrFW5V",
"score": 3.7310498,
"caption": "grn:gg:mentionoccurrence:WokHjAfCQS1IwgY8B8UVY6RQOx3FMwPbm2J4C7LrFW5V",
"source": {
"createdAt": "2021-08-16T16:57:06+00:00",
"grn": "grn:gg:mentionoccurrence:WokHjAfCQS1IwgY8B8UVY6RQOx3FMwPbm2J4C7LrFW5V",
"lastSearchIndexedAt": "2021-08-16T16:57:08+00:00",
"pos": [
"NNP"
],
"value": "Green Mile",
"updatedAt": "2021-08-16T16:57:06+00:00"
}
},
{
"id": "grn:gg:mentionoccurrence:9DkrO3xve1LeZHBEXqnqW32uFFnZBIEMXVhV6iJNSXyg",
"score": 2.7614653,
"caption": "grn:gg:mentionoccurrence:9DkrO3xve1LeZHBEXqnqW32uFFnZBIEMXVhV6iJNSXyg",
"source": {
"createdAt": "2021-08-16T16:57:06+00:00",
"grn": "grn:gg:mentionoccurrence:9DkrO3xve1LeZHBEXqnqW32uFFnZBIEMXVhV6iJNSXyg",
"lastSearchIndexedAt": "2021-08-16T16:57:08+00:00",
"pos": [
"DT",
"NNP"
],
"value": "The Green Mile",
"updatedAt": "2021-08-16T16:57:06+00:00"
}
}
]
},
"sentence": {
"documentCount": 2,
"maxScore": 3.213169,
"documentData": [
{
"id": "grn:gg:sentence:oAsBJ63SvbNITlVpK6xYg9jD5ImDfYLv62sOP95ptUN4",
"score": 3.213169,
"caption": "grn:gg:sentence:oAsBJ63SvbNITlVpK6xYg9jD5ImDfYLv62sOP95ptUN4",
"source": {
"sentence": "The Green Mile is a 1999 American fantasy drama film written and directed by Frank Darabont and based on Stephen King's 1996 novel of the same name. ",
"createdAt": "2021-08-16T16:57:06+00:00",
"grn": "grn:gg:sentence:oAsBJ63SvbNITlVpK6xYg9jD5ImDfYLv62sOP95ptUN4",
"lastSearchIndexedAt": "2021-08-16T16:57:07+00:00",
"sentenceNumber": 0,
"updatedAt": "2021-08-16T16:57:06+00:00"
}
},
{
"id": "grn:gg:sentence:19si0mDo3zLAFPNG9VGg1r0zEhzuLOcNLDmMucybr3En",
"score": 2.8401928,
"caption": "grn:gg:sentence:19si0mDo3zLAFPNG9VGg1r0zEhzuLOcNLDmMucybr3En",
"source": {
"sentence": "His companion Elaine becomes concerned, and Paul explains to her that the film reminded him of events that he witnessed in 1935, when he was a correctional officer at Cold Mountain Penitentiary's death row, nicknamed 'The Green Mile'.\n",
"createdAt": "2021-08-16T16:57:06+00:00",
"grn": "grn:gg:sentence:19si0mDo3zLAFPNG9VGg1r0zEhzuLOcNLDmMucybr3En",
"lastSearchIndexedAt": "2021-08-16T16:57:07+00:00",
"sentenceNumber": 5,
"updatedAt": "2021-08-16T16:57:06+00:00"
}
}
]
},
"mention": {
"documentCount": 2,
"maxScore": 2.8005762,
"documentData": [
{
"id": "grn:gg:mention:85HXcq3f3fGsvhGrTpMZVjVP7sG9jLDcINCrEAvxvNnk",
"score": 2.8005762,
"caption": "grn:gg:mention:85HXcq3f3fGsvhGrTpMZVjVP7sG9jLDcINCrEAvxvNnk",
"source": {
"mentionId": "green mile_en",
"multiplicity": 1.0,
"createdAt": "2021-08-16T16:57:06+00:00",
"grn": "grn:gg:mention:85HXcq3f3fGsvhGrTpMZVjVP7sG9jLDcINCrEAvxvNnk",
"lastSearchIndexedAt": "2021-08-16T16:57:07+00:00",
"pos": [
"NNP"
],
"ne": [
"MISCELLANEOUS"
],
"language": "en",
"value": "Green Mile",
"updatedAt": "2021-08-16T16:57:06+00:00"
}
},
{
"id": "grn:gg:mention:OuyvzRe1w6odABtDIBX1wJRsyHKNKcxbyyMQjY3KSp8R",
"score": 2.3033235,
"caption": "grn:gg:mention:OuyvzRe1w6odABtDIBX1wJRsyHKNKcxbyyMQjY3KSp8R",
"source": {
"mentionId": "the green mile_en",
"multiplicity": 1.0,
"createdAt": "2021-08-16T16:57:06+00:00",
"grn": "grn:gg:mention:OuyvzRe1w6odABtDIBX1wJRsyHKNKcxbyyMQjY3KSp8R",
"lastSearchIndexedAt": "2021-08-16T16:57:07+00:00",
"pos": [
"DT",
"NNP"
],
"ne": [
"MISCELLANEOUS"
],
"language": "en",
"value": "The Green Mile",
"updatedAt": "2021-08-16T16:57:06+00:00"
}
}
]
},
"annotatedtext": {
"documentCount": 1,
"maxScore": 1.5937637,
"documentData": [
{
"id": "grn:gg:annotatedtext:EMx7JOH5REIxJEYxZ8GZkcNVDdKY9UbmcnZQF6696yPU",
"score": 1.5937637,
"caption": "grn:gg:annotatedtext:EMx7JOH5REIxJEYxZ8GZkcNVDdKY9UbmcnZQF6696yPU",
"source": {
"createdAt": "2021-08-16T16:57:06+00:00",
"grn": "grn:gg:annotatedtext:EMx7JOH5REIxJEYxZ8GZkcNVDdKY9UbmcnZQF6696yPU",
"lastSearchIndexedAt": "2021-08-16T16:57:06+00:00",
"text": "The Green Mile is a 1999 American fantasy drama film written and directed by Frank Darabont and based on Stephen King's 1996 novel of the same name. It stars Tom Hanks as a death row corrections officer during the Great Depression who witnesses supernatural events that occur after an enigmatic inmate (Michael Clarke Duncan) is brought to his facility...",
"updatedAt": "2021-08-16T16:57:06+00:00"
}
}
]
},
"ner_miscellaneous": {
"documentCount": 2,
"maxScore": 1.0630728,
"documentData": [
{
"id": "grn:gg:mention:85HXcq3f3fGsvhGrTpMZVjVP7sG9jLDcINCrEAvxvNnk",
"score": 1.0630728,
"caption": "grn:gg:mention:85HXcq3f3fGsvhGrTpMZVjVP7sG9jLDcINCrEAvxvNnk",
"source": {
"mentionId": "green mile_en",
"multiplicity": 1.0,
"createdAt": "2021-08-16T16:57:06+00:00",
"grn": "grn:gg:mention:85HXcq3f3fGsvhGrTpMZVjVP7sG9jLDcINCrEAvxvNnk",
"lastSearchIndexedAt": "2021-08-16T16:57:07+00:00",
"pos": [
"NNP"
],
"ne": [
"MISCELLANEOUS"
],
"language": "en",
"value": "Green Mile",
"updatedAt": "2021-08-16T16:57:06+00:00"
}
},
{
"id": "grn:gg:mention:OuyvzRe1w6odABtDIBX1wJRsyHKNKcxbyyMQjY3KSp8R",
"score": 0.88964105,
"caption": "grn:gg:mention:OuyvzRe1w6odABtDIBX1wJRsyHKNKcxbyyMQjY3KSp8R",
"source": {
"mentionId": "the green mile_en",
"multiplicity": 1.0,
"createdAt": "2021-08-16T16:57:06+00:00",
"grn": "grn:gg:mention:OuyvzRe1w6odABtDIBX1wJRsyHKNKcxbyyMQjY3KSp8R",
"lastSearchIndexedAt": "2021-08-16T16:57:07+00:00",
"pos": [
"DT",
"NNP"
],
"ne": [
"MISCELLANEOUS"
],
"language": "en",
"value": "The Green Mile",
"updatedAt": "2021-08-16T16:57:06+00:00"
}
}
]
},
"article": {
"documentCount": 1,
"maxScore": 0.9362215,
"documentData": [
{
"id": "grn:gg:article:69XSbXP8vhKkjnuL5P4gzvaOdmQ06nzV6fj1t3DaUUYx",
"score": 0.9362215,
"caption": "The Green Mile",
"source": {
"grn": "grn:gg:article:69XSbXP8vhKkjnuL5P4gzvaOdmQ06nzV6fj1t3DaUUYx",
"lastSearchIndexedAt": "2021-08-16T16:51:56+00:00",
"mentions": [],
"link": "https://www.imdb.com/title/tt0120689/?ref_=nv_sr_srsg_0",
"description": "The lives of guards on Death Row are affected by one of their charges: a black man accused of child murder and rape, yet who has a mysterious gift.",
"source": "IMDb",
"title": "The Green Mile",
"pubDate": null,
"formattedPublishDate": null,
"content": "The Green Mile is a 1999 American fantasy drama film written and directed by Frank Darabont and based on Stephen King's 1996 novel of the same name. It stars Tom Hanks as a death row corrections officer during the Great Depression who witnesses supernatural events that occur after an enigmatic inmate (Michael Clarke Duncan) is brought to his facility...",
"updatedAt": "2021-08-16T16:51:56+00:00"
}
}
]
}
},
"suggest": {},
"collapse": {},
"searchParams": {}
}
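
To compare the two responses programmatically rather than by eye, one option is to inspect report.summary.countDistribution. The sketch below is an illustration only: the set of index types treated as NLP-derived is taken from the second response above, and the helper name is hypothetical.

```python
# Index types that appear only in the second response above, i.e. after the
# NLP output has been indexed.
NLP_TYPES = {"annotatedtext", "sentence", "mention", "mentionoccurrence"}


def nlp_types_indexed(response):
    """Return the NLP-derived index types that have at least one hit."""
    counts = response["report"]["summary"]["countDistribution"]
    return sorted(t for t in NLP_TYPES if counts.get(t, 0) > 0)


if __name__ == "__main__":
    # Trimmed copies of the report sections from the two responses.
    before = {"report": {"summary": {"totalCount": 1, "countDistribution": {"article": 1}}}}
    after = {
        "report": {
            "summary": {
                "totalCount": 10,
                "countDistribution": {
                    "mentionoccurrence": 2,
                    "sentence": 2,
                    "mention": 2,
                    "annotatedtext": 1,
                    "ner_miscellaneous": 2,
                    "article": 1,
                },
            }
        }
    }
    print(nlp_types_indexed(before))
    print(nlp_types_indexed(after))
```

An empty list for a query that should match processed articles suggests the distributor has not yet forwarded the NLP changes to Search.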