CommunityData:Hyak Spark

Apache Spark is a powerful system for writing programs that deal with large datasets. The most likely reason to use Spark on Hyak is that you have run into memory limitations when building variables. For example, suppose you want to compute:

  • The number of prior edits a Wikipedia editor has made, for every editor and every edit.
  • The number of views for every Wikipedia page for every month and every edit.
  • The number of times every Reddit user has commented on every thread on every subreddit for every week.

You might try writing a program to build these variables using a data science tool like pandas in Python, or data.table or plyr in R. These common data science tools are powerful, expressive, and fast, but they do not work when the data does not fit in memory. When a table does not fit in memory but the computation you want to do only requires operating on one row at a time (such as a simple transformation or aggregation), you can often work around this limitation by writing a custom program that operates in a streaming fashion. However, when the computation cannot be done one row at a time, such as a sort, group by, or join, a streaming solution will not work. In this case your options are limited. One option is writing bespoke code to perform the required operations and build the variables. However, this can be technically challenging and time-consuming work, and your eventual solution is likely to be relatively slow and difficult to extend or maintain compared to a solution built using Spark. A number of us (Nate, Jeremy, Kaylea) have at some point written bespoke code for computing user-level variables on Wikipedia data. The infamous "million file problem" resulted from abusing the filesystem to perform a massive group by.
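
To make the streaming workaround concrete, here is a minimal sketch (the file names and the namespace filter are made up for illustration) of a per-row filter that never holds more than one row in memory. A sort, group by, or join across editors cannot be written this way, because those operations need to see many rows at once.

   # a minimal streaming sketch (hypothetical file names): filter a large wikiq tsv
   # one row at a time, without ever loading the whole file into memory
   import csv
   with open("enwiki_wikiq.tsv") as infile, open("article_edits.tsv", "w", newline="") as outfile:
       reader = csv.DictReader(infile, delimiter="\t")
       writer = csv.DictWriter(outfile, fieldnames=reader.fieldnames, delimiter="\t")
       writer.writeheader()
       for row in reader:               # only one row in memory at a time
           if row["namespace"] == "0":  # keep only main-namespace edits
               writer.writerow(row)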

This page will help you decide if you should use Spark on Hyak for your problem and provide instructions on how to get started.

Pros and Cons of Spark

The main advantages of Spark on Hyak:

  1. Work with "big data" without ever running out of memory.
  2. You get very good parallelism for free.
  3. Distribute computational work across many Hyak nodes so your programs run faster.
  4. Common database operations (select, join, groupby, filter) are pretty easy, as the short sketch after this list illustrates.
  5. Spark supports common statistical and analytical tasks (stratified sampling, summary and pairwise statistics, common and simple models).
  6. Spark is a trendy technology that lots of people know or want to learn.

The main disadvantages of Spark are:

  1. It takes several steps to get the cluster up and running.
  2. The programming paradigm is not super intuitive, especially if you are not familiar with SQL databases or lazy evaluation.
  3. Effectively developing your Spark code means getting it set up on your own laptop, which isn't trivial.
  4. Doing more advanced things requires programming in Scala.

Getting Started with Spark

Spark on Hyak

If you are already set up on Hyak following the instructions on [CommunityData:Hyak], then you should have a working Spark installation. Test this by running:

  pyspark

You should see this:

   Welcome to
         ____              __
        / __/__  ___ _____/ /__
       _\ \/ _ \/ _ `/ __/  '_/
      /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
         /_/

If so, then you are ready to start running Spark programs on a single node. If you don't see this, check your $SPARK_HOME, $PATH, $JAVA_HOME, and $PYTHONPATH environment variables.

You should have the following in your .bashrc:

   export JAVA_HOME='/com/local/java/'
   export PATH="$JAVA_HOME/bin:$PATH"
   export SPARK_HOME='/com/local/spark'
   export PATH="$SPARK_HOME/bin":$PATH
   export PYTHONPATH="$SPARK_HOME/python:"$PYTHONPATH

You can also run Spark programs on many nodes, but this requires additional steps, which are described below.

Spark Walkthrough

Writing Spark programs is somewhat different from normal Python programming. This section walks you through a script to help you learn how to work with Spark. You may find this script useful as a template for building variables on top of wikiq data.

This section presents a pyspark program that:

  1. Reads wikiq tsvs.
  2. Computes the nth edit for each editor.
  3. For edits that were reverted, identifies the edit that made the revert.
  4. Outputs tsvs with the new variables.

The script is on itk here: /com/users/nathante/mediawiki_dump_tools/wikiq_users/wikiq_users_spark.py

   #!/usr/bin/env python3                                                                                                                                                                                             
   import sys
   from pyspark import SparkConf
   from pyspark.sql import SparkSession, SQLContext
   from pyspark.sql import Window
   import pyspark.sql.functions as f
   from pyspark.sql import types
   import argparse
   import glob
   from os import mkdir
   from os import path

This part imports some Python utilities that we will use. You can pretty safely treat the SparkConf, SparkSession, and SQLContext imports as magic that creates a Spark environment with support for Spark's SQL features. Window is used to create window functions; we will use a window function to count the nth edit made by each editor. import pyspark.sql.functions as f provides built-in functions that can be applied to data in Spark data frames. types provides the data types that we will use to specify the schema for reading wikiq files.
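
If window functions are new to you, here is a minimal self-contained sketch (made-up data) of the pattern the script uses later: the window partitions rows by editor and orders them by revision id, and rank() numbers each editor's edits in that order.

   # a window-function sketch (hypothetical data): number each editor's edits in order
   from pyspark.sql import SparkSession, Window
   import pyspark.sql.functions as f
   spark = SparkSession.builder.getOrCreate()
   toy = spark.createDataFrame(
       [("alice", 101), ("bob", 102), ("alice", 103)],
       ["editor", "revid"])
   win = Window.partitionBy("editor").orderBy("revid")
   toy.withColumn("nth_edit", f.rank().over(win)).show()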

   def parse_args():
       parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
       parser.add_argument('-i', '--input-file', help='Tsv file of wiki edits. Supports wildcards ', required=True, type=str)
       parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
       parser.add_argument('--output-format', help = "[csv, parquet] format to output",type=str)
       parser.add_argument('--num-partitions', help = "number of partitions to output",type=int, default=1)
       args = parser.parse_args()
       return(args)

Above is just a function to build a command-line interface.

   if __name__ == "__main__":
       conf = SparkConf().setAppName("Wiki Users Spark")
       spark = SparkSession.builder.config(conf=conf).getOrCreate()

Now we are in the main body of the script. The above two lines finish setting up Spark. If you are going to run this program on a multi-node cluster, it is nice to set the AppName to something friendly, since the name is used by the job monitoring tools.

       args = parse_args()
       files = glob.glob(args.input_file)
       files = [path.abspath(p) for p in files]

Spark is designed to read and write lists of files. The args.input_file argument accepts wildcards, which glob expands into a list of matching files; the above lines turn that argument into a list of absolute paths.


   reader = spark.read

This creates a reader object that can read files. We are starting to get down to business. Next we will specify the schema for the files that we will read in. This is important so that Spark can run efficiently and operate on the correct data types.

   # build a schema                                                                                                                                                                                               
   struct = types.StructType().add("anon",types.StringType(),True)
   struct = struct.add("articleid",types.LongType(),True)
   struct = struct.add("date_time",types.TimestampType(), True)
   struct = struct.add("deleted",types.BooleanType(), True)
   struct = struct.add("editor",types.StringType(),True)
   struct = struct.add("editor_id",types.LongType(), True)
   struct = struct.add("minor", types.BooleanType(), True)
   struct = struct.add("namespace", types.LongType(), True)
   struct = struct.add("revert", types.BooleanType(), True)
   struct = struct.add("reverteds", types.StringType(), True)
   struct = struct.add("revid", types.LongType(), True)
   struct = struct.add("sha1", types.StringType(), True)
   struct = struct.add("text_chars", types.LongType(), True)
   struct = struct.add("title",types.StringType(), True)

This is a little bit tedious, but it is necessary for Spark to work effectively on tsv data. If you are reading a binary format such as Parquet (which is recommended, and easy to create using Spark), then you can skip this step.
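
For example, if you already had the same data stored as Parquet (the paths below are hypothetical), no schema declaration would be needed, because the column names and types are stored in the files themselves:

   # reading Parquet needs no schema; writing it from an existing data frame is one line
   parquet_df = spark.read.parquet("/path/to/wikiq_parquet")
   # parquet_df.write.parquet("/path/to/another_output", mode='overwrite')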

   df = reader.csv(files,
                   sep='\t',
                   inferSchema=False,
                   header=True,
                   mode="PERMISSIVE",
                   schema = struct)

This reads the data.
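
If you want to check that the data loaded the way you expected, you can inspect it interactively, for example from a pyspark shell:

   df.printSchema()   # print the column names and types
   df.show(5)         # print the first 5 rows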

   df = df.repartition(args.num_partitions)

The first thing to do after reading the data is to repartition it. The number of partitions controls how the data is split up for parallel processing, and also how many output files will be written at the end.
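
To confirm what repartitioning did, the number of partitions is easy to inspect:

   df.rdd.getNumPartitions()   # should now equal args.num_partitions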

   # replace na editor ids                                                                                                                                                                                        
   df = df.select('*',f.coalesce(df['editor_id'],df['editor']).alias('new_editor_id'))
   df = df.drop("editor_id")
   df = df.withColumnRenamed('new_editor_id',"editor_id")
   # assign which edit reverted what edit                                                                                                                                                                         
   reverteds_df = df.filter(~ df.reverteds.isNull()).select(['revid','reverteds'])
   reverteds_df = reverteds_df.select("*", f.split(reverteds_df.reverteds,',').alias("reverteds_new"))
   reverteds_df = reverteds_df.drop("reverteds")
   reverteds_df = reverteds_df.withColumnRenamed("reverteds_new", "reverteds")
   reverteds_df = reverteds_df.select(reverteds_df.revid.alias('reverted_by'),
                                      f.explode(reverteds_df.reverteds).alias('reverted_id'))
   df = df.join(reverteds_df, df.revid == reverteds_df.reverted_id, how='left_outer')
   df = df.drop("reverted_id")
   # sort by datetime                                                                                                                                                                                             
   df = df.orderBy(df.date_time.asc())
   win = Window.orderBy('date_time').partitionBy('editor_id')
   # count reverts                                                                                                                                                                                                
   reverts_df = df.filter(df.revert==True).select(['revid','editor_id','date_time','revert'])
   reverts_df = reverts_df.withColumn('editor_nth_revert',f.rank().over(win))
   df = df.join(reverts_df, ["revid",'editor_id','date_time','revert'], how='left_outer')
   del(reverts_df)
   # count edits                                                                                                                                                                                                  
   df = df.withColumn('year', f.year(df.date_time))
   df = df.withColumn('month',f.month(df.date_time))
   df = df.withColumn('editor_nth_edit',f.rank().over(win))
   # output                                                                                                                                                                                                       
   if not path.exists(args.output_dir):
       mkdir(args.output_dir)
   if args.output_format == "csv" or args.output_format == "tsv":
       # for writing to csv we need to urlencode
       df.write.csv(args.output_dir, sep='\t', mode='overwrite', header=True, timestampFormat="yyyy-MM-dd HH:mm:ss")
   else:
       # format == "parquet"
       df.write.parquet(args.output_dir, mode='overwrite')


Setting up Spark on your laptop

You might want a working standalone Spark installation on your laptop, so you can develop and test your Spark code locally before running it on Hyak. To get Spark working on your laptop, first install the Oracle Java Development Kit (Oracle JDK) and then install Spark.

Installing Java

To install Java, all that should be required is to download and unpack the software, then set the $JAVA_HOME environment variable and add the Java programs to your $PATH. We have Java 8 on Hyak.

  1. Download the Java JDK appropriate for your operating system from here.
  2. Unpack the archive where you want, for example /home/you/Oracle_JDK.
  3. Edit your environment variables (e.g. in your .bashrc) to:
   export JAVA_HOME=/home/you/Oracle_JDK/
   export PATH=$JAVA_HOME/bin:$PATH


Installing Spark

Now we can install spark.

  1. Download the latest version of Spark, prebuilt for Apache Hadoop 2.7 or later, from here.
  2. Extract the archive (e.g. to /home/you/spark).
  3. Set the $SPARK_HOME environment variable and update your path, e.g. add export SPARK_HOME=/home/you/spark and export PATH=$SPARK_HOME/bin:$PATH to your .bashrc.
  4. Test your Spark install by running $SPARK_HOME/bin/pyspark

You should see this:


   Welcome to
         ____              __
        / __/__  ___ _____/ /__
       _\ \/ _ \/ _ `/ __/  '_/
      /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
         /_/


For working with Python, you will also want to add pyspark (the Spark Python bindings) to your $PYTHONPATH.

Add to your .bashrc:

   export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
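
You can then check that the bindings are importable from a plain Python interpreter. Importing pyspark as a library also pulls in py4j, which ships as a zip under $SPARK_HOME/python/lib; if the import below fails, you may need to add that zip to your $PYTHONPATH as well.

   # quick sanity check: run inside python3 after reloading your .bashrc
   import pyspark
   print(pyspark.__version__)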