CommunityData:Hyak: Difference between revisions

From CommunityData
Line 131: Line 131:


More information on parallelizing your R code can be found in the [https://stat.ethz.ch/R-manual/R-devel/library/parallel/doc/parallel.pdf <code>parallel</code> package documentation].
More information on parallelizing your R code can be found in the [https://stat.ethz.ch/R-manual/R-devel/library/parallel/doc/parallel.pdf <code>parallel</code> package documentation].
<!-- The hyak machines have 16 cpu cores.  The Mox machines will have 28! Running your program on all the cores can speed things up a lot! We make heavy use of R for building datasets and for fitting models. Like most programming languages, R uses only one cpu by default. However, for typical computation-heavy data science tasks it is pretty easy to make R use all the cores.
For fitting models, the R installed in Gentoo should use all cores automatically. This is thanks to OpenBlas, which is a numerical library that implements and parallelizes linear algebra routines like matrix factorization, matrix inversion, and other operations that bottleneck model fitting.
However, for building datasets, you need to do a little extra work. One common strategy is to break up the data into independent chunks (for example, when building wikia datasets there is one input file for each wiki) and then use <code>mcapply</code> from <code>library(parallel)</code> to build variables from each chunk. Here is an example:
    library(parallel)
    options(mc.cores=detectCores())  ## tell R to use all the cores
   
    mcaffinity(1:detectCores()) ## required and explained below
 
    library(data.table) ## for rbindlist, which concatenates a list of data.tables into a single data.table
   
    ## imagine defining a list of wikis to analyze
    ## and a function to build variables for each wiki
    source("wikilist_and_buildvars")
   
    dataset <- rbindlist(mclapply(wikilist,buildvars))
   
    mcaffinity(rep(1,detectCores())) ## return processor affinities to the status preferred by OpenBlas
A working example can be found in the [[Message Walls]] git repository.
<code>mcaffinity(1:detectCores())</code> is required for the gentoo R <code>library(parallel)</code> to use multiple cores. The reason is technical and has to do with OpenBlas. Essentially, OpenBlas changes settings that govern how R assigns processes to cores. OpenBlas wants all processes assigned to the same core, so that the other cores do not interfere with it's fancy multicore linear algebra. However, when building datasets, the linear algebra is not typically the bottleneck. The bottleneck is instead operations like sorting and merging that OpenBlas does not parallelize.
The important thing to know is that if you want to use mclapply, you need to do <code>mcaffinity(1:detectCores())</code>. If you want to then fit models you should do <code>mcaffinity(rep(1,detectCores())</code> so that OpenBlas can do its magic. -->


=== Using the Checkpoint Queue ===
=== Using the Checkpoint Queue ===

Revision as of 00:06, 20 May 2020

To use Hyak, you must first have a UW NetID, access to Hyak, and a two factor authentication token which you will need as part of getting setup. The following links will be useful.

There are a number of other sources of documentation:


Setting up SSH

When you connect to SSH, it will ask you for a key from your token. Typing this in every time you start a connection be a pain. One approach is to create an .ssh config file that will create a "tunnel" the first time you connect and send all subsequent connections to Hyak over that tunnel. Some details in the Hyak documentation.

I've added the following config to the file ~/.ssh/config on my laptop (you will want to change the username):

 Host hyak mox2.hyak.uw.edu
     User <YOURNETID>
     HostName mox2.hyak.uw.edu
     ControlPath ~/.ssh/master-%r@%h:%p
     ControlMaster auto
     ControlPersist yes
     Compression yes

Note Note: If your SSH connection becomes stale or disconnected (e.g., if you change networks) it may take some time for the connection to time out. Until that happens, any connections you make to hyak will silently hang. If your connections to ssh hyak are silently hanging but your Internet connection seems good, look for ssh processes running on your local machine with:

ps ax|grep hyak

If you find any, kill them with kill <PROCESSID>. Once that is done, you should have no problem connecting to Hyak.

X11 forwarding

You may also want to add these two lines to your Hyak .ssh/config (indented under the line starting with "Host"):

ForwardX11 yes
ForwardX11Trusted yes

These lines will mean that if you have "checked out" an interactive machine, you can ssh from your computer to Hyak and then directly through an addition hop to the machine (like ssh n2347). Those ForwardX11 lines means if you graph things on this session, they will open on your local display.

Connecting to Hyak

To connect to Hyak, you now only need to do:

ssh hyak

It will prompt you for your UWNetID's password. Once you type in your password, you will have to respond to a 2-factor authentication request.

Setting Up Hyak environment

Everybody who uses Hyak as part of our group must add the following line to their ~/.bashrc file on Hyak:

source /gscratch/comdata/env/cdsc_mox_bashrc

This line will load scripts that will initialize a good data science environment and set the umask so that the files and directors you create are readable by others in the group. Please do this immediately before you do any other work on Hyak. When you are done, you can reload the shell by logging out and back into Hyak or by running exec bash.

Using the CDSC Hyak Environment

Storing Files

By default you have access to a home directory with a relatively small quota. There are several dozen terabytes of CDSC-allocated storage in /gscratch/comdata/ and you should explore that space. Typically we download large datasets to /gscratch/comdata/raw_data (see downloading new datasets below), processed data in /gscratch/comdata/output, and personal workspaces with the need for large data storage in /gscratch/comdata/users/<YOURNETID>.

Basic Commands

Once you have loaded load modern versions of R and Python and places Spark in your environment. It also provides a number of convenient commands for interacting with the SLURM HPC system for checking out nodes and monitoring jobs. Particularly important commands include

 any_machine

which attempts to check out a supercomputing node.

 big_machine

Requests a node with 240GB of memory.

 build_machine

Checks out a build node which can access the internet and is intended to be used to install software.

 ourjobs

Prints all the running jobs by people in the group.

 myjobs

Displays jobs by members of the group.

Read the files in /gscratch/comdata/env to see how these commands are created as well as other features not documented here.

Anaconda

We recently switched to using Anaconda to manage Python on Hyak. Anaconda comes with the `conda` tool for managing python packages and versions. Multiple python environments can co-exist in a single Anaconda installation, this allows different projects to use different versions of Python or python packages, which can be useful for maintaining projects that use old versions.

By default, our shared setup loads a conda environment called `minimal_ds` that provides recent versions of python packages commonly used in data science workflows. This is probably a good setup for most use-cases, and allows everyone to use the same packages, but it can be even better to create different environments for each project. See the anaconda documentation for how to create an environment.

To learn how to install Python packages, see the Python packages installation instructions on this wiki.

SSH into compute nodes

The hyak wiki has instructions for how to enable ssh within hyak. Reproduced below:

You should be able to ssh from the login node to a compute node without giving a password. If it does not work then do below steps:

  1. ssh-keygen then press enter for each question. This will ensure default options.
  2. cd ~/.ssh
  3. cat id_rsa.pub >> authorized_keys

Running Jobs on Hyak

When you first log in to Hyak, you will be on a "login node". These are nodes that have access to the Internet, and can be used to update code, move files around, etc. They should not be used for computationally intensive tasks. To actually run jobs, there are a few different options, described in detail in the Hyak User documentation. Following are basic instructions for some common use cases.

Interactive nodes

Interactive nodes are systems where you get a bash shell from which you can run your code. This mode of operation is conceptually similar to running your code on your own computer, the difference being that you have access to much more CPU and memory. To check out an interactive node, run the big_machine or any_machine command from your login shell. Before running these commands, you will want to be in a tmux or screen session so that you can start your job, and log off without having to worry about your job getting terminated.

Note Note: At a given point of time, unless you are using the ckpt (formerly the bf) queue, our entire group can collectiveley have one instance of big_machine and three instances of any_machine running at the same time. You may need to coordinate over IRC if you need to use a specific node for any reason.

Killing jobs on compute nodes

The Slurm scheduler provides a command called scancel to terminate jobs. For example, you might run queue_state from a login node to figure out the ID number for your job (let's say it's 12345), then run scancel --signal=TERM 12345 to send a SIGTERM signal or scancel --signal=KILL 12345 to send a SIGKILL signal that will bring job 12345 to an end.

Parallel R

The nodes on Hyak have 28 CPU cores. These may help in speeding up your analysis significantly. If you are using R functions such as lapply, there are parallelized equivalents (e.g. mclappy) which can take advantage of all the cores and give you a 2800% boost! However, something to be aware of here is your code's memory requirement—if you are running 28 processes in parallel, your memory needs can also go up to 28x, which may be more than the ~200GB that the big_machine node will have. In such cases, you may want to dial down the number of CPU cores being used—a way to do that globally in your code is to run the following snippet of code before calling any of the parallelized functions.

library(parallel)
options(mc.cores=20)  ## tell the mc* functions to use 20 cores unless otherwise specified
mcaffinity(1:20)

More information on parallelizing your R code can be found in the parallel package documentation.

Using the Checkpoint Queue

Hyak has a special way of scheduling jobs using the checkpoint queue. When you run jobs on the checkpoint queue, they run on someone else's hyak node that they aren't using right now. This is awesome as it gives us a huge amount of free (as in beer) computing. But using the checkpoint queue does take some effort, mainly because your jobs can get killed at any time if the owner of the node checks it out. So if you want to run a job for more than a few minutes on the checkpoint queue it will need to be able to "checkpoint" by saving it's state periodically and then restarting.

This would be a pain to do manually, fortunately, we have dmtcp which can automatically checkpoint and resume most programs.

Nate's working got dmtcp working for arbitrary scripts, and also with wikiq using parallel_sql.

dmtcp 3.0 is installed on Mox.


This will make more sense if you know that dmtcp works by starting a coordinator process which is responsible for pausing and saving the checkpointed process. A tutorial on dmtcp with slurm from USC has a bash function for starting the coordinator called start_dmtcp_coordinator. Nate added this function to the shared .bashrc. So it should be available in your environment on Mox.

Starting a checkpoint queue job

To start a checkpoint queue job we'll use sbatch instead of srun. See the documentation for a refresher starting hpc jobs using sbatch.

To request a job on the checkpoint queue put the following in the top of your sbatch script.

   #SBATCH --export=ALL
   #SBATCH --account=comdata-ckpt
   #SBATCH --partition=ckpt

You'll might have other stuff in your SBATCH script to request a certain number of cores or memory. Those will matter when we run wikiq below, but here they can be whatever they would be if you were running an sbatch job on one of our machines. The next thing you need to do specifically for a ckpt job is to run start_coordinator. This function takes care of making sure that we start a coordinator using the right set of ports and temporary files. We still need to pass in the interval that we want checkpoints. The bigger this interval the faster your job will run but the more work will be lost when it's interrupted.

   start_dmtcp_coordinator -i 600  #checkpoint every 10 minutes

Next you need to run your job in a special way so that it is managed by dmtcp and restarted if it gets interrupted.

   # The restart script is created by dmtcp_launch after initialization
   if [ -x dmtcp_restart_script.sh ]; then
       bash dmtcp_restart_script.sh
   else
       # On first pass, run program under DMTCP
       dmtcp_launch --rm $your_script.sh	# must run interpreter for scripts
   fi
 

This works because dmtcp_restart_script.sh is created when you launch your job using dmtcp_launch. If that script exists your job should run it instead of your job.

There are options that you can pass to dmtcp_launch that can be important. In particular --checkpoint-open-files and --allow-file-overwrite modify how IO is checkpointed.

Running wikiq with dmtcp and parallel_sql

To run wikiq with parallelsql the following need to be arranged:

  1. A shell script for each dumpfile that makes a workspace for dmtcp to keep it's data and restart script.
  2. These shell scripts loaded in parallel sql.
  3. A sbatch script that gets a checkpoint node and starts running jobs from parallel_sql.
  4. You need to restart jobs that get interrupted using parallel sql.

Nate made a python script that generates the scripts and makes a file with all the scripts. Notice that each dumpfile gets a script, it's own checkpoint directory, and a line in wikiq_parallel_jobs.sh

#!/usr/bin/env python3
from os import path
import os
import stat
import glob

archives = glob.glob("/gscratch/comdata/raw_data/wikia_dumps/2010-04-mako/*.xml.7z")

scripts_dir = '/gscratch/comdata/users/nathante/wikiq_parallel_scripts'
output_dir =  '/gscratch/comdata/users/nathante/wikiq_output'
checkpoint_dir = '/gscratch/comdata/users/nathante/wikiq_checkpoint'

if not path.isdir(scripts_dir):
    os.mkdir(scripts_dir)

if not path.isdir(output_dir):
    os.mkdir(output_dir)

script ="""#!/bin/bash
mkdir -p {0}
cd {0}
start_dmtcp_coordinator -i 60  #checkpoint every 20 minutes

if [ -x dmtcp_restart_script.sh ]; then
    bash dmtcp_restart_script.sh
else
    # On first pass, run program under DMTCP
    dmtcp_launch --rm {1}
fi
"""

with open("wikiq_parallel_jobs.sh",'w') as calls:
    for dumpfile in archives:
        wikiq_base_call = f"wikiq -u -o {output_dir} {dumpfile}"
        wikiq_call = wikiq_base_call
        wiki = path.split(dumpfile)[1]
        wikiq_script = script.format( path.join(checkpoint_dir,wiki), wikiq_call)

        script_file = path.join(scripts_dir, wiki + '.sh')
        with open(script_file,'w') as of:
            of.write(wikiq_script)
        
        os.chmod(script_file,os.stat(script_file).st_mode | stat.S_IEXEC)

        calls.write(script_file)
        calls.write('\n')

We also need an sbatch script as parallel_sql_job.sh.

#!/bin/bash
## parallel_sql_job.sh
#SBATCH --job-name=wikiq_dmtcp
## Allocation Definition
#SBATCH --account=comdata-ckpt
#SBATCH --partition=ckpt
## Resources
## Nodes. This should always be 1 for parallel-sql.
#SBATCH --nodes=1    
## Walltime (12 hours)
#SBATCH --time=12:00:00
## Memory per node
#SBATCH --mem=100G

module load parallel_sql

#Put here commands to load other modules (e.g. matlab etc.)
#Below command means that parallel_sql will get tasks from the database
#and run them on the node (in parallel). So a 16 core node will have
#16 tasks running at one time.
parallel-sql --sql -a parallel --exit-on-term

Next load the scripts into parallel_sql

 module load parallel_sql
 cat wikiq_parallel_jobs.sh | psu --load

We can now fire up a whole bunch of checkpoint nodes. The limit is technically 2000! But let's just ask for 10 nodes :)

  for job in $(seq 1 10); do sbatch parallel_sql_job.sh; done

If our jobs get interrupted we'll need to run psu --reset-slurm to set them back into avail state. We can run a little script running on a login node to do this automatically every minute or so.

#!/usr/bin/env python3
## auto_reset_psu.py
import time
import subprocess

running = subprocess.run(["psu", "--show-running"],  universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print(running)
while hasattr(running, 'stdout') and len(running.stdout) > 0:
    subprocess.run(["psu","--reset-slurm"])
    time.sleep(60)
    running = subprocess.run(["psu", "--show-running"],  stdout=subprocess.PIPE)

That's it! Unleash the power of the checkpoint queue! Reach out to Nate if you try this and have problems or if you have any questions!