一、初识Hadoop
- 
    分布式文件系统-HDFS 
- 
    资源调度系统-YARN  
- 
    分布式计算框架-MapReduce  
- 优势
    - 高可靠性
- 高扩展性
- 其他:存储在廉价机器;成熟的生态圈
 
- 生态系统
 
- 常用发行版及选型
    - apache hadoop
- CDH:不开源,安装方便,jar冲突较少。https://archive.cloudera.com/cdh5/cdh/5/
- HDP:原装hadoop,开源
 
二、分布式文件系统HDFS
- HDFS概述.
    - 源于谷歌2003年论文GFS的开源实现。
- 设计目标:非常巨大的分布式文件系统;运行在普通廉价的硬件上;易扩展,为用户提供不错的文件存储服务
 
- HDFS架构.
    - Client:切分文件;访问HDFS;与NameNode交互,获取文件位置信息;与DataNode交互,读取和写入数据。
- NameNode:Master节点,负责客户端请求的响应;负责元数据(文件名称、副本系数、block存档的datanode)的管理
- DataNode:Slave节点,存储实际的数据,汇报存储信息给NameNode。
- Secondary NameNode:辅助NameNode,分担其工作量;定期合并fsimage和fsedits,推送给NameNode;紧急情况下,可辅助恢复NameNode,但Secondary NameNode并非NameNode的热备。
- NameNode+N个DataNode:最好部署在不同的节点上。
 
- HDFS副本机制.
  副本存放策略:默认3个副本,第一个节点存在当前操作客户端机架上,另外两个节点存在另一台机架上 副本存放策略:默认3个副本,第一个节点存在当前操作客户端机架上,另外两个节点存在另一台机架上
- HDFS环境搭建.
    - 需要预先装的软件:java(jdk,配置环境变量)、SSH
- 解压hadoop安装包: 
 tar -zxvf hadoop-2.6.0-cdh5.7.0.tar.gz -C ~/hadoop
- 
        配置文件修改(~/hadoop/etc/hadoop/): 1.hadoop-env.sh:加入JAVA_HOME环境. export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home 2.core-site.xml <property> <name>fs.defaultFS</name> <value>hdfs://hadoop-1:9000</value> </property> <property> <name>io.file.buffer.size</name> <value>131072</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/hadoop </value> </property> 3.hdfs-site.xml <property> <name>dfs.replication</name> <value>2</value> </property> 4.slaves:存储datanode的所有机器的hostname集群环境)
- 启动hdfs.
        - 格式化文件系统(仅第一次执行):bin/hdfs namenode -format
- 启动hdfs:sbin.start-dfs.sh
- 验证是否启动成功:jps、http://hadoop-1:50070
 
 
- HDFS shell.
    - 查看文件:hadoop fs -ls /
- 上传文件 hadoop fs -put file /
- 查看文件内容:hadoop fs -text /file hadoop fs -cat /file
- 创建目录:hadoop fs -mkdir /test
- 递归创建目录: hadoop fs -mkdir -p /test/a/b
- 查看递归目录: hadoop fs -ls -R /
- 从本地拷贝: hadoop fs -copyFromLocal file /test/a/b/desfile
- 从文件系统拷贝:hadoop fs -get /test/a/b/desfile
- 删除文件系统文件:hadoop fs -rm file
- 删除文件系统目录:hadoop fs -rm -rf dir
 
