当前位置: 首页 > news >正文

怎样通过Java程序提交yarn的mapreduce计算任务

    因为项目需求,须要通过Java程序提交Yarn的MapReduce的计算任务。与一般的通过Jar包提交MapReduce任务不同,通过程序提交MapReduce任务须要有点小变动。详见下面代码。

    下面为MapReduce主程序,有几点须要提一下:

    1、在程序中,我将文件读入格式设定为WholeFileInputFormat,即不正确文件进行切分。

    2、为了控制reduce的处理过程。map的输出键的格式为组合键格式。

与常规的<key,value>不同,这里变为了<TextPair,Value>,TextPair的格式为<key1,key2>。

    3、为了适应组合键,又一次设定了分组函数。即GroupComparator。分组规则为,仅仅要TextPair中的key1同样(不要求key2同样),则数据被分配到一个reduce容器中。这样,当同样key1的数据进入reduce容器后,key2起到了一个数据标识的作用。

package web.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import util.Utils;

public class GEMIMain {
	
	public GEMIMain(){
		job = null;
	}
	
	public Job job;
	public static class NamePartitioner extends
			Partitioner<TextPair, BytesWritable> {
		@Override
		public int getPartition(TextPair key, BytesWritable value,
				int numPartitions) {
			return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;
		}
	}

