Spark SQL 初探: 使用大数据分析2000万数据

目录 [−]

  1. 安装和配置Spark
  2. Spark初试
  3. 使用Spark SQL分析数据

去年网上曾放出个2000W的开房记录的数据库, 不知真假。 最近在学习Spark, 所以特意从网上找来数据测试一下, 这是一个绝佳的大数据素材。 如果数据涉及到个人隐私,请尽快删除, 本站不提供此类数据。你可以写个随机程序生成2000W的测试数据, 以CSV格式。

Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。

Spark是一个高效的分布式计算系统,相比Hadoop,它在性能上比Hadoop要高100倍。Spark提供比Hadoop更上层的API,同样的算法在Spark中实现往往只有Hadoop的1/10或者1/100的长度。Shark类似“SQL on Spark”,是一个在Spark上数据仓库的实现,在兼容Hive的情况下,性能最高可以达到Hive的一百倍。

Apache Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。

2014年处, Apache 基金会宣布旗下的 Apache Spark 项目成为基金会的顶级项目,拥有顶级域名 http://spark.apache.org/。 Spark 的用户包括:阿里巴巴、Cloudera、Databricks、IBM、英特尔和雅虎等知名厂商。

Spark SQL是支持在Spark中使用Sql、HiveSql、Scaca中的关系型查询表达式。它的核心组件是一个新增的RDD类型SchemaRDD,它把行对象用一个Schema来描述行里面的所有列的数据类型,它就像是关系型数据库里面的一张表。它可以从原有的RDD创建,也可以是Parquet文件,最重要的是它可以支持用HiveQL从hive里面读取数据。

在2014年7月1日的Spark Summit上,Databricks宣布终止对Shark的开发,将重点放到Spark SQL上。在会议上,Databricks表示,Shark更多是对Hive的改造,替换了Hive的物理执行引擎,因此会有一个很快的速度。然而,不容忽视的是,Shark继承了大量的Hive代码,因此给优化和维护带来了大量的麻烦。随着性能优化和先进分析整合的进一步加深,基于MapReduce设计的部分无疑成为了整个项目的瓶颈。 详细内容请参看 Shark, Spark SQL, Hive on Spark, and the future of SQL on Spark

当前Spark SQL还处于alpha阶段,一些API在将将来的版本中可能会有所改变。

我也翻译几篇重要的Spark文档,你可以在我的网站找到。 Spark翻译文档

本文主要介绍了下面几个知识点:

  • Spark读取文件夹的文件
  • Spark filter和map使用
  • Spark sql语句调用
  • 自定义Spark sql的函数

提前讲一下,我也是最近才学习Spark及其相关的技术如Scala,下面的例子纯粹为了验证性的试验, 相信例子代码很很多优化的地方。

安装和配置Spark

当前最新的Spark版本为1.1.1, 因为我们以Standalone方式运行Spark,所以直接随便挑一个版本, 比如spark-1.1.1-bin-hadoop2.4.tgz, 解压到你的机器上。
我使用的CentOS 6.4。 具体来讲,它是我笔记本的一个虚拟机, 4个核, 4G内存。

在/opt解压它, 命令行中进入解压后的目录/opt/spark-1.1.1-bin-hadoop2.4。

运行./bin/spark-shell就可以启动一个交互式的spark shell控制台, 在其中可以执行scala代码。

Spark初试

因为我们以本地单机的形式测试Spark, 你需要配置以下你的spark, 否则在分析大数据时很容易出现内存不够的问题。
在我的机器上, conf文件夹下复制一份spark-defaults.conf,将使用的内存增大一些:

1
2
spark.executor.memory 2g
spark.driver.memory 2g

启动shark-shell的时候设置使用4个核。

1
[root@colobu conf]# ./bin/spark-shell --master local[4]

根据 Spark 快速入门中的介绍运行个例子测试一下:

1
2
3
4
5
6
scala> val textFile = sc.textFile("README.md")
14/12/11 13:52:00 INFO MemoryStore: ensureFreeSpace(163705) called with curMem=0, maxMem=1111794647
14/12/11 13:52:00 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 159.9 KB, free 1060.1 MB)
textFile: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at <console>:12
scala> textFile.count()

这个例子从Spark解压目录下的README.md文件创建一个RDD,并统计此文件有多少行。

再看一个抛针法计算PI值的例子。

1
2
3
4
5
6
7
val NUM_SAMPLES=1000000
val count = sc.parallelize(1 to NUM_SAMPLES).map{i =>
val x = Math.random()
val y = Math.random()
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)
println("Pi 值大约为 " + 4.0 * count / NUM_SAMPLES)

