hadoop

Hadoop

1、大数据

指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

大数据主要解决海量数据的采集存储分析计算问题。

2、大数据特点(4V)

  1. Volume(大量)
  2. Velocity(高速)
  3. Variety(多样)
  4. Value(低价值密度)

3、大数据部门组织架构

大数据部门组织架构

Hadoop入门

4、Hadoop是什么

  1. Hadoop 是一个由 Apache 基金会所开发的分布式系统基础架构。
  2. 主要解决海量数据的存储和海量数据的分析计算问题。

5、Hadoop的优势

  1. 高可靠性:hadoop 底层维护多个数据副本,所以即使 Hadoop 某个计算元素或存储出现故障,也不会导致数据的丢失。
  2. 高扩展性:在集群间分配任务数据,可方便的扩展数以千计的节点。
  3. 高效性:在 MapReduce 的思想下,Hadoop 是并行工作的,以加快任务处理速度。
  4. 高容错性:能够自动将失败的任务重新分配。

HDFS 简介

Hadoop Distributed File System 简称 HDFS,是一个分布式文件系统。

  1. NameNode(nn):存储文件的元数据,如文件名,文件目录结构,文件属性(生成时间、副本数、文件权限),以及每个文件的块列表和块所在的 DataNode 等。
  2. DataNode(dn):在本地文件系统存储文件块数据,以及块数据的校验和。
  3. Secondary NameNode(2nn):每隔一段时间对 NameNode 元数据备份。

YARN 简介

yarn

MapReduce 简介

MapReduce 将计算过程分为两个阶段:Map 和 Reduce

  1. Map 阶段并行处理输入数据
  2. Reduce 阶段对 Map 结果进行汇总

image-20220614212644361

三者关系

hdfs-yarn-mapreduce三者关系

大数据技术生态体系

image-20220614213906591

Hadoop集群搭建

6、虚拟机环境准备

IP地址 192.168.174.100、主机名称 hadoop100、内存4G、硬盘50G,linux系统采用centos 7.5。

  1. 按照 epel-release

    1
    yum install -y epel-release
  2. 按照 net-tool 工具包和 vim

    1
    2
    yum install -y net-tools
    yum install -y vim
  3. 关闭防火墙及其开机自启

    1
    2
    systemctl stop firewalld
    systemctl disable firewalld.service
  4. 创建 hqz 用户,并修改 hqz 用户的密码

    1
    2
    useradd hqz
    passwd hqz
  5. 配置 hqz 用户具有 root 权限,方便后期加 sudo 执行 root权限的命令

    1
    2
    3
    vim /etc/sudoers
    # 在 %wheel 这行下面添加一行
    hqz ALL=(ALL) NOPASSWD:ALL
  6. 在/opt目录下创建文件夹,并修改所属主和所属组

    • 在/opt目录下创建module、software文件夹

      1
      2
      mkdir /opt/module
      mkdir /opt/software
    • 修改module、software文件夹的所有者和所属组均为hqz用户

      1
      2
      chown hqz:root /opt/module
      chown hqz:root /opt/software
    • 查看module、software文件夹的所有者和所属组

      1
      2
      cd /opt/
      ll

7、克隆虚拟机

  1. 利用 hadoop100,克隆两台虚拟机:hadoop101、hadoop102

  2. 修改克隆机 IP,以 hadoop100 为例

    • 修改静态ip

      1
      2
      3
      4
      5
      6
      7
      8
      vim /etc/sysconfig/network-scripts/ifcfg-ens33

      BOOTPROTO=static
      NAME="ens33"
      IPADDR=192.168.174.102
      PREFIX=24
      GATEWAY=192.168.174.2
      DNS1=192.168.174.2
    • 查看 Linux 虚拟机的虚拟网络编辑器,编辑 -> 虚拟网络编辑器 -> VMnet8,设置子网 ip 为 192.168.174.0 和网关为 192.168.174.2

    • 查看Windows系统适配器VMware Network Adapter VMnet8 的 IP 地址 192.168.174.15 网关和 DNS 都为 192.168.174.2

  3. 修改主机名

    1
    2
    3
    vim /etc/hostname
    # 添加
    hadoop100
  4. 配置主机名称映射

    1
    2
    3
    4
    5
    vim /etc/hosts
    # 添加
    192.168.174.100 hadoop100
    192.168.174.101 hadoop101
    192.168.174.102 hadoop102
  5. 修改 windows 的 host 文件 C:\Windows\System32\drivers\etc

    1
    2
    3
    192.168.174.100 hadoop100
    192.168.174.101 hadoop101
    192.168.174.102 hadoop102

