
Running Hadoop 2.0 MapReduce Programs in Python

Key point: start each script with #!/usr/bin/python. The .py files are shipped to every node, so they must be executable.

1) Count the number of distinct IPs across all logs

####################local test############################
cat /home/hadoop/Sep-2013/*/* | python ipmapper.py | sort | python ipreducer.py

Partial local results:
99.67.46.254 13
99.95.174.29 47
sum of single ip 13349

#####################run on the hadoop cluster############################
bin/hadoop jar ./share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -mapper /data/hadoop/jobs_python/job_logstat/ipmapper.py -reducer /data/hadoop/jobs_python/job_logstat/ipreducer.py -input /log_original/* -output /log_ipnum -file /data/hadoop/jobs_python/job_logstat/ipmapper.py -file /data/hadoop/jobs_python/job_logstat/ipreducer.py

Partial cluster results:
99.67.46.254 13
99.95.174.29 47
sum of single ip 13349

ipmapper.py:
##########################mapper code#######################################
#!/usr/bin/python
# --*-- coding:utf-8 --*--
import re
import sys

# match '<ip> ... "<method> <target> ' in each access-log line;
# the dots in the ip are escaped so '.' is matched literally
pat = re.compile(r'(?P<ip>\d+\.\d+\.\d+\.\d+).*?"\w+ (?P<subdir>.*?) ')

for line in sys.stdin:
    match = pat.search(line)
    if match:
        print '%s\t%s' % (match.group('ip'), 1)

ipreducer.py:
##########################reducer code#####################################
#!/usr/bin/python
from operator import itemgetter
import sys

dict_ip_count = {}

for line in sys.stdin:
    line = line.strip()
    ip, num = line.split('\t')
    try:
        num = int(num)
        dict_ip_count[ip] = dict_ip_count.get(ip, 0) + num
    except ValueError:
        pass

sorted_dict_ip_count = sorted(dict_ip_count.items(), key=itemgetter(0))
for ip, count in sorted_dict_ip_count:
    print '%s\t%s' % (ip, count)

2) Count the number of accesses to each subdirectory across all logs

########################local test######################################
cat /home/hadoop/Sep-2013/*/* | python subdirmapper.py | sort | python subdirreducer.py

Partial results:
http://dongxicheng.org/recommend/ 2
http://dongxicheng.org/search-engine/scribe-intro/trackback/ 1
http://dongxicheng.org/structure/permutation-combination/ 1
http://dongxicheng.org/structure/sort/trackback/ 1
http://dongxicheng.org/wp-comments-post.php 5
http://dongxicheng.org/wp-login.php/ 3535
http://hadoop123.org/administrator/index.php 4

#######################run on the hadoop cluster########################################
bin/hadoop jar ./share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -mapper /data/hadoop/jobs_python/job_logstat/subdirmapper.py -reducer /data/hadoop/jobs_python/job_logstat/subdirreducer.py -input /log_original/* -output /log_subdirnum -file /data/hadoop/jobs_python/job_logstat/subdirmapper.py -file /data/hadoop/jobs_python/job_logstat/subdirreducer.py

Partial results:
http://dongxicheng.org/search-engine/scribe-intro/trackback/ 1
http://dongxicheng.org/structure/permutation-combination/ 1
http://dongxicheng.org/structure/sort/trackback/ 1
http://dongxicheng.org/wp-comments-post.php 5
http://dongxicheng.org/wp-login.php/ 3535
http://hadoop123.org/administrator/index.php 4

subdirmapper.py:
#######################################mapper code###########################################
#!/usr/bin/python
# --*-- coding:utf-8 --*--
import re
import sys

# same pattern as ipmapper.py, but emit the requested target instead of the ip
pat = re.compile(r'(?P<ip>\d+\.\d+\.\d+\.\d+).*?"\w+ (?P<subdir>.*?) ')

for line in sys.stdin:
    match = pat.search(line)
    if match:
        print '%s\t%s' % (match.group('subdir'), 1)

subdirreducer.py:
#######################################reducer code###########################################
#!/usr/bin/python
from operator import itemgetter
import sys

dict_subdir_count = {}

for line in sys.stdin:
    line = line.strip()
    subdir, num = line.split('\t')
    try:
        num = int(num)
        dict_subdir_count[subdir] = dict_subdir_count.get(subdir, 0) + num
    except ValueError:
        pass

sorted_dict_subdir_count = sorted(dict_subdir_count.items(), key=itemgetter(0))
for subdir, count in sorted_dict_subdir_count:
    print '%s\t%s' % (subdir, count)
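Since the same pattern drives both jobs, here is a minimal sketch of what it extracts, assuming a log line whose request target is a full URL as in the results above (the sample line below is invented for illustration, not taken from the data set):

#!/usr/bin/python
# Minimal sketch: check what the mapper regex extracts from one sample line.
# The sample line is an assumption about the log format, not real data.
import re

pat = re.compile(r'(?P<ip>\d+\.\d+\.\d+\.\d+).*?"\w+ (?P<subdir>.*?) ')
sample = '99.67.46.254 - - [16/Sep/2013:08:02:43 +0000] ' \
         '"GET http://dongxicheng.org/structure/sort/ HTTP/1.1" 200 1234'
match = pat.search(sample)
if match:
    # prints: 99.67.46.254 http://dongxicheng.org/structure/sort/
    print match.group('ip'), match.group('subdir')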

【On balance, writing MR programs in Java is still the better option】
Reference:
http://asfr.blogbus.com/logs/44208067.html

The streaming command used to submit the word-count job shown later:
bin/hadoop jar ./share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -mapper /data/hadoop/mapper.py -reducer /data/hadoop/reducer.py -input /in/* -output /py_out -file /data/hadoop/mapper.py -file /data/hadoop/reducer.py
How MapReduce development in Python works:
- It follows the same model as the Linux pipe mechanism.
- Inter-process communication happens over standard input and output.
- Standard input/output is supported by every language.
A few examples (a sketch of grep.py follows below):
cat 1.txt | grep 'dong' | sort
cat 1.txt | python grep.py | java -jar sort.jar
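As a minimal sketch of such a pipeline stage (grep.py is a hypothetical script, assumed to filter for the substring 'dong' the way grep does above):

#!/usr/bin/python
# Hypothetical grep.py from the pipeline above: read lines from stdin,
# write lines containing 'dong' to stdout, just like grep 'dong'.
import sys

for line in sys.stdin:
    if 'dong' in line:
        sys.stdout.write(line)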

Reading from the standard input stream:
C++: cin
C: scanf
Writing to the standard output stream:
C++: cout
C: printf

Limitation: only the Mapper and Reducer can be implemented this way; the other components still have to be written in Java.

Testing with hadoop-streaming is quite simple.
Compile the programs into executables:
g++ -o mapper mapper.cpp
g++ -o reducer reducer.cpp
Test the pipeline locally:
cat test.txt | ./mapper | sort | ./reducer

mapper.py:

#!/usr/bin/python
# coding=utf-8
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)

reducer.py:

#!/usr/bin/python
# coding=utf-8
from operator import itemgetter
import sys

# maps words to their counts
word2count = {}

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        pass

# sort the words lexicographically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))

# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
    print '%s\t%s' % (word, count)
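To sanity-check the pair before submitting a job, the shuffle can be reproduced locally. The harness below is a minimal sketch (the sample text and expected counts are made up for illustration) that mimics cat test.txt | python mapper.py | sort | python reducer.py:

#!/usr/bin/python
# Minimal local test harness (illustration only): pipe sample text
# through mapper.py, an in-process sort, and reducer.py.
import subprocess

sample = 'foo quux labs foo bar quux\n'

# run the mapper on the sample text
mapper = subprocess.Popen(['python', 'mapper.py'],
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE)
mapped, _ = mapper.communicate(sample)

# sort the mapper output, as the shuffle (or the 'sort' stage) would
shuffled = ''.join(sorted(mapped.splitlines(True)))

# run the reducer on the sorted stream
reducer = subprocess.Popen(['python', 'reducer.py'],
                           stdin=subprocess.PIPE, stdout=subprocess.PIPE)
reduced, _ = reducer.communicate(shuffled)

print reduced  # expected: bar 1, foo 2, labs 1, quux 2 (tab-delimited)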


Console output from submitting the word-count job with the streaming command shown earlier:
packageJobJar: [/data/hadoop/mapper.py, /data/hadoop/reducer.py, /data/hadoop/hadoop_tmp/hadoop-unjar4601454529868960285/] [] /tmp/streamjob2970217681900457939.jar tmpDir=null
14/03/21 16:23:09 INFO client.RMProxy: Connecting to ResourceManager at /192.168.2.200:8032
14/03/21 16:23:09 INFO client.RMProxy: Connecting to ResourceManager at /192.168.2.200:8032
14/03/21 16:23:10 INFO mapred.FileInputFormat: Total input paths to process : 2
14/03/21 16:23:10 INFO mapreduce.JobSubmitter: number of splits:2
14/03/21 16:23:10 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.cache.files.filesizes is deprecated. Instead, use mapreduce.job.cache.files.filesizes
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.cache.files is deprecated. Instead, use mapreduce.job.cache.files
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.mapoutput.value.class is deprecated. Instead, use mapreduce.map.output.value.class
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.cache.files.timestamps is deprecated. Instead, use mapreduce.job.cache.files.timestamps
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.mapoutput.key.class is deprecated. Instead, use mapreduce.map.output.key.class
14/03/21 16:23:10 INFO Configuration.deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
14/03/21 16:23:10 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1394086709210_0008
14/03/21 16:23:10 INFO impl.YarnClientImpl: Submitted application application_1394086709210_0008 to ResourceManager at /192.168.2.200:8032
14/03/21 16:23:10 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1394086709210_0008/
14/03/21 16:23:10 INFO mapreduce.Job: Running job: job_1394086709210_0008
14/03/21 16:23:14 INFO mapreduce.Job: Job job_1394086709210_0008 running in uber mode : false
14/03/21 16:23:14 INFO mapreduce.Job:  map 0% reduce 0%
14/03/21 16:23:19 INFO mapreduce.Job:  map 100% reduce 0%
14/03/21 16:23:23 INFO mapreduce.Job:  map 100% reduce 100%
14/03/21 16:23:24 INFO mapreduce.Job: Job job_1394086709210_0008 completed successfully
14/03/21 16:23:24 INFO mapreduce.Job: Counters: 43
    File System Counters
        FILE: Number of bytes read=47
        FILE: Number of bytes written=248092
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=197
        HDFS: Number of bytes written=25
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=2
        Launched reduce tasks=1
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=5259
        Total time spent by all reduces in occupied slots (ms)=2298
    Map-Reduce Framework
        Map input records=2
        Map output records=4
        Map output bytes=33
        Map output materialized bytes=53
        Input split bytes=172
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=53
        Reduce input records=4
        Reduce output records=3
        Spilled Records=8
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=71
        CPU time spent (ms)=1300
        Physical memory (bytes) snapshot=678060032
        Virtual memory (bytes) snapshot=2662100992
        Total committed heap usage (bytes)=514326528
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=25
    File Output Format Counters
        Bytes Written=25
14/03/21 16:23:24 INFO streaming.StreamJob: Output directory: /py_out



1. In Java development on Hadoop, you can use:

FileSplit fileSplit = (FileSplit) reporter.getInputSplit();
String fileName = fileSplit.getPath().getName();

to obtain the input file name.

2. Likewise, in Python you can obtain the file name with:

import os

os.environ["map_input_file"]

Here map_input_file corresponds to the map.input.file job property.
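As a minimal sketch (assuming the script runs as a Hadoop Streaming mapper, where the framework exports map.input.file into the task environment; counting one record per source file is just an illustration):

#!/usr/bin/python
# Minimal sketch: a streaming mapper that emits one count per input file.
# Assumes Hadoop Streaming exports map.input.file as map_input_file;
# falls back to 'unknown' when run outside Hadoop (e.g. a local pipe test).
import os
import sys

file_name = os.environ.get('map_input_file', 'unknown')

for line in sys.stdin:
    # key = source file name, value = 1 per record
    print '%s\t%s' % (file_name, 1)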