结果为:

1
Pi 值大约为 3.141408

到目前为止,我们搭建好了一个Spark环境, 并简单进行了测试。 下一步我们使用Spark SQL分析前面所说的数据。

使用Spark SQL分析数据

这一步,我们使用Spark SQL按照星座对2000W数据进行分组统计, 看看哪个星座的人最喜欢开房。
当然, 使用纯Spark也可以完成我们的分析, 因为实际Spark SQL最终是利用Spark来完成的。
实际测试中发现这些数据并不是完全遵守一个schema, 有些数据的格式是不对的, 有些数据的数据项也是错误的。 在代码中我们要剔除那么干扰数据。
反正我们用这个数据测试者玩, 并没有严格的要求去整理哪些错误数据。

先看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD
case class Customer(name: String, gender: String, ctfId: String, birthday: String, address: String)
val customer = sc.textFile("/mnt/share/2000W/*.csv").map(_.split(",")).filter(line => line.length > 7).map(p => Customer(p(0), p(5), p(4), p(6), p(7))).distinct()
customer.registerTempTable("customer")
def toInt(s: String):Int = {
try {
s.toInt
} catch {
case e:Exception => 9999
}
}
def myfun(birthday: String) : String = {
var rt = "未知"
if (birthday.length == 8) {
val md = toInt(birthday.substring(4))
if (md >= 120 & md <= 219)
rt = "水瓶座"
else if (md >= 220 & md <= 320)
rt = "双鱼座"
else if (md >= 321 & md <= 420)
rt = "白羊座"
else if (md >= 421 & md <= 521)
rt = "金牛座"
else if (md >= 522 & md <= 621)
rt = "双子座"
else if (md >= 622 & md <= 722)
rt = "巨蟹座"
else if (md >= 723 & md <= 823)
rt = "狮子座"
else if (md >= 824 & md <= 923)
rt = "处女座"
else if (md >= 924 & md <= 1023)
rt = "天秤座"
else if (md >= 1024 & md <= 1122)
rt = "天蝎座"
else if (md >= 1123 & md <= 1222)
rt = "射手座"
else if ((md >= 1223 & md <= 1231) | (md >= 101 & md <= 119))
rt = "摩蝎座"
else
rt = "未知"
}
rt
}
sqlContext.registerFunction("constellation", (x:String) => myfun(x))
var result = sqlContext.sql("SELECT constellation(birthday), count(constellation(birthday)) FROM customer group by constellation(birthday)")
result.collect().foreach(println)

为了使用spark sql,你需要引入 sqlContext.createSchemaRDD. Spark sql一个核心对象就是SchemaRDD。 上面的import可以隐式的将一个RDD转换成SchemaRDD。
接着定义了Customer类,用来映射每一行的数据, 我们只使用每一行很少的信息, 像地址,email等都没用到。
接下来从2000W文件夹中读取所有的csv文件, 创建一个RDD并注册表customer。
因为没有一个内建的函数可以将出生一起映射为星座, 所以我们需要定义一个映射函数myfun, 并把它注册到SparkContext中。 这样我们就可以在sql语句中使用这个函数。 类似地,字符串的length函数当前也不支持, 你可以增加一个这样的函数。 因为有的日期不正确,所有特别增加了一个"未知"的星座。 错误数据可能有两种, 一是日期出错, 而是此行格式不对,将其它字段映射成了出生日期。 我们在分析的时候忽略它们好了。

然后执行一个分组的sql语句。这个sql语句查询结果类型为SchemaRDD, 也继承了RDD所有的操作。
最后将结果打印出来。

1
2
3
4
5
6
7
8
9
10
11
12
[双子座,1406018]
[双鱼座,1509839]
[摩蝎座,2404812]
[金牛座,1406225]
[水瓶座,1635358]
[巨蟹座,1498077]
[处女座,1666009]
[天秤座,1896544]
[白羊座,1409838]
[射手座,1614915]
[未知,160483]
[狮子座,1613529]

看起来魔蝎座的人最喜欢开房了, 明显比其它星座的人要多。

我们也可以分析一下开房的男女比例:

1
2
3
......
result = sqlContext.sql("SELECT gender, count(gender) FROM customer where gender = 'F' or gender = 'M' group by gender")
result.collect().foreach(println)

结果显示男女开房的人数大约是2:1

1
2
[F,6475461]
[M,12763926]