emmm-spark商业数据分析案例讲解

记录

这篇记录为啥以emmm开头呢?哈哈(~~)~因为我的阿里云还有一个月就要到期了。这意味着又是一波放血~~不过对于一个忠实的阿里粉来说,还是用了自己一个月的生活费,又续租了一年。(这东西有点上瘾,用上了就不想放弃了~~哈哈,这是真好用,所以不能对不起这一个月的生活费不是!当然,如果以后真的有机会进了阿里,阿里爸爸会不会给我免费服务的机会呀!😂)

Jupyter notebook 阿里云搭建

刚说完,要充分利用,那就赶快利用起来吧! 本来是搭建在我MAC上的,现在就把它迁移到阿里云上吧!也能让我随时随地的进行编程了~~

名称 Jupyter 是由Julia、Python和R三个单词组合而成的。Jupyter Notebook是一种Web应用,它能让用户将说明文本、数学方程、代码和可视化内容全部组合到一个易于共享的文档中,非常方便研究和教学。Jupyter Notebook特别适合做数据处理,其用途可以包括数据清理和探索、可视化、机器学习和大数据分析。

服务原理

在这里插入图片描述

Anaconda

Anaconda是一个开源的Python发行版本,其包含了conda、Python等180多个科学包及其依赖项。Anaconda中已经集成了Jupyter Notebook,因此,可以首先安装Anaconda,然后再配置Jupyter Notebook。

Anaconda的安装过程,这里就不在记录了!

安装好Anaconda之后,检查里面是否已经集成了jupyter Notebook。一般这里面都是集成了的,如果没有的话,那就在安装一下吧!
因为我搭建在了云上,这里的安全性还是要保护一下的。因为我的目的是随时随地,随机都可以进行编程!

1
2
cd /home/hadoop/anaconda3/bin
./python

这里来生成一下密码:

1
2
>>>from notebook.auth import passwd
>>>passwd()

然后系统会生成一个密码字符串,比如sha1;记得将这个密码串保存下来,用来配置你的密码。

退出python之后,找到配置文件。

1
vim ~/.jupyter/jupyter_notebook_config.py

1
2
3
4
5
c.NotebookApp.ip='*'                     # 就是设置所有ip皆可访问  
c.NotebookApp.password = 'sha1:7c7990750e83:965c1466a4fab0849051ca5f3c5661110813795b' # 上面复制的那个sha密文'
c.NotebookApp.open_browser = False # 禁止自动打开浏览器
c.NotebookApp.port =8888 # 端口
c.NotebookApp.notebook_dir = '/home/hadoop/jupyternotebook' #设置Notebook启动进入的目录

配置完成之后,使用jupyter notebook启动即可!这时候在本机访问是没有问题的,如果想要通过其他电脑访问还需要进行下一步的策略!

教训

还记得HDFS实验中,有一个问题没有解决吗?哈哈哈 其实有点傻了。
在配置notebook的时候,我使用的是8888端口,阿里云是对这些端口进行保护了的。主要有两层保护:第一层是阿里云本身的防火墙,第二层是控制台的安全组规则。这个经验本来是在mysql的配置的时候解决过的!但是忘记了。所以在执行HDFS分布式文件读取的时候,一直没有成功!但是这里配置的时候,忽然想起了这个事情,所以就把这个问题解决了。HDFS 使用本机用户分布式读取也没问题了。但是写权限还是不够的。

案例讲解

数据来源:YELP数据集仅用来学习

json 数据集

这里的数据集格式是属于NoSQL的中的文档数据集格式之一的json数据集格式。
因为,这也是我第一次对这类数据进行处理,这里做一个简单的学习记录。

  1. 简介
    JSON:一种与开发语言无关的、轻量级的数据存储格式,全称JavaScript Object Notation,一种数据格式的标准规范,起初来源于JavaScript这门语言,后来随着使用的广泛,几乎每门开发语言都有处理JSON的API。
    优点:易于人的阅读和编写,易于程序解析与生产。(看了一下真的是便于阅读与处理)

    JSON样例:首先一个花括号{},整个代表一个对象,同时里面是一种Key-Value的存储形式,它还有不同的数据类型来区分。

  2. 数据类型表示
    数据结构:Object、Array
    基本类型:string,number,true,false,null
    (1)Object
    {key:value,key:value…}
    key:string类型
    value:任何基本类型或数据结构
    (2)Array
    [value,value…]
    value:任何基本类型或数据结构。
    比如:{“name”:”李广”, “values”:[1,2,45,”你好”] }

  3. 实际json数据格式
    这个格式的文件可以通过t x t或者网页等方式打开,当然还有许多工具。
    我这里用的是Xcode。好了这就是我们要处理的数据了。
    在这里插入图片描述

  4. 数据集提前分析
    我这里取出来一条信息,先来看看信息是啥样子的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{"business_id":"f9NumwFMBDn751xgFiRbNA",
"name":"The Range At Lake Norman",
"address":"10913 Bailey Rd",
"city":"Cornelius",
"state":"NC",
"postal_code":"28031",
"latitude":35.4627242,
"longitude":-80.8526119,
"stars":3.5,
"review_count":36,
"is_open":1,
"attributes":{"BusinessAcceptsCreditCards":"True","BikeParking":"True","GoodForKids":"False","BusinessParking":"{'garage': False, 'street': False, 'validated': False, 'lot': True, 'valet': False}","ByAppointmentOnly":"False","RestaurantsPriceRange2":"3"},
"categories":"Active Life, Gun\/Rifle Ranges, Guns & Ammo, Shopping",
"hours":{"Monday":"10:0-18:0",
"Tuesday":"11:0-20:0",
"Wednesday":"10:0-18:0",
"Thursday":"11:0-20:0",
"Friday":"11:0-20:0",
"Saturday":"11:0-20:0",
"Sunday":"13:0-18:0"}}

上面描述的就是这个文档中所有数据集中其中的一条数据。
可以发现json数据格式是按照键值对的形式进行存储的。

  • business_id : 商业店铺的id,用来唯一的标识这一个商铺
  • name : 商业店铺的名称
  • address : 商铺的位置
  • city : 城镇
  • state : 州县
  • postal_code : 邮政编码
  • latitude :纬度
  • longitude :经度
  • starts : 星级评分
  • review_count:评论数
  • is_open: 商家是否营业
  • attributes :商家所进行的业务
  • categories: 类别
  • hours : 商家的营业时间

主要任务

  1. 统计出所有的商业类别,并且进行计算出商业类别的Top 10(categories)。
  2. 每个城市各种商业类型的商家数量,并且计算出商家数量最多的十个城市(city,categories)。
  3. 消费者评价最多的10种商业类别(review_count,categories)。
  4. 最受消费者欢迎的10种商业类别(starts)。
  5. 商业额外业务的评价情况。

主要的解决步骤

  1. 第一步 对数据进行预处理,剔除异常值
  2. 第二步 进行数据集的分析
  3. 第三步 对数据进行可视化分析

代码解读

所需要的包

1
2
3
4
5
6
7
from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import os
import json
import pandas as pd
import matplotlib.pyplot as plt
  • pyspark
    PySpark 是 Spark 为 Python 开发者提供的 API

    • SparkContext
      是程序的入口点,负责连接Spark集群。集群通过SparkContext进行资源管理器的通信以及进行资源的申请、任务的分配和监控,需要从SparkSession中获得
    • SparkConf
      创建SparkContext前得使用SparkConf进行配置,以键值对形式进行
  • pyspark.sql
    一种解析传统SQL到大数据运算模型的引擎

    • SparkSession
      其为用户提供了一个统一的切入点来使用Spark的各项功能,并且允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序
    • DataFrame(表)是Spark SQL对结构化数据的抽象。可以将DataFrame看做RDD
    • Dataset是数据的分布式集合
  • pyspark.sql.function
    可用于dataFram的内置功能列表
  • os
    主要是针对操作系统的包,用来与操作系统进行交互,创建文件等。
  • json
    Python里的json模块主要用于“Python数据与JSON格式的数据间相互转换”
  • pandas
    pandas是一个强大的分析结构化数据的工具集;它的使用基础是Numpy(提供高性能的矩阵运算);用于数据挖掘和数据分析,同时也提供数据清洗功能。
  • matplotlib.pyplot
    是matlib的python版实现,用于绘图操作。

获取spark操作对象

建立一个会话对象

1
2
3
4
5
# 建立配置文件
#builder用来创建一个Sparksession实例
#config 配置相关
#getOrCreate 有就获取没有就创建
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()

数据清洗

下面对数据清洗 进行一个详细的分析。主要将数据中存在值缺失的数据,商家位置错误的数据(这里可以称为离群值)进行筛选。代码的详解解读,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 数据清洗
def data_process(raw_data_path):

# 通过数据路径,读取数据,这里是json数据
business = spark.read.json(raw_data_path)
#split(str, pattern, limit=-1)
#str – 我们要分割的字符串
#pattern 分割所用的正则表达式
split_col = f.split(business['categories'], ',')
#withColumn(colName, col)
#withColumn 负责在原有表的基础上新添加一列 colName是添加一列后的新的列名,col是新列的值 ?列名一样会怎么样处理(这里应该是会直接替代原来的,有待验证)

#filter 使用给定的条件 过滤行 这里是在所有的categories中过滤掉城市
# dropna 至少又一个空缺值的行都会被删除
business = business.withColumn("categories", split_col).filter(business["city"] != "").dropna()
#创建一个数据的临时视图,这个视图的周期与sparkSession相关联
business.createOrReplaceTempView("business")
#解析传统的sql到大数据运算模型 筛选出所需要的内容
#cache 按照默认的存储级别 持久化dataFrame
b_etl = spark.sql("SELECT business_id, name, city, state, latitude, longitude, stars, review_count, is_open, categories, attributes FROM business").cache()
#将筛选完的数据 在进行一个临时视图 方便下一步的sql分析
b_etl.createOrReplaceTempView("b_etl")
#这里是筛选掉 离群值 (距离洲内商家平均位置的欧式距离)
#b1 作为business的原始数据表 b2是在原始数据表的基础上计算每个州县的平均 经纬度 然后计算每一个商家在这个州县的欧式距离 并根据计算结果降序排列
outlier = spark.sql(
"SELECT b1.business_id, SQRT(POWER(b1.latitude - b2.avg_lat, 2) + POWER(b1.longitude - b2.avg_long, 2)) \
as dist FROM b_etl b1 INNER JOIN (SELECT state, AVG(latitude) as avg_lat, AVG(longitude) as avg_long \
FROM b_etl GROUP BY state) b2 ON b1.state = b2.state ORDER BY dist DESC")
#创建一个新的临时视图 outlier
outlier.createOrReplaceTempView("outlier")
#从b_et1中 筛选出所有的距离小于10 的值。这对应到经纬度上面 已经是比较大的值了
joined = spark.sql("SELECT b.* FROM b_etl b INNER JOIN outlier o ON b.business_id = o.business_id WHERE o.dist<10")
#将筛选后的数据进行存储 存储为parquet的格式
joined.write.parquet("file:///home/hadoop/yelp-etl/business_etl", mode="overwrite")

