emmm-spark商业数据分析案例讲解

记录

这篇记录为啥以emmm开头呢?哈哈(~~)~因为我的阿里云还有一个月就要到期了。这意味着又是一波放血~~不过对于一个忠实的阿里粉来说,还是用了自己一个月的生活费,又续租了一年。(这东西有点上瘾,用上了就不想放弃了~~哈哈,这是真好用,所以不能对不起这一个月的生活费不是!当然,如果以后真的有机会进了阿里,阿里爸爸会不会给我免费服务的机会呀!😂)

Jupyter notebook 阿里云搭建

刚说完,要充分利用,那就赶快利用起来吧! 本来是搭建在我MAC上的,现在就把它迁移到阿里云上吧!也能让我随时随地的进行编程了~~

名称 Jupyter 是由Julia、Python和R三个单词组合而成的。Jupyter Notebook是一种Web应用,它能让用户将说明文本、数学方程、代码和可视化内容全部组合到一个易于共享的文档中,非常方便研究和教学。Jupyter Notebook特别适合做数据处理,其用途可以包括数据清理和探索、可视化、机器学习和大数据分析。

服务原理

在这里插入图片描述

Anaconda

Anaconda是一个开源的Python发行版本,其包含了conda、Python等180多个科学包及其依赖项。Anaconda中已经集成了Jupyter Notebook,因此,可以首先安装Anaconda,然后再配置Jupyter Notebook。

Anaconda的安装过程,这里就不在记录了!

安装好Anaconda之后,检查里面是否已经集成了jupyter Notebook。一般这里面都是集成了的,如果没有的话,那就在安装一下吧!
因为我搭建在了云上,这里的安全性还是要保护一下的。因为我的目的是随时随地,随机都可以进行编程!

1
2
cd /home/hadoop/anaconda3/bin
./python

这里来生成一下密码:

1
2
>>>from notebook.auth import passwd
>>>passwd()

然后系统会生成一个密码字符串,比如sha1;记得将这个密码串保存下来,用来配置你的密码。

退出python之后,找到配置文件。

1
vim ~/.jupyter/jupyter_notebook_config.py

1
2
3
4
5
c.NotebookApp.ip='*'                     # 就是设置所有ip皆可访问  
c.NotebookApp.password = 'sha1:7c7990750e83:965c1466a4fab0849051ca5f3c5661110813795b' # 上面复制的那个sha密文'
c.NotebookApp.open_browser = False # 禁止自动打开浏览器
c.NotebookApp.port =8888 # 端口
c.NotebookApp.notebook_dir = '/home/hadoop/jupyternotebook' #设置Notebook启动进入的目录

配置完成之后,使用jupyter notebook启动即可!这时候在本机访问是没有问题的,如果想要通过其他电脑访问还需要进行下一步的策略!

教训

还记得HDFS实验中,有一个问题没有解决吗?哈哈哈 其实有点傻了。
在配置notebook的时候,我使用的是8888端口,阿里云是对这些端口进行保护了的。主要有两层保护:第一层是阿里云本身的防火墙,第二层是控制台的安全组规则。这个经验本来是在mysql的配置的时候解决过的!但是忘记了。所以在执行HDFS分布式文件读取的时候,一直没有成功!但是这里配置的时候,忽然想起了这个事情,所以就把这个问题解决了。HDFS 使用本机用户分布式读取也没问题了。但是写权限还是不够的。

案例讲解

数据来源:YELP数据集仅用来学习

json 数据集

