Hadoop入门

惜我十年前,与君始相识

Posted by zhanghaichao on September 10, 2018

一、初识Hadoop

  1. 分布式文件系统-HDFS

  2. 资源调度系统-YARN

  3. 分布式计算框架-MapReduce

  4. 优势
    • 高可靠性
    • 高扩展性
    • 其他:存储在廉价机器;成熟的生态圈
  5. 生态系统
  1. 常用发行版及选型
    • apache hadoop
    • CDH:不开源,安装方便,jar冲突较少。https://archive.cloudera.com/cdh5/cdh/5/
    • HDP:原装hadoop,开源

二、分布式文件系统HDFS

  1. HDFS概述.
    • 源于谷歌2003年论文GFS的开源实现。
    • 设计目标:非常巨大的分布式文件系统;运行在普通廉价的硬件上;易扩展,为用户提供不错的文件存储服务
  2. HDFS架构.
    • Client:切分文件;访问HDFS;与NameNode交互,获取文件位置信息;与DataNode交互,读取和写入数据。
    • NameNode:Master节点,负责客户端请求的响应;负责元数据(文件名称、副本系数、block存档的datanode)的管理
    • DataNode:Slave节点,存储实际的数据,汇报存储信息给NameNode。
    • Secondary NameNode:辅助NameNode,分担其工作量;定期合并fsimage和fsedits,推送给NameNode;紧急情况下,可辅助恢复NameNode,但Secondary NameNode并非NameNode的热备。
    • NameNode+N个DataNode:最好部署在不同的节点上。
  3. HDFS副本机制.
    副本存放策略:默认3个副本,第一个节点存在当前操作客户端机架上,另外两个节点存在另一台机架上
  4. HDFS环境搭建.
    • 需要预先装的软件:java(jdk,配置环境变量)、SSH
    • 解压hadoop安装包:
      tar -zxvf hadoop-2.6.0-cdh5.7.0.tar.gz -C ~/hadoop
    • 配置文件修改(~/hadoop/etc/hadoop/):

        1.hadoop-env.sh:加入JAVA_HOME环境.  		
        export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home
           
        2.core-site.xml 	 
         <property>   
            <name>fs.defaultFS</name>   
            <value>hdfs://hadoop-1:9000</value>   
         </property>   
         <property>   
            <name>io.file.buffer.size</name>   
            <value>131072</value>   
         </property>   
         <property>   
            <name>hadoop.tmp.dir</name>   
            <value>/hadoop </value>   
         </property>   
          		
        3.hdfs-site.xml  	
         <property>   
      		 <name>dfs.replication</name>   
      		 <value>2</value>    
       </property>   
         
        4.slaves:存储datanode的所有机器的hostname集群环境)
      
    • 启动hdfs.
      • 格式化文件系统(仅第一次执行):bin/hdfs namenode -format
      • 启动hdfs:sbin.start-dfs.sh
      • 验证是否启动成功:jps、http://hadoop-1:50070
  5. HDFS shell.
    • 查看文件:hadoop fs -ls /
    • 上传文件 hadoop fs -put file /
    • 查看文件内容:hadoop fs -text /file hadoop fs -cat /file
    • 创建目录:hadoop fs -mkdir /test
    • 递归创建目录: hadoop fs -mkdir -p /test/a/b
    • 查看递归目录: hadoop fs -ls -R /
    • 从本地拷贝: hadoop fs -copyFromLocal file /test/a/b/desfile
    • 从文件系统拷贝:hadoop fs -get /test/a/b/desfile
    • 删除文件系统文件:hadoop fs -rm file
    • 删除文件系统目录:hadoop fs -rm -rf dir
  6. Java API操作HDFS.

    1.引入依赖(pom.xml)
     	 <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop.version>2.8.4</hadoop.version>
    </properties>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
       2.测试示例
      /**
     * 创建hdfs目录
     *
     * @throws IOException
     */
    @Test
    public void mkdir() throws IOException {
        fileSystem.mkdirs(new Path("/hdfs/api"));
    }
    
    /**
     * 创建文件
     *
     * @throws IOException
     */
    @Test
    public void create() throws IOException {
        FSDataOutputStream output = fileSystem.create(new Path("/hdfs/api/test.txt"));
        output.write("hello hadoop api".getBytes());
        output.flush();
        output.close();
    }
    
    /**
     * 查看HDFS内容
     *
     * @throws IOException
     */
    @Test
    public void cat() throws IOException {
        FSDataInputStream inputStream = fileSystem.open(new Path("/hdfs/api/test.txt"));
        IOUtils.copyBytes(inputStream, System.out, 1024);
        inputStream.close();
    }
    
    /**
     * 重命名HDFS文件
     *
     * @throws IOException
     */
    @Test
    public void rename() throws IOException {
        Path oldPath = new Path("/hdfs/api/test.txt");
        Path newPath = new Path("/hdfs/api/newtest.txt");
        fileSystem.rename(oldPath, newPath);
    }
    
    /**
     * 上传文件到HDFS
     *
     * @throws IOException
     */
    @Test
    public void copyFromLocalFile() throws IOException {
        Path localPath = new Path("/Users/zhanghaichao/hdu/1.cpp");
        Path hdfsPath = new Path("/hdfs/api/1.cpp");
        fileSystem.copyFromLocalFile(localPath, hdfsPath);
    }
    
    /**
     * 上传文件到HDFS,带进度条
     *
     * @throws IOException
     */
    @Test
    public void copyFromLocalFileWithProgress() throws IOException {
        Path hdfsPath = new Path("/hdfs/api/hadoop-2.8.4.tar.gz");
    
        InputStream inputStream = new BufferedInputStream(new FileInputStream(
                new File("/Users/zhanghaichao/Downloads/hadoop-2.8.4.tar.gz")));
    
        FSDataOutputStream outputStream = fileSystem.create(hdfsPath, new Progressable() {
            @Override
            public void progress() {
                System.out.println(".");
            }
        });
        IOUtils.copyBytes(inputStream, outputStream, 4096);
    
    }
    
    /**
     * 下载HDFS文件
     *
     * @throws IOException
     */
    @Test
    public void copyToLocalFile() throws IOException {
        Path localPath = new Path("/Users/zhanghaichao/1.cpp");
        Path hdfsPath = new Path("/hdfs/api/1.cpp");
        fileSystem.copyToLocalFile(hdfsPath, localPath);
    }
    
    /**
     * 查看HDFS目录下的所有文件
     *
     * @throws IOException
     */
    @Test
    public void listFiles() throws IOException {
        Path hdfsPath = new Path("/hdfs/api");
        FileStatus[] fileStatuses = fileSystem.listStatus(hdfsPath);
        for (FileStatus fileStatus : fileStatuses) {
            String isDir  = fileStatus.isDirectory() ? " is directory," : " is file,";
            short replication = fileStatus.getReplication(); 
            //hdfs shell 上传,副本系数使用配置文件中配置的,java api上传使用默认的(3)
            System.out.println(fileStatus.getPath().toString() + isDir
                    + "  replication is " + replication + "  length is :" + fileStatus.getLen());
        }
    }
    
    /**
     * 删除HDFS文件
     *
     * @throws IOException
     */
    @Test
    public void delete() throws IOException {
        Path hdfsPath = new Path("/fz");
        fileSystem.delete(hdfsPath,false); //默认递归删除,加参数控制
    }
    
    @Before
    public void setUp() throws URISyntaxException, IOException {
        configuration = new Configuration();
        fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration);
        System.out.println("\n hdfs app setup\n");
    }
    
    @After
    public void tearDown() {
        configuration = null;
        fileSystem = null;
    
        System.out.println("\nhdfs app tear down\n");
    
    }
        
    
  7. HDFS文件读写流程
    通过漫画轻松掌握HDFS工作原理
  8. HDFS优缺点
    优点:a.数据冗余,硬件容错;b.处理流失的数据访问;c.适合存储大量数据;d.存储在廉价机器。
    缺点:a.低延迟的数据访问;b.不适合小文件存储(元数据较多,namenode压力大)。