内容补充

  1. parquet 文件格式
    parquet采用不同的压缩比,能达到有效的压缩比。减少磁盘的使用。
    parquet结合spark,可以完美的实现支持分区过滤。如,需要某个产品某段时间的数据,则hdfs只取这个文件夹。
    列修剪:其实说简单点就是我们要取回的那些列的数据
  2. 离群值
    离群值(outlier),也称逸出值,是指在数据中有一个或几个数值与其他数值相比差异较大.
  3. withColumn(colName, col)
    这里要解决的问题是,我们在项目中遗留的一个问题,withColumn往数据里面执行添加操作的时候,当colName的名字与原表中的名字一样的时候,添加操作是如何进行的。

数据分析

这一部分主要对清理后的数据进行分析。

1
2
3
4
5
6
7
8
9
10
11
def analysis(data_path):
# 读取清理后的数据路径,存储的parque数据,并持久化到磁盘中
business = spark.read.parquet(data_path).cache()
#将新的数据建立一个新的视图 business
business.createOrReplaceTempView("business")

#从上面处理过的数据中,选择各种商业类别的洲县、城镇、星级评分、评论数
#explode 在sql语句中用来处理array类型的数据。
part_business = spark.sql("SELECT state, city, stars,review_count, explode(categories) AS category FROM business").cache()
#默认显示前 20 行 并提示only showing top 20 rows
part_business.show()

Start analysis data!
+—–+—————-+—–+————+——————–+
|state| city|stars|review_count| category|
+—–+—————-+—–+————+——————–+
| AZ| Phoenix| 4.0| 41| Notaries|
| AZ| Phoenix| 4.0| 41| Mailbox Centers|
| AZ| Phoenix| 4.0| 41| Printing Services|
| AZ| Phoenix| 4.0| 41| Local Services|
| AZ| Phoenix| 4.0| 41| Shipping Centers|
| NV| Las Vegas| 4.0| 681| Restaurants|
| NV| Las Vegas| 4.0| 681| Bars|
| NV| Las Vegas| 4.0| 681| Nightlife|
| NV| Las Vegas| 4.0| 681| American (New)|
| NV| Las Vegas| 4.0| 681| Seafood|
| QC|Vaudreuil-Dorion| 4.0| 7| Food|
| QC|Vaudreuil-Dorion| 4.0| 7| Bakeries|
| AZ| Scottsdale| 3.5| 5| Shopping|
| AZ| Scottsdale| 3.5| 5| Home & Garden|
| AZ| Scottsdale| 3.5| 5| Mattresses|
| NC| Charlotte| 3.0| 64| Gyms|
| NC| Charlotte| 3.0| 64| Sports Clubs|
| NC| Charlotte| 3.0| 64| Active Life|
| NC| Charlotte| 3.0| 64| Fitness & Instru…|
| ON| Mississauga| 4.5| 12| Restaurants|
+—–+—————-+—–+————+——————–+
only showing top 20 rows

上述就是对处理完的数据进行一个显示以及筛选。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#将筛选的商业类别的相关信息 创建一个新的视图
part_business.createOrReplaceTempView('part_business_1')
#选择洲 城市 星级评定,评论数
# REPLACE(category, ' ','')as new_category 的含义是对category 这一行进行‘’ 替换成‘’ 然后创建一个新的代表商家类别的列(相当于对这一列的数据进行了一个备份)
part_business = spark.sql("SELECT state, city, stars, review_count, REPLACE(category, ' ','')as new_category FROM part_business_1")
# 创建一个新的视图
part_business.createOrReplaceTempView('part_business')

# 所有的不同的商家类别
print("## All distinct categories")
# 筛选所有种类的商家 分别是商家的id 以及 商家的种类
all_categories = spark.sql("SELECT business_id, explode(categories) AS category FROM business")
#将筛选出的数据重新创建一个新的视图
all_categories.createOrReplaceTempView('all_categories')

# 祛除重复的数据 并计算出所有种类的个数
distinct = spark.sql("SELECT COUNT(DISTINCT(new_category)) FROM part_business")
distinct.show()

上述过程是统计出所有种类的商家。

All distinct categories
+—————————-+
|count(DISTINCT new_category)|
+—————————-+
| 1318|
+—————————-+

1
2
3
4
5
6
7
print("## Top 10 business categories")
#统计出没种种类的商家的个数 并且按照降序排列
top_cat = spark.sql("SELECT new_category, COUNT(*) as freq FROM part_business GROUP BY new_category ORDER BY freq DESC")
#显示出排名前10的商家的种类
top_cat.show(10)
#将统计出的数据 存储为 json文件
top_cat.write.json("file:///usr/local/spark/yelp/analysis/top_category", mode='overwrite')

在这里插入图片描述

1
2
3
4
5
6
#在每个城市中商业种类的个数 仅按照商业种类的个数显示 前20条 
print("## Top business categories - in every city")
top_cat_city = spark.sql("SELECT city, new_category, COUNT(*) as freq FROM part_business GROUP BY city, new_category ORDER BY freq DESC")
top_cat_city.show()
#将数据存储到json中
top_cat.write.json("file:///usr/local/spark/yelp/analysis/top_category_city", mode='overwrite')
1
2
3
4
5
# 商业数量最多的城市
print("## Cities with most businesses")
bus_city = spark.sql("SELECT city, COUNT(business_id) as no_of_bus FROM business GROUP BY city ORDER BY no_of_bus DESC")
bus_city.show(10)
bus_city.write.json("file:///usr/local/spark/yelp/analysis/top_business_city", mode='overwrite')

在这里插入图片描述

1
2
3
4
5
6
#每种商业种类的 平均评论数 
print("## Average review count by category")
avg_city = spark.sql(
"SELECT new_category, AVG(review_count)as avg_review_count FROM part_business GROUP BY new_category ORDER BY avg_review_count DESC")
avg_city.show()
avg_city.write.json("file:///usr/local/spark/yelp/analysis/average_review_category", mode='overwrite')

在这里插入图片描述
这里其实也是显示了20行,就不截图太多了。

1
2
3
4
5
6
#每种商业类别的 平均星级水平
print("## Average stars by category")
avg_state = spark.sql(
"SELECT new_category, AVG(stars) as avg_stars FROM part_business GROUP BY new_category ORDER BY avg_stars DESC")
avg_state.show()
avg_state.write.json("file:///usr/local/spark/yelp/analysis/average_stars_category", mode='overwrite')

由于字段 attribute 中的RestaurantsTakeout可能NULL的情况,所以需要利用dropna()处理缺失值的问题。该项对商家是否有’Take out’服务进行分析,统计出两种不同情况的商家的平均星级评分.对应的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 商家所进行的业务分析
print("## Data based on Attribute")
#仅筛选出 商家所进行的业务,星级,以及划分到的商业类别
for_att = spark.sql("SELECT attributes, stars, explode(categories) AS category FROM business")
#将筛选出的数据 创建一个新的视图
for_att.createOrReplaceTempView("for_att")
#对 商家是否 有外卖这个 业务进行分析
attribute = 'RestaurantsTakeout'
attribute_score(attribute)
def attribute_score(attribute):
# 这里的format 表示用attribute的值代替attr的值
att = spark.sql("SELECT attributes.{attr} as {attr}, category, stars FROM for_att".format(attr=attribute)).dropna()
att.createOrReplaceTempView("att")
att_group = spark.sql("SELECT {attr}, AVG(stars) AS stars FROM att GROUP BY {attr} ORDER BY stars".format(attr=attribute))
att_group.show()
att_group.write.json("file:///usr/local/spark/yelp/analysis/{attr}".format(attr=attribute), mode='overwrite')

到这里所有的数据分析的代码就讲解完了。接下来我们来看一下,对于我们统计出来的这些数据,我们那如何更直观的查看。也就是数据的可视化

数据可视化

这里我们进行的数据可视化是通过 python中的matlib包进行绘图表示的。

数据的读取

我们先将上面已经分析过的数据并存储在磁盘上的数据路径自定义。(这一块系统可是无法自己找到)

1
2
3
4
5
6
AVE_REVIEW_CATEGORY = '/usr/local/spark/yelp/analysis/average_review_category'
TOP_CATEGORY_CITY = '/usr/local/spark/yelp/analysis/top_category_city'
TOP_BUSINESS_CITY = '/usr/local/spark/yelp/analysis/top_business_city'
TOP_CATEGORY = '/usr/local/spark/yelp/analysis/top_category'
AVE_STARS_CATEGORY = '/usr/local/spark/yelp/analysis/average_stars_category'
TAKEOUT = '/usr/local/spark/yelp/analysis/RestaurantsTakeout'

定义数据读取的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def read_json(file_path):
#读取这个路径下的所有文件的名称
json_path_names = os.listdir(file_path)
data = []
#遍历所有的文件
for idx in range(len(json_path_names)):
#拼接路径名称
json_path = file_path + '/' + json_path_names[idx]
#判断文件是不是json文件 如果是的话 就打开读取
if json_path.endswith('.json'):
with open(json_path) as f:
#将读取的数据拼接到data中 返回读取到的数据
for line in f:
data.append(json.loads(line))
return data

获得读取的所有的数据

1
2
3
4
5
6
7
ave_review_category_list = read_json(AVE_REVIEW_CATEGORY)
open_close_list = read_json(OPEN_CLOSE)
top_category_city_list = read_json(TOP_CATEGORY_CITY)
top_business_city_list = read_json(TOP_BUSINESS_CITY)
top_category_list = read_json(TOP_CATEGORY)
ave_stars_category_list = read_json(AVE_STARS_CATEGORY)
takeout_list = read_json(TAKEOUT)