8、安装 jdk

  1. 将 jdk8 的安装包传入 /opt/software/ (我下载的

  2. 解压 jdk 安装包到 /opt/module/

  3. 配置 jdk 环境变量

    • 新建 /etc/profile.d/my_env.sh 文件

      1
      2
      3
      4
      5
      vim /etc/profile.d/my_env.sh
      # 添加以下内容
      #JAVA_HOME
      export JAVA_HOME=/opt/module/jdk1.8.0_212
      export PATH=$PATH:$JAVA_HOME/bin
    • 刷新环境变量

      1
      source /etc/profile

9、安装 Hadoop

  1. 将在官网上下载的 Hadoop 安装包拷贝到 /opt/software/ 目录

  2. 将 Hadoop 安装包解压到 /opt/module/

  3. 将 Hadoop 添加到环境变量

    • 打开 /etc/profile.d/my_env.sh 添加以下内容

      1
      2
      3
      4
      #HADOOP_HOME
      export HADOOP_HOME=/opt/module/hadoop-3.3.3
      export PATH=$PATH:$HADOOP_HOME/bin
      export PATH=$PATH:$HADOOP_HOME/sbin
    • source /etc/profile 刷新环境变量

10、编写集群分发脚本 xsync

  1. 安装 rsync

    1
    yum install -y rsync
  2. xsync 集群分发脚本

    • 打开 /etc/profile.d/my_env.sh,添加一下内容

      1
      2
      # xsync
      export PATH=$PATH:/home/hqz/bin
    • 在 /home/hqz/bin 目录下创建 xsync 文件,在文件中添加一下内容

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      #!/bin/bash
      #1. 判断参数个数
      if [ $# -lt 1 ]
      then
      echo Not Enough Arguement!
      exit;
      fi
      #2. 遍历集群所有机器,在 hadoop101 上遍历的机器改为 hadoop100 hadoop102
      for host in hadoop101 hadoop102
      do
      echo ==================== $host ====================
      #3. 遍历所有目录,挨个发送
      for file in $@
      do
      #4. 判断文件是否存在
      if [ -e $file ]
      then
      #5. 获取父目录
      pdir=$(cd -P $(dirname $file); pwd)

      #6. 获取当前文件的名称
      fname=$(basename $file)
      ssh $host "mkdir -p $pdir"
      rsync -av $pdir/$fname $host:$pdir
      else
      echo $file does not exists!
      fi
      done
      done

      # 在 vim 模式下执行以下命令
      :set ff
      :set fileformat=unix
      :wq
    • 修改脚本 xsync 具有执行权限

      1
      chmod +x xsync
    • 同步环境变量配置

      1
      xsync /etc/profile.d/my_env.sh

11、SSH 无密码登录配置

  1. 生成公钥和私钥

    1
    ssh-keygen -t rsa

    在 /root/.ssh 目录下可以找到 id_rsa 和 id_rsa.pub 两个文件

  2. 切换到 /root/.ssh 目录,将公钥拷贝到要免密登录的目标机器上

    1
    2
    ssh-copy-id hadoop101
    ssh-copy-id hadoop102

    其它的机器做类似的操作即可

12、集群配置

12.1、集群规划部署

注意:

  • NameNode 和 SecondaryNameNode 不要安装在同一个服务器
  • ResourceManager 也很消耗内存,不要和 NameNode、SecondaryNameNode 配置在同一台机器上
hadoop100 hadoop101 hadoop102
HDFS NameNode
DataNode
DataNode SecondaryNameNode
DataNode
YARN NodeManager ResourceManager
NodeManager
NodeManager

12.2、配置文件说明

Hadoop 配置文件分两类:默认配置文件和自定义配置文件,只有用户想修改某一默认配置值时,才需要修改自定义配置文件,更改相应属性值。自定义配置文件:

core-site.xml、hdfs-site.xml、yarn-site.xml、mapred-site.xml 四个配置文件存放在 $HADOOP_HOME/etc/hadoop 这个路径上,用户可以根据项目需求重新进行修改配置。

12.3、配置集群

  • 核心配置文件 core-site.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

    <configuration>
    <!-- 指定NameNode的地址 -->
    <property>
    <name>fs.defaultFS</name>
    <value>hdfs://hadoop100:8020</value>
    </property>

    <!-- 指定hadoop数据的存储目录 -->
    <property>
    <name>hadoop.tmp.dir</name>
    <value>/opt/module/hadoop-3.3.3/data</value>
    </property>

    <!-- 配置HDFS网页登录使用的静态用户为root -->
    <property>
    <name>hadoop.http.staticuser.user</name>
    <value>root</value>
    </property>
    </configuration>
  • HDFS 配置文件 hdfs-site.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

    <configuration>
    <!-- nn web端访问地址-->
    <property>
    <name>dfs.namenode.http-address</name>
    <value>hadoop100:9870</value>
    </property>
    <!-- 2nn web端访问地址-->
    <property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>hadoop102:9868</value>
    </property>
    </configuration>
  • YARN 配置文件 yarn-site.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

    <configuration>
    <!-- 指定MR走shuffle -->
    <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
    </property>

    <!-- 指定ResourceManager的地址-->
    <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>hadoop101</value>
    </property>

    <!-- 环境变量的继承 -->
    <property>
    <name>yarn.nodemanager.env-whitelist</name>
    <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
    </property>

    <property>
    <!-- 客户端通过该地址向RM提交对应用程序操作 -->
    <name>yarn.resourcemanager.address.rm1</name>
    <value>hadoop100:8032</value>
    </property>
    <property>
    <!--ResourceManager 对Applicationhadoop100暴露的访问地址。Applicationhadoop100通过该地址向RM申请资源、释放资源等。 -->
    <name>yarn.resourcemanager.scheduler.address.rm1</name>
    <value>hadoop100:8030</value>
    </property>
    <property>
    <!-- RM HTTP访问地址,查看集群信息-->
    <name>yarn.resourcemanager.webapp.address.rm1</name>
    <value>hadoop100:8088</value>
    </property>
    <property>
    <!-- NodeManager通过该地址交换信息 -->
    <name>yarn.resourcemanager.resource-tracker.address.rm1</name>
    <value>hadoop100:8031</value>
    </property>
    <property>
    <!--管理员通过该地址向RM发送管理命令 -->
    <name>yarn.resourcemanager.admin.address.rm1</name>
    <value>hadoop100:8033</value>
    </property>
    <property>
    <name>yarn.resourcemanager.ha.admin.address.rm1</name>
    <value>hadoop100:23142</value>
    </property>

    <property>
    <name>yarn.resourcemanager.address.rm2</name>
    <value>hadoop102:8032</value>
    </property>
    <property>
    <name>yarn.resourcemanager.scheduler.address.rm2</name>
    <value>hadoop102:8030</value>
    </property>
    <property>
    <name>yarn.resourcemanager.webapp.address.rm2</name>
    <value>hadoop102:8088</value>
    </property>
    <property>
    <name>yarn.resourcemanager.resource-tracker.address.rm2</name>
    <value>hadoop102:8031</value>
    </property>
    <property>
    <name>yarn.resourcemanager.admin.address.rm2</name>
    <value>hadoop102:8033</value>
    </property>
    <property>
    <name>yarn.resourcemanager.ha.admin.address.rm2</name>
    <value>hadoop102:23142</value>
    </property>
    </configuration>
  • MapReduce配置文件 mapred-site.xml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

    <configuration>
    <!-- 指定MapReduce程序运行在Yarn上 -->
    <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
    </property>
    </configuration>
  • 配置 workers

    1
    2
    3
    4
    5
    vim /opt/module/hadoop-3.3.3/etc/hadoop/workers
    # 添加以下内容
    hadoop102
    hadoop103
    hadoop104
  • 在集群上分发配置好的 Hadoop 配置文件

    1
    xsync /opt/module/hadoop-3.3.3/etc/hadoop/
  • 如果是 root 用户配置 hadoop

    • 在 start-dfs.sh,stop-dfs.sh 两个文件顶部添加以下参数

      1
      2
      3
      4
      HDFS_DATANODE_USER=root
      HADOOP_SECURE_DN_USER=hdfs
      HDFS_NAMENODE_USER=root
      HDFS_SECONDARYNAMENODE_USER=root
    • 在 start-yarn.sh,stop-yarn.sh 顶部添加以下参数

      1
      2
      3
      YARN_RESOURCEMANAGER_USER=root
      HADOOP_SECURE_DN_USER=yarn
      YARN_NODEMANAGER_USER=root
    • 执行 xsync ./sbin 同步文件内容到其它机器

12.4、启动集群

  • 如果是第一次启动需要在 hadoop100 格式化 NameNode

    1
    hdfs namenode -format
  • 如果集群在启动过程中报错,在格式化 NameNode 之前需要先停下 NameNode 和 DataNode 进程,并删除所有机器的 data 和 logs 目录,然后再进行格式化

  • 启动 HDFS

    1
    sbin/start-dfs.sh
  • 在配置了 ResourceManager 的节点(hadoop101)启动 YARN

    1
    sbin/start-yarn.sh
  • Web端查看 HDFS 的 NameNode

  • Web端查看 YARN 的 ResourceManager

12.5、集群测试

1
2
3
4
5
6
7
8
9
10
11
12
# 上传文件到集群
# 先创建集群的文件夹
hadoop fs -mkdir /input
# 上传小文件
hadoop fs ./README.txt /input
# 上传大文件
hadoop fs -put /opt/software/jdk-8u202-linux-x64.tar.gz /input
# 可以在 HDFS 的 web 面板中看到数据
# 下载
hadoop fs -get /jdk-8u202-linux-x64.tar.gz ./
# 执行 wordcount 任务
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.3.jar wordcount /input /output

12.6、配置历史服务器

为了查看程序的历史运行情况,需要配置一下历史服务器。

1
vim /etc/hadoop/mapred-site.xml
1
2
3
4
5
6
7
8
9
10
11
<!-- 历史服务器地址 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop100:10020</value>
</property>

<!-- 历史服务器web端地址 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop102:19888</value>
</property>
1
2
3
4
# 分发配置
xsync etc/hadoop/mapred-site.xml
# 启动历史服务器
bin/mapred --daemon start historyserver

12.7、配置日志的聚集

应用运行完成之后,将程序运行日志信息上传到 HDFS 系统上。

好处:可以方便的查看到程序运行详情,方便开发调试。

注意:开启日志聚集功能,需要重新启动 NodeManager、ResourceManager 和 HistoryServer

1
vim /etc/hadoop/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
  <!-- 开启日志聚集功能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

<!-- 设置日志聚集服务器地址 -->
<property>
<name>yarn.log.server.url</name>
<value>http://hadoop100:19888/jobhistory/logs</value>
</property>

<!-- 设置日志保留的时间为7天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 分发配置
xsync etc/hadoop/yarn-site.xml
# 关闭 NodeManager、ResourceManager、HistoryServer hadoop101
sbin/stop-yarn.sh
# hadoop100
bin/stop-dfs.sh
bin/mapred --daemon stop historyserver

# hadoop100
bin/start-dfs.sh
bin/mapred --daemon start historyserver
#启动 NodeManager、ResourceManager、HistoryServer hadoop101
sbin/start-yarn.sh

# 删除 HDFS 上已经存在的输出文件
hadoop fs -rm -r /optput
# 执行 wordCount 程序
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.3.jar wordcount /input /output

12.8、编写集群启停脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
cd /home/hqz/bin
vim myhadoop.sh
# 脚本内容
#!/bin/bash

if [ $# -lt 1 ]
then
echo "No Args Input..."
exit ;
fi

case $1 in
"start")
echo " =================== 启动 hadoop集群 ==================="

echo " --------------- 启动 hdfs ---------------"
ssh hadoop100 "/opt/module/hadoop-3.3.3/sbin/start-dfs.sh"
echo " --------------- 启动 yarn ---------------"
ssh hadoop101 "/opt/module/hadoop-3.3.3/sbin/start-yarn.sh"
echo " --------------- 启动 historyserver ---------------"
ssh hadoop100 "/opt/module/hadoop-3.3.3/bin/mapred --daemon start historyserver"
;;
"stop")
echo " =================== 关闭 hadoop集群 ==================="

echo " --------------- 关闭 historyserver ---------------"
ssh hadoop100 "/opt/module/hadoop-3.3.3/bin/mapred --daemon stop historyserver"
echo " --------------- 关闭 yarn ---------------"
ssh hadoop101 "/opt/module/hadoop-3.3.3/sbin/stop-yarn.sh"
echo " --------------- 关闭 hdfs ---------------"
ssh hadoop100 "/opt/module/hadoop-3.3.3/sbin/stop-dfs.sh"
;;
*)
echo "Input Args Error..."
;;
esac

