1、hadoop

1.1 hadoop介绍

1)hadoop是一个由Apache基金会所开发的分布式系统基础架构。

2)主要解决海量数据的存储和海量数据的分析计算问题。

3)Hadoop生态圈

1.2 Lucence框架

1)Lucene框架使Doug Cutting开创的开源软件,用户Java书写代码,实现与Google类似的全文搜索功能,它提供了全文检索引擎的架构,包括完整的查询引擎和索引引擎。

2)2001年年底Lucene成为Apache基金会的一个子项目。

3)对于海量数据的场景,Lucence面对与Google同样的困难,存储数据困难,检索速度慢。

4)学习和模仿Google解决这些问题的办法:微型版Nutch。

5)Google是Haddop的思想之源。

6)2003-2004年,Google公开了部分GFS和MapReduce思想的细节,以此为基础Doug Cutting等人用了2年业余时间实现了DFS和MapReduce机制,使Nutch性能飙升。

7)2005年Hadoop作为Lucene的子项目Nutch的一部分正式引入Apache基金会。

8)2006年3月份,Map-Reduce和Nutch Distributed File System(NDFS)分别被纳入称为Hadoop的项目中。

9)名字来源于Doug Cutting儿子的玩具大象。

1.3 hadoop三大发行版本

Apache、Cloudera、Hortonworks。

1.4 hadoop的优势(面试)

1)高可靠性:Hadoop底层维护多个数据副本,所以即使Hadoop某个计算元素或存储出现故障,也不会导致数据的丢失。

2)高扩展性:在集群间分配任务数据,可方便的扩展数以千计的节点。

3)高效性:在MapReduce的思想下,Hadoop是并行工作的,以加快任务处理速度。

4)高容错性:能够自动将失败的任务重新分配。

1.5 hadoop1.x 和hadoop2.x区别

hadoop1.x组成:

  • Common(辅助工具)、HDFS(数据存储)、MapReduce(计算+资源调度)
  • CPU 8 内存:128MB 磁盘 8T

hadoop2.x组成:

  • Common(辅助工具)、HDFS(数据存储)、MapReduce(计算)、yarn(资源调度)
  • CPU 8 内存:128MB 磁盘 8T

对比:hadoop1.x时代,MapReduce既处理运算,又处理资源调度,耦合性较大。hadoop1.x时代,增加了Yarn。Yarn只负责资源的调度,MapReduce只负责运算。

1.6 组件架构

1、hdfs架构概述

1)NameNode(nn):存储文件的元数据,如文件名,文件目录结构,文件属性(生成时间、福本数、文件权限),以及每个文件的块列表和块所在的DataNode等。

2)DataNode(dn):在本地文件系统存储文件数据,以及数据的校验和。

3)Secondary NameNode(2nn):用来监控HDFS状态的辅助后台程序,每隔一段时间获取HDFS元数据的快照。加快启动速度。

2、Yarn架构

1)ResourceManager (RM)主要作用如下

(1)处理客户端请求

(2)监控NodeManager

(3)启动或监控ApplicationMaster(集群中运行的一个job)

(4)资源的分配与调度

yarn架构
yarn架构

2)NodeManager(NM)主要作用如下

(1)管理单个节点上的资源

(2)处理来自ResourceManager的命令

(3)处理来自ApplicationMaster的命令

3)ApplicationMaster(AM)作用如下

(1)负责数据的切分

(2)为某个程序申请资源并分配给内部的任务

(3)任务的监控与容错

4)Container

container是Yarn中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等。

3、MapReduce架构概述

MapReduce将计算过程分为两个阶段:Map和Reduce

1)Map阶段并行处理输入数据。

2)Reduce阶段对Map结果进行汇总

1.7 大数据技术生态体系

大数据技术生态体系
大数据技术生态体系

图中涉及的技术名词解释如下:

1)Sqoop:Sqoop是一款开源的工具,主要用于在Hadoop、Hive与传统的数据库(MySql)间进行**数据的迁移,可以将一个关系型数据库(例如 :MySQL,Oracle 等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。

2)Flume:Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

