大数据框架Hadoop(四)-MapReduce1

大数据框架Hadoop(四)-MapReduce1

MapReduce概述

  • MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题。
  • MapReduce是分布式运行的,由两个阶段组成:Map和Reduce,Map阶段是一个独立程序,有很多节点同时运行,每个节点处理一部分数据。Reduce阶段是一个独立的程序,有很多节点同时运行,每个节点处理一部分数据。
  • MapReduce框架都有默认实现,用户只需要覆盖map()和reduce()两个函数,即可实现分布式计算。两个函数的形参和返回值都是<key,value>

MR执行过程-map阶段

  • map任务处理

    1. 框架使用InputFormat类的子类把输入文件(夹)划分为很多InputSplit,默认每个HDFS的block对应一个InputSplit。通过RecordReader类,把每一个InputSplit解析成一个个<k1,v1>。默认框架对每一个InputSplit中的每一行,解析成一个<k1,v1>。

    2. 框架调用Mapper类中的map(…)函数,map函数的形参是<k1,v1>对,输出是<k2,v2>对。一个InputSplit对应一个map task。我们可以覆盖map函数,实现自己的逻辑。

    3. (假设reduce存在)框架对map输出的<k2,v2>进行分区。不同的分区中的<k2,v2>由不同的reduce task处理。默认只有1个分区。

      (假设reduce不存在)框架对map结果直接输出到HDFS中。

    4. (假设reduce存在)框架对每个分区中的数据,按照k2进行排序分组。分组指的是相同k2的v2分成一组。分组不会减少<k2,v2>的数量。

    5. (假设reduce存在,可选)在map节点,框架可以执行reduce规约(combine)

    6. (假设reduce存在)框架会把map task输出的<k2,v2>写入到Linux的磁盘文件中。

  • 至此,map阶段结束。

  • reduce阶段

    1. 框架对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。这个过程称作shuffle。

    2. 框架对reduce端接收的map任务输出的相同分区的<k2,v2>数据进行合并、排序、分组。

    3. 框架调用Reducer类中的reduce方法,reduce方法的形参是<k2,{v2,…}>,输出是<k3,v3>。一个<k2,{v2,…}调用一次reduce函数。我们可以覆盖reduce函数,实现自己的逻辑。

    4. 把reduce的输出保存到HDFS中。

  • 至此,整个reduce阶段结束。

shuffle过程

mapreduce_shuffle

MapReduce执行流程总结

  • Map Task
    • 读取:框架调用InputFormat类的子类读取HDFS中文件数据,把文件转换为InputSplit。默认,文件的一个block对应一个InputSplit,一个InputSplit对应一个map task。一个InputSplit中的数据会被RecordReader解析成<k1,v1>。默认,InputSplit中的一行解析成一个<k1,v1>。默认v1表示一行的内容,k1表示偏移量。
    • map:框架调用Mapper类中的(k1,v1)方法,接收<k1,v1>,输出<k2,v2>。有多少个<k1,v1>,map会被执行多少次。我们可以覆盖map(),实现自己的业务逻辑。
    • 分区:框架对map的输出进行分区。分区的目的是确定哪些<k2,v2>进入哪个reduce task。默认只有一个分区。
    • 排序分组:框架对不同分区中的<k2,v2>进行排序、分组。
      • 排序是按照k2进行排序。
      • 分组是指相同k2的v2分到一个组中。
    • combiner:可以在map task中对<K2,v2>执行reduce规约。
    • 写入本地:框架对map的输出写入到Linux本地磁盘。
  • Reduce Task
    • shuffle:框架根据map不同分区,将数据,通过网络copy到不同的reduce节点。
    • 合并排序分组:每个reduce会把多个map传来的<k2,v2>进行合并、排序、分组。
    • reduce:框架调用reduce(k2,v2s).有多少个分组,就会执行多少次reduce函数。
    • 写入HDFS:框架对reduce的输出写入到HDFS中。

查看MapReduce任务输出日志

  • historyserver进程作用
    • 把收集散落在弄得manager节点上的日志

停止Yarn上的任务

  • yarn application -kill
  • 在shell中ctrl+c无法停止程序,因为程序已经提交到yarn集群运行了。
  • yarn application -kill 不仅可以停止mr任务,只要是在yarn上运行的任务都可以使用这个命令杀掉进程。

