MapReduce with Hadoop Streaming in bash – Part 2


In MapReduce with Hadoop Streaming in bash – Part 1 we found the ‘term frequency’ of words within a collection of documents. For the documents I chose 8 Stephen Crane poems, and our bash Map and Reduce jobs tokenized the words and found their frequency among the entire set. The final output was “term-file-tf”, where tf is term frequency and the dash delimiters were actually tabs. A sample of the output looked like this:
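For example, the row for the word ‘from’ in a_spirit_sped.txt (a pairing we’ll verify by hand at the end of this post) looked roughly like this, with the columns separated by tabs:

    from    a_spirit_sped.txt    2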

Today we will be calculating the ‘document frequency’, or the number of documents each word appears in. This will help us calculate the ‘inverse document frequency’ (IDF) portion of our TF-IDF algorithm. To do this, we’ll use our term frequency output as the input to our document frequency MapReduce job, with term and filename as our input key. The actual key/value transformation will look like this (the key is the first variable or parenthetical):

  • {(term, file),tf} -> Map -> {term,(file, tf, 1)}
  • {term,(file, tf, 1)} -> Reduce -> {(term, file), (tf, df)}
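To make that concrete, trace the word ‘from’ through the job (its document frequency will turn out to be 4):

  • {(from, a_spirit_sped.txt), 2} -> Map -> {from, (a_spirit_sped.txt, 2, 1)}
  • {from, (a_spirit_sped.txt, 2, 1)} -> Reduce -> {(from, a_spirit_sped.txt), (2, 4)}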

This is the trickiest part of the TF-IDF calculation, because the reduce job has to span multiple documents in a single read loop and therefore buffer in-progress rows. But more on that later. For now let’s get started!

The Mapper

For the purposes of testing, I’m first going to pull the results of yesterday’s term frequency job to the local filesystem.
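Something along these lines will do it (the part file name and the local name crane_tf.txt are my guesses; use whatever your Part 1 job actually wrote):

    $ hadoop fs -get crane_out/part-00000 crane_tf.txt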

Now let’s take a look at our document frequency mapper code.
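A minimal sketch of it looks like this (mapdf.sh is my name for the script; the exact formatting is a judgment call):

    #!/bin/bash
    # mapdf.sh - document frequency mapper sketch.
    # Input : term <TAB> file <TAB> tf   (the Part 1 output)
    # Output: term <TAB> file <TAB> tf <TAB> 1
    while read -r term file num; do
        printf '%s\t%s\t%s\t1\n' "$term" "$file" "$num"
    done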

This script is exceedingly simple this time because we’re working with more structured input, as opposed to yesterday, when we had to tokenize unstructured data (plain text). The code above does the following:

  1. Read the input in a loop as three variables: term, file, and num (the tf from our last job’s output)
  2. Print the variables back out, appending a new column with a value of “1”. All we’re showing here is that yes, this term made an appearance in this file. Since this calculation is for document frequency, each word-per-doc result is just 1.

That was easy, right? Let’s test it using the file we grabbed from the last job.
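A quick pipe through the mapper, using the assumed names from above:

    $ cat crane_tf.txt | ./mapdf.sh | head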

Looks good! Moving on.

The Reducer

This is where our feeling of “oh, that was easy” is bashed (pardon the pun) beyond recognition.
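Here is a sketch of the reducer (reddf.sh), reconstructed from the walkthrough that follows; the buffering mechanics in particular are my reading of it:

    #!/bin/bash
    # reddf.sh - document frequency reducer sketch.
    # Input : term <TAB> file <TAB> tf <TAB> 1   (sorted by term)
    # Output: term <TAB> file <TAB> tf <TAB> df

    flush() {
        # Print every buffered row for the finished term with its final df
        # appended, then print the most recent row for that term.
        printf '%b' "$buffer" | while IFS=$'\t' read -r bterm bfile btf; do
            printf '%s\t%s\t%s\t%s\n' "$bterm" "$bfile" "$btf" "$currdf"
        done
        printf '%s\t%s\t%s\t%s\n' "$currterm" "$currfile" "$currtf" "$currdf"
    }

    # 1. Prime the loop with the first record.
    read -r currterm currfile currtf currdf
    buffer=""

    # 2. Loop through the rest of the input.
    while read -r term file tf df; do
        if [[ "$term" == "$currterm" ]]; then
            # 3a. Same term: buffer the previous row and bump the doc frequency.
            buffer+="${currterm}\t${currfile}\t${currtf}\n"
            currfile="$file"; currtf="$tf"
            currdf=$((currdf + df))
        else
            # 3b. New term: flush what we have and start over.
            flush
            buffer=""
            currterm="$term"; currfile="$file"; currtf="$tf"; currdf="$df"
        fi
    done

    # 4. Print the final buffer and the final line of the file.
    flush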

Alright, let’s slog through it.

  1. Just like our term frequency example, we’re going to read the first line of the file in as the variables “currterm”, “currfile”, “currtf”, and “currdf”.
  2. Loop through the rest of the file with the variables “term”, “file”, “tf”, and “df”.
  3. Remember that the “term” is the only key for the input–the rest count as values. As such, we check to see if the newest value of “term” equals the last one stored in “currterm”.
    • If matched
      1. Increment our document frequency (df) by the loop value (always 1 in this case)
      2. Add the term, file, and tf to a buffer so we can print it out later (very important)
    • If not matched (new term)
      1. Print the buffer, appending the term’s total document frequency (accumulated while incrementing above) to the end of each line.
      2. Print the most recent term, file, term frequency, and document frequency (the curr* values).
      3. Reset the buffer and set all the curr* variables to the latest variable value to begin again.
  4. Print out the final buffer and final line of the file.

Trust me, it was more painful to write than to read. One of the tougher parts about Hadoop Streaming is that you are responsible for maintaining the state and scope of the keys, as opposed to Java, where it’s done for you. Beyond my bash shortcomings, I had problems early on with this because I was using the wrong key in my conditions; it is absolutely vital that you keep track of the key in your reducer calculations. Let’s see how it looks with a bash test:
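Locally the whole pipeline can be simulated by wedging a sort between the two scripts (same assumed file names as before):

    $ cat crane_tf.txt | ./mapdf.sh | sort | ./reddf.sh | head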

Unfortunately not much to see here (and I don’t want to paste the whole thing in the interest of space), but at least it is correct. Remember, the fields here are term, file, tf (frequency of the term within the file), and df (frequency of the term across all files). Those terms only appear once in their associated file and in the document set overall. Thank you Stephen Crane for your uniqueness.

Time to Hadoop

Now that the mapper and reducer are done, here’s the command we will use to process it through MapReduce:
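Roughly, it looks like this (the streaming jar path and the mapper name mapdf.sh are assumptions; adjust both for your setup):

    $ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D stream.num.map.output.key.fields=1 \
    -input crane_out -output crane_out2 \
    -file mapdf.sh -mapper mapdf.sh \
    -file reddf.sh -reducer reddf.sh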

Remember, the backslashes are only there to indicate that this is a multi-line command; if you type it all on one line you don’t need them. Also note that stream.num.map.output.key.fields is set to 1 here, as the output from the Mapper has only one column for the key: term. This is important because the shuffle and sort phase needs to sort on that key. The input location is the output of the last job (crane_out/) and the output is a new directory (which must not exist) called crane_out2/.

So let’s run it and see what happens!

Looks good! Well…completed at least. Let’s take a look at the output.
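Something like this shows the first few rows (the part file name is a guess; hadoop fs -ls crane_out2 will show what the job actually wrote):

    $ hadoop fs -cat crane_out2/part-00000 | head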

Beautiful! Each word/file combination now has an associated term frequency and document frequency. Simple checks against your source data with ‘grep’ can determine if it’s correct or not. For example, take a look at the word ‘from’:
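A grep against the job output pulls out every row for it:

    $ hadoop fs -cat crane_out2/part-* | grep -w "^from"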

According to my MapReduce job, the word ‘from’ has a document frequency of 4 because it appears in 4 different files. In ‘a_spirit_sped.txt’ it has a term frequency of 2 (appears twice) and in the others it has a term frequency of 1 (appears once). Let’s see if that’s right.
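A rough hand-check, assuming local copies of the eight poems in the current directory (the -i is just to be safe about case):

    $ grep -owi "from" *.txt | sort | uniq -c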

Looks good to me! I think we’re set for the day.

Conclusion

In Part 1, we completed a MapReduce job to calculate the term frequency of words within documents. In this part, we completed a MapReduce job that works through that output and appends the document frequency for each term, i.e. the number of documents the term appears in. Both of these numbers are critical for the final calculation in the next article, which will cover Term Frequency/Inverse Document Frequency (TF-IDF). Stay tuned!


3 Responses to “MapReduce with Hadoop Streaming in bash – Part 2”


  1. Daniel says:

    Since your map script really doesn’t do anything, you’d probably be better off using the identity mapper and just always assuming df=1 in the reducer (which is true). It saves you a script, and your command becomes:

    $ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D stream.num.map.output.key.fields=1 -input crane_out -output crane_out2 \
    -file reddf.sh -reducer reddf.sh \
    -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat

    The -inputformat is because the identity mapper spits out whatever it’s handed, and the default input format hands it keys that are type Long. The streaming reducer expects Text keys, and unhappiness ensues. The KeyValueTextInputFormat spits out text keys, as the reducer expects.

  2. Steve Karam says:

    Thank you for pointing that out, Daniel. That makes a lot of sense; not sure why I didn’t think to use the IdentityMapper. Since you brought it up, though, I have a question for you…I tried to use “org.apache.hadoop.mapred.lib.IdentityReducer” in Oozie (through Hue) and it didn’t like that, so I wrote my own IdentityReducer. Do you know if I missed something there, or is it not supported?

  3. Off the top of my head, no idea. My Oozie experience is limited.
