First we configure the SparkSession, SparkContext, DataFrameReader and DataStreamReader objects. With minor tweaks, mostly to the conf object's k/v pairs, the following initialization code can be used virtually anywhere:

In [1]:
import multiprocessing
import os
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as sFn # Usage: sFn.col(), sFn.window()

cpu_count = multiprocessing.cpu_count()
spark_uri = os.environ.get('SPARK_MASTER', 'local[%d]' % cpu_count)

spark_conf = SparkConf()
spark_conf.setAll( [('spark.master', spark_uri),
                    ('spark.app.name', 'demoApp'),
                    ('spark.submit.deployMode', 'client'),
                    ('spark.ui.showConsoleProgress', 'true'),
                    ('spark.eventLog.enabled', 'false'),
                    ('spark.logConf', 'false')] )

spark_sesn         = SparkSession.builder.config(conf=spark_conf).getOrCreate()
spark_ctxt         = spark_sesn.sparkContext
spark_reader       = spark_sesn.read       # pyspark.sql.readwriter.DataFrameReader
spark_streamReader = spark_sesn.readStream # pyspark.sql.streaming.DataStreamReader

spark_ctxt.setLogLevel("INFO")
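
With this approach, further tuning just means adding k/v pairs to the same conf object. As a minimal sketch (the values below are placeholders rather than recommendations, and they must be set before getOrCreate() is called):

spark_conf.set('spark.driver.memory', '2g')                     # Driver JVM heap size.
spark_conf.set('spark.executor.memory', '2g')                   # Per-executor JVM heap size.
spark_conf.set('spark.sql.shuffle.partitions', str(cpu_count))  # Parallelism for shuffles and joins.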

In [2]:
spark_reader.format('csv')                             # CSV format.
spark_reader.option("inferSchema", "true")             # Infer schema from CSV.
spark_reader.option("header", "true")                  # CSV file has a header line.
spark_reader.option("ignoreLeadingWhiteSpace", "true") # Guard against spaces surrounding the delimiter.
Out[2]:
<pyspark.sql.readwriter.DataFrameReader at 0x7fa6018ccfd0>
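
Note that .format() and .option() each mutate and return the same DataFrameReader, which is why Out[2] echoes the reader object; the same setup can be written as one chained expression. Also, because inferSchema triggers an extra pass over the data, you could instead supply an explicit schema via .schema() if you already know the column types. A sketch of the chained form:

spark_reader.format('csv')                 \
            .option('inferSchema', 'true') \
            .option('header', 'true')      \
            .option('ignoreLeadingWhiteSpace', 'true')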

For this example, we'll download real-estate sales data from Zillow into a temporary file with a pseudo-random name, which we'll delete at the end. That file's contents will be the data source for our resulting DAGs.

In [3]:
import requests, tempfile
 
CSV_URL = 'https://fla.st/2PsNXsY'
# ===============================================================================
# Generate a temporary pseudo-random filesystem file and write the CSV-structured
# data to it. We'll use os.unlink() to delete it when we're done, but read the
# caution/gotcha note below which explains when we can delete it.
# ===============================================================================
with tempfile.NamedTemporaryFile(dir='/tmp', suffix='.csv', mode='r+', delete=False) as f:
    f.write(requests.get(CSV_URL).content.decode('utf-8'))
    CSV_FILE = f.name
# ===============================================================================
 
df = spark_reader.load(CSV_FILE) # Read CSV file and create DataFrame.
df.show(5) # Preview with formatted output ...
 
# ===============================================================================
# We're tempted to delete the temporary file here, but can't! Why? Recall that
# Spark lazily evaluates statements. From here on, every Action (e.g. output to
# the console, persistence to an external data-store, or collection into native
# Python data-structures) triggers a DAG-flow that includes re-reading from the
# source data-store (i.e. our temporary file). We therefore can't delete that
# file until the very end of this program. Further, we'll need to re-run this
# cell if parts of this notebook are re-run after the file has been deleted. =:)
# ===============================================================================
# os.unlink(CSV_FILE) # Run this statement ONLY at the end of this notebook/file!
# ===============================================================================
+-----+--------------------+----+-----+-----+----+--------------+
|Index|Living Space (sq ft)|Beds|Baths|  Zip|Year|List Price ($)|
+-----+--------------------+----+-----+-----+----+--------------+
|    1|                2222|   3|  3.5|32312|1981|        250000|
|    2|                1628|   3|  2.0|32308|2009|        185000|
|    3|                3824|   5|  4.0|32312|1954|        399000|
|    4|                1137|   3|  2.0|32309|1993|        150000|
|    5|                3560|   6|  4.0|32309|1973|        315000|
+-----+--------------------+----+-----+-----+----+--------------+
only showing top 5 rows
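
One way around this gotcha (sketched below, but not used in this notebook) is to materialize the DataFrame with .cache() plus an action; subsequent actions then read the cached partitions instead of re-scanning the CSV file, though Spark still falls back to the source if cached partitions are evicted under memory pressure:

# A minimal sketch -- cache the DataFrame so later actions needn't re-read the file:
df_cached = df.cache() # Mark the DataFrame for caching (lazy; nothing happens yet).
df_cached.count()      # An action that materializes the cache.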


We can now perform transformations and actions on this DataFrame using either the DataFrame API or SparkSQL statements. The following statements return DataFrames with identical contents and execution plans (verified with .explain() below):

In [4]:
df.createOrReplaceTempView("df_asTable") # Register a SparkSQL table as name: 'df_asTable'
df_df  = df.filter(df.Zip == 32312) # .filter() and .where() are aliases.
df_sql = spark_sesn.sql(""" SELECT * FROM df_asTable WHERE Zip=32312 """)
In [5]:
print(id(df_df) == id(df_sql)) # Different DataFrame/Python objects
df_df.show()  # Display rows obtained via the DataFrame API.
df_sql.show() # Display rows obtained via SparkSQL statements.
False
+-----+--------------------+----+-----+-----+----+--------------+
|Index|Living Space (sq ft)|Beds|Baths|  Zip|Year|List Price ($)|
+-----+--------------------+----+-----+-----+----+--------------+
|    1|                2222|   3|  3.5|32312|1981|        250000|
|    3|                3824|   5|  4.0|32312|1954|        399000|
|    6|                2893|   4|  3.0|32312|1994|        699000|
|    8|                2483|   4|  3.0|32312|2016|        399000|
|    9|                2400|   4|  4.0|32312|2002|        613000|
|   12|                3200|   5|  4.0|32312|1964|        465000|
+-----+--------------------+----+-----+-----+----+--------------+

+-----+--------------------+----+-----+-----+----+--------------+
|Index|Living Space (sq ft)|Beds|Baths|  Zip|Year|List Price ($)|
+-----+--------------------+----+-----+-----+----+--------------+
|    1|                2222|   3|  3.5|32312|1981|        250000|
|    3|                3824|   5|  4.0|32312|1954|        399000|
|    6|                2893|   4|  3.0|32312|1994|        699000|
|    8|                2483|   4|  3.0|32312|2016|        399000|
|    9|                2400|   4|  4.0|32312|2002|        613000|
|   12|                3200|   5|  4.0|32312|1964|        465000|
+-----+--------------------+----+-----+-----+----+--------------+
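
To verify the claim that both routes produce identical execution plans, compare their physical plans; the exact plan text varies by Spark version, but both should show the same scan-and-filter structure:

df_df.explain()  # Physical plan for the DataFrame-API version.
df_sql.explain() # Physical plan for the SparkSQL version; should match the plan above.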


We can finally delete the temporary file:

In [6]:
if os.path.exists(CSV_FILE): os.unlink(CSV_FILE) # We can finally delete the temporary file.
spark_sesn.stop() # Shutdown Spark.

The end! =:)