- 
    Java API操作HDFS. 1.引入依赖(pom.xml) <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <hadoop.version>2.8.4</hadoop.version> </properties> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos</url> </repository> 2.测试示例 /** * 创建hdfs目录 * * @throws IOException */ @Test public void mkdir() throws IOException { fileSystem.mkdirs(new Path("/hdfs/api")); } /** * 创建文件 * * @throws IOException */ @Test public void create() throws IOException { FSDataOutputStream output = fileSystem.create(new Path("/hdfs/api/test.txt")); output.write("hello hadoop api".getBytes()); output.flush(); output.close(); } /** * 查看HDFS内容 * * @throws IOException */ @Test public void cat() throws IOException { FSDataInputStream inputStream = fileSystem.open(new Path("/hdfs/api/test.txt")); IOUtils.copyBytes(inputStream, System.out, 1024); inputStream.close(); } /** * 重命名HDFS文件 * * @throws IOException */ @Test public void rename() throws IOException { Path oldPath = new Path("/hdfs/api/test.txt"); Path newPath = new Path("/hdfs/api/newtest.txt"); fileSystem.rename(oldPath, newPath); } /** * 上传文件到HDFS * * @throws IOException */ @Test public void copyFromLocalFile() throws IOException { Path localPath = new Path("/Users/zhanghaichao/hdu/1.cpp"); Path hdfsPath = new Path("/hdfs/api/1.cpp"); fileSystem.copyFromLocalFile(localPath, hdfsPath); } /** * 上传文件到HDFS,带进度条 * * @throws IOException */ @Test public void copyFromLocalFileWithProgress() throws IOException { Path hdfsPath = new Path("/hdfs/api/hadoop-2.8.4.tar.gz"); InputStream inputStream = new BufferedInputStream(new FileInputStream( new File("/Users/zhanghaichao/Downloads/hadoop-2.8.4.tar.gz"))); FSDataOutputStream outputStream = fileSystem.create(hdfsPath, new Progressable() { @Override public void progress() { System.out.println("."); } }); IOUtils.copyBytes(inputStream, outputStream, 4096); } /** * 下载HDFS文件 * * @throws IOException */ @Test public void copyToLocalFile() throws IOException { Path localPath = new Path("/Users/zhanghaichao/1.cpp"); Path hdfsPath = new Path("/hdfs/api/1.cpp"); fileSystem.copyToLocalFile(hdfsPath, localPath); } /** * 查看HDFS目录下的所有文件 * * @throws IOException */ @Test public void listFiles() throws IOException { Path hdfsPath = new Path("/hdfs/api"); FileStatus[] fileStatuses = fileSystem.listStatus(hdfsPath); for (FileStatus fileStatus : fileStatuses) { String isDir = fileStatus.isDirectory() ? " is directory," : " is file,"; short replication = fileStatus.getReplication(); //hdfs shell 上传,副本系数使用配置文件中配置的,java api上传使用默认的(3) System.out.println(fileStatus.getPath().toString() + isDir + " replication is " + replication + " length is :" + fileStatus.getLen()); } } /** * 删除HDFS文件 * * @throws IOException */ @Test public void delete() throws IOException { Path hdfsPath = new Path("/fz"); fileSystem.delete(hdfsPath,false); //默认递归删除,加参数控制 } @Before public void setUp() throws URISyntaxException, IOException { configuration = new Configuration(); fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration); System.out.println("\n hdfs app setup\n"); } @After public void tearDown() { configuration = null; fileSystem = null; System.out.println("\nhdfs app tear down\n"); }
- HDFS文件读写流程 
 通过漫画轻松掌握HDFS工作原理
- HDFS优缺点 
 优点:a.数据冗余,硬件容错;b.处理流失的数据访问;c.适合存储大量数据;d.存储在廉价机器。
 缺点:a.低延迟的数据访问;b.不适合小文件存储(元数据较多,namenode压力大)。
三、资源调度框架YARN
- 
    YARN产生背景. 
 Hadoop1.x时:
 MapReduce:Master/Slave架构,1个JobTracker带多个TaskTrackerJobTracker: 负责资源管理和作业调度. 
 TaskTracker: 定期向JobTracker汇报本节点的健康状况、资源使用情况、作业执行情况; 接收来自JobTracker的命令:启动任务/杀死任务.缺点:单点(jt)压力大,不易扩展;资源利用率低,运维成本高(跨集群数据移动) YARN:不同计算框架可以共享同一个HDFS集群上的数据,享受整体的资源调度 
 XXX on YARN的好处: 与其他计算框架共享集群资源,按资源需要分配,进而提高集群资源的利用率。
 XXX: Spark/MapReduce/Storm/Flink
- YARN概述
    - Yet Another Resource Negotiator。 
 	- 通用的资源管理系统。
 - 为上层的应用提供统一的资源和调度。
 
- Yet Another Resource Negotiator。 
 	- 通用的资源管理系统。
- 
    YARN架构 
 - ResourceManager: RM
 整个集群同一时间提供服务的RM只有一个,负责集群资源的统一管理和调度
 处理客户端的请求: 提交一个作业、杀死一个作业
 监控我们的NM,一旦某个NM挂了,那么该NM上运行的任务需要告诉我们的AM来如何进行处理- 
        NodeManager: NM 
 整个集群中有多个,负责自己本身节点资源管理和使用
 定时向RM汇报本节点的资源使用情况
 接收并处理来自RM的各种命令:启动Container
 处理来自AM的命令
 单个节点的资源管理
- 
        ApplicationMaster: AM 
 每个应用程序对应一个:RM、Spark,负责应用程序的管理
 为应用程序向RM申请资源(core、memory),分配给内部task
 需要与NM通信:启动/停止task,task是运行在container里面,AM也是运行在container里面
- 
        Container 
 封装了CPU、Memory等资源的一个容器
 是一个任务运行环境的抽象
- 
        Client 
 提交作业
 查询作业的运行进度
 杀死作业
 
- 
        
- 
    YARN环境搭建 1)mapred-site.xml,mr指定框架YARN <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> 2)yarn-site.xml <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> 3) 启动YARN相关的进程 sbin/start-yarn.sh 4)验证 jps ResourceManager NodeManager http://localhost:8088 5)停止YARN相关的进程 sbin/stop-yarn.sh
