CommunityData:Hyak Datasets

One option is to just use [[CommunityData:Hyak_Spark|Spark]], which is likely a good choice if you want to do large and complex joins or group-bys.  The main downsides of Spark are stability and complexity: it is capable, can be fast, and can scale to many nodes, but it can also crash and be complicated to program.
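
For comparison, the Spark version of a simple group-by might look something like the sketch below. This is only a sketch, assuming a working pyspark installation as described on [[CommunityData:Hyak_Spark]], and it uses the reddit submissions dataset from the pyarrow example that follows.

<syntaxhighlight lang='python'>
# minimal sketch, not a tested recipe: count submissions per author with Spark
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()

# spark reads the parquet dataset lazily; nothing is loaded until an action runs
submissions = spark.read.parquet('/gscratch/comdata/output/reddit_submissions_by_author.parquet')

# a large group-by like this is where Spark is a good fit
author_counts = submissions.groupBy('author').agg(f.count('id').alias('n_posts'))

# .show() is an action, so this triggers the actual computation
author_counts.show(10)
</syntaxhighlight>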


An alternative is to stream data from parquet using pyarrow. Pyarrow can load a large dataset one chunk at a time, and you can turn these chunks into a stream of rows.  Read serially, as in the example below, the stream of rows keeps the same order as the data on disk. In the example below the dataset is partitioned by author and the partitions are sorted, so rows can be read one author at a time.  This is convenient as a starting point for building author-level variables.


<syntaxhighlight lang='python'>
import pyarrow.dataset as ds
from itertools import groupby

# A pyarrow dataset abstracts reading, writing, or filtering a parquet file. It does not read data into memory.
#dataset = ds.dataset(pathlib.Path('/gscratch/comdata/output/reddit_submissions_by_author.parquet/'), format='parquet', partitioning='hive')
dataset = ds.dataset('/gscratch/comdata/output/reddit_submissions_by_author.parquet', format='parquet')

# let's get all the comments to two subreddits:
subreddits_to_pull = ['seattlewa','seattle']

# instead of loading the data into a pandas dataframe all at once we can stream it. This lets us start working with it while it is read.
scan_tasks = dataset.scan(filter = ds.field('subreddit').isin(subreddits_to_pull), columns=['id','subreddit','CreatedAt','author','ups','downs','score','subreddit_id','stickied','title','url','is_self','selftext'])

# simple function to execute scan tasks and generate a stream of rows
def iterate_rows(scan_tasks):
    for st in scan_tasks:
        # an executed scan task yields an iterator of record batches
        for rb in st.execute():
            df = rb.to_pandas()
            for t in df.itertuples():
                yield t

row_iter = iterate_rows(scan_tasks)

# now we can use python's groupby function to read one author at a time
author_submissions = groupby(row_iter, lambda row: row.author)

# if the record batches arrived out of order, the same author could appear in more than one group,
# so count how many groups each author shows up in
count_dict = {}
for auth, posts in author_submissions:
    if auth in count_dict:
        count_dict[auth] = count_dict[auth] + 1
    else:
        count_dict[auth] = 1

# since the data is partitioned and sorted by author, each author should appear in exactly one group,
# so this should evaluate to False
any([v != 1 for k, v in count_dict.items()])
</syntaxhighlight>
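
As a sketch of the author-level variables idea, the grouped stream above can be folded into per-author summaries. The snippet below assumes a fresh <code>row_iter</code> (the generator is exhausted after one pass, so the scan and <code>iterate_rows</code> call need to be re-run first) and uses the <code>CreatedAt</code> and <code>score</code> columns requested in the scan.

<syntaxhighlight lang='python'>
# sketch only: build simple author-level variables from the grouped stream.
# assumes row_iter is a fresh generator from iterate_rows(dataset.scan(...)) as above,
# since a generator can only be consumed once.
from itertools import groupby

author_vars = {}
for auth, posts in groupby(row_iter, lambda row: row.author):
    posts = list(posts)
    author_vars[auth] = {
        'n_posts': len(posts),
        'first_post': min(p.CreatedAt for p in posts),
        'mean_score': sum(p.score for p in posts) / len(posts),
    }
</syntaxhighlight>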