这里的数据集格式是属于NoSQL的中的文档数据集格式之一的json数据集格式。
因为,这也是我第一次对这类数据进行处理,这里做一个简单的学习记录。

  1. 简介
    JSON:一种与开发语言无关的、轻量级的数据存储格式,全称JavaScript Object Notation,一种数据格式的标准规范,起初来源于JavaScript这门语言,后来随着使用的广泛,几乎每门开发语言都有处理JSON的API。
    优点:易于人的阅读和编写,易于程序解析与生产。(看了一下真的是便于阅读与处理)

    JSON样例:首先一个花括号{},整个代表一个对象,同时里面是一种Key-Value的存储形式,它还有不同的数据类型来区分。

  2. 数据类型表示
    数据结构:Object、Array
    基本类型:string,number,true,false,null
    (1)Object
    {key:value,key:value…}
    key:string类型
    value:任何基本类型或数据结构
    (2)Array
    [value,value…]
    value:任何基本类型或数据结构。
    比如:{“name”:”李广”, “values”:[1,2,45,”你好”] }

  3. 实际json数据格式
    这个格式的文件可以通过t x t或者网页等方式打开,当然还有许多工具。
    我这里用的是Xcode。好了这就是我们要处理的数据了。
    在这里插入图片描述

  4. 数据集提前分析
    我这里取出来一条信息,先来看看信息是啥样子的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{"business_id":"f9NumwFMBDn751xgFiRbNA",
"name":"The Range At Lake Norman",
"address":"10913 Bailey Rd",
"city":"Cornelius",
"state":"NC",
"postal_code":"28031",
"latitude":35.4627242,
"longitude":-80.8526119,
"stars":3.5,
"review_count":36,
"is_open":1,
"attributes":{"BusinessAcceptsCreditCards":"True","BikeParking":"True","GoodForKids":"False","BusinessParking":"{'garage': False, 'street': False, 'validated': False, 'lot': True, 'valet': False}","ByAppointmentOnly":"False","RestaurantsPriceRange2":"3"},
"categories":"Active Life, Gun\/Rifle Ranges, Guns & Ammo, Shopping",
"hours":{"Monday":"10:0-18:0",
"Tuesday":"11:0-20:0",
"Wednesday":"10:0-18:0",
"Thursday":"11:0-20:0",
"Friday":"11:0-20:0",
"Saturday":"11:0-20:0",
"Sunday":"13:0-18:0"}}

上面描述的就是这个文档中所有数据集中其中的一条数据。
可以发现json数据格式是按照键值对的形式进行存储的。

  • business_id : 商业店铺的id,用来唯一的标识这一个商铺
  • name : 商业店铺的名称
  • address : 商铺的位置
  • city : 城镇
  • state : 州县
  • postal_code : 邮政编码
  • latitude :纬度
  • longitude :经度
  • starts : 星级评分
  • review_count:评论数
  • is_open: 商家是否营业
  • attributes :商家所进行的业务
  • categories: 类别
  • hours : 商家的营业时间

主要任务

  1. 统计出所有的商业类别,并且进行计算出商业类别的Top 10(categories)。
  2. 每个城市各种商业类型的商家数量,并且计算出商家数量最多的十个城市(city,categories)。
  3. 消费者评价最多的10种商业类别(review_count,categories)。
  4. 最受消费者欢迎的10种商业类别(starts)。
  5. 商业额外业务的评价情况。

主要的解决步骤

  1. 第一步 对数据进行预处理,剔除异常值
  2. 第二步 进行数据集的分析
  3. 第三步 对数据进行可视化分析

代码解读

所需要的包

1
2
3
4
5
6
7
from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import os
import json
import pandas as pd
import matplotlib.pyplot as plt
  • pyspark
    PySpark 是 Spark 为 Python 开发者提供的 API

    • SparkContext
      是程序的入口点,负责连接Spark集群。集群通过SparkContext进行资源管理器的通信以及进行资源的申请、任务的分配和监控,需要从SparkSession中获得
    • SparkConf
      创建SparkContext前得使用SparkConf进行配置,以键值对形式进行
  • pyspark.sql
    一种解析传统SQL到大数据运算模型的引擎

    • SparkSession
      其为用户提供了一个统一的切入点来使用Spark的各项功能,并且允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序
    • DataFrame(表)是Spark SQL对结构化数据的抽象。可以将DataFrame看做RDD
    • Dataset是数据的分布式集合
  • pyspark.sql.function
    可用于dataFram的内置功能列表
  • os
    主要是针对操作系统的包,用来与操作系统进行交互,创建文件等。
  • json
    Python里的json模块主要用于“Python数据与JSON格式的数据间相互转换”
  • pandas
    pandas是一个强大的分析结构化数据的工具集;它的使用基础是Numpy(提供高性能的矩阵运算);用于数据挖掘和数据分析,同时也提供数据清洗功能。
  • matplotlib.pyplot
    是matlib的python版实现,用于绘图操作。

获取spark操作对象

建立一个会话对象

1
2
3
4
5
# 建立配置文件
#builder用来创建一个Sparksession实例
#config 配置相关
#getOrCreate 有就获取没有就创建
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

数据清洗

下面对数据清洗 进行一个详细的分析。主要将数据中存在值缺失的数据,商家位置错误的数据(这里可以称为离群值)进行筛选。代码的详解解读,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 数据清洗
def data_process(raw_data_path):

