Posts Window function, pivot trong Spark SQL (Part 2)
Post
Cancel

Window function, pivot trong Spark SQL (Part 2)

Nếu bạn chưa xem phần 1 thì có thể xem lại TẠI ĐÂY nha, bài viết hôm nay mình sẽ giới thiệu tiếp tới mọi người một số ví dụ về window function và pivot sâu hơn để mọi người có thể hiểu rõ hơn về window function và pivot trong Spark

Example 5: Window function

Yêu cầu

Cho tập dữ liệu dạng .csv như sau:

1
2
3
4
5
6
7
8
9
10
11
time,department,items_sold,running_total
1,IT,15,15
2,Support,81,81
3,Support,90,171
4,Support,25,196
5,IT,40,55
6,IT,24,79
7,Support,31,227
8,Support,1,228
9,HR,27,27
10,IT,75,154
  • Đọc dữ liệu vào spark từ file csv chứa dữ liệu.
  • Dùng SparkSQL query độ lệch giữa các running_total liên tiếp nhau theo thời gian của các department

INPUT:

timedepartmentitems_soldrunning_total
1IT1515
2Support8181
3Support90171
4Support25196
5IT4055
6IT2479
7Support31227
8Support1228
9HR2727
10IT75154

OUTPUT:

timedepartmentitems_soldrunning_totaldiff
9HR272727
1IT151515
5IT405540
6IT247924
10IT7515475
2Support818181
3Support9017190
4Support2519625
7Support3122731
8Support12281

Lời giải

Chúng ta sẽ thấy là bài này giống bài tính tổng tích lũy tại Ví dụ 4 chỉ khác là tại ví dụ này chúng ta sẽ đi tính hiệu giữa 2 running_total liên tiếp mà không phải là tổng.

Để giải quyết bài này thì mình đưa ra một lời giải hơi phức tạp 1 tí như sau:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package part5;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

public class Main {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("Bai_5").master("local").getOrCreate();
		Dataset<Row> data = spark.read().option("header", true).option("inferSchema", true).csv("src/part5/input.csv");
		
		WindowSpec wins = Window.partitionBy("department")
				.orderBy("time")
				.rowsBetween(Window.currentRow() - 1, Window.currentRow());
		
		Dataset<Row> result_1 = data.withColumn("diff_sum", functions.sum("running_total").over(wins));
		Dataset<Row> result_2 = result_1.withColumn("diff_1", functions.max("running_total").over(wins));
		result_2.createOrReplaceTempView("data_1");
		
		spark.sql("select time, department, items_sold, running_total, diff_1, diff_sum - diff_1 as diff_2 from data_1")
				.createOrReplaceTempView("data_2");
		
		Dataset<Row> result = spark.sql("select time, department, items_sold, running_total, diff_1 - diff_2 as diff from data_2");
		result.show();
		
	}
}

Ngắn gọn lại lời giải trên, thì để tính được cột diff ta sẽ lấy tổng của running_total của hàng hiện và hàng trước đó trừ đi running_total của hàng hiện tại để lấy ra running_total của hàng trước đó, gọi cột mới này là cột diff_2. Sau đó lấy running_total của mỗi hàng hiện tại trừ đi running_total của hàng trước đó (là cột diff_2) thì ta sẽ ra được kết quả. Sở dĩ làm phức tạp như vậy mà không lấy luôn running_total của hàng trước đó qua window function là để tránh đi kết quả null, nếu ta lấy luôn giá trị running_total của hàng trước đó bằng window function sau:

1
WindowSpec wins = Window.partitionBy("department").orderBy("time").rowsBetween(Window.currentRow() - 1, Window.currentRow()-1);

Thì ta sẽ tạo được thêm cột mới là giá trị running_total của hàng trước đó và công việc bây giờ chỉ cần trừ đi sẽ có được kết quả cuối cùng, đơn giản như vậy nhưng kết quả của phương pháp trên bị vướng giá trị null như dưới đây:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
+----+----------+----------+-------------+----+
|time|department|items_sold|running_total|diff|
+----+----------+----------+-------------+----+
|   9|        HR|        27|           27|null|
|   1|        IT|        15|           15|null|
|   5|        IT|        40|           55|  40|
|   6|        IT|        24|           79|  24|
|  10|        IT|        75|          154|  75|
|   2|   Support|        81|           81|null|
|   3|   Support|        90|          171|  90|
|   4|   Support|        25|          196|  25|
|   7|   Support|        31|          227|  31|
|   8|   Support|         1|          228|   1|
+----+----------+----------+-------------+----+

Và mình vẫn chưa tìm được cách replace giá trị null kia thế nên mình giải quyết bài toán này bằng 1 phương pháp hơi phức tạp như trên.

