Apache Spark - RDD
RDDs(Resilient Distributed Datasets)
분산되어 존재하는 데이터 요소들의 모임.
스파크에서 이루어지는 모든 작업의 기본 요소이자 핵심.
분산되어 있는 변경 불가능한 객체 모음.
각 RDD는 클러스터의 서로 다른 노드들에서 연산 가능하도록 여러 개의 파티션으로 나뉜다.
RDD는 사용자 정의 클래스를 포함해서 파이썬, 자바, 스칼라의 어떤 타입의 객체든 가질 수 있다.
트랜스포메이션(transformation)과 액션(action) 두가지 타입의 연산을 지원한다.
create RDDs
RDDs를 만드는 방법은 두가지가 있다.
- driver program의 collection을 parallelize.
- 파일시스템이나 hdfs등의 외부 데이터를 불러오기.
parallelize
- python
lines = sc.parallelize(["pandas", "i like pandas"])
- scala
val lines = sc.parallelize(List("pandas", "i like pandas"))
위의 두 예제 모두 파이썬과 스칼라의 리스트를 RDD로 parallelize 했다.
external storage data
- python
lines = sc.textFile("/path/to/README.md")
- scala
val lines = sc.textFile("/path/to/README.md")
외부 스토리지에서 데이터를 불러온다.
operations
RDD는 트랜스포메이션과 액션 연산 지원.
transformation
존재하는 RDD에서 새로운 RDD를 만들어 낸다.
example
- 표현식과 일치하는 데이터를 걸러 낸다.(filter)
-
두개의 RDD를 합친다.(union)
- python
inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x)
- scala
val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))
이미 존재하는 RDD를 변경하지는 않음. 대신 새로운 RDD를 리턴.
action
RDD를 기초로 결과 값을 계산하며 그 값을 드라이버 프로그램에 되돌려주거나 외부 스토리지(ex. HDFS)에 저장.
action이 실행될 때 마다 매번 새로 연산. 여러 액션 중 RDD 하나를 재사용하고 싶으면 RDD.persist()
를 사용하여 결과를 유지할 수 있다.
데이터를 보존할 위치도 지정할 수 있다. 기본적으로는 메모리에 저장된다.
- python
print "Input had " + badLinesRDD.count() + " concerning lines"
print "Here are 10 examples:"
for line in badLinesRDD.take(10):
print line
- scala
println("Input had " + badLinesRDD.count() + " concerning lines")
println("Here are 10 examples: ")
badLinesRDD.take(10).foreach(println)
take()
: 일부 데이터를 가져옴.
전체 데이터 셋이 단일 컴퓨터의 메모리에 올라올 수 있을 정도의 크기인 경우collect()
사용 가능.
passing function
python
# lambda 식 예제
word = rdd.filter(lambda s: "error" in s)
# 실제 함수 전달 예제
def containsError(s):
return "error" in s
word = rdd.filter(containsError)
#필드 참조가 없는 파이썬 함수 전달
class WordFunctions(Object):
...
def getMatchesNoReference(self, rdd):
query = self.query
return rdd.filter(lambda x: query in x)
scala
class SearchFunctions(val query: String) {
def isMatch(s: String): Boolean = {
s.contain(query)
}
def getMatchesFunctionReference(rdd: RDD[String]): RDD[Boolean] = {
// "isMatch"는 "this.isMatch"이므로 this의 모든 것이 전달된다.
rdd.map(isMatch)
}
def getMatchesFieldReference(rdd: RDD[String]): RDD[Array[String]] = {
// "query"는 "this.query"이므로 this의 모든 것이 전달된다.
rdd.map(x => x.split(query))
}
def getMatchesNoReference(rdd: RDD[String]): RDD[Array[String]] = {
// 필요한 필드만 추출하여 지역 변수에 저장해 전달한다.
val query_ = this.query
rdd.map(x => x.split(query_))
}
}
- 외부 데이터에서 입력 RDD를 만든다.
- filter()같은 transformation을 써서 새로운 RDD를 정의한다.
- 재사용을 위한 중간 단계의 RDD들을 보존하기 위해 스파크에 persist()를 요청한다.
- 스파크가 최적화한 병렬 연산 수행을 위해 count()나 first() 같은 액션을 시작한다.