# 通过数据路径,读取数据,这里是json数据
business = spark.read.json(raw_data_path)
#split(str, pattern, limit=-1)
#str – 我们要分割的字符串
#pattern 分割所用的正则表达式
split_col = f.split(business['categories'], ',')
#withColumn(colName, col)
#withColumn 负责在原有表的基础上新添加一列 colName是添加一列后的新的列名,col是新列的值 ?列名一样会怎么样处理(这里应该是会直接替代原来的,有待验证)

#filter 使用给定的条件 过滤行 这里是在所有的categories中过滤掉城市
# dropna 至少又一个空缺值的行都会被删除
business = business.withColumn("categories", split_col).filter(business["city"] != "").dropna()
#创建一个数据的临时视图,这个视图的周期与sparkSession相关联
business.createOrReplaceTempView("business")
#解析传统的sql到大数据运算模型 筛选出所需要的内容
#cache 按照默认的存储级别 持久化dataFrame
b_etl = spark.sql("SELECT business_id, name, city, state, latitude, longitude, stars, review_count, is_open, categories, attributes FROM business").cache()
#将筛选完的数据 在进行一个临时视图 方便下一步的sql分析
b_etl.createOrReplaceTempView("b_etl")
#这里是筛选掉 离群值 (距离洲内商家平均位置的欧式距离)
#b1 作为business的原始数据表 b2是在原始数据表的基础上计算每个州县的平均 经纬度 然后计算每一个商家在这个州县的欧式距离 并根据计算结果降序排列
outlier = spark.sql(
"SELECT b1.business_id, SQRT(POWER(b1.latitude - b2.avg_lat, 2) + POWER(b1.longitude - b2.avg_long, 2)) \
as dist FROM b_etl b1 INNER JOIN (SELECT state, AVG(latitude) as avg_lat, AVG(longitude) as avg_long \
FROM b_etl GROUP BY state) b2 ON b1.state = b2.state ORDER BY dist DESC")
#创建一个新的临时视图 outlier
outlier.createOrReplaceTempView("outlier")
#从b_et1中 筛选出所有的距离小于10 的值。这对应到经纬度上面 已经是比较大的值了
joined = spark.sql("SELECT b.* FROM b_etl b INNER JOIN outlier o ON b.business_id = o.business_id WHERE o.dist<10")
#将筛选后的数据进行存储 存储为parquet的格式
joined.write.parquet("file:///home/hadoop/yelp-etl/business_etl", mode="overwrite")

内容补充

  1. parquet 文件格式
    parquet采用不同的压缩比,能达到有效的压缩比。减少磁盘的使用。
    parquet结合spark,可以完美的实现支持分区过滤。如,需要某个产品某段时间的数据,则hdfs只取这个文件夹。
    列修剪:其实说简单点就是我们要取回的那些列的数据
  2. 离群值
    离群值(outlier),也称逸出值,是指在数据中有一个或几个数值与其他数值相比差异较大.
  3. withColumn(colName, col)
    这里要解决的问题是,我们在项目中遗留的一个问题,withColumn往数据里面执行添加操作的时候,当colName的名字与原表中的名字一样的时候,添加操作是如何进行的。

数据分析

这一部分主要对清理后的数据进行分析。

1
2
3
4
5
6
7
8
9
10
11
def analysis(data_path):
# 读取清理后的数据路径,存储的parque数据,并持久化到磁盘中
business = spark.read.parquet(data_path).cache()
#将新的数据建立一个新的视图 business
business.createOrReplaceTempView("business")

#从上面处理过的数据中,选择各种商业类别的洲县、城镇、星级评分、评论数
#explode 在sql语句中用来处理array类型的数据。
part_business = spark.sql("SELECT state, city, stars,review_count, explode(categories) AS category FROM business").cache()
#默认显示前 20 行 并提示only showing top 20 rows
part_business.show()

Start analysis data!
+—–+—————-+—–+————+——————–+
|state| city|stars|review_count| category|
+—–+—————-+—–+————+——————–+
| AZ| Phoenix| 4.0| 41| Notaries|
| AZ| Phoenix| 4.0| 41| Mailbox Centers|
| AZ| Phoenix| 4.0| 41| Printing Services|
| AZ| Phoenix| 4.0| 41| Local Services|
| AZ| Phoenix| 4.0| 41| Shipping Centers|
| NV| Las Vegas| 4.0| 681| Restaurants|
| NV| Las Vegas| 4.0| 681| Bars|
| NV| Las Vegas| 4.0| 681| Nightlife|
| NV| Las Vegas| 4.0| 681| American (New)|
| NV| Las Vegas| 4.0| 681| Seafood|
| QC|Vaudreuil-Dorion| 4.0| 7| Food|
| QC|Vaudreuil-Dorion| 4.0| 7| Bakeries|
| AZ| Scottsdale| 3.5| 5| Shopping|
| AZ| Scottsdale| 3.5| 5| Home & Garden|
| AZ| Scottsdale| 3.5| 5| Mattresses|
| NC| Charlotte| 3.0| 64| Gyms|
| NC| Charlotte| 3.0| 64| Sports Clubs|
| NC| Charlotte| 3.0| 64| Active Life|
| NC| Charlotte| 3.0| 64| Fitness & Instru…|
| ON| Mississauga| 4.5| 12| Restaurants|
+—–+—————-+—–+————+——————–+
only showing top 20 rows

