Configuring and using SparkSession, SparkContext, DataFrameReader and DataStreamReader objects in a Python REPL or IDE with interactive code-completion and docstring support.
Note: An adaptation of this article was used in my answer to a StackOverflow question here.
Note: A companion JupyterLab notebook attachment appears at the end of this post.
After pip-installing the PySpark Python package, issuing the pyspark(1) command launches a Terminal REPL session using your Python interpreter (for example, /usr/bin/python). However, that session lacks code-completion and accompanying docstring support, making it difficult to explore and interactively learn the Spark API. Matters worsen in a proper Python IDE, where you cannot issue the pyspark(1) command at all. How, then, does one set up an environment in which a Terminal REPL or Python IDE has access to the full PySpark framework (including plugins), yet provides code-completion and accompanying docstring support? In other words, something similar to the screen-capture below, taking note of the beige code-completion pop-up near the bottom:
Interactive PySpark Session
We illustrate how to do this now.
Configure SparkSession, SparkContext, DataFrameReader and DataStreamReader objects. Assuming you've pip-installed the pyspark and ptpython Python packages, start an ad-hoc interactive session with code-completion and docstring support by saving the following code block to, say, ./pyspark_init.py, then running it as follows:
nmvega@fedora$ ptpython -i ./pyspark_init.py
import sys, os
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, Catalog
from pyspark.sql import DataFrame, DataFrameStatFunctions, DataFrameNaFunctions
from pyspark.sql import functions as F   # Spark Functions. USAGE: F.col(), F.countDistinct()
from pyspark.sql import types as T       # Spark Types.     USAGE: T.IntegerType()
from pyspark.sql.types import Row
# ------------------------------------------
# Note: Row() in .../pyspark/sql/types.py
# isn't included in '__all__' list(), so
# we must import it by name here.
# ------------------------------------------

PROJECT_BASE_DIR    = '/home/nmvega/agile.ds.d/'
SPARK_WAREHOUSE_DIR = PROJECT_BASE_DIR + 'stack.d/spark.d/spark-warehouse'
SPARK_CONF_DIR      = PROJECT_BASE_DIR + 'stack.d/spark.d/SPARK.CONF.DIR.d'
SPARK_JARS_IVY      = PROJECT_BASE_DIR + 'stack.d/spark.d/SPARK_JARS_IVY.d'  # Cache downloaded JARS here.

SPARK_JARS_REPOSITORIES = ','.join([
    'http://repo.hortonworks.com/content/repositories/releases',
])

os.environ.pop('SPARK_MASTER_HOST', None)  # Since we're using pip/pySpark, these next three environment
os.environ.pop('SPARK_MASTER_PORT', None)  # variables aren't needed; and we ensure pySpark doesn't
os.environ.pop('SPARK_HOME', None)         # get confused by them, should they be set.
os.environ.pop('PYTHONSTARTUP', None)      # Just in case pySpark 2.x attempts to read this.

os.environ['PYSPARK_PYTHON'] = sys.executable   # Make SPARK Workers/Executors use same Python as Master.
os.environ['JAVA_HOME'] = '/usr/lib/jvm/jre'    # Oracle JAVA for our pip/python3/pySpark 2.4 (not CDH's JRE).
os.environ['SPARK_CONF_DIR'] = SPARK_CONF_DIR   # Where spark-defaults.conf; spark-env.sh; log4j; etc. go.
os.environ['HADOOP_CONF_DIR'] = SPARK_CONF_DIR  # Where symlinks to CDH's core|hdfs|hive|yarn-site.xml should
                                                # be created. (We use same directory as above). Both of these
                                                # locations can also be set in '(SPARK_CONF_DIR)/spark-env.sh'.

# ======================================================================
# Append CDH's Hadoop Native Library directory to LD_LIBRARY_PATH so
# Spark/PySpark can find them when interacting with HIVE/HADOOP/HDFS.
# Spark emits a warning when this path is absent, but it doesn't fail.
# ======================================================================
HADOOP_NATIVE_LIBS = '/opt/cloudera/parcels/CDH/lib/hadoop/lib/native/'
os.environ['LD_LIBRARY_PATH'] = os.environ.get('LD_LIBRARY_PATH', '') + ':' + HADOOP_NATIVE_LIBS
# ======================================================================

# ======================================================================
# Maven Coordinates for JARs (and their dependencies) needed to plug
# extra functionality into Spark 2.x.
#   -- spark-sql-kafka       (Batch R/W from/to Kafka).
#   -- spark-streaming-kafka (Stream from/to Kafka).
#   -- Spark-Hbase-Connector from HortonWorks.
#      1) http://repo.hortonworks.com/content/repositories/releases/
#         Released JAR and pom.xml deps.
#      2) https://github.com/hortonworks-spark/shc/
#         Source for latest and pom.xml deps.
# ======================================================================
SPARK_JARS_PACKAGES = ','.join([
    'org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0',
    'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0',
    'com.hortonworks:shc-core:1.1.1-2.1-s_2.11',
])
# ======================================================================

spark_conf = SparkConf()
spark_conf.setAll([
    ('spark.master', 'local[*]'),
    ('spark.app.name', 'myApp'),
    ('spark.submit.deployMode', 'client'),
    ('spark.ui.showConsoleProgress', 'true'),
    ('spark.eventLog.enabled', 'false'),
    ('spark.logConf', 'false'),
    ('spark.driver.bindAddress', '0.0.0.0'),
    ('spark.driver.host', '0.0.0.0'),
    ('spark.jars.repositories', SPARK_JARS_REPOSITORIES),
    ('spark.jars.ivy', SPARK_JARS_IVY),
    ('spark.jars.packages', SPARK_JARS_PACKAGES),
    ('spark.sql.warehouse.dir', SPARK_WAREHOUSE_DIR),
    ('spark.sql.catalogImplementation', 'hive'),
    ('spark.sql.hive.metastore.version', '1.2.1'),
    #('spark.sql.hive.metastore.version', '2.1.1'),
    #('spark.sql.hive.metastore.jars', '/path/to/2.1.1/jars'),
    # core|hive|hdfs|yarn-site.xml K/V pairs can be converted into equivalent SparkConf()
    # K/V pairs by prefixing their Keys w/ 'spark.hadoop.'. Also, symlinks to those CDH
    # files can be created in SPARK_CONF_DIR for Spark clients to use as (overridable) defaults.
    ('spark.hadoop.fs.defaultFS', 'hdfs://vps00:8020'),                     # From: 'core-site.xml'
    ('spark.hadoop.hive.metastore.uris', 'thrift://vps00:9083'),            # From: 'hive-site.xml'
    ('spark.hadoop.hive.metastore.warehouse.dir', '/user/hive/warehouse'),  # From: 'hive-site.xml'
])

spark_sesn         = SparkSession.builder.config(conf=spark_conf).enableHiveSupport().getOrCreate()
spark_ctxt         = spark_sesn.sparkContext
spark_reader       = spark_sesn.read
spark_streamReader = spark_sesn.readStream
spark_ctxt.setLogLevel("WARN")

# ======================================================================
# Create small arbitrary DataFrame (myDF) and GroupedData (myGDF)
# instances so we can inspect methods() and attributes inside them.
# Also, register table name for SparkSQL queries ('mydf_as_sqltable').
# ======================================================================
myDF  = spark_sesn.createDataFrame([Row(0,1,2), Row(3,4,5), Row(6,7,8)]).toDF('col0', 'col1', 'col2')
myGDF = myDF.select('*').groupBy('col0')
myDF.createOrReplaceTempView('mydf_as_sqltable')
# ======================================================================
#spark_sesn.stop()
# ======================================================================
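With the session up, code-completion (e.g. typing spark_sesn. and pressing Tab in ptpython) works against live objects. A few quick sanity checks you might run at this point (a sketch; exact output depends on your environment):

print(spark_sesn.version)       # Spark version backing this session (e.g. 2.4.x).
print(spark_ctxt.master)        # 'local[*]', per the SparkConf above.
print(spark_ctxt.uiWebUrl)      # URL of the Spark UI for this session.
myDF.printSchema()              # Inspect the small demo DataFrame created above.
myGDF.count().show()            # GroupedData -> DataFrame via an aggregation.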
Note that, with very minor tweaks (mostly to the conf object k/v pairs), the above initialization snippet is designed to be usable virtually anywhere. Thus, to use it within a proper Python IDE, you can simply save it as a Python helper-module and import it, as sketched below (... pyspark(1) command not needed). \o/
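For example, here is a minimal sketch assuming the snippet was saved as pyspark_init.py somewhere importable (the module and variable names used by the importer below are illustrative, not prescribed):

# my_analysis.py -- hypothetical IDE module / notebook cell reusing the initialized objects.
from pyspark_init import spark_sesn, spark_reader   # Importing runs the initialization once.

result_df = spark_sesn.sql("SELECT * FROM mydf_as_sqltable")  # Temp view registered by pyspark_init.py.
result_df.show()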
With a code-completion and docstring enabled interactive PySpark session loaded, let’s now perform some basic Spark data engineering within it.
Configure the DataFrameReader object. A DataFrameReader offers a .format() method to select the serialized format to read (e.g. csv, json, parquet), an .option() method to set options related to that format, and a .load() method to read the data into a DataFrame. The following example configures it for the CSV format:
spark_reader.format('csv')                              # CSV format.
spark_reader.option("inferSchema", "true")              # Infer schema from CSV.
spark_reader.option("header", "true")                   # CSV file has a header line.
spark_reader.option("ignoreLeadingWhiteSpace", "true")  # Guard against spaces surrounding the delimiter.
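Because .format() and .option() each return the DataFrameReader itself, the same configuration can equivalently be written in a chained (fluent) style; this is purely a stylistic alternative to the statement-per-line form above:

spark_reader = (spark_sesn.read
                .format('csv')
                .option('inferSchema', 'true')
                .option('header', 'true')
                .option('ignoreLeadingWhiteSpace', 'true'))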
For this example, we’ll download real-estate sales data from Zillow into a pseudo-randomly named temporary filesystem file, which we’ll delete at the end. That file's contents will be the data source for our resulting DAGs.
import os, tempfile
import requests

CSV_URL = 'https://fla.st/2PsNXsY'

# ===============================================================================
# Generate a temporary pseudo-random filesystem file and write the CSV-structured
# data to it. We'll use os.unlink() to delete it when we're done, but read the
# caution/gotcha note below which explains when we can delete it.
# ===============================================================================
with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.csv', mode='r+', delete=False) as f:
    f.write(requests.get(CSV_URL).content.decode('utf-8'))
    CSV_FILE = f.name
# ===============================================================================

df = spark_reader.load(CSV_FILE)  # Read CSV file and create DataFrame.
df.show(5)                        # Preview with formatted output ...

# ===============================================================================
# We're tempted to delete the temporary file here, but can't! Why? Recall that
# Spark lazily evaluates statements. Thus from here on, every Action (i.e.
# outputs to console, persistence to external data-stores, or to Python-native
# data-structures) will trigger a DAG-flow which includes re-reading from the source
# data-store (i.e. our temporary file). We therefore cannot delete that file
# until the very end of this program. Further, we'll need to re-run this cell if
# we're re-running parts of this notebook and the file was already deleted.
# ===============================================================================
# os.unlink(CSV_FILE)  # Run this statement ONLY at the end of this notebook/file!
# ===============================================================================
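If you truly need to delete the source file before the end, one possible workaround (not used in this walk-through) is to materialize the DataFrame into Spark's cache first, so that subsequent Actions no longer re-read the CSV. Note that evicted cache partitions would still trigger a re-read of the source, so treat this as a convenience, not a guarantee:

# Optional (sketch): pin the data in executor memory/disk so later Actions
# read from the cache rather than from the temporary CSV file.
df = df.cache()   # Lazy: merely marks the DataFrame for caching.
df.count()        # An Action, which materializes the cache.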
+-----+-----------------------+-------+--------+-------+-------+-----------------+
|Index| "Living Space (sq ft)"| "Beds"| "Baths"|  "Zip"| "Year"| "List Price ($)"|
+-----+-----------------------+-------+--------+-------+-------+-----------------+
|  1.0|                 2222.0|    3.0|     3.5|32312.0| 1981.0|         250000.0|
|  2.0|                 1628.0|    3.0|     2.0|32308.0| 2009.0|         185000.0|
|  3.0|                 3824.0|    5.0|     4.0|32312.0| 1954.0|         399000.0|
|  4.0|                 1137.0|    3.0|     2.0|32309.0| 1993.0|         150000.0|
|  5.0|                 3560.0|    6.0|     4.0|32309.0| 1973.0|         315000.0|
+-----+-----------------------+-------+--------+-------+-------+-----------------+
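In the preview above, the numeric columns were inferred as doubles and the quoted column names from the CSV header were carried through. You can confirm exactly what the reader produced with the usual introspection calls (a quick check; output depends on the CSV contents):

df.printSchema()    # Inferred type of each column.
print(df.columns)   # Column names exactly as read from the CSV header.
print(df.dtypes)    # (name, type) pairs.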
Perform Transformations and Actions on this DataFrame, using either the DataFrame API or SQL statements. The following statements return DataFrames with identical contents and execution plans:
df.createOrReplaceTempView("df_asTable")  # Register a SparkSQL table as name: 'df_asTable'
df_df  = df.filter(df.Zip == 32312)       # .filter() and .where() are aliases.
df_sql = spark_sesn.sql(""" SELECT * FROM df_asTable WHERE Zip=32312 """)
print(id(df_df) == id(df_sql))  # Different DataFrame/Python objects.
df_df.show()                    # Display rows obtained via the DataFrame API.
df_sql.show()                   # Display rows obtained via SparkSQL statements.
False
+-----+--------------------+----+-----+-----+--------------------+
|Index|Living Space (sq ft)|Beds|Baths|  Zip|Year, List Price ($)|
+-----+--------------------+----+-----+-----+--------------------+
|    1|                2222|   3|  3.5|32312|                1981|
|    3|                3824|   5|  4.0|32312|                1954|
|    6|                2893|   4|  3.0|32312|                1994|
|    8|                2483|   4|  3.0|32312|                2016|
|    9|                2400|   4|  4.0|32312|                2002|
|   12|                3200|   5|  4.0|32312|                1964|
+-----+--------------------+----+-----+-----+--------------------+

+-----+--------------------+----+-----+-----+--------------------+
|Index|Living Space (sq ft)|Beds|Baths|  Zip|Year, List Price ($)|
+-----+--------------------+----+-----+-----+--------------------+
|    1|                2222|   3|  3.5|32312|                1981|
|    3|                3824|   5|  4.0|32312|                1954|
|    6|                2893|   4|  3.0|32312|                1994|
|    8|                2483|   4|  3.0|32312|                2016|
|    9|                2400|   4|  4.0|32312|                2002|
|   12|                3200|   5|  4.0|32312|                1964|
+-----+--------------------+----+-----+-----+--------------------+
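The id() check confirms that df_df and df_sql are distinct Python objects; to verify they compile to the same execution plan, ask Spark to print the plans (the exact plan formatting varies by Spark version):

df_df.explain(extended=True)    # Parsed, analyzed, and optimized logical plans, plus the physical plan.
df_sql.explain(extended=True)   # Should match the plan produced via the DataFrame API.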
Finally, we can delete the temporary file and shut down the SparkSession …
if os.path.exists(CSV_FILE):
    os.unlink(CSV_FILE)
spark_sesn.stop()
The end! =:)
ATTACHMENTS:
‣ Jupyter notebook of above session in HTML format