三、资源调度框架YARN

  1. YARN产生背景.
    Hadoop1.x时:
    MapReduce:Master/Slave架构,1个JobTracker带多个TaskTracker

    JobTracker: 负责资源管理和作业调度.
    TaskTracker: 定期向JobTracker汇报本节点的健康状况、资源使用情况、作业执行情况; 接收来自JobTracker的命令:启动任务/杀死任务.

    缺点:单点(jt)压力大,不易扩展;资源利用率低,运维成本高(跨集群数据移动)

    YARN:不同计算框架可以共享同一个HDFS集群上的数据,享受整体的资源调度
    XXX on YARN的好处: 与其他计算框架共享集群资源,按资源需要分配,进而提高集群资源的利用率。
    XXX: Spark/MapReduce/Storm/Flink

  2. YARN概述
    • Yet Another Resource Negotiator。 - 通用的资源管理系统。
      - 为上层的应用提供统一的资源和调度。
  3. YARN架构
    - ResourceManager: RM
    整个集群同一时间提供服务的RM只有一个,负责集群资源的统一管理和调度
    处理客户端的请求: 提交一个作业、杀死一个作业
    监控我们的NM,一旦某个NM挂了,那么该NM上运行的任务需要告诉我们的AM来如何进行处理

    • NodeManager: NM
      整个集群中有多个,负责自己本身节点资源管理和使用
      定时向RM汇报本节点的资源使用情况
      接收并处理来自RM的各种命令:启动Container
      处理来自AM的命令
      单个节点的资源管理

    • ApplicationMaster: AM
      每个应用程序对应一个:RM、Spark,负责应用程序的管理
      为应用程序向RM申请资源(core、memory),分配给内部task
      需要与NM通信:启动/停止task,task是运行在container里面,AM也是运行在container里面

    • Container
      封装了CPU、Memory等资源的一个容器
      是一个任务运行环境的抽象

    • Client
      提交作业
      查询作业的运行进度
      杀死作业

  4. YARN环境搭建

     1)mapred-site.xml,mr指定框架YARN
     <property>
         <name>mapreduce.framework.name</name>
         <value>yarn</value>
     </property>
    	
     2)yarn-site.xml
     <property>
         <name>yarn.nodemanager.aux-services</name>
         <value>mapreduce_shuffle</value>
     </property>
    	
     3) 启动YARN相关的进程
     sbin/start-yarn.sh
    	
     4)验证
         jps
             ResourceManager
             NodeManager
         http://localhost:8088
    	
     5)停止YARN相关的进程
         sbin/stop-yarn.sh
    
  5. 提交mr作业到YARN上运行:
    /Users/zhanghaichao/hadoop-2.8.4/share/hadoop/mapreducehadoop-mapreduce-examples-2.8.4.jar.

    hadoop jar

    hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar pi 2 3

