Posts Spark RDD
Post
Cancel

Spark RDD

Resilient Distributed Datasets (RDDs)

Resilient Distributed Datasets (RDD) là một cấu trúc dữ liệu cơ bản của Spark. Nó là một tập hợp bất biến phân tán của một đối tượng có thể hoạt động song song.

Có hai cách để tạo RDDs:

  • Tạo từ một tập hợp dữ liệu có sẵn trong ngôn ngữ sử dụng như Java, Python, Scala.
  • Lấy từ dataset hệ thống lưu trữ bên ngoài như HDFS, Hbase hoặc các cơ sở dữ liệu quan hệ.

Ví dụ tạo RDD từ 1 List với Java:

1
2
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

Ví dụ tạo RDD từ 1 file text:

1
JavaRDD<String> distFile = sc.textFile("data.txt");

RDD Operations

RDD hỗ trợ 2 loại operations:

  • transformations: tạo tập dữ liệu mới từ tập dữ liệu hiện có
  • actions: trả về giá trị cho chương trình điều khiển sau khi thực hiện tính toán trên tập dữ liệu

Tất cả transformations trong Spark đều lazy, đây cũng là một tính chất quan trọng quyết định tới hiệu năng của Spark và đi phỏng vấn hay thi câu này khá được chú ý tới. Các transformations không tính toán kết quả ngay lập tức, thay vào đó nó nhớ phép biến đổi được áp dụng cho tập dữ liệu. Transformations chỉ thực hiện tính toán khi actions yêu cầu một kết quả trả về.

Transformations

TransformationÝ nghĩa
map(func)Trả về một tập dữ liệu mới bằng cách chuyển đổi mỗi phần tử cũ sang từng phần tử mới thông qua func
filter(func)Trả về một tập dữ liệu mới nếu các phần tử trong tập dữ liệu được chọn có func trả về giá trị true
flatMap(func)Giống mới map, tuy nhiên map là chuyển đổi 1-1 còn với flatMap thì có thể là chuyển đổi 1-N, 1-1, 1-0
union(otherDataset)Trả về tập dữ liệu mới là hợp của 2 tập dữ liệu
intersection(otherDataset)Trả về tập dữ liệu mới là hợp của 2 tập dữ liệu
distinct([numPartitions]))Trả về tập dữ liệu mới với các phần tử riêng biệt
groupByKey([numPartitions])Khi tập dữ liệu ban đầu có dạng cặp (K,V), trả về tập dữ liệu khác có dạng là 1 cặp (K, Iterable). Hiểu đơn giản là hàm này giúp nhóm các cặp có cùng Key lại với nhau
reduceByKey(func, [numPartitions])Tập dữ liệu ban đầu có dạng là 1 cặp (K,V) và kết quả cho ra là 1 cặp (K,V) khác, trong đó giá trị V được tổng hợp từ hàm func. Chúng ta hiểu đơn giản, reduceByKey giống so mới groupByKey, tuy nhiên groupByKey chỉ giúp nhóm các Value lại trong khi đó reduceByKey có thêm 1 hàm func để thực hiện phép toán tổng hợp các Value đó và trả về kết quả Value duy nhất
sortByKey([ascending], [numPartitions])Tập dữ liệu ban đầu có dạng 1 cặp (K,V), trả về 1 cặp (K,V) khác với các K đã được sắp xếp tăng dần hoặc giảm dần

Còn 1 số các Transformations khác mà do chúng không phổ biến và không được sử dụng thường xuyên nên mình không đề cập ở đây. Nếu bạn muốn tìm hiểu thêm thì đọc tại Spark docs transformations

Actions

ActionsÝ nghĩa
reduce(func)Tổng hợp các phần tử sử dụng hàm func (nhận vào 2 tham số và trả về một)
collect()Trả về tất cả các dữ liệu dưới dạng 1 mảng
count()Trả về sống lượng phần tử của tập dữ liệu
first()Trả về phần tử đầu tiên
take(n)Trả về 1 mảng với n phần tử đầu tiên trong mảng đó
saveAsTextFile(path)Ghi dữ liệu vào file text
foreach(func)Duyệt từng phần tử của tập hợp

Còn 1 số các Actions khác mà do chúng không phổ biến và không được sử dụng thường xuyên nên mình không đề cập ở đây. Nếu bạn muốn tìm hiểu thêm thì đọc tại Spark docs Actions

