Hadoop MapReduce 流量统计计算例子
（注:本例为原生 Hadoop MapReduce 程序,并非 Hive 查询)
一、计算任务
>数据源:手机号,上行流量,下行流量
>运算过程统计每个手机号的上行流量总和,下行流量的总和,流量总和
>运算结果:手机号->上行流量和,下行流量和,总和
二、代码实现
1:Master
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
public class FlowMaster{
public static void main(String[] args) throws Exception{
//初始化配置
Configuration cconfiguration = new Configuration();
//初始化job
Job job = Job.getInstance(cconfiguration);
//设置相关class
job.setJarByClass(FlowMaster.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Flow.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Flow.class);
//设置数据读入路径
FileInputFormat.setInputPaths(job,new Path("hdfs://teach01:8020/input/dataflow.log"));
//设置结果输出路径
FileOutputFormat.setOutputPaths(job,new Path("hdfs://teach01:8020/output/flowCount"));
//提交job
boolean result = job.waitForCompletion(true);
if(result){
system.out.println("成功");
}
}
}
2:Mapper
import org.apache.hadoop.mapreduce.Mapper;
public class FlowMapper extends Mapper<LongWritable,Text,Text,Flow>{
@Override
protected void map(LongWritable key,Text value,Mapper<LongWritable,Text,Text,Flow>.Context context) throws IOException,InterruptedException{
//获得当前行数据
String line = value.toString();
//逗号分隔
String[] fields = line.split(",");
//手机号
String phoneNumber = fields[0];
//上行流量
long upFlow = Long.parseLong(fields[1]);
//下行流量
long downFlow = Long.parseLong(fields[2]);
context.write(newText(phoneNumber),newFlow(upFlow,downFlow))
}
}
3:Reducer
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer: sums up the upstream and downstream traffic for a single phone
 * number and emits one aggregated Flow record per key. The total is computed
 * inside the Flow constructor.
 */
public class FlowReducer extends Reducer<Text, Flow, Text, Flow> {
    @Override
    protected void reduce(Text key, Iterable<Flow> values,
                          Reducer<Text, Flow, Text, Flow>.Context context)
            throws IOException, InterruptedException {
        long upTotal = 0L;
        long downTotal = 0L;
        // Accumulate both directions over all records for this phone number
        for (Flow record : values) {
            upTotal += record.getUpFlow();
            downTotal += record.getDownFlow();
        }
        // The Flow constructor derives the grand total from up + down
        context.write(key, new Flow(upTotal, downTotal));
    }
}
4:封装实体类Flow
import org.apache.hadoop.id.Writables;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Flow implements Writables
{
public Flow(){}
public Flow(long upFlow,long downFlow){
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow+downFlow;
}
private long upFlow;
private long downFlow;
private long sumFlow;
//getter,setter方法省略
。。。。。。
//toString方法省略
。。。。。。
@Override
public void readFields(DataInput input) thows IOException{
upFlow = input.readLong();
downFlow = input.readLong();
sumFlow = input.readLong();
}
@Override
public void wirte(DataOutput output) thows IOException{
output.writeLong(upFlow);
output.writeLong(downFlow);
output.writeLong(sumFlow);
}
}
5:(仅 Windows 本地运行时需要)把 Hadoop 源码中的 NativeIO.java 复制到本工程的同名包路径下,
将其 access0 校验方法(约第 609 行)改为直接返回 true,以绕过本地文件权限检查
6:复制log4j.properties过来
7:在hdfs中创建目录
hdfs dfs -mkdir /input
hdfs dfs -mkdir /output
hdfs dfs -put dataflow.log /input