Window aggregate functions (hay thường được gọi tắt là window functions hoặc windowed aggregates) là hàm giúp hỗ trợ tính toán trên 1 nhóm các bản ghi được gọi là cửa sổ mà có liên quan tới bản ghi hiện tại.
Nhìn chung thì những phần này khá trìu tượng về mặt lý thuyết nên đọc lý thuyết càng gây ra các cảm giác khó hiểu nên thế mình sẽ giới thiệu window function và pivot qua những ví dụ để dễ dàng hình dung hơn nha.
Example 1: Simple select
Yêu cầu
Cho tập dữ liệu dưới dạng file csv như sau với tên file là input.csv
:
1
2
3
4
5
id,name,population
0,Warsaw,1 764 615
1,Villeneuve-Loubet,15 020
2,Vranje,83 524
3,Pittsburgh,1 775 634
- Load dữ liệu từ csv file
- Query population lớn nhất
INPUT:
id | name | population |
---|---|---|
0 | Warsaw | 1 764 615 |
1 | Villeneuve-Loubet | 15 020 |
2 | Vranje | 83 524 |
3 | Pittsburgh | 1 775 634 |
OUTPUT:
population |
---|
1775634 |
Lời giải
Bài này mang tính chất khởi động thôi chứ chưa hề động gì tới window function hay pivot cả, mọi người làm như bình thường:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package part1;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class Main {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().appName("Bai_1").master("local").getOrCreate();
Dataset<Row> data = spark.read().option("header", true).option("inferSchema", true).csv("src/part1/input.csv");
data.createOrReplaceTempView("data");
Dataset<Row> result = spark.sql("select population from data order by population desc limit 1");
result.show();
}
}
- option header giúp việc đọc vào từ file csv sẽ mặc định dòng đầu tiên là dòng tiêu đề cho các hàng
- option inferSchema sẽ mặc định cho Spark tự nhận dạng dữ liệu đầu vào mà không cần chúng ta phải tạo schema nữa
Chúng ta sẽ được kết quả của bài 1 như sau:
1
2
3
4
5
+----------+
|population|
+----------+
| 1775634|
+----------+
Example 2: group function
Yêu cầu
Cho tập dữ liệu sau:
id | group |
---|---|
0 | 0 |
1 | 1 |
2 | 0 |
3 | 1 |
4 | 0 |
Nhóm lại các id cùng group và cho ra output như sau:
group | ids |
---|---|
0 | [0, 2, 4] |
1 | [1, 3] |
Lưu ý: [0,2,4] là biểu diễn mảng
Lời giải
Bài này cũng chưa phải window function hay pivot gì, chúng ta nhóm bằng việc sử dụng hàm group như sau:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package part2;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructType;
public class Main {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().appName("Bai_1").master("local").getOrCreate();
StructType schema = new StructType().add("id", "integer")
.add("group", "integer");
List<Row> listOfdata = new ArrayList<Row>();
listOfdata.add(RowFactory.create(0,0));
listOfdata.add(RowFactory.create(1,1));
listOfdata.add(RowFactory.create(2,0));
listOfdata.add(RowFactory.create(3,1));
listOfdata.add(RowFactory.create(4,0));
Dataset<Row> data = spark.createDataFrame(listOfdata,schema);
Dataset<Row> result = data.groupBy("group")
.agg(functions.collect_list("id").as("ids")).sort("group");
result.show();
}
}
Chúng ta sẽ được kết quả của bài 2 như sau:
1
2
3
4
5
6
+-----+---------+
|group| ids|
+-----+---------+
| 0|[0, 2, 4]|
| 1| [1, 3]|
+-----+---------+
Example 3: pivot
Yêu cầu
Cho tập dữ liệu như sau:
id | day | price | units |
---|---|---|---|
100 | 1 | 23 | 10 |
100 | 2 | 45 | 11 |
100 | 3 | 67 | 12 |
100 | 4 | 78 | 13 |
101 | 1 | 23 | 10 |
101 | 2 | 45 | 13 |
101 | 3 | 67 | 14 |
101 | 4 | 78 | 15 |
102 | 1 | 23 | 10 |
102 | 2 | 45 | 11 |
102 | 3 | 67 | 16 |
102 | 4 | 78 | 18 |
Sử dụng pivot và cho ra kết quả như sau:
id | price_1 | price_2 | price_3 | price_4 | unit_1 | unit_2 | unit_3 | unit_4 |
---|---|---|---|---|---|---|---|---|
100 | 23 | 45 | 67 | 78 | 10 | 11 | 12 | 13 |
101 | 23 | 45 | 67 | 78 | 10 | 13 | 14 | 15 |
102 | 23 | 45 | 67 | 78 | 10 | 11 | 16 | 18 |
Lời giải
Đầu tiên là chúng ta sẽ đi tạo tập dữ liệu theo yêu cầu đề bài (các bạn cũng có thể tạo 1 file csv và đọc vào cũng được nha, ở đây mình muốn tạo dữ liệu bởi nhiều cách để có thể giới thiệu hết 1 lượt những cách tạo dữ liệu input trong spark):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
StructType schema = new StructType().add("id", "integer")
.add("day", "integer")
.add("price", "integer")
.add("units", "integer");
List<Row> listOfdata = new ArrayList<Row>();
listOfdata.add(RowFactory.create(100,1,23,10));
listOfdata.add(RowFactory.create(100,2,45,11));
listOfdata.add(RowFactory.create(100,3,67,12));
listOfdata.add(RowFactory.create(100,4,78,13));
listOfdata.add(RowFactory.create(101,1,23,10));
listOfdata.add(RowFactory.create(101,2,45,13));
listOfdata.add(RowFactory.create(101,3,67,14));
listOfdata.add(RowFactory.create(101,4,78,15));
listOfdata.add(RowFactory.create(102,1,23,10));
listOfdata.add(RowFactory.create(102,2,45,11));
listOfdata.add(RowFactory.create(102,3,67,16));
listOfdata.add(RowFactory.create(102,4,78,18));
Nhìn vào yêu cầu output chúng ta sẽ thấy là sẽ có 2 yêu cầu rõ rệt là sẽ xoay ngang cột price và cột unit. Vậy chúng ta cũng sẽ làm từng bước một, đầu tiên là xoay ngang cột price ra trước như sau:
1
2
Dataset<Row> result_1 = data.withColumn("concat_day", functions.concat(functions.lit("price_"), data.col("day")))
.groupBy("id").pivot("concat_day").agg(functions.first("price"));
functions.concat()
giúp tạo ra các giá trị cho cột mới là price cộng với đuôi là giá trị daypivot
giúp xoay ngang lại bảng dữ liệu ban đầu với cột xoay là cột vừa mới tạo concat_day
Kết quả của bước đầu tiên sẽ là:
1
2
3
4
5
6
7
+---+-------+-------+-------+-------+
| id|price_1|price_2|price_3|price_4|
+---+-------+-------+-------+-------+
|101| 23| 45| 67| 78|
|100| 23| 45| 67| 78|
|102| 23| 45| 67| 78|
+---+-------+-------+-------+-------+
Tương tự ta xoay ngang dữ liệu ban đầu theo cột unit ở bước 2 như sau:
1
2
Dataset<Row> result_2 = data.withColumn("concat_day", functions.concat(functions.lit("unit_"), data.col("day")))
.groupBy("id").pivot("concat_day").agg(functions.first("units"));
Kết quả của bước 2 là:
1
2
3
4
5
6
7
+---+------+------+------+------+
| id|unit_1|unit_2|unit_3|unit_4|
+---+------+------+------+------+
|101| 10| 13| 14| 15|
|100| 10| 11| 12| 13|
|102| 10| 11| 16| 18|
+---+------+------+------+------+
Công việc còn lại là đơn giản rồi, chỉ cần nối 2 bảng lại là xong:
1
Dataset<Row> result = result_1.join(result_2, "id").sort("id");
Ta sẽ có kết quả cuối cùng là:
1
2
3
4
5
6
7
+---+-------+-------+-------+-------+------+------+------+------+
| id|price_1|price_2|price_3|price_4|unit_1|unit_2|unit_3|unit_4|
+---+-------+-------+-------+-------+------+------+------+------+
|100| 23| 45| 67| 78| 10| 11| 12| 13|
|101| 23| 45| 67| 78| 10| 13| 14| 15|
|102| 23| 45| 67| 78| 10| 11| 16| 18|
+---+-------+-------+-------+-------+------+------+------+------+
Example 4: window function
Yêu cầu
Cho tập dữ liệu sau:
1
2
3
4
5
6
7
8
9
10
11
time,department,items_sold
1,IT,15
2,Support,81
3,Support,90
4,Support,25
5,IT,40
6,IT,24
7,Support,31
8,Support,1
9,HR,27
10,IT,75
Yêu cầu:
- Load dữ liệu vào spark từ file csv
- Tính running_total hay tổng tích lũy số item đã bán được đến thời điểm time.
Output có dạng như sau:
time | department | items_sold | running_total |
---|---|---|---|
9 | HR | 27 | 27 |
1 | IT | 15 | 15 |
5 | IT | 40 | 55 |
6 | IT | 24 | 79 |
10 | IT | 75 | 154 |
2 | Support | 81 | 81 |
3 | Support | 90 | 171 |
4 | Support | 25 | 196 |
7 | Support | 31 | 227 |
8 | Support | 1 | 228 |
Lời giải
Phân tích output tí ta có thể thấy:
- Các bản ghi được nhóm lại theo từng department và sắp xếp theo time
- Cột running_total được tính bằng tổng tích lũy các items_sold ở phía trước nó mà cùng department
Phần nhóm và sắp xếp thì ta có thể sử dụng group và sort, còn phần tính tổng tích lũy thì sao? Giờ phải có cách nào lấy ra các bản ghi cùng department và ở phía trước của bản ghi hiện tại, window function là cách tốt nhất để giải quyết vấn đề này.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package part4;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
public class Main {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().appName("Bai_4").master("local").getOrCreate();
Dataset<Row> data = spark.read().option("header", true).option("inferSchema", true).csv("src/part4/input.csv");
WindowSpec wins = Window.partitionBy("department").orderBy("time").rowsBetween(Window.unboundedPreceding(), Window.currentRow());
Dataset<Row> result = data.withColumn("running_total", functions.sum("items_sold").over(wins));
result.show();
}
}
partitionBy
chia tách dữ liệu ban đầu ra theo partitionrowsBetween(Window.unboundedPreceding(), Window.currentRow())
giới hạn lại không gian của cửa sổ là từ đầu cho tới vị trí hiện tạifunctions.sum("items_sold").over(wins)
là giá trị của cộtrunning_total
được tạo bằng tổng của cột items_sold giới hạn trong cửa sổwins
Kết quả cuối cùng của ví dụ 4 sẽ là:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
+----+----------+----------+-------------+
|time|department|items_sold|running_total|
+----+----------+----------+-------------+
| 9| HR| 27| 27|
| 1| IT| 15| 15|
| 5| IT| 40| 55|
| 6| IT| 24| 79|
| 10| IT| 75| 154|
| 2| Support| 81| 81|
| 3| Support| 90| 171|
| 4| Support| 25| 196|
| 7| Support| 31| 227|
| 8| Support| 1| 228|
+----+----------+----------+-------------+
Còn 1 số ví dụ nữa có lẽ mình sẽ viết trong phần 2 nha vì bài này cũng khá là dài rồi. Xem tiếp phần 2 TẠI ĐÂY