下面是数据的图形化展示

对于这一部分,对一部分进行讲解,因为其余的图形的绘画 的思路是一样的。不一样的地方,在于画图的语法不一样。(这里可以自行学习)
学习链接(marplotlib画图)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#在每个城市中商业种类的个数 仅按照商业种类的个数显示 前10条 根据商业种类出现的次数进行 排名 从小到大
top_category_list.sort(key=lambda x: x['freq'], reverse=True)
#商业种类的名称
top_category_key = []
#商业种类的统计个数
top_category_value = []
#数据填充
for idx in range(10):
one = top_category_list[idx]
top_category_key.append(one['new_category'])
top_category_value.append(one['freq'])
#绘制条形图 截取前10名
plt.barh(top_category_key[:10], top_category_value[:10], tick_label=top_category_key[:10])
#标题
plt.title('Top 10 Categories', size = 16)
#x 的label为Frequency 条形图的宽度为 8 颜色 black
plt.xlabel('Frequency',size =8, color = 'Black')
plt.ylabel('Category',size = 8, color = 'Black')
# 会自动填充子图参数 让子图能够填充整个图像区域。(仅仅检查 坐标轴的标签,刻度标签以及标题的部分)
plt.tight_layout()

在这里插入图片描述

在这里插入图片描述
这样是不是看起来就更加直观了呢!

显示的图片不是很清晰,或者标题的字体格式等也是可以调的,这里就不在详细的介绍了。

源码就存在我的jupyter上了(不过需要我开启服务器才可以访问)源码

大数据分析之阿里云安装Spark

在我们已经安装好Hadoop的阿里云上,进行如下操作

下载

下载链接
使用wget在linux上进行下载即可

解压

1
sudo tar -zxvf spark-3.0.1-bin-hadoop2.7.tgz -C /usr/local

改名+修改文件权限

因为我在搭建hadoop的时候,在root用户上新创建了一个新的用户,因此,要把权限给hadoop用户

1
2
sudo mv ./spark-3.0.1-bin-hadoop2.7 ./spark
sudo chown -R hadoop ./spark

更改为可执行文件

1
cp ./conf/spark-env.sh.template ./conf/spark-env.sh

编辑spark-env.sh文件(vim ./conf/spark-env.sh),在第一行添加以下配置信息:

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

有了上面的配置信息以后,Spark就可以把数据存储到Hadoop分布式文件系统HDFS中,也可以从HDFS中读取数据。如果没有配置上面信息,Spark就只能读写本地数据,无法读写HDFS数据。
配置完成后就可以直接使用,不需要像Hadoop运行启动命令。
通过运行Spark自带的示例,验证Spark是否安装成功。

启动进入spark-shell交互式执行环境

1
2
cd /usr/local/spark
./bin/spark-shell

我们就可以看到Spark的信息,这样spark就安装成功了
如果想要退出spark的话,执行

1
scala> :quit

测试Spark是否可以正常访问Ubuntu系统中的本地文件

在本地创建一个文本文件,里面输入hello word

开启spark ,写入测试文件

1
2
3
4
5
cala> val line=sc.textFile("file:///home/hadoop/a.txt")
line: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/a.txt MapPartitionsRDD[1] at textFile at <console>:24

scala> print(line.count())
1

测试Spark是否能够正常访问Hadoop中的HDFS

首先执行一下jps检查一下,hadoop是否已经正常启动。
接下来,我们将在HDFS中已经准备好的测试文件,来查看一下主要的文件信息
,这里假设大家都已经掌握了Hadoop的基本知识。 这是在我的主文件下。

1
./bin/hdfs dfs -cat /user/hadoop/text.txt

我们再来执行一下
val line=sc.textFile(“/user/hadoop/text.txt”)
println(line.count())
来检查一下。
注意:在spark-shell交互式环境中,要访问HDFS中的文件时,可以直接使 sc.textFile(“/user/linziyu/word.txt”)和sc.textFile(“word.txt”)这两种路径格式
这样我们的Spark就搭建完了。

大数据-Hbase实验以及讲解

简介

HBase是一个面向列的高可靠、高性能、可伸缩的分布式数据库。是Google公司的BigTable的开源实现,主要用来存储非结构化数据半结构化数据的松散数据,支持大规模海量数据的、分布式并发数据处理效率极高;适用于廉价设备,适合读操作,不适合写操作。HBase可以直接使用本地文件系统而不用HDFS作为底层数据存储方式。为了提高数据可靠性和系统的健壮性,一般都使用HDFS作为底层数据存储方式。

HBase与传统数据库的对比

对比项目 传统数据库 HBase
数据类型 关系型模型 未经解释的字符串
数据操作 插入、删除、更新、查询等已经多表连接等 只有简单的插入、查询、删除、清空等。避免了复杂的表格之间的关系,只采用单表的主键查询,无法实现表的连接操作
存储模式 基于行存储,元组或者行会连续的存储在磁盘页 基于列存储的,每个列族都由几个文件保存,不同列族的文件是分离的
数据索引 通常可以针对不同的列构建复杂的多个索引 只有一个索引-行键
数据维护 更新操作的时候,新的值会覆盖旧的值 不会删除旧版的数据,而是生成一个新的版本
可伸缩性 很难实现横向扩展,纵向扩展有限 很好的实现水平扩展

数据模型的新的概念

  • 列族
    一个表被分成许多个列族的组合。列族需要在表创建的时候就创建好,数量不能太多(HBa se的一些缺陷导致的),存储在同一个列族当中的数据,通常都是属于同一种数据类型的。在存储的时候,列明都以列族作为前缀的。比如,course:history和 course:math这两个列都属于courses这个列族

  • 列限定符

列族里面的数据通过列限定符来定位。列限定符不用事先定义,也不需要在不同行之间保持一致。

  • 时间戳
    每个单元格都保存着同一份数据的多个版本,这些版本采用时间戳进行索引。每次对一个单元格执行操作的时候,HBa se都会隐式的自动生成并存储一个时间戳。

  • 数据坐标

    传统的数据库我们都是通过二维坐标进行取值的,但是对于HBase来讲,需要根据[行键、列族、列限定符、时间戳]来确定一个单元格。

    物理视图

对于数据表的观察,便于我们观察的是概念视图,但是在HBa se中实际存储的是按照数据的物理视图进行存储的。物理视图是采用了基于列的存储方式,而不是像传统的方式那样基于行的存储方式。
在这里插入图片描述
对于在概念视图中,空的列不会存储成null,而是根本就不会存储。

列式数据库主要适合于批量数据处理和即席查询 。
他的优点是:可以降低I/O开销,支持大并发用户查询,处理速度比传统方法快100倍。
他的缺点是:执行连接操作时需要昂贵的元组重构代价,因为一个元组的不同属性被分散到不同的磁盘页中存储,当需要一个完整的元组的时候,就要从多个磁盘页中读取相应字段的值来重新组合得到原来的一个元组。

运行机制

主要理解HBa se架构个部分的作用,主要包括:

  • 客户端
  • Zookper服务器
  • Master
  • Region服务器

    • Store
    • HLog文件

      HBase 常用的She l l命令

      自行学习,网上资料很多

      HBase 常用的Java API 及应用实例

      自行学习,网上资料很多
      主要包括:HBaseConfiguration、HTableDescriptor、HcolumnDescriptor、Put、Get、ResultScanner、Resul、Scan。

      HBase的安装

  • 可以使用wget命令在HBase的官网下载想要使用的版本

  • 解压
1
2
cd ~
sudo tar -zxf ~/下载/hbase-2.2.2-bin.tar.gz -C /usr/local
  • 更改名称
1
2
cd /usr/local
sudo mv ./hbase-2.2.2 ./hbase
  • 更改权限
1
2
cd /usr/local
sudo chown -R hadoop ./hbase
  • 配置环境变量
1
2
3
vim ~/.bashrc
//如果没有引入过PATH 直接添加下面的代码,如果引入过,就将路径添加到path路径的后面即可
export PATH=$PATH:/usr/local/hbase/bin

执行命令,让配置生效

1
source ~/.bashrc
  • 添加HBase的权限
1
2
cd /usr/local
sudo chown -R hadoop ./hbase #将hbase下的所有文件的所有者改为hadoop,hadoop是当前用户的用户名。
  • 查看版本,确保安装成功
1
/usr/local/hbase/bin/hbase version

伪分布式配置

  • 配置/usr/local/hbase/conf/hbase-env.sh。命令如下
1
vim /usr/local/hbase/conf/hbase-env.sh

配置JAVA_HOME,HBASE_CLASSPATH,HBASE_MANAGES_ZK.
HBASE_CLASSPATH设置为本机HBase安装目录下的conf目录(即/usr/local/hbase/conf)

1
2
3
export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_162
export HBASE_CLASSPATH=/usr/local/hbase/conf
export HBASE_MANAGES_ZK=true
  • 配置/usr/local/hbase/conf/hbase-site.xml
    用命令vi打开并编辑hbase-site.xml,命令如下
1
vim /usr/local/hbase/conf/hbase-site.xml

修改hbase.rootdir,指定HBase数据在HDFS上的存储路径;将属性hbase.cluter.distributed设置为true。假设当前Hadoop集群运行在伪分布式模式下,在本机上运行,且NameNode运行在9000端口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://localhost:9000/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
</property>
</configuration>
  • 接下来测试运行HBase
    启动hbase的时候一定要确保hadoop已经成功启动
1
2
cd /usr/local/hbase
bin/start-hbase.sh

成功安装配置好之后,下面就开始我们基本的实验吧

实验

现有以下关系型数据库中的表和数据,要求将其转换为适合于HBase存储的表并插入数据:

学生表(Student)
学号(S_No) | 姓名(S_Name) |性别(S_Sex)|年龄(S_Age)
——– | —–|—-|—-
2015001 | Zhangsan|male|23
2015002 | Mary|female|22
2015003 | Lisi |male|24
下面先插入s003 学生的信息

1
2
3
4
5
6
7
8
 hbase(main):008:0>create 'Student','S_No','S_Name','S_Sex','S_Age'