# 保存后退出,然后赋予脚本执行权限
chmod +x myhadoop.sh
# 启动、停止
myhadoop stop
myhadoop start

# 同步文件
xsync /home/hqz/bin

12.9、编写查询集群运行情况的脚本

1
2
3
4
5
6
7
8
9
10
11
12
vim jpsall

#!/bin/bash

for host in hadoop100 hadoop101 hadoop102
do
echo =============== $host ===============
ssh $host jps
done

# 保存后退出,然后赋予脚本执行权限
chmod +x jpsall

12.10、常用端口号

端口名称 hadoop2.x hadoop3.x
NameNode 内部通信端口 8020/9000 8020/9000/9820
NameNode HTTP UI 50070 9870
MapReduce 查看执行任务端口 8088 8088
历史服务器通信端口 19888 19888

12.11、常用配置文件

版本 配置文件
2.x hdfs-site.xml yarn-site.xml mapred-site.xml workers
3.x hdfs-site.xml yarn-site.xml mapred-site.xml slaves

12.12、集群时间同步

如果服务器在公网环境(能连接外网),可以不采用集群时间同步。

如果服务器在内网,需要配置集群时间同步,否则可能会出现时间偏差,导致集群执行任务时间不同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 查看所有节点 ntpd 服务状态和开机自启动状态
systemctl status ntpd
# 如果服务不存在,则安装
yum install ntp
systemctl start ntpd
# 设为开机自启
systemctl enable ntpd

# hadoop100 修改配置文件
vim /etc/ntp.conf

# Hosts on local network are less restricted. 解开17行的注释 192.168.174.0 为我的网段
restrict 192.168.174.0 mask 255.255.255.0 nomodify notrap

# 第21行,将这四行注释掉,不向互联网获取时间
# server 0.centos.pool.ntp.org iburst
# server 1.centos.pool.ntp.org iburst
# server 2.centos.pool.ntp.org iburst
# server 3.centos.pool.ntp.org iburst

# 末尾添加注释,当节点丢失网络时,可以采用本地时间作为时间服务器,为集群中的其它节点提供时间同步
server 127.127.1.0
fudge 127.127.1.0 stratum 10

# 保存后退出
vim /etc/sysconfig/ntpd
# 增加一行,让硬件时间与系统时间一起同步
SYNC_HWCLOCK=yes

# 在 hadoop101 hadoop102 创建定时任务,一分钟同步一次时间
crontab -e
*/1 * * * * /usr/sbin/ntpdate hadoop100
# 保存后退出

HDFS

13、概述

13.1、HDFS 的产生背景和定义

为了解决海量数据的存储问题。

HDFS 是一个分布式文件系统,用于存储文件,通过目录树来定位文件。

适合一次写入,多次读出的场景。

13.2、优缺点

优点

  1. 高容错性
    • 数据自动保存多个副本,通过增加副本的形式,提高容错性。
    • 某一个副本丢失以后,可以自动恢复。
  2. 适合处理大数据
    • 数据规模:能处理的数据规模达到 GB、TB、甚至 PB 级别的数据。
    • 文件规模:能处理百万规模以上的文件数量。
  3. 可构建在廉价的机器上,通过多副本机制,提高可靠性。

缺点

  1. 不适合低延时数据访问。
  2. 无法高效的对大量小文件进行存储
    • 存储大量小文件的话,会占用 NameNode 大量的内存来存储文件目录和块信息,这样是不可取的,因为 NameNode 的内存总是有限的。
    • 小文件存储的寻址时间会超过读取时间,违反了 HDFS 的设计目标。
  3. 不支持并发写入和文件随机修改
    • 一个文件只能有一个写,不允许多个线程同时写。
    • 仅支持数据 append(追加),不支持文件的随机修改。

13.3、组成

hdfs

  1. NameNode(nn):
    • 管理 HDFS 的名称空间
    • 配置副本策略
    • 管理数据块(Block)映射信息
    • 处理客户端读写请求
  2. DataNode:NameNode 下达命令,DataNode 执行实际的操作
    • 存储实际的数据块
    • 执行数据块的读/写操作
  3. Client:
    • 文件切分,文件上传 HDFS 的时候,Client 将文件切分成一个一个的 Block,然后进行上传
    • 与 NameNode 交互,获取文件的位置信息
    • 与 DataNode 交互,读取或者写入数据
    • Client 提供一些命令来管理 HDFS,比如 NameNode 格式化
    • Clent 可以通过一些命令来访问 HDFS,比如对 HDFS 增删改查操作
  4. Secondary NameNode:并非 NameNode 的热备,当 NameNode 挂掉的时候,它并不是能马上替换 NameNode 并提供服务
    • 辅助 NameNode,分担其工作量,比如定期合并 Fsimage 和 Edits,并推送给 NameNode
    • 在紧急情况下,可辅助恢复 NameNode

13.4、文件块的大小

HDFS 中的文件在物理上是分块存储(Block),块的大小可以用过配置参数(dfs.blocksize)来规定默认大小在 Hadoop2.x/3.x 版本中是 128M,1.x 版本中是 64 M。

  1. 如果寻址时间为 10ms,即 查找到目标 block 的时间为 10ms
  2. 寻址时间为传输时间的 1% 时,为最佳状态,因此传输时间 = 10ms/0.01 = 1000mx = 1s
  3. 目前磁盘的传输速率普遍为 100MB/s

为什么块的大小不能设置太小,也不能设置太大?

  1. HDFS 的块设置太小,会增加寻址时间,程序一直在找块的开始位置
  2. 如果块设置的太大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间,导致程序在处理这块数据时,会非常慢。

总结:HDFS 块的大小设置主要取决于磁盘传输速率

14、HDFS 的 Shell 相关操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 输出这个命令的参数
hadoop fs -help rm
# 创建文件夹
hadoop fs -mkdir /sanguo

# 上传
# 从本地剪切粘贴到 HDFS
hadoop fs -moveFromLocal ./guanyu.txt /sanguo
# 从本地文件系统中拷贝文件到 HDFS 路径中
hadoop fs -copyFromLocal ./zhaoyun.txt /sanguo
# -put 等同于 copyFromLocal
hadoop fs -put ./caocao.tx /sanguo
# 追加一个文件到已经存在的文件末尾
hadoop fs -appendToFile ./caopei.txt /sanguo/caocao.txt

# 下载
# 从 HDFS 拷贝到本地
hadoop fs -copyToLocal /sanguo/caocao.txt ./
# -get 等同于 -copyToLocal
hadoop fs -get /sanguo/caocao.txt ./

# HDFS 直接操作
# -ls: 显示目录信息
hadoop fs -ls /sanguo
# -cat:显示文件内容
hadoop fs -cat /sanguo/caocao.txt
# -chgrp、-chmod、-chown:Linux文件系统中的用法一样,修改文件所属权限
hadoop fs -chmod 666 /sanguo/caocao.txt
hadoop fs -chown root:root /sanguo/caocao.txt
# -mkdir:创建路径
hadoop fs -mkdir /shuguo
# -cp:从HDFS的一个路径拷贝到HDFS的另一个路径
hadoop fs -cp /sanguo/guanyu.txt /shuguo
# -mv:在HDFS目录中移动文件
hadoop fs -mv /sanguo/zhaoyun.txt /shuguo
# -tail:显示一个文件的末尾1kb的数据
hadoop fs -tail /shuguo/guanyu.txt
# -rm:删除文件或文件夹
hadoop fs -rm /sanguo/guanyu.txt
# -rm -r:递归删除目录及目录里面内容
hadoop fs -rm -r /sanguo
# -du统计文件夹的大小信息
hadoop fs -du -s -h /shuguo
hadoop fs -du -h /shuguo

# -setrep:设置 HDFS 中文件的副本数量
hadoop fs -setrep 10 /shuguo/guanyu.txt
# 设置的副本数只是记录在 NameNode 的元数据中,是否真的会有这么多副本,还得看 DataNode 的数量。只有节点数增加到 10 台时,副本数才能达到 10

15、HDFS 的读写流程

15.1、HDFS 的写数据流程

HDFS 写流程

  1. 客户端通过 Distributed FileSystem 模块向 NameNode 请求上传文件,NameNode 检查目标文件是否已存在,父目录是否存在。

  2. NameNode 返回是否可以上传。

  3. 客户端请求第一个 Block 上传到哪几个 DataNode 服务器上。

  4. NameNode 返回3个 DataNode 节点,分别为 dn1、dn2、dn3。

  5. 客户端通过 FSDataOutputStream 模块请求 dn1 上传数据,dn1 收到请求会继续调用 dn2,然后 dn2 调用 dn3,将这个通信管道建立完成。

  6. dn1、dn2、dn3 逐级应答客户端。

  7. 客户端开始往 dn1 上传第一个 Block(先从磁盘读取数据放到一个本地内存缓存),以 Packet 为单位,dn1 收到一个 Packet 就会传给 dn2,dn2 传给 dn3;dn1 每传一个 packet 会放入一个应答队列等待应答

  8. 当一个 Block 传输完成之后,客户端再次请求 NameNode 上传第二个 Block 的服务器。(重复执行3-7步)。

15.2、网络拓扑-节点距离计算

在 HDFS 写数据的过程中,NameNode 会选择距离待上传数据最近距离的 DataNode 接收数据。

节点距离:两个节点到达最近的共同祖先的距离总和

15.3、机架感知

  1. 第一个副本在 Client 所处的节点上,如果客户端在集群外,随机选一个。
  2. 第二个副本在另一个机架的随机一个节点。
  3. 第三个副本在第二个副本所在机架的随机节点。

15.4、HDFS 读数据流程

HDFS 读数据流程

  1. 客户端通过 DistributedFileSystem 向 NameNode 请求下载文件,NameNode 通过查询元数据,找到文件块所在的 DataNode 地址。
  2. 挑选一台 DataNode(就近原则,然后随机)服务器,请求读取数据。
  3. DataNode 开始传输数据给客户端(从磁盘里面读取数据输入流,以 Packet 为单位来做校验)。
  4. 客户端以 Packet 为单位接收,先在本地缓存,然后写入目标文件。

16、NN 和 2NN

FsImage:磁盘中备份元数据的文件。

Edits:每当元数据有更新或者添加元数据时,修改内存中的元数据并追加到 Edits 中,只进行追加操作。

SecondaryNameNode:用于 FsImage 和 Edits 的合并。

NN 和 2NN 工作机制

  1. 第一阶段 NameNode 启动

    • 第一次启动 NameNode 格式化后,创建 Fsimage 和 Edits 文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
    • 客户端对元数据进行增删改的请求。
    • NameNode 记录操作日志,更新滚动日志。
    • NameNode 在内存中对元数据进行增删改。
  2. 第二阶段:Secondary NameNode工作

    • Secondary NameNode 询问 NameNode 是否需要 CheckPoint。直接带回 NameNode 是否检查结果。
    • Secondary NameNode 请求执行 CheckPoint。
    • NameNode 滚动正在写的 Edits 日志。
    • 将滚动前的编辑日志和镜像文件拷贝到 Secondary NameNode。
    • Secondary NameNode 加载编辑日志和镜像文件到内存,并合并。
    • 生成新的镜像文件 fsimage.chkpoint。
    • 拷贝 fsimage.chkpoint 到 NameNode。
    • NameNode 将 fsimage.chkpoint 重新命名成 fsimage。

16.1、FsImage 和 Edits

  1. FsImage文件:HDFS 文件系统元数据的一个永久性的检查点,其中包含 HDFS 文件系统的所有目录和文件 inode 的序列化信息
  2. Edits 文件:存放 HDFS 文件系统的所有更新操作的路径,文件系统客户端执行的所有写操作首先会被记录到 Edits 文件中
  3. seen_txid 文件保存的是一个数字,就是最后一个 edits_ 的数字
  4. 每次 NameNode 启动的时候,都会将 FsImage 文件读入内存,加载 Edits 里面的更新操作,保证内存中的元数据是最新的、同步的,可以看成 NameNode 启动的时候就将 FsImage 和 Edits 文件进行了合并

16.2、CheckPoint 时间设置

  1. 通常情况下,SecondaryNameNode 每隔一小时执行一次(hdfs-default.xml)

    1
    2
    3
    4
    <property>
    <name>dfs.namenode.checkpoint.period</name>
    <value>3600s</value>
    </property>
  2. 一分钟检查一次操作次数,当操作次数达到一百万时,SecondaryNameNode 执行一次。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    <property>
    <name>dfs.namenode.checkpoint.txns</name>
    <value>1000000</value>
    <description>操作动作次数</description>
    </property>

    <property>
    <name>dfs.namenode.checkpoint.check.period</name>
    <value>60s</value>
    <description> 1分钟检查一次操作次数</description>
    </property>

17、DataNode 工作机制

DataNode工作机制

  1. 一个数据块在 DataNode 上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。

  2. DataNode 启动后向 NameNode 注册,通过后,周期性(6小时)的向 NameNode 上报所有的块信息。

  3. DN 向 NN 汇报当前解读信息的时间间隔,默认6小时;

    1
    2
    3
    4
    5
    <property>
    <name>dfs.blockreport.intervalMsec</name>
    <value>21600000</value>
    <description>Determines block reporting interval in milliseconds.</description>
    </property>
  4. DN 扫描自己节点块信息列表的时间,默认6小时

    1
    2
    3
    4
    5
    6
    <property>
    <name>dfs.datanode.directoryscan.interval</name>
    <value>21600s</value>
    <description>Interval in seconds for Datanode to scan data directories and reconcile the difference between blocks in memory and on the disk.Support multiple time unit suffix(case insensitive), as describedin dfs.heartbeat.interval.
    </description>
    </property>
  5. 心跳是每3秒一次,心跳返回结果带有 NameNode 给该 DataNode 的命令如复制块数据到另一台机器,或删除某个数据块。如果超过10分钟没有收到某个 DataNode 的心跳,则认为该节点不可用。

  6. 集群运行中可以安全加入和退出一些机器。

17.1、数据完整性

数据完整性

  1. 当 DataNode 读取 Block 的时候,它会计算 CheckSum。
  2. 如果计算后的 CheckSum,与 Block 创建时值不一样,说明 Block 已经损坏。
  3. Client 读取其他 DataNode 上的 Block。
  4. 常见的校验算法crc(32),md5(128),sha1(160)
  5. DataNode 在其文件创建后周期验证 CheckSum。

17.2、掉线时限参数限制

掉线时限参数限制

hdfs-site.xml 配置文件中的 heartbeat.recheck.interval 的单位为毫秒,dfs.heartbeat.interval 的单位为秒。

1
2
3
4
5
6
7
8
<property>
<name>dfs.namenode.heartbeat.recheck-interval</name>
<value>300000</value>
</property>
<property>
<name>dfs.heartbeat.interval</name>
<value>3</value>
</property>

MapReduce

18、概述

MapReduce 是一个分布式运算程序的编程框架,是用户开发”基于 Hadoop 的数据分析应用“的核心框架,其核心功能是将用户编写的业务逻辑代码自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。

优点

  1. 易于开发
  2. 良好的扩展性,可以通过简单的增加机器来扩展它的计算能力
  3. 高容错性,其中一个机器挂了,可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败
  4. 适合 TB/PB 级的海量数据处理

缺点

  1. 不擅长实时计算
  2. 不擅长流式计算,流式计算输入的数据是动态的,MapReduce 的输入数据集是静态的
  3. 不擅长 DAG(有向无环图)计算,DAG 指多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出

MapReduce 核心思想

MapReduce 核心思想

  1. 分布式的运算程序往往需要分成至少2个阶段。

  2. 第一个阶段的 MapTask 并发实例,完全并行运行,互不相干。

  3. 第二个阶段的 ReduceTask 并发实例互不相干,但是他们的数据依赖于上一个阶段的所有 MapTask 并发实例的输出。

  4. MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce 阶段,如果用户的业务逻辑非常复杂,那就只能多个 MapReduce 程序,串行运行。

MapReduce 实例进程

一个完整的 MapReduce 程序在分布式运行时有三类实例进程:

  1. MrAppMaster:负责整个程序的过程调度及状态协调
  2. MapTask:负责 Map 阶段的整个数据处理流程
  3. ReduceTask:负责 Reduce 阶段的整个数据处理流程

常用数据序列化类型

Java类型 Hadoop Writable类型
Boolean BooleanWritable
Byte ByteWritable
Int IntWritable
Float FloatWritable
Long LongWritable
Double DoubleWritable
String Text
Map MapWritable
Array ArrayWritable
Null NullWritable

MapReduce 编程规范

  1. Mapper 阶段

    • 用户自定义的 Mapper 要继承自己的父类
    • Mapper 的输入数据是 KV 对的形式(KV类型可自定义)
    • Mapper 中的业务逻辑写在 map() 方法中
    • Mapper 的输出数据是 KV 对的形式(KV类型可自定义)
    • map() 方法(MapTask 进程)对每一个 <K,V> 调用一次
  2. Reducer 阶段

    • 用户自定义的 Reducer 要继承自己的父类
    • Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV
    • Reducer 的业务逻辑写在 reduce() 方法中
    • ReduceTask 进程对每一组相同 K 的 <K,V> 组调用一次 reduce() 方法
  3. Driver 阶段

    相当于 YARN 集群的客户端,用于提交整个程序到 YARN 集群,提交的是封装了 MapReduce 程序相关运行参数的 job 对象

19、序列化

Hadoop 序列化特点:

  1. 紧凑:高效使用存储空间
  2. 快速:读写数据的额外开销小
  3. 互操作:支持多语言的交互

20、核心框架原理

框架原理

20.1、输入的数据 InputFormat

20.1.1、切片与 MapTask 并行度决定机制

  • 数据块:Block 是 HDFS 物理上把数据分成一块一块,数据块是 HDFS 存储数据的单位
  • 数据切片:在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是 MapReduce 程序计算输入数据的单位,一个切片会对应启动一个 MapTask

MapTask 并行度决定机制

20.1.2、FileInputFormat

切片机制:

  1. 简单地按照文件的内容长度进行切片
  2. 切片大小,默认等于 Block 大小
  3. 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

20.2、FileInputFormat 实现类

20.2.0.1、TextInputFormat

是默认的 FileInputFormat 实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable 类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text 类型。

20.2.0.2、CombineTextInputFormat

CombineTextInputFormat 用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask 处理。

20.3、MapReduce 工作流程

20.3.1、MapTask 工作机制

MapTask 工作机制

  1. Read 阶段:MapTask 通过 InputFormat 获得的 RecordReader,从输入 InputSplit 中解析出一个个 key/value。

  2. Map 阶段:该节点主要是将解析出的 key/value 交给用户编写 map() 函数处理,并产生一系列新的 key/value。

  3. Collect 收集阶段:在用户编写 map() 函数中,当数据处理完成后,一般会调用 OutputCollector.collect() 输出结果。在该函数内部,它会将生成的 key/value 分区(调用 Partitioner),并写入一个环形内存缓冲区中。

  4. Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

    溢写阶段详情:

    • 步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号 Partition 进行排序,然后按照 key 进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序
    • 步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件 output/spillN.out(N表示当前溢写次数)中。如果用户设置了 Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
    • 步骤3:将分区数据的元信息写到内存索引数据结构 SpillRecord 中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件 output/spillN.out.index 中。
  5. Merge 阶段:当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

    当所有数据处理完后,MapTask 会将所有临时文件合并成一个大文件,并保存到文件 output/file.out 中,同时生成相应的索引文件output/file.out.index。

    在进行文件合并过程中,MapTask 以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并mapreduce.task.io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

    让每个 MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

20.3.2、ReduceTask 工作机制

ReduceTask 工作机制

  1. Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

  2. Sort 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。按照 MapReduce 语义,用户编写 reduce() 函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可。

  3. Reduce 阶段:reduce() 函数将计算结果写到 HDFS 上。

20.4、Shuffle

处于 Map 方法之后,Reduce 方法之前的数据处理过程称之为 Shuffle。

shuffle 流程

  1. MapTask 收集我们的 map() 方法输出的 kv 对,放到内存缓冲区中

  2. 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件

  3. 多个溢出文件会被合并成大的溢出文件

  4. 在溢出过程及合并的过程中,都要调用 Partitioner 进行分区和针对 key 进行排序

  5. ReduceTask 根据自己的分区号,去各个 MapTask 机器上取相应的结果分区数据

  6. ReduceTask 会抓取到同一个分区的来自不同 MapTask 的结果文件,ReduceTask 会将这些文件再进行合并(归并排序)

  7. 合并成大文件后,Shuffle 的过程也就结束了,后面进入 ReduceTask 的逻辑运算过程(从文件中取出一个一个的键值对 Group,调用用户自定义的 reduce() 方法)

  8. Shuffle 中的缓冲区大小会影响到 MapReduce 程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。

  9. 缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb 默认100M。

20.5、Partition 分区

将统计结果按照条件输出到不同文件中

默认分区是根据 key 的 hashCode 对 ReduceTasks 个数取模得到的,用户没法控制哪个 key 存储到哪个分区。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 自定义 Partitioner
// 重写 getPartition() 方法
@Override
Public int getPartition(Text key,FlowBean value,int numPartitions){
// 控制分区代码逻辑
...
return partition;
}

// 在 job 驱动中,设置自定义的 Partitioner
job.setPartitionerClass(CustomPartitioner.class);
// 自定义 Partition 后,要根据自定义 Partitioner 的逻辑设置相应数量的 ReduceTask
job.setNumReduceTasks(5);// 要大于1

分区总结

  1. 如果 ReduceTask 的数量大于 getPartition 的结果数,则会多产生几个空的输出文件 part-r-000xxx
  2. 如果 ReduceTask 的数量大于 1 且 小于 getPartition 的结果数,则有一部分分区数据无法安放,爆 IO/Exception
  3. 如果 ReduceTask 的数量 = 1,则不管 MapTask 端输出多少个分区文件,最终结果都交给一个 ReduceTask,最终也就只会产生一个结果文件 part-r-0000
  4. 分区号必须从零开始,逐一累加

20.6、排序

MapTask 和 ReduceTask 均会对数据按照 key 进行排序,该操作属于 Hadoop 的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要

默认排序是按照字典顺序排序,且实现该排序的方法是快速排序

对于 MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓中区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序
对于 ReduceTask,它从每个 MapTask 上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写在磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTaska统一对内存和磁盘上的所有数据进行一次归并排序

排序分类

  1. 部分排序

    MapReduce 根据输入记录的键对数据集排序,保证输出的每个文件内部有序

  2. 全排序

    最终输出结果只有一个文件,且文件内部有序,实现方式是只设置一个 ReduceTask,但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了 MapReduce 所提供的并行架构。

  3. 辅助排序(GroupingComparator 分组)

    在 Reduce 端对 key 进行分组,应用于:在接收的 key 为 bean 对象时,想让一个或几个字段相同(全部字段比较不相同)的 key 进入到同一个 Reduce 方法时,可以采用分组排序。

  4. 二次排序

    在自定义排序过程中,如果 compareTo 中的判断条件为两个即为二次排序。

自定义排序 WritableComparable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// bean 对象作为 key 传输,需要实现 WritableComparable 接口重写 compareTo 方法
public class FlowBean implements WritableComparable<FlowBean> {

private long upFlow; //上行流量
private long downFlow; //下行流量
private long sumFlow; //总流量

@Override
public int compareTo(FlowBean bean) {
int result;
// 按照总流量大小,倒序排列
if (this.sumFlow > bean.getSumFlow()) {
result = -1;
}else if (this.sumFlow < bean.getSumFlow()) {
result = 1;
}else {
result = 0;
}
return result;
}

//实现序列化和反序列化方法,注意顺序一定要一致
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(this.upFlow);
out.writeLong(this.downFlow);
out.writeLong(this.sumFlow);

}

@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
}

20.7、Combiner 合并

  1. Combiner 是 MR 程序中,Mapper 和 Reducer 之外的一种组件
  2. Combiner 组件的父类就是 Reducer
  3. Combiner 和 Reducer 的区别在于运行的位置
    • Combiner 是在每一个 MapTask 所在的节点运行
    • Reducer 是接收全局所有 Mapper 的输出结果
  4. Combiner 的意义就是对每一个 MapTask 的输出进行局部汇总,以减小网络传输量
  5. Combiner 能够应用的前提是不能影响最终的业务逻辑,而且 Combiner 的输出 kv 应该跟 Reducer 的输入 kv 类型要对应起来

自定义 Combiner

1
2
3
4
5
6
7
8
9
10
11
//自定义一个Combiner继承Reducer,重写Reduce方法
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable outV = new IntWritable();

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
...
}
// 在Job驱动类中设置
job.setCombinerClass(WordCountCombiner.class);

20.8、输出数据 OutputFormat

OutputFormat 是 MapReduce 输出的基类,所有实现 MapReduce 输出都实现了 OutputFormat 接口。

自定义 OutputFormat

  1. 自定义一个类继承 FileOutputFormat
  2. 改写 RecordWriter,具体改写输出数据的 write 方法

20.9、ReduceTask 并行度决定机制

ReduceTask 的并行度同样影响整个 Job 的执行并发度和执行效率,但与 MapTask 的并发数由切片数决定不同,ReduceTask 数量的决定是可以直接手动设置:

1
2
// 默认值是1,手动设置为4
job.setNumReduceTasks(4);

注意事项

  1. ReduceTask = 0,表示没有 Reduce 阶段,输出文件个数和 Map 个数一致
  2. ReduceTask 默认值就是 1,所以输出文件个数为一个
  3. 如果数据分布不均匀,就有可能在 Reduce 阶段产生数据倾斜
  4. ReduceTask 数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有一个 ReduceTask
  5. 具体多少个 ReduceTask 需要根据集群性能而定
  6. 如果分区数不是 1,但是 ReduceTask 为 1,不会执行分区过程

20.10、Join

Map 端的主要工作:为来自不同表或文件的 key/value 对,打标签以区别不同来源的记录。然后用连接字段作为 key,其余部分和新加的标志作为 value,最后进行输出。

Reduce 端的主要工作:在 Reduce 端以连接字段作为 key 的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在 Map 阶段已经打标志)分开,最后进行合并。

20.10.1、Reduce Join

缺点:合并的操作是在 Reduce 阶段完成,Reduce 端的处理压力太大,Map 节点的运算负载则很低,资源利用率不高,且在 Reduce 阶段极易产生数据倾斜。

20.10.2、Map Join

适用于一张表十分小、一张表很大的场景。

在 Map 端缓存多张表,提前处理业务逻辑,这样增加 Map 端业务,减少 Reduce 端数据的压力,尽可能的减少数据倾斜。

DistributedCache

  1. 在 Mapper 的 setup 阶段,将文件读取到缓存集合中

  2. 在 Driver 驱动类中加载缓存

    1
    2
    3
    4
    //缓存普通文件到Task运行节点。
    job.addCacheFile(new URI("file://E:/desktop/cache/pd.txt"));
    //如果是集群运行,需要设置HDFS路径
    job.addCacheFile(new URI("hdfs://hadoop100:8020/cache/pd.txt"));

20.11、ETL 数据清洗

ETL(Extract-Transform-Load)用来描述将数据从来源端经过抽取(Extract)、转换(Transform)、加载(Load)至目的端的过程。

在运行核心业务 MapReduce 程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行 Mapper 程序,不需要运行 Reduce 程序。

21、压缩

压缩的优缺点

优点:减少磁盘 IO、减少磁盘存储空间

缺点:增加 cpu 开销

压缩使用原则

  1. 运算密集型的 Job,少用压缩
  2. IO 密集型的 Job,多用压缩

21.1、压缩算法

压缩格式 Hadoop自带? 算法 文件扩展名 是否可切片 换成压缩格式后,原来的程序是否需要修改 优缺点
DEFLATE 是,直接使用 DEFLATE .deflate 和文本处理一样,不需要修改
Gzip 是,直接使用 DEFLATE .gz 和文本处理一样,不需要修改 压缩率比较高;不支持split,压缩/解压速度一般
bzip2 是,直接使用 bzip2 .bz2 和文本处理一样,不需要修改 压缩率高,支持split;压缩/解压速度慢
LZO 否,需要安装 LZO .lzo 需要建索引,还需要指定输入格式 压缩/解压速度比较快,支持split;压缩率一般,想支持切片需要额外创建索引
Snappy 是,直接使用 Snappy .snappy 和文本处理一样,不需要修改 压缩/解压速度快;不支持split,压缩率一般

21.2、性能比较

压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s

21.3、压缩方式的选择

选择压缩方式时应考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切片

压缩方式的选择

21.4、压缩参数配置

编码器&解码器

压缩格式 对应的编码/解码器
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec

压缩参数配置

参数 默认值 阶段 建议
io.compression.codecs
(在core-site.xml中配置)
无,这个需要在命令行输入
hadoop checknative查看
输入压缩 Hadoop使用文件扩展名判断是否支持某种编解码器
mapreduce.map.output.compress
(在mapred-site.xml中配置)
false mapper输出 这个参数设为true启用压缩
mapreduce.map.output.compress.codec
(在mapred-site.xml中配置)
org.apache.hadoop.io
.compress.DefaultCodec
mapper输出 企业多使用LZO或Snappy编解码器在此阶段压缩数据
mapreduce.output.fileoutputformat.compress
(在mapred-site.xml中配置)
false reducer输出 这个参数设为true启用压缩
mapreduce.output.fileoutputformat.compress.codec
(在mapred-site.xml中配置)
org.apache.hadoop.io
.compress.DefaultCodec
reducer输出 使用标准工具或者编解码器,如gzip和bzip2

22、MapReduce开发总结

  1. 输入数据接口:InputFormat
    • 默认使用的实现类是:TextInputFormat
    • TextInputFormat 的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为 key,行内容作为 value 返回。
    • CombineTextInputFormat 可以把多个小文件合并成一个切片处理,提高处理效率。
  2. 逻辑处理接口:Mapper
    • 用户根据业务需求实现其中三个方法:setup() map() cleanup()
  3. 分区:Partitioner
    • 有默认实现 HashPartitioner,逻辑是根据 key 的哈希值和 numReduces 来返回一个分区号;key.hashCode() & Integer.MAXVALUE % numReduces
    • 如果业务上有特别的需求,可以自定义分区
  4. 排序:Comparable
    • 当我们用自定义的对象作为 key 来输出时,就必须要实现 WritableComparable 接口,重写其中的 compareTo() 方法。
    • 部分排序:对最终输出的每一个文件进行内部排序
    • 全排序:对所有数据进行排序,通常只有一个 Reduce
    • 二次排序:排序的条件有两个
  5. 合并:Combiner
    • Combiner 合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果。
  6. 逻辑处理接口:Reducer
    • 用户根据业务需求实现其中三个方法:setup() reduce() cleanup ()
  7. 输出数据接口:OutputFormat
    • 默认实现类是 TextOutputFormat,功能逻辑是:将每一个 KV 对,向目标文本文件输出一行。
    • 用户还可以自定义 OutputFormat。

