Spark学习笔记——二次排序,TopN,TopNByGroup

一、二次排序

自定义一个实现了 Ordered 接口的复合键 SecondSortKey(先比较第一列,第一列相同时再比较第二列),然后使用 sortByKey 按该键降序排序,即可实现二次排序。

【代码】
/**
 * Secondary-sort demo.
 *
 * Reads `secondSort.txt`, where each non-blank line holds two space-separated
 * integers, and prints the lines in descending order of the first column,
 * breaking ties by the second column (also descending).
 */
object SecondSort {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("SecondSort")
      .setMaster("local")
      .set("spark.testing.memory", "2147480000")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.textFile("secondSort.txt")
      // Skip blank lines (e.g. a trailing newline) so they do not crash the parse,
      // and split each line only once, reusing the fields for both key parts.
      val pairs = lines.filter(_.trim.nonEmpty).map { line =>
        val fields = line.split(" ")
        (new SecondSortKey(fields(0).toInt, fields(1).toInt), line)
      }
      val sortedPairs = pairs.sortByKey(ascending = false)
      // Collect to the driver before printing: RDD.foreach(println) runs on the
      // executors, so outside single-threaded local mode neither the output
      // location nor the order is guaranteed. Fine for a small demo data set.
      sortedPairs.map(_._2).collect().foreach(println)
    } finally {
      // Always release the context, even if the job fails.
      sc.stop()
    }
  }
}
/**
 * Composite key for secondary sorting: ordered by `first`, and by `second`
 * when the `first` components are equal.
 *
 * Serializable so it can be used as an RDD key.
 *
 * @param first  primary sort component
 * @param second secondary sort component, used only to break ties on `first`
 */
class SecondSortKey(val first: Int, val second: Int)
    extends Ordered[SecondSortKey] with Serializable {

  /**
   * Compares this key with `that`.
   *
   * Uses `Integer.compare` rather than subtraction: `a - b` overflows when the
   * operands are far apart (e.g. `Int.MinValue - 1`), which flips the sign.
   *
   * @return a negative, zero, or positive value when this key is less than,
   *         equal to, or greater than `that`
   */
  override def compare(that: SecondSortKey): Int = {
    val byFirst = Integer.compare(this.first, that.first)
    if (byFirst != 0) byFirst
    else Integer.compare(this.second, that.second)
  }
}

【测试数据】

`
3 10
5 2
6 5
8 123
1 4
4 123
5 432
3 54
5 121
8 654
3 98
`    

【输出结果】

`
8 654
8 123
6 5
5 432
5 121
5 2
4 123
3 98
3 54
3 10
1 4
`

二、TopN

【代码】
/**
 * Top-N demo.
 *
 * Reads `top.txt`, where each non-blank line has the form `<number>,<name>`,
 * and prints the five lines with the largest leading number.
 */
object TopN {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("TopN")
      .setMaster("local")
      .set("spark.testing.memory", "2147480000")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.textFile("top.txt")
      // Parse the key as an Int: sorting the raw string would be lexicographic,
      // ranking "10" below "9". A non-numeric key fails fast with
      // NumberFormatException. Blank lines are skipped.
      val keyed = lines
        .filter(_.trim.nonEmpty)
        .map(line => (line.split(",")(0).trim.toInt, line))
      val sorted = keyed.sortByKey(ascending = false)
      // take(5) returns the first five elements to the driver in sorted order.
      sorted.map(_._2).take(5).foreach(println)
    } finally {
      // Always release the context, even if the job fails.
      sc.stop()
    }
  }
}

【测试数据】

`
3,zhangsan
5,lisi
6,wangwu
7,wermaziang
1,scc
4,hzf
5,gyx
6,zdh
9,laogao
0,xiaogao
3,laoxiao
`

【输出结果】

`
9,laogao
7,wermaziang
6,wangwu
6,zdh
5,lisi
`

三、TopNByGroup

【代码】
/**
 * Top-N-per-group demo.
 *
 * Reads `scores.txt`, where each non-blank line has the form
 * `<group>\t<score>`, groups the scores by group name, and prints the three
 * highest scores of each group as a `List`, one list per group.
 */
object TopNByGroup {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("TopNByGroup")
      .setMaster("local")
      .set("spark.testing.memory", "2147480000")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.textFile("scores.txt")
      // Split each line once; blank lines are skipped so they do not crash the parse.
      val grouped = lines
        .filter(_.trim.nonEmpty)
        .map { line =>
          val fields = line.split("\t")
          (fields(0), fields(1))
        }
        .groupByKey()
      // Shape of `grouped` for the sample data:
      //   (class3,CompactBuffer(70, 70, 98))
      //   (class1,CompactBuffer(100, 102, 45, 16, 95, 99))
      //   (class2,CompactBuffer(85, 65, 85, 88, 37, 23))
      val topList = grouped.map { case (_, scores) =>
        // Sort with a reversed ordering instead of negating the values:
        // -Int.MinValue overflows back to Int.MinValue and would be misplaced.
        scores.map(_.toInt).toList.sorted(Ordering[Int].reverse).take(3)
      }
      // Collect to the driver before printing; RDD.foreach(println) would run on
      // the executors. Fine here because there is only one small list per group.
      topList.collect().foreach(println)
    } finally {
      // Always release the context, even if the job fails.
      sc.stop()
    }
  }
}

【测试数据】(两列之间以制表符 Tab 分隔,与代码中的 split("\t") 对应)

`
class1    100
class2    85
class3    70
class1    102
class2    65
class1    45
class2    85
class3    70
class1    16
class2    88
class1    95
class2    37
class3    98
class1    99
class2    23
`

【输出结果】

`
List(98, 70, 70)
List(102, 100, 99)
List(88, 85, 85)
`