当前位置: 首页 > news >正文

使用python+hadoop-streaming编写hadoop处理程序

Hadoop Streaming提供了一个便于进行MapReduce编程的工具包,使用它可以基于一些可执行命令、脚本语言或其他编程语言来实现Mapper和 Reducer,从而充分利用Hadoop并行计算框架的优势和能力,来处理大数据

好吧我承认以上这句是抄的以下是原创干货

首先部署hadoop环境,这点可以参考 http://www.powerxing.com/install-hadoop-in-centos/

好吧原创从下一行开始

部署hadoop完成后,需要下载hadoop-streaming包,这个可以到http://www.java2s.com/Code/JarDownload/hadoop-streaming/hadoop-streaming-0.23.6.jar.zip去下载,或者访问http://www.java2s.com/Code/JarDownload/hadoop-streaming/选择最新版本,千万不要选择source否则后果自负,选择编译好的jar包即可,放到/usr/local/hadoop目录下备用

接下来是选择大数据统计的样本,我在阿里的天池大数据竞赛网站下载了母婴类购买统计数据,记录了900+个萌萌哒小baby的购买用户名、出生日期和性别信息,天池的地址https://tianchi.shuju.aliyun.com/datalab/index.htm

数据是一个csv文件,结构如下:

用户名,出生日期,性别(0女,1男,2不愿意透露性别)

比如:415971,20121111,0(数据已经脱敏处理)

下面我们来试着统计每年的男女婴人数

接下来开始写mapper程序mapper.py,由于hadoop-streaming是基于Unix Pipe的,数据会从标准输入sys.stdin输入,所以输入就写sys.stdin

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys

for line in sys.stdin:
    line = line.strip()
    data = line.split(',')
    if len(data)<3:
        continue
    user_id = data[0]
    birthyear = data[1][0:4]
    gender = data[2]
    print >>sys.stdout,"%s\t%s"%(birthyear,gender)

一个很简单的程序,看不懂的请自行提高姿势水平

下面是reduce程序,这里大家需要注意一下,map到reduce的期间,hadoop会自动给map出的key排序,所以到reduce中是一个已经排序的键值对,这简化了我们的编程工作

我是有洪荒之力的reducer.py,和外面的哪些妖艳贱货不一样

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys

gender_totle = {'0':0,'1':0,'2':0}
prev_key = False
for line in sys.stdin:#map的时候map中的key会被排序
    line = line.strip()    
    data = line.split('\t')
    birthyear = data[0]
    curr_key = birthyear
    gender = data[1]
    
    #寻找边界,输出结果
    if prev_key and curr_key !=prev_key:#不是第一次,并且找到了边界
        print >>sys.stdout,"%s year has female %s and male %s"%(prev_key,gender_totle['0'],gender_totle['1'])#先输出上一次统计的结果
        prev_key = curr_key
        gender_totle['0'] = 0
        gender_totle['1'] = 0
        gender_totle['2'] = 0#清零
        gender_totle[gender] +=1#开始计数
    else:
        prev_key = curr_key
        gender_totle[gender] += 1
#输出最后一行
if prev_key:
    print >>sys.stdout,"%s year has female %s and male %s"%(prev_key,gender_totle['0'],gender_totle['1'])

接下来就是将样本和mapper reducer上传到hdfs中并执行了,这也是我踩坑的地方

可以先这样测试下python脚本是否正确

cat sample.csv | ./mapper.py | sort -t ' ' -k 1 | ./reducer.py 

 

首先要在hdfs中创建相应的目录,为了方便,我将一部分hadoop命令做了别名

alias stop-dfs='/usr/local/hadoop/sbin/stop-dfs.sh'
alias start-dfs='/usr/local/hadoop/sbin/start-dfs.sh'
alias dfs='/usr/local/hadoop/bin/hdfs dfs'
echo "alias stop-dfs='/usr/local/hadoop/sbin/stop-dfs.sh'" >> /etc/profile
echo "alias start-dfs='/usr/local/hadoop/sbin/start-dfs.sh'" >> /etc/profile
echo "alias dfs='/usr/local/hadoop/bin/hdfs dfs'" >> /etc/profile

 

启动hadoop后,先创建一个用户目录

dfs -mkdir -p /user/root

将样本上传到此目录中

dfs -put ./sample.csv /user/root

当然也可以这样处理更加规范,这两者的差别一会儿会说

dfs -mkdir -p /user/root/input
dfs -put ./sample.csv /user/root/input

接下来将mapper.py和reducer.py上传到服务器上,切换到上传以上两个文件的目录

然后就可以执行了,执行命令如下

hadoop jar /usr/local/hadoop/hadoop-streaming-0.23.6.jar \
-D mapred.job.name="testhadoop" \
-D mapred.job.queue.name=testhadoopqueue \
-D mapred.map.tasks=50 \
-D mapred.min.split.size=1073741824 \
-D mapred.reduce.tasks=10 \
-D stream.num.map.output.key.fields=1 \
-D num.key.fields.for.partition=1 \
-input sample.csv \
-output output-streaming \
-mapper mapper.py \
-reducer reducer.py \
-file mapper.py \
-file reducer.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner  

如果是将sample.csv放到input下,这个命令就应该这么写,不过反正我也没试过,出错了不关我的事

hadoop jar /usr/local/hadoop/hadoop-streaming-0.23.6.jar \
-D mapred.job.name="testhadoop" \
-D mapred.job.queue.name=testhadoopqueue \
-D mapred.map.tasks=50 \
-D mapred.min.split.size=1073741824 \
-D mapred.reduce.tasks=10 \
-D stream.num.map.output.key.fields=1 \
-D num.key.fields.for.partition=1 \
-input input/sample.csv \
-output output-streaming \
-mapper mapper.py \
-reducer reducer.py \
-file mapper.py \
-file reducer.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner  

命令的解释如下

 

(1)-input:输入文件路径
(2)-output:输出文件路径
(3)-mapper:用户自己写的mapper程序,可以是可执行文件或者脚本
(4)-reducer:用户自己写的reducer程序,可以是可执行文件或者脚本
(5)-file:打包文件到提交的作业中,可以是mapper或者reducer要用的输入文件,如配置文件,字典等。
         这个一般是必须有的,因为mapper和reducer函数都是写在本地的文件中,因此需要将文件上传到集群中才能被执行
(6)-partitioner:用户自定义的partitioner程序
(7)-D:作业的一些属性(以前用的是-jonconf),具体有:
              1)mapred.map.tasks:map task数目  
              设置的数目与实际运行的值并不一定相同,若输入文件含有M个part,而此处设置的map_task数目超过M,那么实际运行map_task仍然是M
              2)mapred.reduce.tasks:reduce task数目  不设置的话,默认值就为1
              3)num.key.fields.for.partition=N:shuffle阶段将数据集的前N列作为Key;所以对于wordcount程序,map输出为“word  1”,shuffle是以word作为Key,因此这里N=1
(8)-D stream.num.map.output.key.fields=1 这个是指在reduce之前将数据按前1列做排序,一般情况下可以去掉

 

接下来就是激动人心的一刻了,要非常用力地跪着按下enter键

如果有报错output-streaming already exists就用命令dfs -rm -R /user/root/output-streaming 然后跳起来按下enter键

即使出现奇怪的刷屏也不要惊奇恩妈妈是这么教我的

如果出现以下字样就是成功了

16/08/18 18:35:20 INFO mapreduce.Job:  map 100% reduce 100%
16/08/18 18:35:20 INFO mapreduce.Job: Job job_local926114196_0001 completed successfully

 之后使用如下命令将结果取回本地,使用cat命令就能查看

dfs -get /user/root/output-streaming/* ./output-streaming
cat ./output-streaming/*

 很惭愧,只做了一点微小的工作

转载于:https://www.cnblogs.com/wuxie1989/p/5785093.html

相关文章:

  • php ci框架整合银盛支付
  • SQL Server编程(06)触发器
  • 关于写东西
  • 1164 统计数字
  • 大神的Blog挂了,从Bing快照里复制过来的备份
  • linux内核值shmmax问题
  • 一行神奇的javascript代码
  • Mybatis初体验
  • JSP_内置对象_out
  • ubuntu desktop解决系统启动后网络没有主动连接
  • 第6集_奇点和安迪吃饭2 吃野生鲫鱼
  • 老男孩教育-linux面试题-基础题1
  • Redis常用命令入门2:散列类型
  • mysql通过配置文件进行优化
  • linux基础概念和个人笔记总结(6)
  • 【Leetcode】101. 对称二叉树
  • 「译」Node.js Streams 基础
  • 【391天】每日项目总结系列128(2018.03.03)
  • ES6, React, Redux, Webpack写的一个爬 GitHub 的网页
  • es6要点
  • gulp 教程
  • Java比较器对数组,集合排序
  • Java反射-动态类加载和重新加载
  • MySQL-事务管理(基础)
  • Swoft 源码剖析 - 代码自动更新机制
  • 构建二叉树进行数值数组的去重及优化
  • 基于Android乐音识别(2)
  • 基于Dubbo+ZooKeeper的分布式服务的实现
  • 前端_面试
  • 前端面试题总结
  • 前端之Sass/Scss实战笔记
  • 浅谈Golang中select的用法
  • 如何将自己的网站分享到QQ空间,微信,微博等等
  • 实战|智能家居行业移动应用性能分析
  • 我的zsh配置, 2019最新方案
  • 无服务器化是企业 IT 架构的未来吗?
  • 写给高年级小学生看的《Bash 指南》
  • 东超科技获得千万级Pre-A轮融资,投资方为中科创星 ...
  • 好程序员web前端教程分享CSS不同元素margin的计算 ...
  • 积累各种好的链接
  • 如何用纯 CSS 创作一个货车 loader
  • ​html.parser --- 简单的 HTML 和 XHTML 解析器​
  • # centos7下FFmpeg环境部署记录
  • ###C语言程序设计-----C语言学习(6)#
  • #周末课堂# 【Linux + JVM + Mysql高级性能优化班】(火热报名中~~~)
  • (16)Reactor的测试——响应式Spring的道法术器
  • (C++17) optional的使用
  • (C语言)深入理解指针2之野指针与传值与传址与assert断言
  • (八)Spring源码解析:Spring MVC
  • (二)构建dubbo分布式平台-平台功能导图
  • (翻译)terry crowley: 写给程序员
  • (转)用.Net的File控件上传文件的解决方案
  • (转载)跟我一起学习VIM - The Life Changing Editor
  • .aanva
  • .java 指数平滑_转载:二次指数平滑法求预测值的Java代码