I am trying to create a knowledge graph from RDF triples produced by an Open IE tool. The problem is that it crashes for a large number of requests, so I am trying to scale it. One solution I can think of is Apache Spark.
After creating an individual graph for each request, I would like to merge them into a single large-scale knowledge graph. How is this possible? Can I use MapReduce for this? What would the pipeline look like?
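To make the merge step concrete, this is roughly what I have in mind: each request yields a set of triples, and merging the batches is a deduplicating union, which should map naturally onto a reduce step. A minimal pure-Scala sketch without Spark (`Triple` and `mergeBatches` are names I made up):

```scala
// Sketch: one set of triples per request; merging is a deduplicating union.
case class Triple(subject: String, predicate: String, obj: String)

def mergeBatches(batches: Seq[Set[Triple]]): Set[Triple] =
  batches.foldLeft(Set.empty[Triple])(_ union _)

val batch1 = Set(Triple("Alice", "knows", "Bob"))
val batch2 = Set(Triple("Bob", "livesIn", "Paris"), Triple("Alice", "knows", "Bob"))
val merged = mergeBatches(Seq(batch1, batch2))
// The duplicate ("Alice", "knows", "Bob") triple collapses, so merged.size == 2
```

In Spark terms I assume this would become a `union` of the per-batch RDDs followed by `distinct()`, but I am not sure that is the idiomatic way.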
My ultimate goal is to query the large-scale knowledge graph that is produced. How can I achieve this? Do I need Neo4j for this?
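For the querying part, what I picture is generating Cypher like the following (a sketch under my assumptions: `neighbourQuery` is a hypothetical helper, and the `Node` label matches what my code below creates):

```scala
// Hypothetical: build a parameterized Cypher query returning the one-hop
// neighbourhood of an entity, i.e. all (predicate, object) pairs for a subject.
def neighbourQuery(label: String): String =
  s"MATCH (s:$label {name: $$name})-[r]->(o) " +
  "RETURN type(r) AS predicate, o.name AS obj"

// With the Neo4j Java driver I assume it would run roughly as:
//   session.run(neighbourQuery("Node"), Values.parameters("name", "Alice"))
```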
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.neo4j.driver.v1._

object KnowledgeGraphBuilder {

  def main(args: Array[String]): Unit = {
    // Initialize Spark
    val conf = new SparkConf().setAppName("KnowledgeGraphBuilder").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Assume the OpenIE tool outputs triples in a format like "subject predicate object"
    val input = sc.textFile("path/to/your/rdf_triples.txt")

    // Parse triples, keeping only well-formed three-part lines
    val triples = input.map(_.trim.split("\\s+"))
      .filter(_.length == 3)
      .map(parts => (parts(0), parts(1), parts(2)))

    // Assign a unique VertexId to every distinct subject/object
    val vertices = triples
      .flatMap { case (subject, _, obj) => Seq(subject, obj) }
      .distinct()
      .zipWithIndex()
      .map { case (name, id) => (id, name) }

    // RDD.lookup cannot be called inside another transformation, so build a
    // name -> id map on the driver and broadcast it to the executors.
    // (For a vocabulary too large for the driver, a join would scale better.)
    val nameToId = sc.broadcast(
      vertices.map { case (id, name) => (name, id) }.collectAsMap()
    )

    val edges = triples.map { case (subject, predicate, obj) =>
      Edge(nameToId.value(subject), nameToId.value(obj), predicate)
    }

    // Create graph
    val graph = Graph(vertices, edges)

    // Here you might want to perform some graph operations like connecting subgraphs or cleaning
    // Merge graphs if you have multiple sources or batches
    // For simplicity, we're assuming one large graph here
    val mergedGraph = graph // If you had multiple graphs, you'd merge them here

    // One Cypher statement per edge. MERGE (rather than CREATE) avoids
    // duplicating nodes that appear in many triples; the backticks keep odd
    // characters in the predicate from breaking the relationship type.
    // (Entity names containing quotes would still need escaping.)
    val neo4jCypherQueries = mergedGraph.triplets.map { triplet =>
      s"""MERGE (s:Node {name: '${triplet.srcAttr}'})
         |MERGE (o:Node {name: '${triplet.dstAttr}'})
         |MERGE (s)-[:`${triplet.attr}`]->(o)""".stripMargin
    }.collect()

    // Connect to Neo4j
    val driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "password"))
    val session = driver.session()

    // Batch insert into Neo4j inside a single transaction
    val tx = session.beginTransaction()
    neo4jCypherQueries.foreach(tx.run)
    tx.success()
    tx.close()

    session.close()
    driver.close()
    sc.stop()
  }
}
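One thing I am unsure about: the entity names go straight into the Cypher text, so a quote in an extracted entity would break the statement. I was thinking of escaping them first with something like this (hypothetical helper, not part of my pipeline yet):

```scala
// Hypothetical: escape backslashes and single quotes so an entity name can be
// embedded safely inside a single-quoted Cypher string literal.
def escapeCypher(raw: String): String =
  raw.replace("\\", "\\\\").replace("'", "\\'")

// e.g. escapeCypher("O'Brien") yields O\'Brien
```

Is this good enough, or should I be using query parameters for the node names instead?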