一、初识Hadoop
-
分布式文件系统-HDFS
-
资源调度系统-YARN
-
分布式计算框架-MapReduce
- 优势
- 高可靠性
- 高扩展性
- 其他:存储在廉价机器;成熟的生态圈
- 生态系统

- 常用发行版及选型
- apache hadoop
- CDH:不开源,安装方便,jar冲突较少。https://archive.cloudera.com/cdh5/cdh/5/
- HDP:原装hadoop,开源
二、分布式文件系统HDFS
- HDFS概述.
- 源于谷歌2003年论文GFS的开源实现。
- 设计目标:非常巨大的分布式文件系统;运行在普通廉价的硬件上;易扩展,为用户提供不错的文件存储服务
- HDFS架构.
- Client:切分文件;访问HDFS;与NameNode交互,获取文件位置信息;与DataNode交互,读取和写入数据。
- NameNode:Master节点,负责客户端请求的响应;负责元数据(文件名称、副本系数、block存档的datanode)的管理
- DataNode:Slave节点,存储实际的数据,汇报存储信息给NameNode。
- Secondary NameNode:辅助NameNode,分担其工作量;定期合并fsimage和fsedits,推送给NameNode;紧急情况下,可辅助恢复NameNode,但Secondary NameNode并非NameNode的热备。
- NameNode+N个DataNode:最好部署在不同的节点上。
- HDFS副本机制.
副本存放策略:默认3个副本,第一个节点存在当前操作客户端机架上,另外两个节点存在另一台机架上
- HDFS环境搭建.
- 需要预先装的软件:java(jdk,配置环境变量)、SSH
- 解压hadoop安装包:
tar -zxvf hadoop-2.6.0-cdh5.7.0.tar.gz -C ~/hadoop
-
配置文件修改(~/hadoop/etc/hadoop/):
1.hadoop-env.sh:加入JAVA_HOME环境. export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home 2.core-site.xml <property> <name>fs.defaultFS</name> <value>hdfs://hadoop-1:9000</value> </property> <property> <name>io.file.buffer.size</name> <value>131072</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/hadoop </value> </property> 3.hdfs-site.xml <property> <name>dfs.replication</name> <value>2</value> </property> 4.slaves:存储datanode的所有机器的hostname集群环境)
- 启动hdfs.
- 格式化文件系统(仅第一次执行):bin/hdfs namenode -format
- 启动hdfs:sbin.start-dfs.sh
- 验证是否启动成功:jps、http://hadoop-1:50070
- HDFS shell.
- 查看文件:hadoop fs -ls /
- 上传文件 hadoop fs -put file /
- 查看文件内容:hadoop fs -text /file hadoop fs -cat /file
- 创建目录:hadoop fs -mkdir /test
- 递归创建目录: hadoop fs -mkdir -p /test/a/b
- 查看递归目录: hadoop fs -ls -R /
- 从本地拷贝: hadoop fs -copyFromLocal file /test/a/b/desfile
- 从文件系统拷贝:hadoop fs -get /test/a/b/desfile
- 删除文件系统文件:hadoop fs -rm file
- 删除文件系统目录:hadoop fs -rm -rf dir
-
Java API操作HDFS.
1.引入依赖(pom.xml) <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <hadoop.version>2.8.4</hadoop.version> </properties> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos</url> </repository> 2.测试示例 /** * 创建hdfs目录 * * @throws IOException */ @Test public void mkdir() throws IOException { fileSystem.mkdirs(new Path("/hdfs/api")); } /** * 创建文件 * * @throws IOException */ @Test public void create() throws IOException { FSDataOutputStream output = fileSystem.create(new Path("/hdfs/api/test.txt")); output.write("hello hadoop api".getBytes()); output.flush(); output.close(); } /** * 查看HDFS内容 * * @throws IOException */ @Test public void cat() throws IOException { FSDataInputStream inputStream = fileSystem.open(new Path("/hdfs/api/test.txt")); IOUtils.copyBytes(inputStream, System.out, 1024); inputStream.close(); } /** * 重命名HDFS文件 * * @throws IOException */ @Test public void rename() throws IOException { Path oldPath = new Path("/hdfs/api/test.txt"); Path newPath = new Path("/hdfs/api/newtest.txt"); fileSystem.rename(oldPath, newPath); } /** * 上传文件到HDFS * * @throws IOException */ @Test public void copyFromLocalFile() throws IOException { Path localPath = new Path("/Users/zhanghaichao/hdu/1.cpp"); Path hdfsPath = new Path("/hdfs/api/1.cpp"); fileSystem.copyFromLocalFile(localPath, hdfsPath); } /** * 上传文件到HDFS,带进度条 * * @throws IOException */ @Test public void copyFromLocalFileWithProgress() throws IOException { Path hdfsPath = new Path("/hdfs/api/hadoop-2.8.4.tar.gz"); InputStream inputStream = new BufferedInputStream(new FileInputStream( new File("/Users/zhanghaichao/Downloads/hadoop-2.8.4.tar.gz"))); FSDataOutputStream outputStream = fileSystem.create(hdfsPath, new Progressable() { @Override public void progress() { System.out.println("."); } }); IOUtils.copyBytes(inputStream, outputStream, 4096); } /** * 下载HDFS文件 * * @throws IOException */ @Test public void copyToLocalFile() throws IOException { Path localPath = new Path("/Users/zhanghaichao/1.cpp"); Path hdfsPath = new Path("/hdfs/api/1.cpp"); fileSystem.copyToLocalFile(hdfsPath, localPath); } /** * 查看HDFS目录下的所有文件 * * @throws IOException */ @Test public void listFiles() throws IOException { Path hdfsPath = new Path("/hdfs/api"); FileStatus[] fileStatuses = fileSystem.listStatus(hdfsPath); for (FileStatus fileStatus : fileStatuses) { String isDir = fileStatus.isDirectory() ? " is directory," : " is file,"; short replication = fileStatus.getReplication(); //hdfs shell 上传,副本系数使用配置文件中配置的,java api上传使用默认的(3) System.out.println(fileStatus.getPath().toString() + isDir + " replication is " + replication + " length is :" + fileStatus.getLen()); } } /** * 删除HDFS文件 * * @throws IOException */ @Test public void delete() throws IOException { Path hdfsPath = new Path("/fz"); fileSystem.delete(hdfsPath,false); //默认递归删除,加参数控制 } @Before public void setUp() throws URISyntaxException, IOException { configuration = new Configuration(); fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration); System.out.println("\n hdfs app setup\n"); } @After public void tearDown() { configuration = null; fileSystem = null; System.out.println("\nhdfs app tear down\n"); }
- HDFS文件读写流程
通过漫画轻松掌握HDFS工作原理 - HDFS优缺点
优点:a.数据冗余,硬件容错;b.处理流失的数据访问;c.适合存储大量数据;d.存储在廉价机器。
缺点:a.低延迟的数据访问;b.不适合小文件存储(元数据较多,namenode压力大)。
三、资源调度框架YARN
-
YARN产生背景.
Hadoop1.x时:
MapReduce:Master/Slave架构,1个JobTracker带多个TaskTrackerJobTracker: 负责资源管理和作业调度.
TaskTracker: 定期向JobTracker汇报本节点的健康状况、资源使用情况、作业执行情况; 接收来自JobTracker的命令:启动任务/杀死任务.缺点:单点(jt)压力大,不易扩展;资源利用率低,运维成本高(跨集群数据移动)
YARN:不同计算框架可以共享同一个HDFS集群上的数据,享受整体的资源调度
XXX on YARN的好处: 与其他计算框架共享集群资源,按资源需要分配,进而提高集群资源的利用率。
XXX: Spark/MapReduce/Storm/Flink - YARN概述
- Yet Another Resource Negotiator。
- 通用的资源管理系统。
- 为上层的应用提供统一的资源和调度。
- Yet Another Resource Negotiator。
- 通用的资源管理系统。
-
YARN架构
- ResourceManager: RM
整个集群同一时间提供服务的RM只有一个,负责集群资源的统一管理和调度
处理客户端的请求: 提交一个作业、杀死一个作业
监控我们的NM,一旦某个NM挂了,那么该NM上运行的任务需要告诉我们的AM来如何进行处理-
NodeManager: NM
整个集群中有多个,负责自己本身节点资源管理和使用
定时向RM汇报本节点的资源使用情况
接收并处理来自RM的各种命令:启动Container
处理来自AM的命令
单个节点的资源管理 -
ApplicationMaster: AM
每个应用程序对应一个:RM、Spark,负责应用程序的管理
为应用程序向RM申请资源(core、memory),分配给内部task
需要与NM通信:启动/停止task,task是运行在container里面,AM也是运行在container里面 -
Container
封装了CPU、Memory等资源的一个容器
是一个任务运行环境的抽象 -
Client
提交作业
查询作业的运行进度
杀死作业
-
-
YARN环境搭建
1)mapred-site.xml,mr指定框架YARN <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> 2)yarn-site.xml <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> 3) 启动YARN相关的进程 sbin/start-yarn.sh 4)验证 jps ResourceManager NodeManager http://localhost:8088 5)停止YARN相关的进程 sbin/stop-yarn.sh
-
提交mr作业到YARN上运行:
/Users/zhanghaichao/hadoop-2.8.4/share/hadoop/mapreducehadoop-mapreduce-examples-2.8.4.jar.hadoop jar
hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar pi 2 3
四、分布式处理框架MapReduce
- MapReduce概述。
- 源于google论文,2004.12发表。 Hadoop MapReduce是Google MapReduce的克隆版
- 优点:海量数据离线处理&易开发&易运行
- 缺点:不能实时流式计算
- MapReduce编程模型。
- 作业拆分成map阶段和reduce阶段.
- Map阶段:map tasks.
- Reduce极端:reduce tasks.
- 核心概念。
Split:交由MapReduce作业来处理的数据块,是MapReduce中最小的计算单元
HDFS:blocksize 是HDFS中最小的存储单元 128M
默认情况下:他们两是一一对应的,也可以手工设置他们之间的关系(不建议)
InputFormat: 将我们的输入数据进行分片(split): InputSplit[] getSplits(JobConf job, int numSplits)
TextInputFormat: 处理文本格式的数据 OutputFormat: 输出
- MapReduce架构。
-
MapReduce1.x的架构
1)JobTracker: JT
作业的管理者
将作业分解成一堆的任务:Task(MapTask和ReduceTask)
将任务分派给TaskTracker运行
作业的监控、容错处理(task作业挂了,重启task的机制)
在一定的时间间隔内,JT没有收到TT的心跳信息,TT可能是挂了,TT上运行的任务会被指派到其他TT上去执行2)TaskTracker: TT 任务的执行者
在TT上执行我们的Task(MapTask和ReduceTask)
会与JT进行交互:执行/启动/停止作业,发送心跳信息给JT3)MapTask
自己开发的map任务交由该Task
解析每条记录的数据,交给自己的map方法处理
将map的输出结果写到本地磁盘(有些作业只仅有map没有reduce==>HDFS)4)ReduceTask
将Map Task输出的数据进行读取
按照数据进行分组传给我们自己编写的reduce方法处理
输出结果写到HDFS -
MapReduce2.x的架构
-
- MapReduce实现WordCount(使用IDEA+Maven开发)。
详细代码- 开发
- 编译:mvn clean package -DskipTests
- 上传到服务器:scp target/hadoop-train-1.0.jar hadoop@hadoop000:~/lib
- 运行 hadoop jar hadoop-demo-1.0-SNAPSHOT.jar com.cc.hadoop.mapreduce.WordCount hdfs://localhost:9000/in hdfs://localhost:9000/out
相同的代码和脚本再次执行,会报错 security.UserGroupInformation: PriviledgedActionException as:hadoop (auth:SIMPLE) cause: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://hadoop000:8020/output/wc already exists Exception in thread “main” org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://hadoop000:8020/output/wc already exists
注:在MR中,输出文件是不能事先存在的
1)先手工通过shell的方式将输出文件夹先删除 hadoop fs -rm -r /output/wc
2) 在代码中完成自动删除功能:Path outputPath = new Path(args[1]); FileSystem fileSystem = FileSystem.get(configuration); if(fileSystem.exists(outputPath)){ fileSystem.delete(outputPath, true); System.out.println("output file exists, but is has deleted"); } - MapReduce编程之Combiner 在map端做一个reduce操作,简化数据,提高传输速度 - MapReduce编程之Partitioner [详细代码](https://github.com/zhanghaichao520/hadoopdemo/blob/master/src/main/java/com/cc/hadoop/mapreduce/Parititoner.java)
- jobHistory
- 记录运行完的MR信息到指定的HDFS目录下
- 默认不开启
- 启动方法:
cd hadoop-2.8.4/etc/hadoop/ vim mapred-site.xml 加入下面四组标签: <property> <name>mapreduce.jobhistory.address</name> <value>localhost:10020</value> </property> <property> <name>mapreduce.jobhistory.webapp.address</name> <value>localhost:19888</value> </property> <property> <name>mapreduce.jobhistory.done-dir</name> <value>/history/done</value> </property> <property> <name>mapreduce.jobhistory.intermediate-done-dir</name> <value>/history/done_intermediate</value> </property> vim yarn-site.xml 加入下面的标签 <property> <name>yarn.log-aggregation-enable</name> <value>true</value> </property>
五、Hadoop项目实战
用户行为日志:用户每次访问网站时所有的行为数据(访问、浏览、搜索、点击…) 用户行为轨迹、流量日志
日志数据内容:
1)访问的系统属性: 操作系统、浏览器等等。
2)访问特征:点击的url、从哪个url跳转过来的(referer)、页面上的停留时间等
3)访问信息:session_id、访问ip(访问城市)等
数据处理流程
1)数据采集
Flume: web日志写入到HDFS
2)数据清洗
脏数据
Spark、Hive、MapReduce 或者是其他的一些分布式计算框架
清洗完之后的数据可以存放在HDFS(Hive/Spark SQL)
3)数据处理
按照我们的需要进行相应业务的统计和分析
Spark、Hive、MapReduce 或者是其他的一些分布式计算框架
4)处理结果入库
结果可以存放到RDBMS、NoSQL
5)数据的可视化
通过图形化展示的方式展现出来:饼图、柱状图、地图、折线图
ECharts、HUE、Zeppelin
六、Hadoop分布式集群搭建
- 前置准备:
- 配置每台机器hostname,并将主机名和ip写入主节点hosts中
- 配置ssh免密码登陆:ssh-keygen -t rsa
- 在主节点机器中添加每台机器的sshkey:ssh-copy-id -i ~/.ssh/id_rsa.pub hostname
- jdk安装,并配置环境变量
- 集群安装
- hadoop安装:解压hadoop安装包,并配置环境变量
- hadoop配置:参考2.4和3.4配置,需额外在yarn-site.xml中配置yarn.resourcemanager.hostname为主节点机器
- slaves:配置从结点,顶格写入所有从结点机器的主机名
- 分发安装包到所有结点机器中(包括jdk和hadoop):scp -r ~/app hostname2:~/app
- 分发环境变量并生效:.bash_profile:scp ~/.bash_profile ~/
- 启动集群
- 在主节点对namenode格式化:bin/hdfs namenode -format
- 在主节点启动:sbin/start-all.sh
- 验证:jps,主节点5个进程,从结点2个进程。访问50070(hdfs)或者8088(yarn)端口。
- 停止:sbin/stop-all.sh
七、前沿技术Spark、Flink、Beam
-
分布式计算框架spark.
spark启动:spark-shell –master local[2]
spark实现wc:val file = sc.textFile("file:///home/hadoop/data/hello.txt") val a = file.flatMap(line => line.split(" ")) val b = a.map(word => (word,1)) val c = b.reduceByKey(_ + _) c.collect sc.textFile("file:///home/hadoop/data/hello.txt").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).collect
-
分布式计算框架flink.
./bin/flink run ./examples/batch/WordCount.jar \ --input file:///home/hadoop/data/hello.txt --output file:///home/hadoop/tmp/flink_wc_output
-
统一编程模型beam.
wc的beam程序以多种不同的runner运行。
-
direct方式运行 mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args=”–inputFile=/home/hadoop/data/hello.txt –output=counts” \ -Pdirect-runner
-
spark方式运行 mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args=”–runner=SparkRunner –inputFile=/home/hadoop/data/hello.txt –output=counts” -Pspark-runner
-
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--runner=FlinkRunner --inputFile=/home/hadoop/data/hello.txt --output=counts" -Pflink-runner