= Getting Started with Spark =

== Spark on Hyak ==

If you are already set up on Hyak following the instructions on [[CommunityData:Hyak]], then you should already have a working Spark installation on Hyak. Test this by running <code>pyspark</code> from a Hyak cluster node (running it directly on the login node will give you an insufficient memory error). You should see this:

<syntaxhighlight lang="bash">
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/
</syntaxhighlight>

If so, you are ready to start running Spark programs on a single node. If you don't see this, then you need to check your <code>$SPARK_HOME</code>, <code>$PATH</code>, <code>$JAVA_HOME</code>, and <code>$PYTHONPATH</code> environment variables.

If you are using the [[CommunityData:Hyak-Mox | cdsc mox setup]] then you should have a working Spark configuration in your environment already. Otherwise, you'll need to have the following in your .bashrc (remember to source .bashrc or log in again to load these changes):

<syntaxhighlight lang="bash">
export JAVA_HOME='/com/local/java/'
export PATH="$JAVA_HOME/bin:$PATH"
export SPARK_HOME='/com/local/spark'
export PATH="$SPARK_HOME/bin":$PATH
export PYTHONPATH="$SPARK_HOME/python:"$PYTHONPATH
export TMPDIR="/com/users/[YOU]/tmpdir"
</syntaxhighlight>

You can also run Spark programs on many nodes, but this requires additional steps. These are described below.

== Spark Walkthrough ==

Spark programming is somewhat different from normal Python programming. This section walks you through a script to help you learn how to work with Spark. You may find this script useful as a template for building variables on top of [[wikiq]] data.

This section presents a pyspark program that:

# Reads wikiq tsvs.
# Computes the nth edit for each editor.
# For edits that were reverted, identifies the edit that made the revert.
# Outputs tsvs with the new variables.

The script is on ikt here: <code>/com/users/nathante/mediawiki_dump_tools/wikiq_users/wikiq_users_spark.py</code>

<syntaxhighlight lang="python">
#!/usr/bin/env python3
import sys
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import Window
import pyspark.sql.functions as f
from pyspark.sql import types
import argparse
import glob
from os import mkdir
from os import path
</syntaxhighlight>

This part imports some Python utilities that we will use. You can pretty safely treat the <code>SparkConf</code>, <code>SparkSession</code>, and <code>SQLContext</code> imports as magic that creates a Spark environment that supports working with Spark's SQL features. <code>Window</code> is used to create window functions; we will use a window function to count the nth edit made by each editor. <code>import pyspark.sql.functions as f</code> provides built-in functions that can be applied to data in Spark dataframes. <code>types</code> are the data types that we will use to specify the schema for reading wikiq files.
<syntaxhighlight lang="python">
def parse_args():
    parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
    parser.add_argument('-i', '--input-file', help='Tsv file of wiki edits. Supports wildcards ', required=True, type=str)
    parser.add_argument('-o', '--output-dir', help='Output directory', default='./output', type=str)
    parser.add_argument('--output-format', help = "[csv, parquet] format to output",type=str)
    parser.add_argument('--num-partitions', help = "number of partitions to output",type=int, default=1)
    args = parser.parse_args()
    return(args)
</syntaxhighlight>

Above is just a function to build a command line interface.

<syntaxhighlight lang="python">
if __name__ == "__main__":
    conf = SparkConf().setAppName("Wiki Users Spark")
    spark = SparkSession.builder.getOrCreate()
</syntaxhighlight>

Now we are in the main part of the script. The above two lines complete setting up Spark. If you are going to run this program on a multi-node cluster, it is nice to set the AppName to something friendly, since this name is used by the job monitoring tools.

<syntaxhighlight lang="python">
args = parse_args()
files = glob.glob(args.input_file)
files = [path.abspath(p) for p in files]
</syntaxhighlight>

Spark is designed to read and write lists of files. The <code>args.input_file</code> argument uses <code>glob</code> to accept wildcards. The above lines build a list of files from the argument.

<syntaxhighlight lang="python">
reader = spark.read
</syntaxhighlight>

This creates a reader object that can read files. We are starting to get down to business. Next we will specify the schema for the files that we will read in. This is important so that Spark can run efficiently and operate on the correct data types.

<syntaxhighlight lang="python">
# build a schema
struct = types.StructType().add("anon",types.StringType(),True)
struct = struct.add("articleid",types.LongType(),True)
struct = struct.add("date_time",types.TimestampType(), True)
struct = struct.add("deleted",types.BooleanType(), True)
struct = struct.add("editor",types.StringType(),True)
struct = struct.add("editor_id",types.LongType(), True)
struct = struct.add("minor", types.BooleanType(), True)
struct = struct.add("namespace", types.LongType(), True)
struct = struct.add("revert", types.BooleanType(), True)
struct = struct.add("reverteds", types.StringType(), True)
struct = struct.add("revid", types.LongType(), True)
struct = struct.add("sha1", types.StringType(), True)
struct = struct.add("text_chars", types.LongType(), True)
struct = struct.add("title",types.StringType(), True)
</syntaxhighlight>

This is a little bit tedious, but it is necessary for Spark to work effectively on tsv data. If you are reading a binary format such as Parquet (which is recommended, and easy to create using Spark), then you can skip this.

<syntaxhighlight lang="python">
df = reader.csv(files, sep='\t', inferSchema=False, header=True, mode="PERMISSIVE", schema = struct)
</syntaxhighlight>

This reads the data into a Spark dataframe. Spark dataframes are more like SQL tables than they are like pandas DataFrames. Spark dataframes are pretty abstract and can live in memory or on disk. Operations on Spark dataframes are lazily evaluated: Spark will not actually run computations on your data until it has to. Calling <code>df.show()</code> will print the dataframe and trigger execution. <code>mode="PERMISSIVE"</code> stops Spark from giving up if it hits malformed rows.

<syntaxhighlight lang="python">
df = df.repartition(args.num_partitions)
</syntaxhighlight>

The first thing to do after reading the data is to <code>repartition</code> it. This determines the number of files that Spark will output. Choosing the right number of partitions isn't really an exact science: having more partitions makes some operations more efficient and can make other operations slower. The rule of thumb is that the number of partitions should increase linearly with the amount of data; 500 partitions seems pretty good for English Wikipedia. If you are interested, [https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-partitions.html this page] is a good reference.
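If you want to check how a dataframe is currently partitioned, Spark can tell you. This is a small interactive check, not part of the walkthrough script; it assumes the <code>df</code> defined above:

<syntaxhighlight lang="python">
# ask Spark how many partitions df currently has;
# this is a metadata lookup and does not trigger a full computation
print(df.rdd.getNumPartitions())
</syntaxhighlight>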
Now we are ready to build some variables. The first thing we are going to do is to create a new column, <code>editor_id_or_ip</code>.

<syntaxhighlight lang="python">
# replace na editor ids
df = df.select('*',f.coalesce(df['editor_id'],df['editor']).alias('editor_id_or_ip'))
</syntaxhighlight>

The first argument to <code>select</code>, <code>'*'</code>, causes it to return all the columns. Next we call the <code>coalesce</code> function, which creates a new column with the value of <code>editor_id</code> if <code>editor_id</code> is not null and the value of <code>editor</code> if <code>editor_id</code> is null. The call to <code>alias</code> gives the new column a name. If you know SQL, this pattern will look familiar; you could write it as <code>SELECT *, COALESCE(editor_id, editor) AS editor_id_or_ip</code>.

Next we are going to identify the edits that revert each edit. <code>reverteds</code> lists the edits that the edit has reverted.

<syntaxhighlight lang="python">
# assign which edit reverted what edit
reverteds_df = df.filter(~ df.reverteds.isNull()).select(['revid','reverteds'])
</syntaxhighlight>

This line creates a new Spark dataframe out of the rows of the first dataframe that have a value for <code>reverteds</code>, with the columns <code>revid</code> and <code>reverteds</code>.

<syntaxhighlight lang="python">
reverteds_df = reverteds_df.select("*", f.split(reverteds_df.reverteds,',').alias("reverteds_new"))
</syntaxhighlight>

The above line converts <code>reverteds</code> from a string to an array.

<syntaxhighlight lang="python">
reverteds_df = reverteds_df.drop("reverteds")
reverteds_df = reverteds_df.withColumnRenamed("reverteds_new", "reverteds")
</syntaxhighlight>

The above two lines remove the old <code>reverteds</code> column, which was a string, and replace it with the array column. This is required because, unlike pandas, Spark dataframes do not have a column assignment syntax.

<syntaxhighlight lang="python">
reverteds_df = reverteds_df.select(reverteds_df.revid.alias('reverted_by'), f.explode(reverteds_df.reverteds).alias('reverted_id'))
</syntaxhighlight>

The most important part of the above is the call to <code>explode</code>. Explode 💥 unfolds the array so that we get one row for each element of the array. Now we can join <code>reverteds_df</code> with <code>df</code> to put the <code>reverted_by</code> column in <code>df</code>.

<syntaxhighlight lang="python">
df = df.join(reverteds_df, df.revid == reverteds_df.reverted_id, how='left_outer')
df = df.drop("reverted_id")
del(reverteds_df)
</syntaxhighlight>

Join the two tables so that each revision that was reverted gets a value for <code>reverted_by</code>. There are many kinds of joins and there is some detail on this in the [[#Join help|Join help section of this page]]. The join is a <code>left_outer</code> join, so we keep all the rows of <code>df</code>, even the rows that don't have a value for <code>reverted_id</code> in <code>reverteds_df</code>. We remove the redundant <code>reverted_id</code> column (note that <code>drop</code> returns a new dataframe, so the result has to be assigned back to <code>df</code>) and are done with building <code>reverted_by</code>. Since we aren't going to use <code>reverteds_df</code> again, we can call <code>del(reverteds_df)</code>. This tells Spark it is free to remove the object from storage, which can improve performance.
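At this point you can sanity-check the join by looking at a few rows that were reverted. This is just an interactive check, not part of the script; remember that <code>show()</code> triggers execution, so it can take a while on a large wiki:

<syntaxhighlight lang="python">
# print a few reverted revisions along with the revision that reverted them
df.filter(~ df.reverted_by.isNull()).select('revid', 'editor', 'reverted_by').show(5)
</syntaxhighlight>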
Next we add a column that counts the number of times a given editor has made a revert (this is called a ''cumulative count'').

<syntaxhighlight lang="python">
# sort by datetime
df = df.orderBy(df.date_time.asc())
</syntaxhighlight>

<code>orderBy</code> sorts the dataframe by date.

<syntaxhighlight lang="python">
win = Window.orderBy('date_time').partitionBy('editor_id_or_ip')
</syntaxhighlight>

The above defines a <code>WindowSpec</code>, which is a kind of object that can be used to define rolling aggregations. We are going to use the <code>rank</code> function to perform the cumulative count, and <code>rank</code> requires a <code>WindowSpec</code>. The WindowSpec that we made says that we are grouping at the level of <code>editor_id_or_ip</code> and that we want to operate on each row of each group in chronological order.

<syntaxhighlight lang="python">
# count reverts
reverts_df = df.filter(df.revert==True).select(['revid','editor_id_or_ip','date_time','revert'])
</syntaxhighlight>

The above creates a new table that only has reverts.

<syntaxhighlight lang="python">
reverts_df = reverts_df.withColumn('editor_nth_revert',f.rank().over(win))
</syntaxhighlight>

This applies the <code>rank</code> function over the window to perform the cumulative count of the reverts. The <code>withColumn</code> function adds a new column to the dataframe called <code>editor_nth_revert</code>.

<syntaxhighlight lang="python">
df = df.join(reverts_df, ["revid",'editor_id_or_ip','date_time','revert'], how='left_outer')
del(reverts_df)
</syntaxhighlight>

Above we perform the join to add the new column to <code>df</code>. We join on all of the columns <code>["revid",'editor_id_or_ip','date_time','revert']</code> so that duplicate columns are not created in <code>df</code>.

<syntaxhighlight lang="python">
# count edits
df = df.withColumn('year', f.year(df.date_time))
df = df.withColumn('month',f.month(df.date_time))
</syntaxhighlight>

Here we use <code>withColumn</code> again to illustrate creating some calendar variables from <code>date_time</code>.

<syntaxhighlight lang="python">
df = df.withColumn('editor_nth_edit',f.rank().over(win))
</syntaxhighlight>

We can reuse the <code>WindowSpec</code> to get the cumulative count for all edits, as opposed to only reverts.

<syntaxhighlight lang="python">
# output
if not path.exists(args.output_dir):
    mkdir(args.output_dir)
if args.output_format == "csv" or args.output_format == "tsv":
    df.write.csv(args.output_dir, sep='\t', mode='overwrite',header=True,timestampFormat="yyyy-MM-dd HH:mm:ss")
</syntaxhighlight>

Instead of writing our output to a single file, we output to a directory. Spark will write one file for each partition to the directory.

<syntaxhighlight lang="python">
# format == "parquet"
else:
    df.write.parquet(args.output_dir, mode='overwrite')
</syntaxhighlight>

It is also easy to write to parquet.
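One advantage of parquet is that it stores the schema with the data, so a later Spark session can read it back without rebuilding the schema struct by hand. A minimal sketch (the path here is just an example; use whatever you passed as <code>--output-dir</code>):

<syntaxhighlight lang="python">
# read the parquet output back in a later script; no schema needs to be specified
df = spark.read.parquet("/com/output/wikiq-users-enwiki-20180301-parquet/")
</syntaxhighlight>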
== Starting a Spark cluster with many nodes on Hyak ==

It is pretty easy to start up a multi-node cluster on Hyak. If you have <code>/com/local/bin</code> in your <code>$PATH</code>, then you should be able to run:

<syntaxhighlight lang="bash">
get_spark_nodes.sh 4
</syntaxhighlight>

This checks out 4 nodes that can be used as a Spark cluster. The cluster will have 4 worker nodes; one of these is also the "master" node. When you run <code>get_spark_nodes.sh</code> you will be routed to the machine that will become the master. If you only want 2 nodes, just do:

<syntaxhighlight lang="bash">
get_spark_nodes.sh 2
</syntaxhighlight>

After you get the nodes and have a shell on the master node, run:

<syntaxhighlight lang="bash">
start_spark_cluster.sh
</syntaxhighlight>

This will set up the cluster. Make sure you start up the cluster from the same session you used to run <code>get_spark_nodes.sh</code>; otherwise, the startup script doesn't have access to the assigned node list and will fail. Take note of the node that is assigned to be the master, and use that information to set your <code>$SPARK_MASTER</code> environment variable, for example <code>export SPARK_MASTER="n0650"</code>.

The program <code>spark-submit</code> submits your script to the running Spark cluster:

<syntaxhighlight lang="bash">
spark-submit --master spark://$SPARK_MASTER:18899 your_script.py [Arguments to your script here]
</syntaxhighlight>

For example, we can submit the script we used in the walkthrough as:

<syntaxhighlight lang="bash">
spark-submit --master spark://$SPARK_MASTER:18899 wikiq_users_spark.py --output-format tsv -i "/com/output/wikiq-enwiki-20180301/enwiki-20180301-pages-meta-history*.tsv" -o "/com/output/wikiq-users-enwiki-20180301-tsv/" --num-partitions 500
</syntaxhighlight>

When you have a Spark cluster running, it serves some nice monitoring tools on ports 8989 and 4040 of the master. You can build an ssh tunnel between your laptop and these nodes to monitor the progress of your Spark jobs.

When you are done with the cluster, you should shut it down using the script in <code>$SPARK_HOME/sbin</code>:

<syntaxhighlight lang="bash">
$SPARK_HOME/sbin/stop-all.sh
</syntaxhighlight>

=== Monitoring the cluster ===

From a login node (hyak):

<syntaxhighlight lang="bash">
ssh -L localhost:8989:localhost:8989 $SPARK_MASTER -N -f && ssh -L localhost:4040:localhost:4040 $SPARK_MASTER -N -f
</syntaxhighlight>

From your laptop:

<syntaxhighlight lang="bash">
ssh -L localhost:8989:localhost:8989 hyak -N -f && ssh -L localhost:4040:localhost:4040 hyak -N -f
</syntaxhighlight>

Point your browser to localhost:8989 to see the cluster status and to localhost:4040 to monitor jobs.

== Setting up Spark on your laptop ==

You might want a working standalone Spark installation on your laptop, and you should develop your Spark code on your laptop before running it on Hyak. To get Spark working on your laptop you need to first install the Oracle Java Development Kit (Oracle JDK) and then install Spark.

=== Installing Java ===

To install Java, all that should be required is to download and unzip the software, then set the <code>$JAVA_HOME</code> environment variable and add the Java programs to your <code>$PATH</code>. We have Java 8 on Hyak.

# Download the Java JDK appropriate for your operating system from [http://www.oracle.com/technetwork/pt/java/javase/downloads/jdk8-downloads-2133151.html here].
# Unpack the archive where you want, for example <code>/home/you/Oracle_JDK</code>.
# Edit your environment variables (i.e. in your .bashrc) to:

<syntaxhighlight lang="bash">
JAVA_HOME=/home/you/Oracle_JDK/
PATH=$JAVA_HOME/bin:$PATH
</syntaxhighlight>

=== Installing Spark ===

Now we can install Spark.

# Download the latest version of Spark 'prebuilt for Apache Hadoop 2.7 or later' from [http://spark.apache.org/downloads.html here].
# Extract the archive (e.g. to <code>/home/you/spark</code>).
# Set the <code>$SPARK_HOME</code> environment variable and update your path, i.e. update your .bashrc to set <code>SPARK_HOME=/home/you/spark</code>.
# Test your Spark install by running <code>$SPARK_HOME/bin/pyspark</code>. You should see this:

<syntaxhighlight lang="bash">
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/
</syntaxhighlight>

For working with Python, you also want to add pyspark (the Spark Python bindings) to your <code>$PYTHONPATH</code>. Add to your .bashrc:

<syntaxhighlight lang="bash">
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
</syntaxhighlight>
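Once Java, Spark, and <code>$PYTHONPATH</code> are set up, you can check that the Python bindings work outside the pyspark shell. This is a quick local smoke test (the app name and example data are arbitrary), not part of the Hyak workflow:

<syntaxhighlight lang="python">
from pyspark.sql import SparkSession

# start a local Spark session that uses all the cores on your laptop
spark = SparkSession.builder.master("local[*]").appName("laptop-test").getOrCreate()

# build a tiny dataframe and count its rows to confirm the install works
df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'letter'])
print(df.count())

spark.stop()
</syntaxhighlight>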