<H1>This notebook is a companion to the <a href="#blank">Building a Knowledge Graph in Neptune using Unstructured Text</a> blog post. </H1>

<p>In this notebook, we will copy the output of Amazon Comprehend Events from an S3 bucket, transform it into bulk loader files in both Property Graph (Gremlin) and RDF formats, load those files into a Neptune cluster, and run some analysis queries against the result. This notebook relies on environment variables injected by a CloudFormation template, so it should only be used when deployed by the CloudFormation template at <a href="https://aws-neptune-customer-samples.s3.amazonaws.com/knowledge-graph-unstructured/neptune-kg-unstructured-blog-stack.yml">https://aws-neptune-customer-samples.s3.amazonaws.com/knowledge-graph-unstructured/neptune-kg-unstructured-blog-stack.yml</a>.</p>

<H2>Section 1:  Transforming Amazon Comprehend Events API output into our graph models</H2>

<p>The cell below is reference code that transforms the raw output from Comprehend into the model we describe in the blog post. It creates the Property Graph version and writes the nodes and edges in the Gremlin CSV format used by the Neptune bulk loader. The files are stored locally on the instance, in the working directory, as bulk-load-nodes.csv and bulk-load-edges.csv. When the cell finishes successfully, it prints the word "Complete".</p>

In [None]:
import boto3
import csv
import json

s3 = boto3.resource('s3')

class Vertex:
    def __init__(self):
        self.words = set()
        self.primaryName = ""
        self.entityType = ""
        self.id = ""
        
    def addWord(self, word):
        self.words.add(word)
    
    def setPrimaryName(self, name):
        self.primaryName = name
        
    def setEntityType(self, entityType):
        self.entityType = entityType
        
    def setId(self, rawIdString):
        self.id = "_".join(rawIdString.split()).lower() # replace all whitespace with underscores

    def getId(self):
        return self.id
    
    def toString(self):
        # Return (rather than print) a human-readable description of the vertex
        return "Entity " + self.getId() + "; type=" + self.entityType + "; words: " + str(self.words)
        

class Edge:
    def __init__(self, fromEntity, toEntity, edgeType):
        self.fromEntity = fromEntity
        self.toEntity = toEntity
        self.edgeType = edgeType
    
    def getId(self):
        return "_".join(("edge__" + self.fromEntity.getId() + "_" + self.toEntity.getId() + "_" + self.edgeType).split()).lower()  # replace all whitespace with underscores
    
    def toString(self):
        # Return (rather than print) a human-readable description of the edge
        return "Edge " + self.fromEntity.getId() + " --" + self.edgeType + "-> " + self.toEntity.getId()

# We have stored the Comprehend output for our dataset into a public S3 bucket.
s3bucket = 'aws-neptune-customer-samples'
s3path = 'knowledge-graph-unstructured/sample_finance_dataset.txt.out'

obj = s3.Object(s3bucket, s3path)
# The Comprehend output is in JSON lines format
jsonlines = obj.get()['Body'].iter_lines()

nodeList = []
edgeList = []
nodeWordList = {}
# We will filter out names referring to each entity with less than 0.95 group certainty.
# You can change this threshold to be lower if you are tolerant of less certain values in your data set.
groupThreshold = 0.95

# each line of the Comprehend output is an independent JSON-formatted record
for line in jsonlines:
    jsonl = json.loads(line)
    
    # Create a node for the source document
    document_id = jsonl["File"] + "_" + str(jsonl["Line"])
    documentNode = Vertex()
    documentNode.setEntityType("DOCUMENT")
    documentNode.setPrimaryName(document_id)
    documentNode.addWord("File " + jsonl["File"] + "; Line " + str(jsonl["Line"]) + ";")
    documentNode.setId("node__document_" + document_id)
    nodeList.append(documentNode)
    # Comprehend Events references entities it refers to by index, so we need to retain the ordered list of entities
    # within the document
    docEntityList = []
    
    for entity in jsonl["Entities"]:
        # convert each object under the "Entities" list into a Node
        theEntity = Vertex()
        theEntity.setPrimaryName(entity["Mentions"][0]["Text"])
        theEntity.setEntityType(entity["Mentions"][0]["Type"])
        theEntity.setId("node__" + entity["Mentions"][0]["Type"] + "_" + entity["Mentions"][0]["Text"])
        for mention in entity["Mentions"]:
            if (mention["GroupScore"] >= groupThreshold):
                theEntity.addWord(mention["Text"])

        docEntityList.append(theEntity)
        nodeList.append(theEntity)
        
    for event in jsonl["Events"]:
        # convert each object under the "Events" list into a Node
        theEntity = Vertex()
        theEntity.setEntityType(event["Type"])
        theEntity.setPrimaryName(event["Triggers"][0]["Text"])
        theEntity.setId("node__event_" + document_id + "_" + event["Type"] + "_" + event["Triggers"][0]["Text"] + str(event["Triggers"][0]["BeginOffset"]))
        for trigger in event["Triggers"]:
            theEntity.addWord(trigger["Text"])

        nodeList.append(theEntity)

        # add edges between the event node and the entity node, 
        # annotated with a label describing the Comprehend Event role assigned to the entity in the event.
        for argument in event["Arguments"]:
            edgeList.append(Edge(theEntity, docEntityList[argument["EntityIndex"]], argument["Role"]))
        
        # add an edge between the document and the event nodes
        edgeList.append(Edge(documentNode, theEntity, "EVENT"))
        
# write all of our nodes to a CSV file
with open('bulk-load-nodes.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['~id','~label','primaryName','names'])
    for node in nodeList:
        for word in node.words:
            # there will be a row for each word assigned to the entity, 
            # but Neptune will aggregate them into single set of words on the node
            writer.writerow([node.id, node.entityType, node.primaryName, word])

# write all of our edges to a CSV file
with open('bulk-load-edges.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(['~id','~from','~to','~label'])
    for edge in edgeList:
        writer.writerow([edge.getId(), edge.fromEntity.id, edge.toEntity.id, edge.edgeType])

print("Complete")
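<p>The cell above does several things at once, so here is a minimal, standalone sketch of the same rules applied to one hand-written record. The record is hypothetical and trimmed to the fields the cell reads (<code>File</code>, <code>Line</code>, <code>Entities</code>, <code>Events</code>); real Comprehend Events output contains more. The sketch shows how mentions below the group-score threshold are dropped, how event arguments refer to entities by their position in the <code>Entities</code> list, and how each retained name becomes its own CSV row under the same <code>~id</code>. The helper names (<code>normalize_id</code>, <code>to_rows</code>) are illustrative and not part of the blog code.</p>

```python
import csv
import io

GROUP_THRESHOLD = 0.95  # same cut-off as groupThreshold in the cell above


def normalize_id(raw):
    # Same rule as Vertex.setId: collapse whitespace into underscores, then lowercase
    return "_".join(raw.split()).lower()


# Hypothetical, heavily trimmed record in the shape the cell above reads
record = {
    "File": "example.txt",
    "Line": 0,
    "Entities": [
        {"Mentions": [
            {"Text": "Amazon", "Type": "ORGANIZATION", "GroupScore": 1.0},
            {"Text": "the company", "Type": "ORGANIZATION", "GroupScore": 0.6},
            {"Text": "Amazon.com", "Type": "ORGANIZATION", "GroupScore": 0.98},
        ]},
        {"Mentions": [
            {"Text": "Whole Foods Market", "Type": "ORGANIZATION", "GroupScore": 1.0},
        ]},
    ],
    "Events": [
        {"Type": "CORPORATE_ACQUISITION",
         "Triggers": [{"Text": "acquire", "BeginOffset": 7}],
         "Arguments": [{"EntityIndex": 0, "Role": "INVESTOR"},
                       {"EntityIndex": 1, "Role": "INVESTEE"}]},
    ],
}


def to_rows(rec):
    """Return (node_rows, edge_rows) using the column layout of the cell above."""
    node_rows, edge_rows = [], []
    doc_name = rec["File"] + "_" + str(rec["Line"])
    doc_id = normalize_id("node__document_" + doc_name)
    node_rows.append([doc_id, "DOCUMENT", doc_name,
                      "File " + rec["File"] + "; Line " + str(rec["Line"]) + ";"])

    entity_ids = []  # entity_ids[i] is the vertex ID of rec["Entities"][i]
    for entity in rec["Entities"]:
        first = entity["Mentions"][0]
        ent_id = normalize_id("node__" + first["Type"] + "_" + first["Text"])
        entity_ids.append(ent_id)
        names = {m["Text"] for m in entity["Mentions"] if m["GroupScore"] >= GROUP_THRESHOLD}
        for name in sorted(names):  # one row per retained name, all sharing one ~id
            node_rows.append([ent_id, first["Type"], first["Text"], name])

    for event in rec["Events"]:
        trig = event["Triggers"][0]
        ev_id = normalize_id("node__event_" + doc_name + "_" + event["Type"] + "_"
                             + trig["Text"] + str(trig["BeginOffset"]))
        for text in sorted({t["Text"] for t in event["Triggers"]}):
            node_rows.append([ev_id, event["Type"], trig["Text"], text])
        for arg in event["Arguments"]:
            target = entity_ids[arg["EntityIndex"]]  # resolve the positional reference
            edge_rows.append([normalize_id("edge__" + ev_id + "_" + target + "_" + arg["Role"]),
                              ev_id, target, arg["Role"]])
        edge_rows.append([normalize_id("edge__" + doc_id + "_" + ev_id + "_EVENT"),
                          doc_id, ev_id, "EVENT"])
    return node_rows, edge_rows


node_rows, edge_rows = to_rows(record)
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(["~id", "~label", "primaryName", "names"])
writer.writerows(node_rows)
print(buf.getvalue())
```

<p>As the comment in the cell above notes, Neptune aggregates the two <code>node__organization_amazon</code> rows into a single vertex whose <code>names</code> property holds both retained values, while "the company" never reaches the file because its group score is below the threshold.</p>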

<p>Similar to the cell above, the next cell transforms the same Comprehend Events output into the RDF N-Triples format used by the Neptune bulk loader. The file is saved locally in the working directory as bulk-load.nt. Execute the cell below; again, you should see the word "Complete" in the output when it runs successfully.</p>

In [None]:
import boto3
import json
import urllib.parse  # needed for urllib.parse.quote below
import rdflib

s3 = boto3.resource('s3')

# We have stored the Comprehend output for our dataset into a public S3 bucket.
s3bucket = 'aws-neptune-customer-samples'
s3path = 'knowledge-graph-unstructured/sample_finance_dataset.txt.out'

obj = s3.Object(s3bucket, s3path)
# The Comprehend output is in JSON lines format
jsonlines = obj.get()['Body'].iter_lines()

g = rdflib.Graph()

# We are using "example.org" as our namespace in this example.
ns_base = "http://example.org/"
entity_type_base = ns_base + "entities"
role_type_base = ns_base + "roles"

# These are constant URIs for various properties on our entities
rel_primaryName = rdflib.URIRef(ns_base + "relations/primaryName")
rel_name = rdflib.URIRef(ns_base + "relations/name")
rel_event = rdflib.URIRef(ns_base + "relations/event")
rel_filename = rdflib.URIRef(ns_base + "relations/filename")
rel_linenumber = rdflib.URIRef(ns_base + "relations/lineNumber")

# We will filter out names referring to each entity with less than 0.95 group certainty.
# You can change this threshold to be lower if you are tolerant of less certain values in your data set.
groupThreshold = 0.95

# each line of the Comprehend output is an independent JSON-formatted record
for line in jsonlines:
    jsonl = json.loads(line)

    document_id = jsonl["File"] + "_" + str(jsonl["Line"])
    theDocumentUri = rdflib.URIRef(entity_type_base + "/document/" + urllib.parse.quote(document_id))
    g.add((theDocumentUri, rdflib.RDF.type, rdflib.URIRef(entity_type_base + "/document")))
    g.add((theDocumentUri, rel_filename, rdflib.Literal(jsonl["File"])))
    g.add((theDocumentUri, rel_linenumber, rdflib.Literal(jsonl["Line"])))
        
    # Comprehend Events references entities it refers to by index, so we need to retain the ordered list of entities
    # within the document    
    docEntityList = []

    for entity in jsonl["Entities"]:
        # convert each object under the "Entities" list into a Node
        entityName = entity["Mentions"][0]["Text"]
        entityType = entity["Mentions"][0]["Type"]
        theEntityUri = rdflib.URIRef(entity_type_base + "/" + entityType.lower() + "/" + urllib.parse.quote(entityName.lower()))
        g.add((theEntityUri, rdflib.RDF.type, rdflib.URIRef(entity_type_base + "/" + entityType.lower())))
        g.add((theEntityUri, rel_primaryName, rdflib.Literal(entityName)))
        
        for mention in entity["Mentions"]:
            if (mention["GroupScore"] >= groupThreshold):
                g.add((theEntityUri, rel_name, rdflib.Literal(mention["Text"])))

        docEntityList.append(theEntityUri)

    for event in jsonl["Events"]:
        # convert each object under the "Events" list into a Node
        entityType = event["Type"]
        entityPrimaryName = event["Triggers"][0]["Text"]
        theEventEntityUri = rdflib.URIRef(entity_type_base + "/" + urllib.parse.quote(document_id) + "/" + entityType.lower() + "/" + urllib.parse.quote(entityPrimaryName.lower()) + "/" + str(event["Triggers"][0]["BeginOffset"]))
        g.add((theEventEntityUri, rdflib.RDF.type, rdflib.URIRef(entity_type_base + "/events/" + entityType.lower())))
        g.add((theEventEntityUri, rel_primaryName, rdflib.Literal(entityPrimaryName)))
        # add an edge between the document and the event nodes
        g.add((theDocumentUri, rel_event, theEventEntityUri))
        for trigger in event["Triggers"]:
            g.add((theEventEntityUri, rel_name, rdflib.Literal(trigger["Text"])))

        for argument in event["Arguments"]:
            # add relationships between the event node and the entity node, 
            # the relationship URI describing the Comprehend Event role assigned to the entity in the event.
            role = argument["Role"]
            roleUri = rdflib.URIRef(role_type_base + "/" + role.lower())
            g.add((theEventEntityUri, roleUri, docEntityList[argument["EntityIndex"]]))

# In RDF, we do not need to separate edges and nodes into separate files as everything is a triple
g.serialize(format="nt", destination="bulk-load.nt")
print("Complete")
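<p>For readers new to N-Triples: every line of bulk-load.nt is one <code>subject predicate object .</code> statement, with IRIs in angle brackets and string literals in double quotes. The cell above lets rdflib handle serialization; the sketch below hand-rolls the same kind of triples for one entity and one event role using only the standard library, to show what ends up in the file. It is an illustration only: it escapes plain string literals but does not handle language tags or datatypes, and the document ID <code>example.txt_0</code> and trigger offset are hypothetical.</p>

```python
from urllib.parse import quote

NS = "http://example.org/"
ENTITIES = NS + "entities"
RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"


def iri(value):
    return "<" + value + ">"


def literal(text):
    # Escape the characters that must be escaped inside an N-Triples string literal
    escaped = (text.replace("\\", "\\\\").replace('"', '\\"')
               .replace("\n", "\\n").replace("\r", "\\r"))
    return '"' + escaped + '"'


def entity_uri(entity_type, name):
    # Same construction as the cell above: lowercase, then percent-encode the name
    return ENTITIES + "/" + entity_type.lower() + "/" + quote(name.lower())


def entity_triples(entity_type, primary_name, names):
    s = entity_uri(entity_type, primary_name)
    yield iri(s), iri(RDF_TYPE), iri(ENTITIES + "/" + entity_type.lower())
    yield iri(s), iri(NS + "relations/primaryName"), literal(primary_name)
    for name in sorted(names):
        yield iri(s), iri(NS + "relations/name"), literal(name)


def to_ntriples(triples):
    return "".join(f"{s} {p} {o} .\n" for s, p, o in triples)


org = entity_uri("ORGANIZATION", "Amazon.com, Inc.")
# Hypothetical event URI, built like theEventEntityUri (document ID and offset are made up)
event = ENTITIES + "/example.txt_0/corporate_acquisition/" + quote("acquire") + "/7"
role_triple = (iri(event), iri(NS + "roles/investor"), iri(org))

nt = to_ntriples(list(entity_triples("ORGANIZATION", "Amazon.com, Inc.", {"Amazon.com, Inc."}))
                 + [role_triple])
print(nt)
```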

<p>Now that we have created our bulk loader files, we need to copy them into the S3 bucket that the CloudFormation stack created for loading data into Neptune. In addition to the bucket, the stack created several other required pieces of infrastructure for you:</p>
    <ul><li>An IAM role attached to the Neptune cluster that gives it permission to access this bucket.</li>
        <li>An IAM policy attached to this notebook instance that gives it permission to access this bucket.</li>
        <li>An S3 VPC endpoint attached to the Neptune VPC so that the cluster can reach S3.</li>
        <li>An environment variable on this notebook instance, called "S3_WORKING_BUCKET", that contains the name of the bucket. This value can also be found in the Outputs section of the CloudFormation stack.</li>
    </ul>

<p>The shell script below uses the AWS Command Line Interface (CLI) to copy the bulk load files into the S3 bucket. It also prints two S3 paths that we will use in the next section, where we call the Neptune bulk loader API through a notebook magic included in the Neptune Workbench.</p>

In [None]:
%%bash

aws s3 cp ./bulk-load-edges.csv s3://$S3_WORKING_BUCKET/pg/bulk-load-edges.csv
aws s3 cp ./bulk-load-nodes.csv s3://$S3_WORKING_BUCKET/pg/bulk-load-nodes.csv
aws s3 cp ./bulk-load.nt s3://$S3_WORKING_BUCKET/rdf/bulk-load.nt

echo "The path for the Property Graph bulk loading step is 's3://$S3_WORKING_BUCKET/pg/'"
echo "The path for the RDF bulk loading is 's3://$S3_WORKING_BUCKET/rdf/'"
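<p>If you prefer to stay in Python, the same copy can be sketched with boto3's <code>upload_file</code> method. This is an alternative to the bash cell, not part of the original walkthrough; it assumes the same <code>S3_WORKING_BUCKET</code> environment variable and the IAM permissions described above, and it only uploads when that variable is set. The helper names are hypothetical.</p>

```python
import os

# Local file name -> key prefix, mirroring the aws s3 cp commands above
UPLOADS = {
    "bulk-load-nodes.csv": "pg/",
    "bulk-load-edges.csv": "pg/",
    "bulk-load.nt": "rdf/",
}


def planned_uploads(bucket):
    """Return (local_file, bucket, key) tuples without contacting AWS."""
    return [(name, bucket, prefix + name) for name, prefix in UPLOADS.items()]


def load_paths(bucket):
    """The two S3 folders to enter in the bulk loader form."""
    return {"pg": f"s3://{bucket}/pg/", "rdf": f"s3://{bucket}/rdf/"}


def upload_all(bucket):
    import boto3  # imported lazily so the helpers above work without boto3 installed

    s3_client = boto3.client("s3")
    for local_file, target_bucket, key in planned_uploads(bucket):
        s3_client.upload_file(local_file, target_bucket, key)


bucket = os.environ.get("S3_WORKING_BUCKET")
if bucket:
    upload_all(bucket)
    print(load_paths(bucket))
```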


<H2> Section 2:  Loading the Graph Data into Neptune </H2>

<p>The Neptune Workbench "load" magic gives us a form to bulk load data into our Neptune cluster.
(see <a href="https://docs.aws.amazon.com/neptune/latest/userguide/notebooks-magics.html#notebooks-line-magics-load">https://docs.aws.amazon.com/neptune/latest/userguide/notebooks-magics.html#notebooks-line-magics-load</a> for more information). We need to run the cell below twice: the first run loads the Property Graph files, and the second run loads the RDF file.</p>

<p>To load the Property Graph, execute the cell below to display the load form, and enter the following values:<br />
    <b>S3 folder</b>: the Property Graph path printed by the previous cell (it ends in <code>/pg/</code>)<br />
    <b>Format</b>: csv <br />
The remaining defaults can be left as shown. Then click the "Submit" button.<br /></p>

<p>After the status changes to "LOAD_COMPLETED", execute the cell again to load the RDF data, this time with these values: <br />
    <b>S3 folder</b>: the RDF path printed by the previous cell (it ends in <code>/rdf/</code>) <br />
    <b>Format</b>: ntriples <br />
The remaining defaults can be left as shown. Click the "Submit" button again and wait for the "LOAD_COMPLETED" status to appear.<br /></p>

In [None]:
%load
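<p>The load form is a front end for the Neptune bulk loader HTTP API (<code>POST /loader</code> on the cluster endpoint, port 8182 by default). The sketch below shows roughly what the two loads above look like as direct requests. It assumes a cluster <em>without</em> IAM database authentication (with IAM auth enabled, requests must also be SigV4-signed, which this sketch does not do), and the bucket, role ARN, and region are placeholders you would replace with values from your own stack. The form may send additional optional parameters that are omitted here.</p>

```python
import json
import urllib.request


def loader_request(source, fmt, iam_role_arn, region):
    """Build the JSON body for a Neptune bulk loader POST /loader request."""
    if fmt not in ("csv", "ntriples"):
        raise ValueError("this sketch only covers the two formats used in this notebook")
    return {"source": source, "format": fmt, "iamRoleArn": iam_role_arn, "region": region}


def submit_load(endpoint, body, port=8182):
    """POST the request and return the loadId (no SigV4 signing, see the note above)."""
    req = urllib.request.Request(
        f"https://{endpoint}:{port}/loader",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)["payload"]["loadId"]


def load_status(endpoint, load_id, port=8182):
    """Return the overall status string of a load job, such as LOAD_COMPLETED."""
    with urllib.request.urlopen(f"https://{endpoint}:{port}/loader/{load_id}") as resp:
        return json.load(resp)["payload"]["overallStatus"]["status"]


# Placeholder values: replace with the bucket, role ARN, and region from your stack
ROLE_ARN = "arn:aws:iam::123456789012:role/ExampleNeptuneLoadRole"
pg_body = loader_request("s3://example-bucket/pg/", "csv", ROLE_ARN, "us-east-1")
rdf_body = loader_request("s3://example-bucket/rdf/", "ntriples", ROLE_ARN, "us-east-1")
print(json.dumps(pg_body, indent=2))
```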

<H2>Section 3: Querying the Graph in Neptune</H2>

<p>Now that the data is loaded, we can run Gremlin, openCypher, and SPARQL queries to analyze the property graph and RDF versions of the data. Execute the cell below to run a Gremlin query that returns the top 6 organizations ordered by decreasing number of incoming edges (each incoming edge is a role the organization plays in a financial event extracted by Comprehend).</p>

In [None]:
%%gremlin

g.V().hasLabel('ORGANIZATION').
  order().
  by(inE().count(),desc).
  limit(6).
  project('primaryName','edgeCount','nodeId').
    by('primaryName').
    by(inE().count()).
    by(T.id)

Here is the same query in openCypher.

In [None]:
%%oc

MATCH (org:ORGANIZATION)
WITH org, collect(org.primaryName) As primaryNames
MATCH (org)<-[e]-()
RETURN id(org), primaryNames, count(e) As edgeCount
ORDER BY edgeCount DESC
LIMIT 6

<p>The SPARQL query below performs the same analysis as the Gremlin and openCypher queries above, this time against the RDF graph.</p>

In [None]:
%%sparql

PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX entities: <http://example.org/entities/>
PREFIX rels: <http://example.org/relations/>

SELECT ?org (COUNT(*) AS ?cnt) WHERE {
    ?org rdf:type entities:organization .
    ?event ?role ?org .
} GROUP BY ?org
ORDER BY DESC(?cnt)
LIMIT 6

<p>In all three result sets, notice that we actually have three different entities that all refer to Amazon: "Amazon", "Amazon.com, Inc.", and "Amazon Web Services" are all listed as separate companies. In the steps below, we query multiple Amazon entities together by listing their node IDs explicitly.</p>
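<p>The transformation in Section 1 does not merge different names for the same company, so each Amazon name becomes its own vertex and the edge count is split between them. The sketch below reproduces the ranking from the queries above in plain Python over rows shaped like bulk-load-nodes.csv and bulk-load-edges.csv, then folds a hand-picked set of alias IDs into one canonical ID. The toy rows and the <code>alias_of</code> mapping are hypothetical; they are not the real dataset and not part of the blog's approach. To try it on the real files, read them with <code>csv.reader</code> and skip the header row.</p>

```python
from collections import Counter


def incoming_edge_counts(node_rows, edge_rows):
    """Count incoming edges per ORGANIZATION vertex, as the queries above do."""
    orgs = {row[0] for row in node_rows if row[1] == "ORGANIZATION"}
    return Counter(to_id for _, _, to_id, _ in edge_rows if to_id in orgs)


def top_k(counts, k=6):
    return counts.most_common(k)


def merge_aliases(counts, alias_of):
    """Add each alias's count to its canonical ID (alias_of maps alias -> canonical)."""
    merged = Counter()
    for node_id, n in counts.items():
        merged[alias_of.get(node_id, node_id)] += n
    return merged


# Toy rows in the bulk-load CSV layout (hypothetical, not the real data)
nodes = [
    ["node__organization_amazon", "ORGANIZATION", "Amazon", "Amazon"],
    ["node__organization_amazon.com,_inc.", "ORGANIZATION", "Amazon.com, Inc.", "Amazon.com, Inc."],
    ["node__organization_whole_foods_market", "ORGANIZATION", "Whole Foods Market", "Whole Foods Market"],
    ["ev1", "CORPORATE_ACQUISITION", "acquire", "acquire"],
]
edges = [
    ["e1", "ev1", "node__organization_amazon", "INVESTOR"],
    ["e2", "ev1", "node__organization_whole_foods_market", "INVESTEE"],
    ["e3", "ev2", "node__organization_amazon.com,_inc.", "INVESTOR"],
    ["e4", "ev2", "node__organization_whole_foods_market", "INVESTEE"],
    ["e5", "ev3", "node__organization_whole_foods_market", "PARTICIPANT"],
    ["e6", "doc1", "ev1", "EVENT"],
]

counts = incoming_edge_counts(nodes, edges)
print(top_k(counts))
alias_of = {"node__organization_amazon.com,_inc.": "node__organization_amazon"}  # hand-picked
print(top_k(merge_aliases(counts, alias_of)))
```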

<H3>Visualizing the results</H3>

<p>In the next cell, we use the visualization capabilities of the Neptune Workbench to show all of the events extracted by Comprehend that link Amazon and Whole Foods Market in our corpus. After running the cell, click the Graph tab to view the graph.</p>

In [None]:
%%gremlin -p v,ine,outv,oute,inv -l 30 -le 30

g.V(['node__organization_amazon','node__organization_amazon.com,_inc.','node__organization_amazon.com']).as('amazon').
    inE().as('roleEdge').
    outV().as('eventNode').
    outE().as('otherRoleEdge').
    inV().hasId('node__organization_whole_foods_market').as('otherOrg').
    path().by('primaryName').by().by().by().by('primaryName')

This is the equivalent query in openCypher.

In [None]:
my_node_labels = '{"ORGANIZATION":"primaryName"}'

In [None]:
%%oc -d $my_node_labels -l 30 -rel 30
MATCH p = (org)<--()-->(org2)
WHERE id(org) IN ['node__organization_amazon','node__organization_amazon.com,_inc.','node__organization_amazon.com']
                  AND id(org2) = 'node__organization_whole_foods_market'
RETURN p

<p>Here is the same visualization in SPARQL.</p>

In [None]:
%%sparql -le 30 -l 30

PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX entities: <http://example.org/entities/>
PREFIX rels: <http://example.org/relations/>
PREFIX org: <http://example.org/entities/organization/>

SELECT ?s ?p ?o WHERE {
    {
    SELECT ?s ?p ?o WHERE {
        VALUES ?s {<http://example.org/entities/organization/amazon> <http://example.org/entities/organization/amazon.com%2C%20inc.> <http://example.org/entities/organization/amazon.com>}
        VALUES ?end {<http://example.org/entities/organization/whole%20foods%20market>} .
        ?o ?p ?s .
        ?o rdf:type ?event_type .
        ?o ?role_2 ?end .    
      }
    } UNION {
    {
    SELECT ?s ?p ?o WHERE {
        VALUES ?start {<http://example.org/entities/organization/amazon> <http://example.org/entities/organization/amazon.com%2C%20inc.> <http://example.org/entities/organization/amazon.com>}
        VALUES ?o {<http://example.org/entities/organization/whole%20foods%20market>} .
        ?s ?role_1 ?start .
        ?s rdf:type ?event_type .
        ?s ?p ?o .
      }
    }
  }
}
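<p>The visualization queries above hard-code vertex IDs and URIs. Both are derived deterministically from an entity's type and the text of its first mention, following the rules in the two transformation cells in Section 1. The sketch below reimplements just those two rules (the function names are illustrative) so you can compute the identifiers for other companies you want to explore.</p>

```python
from urllib.parse import quote


def pg_vertex_id(entity_type, text):
    # Property graph rule: "node__<type>_<text>", whitespace -> "_", lowercased
    return "_".join(("node__" + entity_type + "_" + text).split()).lower()


def rdf_entity_uri(entity_type, text):
    # RDF rule: lowercase the type and the name, percent-encode the name
    return "http://example.org/entities/" + entity_type.lower() + "/" + quote(text.lower())


for name in ["Amazon", "Amazon.com, Inc.", "Whole Foods Market"]:
    print(pg_vertex_id("ORGANIZATION", name), rdf_entity_uri("ORGANIZATION", name))
```

<p>The two forms treat punctuation differently: the property graph ID keeps the comma and replaces spaces with underscores, while the RDF URI percent-encodes both, which is why the Gremlin and SPARQL queries list the same companies with different identifiers.</p>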

<H3>Reporting on the results</H3>

<p>While visualizing the paths through the data is useful for exploration, for reporting we will instead create a list of all of the relationships between Amazon and Whole Foods Market, with a count of the number of occurrences of each path. We can use the graph visualized above to confirm that the counts in our report are correct.</p>

<p>Execute the Gremlin query below to create our report.</p>

In [None]:
%%gremlin

g.V(['node__organization_amazon','node__organization_amazon.com,_inc.','node__organization_amazon.com']).as('amazon').
    inE().as('roleEdge').
    outV().as('eventNode').
    outE().as('otherRoleEdge').
    inV().hasId('node__organization_whole_foods_market').as('otherOrg').
    groupCount().by(path().from('roleEdge').to('otherRoleEdge').by(label)).
    unfold()

<p>Here is the openCypher version.</p>

In [None]:
%%oc
MATCH p = (org)<-[e1]-(event)-[e2]->(org2)
WHERE id(org) IN ['node__organization_amazon','node__organization_amazon.com,_inc.','node__organization_amazon.com']
                  AND id(org2) = 'node__organization_whole_foods_market'
RETURN type(e1), labels(event), type(e2), COUNT(*) as cnt
ORDER BY cnt DESC

<p>Here is the same report in SPARQL. The answers are the same in all three query languages.</p>

In [None]:
%%sparql

PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX entities: <http://example.org/entities/>
PREFIX rels: <http://example.org/relations/>
PREFIX org: <http://example.org/entities/organization/>

SELECT ?role_1 ?event_type ?role_2 (COUNT(*) as ?cnt) WHERE {
    VALUES ?start {<http://example.org/entities/organization/amazon> <http://example.org/entities/organization/amazon.com%2C%20inc.> <http://example.org/entities/organization/amazon.com>}
    ?event ?role_1 ?start .
    ?event rdf:type ?event_type .
    ?event ?role_2 <http://example.org/entities/organization/whole%20foods%20market> .
} GROUP BY ?role_1 ?event_type ?role_2
ORDER BY DESC(?cnt)

<p>You can see from the reports that of the 6 events, 3 are CORPORATE_ACQUISITION events where Amazon is in the INVESTOR role and Whole Foods Market is in the INVESTEE role, 1 is a CORPORATE_MERGER event where Amazon and Whole Foods Market are both in the PARTICIPANT role, and 2 are INVESTMENT_GENERAL events with Amazon in the INVESTOR role and Whole Foods Market in the INVESTEE role. Again, you can confirm this with the graph visualized previously (mouse over nodes with truncated text to see the full value).</p>
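<p>The three report queries all count the same pattern: an event vertex with one role edge into an Amazon entity and another role edge into Whole Foods Market, grouped by (first role, event type, second role). The sketch below expresses that grouping in plain Python over rows in the bulk-load edge CSV layout. The toy edges and event labels are hypothetical and only illustrate the counting; they are not the real report output.</p>

```python
from collections import Counter, defaultdict


def role_pair_report(node_labels, edge_rows, start_ids, end_id):
    """Count (role into a start entity, event label, role into end_id) per linking event."""
    out_edges = defaultdict(list)  # source vertex ID -> [(target ID, edge label)]
    for _, src, dst, label in edge_rows:
        out_edges[src].append((dst, label))

    report = Counter()
    for event_id, targets in out_edges.items():
        for dst1, role1 in targets:
            if dst1 not in start_ids:
                continue
            for dst2, role2 in targets:
                if dst2 == end_id:
                    report[(role1, node_labels[event_id], role2)] += 1
    return report.most_common()


AMAZON_IDS = {"node__organization_amazon", "node__organization_amazon.com,_inc.",
              "node__organization_amazon.com"}
WFM_ID = "node__organization_whole_foods_market"

# Toy data in the bulk-load CSV layout (hypothetical, not the real dataset)
labels = {"ev1": "CORPORATE_ACQUISITION", "ev2": "CORPORATE_ACQUISITION",
          "ev3": "CORPORATE_MERGER", "doc1": "DOCUMENT"}
edges = [
    ["e1", "ev1", "node__organization_amazon", "INVESTOR"],
    ["e2", "ev1", WFM_ID, "INVESTEE"],
    ["e3", "ev2", "node__organization_amazon.com,_inc.", "INVESTOR"],
    ["e4", "ev2", WFM_ID, "INVESTEE"],
    ["e5", "ev3", "node__organization_amazon", "PARTICIPANT"],
    ["e6", "ev3", WFM_ID, "PARTICIPANT"],
    ["e7", "doc1", "ev1", "EVENT"],
]

for (role1, event_type, role2), n in role_pair_report(labels, edges, AMAZON_IDS, WFM_ID):
    print(role1, event_type, role2, n)
```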

<H2>Cleaning Up</H2>
<p>For our last step, we will remove the bulk load files that we copied into the S3 bucket. This step is not required if you will continue to use this infrastructure, but when you delete the stack, CloudFormation can only delete the working bucket if it is empty. Execute the cell below to call the AWS CLI and delete all of the files and folders in the bucket.</p>

In [None]:
%%bash

aws s3 rm s3://$S3_WORKING_BUCKET/ --recursive

<H2>Conclusion</H2>
<p>We just saw how to create our own Knowledge Graph from unstructured content using Amazon Comprehend Events and Amazon Neptune. You can use this as the base for your own Knowledge Graph, adding your own unstructured sources and even transforming your structured data into nodes and relationships that link it all together. This approach can also be applied to similar use cases such as “Know Your Customer” (KYC) and Customer 360/Identity Graphs. How can your business unlock the entities and relationships in your unstructured text and use them as a strategic advantage? The Comprehend and Neptune teams would love to hear about it and are available to discuss it further if you need us.</p>