3)Kafka:Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:

(1)通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。

(2)高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息

(3)支持通过Kafka服务器和消费机集群来分区消息。

(4)支持Hadoop并行数据加载

4)Storm:Storm用于“连续计算”,对数据流做连续查询,在计算时就将结果以流的形式输出给用户。

5)Spark:Spark是当前最流行的开源大数据内存计算框架。可以基于Hadoop上存储的大数据进行计算

6)Oozie:Oozie是一个管理Hdoop作业(job)的工作流程调度管理系统。

7)Hbase:HBase是一个分布式的、面向列的开源数据库。HBase不同于一般的关系数据库,它是一个适合于非结构化数据存储的数据库。

8)Hive:Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的SQL查询功能,可以将SQL语句转换为MapReduce任务进行运行。 其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。

10)R语言:R是用于统计分析、绘图的语言和操作环境。R是属于GNU系统的一个自由、免费、源代码开放的软件,它是一个用于统计计算和统计制图的优秀工具。

11)Mahout:Apache Mahout是个可扩展的机器学习和数据挖掘库。

12)ZooKeeper:Zookeeper是Google的Chubby一个开源的实现。它是一个针对大型分布式系统的可靠协调系统,提供的功能包括:配置维护、名字服务、 分布式同步、组服务等。ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

1.8 hadoop重要目录

(1)bin目录:存放对Hadoop相关服务(HDFS,YARN)进行操作的脚本

(2)etc目录:Hadoop的配置文件目录,存放Hadoop的配置文件

(3)lib目录:存放Hadoop的本地库(对数据进行压缩解压缩功能)

(4)sbin目录:存放启动或停止Hadoop相关服务的脚本

(5)share目录:存放Hadoop的依赖jar包、文档、和官方案例

1.9完全分布式运行模式

1、准备三台客户机(关闭防火墙、静态ip、主机名称)

2、安装JDK

3、配置环境变量

4、安装hadoop

5、配置环境变量

6、配置集群

7、单点启动

8、配置ssh

9、群起并测试集群

2、HDFS

2、1 介绍

产生背景:数据量大,一个操作系统存不过来,需要一种系统管理多台机器上的文件

定义:HDFS(Hadoop Distributed File System),分布式文件管理系统。用于存储文件,通过目录树来定位文件。

使用场景:适合一次写入,多次读出的场景,且不支持文件的修改。

2.2.1 优点

1、高容错性

(1)数据自动保存多个副本。通过增加副本的形式,提高容错性。

(2)某个副本丢失后,可以自动恢复

2、适合处理大数据

(1)数据规模:那个处理数据规模达到GB、TB、甚至PB级别的数据;

(2)文件规模:能够处理百万规模以上的文件数量,数量相当之大。

3、可构建在廉价机器上,通过多副本机制,提高可靠性。

2.2.2 缺点

1)不适合低延时数据访问,比如毫秒级的存储数据,是做不到的。

2)无法高效的对大量小文件进行存储。

(1)存储大量小文件的话,它会占用NameNode大量的内存来存储文件目录块信息。这样是不可取的,因为NameNode的内存总是有限的;

(2)小文件存储的寻址时间会超过读取时间,它违反了HDFS的设计目标。

3)不支持并发写入、文件随机修改

(1)一个文件只能有一个写,不允许 多个线程同时写。

(2)仅支持数据append(追加),不支持文件的随机修改。

2.3 组件功能

1)NameNode(nn):就是Master,它是一个管理者。

(1)管理HDFS的名称空间

(2)配置副本策略

(3)管理数据块(Block)映射信息

(4)处理客户端读写请求

2)DataNode:就是slave。NameNode下达命令。DataNode执行实际的操作。

(1)存储实际的数据块

(2)执行数据块的读/写操作

3)client:客户端

(1)文件切分。文件上传HDFS的时候,Client将文件切分成一个一个的Block,然后进行上传;

(2)与NameNode交互,获取文件的位置信息;

(3)与DataNode交互,读取或者写入数据;

(4)Client提供一些命令来管理HDFS,比如NameNode格式化;

