CommunityData:Hyak Spark: Difference between revisions

From CommunityData
 
(53 intermediate revisions by 5 users not shown)
Line 8: Line 8:


This page will help you decide if you should use Spark on Hyak for your problem and provide instructions on how to get started.  
This page will help you decide if you should use Spark on Hyak for your problem and provide instructions on how to get started.  
== Video screen casts ==
Nate walked Mako and others through an introduction to using Spark on Hyak that includes both a description of how to use Spark for running jobs as well as how to import and update new data. It's long but it's worth a walk through. You probably want to do the rest of the setup things on this page first so you can follow along:
* [https://media.communitydata.science/~mako/cdsc_only/spark_on_hyak-01-GMT20230125-181815_Recording_1920x1080.mp4 Hyak on Spark Tutorial Part 1]
* [https://media.communitydata.science/~mako/cdsc_only/spark_on_hyak-02-GMT20230125-221015_Recording_1920x1080.mp4  Hyak on Spark Tutorial Part 2]
These will require access to Mako's <code>cdsc_only</code> video folder which requires a username and password. If you are in the CDSC you should be able to find those in the [https://lists.communitydata.science/hyperkitty/ email archives] for your collective after you log in using an account that has an email address on the list (you might have to create the account if you haven't before).


== Pros and Cons of Spark ==  
== Pros and Cons of Spark ==  
Line 24: Line 34:
# It takes several steps to get the cluster up and running.
# It takes several steps to get the cluster up and running.
# The programming paradigm is not super intuitive, especially if you are not familiar with SQL databases or lazy evaluation.
# The programming paradigm is not super intuitive, especially if you are not familiar with SQL databases or lazy evaluation.
# Effectively developing your spark code means getting it setup on your own laptop, which isn't trivial.
# Doing more advanced things requires programming in Scala.
# Doing more advanced things requires programming in Scala.


= Getting Started with Spark =  
= Getting Started with Spark =  


The first thing to do is to get a working stand alone Spark installation on your laptop. You should develop your Spark code on your laptop before running on hyak. To get spark working on your laptop you need to first install the Oracle Java Development Toolkit (Oracle JDK) and then install Spark.  
== Spark on Hyak ==
 
If you are already set up on Hyak following the instructions on [[CommunityData:Hyak]] then you should already have a working spark installation on Hyak.  Test this by running
  pyspark
 
from a hyak cluster node (directly on the login node will give you an insufficient memory error).
 
You should see this:
 
    Welcome to
          ____              __
        / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
      /__ / .__/\_,_/_/ /_/\_\  version 3.3.1
          /_/
 
If so then you are ready to start running Spark programs on a single node. If you don't see this, then you need to check your <code>$SPARK_HOME</code>, <code>$PATH</code>, <code>$JAVA_HOME</code>, and <code>$PYTHONPATH</code> environment variables.
 
If you are using the [[CommunityData:Hyak-Mox | cdsc mox setup]] then you should have a working spark configuration in your environment already.  Otherwise, you'll need to have the following in your .bashrc (remember to source .bashrc or re-login to load up these changes):
 
<syntaxhighlight lang="bash">
    export JAVA_HOME='/gscratch/comdata/local/open-jdk'
    export PATH="$JAVA_HOME/bin:$PATH"
    export SPARK_HOME='/gscratch/comdata/local/spark'
    export PATH="$SPARK_HOME/bin":$PATH
    export PYTHONPATH="$SPARK_HOME/python:"$PYTHONPATH
 
    export TMPDIR="/gscratch/comdata/users/$USER/tmpdir"
</syntaxhighlight>
 
You can also run spark programs on many nodes, but this requires additional steps. These are described below.
 
== Spark Walkthrough ==
 
Spark programming is somewhat different from normal python programming. This section will walk you through a script to help you learn how to work with Spark. You may find this script useful as a template for building variables on top of [[wikiq]] data.
 
This section presents a pyspark program that
# Reads wikiq tsvs
# Computes the nth edit for each editor
# For edits that were reverted, identify the edit that made the revert.
# Output tsvs with the new variables.
 
The script is on ikt here: /com/users/nathante/mediawiki_dump_tools/wikiq_users/wikiq_users_spark.py
 
<syntaxhighlight lang="python">
    #!/usr/bin/env python3                                                                                                                                                                                           
    import sys
    from pyspark import SparkConf
    from pyspark.sql import SparkSession, SQLContext
    from pyspark.sql import Window
    import pyspark.sql.functions as f
    from pyspark.sql import types
    import argparse
    import glob
    from os import mkdir
    from os import path
</syntaxhighlight>
 
This part imports some python utilities that we will use.  You can pretty safely treat the <code>SparkConf</code>, the <code>SparkSession</code> and the <code>SQLContext</code> imports as magic that creates a spark environment that supports working with Spark's SQL features.
<code>Window</code> is used to create Window functions. We will use a window function to count the nth edit made by each editor. <code>import pyspark.sql.functions as f</code> provides built in functions that can be applied to data in spark data frames. <code>types</code> are data types that we will use to specify the scheme for reading wikiq files.
 
<syntaxhighlight lang="python">
    def parse_args():
        parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
        parser.add_argument('-i', '--input-file', help='Tsv file of wiki edits. Supports wildcards ', required=True, type=str)
        parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
        parser.add_argument('--output-format', help = "[csv, parquet] format to output",type=str)
        parser.add_argument('--num-partitions', help = "number of partitions to output",type=int, default=1)
        args = parser.parse_args()
        return(args)
</syntaxhighlight>
 
Above is just a function to build a command line interface.
 
<syntaxhighlight lang="python">
    if __name__ == "__main__":
        conf = SparkConf().setAppName("Wiki Users Spark")
        spark = SparkSession.builder.getOrCreate()
</syntaxhighlight>
 
Now we are in the main function of the script. The above two lines complete setting up spark. If you are going to run this program on a multi-node cluster, then it would be nice to set the AppName to something friendly. This will be used by the job monitoring tools. 
 
<syntaxhighlight lang="python">
        args = parse_args()
        files = glob.glob(args.input_file)
        files = [path.abspath(p) for p in files]
</syntaxhighlight>
Spark is designed to read and write lists of files. The <code>args.input_file</code> uses <code>glob</code> to accept wildcards. The above lines build a list of files from the argument.
 
<syntaxhighlight lang="python">
    reader = spark.read
</syntaxhighlight>
 
This creates a reader object that can read files. We are starting to get down to business. Next we will specify the schema for the files that we will read in. This is important so that spark can run efficiently and operate on the correct data types.
 
<syntaxhighlight lang="python">
    # build a schema                                                                                                                                                                                             
    struct = types.StructType().add("anon",types.StringType(),True)
    struct = struct.add("articleid",types.LongType(),True)
    struct = struct.add("date_time",types.TimestampType(), True)
    struct = struct.add("deleted",types.BooleanType(), True)
    struct = struct.add("editor",types.StringType(),True)
    struct = struct.add("editor_id",types.LongType(), True)
    struct = struct.add("minor", types.BooleanType(), True)
    struct = struct.add("namespace", types.LongType(), True)
    struct = struct.add("revert", types.BooleanType(), True)
    struct = struct.add("reverteds", types.StringType(), True)
    struct = struct.add("revid", types.LongType(), True)
    struct = struct.add("sha1", types.StringType(), True)
    struct = struct.add("text_chars", types.LongType(), True)
    struct = struct.add("title",types.StringType(), True)
</syntaxhighlight>
 
This is a little bit tedious, but it is necessary for Spark to work effectively on tsv data. If you are reading binary format such as Parquet (which is recommended, and easy to create using Spark) then you can skip this.
 
<syntaxhighlight lang="python">
    df = reader.csv(files,
                    sep='\t',
                    inferSchema=False,
                    header=True,
                    mode="PERMISSIVE",
                    schema = struct)
</syntaxhighlight>
 
This reads the data into a Spark Dataframe. Spark Dataframes are more like SQL tables than they are like pandas DataFrames.  Spark Dataframes are pretty abstract and can live on memory or on disk. Operations on Spark Dataframes are lazily evaluated, Spark will not actually run computations on your data until it has to. Calling <code>df.show()</code> will print the dataframe and trigger execution. <code>mode="PERMISSIVE"</code> stops Spark from giving up if it hits malformed rows.
 
<syntaxhighlight lang="python">
    df = df.repartition(args.num_partitions)
</syntaxhighlight>
 
The first thing to do after reading the data is to <code>repartition</code> the data. This determines the number of files that spark will output. Choosing the right number of partitions isn't really an exact science. Having more partitions makes some operations more efficient and can make other operations slower. The rule of thumb is that the number of partitions increases linearly with the amount of data. 500 partitions seems pretty good for English Wikipedia. If you are interested [https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-partitions.html this page] is good. Now we are ready to build some variables. The first thing we are going to do is to create a new column <code>editor_id_or_ip</code>.
 
<syntaxhighlight lang="python">
    # replace na editor ids                                                                                                                                                                                       
    df = df.select('*',f.coalesce(df['editor_id'],df['editor']).alias('editor_id_or_ip'))
</syntaxhighlight>
 
The first argument to select <code>'*'</code> causes select to return all the columns.  Next we call the <code>coalesce</code> function which creates a new column with the value of <code>editor_id</code> if <code>editor_id</code> is not null and the value of <code>editor</code> if <code>editor_id</code> is null. The call to <code>alias</code> gives the new column a name.  If you are familiar with SQL programming, this might seem familiar. You could write it as <code>SELECT *, COALESCE(editor_id, editor) AS editor_id_or_ip</code>.
 
Next we are going to identify the edits that revert each edit.  <code>reverteds</code> lists the edits that the edit has reverted.
 
<syntaxhighlight lang="python">
    # assign which edit reverted what edit                                                                                                                                                                       
    reverteds_df = df.filter(~ df.reverteds.isNull()).select(['revid','reverteds'])
</syntaxhighlight>
 
This line creates a new Spark Dataframe out of the rows of the first dataframe that have a value for <code>reverteds</code> with the columns <code>revid</code> and <code>reverteds</code>.
 
<syntaxhighlight lang="python">
    reverteds_df = reverteds_df.select("*", f.split(reverteds_df.reverteds,',').alias("reverteds_new"))
</syntaxhighlight>
 
The above line converts <code>reverteds</code> from a string to an array.
 
<syntaxhighlight lang="python">
    reverteds_df = reverteds_df.drop("reverteds")
    reverteds_df = reverteds_df.withColumnRenamed("reverteds_new", "reverteds")
</syntaxhighlight>
 
The above two lines remove the old "reverteds" column, which was a string, and replaces it with the array column. This is required because unlike pandas, Spark dataframes do not have a column assignment syntax. 
 
<syntaxhighlight lang="python">
    reverteds_df = reverteds_df.select(reverteds_df.revid.alias('reverted_by'),
                                      f.explode(reverteds_df.reverteds).alias('reverted_id'))
</syntaxhighlight>
 
The most important part of the above is the function call to <code>explode</code>. Explode💥 unfolds the array so that we get one line for each element of the array. Now we can join <code>reverteds_df</code> with <code>df</code> to put the <code>reverted_by</code> column in <code>df</code>.
 
<syntaxhighlight lang="python">
    df = df.join(reverteds_df, df.revid == reverteds_df.reverted_id, how='left_outer')
    df.drop("reverted_id")
    del(reverteds_df)
</syntaxhighlight>
 
Join the two tables so that each revision that was reverted gets a value for <code>reverted_by</code>. There are many kinds of joins and there is some detail on this in the [[#Join help|Join help section of this page]]. The join is a <code>left_outer</code> join so we keep all the rows of <code>df</code> even the rows that don't have a value for <code>reverted_id</code> in <code>reverteds_df</code>. We remove the redundent <code>reverted_id</code> column and are don with building <code>reverted_by</code>. Next we add a column that counts the number of times a given editor has made a revert (this is called a ''cumulative count''). Since we aren't going to use <code>reverteds_df</code> again we can call <code>del(reveteds_df)</code>. This tells spark it is free to remove the object from storage and can improve performance.
 
<syntaxhighlight lang="python">
    # sort by datetime                                                                                                                                                                                           
    df = df.orderBy(df.date_time.asc())
</syntaxhighlight>
 
<code>orderBy</code> sorts the dataframe by date. 
 
<syntaxhighlight lang="python">
    win = Window.orderBy('date_time').partitionBy('editor_id_or_ip')
</syntaxhighlight>
 
The above defines a <code>WindowSpec</code>, which is a kind of object that can be used to define rolling aggregations. We are going to use the <code>rank</code> function to perform the cumulative count, and <code>rank</code> requires a <code>WindowSpec</code>. The WindowSpec that we made says that we are grouping at the level of <code>editor_id_or_ip</code> and that we want to operate on each row of each group in chronological order.
 
<syntaxhighlight lang="python">
    # count reverts                                                                                                                                                                                               
    reverts_df = df.filter(df.revert==True).select(['revid','editor_id_or_ip','date_time','revert'])
</syntaxhighlight>
 
The above creates a new table that only has reverts. 
 
<syntaxhighlight lang="python">
    reverts_df = reverts_df.withColumn('editor_nth_revert',f.rank().over(win))
</syntaxhighlight>
 
This applies the <code>rank</code> function over the window to perform the cumulative count of the reverts. The <code>withColumn</code> function adds a new column to the dataframe called <code>editor_nth_revert</code>.
 
<syntaxhighlight lang="python">
    df = df.join(reverts_df, ["revid",'editor_id_or_ip','date_time','revert'], how='left_outer')
    del(reverts_df)
</syntaxhighlight>
 
Above we perform the join to add the new column to <code>df</code>. We join on all of the columns <code> ["revid",'editor_id_or_ip','date_time','revert']</code> so that duplicate columns are not created in the <code>df</code>
 
<syntaxhighlight lang="python">
    # count edits                                                                                                                                                                                                 
    df = df.withColumn('year', f.year(df.date_time))
    df = df.withColumn('month',f.month(df.date_time))
</syntaxhighlight>
 
Using <code>withColumn</code> again to illustrate creating some calendar variables from the <code>date_time</code>.
 
<syntaxhighlight lang="python">
    df = df.withColumn('editor_nth_edit',f.rank().over(win))
</syntaxhighlight>
 
We can reuse the <code>WindowSpec</code> to get the cumulative count for all edits as opposed to all reverts.
 
<syntaxhighlight lang="python">
    # output                                                                                                                                                                                                     
    if not path.exists(args.output_dir):
        mkdir(args.output_dir)
    if args.output_format == "csv" or args.output_format == "tsv":
        df.write.csv(args.output_dir, sep='\t', mode='overwrite',header=True,timestampFormat="yyyy-MM-dd HH:mm:ss")
</syntaxhighlight>
 
Instead of writing our output to a single file, we output to a directory. Spark will write 1 file for each partition to the directory.
 
<syntaxhighlight lang="python">
            # format == "parquet"                                                                                                                                                                                 
    else:
        df.write.parquet(args.output_dir, mode='overwrite')
</syntaxhighlight>
 
It is also easy to write to parquet.
 
== Starting a Spark cluster with many nodes on Hyak ==
 
It is pretty easy to start up a multiple node cluster on Hyak.
 
If you have <code> /com/local/bin </code> in your <code> $PATH </code> then you should be able to run:
 
<syntaxhighlight lang="bash">
    get_spark_nodes.sh 4
</syntaxhighlight>
 
To checkout 4 nodes that can be used as a Spark cluster. The spark cluster will have 4 worker nodes, one of these is also the "master" node. When you run <code> get_spark_nodes.sh </code> you will be routed to the machine that will become the master. If you only want 2 nodes just do
 
<syntaxhighlight lang="bash">
    get_spark_nodes.sh 2
</syntaxhighlight>
 
After you get the nodes and have a shell on the master node run
 
<syntaxhighlight lang="bash">
  start_spark_cluster.sh
</syntaxhighlight>
 
This will setup the cluster.  Make sure you start up the cluster from the same session you used to get_spark_nodes -- otherwise, the startup script doesn't have access to the assigned node list and will fail. Take note of the node that is assigned to be the master, and use that information to set your $SPARK_MASTER environment variable, for example <code>export SPARK_MASTER="n0650"</code>. The program <code> spark-submit </code> submits your script to the running Spark cluster.
 
<syntaxhighlight lang="bash">
    spark-submit --master  spark://$SPARK_MASTER:18899 your_script.py [Arguments to your script here].
</syntaxhighlight>
 
For example, we can submit the script we used in the walkthrough as: 
 
<syntaxhighlight lang="bash">
    spark-submit --master  spark://$SPARK_MASTER:18899 wikiq_users_spark.py --output-format tsv  -i "/com/output/wikiq-enwiki-20180301/enwiki-20180301-pages-meta-history*.tsv" -o  "/com/output/wikiq-users-enwiki-20180301-tsv/" --num-partitions 500
</syntaxhighlight>
 
When you have a spark cluster running, it will serve some nice monitoring tools on ports 8989 and 4040 of the master.  You can build an ssh tunnel between your laptop and these nodes to monitor the progress of your spark jobs.
 
When you are done with the cluster, you should shut it down using the script in <code> $SPARK_HOME/sbin </code>
 
<syntaxhighlight lang="bash">
  $SPARK_HOME/sbin/stop-all.sh
</syntaxhighlight>
 
=== Monitoring the cluster ===
 
From a login node (hyak):
 
<syntaxhighlight lang="bash">
    ssh -L localhost:8989:localhost:8989 $SPARK_MASTER -N -f && ssh -L localhost:4040:localhost:4040 $SPARK_MASTER -n -F
</syntaxhighlight>
 
From your laptop:
 
<syntaxhighlight lang="bash">
    ssh -L localhost:8989:localhost:8989 hyak -N -f && ssh -L localhost:4040:localhost:4040 hyak -n -F
</syntaxhighlight>
 
Point your browser to localhost:8989 to see the cluster status and to localhost:4040 to monitor jobs.
 
== Setting up Spark on your laptop ==
You might want to have a working stand alone Spark installation on your laptop. You should develop your Spark code on your laptop before running on hyak. To get spark working on your laptop you need to first install the Oracle Java Development Toolkit (Oracle JDK) and then install Spark.  


=== Installing Java ===
=== Installing Java ===
Line 39: Line 349:
# Edit your environment variables (i.e. in your .bashrc) to  
# Edit your environment variables (i.e. in your .bashrc) to  


<syntaxhighlight lang="bash">
     JAVA_HOME=/home/you/Oracle_JDK/
     JAVA_HOME=/home/you/Oracle_JDK/
     PATH=$JAVA_HOME/bin:$PATH  
     PATH=$JAVA_HOME/bin:$PATH  
</syntaxhighlight>




Line 53: Line 365:
You should see this:  
You should see this:  


 
<syntaxhighlight lang="bash">
     Welcome to
     Welcome to
           ____              __
           ____              __
Line 60: Line 372:
       /__ / .__/\_,_/_/ /_/\_\  version 2.3.1
       /__ / .__/\_,_/_/ /_/\_\  version 2.3.1
           /_/
           /_/
</syntaxhighlight>




Line 65: Line 378:


Add to your .bashrc:
Add to your .bashrc:
<syntaxhighlight lang="bash">
     export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
     export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
</syntaxhighlight>
= Tips, Recipes, and Resources =
To gain access to various useful SparkContext functions, you need to instantiate a pointer to the context which encloses your session. It seems to be common for Spark users to call this pointer sc, e.g. after you do
<syntaxhighlight lang="python">
    spark = SparkSession.builder.getOrCreate()
</syntaxhighlight>
add a line like
<syntaxhighlight lang="python">
    sc = spark.sparkContext
</syntaxhighlight>
and then you can use sc to access the functions described here: [http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.SparkContext].
==== To create an empty dataframe ====
One way to create an empty dataframe is to generate a schema as in the example script, and then pass the schema into the create method, with an empty RDD object as data.
<syntaxhighlight lang="python">
    myAwesomeDataset = spark.createDataFrame(data=sc.emptyRDD(), schema=myGroovySchema)
</syntaxhighlight>
==== Pyspark string slicing seems to be non-pythonic ====
# Strings begin at 1, not 0. [0:5] will yield the same results as [1:5].
# [x:y] will give you "give me y total characters, starting with the char in position x."
So, given a column in articleDF called timestamp, with contents like....
   
    20150701000000
You can access 2015 with
<syntaxhighlight lang="python">
    articleDF.timestamp[1:4]
</syntaxhighlight>
And to get 07:
<syntaxhighlight lang="python">
    articleDF.timestamp[5:2]
</syntaxhighlight>
==== When Reading In Multiple Files with Different Schemas ====
Make sure you re-instantiate your reader object, e.g.
<syntaxhighlight lang="python">
    sparkReader = spark.read
</syntaxhighlight>
when changing to a new file. The reader may cache the schema of the previous file and fail to detect the new schema. To make sure you have what you're expecting, try a
<syntaxhighlight lang="python">
    if DEBUG:
        yourDataset.show()
</syntaxhighlight>
This will get you the same behavior as pandas print(yourDataset.head()) -- 20 rows, nicely formatted in your stdout.
Note that calling show(), while filtered to show 20 rows, still causes all steps of the job to execute, and can take a long time.
==== Getting Help & Useful Links ====
Getting help with Spark in the usual fora such as Stack Exchange or even a straight-up Google search seems to be a less effective strategy for pyspark than it is for normal python queries and errors. Specifying pyspark in your search terms helps in getting only Python answers, but debugging an error may require looking at Java documentation, and some online recipe blogs speak of writing code "in Spark", so presumably the console language.
The apache spark site at [https://spark.apache.org/ https://spark.apache.org/] is useful but not all of the example code is localized to python.
==== Join help ====
There's some good info about joins here: [http://www.learnbymarketing.com/1100/pyspark-joins-by-example/ http://www.learnbymarketing.com/1100/pyspark-joins-by-example/]
=== Java Errors and Responses ===
When I got:
    spark java.io.IOException: No subfolder can be created in .
the University of Google told me it was a disk space issue. But shutting down the cluster and restarting it solved the problem -- maybe it hadn't shut down cleanly the last time someone used spark.
=== Slurm kills my job! Encountering Memory Limits ===
In theory, Spark enables you to run computations on data of any size without memory limitations. In practice, memory management issues occur. We are trying to understand these issues and to learn how to write Spark scripts that don't overuse memory.
==== Things to try if you run out of memory ====
# The 'out of memory' may be ephemeral or due to memory management issues in some layer other than your code -- try starting up a new cluster and running the same job unchanged.
# Repartition your data: increasing the number of partitions should make it easier for the Spark scheduler to avoid exceeding its memory limits.
# Increase the number of nodes. This can solve the problem essentially by giving Spark more Ram to work with.
# Be careful moving data out of distributed Spark objects into normal python objects. Work your way up from a small sample to a larger sample.
# You might try tweaking memory management options in <code>$SPARK_HOME/conf/spark-env.sh</code> and <$SPARK_HOME/conf/spark-defaults.conf</code>. Decreasing the number of executors, and the total memory allocated to executors should make Spark more resilient at the cost of performance.
=== Errors While Starting Cluster ===
Sometimes I get errors like this:
    n0650: failed to launch: nice -n 0 /com/local/spark/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://n0649:18899
Usually it seems to happen if I relinquish my spark cluster (whether I use the kill script or not) and then immediately restart one. The error goes away if I shut down and wait a minute or two before re-launching; my assumption is that there's some hygienic work being done behind the scenes that the scheduler doesn't know about and I need to let that finish.
And sometimes I get errors like this:
scontrol: error: host list is empty
when trying to launch spark. This means I'm in a session that doesn't know what nodes are assigned to the spark cluster. This will launch a dysfunctional cluster. Stop-all and then start_spark_cluster from the same session you landed in when you ran get_spark_nodes.
When I get errors about full logs, I go in and clean up the temporary files folder it refers to.

Latest revision as of 23:11, 12 July 2024

Apache Spark is a powerful system for writing programs that deal with large datasets. The most likely reason for using Spark on Hyak is that you run into memory limitations when building variables. For example suppose you want to compute:

  • The number of prior edits a Wikipedia editor has made, for every editor and every edit.
  • The number of views for every Wikipedia page for every month and every edit.
  • The number of times every Reddit user has commented on every thread on every subreddit for every week.

You might try writing a program to build these variables using a data science tool like pandas in Python or data.table, or plyr in R. These common data science tools are powerful, expressive, and fast, but do not work when data does not fit in memory. When a table does not fit in memory, but the computation you want to do only requires operating on one row at a time (such as in a simple transformation or aggregation), you can often work around this limitation by writing a simple custom program that operates in a streaming fashion. However, when computation cannot be done one row at a time, such as in a sort, group by, or join a streaming solution will not work. In this case your options are limited. One option is writing bespoke code to perform the required operations and building variables. However, this can be technically challenging and time consuming work. Moreover, your eventual solution is likely to be relatively slow and difficult to extend or maintain compared to a solution build using Spark. A number of us (Nate, Jeremy, Kaylea) have all at some point written bespoke code for computing user-level variables on Wikipedia data. The infamous "million file problem" is a result from abusing the filesystem to perform a massive group by.

This page will help you decide if you should use Spark on Hyak for your problem and provide instructions on how to get started.


Video screen casts[edit]

Nate walked Mako and others through an introduction to using Spark on Hyak that includes both a description of how to use Spark for running jobs as well as how to import and update new data. It's long but it's worth a walk through. You probably want to do the rest of the setup things on this page first so you can follow along:

These will require access to Mako's cdsc_only video folder which requires a username and password. If you are in the CDSC you should be able to find those in the email archives for your collective after you log in using an account that has an email address on the list (you might have to create the account if you haven't before).

Pros and Cons of Spark[edit]

The main advantages of Spark on Hyak:

  1. Work with "big data" without ever running out of memory.
  2. You get very good parallelism for free.
  3. Distribute computational work across many hyak nodes so your programs run faster.
  4. Common database operations (select, join, groupby, filter) are pretty easy.
  5. Spark supports common statistical and analytical tasks (stratified sampling, summary and pairwise statistics, common and simple models).
  6. Spark is a trendy technology that lots of people know or want to learn.

The main disadvantages of Spark are

  1. It takes several steps to get the cluster up and running.
  2. The programming paradigm is not super intuitive, especially if you are not familiar with SQL databases or lazy evaluation.
  3. Doing more advanced things requires programming in Scala.

Getting Started with Spark[edit]

Spark on Hyak[edit]

If you are already set up on Hyak following the instructions on CommunityData:Hyak then you should already have a working spark installation on Hyak. Test this by running

  pyspark

from a hyak cluster node (directly on the login node will give you an insufficient memory error).

You should see this:

   Welcome to
         ____              __
        / __/__  ___ _____/ /__
       _\ \/ _ \/ _ `/ __/  '_/
      /__ / .__/\_,_/_/ /_/\_\   version 3.3.1
         /_/

If so then you are ready to start running Spark programs on a single node. If you don't see this, then you need to check your $SPARK_HOME, $PATH, $JAVA_HOME, and $PYTHONPATH environment variables.

If you are using the cdsc mox setup then you should have a working spark configuration in your environment already. Otherwise, you'll need to have the following in your .bashrc (remember to source .bashrc or re-login to load up these changes):

    export JAVA_HOME='/gscratch/comdata/local/open-jdk'
    export PATH="$JAVA_HOME/bin:$PATH"
    export SPARK_HOME='/gscratch/comdata/local/spark'
    export PATH="$SPARK_HOME/bin":$PATH
    export PYTHONPATH="$SPARK_HOME/python:"$PYTHONPATH

    export TMPDIR="/gscratch/comdata/users/$USER/tmpdir"

You can also run spark programs on many nodes, but this requires additional steps. These are described below.

Spark Walkthrough[edit]

Spark programming is somewhat different from normal python programming. This section will walk you through a script to help you learn how to work with Spark. You may find this script useful as a template for building variables on top of wikiq data.

This section presents a pyspark program that

  1. Reads wikiq tsvs
  2. Computes the nth edit for each editor
  3. For edits that were reverted, identify the edit that made the revert.
  4. Output tsvs with the new variables.

The script is on ikt here: /com/users/nathante/mediawiki_dump_tools/wikiq_users/wikiq_users_spark.py

    #!/usr/bin/env python3                                                                                                                                                                                             
    import sys
    from pyspark import SparkConf
    from pyspark.sql import SparkSession, SQLContext
    from pyspark.sql import Window
    import pyspark.sql.functions as f
    from pyspark.sql import types
    import argparse
    import glob
    from os import mkdir
    from os import path

This part imports some python utilities that we will use. You can pretty safely treat the SparkConf, the SparkSession and the SQLContext imports as magic that creates a spark environment that supports working with Spark's SQL features. Window is used to create Window functions. We will use a window function to count the nth edit made by each editor. import pyspark.sql.functions as f provides built in functions that can be applied to data in spark data frames. types are data types that we will use to specify the scheme for reading wikiq files.

    def parse_args():
        parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
        parser.add_argument('-i', '--input-file', help='Tsv file of wiki edits. Supports wildcards ', required=True, type=str)
        parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
        parser.add_argument('--output-format', help = "[csv, parquet] format to output",type=str)
        parser.add_argument('--num-partitions', help = "number of partitions to output",type=int, default=1)
        args = parser.parse_args()
        return(args)

Above is just a function to build a command line interface.

    if __name__ == "__main__":
        conf = SparkConf().setAppName("Wiki Users Spark")
        spark = SparkSession.builder.getOrCreate()

Now we are in the main function of the script. The above two lines complete setting up spark. If you are going to run this program on a multi-node cluster, then it would be nice to set the AppName to something friendly. This will be used by the job monitoring tools.

        args = parse_args()
        files = glob.glob(args.input_file)
        files = [path.abspath(p) for p in files]

Spark is designed to read and write lists of files. The args.input_file uses glob to accept wildcards. The above lines build a list of files from the argument.

    reader = spark.read

This creates a reader object that can read files. We are starting to get down to business. Next we will specify the schema for the files that we will read in. This is important so that spark can run efficiently and operate on the correct data types.

    # build a schema                                                                                                                                                                                               
    struct = types.StructType().add("anon",types.StringType(),True)
    struct = struct.add("articleid",types.LongType(),True)
    struct = struct.add("date_time",types.TimestampType(), True)
    struct = struct.add("deleted",types.BooleanType(), True)
    struct = struct.add("editor",types.StringType(),True)
    struct = struct.add("editor_id",types.LongType(), True)
    struct = struct.add("minor", types.BooleanType(), True)
    struct = struct.add("namespace", types.LongType(), True)
    struct = struct.add("revert", types.BooleanType(), True)
    struct = struct.add("reverteds", types.StringType(), True)
    struct = struct.add("revid", types.LongType(), True)
    struct = struct.add("sha1", types.StringType(), True)
    struct = struct.add("text_chars", types.LongType(), True)
    struct = struct.add("title",types.StringType(), True)

This is a little bit tedious, but it is necessary for Spark to work effectively on tsv data. If you are reading binary format such as Parquet (which is recommended, and easy to create using Spark) then you can skip this.

    df = reader.csv(files,
                    sep='\t',
                    inferSchema=False,
                    header=True,
                    mode="PERMISSIVE",
                    schema = struct)

This reads the data into a Spark Dataframe. Spark Dataframes are more like SQL tables than they are like pandas DataFrames. Spark Dataframes are pretty abstract and can live on memory or on disk. Operations on Spark Dataframes are lazily evaluated, Spark will not actually run computations on your data until it has to. Calling df.show() will print the dataframe and trigger execution. mode="PERMISSIVE" stops Spark from giving up if it hits malformed rows.

    df = df.repartition(args.num_partitions)

The first thing to do after reading the data is to repartition the data. This determines the number of files that spark will output. Choosing the right number of partitions isn't really an exact science. Having more partitions makes some operations more efficient and can make other operations slower. The rule of thumb is that the number of partitions increases linearly with the amount of data. 500 partitions seems pretty good for English Wikipedia. If you are interested this page is good. Now we are ready to build some variables. The first thing we are going to do is to create a new column editor_id_or_ip.

    # replace na editor ids                                                                                                                                                                                        
    df = df.select('*',f.coalesce(df['editor_id'],df['editor']).alias('editor_id_or_ip'))

The first argument to select '*' causes select to return all the columns. Next we call the coalesce function which creates a new column with the value of editor_id if editor_id is not null and the value of editor if editor_id is null. The call to alias gives the new column a name. If you are familiar with SQL programming, this might seem familiar. You could write it as SELECT *, COALESCE(editor_id, editor) AS editor_id_or_ip.

Next we are going to identify the edits that revert each edit. reverteds lists the edits that the edit has reverted.

    # assign which edit reverted what edit                                                                                                                                                                         
    reverteds_df = df.filter(~ df.reverteds.isNull()).select(['revid','reverteds'])

This line creates a new Spark Dataframe out of the rows of the first dataframe that have a value for reverteds with the columns revid and reverteds.

    reverteds_df = reverteds_df.select("*", f.split(reverteds_df.reverteds,',').alias("reverteds_new"))

The above line converts reverteds from a string to an array.

    reverteds_df = reverteds_df.drop("reverteds")
    reverteds_df = reverteds_df.withColumnRenamed("reverteds_new", "reverteds")

The above two lines remove the old "reverteds" column, which was a string, and replaces it with the array column. This is required because unlike pandas, Spark dataframes do not have a column assignment syntax.

    reverteds_df = reverteds_df.select(reverteds_df.revid.alias('reverted_by'),
                                       f.explode(reverteds_df.reverteds).alias('reverted_id'))

The most important part of the above is the function call to explode. Explode💥 unfolds the array so that we get one line for each element of the array. Now we can join reverteds_df with df to put the reverted_by column in df.

    df = df.join(reverteds_df, df.revid == reverteds_df.reverted_id, how='left_outer')
    df.drop("reverted_id")
    del(reverteds_df)

Join the two tables so that each revision that was reverted gets a value for reverted_by. There are many kinds of joins and there is some detail on this in the Join help section of this page. The join is a left_outer join so we keep all the rows of df even the rows that don't have a value for reverted_id in reverteds_df. We remove the redundent reverted_id column and are don with building reverted_by. Next we add a column that counts the number of times a given editor has made a revert (this is called a cumulative count). Since we aren't going to use reverteds_df again we can call del(reveteds_df). This tells spark it is free to remove the object from storage and can improve performance.

    # sort by datetime                                                                                                                                                                                             
    df = df.orderBy(df.date_time.asc())

orderBy sorts the dataframe by date.

    win = Window.orderBy('date_time').partitionBy('editor_id_or_ip')

The above defines a WindowSpec, which is a kind of object that can be used to define rolling aggregations. We are going to use the rank function to perform the cumulative count, and rank requires a WindowSpec. The WindowSpec that we made says that we are grouping at the level of editor_id_or_ip and that we want to operate on each row of each group in chronological order.

    # count reverts                                                                                                                                                                                                
    reverts_df = df.filter(df.revert==True).select(['revid','editor_id_or_ip','date_time','revert'])

The above creates a new table that only has reverts.

    reverts_df = reverts_df.withColumn('editor_nth_revert',f.rank().over(win))

This applies the rank function over the window to perform the cumulative count of the reverts. The withColumn function adds a new column to the dataframe called editor_nth_revert.

    df = df.join(reverts_df, ["revid",'editor_id_or_ip','date_time','revert'], how='left_outer')
    del(reverts_df)

Above we perform the join to add the new column to df. We join on all of the columns ["revid",'editor_id_or_ip','date_time','revert'] so that duplicate columns are not created in the df

    # count edits                                                                                                                                                                                                  
    df = df.withColumn('year', f.year(df.date_time))
    df = df.withColumn('month',f.month(df.date_time))

Using withColumn again to illustrate creating some calendar variables from the date_time.

    df = df.withColumn('editor_nth_edit',f.rank().over(win))

We can reuse the WindowSpec to get the cumulative count for all edits as opposed to all reverts.

    # output                                                                                                                                                                                                       
    if not path.exists(args.output_dir):
        mkdir(args.output_dir)
    if args.output_format == "csv" or args.output_format == "tsv":
        df.write.csv(args.output_dir, sep='\t', mode='overwrite',header=True,timestampFormat="yyyy-MM-dd HH:mm:ss")

Instead of writing our output to a single file, we output to a directory. Spark will write 1 file for each partition to the directory.

            # format == "parquet"                                                                                                                                                                                  
    else:
        df.write.parquet(args.output_dir, mode='overwrite')

It is also easy to write to parquet.

Starting a Spark cluster with many nodes on Hyak[edit]

It is pretty easy to start up a multiple node cluster on Hyak.

If you have /com/local/bin in your $PATH then you should be able to run:

    get_spark_nodes.sh 4

To checkout 4 nodes that can be used as a Spark cluster. The spark cluster will have 4 worker nodes, one of these is also the "master" node. When you run get_spark_nodes.sh you will be routed to the machine that will become the master. If you only want 2 nodes just do

    get_spark_nodes.sh 2

After you get the nodes and have a shell on the master node run

   start_spark_cluster.sh

This will setup the cluster. Make sure you start up the cluster from the same session you used to get_spark_nodes -- otherwise, the startup script doesn't have access to the assigned node list and will fail. Take note of the node that is assigned to be the master, and use that information to set your $SPARK_MASTER environment variable, for example export SPARK_MASTER="n0650". The program spark-submit submits your script to the running Spark cluster.

    spark-submit --master  spark://$SPARK_MASTER:18899 your_script.py [Arguments to your script here].

For example, we can submit the script we used in the walkthrough as:

    spark-submit --master  spark://$SPARK_MASTER:18899 wikiq_users_spark.py --output-format tsv  -i "/com/output/wikiq-enwiki-20180301/enwiki-20180301-pages-meta-history*.tsv" -o  "/com/output/wikiq-users-enwiki-20180301-tsv/" --num-partitions 500

When you have a spark cluster running, it will serve some nice monitoring tools on ports 8989 and 4040 of the master. You can build an ssh tunnel between your laptop and these nodes to monitor the progress of your spark jobs.

When you are done with the cluster, you should shut it down using the script in $SPARK_HOME/sbin

   $SPARK_HOME/sbin/stop-all.sh

Monitoring the cluster[edit]

From a login node (hyak):

    ssh -L localhost:8989:localhost:8989 $SPARK_MASTER -N -f && ssh -L localhost:4040:localhost:4040 $SPARK_MASTER -n -F

From your laptop:

    ssh -L localhost:8989:localhost:8989 hyak -N -f && ssh -L localhost:4040:localhost:4040 hyak -n -F

Point your browser to localhost:8989 to see the cluster status and to localhost:4040 to monitor jobs.

Setting up Spark on your laptop[edit]

You might want to have a working stand alone Spark installation on your laptop. You should develop your Spark code on your laptop before running on hyak. To get spark working on your laptop you need to first install the Oracle Java Development Toolkit (Oracle JDK) and then install Spark.

Installing Java[edit]

To install java, all that should be required is to download and unzip the software and then set the $JAVA_HOME environment variable and add the java programs to your $PATH . We have Java 8 on Hyak.

  1. Download the java jdk appropriate for your Operating System from here.
  2. Unpack the archive where you want, for example /home/you/Oracle_JDK.
  3. Edit your environment variables (i.e. in your .bashrc) to
    JAVA_HOME=/home/you/Oracle_JDK/
    PATH=$JAVA_HOME/bin:$PATH


Installing Spark[edit]

Now we can install spark.

  1. Download the latest version of spark 'prebuilt for Apache Hadoop 2.7 or later'. From here
  2. Extract the archive. (i.e. to /home/you/spark)
  3. Set the $SPARK_HOME environment variable and update your path. I.e. update your .bashrc to set: SPARK_HOME=/home/you/spark .
  4. Test your spark install by running $SPARK_HOME/bin/pyspark

You should see this:

    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
          /_/


For working with Python you also want to add pyspark (Spark Python bindings) to your $PYTHONPATH.

Add to your .bashrc:

    export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH

Tips, Recipes, and Resources[edit]

To gain access to various useful SparkContext functions, you need to instantiate a pointer to the context which encloses your session. It seems to be common for Spark users to call this pointer sc, e.g. after you do

    spark = SparkSession.builder.getOrCreate()

add a line like

    sc = spark.sparkContext

and then you can use sc to access the functions described here: [1].

To create an empty dataframe[edit]

One way to create an empty dataframe is to generate a schema as in the example script, and then pass the schema into the create method, with an empty RDD object as data.

    myAwesomeDataset = spark.createDataFrame(data=sc.emptyRDD(), schema=myGroovySchema)

Pyspark string slicing seems to be non-pythonic[edit]

  1. Strings begin at 1, not 0. [0:5] will yield the same results as [1:5].
  2. [x:y] will give you "give me y total characters, starting with the char in position x."

So, given a column in articleDF called timestamp, with contents like....

   20150701000000

You can access 2015 with

    articleDF.timestamp[1:4]

And to get 07:

    articleDF.timestamp[5:2]

When Reading In Multiple Files with Different Schemas[edit]

Make sure you re-instantiate your reader object, e.g.

    sparkReader = spark.read

when changing to a new file. The reader may cache the schema of the previous file and fail to detect the new schema. To make sure you have what you're expecting, try a

    if DEBUG:
        yourDataset.show()

This will get you the same behavior as pandas print(yourDataset.head()) -- 20 rows, nicely formatted in your stdout.

Note that calling show(), while filtered to show 20 rows, still causes all steps of the job to execute, and can take a long time.

Getting Help & Useful Links[edit]

Getting help with Spark in the usual fora such as Stack Exchange or even a straight-up Google search seems to be a less effective strategy for pyspark than it is for normal python queries and errors. Specifying pyspark in your search terms helps in getting only Python answers, but debugging an error may require looking at Java documentation, and some online recipe blogs speak of writing code "in Spark", so presumably the console language.

The apache spark site at https://spark.apache.org/ is useful but not all of the example code is localized to python.

Join help[edit]

There's some good info about joins here: http://www.learnbymarketing.com/1100/pyspark-joins-by-example/

Java Errors and Responses[edit]

When I got:

   spark java.io.IOException: No subfolder can be created in .

the University of Google told me it was a disk space issue. But shutting down the cluster and restarting it solved the problem -- maybe it hadn't shut down cleanly the last time someone used spark.

Slurm kills my job! Encountering Memory Limits[edit]

In theory, Spark enables you to run computations on data of any size without memory limitations. In practice, memory management issues occur. We are trying to understand these issues and to learn how to write Spark scripts that don't overuse memory.

Things to try if you run out of memory[edit]

  1. The 'out of memory' may be ephemeral or due to memory management issues in some layer other than your code -- try starting up a new cluster and running the same job unchanged.
  2. Repartition your data: increasing the number of partitions should make it easier for the Spark scheduler to avoid exceeding its memory limits.
  3. Increase the number of nodes. This can solve the problem essentially by giving Spark more Ram to work with.
  4. Be careful moving data out of distributed Spark objects into normal python objects. Work your way up from a small sample to a larger sample.
  5. You might try tweaking memory management options in $SPARK_HOME/conf/spark-env.sh and <$SPARK_HOME/conf/spark-defaults.conf. Decreasing the number of executors, and the total memory allocated to executors should make Spark more resilient at the cost of performance.

Errors While Starting Cluster[edit]

Sometimes I get errors like this:

   n0650: failed to launch: nice -n 0 /com/local/spark/bin/spark-class org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://n0649:18899

Usually it seems to happen if I relinquish my spark cluster (whether I use the kill script or not) and then immediately restart one. The error goes away if I shut down and wait a minute or two before re-launching; my assumption is that there's some hygienic work being done behind the scenes that the scheduler doesn't know about and I need to let that finish.

And sometimes I get errors like this:

scontrol: error: host list is empty

when trying to launch spark. This means I'm in a session that doesn't know what nodes are assigned to the spark cluster. This will launch a dysfunctional cluster. Stop-all and then start_spark_cluster from the same session you landed in when you ran get_spark_nodes.

When I get errors about full logs, I go in and clean up the temporary files folder it refers to.