MapReduce在Yarn上执行流程

  • 首先,Resource Manager会为每一个application(比如一个用户提交的MapReduce Job)在NodeManager里面申请一个container,然后在该container里面启动一个Application Master。container在Yarn中是分配资源的容器(内存、cpu、硬盘等),它启动时便会相应启动一个JVM。
  • 然后,Aplication Master便陆续为application包含的每一个task(一个Map Task或Reduce task)向Resource Manager申请一个container。等每得到一个container后,便要求该container所属的NodeManager将此container启动,然后就在这个container里面执行相应的task。
  • 等这个task执行完后,这个container便会被NodeManager收回,而container所拥有的JVM也相应地被退出。

Yarn核心组件功能

  • Yarn Client
    • Yarn Client提交Application到RM,它会首先创建一个Application上下文对象,并设置AM必须的资源请求信息,然后提交到RM。Yarn client也可以与RM通信,获取一个已经提交并运行的Application的状态信息等。
  • ResourceManager(RM)
    • RM是Yarn集群的Master,负责管理整个集群的资源和资源分配。RM作为集群资源的管理和调度的角色,如果存在单点故障,则整个集群的资源都无法使用。在2.4.0版本才新增RM HA的特性,这样就增加了RM的可用性。
  • NodeManager(NM)
    • NM是Yarn集群的Slave,是集群中实际拥有实际资源的工作节点。我们提交Job以后,会将组成Job的多个Task调度到对应的NM上进行执行。Hadoop集群中,为了获得分布式计算中的Locality特性,会将DN和NM在同一个节点上运行,这样对应的HDFS上的Block可能就在本地,而无需在网络间进行数据的传输。
  • Container
    • Container是Yarn集群中资源的抽象,将NM上的资源进行量化,根据需要组成一个个Container,然后服务于已授权资源的计算任务。计算任务在完成计算后,系统会回收资源,以供后续计算任务申请使用。Container包含两种资源:内存和CPU,后续Hadoop版本可能会增加硬盘、网络等资源。
  • ApplicationMaster(AM)
    • AM主要管理监控部署在Yarn集群上的Application,以MapReduce为例,MapReduce ApplicationMaster是一个用来处理MapReduce计算的服务框架程序,为用户编写的MapReduce程序提供运行时支持。通常我们在编写的一个MapReduce程序可能包含多个Map Task或Reduce Task,而各个Task的运行管理与监控都是由这个MapReduce ApplicationMaster来负责,比如运行Task的资源申请,由AM向RM申请;启动/停止NM上某Task的对应的Container,由AM向NM请求来完成。

Map中的shuffle

  1. 每个map有一个环形内存缓存区,用于存储map的输出。默认大小100MB(io.sort.mb属性),一旦达到阈值0.8(io.sort.spill.percent),一个后台线程把内容溢写到(spill)磁盘的指定目录(mapred.local.dir)下的一个新建文件中。
  2. 写磁盘前,要partition,sort。如果有combiner,combine排序后数据。
  3. 等最后记录写完,合并全部文件为一个分区且排序的文件。
  • Reduce通过Http方式得到输出文件的特点分区的数据。
  • 排序阶段合map输出。然后走Reduce阶段。
  • reduce执行完之后,写入到HDFS中。

Hadoop序列化的特点

  • hadoop序列化的特点:
    1. 紧凑:高效使用存储空间。
    2. 快速:读写数据的额外开销小。
    3. 可扩展:可透明地读取老格式的数据。
    4. 互操作:支持多语言的交互
  • Java序列化的不足:
    1. 不精简。附加信息多。不大适合随机访问。
    2. 存储空间大。递归地输出类的超类描述直到不再有超类。
    3. 扩展性差。而Writable方便用户自定义
  • Hadoop的序列化格式:Writable。

Writable接口

  • Writable接口,是根据DataInput和DataOutput实现的简单、有效的序列化对象。
  • MR的任意Key和value必须实现Writable接口
  • MR的任意key必须实现WritableComparable接口

常用的Writable实现类

Java基本类型 Writable 序列化大小(字节)
布尔型(boolean) BooleanWritable 1
字节型(byte) ByteWritable 1
整型(int) IntWritable/VIntWritable 4/1~5
浮点型(float) FloatWritable 4
长整型(long) LongWritable/VLongWritable 8/1~9
双精度浮点型(double) DoubleWritable 8
  • Text一般认为它等价于java.lang.String的Writable。针对UTF-8序列。
  • NullWritable是单例,获取实例使用NullWritable.get()。

MapReduce默认输入处理类

  • InputFormat
    • 抽象类,只是定义了两个方法
  • FileInputFormat
    • FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat保存作为Job输入的所有文件,并实现了对输入文件计算splits的方法。至于获得记录的方法是有不同的子类——TextIntFormat进行实现的。
  • TextInputFormat
    • 是默认的处理类,处理普通文本文件。
    • 把文件中每一行作为一个记录,它将每一行在文件中的起始偏移量作为key,每一行的内容作为value。
    • 默认以\n或回车作为一行记录。

InputSplit

  • 在执行mapreduce之前,原始数据被分割成若干split,每个split作为一个map任务的输入。
  • 当Hadoop处理很多小文件(文件大小小于hdfs block大小)的时候,由于FileInputFormat不会对小文件进行划分,所以每一个小文件都会被当作一个InputSplit并分配一个map任务,会有大量的map task运行,导致效率低下。
  • 例如:一个1G的文件会被划分成8个128MB的split,并分配8个map任务处理,而10000个100kb的文件会被10000个map任务处理
  • map任务的数量
    • 一个InputSplit对应一个Map task
    • InputSplit的大小是由Math.max(minSize,Math.min(maxSize,blockSize))决定
    • 单节点建议运行10——100个map task
    • map task执行时长不建议低于1分钟,否则效率低
  • 特殊:一个输入文件大小为140M,会有几个map task?1个
    • FileInputFormat类中的getSplits

RecordReader

  • 每一个InputSplit都有一个RecordReader,作用是把InputSplit中的数据解析成Record,即<k1,v1>。
  • 在TextInputFormat中的RecordReader是LineRecordReader,每一行解析成一个<k1,v1>。其中,k1表示偏移量,v1表示文本内容。

