Assignment 6: MapReduce

This handout was adapted from Jerry Cain’s Spring 2018 offering.

For your final CS110 assignment, you’ll harness the power of multiprocessing, networking, threads, concurrency, distributed computing, the myth computer cluster, and the shared AFS file system to build a fully operational MapReduce framework. Once you’re done, you’ll have implemented what, for most of you, will be the most complex system you’ve ever built. My hope is that you’ll not only finish the assignment (which I know you will), but that you’ll be proud of just how far you’ve advanced since the quarter began.

Note that the assignment is complex – not because you need to write a lot of code, but because you need to be familiar with a large number of sophisticated C++ classes, infer a system architecture, and coordinate multiple processes across multiple machines to accomplish a single goal.

Due date: Wednesday, August 15th at 11:59 pm

Note: Make every effort to finish this assignment on time, so that you have time to study for the final. It’s not an especially time-consuming assignment, but you should start early so that you have time to ask questions.

Background

Make sure you have read through all my lecture notes on MapReduce here.

In lecture, I presented a mapper and reducer for building a simple search index. In the starter code, I have a different mapper/reducer pair (word-count-mapper and word-count-reducer) designed for building a histogram of words across a large set of input text. For example, given Homer’s The Odyssey as input, the mapper might generate the following:

project 1
gutenberg 1
ebook 1
of 1
the 1
odyssey 1
of 1
by 1
alexander 1
pope 1
in 1
our 1
series 1
by 1
homer 1
copyright 1
laws 1
are 1
changing 1
all 1
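
For concreteness, here's a minimal sketch of what a mapper conforming to this framework might look like, assuming the two-argument input-file/output-file convention described at the end of this handout. The real word-count-mapper also normalizes words (note the lowercasing and stripped punctuation above), so treat this as illustrative rather than a copy of the actual program:

// hypothetical-mapper.cc: illustrative only, not the actual word-count-mapper.cc
#include <fstream>
#include <iostream>
#include <string>
using namespace std;

int main(int argc, char *argv[]) {
  if (argc != 3) {
    cerr << "Usage: " << argv[0] << " <input-file> <output-file>" << endl;
    return 1;
  }
  ifstream infile(argv[1]);
  ofstream outfile(argv[2]);
  string word;
  while (infile >> word) {
    outfile << word << " 1" << endl; // emit one key/value pair per word
  }
  return 0;
}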

The shuffle/group step consolidates all “1”s for a particular word onto one line:

advocates 1
aegyptius 1
aegysthus 1
aerial 1 1 1
aethiopia 1
affable 1
affianced 1
afflicts 1
affords 1 1 1 1
afresh 1
after 1
against 1 1 1 1
age 1 1
aged 1
aid 1 1

Finally, the reducer sums the “1”s to calculate a word count for each word:

advocates 1
aegyptius 1
aegysthus 1
aerial 3
aethiopia 1
affable 1
affianced 1
afflicts 1
affords 4
afresh 1
after 1
against 4
age 2
aged 1
aid 2
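
And here's an equally minimal sketch of the reducer side, again assuming the two-argument convention; it reads one grouped line at a time and sums the values that follow the key (the actual word-count-reducer may differ in its details):

// hypothetical-reducer.cc: illustrative only, not the actual word-count-reducer.cc
#include <fstream>
#include <sstream>
#include <string>
using namespace std;

int main(int argc, char *argv[]) {
  if (argc != 3) return 1;
  ifstream infile(argv[1]);
  ofstream outfile(argv[2]);
  string line;
  while (getline(infile, line)) {
    istringstream iss(line);
    string key;
    iss >> key;                     // the first token on the line is the word
    size_t sum = 0, one;
    while (iss >> one) sum += one;  // sum the "1"s that follow
    outfile << key << " " << sum << endl;
  }
  return 0;
}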

This isn’t a very interesting mapper/reducer, and calculating a word count across some text seems easy to do on one machine. However, we might still benefit from scaling the computation across many machines, and this mapper/reducer pair is easier to work with (and easier to debug) than the pair I presented in lecture. You are welcome to play with my search index mapper/reducer from lecture, but you needn’t do so.

Getting started

The first thing you should do, of course, is clone your assign6 repo, like this:

$ git clone /usr/class/cs110/repos/assign6/$USER assign6

Descend into that assign6 folder of yours, type make directories, and then ls a specific set of directory entries.

$ cd assign6
$ make directories
// make command listings removed for brevity
$ ls -lu odyssey-full.cfg samples/*_soln 
-rw-r--r-- 1 poohbear operator    200 May 30 21:30 odyssey-full.cfg
-rwxr-xr-x 1 poohbear operator  72680 May 28 09:25 samples/mrm_soln
-rwxr-xr-x 1 poohbear operator  64416 May 28 09:25 samples/mrr_soln
-rwxr-xr-x 1 poohbear operator 257576 May 28 09:25 samples/mr_soln

mr_soln, mrm_soln, and mrr_soln are solution executables, courtesy of yours truly, and odyssey-full.cfg is a configuration file used to drive a particular MapReduce job. If you type make at the command prompt, you’ll generate your very own mr, mrm, and mrr executables.

Eventually, your three executables (mr, mrm, and mrr) should do the same thing my solution executables (mr_soln, mrm_soln, and mrr_soln) do. Not surprisingly, you’ll need to write some code before that happens.

Information about which map and reduce executables should be used, how many mappers and reducers there should be, where the input files live, where all intermediate and final output files should be placed, and so forth is presented in a configuration file like odyssey-full.cfg.

If, for example, you want to compile a histogram of all the words appearing in Homer’s “The Odyssey”, you could invoke the following from the command line, which makes use of my own solution executables to get the job done:

$ ./samples/mr_soln --mapper ./samples/mrm_soln --reducer ./samples/mrr_soln --config odyssey-full.cfg

and then look inside the files/output subdirectory to see all of mr's output files. In fact, you should do that right now, just to see what happens. Every time you want to run the solution on odyssey-full.cfg, you can type this:

$ make filefree
// make command listings removed for brevity
$ ./samples/mr_soln --mapper ./samples/mrm_soln --reducer ./samples/mrr_soln --config odyssey-full.cfg

That make filefree line removes all output files generated by previous MapReduce jobs. It's a good idea to type it before pretty much every run; otherwise files left behind by previous jobs might confuse the current one.

What’s inside odyssey-full.cfg? Let us take a look:

mapper word-count-mapper
reducer word-count-reducer
num-mappers 8
num-reducers 4
input-path /usr/class/cs110/samples/assign6/odyssey-full
intermediate-path files/intermediate
output-path files/output

The well-formed configuration file contains exactly 7 lines of space-separated key/value pairs that specify how mr should do its job. Each of the 7 keys encodes the following:

mapper: the name of the mapper executable each mapper worker should invoke
reducer: the name of the reducer executable each reducer worker should invoke
num-mappers: the number of mapper workers the server should spawn
num-reducers: the number of reducer workers the server should spawn
input-path: the directory where the input files live
intermediate-path: the directory where intermediate files should be placed
output-path: the directory where final output files should be placed
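
The starter code already parses this file for you (the relevant logic lives in mapreduce-server.cc with help from mr-utils), but just to make the format concrete, here's a rough, hypothetical sketch of how such a key/value file could be ingested:

// Illustrative sketch only; the real parsing lives in the starter code.
#include <fstream>
#include <map>
#include <string>
using namespace std;

static map<string, string> loadConfig(const string& filename) {
  map<string, string> config;
  ifstream infile(filename);
  string key, value;
  while (infile >> key >> value) config[key] = value; // e.g. config["num-mappers"] == "8"
  return config;
}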

A high-level overview of the code

Running mr starts the MapReduce orchestrator server, implemented mostly in mapreduce-server.cc. This orchestrator first selects a handful of myth machines to execute the mapper executable, SSHes into them, and executes mrm on each of those servers. (This process happens in the spawnMappers function.)

In the meantime, in a separate thread, the orchestrateWorkers function starts running a server that workers can connect to. When each worker starts running, it phones home, asking for work. The orchestrateWorkers function will assign each worker a task, until there is nothing more to do, at which point it will tell the workers they can shut down.

(Note: orchestrateWorkers only uses one thread in the starter code, but Task 2 will ask you to respond to incoming requests in a multithreaded fashion. It’s here that you’ll need to think about race conditions.)

Once all the mappers are told that their work is finished, they exit. This causes the ssh command (issued by spawnMappers) to exit, and spawnMappers can return once all of the mappers exit.

Once spawnMappers returns, you can proceed to the reduce phase of the operation. You’ll want to write a similar spawnReducers function. This happens in Task 4.
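
Putting the pieces together, the server's top-level flow looks roughly like the toy class below. The method names spawnMappers, orchestrateWorkers, and spawnReducers come from the starter code and this handout, but the class itself is a simplified stand-in, not a copy of the real mapreduce-server.cc:

#include <atomic>
#include <thread>
using namespace std;

// Hypothetical stand-in for the real MapReduceServer, shown only to make the
// phase ordering concrete; the actual class is much richer than this.
class ToyServer {
 public:
  void run() {
    thread serverThread([this] { orchestrateWorkers(); }); // answers worker requests
    spawnMappers();   // map phase: returns once every remote mrm process has exited
    spawnReducers();  // reduce phase (Task 4): same idea, but spawning mrr instead
    done = true;      // simplified shutdown signal; the real server is more careful
    serverThread.join();
  }

 private:
  atomic<bool> done{false};
  void orchestrateWorkers() { while (!done) { /* accept and answer worker requests */ } }
  void spawnMappers() { /* ssh into healthy myth machines and launch mrm on each */ }
  void spawnReducers() { /* ssh into healthy myth machines and launch mrr on each */ }
};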

Task 1: Getting Acquainted With Your Starter Code

Your assign6 repo includes a server that spawns precisely one mapper manager – a client on some other myth machine that, in principle, knows how to invoke word-count-mapper (or whatever the mapper executable might be) to generate intermediate output files of key/value pairs. It's high time you actually ran the starter code to see what the system initially manages for you. No matter what you've done so far, type out these lines:

$ make directories filefree
// make command listings removed for brevity
$ make
// make command listings removed for brevity
$ ./mr --mapper ./mrm --reducer ./mrr --config odyssey-full.cfg --map-only --quiet 
/afs/ir.stanford.edu/users/p/o/poohbear/assign6/files/intermediate/00001.mapped hashes to 2579744460591809953
/afs/ir.stanford.edu/users/p/o/poohbear/assign6/files/intermediate/00002.mapped hashes to 15803262022774104844
/afs/ir.stanford.edu/users/p/o/poohbear/assign6/files/intermediate/00003.mapped hashes to 15899354350090661280
/afs/ir.stanford.edu/users/p/o/poohbear/assign6/files/intermediate/00004.mapped hashes to 15307244185057831752
/afs/ir.stanford.edu/users/p/o/poohbear/assign6/files/intermediate/00005.mapped hashes to 13459647136135605867
/afs/ir.stanford.edu/users/p/o/poohbear/assign6/files/intermediate/00006.mapped hashes to 2960163283726752270
/afs/ir.stanford.edu/users/p/o/poohbear/assign6/files/intermediate/00007.mapped hashes to 3717115895887543972
/afs/ir.stanford.edu/users/p/o/poohbear/assign6/files/intermediate/00008.mapped hashes to 8824063684278310934
/afs/ir.stanford.edu/users/p/o/poohbear/assign6/files/intermediate/00009.mapped hashes to 673568360187010420
/afs/ir.stanford.edu/users/p/o/poohbear/assign6/files/intermediate/00010.mapped hashes to 9867662168026348720
/afs/ir.stanford.edu/users/p/o/poohbear/assign6/files/intermediate/00011.mapped hashes to 5390329291543335432
/afs/ir.stanford.edu/users/p/o/poohbear/assign6/files/intermediate/00012.mapped hashes to 13755032733372518054

Of course, we’re relying on your compilation products – mr, mrm, and mrr – instead of the solution versions. We’re also telling mr to stop after the map phase (that comes via --map-only), and we’re telling mr to be relatively quiet (that’s what --quiet does). The starter version doesn’t even implement the reduce part of the framework, so the --map-only flag is mostly redundant, except that its presence instructs the mr system to print the file hashes of all the intermediate files.

Once the above MapReduce job finishes, you’ll notice a few things. First off, the intermediate files have names like 00001.mapped instead of 00001.00023.mapped. That’s because the starter code indeed processes all of the input files, but it doesn’t further split each output file into 32 smaller output files. As a result, we expect the word so – a word that still hashes to 23 modulo 32 – to appear in many if not all of the generated output files.

$ head -10 files/intermediate/00010.mapped
the 1
friendly 1
goddess 1
stretchd 1
the 1
swelling 1
sails 1
we 1
drop 1
our 1
$ grep "^so " files/intermediate/00001.mapped | wc -l
25
$ grep "^so " files/intermediate/00002.mapped | wc -l
12
$ grep "^so " files/intermediate/00003.mapped | wc -l
14
$ grep "^so " files/intermediate/000[01][0-9].mapped | wc -l
178

The above sequence of commands implies that the word so appears in at least four of the twelve intermediate files (as it turns out, so appears in every single one), and that the total number of appearances across all intermediate files is 178. That’s the number we saw earlier when running the solution versions of mr, mrm, and mrr on odyssey-full.cfg.

The initial implementation we present you doesn’t even try to spawn reducers, much less reduce the intermediate files to output files. So don’t be surprised to discover the files/output directory is completely empty.

$ ls -lt files/output
total 0

Of course, these things need to be fixed, but for the moment, bask in the glory that comes with knowing you’ve been given a partially working system. Now would be a stellar time to inspect mrm.cc, mapreduce-worker.h/cc, mapreduce-mapper.h/cc, mr-messages.h/cc, and mapreduce-server.h/cc.

In particular, you’ll see that mrm.cc is the entry point into a program that operates as a client of the MapReduce server. That client program is invoked remotely by the server with the expectation that it telephones the server to request the name of an input file to be processed, to notify the server whether it succeeded or failed to process the named file, and to send little progress reports back to the server.

By reading though mapreduce-mapper.h and mapreduce-mapper.cc, you’ll learn that the MapReduceMapper class – the class used by mrm.cc – keeps track of the hostname and port of the server so it can connect to it as needed. You’ll see how a MapReduceMapper relies on a custom protocol of request and response formats (as defined in mr-messages.h/cc) to communicate with the server. And you’ll note some use of inheritance (MapReduceMapper subclasses MapReduceWorker) so that the base class can maintain state and helper method implementations relevant to both MapReduceMapper and MapReduceReducer.

Look at the starter version of mapreduce-server.cc last, since it’s the largest file we give you. Much of the file is concerned with configuration of the server, but a part of it spawns off a single mapper, and another part of it launches a server thread that can answer all incoming requests from the one mapper it created.

Note: There’s no code to be written here. This is simply a get-to-know-the-code-base task. :)

Task 2: Spawning Multiple Mappers

Revisit the implementation of spawnMappers. The implementation we’ve given you only creates a single client, which of course means all input files are being processed off-server by one overworked mapper. That mapper, by design, can only process one input file at a time, so the worker will process each and every input file in some order, and the server will supply input file names and receive progress reports back over many, many short network conversations. This single-server/single-worker architecture demonstrates that our system technically distributes the work across multiple machines, but for the moment, multiple means exactly two. It would be lame to argue that the two-node system performs better than a single, sequential executable. The network latency alone would slow things down to the point where it would be a step in the wrong direction.

Still, it’s neat! :)

However, if the server can spawn a single worker on a separate host, it can spawn two workers on two separate hosts (or maybe two workers on the same host), or 20 workers on up to 20 separate hosts, or 100 workers on up to 100 separate hosts, all working in parallel. Now, 100 workers all want work, so they’re often trying to connect to and message the server at the same time. But well-implemented, scalable servers can deal with a burst of many, many connection requests and respond to each and every one of them on threads other than the thread listening for incoming requests. You’ve seen this very model with the ThreadPool in Assignments 4 and 5. Spoiler alert: you’re going to see it again here.

You should upgrade your implementation of spawnMappers so that instead of spawning just one worker (and completely ignoring the num-mappers value in the configuration file), it spawns num-mappers of them. You should also install as much parallelism into the server as it can stomach by introducing a ThreadPool to the MapReduceServer class definition. The thread routine running the server – a method called orchestrateWorkers – currently handles each incoming request on the server thread via a method called handleRequest; each of those handleRequest calls can instead be packaged up as a thunk and scheduled to execute off the server thread. You’ll need to add some concurrency directives to guard against the threat of race conditions that simply weren’t present before. I have not, however, removed the oslock and osunlock manipulators present in my own solution, because making you put them back in would be mean.
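
To make the shape of that change concrete, here's a deliberately simplified sketch of the accept-then-delegate pattern, assuming the ThreadPool schedule/wait interface you used in Assignments 4 and 5. The function names and pool size below are illustrative stand-ins, not the starter code's actual signatures:

// A deliberately simplified sketch: the server thread only accepts connections,
// and each conversation runs inside a ThreadPool worker.
#include <mutex>
#include <sys/socket.h>
#include <unistd.h>
#include "thread-pool.h"
using namespace std;

static ThreadPool requestPool(16);  // hypothetical pool size
static mutex serverStateLock;       // guards whatever shared bookkeeping you add

static void handleRequestSketch(int clientSocket) {
  // converse with the worker: hand out an input file name, record success or
  // failure, etc.; any touch of shared server state needs the lock, e.g.
  //   lock_guard<mutex> lg(serverStateLock);
  close(clientSocket);
}

static void orchestrateWorkersSketch(int serverSocket) {
  while (true) { // in practice, until all workers have been told to shut down
    int clientSocket = accept(serverSocket, nullptr, nullptr);
    if (clientSocket < 0) break;
    requestPool.schedule([clientSocket] { handleRequestSketch(clientSocket); });
  }
  requestPool.wait(); // let in-flight conversations drain before returning
}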

By introducing multiple workers, the ThreadPool, and the necessary concurrency directives to make the server thread-safe, you will most certainly improve the speed of the overall system. You should expect the set of output files – the files placed in the directory identified by the intermediate-path entry of the configuration file – to be precisely the same as they were before. You should, however, notice that they’re generated more quickly, because more players are collectively working as a team (across a larger set of system resources) to create them.

Task 3: Hashing Keys, Creating Multiple Intermediate Files

Before advancing on to Task 3, your implementation should know how to apply a mapper executable to, say, 00007.input and generate 00007.mapped. After you’ve implemented Task 3, your mapper executable should generate 00007.00000.mapped, 00007.00001.mapped, and so forth. You should rely on the hash<string> class – it’s already used in mapreduce-server.cc in one other place to generate different port numbers for different SUNet IDs – to generate hash codes for each of the keys, and that hash code modulo num-legal-hash-codes should dictate where a key-value pair belongs. It’s okay to temporarily generate something like 00007.mapped, but you should be sure to delete it after you’ve distributed all of its key-value pairs across all of the files with names like 00007.00000.mapped, 00007.00001.mapped, etc. Also, when one key appears before a second key in 00007.mapped, and each of those two keys hashes to 1 modulo 32, then the first key should appear before the second key in 00007.00001.mapped.
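
Here's a rough sketch of that splitting step, just to make the hashing concrete. The helper below is hypothetical – in your real code this logic belongs in the MapReduceMapper, and the mr-names helpers can build the file names for you – but it shows how hash<string> and the modulo decide where each key/value pair lands:

// Illustrative only: split one .mapped file into numHashCodes bucket files.
#include <fstream>
#include <functional>
#include <iomanip>
#include <sstream>
#include <string>
#include <vector>
using namespace std;

static void splitIntermediateFile(const string& mappedFile, size_t numHashCodes) {
  // open one output stream per hash code: 00007.00000.mapped, 00007.00001.mapped, ...
  string base = mappedFile.substr(0, mappedFile.rfind(".mapped"));
  vector<ofstream> buckets(numHashCodes);
  for (size_t code = 0; code < numHashCodes; code++) {
    ostringstream name;
    name << base << "." << setw(5) << setfill('0') << code << ".mapped";
    buckets[code].open(name.str());
  }
  ifstream infile(mappedFile);
  string line, key;
  hash<string> hasher;
  while (getline(infile, line)) {
    istringstream iss(line);
    iss >> key;                                // the key is the first token on the line
    size_t code = hasher(key) % numHashCodes;  // same key always lands in the same bucket
    buckets[code] << line << endl;             // appending in order preserves key order
  }
  // once every line has been rehomed, remove the original mappedFile
}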

You will need to change the implementation of buildMapperCommand to supply one additional argument: the number of hash codes used by each mapper when generating all intermediate files on behalf of a single input file. You’ll need to update mrm.cc to accept one more argv argument, which will in turn require you to extend the MapReduceMapper constructor to accept one more parameter. This additional parameter is, of course, the number of hash codes that should be used to determine how to distribute the keys in a single input file across multiple intermediate files.

The number of hash codes – at least for the purposes of this assignment – should always be equal to the number of mappers multiplied by the number of reducers. These two numbers – num-mappers and num-reducers – are embedded within the configuration file supplied at launch time, so this number of hash codes is easily computed. With odyssey-full.cfg, for instance, that’s 8 × 4 = 32 hash codes, which is why the earlier examples hash keys modulo 32. For those curious why I’m going with that number: it’s pretty arbitrary, but I chose it because it’s simple to compute, and the more intermediate files there are, the more likely I am to expose race conditions and other concurrency issues in your implementation.

Task 4: Implementing spawnReducers

I’m leaving this fairly open-ended, since it should be clear what the reducers need to do in order to generate the correct output files. Each reducer, of course, needs to collate the collection of intermediate files storing keys with the same hash code, sort that collation, group that sorted collation by key, and then invoke the reducer executable on that sorted collation of key/vector-of-value pairs to produce final output files. The number of coexisting reducers is dictated by the num-reducers entry within the configuration file, and you should maximize parallelism without introducing any deadlock or race conditions on the server.

Hint 1: you’ll need to add a good number of additional helper methods to MapReduceServer, and you’ll need to implement the MapReduceReducer::reduce method as well.

Hint 2: once the MapReduce job has transitioned to the reduce phase, you should rely on the server to respond to reducer client requests with file name patterns instead of actual file names. The server, for instance, might send an absolute file name pattern ending in files/intermediate/00001 as an instruction to the reducer that it should gather, collate, sort, and group all intermediate files ending in .00001.mapped before pressing all of it through the reduce executable to generate files/output/00001.output.
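
To make that hint a bit more concrete, here's one possible (and very rough) shape for the per-hash-code work. The helper name, the reliance on system(), and the temporary file names are all my own assumptions, not requirements; any approach that collates, sorts, groups, and then runs the reducer executable is fine:

// A very rough sketch of the reduce work for one hash code.
#include <cstdlib>
#include <string>
using namespace std;

// pattern might end in something like files/intermediate/00001, per Hint 2 above
static void reduceOneHashCode(const string& pattern, const string& reducerExecutable,
                              const string& outputFile) {
  string dir = pattern.substr(0, pattern.rfind('/'));    // e.g. .../files/intermediate
  string code = pattern.substr(pattern.rfind('/') + 1);  // e.g. 00001
  string collated = pattern + ".collated";
  // 1. collate: concatenate every <input>.<code>.mapped file into one file
  system(("cat " + dir + "/*." + code + ".mapped > " + collated).c_str());
  // 2. sort, so that identical keys land on adjacent lines
  system(("sort -o " + collated + " " + collated).c_str());
  // 3. group adjacent lines sharing a key into "key 1 1 1 ..." form (write this
  //    part yourself: read the sorted collation, write e.g. <pattern>.grouped)
  string grouped = pattern + ".grouped";
  // 4. press the grouped file through the reducer executable to produce the output,
  //    just as the map phase presses input files through the mapper executable
  system((reducerExecutable + " " + grouped + " " + outputFile).c_str());
  // 5. clean up the temporary collated and grouped files
}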

Assignment 6 Files

Here’s the complete list of all of the files contributing to the assign6 code base. It’s our expectation that you read through all of the code in all files, initially paying attention to the documentation present in the interface files, and then reading through and internalizing the implementations of the functions and methods that we provide.

mr.cc

mr.cc defines the main entry point for the MapReduce server. The entire file is very short, as all it does is pass responsibility to a single instance of MapReduceServer. (You should not need to change this file, although you can if you want to.)

mapreduce-server.h/cc

These two files collectively define and implement the MapReduceServer class. mapreduce-server.cc is by far the largest file you’ll work with, because the MapReduceServer class is the central player in the whole MapReduce system you’re building. Each MapReduceServer instance is responsible for ingesting the configuration file, selecting healthy myth machines and spawning workers on them via ssh, running the server thread that answers incoming worker requests, handing out input file names and tracking which ones have been processed, and coordinating the map and reduce phases from start to finish.

You should expect to make a good number of changes to this file.

mrm.cc

mrm.cc defines the main entry point for the MapReduce mapper. mrm.cc is to the mapper what mr.cc is to the server. It is very short, because the heart and lungs of a worker have been implanted inside the MapReduceMapper class, which is defined and implemented in mapreduce-mapper.h/cc. A quick read of mrm.cc will show that it does little more than create an instance of MapReduceMapper and instruct it to coordinate with the server: accepting work, invoking the mapper executable to process input files (the names of which are shared via messages from the server), and then shutting itself down once the server says all input files have been processed. (You will need to change this file just a bit when the time comes to support splitting intermediate files by key hash codes.)

mapreduce-mapper.h/cc

These two files collectively define and implement the MapReduceMapper class. The meat of its implementation can be found in its map method, where the worker churns for as long as necessary to converse with the server, accepting tasks, applying mapper executables to input files, reporting back to the server when a job succeeds and when a job fails, and then shutting itself down – or rather, exiting from its map method – when it hears from the server that all input files have been processed. (You should expect to make a small number of changes to this file.)

mrr.cc

mrr.cc defines the main entry point for the MapReduce reducer. You can pretty much take the entire discussion of the mrm.cc file above and replace the word mapper with reducer and the class name MapReduceMapper with MapReduceReducer. (You probably won’t need to change this file, but you can if you want to.)

mapreduce-reducer.h/cc

These two files collectively define and implement the MapReduceReducer class. Once you advance on to the spawnReducers phase of the assignment, you’ll need to implement MapReduceReducer::reduce to imitate what MapReduceMapper::map does. MapReduceReducer::reduce isn’t an exact replica of MapReduceMapper::map, but they’re each similar enough that you’ll want to consult map while implementing reduce. (You will definitely change these two files.)

mapreduce-worker.h/cc

These two files define a parent class to unify state and logic common to both the MapReduceMapper and MapReduceReducer classes. In fact, you’ll see that both MapReduceMapper and MapReduceReducer subclass MapReduceWorker. (You shouldn’t need to change these two files, though you may if you want to.)

mr-nodes.h/cc

The mr-nodes module exports a single function that tells us which myth cluster machines are up and running and able to contribute to our mr system. (You should not need to change these files, though you may if you want to.)

mr-messages.h/cc

The mr-messages module defines the small set of messages that can be exchanged between workers and servers. (You should not need to change these files, though you may if you want to.)

mr-env.h/cc

The mr-env module defines a small collection of functions that help surface shell variables, like those for the logged-in user, the current host machine, and the current working directory. (You should not need to change these files, though if you notice these functions don’t work with the shell you’re using, let Ryan know and he’ll fix it.)

mr-random.h/cc

Defines a pair of very short functions that you’ll likely ignore. The two exported functions – sleepRandomAmount and randomChance – are used to fake the failure of the word-count-mapper and word-count-reducer binaries. Real MapReduce jobs fail from time to time, so we should understand how to build a system that’s sensitive – even if superficially so – to failure. (You should not need to change these files, though you’re welcome to if you want.)

mr-names.h/cc

The mr-names module defines another small collection of helper functions that help us generate the names of intermediate and final output files. Inspect the interface file for documentation on how extractBase, changeExtension, and numberToString all behave, and grep through the other .cc files to see how they’re already being used. (You shouldn’t need to change these files, though you’re welcome to if you want to.)

mr-hash.h

This interface file is short but dense, and defines a class that can be used to generate a hash code for an ifstream (or rather, the payload of the file it’s layered on top of). You should not change this file, and to be honest, you don’t even need to know what it’s doing. It’s already being used by some starter code in mapreduce-server.cc, and you shouldn’t need to use it anywhere else.

mr-utils.h/cc

The mr-utils module defines another small collection of helper functions that didn’t fit well in mr-env, mr-random, or mr-names. You can inspect the header and implementation file to see that you get functionality for parsing strings, managing string-to-number conversions, and ensuring that directories that need to exist actually exist. The code in here is pretty boring, which is why I wanted to get it out of the way and into a module you probably don’t need to look at for more than a few seconds. (You should not need to change these files, but yes, you can change them if you’d like to.)

server-socket.h/cc

Supplies the interface and implementation for the createServerSocket routine we implemented in lecture. The mr executable, which is responsible for spawning workers and exchanging messages with them, acts as the one server that all workers contact. As a result, the mr executable – or more specifically, the mapreduce-server module – must bind a server socket to a particular port on the local host, and then share the server host and port with all spawned workers so they know how to get back in touch. As a result, you’ll find a call to createServerSocket smack dab in the middle of mapreduce-server.cc. (You should not need to change these files, and I kinda don’t want you to, so don’t. :))

client-socket.h/cc

Supplies the interface and implementation for the createClientSocket routine we know must contribute to the implementation of the MapReduceWorker class if the workers are to establish contact with the server.
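
For context, here's a hedged sketch of a worker-side connection, assuming createClientSocket takes a hostname and a port and hands back a descriptor (negative on failure), as the lecture version did. The workers in the starter code already connect to the server this way, so treat this as a reminder of the pattern rather than something you need to add:

// Hedged usage sketch; the exact signature lives in client-socket.h.
#include <string>
#include <unistd.h>
#include "client-socket.h"
using namespace std;

static bool canReachServer(const string& serverHost, unsigned short serverPort) {
  int clientSocket = createClientSocket(serverHost, serverPort);
  if (clientSocket < 0) return false;  // server unreachable
  // a real worker would wrap clientSocket in a stream and exchange mr-messages here
  close(clientSocket);
  return true;
}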

mapreduce-server-exception.h

Defines the one exception type – the MapReduceServerException – used to identify exceptional circumstances (configuration issues, network hiccups, malformed messages between server and worker, etc.). The implementation is so short and so obvious that it’s been inlined into a .h file. (You should not change this file.)

thread-pool.h

Fearing you’d be disappointed had our final assignment not relied on a ThreadPool, I’ve included the interface file and updated the Makefile, as I did for Assignment 5, to link against code for one that’s fully operational. (You should not change the thread-pool.h file. If you do, things might break, so don’t.)

word-count-mapper.cc, word-count-reducer.cc

These are standalone C++ executables that conform to the MapReduce programming model. Each takes two arguments: an input file name and an output file name. (You shouldn’t change these files.)

Additional Information

Here are a bunch of tips and nuggets to help you make better decisions:

Once you’re done, you should commit your work to your local repository and then run ./tools/submit as you have for all of your previous assignments. And then you should tweet about how awesome you are that you built a MapReduce framework and that MapReduce is your new jam.