当前位置: 首页 > news >正文

05 Hadoop简单使用

目录

    • 一、hadoop安装配置
    • 二、运行hadoop
    • 三、hadoop2.x和hadoop3.x变化
    • 四、HDFS常用命令
    • 五、Java操作HDFS
    • 六、MapReduce
    • 七、压缩
    • 八、yarn常用命令

一、hadoop安装配置

​ 1、下载解压hadoop-x.x.x.tar.gz

tar -xzvf hadoop-x.x.x.tar.gz

​ 2、下载解压jdk

tar -xzvf jdkx.x.x_xxx.tar.gz

3、配置环境变量

vi /etc/profileexport JAVA_HOME=/root/env/jdk1.8.0_301
export HADOOP_HOME=/root/env/hadoop-3.3.1
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

​ 3.1、hadoop3.x版本需要额外添加

export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root

​ 3.2、加载环境变量

source /etc/profile

4、配置主机名&主机映射

​ 4.1、主机名

vi /etc/hostname

​ 4.2、主机映射

vi /etc/hosts172.16.162.71 namenode01
172.16.162.75 resourcemanager01
172.16.162.199 secondaryNN01
172.16.162.223 database

5、修改hadoop配置文件(路径:$HADOOP_HOME/etc/hadoop)

​ 5.1、core-site.xml

<configuration><!-- 指定namenode地址 --><property><name>fs.defaultFS</name><value>hdfs://namenode01:9820</value></property><!-- 指定hadoop数据的存储目录 --><property><name>hadoop.tmp.dir</name><value>/root/env/hadoop-3.3.1/data</value></property><!-- namenode网页用户 --><property><name>hadoop.http.staticuser.user</name><value>root</value></property>
</configuration>

​ 5.2、hdfs-site.xml

<configuration><!-- 指定Hadoop辅助名称节点主机配置 --><property><name>dfs.namenode.secondary.http-address</name><value>secondaryNN01:9868</value></property>
</configuration>

​ 5.3、mapred-site.xml

