
When we work on highly connected data sets such as social networks, world travel routes, or material traceability in the manufacturing & distribution industries, a robust graph database is a must for storing the data. At the same time, we need a big data processing tool to handle large datasets.

In this blog, we’ll show how to store a highly connected data set in the well-known graph database Neo4J and process the data using Apache Spark.

Download Graph Data

We’ll use the air-routes graph data from here. The file air-routes-latest-nodes.csv contains details about airports, countries and continents. The file air-routes-latest-edges.csv contains the relationships: routes between airports ("route"), and the airports that each country and continent contains ("contains").

Understand the Data & Relationships

Before we push the data into a graph database, the first task is to create the data model. That means identifying the nodes and their properties, and the relationships among the nodes along with their properties.

Below is a simplified data model of the air-routes dataset we downloaded.

----------------------------------------
Description of the node attributes
----------------------------------------
Airport:
type = Vertex types; 'airport' for airport vertices
code = The three letter IATA code like CCU or LHR
icao = The four letter ICAO code or none. Example KAUS or EGLL
desc = Text description / name of the airport
region = The geographical region like US-TX or GB-ENG
runways = The number of available runways
longest = Length of the longest runway in feet
elev = Elevation in feet above sea level
country = Two letter ISO country code such as US, FR or DE.
city = The name of the city the airport is in
lat = Latitude of the airport
lon = Longitude of the airport
Continent:
type = Vertex types; 'continent' for continents
code = The two letter continent code like AS or EU
desc = Name of the continent
Country:
type = Vertex types; 'country' for countries
code = The two letter country code like IN or GB
desc = Name of the country
-------------------------------------------
Description of the relationship attributes
-------------------------------------------
Route:
from = Origin airport id
to = Destination airport id
distance = Distance in miles
Contains:
from = Continent or country id the airport is in
to = airport id
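To make the model concrete, here is a minimal Scala sketch (assuming Scala 2.13) with one case class per node label and one per relationship type. The type and object names are hypothetical and simply mirror the attribute lists above. The sample node reuses the Saba (SAB) airport values returned by a query later in this post.

```scala
// Hypothetical Scala types mirroring the air-routes data model (illustration only).
object AirRoutesModel {
  // Node labels: every node has an id, a code and a description
  sealed trait Node { def id: Int; def code: String; def desc: String }

  final case class Airport(
      id: Int, code: String, icao: String, desc: String, region: String,
      runways: Int, longest: Int, elev: Int, // longest and elev in feet
      country: String, city: String, lat: Double, lon: Double
  ) extends Node

  final case class Country(id: Int, code: String, desc: String) extends Node
  final case class Continent(id: Int, code: String, desc: String) extends Node

  // Relationship types: both connect two nodes by id
  sealed trait Relationship { def from: Int; def to: Int }
  final case class Route(from: Int, to: Int, distance: Int) extends Relationship // airport -> airport, miles
  final case class Contains(from: Int, to: Int) extends Relationship              // continent/country -> airport

  // Sample node built from the SAB airport record shown later in the post
  val sab: Airport = Airport(2623, "SAB", "TNCS", "Juancho E. Yrausquin Airport", "BQ-U-A",
    1, 1300, 60, "BQ", "Saba", 17.64500046, -63.22000122)

  def main(args: Array[String]): Unit =
    println(s"${sab.code} (${sab.city}): ${sab.runways} runway(s), longest ${sab.longest} ft")
}
```

A sealed trait keeps the set of node and relationship kinds closed, so pattern matches over `Node` or `Relationship` can be checked for exhaustiveness.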

Neo4J Setup

For this blog we have used the Neo4J desktop version. The download & installation details can be found here.

Create a new User

We can create a new user or use an existing one.


Create a new Database

We’ll create a new database to contain the air-route dataset.


Azure & Neo4J

Several installation options are available in the Azure Marketplace. For deploying Neo4j Enterprise Edition, refer to the guide here.

Neo4J options at Azure Marketplace.

Import & Query of Graph Data

Once we have segregated the data into the appropriate domain data files, we can load the CSV files by placing them inside the database import directory…


… and loading the files from the Neo4j user interface.
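The post doesn’t show how the combined nodes file is segregated. One possible approach, sketched in Scala, groups the rows by their `type` value and writes one file per vertex type. The sketch rests on three assumptions:

- The file has a header row with a `type` column. Its values (`airport`, `country`, `continent`) become the output file names.
- The header has already been renamed to the column names read by the LOAD CSV statements below.
- No field contains a quoted comma. Airport names can contain commas, so a real CSV parser would be safer.

The object name and sample rows are hypothetical.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

object SplitNodesByType {
  // Group data rows by the value of the "type" column; each group keeps the header row.
  // Naive split on ',' : assumes no quoted fields containing commas.
  def splitByType(lines: Seq[String]): Map[String, Seq[String]] = {
    val header = lines.head
    val typeIdx = header.split(",", -1).indexOf("type")
    require(typeIdx >= 0, "header has no 'type' column")
    lines.tail
      .filter(_.trim.nonEmpty)
      .groupBy(_.split(",", -1)(typeIdx))
      .map { case (vertexType, rows) => vertexType -> (header +: rows) }
  }

  // Write <type>.csv files (e.g. airport.csv) into the given directory.
  def writeAll(groups: Map[String, Seq[String]], dir: Path): Unit =
    groups.foreach { case (vertexType, rows) =>
      Files.write(dir.resolve(s"$vertexType.csv"), rows.mkString("\n").getBytes(StandardCharsets.UTF_8))
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample rows; in practice these would come from air-routes-latest-nodes.csv
    val sample = Seq(
      "id,label,type,code,description",
      "1,airport,airport,AAA,Sample Airport One",
      "2,airport,airport,BBB,Sample Airport Two",
      "3,country,country,XX,Sample Country",
      "4,continent,continent,YY,Sample Continent"
    )
    val groups = splitByType(sample)
    val outDir = Files.createTempDirectory("air-routes")
    writeAll(groups, outDir)
    groups.toSeq.sortBy(_._1).foreach { case (t, rows) => println(s"$t.csv: ${rows.size - 1} row(s) in $outDir") }
  }
}
```

The relationship files (for example, route edges vs. contains edges) could be split the same way on the edges file’s label column, if it has one.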

Adding Nodes

LOAD CSV WITH HEADERS FROM 'file:///continent.csv' AS row
CREATE (:Continent {
id: toInteger(row.id),
label: row.label,
type: row.type,
code: row.code,
description: row.description
});
LOAD CSV WITH HEADERS FROM 'file:///country.csv' AS row
CREATE (:Country {
id: toInteger(row.id),
label: row.label,
type: row.type,
code: row.code,
description: row.description
});
LOAD CSV WITH HEADERS FROM 'file:///airport.csv' AS row
CREATE (:Airport {
id: toInteger(row.id),
label: row.label,
type: row.type,
code: row.code,
icao: row.icao,
description: row.description,
region: row.region,
runways: toInteger(row.runways),
longest: toInteger(row.longest),
elev: toInteger(row.elev),
country: row.country,
city: row.city,
lat: toFloat(row.lat),
lon: toFloat(row.lon)
});
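The `toInteger(...)` and `toFloat(...)` calls matter because LOAD CSV reads every field as a string. The sketch below is a rough Scala analogue, not Neo4J’s own conversion rules. It parses a row into typed properties and uses `Option` where Cypher would produce `null` for a missing or unparsable value. It assumes Scala 2.13 for `toIntOption`/`toDoubleOption`, keeps only a subset of the airport properties, and reuses the SAB values shown later in this post. The object and helper names are hypothetical.

```scala
object ParseAirportRow {
  // Typed airport properties (subset); Option stands in for Cypher's null.
  final case class Airport(
      id: Option[Int], code: String, icao: String, description: String,
      runways: Option[Int], longest: Option[Int], elev: Option[Int],
      lat: Option[Double], lon: Option[Double]
  )

  // Loosely mirrors toInteger(...) / toFloat(...): a missing or non-numeric
  // field becomes None instead of failing the whole load.
  private def intField(row: Map[String, String], key: String): Option[Int] =
    row.get(key).flatMap(_.trim.toIntOption)
  private def doubleField(row: Map[String, String], key: String): Option[Double] =
    row.get(key).flatMap(_.trim.toDoubleOption)

  def parse(row: Map[String, String]): Airport = Airport(
    id = intField(row, "id"),
    code = row.getOrElse("code", ""),
    icao = row.getOrElse("icao", ""),
    description = row.getOrElse("description", ""),
    runways = intField(row, "runways"),
    longest = intField(row, "longest"),
    elev = intField(row, "elev"),
    lat = doubleField(row, "lat"),
    lon = doubleField(row, "lon")
  )

  def main(args: Array[String]): Unit = {
    // Values taken from the SAB airport record shown later in this post
    val row = Map("id" -> "2623", "code" -> "SAB", "icao" -> "TNCS",
      "description" -> "Juancho E. Yrausquin Airport", "runways" -> "1",
      "longest" -> "1300", "elev" -> "60", "lat" -> "17.64500046", "lon" -> "-63.22000122")
    println(parse(row))
    println(parse(row.updated("elev", "n/a")).elev) // unparsable value -> None
  }
}
```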

Once all of the nodes are loaded, we can visualize the graph schema by executing:

CALL db.schema.visualization()

Adding Relationships

Our next task is to add the appropriate relationships among the nodes (for the full syntax, refer here).

LOAD CSV WITH HEADERS FROM "file:///continent_contains_airport.csv" AS row
//look up the two nodes we want to connect up
MATCH (c:Continent {id: toInteger(row.from)}), (a:Airport {id: toInteger(row.to)})
//now create a relationship between them
CREATE (c)-[:CONTAINS]->(a);
After adding the ‘Continent’ CONTAINS ‘Airport’ relationships.
LOAD CSV WITH HEADERS FROM "file:///country_contains_airport.csv" AS row
//look up the two nodes we want to connect up
MATCH (c1:Country {id: toInteger(row.from)}), (a:Airport {id: toInteger(row.to)})
//now create a relationship between them
CREATE (c1)-[:CONTAINS]->(a);
After adding the ‘Country’ CONTAINS ‘Airport’ relationships.
LOAD CSV WITH HEADERS FROM "file:///airport_route_airport.csv" AS row
//look up the two nodes we want to connect up
MATCH (a1:Airport {id: toInteger(row.from)}), (a2:Airport {id: toInteger(row.to)})
//now create a relationship between them
CREATE (a1)-[:ROUTE { distance: toInteger(row.distance)}]->(a2);
MATCH p=()-->() RETURN p LIMIT 25;
After adding the ‘Airport’ ROUTE ‘Airport’ relationships.
The database summary will show the total number of nodes & relationships loaded.
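Each relationship statement above matches both endpoints by `id` and creates one relationship per CSV row. When either MATCH finds no node, that row simply produces nothing. The minimal Scala sketch below models that lookup-join with hypothetical ids and codes. It shows how the relationship count in the database summary can end up lower than the number of CSV rows without any error being raised.

```scala
object CreateRelationships {
  final case class EdgeRow(from: Int, to: Int, distance: Int)
  final case class Route(fromCode: String, toCode: String, distance: Int)

  // Like MATCH (a1 {id: from}), (a2 {id: to}) ... CREATE: a row whose endpoints
  // are not both found yields no relationship and is silently dropped.
  def createRoutes(airportsById: Map[Int, String], rows: Seq[EdgeRow]): Seq[Route] =
    for {
      r  <- rows
      a1 <- airportsById.get(r.from)
      a2 <- airportsById.get(r.to)
    } yield Route(a1, a2, r.distance)

  def main(args: Array[String]): Unit = {
    // Hypothetical node ids -> airport codes
    val airports = Map(1 -> "AAA", 2 -> "BBB", 3 -> "CCC")
    // The last row points at id 99, which does not exist
    val rows = Seq(EdgeRow(1, 2, 100), EdgeRow(2, 3, 200), EdgeRow(3, 99, 300))
    val routes = createRoutes(airports, rows)
    println(s"CSV rows: ${rows.size}, relationships created: ${routes.size}")
  }
}
```

In Neo4J, an index (or uniqueness constraint) on the `id` property lets the MATCH do the equivalent of this map lookup instead of scanning every node with the label.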

Querying the data

Following are a few useful queries for analysing the graph data.

// Total number of Countries
MATCH (n:Country) RETURN count(n) AS count
// 237
// Total number of Airports
MATCH (n:Airport) RETURN count(n) AS count
// 3497
// Longest runway (feet)
MATCH (n:Airport) RETURN MAX(n.longest)
// 18045
// Shortest of the airports' longest runways (feet)
MATCH (n:Airport) RETURN MIN(n.longest)
// 1300
// Airport with the shortest runway
MATCH (a:Airport) WHERE a.longest = 1300 RETURN a
{
"country":"BQ",
"longest":1300,
"code":"SAB",
"city":"Saba",
"description":"Juancho E. Yrausquin Airport",
"lon":-63.22000122,
"label":"airport",
"type":"airport",
"elev":60,
"icao":"TNCS",
"id":2623,
"runways":1,
"region":"BQ-U-A",
"lat":17.64500046
}
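Aggregations like these can be cross-checked on an in-memory collection, for example after pulling a sample into Spark or a test harness. The sketch below uses two hypothetical airports plus the SAB value returned above. It also shows that the airport with the smallest longest runway can be found directly with `minBy`, without hard-coding 1300 in a second query. The object name and sample data are hypothetical.

```scala
object RunwayStats {
  final case class Airport(code: String, longest: Int) // longest runway in feet

  // (count, max longest, min longest, code of the airport with the minimum).
  // Assumes a non-empty input: max/min/minBy throw on an empty Seq.
  def stats(airports: Seq[Airport]): (Int, Int, Int, String) = {
    val longest = airports.map(_.longest)
    (airports.size, longest.max, longest.min, airports.minBy(_.longest).code)
  }

  // Two hypothetical airports plus SAB (longest runway 1300 ft, as returned above)
  val sample: Seq[Airport] = Seq(Airport("AAA", 9000), Airport("BBB", 12000), Airport("SAB", 1300))

  def main(args: Array[String]): Unit = {
    val (count, max, min, shortest) = stats(sample)
    println(s"count=$count max=$max min=$min shortest=$shortest")
  }
}
```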
// Airports' in-degree & out-degree
MATCH (a:Airport)
RETURN a.code AS code,
size((a)-[:ROUTE]->()) AS outdegree,
size((a)<-[:ROUTE]-()) AS indegree
╒══════╤═══════════╤══════════╕
│"code"│"outdegree"│"indegree"│
╞══════╪═══════════╪══════════╡
│"ATL" │242 │242 │
├──────┼───────────┼──────────┤
│"ANC" │40 │40 │
├──────┼───────────┼──────────┤
│"AUS" │80 │80 │
// Airports' total degree
MATCH (a:Airport)
RETURN a.code AS code,
size((a)-[:ROUTE]->()) + size((a)<-[:ROUTE]-()) AS degree
╒══════╤════════╕
│"code"│"degree"│
╞══════╪════════╡
│"ATL" │484 │
├──────┼────────┤
│"ANC" │80 │
├──────┼────────┤
│"AUS" │160 │
// Top 3 airports by number of routes (ordered by out-degree, then in-degree)
MATCH (a:Airport)
RETURN a.code AS airport_code, a.description AS airport_name,
size((a)-[:ROUTE]->()) AS outdegree,
size((a)<-[:ROUTE]-()) AS indegree
ORDER BY outdegree DESC, indegree DESC
LIMIT 3
╒══════════════╤════════════════════╤═══════════╤══════════╕
│"airport_code"│"airport_name" │"outdegree"│"indegree"│
╞══════════════╪════════════════════╪═══════════╪══════════╡
│"FRA" │"Frankfurt am Main" │307 │307 │
├──────────────┼────────────────────┼───────────┼──────────┤
│"IST" │"Istanbul Internatio│307 │307 │
│ │nal Airport" │ │ │
├──────────────┼────────────────────┼───────────┼──────────┤
│"CDG" │"Paris Charles de Ga│294 │295 │
│ │ulle" │ │ │
└──────────────┴────────────────────┴───────────┴──────────┘
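The degree queries count each airport’s outgoing and incoming ROUTE relationships. The Scala sketch below reproduces that counting and the `ORDER BY outdegree DESC, indegree DESC LIMIT 3` ranking. It runs over a hypothetical list of directed routes between made-up airport codes. Unlike the Cypher query, it only reports airports that appear in at least one route.

```scala
object RouteDegrees {
  final case class Degree(code: String, outdegree: Int, indegree: Int) {
    def total: Int = outdegree + indegree
  }

  // Count outgoing and incoming edges per airport (directed, like the queries above).
  def degrees(routes: Seq[(String, String)]): Seq[Degree] = {
    val out = routes.groupBy(_._1).map { case (k, v) => k -> v.size }
    val in  = routes.groupBy(_._2).map { case (k, v) => k -> v.size }
    (out.keySet ++ in.keySet).toSeq.map(c => Degree(c, out.getOrElse(c, 0), in.getOrElse(c, 0)))
  }

  // Equivalent of ORDER BY outdegree DESC, indegree DESC LIMIT n
  def top(ds: Seq[Degree], n: Int): Seq[Degree] =
    ds.sortBy(d => (-d.outdegree, -d.indegree)).take(n)

  // Hypothetical directed routes between made-up airport codes
  val sampleRoutes: Seq[(String, String)] = Seq(
    "AAA" -> "BBB", "BBB" -> "AAA", "AAA" -> "CCC", "CCC" -> "AAA", "AAA" -> "DDD", "BBB" -> "CCC")

  def main(args: Array[String]): Unit =
    top(degrees(sampleRoutes), 3).foreach { d =>
      println(s"${d.code} out=${d.outdegree} in=${d.indegree} total=${d.total}")
    }
}
```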
// To delete all nodes along with their relationships
MATCH (n) DETACH DELETE n

To find the paths with the minimum number of hops between London Heathrow Airport (code: LHR) and Netaji Subhas Chandra Bose International Airport, Kolkata (CCU), we can run the following query:

// Find all of the shortest (minimum-hop) paths between the two airports
MATCH paths = allShortestPaths((a:Airport {code: 'LHR'})-[:ROUTE*]-(b:Airport {code: 'CCU'}))
WITH paths, relationships(paths) AS rels
UNWIND rels AS rel
WITH paths, collect(rel.distance) AS air_distance_inbetween, sum(rel.distance) AS total_air_distance
RETURN DISTINCT [n IN nodes(paths) | n.code] AS origin_connecting_destination_airports, air_distance_inbetween, total_air_distance
ORDER BY total_air_distance;
╒═══════════════════╤═══════════════════╤═══════════════════╕
│"origin_connecting_│"air_distance_inbet│"total_air_distance│
│destination_airport│ween" │" │
│s" │ │ │
╞═══════════════════╪═══════════════════╪═══════════════════╡
│["LHR","DEL","CCU"]│[4180,815] │4995 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","DAC","CCU"]│[4977,149] │5126 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","AMD","CCU"]│[4264,1006] │5270 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","DXB","CCU"]│[3414,2090] │5504 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","BOM","CCU"]│[4479,1034] │5513 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","HYD","CCU"]│[4815,750] │5565 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","DOH","CCU"]│[3255,2324] │5579 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","BLR","CCU"]│[4996,960] │5956 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","MAA","CCU"]│[5113,860] │5973 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","BKK","CCU"]│[5947,1016] │6963 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","CAN","CCU"]│[5896,1578] │7474 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","HKG","CCU"]│[5980,1623] │7603 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","SGN","CCU"]│[6345,1452] │7797 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","KUL","CCU"]│[6585,1637] │8222 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","SIN","CCU"]│[6758,1803] │8561 │
├───────────────────┼───────────────────┼───────────────────┤
│["LHR","DPS","CCU"]│[7779,2822] │10601 │
└───────────────────┴───────────────────┴───────────────────┘
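The query has two steps. `allShortestPaths` returns every path with the minimum number of hops, and the rest of the query sums the `distance` of each leg and orders the paths by that total. The Scala sketch below reproduces both steps. First, a breadth-first search records every predecessor that lies on a minimum-hop path. Then it walks those predecessors back from the destination and sums the leg distances.

The toy graph uses only the LHR, DEL, DAC, DXB and CCU legs from the result table above. Its edges are treated as undirected, like the `-[:ROUTE*]-` pattern. The object name is hypothetical.

```scala
import scala.collection.mutable

object AllShortestHops {
  type Edge = (String, String, Int) // (airport, airport, distance in miles)

  // Undirected neighbour sets, like the undirected -[:ROUTE*]- pattern.
  def neighbours(edges: Seq[Edge]): Map[String, Set[String]] =
    edges
      .flatMap { case (a, b, _) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (k, vs) => k -> vs.map(_._2).toSet }

  // Leg distance keyed by the unordered airport pair.
  def legDistances(edges: Seq[Edge]): Map[Set[String], Int] =
    edges.map { case (a, b, d) => Set(a, b) -> d }.toMap

  // BFS from src keeping every predecessor on a minimum-hop path,
  // then walking the predecessors back from dst to list all such paths.
  def allShortestPaths(edges: Seq[Edge], src: String, dst: String): Seq[List[String]] = {
    val adj   = neighbours(edges)
    val hops  = mutable.Map(src -> 0)
    val preds = mutable.Map.empty[String, List[String]]
    val queue = mutable.Queue(src)
    while (queue.nonEmpty) {
      val u = queue.dequeue()
      for (v <- adj.getOrElse(u, Set.empty[String])) {
        if (!hops.contains(v)) {
          hops(v) = hops(u) + 1
          preds(v) = List(u)
          queue.enqueue(v)
        } else if (hops(v) == hops(u) + 1) {
          preds(v) = u :: preds(v) // another equally short way to reach v
        }
      }
    }
    def walk(node: String): Seq[List[String]] =
      if (node == src) Seq(List(src))
      else preds.getOrElse(node, Nil).flatMap(p => walk(p).map(_ :+ node))
    if (hops.contains(dst)) walk(dst) else Seq.empty
  }

  // Pair each path with its leg distances and total, sorted by total distance.
  def withDistances(paths: Seq[List[String]], legs: Map[Set[String], Int]): Seq[(List[String], List[Int], Int)] =
    paths.map { p =>
      val d = p.zip(p.tail).map { case (a, b) => legs(Set(a, b)) }
      (p, d, d.sum)
    }.sortBy(_._3)

  // Toy graph: only legs listed in the LHR -> CCU result table above
  val sampleEdges: Seq[Edge] = Seq(
    ("LHR", "DEL", 4180), ("DEL", "CCU", 815),
    ("LHR", "DAC", 4977), ("DAC", "CCU", 149),
    ("LHR", "DXB", 3414), ("DXB", "CCU", 2090)
  )

  def main(args: Array[String]): Unit = {
    val paths = allShortestPaths(sampleEdges, "LHR", "CCU")
    for ((p, d, total) <- withDistances(paths, legDistances(sampleEdges)))
      println(s"${p.mkString("[", ",", "]")} ${d.mkString("[", ",", "]")} $total")
  }
}
```

On this toy graph the first printed path is LHR, DEL, CCU with a total of 4995 miles, matching the first row of the table. Ranking by distance after the hop-based search is the same design as the Cypher query. It does not guarantee the globally shortest distance, since a path with more hops could be shorter in miles.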

Neo4J & Apache Spark

Neo4J provides a stable connector library for Apache Spark. Its builder API supports loading data as (for full details, see here):

  1. RDD
  2. DataFrame
  3. GraphFrame
  4. GraphX

We’ll use Azure Databricks as our data analytics platform.

Libraries required

We’ll install the latest Neo4J-Spark connector using its Maven coordinates, along with the Spark GraphFrames library.

The required libraries.

Connecting Databricks with Neo4J

As we’re using a desktop installation of Neo4J, we’ll use the tunneling tool ngrok to create a public endpoint. Once it’s installed, we’ll execute:

ngrok tcp <Neo4j Bolt port>
e.g. ngrok tcp 7687

We’ll note down the public URL.


Next, we’ll add the following entries to the Databricks cluster’s Spark config:

spark.neo4j.bolt.url bolt://0.tcp.ngrok.io:<port>
spark.neo4j.bolt.user <user name>
spark.neo4j.bolt.password <password>
We need to point to the Neo4J endpoint.

After configuring, we’ll just restart the Spark cluster and then it’ll be ready to go!
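Building `spark.neo4j.bolt.url` from the ngrok forwarding address only requires swapping the `tcp://` scheme for `bolt://`. A small Scala helper that builds the three config entries could look like the sketch below. The object name, port and credentials are hypothetical.

```scala
import java.net.URI

object BoltConfig {
  // Turn the ngrok forwarding address (tcp://host:port) into the three Spark
  // config entries listed above. Only the scheme changes: tcp -> bolt.
  def fromNgrok(forwarding: String, user: String, password: String): Map[String, String] = {
    val uri = new URI(forwarding)
    require(uri.getScheme == "tcp" && uri.getHost != null && uri.getPort > 0,
      s"unexpected forwarding address: $forwarding")
    Map(
      "spark.neo4j.bolt.url"      -> s"bolt://${uri.getHost}:${uri.getPort}",
      "spark.neo4j.bolt.user"     -> user,
      "spark.neo4j.bolt.password" -> password
    )
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical port and credentials
    val conf = fromNgrok("tcp://0.tcp.ngrok.io:12345", "neo4j", "<password>")
    // Print as "key value" lines, the format used in the Spark config box above
    conf.toSeq.sortBy(_._1).foreach { case (k, v) => println(s"$k $v") }
  }
}
```

On the free ngrok plan the forwarding port typically changes each time the tunnel restarts, so the `bolt.url` entry has to be updated (and the cluster restarted) whenever that happens.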

Data Load from Spark

Once the data is ingested into the raw layer of the data lake, we can use Apache Spark (on Databricks) for data quality validation, cleansing and transformation. We can then bulk-load the data into Neo4J using the Spark-Neo4J API.

%scala
// Read records & create a Spark DataFrame
val continentDF = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/FileStore/tables/test_continent2.csv")
import org.neo4j.spark.dataframe.Neo4jDataFrame
// Create :Continent nodes, using every DataFrame column as a node property
Neo4jDataFrame.createNodes(sc, continentDF, ("Continent", continentDF.columns.toSeq))
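The validation and cleansing step isn’t spelled out above. The sketch below applies a few hypothetical data-quality rules in plain Scala and splits rows into a valid set and a rejected set. The rules are: a three-letter IATA code, latitude and longitude within valid ranges, and a non-negative runway count. On Databricks, the same predicates could be written as DataFrame filters before calling `createNodes`. The object name and sample rows are hypothetical, apart from the SAB values taken from this post.

```scala
object ValidateAirports {
  final case class Airport(code: String, runways: Int, lat: Double, lon: Double)

  // Hypothetical data-quality rules; returns the violations (empty list = valid row).
  def violations(a: Airport): List[String] = List(
    if (a.code.matches("[A-Z]{3}")) None else Some("code is not a 3-letter IATA code"),
    if (a.lat >= -90 && a.lat <= 90) None else Some("latitude out of range"),
    if (a.lon >= -180 && a.lon <= 180) None else Some("longitude out of range"),
    if (a.runways >= 0) None else Some("negative runway count")
  ).flatten

  // Split rows into those to load and those to reject (with reasons).
  def split(rows: Seq[Airport]): (Seq[Airport], Seq[(Airport, List[String])]) = {
    val checked = rows.map(a => a -> violations(a))
    (checked.collect { case (a, Nil) => a }, checked.filter(_._2.nonEmpty))
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Airport("SAB", 1, 17.64500046, -63.22000122), // values from the SAB record in this post
      Airport("X1", 1, 10.0, 10.0),                 // hypothetical: malformed code
      Airport("ZZZ", 2, 95.0, 10.0)                 // hypothetical: latitude out of range
    )
    val (valid, rejected) = split(rows)
    println(s"valid: ${valid.map(_.code).mkString(", ")}")
    rejected.foreach { case (a, why) => println(s"rejected ${a.code}: ${why.mkString("; ")}") }
  }
}
```

Keeping the rejected rows together with their reasons, rather than just dropping them, makes it easier to report data-quality issues back to the source.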

We may also need to load data from Neo4J into Spark to perform further analytics or run ML algorithms. By pushing the query down as a Cypher statement, the filtering and aggregation run efficiently on the Neo4J side.

import org.neo4j.spark.Neo4j
val neo = Neo4j(sc)
// Load records via Cypher query
val df = neo.cypher("MATCH (a:Airport) RETURN a.code AS code, size((a)-[:ROUTE]->()) + size((a)<-[:ROUTE]-()) AS degree").partitions(4).batch(25).loadDataFrame
display (df)

We can also load the data into a Spark GraphFrame if we want to work with the graph on the Spark side.

import org.neo4j.spark._ 
import org.graphframes._
val neo = Neo4j(sc)
val graphFrame = neo.pattern(("Airport", "code"),("ROUTE", "distance"),("Airport", "code")).partitions(6).batch(10000).loadGraphFrame
println (graphFrame.vertices.count)
println (graphFrame.edges.count)
Next, we can run a breadth-first search (BFS) on the GraphFrame to find the minimum-hop paths from LHR to CCU:
val paths = graphFrame.bfs.fromExpr("value='LHR'").toExpr("value='CCU'").run()
display (paths)

Reference Architecture Diagram

A simplified architecture diagram using Neo4J may look as follows. It will vary depending on the project requirements and the selected platform and services.

A simplified reference architecture diagram.

Conclusion

  • Neo4J is a stable graph database with a robust query language (Cypher) and good integration options.
  • It provides an API for connecting with Apache Spark. We should keep these points in mind while choosing a graph database for our requirements.
  • For graph data processing, we should first try to push the query down and execute it on the Neo4J side for better performance.
  • Apache Spark can be used for running data analytics or machine learning algorithms on graph data.

Thanks for reading! If you enjoyed it, clap & share it! To see similar posts, follow me on Medium & LinkedIn.

Written by

Tech enthusiast, Azure Big Data Architect.