上述就是对处理完的数据进行一个显示以及筛选。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#将筛选的商业类别的相关信息 创建一个新的视图
part_business.createOrReplaceTempView('part_business_1')
#选择洲 城市 星级评定,评论数
# REPLACE(category, ' ','')as new_category 的含义是对category 这一行进行‘’ 替换成‘’ 然后创建一个新的代表商家类别的列(相当于对这一列的数据进行了一个备份)
part_business = spark.sql("SELECT state, city, stars, review_count, REPLACE(category, ' ','')as new_category FROM part_business_1")
# 创建一个新的视图
part_business.createOrReplaceTempView('part_business')

# 所有的不同的商家类别
print("## All distinct categories")
# 筛选所有种类的商家 分别是商家的id 以及 商家的种类
all_categories = spark.sql("SELECT business_id, explode(categories) AS category FROM business")
#将筛选出的数据重新创建一个新的视图
all_categories.createOrReplaceTempView('all_categories')

# 祛除重复的数据 并计算出所有种类的个数
distinct = spark.sql("SELECT COUNT(DISTINCT(new_category)) FROM part_business")
distinct.show()

上述过程是统计出所有种类的商家。

All distinct categories
+—————————-+
|count(DISTINCT new_category)|
+—————————-+
| 1318|
+—————————-+

1
2
3
4
5
6
7
print("## Top 10 business categories")
#统计出没种种类的商家的个数 并且按照降序排列
top_cat = spark.sql("SELECT new_category, COUNT(*) as freq FROM part_business GROUP BY new_category ORDER BY freq DESC")
#显示出排名前10的商家的种类
top_cat.show(10)
#将统计出的数据 存储为 json文件
top_cat.write.json("file:///usr/local/spark/yelp/analysis/top_category", mode='overwrite')

在这里插入图片描述

1
2
3
4
5
6
#在每个城市中商业种类的个数 仅按照商业种类的个数显示 前20条 
print("## Top business categories - in every city")
top_cat_city = spark.sql("SELECT city, new_category, COUNT(*) as freq FROM part_business GROUP BY city, new_category ORDER BY freq DESC")
top_cat_city.show()
#将数据存储到json中
top_cat.write.json("file:///usr/local/spark/yelp/analysis/top_category_city", mode='overwrite')
1
2
3
4
5
# 商业数量最多的城市
print("## Cities with most businesses")
bus_city = spark.sql("SELECT city, COUNT(business_id) as no_of_bus FROM business GROUP BY city ORDER BY no_of_bus DESC")
bus_city.show(10)
bus_city.write.json("file:///usr/local/spark/yelp/analysis/top_business_city", mode='overwrite')

在这里插入图片描述

1
2
3
4
5
6
#每种商业种类的 平均评论数 
print("## Average review count by category")
avg_city = spark.sql(
"SELECT new_category, AVG(review_count)as avg_review_count FROM part_business GROUP BY new_category ORDER BY avg_review_count DESC")
avg_city.show()
avg_city.write.json("file:///usr/local/spark/yelp/analysis/average_review_category", mode='overwrite')

在这里插入图片描述
这里其实也是显示了20行,就不截图太多了。

1
2
3
4
5
6
#每种商业类别的 平均星级水平
print("## Average stars by category")
avg_state = spark.sql(
"SELECT new_category, AVG(stars) as avg_stars FROM part_business GROUP BY new_category ORDER BY avg_stars DESC")
avg_state.show()
avg_state.write.json("file:///usr/local/spark/yelp/analysis/average_stars_category", mode='overwrite')