四、分布式处理框架MapReduce

  1. MapReduce概述。
    • 源于google论文,2004.12发表。 Hadoop MapReduce是Google MapReduce的克隆版
    • 优点:海量数据离线处理&易开发&易运行
    • 缺点:不能实时流式计算
  2. MapReduce编程模型。
    • 作业拆分成map阶段和reduce阶段.
    • Map阶段:map tasks.
    • Reduce极端:reduce tasks.
    • 核心概念。
      Split:交由MapReduce作业来处理的数据块,是MapReduce中最小的计算单元
      HDFS:blocksize 是HDFS中最小的存储单元 128M
      默认情况下:他们两是一一对应的,也可以手工设置他们之间的关系(不建议)
      InputFormat: 将我们的输入数据进行分片(split): InputSplit[] getSplits(JobConf job, int numSplits)
      TextInputFormat: 处理文本格式的数据 OutputFormat: 输出
  3. MapReduce架构。
    • MapReduce1.x的架构
      1)JobTracker: JT
      作业的管理者
      将作业分解成一堆的任务:Task(MapTask和ReduceTask)
      将任务分派给TaskTracker运行
      作业的监控、容错处理(task作业挂了,重启task的机制)
      在一定的时间间隔内,JT没有收到TT的心跳信息,TT可能是挂了,TT上运行的任务会被指派到其他TT上去执行

      2)TaskTracker: TT 任务的执行者
      在TT上执行我们的Task(MapTask和ReduceTask)
      会与JT进行交互:执行/启动/停止作业,发送心跳信息给JT

      3)MapTask
      自己开发的map任务交由该Task
      解析每条记录的数据,交给自己的map方法处理
      将map的输出结果写到本地磁盘(有些作业只仅有map没有reduce==>HDFS)

      4)ReduceTask
      将Map Task输出的数据进行读取
      按照数据进行分组传给我们自己编写的reduce方法处理
      输出结果写到HDFS

    • MapReduce2.x的架构

  4. MapReduce实现WordCount(使用IDEA+Maven开发)。
    详细代码
    • 开发
    • 编译:mvn clean package -DskipTests
    • 上传到服务器:scp target/hadoop-train-1.0.jar hadoop@hadoop000:~/lib
    • 运行 hadoop jar hadoop-demo-1.0-SNAPSHOT.jar com.cc.hadoop.mapreduce.WordCount hdfs://localhost:9000/in hdfs://localhost:9000/out

    相同的代码和脚本再次执行,会报错 security.UserGroupInformation: PriviledgedActionException as:hadoop (auth:SIMPLE) cause: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://hadoop000:8020/output/wc already exists Exception in thread “main” org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://hadoop000:8020/output/wc already exists

    注:在MR中,输出文件是不能事先存在的
    1)先手工通过shell的方式将输出文件夹先删除 hadoop fs -rm -r /output/wc
    2) 在代码中完成自动删除功能:

     Path outputPath = new Path(args[1]);   
        
     FileSystem fileSystem = FileSystem.get(configuration);  
        
     if(fileSystem.exists(outputPath)){
        
         fileSystem.delete(outputPath, true);
            
         System.out.println("output file exists, but is has deleted");
            
     }  - MapReduce编程之Combiner     在map端做一个reduce操作,简化数据,提高传输速度  - MapReduce编程之Partitioner     [详细代码](https://github.com/zhanghaichao520/hadoopdemo/blob/master/src/main/java/com/cc/hadoop/mapreduce/Parititoner.java)
    
  5. jobHistory
    • 记录运行完的MR信息到指定的HDFS目录下
    • 默认不开启
    • 启动方法:
     cd hadoop-2.8.4/etc/hadoop/ 
     vim mapred-site.xml 
     加入下面四组标签:
     <property>
        <name>mapreduce.jobhistory.address</name>
        <value>localhost:10020</value>
    </property>
     <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>localhost:19888</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.done-dir</name>
        <value>/history/done</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.intermediate-done-dir</name>
        <value>/history/done_intermediate</value>
    </property>
    vim yarn-site.xml
    加入下面的标签
    <property>
         <name>yarn.log-aggregation-enable</name>
         <value>true</value>
    </property>
    

五、Hadoop项目实战

用户行为日志:用户每次访问网站时所有的行为数据(访问、浏览、搜索、点击…) 用户行为轨迹、流量日志

日志数据内容:
1)访问的系统属性: 操作系统、浏览器等等。
2)访问特征:点击的url、从哪个url跳转过来的(referer)、页面上的停留时间等
3)访问信息:session_id、访问ip(访问城市)等

