First Handmade MapReduce Example

Wang William (WJWang)
7 min readJun 21, 2017

--

###改良WordCounting的結果

我們在這一次的範例中將改良先前WordCounting Example 中以單純的空白作為切字依據的方法,我們要map function中加上過濾與單字相連的標點符號的條件,舉例來說,我們會將先前結果:

``` Hello 1 Hello! 1 Hello… 1 Hello, 1 ``` 這四種情形皆視為相同的key來作為wordcounting的運算並得到`[Hello,4]`這樣的運算結果。

因此我們會使用到開發工具Eclipse做為我們的IDE(整合式開發環境)。 並使用Maven來管理我們專案中library的相依性關係。 而會使用到的Maven的Library我們可以透過Maven Repository這個地方來尋找及下載。

####(1)首先我們要創立一個新的Maven Project.

1.png
2.png
3.png
4.png

####(2)建立完新的Maven Project後,我們會需要透過修改專案中的pom.xml來引入我們在這個專案中會使用到的函式庫,如下圖所示,我們在其中加入如下的設定。 參考連結:hadoop-mapreduce-client-corehadoop-common

5.png

####(3)加入完成後,我們要新增一個.class檔來撰寫我們這一次的程式碼。

6.png
7.png

####(4)此次的程式碼如下:

package wordcount;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

import java.io.IOException;
import java.util.Iterator;

public class WordCount {

public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
char[] charArray = line.toCharArray();
int lastIndex = -1;
for (int i = 0; i < charArray.length; i++) {
char current = charArray[i];
if (!isEnglish(current)) {
if ((i - lastIndex) > 1) {
String candidate = line.substring(lastIndex + 1, i);
word.set(candidate);
output.collect(word, one);
}
lastIndex = i;
}
}
}
}

public static boolean isEnglish(char c) {
return (c >= 65 && c <= 90) || (c >= 97 && c <= 122);
}

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}

public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount");

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);

conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);

FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));

JobClient.runJob(conf);
}
}
8.png

###(5)再來我們必須將我們完成的程式碼打包成可以在JVM上可以執行的.jar檔,步驟如下:

9.png
10.png
11.png

###(6)此時我們可以得到一個打包好的可執行的.jar檔,我們便可以將這個檔案放上hadoop上,並透過hadoop指令如同前一篇(Run MapReduce WordCount Example)的相同作法來運行我們的程式碼了。

12.png
13.png

--

--

No responses yet