Setting up a fully distributed Hadoop and MapReduce cluster without root privileges

A note up front: domain names are used here. Plain IP addresses should also work in theory, but I haven't tried. The pile of tutorials I found online all start by telling you to edit /etc/hosts, which makes everything after that step worthless for a user without root privileges.

This post records my own setup process and the pitfalls along the way.

The setup has two main parts: HDFS, Hadoop's distributed file system, and the YARN-based MapReduce framework.

Cluster description#

Three physical machines are used here, and all three must be able to reach one another freely.

  • Machine A: IP 192.168.1.101, hostname hostA, username userA, assigned domain name machine-a.xxxxx.local
  • Machine B: IP 192.168.1.102, hostname hostB, username userB, assigned domain name machine-b.xxxxx.local
  • Machine C: IP 192.168.1.103, hostname hostC, username userC, assigned domain name machine-c.xxxxx.local

Simply put, if running ssh userA@machine-a.xxxxx.local lands you at a prompt reading userA@hostA:~$, it's set up correctly.

First, configure passwordless SSH between the three machines using ssh-keygen and ssh-copy-id; tutorials for that are everywhere, so I won't repeat them here (a minimal sketch follows). Second, install a JDK on every machine; a JRE also works, you just lose the jps command.
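A minimal sketch of that SSH setup, assuming the usernames and domain names from the cluster description above (repeat the idea on machines B and C towards the other two):

# On machine A (userA@hostA)
ssh-keygen -t rsa                          # accept the defaults; empty passphrase
ssh-copy-id userB@machine-b.xxxxx.local    # push the public key to machine B
ssh-copy-id userC@machine-c.xxxxx.local    # ... and to machine C
ssh userB@machine-b.xxxxx.local            # should now log in without a password prompt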

Then put a copy of Hadoop on every machine. I'm currently using version 3.2.2, extracted to ~/hadoop-3.2.2/ (as an example). Next, add hadoop-3.2.2/bin and hadoop-3.2.2/sbin to the PATH environment variable.
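For reference, this is roughly what that looks like on each machine (the tarball name and the use of ~/.bashrc are assumptions; adjust to your shell):

tar -xzf hadoop-3.2.2.tar.gz -C ~/
echo 'export PATH="$HOME/hadoop-3.2.2/bin:$HOME/hadoop-3.2.2/sbin:$PATH"' >> ~/.bashrc
source ~/.bashrc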

Setting up HDFS#

All the configuration files live under ~/hadoop-3.2.2/etc/hadoop/, and the configuration has to be changed on every machine.

First is hadoop-env.sh, where the line to change is JAVA_HOME: export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64. Fill in your actual JAVA_HOME path here.

One thing to watch out for: HADOOP_HOME (the Hadoop installation path) must be identical on all machines, otherwise you will hit all sorts of baffling errors (for example, failing to load the class YarnChild). This is easy if every machine uses the same username, but it gets tricky when the usernames differ. Without root, here is a makeshift workaround: create a symlink under /tmp with ln -s /home/userX/hadoop-3.2.2 /tmp/hadoop-3.2.2, then set HADOOP_HOME to /tmp/hadoop-3.2.2 (see the sketch below).
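Concretely, the workaround looks like this (shown for machine A; do the analogous thing on B and C with their own usernames). Keep in mind that /tmp may be cleared on reboot, so the link might need to be recreated:

# On machine A
ln -s /home/userA/hadoop-3.2.2 /tmp/hadoop-3.2.2

# Make every process see the common path, e.g. in hadoop-env.sh or ~/.bashrc
export HADOOP_HOME=/tmp/hadoop-3.2.2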

Next up is hdfs-site.xml:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.namenode.http-address</name>
        <value>machine-a.xxxxx.local:50088</value>
    </property>
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>machine-c.xxxxx.local:50090</value>
    </property>
    <property>
        <name>dfs.permissions</name>
        <value>false</value>
    </property>
    <property>
        <name>dfs.namenode.datanode.registration.ip-hostname-check</name>
        <value>false</value>
    </property>
</configuration>

This designates machine A as the name node and machine C as the secondary name node. dfs.replication is the replication factor for data blocks; the default is 3, changed to 1 here.

core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/home/userX/hdfs/tmp</value>
    </property>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://machine-a.xxxxx.local:9000</value>
    </property>
</configuration>

hadoop.tmp.dir specifies the location of the temporary directory; just remember to create it with mkdir (see below). fs.defaultFS points to the address of the name node, i.e. machine A.
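That is, on every machine (with that machine's own username in place of userX):

mkdir -p /home/userX/hdfs/tmp    # must exist before formatting or starting HDFS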

workers: fill in the domain names of the three machines

machine-a.xxxxx.local
machine-b.xxxxx.local
machine-c.xxxxx.local

That's roughly it; the cluster should now be able to start. First, format the name node on machine A: hdfs namenode -format, then start the name node daemon: hadoop-daemon.sh start namenode. On machine C, start the secondary name node daemon: hadoop-daemon.sh start secondarynamenode. Then, on every machine acting as a data node, run hadoop-daemon.sh start datanode. After that you can see the name node's status and all the data nodes at http://machine-a.xxxxx.local:50088/.
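The same startup sequence again, grouped by machine (a recap sketch; on Hadoop 3.x these scripts are deprecated in favour of hdfs --daemon start ..., but they still work):

# Machine A (name node)
hdfs namenode -format
hadoop-daemon.sh start namenode

# Machine C (secondary name node)
hadoop-daemon.sh start secondarynamenode

# Every machine listed in workers (data nodes)
hadoop-daemon.sh start datanode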

Normally, at this point you should be able to run HDFS file commands such as hdfs dfs -ls / on every node. The jps command shows exactly which services are running.
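A quick sanity check, assuming the daemons above are up (the /user/test path is just an example):

jps                                        # NameNode / SecondaryNameNode / DataNode, depending on the machine
hdfs dfs -mkdir -p /user/test
echo hello | hdfs dfs -put - /user/test/hello.txt
hdfs dfs -ls /user/test
hdfs dfs -cat /user/test/hello.txt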

Setting up the MapReduce framework#

As before, start by editing the configuration files under hadoop-3.2.2/etc/hadoop.

yarn-site.xml

<?xml version="1.0"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<configuration>

    <!-- Site specific YARN configuration properties -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>machine-b.xxxxx.local</value>
    </property>
    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property>
    <property>
        <name>yarn.log-aggregation.retain-seconds</name>
        <value>106800</value>
    </property>
    <property>
        <name>yarn.nodemanager.address</name>
        <value>machine-x.xxxxx.local:0</value>
    </property>
    <property>
        <name>yarn.application.classpath</name>
        <value>/tmp/hadoop-3.2.2/etc/hadoop:/tmp/hadoop-3.2.2/share/hadoop/common/lib/*:/tmp/hadoop-3.2.2/share/hadoop/common/*:/tmp/hadoop-3.2.2/share/hadoop/hdfs:/tmp/hadoop-3.2.2/share/hadoop/hdfs/lib/*:/tmp/hadoop-3.2.2/share/hadoop/hdfs/*:/tmp/hadoop-3.2.2/share/hadoop/mapreduce/lib/*:/tmp/hadoop-3.2.2/share/hadoop/mapreduce/*:/tmp/hadoop-3.2.2/share/hadoop/yarn:/tmp/hadoop-3.2.2/share/hadoop/yarn/lib/*:/tmp/hadoop-3.2.2/share/hadoop/yarn/*</value>
    </property>
</configuration>

Machine B serves as the resource manager here. Change yarn.nodemanager.address to each machine's actual domain name (machine-x.xxxxx.local above is the placeholder). The long value of yarn.application.classpath is obtained by running hadoop classpath; copy and paste it in.
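In other words, simply run the following and paste its output into the property value (with the /tmp/hadoop-3.2.2 symlink in place, the output should be identical on every machine):

hadoop classpath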

mapred-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>yarn.app.mapreduce.am.env</name>
        <value>HADOOP_MAPRED_HOME=/tmp/hadoop-3.2.2</value>
    </property>
    <property>
        <name>mapreduce.map.env</name>
        <value>HADOOP_MAPRED_HOME=/tmp/hadoop-3.2.2</value>
    </property>
    <property>
        <name>mapreduce.reduce.env</name>
        <value>HADOOP_MAPRED_HOME=/tmp/hadoop-3.2.2</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.address</name>
        <value>machine-a.xxxxx.local:10020</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>machine-a.xxxxx.local:10040</value>
    </property>
    <property>
        <name>mapreduce.application.classpath</name>
        <value>/tmp/hadoop-3.2.2/etc/hadoop:/tmp/hadoop-3.2.2/share/hadoop/common/lib/*:/tmp/hadoop-3.2.2/share/hadoop/common/*:/tmp/hadoop-3.2.2/share/hadoop/hdfs:/tmp/hadoop-3.2.2/share/hadoop/hdfs/lib/*:/tmp/hadoop-3.2.2/share/hadoop/hdfs/*:/tmp/hadoop-3.2.2/share/hadoop/mapreduce/lib/*:/tmp/hadoop-3.2.2/share/hadoop/mapreduce/*:/tmp/hadoop-3.2.2/share/hadoop/yarn:/tmp/hadoop-3.2.2/share/hadoop/yarn/lib/*:/tmp/hadoop-3.2.2/share/hadoop/yarn/*</value>
    </property>
    <property>
        <name>mapreduce.reduce.shuffle.input.buffer.percent</name>
        <value>0.5</value>
    </property>
    <property>
        <name>mapred.child.java.opts</name>
        <value>-Xmx4096m</value>
    </property>
</configuration>

Machine A serves as the job history server here, and mapreduce.application.classpath is likewise obtained from hadoop classpath. Be sure to change HADOOP_MAPRED_HOME to your actual Hadoop path.

If you hit out-of-memory errors in the reduce phase, try lowering mapreduce.reduce.shuffle.input.buffer.percent further and increasing the heap size in mapred.child.java.opts.

Now it can be brought up: start the resource manager daemon on machine B with yarn-daemon.sh start resourcemanager, then the job history daemon on machine A with mr-jobhistory-daemon.sh start historyserver, and finally run yarn-daemon.sh start nodemanager on every machine so that each one serves as a compute node.
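Grouped by machine again (a recap sketch; the non-deprecated 3.x equivalents are yarn --daemon start ... and mapred --daemon start historyserver):

# Machine B (resource manager)
yarn-daemon.sh start resourcemanager

# Machine A (job history server)
mr-jobhistory-daemon.sh start historyserver

# Every machine (node managers)
yarn-daemon.sh start nodemanager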

At this point you should be able to visit http://machine-b.xxxxx.local:8088/cluster/cluster to see the information for each node.

Normally you can already try running a sample job, for example: hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.2.jar pi 10 10

Crawling some data#

Personally I prefer crawling the English Wikipedia, since the rules are simple enough: pick any page as a starting point and crawl every in-site hyperlink from there, and the data is easy to clean as well. I won't paste the crawler or the data-cleaning code here (it was just thrown together anyway).

For example, starting the crawl from https://en.wikipedia.org/wiki/Field_(mathematics), it can even wander all the way to https://en.wikipedia.org/wiki/Shirou_Emiya :

Shirou Emiya
From Wikipedia, the free encyclopedia
Jump to navigation
Jump to search
Shirou Emiya Fate character Shirou Emiya in Fate/Grand Order , as illustrated by Takashi Takeuchi First appearance Fate/stay night (2004) Created by Kinoko Nasu Voiced by Japanese Noriaki Sugiyama Junko Noda (young) English Sam Riegel ( Fate/stay night ) Patrick Poole ( Prisma Illya ) Bryce Papenbrook ( UBW , Heaven's Feel ) Mona Marshall (young) In-universe information Relatives Kiritsugu Emiya (adoptive father; deceased) Taiga Fujimura (guardian)
Shirou Emiya ( Japanese : 衛宮 士郎 , Hepburn : Emiya Shirō ) , also written as "Shiro Emiya" in Fate/unlimited codes , is a character and the main protagonist of the 2004 visual novel Fate/stay night , published by Type-Moon . Shirou is a teenager who accidentally participates in the "Holy Grail War" alongside six other mages looking for the eponymous treasure , an all-powerful, wish-granting relic. Shirou was the sole survivor of a fire in a city and was saved by a man named Kiritsugu Emiya who inspired him to become a hero and avoid killing people during fights. While fighting alongside the servant Saber , Shirou develops his own magical skills and, depending on the player's choices, he forms relationships with the novel's other characters. He also appears in the visual novel sequel Fate/hollow ataraxia , the prequel light novel Fate/Zero , and printed and animated adaptations of the original game.
Writer Kinoko Nasu created Shirou and Saber in stories he had written as a teenager. Nasu was worried that the story would not work as a bishōjo game because the main character was a girl. Artist Takashi Takeuchi suggested switching the genders of the protagonist and Saber to fit into the game market. For the anime adaptations following Fate/Zero , the staff wanted to make the character more serious in his interactions with the other characters while also giving him a more cheerful personality in contrast to the original visual novel. Shirou is regularly voiced in Japanese by Noriaki Sugiyama as a teenager and Junko Noda as a child; multiple voice actors have voiced him in the English releases of the anime adaptations.
Critics have commented on Shirou's different characterizations; his role in each part of the original Fate/stay night visual novel has received positive reaction due to his character development and relationship with the character Archer . Shirou's appearance in Studio Deen 's first Fate/stay night -based anime received a mixed response; critics initially disliked Shirou, but praised how his relationship with Saber evolved over time. In Ufotable 's anime series, based on the visual novel's route Unlimited Blade Works , the character was praised for how he dealt with questions about his ideals. Shirou has also appeared in multiple polls related to Fate and anime in general.
Contents
1 Creation and conception
1.1 Development
1.2 Voice actors
2 Characterization and themes
3 Appearances
3.1 In Fate/stay night
3.1.1 Fate route
3.1.2 Unlimited Blade Works route
3.1.3 Heaven's Feel route
3.2 In Fate/hollow ataraxia
3.3 Appearances in other media
3.3.1 Manga and anime
3.3.2 Video games and CD dramas
4 Cultural impact
4.1 Popularity
4.2 Critical reception
5 References
6 External links
Creation and conception [ edit ]
Shirou was originally conceived as a young girl by Kinoko Nasu until it was decided to change his gender to appeal to gamers.
Before writing Fate/stay night , Kinoko Nasu wrote the Fate route of the visual novel in his spare time as a high school student. Nasu originally imagined Shirou Emiya as a female character named Ayaka Sajyou ( 沙条綾香 ) who wore glasses and Saber as male. [1] Nasu swapped their genders due to his experience writing the novel Tsukihime and because Type-Moon believed a male protagonist would better fit the target demographic. [2] Shirou's role in the story was meant to highlight parts of his personality and growth based on the paths the player picks. The first Fate storyline shows his slanted mind; the next, Unlimited Blade Works, presents his resolve, and in the last storyline, Heaven's Feel, he becomes Sakura Matou 's ally and abandons his life-long passion of becoming a hero. [2] Shirou was created with the idea of being a stubborn man with ideals that would change the way his role in the story based on the different routes, something the Type-Moon originally wanted to make with the protagonist of Tsukihime . [3] Furthermore, Nasu wanted to portray him as a typical teenager while artist Takashi Takeuchi did not want him to have too much individuality in order to make players project themselves into him. [4] By the end of the making of the visual novel, Nasu described Shirou as a joyless hero disinterested in the war, denying himself personal happiness in order to save as many people as possible. [5] Shirou's character theme, "Emiya", while remixes and other themes were created to focus on important scenes related to his character. [6] [7]

...

A warning: do NOT dump hundreds of thousands of tiny files into HDFS (a lesson learned the hard way). Here is a more reasonable scheme: use the URL as the key and the Base64-encoded plain text as the value, and write each document as one line of a single big file, with some separator between key and value. That big file can then be split into several smaller files to serve as the MapReduce input. A rough sketch follows.
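One way that packing step could look, assuming a hypothetical manifest file pages.tsv that maps each URL (with all ":" characters already stripped, since ":" separates key and value) to the local text file holding its cleaned content:

# pages.tsv (hypothetical): <url_without_colons><TAB><local_text_file>
while IFS=$'\t' read -r url file; do
    printf '%s:%s\n' "$url" "$(base64 -w0 "$file")"
done < pages.tsv > enwiki_all

# Split into a few smaller files and upload them as the MapReduce input
split -l 50000 enwiki_all enwiki_part_
hdfs dfs -mkdir -p /enwiki
hdfs dfs -put enwiki_part_* /enwiki/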

Writing your own MapReduce program: an inverted index#

Just create a Maven project in IDEA.

pom.xml#

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>mapreduce</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.2.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.2.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>3.2.2</version>
        </dependency>
    </dependencies>
</project>

MapReduce code#

src/main/java/org/example/MyWordCount.java

package org.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;

public class MyWordCount extends Configured implements Tool {

    private static class MyUtil {
        // Serialize {file_path -> count} as <file_path>:<count>|<file_path2>:<count2>|...
        public static String buildValuesFromMap(Map<String, Integer> map) {
            StringBuilder sb = new StringBuilder();
            Iterator<String> keys = map.keySet().iterator();
            if (keys.hasNext()) {
                String filePath = keys.next();
                sb.append(filePath);
                sb.append(":");
                sb.append(map.get(filePath));
            }
            while (keys.hasNext()) {
                String filePath = keys.next();
                sb.append("|");
                sb.append(filePath);
                sb.append(":");
                sb.append(map.get(filePath));
            }
            return sb.toString();
        }
    }

    public static class MyWordCountMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

        private final Set<String> replaceWords = new HashSet<>();
        private final Set<String> stopWords = new HashSet<>();

        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            // K1: ignore (long)
            // V1: <file_path>:<base64_encoded_file_content> (string)
            // K2: <word> (string)
            // V2: <file_path> (string)
            String line = value.toString();
            int splitIndex = line.indexOf(":");
            String filePath = line.substring(0, splitIndex);
            Text filePathText = new Text(filePath);
            line = line.substring(splitIndex + 1);
            line = new String(Base64.getDecoder().decode(line), StandardCharsets.UTF_8);
            line = line.toLowerCase();
            for (String replaceWord : replaceWords) {
                line = line.replace(replaceWord, " ");
            }
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                String token = tokenizer.nextToken();
                if (stopWords.contains(token))
                    continue;
                output.collect(new Text(token), filePathText);
            }
        }

        @SuppressWarnings("deprecation")
        @Override
        public void configure(JobConf job) {
            try {
                Path[] paths = DistributedCache.getLocalCacheFiles(job);
                if (paths == null || paths.length == 0)
                    return;
                BufferedReader br = new BufferedReader(new FileReader(paths[0].toString()));
                String word;
                while ((word = br.readLine()) != null) {
                    if (word.length() < 2)
                        continue;
                    if (word.charAt(0) == 'r')
                        replaceWords.add(word.substring(2));
                    else if (word.charAt(0) == 's')
                        stopWords.add(word.substring(2));
                }
                br.close();
            }
            catch (IOException ignored) {
            }
            finally {
                super.configure(job);
            }
        }
    }

    public static class MyWordCountCombine extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            // K2: <word> (string)
            // V2: either a bare <file_path> (from the mapper) or
            //     <file_path>:<word_count>|<file_path2>:<word_count2>|... (from an earlier combine pass)
            // K3: <word> (string)
            // V3: <file_path>:<word_count>|<file_path2>:<word_count2>|... (string)

            Map<String, Integer> map = new HashMap<>();
            while (values.hasNext()) {
                String value = values.next().toString();
                for (String combinedFilePath : value.split("\\|")) {
                    int splitIndex = combinedFilePath.indexOf(":");
                    int incrementCount = splitIndex == -1 ? 1 : Integer.parseInt(combinedFilePath.substring(splitIndex + 1));
                    String filePath = splitIndex == -1 ? combinedFilePath : combinedFilePath.substring(0, splitIndex);
                    if (map.containsKey(filePath))
                        map.put(filePath, map.get(filePath) + incrementCount);
                    else
                        map.put(filePath, incrementCount);
                }
            }
            output.collect(key, new Text(MyUtil.buildValuesFromMap(map)));
        }
    }

    public static class MyWordCountReduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        // private final Log log = LogFactory.getLog(MyWordCountReduce.class);
        @Override
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            // K3: <word> (string)
            // V3: [<file_path>:<word_count>|<file_path2>:<word_count2>..., <file_path3>:<word_count3>..., ...] (string[])
            // K4: <word> (string)
            // V4: <file_path>:<word_count>|<file_path2>:<word_count2>|... (string)
            // log.warn("\"" + key.toString() + "\"");
            Map<String, Integer> map = new HashMap<>();
            while (values.hasNext()) {
                String[] filePathWordCountList = values.next().toString().split("\\|");
                for (String filePathWordCount : filePathWordCountList) {
                    int splitIndex = filePathWordCount.indexOf(":");
                    String filePath = filePathWordCount.substring(0, splitIndex);
                    int count = Integer.parseInt(filePathWordCount.substring(splitIndex + 1));
                    if (map.containsKey(filePath)) {
                        map.put(filePath, map.get(filePath) + count);
                    }
                    else {
                        map.put(filePath, count);
                    }
                }
            }
            output.collect(key, new Text(MyUtil.buildValuesFromMap(map)));
        }
    }

    @Override
    @SuppressWarnings("deprecation")
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(MyWordCount.class);
        conf.setJobName("My fucking word count");
        conf.setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(MyWordCountMap.class);
        conf.setCombinerClass(MyWordCountCombine.class);
        conf.setReducerClass(MyWordCountReduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        if (args.length > 2) {
            DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);
        }
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new MyWordCount(), args));
    }
}

This code takes <url>:<base64(content)> as input (i.e. one line of the text generated earlier; the URL is assumed to already have every ":" character stripped). The map phase removes stop words from the content and tokenizes it, then emits <token> as the key and <url> as the value.

The combiner takes everything produced by the mappers and by earlier combiner passes as input and merges it. The output key stays <token>, while the value becomes the number of times the token appears on each URL's page: <url1>:<token_count_for_url1>|<url2>:<token_count_for_url2>|... (with "|" as the separator between URLs). Note that values coming from a mapper are plain <url> strings, while intermediate results coming from a combiner are the long strings above; handle both cases carefully and you're fine.

The final reducer then takes all the strings produced by the combiners, does the last round of merging and deduplication, and generates the final result. Its input and output key/value formats are the same as the combiner's output: the key is <token> and the value is still <url1>:<token_count_for_url1>|<url2>:<token_count_for_url2>|...

To build, just run mvn package, and the resulting jar runs directly on Hadoop: hadoop jar mapreduce-1.0-SNAPSHOT.jar org.example.MyWordCount /enwiki /enwiki_output /stopwords.txt

The first argument is the HDFS input path, the second is the MapReduce output directory, and the optional third argument is the stop-word file, whose lines follow this format:

^([rs])\s(.+)$

When the first character is r, a replacement is performed: everything after the separator is replaced with a space, which is handy for stripping punctuation. For example, r , replaces every comma with a space; note that r o would turn hello into hell followed by a space. When the first character is s, it is a whole-word match, suited to removing stop words: s of removes every of, but it won't remove off (heh), let alone turn it into f. A small example file is sketched below.
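For example, a tiny stopwords.txt might look like this (hypothetical contents; each line is one rule, with a single space after the leading r or s). The job reads it from HDFS, so upload it first with hdfs dfs -put stopwords.txt /stopwords.txt:

r .
r ,
r (
r )
s the
s of
s and
s is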

Application scenario: a simple search engine?#

With the word frequencies in hand, take the 30,000 most frequent words as the vocabulary, compute TF-IDF, and store it as a sparse matrix. Documents are then ranked by the cosine similarity between the query and each document, and the final results looked like this:

The precision is, let's just say, worrying.