由于字段 attribute 中的RestaurantsTakeout可能NULL的情况,所以需要利用dropna()处理缺失值的问题。该项对商家是否有’Take out’服务进行分析,统计出两种不同情况的商家的平均星级评分.对应的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 商家所进行的业务分析
print("## Data based on Attribute")
#仅筛选出 商家所进行的业务,星级,以及划分到的商业类别
for_att = spark.sql("SELECT attributes, stars, explode(categories) AS category FROM business")
#将筛选出的数据 创建一个新的视图
for_att.createOrReplaceTempView("for_att")
#对 商家是否 有外卖这个 业务进行分析
attribute = 'RestaurantsTakeout'
attribute_score(attribute)
def attribute_score(attribute):
# 这里的format 表示用attribute的值代替attr的值
att = spark.sql("SELECT attributes.{attr} as {attr}, category, stars FROM for_att".format(attr=attribute)).dropna()
att.createOrReplaceTempView("att")
att_group = spark.sql("SELECT {attr}, AVG(stars) AS stars FROM att GROUP BY {attr} ORDER BY stars".format(attr=attribute))
att_group.show()
att_group.write.json("file:///usr/local/spark/yelp/analysis/{attr}".format(attr=attribute), mode='overwrite')

到这里所有的数据分析的代码就讲解完了。接下来我们来看一下,对于我们统计出来的这些数据,我们那如何更直观的查看。也就是数据的可视化

数据可视化

这里我们进行的数据可视化是通过 python中的matlib包进行绘图表示的。

数据的读取

我们先将上面已经分析过的数据并存储在磁盘上的数据路径自定义。(这一块系统可是无法自己找到)

1
2
3
4
5
6
AVE_REVIEW_CATEGORY = '/usr/local/spark/yelp/analysis/average_review_category'
TOP_CATEGORY_CITY = '/usr/local/spark/yelp/analysis/top_category_city'
TOP_BUSINESS_CITY = '/usr/local/spark/yelp/analysis/top_business_city'
TOP_CATEGORY = '/usr/local/spark/yelp/analysis/top_category'
AVE_STARS_CATEGORY = '/usr/local/spark/yelp/analysis/average_stars_category'
TAKEOUT = '/usr/local/spark/yelp/analysis/RestaurantsTakeout'

定义数据读取的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def read_json(file_path):
#读取这个路径下的所有文件的名称
json_path_names = os.listdir(file_path)
data = []
#遍历所有的文件
for idx in range(len(json_path_names)):
#拼接路径名称
json_path = file_path + '/' + json_path_names[idx]
#判断文件是不是json文件 如果是的话 就打开读取
if json_path.endswith('.json'):
with open(json_path) as f:
#将读取的数据拼接到data中 返回读取到的数据
for line in f:
data.append(json.loads(line))
return data

获得读取的所有的数据

1
2
3
4
5
6
7
ave_review_category_list = read_json(AVE_REVIEW_CATEGORY)
open_close_list = read_json(OPEN_CLOSE)
top_category_city_list = read_json(TOP_CATEGORY_CITY)
top_business_city_list = read_json(TOP_BUSINESS_CITY)
top_category_list = read_json(TOP_CATEGORY)
ave_stars_category_list = read_json(AVE_STARS_CATEGORY)
takeout_list = read_json(TAKEOUT)

下面是数据的图形化展示

对于这一部分,对一部分进行讲解,因为其余的图形的绘画 的思路是一样的。不一样的地方,在于画图的语法不一样。(这里可以自行学习)
学习链接(marplotlib画图)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#在每个城市中商业种类的个数 仅按照商业种类的个数显示 前10条 根据商业种类出现的次数进行 排名 从小到大
top_category_list.sort(key=lambda x: x['freq'], reverse=True)
#商业种类的名称
top_category_key = []
#商业种类的统计个数
top_category_value = []
#数据填充
for idx in range(10):
one = top_category_list[idx]
top_category_key.append(one['new_category'])
top_category_value.append(one['freq'])
#绘制条形图 截取前10名
plt.barh(top_category_key[:10], top_category_value[:10], tick_label=top_category_key[:10])
#标题
plt.title('Top 10 Categories', size = 16)
#x 的label为Frequency 条形图的宽度为 8 颜色 black
plt.xlabel('Frequency',size =8, color = 'Black')
plt.ylabel('Category',size = 8, color = 'Black')
# 会自动填充子图参数 让子图能够填充整个图像区域。(仅仅检查 坐标轴的标签,刻度标签以及标题的部分)
plt.tight_layout()

在这里插入图片描述

在这里插入图片描述
这样是不是看起来就更加直观了呢!

显示的图片不是很清晰,或者标题的字体格式等也是可以调的,这里就不在详细的介绍了。

源码就存在我的jupyter上了(不过需要我开启服务器才可以访问)源码