Ví dụ một số Operations

Tạo một maven project bằng Java và thêm vào các phụ thuộc sau:

1
2
3
4
5
6
7
8
9
10
11
12
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.12</artifactId>
	<version>3.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
	<groupId>org.apache.hadoop</groupId>
	<artifactId>hadoop-client</artifactId>
	<version>3.3.0</version>
</dependency>

map

Ta có một tập dữ liệu có giá trị các phần tử là 10,20,30. Bây giờ chúng ta sẽ dùng map để gấp đôi từng phần tử trong tập dữ liệu ban đầu.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package demo;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class Main {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
				.setAppName("Demo")
				.setMaster("local[2]");
		
		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
			List<Integer> list = Arrays.asList(10,20,30);
			JavaRDD<Integer> data = sc.parallelize(list);
			
			data = data.map(new Function<Integer, Integer>() {
				private static final long serialVersionUID = 1L;

				@Override
				public Integer call(Integer v1) throws Exception {
					return v1 * 2;
				}
				
			});
			
			data.collect().forEach(v -> System.out.println(v));
		}
		
	}
}

Kết quả cho ra lần lượt là:

1
2
3
10
20
30

filter

Ta có tập dữ liệu ban đầu là 10, 11, 12, 13, 14, 15. Sau đó sử dụng flatMap để lọc ra các phần tử chia hết cho 5.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package filter;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class Main {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
				.setAppName("Demo")
				.setMaster("local[2]");
	
		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
			List<Integer> list = Arrays.asList(10,11,12,13,14,15);
			JavaRDD<Integer> data = sc.parallelize(list);
			
			data = data.filter(new Function<Integer, Boolean>() {
				private static final long serialVersionUID = 1L;

				@Override
				public Boolean call(Integer v1) throws Exception {
					if(v1 % 5 == 0) return true;
					return false;
				}
			});
			
			data.collect().forEach(v -> System.out.println(v));
		}
	}
}

Kết quả cho ra lần lượt là:

1
2
10
15

groupByKey

Ta có 1 tập dữ liệu ban đầu dạng (K,V). groupByKey sẽ giúp nhóm khác phần tử cùng key lại với nhau.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package groupbykey;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class Main {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
				.setAppName("Demo")
				.setMaster("local[2]");
	
		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
			List<Tuple2<String, Integer>> list = Arrays.asList(
					new Tuple2<String, Integer>("C", 3), 
					new Tuple2<String, Integer>("A", 1), 
					new Tuple2<String, Integer>("B", 4), 
					new Tuple2<String, Integer>("A", 2), 
					new Tuple2<String, Integer>("B", 5));
			
			JavaPairRDD<String, Integer> data = sc.parallelizePairs(list);
			data.groupByKey().collect().forEach(s -> System.out.println(s));
			
		}
	}
}

Kết quả sẽ là:

1
2
3
(B,[4, 5])
(A,[1, 2])
(C,[3])

reduceByKey

reduceByKey ngoài việc nhóm các phần tử cùng key lại với nhau nó còn giúp thực hiện tính toán trên các Value cùng Key đó.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package reducebykey;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import scala.Tuple2;

public class Main {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf()
				.setAppName("Demo")
				.setMaster("local[2]");
	
		try (JavaSparkContext sc = new JavaSparkContext(conf)) {
			List<Tuple2<String, Integer>> list = Arrays.asList(
					new Tuple2<String, Integer>("C", 3), 
					new Tuple2<String, Integer>("A", 1), 
					new Tuple2<String, Integer>("B", 4), 
					new Tuple2<String, Integer>("A", 2), 
					new Tuple2<String, Integer>("B", 5));
			
			JavaPairRDD<String, Integer> data = sc.parallelizePairs(list);
			data = data.reduceByKey(new Function2<Integer, Integer, Integer>() {
				private static final long serialVersionUID = 1L;

				@Override
				public Integer call(Integer v1, Integer v2) throws Exception {
					return v1 + v2;
				}
			});
			
			data.collect().forEach(v -> System.out.println(v));
			
		}
	}
}

Kết quả là:

1
2
3
(B,9)
(A,3)
(C,3)

Xem toàn bộ mã nguồn của ví dụ tại: https://github.com/demanejar/spark-rdd

Tham khảo: https://laptrinh.vn/, https://spark.apache.org/

This post is licensed under CC BY 4.0 by the author.