05 Hadoop简单使用
目录
- 一、hadoop安装配置
- 二、运行hadoop
- 三、hadoop2.x和hadoop3.x变化
- 四、HDFS常用命令
- 五、Java操作HDFS
- 六、MapReduce
- 七、压缩
- 八、yarn常用命令
一、hadoop安装配置
1、下载解压hadoop-x.x.x.tar.gz
tar -xzvf hadoop-x.x.x.tar.gz
2、下载解压jdk
tar -xzvf jdkx.x.x_xxx.tar.gz
3、配置环境变量
vi /etc/profileexport JAVA_HOME=/root/env/jdk1.8.0_301
export HADOOP_HOME=/root/env/hadoop-3.3.1
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
3.1、hadoop3.x版本需要额外添加
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
3.2、加载环境变量
source /etc/profile
4、配置主机名&主机映射
4.1、主机名
vi /etc/hostname
4.2、主机映射
vi /etc/hosts172.16.162.71 namenode01
172.16.162.75 resourcemanager01
172.16.162.199 secondaryNN01
172.16.162.223 database
5、修改hadoop配置文件(路径:$HADOOP_HOME/etc/hadoop)
5.1、core-site.xml
<configuration><!-- 指定namenode地址 --><property><name>fs.defaultFS</name><value>hdfs://namenode01:9820</value></property><!-- 指定hadoop数据的存储目录 --><property><name>hadoop.tmp.dir</name><value>/root/env/hadoop-3.3.1/data</value></property><!-- namenode网页用户 --><property><name>hadoop.http.staticuser.user</name><value>root</value></property>
</configuration>
5.2、hdfs-site.xml
<configuration><!-- 指定Hadoop辅助名称节点主机配置 --><property><name>dfs.namenode.secondary.http-address</name><value>secondaryNN01:9868</value></property>
</configuration>
5.3、mapred-site.xml
<configuration><!-- 指定Mapreduce在yarn上运行 --><property><name>mapreduce.framework.name</name><value>yarn</value></property><!-- hadoop3.x版本中需要添加classpath配置 --><property><name>mapreduce.application.classpath</name><value>/root/env/hadoop-3.3.1/etc/hadoop:/root/env/hadoop-3.3.1/share/hadoop/common/lib/*:/root/env/hadoop-3.3.1/share/hadoop/common/*:/root/env/hadoop-3.3.1/share/hadoop/hdfs:/root/env/hadoop-3.3.1/share/hadoop/hdfs/lib/*:/root/env/hadoop-3.3.1/share/hadoop/hdfs/*:/root/env/hadoop-3.3.1/share/hadoop/mapreduce/*:/root/env/hadoop-3.3.1/share/hadoop/yarn:/root/env/hadoop-3.3.1/share/hadoop/yarn/lib/*:/root/env/hadoop-3.3.1/share/hadoop/yarn/*</value></property>
</configuration>
5.4、yarn-site.xml
<configuration><!-- 指定MR走shuffle --><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property><!-- 指定resourcemanager地址 --><property><name>yarn.resourcemanager.hostname</name><value>resourcemanager01</value></property>
</configuration>
5.5、在workers文件中配置所有节点的ip或者主机名(hadoop2.x版本中是slaves)
5.6、配置完后,分发配置
rsync [path] [user]@[ip地址]:[path]
二、运行hadoop
1、使用命令格式namenode
hadoop namenode -format
或
hdfs namenode -format
2、在namenode节点启动hdfs
start-dfs.sh
3、在resourcemanager节点启动yarn
start-yarn.sh
三、hadoop2.x和hadoop3.x变化
1、功能
功能 | hadoop2.x | hadoop3.x |
---|---|---|
支持的最低Java版本 | Hadoop 2.x - java的最低支持版本是java 7 | Hadoop 3.x - java的最低支持版本是java 8 |
容错 | Hadoop 2.x - 可以通过复制(浪费空间)来处理容错。 | Hadoop 3.x - 可以通过Erasure编码处理容错。 |
数据平衡 | Hadoop 2.x - 对于数据,平衡使用HDFS平衡器。 | Hadoop 3.x - 对于数据,平衡使用Intra-data节点平衡器,该平衡器通过HDFS磁盘平衡器CLI调用。 |
存储Scheme | Hadoop 2.x - 使用3X副本Scheme | Hadoop 3.x - 支持HDFS中的擦除编码。 |
存储开销 | Hadoop 2.x - HDFS在存储空间中有200%的开销。 | Hadoop 3.x - 存储开销仅为50%。 |
存储开销示例 | Hadoop 2.x - 如果有6个块,那么由于副本方案(Scheme),将有18个块占用空间。 | Hadoop 3.x - 如果有6个块,那么将有9个块空间,6块block,3块用于奇偶校验。 |
YARN时间线服务 | Hadoop 2.x - 使用具有可伸缩性问题的旧时间轴服务。 | Hadoop 3.x - 改进时间线服务v2并提高时间线服务的可扩展性和可靠性。 |
兼容的文件系统 | Hadoop 2.x - HDFS(默认FS),FTP文件系统:它将所有数据存储在可远程访问的FTP服务器上。Amazon S3(简单存储服务)文件系统Windows Azure存储Blob(WASB)文件系统。 | Hadoop 3.x - 它支持所有前面以及Microsoft Azure Data Lake文件系统。 |
Datanode资源 | Hadoop 2.x - Datanode资源不专用于MapReduce,我们可以将它用于其他应用程序。 | Hadoop 3.x - 此处数据节点资源也可用于其他应用程序。 |
MR API兼容性 | Hadoop 2.x - 与Hadoop 1.x程序兼容的MR API,可在Hadoop 2.X上执行 | Hadoop 3.x - 此处,MR API与运行Hadoop 1.x程序兼容,以便在Hadoop 3.X上执行 |
HDFS联盟 | Hadoop 2.x - 在Hadoop 1.0中,只有一个NameNode来管理所有Namespace,但在Hadoop 2.0中,多个NameNode用于多个Namespace。 | Hadoop 3.x - Hadoop 3.x还有多个名称空间用于多个名称空间。 |
更快地访问数据 | Hadoop 2.x - 由于数据节点缓存,我们可以快速访问数据。 | Hadoop 3.x - 这里也通过Datanode缓存我们可以快速访问数据。 |
平台 | Hadoop 2.x - 可以作为各种数据分析的平台,可以运行事件处理,流媒体和实时操作。 | Hadoop 3.x - 这里也可以在YARN的顶部运行事件处理,流媒体和实时操作。 |
2、端口
应用 | Haddop 2.x port | Haddop 3.x port |
---|---|---|
Namenode | 8020 | 9820 |
NN HTTP UI | 50070 | 9870 |
NN HTTPS UI | 50470 | 9871 |
SNN HTTP | 50091 | 9869 |
SNN HTTP UI | 50090 | 9868 |
DN IPC | 50020 | 9867 |
DN | 50010 | 9866 |
DN HTTP UI | 50075 | 9864 |
Datanode | 50475 | 9865 |
其余端口没有变化,reourcemanager网页端口 8088
四、HDFS常用命令
(此时操作的是hdfs的目录,并不是Linux的目录)
1、帮助命令
hdfs dfs --help
查看
Usage: hadoop fs [generic options][-appendToFile <localsrc> ... <dst>][-cat [-ignoreCrc] <src> ...][-checksum [-v] <src> ...][-chgrp [-R] GROUP PATH...][-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...][-chown [-R] [OWNER][:[GROUP]] PATH...][-concat <target path> <src path> <src path> ...][-copyFromLocal [-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>][-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>][-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] [-e] [-s] <path> ...][-cp [-f] [-p | -p[topax]] [-d] <src> ... <dst>][-createSnapshot <snapshotDir> [<snapshotName>]][-deleteSnapshot <snapshotDir> <snapshotName>][-df [-h] [<path> ...]][-du [-s] [-h] [-v] [-x] <path> ...][-expunge [-immediate] [-fs <path>]][-find <path> ... <expression> ...][-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>][-getfacl [-R] <path>][-getfattr [-R] {-n name | -d} [-e en] <path>][-getmerge [-nl] [-skip-empty-file] <src> <localdst>][-head <file>][-help [cmd ...]][-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [-e] [<path> ...]][-mkdir [-p] <path> ...][-moveFromLocal [-f] [-p] [-l] [-d] <localsrc> ... <dst>][-moveToLocal <src> <localdst>][-mv <src> ... <dst>][-put [-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>][-renameSnapshot <snapshotDir> <oldName> <newName>][-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...][-rmdir [--ignore-fail-on-non-empty] <dir> ...][-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]][-setfattr {-n name [-v value] | -x name} <path>][-setrep [-R] [-w] <rep> <path> ...][-stat [format] <path> ...][-tail [-f] [-s <sleep interval>] <file>][-test -[defswrz] <path>][-text [-ignoreCrc] <src> ...][-touch [-a] [-m] [-t TIMESTAMP (yyyyMMdd:HHmmss) ] [-c] <path> ...][-touchz <path> ...][-truncate [-w] <length> <path> ...][-usage [cmd ...]]Generic options supported are:
-conf <configuration file> specify an application configuration file
-D <property=value> define a value for a given property
-fs <file:///|hdfs://namenode:port> specify default filesystem URL to use, overrides 'fs.defaultFS' property from configurations.
-jt <local|resourcemanager:port> specify a ResourceManager
-files <file1,...> specify a comma-separated list of files to be copied to the map reduce cluster
-libjars <jar1,...> specify a comma-separated list of jar files to be included in the classpath
-archives <archive1,...> specify a comma-separated list of archives to be unarchived on the compute machinesThe general command line syntax is:
command [genericOptions] [commandOptions]
整体命令类似于Linux
2、HDFS上传下载操作
2.1、上传
hdfs dfs -put [src file] [HDFS path]
2.2、下载
hdfs dfs -get [HDFS path] [local path]
3、查看文件系统健康状态
hdfs dfsadmin -report
4、安全模式
安全模式是hadoop的一种保护机制,用于保证集群中的数据块的安全性。当hdfs进入安全模式时不允许客户端进行任何修改文件的操作,包括上传文件,删除文件,重命名,创建文件夹等操作。
当集群启动的时候,会首先进入安全模式。当系统处于安全模式时会检查数据块的完整性。假设我们设置的副本数(即参数dfs.replication)是5,那么在datanode上就应该有5个副本存在,假设只存在3个副本,那么比例就是3/5=0.6。通过配置dfs.safemode.threshold.pct
定义最小的副本率,默认为0.999。
4.1、查看安全模式
hdfs dfsadmin -safemode get
4.2、强制进入安全模式
hdfs dfsadmin -safemode enter
4.3、强制离开安全模式
hdfs dfsadmin -safemode leave
五、Java操作HDFS
1、pom导入依赖
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.10.1</version>
</dependency>
2、编码(使用模板方法设计模式)
2.1、Template接口
import org.apache.hadoop.fs.FileSystem;public interface Template {public void template(FileSystem fileSystem) throws Exception;}
2.2、Template实现类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;import java.net.URI;public class HadoopTemplate {public static void exec(URI uri, Configuration configuration, String user, Template template)throws Exception{//1.获取到客户端对象FileSystem fileSystem = FileSystem.get(uri, configuration, user);//2.执行操作template.template(fileSystem);//3.关闭资源fileSystem.close();}
}
2.3、调用类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;import java.net.URI;public class Test01 {public static void main(String[] args)throws Exception {String uri = "hdfs://172.16.106.56:9000";String user = "root";String path = "/test/input/a.txt";HadoopTemplate.exec(new URI(uri), new Configuration(), user, new Template() {@Overridepublic void template(FileSystem fileSystem) throws Exception {//执行操作FSDataInputStream stream = fileSystem.open(new Path(path));byte[] bytes = new byte[1024];int length = 0;while ((length = stream.read(bytes)) != -1){System.out.println(new String(bytes, 0, length));}stream.close();}});}
}
可对HDFS上的文件及目录进行增删改查操作
六、MapReduce
使用新API写法
1、MR程序
import lombok.extern.java.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import java.io.IOException;public class WordCount extends Configured implements Tool {/** 一次读取一行数据泛型LongWritable, Text, Text, IntWritableLongWritable为偏移量,就是每行数据开始的下标Text为当前行Text, IntWritable Map阶段输出的k,v类型(自定义)*/public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable>{private Text outKey = new Text();private IntWritable outValue = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 指定行内每个字段的分割符String[] words = value.toString().split(" ");for (String word : words) {outKey.set(word);// 循环写出给reduce处理context.write(outKey, outValue);}}}// Text, IntWritable 对应Map阶段的输出k,v// Text, IntWritable reduce阶段输出的k,v类型public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable>{/** 传进来的数据类似于key:hellovalues:(1,1,1)key为map阶段设置的key,values为map阶段输出的所有同一个key的value组成的可迭代对象*/@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}context.write(key, new IntWritable(sum));}}@Overridepublic int run(String[] args) throws Exception {// 获取jobJob job = Job.getInstance(this.getConf());//设置jar包路径job.setJarByClass(this.getClass());//关联mapper和reducerjob.setMapperClass(WCMapper.class);job.setReducerClass(WCReducer.class);//设置map输出的k,v类型(对应mapper的输出类型)job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//设置最终输出的k,v类型(对应reducer输出类型)job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//设置输入、输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//提交jobboolean result = job.waitForCompletion(true);return result ? 0 : 1;}// args参数为命令行下输入的运行时参数public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();Path output = new Path(args[1]);FileSystem fileSystem = FileSystem.get(configuration);//判断HDFS上是否存在输出路径,有则删除if(fileSystem.exists(output)){System.out.println("==========目录已存在,执行删除==========");fileSystem.delete(output, true);}//运行int status = ToolRunner.run(configuration, new WordCount(), args);System.exit(status);}
}
2、自定义对象作为key或者value
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;// lombok自动生成get/set方法
@Data
// lombok自动生成无参构造器
@NoArgsConstructor
// lombok自动生成全参构造器
@AllArgsConstructor
/**
bean对象需要实现hadoop序列化接口
作为map/reduce的value只需要实现 Writable 接口,实现write和readFields方法
作为map/reduce的key需要再实现Comparable<T>比较接口,可直接实现WritableComparable<T>达到效果
*/
public class User implements WritableComparable<FlowBean> {private String name;private Integer age;// write和readFields属性顺序必须一致@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(this.name);dataOutput.writeInt(this.age);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.name = dataInput.readUTF();this.age = dataInput.readInt;}// 重写object的toString方法,不然MR输出的对象为内存地址@Overridepublic String toString() {return this.name + "\t" + this.age;}// Comparable<T>的比较方法,用于key排序@Overridepublic int compareTo(FlowBean o) {return totalFlow.compareTo(o.getTotalFlow());}
}
3、MR其余操作
3.1、多个小文件
job.setInputFormatClass(CombineTextInputFormat.class);
通过设置InpuFormat类型处理,不设置则会几个文件启动几个maptask 即使该文件没有达到切片大小
3.2、分区,reduce之后输出的文件
/**
Text, FlowBean为map阶段输出的k,v
通过方法return数字决定输出文件的分区,从0开始
*/
public static class FlowPartitioner extends Partitioner<Text, FlowBean>{@Overridepublic int getPartition(Text text, FlowBean flowBean, int i) {return 0;}
}
在run方法中设置
// 有几个分区就设置几个reduceTask
job.setNumReduceTasks(1);
// 分区的实现类
job.setPartitionerClass(FlowPartitioner.class);
3.3、reduce之前预聚合,MR优化(全局排序等不能使用)
// 可直接把自定义的redue类的class放入
job.setCombinerClass(mrEnum.getReducerClass());
4、输出到database
import mapreducer.DBWordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import entity.DBWCBean;public class File2DB extends Configured implements Tool {public static class DBMapper extends Mapper<LongWritable, Text, Text, IntWritable>{private Text outKey = new Text();private IntWritable outValue = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {String[] words = value.toString().split(" ");for (String word : words) {outKey.set(word);context.write(outKey, outValue);}}}public static class DBReducer extends Reducer<Text, IntWritable, DBWCBean, NullWritable>{@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, DBWCBean, NullWritable>.Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}context.write(new DBWCBean(key.toString(), sum), NullWritable.get());}}@Overridepublic int run(String[] args) throws Exception {DBConfiguration.configureDB(this.getConf(), "com.mysql.cj.jdbc.Driver", "jdbc:mysql://database:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC", "root", "123456");Job job = Job.getInstance(this.getConf());job.setJarByClass(this.getClass());job.setMapperClass(DBMapper.class);job.setReducerClass(DBReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setOutputKeyClass(DBWCBean.class);job.setOutputValueClass(NullWritable.class);job.setOutputFormatClass(DBOutputFormat.class);FileInputFormat.setInputPaths(job, new Path(args[0]));DBOutputFormat.setOutput(job, "wc", "word", "count");return job.waitForCompletion(true) ? 0 : 1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();System.exit(ToolRunner.run(configuration, new File2DB(), args));}
}
5、可以自定义OutputFormat
5.1、继承FileOutputFormat<K,V>,实现getRecordWriter
5.2、继承RecordWriter<K,V>,并对其按业务需求进行实现
不做演示
6、MR实现Join操作
6.1、reduce端实现
实例文件
order.xt
01 1001 1
02 1002 2
03 1003 3
01 1004 4
02 1005 5
03 1006 6
pd.txt
01 小米
02 华为
03 苹果
自定义bean对象(包含所有值)
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;@Data
@NoArgsConstructor
@AllArgsConstructor
public class TableJoinBean implements WritableComparable<TableJoinBean> {private String orderId = "";private String productName = "";private int sum;private boolean flag;public TableJoinBean(String orderId, int sum){this.orderId = orderId;this.sum = sum;}@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(this.orderId);dataOutput.writeUTF(this.productName);dataOutput.writeInt(this.sum);dataOutput.writeBoolean(this.flag);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.orderId = dataInput.readUTF();this.productName = dataInput.readUTF();this.sum = dataInput.readInt();this.flag = dataInput.readBoolean();}@Overridepublic int compareTo(TableJoinBean o) {return this.orderId.compareTo(o.getOrderId());}@Overridepublic String toString() {return orderId + '\t' + productName + '\t' + sum;}
}
Reduce&Mapper
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;
import java.util.ArrayList;public class TableJoin {public static class TJMapper extends Mapper<LongWritable, Text, Text, TableJoinBean>{private String fileName;private Text k = new Text();private TableJoinBean v = new TableJoinBean();// map方法执行前,并且只执行一下 map方法每来一行数据执行一次@Overrideprotected void setup(Context context) throws IOException, InterruptedException {FileSplit inputSplit = (FileSplit) context.getInputSplit();// 获取当前文件的名称fileName = inputSplit.getPath().getName();}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] strings = value.toString().split(" ");k.set(strings[0]);// 对象文件名称进行判断,并给对象对应的属性赋值v.setFlag(fileName.contains("order"));if(v.isFlag()){v.setOrderId(strings[1]);v.setSum(Integer.parseInt(strings[2]));}else {v.setProductName(strings[1]);}context.write(k, v);}}public static class TJReducer extends Reducer<Text, TableJoinBean, Text, TableJoinBean>{@Overrideprotected void reduce(Text key, Iterable<TableJoinBean> values, Context context) throws IOException, InterruptedException {String productName = "";ArrayList<TableJoinBean> tableJoinBeans = new ArrayList<>();// 对同一key的对象属性进行聚合操作for (TableJoinBean value : values) {if(value.isFlag()){// 因为Hadoop重写了Iterable,复用对象以达到优化的效果 所有得每次得重新创建对象tableJoinBeans.add(new TableJoinBean(value.getOrderId(), value.getSum()));}else {productName = value.getProductName();}}for (TableJoinBean tableJoinBean : tableJoinBeans) {tableJoinBean.setProductName(productName);context.write(key, tableJoinBean);}}}}
6.2、Map端实现(效率会比reduce端快,mapTask数量多余reduceTask)
思路:
不需要reduce,设置reduceTask为0
job.setNumReduceTasks(0);
在map端把较小的一张表加载为缓存,并把k,v用map存起来
job.addCacheFile(new URL("HDFS上文件的路径,或者本地文件的路径"));
setup()方法中:
-
获取缓存的文件
URL[] cacheFiles = context.getCacheFiles(); FileSystem fs = FileSystem.get(context.getConfiguration()); FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));
-
循环读取文件的一行
BufferdReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));// java io流读取操作 while(...){... }
-
切割
-
缓存数据到HashMap
-
关闭流
map()方法中:
- 获取一行
- 截取
- 获取对应HashMap的key
- 获取其余值
- 拼接
- 写出
七、压缩
1、MR支持的压缩编码
压缩格式 | Hadoop自带 | 算法 | 文件扩展名 | 是否支持切片 | 是否修改程序 |
---|---|---|---|---|---|
DEFAULT | 是 | DEFAULT | .default | 否 | 否 |
Gzip | 是 | DEFAULT | .gz | 否 | 否 |
bzip2 | 是 | bzip2 | .bz2 | 是 | 否 |
LZO | 否(需要安装) | LOZ | .loz | 是 | 需要创建索引,指定输入格式 |
Snappy | 是 | Snappy` | .snappy | 都 | 否 |
2、输入端
2.1、数据量小于块大小,优先考虑压缩速度 LZO/Snappy
2.2、数据量大,优先考虑支持切片 Bzip/LZO
3、mapper输出端
重点考虑压缩/解压速度 LZO/Snappy
4、Reduce输出端
4.1、如果数据永久保存,考虑压缩率较高的 Bzip2/Gzip
4.2、如果传给下一个MR做处理需考虑数据量和是否支持切片
5、实际配置
参数 | 默认值 | 阶段 |
---|---|---|
io.compression.codecs(core-site.xml) | 无 | 输入压缩 |
mapreduce.map.output.compress(mapred-site.xml) | false | mapper输出 |
mapreduce.map.output.compress.codec(mapred-site.xml) | org.apache.hadoop.io.compress.DefaultCodec | mapper输出 |
mapreduce.output.fileoutputformat.compress(mapred-site.xml) | false | reduce输出 |
mapreduce.output.fileoutputformat.compress.codec(mapred-site.xml) | org.apache.hadoop.io.compress.DefaultCodec | reduce输出 |
6、在Java代码中配置
...
Configuration conf = new Configuration();// map输出端开启压缩
/**
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
*/// reduce输出端开启压缩
/**
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, Bzip2Codec.class);
*/
八、yarn常用命令
1、yarn application 查看任务
1.1、列出所有任务
yarn application -list
1.2、根据 application 状态过滤
所有状态:ALL、NEW、NEW_SAVING、SUBMITTED、ACCEPTED、RUNNING、FINISHED、FAILED、KILLED
yarn application -list -appStates [状态]
1.3、kill 掉 application
yarn application -kill [applicaionId]
1.4、查询 application 日志
yarn logs -applicationId [applicationId]
1.5、查看 container日志
yarn logs -applicationId [applicationId] -containerId [containerId]
1.6、列出 application 尝试的列表
yarn applicationattempt -list [applicationId]
1.7、打印 applicationAttemp状态
yarn applicationattempt -status [applicationAttemptId]
1.8、列出所有容器
yarn container -list [applicationAttemptId]
1.9、打印容器状态
yarn container -status [containerId]
1.10、查看node状态
yarn node -list -all
1.11、加载队列配置
yarn remadmin -reefreshQueues
1.12、查看队列
yarn queue -status [queueName]
九、yarn生产环境核心参数配置
1、resourceManager相关
参数 | 作用 |
---|---|
yarn.resourcemanager.scheduler.class | 配置调度器 |
yarn.resourcemanager.scheduler.client.thread.count | resourcemanager处理的线程数,默认50 |
2、nodeManager相关(每个节点单独配置)
参数 | 作用 |
---|---|
yarn.nodemanager.resource.detect-hardware-capabilities | yarn自己检查硬件进行配置,默认false |
yarn.nodemanager.resource.count-logical-processors-as-cores | 是否将虚拟核数当中cpu核数,默认false |
yarn.nodemanager.resource.pcores-vcores-multiplier | 虚拟核数与物理核数的比值,默认1.0 |
yarn.nodemanager.resource.memory-mb | nodeManager使用内存,默认8G |
yarn.nodemanager.resource.system-reserved-memore-mb | nodeManager为系统保留多少内存 |
yarn.nodemanager.resource.cpu-vcores | nodeManager使用CPU核数,默认8 |
yarn.nodemanager.pmem-check-enabled | 是否开启物理内存检查限制container,默认true |
yarn.nodemanager.vmem-check-enabled | 是否开启虚拟内存检查限制container,默认true |
yarn.nodemanager.vmem-pmem-ratio | 虚拟内存物理内存比例,默认2.1 |
3、container相关
参数 | 作用 |
---|---|
yarn.scheduler.minimum-allocation-mb | 容器最小内存,默认1G |
yarn.scheduler.maximum-allocation-mb | 容器最大内存,默认8G |
yarn.scheduler.minimum-allocation-vcores | 容器最小cpu核数,默认1个 |
yarn.scheduler.maximum-allocation-vcores | 容器最大cpu核数,默认4个 |
es-vcores-multiplier | 虚拟核数与物理核数的比值,默认1.0 |
| yarn.nodemanager.resource.memory-mb | nodeManager使用内存,默认8G |
| yarn.nodemanager.resource.system-reserved-memore-mb | nodeManager为系统保留多少内存 |
| yarn.nodemanager.resource.cpu-vcores | nodeManager使用CPU核数,默认8 |
| yarn.nodemanager.pmem-check-enabled | 是否开启物理内存检查限制container,默认true |
| yarn.nodemanager.vmem-check-enabled | 是否开启虚拟内存检查限制container,默认true |
| yarn.nodemanager.vmem-pmem-ratio | 虚拟内存物理内存比例,默认2.1 |
3、container相关
参数 | 作用 |
---|---|
yarn.scheduler.minimum-allocation-mb | 容器最小内存,默认1G |
yarn.scheduler.maximum-allocation-mb | 容器最大内存,默认8G |
yarn.scheduler.minimum-allocation-vcores | 容器最小cpu核数,默认1个 |
yarn.scheduler.maximum-allocation-vcores | 容器最大cpu核数,默认4个 |