Set 2: Complete Walkthrough

Module 1: Platform Setup and Operations

1. Update the hostnames of the three environments and configure the hosts file

Reference: https://www.cnblogs.com/xupccc/p/9838719.html


# Steps
1. On the host, go to /var/lib/docker/containers
2. Run docker ps -a
3. Stop the Docker service: service docker stop


[root@ehdpc ~]# docker ps -a
CONTAINER ID        IMAGE                        COMMAND             CREATED             STATUS              PORTS               NAMES
7751668f2e8b        my_docker_image              "/bin/bash"         5 months ago        Up About a minute                       slave2
739138768ee3        my_docker_image              "/bin/bash"         5 months ago        Up About a minute                       slave1
a21b573c926b        openeuler/openeuler:latest   "/bin/bash"         5 months ago        Up About a minute                       master


4. Open the directory named after the CONTAINER ID that matches the container's NAMES entry
5. Edit the hosts and hostname files inside it (see the sketch after the file contents below)
6. Start the Docker service: service docker start
7. Start the containers: docker start master slave1 slave2

Contents of the hosts file:

127.0.0.1   localhost
::1 localhost ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
192.168.157.200 node01
192.168.157.201 node02
192.168.157.202 node03

Contents of the hostname file:

node01  # set on the node01 host (formerly master)
node02  # set on the node02 host (formerly slave1)
node03  # set on the node03 host (formerly slave2)
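
A minimal sketch of sub-steps 4 and 5, assuming the container IDs shown in the docker ps output above (your IDs will differ):

# Run on the host while the Docker service is stopped
cd /var/lib/docker/containers
vi a21b573c926b*/hosts      # master: paste the hosts content shown above
vi a21b573c926b*/hostname   # master: set it to node01
# repeat for 739138768ee3* (slave1 -> node02) and 7751668f2e8b* (slave2 -> node03)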

2. Use node01 as the time source and synchronize the clocks

Reference: https://blog.csdn.net/czxylzl/article/details/102718256


# Steps
1. Install the NTP service: yum install ntp
2. Edit /etc/ntp.conf


3. Add the following lines
restrict 192.168.157.200 mask 255.255.255.0 nomodify notrap
server 192.168.157.200


4. Edit /etc/sysconfig/ntpd: vi /etc/sysconfig/ntpd

SYNC_HWCLOCK=yes

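A minimal follow-up sketch (not spelled out above): start ntpd on node01 and pull the time from it on the other nodes; it assumes the ntpdate utility is also available.

# On node01
service ntpd start

# On node02 and node03
ntpdate node01
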
3. Set up passwordless SSH login among the three machines

# Generate an RSA key pair
ssh-keygen -t rsa


# Copy the RSA public key to every node
ssh-copy-id node01
ssh-copy-id node02
ssh-copy-id node03
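
Optional check, assuming the keys were copied successfully: each command should print the remote hostname without asking for a password.

ssh node01 hostname
ssh node02 hostname
ssh node03 hostname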

4. Copy the JDK into the node01 container

# Steps
1. First create the /root/software directory on node01 through node03 (see the sketch after this list)

mkdir -p /root/software

2. On the host, copy the archive into the container with docker cp

docker cp /root/jdk-8u212-linux-x64.tar.gz node01:/root/software
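
A sketch for creating the directory on all three nodes at once, assuming the passwordless SSH from step 3 is already in place:

for h in node01 node02 node03; do ssh "$h" "mkdir -p /root/software"; done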

5. Extract the JDK archive into /root/software

tar -zxvf /root/software/jdk-8u212-linux-x64.tar.gz -C /root/software

6. Set the JDK environment variables and apply them

# Steps
1. Add a jdk.sh file under /etc/profile.d

vi /etc/profile.d/jdk.sh

2. Add the following lines
export JAVA_HOME=/root/software/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

3. Apply the JDK environment variables

source /etc/profile

7. Run the "java -version" and "javac" commands

java -version


javac

8. Extract Hadoop into /root/software

tar -zxvf /root/software/hadoop-3.1.3.tar.gz -C /root/software 

9. Distribute the extracted package to node02 and node03

scp -r /root/software/hadoop-3.1.3 node02:/root/software

scp -r /root/software/hadoop-3.1.3 node03:/root/software

10. Create the data directories

mkdir -p /root/software/hadoop-3.1.3/hadoopDatas/tempDatas
mkdir -p /root/software/hadoop-3.1.3/hadoopDatas/namenodeDatas
mkdir -p /root/software/hadoop-3.1.3/hadoopDatas/datanodeDatas
mkdir -p /root/software/hadoop-3.1.3/hadoopDatas/dfs/nn/snn/edits
mkdir -p /root/software/hadoop-3.1.3/hadoopDatas/dfs/snn/edits
mkdir -p /root/software/hadoop-3.1.3/hadoopDatas/dfs/nn/name

11. Configure hadoop-env.sh

export JAVA_HOME=/root/software/jdk1.8.0_212
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root

12. Configure core-site.xml

# Replace the existing <configuration> block with the following

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://node01:9000</value>
    </property>
</configuration>

13. Configure hdfs-site.xml

# Replace the existing <configuration> block with the following

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:/root/software/hadoop-3.1.3/hadoopDatas/namenodeDatas</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:/root/software/hadoop-3.1.3/hadoopDatas/datanodeDatas</value>
    </property>
</configuration>

14. Configure mapred-site.xml

# Replace the existing <configuration> block with the following

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>yarn.app.mapreduce.am.env</name>
        <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
    </property>
    <property>
        <name>mapreduce.map.env</name>
        <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
    </property>
    <property>
        <name>mapreduce.reduce.env</name>
        <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
    </property>
</configuration>

15. Configure yarn-site.xml

# Replace the existing <configuration> block with the following

<configuration>
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>node01</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>
</configuration>

16. Configure workers

1. Replace the original localhost entry with the following

node01
node02
node03

17. Configure the Hadoop environment variables

# Create the hadoop.sh file
vi /etc/profile.d/hadoop.sh

# Add the following lines
export HADOOP_HOME=/root/software/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

# Save and exit

source /etc/profile # apply the environment variables

18. Copy the Hadoop and JDK installation directories and their environment variable files to the other nodes

scp -r /root/software/hadoop-3.1.3 node02:/root/software
scp -r /root/software/hadoop-3.1.3 node03:/root/software
scp -r /root/software/jdk1.8.0_212 node02:/root/software
scp -r /root/software/jdk1.8.0_212 node03:/root/software
scp -r /etc/profile.d/jdk.sh node02:/etc/profile.d
scp -r /etc/profile.d/jdk.sh node03:/etc/profile.d
scp -r /etc/profile.d/hadoop.sh node02:/etc/profile.d
scp -r /etc/profile.d/hadoop.sh node03:/etc/profile.d

19. Apply the environment variables on node02 and node03

source /etc/profile

20. Format the cluster from the master node

hdfs namenode -format

21. Start the cluster and check the processes on each node

start-all.sh


mapred --daemon start historyserver


jps
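
Optional sanity checks after start-up (a sketch; the exact output depends on your environment):

hdfs dfsadmin -report   # all three DataNodes should be reported as live
yarn node -list         # all three NodeManagers should show as RUNNING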

22. From the host's /root directory, copy Hive and the MySQL connector into the /root/software path of the node03 container

docker cp /root/apache-hive-3.1.2-bin.tar.gz node03:/root/software


docker cp /root/mysql-connector-java-5.1.37.jar node03:/root/software

23. Extract the Hive archive into /root/software

tar -zxvf /root/software/apache-hive-3.1.2-bin.tar.gz -C /root/software

24. Configure the Hive environment variables

vi /etc/profile.d/hive.sh


export HIVE_HOME=/root/software/apache-hive-3.1.2-bin
export PATH=$PATH:$HIVE_HOME/bin


source /etc/profile

25. Check the Hive version

hive --version

26. Configure hive-env.sh

export HADOOP_HOME=/root/software/hadoop-3.1.3
export HIVE_CONF_DIR=/root/software/apache-hive-3.1.2-bin/conf
export HIVE_AUX_JARS_PATH=/root/software/apache-hive-3.1.2-bin/lib

27. Extract the mysql-connector package **(obsolete)**

tar -zxvf /root/software/mysql-connector-java-5.1.47.tar.gz -C /root/software

Copy the JAR file into /root/software/apache-hive-3.1.2-bin/lib

cp /root/software/mysql-connector-java-5.1.37.jar /root/software/apache-hive-3.1.2-bin/lib

28. Configure hive-site.xml

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://node03:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false&amp;useUnicode=true&amp;characterEncoding=UTF8</value>
        <description>Metastore database connection URL</description>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
        <description>Metastore database connection driver class</description>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
        <description>Metastore database connection username</description>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
        <description>Metastore database connection password</description>
    </property>
    <property>
        <name>datanucleus.autoCreateSchema</name>
        <value>false</value>
    </property>
    <property>
        <name>datanucleus.fixedDatastore</name>
        <value>true</value>
    </property>
</configuration>

29. Replace Hive's guava JAR with the newer one from Hadoop

rm -rf /root/software/apache-hive-3.1.2-bin/lib/guava-19.0.jar 

cp /root/software/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /root/software/apache-hive-3.1.2-bin/lib/

30. Delete the log4j JAR

rm /root/software/apache-hive-3.1.2-bin/lib/log4j-slf4j-impl-2.10.0.jar

31. Initialize the Hive metastore

schematool -dbType mysql -initSchema
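
A quick check that initialization succeeded (a sketch): Hive should start and list its databases without metastore errors.

hive -e "show databases;"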

32. From the host's /root directory, copy Flume into the node03 container

docker cp /root/apache-flume-1.11.0-bin.tar.gz node03:/root/software

33. On node03, extract the Flume archive into /root/software

tar -zxvf /root/software/apache-flume-1.11.0-bin.tar.gz -C /root/software

34. Configure the Flume environment variables

vi /etc/profile.d/flume.sh


export FLUME_HOME=/root/software/apache-flume-1.11.0-bin
export PATH=$PATH:$FLUME_HOME/bin

# Save and exit

source /etc/profile # apply the environment variables

35. Change into the /root/software/apache-flume-1.11.0-bin/conf directory

cd /root/software/apache-flume-1.11.0-bin/conf

36. Copy flume-env.sh.template and rename it to flume-env.sh

cp flume-env.sh.template flume-env.sh

37. Edit flume-env.sh

# Add the following line

JAVA_HOME=/root/software/jdk1.8.0_212

38. Run flume-ng version

flume-ng version

39. Install mysql-community-server on node03

# Run on the host
docker cp /root/mysql-5.7.25-1.el7.x86_64.rpm-bundle.tar node03:/root/software


# Steps
1. Unpack the MySQL bundle

tar -xvf /root/software/mysql-5.7.25-1.el7.x86_64.rpm-bundle.tar -C /root/software


2. Change into the software directory

cd /root/software


3. Install the MySQL dependencies

yum install libncurses* perl libnuma* libaio* perl-JSON


4. Install MySQL

rpm -ivh ./*.rpm


5. Initialize the MySQL service

mysqld --initialize


6. Look up the initial MySQL password

grep password /var/log/mysqld.log


7. Give the mysql user ownership of the MySQL data directory

chown -R mysql:mysql /var/lib/mysql


8. Start the MySQL service

nohup /usr/sbin/mysqld --defaults-file=/etc/my.cnf --user=mysql &


9. Log in to MySQL with the initial password
mysql -uroot -p

40. Change the local root password to "123456"

ALTER USER root@'localhost' IDENTIFIED BY '123456';

41. Enable remote connections for MySQL

-- Switch to the mysql database
use mysql;

-- Allow root to connect from any host
UPDATE user SET user.Host='%' WHERE user.user='root';


-- Reload the privilege tables

FLUSH PRIVILEGES;
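
Optional check from another node, assuming a mysql client is installed there; it should return a single row.

mysql -h node03 -uroot -p123456 -e "SELECT 1;"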

42. Create the root_sl_src database

create database root_sl_src;

43. Import root_sl_src.sql into the corresponding MySQL database root_sl_src

mysql -uroot -p'123456' root_sl_src < /root/eduhq/equipment/root_sl_src.sql

44. Create the root_sl_ugoogds_src database

create database root_sl_ugoogds_src;

45. Import root_sl_ugoogds_src.sql into the corresponding MySQL database root_sl_ugoogds_src

-- Relax the SQL mode to allow invalid dates

SET GLOBAL sql_mode = 'ALLOW_INVALID_DATES';


mysql -uroot -p'123456' root_sl_ugoogds_src < /root/eduhq/equipment/root_sl_ugoogds_src.sql

46. For the record with province_id 24, change province_name to '内蒙古自治区' (Inner Mongolia Autonomous Region)

use root_sl_src


update province set province_name='内蒙古自治区' where province_id=24;

47. Delete the record with city_id 142

use root_sl_src


delete from city where city_id=142;

Module 2: Data Acquisition and Processing

1. Create the equipment_dashboard database

create database if not exists equipment_dashboard;

2. Create the province table ods_province

use equipment_dashboard;


CREATE TABLE IF NOT EXISTS ods_province (
    province_id INT,
    province_name STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

3. Load the province.csv data into ods_province

LOAD DATA LOCAL INPATH '/root/eduhq/file/province.csv' INTO TABLE ods_province;
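
A quick look at the loaded rows (a sketch; run it from the shell or inside the same Hive session):

hive -e "use equipment_dashboard; select * from ods_province limit 5;"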

4. Create the city table ods_city

CREATE TABLE IF NOT EXISTS ods_city (
    city_id INT,
    city_name STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

5. Load the city.csv data into ods_city

LOAD DATA LOCAL INPATH '/root/eduhq/file/city.csv' INTO TABLE ods_city;

6. Use the put command to upload the work-order failure log data to HDFS

hdfs dfs -mkdir -p /source/logs/sms_so_failure_logs/


hdfs dfs -put /root/eduhq/file/sms_so_failure_logs.txt /source/logs/sms_so_failure_logs/

7. Use the put command to upload the device data to HDFS

hdfs dfs -mkdir -p /source/logs/province_iso/


hdfs dfs -put /root/eduhq/file/province_iso_shell.txt /source/logs/province_iso/

8. Clean the text of the work-order failure log sms_so_failure_logs.txt in the /root/eduhq/equipment/ directory (the second command below cleans province_iso.txt the same way)

awk 'NR > 1 { $1=""; $2=""; print substr($0, 3) }' /root/eduhq/equipment/sms_so_failure_logs.txt > /root/eduhq/equipment/sms_so_failure_logs_shell.txt


sed '1d' /root/eduhq/equipment/province_iso.txt | cut -d ',' -f 3- > /root/eduhq/equipment/province_iso_shell.txt

9. Classify and uniformly handle empty fields, tagging them with the device-status label "未获取" (not acquired)

// Contents of FormatLogJob.java

package com.wz.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.log4j.BasicConfigurator;

import java.io.IOException;

public class FormatLogJob {
    public static class Map extends Mapper<LongWritable, Text, Text, NullWritable> {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] data = value.toString().split(","); // fields are assumed to be comma-separated
            Text outputValue;
            if (data.length != 23) { // adjust the expected column count to the actual data
                outputValue = new Text("Invalid data: " + value); // flag malformed records instead of dropping them
            } else {
                if (data[0].contains("ID")) {
                    return; // skip the header row
                }
                String formattedTime = formatTime(data[18]); // index 18 is the time column
                for (int i = 0; i < data.length; i++) {
                    if (data[i].isEmpty()) { // label empty fields as not acquired
                        data[i] = "未获取";
                    }
                }
                data[18] = formattedTime; // write the normalized time back once, after the loop
                outputValue = new Text(String.join(",", data).replace("\"", "")); // rejoin the cleaned fields and strip quotes
            }
            context.write(outputValue, NullWritable.get()); // emit the cleaned line as the key
        }
    }

    public static class Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            for (NullWritable val : values) {
                context.write(key, val); // emit one output line per input record
            }
        }
    }

    // Time-formatting helper; adjust to the actual input format if needed
    private static String formatTime(String timeStr) {
        // convert "/"-separated dates to "-"-separated dates
        return timeStr.replace("/", "-");
    }

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        BasicConfigurator.configure();
        Job job = Job.getInstance(conf, "Hive to MapReduce job");
        job.setJarByClass(FormatLogJob.class); // entry class of the jar
        job.setMapperClass(Map.class); // custom Mapper
        job.setReducerClass(Reduce.class); // custom Reducer
        job.setOutputKeyClass(Text.class); // output key type
        job.setOutputValueClass(NullWritable.class); // output value type
        job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class); // plain-text input
        job.setOutputFormatClass(TextOutputFormat.class); // plain-text output
        FileInputFormat.addInputPath(job, new Path("hdfs://60.0.0.5:9000/user/hive/warehouse/equipment_dashboard.db/sms_so_failure_logs/sms_so_failure_logs.csv")); // input path; replace with your Hive table path
        FileOutputFormat.setOutputPath(job, new Path("hdfs://60.0.0.5:9000/source/mr/sms_so_failure_logs")); // output path on HDFS; must not already exist
        job.waitForCompletion(true); // submit the job and wait for completion
    }
}


hadoop jar /root/eduhq/FormatLogJob.jar com.wz.test.FormatLogJob


hdfs dfs -ls /source/mr/sms_so_failure_logs/
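
To spot-check the cleaned records (the part file name below is the usual TextOutputFormat default and may differ):

hdfs dfs -cat /source/mr/sms_so_failure_logs/part-r-00000 | head -n 5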

10. Create the equipment_dashboard database in Hive

create database if not exists equipment_dashboard;


use equipment_dashboard;

11. Load the labeled data into the table ods_sms_so_failure_logs

hive -e "
LOAD DATA INPATH '/source/mr/sms_so_failure_logs' 
INTO TABLE ods_sms_so_failure_logs;
"

12. Load the labeled data into the table ods_province_iso

CREATE TABLE IF NOT EXISTS ods_province_iso (
    id INT,
    iso_code STRING,
    province_name STRING,
    region_name STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;


hive -e "
LOAD DATA INPATH '/source/mr/province_iso/' 
INTO TABLE ods_province_iso;
"

13. Count the number of devices

select count(*) from ods_province_iso group by iso_code;

14. Count the number of users

select count(*) from ods_sms_so_failure_logs group by TENANT_ID;

Module 3: Business Analysis and Visualization

LICENSED UNDER CC BY-NC-SA 4.0