Geohash NEO4j Graph with Spark

spark

(Haddadahmed1994) #1

I am using Neo4j/Cypher , my data is about 200GB , so i thought of scalable solution "spark".

Two solutions are available to make neo4j graphs with spark :

  1. Cypher for Apache Spark (CAPS)

  2. Neo4j-Spark-Connector

I used the first one ,CAPS .
The pre-processed CSV got two "geohash" informations : one for pickup and another for drop off for each row

what i want is to make a connected graph of geohash nodes.

CAPS allow only to make a graph by mapping nodes :
If node with id 0 is to be connected to node with id 1 you need to have a relationship with start id 0 and end id 1.

A very simple layout would be:

Nodes: (just id, no properties)

id
0
1
2

Relationships: (just the mandatory fields)

id | start | end
0  | 0     | 1
1  | 0     | 2

based on that i ve loaded my CSV into a Spark Dataframe , then i 've splitted the dataframe into :

  • Pickup dataframe

  • Drop off data-frame and

  • Trip data frame

I've generated an id for the two first data-frames, and created a mapping by adding columns to third data-frame
and this was the result :


A pair of nodes ( pickup-[Trip]->drop off) generated for each mapped rows.

The problem that i got is:

  1. the geohash of pickup or a drop off could be repeated for different trips=> i want to merge the creation of nodes

  2. a drop off for a trip could be a pickup for another trip so i need to merge this two nodes into one

i tried to change the graph but i was surprised that spark graphs are immutable=>you can't apply cypher queries to change it.

So is there a way to make a connected ,oriented and merged geohash graph with spark ?

This is my code :

package org.opencypher.spark.examples

import org.opencypher.spark.api.CAPSSession
import org.opencypher.spark.api.io.{CAPSNodeTable, CAPSRelationshipTable}
import org.opencypher.spark.util.ConsoleApp
import java.net.URI
import org.opencypher.okapi.api.io.conversion.NodeMapping
import org.opencypher.okapi.api.io.conversion.RelationshipMapping
import org.opencypher.spark.api.io.neo4j.Neo4jPropertyGraphDataSource
import org.opencypher.spark.api.io.neo4j.Neo4jConfig
import org.apache.spark.sql.functions._
import org.opencypher.okapi.api.graph.GraphName

object GreenCabsInputDataFrames extends ConsoleApp {

 //1) Create CAPS session and retrieve Spark session
    implicit val session: CAPSSession = CAPSSession.local()
    val spark = session.sparkSession

 //2) Load a csv into dataframe
 val df=spark.read.csv("C:\\Users\\Ahmed\\Desktop\\green_result\\green_data.csv").select("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19","_c20","_c21","_c22","_c23")

//3) cache the dataframe
 val df1=df.cache()

 //4) subset the dataframe
 val pickup_dataframe=df1.select("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19","_c20","_c21")
 val dropoff_dataframe=df1.select("_c22","_c23")

 //5) uncache the dataframe
 df1.unpersist()

 //6) add id columns to pickup , dropoff and trip dataframes
 val pickup_dataframe2= pickup_dataframe.withColumn("id1",monotonically_increasing_id+pickup_dataframe.count()).select("id1",pickup_dataframe.columns:_*)
 val dropoff_dataframe2= dropoff_dataframe.withColumn("id2",monotonically_increasing_id+pickup_dataframe2.count()+pickup_dataframe.count()).select("id2",dropoff_dataframe.columns:_*)
 //7) create the relationship "trip" is dataframe
 val trip_data_dataframe2=pickup_dataframe2.withColumn("idj",monotonically_increasing_id).join(dropoff_dataframe2.withColumn("idj",monotonically_increasing_id),"idj")

 //drop unnecessary columns
 val  pickup_dataframe3=pickup_dataframe2.drop("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19")
 val trip_data_dataframe3=trip_data_dataframe2.drop("_c20","_c21","_c22","_c23")

  //8) reordering the columns of trip dataframe

 val trip_data_dataframe4=trip_data_dataframe3.select("idj", "id1", "id2", "_c0", "_c10", "_c11", "_c12", "_c13", "_c14", "_c15", "_c16", "_c17", "_c18", "_c19", "_c3", "_c4","_c9")

  //8.1)displaying dataframes in console
   pickup_dataframe3.show()
   dropoff_dataframe2.show()
   trip_data_dataframe4.show()
 //9) mapping the columns
 val Pickup_mapping=NodeMapping.withSourceIdKey("id1").withImpliedLabel("HashNode").withPropertyKeys("_c21","_c20")
 val Dropoff_mapping=NodeMapping.withSourceIdKey("id2").withImpliedLabel("HashNode").withPropertyKeys("_c23","_c22")
 val Trip_mapping=RelationshipMapping.withSourceIdKey("idj").withSourceStartNodeKey("id1").withSourceEndNodeKey("id2").withRelType("TRIP").withPropertyKeys("_c0","_c3","_c4","_c9","_c10","_c11","_c12","_c13","_c14","_c15","_c16","_c17","_c18","_c19")

 //10)  create tables
val Pickup_Table2 = CAPSNodeTable(Pickup_mapping, pickup_dataframe3)
val Dropoff_Table = CAPSNodeTable(Dropoff_mapping, dropoff_dataframe2)
val Trip_Table = CAPSRelationshipTable(Trip_mapping,trip_data_dataframe4)

//11) Create graph
val graph = session.readFrom(Pickup_Table2,Dropoff_Table, Trip_Table)

//12)  Connect to Neo4j
val boltWriteURI: URI = new URI("bolt://localhost:7687")
val neo4jWriteConfig: Neo4jConfig = new Neo4jConfig(boltWriteURI, "neo4j", Some("wakarimashta"), true)
val neo4jResult: Neo4jPropertyGraphDataSource = new Neo4jPropertyGraphDataSource(neo4jWriteConfig)(session)


 //13) Store graph in neo4j
 val neo4jResultName: GraphName = new GraphName("neo4jgraphs151")
 neo4jResult.store(neo4jResultName, graph)
 }

(M. David Allen) #3

CAPS stores nodes in a table for each label. To merge pickups and drop-offs, you should have a third "location" table. If you have location information for pickups and drop-offs, this then becomes another node table with a relationship table that you can compute. So in the end you want to have:

pickup (id)
-------
1
2

dropoff (id)
----------
3
4

location (id, name)
------
5,"1st St"
6,"2nd St"

pickup_location (id, pickup_id, location_id)
7,1,5

dropoff_location (id, dropoff_id, location_id)
8,3,6

At this point with CAPS only, I would probably not try to "merge" the pickups and drop-offs, but I would link them to the appropriate location, so that your graph would look like this:

(:Pickup)-[:HAS_LOCATION]->(:Location)<-[:HAS_LOCATION]-(:Dropoff)

(Michael Hunger) #4

You also said you tried it with the spark-connector.
That one should be able to MERGE if you chose the geo-hash-field as ID field.
So you would only have one Node type :Location with the geohash as ID-field.

And encode the type as the relationship from :Trip.

(:Location)<-[:PICKUP]-(:Trip)-[:DROPOFF]->(:Location)

HTH