Example 6: Window function

Yêu cầu

Cho tập dữ liệu đầu vào .csv như sau:

1
2
3
4
5
6
7
8
9
10
11
id,name,department,salary
1,Hunter Fields,IT,15
2,Leonard Lewis,Support,81
3,Jason Dawson,Support,90
4,Andre Grant,Support,25
5,Earl Walton,IT,40
6,Alan Hanson,IT,24
7,Clyde Matthews,Support,31
8,Josephine Leonard,Support,1
9,Owen Boone,HR,27
10,Max McBride,IT,75
  • Đọc dữ liệu vào spark từ file csv
  • Tìm sự chênh lệch salary giữa nhân viên có lương cao nhất với các nhân viên còn lại trong từng phòng ban

INPUT:

idnamedepartmentsalary
1Hunter FieldsIT15
2Leonard LewisSupport81
3Jason DawsonSupport90
4Andre GrantSupport25
5Earl WaltonIT40
6Alan HansonIT24
7Clyde MatthewsSupport31
8Josephine LeonardSupport1
9Owen BooneHR27
10Max McBrideIT75

OUTPUT:

idnamedepartmentsalarydiff
9Owen BooneHR270
1Hunter FieldsIT1560
5Earl WaltonIT4035
6Alan HansonIT2451
10Max McBrideIT750
2Leonard LewisSupport819
3Jason DawsonSupport900
4Andre GrantSupport2565
7Clyde MatthewsSupport3159
8Josephine LeonardSupport189

Lời giải

Bài này khá là đơn giản hơn so với các bài bên trên, chúng ta chỉ cần tìm ra giá trị max của department và trừ đi là xong:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package part6;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

public class Main {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("Bai_6").master("local").getOrCreate();
		Dataset<Row> data = spark.read().option("header", true).option("inferSchema", true).csv("src/part6/input.csv");
		
		WindowSpec wins = Window.partitionBy("department").orderBy("id")
				.rowsBetween(Window.unboundedPreceding(), Window.unboundedFollowing());
		Dataset<Row> result_1 = data.withColumn("diff_max", functions.max("salary").over(wins));
		
		result_1.createOrReplaceTempView("data_1");
		Dataset<Row> result = spark.sql("select id, name, department, salary, diff_max - salary as diff from data_1");
		
		result.show();
	}
}

Example 7: Window function

Yêu cầu

Cho tập dữ liệu dưới dạng đầu vào .csv như sau:

1
2
3
4
5
6
7
8
9
10
11
Employee,Salary
Tony,50
Alan,45
Lee,60
David,35
Steve,65
Paul,48
Micky,62
George,80
Nigel,64
John,42
  • Load dữ liệu từ file csv
  • Thêm cột Percentage với giá trị được quy định như sau
  • 30% số nhân viên đầu có lương cao nhất nhận giá trị “High”
  • 40% tiếp theo nhận giá trị “Average”
  • Phần còn lại nhận giá trị “Low”

Lời giải

Thoạt nhìn thì ta sẽ thấy nó đơn giản nhưng nhìn chung nó cũng không được đơn giản cho lắm. Hướng làm cho bài này mà mình đưa ra sẽ là:

  • Sắp xếp lại giá trị lương theo thứ tự từ cao tới thấp
  • Đánh số thứ tự cho các nhân viên vừa sắp xếp theo chỉ số giảm dần
  • Tính giá trị nhân viên nằm ở top bao nhiêu phần trăm bằng cách lấy số thự tự của nhân viên vừa được đánh dấu chia cho số thứ tự cao nhất (hay là tổng số nhân viên)
  • Thêm cột Percentage bằng cách những nhân viên nào có giá trị phần trăm tính ở bước trên từ 0.7 tới 1 thì là High, từ 0.3 tới 0.7 thì là Average, từ 0 tới 0.3 là Low

Mã nguồn lời giải này như sau:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package part7;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.expressions.Window;

public class Main {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("Bai_7").master("local").getOrCreate();
		Dataset<Row> data = spark.read().option("header", true).option("inferSchema", true).csv("src/part7/input.csv");
		data = data.sort(data.col("Salary").desc());
		data = data.withColumn("stt", functions.rank().over(Window.orderBy("Salary")));
		data = data.withColumn("Salary_count", functions.count("Salary").over()).sort(data.col("Salary").desc());
		data.createOrReplaceTempView("data");
		
		Dataset<Row> data_1 = spark.sql("select Employee, Salary, stt / Salary_count as per from data");
		data_1.createOrReplaceTempView("data_1");
		
