Posts Window function, pivot trong Spark SQL
Post
Cancel

Window function, pivot trong Spark SQL

Window aggregate functions (hay thường được gọi tắt là window functions hoặc windowed aggregates) là hàm giúp hỗ trợ tính toán trên 1 nhóm các bản ghi được gọi là cửa sổ mà có liên quan tới bản ghi hiện tại.

Nhìn chung thì những phần này khá trìu tượng về mặt lý thuyết nên đọc lý thuyết càng gây ra các cảm giác khó hiểu nên thế mình sẽ giới thiệu window functionpivot qua những ví dụ để dễ dàng hình dung hơn nha.

Example 1: Simple select

Yêu cầu

Cho tập dữ liệu dưới dạng file csv như sau với tên file là input.csv:

1
2
3
4
5
id,name,population
0,Warsaw,1 764 615
1,Villeneuve-Loubet,15 020
2,Vranje,83 524
3,Pittsburgh,1 775 634
  • Load dữ liệu từ csv file
  • Query population lớn nhất

INPUT:

idnamepopulation
0Warsaw1 764 615
1Villeneuve-Loubet15 020
2Vranje83 524
3Pittsburgh1 775 634

OUTPUT:

population
1775634

Lời giải

Bài này mang tính chất khởi động thôi chứ chưa hề động gì tới window function hay pivot cả, mọi người làm như bình thường:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package part1;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Main {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("Bai_1").master("local").getOrCreate();
		Dataset<Row> data = spark.read().option("header", true).option("inferSchema", true).csv("src/part1/input.csv");
		
		data.createOrReplaceTempView("data");
		Dataset<Row> result = spark.sql("select population from data order by population desc limit 1");
		
		
		result.show();
	}
}
  • option header giúp việc đọc vào từ file csv sẽ mặc định dòng đầu tiên là dòng tiêu đề cho các hàng
  • option inferSchema sẽ mặc định cho Spark tự nhận dạng dữ liệu đầu vào mà không cần chúng ta phải tạo schema nữa

Chúng ta sẽ được kết quả của bài 1 như sau:

1
2
3
4
5
+----------+
|population|
+----------+
|   1775634|
+----------+

Example 2: group function

Yêu cầu

Cho tập dữ liệu sau:

idgroup
00
11
20
31
40

Nhóm lại các id cùng group và cho ra output như sau:

groupids
0[0, 2, 4]
1[1, 3]

Lưu ý: [0,2,4] là biểu diễn mảng

Lời giải

Bài này cũng chưa phải window function hay pivot gì, chúng ta nhóm bằng việc sử dụng hàm group như sau:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package part2;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructType;

public class Main {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("Bai_1").master("local").getOrCreate();
		
		StructType schema = new StructType().add("id", "integer")
				.add("group", "integer");

        List<Row> listOfdata = new ArrayList<Row>();
        listOfdata.add(RowFactory.create(0,0));
        listOfdata.add(RowFactory.create(1,1));
        listOfdata.add(RowFactory.create(2,0));
        listOfdata.add(RowFactory.create(3,1));
        listOfdata.add(RowFactory.create(4,0));
        
        Dataset<Row> data = spark.createDataFrame(listOfdata,schema);
        
        Dataset<Row> result = data.groupBy("group")
        		.agg(functions.collect_list("id").as("ids")).sort("group");
        
        result.show();
	}
}

Chúng ta sẽ được kết quả của bài 2 như sau:

1
2
3
4
5
6
+-----+---------+
|group|      ids|
+-----+---------+
|    0|[0, 2, 4]|
|    1|   [1, 3]|
+-----+---------+

Example 3: pivot

Yêu cầu

Cho tập dữ liệu như sau:

iddaypriceunits
10012310
10024511
10036712
10047813
10112310
10124513
10136714
10147815
10212310
10224511
10236716
10247818

Sử dụng pivot và cho ra kết quả như sau:

idprice_1price_2price_3price_4unit_1unit_2unit_3unit_4
1002345677810111213
1012345677810131415
1022345677810111618

Lời giải

Đầu tiên là chúng ta sẽ đi tạo tập dữ liệu theo yêu cầu đề bài (các bạn cũng có thể tạo 1 file csv và đọc vào cũng được nha, ở đây mình muốn tạo dữ liệu bởi nhiều cách để có thể giới thiệu hết 1 lượt những cách tạo dữ liệu input trong spark):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
StructType schema = new StructType().add("id", "integer")
				.add("day", "integer")
				.add("price", "integer")
				.add("units", "integer");

List<Row> listOfdata = new ArrayList<Row>();
listOfdata.add(RowFactory.create(100,1,23,10));
listOfdata.add(RowFactory.create(100,2,45,11));
listOfdata.add(RowFactory.create(100,3,67,12));
listOfdata.add(RowFactory.create(100,4,78,13));
listOfdata.add(RowFactory.create(101,1,23,10));
listOfdata.add(RowFactory.create(101,2,45,13));
listOfdata.add(RowFactory.create(101,3,67,14));
listOfdata.add(RowFactory.create(101,4,78,15));
listOfdata.add(RowFactory.create(102,1,23,10));
listOfdata.add(RowFactory.create(102,2,45,11));
listOfdata.add(RowFactory.create(102,3,67,16));
listOfdata.add(RowFactory.create(102,4,78,18));

