Complete Solution for Set 2 - Markdown
Module 1: Platform Setup and Operations
1. Update the hostnames of the three environments and configure the hosts file
Reference: https://www.cnblogs.com/xupccc/p/9838719.html
# Steps
1. On the Docker host, go to /var/lib/docker/containers
2. Run docker ps -a
3. Stop the Docker service: service docker stop
[root@ehdpc ~]# docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
7751668f2e8b my_docker_image "/bin/bash" 5 months ago Up About a minute slave2
739138768ee3 my_docker_image "/bin/bash" 5 months ago Up About a minute slave1
a21b573c926b openeuler/openeuler:latest "/bin/bash" 5 months ago Up About a minute master
4. Open the directory named after the CONTAINER ID that matches each NAMES entry
5. Edit the hosts and hostname files inside it
6. Start the Docker service: service docker start
7. Start the containers: docker start master slave1 slave2
The contents of the hosts file:
127.0.0.1 localhost
::1 localhost ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
192.168.157.200 node01
192.168.157.201 node02
192.168.157.202 node03
The contents of the hostname file:
node01 # set on the node01 host (formerly master)
node02 # set on the node02 host (formerly slave1)
node03 # set on the node03 host (formerly slave2)
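A minimal sketch of steps 4-5, assuming the container IDs from the docker ps output above (each container's folder name begins with its CONTAINER ID):
cd /var/lib/docker/containers/a21b573c926b*   # master -> node01
vi hostname   # replace the contents with: node01
vi hosts      # replace the contents with the hosts file shown above
# Repeat under 739138768ee3* for slave1 -> node02 and under 7751668f2e8b* for slave2 -> node03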
2. Use node01 as the time source and synchronize the clocks
Reference: https://blog.csdn.net/czxylzl/article/details/102718256
# Steps
1. Install the NTP service: yum install ntp
2. Edit /etc/ntp.conf
3. Add the following (the restrict line belongs on node01, the time source; on node02 and node03 the server line should point at node01)
restrict 192.168.157.200 mask 255.255.255.0 nomodify notrap
server 192.168.157.200
4. Edit /etc/sysconfig/ntpd and add
SYNC_HWCLOCK=yes
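A minimal sketch of bringing the synchronization up, assuming ntpd runs as a service inside the containers and the ntpdate tool is installed on the worker nodes:
# On node01: start the NTP daemon
service ntpd start
# On node02 and node03: force an initial sync against node01, then start ntpd
ntpdate node01
service ntpd start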
3. Set up passwordless SSH login among the three machines
# Generate an RSA key pair (run on each of the three nodes)
ssh-keygen -t rsa
# Copy the public key to every node, including the local one
ssh-copy-id node01
ssh-copy-id node02
ssh-copy-id node03
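A quick check, assuming ssh-keygen and ssh-copy-id have been run on all three nodes; each command should print the remote hostname without prompting for a password:
ssh node02 hostname
ssh node03 hostname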
4. Copy the JDK into container node01
# Steps
1. First create the /root/software directory on node01 through node03
mkdir -p /root/software
2. Copy the archive from the Docker host with docker cp
docker cp /root/jdk-8u212-linux-x64.tar.gz node01:/root/software
5. Extract the JDK archive into /root/software
tar -zxvf /root/software/jdk-8u212-linux-x64.tar.gz -C /root/software
6. Set the JDK environment variables and apply them
# Steps
1. Add a jdk.sh file under /etc/profile.d
vi /etc/profile.d/jdk.sh
2. Add the following
export JAVA_HOME=/root/software/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
3. Apply the JDK variables
source /etc/profile
7. Run the "java -version" and "javac" commands
java -version
javac
8. Extract Hadoop into /root/software
tar -zxvf /root/software/hadoop-3.1.3.tar.gz -C /root/software
9. Distribute the extracted package to node02 and node03
scp -r /root/software/hadoop-3.1.3 node02:/root/software
scp -r /root/software/hadoop-3.1.3 node03:/root/software
10. Create the data directories
mkdir -p /root/software/hadoop-3.1.3/hadoopDatas/tempDatas
mkdir -p /root/software/hadoop-3.1.3/hadoopDatas/namenodeDatas
mkdir -p /root/software/hadoop-3.1.3/hadoopDatas/datanodeDatas
mkdir -p /root/software/hadoop-3.1.3/hadoopDatas/dfs/nn/snn/edits
mkdir -p /root/software/hadoop-3.1.3/hadoopDatas/dfs/snn/edits
mkdir -p /root/software/hadoop-3.1.3/hadoopDatas/dfs/nn/name
11. Configure hadoop-env.sh (under /root/software/hadoop-3.1.3/etc/hadoop/)
export JAVA_HOME=/root/software/jdk1.8.0_212
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
12. Configure core-site.xml
# Replace the existing <configuration> block with the following
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://node01:9000</value>
</property>
</configuration>
13. Configure hdfs-site.xml
# Replace the existing <configuration> block with the following
<configuration>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/root/software/hadoop-3.1.3/hadoopDatas/namenodeDatas</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/root/software/hadoop-3.1.3/hadoopDatas/datanodeDatas</value>
</property>
</configuration>
14. Configure mapred-site.xml
# Replace the existing <configuration> block with the following
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
</property>
<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
</property>
<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
</property>
</configuration>
15. Configure yarn-site.xml
# Replace the existing <configuration> block with the following
<configuration>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>node01</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
</configuration>
16. Configure workers
# Delete the existing localhost entry and add the following
node01
node02
node03
17. Configure the Hadoop environment variables
# Create the hadoop.sh file
vi /etc/profile.d/hadoop.sh
# Add the following
export HADOOP_HOME=/root/software/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
# Save and exit
source /etc/profile # apply the environment variables
18. Copy the Hadoop and JDK installation directories and their profile scripts to the other nodes
scp -r /root/software/hadoop-3.1.3 node02:/root/software
scp -r /root/software/hadoop-3.1.3 node03:/root/software
scp -r /root/software/jdk1.8.0_212 node02:/root/software
scp -r /root/software/jdk1.8.0_212 node03:/root/software
scp -r /etc/profile.d/jdk.sh node02:/etc/profile.d
scp -r /etc/profile.d/jdk.sh node03:/etc/profile.d
scp -r /etc/profile.d/hadoop.sh node02:/etc/profile.d
scp -r /etc/profile.d/hadoop.sh node03:/etc/profile.d
19. Apply the environment variables on node02 and node03
source /etc/profile
20. Format the cluster on the master node
hdfs namenode -format
21. Start the cluster and check the processes on each node
start-all.sh
mapred --daemon start historyserver
jps
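Optionally, a few standard verification commands (not part of the original write-up), run from node01:
hdfs dfsadmin -report   # all three DataNodes should be reported as live
yarn node -list         # all three NodeManagers should be RUNNING
ssh node02 jps          # check the worker processes remotely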
22. Copy Hive and the MySQL connector from the host's /root directory into /root/software in container node03
docker cp /root/apache-hive-3.1.2-bin.tar.gz node03:/root/software
docker cp /root/mysql-connector-java-5.1.37.jar node03:/root/software
23. Extract the Hive archive into /root/software
tar -zxvf /root/software/apache-hive-3.1.2-bin.tar.gz -C /root/software
24. Configure the Hive environment variables
vi /etc/profile.d/hive.sh
export HIVE_HOME=/root/software/apache-hive-3.1.2-bin
export PATH=$PATH:$HIVE_HOME/bin
source /etc/profile
25. Check the Hive version
hive --version
26. Configure hive-env.sh (copy conf/hive-env.sh.template to conf/hive-env.sh first)
export HADOOP_HOME=/root/software/hadoop-3.1.3
export HIVE_CONF_DIR=/root/software/apache-hive-3.1.2-bin/conf
export HIVE_AUX_JARS_PATH=/root/software/apache-hive-3.1.2-bin/lib
27. Extract the mysql-connector archive **(deprecated)**
tar -zxvf /root/software/mysql-connector-java-5.1.47.tar.gz -C /root/software
Copy the jar file into /root/software/apache-hive-3.1.2-bin/lib
cp /root/software/mysql-connector-java-5.1.37.jar /root/software/apache-hive-3.1.2-bin/lib
28. Configure hive-site.xml (create it under /root/software/apache-hive-3.1.2-bin/conf/)
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://node03:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false&amp;useUnicode=true&amp;characterEncoding=UTF8</value>
<description>Metastore database connection URL</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Metastore database connection driver class</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>Metastore database connection username</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
<description>Metastore database connection password</description>
</property>
<property>
<name>datanucleus.autoCreateSchema</name>
<value>false</value>
</property>
<property>
<name>datanucleus.fixedDatastore</name>
<value>true</value>
</property>
</configuration>
29. Replace the guava jar with the newer copy shipped with Hadoop
rm -rf /root/software/apache-hive-3.1.2-bin/lib/guava-19.0.jar
cp /root/software/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /root/software/apache-hive-3.1.2-bin/lib/
30. Remove the conflicting log4j jar
rm /root/software/apache-hive-3.1.2-bin/lib/log4j-slf4j-impl-2.10.0.jar
31. Initialize the Hive metastore (MySQL on node03 must already be installed and running; see step 39)
schematool -dbType mysql -initSchema
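A quick sanity check, assuming the metastore database name hive from the JDBC URL above and the root password 123456:
mysql -uroot -p'123456' -e "USE hive; SHOW TABLES;"   # metastore tables such as DBS and TBLS should appear
hive -e "SHOW DATABASES;"   # Hive itself should now start without schema errors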
32. Copy Flume from the host's /root directory into container node03
docker cp /root/apache-flume-1.11.0-bin.tar.gz node03:/root/software
33. On node03, extract the Flume archive into /root/software
tar -zxvf /root/software/apache-flume-1.11.0-bin.tar.gz -C /root/software
34. Configure the Flume environment variables
vi /etc/profile.d/flume.sh
export FLUME_HOME=/root/software/apache-flume-1.11.0-bin
export PATH=$PATH:$FLUME_HOME/bin
# Save and exit
source /etc/profile # apply the environment variables
35. Enter the /root/software/apache-flume-1.11.0-bin/conf directory
cd /root/software/apache-flume-1.11.0-bin/conf
36. Copy flume-env.sh.template and rename the copy to flume-env.sh
cp flume-env.sh.template flume-env.sh
37. Edit flume-env.sh
# Add the following line
JAVA_HOME=/root/software/jdk1.8.0_212
38. Run the flume-ng version command
flume-ng version
39. Install mysql-community-server on host node03
# Run on the Docker host
docker cp /root/mysql-5.7.25-1.el7.x86_64.rpm-bundle.tar node03:/root/software
# Steps
1. Extract the MySQL bundle
tar -xvf /root/software/mysql-5.7.25-1.el7.x86_64.rpm-bundle.tar -C /root/software
2. Change into the software directory
cd /root/software
3. Install the MySQL dependencies
yum install libncurses* perl libnuma* libaio* perl-JSON
4. Install the MySQL RPMs
rpm -ivh ./*.rpm
5. Initialize the MySQL service
mysqld --initialize
6. Look up the temporary root password
grep password /var/log/mysqld.log
7. Grant the mysql user ownership of the MySQL data directory
chown -R mysql:mysql /var/lib/mysql
8. Start the MySQL service
nohup /usr/sbin/mysqld --defaults-file=/etc/my.cnf --user=mysql &
9. Log in to mysql with the temporary password
mysql -uroot -p
40. Change the local root password to "123456"
ALTER USER root@'localhost' IDENTIFIED BY '123456';
41. Enable MySQL remote connections
-- Switch to the mysql system database
use mysql;
-- Allow root to connect from any host
UPDATE user SET user.Host='%' WHERE user.user='root';
-- Reload the privilege tables
FLUSH PRIVILEGES;
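To confirm remote access works, a quick check from another node, assuming a MySQL client is available there:
mysql -h node03 -uroot -p'123456' -e "SELECT 1;"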
42. Create the root_sl_src database
create database root_sl_src;
43. Import root_sl_src.sql into the corresponding MySQL database root_sl_src
mysql -uroot -p'123456' root_sl_src < /root/eduhq/equipment/root_sl_src.sql
44. Create the root_sl_ugoogds_src database
create database root_sl_ugoogds_src;
45. Import root_sl_ugoogds_src.sql into the corresponding MySQL database root_sl_ugoogds_src
# Relax the SQL mode first so the invalid dates in the dump are accepted
mysql -uroot -p'123456' -e "SET GLOBAL sql_mode = 'ALLOW_INVALID_DATES';"
mysql -uroot -p'123456' root_sl_ugoogds_src < /root/eduhq/equipment/root_sl_ugoogds_src.sql
46. For the record with province_id 24, change province_name to '内蒙古自治区' (Inner Mongolia Autonomous Region)
use root_sl_src
update province set province_name='内蒙古自治区' where province_id=24;
47. Delete the record with city_id 142
use root_sl_src
delete from city where city_id=142;
Module 2: Data Acquisition and Processing
1. Create the equipment_dashboard database
create database if not exists equipment_dashboard;
2. Create the province table ods_province
use equipment_dashboard;
CREATE TABLE IF NOT EXISTS ods_province (
province_id INT,
province_name STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
3. Load the province.csv data into ods_province
LOAD DATA LOCAL INPATH '/root/eduhq/file/province.csv' INTO TABLE ods_province;
4. Create the city table ods_city
CREATE TABLE IF NOT EXISTS ods_city (
city_id INT,
city_name STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
5. Load the city.csv data into ods_city
LOAD DATA LOCAL INPATH '/root/eduhq/file/city.csv' INTO TABLE ods_city;
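A brief check that both loads landed, reusing the hive -e style used later in this document:
hive -e "USE equipment_dashboard; SELECT * FROM ods_province LIMIT 5; SELECT * FROM ods_city LIMIT 5;"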
6. Use the put command to upload the work-order failure log data to HDFS
hdfs dfs -mkdir -p /source/logs/sms_so_failure_logs/
hdfs dfs -put /root/eduhq/file/sms_so_failure_logs.txt /source/logs/sms_so_failure_logs/
7. Use the put command to upload the device data to HDFS
hdfs dfs -mkdir -p /source/logs/province_iso/
hdfs dfs -put /root/eduhq/file/province_iso_shell.txt /source/logs/province_iso/
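Optionally, confirm both uploads:
hdfs dfs -ls /source/logs/sms_so_failure_logs/ /source/logs/province_iso/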
8. Clean the text of the work-order failure log sms_so_failure_logs.txt in the /root/eduhq/equipment/ directory
# Skip the header line of the work-order log and blank out its first two fields
awk 'NR > 1 { $1=""; $2=""; print substr($0, 3) }' /root/eduhq/equipment/sms_so_failure_logs.txt > /root/eduhq/equipment/sms_so_failure_logs_shell.txt
# Drop the header line of the province ISO file and keep the comma-separated fields from the third one onward
sed '1d' /root/eduhq/equipment/province_iso.txt | cut -d ',' -f 3- > /root/eduhq/equipment/province_iso_shell.txt
9. Handle all empty fields uniformly by filling them with the device-status label "未获取" (not acquired)
// Contents of FormatLogJob.java
package com.wz.test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.IOException;

public class FormatLogJob {

    public static class Map extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] data = value.toString().split(","); // the records are comma-separated
            Text outputValue;
            if (data.length != 23) { // adjust the expected column count to match the source data
                outputValue = new Text("Invalid data: " + value); // flag malformed records instead of dropping them silently
            } else {
                if (data[0].contains("ID")) { // skip the header row
                    return;
                }
                data[18] = formatTime(data[18]); // index 18 is the time column
                for (int i = 0; i < data.length; i++) {
                    if (data[i].isEmpty()) { // label empty fields as "未获取" (not acquired)
                        data[i] = "未获取";
                    }
                }
                outputValue = new Text(String.join(",", data).replace("\"", "")); // re-join the cleaned fields with commas
            }
            context.write(outputValue, NullWritable.get()); // emit the cleaned record as the key
        }
    }

    public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            for (NullWritable val : values) {
                context.write(key, val); // write every occurrence back out to HDFS
            }
        }
    }

    // Time formatting: normalize "/" date separators to "-"
    private static String formatTime(String timeStr) {
        return timeStr.replace("/", "-");
    }

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        BasicConfigurator.configure();
        Job job = Job.getInstance(conf, "Hive to MapReduce job");
        job.setJarByClass(FormatLogJob.class);              // entry class of the jar
        job.setMapperClass(Map.class);                      // custom Mapper
        job.setReducerClass(Reduce.class);                  // custom Reducer
        job.setOutputKeyClass(Text.class);                  // output key type: Text
        job.setOutputValueClass(NullWritable.class);        // output value type: NullWritable
        job.setInputFormatClass(TextInputFormat.class);     // plain-text input
        job.setOutputFormatClass(TextOutputFormat.class);   // plain-text output
        // Input path: the raw log file; adjust the NameNode address and path to your environment
        FileInputFormat.addInputPath(job, new Path("hdfs://60.0.0.5:9000/user/hive/warehouse/equipment_dashboard.db/sms_so_failure_logs/sms_so_failure_logs.csv"));
        // Output path: where the cleaned data is written on HDFS
        FileOutputFormat.setOutputPath(job, new Path("hdfs://60.0.0.5:9000/source/mr/sms_so_failure_logs"));
        job.waitForCompletion(true); // submit the job and wait for completion
    }
}
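One way to compile and package the class (a sketch, assuming the source file sits at /root/eduhq/FormatLogJob.java and the Hadoop client jars come from hadoop classpath):
mkdir -p /root/eduhq/classes
javac -cp "$(hadoop classpath)" -d /root/eduhq/classes /root/eduhq/FormatLogJob.java
jar -cvf /root/eduhq/FormatLogJob.jar -C /root/eduhq/classes .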
hadoop jar /root/eduhq/FormatLogJob.jar com.wz.test.FormatLogJob
hdfs dfs -ls /source/mr/sms_so_failure_logs/
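To spot-check the cleaned records, assuming the job ran with the default single reducer (so the output file is part-r-00000):
hdfs dfs -cat /source/mr/sms_so_failure_logs/part-r-00000 | head -5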
10. Create the equipment_dashboard database in Hive
create database if not exists equipment_dashboard;
use equipment_dashboard;
11. Load the labeled data into the ods_sms_so_failure_logs table (the table must already exist in equipment_dashboard and match the 23 cleaned fields)
hive -e "
use equipment_dashboard;
LOAD DATA INPATH '/source/mr/sms_so_failure_logs'
INTO TABLE ods_sms_so_failure_logs;
"
12. Load the labeled data into the ods_province_iso table
CREATE TABLE IF NOT EXISTS ods_province_iso (
id INT,
iso_code STRING,
province_name STRING,
region_name STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
hive -e "
use equipment_dashboard;
LOAD DATA INPATH '/source/mr/province_iso/'
INTO TABLE ods_province_iso;
"
13. Count the number of devices
select iso_code, count(*) from ods_province_iso group by iso_code;
14. Count the number of users
select TENANT_ID, count(*) from ods_sms_so_failure_logs group by TENANT_ID;