为什么我在apache hadoopp上执行单词频率统计却没有output-dir输出?

主题帖子积分
高级会员, 积分 2835, 距离下一级还需 2165 积分
高级会员, 积分 2835, 距离下一级还需 2165 积分
赞一个,,,,
欢迎加入about云群 、、 ,云计算爱好者群,亦可关注||
主题帖子积分
中级会员, 积分 409, 距离下一级还需 591 积分
中级会员, 积分 409, 距离下一级还需 591 积分
pig2 真是牛啊
主题帖子积分
新手上路, 积分 49, 距离下一级还需 1 积分
新手上路, 积分 49, 距离下一级还需 1 积分
非常好&&长知识了
主题帖子积分
注册会员, 积分 152, 距离下一级还需 48 积分
注册会员, 积分 152, 距离下一级还需 48 积分
多谢分享。
主题帖子积分
中级会员, 积分 770, 距离下一级还需 230 积分
中级会员, 积分 770, 距离下一级还需 230 积分
牛人,非常值得学习
主题帖子积分
中级会员, 积分 954, 距离下一级还需 46 积分
中级会员, 积分 954, 距离下一级还需 46 积分
mark,学习
主题帖子积分
中级会员, 积分 954, 距离下一级还需 46 积分
中级会员, 积分 954, 距离下一级还需 46 积分
经常参与各类话题的讨论,发帖内容较有主见
经常帮助其他会员答疑
活跃且尽责职守的版主
为论坛做出突出贡献的会员
站长推荐 /6
about云|新出视频,openstack零基础入门,解决你ping不通外网难题
云计算hadoop视频大全(新增 yarn、flume|storm、hadoop一套视频
视频资料大优惠
大数据零基础由入门到实战
阶段1:hadoop零基础入门基础篇
阶段2:hadoop2入门
阶段3:大数据非hadoop系列课程
阶段4:项目实战篇
阶段5:大数据高级系列应用课程
阶段6:工作实用系列教程
等待验证会员请验证邮箱
新手获取积分方法
Powered by单词计数是最简单也是最能体现MapReduce思想的程序之一,可以称为MapReduce版"Hello World",该程序的完整代码可以在Hadoop安装包的"src/examples"目录下找到。单词计数主要完成功能是:统计一系列文本文件中每个单词出现的次数,如下图所示。
现在我们以"hadoop"用户登录"Master.Hadoop"服务器。
1. 创建本地的示例数据文件:
依次进入【Home】-【hadoop】-【hadoop-1.2.1】创建一个文件夹file用来存储本地原始数据。
并在这个目录下创建2个文件分别命名为【myTest1.txt】和【myTest2.txt】或者你想要的任何文件名。
分别在这2个文件中输入下列示例语句:
2. 在HDFS上创建输入文件夹
呼出终端,输入下面指令:
bin/hadoop fs -mkdir hdfsInput
执行这个命令时可能会提示类似安全的问题,如果提示了,请使用
bin/hadoop dfsadmin -safemode leave
来退出安全模式。
当分布式文件系统处于安全模式的情况下,文件系统中的内容不允许修改也不允许删除,直到安全模式结 束。安全模式主要是为了系统启动的时候检查各个DataNode上数据块的有效性,同时根据策略必要的复制或者删除部分数据块。运行期通过命令也可以进入 安全模式。
意思是在HDFS远程创建一个输入目录,我们以后的文件需要上载到这个目录里面才能执行。
3. 上传本地file中文件到集群的hdfsInput目录下
在终端依次输入下面指令:
cd hadoop-1.2.1
bin/hadoop fs -put file/myTest*.txt hdfsInput
4. 运行例子:
在终端输入下面指令:
bin/hadoop jar hadoop-examples-1.2.1.jar wordcount hdfsInput hdfsOutput
注意,这里的示例程序是1.2.1版本的,可能每个机器有所不一致,那么请用*通配符代替版本号
bin/hadoop jar hadoop-examples-*.jar wordcount hdfsInput hdfsOutput
应该出现下面结果:
Hadoop命令会启动一个JVM来运行这个MapReduce程序,并自动获得Hadoop的配置,同时把类的路径(及其依赖关系)加入到Hadoop的库中。以上就是Hadoop Job的运行记录,从这里可以看到,这个Job被赋予了一个ID号:job__0002,而且得知输入文件有两个(Total input paths to process : 2),同时还可以了解map的输入输出记录(record数及字节数),以及reduce输入输出记录。
查看HDFS上hdfsOutput目录内容:
在终端输入下面指令:
bin/hadoop fs -ls hdfsOutput
从上图中知道生成了三个文件,我们的结果在"part-r-00000"中。
使用下面指令查看结果输出文件内容
bin/hadoop fs -cat output/part-r-00000
(注意:请忽视截图指令中的3)
输出目录日志以及输入目录中的文件是永久存在的,如果不删除的话,如果出现结果不一致,请参考这个因素。
Views(...) Comments()使用Hadoop 实现文档倒排索引
文档倒排索引主要是统计每个单词在各个文档中出现的频数,因此要以单词为key,value为文档以及该单词在此文档频数,即输出数据的格式形如:
& word1,[doc1,3] [doc2,4] ... & :表示word1这个单词在doc1文档中出现了3次,在doc2文档中出现了4次。
整个程序的输入是一系列文件,比如file01.txt, file02.txt, file03.txt ....,首先要将这些文件上传到hadoop hdfs中作为程序的输入。上传过程以及类的编译等可以参考这篇博客:运行Hadoop示例程序WordCount,这里不再详细介绍。本程序的源代码在文章最后面。
一、程序运行的大体思路
由于文档倒排索引考察的是一个单词和文档的关系,而默认的LineRecordReader是按照每行的偏移量作为map输入时的key值,每行的内容作为map的value值,这里的key值(即行偏移量对我们的意义不大),我们这里考虑将一个文档的名字作为关键字,而每一行的值作为value,这样处理起来比较方便,(即:map的输入形式为,主要是通过一个自定义的RecordReader类来实现,下面会有介绍)。整个程序数据处理流程如下面所示:
map类的主要作用是处理程序的输入,这里的输入形式是,即输入的关键字key是文件名如file01.txt,值value为一行数据,map的任务是将这一行数据进行分词,并以图中第一部分的形式进行输出。
combine类的主要作用是将map输出的相同的key的value进行合并(相加),这样有利于减少数据传输,combine是在本节点进行的。
partition的主要作用是对combine的输出进行分区,分区的目的是使key值相同的数据被分到同一个节点,这样在进行reduce操作的时候仅需要本地的数据就足够,不需要通过网络向其他节点寻找数据。上图中的 &partitionby word1 rather than word1#doc1& 意思是将word1作为分区时的关键字,而不是word1#doc1,因为我们在之前的输出的关键字的形式是word1#doc1的不是word1这样系统会默认按照进行word1#doc1分区,而我们最终想要的结果是按照word1分区的,所以需要我们自定义patition类。
reduce的操作主要是将结果进行求和整理,并使结果符合我们所要的形式。
2、程序和各个类的设计说明
这部分按照程序执行的顺序依次介绍每个类的设计和作用,有些子类继承了父类,但是并没有重新实现父类的方法,这里不详细介绍这些方法。
2.1、FileNameRecordReader类
FileNameRecordReader类继承自RecordReader,是RecordReader类的自定义实现,主要作用是将记录所在的文件名作为key,而不是记录行所在文件的偏移,获取文件名所用的语句为:
fileName = ((FileSplit) arg0).getPath().getName();
2.2、FileNameInputFormat类
因为我们重写了RecordReader类,这里要重写FileInputFormat类来使用我们的自定义FileNameRecordReader,这个类的主要作用就是返回一个FileNameRecordReader类的实例。
2.3、InvertedIndexMapper类
这个类继承自Mapper,主要方法有setup和map方法,setup方法的主要作用是在执行map前初始化一个stopwords的list,主要在map处理输入的单词时,如果该单词在stopwords的list中,则跳过该单词,不进行处理。stopwords刚开始是以一个文本文件的形式存放在hdfs中,程序在刚开始执行的时候通过Hadoop Configuration将这个文本文件设置为CacheFile供各个节点共享,并在执行map前,初始化一个stopwords列表。
InvertedIndexMapper的主要操作是map,这个方法将读入的一行数据进行分词操作,并以的键值对形式,向外写数据,在map方法中,写出的value都是1。InvertedIndexMapper类的类图如下图2所示。
2.4、SumCombiner类
这个类主要是将前面InvertedIndexMapper类的输出结果进行合并,如果一个单词在一个文档中出现了多次,则将value的值设置为出现的次数和。
2.5、NewPartitioner类
分区类主要是将前面的输出进行分区,即选择合适的节点,分区类一般使用关键字key进行分区,但是我们这里的关键字为word1#doc1,我们最终是想让word相同的记录在同一台节点上,故NewPartitioner的任务是利用word进行分区。
2.5、InvertedIndexReducer类
InvertedIndexReducerreduce的输入形式为:
,如第一个图中所示可见同一个单词会作为多次输入,传递给reduce,而最终的结果要求只输出一次单词,而不同的文档如doc1,doc2要作为这个单词的value输出,我们的reduce在实现此功能时,设置两个变量CurrentItem和postingList,其中CurrentItem保存每次每次读入的key,初始值为空,postingList是一个列表,表示这个key对于的出现的文档以及在此文档中出现的次数。因为同一个key可能被读入多次,每次在读入key时,同上一个CurrentItem进行比较,如果跟上一个CurrentItem相同,表示读入的是同一个key,进而将新读入的key的文档追加到postingList中;如果根上一个CurrentItem不同,表示相同的单词以及读完了,这时候我们要统计上一个CurrentItem出现的总次数,以及含有此item的总的文章数,这些信息我们之前都存放在postingList中,只要遍历此时的postingList就能得到上述信息,并在得到信息之后重置CurrentItem和postingList。具体见代码实现。其类图如上图所示。
3、运行结果截图
我编译以及执行使用的命令如下,大家可以根据自己目录情况适当调整
javac -classpath ~/hadoop-1.2.1/hadoop-core-1.2.1.jar -d ./ InvertedIndexer.java
jar -cfv inverted.jar -C ./* .
hadoop jar ./inverted.jar InvertedIndexer input output
#运行结束后显示
hadoop fs -cat output/part-r-00000
结果截图:
import java.io.BufferedR
import java.io.FileR
import java.io.IOE
import java.net.URI;
import java.util.L
import java.util.S
import java.util.StringT
import java.util.ArrayL
import java.util.TreeS
import org.apache.hadoop.conf.C
import org.apache.hadoop.filecache.DistributedC
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.RecordR
import org.apache.hadoop.mapreduce.lib.input.LineRecordR
import org.apache.hadoop.mapreduce.InputS
import org.apache.hadoop.mapreduce.lib.input.FileS
import org.apache.hadoop.mapreduce.TaskAttemptC
import org.apache.hadoop.mapreduce.lib.partition.HashP
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.M
import org.apache.hadoop.mapreduce.R
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
public class InvertedIndexer {
public static class FileNameInputFormat extends FileInputFormat {
public RecordReader createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException,
InterruptedException {
FileNameRecordReader fnrr = new FileNameRecordReader();
fnrr.initialize(split, context);
public static class FileNameRecordReader extends RecordReader {
String fileN
LineRecordReader lrr = new LineRecordReader();
public Text getCurrentKey() throws IOException, InterruptedException {
return new Text(fileName);
public Text getCurrentValue() throws IOException, InterruptedException {
return lrr.getCurrentValue();
public void initialize(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {
lrr.initialize(arg0, arg1);
fileName = ((FileSplit) arg0).getPath().getName();
public void close() throws IOException {
lrr.close();
public boolean nextKeyValue() throws IOException, InterruptedException {
return lrr.nextKeyValue();
public float getProgress() throws IOException, InterruptedException {
return lrr.getProgress();
public static class InvertedIndexMapper extends Mapper {
private Set
private Path[] localF
private String pattern = &[^\\w]&;
public void setup(Context context) throws IOException,InterruptedException {
stopwords = new TreeSet();
Configuration conf = context.getConfiguration();
localFiles = DistributedCache.getLocalCacheFiles(conf);
for (int i = 0; i & localFiles. i++) {
BufferedReader br = new BufferedReader(new FileReader(localFiles[i].toString()));
while ((line = br.readLine()) != null) {
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
stopwords.add(itr.nextToken());
br.close();
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
String temp = new String();
String line = value.toString().toLowerCase();
line = line.replaceAll(pattern, & &);
StringTokenizer itr = new StringTokenizer(line);
for (; itr.hasMoreTokens();) {
temp = itr.nextToken();
if (!stopwords.contains(temp)) {
Text word = new Text();
word.set(temp + &#& + key);
context.write(word, new IntWritable(1));
public static class SumCombiner extends Reducer {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
result.set(sum);
context.write(key, result);
public static class NewPartitioner extends HashPartitioner {
public int getPartition(Text key, IntWritable value, int numReduceTasks) {
String term = new String();
term = key.toString().split(&#&)[0]; // =&term
return super.getPartition(new Text(term), value, numReduceTasks);
public static class InvertedIndexReducer extends Reducer {
private Text word1 = new Text();
private Text word2 = new Text();
String temp = new String();
static Text CurrentItem = new Text(& &);
static List postingList = new ArrayList();
public void reduce(Text key, Iterable values,
Context context) throws IOException, InterruptedException {
int sum = 0;
String keyWord = key.toString().split(&#&)[0];
int needBlank = 15-keyWord.length();
for(int i=0;i&i++){ keyword=&& +=& & ;=&& }=&& word1.set(keyword);=&& temp=&key.toString().split(&#&)[1];& key的形式为word1#doc1,所以temp为doc1=&& for=&& (intwritable=&& val=&& :=&& values)=&& {=&& 得到某个单词在一个文件中的总数=&& sum=&& word2.set(&[&=&& &,&=&& &]&);=&& word2的格式为:[doc1,3]=&& if=&& (!currentitem.equals(word1)=&& &&=&& !currentitem.equals(&=&& &))=&& stringbuilder=&& out=&new& stringbuilder();=&& long=&& count=&0;& double=&& filecount=&0;& (string=&& p=&& postinglist)=&& out.append(p);=&& out.append(&=&& &);=&& long.parselong(p.substring(p.indexof(&,&)=&& 1,p.indexof(&]&)));=&& filecount++;=&& out.append(&[total,&=&& &]=&& average=&count/fileC& out.append(&[average,&+string.format(&%.3f&,=&& average)+&].&);=&& (count=&&& 0)
context.write(CurrentItem, new Text(out.toString()));
postingList = new ArrayList();
CurrentItem = new Text(word1);
postingList.add(word2.toString());
public void cleanup(Context context) throws IOException,InterruptedException {
StringBuilder out = new StringBuilder();
long count = 0;
for (String p : postingList) {
out.append(p);
out.append(& &);
count = count + Long.parseLong(p.substring(p.indexOf(&,&) + 1,p.indexOf(&]&)));
out.append(&[total,& + count + &].&);
if (count & 0)
context.write(CurrentItem, new Text(out.toString()));
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
DistributedCache.addCacheFile(new URI(&hdfs://namenode:9000/user/hadoop/stop_word/stop_word.txt&),conf);
Job job = new Job(conf, &inverted index&);
job.setJarByClass(InvertedIndexer.class);
job.setInputFormatClass(FileNameInputFormat.class);
job.setMapperClass(InvertedIndexMapper.class);
job.setCombinerClass(SumCombiner.class);
job.setReducerClass(InvertedIndexReducer.class);
job.setPartitionerClass(NewPartitioner.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
4、参考文献
《深入理解大数据 大数据处理与实战》主编:黄宜华老师(南京大学)
(window.slotbydup=window.slotbydup || []).push({
id: '2467140',
container: s,
size: '1000,90',
display: 'inlay-fix'
(window.slotbydup=window.slotbydup || []).push({
id: '2467141',
container: s,
size: '1000,90',
display: 'inlay-fix'
(window.slotbydup=window.slotbydup || []).push({
id: '2467142',
container: s,
size: '1000,90',
display: 'inlay-fix'
(window.slotbydup=window.slotbydup || []).push({
id: '2467143',
container: s,
size: '1000,90',
display: 'inlay-fix'
(window.slotbydup=window.slotbydup || []).push({
id: '2467148',
container: s,
size: '1000,90',
display: 'inlay-fix'一个hadoop的简单测试例子 - 开源中国社区
当前访客身份:游客 [
当前位置:
发布于 日 17时,
从网上找了一个测试例子,统计文本中指定某个单词出现的次数。
调试了下发现几个bug,我把修改后的分享下。
eclipse下编译
vm参数:-Xms64m -Xmx512m
代码片段(1)
SingleWordCount.java&~&3KB&&&&
package com.run.ayena.distributed.
import java.io.IOE
import java.util.StringT
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.IntW
import org.apache.hadoop.io.T
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.M
import org.apache.hadoop.mapreduce.R
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.mapreduce.lib.output.FileOutputF
import org.apache.hadoop.util.GenericOptionsP
////统计文本中指定某个单词出现的次数
public class SingleWordCount {
public static class SingleWordCountMapper extends
Mapper&Object, Text, Text, IntWritable& {
private final static IntWritable one = new IntWritable(1);
private Text val = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
String keyword = context.getConfiguration().get("word");
while (itr.hasMoreTokens()) {
String nextkey = itr.nextToken();
if (nextkey.trim().equals(keyword)) {
val.set(nextkey);
context.write(val, one);
// do nothing
public static class SingleWordCountReducer extends
Reducer&Text,IntWritable,Text,IntWritable& {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable&IntWritable& values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
result.set(sum);
context.write(key, result);
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Usage: wordcount
System.exit(2);
// 输入指定的单词
conf.set("word", otherArgs[2]);
// 指定系统路
conf.set("mapred.system.dir", "/cygdrive/e/workspace_hadoop/SingleWordCount/");
// 设置运行的job名称
Job job = new Job(conf, "word count");
// 设置运行的job类
job.setJarByClass(SingleWordCount.class);
// 设置Mapper
job.setMapperClass(SingleWordCountMapper.class);
// 设置本地聚合类,该例本地聚合类同Reduer类
job.setCombinerClass(SingleWordCountReducer.class);
// 设置Reduer
job.setReducerClass(SingleWordCountReducer.class);
// 设置Map的输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置Reducer输出的key类型
job.setOutputKeyClass(Text.class);
// 设置Reducer输出的value类型
job.setOutputValueClass(IntWritable.class);
// 设置输入和输出的目录
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// 执行,直到结束就退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
开源中国-程序员在线工具:
好有压力啊
2楼:geyonghong 发表于
有什么压力啊
3楼:flyingtree 发表于
请问楼主用的hadoop版本是多少?我在使用0.20.203版本测试wordcount的时候,Mapper显示是已经过期的接口。
4楼:geyonghong 发表于
hadoop-0.20.2/
开源从代码分享开始
geyonghong的其它代码让python在hadoop上跑起来
作者:wing1995
字体:[ ] 类型:转载 时间:
让python在hadoop上跑起来,python如何在hadoop上跑起来?感兴趣的小伙伴们可以参考一下
本文实例讲解的是一般的hadoop入门程序“WordCount”,就是首先写一个map程序用来将输入的字符串分割成单个的单词,然后reduce这些单个的单词,相同的单词就对其进行计数,不同的单词分别输出,结果输出每一个单词出现的频数。
  注意:关于数据的输入输出是通过sys.stdin(系统标准输入)和sys.stdout(系统标准输出)来控制数据的读入与输出。所有的脚本执行之前都需要修改权限,否则没有执行权限,例如下面的脚本创建之前使用“chmod +x mapper.py”
1.mapper.py
#!/usr/bin/env python
import sys
for line in sys.stdin: # 遍历读入数据的每一行
line = line.strip() # 将行尾行首的空格去除
words = line.split() #按空格将句子分割成单个单词
for word in words:
print '%s\t%s' %(word, 1)
2.reducer.py
#!/usr/bin/env python
from operator import itemgetter
import sys
current_word = None # 为当前单词
current_count = 0 # 当前单词频数
word = None
for line in sys.stdin:
words = line.strip() # 去除字符串首尾的空白字符
word, count = words.split('\t') # 按照制表符分隔单词和数量
count = int(count) # 将字符串类型的‘1'转换为整型1
except ValueError:
if current_word == word: # 如果当前的单词等于读入的单词
current_count += count # 单词频数加1
if current_word: # 如果当前的单词不为空则打印其单词和频数
print '%s\t%s' %(current_word, current_count)
current_count = count # 否则将读入的单词赋值给当前单词,且更新频数
current_word = word
if current_word == word:
print '%s\t%s' %(current_word, current_count)
在shell中运行以下脚本,查看输出结果:
echo "foo foo quux labs foo bar zoo zoo hying" | /home/wuying/mapper.py | sort -k 1,1 | /home/wuying/reducer.py
# echo是将后面“foo ****”字符串输出,并利用管道符“|”将输出数据作为mapper.py这个脚本的输入数据,并将mapper.py的数据输入到reducer.py中,其中参数sort -k 1,1是将reducer的输出内容按照第一列的第一个字母的ASCII码值进行升序排序
其实,我觉得后面这个reducer.py处理单词频数有点麻烦,将单词存储在字典里面,单词作为‘key',每一个单词出现的频数作为'value',进而进行频数统计感觉会更加高效一点。因此,改进脚本如下:
mapper_1.py
但是,貌似写着写着用了两个循环,反而效率低了。关键是不太明白这里的current_word和current_count的作用,如果从字面上老看是当前存在的单词,那么怎么和遍历读取的word和count相区别?
下面看一些脚本的输出结果:
我们可以看到,上面同样的输入数据,同样的shell换了不同的reducer,结果后者并没有对数据进行排序,实在是费解~
让Python代码在hadoop上跑起来!
一、准备输入数据
接下来,先下载三本书:
$ mkdir -p tmp/gutenberg
$ cd tmp/gutenberg
$ wget http://www.gutenberg.org/ebooks/20417.txt.utf-8
$ wget http://www.gutenberg.org/files/.txt
$ wget http://www.gutenberg.org/ebooks/4300.txt.utf-8
&然后把这三本书上传到hdfs文件系统上:
$ hdfs dfs -mkdir /user/${whoami}/input # 在hdfs上的该用户目录下创建一个输入文件的文件夹
$ hdfs dfs -put /home/wuying/tmp/gutenberg/*.txt /user/${whoami}/input # 上传文档到hdfs上的输入文件夹中
寻找你的streaming的jar文件存放地址,注意2.6的版本放到share目录下了,可以进入hadoop安装目录寻找该文件:
$ cd $HADOOP_HOME
$ find ./ -name "*streaming*"
然后就会找到我们的share文件夹中的hadoop-straming*.jar文件:
寻找速度可能有点慢,因此你最好是根据自己的版本号到对应的目录下去寻找这个streaming文件,由于这个文件的路径比较长,因此我们可以将它写入到环境变量:
$ vi ~/.bashrc # 打开环境变量配置文件
# 在里面写入streaming路径
export STREAM=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar
由于通过streaming接口运行的脚本太长了,因此直接建立一个shell名称为run.sh来运行:
hadoop jar $STREAM \
-files ./mapper.py,./reducer.py \
-mapper ./mapper.py \
-reducer ./reducer.py \
-input /user/$(whoami)/input/*.txt \
-output /user/$(whoami)/output
然后"source run.sh"来执行mapreduce。结果就响当当的出来啦。这里特别要提醒一下:
1、一定要把本地的输入文件转移到hdfs系统上面,否则无法识别你的input内容;
2、一定要有权限,一定要在你的hdfs系统下面建立你的个人文件夹否则就会被denied,是的,就是这两个错误搞得我在服务器上面痛不欲生,四处问人的感觉真心不如自己清醒对待来的好;
3、如果你是第一次在服务器上面玩hadoop,建议在这之前请在自己的虚拟机或者linux系统上面配置好伪分布式然后入门hadoop来的比较不那么头疼,之前我并不知道我在服务器上面运维没有给我运行的权限,后来在自己的虚拟机里面运行一下example实例以及wordcount才找到自己的错误。
好啦,然后不出意外,就会complete啦,你就可以通过如下方式查看计数结果:
以上就是本文的全部内容,希望对大家学习python软件编程有所帮助。
您可能感兴趣的文章:
大家感兴趣的内容
12345678910
最近更新的内容
常用在线小工具}

我要回帖

更多关于 hadoop 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信