(5)Client可以通过一些命令来访问HDFS,比如对HDFS增删查改操作;

4)Secondary NameNode:并非NameNode的热备。当NameNode挂掉的时候,它并不能替代NameNode并提供服务。

(1)辅助NameNode,分担其工作量,比如定期合并Fsimage和Edits,并推送给NameNode;

(2)在紧急情况下,可辅助恢复NameNode。

2.4 文件块大小

HDFS中的文件在物理上是分块存储(Block),块的大小可以通过配置参数(dfs.blocksize)来规定,默认大小在Hadoop2.x版本中是128M,老版中是64M。

1、集群中的block,2、如果寻址时间约为10ms,即查到目标block的时间为10ms,3寻址时间为传输时间的1%时,则为最佳状态。因此,传输时间=10ms/0.01=1000ms=1s。4 而目前磁盘的传输速率普遍为100MB/s。

思考:为什么块的大小不能设置太小,也不能设置太大?

1)HDFS的块设置太小,会增加寻址时间,程序一直在找块的开始位置;

2)如果块设置的太大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需要的时间。导致程序在处理这块数据时。会非常慢。

总结:HDFS块的大小设置主要取决于磁盘传输速率。

2.5 HDFS的数据流(面试重点)

1、HDFS的写数据流程

1568448314627
1568448314627

然后重复3-7步骤

2、网络拓扑-节点距离计算

节点距离:两个节点到达最近的共同祖先的距离总和。

1568448821111
1568448821111

3、副本节点选择

1568449200994
1568449200994

4、HDFS的读数据流程

1568449571543
1568449571543

2.6 NameNode和SecondaryNameNode(面试开发重点)

1、NN和2NN工作机制

思考:NameNode中的元数据是存储在哪里的?

首先,我们做个假设,如果存储在NameNode节点的磁盘中,因为经常需要进行随机访问,还有响应客户请求,必然是效率过低。因此,元数据需要存放在内存中。但如果只存在内存中,一旦断电,元数据丢失,整个集群就无法工作了。因此产生在磁盘中备份元数据的FsImage

这样又会带来新的问题,当在内存中的元数据更新时,如果同时更新FsImage,就会导致效率过低,但如果不更新,就会发生一致性问题,一旦NameNode节点断电,就会产生数据丢失。因此,引入Edits文件(只进行追加操作,效率很高)。每当元数据有更新或者添加元数据时,修改内存中的元数据并追加到Edits中。这样,一旦NameNode节点断电,可以通过FsImage和Edits的合并,合成元数据。

但是,如果长时间添加数据到Edits中,会导致该文件数据过大,效率降低,而且一旦断电,恢复元数据需要的时间过长。因此,需要定期进行FsImage和Edits的合并,如果这个操作由NameNode节点完成,又会效率过低。因此,引入一个新的节点SecondaryNamenode,专门用于FsImage和Edits的合并。

1568450098120
1568450098120
  1. 第一阶段:NameNode启动

(1)第一次启动NameNode格式化后创建FsimageEdits文件。如果不是第一次启动直接加载编辑日志和镜像文件到内存。

(2)客户端对元数据进行增删改的请求。

(3)NameNode记录操作日志更新滚动日志

(4)NameNode在内存中对数据进行增删改

\2. 第二阶段:Secondary NameNode工作

​ (1)Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。

​ (2)Secondary NameNode请求执行CheckPoint。

​ (3)NameNode滚动正在写的Edits日志。

