Lab 8: Proxy and MapReduce

These questions were written by Jerry Cain and Ryan Eberhardt.

Before the end of lab, be sure to fill out the lab checkoff sheet here!

Problem 1: proxy Thought Questions

Problem 2: Using MapReduce

Before starting, go ahead and clone the lab8 folder:

$ git clone /usr/class/cs110/repos/lab8/shared lab8
$ cd lab8
$ make directories

For this problem, you’re going to implement mapper and reducer executables that process CSV file chunks housing 20GB of COVID-19 case data. You’ll rely on these mapper and reducer executables to generate output files that indicate which regions had the largest spikes in COVID-19 cases. (Data for this problem came from global.health.)

Step 1: Getting familiar with MapReduce

To start off, spend the time needed to read through the Assignment 7 handout. In particular, I’d like you to clone your assign7 repo, read enough of the handout to know what make and make directories do, and work through all of the steps necessary to run partial and complete MapReduce jobs that tokenize the text of The Odyssey. You’ll want to learn how to invoke mr_soln (with mrm_soln and mrr_soln as the worker executables), letting a supplied configuration file inform mr_soln where the data files live, how many mappers should be spawned, how many reducers should be spawned, and so forth. Read up through Task 1 of the Assignment 7 handout.
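
For a sense of what such a configuration file contains, here is a rough sketch of what this lab’s covid-stats-partial.cfg plausibly looks like. The exact key names are defined by the Assignment 7 framework, so treat these as placeholders and defer to the real .cfg files in your repo:

mapper covid-stats-mapper.py
reducer covid-stats-reducer.py
num-mappers 4
num-reducers 2
input-path samples/covid-data/partial
intermediate-path files/intermediate
output-path files/output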

Step 2: Designing a mapper and a reducer

In samples/covid-data/full/, we have a collection of input CSV files containing various data about global COVID-19 cases. Each line contains data about a COVID-19 case somewhere in the world. We have provided a load_csv_data function that assembles the data from these files into a list of Python dictionaries:

[{
  'demographics.ageRange.end': '20',
  'demographics.ageRange.start': '20',
  'demographics.ethnicity': '',
  'demographics.gender': 'Female',
  'demographics.nationalities': 'India',
  'demographics.occupation': '',
  'events.confirmed.date': '2020-01-30',
  'events.confirmed.value': '',
  'genomeSequences': '',
  'genomeSequences.repositoryUrl': '',
  'genomeSequences.sampleCollectionDate': '',
  'genomeSequences.sequenceId': '',
  'genomeSequences.sequenceLength': '',
  'genomeSequences.sequenceName': '',
  'location.administrativeAreaLevel1': 'Kerala',
  'location.administrativeAreaLevel2': 'Thrissur',
  'location.administrativeAreaLevel3': 'Thrissur',
  'location.country': 'India',
  'location.geoResolution': 'Admin3',
  'location.geometry.latitude': '10.52',
  'location.geometry.longitude': '76.21',
  'location.name': 'Thrissur, Kerala, India',
  'location.place': '',
  'notes': 'Travelled from Wuhan',
  'pathogens': '',
  'preexistingConditions.hasPreexistingConditions': '',
  'preexistingConditions.values': '',
  'revisionMetadata.creationMetadata.date': '2020-09-29T22:20:21.304Z',
  'revisionMetadata.creationMetadata.notes': '',
  'revisionMetadata.editMetadata.date': '',
  'revisionMetadata.editMetadata.notes': '',
  'revisionMetadata.revisionNumber': '0',
  'symptoms.status': '',
  'symptoms.values': '',
  'variantOfConcern': ''},
  { ... },
  { ... },
  ...]

Note that there are many fields, and many of them will be empty due to the nature of this dataset, which was aggregated from many different sources. We’re particularly interested in the location.name and events.confirmed.date fields.
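
Since each case is just a dictionary, pulling out the fields we care about is ordinary indexing. For example:

cases = load_csv_data(input_path)
for case in cases:
    name = case['location.name']           # e.g. 'Thrissur, Kerala, India'
    date = case['events.confirmed.date']   # e.g. '2020-01-30'; may be empty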

We would like to process this dataset and figure out when each region had its worst spike in cases. The following output shows that New York experienced a 26x surge in cases in March 2020. Santa Clara was hit around the same time, but not as hard (an 11.5x increase over a 7-day period). Cook County (home to Chicago, IL) experienced a less rapid rise in cases, with only a 4x surge over 7 days. Meanwhile, many areas, such as São Paulo, didn’t see their worst surges until 2021.

NewYork,UnitedStates 2020-03-11 25.9375
SantaClaraCounty,California,UnitedStates 2020-03-10 11.5
CookCounty,Illinois,UnitedStates 2020-03-08 4.0
Louveira,SãoPaulo,Brazil 2021-03-25 104.0

Given this input data and desired output format, think about how you might design a mapper (preparation step) and reducer (assembly/consolidation step) to produce this output. How should the intermediate data (mapper output) be structured – what is the key of each line, and what other data would you want to include?
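
To make that concrete, here is one possible (by no means the only) intermediate format: one line per case, keyed by a spaceless version of location.name, with the confirmed date as the value:

NewYork,UnitedStates 2020-03-05
SantaClaraCounty,California,UnitedStates 2020-03-02
NewYork,UnitedStates 2020-03-06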

Step 3: Implementing the mapper

Open covid-stats-mapper.py and implement your design for the mapper. The starter code loads case data from the specified input file, creating a list of dictionaries as explained above:

cases = load_csv_data(input_path)

Keep in mind that each line is keyed by the first token, so you’ll need to ensure there are no spaces in your key.
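
Here’s a minimal sketch of one way the heart of the mapper could look. It assumes the starter code has already produced cases via load_csv_data and that output_file is a stand-in for however the starter code writes to the output path, so adapt the names to what’s actually there:

# Sketch only: `cases` comes from load_csv_data(input_path), and `output_file`
# is a placeholder for the starter code's output handling.
for case in cases:
    name = case['location.name'].replace(' ', '')  # the key is the first token, so it can't contain spaces
    date = case['events.confirmed.date']
    if name and date:                              # skip rows missing either field
        print(f'{name} {date}', file=output_file)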

You can test your implementation by running the mapper on a single file:

🍉 ./covid-stats-mapper.py samples/covid-data/full/00000.input test-intermediate
🍉 cat test-intermediate

Once you’re confident in your implementation, you can try running the mapper as part of an actual MapReduce job, running on a subset of the input files:

🍉 ls samples/covid-data/partial
🍉 ./mr_soln --map-only --mapper ./mrm_soln --reducer ./mrr_soln --config covid-stats-partial.cfg
🍉 ls files/intermediate/

Our MapReduce implementation runs ./covid-stats-mapper.py on each of the four input files (in parallel, on four different myth machines), then splits the lines of the mapper’s output into eight buckets (four mappers * two reducers). The resulting intermediate files are named inputFileNumber.hashBucketNumber.mapped.

For example, our MapReduce implementation first calls ./covid-stats-mapper.py samples/covid-data/partial/00000.input files/intermediate/00000.mapped on one myth machine to produce a mapped intermediate file without any hashing of keys. Then, it takes each line from that .mapped file, hashes the first token (the key) of each line, and distributes each line across 00000.hashBucketNum.mapped files (where hashBucketNum ranges from 00000 to 00007 – four mappers * two reducers = 8 hash buckets). This makes it easier to run the shuffle/group step: a machine that is grouping lines in hash bucket 7 can simply concatenate *.00007.mapped, sort, and group by key.
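
The framework does this bucketing for you, but conceptually the step boils down to something like the sketch below. (Python’s built-in hash is randomized across runs for strings, so the real implementation necessarily uses its own stable hash function; this is just to illustrate the idea.)

NUM_BUCKETS = 8  # four mappers * two reducers
buckets = [open(f'files/intermediate/00000.{b:05d}.mapped', 'w') for b in range(NUM_BUCKETS)]
with open('files/intermediate/00000.mapped') as mapped:
    for line in mapped:
        key = line.split()[0]                      # the key is the first token on the line
        buckets[hash(key) % NUM_BUCKETS].write(line)
for f in buckets:
    f.close()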

Step 4: Implementing the reducer

Your reducer will take aggregated lines from the mapper output and produce the final data about when each region experienced its biggest surge. You can get a sense of what this looks like by combining all the intermediate files from hash bucket 00000 and grouping the lines by key:

🍉 cat files/intermediate/*.00000.mapped | sort | ./group-by-key.py > 00000.grouped
🍉 less 00000.grouped
🍉 ./covid-stats-reducer.py 00000.grouped 00000.output
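
If your mapper emits one location/date line per case, each line of 00000.grouped gathers every date reported for a single key, roughly like this (illustrative, values abridged):

NewYork,UnitedStates 2020-03-01 2020-03-01 2020-03-02 2020-03-04 ...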

Open covid-stats-reducer.py and implement your design for the reducer. The starter code includes make_histogram and find_biggest_weekly_spike functions that will find the biggest weekly spike for you, given a list of dates on which COVID cases were reported.
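
As a rough sketch, and assuming the grouped-line format shown above, the reducer’s core loop might look like the following. The exact signature and return value of find_biggest_weekly_spike live in the starter code, so the (spike_date, ratio) unpacking here is an assumption to verify, as is reading input_path and output_path from the command line:

# Sketch only: assumes each grouped line is "<key> <date> <date> ...", and that
# find_biggest_weekly_spike returns the spike date along with the 7-day ratio.
with open(input_path) as infile, open(output_path, 'w') as outfile:
    for line in infile:
        tokens = line.split()
        if not tokens:
            continue
        key, dates = tokens[0], tokens[1:]
        spike_date, ratio = find_biggest_weekly_spike(dates)
        print(f'{key} {spike_date} {ratio}', file=outfile)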

You can try running your code using the sample ./covid-stats-reducer.py invocation above. Then, test it out as part of a full MapReduce job:

🍉 ./mr_soln --mapper ./mrm_soln --reducer ./mrr_soln --config covid-stats-partial.cfg
🍉 ls files/output/
🍉 cat files/output/* | sort | less

If you’re up to the task, you can run this using covid-stats-full.cfg instead of the partial config, but beware that this may take quite a long time to run, since we don’t have that many myth machines to run on and this implementation is not at all optimized for performance.