MapReduce with Hadoop Streaming in bash – Part 1


So to commemorate my recent certification and because my Java absolutely sucks, I decided to do a common algorithm using Hadoop Streaming.

Hadoop Streaming

Hadoop Streaming allows you to write MapReduce code in any language that can process stdin and stdout. This includes Python, PHP, Ruby, Perl, bash, node.js, and tons of others. I’m a huge fan of node and PHP but not everyone knows those. Python is desirable and I’m working to learn it but nowhere near ready yet. So I went for bash, since most Oracle-heads and other Linux lovers know it.

The algorithm I’m using is TF-IDF, which stands for Term Frequency – Inverse Document Frequency. According to Wikipedia, TF-IDF is “a numerical statistic which reflects how important a word is to a document in a collection or corpus”. It’s useful for search ranking, collaborative filtering, and other tasks. In this article (Part 1), we’re going to calculate term frequency by grabbing the lines of each file, parsing out all the words (map), then summing them up to show the frequency of each word per document (reduce).
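For reference, one common formulation of the statistic is tf-idf(t, d) = tf(t, d) × log(N / df(t)), where tf(t, d) is the number of times term t appears in document d, df(t) is the number of documents containing t, and N is the total number of documents in the corpus.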

The Setup

To set this up I’m using the Cloudera QuickStart VM. This is a phenomenal resource that is preconfigured with CDH4.3 and tons of extra tools. The data I’m working with is small and simple since I’m running in pseudo-distributed mode on a VM, consisting of 8 Stephen Crane poems (my favorite) in text format.

I had to load this data into Hadoop, so I made a ‘crane’ directory and put the files in there.
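If you're following along on the VM, the commands look something like this (assuming the poems are .txt files sitting in your current local directory):

$ hadoop fs -mkdir crane
$ hadoop fs -put *.txt crane
$ hadoop fs -ls crane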

And we’re set!

The Mapper

So here’s the mapper (maptf.sh), which reads each line of whatever file is sent to it, tokenizes it, and emits tab-separated keys and values.
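Here’s a minimal sketch of the script (the exact contents of the ‘exclude’ character class are an assumption on my part):

#!/bin/bash
# maptf.sh - term frequency mapper (sketch; the 'exclude' set is assumed)

# 1. Regex character class of punctuation to strip from each word
exclude='[.,:;?!"()-]'

# 2. Read stdin one line at a time into 'split'
while read split; do
    # 3. Let bash tokenize the line on whitespace
    for word in $split; do
        # 4. Strip the excluded characters and lowercase the word
        term=$(echo "$word" | sed "s/$exclude//g" | tr '[:upper:]' '[:lower:]')
        # 5./6. Skip empty terms, then emit: term <tab> input file <tab> 1
        [ -n "$term" ] && printf "%s\t%s\t%s\n" "$term" "$map_input_file" 1
    done
done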

Let’s go through the code:

  1. Define the exclude variable. This variable holds the regex characters that will be stripped out during the map.
  2. Main loop. This reads stdin (while read) into a variable called ‘split’, one line at a time.
  3. Inner loop. For each word in the ‘split’ variable (bash’s native whitespace tokenizing).
  4. Set the ‘term’ variable equal to the current word, excluding characters from the ‘exclude’ variable, and converted to lowercase.
  5. Make sure ‘term’ isn’t empty.
  6. Print the output in the form of: term-inputfile-1 (with tabs instead of dashes). Inputfile in this case is represented by the environment variable ‘map_input_file’. This is a standard Map variable normally denoted as map.input.file; however, Hadoop Streaming turns the periods into underscores for compatibility.

The cool part is that since this is a shell script, we can test it at the command prompt to see how it works by reading a file and piping it through the script. Note that I’m setting the ‘map_input_file’ variable manually for the test so I get the proper output.
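For example (the filename here is just a stand-in for one of the poem files):

$ cat war_is_kind.txt | map_input_file=war_is_kind.txt ./maptf.sh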

At this point it’s not much different from a simple word count Mapper, which is essentially what the term frequency portion of this algorithm is, except that it keys on both the term and the file.

The Reducer

The Reducer is where we’ll aggregate the data that was emitted. The 8 files that serve as input to this MapReduce job will be handled by 8 Mappers, each running maptf.sh against its own input split. The results are then put through the ‘shuffle and sort’ phase, where the keys are sorted (the first two output columns are the key in this case, more on this later) and sent to the reducer(s). The reducer then takes all the data and aggregates it into the final format: it takes the Map output in the form (term-file-1) and sums it up to (term-file-termfrequency).
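Here’s a minimal sketch of redtf.sh, reconstructed from the steps that follow (details of my original may differ):

#!/bin/bash
# redtf.sh - term frequency reducer (sketch)

# 1. Read the first record into the "current" variables
IFS=$'\t' read currterm currfile currnum

# 2. Loop through the remaining (already sorted) records
while IFS=$'\t' read term file num; do
    if [ "$term" = "$currterm" ] && [ "$file" = "$currfile" ]; then
        # 3. Same term/file as before: add the latest count
        currnum=$((currnum + num))
    else
        # 4. New term/file combo: emit the finished key, then reset
        printf "%s\t%s\t%s\n" "$currterm" "$currfile" "$currnum"
        currterm=$term
        currfile=$file
        currnum=$num
    fi
done

# 5. Emit the final key
printf "%s\t%s\t%s\n" "$currterm" "$currfile" "$currnum"

Walking through it: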

  1. Read the first line, putting the fields into the variables ‘currterm’, ‘currfile’, and ‘currnum’
  2. Loop through the rest of the file, putting new terms into the variables ‘term’, ‘file’, and ‘num’
  3. Check to see if the latest term matches the previous term and the latest file matches the previous file. Remember, this works because the input to a reducer is ALWAYS sorted by key! The magic of shuffle and sort.
    1. Set ‘currnum’ equal to ‘currnum’ plus the latest value of ‘num’ (always 1 in this case).
  4. Else… (no match, it’s a new term/file combo)
    1. Print the current term, current file, and current sum in tab delimited format.
    2. Set ‘currterm’ equal to the latest ‘term’
    3. Set ‘currfile’ equal to the latest ‘file’
    4. Set ‘currnum’ equal to the latest ‘num’
  5. Keep doing that until the loop’s exhausted, then print the final value.

Fun, right? What’s cool is that we can test this the same way we tested the mapper, as long as we sort first. Remember, sorting has to be done on the first two columns, which make up the key. So:
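The full local test, with the mapper output sorted on its first two fields before it hits the reducer (same stand-in filename as before):

$ cat war_is_kind.txt | map_input_file=war_is_kind.txt ./maptf.sh | sort -k1,2 | ./redtf.sh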

The reducer prints each term with its file and summed count, and that’s our expected result.

Hadoop It Up

The time has finally come to run our MapReduce script. To do this we’re going to use the ‘hadoop’ command with the Hadoop Streaming JAR file included with the distro. Here’s the command we’ll use:
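Something along these lines (the streaming JAR path follows the CDH layout on the QuickStart VM, and the script paths are stand-ins for wherever you saved maptf.sh and redtf.sh locally):

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D stream.num.map.output.key.fields=2 \
    -input crane \
    -output crane_out \
    -mapper /home/cloudera/maptf.sh \
    -reducer /home/cloudera/redtf.sh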

NOTE: The backslashes (\) just indicate that the command is split across multiple lines.

This command is doing a few critical things. First, it says we want to run the hadoop-streaming.jar file. The -D flag then lets us set generic configuration properties for the job.

The first one is absolutely critical: stream.num.map.output.key.fields=2. This tells the MapReduce job that the first two fields output by the Mapper make up the key. It matters because the shuffle and sort phase must sort on the full key for the reducer to work properly. Keys are sorted in every MapReduce job in any language, but only Hadoop Streaming needs this parameter to know where the key ends and the value begins.

The next parameter is the ‘-input’ option which is the HDFS location of the input files. It can be either a directory or any POSIX compliant glob match. The next parameter is ‘-output’ which is the location on HDFS where the output should be dumped. This directory MUST NOT exist. Then we define the ‘-mapper’ and ‘-reducer’ parameters, pointing them to my shell scripts. Simple.

Running the command kicks off the streaming job, and Hadoop prints the job’s progress to the console as the map and reduce tasks run.

Now we can go look at our results to see how the job did. The results will be in the ‘crane_out’ directory as specified by the hadoop command. So let’s take a look:
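Listing the output directory with the HDFS shell:

$ hadoop fs -ls crane_out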

The ‘part-00000’ file is our output. By default, MapReduce ignores files that begin with an underscore (_) or a period (.). The output of the MapReduce job produced two ignorable files and one ‘part’ file which was the output of the single reducer used to aggregate our numbers. If this were a bigger dataset with more reducers, we’d have more part files.

So let’s take a look at our final output:
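For example, piping the HDFS cat output through head to keep things short:

$ hadoop fs -cat crane_out/part-00000 | head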

Each output record consists of a term, a filename, and a count (the term frequency).

Special Note

Daniel Templeton left a very important note in the comments. In these examples I am running my scripts from the local filesystem; however, it’s a much better practice to load them into HDFS. Running on a VM is great but can make you lazy…once you move on to running on a cluster it will make a huge difference! He offered up this example:
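$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D stream.num.map.output.key.fields=2 -input crane -output crane_out \
    -file ./maptf.sh -mapper maptf.sh -file ./redtf.sh -reducer redtf.sh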

Conclusion

Now normally you’d want to check your words against a stoplist and rule out all the common ones like ‘a’ and ‘and’ and such. However, since this is a small dataset and Stephen Crane is a man of few words, we’ll leave them in to see how our final algorithm holds up.

What we just calculated is the crucial ‘term frequency’ part of the TF-IDF algorithm. In the next part, we’ll be calculating the number of documents each term appears in (document frequency), an important part of the IDF portion of the algorithm. We’ll do this with another MapReduce job using different code, using the output from today’s job as input. See you then!

Oh yeah, one more thing. I’m not the best bash coder out there so if I could have coded the two functions better let me know! I tried using arrays first but that was slooooooow.

Added Note: I uploaded the source data and scripts to GitHub and will add new scripts as the three-part blog tutorial moves forward. The Cloudera VM comes preconfigured with git.


7 Responses to “MapReduce with Hadoop Streaming in bash – Part 1”

  1. Timothy Potter says:

    This is a quality blog post for sure, but I think it is worth mentioning that this approach looks a little cumbersome for computing TF-IDF. I get that you chose TF-IDF because it’s a simple and popular computation but a better solution is only a few lines of trivial Pig code (e.g. http://hortonworks.com/blog/pig-macro-for-tf-idf-makes-topic-summarization-2-lines-of-pig/). Pig will create the optimized M/R jobs to do the actual work. Not to mention it would be hard (and/or slow) to integrate a more sophisticated analysis/tokenization strategy into a streaming job, i.e. using Lucene’s StandardAnalyzer to tokenize and split text. With Pig, you could write a simple UDF to invoke a Lucene Analyzer in a few lines of code.

  2. Steve Karam says:

    Timothy, thanks for the input into better tech to use for this purpose! I’m hoping to tackle some Mahout tasks later for blogging and will probably use Mahout/Lucene for the same type of things.

    I figured TF-IDF would be a good algorithm to use to demo the fact that shell scripts can be used for MapReduce…as cumbersome as they may be. ;) Your acronym on LinkedIn was spot on: I’m trying not to create YAWCE (Yet Another Word Count Example)! Love it.

  3. Daniel Templeton says:

    I absolutely LOVE that you did this with Crane’s poetry!

    One thing to watch, though, is that you skipped the step where you uploaded your scripts into HDFS. A better approach would be to let streaming handle it for you:

    $ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D stream.num.map.output.key.fields=2 -input crane -output crane_out \
    -file ./maptf.sh -mapper maptf.sh -file ./redtf.sh -reducer redtf.sh

    The -file args tell streaming to upload your scripts into the working directory of the job, so the commands passed to -mapper and -reducer are running with your scripts in the cwd.

    A man cried out to the JobTracker, “Sir, my job exists!”

    “However,” replied the JobTracker, “that fact has not created in me a guarantee of data locality.”

  4. Steve Karam says:

    Daniel, you’re awesome. Thanks for the -file note, that’s definitely something that was missing! I’ll add it into the post.

    And that parody is spot on as well. Absolutely hilarious.

  5. Ben Okopnik says:

    I’m afraid that ‘split’ trick doesn’t work too well; it simply eliminates the specified characters, joining the (possibly unrelated) words:

    $ map_input_file=test ./maptf.sh
    not-connected
    notconnected test 1
    joe,frank,lisa
    joefranklisa test 1

    Try this instead:

    -----------------------------------------------------------------------
    #!/bin/bash

    old_ifs="$IFS"
    while read split; do
        IFS=' .,?!-_:;][#|$()"'
        for word in $split; do
            term=`echo $word | tr '[:upper:]' '[:lower:]'`
            [ -n "$term" ] && printf "%s\t%s\t%s\n" "$term" "$map_input_file" 1
        done
        IFS="$old_ifs"
    done
    -----------------------------------------------------------------------

    $ map_input_file=test ./maptf.sh
    not-connected
    not test 1
    connected test 1
    joe,frank,lisa
    joe test 1
    frank test 1
    lisa test 1

    Good informative post overall, though - thank you!

    Ben Okopnik

  6. rICh says:

    Steve, this. is. awesome! I’m sharing it with every developer class I have going forward. Hugely useful stuff and I love how you did it in bash. Ben, thank you for the clarifications & code rewrite.

  7. Pswain says:

    Can’t get past the error java.lang.ClassNotFoundException: Class org.apache.hadoop.streaming.PipeMapRunner not found while running a MapReduce streaming job:
    > hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming-2.0.0-cdh4.4.0.jar -D stream.num.map.output.key.fields=2 -input crane -output crane_out -file ./maptf.sh -mapper maptf.sh -file ./redtf.sh -reducer redtf.sh -verbose

    I have replaced the shared lib under oozie. But I’m not quite sure what could be causing this. please ignore if this is not the right forum. I’m new to this area.

    thanks.