		Dataset<Row> result_1 = spark.sql("select Employee, Salary from data_1 where per > 0.7 and per <= 1");
		result_1 = result_1.withColumn("Percentage", functions.lit("High"));
		
		Dataset<Row> result_2 = spark.sql("select Employee, Salary from data_1 where per > 0.3 and per <= 0.7");
		result_2 = result_2.withColumn( "Percentage", functions.lit("Average"));
		
		Dataset<Row> result_3 = spark.sql("select Employee, Salary from data_1 where per <= 0.3 ");
		result_3 = result_3.withColumn("Percentage", functions.lit("Low"));
		
		Dataset<Row> result = result_1.union(result_2).union(result_3);
		result.show();
	}
}

Example 8: Window function

Yêu cầu

Cho tập dữ liệu với đầu vào .csv như sau:

1
2
3
4
5
6
7
8
9
10
11
id,title,genre,quantity
1,Hunter Fields,romance,15
2,Leonard Lewis,thriller,81
3,Jason Dawson,thriller,90
4,Andre Grant,thriller,25
5,Earl Walton,romance,40
6,Alan Hanson,romance,24
7,Clyde Matthews,thriller,31
8,Josephine Leonard,thriller,1
9,Owen Boone,sci-fi,27
10,Max McBride,romance,75
  • Load dữ liệu từ csv
  • Lọc những tilte có số lượng quantity top 1 và top 2 với mỗi genre

Lời giải

Bài này mình có thể giải quyết mà không cần sử dụng windows function như sau:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package part8;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Main {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("Bai_8").master("local").getOrCreate();
		Dataset<Row> data = spark.read().option("header", true).option("inferSchema", true).csv("src/part8/input.csv");
		
		data.createOrReplaceTempView("data");
		spark.sql("SELECT\n"
				+ "  id,\n"
				+ "  title,\n"
				+ "  genre,\n"
				+ "  quantity,\n"
				+ "  rank\n"
				+ "FROM (\n"
				+ "  SELECT\n"
				+ "    id,\n"
				+ "    title,\n"
				+ "    genre,\n"
				+ "    quantity,\n"
				+ "    dense_rank() OVER (PARTITION BY genre ORDER BY quantity DESC) as rank\n"
				+ "  FROM data) tmp\n"
				+ "WHERE\n"
				+ "  rank <= 2").show();
		
	}
}

Example 9: pivot

Yêu cầu

Cho tập dữ liệu với đầu vào .csv như sau:

1
2
3
4
5
1,Question1Text,Yes,abcde1,0,"(x1,y1)"
2,Question2Text,No,abcde1,0,"(x1,y1)"
3,Question3Text,3,abcde1,0,"(x1,y1)"
1,Question1Text,No,abcde2,0,"(x2,y2)"
2,Question2Text,Yes,abcde2,0,"(x2,y2)"
  • Load dữ liệu từ json
  • Tạo báo cáo giống output

INPUT:

QidQuestionAnswerTextParticipantIDAssessmentGeoTag
1Question1TextYesabcde10(x1,y1)
2Question2TextNoabcde10(x1,y1)
3Question3Text3abcde10(x1,y1)
1Question1TextNoabcde20(x2,y2)
2Question2TextYesabcde20(x2,y2)

OUTPUT:

ParticipantIDAssessmentGeoTagQid_1Qid_2Qid_3
abcde10(x1,y1)YesNo3
abcde20(x2,y2)NoYesnull

Lời giải

Bài này cũng khá tương tự với lại ví dụ 3, nên bạn có thể xem lại ví dụ 3 để hiểu hơn 1 số chỗ phép biến đổi nha:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package part9;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;


public class Main {
	public static void main(String[] args) {
		SparkSession spark = SparkSession.builder().appName("Bai_9").master("local").getOrCreate();
		Dataset<Row> data = spark.read().option("header", true).option("inferSchema", true).csv("src/part9/input.csv");
		
		Dataset<Row> result = data.withColumn("concat_Qid", functions.concat(functions.lit("Qid_"), data.col("Qid")))
				.groupBy("ParticipantID", "Assessment", "GeoTag").pivot("concat_Qid").agg(functions.first("AnswerText"));
		
		result.show();
	}
}

Tổng kết

Bạn có thể xem lại toàn bộ mã nguồn của 9 ví dụ trên TẠI ĐÂY (Nếu thấy thú vị thì và bổ ích cho nhóm mình 1 star trong repo nha).

File tổng hợp các ví dụ các bạn có thể xem TẠI ĐÂY

Mong rằng 9 ví dụ này giúp cho các bạn hiểu qua được phần vào về window functionpivot trong Spark SQL hơn.

This post is licensed under CC BY 4.0 by the author.