CommunityData:Hyak Spark
This page will help you decide if you should use Spark on Hyak for your problem and provide instructions on how to get started.

So far only ikt is supported.

== Pros and Cons of Spark ==
   / __/__  ___ _____/ /__
  _\ \/ _ \/ _ `/ __/ '_/
 /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
    /_/

If so then you are ready to start running Spark programs on a single node. If you don't see this, then you need to check your <code>$SPARK_HOME</code>, <code>$PATH</code>, <code>$JAVA_HOME</code>, and <code>$PYTHONPATH</code> environment variables.

You should have the following in your .bashrc:

export JAVA_HOME='/com/local/java/'
export PATH="$JAVA_HOME/bin:$PATH"
export PYTHONPATH="$SPARK_HOME/python:$PYTHONPATH"

You can also run Spark programs on many nodes, but this requires additional steps. These are described below.
== Spark Walkthrough ==

Programming in Spark is somewhat different from normal Python programming. This section will walk you through a script to help you learn how to work with Spark. You may find this script useful as a template for building variables on top of [[wikiq]] data.

This section presents a pyspark program that will:
# Output tsvs with the new variables.

The script is on ikt here: /com/users/nathante/mediawiki_dump_tools/wikiq_users/wikiq_users_spark.py
#!/usr/bin/env python3
import sys
from os import mkdir
from os import path
This part imports some python utilities that we will use. You can pretty safely treat the <code>SparkConf</code>, <code>SparkSession</code>, and <code>SQLContext</code> imports as magic that creates a Spark environment that supports working with Spark's SQL features.

<code>Window</code> is used to create window functions. We will use a window function to count the nth edit made by each editor. <code>import pyspark.sql.functions as f</code> provides built-in functions that can be applied to data in Spark data frames. <code>types</code> contains the data types that we will use to specify the schema for reading wikiq files.
def parse_args():
    parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
    args = parser.parse_args()
    return(args)

Above is just a function to build a command line interface.
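To make this concrete, here is a minimal self-contained sketch of what a fuller <code>parse_args</code> might look like. The flag names below are assumptions inferred from the <code>spark-submit</code> example later on this page, not the script's actual definitions:

```python
import argparse

def parse_args(argv):
    # Minimal mirror of the walkthrough's CLI. Flag names are assumed
    # from the spark-submit invocation shown later on this page.
    parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
    parser.add_argument('-i', '--input-file', dest='input_file')
    parser.add_argument('-o', '--output-dir', dest='output_dir')
    parser.add_argument('--output-format', dest='output_format', default='parquet')
    parser.add_argument('--num-partitions', dest='num_partitions', type=int, default=1)
    return parser.parse_args(argv)

args = parse_args(['-i', 'wikiq-*.tsv', '-o', 'out/',
                   '--output-format', 'tsv', '--num-partitions', '500'])
print(args.output_format, args.num_partitions)  # tsv 500
```

Passing an explicit argument list instead of reading <code>sys.argv</code> makes the function easy to test interactively.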
if __name__ == "__main__":
    conf = SparkConf().setAppName("Wiki Users Spark")
    spark = SparkSession.builder.getOrCreate()

Now we are in the main function of the script. The above lines complete setting up Spark. If you are going to run this program on a multi-node cluster, it would be nice to set the AppName to something friendly: this name will be used by the job monitoring tools.
    args = parse_args()
    files = glob.glob(args.input_file)
    files = [path.abspath(p) for p in files]

Spark is designed to read and write lists of files. The <code>args.input_file</code> argument uses <code>glob</code> to accept wildcards. The above lines build a list of files from the argument.
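The glob expansion can be demonstrated in plain Python, using throwaway files in a temporary directory (the file names are purely illustrative, not real wikiq output):

```python
import glob
import tempfile
from os import path

# Make two fake tsv files in a temporary directory to show how the
# wildcard expands.
tmpdir = tempfile.mkdtemp()
for name in ('part1.tsv', 'part2.tsv'):
    open(path.join(tmpdir, name), 'w').close()

# The same two lines as the script, applied to a wildcard pattern.
files = glob.glob(path.join(tmpdir, '*.tsv'))
files = [path.abspath(p) for p in files]

print(sorted(path.basename(f) for f in files))  # ['part1.tsv', 'part2.tsv']
```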
    reader = spark.read

This creates a reader object that can read files. We are starting to get down to business. Next we will specify the schema for the files that we will read in. This is important so that Spark can run efficiently and operate on the correct data types.
    # build a schema
    struct = types.StructType().add("anon",types.StringType(),True)
    struct = struct.add("text_chars", types.LongType(), True)
    struct = struct.add("title",types.StringType(), True)

This is a little bit tedious, but it is necessary for Spark to work effectively on tsv data. If you are reading a binary format such as Parquet (which is recommended, and easy to create using Spark) then you can skip this.
    df = reader.csv(files,
                    sep='\t',
                    mode="PERMISSIVE",
                    schema = struct)

This reads the data into a Spark DataFrame. Spark DataFrames are more like SQL tables than they are like pandas DataFrames. They are pretty abstract and can live in memory or on disk. Operations on Spark DataFrames are lazily evaluated: Spark will not actually run computations on your data until it has to. Calling <code>df.show()</code> will print the dataframe and trigger execution. <code>mode="PERMISSIVE"</code> stops Spark from giving up if it hits malformed rows.
    df = df.repartition(args.num_partitions)

The first thing to do after reading the data is to <code>repartition</code> it. This determines the number of files that Spark will output. Choosing the right number of partitions isn't really an exact science: more partitions make some operations more efficient and other operations slower. If you are interested, [https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-partitions.html this page] is good.

Now we are ready to build some variables. The first thing we are going to do is create a new column, <code>editor_id_or_ip</code>.
    # replace na editor ids
    df = df.select('*',f.coalesce(df['editor_id'],df['editor']).alias('editor_id_or_ip'))

The first argument to select, <code>'*'</code>, causes select to return all the columns. Next we call the <code>coalesce</code> function, which creates a new column with the value of <code>editor_id</code> if <code>editor_id</code> is not null and the value of <code>editor</code> if <code>editor_id</code> is null. The call to <code>alias</code> gives the new column a name. If you are familiar with SQL, this might look familiar: you could write it as <code>SELECT *, COALESCE(editor_id, editor) AS editor_id_or_ip</code>.
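If the Spark machinery is unfamiliar, the same logic can be sketched in plain Python on toy rows (the values here are made up for illustration — registered editors have an <code>editor_id</code>, anonymous editors only an IP in <code>editor</code>):

```python
# Toy rows standing in for the wikiq data.
rows = [
    {'editor_id': '1234', 'editor': 'Alice'},
    {'editor_id': None,   'editor': '127.0.0.1'},
]

def coalesce(*values):
    # Return the first non-null value, like SQL COALESCE.
    return next((v for v in values if v is not None), None)

for row in rows:
    row['editor_id_or_ip'] = coalesce(row['editor_id'], row['editor'])

print([r['editor_id_or_ip'] for r in rows])  # ['1234', '127.0.0.1']
```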
Next we are going to identify the edits that revert each edit. <code>reverteds</code> lists the edits that the edit has reverted.

    # assign which edit reverted what edit
    reverteds_df = df.filter(~ df.reverteds.isNull()).select(['revid','reverteds'])

This line creates a new Spark DataFrame out of the rows of the first dataframe that have a value for <code>reverteds</code>, with the columns <code>revid</code> and <code>reverteds</code>.
    reverteds_df = reverteds_df.select("*", f.split(reverteds_df.reverteds,',').alias("reverteds_new"))

The above line converts <code>reverteds</code> from a string to an array.

    reverteds_df = reverteds_df.drop("reverteds")
    reverteds_df = reverteds_df.withColumnRenamed("reverteds_new", "reverteds")

The above two lines remove the old "reverteds" column, which was a string, and replace it with the array column. This is required because, unlike pandas, Spark dataframes do not have a column assignment syntax.
    reverteds_df = reverteds_df.select(reverteds_df.revid.alias('reverted_by'),
                                       f.explode(reverteds_df.reverteds).alias('reverted_id'))

The most important part of the above is the call to <code>explode</code>. Explode unfolds the array so that we get one row for each element of the array. Now we can join <code>reverteds_df</code> with <code>df</code> to put the <code>reverted_by</code> column in <code>df</code>.
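A plain-Python sketch of what the split-plus-explode steps do, on toy data:

```python
# Each comma-separated reverteds string becomes one output row per
# reverted revision id (toy revision ids, for illustration only).
reverteds_rows = [
    {'revid': '30', 'reverteds': '10,20'},
    {'revid': '40', 'reverteds': '35'},
]

exploded = [
    {'reverted_by': row['revid'], 'reverted_id': rid}
    for row in reverteds_rows
    for rid in row['reverteds'].split(',')
]

print(exploded)
# [{'reverted_by': '30', 'reverted_id': '10'},
#  {'reverted_by': '30', 'reverted_id': '20'},
#  {'reverted_by': '40', 'reverted_id': '35'}]
```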
    df = df.join(reverteds_df, df.revid == reverteds_df.reverted_id, how='left_outer')
    df = df.drop("reverted_id")
    del(reverteds_df)

Join the two tables so that each revision that was reverted gets a value for <code>reverted_by</code>. The join is a <code>left_outer</code> join, so we keep all the rows of <code>df</code>, even the rows that don't have a value for <code>reverted_id</code> in <code>reverteds_df</code>. We remove the redundant <code>reverted_id</code> column (note that <code>drop</code> returns a new dataframe, so its result must be assigned) and are done with building <code>reverted_by</code>. Since we aren't going to use <code>reverteds_df</code> again we can call <code>del(reverteds_df)</code>; this tells Spark it is free to remove the object from storage and can improve performance. Next we add a column that counts the number of times a given editor has made a revert (this is called a ''cumulative count'').
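The effect of the <code>left_outer</code> join can be sketched in plain Python on toy rows: every row of the left table is kept, and <code>reverted_by</code> is null for revisions that were never reverted.

```python
# Toy left table (edits) and right table (who reverted what).
df_rows = [{'revid': '10'}, {'revid': '20'}, {'revid': '35'}]
reverteds_rows = [{'reverted_by': '30', 'reverted_id': '10'},
                  {'reverted_by': '30', 'reverted_id': '20'}]

# Index the right table by the join key, then look it up per left row;
# .get() returns None when there is no match, like a left outer join.
lookup = {r['reverted_id']: r['reverted_by'] for r in reverteds_rows}
joined = [dict(row, reverted_by=lookup.get(row['revid'])) for row in df_rows]

print(joined)
# [{'revid': '10', 'reverted_by': '30'},
#  {'revid': '20', 'reverted_by': '30'},
#  {'revid': '35', 'reverted_by': None}]
```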
    # sort by datetime
    df = df.orderBy(df.date_time.asc())

<code>orderBy</code> sorts the dataframe by date.
    win = Window.orderBy('date_time').partitionBy('editor_id_or_ip')

The above defines a <code>WindowSpec</code>, which is a kind of object that can be used to define rolling aggregations. We are going to use the <code>rank</code> function to perform the cumulative count, and <code>rank</code> requires a <code>WindowSpec</code>. The WindowSpec that we made says that we are grouping at the level of <code>editor_id_or_ip</code> and that we want to operate on each row of each group in chronological order.
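A plain-Python analogue of this kind of windowed cumulative count, on toy rows (this mimics <code>row_number</code>; <code>rank</code> differs only when two rows tie on <code>date_time</code>):

```python
from itertools import groupby

# Toy rows: partition by editor, order each partition by time, and
# number the rows within each partition 1..n.
rows = [
    {'editor_id_or_ip': 'Alice', 'date_time': '2018-01-02'},
    {'editor_id_or_ip': 'Bob',   'date_time': '2018-01-01'},
    {'editor_id_or_ip': 'Alice', 'date_time': '2018-01-03'},
]

rows.sort(key=lambda r: (r['editor_id_or_ip'], r['date_time']))
for _, group in groupby(rows, key=lambda r: r['editor_id_or_ip']):
    for nth, row in enumerate(group, start=1):
        row['editor_nth_revert'] = nth

print([(r['editor_id_or_ip'], r['editor_nth_revert']) for r in rows])
# [('Alice', 1), ('Alice', 2), ('Bob', 1)]
```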
    # count reverts
    reverts_df = df.filter(df.revert==True).select(['revid','editor_id_or_ip','date_time','revert'])

The above creates a new table that only has reverts. We keep <code>editor_id_or_ip</code> because the window function below partitions on it.

    reverts_df = reverts_df.withColumn('editor_nth_revert',f.rank().over(win))

This applies the <code>rank</code> function over the window to perform the cumulative count of the reverts. The <code>withColumn</code> function adds a new column to the dataframe called <code>editor_nth_revert</code>.

    df = df.join(reverts_df, ["revid",'editor_id_or_ip','date_time','revert'], how='left_outer')
    del(reverts_df)

Above we perform the join to add the new column to <code>df</code>. We join on all of the columns <code>["revid",'editor_id_or_ip','date_time','revert']</code> so that duplicate columns are not created in <code>df</code>.
    # count edits
    df = df.withColumn('year', f.year(df.date_time))
    df = df.withColumn('month',f.month(df.date_time))

Using <code>withColumn</code> again to illustrate creating some calendar variables from <code>date_time</code>.
    df = df.withColumn('editor_nth_edit',f.rank().over(win))

We can reuse the <code>WindowSpec</code> to get the cumulative count for all edits as opposed to all reverts.
    # output
    if not path.exists(args.output_dir):
        mkdir(args.output_dir)

    if args.output_format == "csv" or args.output_format == "tsv":
        df.write.csv(args.output_dir, sep='\t', mode='overwrite',header=True,timestampFormat="yyyy-MM-dd HH:mm:ss")

Instead of writing our output to a single file, we output to a directory. Spark will write one file for each partition to the directory.
    # format == "parquet"
    else:
        df.write.parquet(args.output_dir, mode='overwrite')

It is also easy to write to parquet.
If you have <code>/com/local/bin</code> in your <code>$PATH</code> then you should be able to run:

get_spark_nodes.sh 4

to check out 4 nodes that can be used as a Spark cluster. The Spark cluster will have 4 worker nodes; one of these is also the "master" node. When you run <code>get_spark_nodes.sh</code> you will be routed to the machine that will become the master. If you only want 2 nodes just do:
get_spark_nodes.sh 2

After you get the nodes and have a shell on the master node, run:

start_spark_cluster.sh

This will set up the cluster. Take note of the node that is assigned to be the master; call this $SPARK_MASTER. The program <code>spark-submit</code> submits your script to the running Spark cluster:

spark-submit --master spark://$SPARK_MASTER:18899 your_script.py [Arguments to your script here]
For example, we can submit the script we used in the walkthrough as:

spark-submit --master spark://$SPARK_MASTER:18899 wikiq_users_spark.py --output-format tsv -i "/com/output/wikiq-enwiki-20180301/enwiki-20180301-pages-meta-history*.tsv" -o "/com/output/wikiq-users-enwiki-20180301-tsv/" --num-partitions 500

When you have a Spark cluster running, it will serve some nice monitoring tools on ports 8989 and 4040 of the master. You can build an ssh tunnel between your laptop and these nodes to monitor the progress of your Spark jobs.
=== Monitoring the cluster ===

From a login node (ikt):

ssh -L localhost:8989:localhost:8989 $SPARK_MASTER -N -f && ssh -L localhost:4040:localhost:4040 $SPARK_MASTER -N -f

From your laptop:

ssh -L localhost:8989:localhost:8989 ikt -N -f && ssh -L localhost:4040:localhost:4040 ikt -N -f

Point your browser to localhost:8989 to see the cluster status and to localhost:4040 to monitor jobs.
== Setting up Spark on your laptop ==

# Edit your environment variables (e.g. in your .bashrc) to:

JAVA_HOME=/home/you/Oracle_JDK/
PATH=$JAVA_HOME/bin:$PATH
You should see this:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/
Add to your .bashrc:

export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH