CommunityData:Hyak Datasets
We maintain an archive of Reddit submissions and comments going back to Reddit's early history that is up-to-date with January 2019 (for comments) and August 2019 (for submissions). We have copies of dumps collected and published by [https://Pushshift.io pushshift] and tabular datasets derived from them. Compared to obtaining data from the Reddit (or pushshift) APIs, working with these archival datasets will be faster and less work for you. The tabular datasets in particular are quite fast thanks to the parquet file format, which makes it possible to pull subsets of the data (e.g. the complete history of a subreddit) in as little as 15 minutes. In contrast, it takes about a day to extract and parse the dumps on a mox node.
Code for this project is located in the (currently private) cdsc_reddit [[CommunityData:git|git repository]] on code.communitydata.science.
For computational efficiency it is best to parse the dumps as little as possible. So if it is possible for you to work with the tabular datasets, please do so. The tabular datasets currently have the variables that most projects will want to use, but there are many other metadata variables including ones related to moderation, media, Reddit gold and more. If you want a variable from the pushshift json that isn't in the parquet tables, don't fret! It will not be too much work to add it. Reach out to [[User:Groceryheist|Nate]].
</code>
"<code>by_author</code>" and "<code>by_subreddit</code>" refer to how the data is | "<code>by_author</code>" and "<code>by_subreddit</code>" refer to how the data is sorted. Sorting the data makes filtering by the sorted column fast and determines the order that the data will be read. The <code>by_author</code> datasets are sorted by <code>author</code> and then by <code>CreatedAt</code>. The <code>by_subreddit</code> datasets are sorted by <code>subreddit</code> and then by <code>author</code>. Sorting by author makes it possible to stream the dataset one user at a time to build user-level variables without resorting to Spark. | ||
=== Reading Reddit parquet datasets ===
# A pyarrow dataset abstracts reading, writing, or filtering a parquet file. It does not read data into memory.
#dataset = ds.dataset(pathlib.Path('/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet/'), format='parquet', partitioning='hive')
dataset = ds.dataset('/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet/', format='parquet', partitioning='hive')

# let's get all the submissions to two subreddits:
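# (a sketch of the pull; assumes the same filter and columns as the streaming example below)
subreddits_to_pull = ['seattlewa','seattle']

# to_table() reads only the matching rows and the requested columns into an arrow Table
table = dataset.to_table(filter = ds.field('subreddit').isin(subreddits_to_pull), columns=['id','subreddit','CreatedAt','author','ups','downs','score','subreddit_id','stickied','title','url','is_self','selftext'])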
# Since data from just these 2 subreddits fits in memory we can just turn our table into a pandas dataframe.
df = table.to_pandas()

# We should save this smaller dataset so we don't have to wait 15 min to pull from parquet next time.
df.to_csv("mydataset.csv")
</syntaxhighlight>
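If your pull fits comfortably in memory, roughly the same thing can be done in a single pandas call. This is a minimal sketch rather than project code; it assumes pandas with the pyarrow engine, and the <code>columns</code> and <code>filters</code> arguments are standard <code>read_parquet</code> options:

<syntaxhighlight lang='python'>
import pandas as pd

# columns and filters are passed through to pyarrow, so only the requested subset is read
df = pd.read_parquet('/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet/',
                     engine='pyarrow',
                     columns=['id','subreddit','CreatedAt','author','title','selftext'],
                     filters=[('subreddit', 'in', ['seattlewa','seattle'])])

df.to_csv("mydataset.csv")
</syntaxhighlight>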
One option is to use [[CommunityData:Hyak_Spark|Spark]], which is likely a good choice if you want to do large and complex joins or group-bys. The downsides of Spark are stability and complexity: it is capable, can be fast, and can scale to many nodes, but it can also crash and be complex to program.
An alternative is to stream data from parquet using pyarrow. Pyarrow can load a large dataset one chunk at a time, and you can turn these chunks into a stream of rows. The stream of rows ''will not'' have the same order as the data on disk.
<syntaxhighlight lang='python'>
import pyarrow.dataset as ds
from itertools import chain, groupby
# A pyarrow dataset abstracts reading, writing, or filtering a parquet file. It does not read data into memory.
#dataset = ds.dataset(pathlib.Path('/gscratch/comdata/output/reddit_submissions_by_author.parquet'), format='parquet', partitioning='hive')
dataset = ds.dataset('/gscratch/comdata/output/reddit_submissions_by_author.parquet', format='parquet', partitioning='hive')
# let's get all the submissions to two subreddits:
subreddits_to_pull = ['seattlewa','seattle']
# instead of loading the data into a pandas dataframe all at once we can stream it. This lets us start working with it while it is read.
scan_tasks = dataset.scan(filter = ds.field('subreddit').isin(subreddits_to_pull), columns=['id','subreddit','CreatedAt','author','ups','downs','score','subreddit_id','stickied','title','url','is_self','selftext'])
# simple function to execute a scan task and yield one iterator of rows (namedtuples) per record batch
def execute_scan_task(st):
    # an executed scan task yields an iterator of record_batches
    def unroll_record_batch(rb):
        df = rb.to_pandas()
        return df.itertuples()

    for rb in st.execute():
        yield unroll_record_batch(rb)

# now we just need to flatten and we have our iterator of rows
row_iter = chain.from_iterable(chain.from_iterable(map(lambda st: execute_scan_task(st), scan_tasks)))
# now we can use python's groupby function to read one author at a time
# note that the same author can appear more than once since the record batches may not be in the correct order.
author_submissions = groupby(row_iter, lambda row: row.author)

for auth, posts in author_submissions:
    print(f"{auth} has {len(list(posts))} posts")
</syntaxhighlight>
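Note that <code>Dataset.scan()</code> comes from older pyarrow releases; in newer versions it has been replaced by the scanner API (<code>Dataset.scanner()</code> / <code>Dataset.to_batches()</code>). If your pyarrow no longer has <code>scan()</code>, roughly the same streaming pattern can be written with <code>Dataset.to_batches()</code>. A minimal sketch, assuming a recent pyarrow:

<syntaxhighlight lang='python'>
import pyarrow.dataset as ds
from itertools import chain, groupby

dataset = ds.dataset('/gscratch/comdata/output/reddit_submissions_by_author.parquet', format='parquet', partitioning='hive')

# to_batches() yields RecordBatches one at a time instead of scan tasks
batches = dataset.to_batches(filter=ds.field('subreddit').isin(['seattlewa','seattle']),
                             columns=['id','subreddit','CreatedAt','author','title'])

# flatten the record batches into a single stream of namedtuple rows
row_iter = chain.from_iterable(rb.to_pandas().itertuples() for rb in batches)

# as above, the same author can appear in more than one group because the stream is not sorted
for auth, posts in groupby(row_iter, lambda row: row.author):
    print(f"{auth} has {len(list(posts))} posts")
</syntaxhighlight>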