RDDs(Resilient Distributed Datasets)

분산되어 존재하는 데이터 요소들의 모임.
스파크에서 이루어지는 모든 작업의 기본 요소이자 핵심.

분산되어 있는 변경 불가능한 객체 모음. 각 RDD는 클러스터의 서로 다른 노드들에서 연산 가능하도록 여러 개의 파티션으로 나뉜다.
RDD는 사용자 정의 클래스를 포함해서 파이썬, 자바, 스칼라의 어떤 타입의 객체든 가질 수 있다. 트랜스포메이션(transformation)과 액션(action) 두가지 타입의 연산을 지원한다.



create RDDs

RDDs를 만드는 방법은 두가지가 있다.

  1. driver program의 collection을 parallelize.
  2. 파일시스템이나 hdfs등의 외부 데이터를 불러오기.

parallelize

  • python
lines = sc.parallelize(["pandas", "i like pandas"])
  • scala
val lines = sc.parallelize(List("pandas", "i like pandas"))

위의 두 예제 모두 파이썬과 스칼라의 리스트를 RDD로 parallelize 했다.

external storage data

  • python
lines = sc.textFile("/path/to/README.md")
  • scala
val lines = sc.textFile("/path/to/README.md")

외부 스토리지에서 데이터를 불러온다.



operations

RDD는 트랜스포메이션과 액션 연산 지원.


transformation

존재하는 RDD에서 새로운 RDD를 만들어 낸다.

example

  • 표현식과 일치하는 데이터를 걸러 낸다.(filter)
  • 두개의 RDD를 합친다.(union)

  • python
inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x)
  • scala
val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))

이미 존재하는 RDD를 변경하지는 않음. 대신 새로운 RDD를 리턴.


action

RDD를 기초로 결과 값을 계산하며 그 값을 드라이버 프로그램에 되돌려주거나 외부 스토리지(ex. HDFS)에 저장.
action이 실행될 때 마다 매번 새로 연산. 여러 액션 중 RDD 하나를 재사용하고 싶으면 RDD.persist()를 사용하여 결과를 유지할 수 있다.
데이터를 보존할 위치도 지정할 수 있다. 기본적으로는 메모리에 저장된다.

  • python
print "Input had " + badLinesRDD.count() + " concerning lines"
print "Here are 10 examples:"
for line in badLinesRDD.take(10):
  print line
  • scala
println("Input had " + badLinesRDD.count() + " concerning lines")
println("Here are 10 examples: ")
badLinesRDD.take(10).foreach(println)
  • take() : 일부 데이터를 가져옴.
    전체 데이터 셋이 단일 컴퓨터의 메모리에 올라올 수 있을 정도의 크기인 경우 collect() 사용 가능.


passing function

python

# lambda 식 예제
word = rdd.filter(lambda s: "error" in s)

# 실제 함수 전달 예제
def containsError(s):
  return "error" in s
word = rdd.filter(containsError)

#필드 참조가 없는 파이썬 함수 전달
class WordFunctions(Object):
  ...
  def getMatchesNoReference(self, rdd):
    query = self.query
    return rdd.filter(lambda x: query in x)


scala

class SearchFunctions(val query: String) {
  def isMatch(s: String): Boolean = {
    s.contain(query)
  }
  def getMatchesFunctionReference(rdd: RDD[String]): RDD[Boolean] = {
    // "isMatch"는 "this.isMatch"이므로 this의 모든 것이 전달된다.
    rdd.map(isMatch)
  }
  def getMatchesFieldReference(rdd: RDD[String]): RDD[Array[String]] = {
    // "query"는 "this.query"이므로 this의 모든 것이 전달된다.
    rdd.map(x => x.split(query))
  }
  def getMatchesNoReference(rdd: RDD[String]): RDD[Array[String]] = {
    // 필요한 필드만 추출하여 지역 변수에 저장해 전달한다.
    val query_ = this.query
    rdd.map(x => x.split(query_))
  }
}
  1. 외부 데이터에서 입력 RDD를 만든다.
  2. filter()같은 transformation을 써서 새로운 RDD를 정의한다.
  3. 재사용을 위한 중간 단계의 RDD들을 보존하기 위해 스파크에 persist()를 요청한다.
  4. 스파크가 최적화한 병렬 연산 수행을 위해 count()나 first() 같은 액션을 시작한다.