<configuration><!-- 指定Mapreduce在yarn上运行 --><property><name>mapreduce.framework.name</name><value>yarn</value></property><!-- hadoop3.x版本中需要添加classpath配置 --><property><name>mapreduce.application.classpath</name><value>/root/env/hadoop-3.3.1/etc/hadoop:/root/env/hadoop-3.3.1/share/hadoop/common/lib/*:/root/env/hadoop-3.3.1/share/hadoop/common/*:/root/env/hadoop-3.3.1/share/hadoop/hdfs:/root/env/hadoop-3.3.1/share/hadoop/hdfs/lib/*:/root/env/hadoop-3.3.1/share/hadoop/hdfs/*:/root/env/hadoop-3.3.1/share/hadoop/mapreduce/*:/root/env/hadoop-3.3.1/share/hadoop/yarn:/root/env/hadoop-3.3.1/share/hadoop/yarn/lib/*:/root/env/hadoop-3.3.1/share/hadoop/yarn/*</value></property>
</configuration>

​ 5.4、yarn-site.xml

<configuration><!-- 指定MR走shuffle --><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property><!-- 指定resourcemanager地址 --><property><name>yarn.resourcemanager.hostname</name><value>resourcemanager01</value></property>
</configuration>

​ 5.5、在workers文件中配置所有节点的ip或者主机名(hadoop2.x版本中是slaves)

​ 5.6、配置完后,分发配置

rsync [path] [user]@[ip地址]:[path]

二、运行hadoop

1、使用命令格式namenode

hadoop namenode -format

hdfs namenode -format

2、在namenode节点启动hdfs

start-dfs.sh

3、在resourcemanager节点启动yarn

start-yarn.sh

三、hadoop2.x和hadoop3.x变化

1、功能

功能hadoop2.xhadoop3.x
支持的最低Java版本Hadoop 2.x - java的最低支持版本是java 7Hadoop 3.x - java的最低支持版本是java 8
容错Hadoop 2.x - 可以通过复制(浪费空间)来处理容错。Hadoop 3.x - 可以通过Erasure编码处理容错。
数据平衡Hadoop 2.x - 对于数据,平衡使用HDFS平衡器。Hadoop 3.x - 对于数据,平衡使用Intra-data节点平衡器,该平衡器通过HDFS磁盘平衡器CLI调用。
存储SchemeHadoop 2.x - 使用3X副本SchemeHadoop 3.x - 支持HDFS中的擦除编码。
存储开销Hadoop 2.x - HDFS在存储空间中有200%的开销。Hadoop 3.x - 存储开销仅为50%。
存储开销示例Hadoop 2.x - 如果有6个块,那么由于副本方案(Scheme),将有18个块占用空间。Hadoop 3.x - 如果有6个块,那么将有9个块空间,6块block,3块用于奇偶校验。
YARN时间线服务Hadoop 2.x - 使用具有可伸缩性问题的旧时间轴服务。Hadoop 3.x - 改进时间线服务v2并提高时间线服务的可扩展性和可靠性。
兼容的文件系统Hadoop 2.x - HDFS(默认FS),FTP文件系统:它将所有数据存储在可远程访问的FTP服务器上。Amazon S3(简单存储服务)文件系统Windows Azure存储Blob(WASB)文件系统。Hadoop 3.x - 它支持所有前面以及Microsoft Azure Data Lake文件系统。
Datanode资源Hadoop 2.x - Datanode资源不专用于MapReduce,我们可以将它用于其他应用程序。Hadoop 3.x - 此处数据节点资源也可用于其他应用程序。
MR API兼容性Hadoop 2.x - 与Hadoop 1.x程序兼容的MR API,可在Hadoop 2.X上执行Hadoop 3.x - 此处,MR API与运行Hadoop 1.x程序兼容,以便在Hadoop 3.X上执行
HDFS联盟Hadoop 2.x - 在Hadoop 1.0中,只有一个NameNode来管理所有Namespace,但在Hadoop 2.0中,多个NameNode用于多个Namespace。Hadoop 3.x - Hadoop 3.x还有多个名称空间用于多个名称空间。
更快地访问数据Hadoop 2.x - 由于数据节点缓存,我们可以快速访问数据。Hadoop 3.x - 这里也通过Datanode缓存我们可以快速访问数据。
平台Hadoop 2.x - 可以作为各种数据分析的平台,可以运行事件处理,流媒体和实时操作。Hadoop 3.x - 这里也可以在YARN的顶部运行事件处理,流媒体和实时操作。

2、端口

应用Haddop 2.x portHaddop 3.x port
Namenode80209820
NN HTTP UI500709870
NN HTTPS UI504709871
SNN HTTP500919869
SNN HTTP UI500909868
DN IPC500209867
DN500109866
DN HTTP UI500759864
Datanode504759865

其余端口没有变化,reourcemanager网页端口 8088

四、HDFS常用命令

(此时操作的是hdfs的目录,并不是Linux的目录)

1、帮助命令

hdfs dfs --help

查看

Usage: hadoop fs [generic options][-appendToFile <localsrc> ... <dst>][-cat [-ignoreCrc] <src> ...][-checksum [-v] <src> ...][-chgrp [-R] GROUP PATH...][-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...][-chown [-R] [OWNER][:[GROUP]] PATH...][-concat <target path> <src path> <src path> ...][-copyFromLocal [-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>][-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>][-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] [-e] [-s] <path> ...][-cp [-f] [-p | -p[topax]] [-d] <src> ... <dst>][-createSnapshot <snapshotDir> [<snapshotName>]][-deleteSnapshot <snapshotDir> <snapshotName>][-df [-h] [<path> ...]][-du [-s] [-h] [-v] [-x] <path> ...][-expunge [-immediate] [-fs <path>]][-find <path> ... <expression> ...][-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>][-getfacl [-R] <path>][-getfattr [-R] {-n name | -d} [-e en] <path>][-getmerge [-nl] [-skip-empty-file] <src> <localdst>][-head <file>][-help [cmd ...]][-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [-e] [<path> ...]][-mkdir [-p] <path> ...][-moveFromLocal [-f] [-p] [-l] [-d] <localsrc> ... <dst>][-moveToLocal <src> <localdst>][-mv <src> ... <dst>][-put [-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>][-renameSnapshot <snapshotDir> <oldName> <newName>][-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...][-rmdir [--ignore-fail-on-non-empty] <dir> ...][-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]][-setfattr {-n name [-v value] | -x name} <path>][-setrep [-R] [-w] <rep> <path> ...][-stat [format] <path> ...][-tail [-f] [-s <sleep interval>] <file>][-test -[defswrz] <path>][-text [-ignoreCrc] <src> ...][-touch [-a] [-m] [-t TIMESTAMP (yyyyMMdd:HHmmss) ] [-c] <path> ...][-touchz <path> ...][-truncate [-w] <length> <path> ...][-usage [cmd ...]]Generic options supported are:
-conf <configuration file>        specify an application configuration file
-D <property=value>               define a value for a given property
-fs <file:///|hdfs://namenode:port> specify default filesystem URL to use, overrides 'fs.defaultFS' property from configurations.
-jt <local|resourcemanager:port>  specify a ResourceManager
-files <file1,...>                specify a comma-separated list of files to be copied to the map reduce cluster
-libjars <jar1,...>               specify a comma-separated list of jar files to be included in the classpath
-archives <archive1,...>          specify a comma-separated list of archives to be unarchived on the compute machinesThe general command line syntax is:
command [genericOptions] [commandOptions]

整体命令类似于Linux

2、HDFS上传下载操作

​ 2.1、上传

hdfs dfs -put [src file] [HDFS path]

​ 2.2、下载

hdfs dfs -get [HDFS path] [local path]

3、查看文件系统健康状态

hdfs dfsadmin -report

4、安全模式

安全模式是hadoop的一种保护机制,用于保证集群中的数据块的安全性。当hdfs进入安全模式时不允许客户端进行任何修改文件的操作,包括上传文件,删除文件,重命名,创建文件夹等操作。

当集群启动的时候,会首先进入安全模式。当系统处于安全模式时会检查数据块的完整性。假设我们设置的副本数(即参数dfs.replication)是5,那么在datanode上就应该有5个副本存在,假设只存在3个副本,那么比例就是3/5=0.6。通过配置dfs.safemode.threshold.pct定义最小的副本率,默认为0.999。

​ 4.1、查看安全模式

hdfs dfsadmin -safemode get

​ 4.2、强制进入安全模式

hdfs dfsadmin -safemode enter

​ 4.3、强制离开安全模式

hdfs dfsadmin -safemode leave

五、Java操作HDFS

1、pom导入依赖

<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.10.1</version>
</dependency>

2、编码(使用模板方法设计模式)

​ 2.1、Template接口

import org.apache.hadoop.fs.FileSystem;public interface Template {public void template(FileSystem fileSystem) throws Exception;}

​ 2.2、Template实现类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;import java.net.URI;public class HadoopTemplate {public static void exec(URI uri, Configuration configuration, String user, Template template)throws Exception{//1.获取到客户端对象FileSystem fileSystem = FileSystem.get(uri, configuration, user);//2.执行操作template.template(fileSystem);//3.关闭资源fileSystem.close();}
}

​ 2.3、调用类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;import java.net.URI;public class Test01 {public static void main(String[] args)throws Exception {String uri = "hdfs://172.16.106.56:9000";String user = "root";String path = "/test/input/a.txt";HadoopTemplate.exec(new URI(uri), new Configuration(), user, new Template() {@Overridepublic void template(FileSystem fileSystem) throws Exception {//执行操作FSDataInputStream stream = fileSystem.open(new Path(path));byte[] bytes = new byte[1024];int length = 0;while ((length = stream.read(bytes)) != -1){System.out.println(new String(bytes, 0, length));}stream.close();}});}
}

可对HDFS上的文件及目录进行增删改查操作

六、MapReduce

使用新API写法

1、MR程序

import lombok.extern.java.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;import java.io.IOException;public class WordCount extends Configured implements Tool {/** 一次读取一行数据泛型LongWritable, Text, Text, IntWritableLongWritable为偏移量,就是每行数据开始的下标Text为当前行Text, IntWritable Map阶段输出的k,v类型(自定义)*/public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable>{private Text outKey = new Text();private IntWritable outValue = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 指定行内每个字段的分割符String[] words = value.toString().split(" ");for (String word : words) {outKey.set(word);// 循环写出给reduce处理context.write(outKey, outValue);}}}// Text, IntWritable 对应Map阶段的输出k,v// Text, IntWritable reduce阶段输出的k,v类型public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable>{/** 传进来的数据类似于key:hellovalues:(1,1,1)key为map阶段设置的key,values为map阶段输出的所有同一个key的value组成的可迭代对象*/@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}context.write(key, new IntWritable(sum));}}@Overridepublic int run(String[] args) throws Exception {// 获取jobJob job = Job.getInstance(this.getConf());//设置jar包路径job.setJarByClass(this.getClass());//关联mapper和reducerjob.setMapperClass(WCMapper.class);job.setReducerClass(WCReducer.class);//设置map输出的k,v类型(对应mapper的输出类型)job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//设置最终输出的k,v类型(对应reducer输出类型)job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//设置输入、输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//提交jobboolean result = job.waitForCompletion(true);return result ? 0 : 1;}// args参数为命令行下输入的运行时参数public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();Path output = new Path(args[1]);FileSystem fileSystem = FileSystem.get(configuration);//判断HDFS上是否存在输出路径,有则删除if(fileSystem.exists(output)){System.out.println("==========目录已存在,执行删除==========");fileSystem.delete(output, true);}//运行int status = ToolRunner.run(configuration, new WordCount(), args);System.exit(status);}
}

2、自定义对象作为key或者value

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;// lombok自动生成get/set方法
@Data
// lombok自动生成无参构造器
@NoArgsConstructor
// lombok自动生成全参构造器
@AllArgsConstructor
/**
bean对象需要实现hadoop序列化接口
作为map/reduce的value只需要实现 Writable 接口,实现write和readFields方法
作为map/reduce的key需要再实现Comparable<T>比较接口,可直接实现WritableComparable<T>达到效果
*/
public class User implements WritableComparable<FlowBean> {private String name;private Integer age;// write和readFields属性顺序必须一致@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(this.name);dataOutput.writeInt(this.age);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.name = dataInput.readUTF();this.age = dataInput.readInt;}// 重写object的toString方法,不然MR输出的对象为内存地址@Overridepublic String toString() {return this.name + "\t" + this.age;}// Comparable<T>的比较方法,用于key排序@Overridepublic int compareTo(FlowBean o) {return totalFlow.compareTo(o.getTotalFlow());}
}

3、MR其余操作

​ 3.1、多个小文件

job.setInputFormatClass(CombineTextInputFormat.class);

​ 通过设置InpuFormat类型处理,不设置则会几个文件启动几个maptask 即使该文件没有达到切片大小

​ 3.2、分区,reduce之后输出的文件

/**
Text, FlowBean为map阶段输出的k,v
通过方法return数字决定输出文件的分区,从0开始
*/
public static class FlowPartitioner extends Partitioner<Text, FlowBean>{@Overridepublic int getPartition(Text text, FlowBean flowBean, int i) {return 0;}
}

​ 在run方法中设置

// 有几个分区就设置几个reduceTask
job.setNumReduceTasks(1);
// 分区的实现类
job.setPartitionerClass(FlowPartitioner.class);

​ 3.3、reduce之前预聚合,MR优化(全局排序等不能使用)

// 可直接把自定义的redue类的class放入
job.setCombinerClass(mrEnum.getReducerClass());

4、输出到database

import mapreducer.DBWordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import entity.DBWCBean;public class File2DB extends Configured implements Tool {public static class DBMapper extends Mapper<LongWritable, Text, Text, IntWritable>{private Text outKey = new Text();private IntWritable outValue = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {String[] words = value.toString().split(" ");for (String word : words) {outKey.set(word);context.write(outKey, outValue);}}}public static class DBReducer extends Reducer<Text, IntWritable, DBWCBean, NullWritable>{@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, DBWCBean, NullWritable>.Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}context.write(new DBWCBean(key.toString(), sum), NullWritable.get());}}@Overridepublic int run(String[] args) throws Exception {DBConfiguration.configureDB(this.getConf(), "com.mysql.cj.jdbc.Driver", "jdbc:mysql://database:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC", "root", "123456");Job job = Job.getInstance(this.getConf());job.setJarByClass(this.getClass());job.setMapperClass(DBMapper.class);job.setReducerClass(DBReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setOutputKeyClass(DBWCBean.class);job.setOutputValueClass(NullWritable.class);job.setOutputFormatClass(DBOutputFormat.class);FileInputFormat.setInputPaths(job, new Path(args[0]));DBOutputFormat.setOutput(job, "wc", "word", "count");return job.waitForCompletion(true) ? 0 : 1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();System.exit(ToolRunner.run(configuration, new File2DB(), args));}
}

