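<syntaxhighlight lang="python">
# Sketch of the setup step; details may differ from the original script.
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext, Window
import pyspark.sql.functions as f
from pyspark.sql import types

# build a Spark session (the application name here is just an example)
conf = SparkConf().setAppName("wikiq example")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sqlContext = SQLContext(spark.sparkContext)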
</syntaxhighlight>


This part imports some Python utilities that we will use. You can pretty safely treat the <code>SparkConf</code>, <code>SparkSession</code>, and <code>SQLContext</code> imports as magic that creates a Spark environment that supports working with Spark's SQL features.
<code>Window</code> is used to create window functions. We will use a window function to count the nth edit made by each editor. <code>import pyspark.sql.functions as f</code> provides built-in functions that can be applied to data in Spark DataFrames. <code>types</code> provides the data types that we will use to specify the schema for reading wikiq files.


<syntaxhighlight lang="python">
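# Sketch: expand the input_file argument (which may contain wildcards) into a
# list of wikiq TSV files; `args` is assumed to come from an argparse parser
# defined earlier in the script.
import glob
files = sorted(glob.glob(args.input_file))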
</syntaxhighlight>
   
   
Spark is designed to read and write lists of files. The <code>args.input_file</code> argument is expanded with <code>glob</code>, so it accepts wildcards. The above lines build a list of files from the argument.


<syntaxhighlight lang="python">
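# Sketch: declare types for the wikiq columns used below and read every file
# into one DataFrame; a full schema would list every wikiq column.
struct = types.StructType()
struct = struct.add("date_time", types.TimestampType(), True)
struct = struct.add("editor", types.StringType(), True)
struct = struct.add("editor_id", types.LongType(), True)
struct = struct.add("revert", types.BooleanType(), True)
struct = struct.add("reverteds", types.StringType(), True)
struct = struct.add("revid", types.LongType(), True)

reader = spark.read
df = reader.csv(files,
                sep="\t",
                inferSchema=False,
                header=True,
                mode="PERMISSIVE",
                schema=struct)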
</syntaxhighlight>


This reads the data into a Spark DataFrame. Spark DataFrames are more like SQL tables than they are like pandas DataFrames. Spark DataFrames are pretty abstract and can live in memory or on disk. Operations on Spark DataFrames are lazily evaluated: Spark will not actually run computations on your data until it has to. Calling <code>df.show()</code> will print the dataframe and trigger execution. <code>mode="PERMISSIVE"</code> stops Spark from giving up if it hits malformed rows.


<syntaxhighlight lang="python">
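# Sketch: 500 partitions is the figure the text suggests for English Wikipedia.
df = df.repartition(500)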
</syntaxhighlight>


The first thing to do after reading the data is to <code>repartition</code> it. This determines the number of files that Spark will output. Choosing the right number of partitions isn't really an exact science: more partitions make some operations more efficient and other operations slower. The rule of thumb is that the number of partitions should grow linearly with the amount of data; 500 partitions seems pretty good for English Wikipedia. If you are interested, [https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-partitions.html this page] is a good reference. Now we are ready to build some variables. The first thing we are going to do is create a new column, <code>editor_id_or_ip</code>.


<syntaxhighlight lang="python">
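# Sketch of the coalesce step described below.
df = df.select('*', f.coalesce(df['editor_id'], df['editor']).alias('editor_id_or_ip'))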
</syntaxhighlight>


The first argument to <code>select</code>, <code>'*'</code>, causes it to return all the columns. Next we call the <code>coalesce</code> function, which creates a new column holding the value of <code>editor_id</code> if <code>editor_id</code> is not null and the value of <code>editor</code> otherwise. The call to <code>alias</code> gives the new column a name. If you know SQL, this might look familiar: you could write it as <code>SELECT *, COALESCE(editor_id, editor) AS editor_id_or_ip</code>.


Next we are going to identify the edits that revert each edit. The <code>reverteds</code> column lists the edits that a given edit has reverted.


<syntaxhighlight lang="python">
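# Sketch: keep only the rows where something was reverted.
reverteds_df = df.filter(~ f.isnull(df['reverteds'])).select(['revid', 'reverteds'])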
</syntaxhighlight>


This line creates a new Spark DataFrame, with the columns <code>revid</code> and <code>reverteds</code>, out of the rows of the first dataframe that have a value for <code>reverteds</code>.


<syntaxhighlight lang="python">
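# Sketch: reverteds holds a comma-separated list of revision ids, so split it
# into an array column.
reverteds_df = reverteds_df.withColumn('reverteds', f.split(reverteds_df['reverteds'], ','))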
</syntaxhighlight>


The above line converts <code>reverteds</code> from a string to an array.


<syntaxhighlight lang="python">
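# Sketch: explode gives one row per reverted revision id; the revid of the
# reverting edit becomes reverted_by.
reverteds_df = reverteds_df.select(reverteds_df['revid'].alias('reverted_by'),
                                   f.explode(reverteds_df['reverteds']).alias('reverted_id'))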
</syntaxhighlight>


The most important part of the above is the call to <code>explode</code>. Explode💥 unfolds the array so that we get one row for each element of the array. Now we can join <code>reverteds_df</code> with <code>df</code> to put the <code>reverted_by</code> column in <code>df</code>.


<syntaxhighlight lang="python">
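# Sketch: a left outer join keeps every row of df; edits that were never
# reverted end up with a null reverted_by.
df = df.join(reverteds_df, df['revid'] == reverteds_df['reverted_id'], how='left_outer')
df = df.drop('reverted_id')
del(reverteds_df)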
</syntaxhighlight>


Join the two tables so that each revision that was reverted gets a value for <code>reverted_by</code>. There are many kinds of joins and there is some detail on this in the [[#Join help|Join help section of this page]]. The join is a <code>left_outer</code> join, so we keep all the rows of <code>df</code>, even the rows that don't have a value for <code>reverted_id</code> in <code>reverteds_df</code>. We remove the redundant <code>reverted_id</code> column and are done with building <code>reverted_by</code>. Next we add a column that counts the number of times a given editor has made a revert (this is called a ''cumulative count''). Since we aren't going to use <code>reverteds_df</code> again we can call <code>del(reverteds_df)</code>. This tells Spark it is free to remove the object from storage, which can improve performance.


<syntaxhighlight lang="python">
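# Sketch: sort the edits chronologically.
df = df.orderBy('date_time')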
</syntaxhighlight>


<code>orderBy</code> sorts the dataframe by date.


<syntaxhighlight lang="python">
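# Sketch: group by editor, with rows in chronological order within each group.
win = Window.partitionBy('editor_id_or_ip').orderBy('date_time')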
</syntaxhighlight>


The above defines a <code>WindowSpec</code>, which is a kind of object that can be used to define rolling aggregations. We are going to use the <code>rank</code> function to perform the cumulative count, and <code>rank</code> requires a <code>WindowSpec</code>. The WindowSpec that we made says that we are grouping at the level of <code>editor_id_or_ip</code> and that we want to operate on each row of each group in chronological order.


<syntaxhighlight lang="python">
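# Sketch: rank each editor's reverts in chronological order; the name
# reverts_df is illustrative, and the result is joined back to df below.
reverts_df = df.filter(df['revert'] == True)
reverts_df = reverts_df.withColumn('editor_nth_revert', f.rank().over(win))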
</syntaxhighlight>


This applies the <code>rank</code> function over the window to perform the cumulative count of the reverts. The <code>withColumn</code> function adds a new column to the dataframe called <code>editor_nth_revert</code>.


<syntaxhighlight lang="python">
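# Sketch: joining on the shared columns avoids duplicating them.
df = df.join(reverts_df,
             ["revid", 'editor_id_or_ip', 'date_time', 'revert'],
             how='left_outer')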
</syntaxhighlight>


Above we perform the join to add the new column to <code>df</code>. We join on all of the shared columns <code>["revid",'editor_id_or_ip','date_time','revert']</code> so that duplicate columns are not created in <code>df</code>.


<syntaxhighlight lang="python">
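# Sketch: year and month are example calendar columns.
df = df.withColumn('year', f.year(df['date_time']))
df = df.withColumn('month', f.month(df['date_time']))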
</syntaxhighlight>


Above we use <code>withColumn</code> again to illustrate creating some calendar variables from <code>date_time</code>.


<syntaxhighlight lang="python">
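# Sketch: the same WindowSpec now ranks every edit, not just the reverts;
# editor_nth_edit is an example column name.
df = df.withColumn('editor_nth_edit', f.rank().over(win))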
</syntaxhighlight>


We can reuse the <code>WindowSpec</code> to get the cumulative count for all edits as opposed to just the reverts.