	/**
	 * 分组设置类。仅仅要两个TextPair的第一个key同样。他们就属于同一组。

他们的Value就放到一个Value迭代器中, * 然后进入Reducer的reduce方法中。 * * @author hduser * */ public static class GroupComparator extends WritableComparator { public GroupComparator() { super(TextPair.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { TextPair t1 = (TextPair) a; TextPair t2 = (TextPair) b; // 比較同样则返回0,比較不同则返回-1 return t1.getFirst().compareTo(t2.getFirst()); // 仅仅要是第一个字段同样的就分成为同一组 } } public boolean runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); // 在conf中设置outputath变量,以在reduce函数中能够获取到该參数的值 conf.set("outputPath", args[args.length - 1].toString()); //设置HDFS中,每次任务生成产品的质量文件所在目录。args数组的倒数第二个原数为质量文件所在目录 conf.set("qualityFolder", args[args.length - 2].toString()); //假设在Server中执行。则须要获取web项目的根路径;假设以java应用方式调试,则读取/opt/hadoop-2.5.0/etc/hadoop/目录下的配置文件 //MapReduceProgress mprogress = new MapReduceProgress(); //String rootPath= mprogress.rootPath; String rootPath="/opt/hadoop-2.5.0/etc/hadoop/"; conf.addResource(new Path(rootPath+"yarn-site.xml")); conf.addResource(new Path(rootPath+"core-site.xml")); conf.addResource(new Path(rootPath+"hdfs-site.xml")); conf.addResource(new Path(rootPath+"mapred-site.xml")); this.job = new Job(conf); job.setJobName("Job name:" + args[0]); job.setJarByClass(GEMIMain.class); job.setMapperClass(GEMIMapper.class); job.setMapOutputKeyClass(TextPair.class); job.setMapOutputValueClass(BytesWritable.class); // 设置partition job.setPartitionerClass(NamePartitioner.class); // 在分区之后依照指定的条件分组 job.setGroupingComparatorClass(GroupComparator.class); job.setReducerClass(GEMIReducer.class); job.setInputFormatClass(WholeFileInputFormat.class); job.setOutputFormatClass(NullOutputFormat.class); // job.setOutputKeyClass(NullWritable.class); // job.setOutputValueClass(Text.class); job.setNumReduceTasks(8); // 设置计算输入数据的路径 for (int i = 1; i < args.length - 2; i++) { FileInputFormat.addInputPath(job, new Path(args[i])); } // args数组的最后一个元素为输出路径 FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1])); boolean flag = job.waitForCompletion(true); return flag; } @SuppressWarnings("static-access") public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException { String[] inputPaths = new String[] { "normalizeJob", "hdfs://192.168.168.101:9000/user/hduser/red1/", "hdfs://192.168.168.101:9000/user/hduser/nir1/","quality11111", "hdfs://192.168.168.101:9000/user/hduser/test" }; GEMIMain test = new GEMIMain(); boolean result = test.runJob(inputPaths); } }

下面为TextPair类

public class TextPair implements WritableComparable<TextPair> {
	private Text first;
	private Text second;

	public TextPair() {
		set(new Text(), new Text());
	}

	public TextPair(String first, String second) {
		set(new Text(first), new Text(second));
	}

	public TextPair(Text first, Text second) {
		set(first, second);
	}

	public void set(Text first, Text second) {
		this.first = first;
		this.second = second;
	}

	public Text getFirst() {
		return first;
	}

	public Text getSecond() {
		return second;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		first.write(out);
		second.write(out);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		first.readFields(in);
		second.readFields(in);
	}

	@Override
	public int hashCode() {
		return first.hashCode() * 163 + second.hashCode();
	}

	@Override
	public boolean equals(Object o) {
		if (o instanceof TextPair) {
			TextPair tp = (TextPair) o;
			return first.equals(tp.first) && second.equals(tp.second);
		}
		return false;
	}

	@Override
	public String toString() {
		return first + "\t" + second;
	}
	
	@Override
	/**A.compareTo(B)
	 * 假设比較同样,则比較结果为0
	 * 假设A大于B,则比較结果为1
	 * 假设A小于B。则比較结果为-1
	 * 
	 */
	public int compareTo(TextPair tp) {
		int cmp = first.compareTo(tp.first);
		if (cmp != 0) {
			return cmp;
		}
		//此时实现的是升序排列
		return second.compareTo(tp.second);
	}
}

下面为WholeFileInputFormat,其控制数据在mapreduce过程中不被切分

package web.hadoop;

import java.io.IOException;  

import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.BytesWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.InputSplit;  
import org.apache.hadoop.mapreduce.JobContext;  
import org.apache.hadoop.mapreduce.RecordReader;  
import org.apache.hadoop.mapreduce.TaskAttemptContext;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {  
	  
    @Override  
    public RecordReader<Text, BytesWritable> createRecordReader(  
            InputSplit arg0, TaskAttemptContext arg1) throws IOException,  
            InterruptedException {  
        // TODO Auto-generated method stub  
        return new WholeFileRecordReader();  
    }  
  
    @Override  
    protected boolean isSplitable(JobContext context, Path filename) {  
        // TODO Auto-generated method stub  
        return false;  
    }  
}  

下面为WholeFileRecordReader类

package web.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {

	private FileSplit fileSplit;
	private FSDataInputStream fis;

	private Text key = null;
	private BytesWritable value = null;

	private boolean processed = false;

	@Override
	public void close() throws IOException {
		// TODO Auto-generated method stub
		// fis.close();
	}

	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		return this.key;
	}

	@Override
	public BytesWritable getCurrentValue() throws IOException,
			InterruptedException {
		// TODO Auto-generated method stub
		return this.value;
	}

	@Override
	public void initialize(InputSplit inputSplit, TaskAttemptContext tacontext)
			throws IOException, InterruptedException {

		fileSplit = (FileSplit) inputSplit;
		Configuration job = tacontext.getConfiguration();
		Path file = fileSplit.getPath();
		FileSystem fs = file.getFileSystem(job);
		fis = fs.open(file);
	}

	@Override
	public boolean nextKeyValue() {

		if (key == null) {
			key = new Text();
		}

		if (value == null) {
			value = new BytesWritable();
		}

		if (!processed) {
			byte[] content = new byte[(int) fileSplit.getLength()];

			Path file = fileSplit.getPath();

			System.out.println(file.getName());
			key.set(file.getName());

			try {
				IOUtils.readFully(fis, content, 0, content.length);
				// value.set(content, 0, content.length);
				value.set(new BytesWritable(content));
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} finally {
				IOUtils.closeStream(fis);
			}

			processed = true;
			return true;
		}

		return false;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		return processed ? fileSplit.getLength() : 0;
	}

}


相关文章:

  • netstat 查询网络结构的用法
  • HDU 5298 Solid Geometry Homework 暴力
  • JavaWeb使用Session防止表单重复提交
  • redis高级(分布式缓存实现,spring integration)
  • iOS 参考 网络书籍
  • react redux 登陆拦截
  • 细谈多个平台编程与网页设计切换启示录----my note
  • elasticsearch 性能监控基础
  • 企业内部DNS从服务器架构的步骤
  • select a method for export 选项
  • 使用JNI与原生代码的通信
  • Yii源码解读-服务定位器(Service Locator)
  • JAVA-JSP之include指令
  • xml 与dto的相互转换
  • ubuntu下安装cx_oracle
  • 《Java8实战》-第四章读书笔记(引入流Stream)
  • 5、React组件事件详解
  • Apache的基本使用
  • co.js - 让异步代码同步化
  • egg(89)--egg之redis的发布和订阅
  • rc-form之最单纯情况
  • react-native 安卓真机环境搭建
  • select2 取值 遍历 设置默认值
  • ViewService——一种保证客户端与服务端同步的方法
  • 构造函数(constructor)与原型链(prototype)关系
  • 盘点那些不知名却常用的 Git 操作
  • 浅谈JavaScript的面向对象和它的封装、继承、多态
  • 数组大概知多少
  • 微信公众号开发小记——5.python微信红包
  • 小程序上传图片到七牛云(支持多张上传,预览,删除)
  • 由插件封装引出的一丢丢思考
  • ​MPV,汽车产品里一个特殊品类的进化过程
  • ​一些不规范的GTID使用场景
  • #[Composer学习笔记]Part1:安装composer并通过composer创建一个项目
  • #{}和${}的区别是什么 -- java面试
  • #gStore-weekly | gStore最新版本1.0之三角形计数函数的使用
  • #pragam once 和 #ifndef 预编译头
  • (51单片机)第五章-A/D和D/A工作原理-A/D
  • (8)Linux使用C语言读取proc/stat等cpu使用数据
  • (BFS)hdoj2377-Bus Pass
  • (PHP)设置修改 Apache 文件根目录 (Document Root)(转帖)
  • (Redis使用系列) Springboot 实现Redis消息的订阅与分布 四
  • (二)基于wpr_simulation 的Ros机器人运动控制,gazebo仿真
  • (附源码)springboot美食分享系统 毕业设计 612231
  • (每日持续更新)jdk api之StringBufferInputStream基础、应用、实战
  • (牛客腾讯思维编程题)编码编码分组打印下标题目分析
  • (五)网络优化与超参数选择--九五小庞
  • (原)本想说脏话,奈何已放下
  • ***详解账号泄露:全球约1亿用户已泄露
  • ... fatal error LINK1120:1个无法解析的外部命令 的解决办法
  • .java 9 找不到符号_java找不到符号
  • .NET Core实战项目之CMS 第十二章 开发篇-Dapper封装CURD及仓储代码生成器实现
  • .NET LINQ 通常分 Syntax Query 和Syntax Method
  • //解决validator验证插件多个name相同只验证第一的问题
  • /var/spool/postfix/maildrop 下有大量文件