5、可以自定义OutputFormat

​ 5.1、继承FileOutputFormat<K,V>,实现getRecordWriter

​ 5.2、继承RecordWriter<K,V>,并对其按业务需求进行实现

不做演示

6、MR实现Join操作

​ 6.1、reduce端实现

​ 实例文件

​ order.xt

01 1001 1
02 1002 2
03 1003 3
01 1004 4
02 1005 5
03 1006 6

​ pd.txt

01 小米
02 华为
03 苹果

​ 自定义bean对象(包含所有值)

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;@Data
@NoArgsConstructor
@AllArgsConstructor
public class TableJoinBean implements WritableComparable<TableJoinBean> {private String orderId = "";private String productName = "";private int sum;private boolean flag;public TableJoinBean(String orderId, int sum){this.orderId = orderId;this.sum = sum;}@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(this.orderId);dataOutput.writeUTF(this.productName);dataOutput.writeInt(this.sum);dataOutput.writeBoolean(this.flag);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.orderId = dataInput.readUTF();this.productName = dataInput.readUTF();this.sum = dataInput.readInt();this.flag = dataInput.readBoolean();}@Overridepublic int compareTo(TableJoinBean o) {return this.orderId.compareTo(o.getOrderId());}@Overridepublic String toString() {return orderId + '\t' + productName + '\t' + sum;}
}

