Introduction
In parts 2 and 3 of the first assignment, you will build a Map/Reduce library as a way to learn the Go programming language and as a way to learn about fault tolerance in distributed systems. For part 2, you will work with a sequential Map/Reduce implementation and write a sample program that uses it.
The interface to the library is similar to the one described in the original MapReduce paper.
Software
You'll implement this assignment (and all the assignments) in Go. The Go web site contains lots of tutorial information that you may want to look at.
For the next two parts of this assignment, we will provide you with a significant amount of scaffolding code to get started. The relevant code is under this directory. We will ensure that all the code we supply works on the CS servers (cycles.cs.princeton.edu). It is also likely to work in your own development environment, provided that environment supports Go.
In this assignment, we supply you with parts of a flexible MapReduce implementation. It has support for two modes of operation, sequential and distributed. Part 2 deals with the former. The map and reduce tasks are all executed in serial: the first map task is executed to completion, then the second, then the third, etc. When all the map tasks have finished, the first reduce task is run, then the second, etc. This mode, while not very fast, can be very useful for debugging, since it removes much of the noise seen in a parallel execution. The sequential mode also simplifies or eliminates various corner cases of a distributed system.
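To make the sequential execution order concrete, here is a minimal in-memory sketch. It is not the provided Sequential() implementation. The KeyValue type, the function signatures, the FNV-1a partitioning, and the helper names (runSequential, wcMap, wcReduce) are assumptions made for illustration. Intermediate pairs are kept in slices rather than files.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
	"strconv"
	"strings"
)

// KeyValue is a hypothetical stand-in for the library's key/value pair type.
type KeyValue struct {
	Key   string
	Value string
}

// partitionOf picks the reduce task for a key (an assumed FNV-1a scheme).
func partitionOf(key string, nReduce int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32()&0x7fffffff) % nReduce
}

// runSequential runs every map task to completion, one after another, and only
// then runs the reduce tasks in order. It returns one key->result map per reduce task.
func runSequential(inputs map[string]string, nReduce int,
	mapF func(file, contents string) []KeyValue,
	reduceF func(key string, values []string) string) []map[string]string {
	files := make([]string, 0, len(inputs))
	for f := range inputs {
		files = append(files, f)
	}
	sort.Strings(files) // process input files in a fixed order

	// Map phase: intermediate[r] holds every pair destined for reduce task r.
	intermediate := make([][]KeyValue, nReduce)
	for _, f := range files {
		for _, kv := range mapF(f, inputs[f]) {
			r := partitionOf(kv.Key, nReduce)
			intermediate[r] = append(intermediate[r], kv)
		}
	}

	// Reduce phase: begins only after all map tasks have finished.
	results := make([]map[string]string, nReduce)
	for r := range results {
		grouped := make(map[string][]string)
		for _, kv := range intermediate[r] {
			grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
		}
		results[r] = make(map[string]string, len(grouped))
		for k, vs := range grouped {
			results[r][k] = reduceF(k, vs)
		}
	}
	return results
}

// wcMap and wcReduce are toy word-count functions used to exercise the driver.
func wcMap(_ string, contents string) []KeyValue {
	var kvs []KeyValue
	for _, w := range strings.Fields(contents) {
		kvs = append(kvs, KeyValue{Key: w, Value: "1"})
	}
	return kvs
}

func wcReduce(_ string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	inputs := map[string]string{"a.txt": "x y x", "b.txt": "y z"}
	for r, part := range runSequential(inputs, 2, wcMap, wcReduce) {
		fmt.Println("reduce task", r, part)
	}
}
```

Because every key is routed to exactly one reduce task, each word's count is computed in a single place. The serial loops also make the order of events easy to follow when debugging.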
Getting familiar with the source
The mapreduce package (located at $GOPATH/src/mapreduce) provides a simple Map/Reduce library with a sequential implementation. Applications would normally call Distributed() located in mapreduce/master.go to start a job, but may instead call Sequential() also in mapreduce/master.go to get a sequential execution, which will be our approach in this assignment.
The flow of the mapreduce implementation is as follows:
- The application provides a number of input files, a map function, a reduce function, and the number of reduce tasks (nReduce).
- A master is created with this knowledge. It spins up an RPC server (see mapreduce/master_rpc.go), and waits for workers to register (using the RPC call Register() defined in mapreduce/master.go). As tasks become available, schedule() located in mapreduce/schedule.go decides how to assign those tasks to workers, and how to handle worker failures.
- The master considers each input file one map task, and makes a call to doMap() in mapreduce/common_map.go at least once for each task. It does so either directly (when using Sequential()) or by issuing the DoTask RPC located in mapreduce/worker.go on a worker. Each call to doMap() reads the appropriate file, calls the map function on that file's contents, and produces nReduce files for each map file. Thus, after all map tasks are done, the total number of files will be the product of the number of files given to map (nIn) and nReduce.
f0-0, ..., f0-[nReduce-1],...f[nIn-1]-0, ..., f[nIn-1]-[nReduce-1].
- The master next makes a call to doReduce() in mapreduce/common_reduce.go at least once for each reduce task. As with doMap(), it does so either directly or through a worker. doReduce() collects corresponding files from each map result (e.g. f0-i, f1-i, ... f[nIn-1]-i), and runs the reduce function on each collection. This process produces nReduce result files.
- The master calls mr.merge() in mapreduce/master_splitmerge.go, which merges all the nReduce files produced by the previous step into a single output.
- The master sends a Shutdown RPC to each of its workers, and then shuts down its own RPC server.
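The file-naming grid described in this list can be sketched as follows. The intermediateName helper is hypothetical: the provided code has its own naming scheme, and its temporary files carry an mrtmp. prefix, as the test output shows. The sketch only illustrates two points. Map task m writes one file per reduce task, giving nIn × nReduce files in total. Reduce task r reads "column" r of that grid.

```go
package main

import "fmt"

// intermediateName is a hypothetical helper producing the f<m>-<r> names used above.
func intermediateName(mapTask, reduceTask int) string {
	return fmt.Sprintf("f%d-%d", mapTask, reduceTask)
}

// mapOutputs lists the nReduce files written by one map task.
func mapOutputs(mapTask, nReduce int) []string {
	names := make([]string, nReduce)
	for r := 0; r < nReduce; r++ {
		names[r] = intermediateName(mapTask, r)
	}
	return names
}

// reduceInputs lists the nIn files that one reduce task reads: one per map task.
func reduceInputs(reduceTask, nIn int) []string {
	names := make([]string, nIn)
	for m := 0; m < nIn; m++ {
		names[m] = intermediateName(m, reduceTask)
	}
	return names
}

func main() {
	nIn, nReduce := 2, 3
	total := 0
	for m := 0; m < nIn; m++ {
		total += len(mapOutputs(m, nReduce))
	}
	fmt.Println("intermediate files:", total)                 // prints 6 (nIn * nReduce)
	fmt.Println("reduce task 1 reads:", reduceInputs(1, nIn)) // prints [f0-1 f1-1]
}
```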
You should look through the files in the MapReduce implementation, as reading them might be useful to understand how the other methods fit into the overall architecture of the system hierarchy. However, for this assignment, you will write/modify only doMap in mapreduce/common_map.go, doReduce in mapreduce/common_reduce.go, and mapF and reduceF in main/wc.go. You will not be able to submit other files or modules.
Part I: Map/Reduce input and output
The Map/Reduce implementation you are given is missing some pieces. Before you can write your first Map/Reduce function pair, you will need to fix the sequential implementation. In particular, the code we give you is missing two crucial pieces: the function that divides up the output of a map task, and the function that gathers all the inputs for a reduce task. These tasks are carried out by the doMap() function in mapreduce/common_map.go, and the doReduce() function in mapreduce/common_reduce.go respectively. The comments in those files should point you in the right direction.
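One possible shape for the two missing pieces is sketched below, under several assumptions. The key/value type, the FNV-1a hash used to choose a reduce task, and the use of encoding/json to serialize pairs are illustrative choices. Check the comments in common_map.go and common_reduce.go for what the skeleton actually expects. The helper names are hypothetical, and bytes.Buffer values stand in for the intermediate files so that the example runs without touching the disk.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"hash/fnv"
	"io"
	"sort"
)

// KeyValue is a hypothetical stand-in for the library's key/value pair type.
type KeyValue struct {
	Key   string
	Value string
}

// reduceTaskFor maps a key to a reduce task (assumed FNV-1a scheme).
func reduceTaskFor(key string, nReduce int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32()&0x7fffffff) % nReduce
}

// partitionAndEncode plays the role of doMap's output step: each pair is
// JSON-encoded into the buffer (standing in for a file) of its reduce task.
func partitionAndEncode(kvs []KeyValue, nReduce int) ([]*bytes.Buffer, error) {
	bufs := make([]*bytes.Buffer, nReduce)
	encs := make([]*json.Encoder, nReduce)
	for r := range bufs {
		bufs[r] = new(bytes.Buffer)
		encs[r] = json.NewEncoder(bufs[r])
	}
	for _, kv := range kvs {
		if err := encs[reduceTaskFor(kv.Key, nReduce)].Encode(&kv); err != nil {
			return nil, err
		}
	}
	return bufs, nil
}

// gatherAndReduce plays the role of doReduce: decode every input, group the
// values by key, and call reduceF once per key, in sorted key order.
func gatherAndReduce(inputs []io.Reader, reduceF func(string, []string) string) ([]KeyValue, error) {
	grouped := make(map[string][]string)
	for _, in := range inputs {
		dec := json.NewDecoder(in)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err == io.EOF {
				break // this input is exhausted
			} else if err != nil {
				return nil, err
			}
			grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
		}
	}
	keys := make([]string, 0, len(grouped))
	for k := range grouped {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	out := make([]KeyValue, 0, len(keys))
	for _, k := range keys {
		out = append(out, KeyValue{Key: k, Value: reduceF(k, grouped[k])})
	}
	return out, nil
}

// runReduceTask feeds reduce task r the r-th output of every map task.
func runReduceTask(r int, reduceF func(string, []string) string, mapOutputs ...[]*bytes.Buffer) ([]KeyValue, error) {
	inputs := make([]io.Reader, len(mapOutputs))
	for m, bufs := range mapOutputs {
		inputs[m] = bufs[r]
	}
	return gatherAndReduce(inputs, reduceF)
}

// countValues is a toy reduce function: how many values share this key.
func countValues(_ string, values []string) string { return fmt.Sprint(len(values)) }

func main() {
	const nReduce = 2
	m0, err0 := partitionAndEncode([]KeyValue{{Key: "a", Value: "1"}, {Key: "b", Value: "1"}}, nReduce)
	m1, err1 := partitionAndEncode([]KeyValue{{Key: "a", Value: "1"}}, nReduce)
	if err0 != nil || err1 != nil {
		fmt.Println("encode failed:", err0, err1)
		return
	}
	for r := 0; r < nReduce; r++ {
		out, err := runReduceTask(r, countValues, m0, m1)
		fmt.Println("reduce task", r, out, err)
	}
}
```

JSON with one encoded value after another is convenient here because a json.Decoder can read such a stream back value by value until io.EOF.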
To help you determine if you have correctly implemented doMap() and doReduce(), we have provided you with a Go test suite that checks the correctness of your implementation. These tests are implemented in the file test_test.go. To run the tests for the sequential implementation that you have now fixed, follow this (or a non-bash equivalent) sequence of shell commands, starting from the 418/assignment1-2 directory:
# Go needs $GOPATH to be set to the directory containing "src"
$ cd 418/assignment1-2
$ ls
README.md src
$ export GOPATH="$PWD"
$ cd src
$ go test -run Sequential mapreduce/...
ok mapreduce 4.515s
If the output did not show ok next to the tests, your implementation has a bug in it. To give more verbose output, set debugEnabled = true in mapreduce/common.go, and add -v to the test command above. You will get much more output along the lines of:
$ go test -v -run Sequential
=== RUN TestSequentialSingle
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
master: Map/Reduce task completed
--- PASS: TestSequentialSingle (2.30s)
=== RUN TestSequentialMany
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
Merge: read mrtmp.test-res-2
master: Map/Reduce task completed
--- PASS: TestSequentialMany (2.32s)
PASS
ok mapreduce 4.635s
Part II: Single-worker word count
Now that the map and reduce tasks are connected, we can start implementing some interesting Map/Reduce operations. For this assignment, we will be implementing word count, a simple and classic Map/Reduce example. Specifically, your task is to modify mapF and reduceF within main/wc.go so that the application reports the number of occurrences of each word. A word is any contiguous sequence of letters, as determined by unicode.IsLetter.
There are some input files with pathnames of the form pg-*.txt in the main directory, downloaded from Project Gutenberg. This is the result when you initially try to compile the code we provide you and run it:
$ cd "$GOPATH/src/main"
$ go run wc.go master sequential pg-*.txt
# command-line-arguments
./wc.go:14: missing return at end of function
./wc.go:21: missing return at end of function
The compilation fails because we haven't written a complete map function (mapF()) or a complete reduce function (reduceF()) in wc.go yet. Before you start coding, read Section 2 of the MapReduce paper. Your mapF() and reduceF() functions will differ a bit from those in the paper's Section 2.1. Your mapF() will be passed the name of a file, as well as that file's contents; it should split the contents into words, and return a Go slice of key/value pairs, of type mapreduce.KeyValue. Your reduceF() will be called once for each key, with a slice of all the values generated by mapF() for that key; it should return a single output value.
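A minimal sketch of the two word-count functions follows. The KeyValue struct here is a local stand-in that is assumed to mirror mapreduce.KeyValue, with string Key and Value fields. In main/wc.go you would return mapreduce.KeyValue instead. Emitting the value "1" for each occurrence and summing in reduceF is one common design, not the only one.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue is assumed to mirror mapreduce.KeyValue.
type KeyValue struct {
	Key   string
	Value string
}

// mapF splits contents into words (maximal runs of letters, per unicode.IsLetter)
// and emits one ("word", "1") pair per occurrence. The file name is unused.
func mapF(filename string, contents string) []KeyValue {
	notLetter := func(r rune) bool { return !unicode.IsLetter(r) }
	words := strings.FieldsFunc(contents, notLetter)
	kvs := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kvs = append(kvs, KeyValue{Key: w, Value: "1"})
	}
	return kvs
}

// reduceF sums the counts collected for one word and returns the total as a string.
func reduceF(key string, values []string) string {
	total := 0
	for _, v := range values {
		n, err := strconv.Atoi(v)
		if err != nil {
			continue // skip malformed values rather than aborting the job
		}
		total += n
	}
	return strconv.Itoa(total)
}

func main() {
	fmt.Println(mapF("pg-example.txt", "It's the cat, the hat."))
	fmt.Println(reduceF("the", []string{"1", "1"}))
}
```

Because words are maximal runs of letters, "It's" yields the two words "It" and "s". Summing the values instead of using len(values) keeps reduceF correct even if the values are not all "1".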
You can test your solution using:
$ cd "$GOPATH/src/main"
$ go run wc.go master sequential pg-*.txt
master: Starting Map/Reduce task wcseq
Merge: read mrtmp.wcseq-res-0
Merge: read mrtmp.wcseq-res-1
Merge: read mrtmp.wcseq-res-2
master: Map/Reduce task completed
The output will be in the file mrtmp.wcseq. We will test your implementation's correctness with the following command, which should produce the following top 10 words:
$ sort -n -k2 mrtmp.wcseq | tail -10
he: 34077
was: 37044
that: 37495
I: 44502
in: 46092
a: 60558
to: 74357
of: 79727
and: 93990
the: 154024
(this sample result is also found in main/mr-testout.txt)
You can remove the output file and all intermediate files with:
$ rm mrtmp.*
To make testing easy for you, from the $GOPATH/src/main directory, run:
$ sh ./test-wc.sh
and it will report if your solution is correct or not.
Resources and Advice
- A good read on what strings are in Go is the Go Blog post on strings.
- You can use strings.FieldsFunc to split a string into components.
- The strconv package (http://golang.org/pkg/strconv/) is handy for converting strings to integers, etc.
Submitting Assignment
You hand in your assignment exactly as you've been letting us know your progress: commit your work, tag the final commit a12-handin, and push both the commit and the tag.
You should verify that you are able to see your final commit and your a12-handin tag on the GitHub page in your repository for this assignment.
You will receive full credit if your software passes the Sequential tests in test_test.go and test-wc.sh.
We will use the timestamp of your last tag for the purpose of calculating late days, and we will only grade that version of the code. (We'll also know if you backdate the tag; don't do that.)
Our test script is not a substitute for doing your own testing on the full go test cases detailed above. Before submitting, please run the full tests given above for both parts one final time. You are responsible for making sure your code works.
You will receive full credit for Part I if your software passes the Sequential tests (as run by the go test commands above) on the CS servers. You will receive full credit for Part II if your Map/Reduce word count output matches the correct output for the sequential execution above when run on the CS servers.
The final portion of your credit is determined by code quality tests, using the standard tools gofmt and go vet. You will receive full credit for this portion if all files submitted conform to the style standards set by gofmt and the report from go vet is clean for your mapreduce package (that is, produces no errors). If your code does not pass the gofmt test, you should reformat your code using the tool. You can also use the Go Checkstyle tool for advice on improving your code's style, if applicable. Additionally, though it is not part of the graded checks, it is advisable to produce code that complies with Golint where possible.
PART 3
COS418 Assignment 1 (Part 3): Distributed Map/Reduce
Introduction
Part 3 continues the work from assignment 1.2, building a Map/Reduce library as a way to learn the Go programming language and as a way to learn about fault tolerance in distributed systems. In this part of the assignment, you will tackle a distributed version of the Map/Reduce library, writing code for a master that hands out tasks to multiple workers and handles failures in workers. The interface to the library and the approach to fault tolerance are similar to the ones described in the original MapReduce paper. As with the previous part of this assignment, you will also complete a sample Map/Reduce application.
Software
You will use the same mapreduce package as in part 2 (assignment 1.2), focusing this time on the distributed mode.
Over the course of this assignment, you will have to modify schedule from schedule.go, as well as mapF and reduceF in main/ii.go.
As with the previous part of this assignment, you should not need to modify any other files, but reading them might be useful in order to understand how the other methods fit into the overall architecture of the system.
To get started, copy all source files from assignment1-2/src to assignment1-3/src:
# start from your 418 GitHub repo
$ cd 418
$ ls
README.md assignment1-1 assignment1-2 assignment1-3 assignment2 assignment3 assignment4 assignment5 setup.md
$ cp -r assignment1-2/src/* assignment1-3/src/
$ ls assignment1-3/src
main mapreduce
Part I: Distributing MapReduce tasks
One of Map/Reduce's biggest selling points is that the developer should not need to be aware that their code is running in parallel on many machines. In theory, we should be able to take the word count code you wrote in Part II of assignment 1.2, and automatically parallelize it!
Our current implementation runs all the map and reduce tasks one after another on the master. While this is conceptually simple, it is not great for performance. In this part of the assignment, you will complete a version of MapReduce that splits the work up over a set of worker threads, in order to exploit multiple cores. Running the map tasks in parallel, and then the reduce tasks in parallel, can result in much faster completion, but is also harder to implement and debug. Note that for this part of the assignment, the work is not distributed across multiple machines as in real Map/Reduce deployments; instead, your implementation will use RPC and channels to simulate a truly distributed computation.
To coordinate the parallel execution of tasks, we will use a special master thread, which hands out work to the workers and waits for them to finish. To make the assignment more realistic, the master should only communicate with the workers via RPC. We give you the worker code (mapreduce/worker.go), the code that starts the workers, and code to deal with RPC messages (mapreduce/common_rpc.go).
Your job is to complete schedule.go in the mapreduce package. In particular, you should modify schedule() in schedule.go to hand out the map and reduce tasks to workers, and return only when all the tasks have finished.
Look at run() in master.go. It calls your schedule() to run the map and reduce tasks, then calls merge() to assemble the per-reduce-task outputs into a single output file. schedule only needs to tell the workers the name of the original input file (mr.files[task]) and the task number (task); each worker knows from which files to read its input and to which files to write its output. The master tells the worker about a new task by sending it the RPC call Worker.DoTask, giving a DoTaskArgs object as the RPC argument.
When a worker starts, it sends a Register RPC to the master. master.go already implements the master's Master.Register RPC handler for you, and passes the new worker's information to mr.registerChannel. Your schedule should process new worker registrations by reading from this channel.
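The scheduling loop described above can be sketched as a worker pool fed by the registration channel. The sketch is self-contained and rests on several assumptions. The real code would send a Worker.DoTask RPC with a DoTaskArgs value; here that call is replaced by a caller-supplied function doTask. The names scheduleTasks and demo are hypothetical. Worker failures are ignored.

```go
package main

import (
	"fmt"
	"sync"
)

// scheduleTasks hands tasks 0..nTasks-1 to workers read from registerChan,
// running each call in its own goroutine, and returns only when every task is
// done. doTask stands in for the Worker.DoTask RPC; failures are ignored here.
func scheduleTasks(nTasks int, registerChan chan string, doTask func(worker string, task int)) {
	var wg sync.WaitGroup
	for task := 0; task < nTasks; task++ {
		worker := <-registerChan // wait for an idle or newly registered worker
		wg.Add(1)
		go func(worker string, task int) {
			defer wg.Done()
			doTask(worker, task)
			// Hand the worker back for another task. The send runs in its own
			// goroutine so it cannot block this one if nobody reads the channel again.
			go func() { registerChan <- worker }()
		}(worker, task)
	}
	wg.Wait()
}

// demo simulates a few worker registrations and records which worker ran each task.
func demo(nTasks int, workers []string) map[int]string {
	registerChan := make(chan string)
	for _, w := range workers {
		go func(w string) { registerChan <- w }(w) // like a Register RPC arriving
	}
	var mu sync.Mutex
	ran := make(map[int]string)
	scheduleTasks(nTasks, registerChan, func(worker string, task int) {
		mu.Lock()
		ran[task] = worker
		mu.Unlock()
	})
	return ran
}

func main() {
	ran := demo(10, []string{"worker0", "worker1"})
	fmt.Println(len(ran), "tasks completed:", ran)
}
```

Reusing the same channel for both new registrations and idle workers means that workers which register partway through a phase are picked up automatically.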
Information about the currently running job is in the Master struct, defined in master.go. Note that the master does not need to know which Map or Reduce functions are being used for the job; the workers will take care of executing the right code for Map or Reduce (the correct functions are given to them when they are started by main/wc.go).
To test your solution, you should use the same Go test suite as you did in Part I of assignment 1.2, except swapping out -run Sequential with -run TestBasic. This will execute the distributed test case without worker failures instead of the sequential ones we were running before:
$ go test -run TestBasic mapreduce/...
As before, you can get more verbose output for debugging if you set debugEnabled = true in mapreduce/common.go, and add -v to the test command above. You will get much more output along the lines of:
$ go test -v -run TestBasic mapreduce/...
=== RUN TestBasic
/var/tmp/824-32311/mr8665-master: Starting Map/Reduce task test
Schedule: 100 Map tasks (50 I/Os)
/var/tmp/824-32311/mr8665-worker0: given Map task #0 on file 824-mrinput-0.txt (nios: 50)
/var/tmp/824-32311/mr8665-worker1: given Map task #11 on file 824-mrinput-11.txt (nios: 50)
/var/tmp/824-32311/mr8665-worker0: Map task #0 done
/var/tmp/824-32311/mr8665-worker0: given Map task #1 on file 824-mrinput-1.txt (nios: 50)
/var/tmp/824-32311/mr8665-worker1: Map task #11 done
/var/tmp/824-32311/mr8665-worker1: given Map task #2 on file 824-mrinput-2.txt (nios: 50)
/var/tmp/824-32311/mr8665-worker0: Map task #1 done
/var/tmp/824-32311/mr8665-worker0: given Map task #3 on file 824-mrinput-3.txt (nios: 50)
/var/tmp/824-32311/mr8665-worker1: Map task #2 done
...
Schedule: Map phase done
Schedule: 50 Reduce tasks (100 I/Os)
/var/tmp/824-32311/mr8665-worker1: given Reduce task #49 on file 824-mrinput-49.txt (nios: 100)
/var/tmp/824-32311/mr8665-worker0: given Reduce task #4 on file 824-mrinput-4.txt (nios: 100)
/var/tmp/824-32311/mr8665-worker1: Reduce task #49 done
/var/tmp/824-32311/mr8665-worker1: given Reduce task #1 on file 824-mrinput-1.txt (nios: 100)
/var/tmp/824-32311/mr8665-worker0: Reduce task #4 done
/var/tmp/824-32311/mr8665-worker0: given Reduce task #0 on file 824-mrinput-0.txt (nios: 100)
/var/tmp/824-32311/mr8665-worker1: Reduce task #1 done
/var/tmp/824-32311/mr8665-worker1: given Reduce task #26 on file 824-mrinput-26.txt (nios: 100)
/var/tmp/824-32311/mr8665-worker0: Reduce task #0 done
...
Schedule: Reduce phase done
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
...
Merge: read mrtmp.test-res-49
/var/tmp/824-32311/mr8665-master: Map/Reduce task completed
--- PASS: TestBasic (25.60s)
PASS
ok mapreduce 25.613s
Part II: Handling worker failures
In this part you will make the master handle failed workers. MapReduce makes this relatively easy because workers don't have persistent state. If a worker fails, any RPCs that the master issued to that worker will fail (e.g., due to a timeout). Thus, if the master's RPC to the worker fails, the master should re-assign the task given to the failed worker to another worker.
An RPC failure doesn't necessarily mean that the worker failed; the worker may just be unreachable but still computing. Thus, it may happen that two workers receive the same task and compute it. However, because tasks are idempotent, it doesn't matter if the same task is computed twice: both times it will generate the same output. So, you don't have to do anything special for this case. (Our tests never fail workers in the middle of a task, so you don't even have to worry about several workers writing to the same output file.)
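Because tasks are idempotent, handling a failed RPC can be as simple as retrying the same task on another worker. The sketch below assumes a call function that returns false when the RPC fails, which is how an RPC helper that reports success as a boolean would behave. The names runWithRetry, scheduleWithRetry, and call are hypothetical. In this design, a worker whose call failed is simply not returned to the pool.

```go
package main

import (
	"fmt"
	"sync"
)

// runWithRetry keeps offering task to workers from pool until one call succeeds,
// and returns the worker that completed it. call reports false when the RPC
// failed (crash, timeout, or unreachable worker). Workers that succeed go back
// into the pool; workers whose call failed are dropped.
func runWithRetry(task int, pool chan string, call func(worker string, task int) bool) string {
	for {
		worker := <-pool
		if call(worker, task) {
			go func() { pool <- worker }()
			return worker
		}
		// The failed worker may still have run the task; because tasks are
		// idempotent, running it again elsewhere is harmless.
	}
}

// scheduleWithRetry runs all tasks concurrently and records which worker finished each.
func scheduleWithRetry(nTasks int, pool chan string, call func(worker string, task int) bool) map[int]string {
	var wg sync.WaitGroup
	var mu sync.Mutex
	done := make(map[int]string)
	for task := 0; task < nTasks; task++ {
		wg.Add(1)
		go func(task int) {
			defer wg.Done()
			w := runWithRetry(task, pool, call)
			mu.Lock()
			done[task] = w
			mu.Unlock()
		}(task)
	}
	wg.Wait()
	return done
}

func main() {
	pool := make(chan string, 3)
	pool <- "crashed"
	pool <- "worker1"
	pool <- "worker2"
	// Simulated RPC: every call to the crashed worker fails.
	call := func(worker string, _ int) bool { return worker != "crashed" }
	done := scheduleWithRetry(20, pool, call)
	fmt.Println(len(done), "tasks completed despite one failed worker")
}
```

Dropping a failed worker is a simplification. Because an RPC failure may only mean that the worker is unreachable, a real master might instead put the worker aside and try it again later.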
You don't have to handle failures of the master; we will assume it won't fail. Making the master fault-tolerant is more difficult because it keeps persistent state that would have to be recovered in order to resume operations after a master failure. Much of the rest of this course is devoted to this challenge.
Your implementation must pass the two remaining test cases in test_test.go. The first case tests the failure of one worker, while the second test case tests handling of many failures of workers. Periodically, the test cases start new workers that the master can use to make forward progress, but these workers fail after handling a few tasks. To run these tests:
$ go test -run Failure mapreduce/...
Part III: Inverted index generation
Word count is a classical example of a Map/Reduce application, but it is not an application that many large consumers of Map/Reduce use. It is simply not very often you need to count the words in a really large dataset. For this application exercise, we will instead have you build Map and Reduce functions for generating an inverted index.
Inverted indices are widely used in computer science, and are particularly useful in document searching. Broadly speaking, an inverted index is a map from interesting facts about the underlying data, to the original location of that data. For example, in the context of search, it might be a map from keywords to documents that contain those words.
We have created a second binary in main/ii.go that is very similar to the wc.go you built earlier. You should modify mapF and reduceF in main/ii.go so that they together produce an inverted index. Running ii.go should output a list of tuples, one per line, in the following format:
$ go run ii.go master sequential pg-*.txt
$ head -n5 mrtmp.iiseq
A: 16 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
ABC: 2 pg-les_miserables.txt,pg-war_and_peace.txt
ABOUT: 2 pg-moby_dick.txt,pg-tom_sawyer.txt
ABRAHAM: 1 pg-dracula.txt
ABSOLUTE: 1 pg-les_miserables.txt
If it is not clear from the listing above, the format is:
word: #documents documents,sorted,and,separated,by,commas
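One way to produce this format is sketched below. As the word-count output suggests, the sketch assumes that the library writes each result line as the key, a colon and a space, and then the string returned by reduceF. Under that assumption, reduceF only has to produce the count and the comma-separated list. The local KeyValue type is a stand-in assumed to mirror mapreduce.KeyValue. De-duplicating in mapF, so that each document contributes a given word only once, is also a design choice; reduceF de-duplicates again to be safe.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"unicode"
)

// KeyValue is assumed to mirror mapreduce.KeyValue.
type KeyValue struct {
	Key   string
	Value string
}

// mapF emits one (word, document) pair per distinct word in the document.
func mapF(document string, contents string) []KeyValue {
	notLetter := func(r rune) bool { return !unicode.IsLetter(r) }
	seen := make(map[string]bool)
	var kvs []KeyValue
	for _, w := range strings.FieldsFunc(contents, notLetter) {
		if !seen[w] {
			seen[w] = true
			kvs = append(kvs, KeyValue{Key: w, Value: document})
		}
	}
	return kvs
}

// reduceF returns "<#documents> <doc1>,<doc2>,..." with documents unique and sorted.
func reduceF(key string, values []string) string {
	uniq := make(map[string]bool)
	for _, d := range values {
		uniq[d] = true
	}
	docs := make([]string, 0, len(uniq))
	for d := range uniq {
		docs = append(docs, d)
	}
	sort.Strings(docs)
	return fmt.Sprintf("%d %s", len(docs), strings.Join(docs, ","))
}

func main() {
	var all []KeyValue
	all = append(all, mapF("pg-b.txt", "the cat")...)
	all = append(all, mapF("pg-a.txt", "the dog, the end")...)
	byWord := make(map[string][]string)
	for _, kv := range all {
		byWord[kv.Key] = append(byWord[kv.Key], kv.Value)
	}
	fmt.Printf("the: %s\n", reduceF("the", byWord["the"])) // prints "the: 2 pg-a.txt,pg-b.txt"
}
```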
We will test your implementation's correctness with the following command, which should produce these resulting last 10 items in the index:
$ sort -k1,1 mrtmp.iiseq | sort -snk2,2 mrtmp.iiseq | grep -v '16' | tail -10women: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txtwon: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txtwonderful: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txtwords: 15 pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txtworked: 15 pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txtworse: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txtwounded: 15 
pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txtyes: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-metamorphosis.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txtyounger: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txtyours: 15 pg-being_ernest.txt,pg-dorian_gray.txt,pg-dracula.txt,pg-emma.txt,pg-frankenstein.txt,pg-great_expectations.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-les_miserables.txt,pg-moby_dick.txt,pg-sherlock_holmes.txt,pg-tale_of_two_cities.txt,pg-tom_sawyer.txt,pg-ulysses.txt,pg-war_and_peace.txt
(this sample result is also found in main/mr-challenge.txt)
To make testing easy for you, from the $GOPATH/src/main directory, run:
$ sh ./test-ii.sh
and it will report if your solution is correct or not.
Resources and Advice
- The master should send RPCs to the workers in parallel so that the workers can work on tasks concurrently. You will find the go statement useful for this purpose; the Go RPC documentation is also worth reading.
- The master may have to wait for a worker to finish before it can hand out more tasks. You may find channels useful for synchronizing the master with the threads that are waiting for RPC replies, so that the master learns when each reply arrives. Channels are explained in the document on Concurrency in Go.
- The code we give you runs the workers as threads within a single UNIX process, and can exploit multiple cores on a single machine. Some modifications would be needed in order to run the workers on multiple machines communicating over a network. The RPCs would have to use TCP rather than UNIX-domain sockets; there would need to be a way to start worker processes on all the machines; and all the machines would have to share storage through some kind of network file system.
- The easiest way to track down bugs is to insert debug() statements, set debugEnabled = true in mapreduce/common.go, collect the output in a file with, e.g., go test -run TestBasic mapreduce/... > out, and then think about whether the output matches your understanding of how your code should behave. The last step (thinking) is the most important.
- When you run your code, you may receive many errors like method has wrong number of ins. You can ignore all of these as long as your tests pass.
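The RPC plumbing mentioned in these tips is built on Go's standard net/rpc package. The toy program below is not the provided worker.go or common_rpc.go. It registers a hypothetical Worker service with a DoTask method, serves that service on a UNIX-domain socket in a temporary directory, and makes one call to it. The TaskArgs and TaskReply types and the callOnce helper are made up for illustration. net/rpc's Register also logs a complaint for each exported method that lacks the RPC shape shown in the comments, which is a likely source of the "wrong number of ins" messages mentioned above.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
	"os"
	"path/filepath"
)

// TaskArgs is a hypothetical argument type; net/rpc requires exported types and fields.
type TaskArgs struct {
	File       string
	TaskNumber int
}

// TaskReply is a hypothetical reply type.
type TaskReply struct {
	Msg string
}

// Worker is a toy RPC service. Its exported method has the shape net/rpc needs:
// func (t *T) Method(args A, reply *R) error.
type Worker struct{}

func (w *Worker) DoTask(args TaskArgs, reply *TaskReply) error {
	reply.Msg = fmt.Sprintf("task %d on %s done", args.TaskNumber, args.File)
	return nil
}

// callOnce starts a server on a fresh UNIX-domain socket, issues one
// Worker.DoTask call over it, and returns the reply message.
func callOnce(file string, task int) (string, error) {
	dir, err := os.MkdirTemp("", "mr-rpc")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	sock := filepath.Join(dir, "worker.sock")

	server := rpc.NewServer()
	if err := server.Register(new(Worker)); err != nil {
		return "", err
	}
	l, err := net.Listen("unix", sock)
	if err != nil {
		return "", err
	}
	defer l.Close()
	go server.Accept(l) // serve connections until the listener is closed

	client, err := rpc.Dial("unix", sock)
	if err != nil {
		return "", err
	}
	defer client.Close()

	var reply TaskReply
	if err := client.Call("Worker.DoTask", TaskArgs{File: file, TaskNumber: task}, &reply); err != nil {
		return "", err
	}
	return reply.Msg, nil
}

func main() {
	msg, err := callOnce("824-mrinput-0.txt", 0)
	if err != nil {
		fmt.Println("RPC failed:", err)
		return
	}
	fmt.Println(msg)
}
```

Running the same service across machines would mostly mean changing "unix" to "tcp" and using a host:port address, in line with the changes the tips describe.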
Submission
You hand in your assignment as before.
You should verify that you are able to see your final commit and tags on the GitHub page of your repository for this assignment.
You will receive full credit for Part I if your software passes TestBasic from test_test.go (the test given in Part I) on the CS servers. You will receive full credit for Part II if your software passes the tests with worker failures (the Failure pattern to go test given in Part II) on the CS servers. You will receive full credit for Part III if your index output matches the correct output when run on the CS servers.
The final portion of your credit is determined by code quality tests, using the standard tools gofmt and go vet. You will receive full credit for this portion if all files submitted conform to the style standards set by gofmt and the report from go vet is clean for your mapreduce package (that is, produces no errors). If your code does not pass the gofmt test, you should reformat your code using the tool. You can also use the Go Checkstyle tool for advice on improving your code's style, if applicable. Additionally, though it is not part of the graded checks, it is advisable to produce code that complies with Golint where possible.
Hand-in commands for assignment 1-2:
$ git commit -am "[you fill me in]"
$ git tag -a -m "i finished assignment 1-2" a12-handin
$ git push origin master
$ git push origin a12-handin