MapReduce其他输入类

  • DBInputFormat

    • 一般不用,实际工作中会把数据库的数据导出到hdfs上,然后再进行计算。
  • CombineFileInputFormat

    • 将多个小文件合成一个split作为输入
    • 相对于大量的小文件来说,Hadoop更适合处理少量的大文件
    • CombineFileInputFormat可用缓解这个问题,它是针对小文件而设计的
    • 针对小文件的解决思路
      • 一种是使用这个CombineFileInputFormat
      • 另一种是使用SequenceFileFormat
    • 实际工作中针对小文件建议使用下面方案处理
      1. 源头尽量避免产生很多小文件,因为小文件会导致namenode内存消耗过高【默认每一个文件在namenode中大致占用150字节】
      2. 源头无法避免的话,可以定时对小文件进行合并,建议合并为SequenceFile
      3. 如果上述两种方法都无法解决的话再使用这种方案。
  • KeyValueTextInputFormat

    • 当输入数据的每一行是两列,并用tab分离的形式的时候,KeyValueTextInputFormat处理这种数据非常合适

    • 1
      2
      3
      4
      //	如果行中有分隔符,那么分隔符前面的作为key,后面的作为value;如果行中没有分隔符,那么整行作为key,value为空
      job.setInputFormatClass(KeyValueTextInputFormat.class);
      // 默认分隔符就是制表符,可以使用其他分隔符
      // conf.setStrings(KeyValueLineRecordReader.KEY_VALUE_SEPERATOE,"\t");
  • NLineInputFormat

    • NLineInputFormat可以控制在每个split中数据的行数

    • 1
      2
      3
      4
      //设置具体输入类
      job.setInputFormatClass(NLineInputFormat.class);
      // 设置每个split的行数
      NLineInputFormat.setNumLinesPerSplit(job,Integer.parseInt(args[2]));
  • SequenceFileInputFormat

    • 当输入文件格式是sequenceFile的时候,要使用SequenceFileInputFormat作为输入。
  • MultipleInputs

    • 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      FileSystem fileSystem = FileSystem.newInstance(conf)
      fileSystem.delete(new Path(outPath),true);

      Path path1 = new Path(inputPath1);
      MultipleInputs.addInputPath(job,path1,TextInputFormat.class,Mapper1.class);
      Path path2 = new Path(intputPath2);
      MultipleInputs.addInputPath(job,path2,SequenceFileInputFormat.class,Mapper2.class);

      job.setReduceClass(WordCountReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(LongWritable.class)
      FileOutputFormat.setOutputPath(job,new Path(outputPath));

OutputFormat

  • MapReduce的输出
  • TextOutputFormat
    • 默认输出格式,key和value中间用tab隔开
  • DBOutputFormat
    • 一般不用,实际工作中会把数据库的数据保存到hdfs上,然后再导入到db中。
  • SequenceFileOutputFormat
    • 将key和value以sequenceFile格式输出。
  • SequenceFileAsOutputFormat
    • 将key和value以原始二进制的格式输出。
  • MapFileOutputFormat
    • 将key和value写入MapFile中。由于MapFile中的key是有序的,所以写入的时候必须保证记录时按key值顺序写入的。
  • MultipleOutputFormat(多路输出)
    • 默认情况下一个reducer会产生一个输出,但是有些时候我们想一个reduce产生多个输出,MultipleOutputFormat和MultipleOutputs可以实现这个功能。

SequenceFile和MapFile

  • 小文件存在的问题
    • Hadoop的HDFS和MapReduce子框架主要是针对大数据文件来设计的,再小文件的处理上不但效率低下,而且十分消耗内存资源(每个小文件占用一个Block,每一个block的元数据都存储再namenode的内存里,每个文件信息和块信息大约都要占150字节)。
    • 解决办法通常是选择一个容器,将这些小文件组织起来统一存储。HDFS提供了两种类型的容易,分别是SequenceFile和MapFile。
  • SequenceFile
    • SequenceFile是Hadoop API提供的一种二进制文件。这种二进制文件直接将<key,value>对序列化到文件中。一般对小文件可以使用这种文件合并,即将文件名作为key。文件内容作为value序列化到大文件中。这种文件格式有以下好处:
      • 支持压缩,且可定制为基于Record或Block压缩(Block级压缩性能较优)
      • 本地化任务支持:因为文件可以被切分,因此MapReduce执行任务时数据的本地化情况应该是非常好的。
      • 对key、value的长度进行了定义,(反)序列化速度比较快
      • 缺点时需要一个合并文件的过程,文件较大,且合并后的文件不方面查看,必须通过遍历查看每一个小文件。
  • MapFile
    • MapFile是排序后的SequenceFile,通过观察其目录结构可以看到MapFile由两部分组成,分别是index和data。
    • index作为文件的数据索引,主要记录了每个Record的key值,以及该Record在文件中的便宜位置。在MapFile被访问的时候,索引文件会被加载到内存,通过索引映射关系可迅速定位到指定Record所在文件位置,因此,相对SequenceFile而言,MapFile的检索效率是高效的,缺点是消耗一部分内存来存储index数据。

MapReduce引用第三方jar包

  • 第一种:将第三方jar包和你的MapReduce程序打成一个jar包。

    • 优点:使用的时候方便,只需要指定mapreduce的jar包即可
    • 缺点:如果依赖的jar包很多,会造成打的依赖包很大,上传服务器会比较慢
  • 第二种: 使用libjars这个参数

    • Hadoop jar hello.jar packagename.className -libjars /data/fastjson-1.2.47.jar /inputpath /outputpath

    • 优点:mapreduce的jar包中包含业务代码,打包以及上传都很快。多个依赖jar之间用逗号隔开

      • jar包的路径可以使用hdfs路径
    • 缺点:每次启动jar包的时候都需要在后面指定一堆依赖的jar名称

      • 解决方案:可以使用shell脚本保存命令

      • 想要使用 -libjars需要调整一下代码

      • 1
        2
        3
        4
        5
        6
        Configuration conf = new Configuration();
        // 处理命令行传递的参数
        String[] remainArgs = new GenericOptionsPaser(conf,args).getRemainArgs();
        // 此时这两个参数不能直接从main函数的args参数中获取,需要从remainingArgs中获取
        String inputPath = remainingArgs[0];
        String outPutPath = remainingArgs[1];

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×