Nhìn vào yêu cầu output chúng ta sẽ thấy là sẽ có 2 yêu cầu rõ rệt là sẽ xoay ngang cột price và cột unit. Vậy chúng ta cũng sẽ làm từng bước một, đầu tiên là xoay ngang cột price ra trước như sau:

1
2
Dataset<Row> result_1 = data.withColumn("concat_day", functions.concat(functions.lit("price_"), data.col("day")))
				.groupBy("id").pivot("concat_day").agg(functions.first("price"));
  • functions.concat() giúp tạo ra các giá trị cho cột mới là price cộng với đuôi là giá trị day
  • pivot giúp xoay ngang lại bảng dữ liệu ban đầu với cột xoay là cột vừa mới tạo concat_day

Kết quả của bước đầu tiên sẽ là:

1
2
3
4
5
6
7
+---+-------+-------+-------+-------+
| id|price_1|price_2|price_3|price_4|
+---+-------+-------+-------+-------+
|101|     23|     45|     67|     78|
|100|     23|     45|     67|     78|
|102|     23|     45|     67|     78|
+---+-------+-------+-------+-------+

Tương tự ta xoay ngang dữ liệu ban đầu theo cột unitbước 2 như sau:

1
2
Dataset<Row> result_2 = data.withColumn("concat_day", functions.concat(functions.lit("unit_"), data.col("day")))
				.groupBy("id").pivot("concat_day").agg(functions.first("units"));

Kết quả của bước 2 là:

1
2
3
4
5
6
7
+---+------+------+------+------+
| id|unit_1|unit_2|unit_3|unit_4|
+---+------+------+------+------+
|101|    10|    13|    14|    15|
|100|    10|    11|    12|    13|
|102|    10|    11|    16|    18|
+---+------+------+------+------+

Công việc còn lại là đơn giản rồi, chỉ cần nối 2 bảng lại là xong:

1
Dataset<Row> result = result_1.join(result_2, "id").sort("id");

Ta sẽ có kết quả cuối cùng là:

1
2
3
4
5
6
7
+---+-------+-------+-------+-------+------+------+------+------+
| id|price_1|price_2|price_3|price_4|unit_1|unit_2|unit_3|unit_4|
+---+-------+-------+-------+-------+------+------+------+------+
|100|     23|     45|     67|     78|    10|    11|    12|    13|
|101|     23|     45|     67|     78|    10|    13|    14|    15|
|102|     23|     45|     67|     78|    10|    11|    16|    18|
+---+-------+-------+-------+-------+------+------+------+------+

Example 4: window function

Yêu cầu

Cho tập dữ liệu sau:

1
2
3
4
5
6
7
8
9
10
11
time,department,items_sold
1,IT,15
2,Support,81
3,Support,90
4,Support,25
5,IT,40
6,IT,24
7,Support,31
8,Support,1
9,HR,27
10,IT,75

Yêu cầu:

  • Load dữ liệu vào spark từ file csv
  • Tính running_total hay tổng tích lũy số item đã bán được đến thời điểm time.

Output có dạng như sau:

timedepartmentitems_soldrunning_total
9HR2727
1IT1515
5IT4055
6IT2479
10IT75154
2Support8181
3Support90171
4Support25196
7Support31227
8Support1228

Lời giải

Phân tích output tí ta có thể thấy:

  • Các bản ghi được nhóm lại theo từng department và sắp xếp theo time
  • Cột running_total được tính bằng tổng tích lũy các items_sold ở phía trước nó mà cùng department

Phần nhóm và sắp xếp thì ta có thể sử dụng groupsort, còn phần tính tổng tích lũy thì sao? Giờ phải có cách nào lấy ra các bản ghi cùng department và ở phía trước của bản ghi hiện tại, window function là cách tốt nhất để giải quyết vấn đề này.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package part4;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

public class Main {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("Bai_4").master("local").getOrCreate();
		Dataset<Row> data = spark.read().option("header", true).option("inferSchema", true).csv("src/part4/input.csv");
				
		WindowSpec wins = Window.partitionBy("department").orderBy("time").rowsBetween(Window.unboundedPreceding(), Window.currentRow());
		Dataset<Row> result = data.withColumn("running_total", functions.sum("items_sold").over(wins));
		
		result.show();
	}
}

  • partitionBy chia tách dữ liệu ban đầu ra theo partition
  • rowsBetween(Window.unboundedPreceding(), Window.currentRow()) giới hạn lại không gian của cửa sổ là từ đầu cho tới vị trí hiện tại
  • functions.sum("items_sold").over(wins) là giá trị của cột running_total được tạo bằng tổng của cột items_sold giới hạn trong cửa sổ wins

Kết quả cuối cùng của ví dụ 4 sẽ là:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
+----+----------+----------+-------------+
|time|department|items_sold|running_total|
+----+----------+----------+-------------+
|   9|        HR|        27|           27|
|   1|        IT|        15|           15|
|   5|        IT|        40|           55|
|   6|        IT|        24|           79|
|  10|        IT|        75|          154|
|   2|   Support|        81|           81|
|   3|   Support|        90|          171|
|   4|   Support|        25|          196|
|   7|   Support|        31|          227|
|   8|   Support|         1|          228|
+----+----------+----------+-------------+

Còn 1 số ví dụ nữa có lẽ mình sẽ viết trong phần 2 nha vì bài này cũng khá là dài rồi. Xem tiếp phần 2 TẠI ĐÂY

This post is licensed under CC BY 4.0 by the author.