CommunityData:Hyak Spark
This page will help you decide if you should use Spark on Hyak for your problem and provide instructions on how to get started.

== Pros and Cons of Spark ==
# It takes several steps to get the cluster up and running.
# The programming paradigm is not super intuitive, especially if you are not familiar with SQL databases or lazy evaluation.
# Effectively developing your Spark code means getting it set up on your own laptop, which isn't trivial.
# Doing more advanced things requires programming in Scala.
If you are already set up on Hyak following the instructions on [[CommunityData:Hyak]] then you should already have a working Spark installation on Hyak. Test this by running

 pyspark

You should see this:
 Welcome to
       ____              __
      / __/__  ___ _____/ /__
     _\ \/ _ \/ _ `/ __/ '_/
    /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
       /_/
If so then you are ready to start running Spark programs on a single node. If you don't see this, then you need to check your <code>$SPARK_HOME</code>, <code>$PATH</code>, <code>$JAVA_HOME</code>, and <code>$PYTHONPATH</code> environment variables.

You should have the following in your .bashrc:

 export JAVA_HOME='/com/local/java/'
 export PATH="$JAVA_HOME/bin:$PATH"
 export PYTHONPATH="$SPARK_HOME/python:$PYTHONPATH"

You can also run Spark programs on many nodes, but this requires additional steps. These are described below.
== Spark Walkthrough ==

Programming in Spark is somewhat different from normal Python programming. This section will walk you through a script to help you learn how to work with Spark. You may find this script useful as a template for building variables on top of [[wikiq]] data.
This section presents a pyspark program that will:

# Read wikiq tsvs.
# Build new variables, including which edits were reverted and each editor's nth edit and nth revert.
# Output tsvs with the new variables.
The script is on itk here: <code>/com/users/nathante/mediawiki_dump_tools/wikiq_users/wikiq_users_spark.py</code>
 #!/usr/bin/env python3
 import sys
 from pyspark import SparkConf
 from pyspark.sql import SparkSession, SQLContext, Window
 import pyspark.sql.functions as f
 from pyspark.sql import types
 import argparse
 import glob
 from os import mkdir
 from os import path
This part imports some Python utilities that we will use. You can pretty safely treat the <code>SparkConf</code>, the <code>SparkSession</code> and the <code>SQLContext</code> imports as magic that creates a Spark environment that supports working with Spark's SQL features.

<code>Window</code> is used to create window functions. We will use a window function to count the nth edit made by each editor. <code>import pyspark.sql.functions as f</code> provides built-in functions that can be applied to data in Spark data frames. <code>types</code> are data types that we will use to specify the schema for reading wikiq files.
 def parse_args():
     parser = argparse.ArgumentParser(description='Create a dataset of edits by user.')
     args = parser.parse_args()
     return(args)
Above is just a function to build a command line interface.
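The argument definitions themselves are not shown above. Here is a minimal sketch of what they plausibly look like, inferred from how <code>args</code> is used later in the script; all flag names and defaults below are assumptions, not the script's actual interface.

 # Hypothetical argument definitions, inferred from later uses of args.
 parser.add_argument('-i', '--input-file', dest='input_file',
                     help='glob of wikiq tsv files to process')
 parser.add_argument('-o', '--output-dir', dest='output_dir',
                     help='directory to write output to')
 parser.add_argument('--num-partitions', dest='num_partitions', type=int, default=500)
 parser.add_argument('--output-format', dest='output_format', default='parquet')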
 if __name__ == "__main__":
     conf = SparkConf().setAppName("Wiki Users Spark")
     spark = SparkSession.builder.config(conf=conf).getOrCreate()
Now we are in the main function of the script. The above two lines complete setting up Spark (note that <code>conf</code> must be passed to the builder for its settings to take effect). If you are going to run this program on a multi-node cluster, it is nice to set the AppName to something friendly; this is used by the job monitoring tools.
     args = parse_args()
     files = glob.glob(args.input_file)
     files = [path.abspath(p) for p in files]
Spark is designed to read and write lists of files. The <code>args.input_file</code> uses <code>glob</code> to accept wildcards. The above lines build a list of files from the argument.
     reader = spark.read
This creates a reader object that can read files. We are starting to get down to business. Next we will specify the schema for the files that we will read in. This is important so that Spark can run efficiently and operate on the correct data types.
     # build a schema
     struct = types.StructType().add("anon",types.StringType(),True)
     struct = struct.add("text_chars", types.LongType(), True)
     struct = struct.add("title",types.StringType(), True)
This is a little bit tedious, but it is necessary for Spark to work effectively on tsv data. If you are reading a binary format such as Parquet (which is recommended, and easy to create using Spark) then you can skip this.
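For comparison, reading Parquet needs no schema declaration at all, because Parquet stores the schema with the data; a minimal sketch (the path is hypothetical):

 # Parquet files carry their own schema, so no StructType boilerplate is needed.
 df = spark.read.parquet("/com/users/you/wikiq_output.parquet")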
     df = reader.csv(files,
                     sep='\t',
                     mode="PERMISSIVE",
                     schema = struct)
This reads the data into a Spark data frame. Note that Spark evaluates lazily: the files will not actually be read until an output operation forces it.
     df = df.repartition(args.num_partitions)
The first thing to do after reading the data is to <code>repartition</code> it. This splits the data into smaller chunks (partitions) so that Spark can work on them in parallel; with too few partitions, only a few cores will have work to do.
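If you want to check how the data ended up partitioned, Spark can report the partition count:

 # The number of chunks Spark can process in parallel.
 print(df.rdd.getNumPartitions())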
     # replace na editor ids
     df = df.select('*',f.coalesce(df['editor_id'],df['editor']).alias('new_editor_id'))
     df = df.drop("editor_id")
     df = df.withColumnRenamed('new_editor_id',"editor_id")
     # assign which edit reverted what edit
     reverteds_df = df.filter(~ df.reverteds.isNull()).select(['revid','reverteds'])
     reverteds_df = reverteds_df.select("*", f.split(reverteds_df.reverteds,',').alias("reverteds_new"))
     reverteds_df = reverteds_df.drop("reverteds")
     reverteds_df = reverteds_df.withColumnRenamed("reverteds_new", "reverteds")
     reverteds_df = reverteds_df.select(reverteds_df.revid.alias('reverted_by'),
                                        f.explode(reverteds_df.reverteds).alias('reverted_id'))
     df = df.join(reverteds_df, df.revid == reverteds_df.reverted_id, how='left_outer')
     df = df.drop("reverted_id")
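To see what <code>split</code> and <code>explode</code> are doing in the block above, consider a toy example (invented data): a revision whose <code>reverteds</code> field is <code>"2,3"</code> reverted two earlier revisions, so it should yield two rows.

 # split turns "2,3" into ["2", "3"]; explode makes one output row per element.
 toy = spark.createDataFrame([(4, "2,3")], ["revid", "reverteds"])
 toy = toy.select(toy.revid.alias("reverted_by"),
                  f.explode(f.split(toy.reverteds, ',')).alias("reverted_id"))
 toy.show()  # two rows: (4, "2") and (4, "3")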
     # sort by datetime
     df = df.orderBy(df.date_time.asc())
     win = Window.orderBy('date_time').partitionBy('editor_id')
     # count reverts
     reverts_df = df.filter(df.revert==True).select(['revid','editor_id','date_time','revert'])
     reverts_df = reverts_df.withColumn('editor_nth_revert',f.rank().over(win))
     df = df.join(reverts_df, ["revid",'editor_id','date_time','revert'], how='left_outer')
     del(reverts_df)
     # count edits
     df = df.withColumn('year', f.year(df.date_time))
     df = df.withColumn('month',f.month(df.date_time))
     df = df.withColumn('editor_nth_edit',f.rank().over(win))
     # output
     if not path.exists(args.output_dir):
         mkdir(args.output_dir)
     if args.output_format == "csv" or args.output_format == "tsv":
         df.write.csv(args.output_dir, sep='\t', mode='overwrite', header=True, timestampFormat="yyyy-MM-dd HH:mm:ss")
     else:  # format == "parquet"
         df.write.parquet(args.output_dir, mode='overwrite')
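Once the script has run, you can read the results back into Spark for analysis. For example, assuming you wrote Parquet output to <code>./output</code>:

 # Read the results back; the path is whatever you passed as the output directory.
 users = spark.read.parquet("./output")
 users.select('editor_id', 'editor_nth_edit', 'editor_nth_revert').show(5)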
== Setting up Spark on your laptop ==
# Edit your environment variables (e.g. in your .bashrc) to:

 export JAVA_HOME=/home/you/Oracle_JDK/
 export PATH=$JAVA_HOME/bin:$PATH
You should see this:

 Welcome to
       ____              __
      / __/__  ___ _____/ /__
     _\ \/ _ \/ _ `/ __/ '_/
    /__ / .__/\_,_/_/ /_/\_\   version 2.3.1
       /_/
Add to your .bashrc:

 export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
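To verify that Python can now find Spark, try importing it in a fresh shell. Note that you may also need py4j on your <code>PYTHONPATH</code>; Spark bundles it as a zip under <code>$SPARK_HOME/python/lib</code>.

 # Should print the Spark version (e.g. 2.3.1) if PYTHONPATH is set correctly.
 import pyspark
 print(pyspark.__version__)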