​ (4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。

​ (5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并

​ (6)生成新的镜像文件fsimage.chkpoint

​ (7)拷贝fsimage.chkpoint到NameNode。

​ (8)NameNode将fsimage.chkpoint重新命名成fsimage。

注:

NN和2NN工作机制详解:

Fsimage:NameNode内存中元数据序列化后形成的文件。

Edits:记录客户端更新元数据信息的每一步操作(可通过Edits运算出元数据)。

NameNode启动时,先滚动Edits并生成一个空的edits.inprogress,然后加载Edits和Fsimage到内存中,此时NameNode内存就持有最新的元数据信息。Client开始对NameNode发送元数据的增删改的请求,这些请求的操作首先会被记录到edits.inprogress中(查询元数据的操作不会被记录在Edits中,因为查询操作不会更改元数据信息),如果此时NameNode挂掉,重启后会从Edits中读取元数据的信息。然后,NameNode会在内存中执行元数据的增删改的操作。

由于Edits中记录的操作会越来越多,Edits文件会越来越大,导致NameNode在启动加载Edits时会很慢,所以需要对Edits和Fsimage进行合并(所谓合并,就是将Edits和Fsimage加载到内存中,照着Edits中的操作一步步执行,最终形成新的Fsimage)。SecondaryNameNode的作用就是帮助NameNode进行Edits和Fsimage的合并工作。

SecondaryNameNode首先会询问是否需要(触发需要满足两个条件中的任意一个,定时时间到和中数据写满了)。直接带回是否检查结果。执行操作,首先会让滚动并生成一个空的,滚动的目的是给打个标记,以后所有新的操作都写入,其他未合并的和会拷贝到的本地,然后将拷贝的和加载到内存中进行合并,生成,然后将拷贝给,重命名为后替换掉原来的。在启动时就只需要加载之前未合并的和即可,因为合并过的中的元数据信息已经被记录在中。

1568451755272
1568451755272

2.7 DataNode 工作机制

2.7.1DataNode掉线时限参数设置

1、DataNode进程死亡或者网络故障造成DataNode无法与NameNode通信

2、NameNode不会立即把该节点判定为死亡,要经过一段时间,这段时间暂称作超时时长。

3、HDFS默认的超时时长为10分钟+30秒。

4=如果定义超时时间为TimeOut,则超时时长的计算公式为:timeout = 2XXX(分钟)+10\YYY(秒)。

2.7.2 服役新数据节点、退役旧数据节点

1、添加白名单

2、黑名单退役

3、不允许白名单和黑名单中同时出现同一个主机名称。

2.8 HDFS新特性

2.8.1 集群间的数据拷贝

1.scp实现两个远程主机之间的文件复制

1
2
3
4
5
scp -r hello.txt [root@hadoop103:/user/atguigu/hello.txt](mailto:root@hadoop103:/user/atguigu/hello.txt)             // 推 push

scp -r [root@hadoop103:/user/atguigu/hello.txt hello.txt](mailto:root@hadoop103:/user/atguigu/hello.txt hello.txt) // 拉 pull

scp -r [root@hadoop103:/user/atguigu/hello.txt](mailto:root@hadoop103:/user/atguigu/hello.txt) root@hadoop104:/user/atguigu //是通过本地主机中转实现两个远程主机的文件复制;如果在两个远程主机之间ssh没有配置的情况下可以使用该方式。

2.采用distcp命令实现两个Hadoop集群之间的递归数据复制

1
2
[atguigu@hadoop102 hadoop-2.7.2]$  bin/hadoop distcp
hdfs://haoop102:9000/user/atguigu/hello.txt hdfs://hadoop103:9000/user/atguigu/hello.txt

2.8.2 小文件存档

1、HDFS存储小文件的弊端

大量的小文件会消耗NameNode中的大部分内存。但注意,存储小文件所需要的磁盘容量与数据块的大小无关。

2、解决存储小文件的办法之一

HDFS存档文件或HAR文件,是一个更高效的文件存档工具,它将文件存入HDFS块,在减少NameNode内存使用的同时,允许对文件进行透明的访问。也就是说HDFS存档问你件对内还是一个一个独立文件,对NameNode而言却是一个整体,减少了NameNode的内存

3、实操

1)先启动hadoop集群

2)把某目录下的所有文件归档成xx.har的归档文件,并把归档后文件存储到xxx/output路径下。

1
[root@master ~]# hadoop archive -archiveName input.har -p /data/input /data/out

3)查看归档

1
2
3
4
[root@master ~]# hadoop fs -ls -R har:///data/out/input.har
-rw-r--r-- 3 root supergroup 38 2019-09-15 23:21 har:///data/out/input.har/1
-rw-r--r-- 3 root supergroup 26 2019-09-15 23:21 har:///data/out/input.har/2
-rw-r--r-- 3 root supergroup 97 2019-09-13 22:54 har:///data/out/input.har/wc.input