数据处理流程

1)数据采集
Flume: web日志写入到HDFS

2)数据清洗
脏数据
Spark、Hive、MapReduce 或者是其他的一些分布式计算框架
清洗完之后的数据可以存放在HDFS(Hive/Spark SQL)

3)数据处理
按照我们的需要进行相应业务的统计和分析
Spark、Hive、MapReduce 或者是其他的一些分布式计算框架

4)处理结果入库
结果可以存放到RDBMS、NoSQL

5)数据的可视化
通过图形化展示的方式展现出来:饼图、柱状图、地图、折线图
ECharts、HUE、Zeppelin

六、Hadoop分布式集群搭建

  1. 前置准备:
    • 配置每台机器hostname,并将主机名和ip写入主节点hosts中
    • 配置ssh免密码登陆:ssh-keygen -t rsa
    • 在主节点机器中添加每台机器的sshkey:ssh-copy-id -i ~/.ssh/id_rsa.pub hostname
    • jdk安装,并配置环境变量
  2. 集群安装
    • hadoop安装:解压hadoop安装包,并配置环境变量
    • hadoop配置:参考2.4和3.4配置,需额外在yarn-site.xml中配置yarn.resourcemanager.hostname为主节点机器
    • slaves:配置从结点,顶格写入所有从结点机器的主机名
    • 分发安装包到所有结点机器中(包括jdk和hadoop):scp -r ~/app hostname2:~/app
    • 分发环境变量并生效:.bash_profile:scp ~/.bash_profile ~/
  3. 启动集群
    • 在主节点对namenode格式化:bin/hdfs namenode -format
    • 在主节点启动:sbin/start-all.sh
    • 验证:jps,主节点5个进程,从结点2个进程。访问50070(hdfs)或者8088(yarn)端口。
    • 停止:sbin/stop-all.sh

七、前沿技术Spark、Flink、Beam

  1. 分布式计算框架spark.
    spark启动:spark-shell –master local[2]
    spark实现wc:

     val file = sc.textFile("file:///home/hadoop/data/hello.txt")   
     val a = file.flatMap(line => line.split(" "))   
     val b = a.map(word => (word,1))   
     val c = b.reduceByKey(_ + _)   
     c.collect
    	
     sc.textFile("file:///home/hadoop/data/hello.txt").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).collect
    
  2. 分布式计算框架flink.

     ./bin/flink run ./examples/batch/WordCount.jar \
      --input file:///home/hadoop/data/hello.txt --output file:///home/hadoop/tmp/flink_wc_output	
    
  3. 统一编程模型beam.

    wc的beam程序以多种不同的runner运行。

    • direct方式运行 mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args=”–inputFile=/home/hadoop/data/hello.txt –output=counts” \ -Pdirect-runner

    • spark方式运行 mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args=”–runner=SparkRunner –inputFile=/home/hadoop/data/hello.txt –output=counts” -Pspark-runner

- flink方式运行
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=FlinkRunner --inputFile=/home/hadoop/data/hello.txt --output=counts" -Pflink-runner