Recently I’ve had the opportunity to dig into Apache Spark, thanks to some training from Brian Bloechle from Cloudera.

What is Spark?

Fast, flexible, and developer-friendly, Apache Spark is the leading platform for large-scale SQL, batch processing, stream processing, and machine learning.

Java, Scala, Python and R are first-class citizens when it comes to consuming the various Spark APIs. I’ll cover PySpark in more detail.

Spark is a processing engine that is agnostic about where it runs: it can target a number of cluster managers, including Spark Standalone, Hadoop’s YARN, Apache Mesos and Kubernetes (see the session sketch after the list below). In the context of Spark, some useful surrounding ecosystem to be aware of:

  • Apache Spark: high-performance, general-purpose data processing engine.
  • Apache Hadoop: HDFS, MapReduce, and YARN.
  • Apache Parquet: a fast, framework/data-model/programming-language-agnostic columnar storage format. This is Spark’s default storage format.
  • Apache Impala: low-latency, massively parallel SQL engine for Hadoop clusters.
  • Apache Hive: provides a metastore service for projecting structure onto existing (unstructured) data in HDFS, enabling SQL and JDBC access. Interestingly, the Hive metastore has become a de facto standard for representing table schemas and is used by other structured systems, including Impala.
  • Apache Hue: Hadoop User Experience (HUE), a web UI for Hadoop that includes a general-purpose HDFS browser and query editors for Hive and Impala.
  • Apache Arrow: an in-memory columnar data representation that avoids serialization overhead.
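
The choice of cluster manager comes down to the master URL handed to Spark. A minimal PySpark sketch (the URL formats in the comments are the standard Spark ones; host names and ports are illustrative):

from pyspark.sql import SparkSession

# Standard master URL formats:
#   local[*]                 - run locally using all cores
#   spark://host:7077        - Spark Standalone
#   yarn                     - Hadoop YARN (cluster details come from HADOOP_CONF_DIR)
#   mesos://host:5050        - Apache Mesos
#   k8s://https://host:6443  - Kubernetes
spark = SparkSession.builder \
    .master("yarn") \
    .appName("cluster-manager-example") \
    .getOrCreate()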

Data Science 101

The O’Reilly Python Data Science Handbook is great quality, and covers the essential Python libraries such as NumPy, Pandas, Matplotlib and Scikit-Learn. The machine learning chapter decomposes and explains classical algorithms (e.g. Linear Regression, k-Means Clustering, Gaussian Mixture Models).

Spark 101

https://www.infoworld.com/article/3236869/analytics/what-is-apache-spark-the-big-data-analytics-platform-explained.html

Spark, given its in-memory performance and elegant programming model, has become the framework of choice when processing big data, overtaking Hadoop’s old MapReduce paradigm.

Spark Core Concepts

Fundamental to Spark is the Resilient Distributed Dataset or RDD; an abstraction that represents an immutable collection of elements partitioned across nodes in a cluster. Operations on RDDs are splittable across nodes, leading to fast and scalable parallel processing.

RDDs can be created from simple text files, SQL databases, NoSQL stores, AWS S3, and tons more. The Spark Core APIs are built atop the RDD, enabling traditional map-reduce functionality, but also providing built-in support for joining and shuffling data sets, filtering, sampling, and aggregation.
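
A minimal RDD sketch, assuming an existing SparkSession named spark (the file path is hypothetical):

# Hypothetical path; textFile yields an RDD of lines
lines = spark.sparkContext.textFile("/data/ride_reviews/*.txt")

# Classic map/reduce-style operations, executed in parallel across partitions
word_counts = lines.flatMap(lambda line: line.split()) \
    .map(lambda word: (word.lower(), 1)) \
    .reduceByKey(lambda a, b: a + b)

word_counts.take(5)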

Spark runs in a distributed fashion: a core driver process splits a Spark application into tasks and distributes them among many executor processes, which perform the actual work. These executors can be scaled up and down as required for the application’s needs.
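
As a rough sketch, executor scaling is driven by standard Spark configuration properties (the values below are illustrative, and dynamic allocation on YARN also assumes the external shuffle service is enabled):

from pyspark.sql import SparkSession

# The driver splits the application into tasks; executors run them.
# Dynamic allocation lets Spark grow and shrink the executor pool with the workload.
spark = SparkSession.builder \
    .master("yarn") \
    .appName("dynamic-executors-example") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "1") \
    .config("spark.dynamicAllocation.maxExecutors", "8") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()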

General Spark Doco: https://spark.apache.org/docs/latest/

RDD Resilient Distributed Datasets: https://spark.apache.org/docs/latest/rdd-programming-guide.html

Data frames are an important concept to grasp. They are the unit of operation, and are conceptually equivalent to a table in a relational database. Due to their functional underpinnings they are immutable, ephemeral and lazily evaluated.
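
A minimal sketch, again assuming a SparkSession named spark (the path is hypothetical; the riders data set is described under Sample Data Analysis below):

# Hypothetical path; column names and types are inferred from the CSV header and data
riders = spark.read.csv("/data/riders/", header=True, inferSchema=True)

riders.printSchema()                         # inspect the inferred schema
riders.select("first_name", "sex").show(5)   # transformations are lazy; show() triggers evaluation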

Parquet - the default and preferred binary format for representing data. Data should be laid out contiguously where possible, and there are two layout options (see the read/write sketch after this list):

  • Row format: each complete record is stored contiguously. Having all of a record’s fields together makes this more applicable to ETL use-cases. Apache Avro is one row-oriented representation, and Hive is better suited to row-based layouts.
  • Columnar format: the values of a particular column are stored in contiguous locations. Apache Parquet is columnar, and is the default for Spark. Impala is well suited to Parquet.
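
A minimal sketch of writing and reading Parquet, using the riders data frame from the sketch above (the output path is hypothetical):

# Write out as Parquet, Spark's default format
riders.write.parquet("/data/riders_parquet", mode="overwrite")

# Read it back; with a columnar format only the selected columns need to be scanned
spark.read.parquet("/data/riders_parquet").select("home_lat", "home_lon").show(5)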

It’s possible to convert a PySpark data frame to the SciPy stack’s structures, including a Pandas data frame.
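
For example (limit first so the driver isn’t flooded with the full data set):

pdf = riders.limit(1000).toPandas()                 # pandas DataFrame on the driver
coords = pdf[["home_lat", "home_lon"]].to_numpy()   # NumPy array, ready for SciPy/Scikit-Learn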

It’s possible to tweak the local parameter below:

spark = SparkSession.builder \
  .master("local") \
  .appName("connect-local") \
  .getOrCreate()

Like this:

  .master("local[4]") // use four core
  .master("local[*]") // use all cores

Render a Pandas formatted data frame:

riders.limit(5).toPandas()

In CDSW, Shift+Return lets you do multi-line editing.

Spark SQL Built-in Functions: https://spark.apache.org/docs/latest/api/sql/index.html

riders.createOrReplaceTempView("riders")
spark.sql("select count(id), count(distinct id) from riders").show()

Note that both the machine learning and SQL libraries have an older and a newer API: the RDD-based pyspark.mllib package is the old MLlib API, while the DataFrame-based pyspark.ml package is the new one; likewise the older SQLContext entry point for Spark SQL has been superseded by SparkSession.
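
A small illustrative sketch of the two generations of API:

# Older RDD-based ML API (now in maintenance mode)
from pyspark.mllib.clustering import KMeans as RDDKMeans
# Newer DataFrame-based ML API (preferred)
from pyspark.ml.clustering import KMeans

# Older SQL entry point (SQLContext) versus the newer unified SparkSession
from pyspark.sql import SQLContext, SparkSession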

Tip: similarly named functions and methods can behave differently. For example:

riders.select("sex").distinct().count()  # returns 3

from pyspark.sql.functions import count, countDistinct
riders.select(count("id"), countDistinct("sex")).show()  # returns 2 for sex

The discrepancy is because countDistinct ignores nulls, whereas distinct() counts a null as a value in its own right.

General advice: when working with big data, consider summarising/approximating the data, e.g. with approxQuantile:

# Use the `approxQuantile` to get customized (approximate) quantiles:

riders.approxQuantile("home_lat", \
    probabilities=[0.0, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 1.0], \
    relativeError=0.1)

Sample Data Analysis

riders (1 big file)

id,birth_date,start_date,first_name,last_name,sex,ethnicity,student,home_block,home_lat,home_lon,work_lat,work_lon
220200000001,1962-03-18,2017-01-01,Natalie,Prosser,female,White,0,380170405002188,46.816399,-96.874038,46.831427,-96.827786

rides (1 big file)

id,driver_id,rider_id,date_time,utc_offset,service,origin_lat,origin_lon,dest_lat,dest_lon,distance,duration,cancelled,star_rating
0000000001,220200000214,220200000084,2017-02-01 00:14,-6,,46.850956,-96.902849,46.860050,-96.825442,10123,729,0,5

ride_routes (1 file per day)

Tab-delimited, no header (a sketch of reading this file follows the sample rows below).

0000000001	0000000001	46.849960	-96.901848	0	0
0000000001	0000000002	46.850060	-96.901558	25	6
0000000001	0000000003	46.850090	-96.901405	37	9
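
A sketch of reading this file: tab-delimited with no header means an explicit schema is needed (the column names below are guesses from the sample rows, not given in the data):

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Column names are assumptions based on the sample rows above
ride_routes_schema = StructType([
    StructField("ride_id", StringType()),
    StructField("step_id", StringType()),
    StructField("lat", DoubleType()),
    StructField("lon", DoubleType()),
    StructField("distance", IntegerType()),
    StructField("duration", IntegerType()),
])

ride_routes = spark.read.csv("/data/ride_routes/", sep="\t",
                             schema=ride_routes_schema, header=False)
ride_routes.show(3)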

ride_reviews

0000000009	Dale is extremely cordial.
0000000037	Very junky car.
0000000071	most awful stench of all time! throw away your air freshener!
0000000083	No trouble of note.

drivers (1 big file)

id,birth_date,start_date,first_name,last_name,sex,ethnicity,student,home_block,home_lat,home_lon,vehicle_make,vehicle_model,vehicle_year,vehicle_color,vehicle_grand,vehicle_noir,vehicle_elite,rides,stars
220200000007,1996-12-21,2017-01-01,Adam,Abrahamson,male,White,1,270270204001008,46.868308,-96.786160,Chevrolet,Cruze,2013,gray,0,0,0,89,398

offices (1 file)

office_id,postal_code,city,country
1,93300,Paris,France
2,13006,Marseille,France

demographics (1 big file)

block_group	median_income	median_age
020130001001	53125	43.4
020130001002	63917	45.3
020130001003	60227	36.0
020160001001	57500	39.6
020160002001	88750	36.7

weather (1 file)

Station_ID,Date,Max_TemperatureF,Mean_TemperatureF,Min_TemperatureF,Max_Dew_PointF,MeanDew_PointF,Min_DewpointF,Max_Humidity,Mean_Humidity,Min_Humidity,Max_Sea_Level_PressureIn,Mean_Sea_Level_PressureIn,Min_Sea_Level_PressureIn,Max_VisibilityMiles,Mean_VisibilityMiles,Min_VisibilityMiles,Max_Wind_SpeedMPH,Mean_Wind_SpeedMPH,Max_Gust_SpeedMPH,PrecipitationIn,CloudCover,Events,WindDirDegrees
KFAR,2017-01-01,27,22,17,20,14,10,85,70,46,30.17,30.04,29.71,10,9,1,17,12,,0.01,7,Snow,33
KFAR,2017-01-02,21,17,12,18,16,10,92,87,73,30.15,30.07,29.97,10,4,0,20,12,,0.20,8,Fog-Snow,26
KFAR,2017-01-03,12,2,-8,7,-2,-15,84,77,58,30.37,30.21,30.02,10,5,2,25,20,34,0.00,6,Snow,314

data_scientists (1 file)

employee_id,first_name,last_name,office_id
63,Sophia,Laurent,1
88,Thomas,Dubois,1