下面提到的NN,2NN,nn,2nn之类的都是缩写,要写全
Hadoop是什么,组件有哪些
Hadoop是一个分布式系统,大数据框架,是一个 存储系统+计算框架 的软件框架。
组件:
yarn
HDFS
MR
yarn是什么,组件有哪些
yarn(Yet Another Resource Negotiator,另一种资源调度器),统一资源管理系统。
组件:
AM(ApplicationMaster):为应用程序申请资源并分配内部的任务,任务的监控和容错
NM(NodeManager):管理单个节点上的资源、处理来自AM、RM的命令
RM(ResourceManager):处理客户端请求、监控NM、启动或监控AM、资源的分配和调度
Container(底层资源抽象,实际上是NM创建的一个动态概念-节点)
HDFS是什么,组件有哪些
HDFS(Hadoop Distributed File System),分布式文件系统。
组件:
Client
NN(NameNode)
DN(DataNode)
2NN(SecondaryNameNode)
下面是扩展内容:
块的大小决定:(设置太小会导致寻址时间过长,设置太大会导致传输时间过长
可以配置参数决定(默认1.x版本64M,2.x/3.x版本内为128M)
给定磁盘传输速度XMB/s,给出查询时间为N s,则最佳的文件块大小是多少?
最佳:N/0.01*X=100NX(查询时间为传输时间的1%最佳)
NameNode(存储文件的元数据(文件名、文件目录结构、文件属性),以及每个文件的快列表和块所在的DataNode等):
(1)管理HDFS的名称空间;
(2)配置副本策略;
(3)管理数据块(Block)映射信息;
(4)处理客户端读写请求。
DataNode(在本地文件系统存储文件块数据,以及数据块的校验和):
(1)存储实际的数据块;
(2)执行数据块的读/写操作。
Client:
(1)文件切分。文件上传HDFS的时候,client将文件切分成一个一个的Block,然后进行上传;
(2)与NameNode交互,获取文件的位置信息;
(3)与DataNode交互,读取或者写入数据;
(4)Client提供一些命令来管理HDFS,比如NameNode格式化;
(5)Client可以通过一些命今来访问HDFS,比如对HDFS增删查改操作;
SecondaryNameNode(每隔一段时间对NameNode元数据备份):
(1)辅助NameNode,分担其工作量,比如定期合并Fsimage和Edits,并推送给NameNode;
(2)在紧急情况下,可辅助恢复NameNode。
MR是什么,组件有哪些
MR(MapReduce),分布式计算框架,用于大规模数据集(>1TB)的并行计算。
组件:
Mapper类
Reducer类
Driver类
MapReduce优缺点
优点
(1)MapReduce易于编程
(2)良好的扩展性
(3)高容错性
(4)适合PB级以上海量数据的离线处理
缺点
(1)不擅长实时计算
(2)不擅长流式计算
(3)不擅长DAG(有向无环图)计算
集群部署规划主要修改哪几个文件
目录:$HADOOP_HOME/etc/hadoop$
四个文件:core-site.xml、hdfs-site.xml、yarn-site.xml、mapred-site.xml(三大组件配置+hadoop核心配置)
*具体修改内容
core-site.xml:指定NN地址,Hadoop存储目录,配置HDFS网页登录静态用户名
hdfs-site.xml:指定NN Web访问地址、2NN web访问地址、配置参数:块大小、副本数量
yarn-site.xml:指定shuffle、指定RM地址、配置参数:资源管理器、节点管理器。
mapred-site.xml:指定MR运行在yarn上、历史服务器端地址,历史服务器web端地址、配置参数:任务跟踪器、作业跟踪器
集群部署规划有哪些注意事项?
NN和2NN不能安装在同一台服务器
RM也很消耗内存,不能和上面两个放同一台服务器。
hosts修改
/etc/hosts,192.168.10.102 hadoop102
如果要改hostname,/etc/hostname
文件中不可有多余的空格
HDFS的优缺点有哪些
优点:(1)高容错性、
(2)适合处理大数据、
(3)可构建在廉价机器上,通过多副本机制,提高可行性
缺点:(1)不适合低延时数据访问、
(2)无法高效对大量小文件进行存储、
(3)不支持并发写入、文件随机修改
集群启动
整体HDFS:start-dfs.sh/stop-dfs.sh
整体Yarn:start-yarn.sh/stop-yarn.sh
单个HDFS:hdfs –daemon start/stop nn/dn/2nn
单个yarn:yarn –daemon start/stop rm/nm
自定义脚本:myhadoop.sh start/stop
文件读流程
1)客户端请求下载,NameNode给出副本存储的DataNode地址
(副本在集群中有多个,但并不是每个DataNode都有副本存在的,所以给地址
2)就近挑选DataNode下载副本,输出Packet为单位校验,存以Packet为单位
先缓存,再写入文件,每次选择datanode都是在同距离下选择随机的一个
文件写流程
1)请求上传文件,查询文件是否存在,父目录是否存在,返回是否可上传
2)可上传后请求block传输给哪几个datanode,返回datanode节点地址
3)和datanode建立联系,1和2,2和3联系,最终反馈给客户端
4)传输block,传给1再给2再给3
5.)重复block的请求,联系datanode,传输block
网络拓扑、节点距离计算
就是拓扑类型,集群—数据中心—机房—节点,画出拓扑图即可,线路为距
两个节点到达最近的共同祖先的距离总和。
副本存储节点选择
(第一个在客户端或者随机)A-N0—-B-N1—-B-NX(随机选择)(机架随机)
若客户端在集群外则随机
NN和SNN工作机制
假如是第一次启动那么会创建Fsimage和Edit,如果不是,那么将fsimage读入
同时加载Edit的更新操作
客户端增删改查的操作,NN会记录到Edit中
SNN会询问是否需要Checkpoint,返回如果需要那么会拷贝Fsimage和Edit到SNN
然后在SNN中对两者进行合并形成一个新文件fsimage.checkpoint,返回NN中
NN会接受这个文件并重命名为fsimage,至此就完成了一次更新操作
Fsimage和Edit解析
Fsimage实际上就是一个永久的系统数据历史记录检查点
Edit就是每一次HDFS系统中任何更新、写操作都会记录的地方
当Edit满了或者别的触发条件到了以后fsimage就会合并此两者
即记录成为了当下的系统数据检查点,代替原来的
DN工作机制
先向NN注册,注册成功后每周期会上报其所有块信息
每一段时间就会向NN返回心跳,每次心跳的返回结果都会有NN给DN的命令
超过一段时间不返回心跳,NN就会认为这个DN不可用
DN数据完整性
DataNode读取block时会通过校验算法计算CheckSum,若和创建不同则损坏
此时Client客户端就和会选择另外一个DataNode进行读取Block,重复过程到
DataNode也会周期性检查CheckSum的值
DN节点掉线时限
正常DataNode都会每3秒返回一个心跳,若一个周期内不返回认为不可用
超时时长可以人工设置,配置为hdfs-site.xml
Heartbeat.recheck.interval, dfs.heartbeat.interval为需要配置的点
其中超时时长=2A+10B,其中A的单位是毫秒,B的单位是秒(切记!!
A和B分别对应上述的recheck和非recheck
掉线时间参数设置:
配置hdfs-site.xml文件中的heartbeat.recheck.interval,以及dfs.heartbeat.interval
A2+B10即为掉线时长,其中A的单位是毫秒,B的单位是秒
A、B分别对应上述的
Mapreduce切片方法、机制
默认切片:
默认TestInputFormat类实现切片,对任务按文件规划切片;按行读取每行数据
Fileinputformat切片流程:
见第十六页(MapReduce,值得看)
找到文件存储目录,遍历目录下的文件,获取文件大小,计算切片大小
形成切片,将切片信息写入规划文件中,最终提交切片规划文件到yarn上
Fileinputformat切片机制:
默认以Block大小进行切片,一个Block默认为128M(2.x/3.x)
针对每个文件进行分别切片,F1和F2分别切片,不会考虑合并切片
例:
file1.txt 320M
file2.txt 10M
F1-1:0-128、F1-2:128-256、F1-3:256-320、F2-1:0-10
Combiner切片机制:(用于小文件过多的情况下)
设置setMaxInputSplitSize大小为核心,首先以此作文件的虚拟存储
小于此数据的,单独存储,大于此数据但小于其两倍大小的,平均分两份存储
大于两倍该数据的,首先先分一个数据块大小,剩下的再根据上述逻辑作判断
如15.8M,Size设置4M,分2个4M的存储,剩下的7.8M平均分两个3.9M
虚拟存储后便是切片机制,存储区若大于等于设置数据,则单独切片
若小于设置数据则合并下一存储区域的数据直到大于等于设置数据为止
例:
a.txt 1.7M
b.txt 5.1M
c.txt 3.4M
d.txt 6.8M
1.7M单独存储,5.1M分两个2.55M存储,3.4M单独存储,6.8M分两个3.4M
切片则为(1.7+2.55)、(2.55+3.4)、(3.4+3.4)(相加大于block则分为一个切片
上课强调HDFS的基本命令
格式化NameNode(首次启动):hdfs namenode -format
Start-dfs.sh、start-yarn.sh(启动集群命令)(在RM配置的节点上启动)
Hadoop fs + 在集群上进行文件操作(如果在本地就不需要这个,并且不需要“-“
Put、copyfromlocal从本地文件系统中拷贝到HDFS路径中
Movefromlocal:从本地粘贴到HDFS
Appendtofile:追加一个文件到已经存在的文件末尾 后面输入路径A 路径B
Get/copytolocal:从HDFS复制到本地
Hadoop执行jar:hadoop jar xxx.jar(本地路径) /xxx/xx.txt /xxx(后两者均为集群路径
一些基本命令(linux常用的)
-ls:显示目录信息(后面可以加目录路径
-cat:显示文件内容(同上
-chown:修改某目录权限(跟着权限路径,叠加目录
-mkdir:创建路径
-cp:复制粘贴某文件到某路径
-mv:移动某文件到另外一个路径
-rm:删除一个文件/文件夹
-rm -r:递归删除该路径下的所有文件、文件夹
-du:统计文件夹的大小、信息
-tail:显示一个文件末尾1kb的信息
文件的压缩:tar -czvf archive.tar.gz /path/to/directory_or_file
文件的解压:tar -xzvf archive.tar.gz
调度器有哪些、分别的算法
fifo、capacity shchedule、fair shchedule
fifo:单队列,先进先出,时间顺序
容量:深度优先算法分配队列资源选择资源占用率低的分配资源,按照任务优先级、提交时间分配资源,按照容器优先级分配资源
公平:优先选择对资源的缺额较大的队列分配资源
Apache Hadoop3.1.3和CDH默认的资源调度器
A默认capacity,CDH默认为fair
在运行MapReduce程序时,输入的文件格式包括那些?
基于行的日志文件、二进制格式文件、数据库表等。
默认是按什么样的模式进行切片的?是如何读取信息的?
TextInputFormat切片机制
按行读取
Shuffle位于MapReduce流程中的哪个阶段?
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。
MapReduce的工作流程中出现了几次排序,分别在哪个阶段,采用的是什么类型的排序方法,并说明原因?
3次排序
第一次:Map阶段环形缓冲区溢写前先按照分区编号Partition进行快速排序,然后按照key进行快速排序(升序)。
数据是乱序,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序,方便后续的排序
第二次:Map的Merge阶段对溢写的文件(key)进行归并排序(升序)
对第一次排序后的部分有序的数据进行高效排序与合并,最终得到一个大文件
第三次:Reduce的Sort阶段对key进行归并排序
由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可
Shuffle阶段缓冲区的大小默认是多少?
100MB
Shuffle阶段缓冲区的使用率一般达到多少比例后进行反向溢写?
80%
缓冲区主要存储哪些数据?
索引(index,partition,keystart,valstart)与数据(key,value)
在溢写之前,需要进行排序,对什么进行排序,按照什么顺序进行排序?
对缓冲区的中间键值对进行排序
按照升序
ReduceTask的数量大于分区数量,会出现什么样的结果?
产生出几个空的输出文件
ReduceTask的数量大于1,但小于分区数量,会出现什么样的结果?
会产生IO异常
当ReduceTask的数量等于1时,分区文件不小于1时,会出现什么样的结果?
所有数据都输出到1个文件中
Partitioner-k,v-的类型与mapper的输入-k,v-还是输出-k,v-的类型相同?
mapper的输出
分区是在MapReduce框架中哪一个阶段进行的?
shuffle阶段
MapTask和ReduceTask的数据排序是以什么标准来排的?
基于key的字典序,按照升序来排的。
对于MapTask,它会将处理的结果暂时放到哪里的?
处理的结果暂时放到环形缓冲区中,当环形缓冲区使⽤率达到⼀定阈值后,再对缓冲区中的数据进行⼀次快速排序,并将这些有序数据溢写到磁盘上。
MapReduce框架中的快速排序最先在那个时候进行排序?排序是在哪里进行的?
Map阶段环形缓冲区溢写前的时候
在内存中的环形缓冲区中进行
什么时候进行归并排序?
Map的Merge阶段对溢写的文件(key)进行归并排序
Reduce的Sort阶段统一对key进行归并排序
默认排序是按照什么标准进行排序的?
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
如何实现自定义排序?
使‘键’实现WritableComparable接口重写compareTo方法,并实现序列化和反序列化。
Yarn的常用命令有哪些?
列出所有
Application: yarn application -list
根据Application状态过滤:
yarn application -list -appStates
Kill掉Application:
yarn application -kill application_1612577921195_0001
查询Application日志:
yarn logs -applicationId
查询Container日志:
yarn logs -applicationId
列出所有Application尝试的列表:
yarn applicationattempt -list
打印ApplicationAttemp状态:
yarn applicationattempt -status
列出所有Container:
yarn container -list
打印Container状态:
yarn container -status
列出所有节点:
yarn node -list -all
加载队列配置:
yarn rmadmin -refreshQueues
打印队列信息:
yarn queue -status
如何实现Tool接口,进而实现命令行动态传参?
(1)新建Maven项目YarnDemo.pom
(2)新建com.atguigu.yarn包名
(3)创建类WordCount并实现Tool接口
(4)新建WordCountDriver
(5)在HDFS上准备输入文件,假设为/input目录,向集群提交该Jar包
yarn jar YarnDemo.jar com.atguigu.yarn.WordCountDriver wordcount /input /output
WordCountMapper类(重点)
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
Text k = new Text();
IntWritable v = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 切割
String[] words = line.split(" ");
// 3 输出
for (String word : words) {
k.set(word);
context.write(k, v);
}
}
}
Extends后mapper<>内四个分别为:
输入键类型,输入值类型,输出键类型,输出值类型
K,V的声明通过Hadoop中的输出类赋值
Protect声明的map,为了防止同一个包内其他类对其进行调用,仅限于框架内可以
Map中的变量分别为:
输入键类型,输入值类型,输出的上下文对象(收集mapper的输出)
String line = value .toString() 是为了把每一行读取的内容,化为一段字符串
再根据split(””)通过空格进行分割单词,在word数组中存储每个单词
再是循环words数组中每一个元素并命名为word,设置到K中
最后再是封装K、V到上下文对象中,因为V已经在前面声明过变量值为1
WordCountReducer类(重点)
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
int sum;
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
// 1 累加求和
sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
// 2 输出
v.set(sum);
context.write(key,v);
}
}
继承reducer类,后面的变量也是同样的,但需要和map的输出相互对应
例如map的输出类型是text,intwritable,那么相对应reducer的输入也是要一样的
Throws都是一些抛出异常的异常类型,防止出现异常问题而程序依旧运行的
因为默认是按照键进行分区的,所以每个分区内都是同样键对应的键值对
而因为有多个maptask程序,最终输出的分区文件也是多个,而不是一个
如果不用迭代器进行遍历是无法得到同一键所对应的值的,迭代器声明、变量声明
所以这里也就使用了Iterable<Intwritable> value来声明变量,取得其键值对的值
Value内存储的就是相同键值对的一系列值,最终累加起来就是一个键所对应的值
通过contest类写入输出文件内,每一个reduce在这里设置为处理一个单词任务
最终写入v中(注意一般都是v.get(X)的类型来写入,最终写出键值对总和值)
WordCountDriver类(重点)
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1 获取配置信息以及获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 关联本Driver程序的jar
job.setJarByClass(WordCountDriver.class);
// 3 关联Mapper和Reducer的jar
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4 设置Mapper输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
一开始就设置了main中的输入变量声明:String[] args
这是为了调用时进行路径输入,后续6中也能看到arg[0],arg[1]即为输入的路径
Arg[0]中存储的就是命令jar包时的输入文件路径,arg[1]中存储的是输出危机路径
这个路径都需要是hdfs集群中的相对位置,而非本地
Configuration conf = new Configuration();创建了一个Hadoop配置对象
Job job = Job.getInstance(conf);创建了一个job对象,通过其配置和提交任务
job.setJarByClass(WordCountDriver.class);关联主类Driver类
再是关联mapper类,reducer类
再是设置mapper的输入输出的kv类型
再是设置最终的输出kv类型
再是设置输入、输出路径
再是提交job,true的含义是任务完成后返回打印
Exit是为了根据执行结果决定程序设置的退出状态
序列化是指把内存的中的对象转为字节序列(重点)
反序列化是指把字节序列转到内存中
public class FlowBean implements Writable,必须实现writable接口
//2 提供无参构造
public FlowBean() {
}
必须提供无参构造函数,名字和类名一样
//4 实现序列化和反序列化方法,注意顺序一定要保持一致
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}
//5 重写ToString
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
重写序列化、反序列化实现方法,writelong就是写到字节序列里面去
Readlong就是从字节序列重新转换到内存中来,两边的up/dowm/sum都是如此
顺序是需要一样的,不然读出来肯定有问题,建议全文背诵
至少需要纸上能默写出来,毕竟是闭卷考
Partitioner:
自定义类继承partitioner,重写getpartitioner方法
再在job驱动中设置自定义partitioner:job.setpartitionerclass(xxx.class)(继承的类
再在job驱动中根据分区设置reducetask的数量job.setnumreducetask(n)
排序:
Bean作为key传递,实现writablecomparable接口,重写compare to类
文件从磁盘上传到集群怎么提交?(流程是什么)(重点)
hadoop fs -copyFromLocal /local/path/to/file /hdfs/target/path,这是直接命令
还有一种就是API操作,既然上课谈了流程那么考的大概率就是API上传文件代码
首先初始化连接:
private FileSystem fs;
@Before
public void init() throws IOException, URISyntaxException, InterruptedException {
//连接集群的nn地址
URI uri = new URI("hdfs://ch1:9000");
//创建一个配置文件(hadoop配置对象)
Configuration configuration = new Configuration();
//用户
String user = "ch";
// 1 获取客户端对象
fs = FileSystem.get(uri, configuration, user);
}
连接集群NN地址、创建配置文件,获取客户端对象
public void testPut () throws IOException {
//参数解读:参数一:表示删除原数据;参数二:是否允许覆盖;参数三:原数据路径;参数四:目的地路径
fs.copyFromLocalFile(newPath("C://Users//carSales.csv"), new Path("Marvels/carSales.csv"));
}
完成对文件的上传后关闭资源
public void close() throws IOException {
// 3 关闭资源
fs.close();
}
在主类中调用这三个类
public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
App app = new App();
try {
app.init(); // 初始化连接
app.testPut();
} catch (Exception e) {
e.printStackTrace();
} finally {
app.close(); // 关闭资源
}
}
后面还有自定义类的部分,比如自定义排序,自定义继承,自定义啥啥啥的,这里就不放了。
希望rp++,看了wordcount和序列化就给过!
实际上没这么多内容,最后一节课老师讲了什么就会考什么。