​ Reduce&Mapper

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;
import java.util.ArrayList;public class TableJoin {public static class TJMapper extends Mapper<LongWritable, Text, Text, TableJoinBean>{private String fileName;private Text k = new Text();private TableJoinBean v = new TableJoinBean();// map方法执行前,并且只执行一下   map方法每来一行数据执行一次@Overrideprotected void setup(Context context) throws IOException, InterruptedException {FileSplit inputSplit = (FileSplit) context.getInputSplit();// 获取当前文件的名称fileName = inputSplit.getPath().getName();}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] strings = value.toString().split(" ");k.set(strings[0]);// 对象文件名称进行判断,并给对象对应的属性赋值v.setFlag(fileName.contains("order"));if(v.isFlag()){v.setOrderId(strings[1]);v.setSum(Integer.parseInt(strings[2]));}else {v.setProductName(strings[1]);}context.write(k, v);}}public static class TJReducer extends Reducer<Text, TableJoinBean, Text, TableJoinBean>{@Overrideprotected void reduce(Text key, Iterable<TableJoinBean> values, Context context) throws IOException, InterruptedException {String productName = "";ArrayList<TableJoinBean> tableJoinBeans = new ArrayList<>();// 对同一key的对象属性进行聚合操作for (TableJoinBean value : values) {if(value.isFlag()){// 因为Hadoop重写了Iterable,复用对象以达到优化的效果  所有得每次得重新创建对象tableJoinBeans.add(new TableJoinBean(value.getOrderId(), value.getSum()));}else {productName = value.getProductName();}}for (TableJoinBean tableJoinBean : tableJoinBeans) {tableJoinBean.setProductName(productName);context.write(key, tableJoinBean);}}}}

​ 6.2、Map端实现(效率会比reduce端快,mapTask数量多余reduceTask)

​ 思路:

​ 不需要reduce,设置reduceTask为0

job.setNumReduceTasks(0);

​ 在map端把较小的一张表加载为缓存,并把k,v用map存起来

job.addCacheFile(new URL("HDFS上文件的路径,或者本地文件的路径"));

setup()方法中:

  1. 获取缓存的文件

    URL[] cacheFiles = context.getCacheFiles();
    FileSystem fs = FileSystem.get(context.getConfiguration());
    FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));
    
  2. 循环读取文件的一行

    BufferdReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));// java io流读取操作
    while(...){...
    }
    
  3. 切割

  4. 缓存数据到HashMap

  5. 关闭流