- 
    提交mr作业到YARN上运行: 
 /Users/zhanghaichao/hadoop-2.8.4/share/hadoop/mapreducehadoop-mapreduce-examples-2.8.4.jar.hadoop jar hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar pi 2 3 
四、分布式处理框架MapReduce
- MapReduce概述。
    - 源于google论文,2004.12发表。 Hadoop MapReduce是Google MapReduce的克隆版
- 优点:海量数据离线处理&易开发&易运行
- 缺点:不能实时流式计算
 
- MapReduce编程模型。
    - 作业拆分成map阶段和reduce阶段.
- Map阶段:map tasks.
- Reduce极端:reduce tasks.
- 核心概念。
 Split:交由MapReduce作业来处理的数据块,是MapReduce中最小的计算单元
 HDFS:blocksize 是HDFS中最小的存储单元 128M
 默认情况下:他们两是一一对应的,也可以手工设置他们之间的关系(不建议)
 InputFormat: 将我们的输入数据进行分片(split): InputSplit[] getSplits(JobConf job, int numSplits)
 TextInputFormat: 处理文本格式的数据 OutputFormat: 输出
 
- MapReduce架构。
    - 
        MapReduce1.x的架构 
 1)JobTracker: JT
 作业的管理者
 将作业分解成一堆的任务:Task(MapTask和ReduceTask)
 将任务分派给TaskTracker运行
 作业的监控、容错处理(task作业挂了,重启task的机制)
 在一定的时间间隔内,JT没有收到TT的心跳信息,TT可能是挂了,TT上运行的任务会被指派到其他TT上去执行2)TaskTracker: TT 任务的执行者 
 在TT上执行我们的Task(MapTask和ReduceTask)
 会与JT进行交互:执行/启动/停止作业,发送心跳信息给JT3)MapTask 
 自己开发的map任务交由该Task
 解析每条记录的数据,交给自己的map方法处理
 将map的输出结果写到本地磁盘(有些作业只仅有map没有reduce==>HDFS)4)ReduceTask 
 将Map Task输出的数据进行读取
 按照数据进行分组传给我们自己编写的reduce方法处理
 输出结果写到HDFS
- 
        MapReduce2.x的架构  
 
- 
        
- MapReduce实现WordCount(使用IDEA+Maven开发)。
 详细代码- 开发
