Neo4J with Apache Spark

When we work on highly connected data sets such as social networks, world travel routes, or material traceability for the manufacturing & distribution industry, a robust graph database is a must for storing the data. On the other hand, we need a big data processing tool to handle large datasets.
In this blog, we’ll show how to store a highly connected data set in the well-known graph database Neo4J, and how to process the data using Apache Spark.
Download Graph Data
We’ll use graph data about air-routes from here. air-routes-latest-nodes.csv contains details about airports, countries and continents. air-routes-latest-edges.csv contains the relationships: routes between airports (route) and the airports a country or continent contains (contains).
Understand the Data & Relationships
Before we push the data into a graph database, the first task is to create the data model, i.e. identify the nodes and their properties, and identify the various relationships among the nodes along with the properties of those relationships.
Find below a simplified data modelling diagram of the air-route dataset we have downloaded.

----------------------------------------
Description of the node attributes
----------------------------------------
Airport:
type = Vertex types; 'airport' for airport vertices
code = The three letter IATA code like CCU or LHR
icao = The four letter ICAO code or none. Example KAUS or EGLL
desc = Text description / name of the airport
region = The geographical region like US-TX or GB-ENG
runways = The number of available runways
longest = Length of the longest runway in feet
elev = Elevation in feet above sea level
country = Two letter ISO country code such as US, FR or DE.
city = The name of the city the airport is in
lat = Latitude of the airport
lon = Longitude of the airport

Continent:
type = Vertex types; 'continent' for continents
code = The two letter continent code like AS or EU
desc = Name of the continent

Country:
type = Vertex types; 'country' for country vertices
code = The two letter country code like IN or UK
desc = Name of the country

-------------------------------------------
Description of the relationships attributes
-------------------------------------------
Route:
from = Origin airport id
to = Destination airport id
distance = Distance in miles

Contains:
from = Continent or country id the airport is in
to = airport id
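Before loading, it can help to see the model as plain data structures. Here is a minimal Python sketch; the field names mirror the attribute tables above, while the ids and property values are purely illustrative (only the 4,180-mile LHR–DEL leg is taken from the dataset's route results):

```python
from dataclasses import dataclass

# Node: mirrors the Airport attribute table above (simplified)
@dataclass
class Airport:
    id: int
    code: str      # three-letter IATA code, e.g. CCU or LHR
    icao: str      # four-letter ICAO code or none, e.g. EGLL
    desc: str      # name of the airport
    region: str    # geographical region, e.g. GB-ENG
    runways: int
    longest: int   # longest runway, in feet
    elev: int      # elevation, in feet
    country: str   # two-letter ISO country code
    city: str
    lat: float
    lon: float

# Relationship: mirrors the Route attribute table above
@dataclass
class Route:
    src: int       # "from": origin airport id
    dst: int       # "to": destination airport id
    distance: int  # in miles

# Hypothetical ids and property values, for illustration only
lhr = Airport(1, "LHR", "EGLL", "London Heathrow", "GB-ENG",
              2, 12799, 83, "GB", "London", 51.47, -0.46)
del_ = Airport(2, "DEL", "VIDP", "Indira Gandhi International Airport", "IN-DL",
               3, 14534, 777, "IN", "New Delhi", 28.57, 77.10)
route = Route(lhr.id, del_.id, 4180)
```

The same shape, of course, is what the CSV headers encode; in Neo4J the Route attributes become properties on the relationship itself rather than on either node.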
Neo4J Setup
For this blog we have used the Neo4J desktop version. The download & installation details can be found here.
Create a new User
We can create a new user or use an existing one.

Create a new Database
We’ll create a new database to contain the air-route dataset.

Azure & Neo4J
There are different installation options available in the Azure Marketplace. For deploying Neo4j Enterprise Edition, we can refer here.

Import & Query of Graph Data
Once we have segregated the data into appropriate domain data files, we can upload the CSV files by placing them inside the database import directory…

… and loading the files from the Neo4j user interface.
Adding Nodes
LOAD CSV WITH HEADERS FROM 'file:///continent.csv' AS row
CREATE (:Continent {
id: toInteger(row.id),
label: row.label,
type: row.type,
code: row.code,
description: row.description
});

LOAD CSV WITH HEADERS FROM 'file:///country.csv' AS row
CREATE (:Country {
id: toInteger(row.id),
label: row.label,
type: row.type,
code: row.code,
description: row.description
});

LOAD CSV WITH HEADERS FROM 'file:///airport.csv' AS row
CREATE (:Airport {
id: toInteger(row.id),
label: row.label,
type: row.type,
code: row.code,
icao: row.icao,
description: row.description,
region: row.region,
runways: toInteger(row.runways),
longest: toInteger(row.longest),
elev: toInteger(row.elev),
country: row.country,
city: row.city,
lat: toFloat(row.lat),
lon: toFloat(row.lon)
});
Once we have loaded all of the nodes, we can visualize the schema by executing:
CALL db.schema.visualization()

Adding Relationships
Our next task will be to add the appropriate relationships among the nodes (for the full syntax, refer here).
LOAD CSV WITH HEADERS FROM "file:///continent_contains_airport.csv" AS row
//look up the two nodes we want to connect up
MATCH (c:Continent {id: toInteger(row.from)}), (a:Airport {id: toInteger(row.to)})
//now create a relationship between them
CREATE (c)-[:CONTAINS]->(a);

LOAD CSV WITH HEADERS FROM "file:///country_contains_airport.csv" AS row
//look up the two nodes we want to connect up
MATCH (c1:Country {id: toInteger(row.from)}), (a:Airport {id: toInteger(row.to)})
//now create a relationship between them
CREATE (c1)-[:CONTAINS]->(a);

LOAD CSV WITH HEADERS FROM "file:///airport_route_airport.csv" AS row
//look up the two nodes we want to connect up
MATCH (a1:Airport {id: toInteger(row.from)}), (a2:Airport {id: toInteger(row.to)})
//now create a relationship between them
CREATE (a1)-[:ROUTE { distance: toInteger(row.distance)}]->(a2);

MATCH p=()-->() RETURN p LIMIT 25;


Querying the data
Following are a few useful queries we may need to analyse the graph data.
-- Total number of Countries
MATCH (n:Country) RETURN count(n) as count
-- 237

-- Total number of Airports
MATCH (n:Airport) RETURN count(n) as count
-- 3497

-- Longest runway
MATCH (n:Airport) RETURN MAX(n.longest)
-- 18045

-- Shortest runway
MATCH (n:Airport) RETURN MIN(n.longest)
-- 1300

-- Name of the Airport with shortest runway
MATCH (a:Airport) WHERE a.longest = 1300 RETURN a

{
"country":"BQ",
"longest":1300,
"code":"SAB",
"city":"Saba",
"description":"Juancho E. Yrausquin Airport",
"lon":-63.22000122,
"label":"airport",
"type":"airport",
"elev":60,
"icao":"TNCS",
"id":2623,
"runways":1,
"region":"BQ-U-A",
"lat":17.64500046
}

-- Airports in-degree & out-degree
MATCH (a:Airport)
RETURN a.code AS code,
size((a)-[:ROUTE]->()) AS outdegree,
size((a)<-[:ROUTE]-()) AS indegree

╒══════╤═══════════╤══════════╕
│"code"│"outdegree"│"indegree"│
╞══════╪═══════════╪══════════╡
│"ATL" │242 │242 │
├──────┼───────────┼──────────┤
│"ANC" │40 │40 │
├──────┼───────────┼──────────┤
│"AUS" │80 │80 │

-- Airports total degree
MATCH (a:Airport)
RETURN a.code AS code,
size((a)-[:ROUTE]->()) + size((a)<-[:ROUTE]-()) AS degree

╒══════╤════════╕
│"code"│"degree"│
╞══════╪════════╡
│"ATL" │484 │
├──────┼────────┤
│"ANC" │80 │
├──────┼────────┤
│"AUS" │160 │

-- Top 3 Airports with max degree (routes in and out)
MATCH (a:Airport)
RETURN a.code AS airport_code, a.description AS airport_name,
size((a)-[:ROUTE]->()) AS outdegree,
size((a)<-[:ROUTE]-()) AS indegree
ORDER BY outdegree DESC, indegree DESC
LIMIT 3

╒══════════════╤════════════════════╤═══════════╤══════════╕
│"airport_code"│"airport_name" │"outdegree"│"indegree"│
╞══════════════╪════════════════════╪═══════════╪══════════╡
│"FRA" │"Frankfurt am Main" │307 │307 │
├──────────────┼────────────────────┼───────────┼──────────┤
│"IST" │"Istanbul Internatio│307 │307 │
│ │nal Airport" │ │ │
├──────────────┼────────────────────┼───────────┼──────────┤
│"CDG" │"Paris Charles de Ga│294 │295 │
│ │ulle" │ │ │
└──────────────┴────────────────────┴───────────┴──────────┘

-- To delete all nodes with relationships
MATCH (n) DETACH DELETE n
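The degree arithmetic in the queries above (count outgoing ROUTE edges, count incoming ones, add the two) can be mirrored in plain code. A minimal Python sketch over a hypothetical edge list; the airport codes are borrowed from the result tables, but the routes themselves are made up:

```python
from collections import Counter

# Hypothetical directed ROUTE edges: (origin, destination)
routes = [
    ("ATL", "ANC"), ("ANC", "ATL"),
    ("ATL", "AUS"), ("AUS", "ATL"),
    ("ANC", "AUS"), ("AUS", "ANC"),
]

outdegree = Counter(src for src, _ in routes)
indegree = Counter(dst for _, dst in routes)

# Total degree = outdegree + indegree, as in the Cypher query
for code in sorted(set(outdegree) | set(indegree)):
    print(code, outdegree[code], indegree[code],
          outdegree[code] + indegree[code])
```

In the real dataset this bookkeeping happens inside Neo4J via `size((a)-[:ROUTE]->())`, which reads the relationship counts directly from the store rather than scanning an edge list.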
Now, if we want to find the paths (minimum hops) between London Heathrow Airport (code: LHR) and Netaji Subhash Chandra Bose International Airport, Kolkata (code: CCU), we can run the following query:
-- Finds all of the shortest paths between the two nodes
MATCH paths = allShortestPaths((a:Airport {code: 'LHR'})-[:ROUTE*]-(b:Airport {code: 'CCU'}))
WITH paths, relationships(paths) AS rels
UNWIND rels AS rel
WITH paths, collect(rel.distance) AS air_distance_inbetween, sum(rel.distance) AS total_air_distance
RETURN DISTINCT [n IN nodes(paths) | n.code] AS origin_connecting_destination_airports, air_distance_inbetween, total_air_distance
ORDER BY total_air_distance;

╒═══════════════════╤═══════════════════╤═══════════════════╕
│"origin_connecting_│"air_distance_inbet│"total_air_distance│
│destination_airport│ween" │" │
│s" │ │ │
╞═══════════════════╪═══════════════════╪═══════════════════╡
│["LHR","DEL","CCU"]│[4180,815] │4995 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","DAC","CCU"]│[4977,149] │5126 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","AMD","CCU"]│[4264,1006] │5270 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","DXB","CCU"]│[3414,2090] │5504 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","BOM","CCU"]│[4479,1034] │5513 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","HYD","CCU"]│[4815,750] │5565 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","DOH","CCU"]│[3255,2324] │5579 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","BLR","CCU"]│[4996,960] │5956 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","MAA","CCU"]│[5113,860] │5973 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","BKK","CCU"]│[5947,1016] │6963 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","CAN","CCU"]│[5896,1578] │7474 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","HKG","CCU"]│[5980,1623] │7603 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","SGN","CCU"]│[6345,1452] │7797 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","KUL","CCU"]│[6585,1637] │8222 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","SIN","CCU"]│[6758,1803] │8561 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","DPS","CCU"]│[7779,2822] │10601 │
└───────────────────┴───────────────────┴───────────────────┘
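What `allShortestPaths` plus the distance aggregation does can be sketched in plain code: find the minimum hop count by breadth-first search, collect every path of that length, then rank the paths by total distance. A minimal Python sketch over a hypothetical subset of the routes (the distances are taken from the result table above; the full dataset has many more edges):

```python
# Hypothetical subset of undirected ROUTE edges with distances in miles
edges = {
    ("LHR", "DEL"): 4180, ("DEL", "CCU"): 815,
    ("LHR", "DAC"): 4977, ("DAC", "CCU"): 149,
    ("LHR", "DXB"): 3414, ("DXB", "CCU"): 2090,
}

# Build an adjacency map; routes are traversed in both directions
adj = {}
for (a, b), d in edges.items():
    adj.setdefault(a, {})[b] = d
    adj.setdefault(b, {})[a] = d

def all_shortest_paths(src, dst):
    # BFS level by level; stop at the first level where dst appears,
    # collecting every path of that minimum hop count
    paths, frontier = [], [[src]]
    while frontier and not paths:
        next_frontier = []
        for path in frontier:
            for nxt in adj.get(path[-1], {}):
                if nxt in path:
                    continue  # keep paths simple
                if nxt == dst:
                    paths.append(path + [nxt])
                else:
                    next_frontier.append(path + [nxt])
        frontier = next_frontier
    return paths

def total_distance(path):
    return sum(adj[a][b] for a, b in zip(path, path[1:]))

for p in sorted(all_shortest_paths("LHR", "CCU"), key=total_distance):
    print(p, total_distance(p))
```

This enumeration is exponential in the worst case; Neo4J's `allShortestPaths` does the equivalent work inside the database with a far more efficient bidirectional search, which is exactly why we push such queries down rather than exporting the graph.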
Neo4J & Apache Spark
Neo4J has a stable library for Apache Spark. It provides a builder API which supports (for full details, follow here):
- RDD
- DataFrame
- GraphFrame
- GraphX
We’ll use Azure Databricks as our data analytics platform.
Libraries required
We’ll get the latest Neo4J-Spark connector using its Maven coordinates. We’ll also install the Spark GraphFrames library.

Connecting Databricks with Neo4J
As we’re using a desktop installation of Neo4J, we’ll use the tunneling tool ngrok to create a public connectivity endpoint. Once it is installed, we’ll execute:
ngrok tcp <Neo4j Bolt port>
e.g. ngrok tcp 7687
We’ll note down the public URL.

We’ll now configure the Databricks Spark Config settings with the following.
spark.neo4j.bolt.url bolt://0.tcp.ngrok.io:<port>
spark.neo4j.bolt.user <user name>
spark.neo4j.bolt.password <password>

After configuring, we’ll just restart the Spark cluster and then it’ll be ready to go!
Data Load from Spark
Once the data is ingested into the data lake raw layer, we can use Apache Spark (and Databricks) for data quality validation, cleansing and transformation, and then, using the Spark-Neo4J API, load the data into Neo4J in bulk.
%scala

// Read records & create a Spark DataFrame
val continentDF = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/FileStore/tables/test_continent2.csv")

import org.neo4j.spark.dataframe.Neo4jDataFrame

// Creating nodes using the Neo4jDataFrame
Neo4jDataFrame.createNodes(sc, continentDF, ("Continent", continentDF.columns.toSeq))
We may need to load data from Neo4J into Spark to perform further analytics or run ML algorithms. We can push down the query condition using the Cypher Query Language for efficient execution at the Neo4J end.
import org.neo4j.spark.Neo4j

val neo = Neo4j(sc)

// Load records via Cypher query
val df = neo.cypher("MATCH (a:Airport) RETURN a.code AS code, size((a)-[:ROUTE]->()) + size((a)<-[:ROUTE]-()) AS degree").partitions(4).batch(25).loadDataFrame

display(df)

We can also load the data into a Spark GraphFrame if we want to play with it at the Spark end.
import org.neo4j.spark._
import org.graphframes._

val neo = Neo4j(sc)
val graphFrame = neo.pattern(("Airport", "code"), ("ROUTE", "distance"), ("Airport", "code")).partitions(6).batch(10000).loadGraphFrame

println(graphFrame.vertices.count)
println(graphFrame.edges.count)


val paths = graphFrame.bfs.fromExpr("value='LHR'").toExpr("value='CCU'").run()
display(paths)

Reference Architecture Diagram
A simplified architecture diagram using Neo4J may look as follows. This will vary depending on the project requirements and the selected platform and services.

Conclusion
- Neo4J is a stable graph database with a robust query language and integration options.
- It provides an API for connecting with Apache Spark. These points are worth keeping in mind while choosing a graph database for our requirements.
- For graph data processing, we should first try to push the query down for execution at the Neo4J end for higher performance.
- Apache Spark can be used for running data analytics or machine learning algorithms on graph data.