4)解归档文件

1
hadoop fs -cp har:/// data/output/input.har/*    /data

2.8.3 回收站功能参数

1、默认值fs.trash.interval = 0, 0表示禁用回收站;其他值表示设置文件的存活时间。

2、默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

3、要求:fs.trash.checkpoint.interval <= fs.trash.interval 。< p>

2.9 HDFS HA高可用

策略:消除单点故障

2.9.1HDFS-HA工作要点

  1. 元数据管理方式需要改变

内存中各自保存一份元数据;

Edits日志只有Active状态的NameNode节点可以做写操作

两个NameNode都可以读取Edits

共享的Edits放在一个共享存储中管理(qjournal和NFS两个主流实现);

  1. 需要一个状态管理功能模块

实现了一个zkfailover,常驻在每一个namenode所在的节点,每一个zkfailover负责监控自己所在NameNode节点,利用zk进行状态标识,当需要进行状态切换时,由zkfailover来负责切换,切换时需要防止brain split现象的发生。

  1. 必须保证两个NameNode之间能够ssh无密码登录

  2. 隔离(Fence),即同一时刻仅仅有一个NameNode对外提供服务

img
img

3、MapReduce

定义:MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。

核心功能是将用户编写的业务逻辑代码自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

3.1 优缺点

1、优点:

1、MapReeduce易于编程:它简单的实现一些接口,就可以完成一个分布式程序。

2、良好的扩展性。通过简单的增加机器来扩展计算能力。

3、高容错性:如果一台机器宕机,它可以把上面的计算任务转移到另一个节点上运行,不至于这个任务运行的失败。

4、适合PB级以上海量数据的离线处理

缺点:

1、不擅长实时计算

2、不擅长流式计算:流式计算的输入数据时动态的,而MapReduce的输入数据集是静态的。

3、不擅长DAG(有向图)计算:多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,造成大量的磁盘IO,导致性能非常的低下。

3.2 核心编程思想

img
img

一个完整的MapReduce程序在分布式运行时有三类实例进程:

1)MrAppMaster:负责整个程序的过程调度及状态协调

2)MapTask:负责Map阶段的整个数据处理流程。

3)ReduceTask:负责Reduce阶段的整个数据处理流程。

3.3 编程规范

1、Mapper阶段

(1)用户自定义的Mapper要继承自己的父类

(2)Mapper的输入数据时KV对的形式(KV的类型可自定义)

(3)Mapper中的业务逻辑写在map()方法中

(4)Mapper的输出数据时KV对的形式(KV的类型可自定义)

(5)map()方法(MapTask进程)对每一个调用一次

2、Reducer阶段

(1)用户自定义的Reducer要继承自己的父类

(2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV

(3)Reducer的业务逻辑卸载reduce()方法中

(4)ReducerTask进程对每一组相同的k的组调用一次reduce()方法

3、Driver阶段

相当于Yarn集群的客户端,用于提交我们整个程序到Yarn集群,提交的是封装了MapReduce程序相关运行参数的job对象。

3.4 wordcount

img
img

3.5 序列化和反序列化(重要)

3.5.1 定义与作用

序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)网络传输

反序列化就是将字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

为什么要序列化:对象只生存在内存里,关机断电就莫得了,序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。

!!!为什么不用java的序列化

java的序列化是重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。

hadoop序列化的特点

1)紧凑:高效实用存储空间。

2)快速:读写数据的额外开销小。

3)可扩展:随着通信协议的升级而可升级

4)互操作:支持多语言的交互

3.5.2 操作

七步走:

(1)必须实现Writable接口

(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

1
2
3
public FlowBean() {
super();
}
1
2
3
4
5
@Override   
public void write(DataOutput out) throws IOException { out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}

(4)重写反序列化方法

1
2
3
4
5
6
@Override   
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}

(5)注意反序列化的顺序和序列化的顺序完全一致