- 编译:mvn clean package -DskipTests
- 上传到服务器:scp target/hadoop-train-1.0.jar hadoop@hadoop000:~/lib
- 运行 hadoop jar hadoop-demo-1.0-SNAPSHOT.jar com.cc.hadoop.mapreduce.WordCount hdfs://localhost:9000/in hdfs://localhost:9000/out
 相同的代码和脚本再次执行,会报错 security.UserGroupInformation: PriviledgedActionException as:hadoop (auth:SIMPLE) cause: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://hadoop000:8020/output/wc already exists Exception in thread “main” org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://hadoop000:8020/output/wc already exists 注:在MR中,输出文件是不能事先存在的 
 1)先手工通过shell的方式将输出文件夹先删除 hadoop fs -rm -r /output/wc
 2) 在代码中完成自动删除功能:Path outputPath = new Path(args[1]); FileSystem fileSystem = FileSystem.get(configuration); if(fileSystem.exists(outputPath)){ fileSystem.delete(outputPath, true); System.out.println("output file exists, but is has deleted"); } - MapReduce编程之Combiner 在map端做一个reduce操作,简化数据,提高传输速度 - MapReduce编程之Partitioner [详细代码](https://github.com/zhanghaichao520/hadoopdemo/blob/master/src/main/java/com/cc/hadoop/mapreduce/Parititoner.java)
- jobHistory
    - 记录运行完的MR信息到指定的HDFS目录下
- 默认不开启
- 启动方法:
 cd hadoop-2.8.4/etc/hadoop/ vim mapred-site.xml 加入下面四组标签: <property> <name>mapreduce.jobhistory.address</name> <value>localhost:10020</value> </property> <property> <name>mapreduce.jobhistory.webapp.address</name> <value>localhost:19888</value> </property> <property> <name>mapreduce.jobhistory.done-dir</name> <value>/history/done</value> </property> <property> <name>mapreduce.jobhistory.intermediate-done-dir</name> <value>/history/done_intermediate</value> </property> vim yarn-site.xml 加入下面的标签 <property> <name>yarn.log-aggregation-enable</name> <value>true</value> </property>
五、Hadoop项目实战
用户行为日志:用户每次访问网站时所有的行为数据(访问、浏览、搜索、点击…) 用户行为轨迹、流量日志
日志数据内容: 
1)访问的系统属性: 操作系统、浏览器等等。 
2)访问特征:点击的url、从哪个url跳转过来的(referer)、页面上的停留时间等    
3)访问信息:session_id、访问ip(访问城市)等
数据处理流程
1)数据采集   
	Flume: web日志写入到HDFS
2)数据清洗 
	脏数据 
	Spark、Hive、MapReduce 或者是其他的一些分布式计算框架   
	清洗完之后的数据可以存放在HDFS(Hive/Spark SQL)
3)数据处理  
	按照我们的需要进行相应业务的统计和分析 
	Spark、Hive、MapReduce 或者是其他的一些分布式计算框架
4)处理结果入库 
	结果可以存放到RDBMS、NoSQL
5)数据的可视化  
	通过图形化展示的方式展现出来:饼图、柱状图、地图、折线图  
	ECharts、HUE、Zeppelin
六、Hadoop分布式集群搭建
- 前置准备:
    - 配置每台机器hostname,并将主机名和ip写入主节点hosts中
- 配置ssh免密码登陆:ssh-keygen -t rsa
- 在主节点机器中添加每台机器的sshkey:ssh-copy-id -i ~/.ssh/id_rsa.pub hostname
- jdk安装,并配置环境变量
 
- 集群安装
    - hadoop安装:解压hadoop安装包,并配置环境变量
- hadoop配置:参考2.4和3.4配置,需额外在yarn-site.xml中配置yarn.resourcemanager.hostname为主节点机器
- slaves:配置从结点,顶格写入所有从结点机器的主机名
- 分发安装包到所有结点机器中(包括jdk和hadoop):scp -r ~/app hostname2:~/app
- 分发环境变量并生效:.bash_profile:scp ~/.bash_profile ~/
 
- 启动集群
    - 在主节点对namenode格式化:bin/hdfs namenode -format
- 在主节点启动:sbin/start-all.sh
- 验证:jps,主节点5个进程,从结点2个进程。访问50070(hdfs)或者8088(yarn)端口。
- 停止:sbin/stop-all.sh
 
七、前沿技术Spark、Flink、Beam
- 
    分布式计算框架spark. 
 spark启动:spark-shell –master local[2]
 spark实现wc:val file = sc.textFile("file:///home/hadoop/data/hello.txt") val a = file.flatMap(line => line.split(" ")) val b = a.map(word => (word,1)) val c = b.reduceByKey(_ + _) c.collect sc.textFile("file:///home/hadoop/data/hello.txt").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).collect
- 
    分布式计算框架flink. ./bin/flink run ./examples/batch/WordCount.jar \ --input file:///home/hadoop/data/hello.txt --output file:///home/hadoop/tmp/flink_wc_output
- 
    统一编程模型beam. wc的beam程序以多种不同的runner运行。 - 
        direct方式运行 mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args=”–inputFile=/home/hadoop/data/hello.txt –output=counts” \ -Pdirect-runner 
- 
        spark方式运行 mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args=”–runner=SparkRunner –inputFile=/home/hadoop/data/hello.txt –output=counts” -Pspark-runner 
 
- 
        
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=FlinkRunner --inputFile=/home/hadoop/data/hello.txt --output=counts" -Pflink-runner
