# Assignment #2: A Concurrent Key/Value Store
The purpose of this assignment is to give you an opportunity to see how to use
concurrency in a realistic setting.
## Do This First
Immediately after pulling `p2` from Bitbucket, you should start a container,
navigate to the `p2` folder, and type `chmod +x solutions/*.exe`. This command
only needs to be run once. It will make sure that certain files from the
assignment have their executable bit set.
## Assignment Details
There are two main reasons for using concurrency. One of them, which is more
typically called parallelism, is the desire to use multiple CPU cores to get a
job done in less time. The second involves overlapping communication with
computation. Often, these go hand in hand. As an example, consider our server
from assignment 1. For the entire time that the server is responding to one
client, it cannot even accept a connection from another client. Certainly it
would be good if we could overlap one client's operations in the directory with
another client's network communication. But what if two clients need to access
the `directory` at the same time? We can make the `directory` thread-safe by
protecting it with a `mutex`, but that's not going to let us use many cores to
satisfy many requests at the same time.

In this assignment, we are going to extend our client and server in two main
ways. The first is that we are going to add a new feature to the server, a
key/value store (KVS). This new data structure will be a hash table into
which clients can store arbitrary data (the value) by giving it a unique name
(the key, a `std::string`). The client will need to be able to perform five new
operations:

* KVI: Key/Value `Insert`: insert a new key/value pair into the server's KVS
* KVD: Key/Value `Delete`: remove an existing key/value pair from the server's KVS
* KVG: Key/Value `Get`: get the value associated with a key
* KVU: Key/Value `Upsert`: set or change the value for a key in the server's KVS
* KVA: Key/Value `All`: get a list of all the keys in the server's KVS

These changes to the client are relatively straightforward, and we have done
them for you. You do not need to edit the client code at all. In fact, we
don't even provide you with code for the client anymore. Your focus should be
entirely on the server.

First and foremost, we will need a way for the server to give incoming requests
to concurrent threads. Our solution will be to create a new thread pool. The
thread pool can be thought of as a single-producer multi-consumer queue. Each
time a new connection arrives, the server's main thread should accept the
connection and put it into the queue. Worker threads will be blocked trying to
pop elements from the queue. The worker threads can then read the requests,
compute results, and send them back to clients.
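The single-producer, multi-consumer queue described above can be sketched as
follows. This is only a minimal illustration under stated assumptions, not the
interface you will find in `common/pool.cc`: the class name `mini_pool`, its
methods, and the use of a plain `int` as a stand-in for a socket descriptor are
all invented for the example.

```c++
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for a thread pool; NOT the interface in common/pool.cc
class mini_pool {
  std::queue<int> work;            // pending "connections" (plain ints here)
  std::mutex mtx;                  // protects `work`
  std::condition_variable cv;      // workers sleep here while `work` is empty
  std::atomic<bool> active{true};  // cleared when the pool should stop
  std::vector<std::thread> workers;

public:
  // Start `n` workers; each repeatedly pops an item and runs `handler` on it
  mini_pool(int n, std::function<void(int)> handler) {
    for (int i = 0; i < n; ++i) {
      workers.emplace_back([this, handler]() {
        while (true) {
          int item;
          {
            std::unique_lock<std::mutex> lock(mtx);
            // Sleep until there is work, or the pool is shutting down
            cv.wait(lock, [this]() { return !work.empty() || !active; });
            if (work.empty())
              return; // woke up only because of shutdown: nothing left to do
            item = work.front();
            work.pop();
          }
          handler(item); // run outside the lock so workers can overlap
        }
      });
    }
  }

  // Producer side: enqueue one item and wake one sleeping worker
  void submit(int item) {
    {
      std::lock_guard<std::mutex> lock(mtx);
      work.push(item);
    }
    cv.notify_one();
  }

  // Wake every worker, let them drain the queue, then join them
  void shutdown() {
    {
      std::lock_guard<std::mutex> lock(mtx);
      active = false;
    }
    cv.notify_all();
    for (auto &t : workers)
      t.join();
    workers.clear();
  }

  ~mini_pool() {
    if (!workers.empty())
      shutdown();
  }
};

// Usage: four workers add up the "connections" 1..100
int demo_sum() {
  std::atomic<int> total{0};
  mini_pool pool(4, [&total](int x) { total += x; });
  for (int i = 1; i <= 100; ++i)
    pool.submit(i);
  pool.shutdown(); // drains remaining items before joining
  return total;
}
```

Note the design choice in this sketch: the handler runs after the lock is
released, so a slow request only occupies one worker rather than blocking the
queue. Whether a real pool should drain its queue on shutdown, as this one
does, is a separate decision that the sketch does not settle.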
The other main task will be to make the server's data structures thread-safe. We
could do this by protecting each with a coarse-grained `std::mutex` object.
However, that's not going to be particularly good for performance. Instead, we
will create a concurrent, generic hash table. When the hash table is complete,
we will be able to use it twice: for the KVS, and for the authentication table.
Note that it will be important to think very carefully about two-phase locking
for this new hash table; the comments in the provided code are a good place to
start.

Finally, it will be necessary to update the persistence mechanism of our server,
because the new functionalities will need to be incorporated into the way that
we persist data. Again, see the comments in the provided code for details of
the new persistence requirements.
## Getting Started
Your git repository has a new folder, `p2`, which contains a significant portion
of the code for the final solution to this assignment. It also includes binary
files that represent much of the solution to assignment 1. The `Makefile` has
been updated so that you do not need to have completed the first assignment in
order to get full credit for this assignment.

There is a test script called `scripts/p2.py`, which you can use to test your
code. Note that these tests are not sufficient to show that your code is
thread-safe and free of data races. We will inspect your code to evaluate its
correctness in the face of concurrency.
## Tips and Reminders
You should probably start by creating the thread pool. Without it, you will not
be able to achieve concurrent behaviors in your server, and thus it will be
harder to find bugs in your KVS implementation.

As in the last assignment, you should keep in mind that subsequent assignments
will build upon this, so as you craft a solution, be sure to think about how
to make it maintainable.

**Start Early**. Just reading the code and understanding what is happening
takes time. If you start reading the assignment early, you'll give yourself
time to think about what is supposed to be happening, and that will help you to
figure out what you will need to do.

As always, please be careful about not committing unnecessary files into your
repository.

Your server should **not** store plain-text passwords in the file.
## Grading
The `scripts` folder has four Python scripts that exercise the main portions of
the assignment: basic authentication and correctness, file-based persistence,
the thread pool, and the hash table. These scripts use specialized `Makefiles`
that integrate some of the solution code with some of your code, so that you can
get partial credit when certain portions of your code work. Note that these
`Makefiles` are very strict: the build will fail on any compiler warning. You
should *not* turn off these warnings; they are there to help you. If your code
does not compile with these scripts, you will not receive any credit. If you
have partial solutions and you do not know how to coerce the code into being
warning-free, you should ask the professor.

If the scripts pass when using your client with our server, and when using our
client with your server, then you will receive full functionality points for
that portion of the assignment. We will manually inspect your code to determine
if it is correct with regard to concurrency. As before, we reserve the right to
deduct style points if your code is especially convoluted. Please be sure to
use your tools well. For example, Visual Studio Code (and emacs, and vim) have
features that auto-format your code. You should use these features.

While it is our intention to give partial credit, *no credit will be given for
code that does not compile without warnings.* If you decide to move functions
between files, or add files, or do other things that break the scripts in the
grading folder, then you will lose at least 10%. If you think some change is
absolutely necessary, speak with the professor first.
There are five main graded portions of the assignment:

* Is the thread pool correct?
    * Files: `common/pool.cc`
* Is the persistence mechanism correct?
    * Files: `server/storage.cc`
* Is the concurrent hash table Store implemented correctly?
    * Files: `common/concurrenthashtable_impl.h`
* Is persistence implemented correctly, and according to `format.h`?
    * Files: `server/storage.cc`
* Does the Key/Value store scale?
    * Files: `common/concurrenthashtable_impl.h`

Note: for the fifth part of the assignment, we have provided a separate `bench`
folder, which has a benchmark you can use to test scalability. Be sure to pay
careful attention to your Docker settings. By default, Docker does not give
many cores to each container, so you may find that you aren't seeing
scalability, even though your laptop has many cores.
Also, note that we have removed a lot of code from the `p2` folder, and instead
are providing `.o` files for the corresponding libraries. This means, for
example, that if you did not correctly implement cryptography in the last
assignment, it will not affect your grade in this assignment. In fact, we are
providing a complete `client.exe`, and expecting all of your work to only be in
three files: `common/pool.cc`, `common/concurrenthashtable_impl.h`, and
`server/storage.cc`. You will probably want to begin by re-using a lot of your
`server/storage.cc` code from the last assignment.

Remember that the graded components are intentionally vague: you need to start
thinking about what it means for code to be correct. There are many criteria
that cannot be established through unit testing. This is especially true when
thinking about race freedom and serializability.
## Notes About the Reference Solution
In the following subsections, you will find some details about the reference
solution.
### `server/storage.cc`
The reference solution is about 500 lines, including comments. In the file you
are given, most of the functions are not implemented at all. Note that the
`Storage::Internal` struct that we provide is complete. It has everything that
you need to implement storage correctly.

One of the biggest challenges in this code is to get the concurrency correct,
and it's hard to do because our tests won't crash if the concurrency is wrong.
In particular, while it is fine to assume that `load()` will only be called
once, before there are threads, and thus does not need concurrency, you cannot
assume the same about any other methods. You will need to use two-phase locking
to `persist()` correctly. The easiest way is to chain lambdas together. If you
aren't sure how, you should think carefully about the `do_all_readonly()` method
of the hash table.
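One way to picture "chaining lambdas" for two-phase locking: if a table's
read-only traversal accepts a second callback that runs while every bucket lock
is still held, then the traversal of a second table can be started from inside
that callback, and no lock is released until both tables have been read. The
sketch below is built on assumptions: the toy `striped_table` class, the
`do_all_readonly(f, then)` signature, the `snapshot()` helper, and the
`AUTH`/`KV` text lines are all hypothetical. Check the provided headers for the
real interface and `format.h` for the real file layout.

```c++
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

// Toy table (hypothetical): a fixed number of buckets, one mutex per bucket
class striped_table {
  struct bucket {
    std::mutex lock;
    std::unordered_map<std::string, std::string> data;
  };
  std::vector<bucket> buckets; // sized once, never resized

public:
  explicit striped_table(std::size_t n) : buckets(n) {}

  void insert(const std::string &k, const std::string &v) {
    bucket &b = buckets[std::hash<std::string>{}(k) % buckets.size()];
    std::lock_guard<std::mutex> g(b.lock);
    b.data[k] = v;
  }

  // Assumed signature.  Growing phase: lock every bucket, in index order.
  // Apply `f` to each pair, then run `then` BEFORE any lock is released.
  // (Not exception-safe: a throwing callback would leave the locks held.)
  void do_all_readonly(
      std::function<void(const std::string &, const std::string &)> f,
      std::function<void()> then) {
    for (auto &b : buckets)
      b.lock.lock();
    for (auto &b : buckets)
      for (const auto &kv : b.data)
        f(kv.first, kv.second);
    then(); // every lock of this table is still held here
    for (auto &b : buckets) // shrinking phase
      b.lock.unlock();
  }
};

// Read both tables as one step: the KVS traversal is chained inside the auth
// traversal's `then`, so the auth locks stay held until the KVS is read too.
std::string snapshot(striped_table &auth, striped_table &kvs) {
  std::string out;
  auth.do_all_readonly(
      [&](const std::string &k, const std::string &v) {
        out += "AUTH " + k + " " + v + "\n"; // made-up format, not format.h
      },
      [&]() {
        kvs.do_all_readonly(
            [&](const std::string &k, const std::string &v) {
              out += "KV " + k + " " + v + "\n";
            },
            []() {});
      });
  return out;
}
```

In this sketch the locks are always taken in the same order (all of `auth`,
then all of `kvs`). Any other operation that needs locks from both tables would
have to respect that order too, or the two could deadlock.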
Most of the K/V operations are relatively straightforward once you've got
everything else working. For example, the code for `kv_upsert()` looks something
like this:

```c++
auto r = auth(user, pass);
if (!r.succeeded)
  return {false, r.msg, {}};
if (fields->kv_store.upsert(key, bytevec(val)))
  return {true, RES_OKINS, {}};
return {true, RES_OKUPD, {}};
```
### `pool.cc`
The thread pool is fundamental to having any concurrency in your server.
Consider the old implementation of `accept_client()` in `net.cc` from p1:
```c++
/// Given a listening socket, start calling accept() on it to get new
/// connections. Each time a connection comes in, use the provided handler to
/// process the request. Note that this is not multithreaded. Only one client
/// will be served at a time.
///
/// @param sd      The socket file descriptor on which to call accept
/// @param handler A function to call when a new connection comes in
void accept_client(int sd, function<bool(int)> handler) {
  // Use accept() to wait for a client to connect. When it connects, service
  // it. When it disconnects, then and only then will we accept a new client.
  while (true) {
    cout << "Waiting for a client to connect...\n";
    sockaddr_in clientAddr = {0};
    socklen_t clientAddrSize = sizeof(clientAddr);
    int connSd = accept(sd, (sockaddr *)&clientAddr, &clientAddrSize);
    if (connSd < 0) {
      close(sd);
      sys_error(errno, "Error accepting request from client: ");
      return;
    }
    char clientname[1024];
    cout << "Connected to "
         << inet_ntop(AF_INET, &clientAddr.sin_addr, clientname,
                      sizeof(clientname))
         << endl;
    bool done = handler(connSd);
    // NB: ignore errors in close()
    close(connSd);
    if (done)
      return;
  }
}
```

Each time a new socket connection happened, the server would directly execute
the handler to parse the request, decrypt it, and interact with
`server_storage.h` accordingly. If an operation in `server_storage.h` took a
long time, then no other requests would be satisfied in the meantime.

The purpose of a thread pool is to overcome this sequential bottleneck. The
thread pool is essentially a bunch of threads that are all waiting (by using a
condition variable) for things to arrive in a queue. When the main thread (the
one that is running `accept_client`) receives a new connection, it should just
push it into the queue and then wait for the next connection to arrive.

Let's look at the new `accept_client()` code:

```c++
/// Given a listening socket, start calling accept() on it to get new
/// connections. Each time a connection comes in, pass it to the thread pool so
/// that it can be processed.
///
/// @param sd   The socket file descriptor on which to call accept
/// @param pool The thread pool that handles new requests
void accept_client(int sd, thread_pool &pool) {
  atomic<bool> safe_shutdown(false);
  pool.set_shutdown_handler([&]() {
    safe_shutdown = true;
    shutdown(sd, SHUT_RDWR);
  });
  // Use accept() to wait for a client to connect. When it connects, service
  // it. When it disconnects, then and only then will we accept a new client.
  while (pool.check_active()) {
    cout << "Waiting for a client to connect...\n";
    sockaddr_in clientAddr = {0};
    socklen_t clientAddrSize = sizeof(clientAddr);
    int connSd = accept(sd, (sockaddr *)&clientAddr, &clientAddrSize);
    if (connSd < 0) {
      // If safe_shutdown() was called, and it's EINVAL, then the pool has been
      // halted, and the listening socket closed, so don't print an error.
      if (errno != EINVAL || !safe_shutdown)
        sys_error(errno, "Error accepting request from client: ");
      return;
    }
    char clientname[1024];
    cout << "Connected to "
         << inet_ntop(AF_INET, &clientAddr.sin_addr, clientname,
                      sizeof(clientname))
         << endl;
    pool.service_connection(connSd);
  }
}
```

This code is mostly the same, except that it just hands the connection to a
pool, instead of handling it. However, there is one other important difference:
what happens on a `BYE` message. Before, the main thread was processing each
message, so it would receive a `BYE`, `handler()` would return true, and it
would exit the loop. Now we need some way for the threads of the pool to
communicate back to the main thread. We do this by sending a `shutdown_handler`
lambda to the pool. This handler tells the pool how to notify the main thread
of a `BYE` message.

The code above is trickier than it seems: the code in the handler is not run by
the main thread, but by *one of* the threads in the pool. To prevent races, we
need to use an `atomic` variable. Since the main thread may be blocked in a
call to `accept`, we also need to `shutdown` the socket. While we provide this
code for you, it highlights an important point: thinking about concurrency is
much more difficult than thinking about sequential code.

With all of the above as background, note that `pool.cc` is only about 130 lines
of code. You will need to use a `queue`, a `mutex`, and a `condition_variable`
in your `Internal` class. You will also need to have an `atomic` variable for
letting a thread that receives `BYE` indicate to other threads that they should
not keep waiting on the queue (probably in addition to broadcasting on the
condition variable).

### `concurrenthashtable_impl.h`

Note: The specifics of how we are structuring the concurrent hash table (three
files… yuck!) are not ideal, but we are doing it like this so that the
assignment is easier to grade, and so that we can distribute a compiled solution
for the hash table in a later assignment.

The hash table you create needs to be concurrent, and it needs to be scalable,
but it does not need to be particularly clever. It is sufficient to have a
fixed-size array/vector (whose size is governed by a command-line argument) of
`std::unordered_map`s. Then you can have a single mutex per entry in the
fixed-size array, and you'll end up with a reasonably scalable hash table.

You will need to implement your hash table correctly, so that it can be
instantiated twice in the `server/storage.cc` file: once for the directory, and
once for the key/value store.

There are some basic concurrency best practices you should follow, like using
mutexes *even when reading* and using `lock_guard`. The hardest part though is
remembering to use two-phase locking (2PL) everywhere that more than one bucket
is accessed. Recall that with 2PL, all locks that an operation might acquire
*must* be acquired before any lock is released. This can be done by
incrementally growing the lock set, and releasing all at the end; by
aggressively locking everything early, and then releasing locks one by one as
the operation finishes with each part of the data structure; or by aggressively
locking everything early, and then releasing everything at the end. We won't be
testing the performance of your 2PL code, so you don't really need to worry
about which one performs best. However, you will need to implement
`do_all_readonly` carefully, since it will be used by your `storage.cc` file in
the `persist()` method.

The good news is that the reference solution is only about 150 lines of code.
But as you probably have guessed by now, there is a lot of subtlety to getting
that code to be correct.
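The "fixed-size vector of `std::unordered_map`s with one mutex per entry" idea
can be sketched roughly as below. This is an illustration under assumptions,
not the reference solution: the class name `bucket_map`, its method names, and
their signatures are hypothetical and will not match the provided headers. It
covers only single-bucket operations, where one `lock_guard` is enough;
multi-bucket operations still need 2PL.

```c++
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical sketch; names and signatures are NOT those of the provided
// concurrent hash table headers.  `n` must be greater than zero.
template <typename K, typename V> class bucket_map {
  struct bucket {
    std::mutex lock;               // guards `data`, for reads AND writes
    std::unordered_map<K, V> data; // the pairs that hash to this bucket
  };
  std::vector<bucket> buckets; // size fixed at construction, never resized

  // Safe without a lock only because the vector's size never changes
  bucket &find_bucket(const K &key) {
    return buckets[std::hash<K>{}(key) % buckets.size()];
  }

public:
  explicit bucket_map(std::size_t n) : buckets(n) {}

  // Insert only if `key` is absent.  Returns true when the pair was added.
  bool insert(const K &key, const V &val) {
    bucket &b = find_bucket(key);
    std::lock_guard<std::mutex> g(b.lock);
    return b.data.emplace(key, val).second;
  }

  // Insert or overwrite.  Returns true on insert, false on update.
  bool upsert(const K &key, const V &val) {
    bucket &b = find_bucket(key);
    std::lock_guard<std::mutex> g(b.lock);
    auto res = b.data.emplace(key, val);
    if (!res.second)
      res.first->second = val;
    return res.second;
  }

  // Reads take the lock too: run `f` on the value if `key` is present.
  bool do_with_readonly(const K &key, std::function<void(const V &)> f) {
    bucket &b = find_bucket(key);
    std::lock_guard<std::mutex> g(b.lock);
    auto it = b.data.find(key);
    if (it == b.data.end())
      return false;
    f(it->second);
    return true;
  }

  // Remove `key`.  Returns true if a pair was erased.
  bool remove(const K &key) {
    bucket &b = find_bucket(key);
    std::lock_guard<std::mutex> g(b.lock);
    return b.data.erase(key) > 0;
  }
};
```

Because threads that touch different buckets never contend for the same mutex,
a design along these lines can scale with the number of buckets, which is the
property the benchmark is meant to exercise.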