Yarn

Yarn 是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而 MapReduce 等运算程序则相当于运行于操作系统之上的应用程序

23、Yarn 基础架构

Yarn 基础架构

24、Yarn 工作机制

Yarn 工作机制

  1. MR 程序提交到客户端所在的节点
  2. YarnRunner 向 ResourceManager 申请一个 Application
  3. RM 将该应用程序的资源路径返回给 YarnRunner
  4. 该程序将运行所需资源提交到 HDFS 上
  5. 程序资源提交完毕后,申请运行 mrAppMaster
  6. RM 将用户的请求初始化成一个 Task
  7. 其中一个 NodeManager 领取到Task任务
  8. 该 NodeManager 创建容器 Container,并产生 MRAppmaster
  9. Container 从 HDFS 上拷贝资源到本地
  10. MRAppmaster 向 RM 申请运行 MapTask 资源
  11. RM 将运行 MapTask 任务分配给另外两个 NodeManager,另两个 NodeManager 分别领取任务并创建容器
  12. MR 向两个接收到任务的 NodeManager 发送程序启动脚本,这两个 NodeManager 分别启动 MapTask,MapTask 对数据分区排序
  13. MrAppMaster 等待所有 MapTask 运行完毕后,向RM申请容器,运行 ReduceTask
  14. ReduceTask 向 MapTask 获取相应分区的数据
  15. 程序运行完毕后,MR 会向 RM 申请注销自己

25、Yarn调度器和调度算法

Hadoop 作业调度器主要有三种:FIFO 调度器(First In Firest Out)、容量调度器(Capacity Scheduler)、公平调度器(Fair Scheduler),Apache Hadoop 3.x 默认的资源调度器是 Capacity Scheduler

25.1、先进先出调度器(FIFO)

根据提交任务的先后顺序,先来先服务。

FIFO 调度器

25.2、容量调度器(Capacity Scheduler)

Capacity Scheduler 是 Yahoo 开发的多用户调度器

容量调度器特点

容量调度器资源分配算法

25.3、公平调度器(Fair Scheduler)

公平调度器的特点

缺额

  • 公平调度器设计目标是:在时间尺度上,所有作业获得公平的资源,某一时刻一个作业应获资源和实际获取资源的差额称之为”缺额“
  • 调度器会优先为缺额大的作业分配资源

公平调度器队列资源分配方式

26、Yarn 常用命令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 列出所有的 Application
yarn application -list
# Application 状态查询过滤
yarn application -list -appStates ALL/NEW/NEW_SAVING/SUBMITTED/ACCEPTED/RUNNING/FINISHED/FAILED/KILLED
# kill Application
yarn application -kill <ApplicationId>

# 日志查看
# 查询 Application 日志
yarn logs -applicationId <ApplicationId>
# 查看 Container 日志
yarn logs -applicationId <ApplicationId> -containerId <ContainerId>

# 查看尝试运行的任务
# 列出所有 Application 尝试的列表
yarn applicationattempt -list <ApplicationId>
# 打印 ApplicationAttemp 状态
yarn applicationattempt -status <ApplicationAttemptId>

# 查看容器
# 列出所有 Container
yarn container -list <ApplicationAttemptId>
# 打印 Container 状态,只有在任务运行的途中才能看到 container 的状态
yarn container -status <ContainerId>

# 查看节点状态
# 列出所有节点
yarn node -list -all

# 更新配置
yarn rmadmin -refreshQueues

# 查看队列
yarn queue -status <QueueName>

27、Yarn 生产环境核心参数

Yarn 生产环境核心参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
<!-- yarn-site.xml 配置案例,若是不同节点的 cpu、内存不一样,要单独配置,不能集群分发 -->
<!-- 选择调度器,默认容量 -->
<property>
<description>The class to use as the resource scheduler.</description>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>

<!-- ResourceManager处理调度器请求的线程数量,默认50;如果提交的任务数大于50,可以增加该值,但是不能超过3台 * 4线程 = 12线程(去除其他应用程序实际不能超过8) -->
<property>
<description>Number of threads to handle scheduler interface.</description>
<name>yarn.resourcemanager.scheduler.client.thread-count</name>
<value>8</value>
</property>

<!-- 是否让yarn自动检测硬件进行配置,默认是false,如果该节点有很多其他应用程序,建议手动配置。如果该节点没有其他应用程序,可以采用自动 -->
<property>
<description>Enable auto-detection of node capabilities such as
memory and CPU.
</description>
<name>yarn.nodemanager.resource.detect-hardware-capabilities</name>
<value>false</value>
</property>

<!-- 是否将虚拟核数当作CPU核数,默认是false,采用物理CPU核数 -->
<property>
<description>Flag to determine if logical processors(such as
hyperthreads) should be counted as cores. Only applicable on Linux
when yarn.nodemanager.resource.cpu-vcores is set to -1 and
yarn.nodemanager.resource.detect-hardware-capabilities is true.
</description>
<name>yarn.nodemanager.resource.count-logical-processors-as-cores</name>
<value>false</value>
</property>

<!-- 虚拟核数和物理核数乘数,默认是1.0 -->
<property>
<description>Multiplier to determine how to convert phyiscal cores to
vcores. This value is used if yarn.nodemanager.resource.cpu-vcores
is set to -1(which implies auto-calculate vcores) and
yarn.nodemanager.resource.detect-hardware-capabilities is set to true. The number of vcores will be calculated as number of CPUs * multiplier.
</description>
<name>yarn.nodemanager.resource.pcores-vcores-multiplier</name>
<value>1.0</value>
</property>

<!-- NodeManager使用内存数,默认8G,修改为4G内存 -->
<property>
<description>Amount of physical memory, in MB, that can be allocated
for containers. If set to -1 and
yarn.nodemanager.resource.detect-hardware-capabilities is true, it is
automatically calculated(in case of Windows and Linux).
In other cases, the default is 8192MB.
</description>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>4096</value>
</property>

<!-- nodemanager的CPU核数,不按照硬件环境自动设定时默认是8个,修改为4个 -->
<property>
<description>Number of vcores that can be allocated
for containers. This is used by the RM scheduler when allocating
resources for containers. This is not used to limit the number of
CPUs used by YARN containers. If it is set to -1 and
yarn.nodemanager.resource.detect-hardware-capabilities is true, it is
automatically determined from the hardware in case of Windows and Linux.
In other cases, number of vcores is 8 by default.</description>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>4</value>
</property>

<!-- 容器最小内存,默认1G -->
<property>
<description>The minimum allocation for every container request at the RM in MBs. Memory requests lower than this will be set to the value of this property. Additionally, a node manager that is configured to have less memory than this value will be shut down by the resource manager.
</description>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>1024</value>
</property>

<!-- 容器最大内存,默认8G,修改为2G -->
<property>
<description>The maximum allocation for every container request at the RM in MBs. Memory requests higher than this will throw an InvalidResourceRequestException.
</description>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>2048</value>
</property>

<!-- 容器最小CPU核数,默认1个 -->
<property>
<description>The minimum allocation for every container request at the RM in terms of virtual CPU cores. Requests lower than this will be set to the value of this property. Additionally, a node manager that is configured to have fewer virtual cores than this value will be shut down by the resource manager.
</description>
<name>yarn.scheduler.minimum-allocation-vcores</name>
<value>1</value>
</property>

<!-- 容器最大CPU核数,默认4个,修改为2个 -->
<property>
<description>The maximum allocation for every container request at the RM in terms of virtual CPU cores. Requests higher than this will throw an
InvalidResourceRequestException.</description>
<name>yarn.scheduler.maximum-allocation-vcores</name>
<value>2</value>
</property>