(6)要想把结果显示在文件中,需要重写toString(),可用”\t”分开,方便后续用。

(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。详见后面排序案例。

1
2
3
@Override   
public int compareTo(FlowBean o) { // 倒序排列,从大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

3.6.2 切片与MapTask并行度决定机制

1.问题引出

MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。

思考:1G的数据,启动8个MapTask,可以提高集群的并发处理能力。那么1K的数据,也启动8个MapTask,会提高集群性能吗?MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?

2.MapTask并行度决定机制

数据块:Block是HDFS物理上把数据分成一块一块。

数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。

img

3.6.3 FileInputFormat切片源码解析:

重点:

默认情况:切片大小= blocksize

每次切片时,要判断切完剩余部分是否大于块的1.1倍,不大于1.1倍就划分1块切片

img
img

切片时不考虑数据集整体,而是逐个对每一个文件单独切片

3.6.4 FileInputFormat切片大小的参数配置

img
img

3.6.5 CombineTextInputFormat切片机制

框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下

1、应用场景:

CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。

2、虚拟存储切片最大值设置

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。

3、切片机制

生成切片过程包括:虚拟存储过程和切片过程二部分。!!!!

img
img

3.6 MapReduce详细工作流程

1568639170123
1568639170123
1568639177258
1568639177258

3.7 Shuffle机制

3.7.1 Partition分区

1、问题引出

要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)

2、默认Partitioner分区

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。如图4-14所示。

img
img

3.7.2 WritableComparable排序

排序概述:

​ 排序是MapReduce框架中最重要的操作之一。

​ MapTask和ReduceTask均会对数据按照Key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。

​ 默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

​ 对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。

​ 对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

排序的分类

​ 1、部分排序

​ MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。

​ 2、全排序

​ 最终输出结果只有一个文件,且文件内部有序。实现方法是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。

​ 3、辅助排序(GroupingComparator分组)

​ 在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。

​ 4、二次排序

​ 在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

1、实现全排序

2、实现区内排序

3.7.3 Combiner合并

(1)Combiner是MR程序中Mapper和Reducer之外的一种组件。

(2)Combiner组件的父类就是Reducer

(3)Combiner和Reducer的区别在于运行的位置

Combiner是在每一个MapTask所在的节点运行

Reducer是接收全局所有Mapper的输出结果

(4)Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。

(5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且Combiner的输出kv应该跟Reducer的输入kv类型对应起来。

order举例

3.8 MapReduce +shuffle 工作机制(面试)

img
img

(1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。

​ (2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。

​ (3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。

​ (4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

​ 溢写阶段详情:

​ 步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。

​ 步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。

​ 步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。

​ (5)Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

​ 当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。

​ 在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

​ 让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

img
img

(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

​ (2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

​ (3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

​ (4)Reduce阶段:reduce()函数将计算结果写到HDFS上。

3.9 Join多种应用

3.9.1 Reduce Join

1、Reduce Join工作原理:

​ Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

​ Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就OK了。

2、缺点及解决方案

​ 缺点:这种方式中,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。

​ 解决方案:Map端实现数据合并

3.9.2 Map Join

1.使用场景

Map Join适用于一张表十分小、一张表很大的场景。

2.优点

思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?

在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

3.具体办法:采用DistributedCache

​ (1)在Mapper的setup阶段,将文件读取到缓存集合中。

​ (2)在驱动函数中加载缓存。

// 缓存普通文件到Task运行节点。

job.addCacheFile(new URI(“file://e:/cache/pd.txt”));

3.10 计数器应用

hadoop为每个作业维护若干内置计数器,以描述多项指标。例如,某些计数器记录已处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量

1、计数器API

(1)采用枚举的方式统计计数

(2)采用计数器组、计数器名称的方式统计

(3)计数结果在程序运行后的控制台上查看。

3.11 数据清洗(ETL)

3.12 Hadoop数据压缩

3.12.1 压缩概念

压缩概述:

压缩技术能够有效减少底层存储系统(HDFS)读写字节数。压缩提高了网络带宽和磁盘空间的效率。在运行MR程序时,I/O操作、网络数据传输、Shuffle和Merge要花大量的时间,尤其是数据规模很大和工作负载密集的情况下,因此,使用数据压缩显得非常重要。

数据压缩对于节省资源、最小化磁盘I/O和网络传输非常有帮助。可以在任意MapReduce阶段启动压缩。

压缩策略和原则

优化策略

通过对Mapper、Reducer运行过程的数据进行压缩,以减少磁盘IO,提高MR程序运行速度。

注意:采用压缩技术减少了磁盘IO,但同时也增加了CPU运算负担。所以,压缩特性运用得当能提高性能,但运用不当也可能降低性能。

压缩原则

1、运算密集型的job,少用压缩

2、IO密集型的job,多用压缩

3.12.2 MR支持的压缩编码

压缩格式 hadoop自带? 算法 文件扩展名 是否可切分 换成压缩格式后,原来的程序是否需要修改
DEFLATE 是,直接使用 DEFLATE .deflate 和文本处理一样,不需要修改
Gzip 是,直接使用 DEFLATE .gz 和文本处理一样,不需要修改
bzip2 是,直接使用 bzip2 .bz2 和文本处理一样,不需要修改
LZO 否,需要安装 LZO .lzo 需要建索引,还需要指定输入格式
Snappy 否,需要安装 Snappy .snappy 和文本处理一样,不需要修改

为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,如下表所示。

压缩格式 对应的编码/解码器
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec

压缩性能的比较

压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s

http://google.github.io/snappy/

On a single core of a Core i7 processor in 64-bit mode, Snappy compresses at about 250 MB/sec or more and decompresses at about 500 MB/sec or more.

3.12.3 压缩方式选择

Gzip压缩:

优点:压缩率比较高,压缩/解压速度快;hadoop本身支持,在应用中处理Gzip格式的文件就和直接处理文本一样;大部分Linux系统都自带Gzip命令,使用方便。

缺点:不支持split。

应用场景:当每个文件压缩之后130M以内的(1个块大小内),都可以考虑Gzip压缩格式。例如说一天或者一个小时的日志压缩成一个Gzip文件。

Bzip2压缩:

优点:支持split,具有很高的压缩率,比Gzip压缩率都高;Hadoop本身自带,使用方便。

缺点:压缩/解压速度慢。

应用场景:适合对速度要求不高,但需要较高的压缩率的时候;或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并以后数据用得比较少的情况;或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持split,而且兼容之前的应用程序的情况。

Lzo

优点:压缩/解压速度也比较快,合理的压缩率,支持split,是Hadoop中最流行的压缩格式;可以在linux系统下安装lzop命令,使用方便。

缺点:压缩率比Gzip要低一些;Hadoop本身不支持,需要安装;在应用中对Lzo格式的文件需要做一些特殊处理(为了支持split需要建索引,还需要指定InputFormat为Lzo格式)。

应用场景:一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,Lzo优点越明显

Snappy

优点:高速压缩速度和合理的压缩率。

缺点:不支持Split;压缩率比Gzip要低;Hadoop本身不支持,需要安装。

应用场景:当MapReduce作业的Map输出的数据比较大的时候,作为Map到Reduce的中间数据的压缩格式;或者作为一个MapReduce作业的输出和另一个MapReduce作业的输入。

3.12.4 压缩位置的选择

img
img

5、Yarn资源调度器

Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式操作系统平台,而MapReduce等运算程序则相当于运行于操作系统之上的应用程序

img
img

img

工作机制详解

​ (1)MR程序提交到客户端所在的节点。

​ (2)YarnRunner向ResourceManager申请一个Application。

​ (3)RM将该应用程序的资源路径返回给YarnRunner。

​ (4)该程序将运行所需资源提交到HDFS上。

​ (5)程序资源提交完毕后,申请运行mrAppMaster。

​ (6)RM将用户的请求初始化成一个Task。

​ (7)其中一个NodeManager领取到Task任务。

​ (8)该NodeManager创建容器Container,并产生MRAppmaster。

​ (9)Container从HDFS上拷贝资源到本地。

​ (10)MRAppmaster向RM 申请运行MapTask资源。

​ (11)RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。

​ (12)MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。

​ (13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。

​ (14)ReduceTask向MapTask获取相应分区的数据。

​ (15)程序运行完毕后,MR会向RM申请注销自己。

img
img

6、优化

6.1 MapReduce优化

MapReduce程序效率的瓶颈在于两点:

1、计算机性能

​ CPU、内存、磁盘健康、网络

2、I/O操作优化

(1)数据倾斜

(2)Map和Reduce数设置不合理

(3)Map运行时间太长,导致Reduce等待过久

(4)小文件过多

(5)大量的不可分块超大文件

(6)Spill次数过多

(7)Merge次数过多等。

优化方法:

1、数据输入

(1)合并小文件:在执行MR任务前将小文件进行合并,大量的小文件会产生大量的Map任务,增大Map任务装载次数,而任务的装载比较耗时,从而导致MR运行较慢。

(2)采用CombineTextInputFormat来作为输入,解决输入端大量小文件场景。

2、Map阶段

(1)减少溢写(Spill)次数:通过调整io.sort.mb及sort.spill.percent参数值,增大触发Spill的内存上限,减少Spill次数,从而减少磁盘IO.

(2)减少合并(Merge)次数:通过调整io.sort.factor参数,增大Merge的文件数目,减少Merge的次数,从而缩短MR处理时间。

(3)在Map之后,不影响业务逻辑前提下,先进行Combine处理,减少I/O

3、Reduce阶段

(1)合理设置Map和Reduce数

(2)设置Map、Reduce共存。调整slowstart.completedmaps参数,使Map运行到一定程度后,Reduce也开始运行,减少Reduce的等待时间。

(3)规避使用Reduce:因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。

(4)合理设置Reduce端的Buffer

4、I/O传输

(1)采用数压缩的方式

(2)使用SequenceFile二进制文件

5、数据倾斜问题

1)数据倾斜现象

数据频率倾斜——某一个区域的数据量要远远大于其他区域。

数据大小倾斜——部分记录的大小远远大于平均值。

2)减少数据倾斜的方法

1)抽样和范围分区

2)自定义分区

3)Combine

4)采用Map Join, 尽量避免Reduce Join。

6.2HDFS小文件优化

6.2.1 HDFS小文件弊端

HDFS上每个文件都要在NameNode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用NameNode的内存空间,另一方面就是索引文件过大使得索引速度变慢。

6.2.2 HDFS小文件解决方案

小文件的优化无非以下几种方式:

(1)在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS。

(2)在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并。

(3)在MapReduce处理时,可采用CombineTextInputFormat提高效率。

1、MAVEN内存溢出!

(1)MAVEN install时候JVM内存溢出

处理方式:在环境配置文件和maven的执行文件均可调整MAVEN_OPT的heap大小。(详情查阅MAVEN 编译 JVM调优问题,如:http://outofmemory.cn/code-snippet/12652/maven-outofmemoryerror-method)

(2)编译期间maven报错。可能网络阻塞问题导致依赖库下载不完整导致,多次执行命令(一次通过比较难):

[root@hadoop101 hadoop-2.7.2-src]#mvn package -Pdist,nativeN -DskipTests -Dtar

(3)报ant、protobuf等错误,插件下载未完整或者插件版本问题,最开始链接有较多特殊情况,同时推荐

DataNode和NameNode进程同时只能工作一个。

clusterId不统一
clusterId不统一

10)执行命令不生效,粘贴word中命令时,遇到-和长–没区分开。导致命令失效

解决办法:尽量不要粘贴word中代码。

11)jps发现进程已经没有,但是重新启动集群,提示进程已经开启。原因是在linux的根目录下/tmp目录中存在启动的进程临时文件,将集群相关进程删除掉,再重新启动集群。

12)jps不生效。

原因:全局变量hadoop java没有生效。解决办法:需要source /etc/profile文件。

13)8088端口连接不上

[atguigu@hadoop102 桌面]$ cat /etc/hosts

注释掉如下代码

#127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4

#::1 hadoop102