map()方法中:

  1. 获取一行
  2. 截取
  3. 获取对应HashMap的key
  4. 获取其余值
  5. 拼接
  6. 写出

七、压缩

1、MR支持的压缩编码

压缩格式Hadoop自带算法文件扩展名是否支持切片是否修改程序
DEFAULTDEFAULT.default
GzipDEFAULT.gz
bzip2bzip2.bz2
LZO否(需要安装)LOZ.loz需要创建索引,指定输入格式
SnappySnappy`.snappy

2、输入端

​ 2.1、数据量小于块大小,优先考虑压缩速度 LZO/Snappy

​ 2.2、数据量大,优先考虑支持切片 Bzip/LZO

3、mapper输出端

​ 重点考虑压缩/解压速度 LZO/Snappy

4、Reduce输出端

​ 4.1、如果数据永久保存,考虑压缩率较高的 Bzip2/Gzip

​ 4.2、如果传给下一个MR做处理需考虑数据量和是否支持切片

5、实际配置

参数默认值阶段
io.compression.codecs(core-site.xml)输入压缩
mapreduce.map.output.compress(mapred-site.xml)falsemapper输出
mapreduce.map.output.compress.codec(mapred-site.xml)org.apache.hadoop.io.compress.DefaultCodecmapper输出
mapreduce.output.fileoutputformat.compress(mapred-site.xml)falsereduce输出
mapreduce.output.fileoutputformat.compress.codec(mapred-site.xml)org.apache.hadoop.io.compress.DefaultCodecreduce输出

6、在Java代码中配置

...
Configuration conf = new Configuration();// map输出端开启压缩
/**
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
*/// reduce输出端开启压缩
/**
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, Bzip2Codec.class);
*/

八、yarn常用命令

1、yarn application 查看任务

​ 1.1、列出所有任务

yarn application -list

​ 1.2、根据 application 状态过滤

​ 所有状态:ALL、NEW、NEW_SAVING、SUBMITTED、ACCEPTED、RUNNING、FINISHED、FAILED、KILLED

yarn application -list -appStates [状态]

​ 1.3、kill 掉 application

yarn application -kill [applicaionId]

​ 1.4、查询 application 日志

yarn logs -applicationId [applicationId]

​ 1.5、查看 container日志

yarn logs -applicationId [applicationId] -containerId [containerId]

​ 1.6、列出 application 尝试的列表

yarn applicationattempt -list [applicationId]

​ 1.7、打印 applicationAttemp状态

yarn applicationattempt -status [applicationAttemptId]

​ 1.8、列出所有容器

yarn container -list [applicationAttemptId]

​ 1.9、打印容器状态

yarn container -status [containerId]

​ 1.10、查看node状态

yarn node -list -all

​ 1.11、加载队列配置

yarn remadmin -reefreshQueues

​ 1.12、查看队列

yarn queue -status [queueName]

九、yarn生产环境核心参数配置

1、resourceManager相关

参数作用
yarn.resourcemanager.scheduler.class配置调度器
yarn.resourcemanager.scheduler.client.thread.countresourcemanager处理的线程数,默认50

2、nodeManager相关(每个节点单独配置)

参数作用
yarn.nodemanager.resource.detect-hardware-capabilitiesyarn自己检查硬件进行配置,默认false
yarn.nodemanager.resource.count-logical-processors-as-cores是否将虚拟核数当中cpu核数,默认false
yarn.nodemanager.resource.pcores-vcores-multiplier虚拟核数与物理核数的比值,默认1.0
yarn.nodemanager.resource.memory-mbnodeManager使用内存,默认8G
yarn.nodemanager.resource.system-reserved-memore-mbnodeManager为系统保留多少内存
yarn.nodemanager.resource.cpu-vcoresnodeManager使用CPU核数,默认8
yarn.nodemanager.pmem-check-enabled是否开启物理内存检查限制container,默认true
yarn.nodemanager.vmem-check-enabled是否开启虚拟内存检查限制container,默认true
yarn.nodemanager.vmem-pmem-ratio虚拟内存物理内存比例,默认2.1

3、container相关

参数作用
yarn.scheduler.minimum-allocation-mb容器最小内存,默认1G
yarn.scheduler.maximum-allocation-mb容器最大内存,默认8G
yarn.scheduler.minimum-allocation-vcores容器最小cpu核数,默认1个
yarn.scheduler.maximum-allocation-vcores容器最大cpu核数,默认4个

es-vcores-multiplier | 虚拟核数与物理核数的比值,默认1.0 |
| yarn.nodemanager.resource.memory-mb | nodeManager使用内存,默认8G |
| yarn.nodemanager.resource.system-reserved-memore-mb | nodeManager为系统保留多少内存 |
| yarn.nodemanager.resource.cpu-vcores | nodeManager使用CPU核数,默认8 |
| yarn.nodemanager.pmem-check-enabled | 是否开启物理内存检查限制container,默认true |
| yarn.nodemanager.vmem-check-enabled | 是否开启虚拟内存检查限制container,默认true |
| yarn.nodemanager.vmem-pmem-ratio | 虚拟内存物理内存比例,默认2.1 |

3、container相关

参数作用
yarn.scheduler.minimum-allocation-mb容器最小内存,默认1G
yarn.scheduler.maximum-allocation-mb容器最大内存,默认8G
yarn.scheduler.minimum-allocation-vcores容器最小cpu核数,默认1个
yarn.scheduler.maximum-allocation-vcores容器最大cpu核数,默认4个

相关文章:

  • 品牌与产品:消费者决策的经济逻辑与品牌宣传的战略意义
  • Keil MDK 下载安装相对应CPU的Software Packs
  • QT6 学生管理系统以及登录(QSQLITE数据库)
  • “探索AIGC市场:腾讯元宝APP加入竞争,大模型产品的未来走向与个人选择“
  • 在 .NET Core 中构建工作服务和调度运行
  • Java 关于抽象 -- Java 语言的抽象类、接口和函数式接口
  • STM32项目分享:智能蓝牙手环
  • 攻防世界--杂项misc-2017_Dating_in_Singapore
  • 力扣爆刷第149天之TOP100五连刷(LRU、K个一组)
  • 专栏【汇总】
  • Ansible——shell模块
  • 面试题:如何避免索引失效?
  • LCD电子广告牌课程设计
  • R语言绘图 --- 桑基图(Biorplot 开发日志 --- 5)
  • Win10下CodeBlock实现socket TCP server/client
  • Apache的80端口被占用以及访问时报错403
  • ES6--对象的扩展
  • Java教程_软件开发基础
  • java中的hashCode
  • js 实现textarea输入字数提示
  • SpiderData 2019年2月13日 DApp数据排行榜
  • Travix是如何部署应用程序到Kubernetes上的
  • use Google search engine
  • WinRAR存在严重的安全漏洞影响5亿用户
  • 高性能JavaScript阅读简记(三)
  • 正则学习笔记
  • 继 XDL 之后,阿里妈妈开源大规模分布式图表征学习框架 Euler ...
  • 摩拜创始人胡玮炜也彻底离开了,共享单车行业还有未来吗? ...
  • ​​​​​​​STM32通过SPI硬件读写W25Q64
  • ​如何使用QGIS制作三维建筑
  • !!java web学习笔记(一到五)
  • #Js篇:单线程模式同步任务异步任务任务队列事件循环setTimeout() setInterval()
  • (04)odoo视图操作
  • (3)Dubbo启动时qos-server can not bind localhost22222错误解决
  • (Spark3.2.0)Spark SQL 初探: 使用大数据分析2000万KF数据
  • (附源码)springboot建达集团公司平台 毕业设计 141538
  • (转) RFS+AutoItLibrary测试web对话框
  • *ST京蓝入股力合节能 着力绿色智慧城市服务
  • .JPG图片,各种压缩率下的文件尺寸
  • .NET WPF 抖动动画
  • .net通用权限框架B/S (三)--MODEL层(2)
  • .NET学习全景图
  • @EventListener注解使用说明
  • @synthesize和@dynamic分别有什么作用?
  • [AI]文心一言出圈的同时,NLP处理下的ChatGPT-4.5最新资讯
  • [Algorithm][动态规划][路径问题][不同路径][不同路径Ⅱ][珠宝的最高价值]详细讲解
  • [BZOJ] 1001: [BeiJing2006]狼抓兔子
  • [hdu4622 Reincarnation]后缀数组
  • [hive]中的字段的数据类型有哪些
  • [IE技巧] IE8中HTTP连接数目的变化
  • [JAVA] 什么是Java线程同步机制?
  • [Java]深入剖析常见排序
  • [kotlin]处理延时操作
  • [luoguP2401] 不等数列
  • [M二叉树] lc236. 二叉树的最近公共祖先(dfs+二叉搜索树)