
A WordCount Example Using Hadoop Streaming

Published: 2016-10-08 09:42:45  Source: linux website  Author: 武睿傲雪
Hadoop provides an API that lets users write the Map or Reduce function in any scripting language. The key to Hadoop Streaming is that it uses the UNIX standard streams as the interface between a program and Hadoop. Any program that can read from standard input and write to standard output can therefore serve as the Map or Reduce function of a MapReduce job through Hadoop Streaming, whatever language it is written in.
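This stdin/stdout contract can be tried locally with ordinary UNIX tools, no Hadoop needed. In this sketch the shell pipeline stands in for Hadoop's map, shuffle-sort, and reduce stages:

```shell
# Local stand-in for the streaming pipeline: /bin/cat plays the "mapper",
# sort plays the shuffle, and wc plays the "reducer". Hadoop Streaming
# wires up exactly this stdin/stdout contract around real cluster machinery.
printf 'Hello World\nHello Hadoop\n' | /bin/cat | sort | /usr/bin/wc -l
# prints 2
```

Any executable obeying the same contract (a Python script, an awk one-liner, /bin/cat itself) can be dropped into the mapper or reducer slot.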
 
Below is a simple example (environment: Ubuntu 16.04 LTS, Hadoop 2.7.1):
 
(1) Locate the hadoop-streaming-2.7.1.jar package inside the Hadoop 2.7.1 distribution; you can search for it with Ctrl+F in the folder shown in the screenshot.
[screenshot: locating hadoop-streaming-2.7.1.jar]
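If the graphical search is inconvenient, a find from the shell does the same job. HADOOP_HOME here is an assumed variable pointing at the unpacked hadoop-2.7.1 directory:

```shell
# Assumes HADOOP_HOME points at the unpacked hadoop-2.7.1 distribution.
find "$HADOOP_HOME" -name 'hadoop-streaming-*.jar'
# in the 2.7.1 tarball the jar typically lives under share/hadoop/tools/lib/
```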
 
(2) Create an input directory under the Hadoop 2.7.1 directory to hold the local input files. They will later be uploaded by command to HDFS, into the hdfs://localhost:9000/user/.. directory.
Create the dtw directory on HDFS:
[screenshot: creating the dtw directory]
Check that it was created:
[screenshot: listing the directory]
Then create the input directory:
[screenshot: creating the input directory]
Check that it was created:
[screenshot: listing the directory]
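The screenshots above correspond to ordinary `hdfs dfs` commands. A sketch of the likely invocations, with directory names taken from the job log later in the article (exact paths may differ on your setup, and these require a running HDFS):

```shell
hdfs dfs -mkdir /user/dtw          # create the dtw directory on HDFS
hdfs dfs -ls /user                 # check that it was created
hdfs dfs -mkdir /user/dtw/input    # create the input directory
hdfs dfs -ls /user/dtw             # check that it was created
```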
Upload the two files under the local input directory to the input directory on HDFS:
 
First, look at what the two files under input contain:
[screenshot: viewing the file contents]
 
Contents of file01:
Hello World Bye World
Hello Hadoop Bye Hadoop
Bye Hadoop Hello Hadoop
yes me
good yes
 
Contents of file02:
Hello World Bye World
Hello Hadoop Bye Hadoop
Bye Hadoop Hello Hadoop
yes me
catch ese
 
The command to copy the local files into the input directory on HDFS:
[screenshot: copying the files to HDFS]
Check that the upload succeeded:
[screenshot: listing the HDFS input directory]
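A sketch of the upload step shown in the screenshots (local files under ./input, HDFS target path taken from the log output; adjust to your layout, and note these need a running HDFS):

```shell
hdfs dfs -put input/file01 input/file02 /user/dtw/input
hdfs dfs -ls /user/dtw/input       # check that the upload succeeded
```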
Run the MapReduce streaming command:
[screenshot: the streaming command]
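The streaming command itself survives only as a screenshot, but it can be reconstructed from the log below, which shows `-mapper /bin/cat`, `-reducer /usr/bin/wc`, and the relative input/output paths. The jar path here is the usual location inside the 2.7.1 tarball and is an assumption, not something shown in the text:

```shell
hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar \
    -input input \
    -output output \
    -mapper /bin/cat \
    -reducer /usr/bin/wc
```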
 
The shell shows the following output:
16/10/07 21:08:51 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/10/07 21:08:51 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/10/07 21:08:51 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/10/07 21:08:52 INFO mapred.FileInputFormat: Total input paths to process : 2
16/10/07 21:08:52 INFO mapreduce.JobSubmitter: number of splits:2
16/10/07 21:08:52 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local233242151_0001
16/10/07 21:08:53 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
16/10/07 21:08:53 INFO mapred.LocalJobRunner: OutputCommitter set in config null
16/10/07 21:08:53 INFO mapreduce.Job: Running job: job_local233242151_0001
16/10/07 21:08:53 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapred.FileOutputCommitter
16/10/07 21:08:53 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/10/07 21:08:53 INFO mapred.LocalJobRunner: Waiting for map tasks
16/10/07 21:08:53 INFO mapred.LocalJobRunner: Starting task: attempt_local233242151_0001_m_000000_0
16/10/07 21:08:53 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/10/07 21:08:53 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
16/10/07 21:08:53 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/user/dtw/input/file02:0+87
16/10/07 21:08:53 INFO mapred.MapTask: numReduceTasks: 1
16/10/07 21:08:53 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/10/07 21:08:53 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
16/10/07 21:08:53 INFO mapred.MapTask: soft limit at 83886080
16/10/07 21:08:53 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
16/10/07 21:08:53 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
16/10/07 21:08:53 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
16/10/07 21:08:53 INFO streaming.PipeMapRed: PipeMapRed exec [/bin/cat]
16/10/07 21:08:53 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/10/07 21:08:53 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
16/10/07 21:08:53 INFO Configuration.deprecation: map.input.start is deprecated. Instead, use mapreduce.map.input.start
16/10/07 21:08:53 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/10/07 21:08:53 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/10/07 21:08:53 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
16/10/07 21:08:53 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/10/07 21:08:53 INFO Configuration.deprecation: map.input.length is deprecated. Instead, use mapreduce.map.input.length
16/10/07 21:08:53 INFO Configuration.deprecation: mapred.local.dir is deprecated. Instead, use mapreduce.cluster.local.dir
16/10/07 21:08:53 INFO Configuration.deprecation: mapred.work.output.dir is deprecated. Instead, use mapreduce.task.output.dir
16/10/07 21:08:53 INFO Configuration.deprecation: map.input.file is deprecated. Instead, use mapreduce.map.input.file
16/10/07 21:08:53 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/10/07 21:08:53 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
16/10/07 21:08:53 INFO streaming.PipeMapRed: MRErrorThread done
16/10/07 21:08:53 INFO streaming.PipeMapRed: Records R/W=5/1
16/10/07 21:08:53 INFO streaming.PipeMapRed: mapRedFinished
16/10/07 21:08:53 INFO mapred.LocalJobRunner: 
16/10/07 21:08:53 INFO mapred.MapTask: Starting flush of map output
16/10/07 21:08:53 INFO mapred.MapTask: Spilling map output
16/10/07 21:08:53 INFO mapred.MapTask: bufstart = 0; bufend = 92; bufvoid = 104857600
16/10/07 21:08:53 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214380(104857520); length = 17/6553600
16/10/07 21:08:53 INFO mapred.MapTask: Finished spill 0
16/10/07 21:08:53 INFO mapred.Task: Task:attempt_local233242151_0001_m_000000_0 is done. And is in the process of committing
16/10/07 21:08:53 INFO mapred.LocalJobRunner: Records R/W=5/1
16/10/07 21:08:53 INFO mapred.Task: Task 'attempt_local233242151_0001_m_000000_0' done.
16/10/07 21:08:53 INFO mapred.LocalJobRunner: Finishing task: attempt_local233242151_0001_m_000000_0
16/10/07 21:08:53 INFO mapred.LocalJobRunner: Starting task: attempt_local233242151_0001_m_000001_0
16/10/07 21:08:53 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/10/07 21:08:53 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
16/10/07 21:08:53 INFO mapred.MapTask: Processing split: hdfs://localhost:9000/user/dtw/input/file01:0+86
16/10/07 21:08:53 INFO mapred.MapTask: numReduceTasks: 1
16/10/07 21:08:53 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
16/10/07 21:08:53 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
16/10/07 21:08:53 INFO mapred.MapTask: soft limit at 83886080
16/10/07 21:08:53 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
16/10/07 21:08:53 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
16/10/07 21:08:53 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
16/10/07 21:08:53 INFO streaming.PipeMapRed: PipeMapRed exec [/bin/cat]
16/10/07 21:08:54 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
16/10/07 21:08:54 INFO streaming.PipeMapRed: MRErrorThread done
16/10/07 21:08:54 INFO streaming.PipeMapRed: Records R/W=5/1
16/10/07 21:08:54 INFO streaming.PipeMapRed: mapRedFinished
16/10/07 21:08:54 INFO mapred.LocalJobRunner: 
16/10/07 21:08:54 INFO mapred.MapTask: Starting flush of map output
16/10/07 21:08:54 INFO mapred.MapTask: Spilling map output
16/10/07 21:08:54 INFO mapred.MapTask: bufstart = 0; bufend = 91; bufvoid = 104857600
16/10/07 21:08:54 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214380(104857520); length = 17/6553600
16/10/07 21:08:54 INFO mapred.MapTask: Finished spill 0
16/10/07 21:08:54 INFO mapred.Task: Task:attempt_local233242151_0001_m_000001_0 is done. And is in the process of committing
16/10/07 21:08:54 INFO mapred.LocalJobRunner: Records R/W=5/1
16/10/07 21:08:54 INFO mapred.Task: Task 'attempt_local233242151_0001_m_000001_0' done.
16/10/07 21:08:54 INFO mapred.LocalJobRunner: Finishing task: attempt_local233242151_0001_m_000001_0
16/10/07 21:08:54 INFO mapred.LocalJobRunner: map task executor complete.
16/10/07 21:08:54 INFO mapred.LocalJobRunner: Waiting for reduce tasks
16/10/07 21:08:54 INFO mapred.LocalJobRunner: Starting task: attempt_local233242151_0001_r_000000_0
16/10/07 21:08:54 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
16/10/07 21:08:54 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
16/10/07 21:08:54 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@14f35a42
16/10/07 21:08:54 INFO mapreduce.Job: Job job_local233242151_0001 running in uber mode : false
16/10/07 21:08:54 INFO mapreduce.Job:  map 100% reduce 0%
16/10/07 21:08:54 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=333971456, maxSingleShuffleLimit=83492864, mergeThreshold=220421168, ioSortFactor=10, memToMemMergeOutputsThreshold=10
16/10/07 21:08:54 INFO reduce.EventFetcher: attempt_local233242151_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
16/10/07 21:08:54 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local233242151_0001_m_000001_0 decomp: 103 len: 107 to MEMORY
16/10/07 21:08:54 INFO reduce.InMemoryMapOutput: Read 103 bytes from map-output for attempt_local233242151_0001_m_000001_0
16/10/07 21:08:54 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 103, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->103
16/10/07 21:08:54 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local233242151_0001_m_000000_0 decomp: 104 len: 108 to MEMORY
16/10/07 21:08:54 INFO reduce.InMemoryMapOutput: Read 104 bytes from map-output for attempt_local233242151_0001_m_000000_0
16/10/07 21:08:54 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 104, inMemoryMapOutputs.size() -> 2, commitMemory -> 103, usedMemory ->207
16/10/07 21:08:54 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
16/10/07 21:08:54 INFO mapred.LocalJobRunner: 2 / 2 copied.
16/10/07 21:08:54 INFO reduce.MergeManagerImpl: finalMerge called with 2 in-memory map-outputs and 0 on-disk map-outputs
16/10/07 21:08:55 INFO mapred.Merger: Merging 2 sorted segments
16/10/07 21:08:55 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 155 bytes
16/10/07 21:08:55 INFO reduce.MergeManagerImpl: Merged 2 segments, 207 bytes to disk to satisfy reduce memory limit
16/10/07 21:08:55 INFO reduce.MergeManagerImpl: Merging 1 files, 209 bytes from disk
16/10/07 21:08:55 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
16/10/07 21:08:55 INFO mapred.Merger: Merging 1 sorted segments
16/10/07 21:08:55 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 179 bytes
16/10/07 21:08:55 INFO mapred.LocalJobRunner: 2 / 2 copied.
16/10/07 21:08:55 INFO streaming.PipeMapRed: PipeMapRed exec [/usr/bin/wc]
16/10/07 21:08:55 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
16/10/07 21:08:55 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
16/10/07 21:08:55 INFO streaming.PipeMapRed: R/W/S=1/0/0 in:NA [rec/s] out:NA [rec/s]
16/10/07 21:08:55 INFO streaming.PipeMapRed: R/W/S=10/0/0 in:NA [rec/s] out:NA [rec/s]
16/10/07 21:08:55 INFO streaming.PipeMapRed: MRErrorThread done
16/10/07 21:08:55 INFO streaming.PipeMapRed: Records R/W=10/1
16/10/07 21:08:55 INFO streaming.PipeMapRed: mapRedFinished
16/10/07 21:08:55 INFO mapred.Task: Task:attempt_local233242151_0001_r_000000_0 is done. And is in the process of committing
16/10/07 21:08:55 INFO mapred.LocalJobRunner: 2 / 2 copied.
16/10/07 21:08:55 INFO mapred.Task: Task attempt_local233242151_0001_r_000000_0 is allowed to commit now
16/10/07 21:08:55 INFO output.FileOutputCommitter: Saved output of task 'attempt_local233242151_0001_r_000000_0' to hdfs://localhost:9000/user/dtw/output/_temporary/0/task_local233242151_0001_r_000000
16/10/07 21:08:55 INFO mapred.LocalJobRunner: Records R/W=10/1 > reduce
16/10/07 21:08:55 INFO mapred.Task: Task 'attempt_local233242151_0001_r_000000_0' done.
16/10/07 21:08:55 INFO mapred.LocalJobRunner: Finishing task: attempt_local233242151_0001_r_000000_0
16/10/07 21:08:55 INFO mapred.LocalJobRunner: reduce task executor complete.
16/10/07 21:08:56 INFO mapreduce.Job:  map 100% reduce 100%
16/10/07 21:08:56 INFO mapreduce.Job: Job job_local233242151_0001 completed successfully
16/10/07 21:08:56 INFO mapreduce.Job: Counters: 35
File System Counters
FILE: Number of bytes read=319317
FILE: Number of bytes written=1147162
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=433
HDFS: Number of bytes written=25
HDFS: Number of read operations=22
HDFS: Number of large read operations=0
HDFS: Number of write operations=5
Map-Reduce Framework
Map input records=10
Map output records=10
Map output bytes=183
Map output materialized bytes=215
Input split bytes=190
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=215
Reduce input records=10
Reduce output records=1
Spilled Records=20
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=19
Total committed heap usage (bytes)=938999808
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters 
Bytes Read=173
File Output Format Counters 
Bytes Written=25
16/10/07 21:08:56 INFO streaming.StreamJob: Output directory: output
 
An output directory is created in HDFS containing two files: _SUCCESS and part-00000 (part-00000 holds the result we want).
 
To view the result returned in the output directory, use this command:
[screenshot: viewing the result]
The result:
10      32     183
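The command behind that screenshot is presumably an `hdfs dfs -cat` on the result file; the path is assumed from the log output above (the streaming job wrote its output relative to /user/dtw):

```shell
hdfs dfs -cat output/part-00000    # relative to /user/dtw on this setup
```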
 
From this example we can see that Hadoop Streaming is invoked through the hadoop-streaming-2.7.1.jar package and takes the following options:
-input   specifies the input path
-output  specifies the output path
-mapper  specifies the map command
-reducer specifies the reduce command
From the result we can see that the wc command counts the lines, words, and bytes in the files, and the output matches what we expect.
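The byte count deserves a note: the two input files total 173 bytes (86 + 87, matching the split sizes in the log), yet wc reports 183. The extra 10 bytes are one tab per record: streaming hands the reducer each record as key + tab + value, and with an empty value that leaves a trailing tab on every line. This can be checked locally without Hadoop:

```shell
# Recreate the two input files and reproduce the 10/32/183 result locally.
dir=$(mktemp -d) && cd "$dir"
printf 'Hello World Bye World\nHello Hadoop Bye Hadoop\nBye Hadoop Hello Hadoop\nyes me\ngood yes\n' > file01
printf 'Hello World Bye World\nHello Hadoop Bye Hadoop\nBye Hadoop Hello Hadoop\nyes me\ncatch ese\n' > file02
cat file01 file02 | wc                          # 10 lines, 32 words, 173 bytes
cat file01 file02 | awk '{print $0 "\t"}' | wc  # 10 32 183: one tab appended per record
```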
 
That wraps up implementing MapReduce with Hadoop Streaming; I hope it helps!
 
Permanent link to this article: http://www.linuxdiyf.com/linux/24833.html