<!-- 虚拟内存检查,默认打开,修改为关闭 -->
<property>
<description>Whether virtual memory limits will be enforced for
containers.</description>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

<!-- 虚拟内存和物理内存设置比例,默认2.1 -->
<property>
<description>Ratio between virtual memory to physical memory when setting memory limits for containers. Container allocations are expressed in terms of physical memory, and virtual memory usage is allowed to exceed this allocation by this ratio.
</description>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>2.1</value>
</property>

28、容量调度器多队列

好处:

  1. 可以避免因为单队列耗尽所有资源而导致集群不可用
  2. 可以实现任务的降级,在特殊时期保证重要的任务队列资源充足

28.1、多队列配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
<!-- capacity-scheduler.xml 中配置如下 -->
<!-- 指定多队列,增加hive队列 -->
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>default,hive</value>
<description>
The queues at the this level (root is the root queue).
</description>
</property>

<!-- 降低default队列资源额定容量为40%,默认100% -->
<property>
<name>yarn.scheduler.capacity.root.default.capacity</name>
<value>40</value>
</property>

<!-- 降低default队列资源最大容量为60%,默认100% -->
<property>
<name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
<value>60</value>
</property>

<!-- 新加队列添加必要属性 -->
<!-- 指定hive队列的资源额定容量 -->
<property>
<name>yarn.scheduler.capacity.root.hive.capacity</name>
<value>60</value>
</property>

<!-- 用户最多可以使用队列多少资源,1表示 -->
<property>
<name>yarn.scheduler.capacity.root.hive.user-limit-factor</name>
<value>1</value>
</property>

<!-- 指定hive队列的资源最大容量 -->
<property>
<name>yarn.scheduler.capacity.root.hive.maximum-capacity</name>
<value>80</value>
</property>

<!-- 启动hive队列 -->
<property>
<name>yarn.scheduler.capacity.root.hive.state</name>
<value>RUNNING</value>
</property>

<!-- 哪些用户有权向队列提交作业 -->
<property>
<name>yarn.scheduler.capacity.root.hive.acl_submit_applications</name>
<value>*</value>
</property>

<!-- 哪些用户有权操作队列,管理员权限(查看/杀死) -->
<property>
<name>yarn.scheduler.capacity.root.hive.acl_administer_queue</name>
<value>*</value>
</property>

<!-- 哪些用户有权配置提交任务优先级 -->
<property>
<name>yarn.scheduler.capacity.root.hive.acl_application_max_priority</name>
<value>*</value>
</property>

<!-- 任务的超时时间设置:yarn application -appId appId -updateLifetime Timeout
参考资料:https://blog.cloudera.com/enforcing-application-lifetime-slas-yarn/ -->

<!-- 如果application指定了超时时间,则提交到该队列的application能够指定的最大超时时间不能超过该值。
-->
<property>
<name>yarn.scheduler.capacity.root.hive.maximum-application-lifetime</name>
<value>-1</value>
</property>

<!-- 如果application没指定超时时间,则用default-application-lifetime作为默认值 -->
<property>
<name>yarn.scheduler.capacity.root.hive.default-application-lifetime</name>
<value>-1</value>
</property>
<!-- 向集群分发配置 -->
<!-- 重启 Yarn 或者执行命令:yarn rmadmin -refreshQueues -->
1
2
3
4
5
# 执行任务时指定队列
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.3.jar wordcount -D mapreduce.job.queuename=hive /input /optput
# 也可以在 Driver 类中中指定
# new Configuration().set("mapreduce.job.qunuename","hive")

28.2、任务优先级

1
2
3
4
5
6
<!-- 修改yarn-site.xml文件,增加以下参数 -->
<property>
<name>yarn.cluster.max-application-priority</name>
<value>5</value>
</property>
<!-- 分发配置,重启 yarn -->
1
2
3
4
# 提交任务时,指定优先级 mapreduce.job.priority=优先级
hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.3.jar pi -D mapreduce.job.priority=5 5 2000000
# 提升正在执行的任务的优先级
yarn application -appID <ApplicationID> -updatePriority 优先级

29、公平调度器

创建两个队列,分别是 test 和 hqz(以用户所属组命名)。

期望实现以下效果:若用户提交任务时指定队列,则任务提交到指定队列运行;

若未指定队列,test 用户提交的任务到 root.group.test 队列运行,hqz提交的任务到 root.group.hqz 队列运行(注:group 为用户所属组)。

公平调度器的配置涉及到两个文件,一个是 yarn-site.xml,另一个是公平调度器队列分配文件 fair-scheduler.xml(文件名可自定义)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!-- 修改yarn-site.xml文件,加入以下参数 -->
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
<description>配置使用公平调度器</description>
</property>

<property>
<name>yarn.scheduler.fair.allocation.file</name>
<value>/opt/module/hadoop-3.1.3/etc/hadoop/fair-scheduler.xml</value>
<description>指明公平调度器队列分配配置文件</description>
</property>

<property>
<name>yarn.scheduler.fair.preemption</name>
<value>false</value>
<description>禁止队列间资源抢占</description>
</property>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<!-- 配置fair-scheduler.xml -->
<?xml version="1.0"?>
<allocations>
<!-- 单个队列中Application Master占用资源的最大比例,取值0-1 ,企业一般配置0.1 -->
<queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>
<!-- 单个队列最大资源的默认值 test hqz default -->
<queueMaxResourcesDefault>4096mb,4vcores</queueMaxResourcesDefault>

<!-- 增加一个队列test -->
<queue name="test">
<!-- 队列最小资源 -->
<minResources>2048mb,2vcores</minResources>
<!-- 队列最大资源 -->
<maxResources>4096mb,4vcores</maxResources>
<!-- 队列中最多同时运行的应用数,默认50,根据线程数配置 -->
<maxRunningApps>4</maxRunningApps>
<!-- 队列中Application Master占用资源的最大比例 -->
<maxAMShare>
<!-- 增加一个队列hqz -->
<queue name="hqz" type="parent">
<!-- 队列最小资源 -->
<minResources>2048mb,2vcores</minResources>
<!-- 队列最大资源 -->
<maxResources>4096mb,4vcores</maxResources>
<!-- 队列中最多同时运行的应用数,默认50,根据线程数配置 -->
<maxRunningApps>4</maxRunningApps>
<!-- 队列中Application Master占用资源的最大比例 -->
<maxAMShare>0.5</maxAMShare>
<!-- 该队列资源权重,默认值为1.0 -->
<weight>1.0</weight>
<!-- 队列内部的资源分配策略 -->
<schedulingPolicy>fair</schedulingPolicy>
</queue>

<!-- 任务队列分配策略,可配置多层规则,从第一个规则开始匹配,直到匹配成功 -->
<queuePlacementPolicy>
<!-- 提交任务时指定队列,如未指定提交队列,则继续匹配下一个规则; false表示:如果指定队列不存在,不允许自动创建-->
<rule name="specified" create="false"/>
<!-- 提交到root.group.username队列,若root.group不存在,不允许自动创建;若root.group.user不存在,允许自动创建 -->
<rule name="nestedUserQueue" create="true">
<rule name="primaryGroup" create="false"/>
</rule>
<!-- 最后一个规则必须为reject或者default。Reject表示拒绝创建提交失败,default表示把任务提交到default队列 -->
<rule name="reject" />
</queuePlacementPolicy>
</allocations>
1
2
3
4
5
# 分发配置之后,重启 Yarn
# 提交任务时指定队列,按照配置规则,任务会到指定的 root.test 队列
hadoop jar /opt/module/hadoop-3.3.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.3.jar pi -Dmapreduce.job.queuename=root.test 1 1
# 提交任务时不指定队列,按照配置规则,任务会到 root.hqz 队列
hadoop jar /opt/module/hadoop-3.3.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.3.jar pi 1 1

29.1、Yarn的Tool接口

可以实现动态传参

1
ToolRunner.run(conf, tool, Arrays.copyOfRange(args, 1, args.length));
  • Copyrights © 2022-2023 hqz

请我喝杯咖啡吧~

支付宝
微信