Như đã tìm hiểu ở bài viết trước về Spark SQL, Dataframe và Dataset, Spark SQL là một mô hình để xử lý dữ liệu có cấu trúc của Spark rất phổ biến. Trong bài viết này chúng ta sẽ sử dụng Spark SQL để đi phân tích, xử lý một dữ liệu bán lẻ có cấu trúc cho trước
Dữ liệu và chuẩn bị dữ liệu
Dữ liệu
Cho dataset đính kèm về dữ liệu bán lẻ online TẠI ĐÂY, hãy phân tích dữ liệu trên để trả lời 5 câu hỏi sau đây:
- Có tổng bao nhiêu giao dịch, sản phẩm và khách hàng khác nhau?
- Tỉ lệ khách hàng không có thông tin
- Đâu là nước có số lượng đơn hàng (Quantity) nhiều thứ 3?
- Từ nào xuất hiện ít nhất trong phần Description?
- Sản phẩm nào bán được số lượng (Quantity) lớn nhất ở United Kingdom?
Chuẩn bị dữ liệu
Để trực quan thì dữ liệu chúng ta sẽ đẩy lên HDFS và Spark SQL sẽ lấy dữ liệu trực tiếp từ HDFS để xử lý, để đẩy dữ liệu lên HDFS chúng ta sử dụng câu lệnh:
1
hdfs dfs -copyFromLocal resources/retails.csv /
Kiểm tra dữ liệu đã có trên HDFS chưa:
1
hdfs dfs -ls /
Thư viện và ngôn ngữ sử dụng
Mình là fan cứng của Java vì vậy mình sẽ sử dụng Java để giải quyết bài toán này thay vì Scala. Vì Java không phải là ngôn ngữ triển khai của Spark nên những hỗ trợ về Java cũng không nhiều và cũng có rất ít tài liệu về Spark viết với Java nên mình cũng muốn đóng góp thêm chút ít phần tài liệu về Spark với Java tới cộng đồng.
Các bạn tạo một maven project và thêm vào 2 phụ thuộc sau (phụ thuộc Spark Core và Spark SQL):
1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
<scope>provided</scope>
</dependency>
Phân tích và xử lý dữ liệu bán lẻ
Nếu bạn mới tìm hiểu về Spark SQL thì hãy xem lại bài viết Spark SQL, Dataframe và Dataset để hiểu rõ hơn về SparkSession, Dataframe, Dataset
Part 1
Câu hỏi: Có tổng bao nhiêu giao dịch, sản phẩm và khách hàng khác nhau?
Với câu hỏi thứ nhất, chúng ta lấy ra từng loại giao dịch, sản phẩm và khách hàng và đếm số lượng của từng loại:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.spark.part_1;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class Main {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("Part-1")
.master("local")
.getOrCreate();
Dataset<Row> data = spark.read()
.option("inferSchema", true)
.option("header", true)
.csv("hdfs://localhost:9000/retails.csv");
// number of customer distinct
// except 1 because there are value is null of information customer ID
long cntCustomers = data.select("CustomerID").distinct().count() - 1;
// number of product distinct
long cntProdcts = data.select("StockCode").distinct().count();
// number of invoice distinct
long cntInvoices = data.select("InvoiceNo").distinct().count();
// print
System.out.println("Number of customer distinct: " + cntCustomers);
System.out.println("Number of product distinct: " + cntProdcts);
System.out.println("Number of invoice distinct: " + cntInvoices);
// output:
// Number of customer distinct: 4372
// Number of product distinct: 4070
// Number of invoice distinct: 25900
}
}
- Chúng ta nhìn vào bản ghi có thể thấy là các giá trị khách hàng, sản phẩm hay hóa đơn đều có những bản ghi lặp lại, vì vậy trước khi đếm chúng ta phải sử dụng
distinct()
để lọc các giá trị trùng. - Nhìn vào câu 2 chúng ta có thể thấy là có những khách hàng sẽ không có thông tin, vì thế trường
CustomerID
sẽ có những trườngNULL
và chúng ta phải bỏ chúng đi, vì thếcntCustomers
phải trừ đi 1.
Part 2
Câu hỏi: Tỉ lệ khách hàng không có thông tin
Câu hỏi 2 này cũng gần giống với câu hỏi đầu tiên. Chúng ta đếm những khách hàng không có thông tin và chia cho tổng số lượng khách hàng:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.spark.part_2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class Main {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().appName("Part-2").master("local").getOrCreate();
Dataset<Row> data = spark.read()
.option("inferSchema", true)
.option("header", true)
.csv("hdfs://localhost:9000/retails.csv");
// get number of customer (done part 1)
long cntCustomers = data.select("CustomerID").count();
// get number of customer no information
long cntCustomersNoInfor = data.select("CustomerID").filter(data.col("CustomerID").isNull()).count();
double ratio = (double) cntCustomersNoInfor / cntCustomers * 100;
System.out.printf("Ratio no information: %f \n", ratio);
}
}
- Sử dụng
isNull()
để lấy ra các dòng NULL chính xác hơn
Part 3
Câu hỏi: Đâu là nước có số lượng đơn hàng (Quantity) nhiều thứ 3?
Với câu hỏi này, chúng ta nhóm lại các đơn hàng theo từng quốc gia và tính tổng số lượng theo Quantity:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.spark.part_3;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class Main {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().appName("Part-3").master("local").getOrCreate();
Dataset<Row> data = spark.read()
.option("inferSchema", true)
.option("header", true)
.csv("hdfs://localhost:9000/retails.csv");
data.createOrReplaceTempView("data");
spark.sql("select Country, sum(Quantity) as count from data group by Country order by count desc").show();
}
}
- Trong câu này chúng ta truy vấn trực tiếp bằng câu lệnh SQL với
Spark.sql()
. Kết quả đầu ra khi sử dụngSpark.sql
là mộtDataset<Row>
và sử dụng bình thường như nhữngDataset<Row>
được khai báo khác khác.
Part 4
Câu hỏi: Từ nào xuất hiện ít nhất trong phần Description?
Câu hỏi này có lẽ là câu hỏi phức tạp nhất trong phần này, chúng ta phải lấy ra phần Description
và xử lý chúng.
Mình sẽ lấy ví dụ để rõ ràng hơn nha, giả sử chúng ta có 1 bản ghi của trường Description
là “WHITE HANGING HEART T-LIGHT HOLDER”, chúng ta sẽ sử dụng flatMap để tách từ 1 dòng này ra thành 5 dòng, mỗi dòng chứa một từ “WHITE”, “HANGING”, “HEART”, “T_LIGHT”, “HOLDER”. Sau khi tách toàn bộ các dòng của tập dữ liệu chúng ta sẽ làm công việc như các câu hỏi bên trên là nhóm chúng lại theo các từ giống nhau và thực hiện đếm.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.spark.part_4;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.StructType;
public class Main {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().appName("Part-4").master("local").getOrCreate();
Dataset<Row> data = spark.read()
.option("inferSchema", true)
.option("header", true)
.csv("hdfs://localhost:9000/retails.csv");
data.where("Description is not null").flatMap(new FlatMapFunction<Row, Row>() {
private static final long serialVersionUID = 1L;
private int cnt = 0;
@Override
public Iterator<Row> call(Row r) throws Exception {
List<String> listItem = Arrays.asList(r.getString(2).split(" "));
List<Row> listItemRow = new ArrayList<Row>();
for (String item : listItem) {
listItemRow.add(RowFactory.create(cnt, item, 1));
cnt++;
}
return listItemRow.iterator();
}
}, RowEncoder.apply(new StructType().add("number", "integer").add("word", "string").add("lit", "integer"))).createOrReplaceTempView("data");
spark.sql("select word, count(lit) as count from data group by word order by count desc").show();
}
}
data.where("Description is not null")
để bỏ đi những trường NULL, vì chúng vừa không mang lại gì mà còn hay gây ra lỗiNullPointerException
.RowEncoder
sẽ nói cho Spark biếtStructType
của dữ liệu mới sau khi biến đổi của bạn có dạng như nào.
Part 5
Câu hỏi: Sản phẩm nào bán được số lượng (Quantity) lớn nhất ở United Kingdom?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package com.spark.part_5;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class Main {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder().appName("Part-4").master("local").getOrCreate();
Dataset<Row> data = spark.read()
.option("inferSchema", true)
.option("header", true)
.csv("hdfs://localhost:9000/retails.csv");
data.filter(data.col("Country").equalTo("United Kingdom")).createOrReplaceTempView("data");
spark.sql("select Description, sum(Quantity) as count from data group by Description order by count desc").show();
}
}
Xem toàn bộ mã nguồn của Project tại https://github.com/demanejar/retails