hbase(main):029:0> put 'Student','s003','S_No','2015003'
Took 0.0138 seconds
hbase(main):030:0> put 'Student','s003','S_Name','Lisi'
Took 0.0118 seconds
hbase(main):031:0> put 'Student','s003','S_Sex','male'
Took 0.0090 seconds
hbase(main):032:0> put 'Student','s003','S_Age','24'

课程表(Course)
课程号(C_No) | 课程名(C_Name) |学分(C_Credit)
——– | —–|—-|—-
123001 | Math|2.0
123002 | Computer Science|5.0
123003 |English|3.0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
hbase(main):041:0> create 'Course','C_No','C_Name','C_Credit'
Created table Course
Took 1.3558 seconds
=> Hbase::Table - Course
hbase(main):042:0> put 'Course','C001','C_No','2015001'
Took 0.0373 seconds
hbase(main):043:0> put 'Course','C001','C_Name','Math'
Took 0.0097 seconds
hbase(main):044:0> put 'Course','C001','C_No','123001'
Took 0.0060 seconds
hbase(main):045:0> put 'Course','C001','C_Credit','2.0'
Took 0.0093 seconds
hbase(main):046:0> put 'Course','c002','C_No','123002'
Took 0.0049 seconds
hbase(main):047:0> put 'Course','c002','C_Name','Computer'
Took 0.1125 seconds
hbase(main):048:0> put 'Course','c002','C_Credit','5.0'
Took 0.0107 seconds
hbase(main):049:0> put 'Course','c003','C_No','123003'
Took 0.0097 seconds
hbase(main):050:0> put 'Course','c003','C_Name','English'
Took 0.0060 seconds
hbase(main):051:0> put 'Course','c003','C_Credit','3.0'
Took 0.0089 seconds

选课表(SC)
学号(SC_Sno) | 课程号(SC_Cno))|成绩(SC_Score)
——– | —–|—-|—-
2015001 | 123001|86
2015001 | 123003|69
2015002 |123002|77
2015002 | 123003|99
2015003 | 123001|98
2015003 |123002|95

1
2
3
4
5
6
7
8
9
10
11
12
create 'SC','SC_Sno','SC_Cno','SC_Score'
Created table SC
Took 1.2786 seconds
=> Hbase::Table - SC
hbase(main):053:0> put 'SC','sc001','SC_Sno','2015001'
NoMethodError: undefined method `ut' for main:Object

hbase(main):054:0> put 'SC','sc001','SC_Sno','2015001'
Took 0.0210 seconds
hbase(main):055:0> put 'SC','sc001','SC_Cno','123001'
Took 0.0201 seconds
hbase(main):056:0> put 'SC','sc001','SC_Score','86'

这样,我们的表还有数据就填充完了,上面因为一些数据的填充的过程都是一样的,因此就写了一个实例来演示一下,需要注意的是表格中每一个行的数据,都需要为他增加一个主键,也就是语句中的sc001的部分,因为hba se是列存储的,这个用来表示每一行,因此,对应每一行的时候,这个值要有所区分

编程实现以下指定功能,并用Hadoop提供的HBase Shell命令完成相同任务:

  • 先是shell命令

(1)列出HBase所有的表的相关信息,例如表名;

1
hbase(main):073:0> list

(2)在终端打印出指定的表的所有记录数据;

1
scan 'Student'

(3)向已经创建好的表添加和删除指定的列族或列;

1
2
3
create 's1','score'
put 's1','zhangsan','score:math','69'
delete 's1','zhangsan','score:math'

(4)清空指定的表的所有记录数据;

1
hbase(main):078:0> truncate 's1'

(5)统计表的行数。

1
hbase> count 's1'
  • Java命令

使用HBase Java API进行编程控制
在进行编程之前,我们先进行一些前期的准备工作,准备好编程需要的环境。
当我们建立好普通的java项目之后,首先要进行一个导包的操作。
(导包操作以及建立项目的过程,不会的自行百度学习)
所需要的包位于

<1> 进入到“/usr/local/hbase/lib”目录,选中该目录下的所有jar文件(注意,不要选中client-facing-thirdparty、ruby、shaded-clients和zkcli这四个目录)

<2>在“client-facing-thirdparty”目录下(如下图所示),选中所有jar文件
上面就是我们所需要的全部的包了。

在这个项目下,建立一个类就可以进行编程了。
下面是上面五个小实验的详细代码 讲解,需要执行哪一个过程,在main函数中,调用即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package Test1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.List;
public class ExampleForHBase {
//首先定义一个 与数据库连接所需要的配置文件
public static Configuration configuration;
//定义一个连接对象
public static Connection connection;
//定义一个操作数据库的对象。管理员可用于创建,删除,列出,启用和禁用以及以其他方式修改表以及执行其他管理操作。
public static Admin admin;

public static void main(String[] args)throws IOException{
init();
createTable("student",new String[]{"score"});
insertData("student","zhangsan","score","English","69");
insertData("student","zhangsan","score","Math","86");
insertData("student","zhangsan","score","Computer","77");
getData("student", "zhangsan", "score","English");
close();
}
//初始化 连接数据库所需要的 配置文件,连接对象,以及管理员
public static void init(){
configuration = HBaseConfiguration.create();
configuration.set("hbase.rootdir","hdfs://0.0.0.0:8020/hbase");
try{
connection = ConnectionFactory.createConnection(configuration);
admin = connection.getAdmin();
}catch (IOException e){
e.printStackTrace();
}
}
//设置数据库连接完成之后的关闭操作
public static void close(){
try{
if(admin != null){
//管理员权限关闭
admin.close();
}
if(null != connection){
//连接关闭
connection.close();
}
}catch (IOException e){
e.printStackTrace();
}
}
// 第一个题 列出HBase所有的表的相关信息
public static void listTables() throws IOException {
init();//建立连接
List<TableDescriptor> tableDescriptors = admin.listTableDescriptors();
for(TableDescriptor tableDescriptor : tableDescriptors){
TableName tableName = tableDescriptor.getTableName();
System.out.println("Table:" + tableName);
}
close();//关闭连接
}
//第二题 在终端打印出指定的表的所有记录数据

//在终端打印出指定的表的所有记录数据
public static void getData(String tableName)throws IOException{
init();
//根据表名 获得一个表对象
Table table = connection.getTable(TableName.valueOf(tableName));
// 获得一个查询表的查询对象
Scan scan = new Scan();
//使用查询对象,获得查询到的结果(Result)集合 ResultScanner
ResultScanner scanner = table.getScanner(scan);//获取行的遍历器
for (Result result:scanner){
//将每一条Result的结果 打印输出
printRecoder(result);
}
close();
}
//打印一条记录的详情
public static void printRecoder(Result result)throws IOException{
//每一个列又包含多条信息 ,因此 需要迭代打印输出
for(Cell cell:result.rawCells()){
System.out.print("行健: "+new String(Bytes.toString(cell.getRowArray(),cell.getRowOffset(), cell.getRowLength())));
System.out.print("列簇: "+new String( Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(), cell.getFamilyLength()) ));
System.out.print(" 列: "+new String(Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(), cell.getQualifierLength())));
System.out.print(" 值: "+new String(Bytes.toString(cell.getValueArray(),cell.getValueOffset(), cell.getValueLength())));
System.out.println("时间戳: "+cell.getTimestamp());
}
}



//第三题 向表中 添加数据 所需要的关键字 与 shell的put语句 是对应的
//colFamily代表的是列簇 如果列为空,那么应设置col的值为“”
public static void insertData(String tableName,String rowKey,String colFamily,String col,String val) throws IOException {
init();
//建立连接 获取表
Table table = connection.getTable(TableName.valueOf(tableName));
//语句执行 添加
Put put = new Put(rowKey.getBytes());
put.addColumn(colFamily.getBytes(),col.getBytes(), val.getBytes());
table.put(put);
table.close();
close();
}
//删除数据
public static void deleRow(String tableName,String rowKey,String colFamily,String col) throws IOException {
init();
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(rowKey.getBytes());
//删除指定列族
delete.addFamily(Bytes.toBytes(colFamily));
//删除指定列
delete.addColumn(Bytes.toBytes(colFamily),Bytes.toBytes(col));
table.delete(delete);
table.close();
close();
}

//第四题 清空指定的表的所有记录数据;
public static void clearRows(String tableName)throws IOException{
init();
TableName tablename = TableName.valueOf(tableName);
admin.disableTable(tablename);
admin.deleteTable(tablename);
TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tablename);
admin.createTable(tableDescriptor.build());
close();
}

//第五题 统计表的行数
public static void countRows(String tableName)throws IOException{
init();
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
ResultScanner scanner = table.getScanner(scan);
int num = 0;
for (Result result = scanner.next();result!=null;result=scanner.next()){
num++;
}
System.out.println("行数:"+ num);
scanner.close();
close();
}

//创建表
public static void createTable(String myTableName,String[] colFamily) throws IOException {
TableName tableName = TableName.valueOf(myTableName);
if(admin.tableExists(tableName)){
System.out.println("talbe is exists!");
//删除原来的表
admin.disableTable(tablename);
admin.deleteTable(tablename);
}else {
//创建新的表
TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
for(String str:colFamily){
ColumnFamilyDescriptor family =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(str)).build();
tableDescriptor.setColumnFamily(family);
}
admin.createTable(tableDescriptor.build());
}
}
//获取表中的数据
public static void getData(String tableName,String rowKey,String colFamily, String col)throws IOException{
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(rowKey.getBytes());
get.addColumn(colFamily.getBytes(),col.getBytes());
Result result = table.get(get);
System.out.println(new String(result.getValue(colFamily.getBytes(),col==null?null:col.getBytes())));
table.close();
}
}

同时完成:
(1)createTable(String tableName, String[] fields)
创建表,参数tableName为表的名称,字符串数组fields为存储记录各个域名称的数组。要求当HBase已经存在名为tableName的表的时候,先删除原有的表,然后再创建新的表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void createTable(String myTableName,String[] colFamily) throws IOException {
TableName tableName = TableName.valueOf(myTableName);
if(admin.tableExists(tableName)){
System.out.println("talbe is exists!");
//删除原来的表
admin.disableTable(tablename);
admin.deleteTable(tablename);
}else {
//创建新的表
TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
for(String str:colFamily){
ColumnFamilyDescriptor family =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(str)).build();
tableDescriptor.setColumnFamily(family);
}
admin.createTable(tableDescriptor.build());
}
}

(2)addRecord(String tableName, String row, String[] fields, String[] values)
向表tableName、行row(用S_Name表示)和字符串数组files指定的单元格中添加对应的数据values。其中fields中每个元素如果对应的列族下还有相应的列限定符的话,用“columnFamily:column”表示。例如,同时向“Math”、“Computer Science”、“English”三列添加成绩时,字符串数组fields为{“Score:Math”,”Score;Computer Science”,”Score:English”},数组values存储这三门课的成绩。

1
2
3
4
5
6
7
8
9
10
11
12
public static void addRecord(String tableName,String row,String[] fields,String[] values) throws IOException {
init();
Table table = connection.getTable(TableName.valueOf(tableName));
for(int i = 0;i != fields.length;i++){
Put put = new Put(row.getBytes());
String[] cols = fields[i].split(":");
put.addColumn(cols[0].getBytes(), cols[1].getBytes(), values[i].getBytes());
table.put(put);
}
table.close();
close();
}

(3)scanColumn(String tableName, String column)
浏览表tableName某一列的数据,如果某一行记录中该列数据不存在,则返回null。要求当参数column为某一列族名称时,如果底下有若干个列限定符,则要列出每个列限定符代表的列的数据;当参数column为某一列具体名称(例如“Score:Math”)时,只需要列出该列的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void scanColumn(String tableName,String column)throws  IOException{
init();
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(column));
ResultScanner scanner = table.getScanner(scan);
for (Result result = scanner.next(); result != null; result = scanner.next()){
showCell(result);
}
table.close();
close();
}
//格式化输出
public static void showCell(Result result){
Cell[] cells = result.rawCells();
for(Cell cell:cells){
System.out.println("RowName:"+new String(Bytes.toString(cell.getRowArray(),cell.getRowOffset(), cell.getRowLength()))+" ");
System.out.println("Timetamp:"+cell.getTimestamp()+" ");
System.out.println("column Family:"+new String(Bytes.toString(cell.getFamilyArray(),cell.getFamilyOffset(), cell.getFamilyLength()))+" ");
System.out.println("row Name:"+new String(Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(), cell.getQualifierLength()))+" ");
System.out.println("value:"+new String(Bytes.toString(cell.getValueArray(),cell.getValueOffset(), cell.getValueLength()))+" ");
}
}

(4)modifyData(String tableName, String row, String column)
修改表tableName,行row(可以用学生姓名S_Name表示),列column指定的单元格的数据。

1
2
3
4
5
6
7
8
9
public static void modifyData(String tableName,String row,String column,String val)throws IOException{
init();
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(row.getBytes());
put.addColumn(column.getBytes(),null,val.getBytes());
table.put(put);
table.close();
close();
}

(5)deleteRow(String tableName, String row)
删除表tableName中row指定的行的记录。

1
2
3
4
5
6
7
8
public static void deleteRow(String tableName,String row)throws IOException{
init();
Table table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(row.getBytes());
table.delete(delete);
table.close();
close();
}

林子雨大数据-hdfs常用的操作(实验操作详细记录与讲解)

一、向HDFS中上传任意文本文件,如果指定的文件在HDFS中已经存在,由用户指定是追加到原有文件末尾还是覆盖原有的文件;(用JAVA编程实现相同功能)

  • shell命令
    1.首先,在我本地目录下创建一个本地文件local.txt
    我的文件路径是 /usr/local/hadoop/input$
1
vim local.txt

2.检查要追加的原有文件是否已经存在

1
2
3
4
//检查操作
$./bin/hdfs dfs -test -e text.txt
//输出检查后的结果 1 表示不存在 0 表示存在
echo $?

3.指定是进行追加还是覆盖操作

1
2
3
4
不存在我们就执行复制操作
./bin/hdfs dfs -copyFromLocal -f ./input/local.txt test.txt
存在我们就执行追加操作
./bin/hdfs dfs -appendToFile ./input/local.txt text.txt

下面我们把上面的语句整合一下

1
2
3
4
if $(./bin/hdfs dfs -test -e test.txt);
> then $(./bin/hdfs dfs -copyFromLocal -f ./input/local.txt test.txt);
> else $(./bin/hdfs dfs -appendToFile ./input/local.txt test.txt);
> fi
  • java编程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package Test1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class CopyFromLocalHost {
/**
* 判断路径是否存在
*/
public static boolean test(Configuration conf, String path) throws IOException {
FileSystem fs = FileSystem.get(conf);
return fs.exists(new Path(path));
}

/**
* 复制文件到指定路径
* 若路径已存在,则进行覆盖
*/
public static void copyFromLocalFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path localPath = new Path(localFilePath);
Path remotePath = new Path(remoteFilePath);
/* fs.copyFromLocalFile 第一个参数表示是否删除源文件,第二个参数表示是否覆盖 */
fs.copyFromLocalFile(false, true, localPath, remotePath);
fs.close();
}

/**
* 追加文件内容
*/
public static void appendToFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
/* 创建一个文件读入流 */
FileInputStream in = new FileInputStream(localFilePath);
/* 创建一个文件输出流,输出的内容将追加到文件末尾 */
FSDataOutputStream out = fs.append(remotePath);
/* 读写文件内容 */
byte[] data = new byte[1024];
int read = -1;
while ( (read = in.read(data)) > 0 ) {
out.write(data, 0, read);
}
out.close();
in.close();
fs.close();
}

/**
* 主函数
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
//在这里输入在hdfs配置文件中配置的地址即可。
conf.set("fs.default.name","hdfs://0.0.0.0:8020");
String localFilePath = "/usr/local/hadoop/input/local.txt"; // 本地路径
//这里的路径是我们之前配置的hadoop的用户目录
String remoteFilePath = "/user/hadoop/text.txt"; // HDFS路径
//这里选择文件是否覆盖还是追加
String choice = "append"; // 若文件存在则追加到文件末尾
// String choice = "overwrite"; // 若文件存在则覆盖

try {
/* 判断文件是否存在 */
Boolean fileExists = false;
if (CopyFromLocalHost.test(conf, remoteFilePath)) {
fileExists = true;
System.out.println(remoteFilePath + " 已存在.");
// choice = "overwrite";
} else {
System.out.println(remoteFilePath + " 不存在.");
}
/* 进行处理 */
if ( !fileExists) { // 文件不存在,则上传
CopyFromLocalHost.copyFromLocalFile(conf, localFilePath, remoteFilePath);
System.out.println(localFilePath + " 已上传至 " + remoteFilePath);
} else if ( choice.equals("overwrite") ) { // 选择覆盖
CopyFromLocalHost.copyFromLocalFile(conf, localFilePath, remoteFilePath);
System.out.println(localFilePath + " 已覆盖 " + remoteFilePath);
} else if ( choice.equals("append") ) { // 选择追加
CopyFromLocalHost.appendToFile(conf, localFilePath, remoteFilePath);
System.out.println(localFilePath + " 已追加至 " + remoteFilePath);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

因为我的hd f s是搭建在阿里云上的,在云上没有安装成功Eclipse.所以就在本地编写了代码,上传到云上执行的,下面的执行了同样的过程。
更方便的操作,等我完全搞懂了,如何在本地执行分布式或者在云上安装好eclipse,在做记录。
操作成功后会显示:

1
2
3
4
2020-10-10 16:33:06,050 INFO Configuration.deprecation: fs.default.name is deprecated. Instead, use fs.defaultFS
/user/hadoop/text.txt 不存在.
2020-10-10 16:33:08,706 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
/usr/local/hadoop/local.txt 已上传至 /user/hadoop/text.txt

二、 从HDFS中下载指定文件,如果本地文件与要下载的文件名称相同,则自动对下载的文件重命名

我们还是使用上面用过的local.txt 以及在h d f s中的test.txt文件。

  • Shell编程
1
2
3
4
if $(./bin/hdfs dfs -test -e file:/usr/local/hadoop/input/text.txt); 
then $(./bin/hdfs dfs -copyToLocal text.txt ./input/text2.txt);
else $(./bin/hdfs dfs -copyToLocal text.txt ./input/text.txt );
fi
  • java编程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package Test1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class CopyFromLocalHost {
/**
* 下载文件到本地
* 判断本地路径是否已存在,若已存在,则自动进行重命名
*/
public static void copyToLocal(Configuration conf, String remoteFilePath, String localFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
File f = new File(localFilePath);
/* 如果文件名存在,自动重命名(在文件名后面加上 _0, _1 ...) */
if (f.exists()) {
System.out.println(localFilePath + " 已存在.");
Integer i = 0;
while (true) {
f = new File(localFilePath + "_" + i.toString());
if (!f.exists()) {
localFilePath = localFilePath + "_" + i.toString();
break;
}
}
System.out.println("将重新命名为: " + localFilePath);
}

// 下载文件到本地
Path localPath = new Path(localFilePath);
fs.copyToLocalFile(remotePath, localPath);
fs.close();
}

/**
* 主函数
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://0.0.0.0:8020");
String localFilePath = "/usr/local/hadoop/input/text.txt"; // 本地路径
String remoteFilePath = "/user/hadoop/text.txt"; // HDFS路径

try {
CopyFromLocalHost.copyToLocal(conf, remoteFilePath, localFilePath);
System.out.println("下载完成");
} catch (Exception e) {
e.printStackTrace();
}
}
}

成功执行后会显示(因为我上面已经执行过shell程序了,所以在我操作的文件夹上,内容是已经存在了的,因此进行重命名操作)

1
2
3
4
5
2020-10-10 18:46:17,802 INFO Configuration.deprecation: fs.default.name is deprecated. Instead, use fs.defaultFS
/usr/local/hadoop/input/text.txt 已存在.
将重新命名为: /usr/local/hadoop/input/text.txt_0
2020-10-10 18:46:21,666 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
下载完成

三、将HDFS中指定文件的内容输出到终端中

本质就是查看文件内容

  • Shell

    1
    ./bin/hdfs dfs -cat text.txt
  • java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package Test1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class CopyFromLocalHost {
/**
* 读取文件内容
*/
public static void cat(Configuration conf, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
FSDataInputStream in = fs.open(remotePath);
BufferedReader d = new BufferedReader(new InputStreamReader(in));
String line = null;
while ( (line = d.readLine()) != null ) {
System.out.println(line);
}
d.close();
in.close();
fs.close();
}

/**
* 主函数
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://0.0.0.0:8020");
String remoteFilePath = "/user/hadoop/text.txt"; // HDFS路径

try {
System.out.println("读取文件: " + remoteFilePath);
CopyFromLocalHost.cat(conf, remoteFilePath);
System.out.println("\n读取完成");
} catch (Exception e) {
e.printStackTrace();
}
}
}

读取成功后显示

1
2
3
4
5
6
2020-10-10 19:14:58,684 INFO Configuration.deprecation: fs.default.name is deprecated. Instead, use fs.defaultFS
读取文件: /user/hadoop/text.txt
2020-10-10 19:15:01,401 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
this ssss

读取完成

四、显示HDFS中指定的文件的读写权限、大小、创建时间、路径等信息

  • shell
1
./bin/hdfs dfs -ls -h text.txt
  • java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package Test1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
import java.text.SimpleDateFormat;

public class CopyFromLocalHost {
/**
* 显示指定文件的信息
*/
public static void ls(Configuration conf, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
FileStatus[] fileStatuses = fs.listStatus(remotePath);
for (FileStatus s : fileStatuses) {
System.out.println("路径: " + s.getPath().toString());
System.out.println("权限: " + s.getPermission().toString());
System.out.println("大小: " + s.getLen());
/* 返回的是时间戳,转化为时间日期格式 */
Long timeStamp = s.getModificationTime();
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String date = format.format(timeStamp);
System.out.println("时间: " + date);
}
fs.close();
}

/**
* 主函数
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://0.0.0.0:8020");
String remoteFilePath = "/user/hadoop/text.txt"; // HDFS路径

try {
System.out.println("读取文件信息: " + remoteFilePath);
CopyFromLocalHost.ls(conf, remoteFilePath);
System.out.println("\n读取完成");
} catch (Exception e) {
e.printStackTrace();
}
}
}

结果显示

1
2
3
4
5
6
7
读取文件信息: /user/hadoop/text.txt
路径: hdfs://0.0.0.0:8020/user/hadoop/text.txt
权限: rw-r--r--
大小: 11
时间: 2020-10-10 16:33:08

读取完成

五、给定HDFS中某一个目录,输出该目录下的所有文件的读写权限、大小、创建时间、路径等信息,如果该文件是目录,则递归输出该目录下所有文件相关信息

  • shell
1
./bin/hdfs dfs -ls -R -h
  • java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package Test1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;
import java.text.SimpleDateFormat;

public class CopyFromLocalHost {
/**
* 显示指定文件夹下所有文件的信息(递归)
*/
public static void lsDir(Configuration conf, String remoteDir) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path dirPath = new Path(remoteDir);
/* 递归获取目录下的所有文件 */
RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(dirPath, true);
/* 输出每个文件的信息 */
while (remoteIterator.hasNext()) {
FileStatus s = remoteIterator.next();
System.out.println("路径: " + s.getPath().toString());
System.out.println("权限: " + s.getPermission().toString());
System.out.println("大小: " + s.getLen());
/* 返回的是时间戳,转化为时间日期格式 */
Long timeStamp = s.getModificationTime();
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String date = format.format(timeStamp);
System.out.println("时间: " + date);
System.out.println();
}
fs.close();
}

/**
* 主函数
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://0.0.0.0:8020");
String remoteDir = "/user/hadoop"; // HDFS路径

try {
System.out.println("(递归)读取目录下所有文件的信息: " + remoteDir);
CopyFromLocalHost.lsDir(conf, remoteDir);
System.out.println("读取完成");
} catch (Exception e) {
e.printStackTrace();
}
}
}

六、提供一个HDFS内的文件的路径,对该文件进行创建和删除操作。如果文件所在目录不存在,则自动创建目录

  • shell
1
2
3
4
5
if $(hdfs dfs -test -d dir1/dir2);
then $(hdfs dfs -touchz dir1/dir2/filename);
else $(hdfs dfs -mkdir -p dir1/dir2 && hdfs dfs -touchz dir1/dir2/filename);
fi
hdfs dfs -rm dir1/dir2/filename #删除文件
  • java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
/**
* 判断路径是否存在
*/
public static boolean test(Configuration conf, String path) throws IOException {
FileSystem fs = FileSystem.get(conf);
return fs.exists(new Path(path));
}

/**
* 创建目录
*/
public static boolean mkdir(Configuration conf, String remoteDir) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path dirPath = new Path(remoteDir);
boolean result = fs.mkdirs(dirPath);
fs.close();
return result;
}

/**
* 创建文件
*/
public static void touchz(Configuration conf, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
FSDataOutputStream outputStream = fs.create(remotePath);
outputStream.close();
fs.close();
}

/**
* 删除文件
*/
public static boolean rm(Configuration conf, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
boolean result = fs.delete(remotePath, false);
fs.close();
return result;
}

/**
* 主函数
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://0.0.0.0:8020");
String remoteFilePath = "/user/hadoop/input/text.txt"; // HDFS路径
String remoteDir = "/user/hadoop/input"; // HDFS路径对应的目录

try {
/* 判断路径是否存在,存在则删除,否则进行创建 */
if ( HDFSApi.test(conf, remoteFilePath) ) {
HDFSApi.rm(conf, remoteFilePath); // 删除
System.out.println("删除路径: " + remoteFilePath);
} else {
if ( !HDFSApi.test(conf, remoteDir) ) { // 若目录不存在,则进行创建
HDFSApi.mkdir(conf, remoteDir);
System.out.println("创建文件夹: " + remoteDir);
}
HDFSApi.touchz(conf, remoteFilePath);
System.out.println("创建路径: " + remoteFilePath);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

七、提供一个HDFS的目录的路径,对该目录进行创建和删除操作。创建目录时,如果目录文件所在目录不存在则自动创建相应目录;删除目录时,由用户指定当该目录不为空时是否还删除该目录

  • Shell

创建目录的操作

1
hdfs dfs -mkdir -p dir1/dir2

删除目录的操作

1
hdfs dfs -rmdir dir1/dir2

目录不为空时候的删除操作

1
hdfs dfs -rm -R dir1/dir2
  • java实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
/**
* 判断路径是否存在
*/
public static boolean test(Configuration conf, String path) throws IOException {
FileSystem fs = FileSystem.get(conf);
return fs.exists(new Path(path));
}

/**
* 判断目录是否为空
* true: 空,false: 非空
*/
public static boolean isDirEmpty(Configuration conf, String remoteDir) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path dirPath = new Path(remoteDir);
RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(dirPath, true);
return !remoteIterator.hasNext();
}

/**
* 创建目录
*/
public static boolean mkdir(Configuration conf, String remoteDir) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path dirPath = new Path(remoteDir);
boolean result = fs.mkdirs(dirPath);
fs.close();
return result;
}

/**
* 删除目录
*/
public static boolean rmDir(Configuration conf, String remoteDir) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path dirPath = new Path(remoteDir);
/* 第二个参数表示是否递归删除所有文件 */
boolean result = fs.delete(dirPath, true);
fs.close();
return result;
}

/**
* 主函数
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://0.0.0.0:8020");
String remoteDir = "/user/hadoop/input"; // HDFS目录
Boolean forceDelete = false; // 是否强制删除

try {
/* 判断目录是否存在,不存在则创建,存在则删除 */
if ( !HDFSApi.test(conf, remoteDir) ) {
HDFSApi.mkdir(conf, remoteDir); // 创建目录
System.out.println("创建目录: " + remoteDir);
} else {
if ( HDFSApi.isDirEmpty(conf, remoteDir) || forceDelete ) { // 目录为空或强制删除
HDFSApi.rmDir(conf, remoteDir);
System.out.println("删除目录: " + remoteDir);
} else { // 目录不为空
System.out.println("目录不为空,不删除: " + remoteDir);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

八、向HDFS中指定的文件追加内容,由用户指定内容追加到原有文件的开头或结尾

  • shell
1
hdfs dfs -appendToFile local.txt text.txt
1
2
3
hdfs dfs -get text.txt
cat text.txt >> local.txt
hdfs dfs -copyFromLocal -f text.txt text.txt
  • java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
/**
* 判断路径是否存在
*/
public static boolean test(Configuration conf, String path) throws IOException {
FileSystem fs = FileSystem.get(conf);
return fs.exists(new Path(path));
}

/**
* 追加文本内容
*/
public static void appendContentToFile(Configuration conf, String content, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
/* 创建一个文件输出流,输出的内容将追加到文件末尾 */
FSDataOutputStream out = fs.append(remotePath);
out.write(content.getBytes());
out.close();
fs.close();
}

/**
* 追加文件内容
*/
public static void appendToFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
/* 创建一个文件读入流 */
FileInputStream in = new FileInputStream(localFilePath);
/* 创建一个文件输出流,输出的内容将追加到文件末尾 */
FSDataOutputStream out = fs.append(remotePath);
/* 读写文件内容 */
byte[] data = new byte[1024];
int read = -1;
while ( (read = in.read(data)) > 0 ) {
out.write(data, 0, read);
}
out.close();
in.close();
fs.close();
}

/**
* 移动文件到本地
* 移动后,删除源文件
*/
public static void moveToLocalFile(Configuration conf, String remoteFilePath, String localFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
Path localPath = new Path(localFilePath);
fs.moveToLocalFile(remotePath, localPath);
}

/**
* 创建文件
*/
public static void touchz(Configuration conf, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
FSDataOutputStream outputStream = fs.create(remotePath);
outputStream.close();
fs.close();
}

/**
* 主函数
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://0.0.0.0:8020");
String remoteFilePath = "/user/hadoop/text.txt"; // HDFS文件
String content = "新追加的内容\n";
String choice = "after"; //追加到文件末尾
// String choice = "before"; // 追加到文件开头

try {
/* 判断文件是否存在 */
if ( !HDFSApi.test(conf, remoteFilePath) ) {
System.out.println("文件不存在: " + remoteFilePath);
} else {
if ( choice.equals("after") ) { // 追加在文件末尾
HDFSApi.appendContentToFile(conf, content, remoteFilePath);
System.out.println("已追加内容到文件末尾" + remoteFilePath);
} else if ( choice.equals("before") ) { // 追加到文件开头
/* 没有相应的api可以直接操作,因此先把文件移动到本地*/
/*创建一个新的HDFS,再按顺序追加内容 */
String localTmpPath = "/user/hadoop/tmp.txt";
// 移动到本地
HDFSApi.moveToLocalFile(conf, remoteFilePath, localTmpPath);
// 创建一个新文件
HDFSApi.touchz(conf, remoteFilePath);
// 先写入新内容
HDFSApi.appendContentToFile(conf, content, remoteFilePath);
// 再写入原来内容
HDFSApi.appendToFile(conf, localTmpPath, remoteFilePath);
System.out.println("已追加内容到文件开头: " + remoteFilePath);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

九、删除HDFS中指定的文件

  • shell
1
hdfs dfs -rm text.txt
  • java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
/**
* 删除文件
*/
public static boolean rm(Configuration conf, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
boolean result = fs.delete(remotePath, false);
fs.close();
return result;
}

/**
* 主函数
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://0.0.0.0:8020");
String remoteFilePath = "/user/hadoop/text.txt"; // HDFS文件

try {
if ( HDFSApi.rm(conf, remoteFilePath) ) {
System.out.println("文件删除: " + remoteFilePath);
} else {
System.out.println("操作失败(文件不存在或删除失败)");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

十、删除HDFS中指定的目录,由用户指定目录中如果存在文件时是否删除目录

见第7个实验

十一、在HDFS中,将文件从源路径移动到目的路径

  • shell

text.txt 是源路径,text2.txt所在的位置是目的路径所在的位置

1
hdfs dfs -mv text.txt text2.txt

  • java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.*;

public class HDFSApi {
/**
* 移动文件
*/
public static boolean mv(Configuration conf, String remoteFilePath, String remoteToFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path srcPath = new Path(remoteFilePath);
Path dstPath = new Path(remoteToFilePath);
boolean result = fs.rename(srcPath, dstPath);
fs.close();
return result;
}

/**
* 主函数
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://0.0.0.0:8020");
String remoteFilePath = "hdfs:///user/hadoop/text.txt"; // 源文件HDFS路径
String remoteToFilePath = "hdfs:///user/hadoop/new.txt"; // 目的HDFS路径

try {
if ( HDFSApi.mv(conf, remoteFilePath, remoteToFilePath) ) {
System.out.println("将文件 " + remoteFilePath + " 移动到 " + remoteToFilePath);
} else {
System.out.println("操作失败(源文件不存在或移动失败)");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

编程实现一个类“MyFSDataInputStream”,该类继承“org.apache.hadoop.fs.FSDataInputStream”,要求如下:实现按行读取HDFS中指定文件的方法“readLine()”,如果读到文件末尾,则返回空,否则返回文件一行的文本。查看Java帮助手册或其它资料,用“java.net.URL”和“org.apache.hadoop.fs.FsURLStreamHandlerFactory”编程完成输出HDFS中指定文件的文本到终端中

关于Java API中URL类的使用细节,可以详细阅读:

java.net.URL

关于Hadoop中FsURLStreamHandlerFactory类的使用细节可以学习:
org.apache.hadoop.fs.FsURLStreamHandlerFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.*;

public class MyFSDataInputStream extends FSDataInputStream {
public MyFSDataInputStream(InputStream in) {
super(in);
}

/**
* 实现按行读取
* 每次读入一个字符,遇到"\n"结束,返回一行内容
*/
public static String readline(BufferedReader br) throws IOException {
char[] data = new char[1024];
int read = -1;
int off = 0;
// 循环执行时,br 每次会从上一次读取结束的位置继续读取
//因此该函数里,off 每次都从0开始
while ( (read = br.read(data, off, 1)) != -1 ) {
if (String.valueOf(data[off]).equals("\n") ) {
off += 1;
break;
}
off += 1;
}

if (off > 0) {
return String.valueOf(data);
} else {
return null;
}
}

/**
* 读取文件内容
*/
public static void cat(Configuration conf, String remoteFilePath) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path remotePath = new Path(remoteFilePath);
FSDataInputStream in = fs.open(remotePath);
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String line = null;
while ( (line = MyFSDataInputStream.readline(br)) != null ) {
System.out.println(line);
}
br.close();
in.close();
fs.close();
}

/**
* 主函数
*/
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://0.0.0.0:8020");
String remoteFilePath = "/user/hadoop/text.txt"; // HDFS路径
try {
MyFSDataInputStream.cat(conf, remoteFilePath);
} catch (Exception e) {
e.printStackTrace();
}
}
}

查看Java帮助手册或其它资料,用“java.net.URL”和“org.apache.hadoop.fs.FsURLStreamHandlerFactory”编程完成输出HDFS中指定文件的文本到终端中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import java.io.*;
import java.net.URL;

public class HDFSApi {
static{
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}

/**
* 主函数
*/
public static void main(String[] args) throws Exception {
String remoteFilePath = "hdfs://0.0.0.0:8020/user/hadoop/text.txt"; // HDFS文件
InputStream in = null;
try{
/* 通过URL对象打开数据流,从中读取数据 */
in = new URL(remoteFilePath).openStream();
IOUtils.copyBytes(in,System.out,4096,false);
} finally{
IOUtils.closeStream(in);
}
}
}

Java中的Socket

java.net包中提供了两种常见的网络协议的支持:

  • TCP:TCP传输控制协议,保证了两个应用程序之间的可靠传输通信
  • UDP:UDP是用户数据报协议,一个无链接的协议,提供了应用程序之间要发送的数据的数据包。

    Socket 的定义

    套接字使用TCP提供了两台计算机之间的通信机制。

计算机使用套接字进行通信的过程

  1. 服务器实例化一个ServerSocket对象,表示通过服务器上的端口通信
  2. 服务器调用ServerSocket类的accept()方法,该方法会一直等待,直到客户端连接到服务器上的给定的端口。
  3. 服务器正在等待的时候,一个客户端实例化一个Socket对象,指定服务器名称以及端口来请求连接。
  4. Socket类的构造函数试图将客户端连接到指定的服务器和端口号。如果通信被建立,则在客户端创建一个Socket对象与服务器进行通信。
  5. 在服务器端,accept()方法返回服务器上的一个新的socket引用,这个socket连接到客户端的socket。
  6. 连接建立之后,通过使用I/O流进行通信,每一个socket都有一个输出流和一个输入流。客户端的输出流连接到服务器的输入流,客户端的输入流连接到服务器的输出流。
    TCP是一个双向的通信协议,数据可以通过两个数据流在同一个时间发送。

    Socket方法

    参见识 java API

使用Socket实现文件的传输

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package Test;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.*;

public class Server extends Thread {
private ServerSocket serverSocket;
public Server(int port) throws IOException {
serverSocket = new ServerSocket(port);
//指定超时 以毫秒为单位
serverSocket.setSoTimeout(10000000);
}
public void run() {
while (true) {
System.out.println("waiting for client on port..."+serverSocket.getLocalPort());
try {
//等待新的连接
Socket server = serverSocket.accept();
System.out.println("Just connected to "+server.getRemoteSocketAddress());
//获取输入流
DataInputStream in = new DataInputStream(server.getInputStream());
File file = new File("/Users/gorge/Desktop"+server.getRemoteSocketAddress()+".txt");
FileOutputStream outputStream =new FileOutputStream(file);
byte car [] = new byte[1024];
int len = 0;
while ((len=in.read(car))!=-1) {
outputStream.write(car, 0, len);
}
outputStream.flush();
//System.out.println("Client say"+in.readUTF());

//DataOutputStream out = new DataOutputStream(server.getOutputStream());
//写入数据 返回这个套接字绑定的端点的地址
//out.writeUTF("Thanks you for a connecting to"+server.getLocalSocketAddress()+"good bye");
server.close();
} catch (SocketException s) {
// TODO Auto-generated catch block
System.out.println("socket timed out");
break;
}catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
break;
}
}
}
public static void main(String[] args) {
int port = 5000;
Thread thread;
try {
thread = new Server(port);
thread.start();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
在这里package Test;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;

public class Client extends Thread {
int port ;
String servername;
Socket client;
public Client(String servername, int port) {
this.servername = servername;
this.port = port;
try {
System.out.println("Connecting to "+servername+"on port"+port);
client = new Socket(servername,port);
System.out.println("Just connect to "+client.getRemoteSocketAddress());
//输出流
OutputStream outToserver = client.getOutputStream();
DataOutputStream out = new DataOutputStream(outToserver);
File file1 = new File("/Users/gorge/Desktop/a.txt");
//先从本地读取一个文件
FileInputStream inputStream = new FileInputStream(file1);
byte [] b = new byte[1024];
int len =0;
while(-1!=(len=inputStream.read(b))) {
out.write(b,0,len);
}
//out.writeUTF("Hello from" +client.getLocalSocketAddress());

//输入流
InputStream inFromserver = client.getInputStream();
DataInputStream in = new DataInputStream(inFromserver);
//System.out.println("Server says:"+in.readUTF());
client.close();
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void main(String[] args) {
Client cl = new Client("127.0.0.1", 5000);
cl.start();
}

}

WebMagic框架实现对58同城就业信息的爬取

简介

此信息的爬取,纯属是练习使用,没有用在商业用途。
WebMagic的设计参考了业界最优秀的爬虫Scrapy,而实现则应用了HttpClient、Jsoup等Java世界最成熟的工具。

WebMagic由四个组件(Downloader、PageProcessor、Scheduler、Pipeline)构成,核心代码非常简单,主要是将这些组件结合并完成多线程的任务。
在这里插入图片描述

导包

使用webmagic需要导入的基本包:

webmagic-core
webmagic-core是WebMagic核心部分,只包含爬虫基本模块和基本抽取器。WebMagic-core的目标是成为网页爬虫的一个教科书般的实现。

webmagic-extension
webmagic-extension是WebMagic的主要扩展模块,提供一些更方便的编写爬虫的工具。包括注解格式定义爬虫、JSON、分布式等支持。

页面分析

假设我们现在要爬取的是城市在北京的所有的岗位招聘信息。
58同城的页面的设计如下:
在这里插入图片描述
从这一个界面上大家就可以很容易的找到一些想要的基本的信息,比如说岗位、公司性质、薪资情况等信息。如果你需要的信息这些就足够了,那么你只需要对这一个界面进行分析爬取就好了。我这里为了获取更完善的信息,所以就进行了进一步的分析。

职位详细信息的界面设置如下:
在这里插入图片描述

链接分析

总体界面链接分析:
第一页的链接:
https://search.51job.com/list/010000,000000,0000,00,9,99,+,2,1.html?lang=c&postchannel=0000&workyear=99&cotype=99&degreefrom=99&jobterm=99&companysize=99&ord_field=0&dibiaoid=0&line=&welfare=
第二页的链接:
https://search.51job.com/list/010000,000000,0000,00,9,99,+,2,2.html?lang=c&postchannel=0000&workyear=99&cotype=99&degreefrom=99&jobterm=99&companysize=99&ord_field=0&dibiaoid=0&line=&welfare=

大家对这个链接进行分析,可以发现链接是有规律的,在同一个城市下的求职信息,链接的差别主要在 n.html 这个位置,所以如果只是获取这个页面的信息,链接的分析到此就可以结束了,直接对页面进行详细分析,如何用xpath 进行匹配到你想要获取的信息,就可以了。

详细页面链接分析:
第一个职位链接:
https://jobs.51job.com/beijing-tzq/124559934.html?s=01&t=0
第二个职位链接:
https://jobs.51job.com/beijing-cyq/117551387.html?s=01&t=0

这里我没有看出很容易处理的共同点来。所有这个链接就不能很简单的获取。但是我们可以从每一个汇总页的位置获得,大家可以多看几个链接进行分析,可以用正则表达式进行进一步的分析(这个方式似乎更方便一些)

页面HTML分析

所以我们这里就直接分析总页就可以了。
在这里插入图片描述
大家看到这里是不是已经很明显了,对于总页的每一个模块的链接是非常明显的,并且58同城的信息每一个模块的设置的格式是非常整洁的。可以看到每一个div的class属性是e,然后直接找子标签为a的标签的href属性就可以得到你想要的进一步的页面的链接。

所以,就进行了两步爬取分析。第一步是爬取相关招聘信息的网址,第二步是针对当前网址爬取招聘岗位的详细信息。

编写爬虫(这里只介绍一下爬取网址,详细信息过程一样)

  • 在WebMagic里面如果想要实现基本的爬虫,只需要编写一个类实现Pa geProcessor即可,需要重写process(Page page)方法
  • 爬虫的配置
    1、Spider
    Spider是爬虫启动的入口。在启动爬虫之前,我们需要使用一个PageProcessor创建一个Spider对象,然后使用run()进行启动。同时Spider的其他组件(Downloader、Scheduler、Pipeline)都可以通过set方法来进行设置。
1
2
3
4
5
6
Spider.create(new URls())
.addUrl("url")
//使用文件保存抓取URL,可以在关闭程序并下次启动时,从之前抓取到的URL继续抓取(具体内容查看Scheduler的配置)
.setScheduler(new FileCacheQueueScheduler("./51jobUrl"))
.thread(5)
.run();

2、Site
对站点本身的一些配置信息,例如编码、HTTP头、超时时间、重试策略等、代理等,都可以通过设置Site对象来进行配置。
private Site site = Site.me().setRetryTimes(3).setSleepTime(10000).setTimeOut(10000);

  • 抽取信息并且将信息保留下来

将获取到的页面进行xpath解析,xpath技术这里就不在赘述。.all()方法是返回所有抽取结果。

1
page.getHtml().xpath("//div[@class='el']//p[@class='t1']/span/a/@href").all()

将获得数据保存下来

1
page.putField("url",page.getHtml().xpath("//div[@class='el']//p[@class='t1']/span/a/@href").all());

因为我们这里需要将数据保存到数据库中,因此需要将数据从f ield中通过名称获取出来

1
String  a = page.getResultItems().get("url").toString();
  • 从页面发现后续的地址来进行抓取
    list是你要进行匹配的链接,可以是使用正则表达式进行匹配的,也可以是手动添加的等。
1
page.addTargetRequests(list);
  • 数据已经获得了,最后将这些数据插入到提前建设好的数据库就可以了。
  • 源码
    官方学习文档

本机Mac建设git记录《一》

安装

首先,是在电脑上通过app store 安装Xcode.
在 Xcode中找到locations 这个 选项,可以发现有一个Command Line Tools 的工具。

创建本地仓库

1
2
mkdir learngit
git init

我的这个安装目录是 /Users/gorge/learngit

查看git是否配置成功

git config –list

密钥的位置

在gorge 下执行ls -al 查看是否存在 .ssh 目录

上传方式

  • 首先我们要将我们要进行管理的文件 移动到本地建立的文件夹
  • 在当前目录执行 git add [文件名] 将文件添加到本地仓库
  • 执行提交 git commit -m “文件描述” 命令 ,告诉系统文件提交到仓库
  • 将文件 直接提交到 远程仓库的命令是 git push origin master 命令进行提交

本地文件夹是工作区.git目录是版本库
我们执行的添加操作实际上是指将信息添加到本地的工作区中的版本库中。
git add 操作将数据添加到暂存区。
git commit 操作提交更改,将内容添加到分支master

查看历史修改版本的命令

1
git reset --hard  HEAD^
  • HEAD 表示当前版本
  • ^ 表示回退一个版本
  • ^^ 表示回退两个版本
  • 如果是多个版本的话可以是 HEAD~n

或者执行git reset –hard [id]

查看历史记录

git log 用来查看提交记录
git reflog 用来查看你的每一次操作记录

撤销修改

主要分为两种情况:

  1. 首先是在工作区的修改还没有提交到暂存区

下面的命令会将文件的版本恢复为我们提交到分支的最新的版本。

1
git checkout -- readme.txt

  1. 在工作区进行了修改并且已经提交到暂存区
1
git reset HEAD <file>

分支管理

为了提高操作的安全行,我们在操作的过程中可以使用
创建新的分支–执行操作–合并分支 的过程。
我们通常使用的是一个分支master分支。但是对于git来说,分支的创建与合并是非常快速的。因此,我们为了操作的安全性,创建一个新的分支进行操作无疑是一个非常好的方法。

执行创建分支dev,并且将当前工作转换到当前分支上面。

1
git checkout -b dev

查看当前分支

1
git branch

在当前分支执行操作完成后,进行添加并提交

1
2
git add <文件名>
git commit -m "文件描述"

分支的切换操作,master是我们的默认分支

1
git checkout master

或者
创建并且切换到新的分支

1
2
3
git switch -c dev
//切换到已有分支
git switch master

将dev分支进行合并

1
git merge dev

合并完成之后,新创建的分支就可以删除了

1
git branch -d dev

NDCG

归一化则损累积增益
这个指标通常是用来衡量和评价搜索结果算法

  1. 高关联度的结果比一般关联度的结果更影响最终的指标得分
  2. 有高关联度的结果出现在更靠前的位置的时候,指标会越高

    累计增益CG

一个搜索结果相关性分数的总和,与顺序无关
指定位置P的CG为:
在这里插入图片描述
其中reli 表示 i这个位置上的相关度。

折损累计增益DCG

就是在每一个CG的结果上除以一个折损值,DCG在每个项目的得分乘上一个权值,该权值与位置成反关系,即位置越靠前,权值越大,目的是为了让排名越靠前的结果越能影响最后的结果。排序越往后,价值越低。
在这里插入图片描述
或者
在这里插入图片描述

归一化则损累计增益

由于搜索结果随着检索词的不同,返回的数量是不一致的。而DCG是一个累加的值,没法对两个不同的结果进行比较,所以需要进行一个归一化的处理。
在这里插入图片描述
IDCG 为理想情况下的最大的DCG值
在这里插入图片描述
其中|REL|表示结果按照相关性从大到小的顺序排序,取前P个结果组成的集合。

Pearson Collelation

皮尔森相关系数是衡量线性关联性的程度,P的一个几何解释是其代表两个变量的取值根据均值集中后构成的向量之间夹角的余弦。

  • 0 无相关
  • 0-1 正相关
  • -1 - 0负相关

相关系数

  • 0.8-1.0 极强相关
  • 0.6-0.8 强相关
  • 0.4-0.6 中等相关
  • 0.2- 0.4 弱相关
  • 0.0- 0.2 极弱相关或者无关

    计算公式

在这里插入图片描述
两个连续变量(x,y)的per 系数P(x,y)等于他们之间的协方差cov(x,y)除以他们各自的标准差成绩。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import math
vec1 = [0,1,2,3,4]
vec2 = [0,1,2,3,4]
def pearson(vec1,vec2):
n = len(vec1)
n1= len(vec2)
if(n==n1):
sum1 = sum(float(vec1[i]) for i in range(n))
sum2 = sum(float(vec2[i]) for i in range(n))
sum1_pow = sum([pow(v,2.0) for v in vec1])
sum2_pow = sum([pow(v,2.0) for v in vec2])
p_sum = sum([vec1[i]*vec2[i] for i in range(n)])
num = p_sum-(sum1*sum2/n)
fen = math.sqrt((sum1_pow-pow(sum1,2)/n)*(sum2_pow-pow(sum2,2)/n))
if fen ==0:
return 0.0;
return num/fen;
return 0.0;
v = pearson(vec1,vec2);
print(v)

PDF & CDF

概率密度函数 PDF

定义

在数学中,连续型随机变量的概率密度函数(在不至于混淆时可以简称为密度函数)是一个描述这个随机变量的输出值,在某个确定的取值点附近的可能性的函数。而随机变量的取值落在某个区域之内的概率则为概率密度函数在这个区域上的积分。

性质

对于一维连续随机变量
随机数据的概率密度函数:表示瞬时幅值落在指定的范围内的概率,因此是幅值的函数。它随所取范围的幅值而变化。
密度函数f(x) 具有以下性质:

  1. f(x) >= 0
  2. 在这里插入图片描述
    1. 在这里插入图片描述

累积分布函数CDF

累积分布函数又叫分布函数是概率密度函数的积分,能完整的描述一个实随机变量X的概率分布。

定义

对于所有的实数x累积分布函数
在这里插入图片描述

意义

对离散变量而言,所有小于等于a的值出现概率的和

性质

  • 有界性
    在这里插入图片描述
  • 单调性
    在这里插入图片描述
  • 右连续性
    在这里插入图片描述

    两种应用

    一种是对小于参考值的现象值的出现频率的分析的累积频率分析
    另一种是对累计分布函数进行估计,随后可以求的简单的统计值或进行各种统计假设检验。