Key-Value Pair

집합 연산 등에 빈번하게 쓰임.

스파크는 키/값 쌍을 가지고 있는 RDD에 대해 특수한 연산 제공.
pair rdd라고 불림.


Pair RDD 생성

키를 가지는 데이터로 동작하는 함수들을 고려해 튜플로 구성된 RDD 리턴.

  • python
pairs = lines.map(lambda x: (x.split(" ")[0], x))
  • scala
val pairs = lines.map(x => (x.split(" ")(0), x))

스칼라나 파이썬에서 메모리에 있는 데이터로부터 페어 RDD를 만들어내려면 페어 데이터세트에 Spark.Context.parallelize() 호출.


pair rdd transformation

기본 RDD에서 가능한 모든 트랜스포메이션을 사용할 수 있다.
단, 페어 RDD는 튜플을 가지므로, 개별데이터를 다루는 함수 대신 튜플을 처리하는 함수를 전달해야 한다.

functions

rdd example

{(1, 2), (3, 4), (3, 6)}

reduceByKey(func) : 동일 키에 대한 값들을 합친다.

rdd.reduceByKey((x, y) => x + y)  // result : {(1, 2), (3, 10)}

groupByKey() : 동일 키에 대한 값들을 그룹화한다.

rdd.groupByKey()    // result : {(1, [2]), (3, [4, 6])}

combineByKey(createConbiner, mergeValue, mergeConbiners, partitioner) : 다른 결과타입을 써서 동일 키의 값을 합친다.

mapValues(func) : 키의 변경 없이 페어 RDD의 각 값에 함수를 적용한다.

rdd.mapValues(x => x + 1)   // result : {(1, 3), (3, 5), (3, 7)}

flatMapValues(func) : pair RDD의 각 값에 대해 반복자를 리턴하는 함수를 적용하고, 리턴받은 값들에 대해 기존 키를 써서 키/값 쌍을 만든다. 종종 token 분리에 쓰인다.

rdd.flatMapValues(x => (x to 5))
// {(1, 2), (1, 3), (1, 4), (1, 5), (3, 4), (3, 5)}

keys() : RDD가 가진 키들만을 되돌려준다.

rdd.keys()    //result : {1, 3, 3}

values() : RDD가 가진 값들만을 되돌려준다.

rdd.values()  // result : {2, 4, 6}

sortByKey() : 키로 정렬된 RDD를 되돌려준다.

rdd.sortByKey()   // result : {(1, 2), (3, 4), (3, 6)}

두 pair RDD에 대한 functions

rdd = {(1, 2), (3, 4), (3, 6)}, other = {(3, 9)}

subtractByKey : 다른 쪽 RDD에 있는 키를 써서 RDD의 데이터를 삭제한다.

rdd.subtractByKey(other)  // result : {(1, 2)}

join : 두 RDD에 대해 이너 조인(inner join)을 수행한다.

rdd.join(other)   // result : {(3, (4, 9)), (3, (6, 9))}

rightOuterJoin : 첫 번째 RDD에 있는 키들을 대상으로 두 RDD간에 조인을 수행한다.

rdd.rightOuterJoin(other) // result : {(3, (Some(4), 9)), (3, (Some(6), 9))}

leftOuterJoin : 다른 쪽 RDD에 있는 키들을 대상으로 두 RDD 간에 조인을 수행한다.

rdd.leftOuterJoin(other)  // result : {(1, (2, None)), (3, (4, Some(9))), (3, (6, Some(9)))}

cogroup : 동일 키에 대해 양쪽 RDD를 그룹화한다.

rdd.cogroup(other)    // result : {(1, ([2], [])), (3, ([4, 6], [9]))}


value에 filter 적용

pairs.filter{ case (key, value) => value.length < 20 }
map { case(x, y): (x, func(y))} // mapValues(func)와 동일


집합 연산

스칼라에서 reduceByKey()와 mapValues()로 키별 평균 구하기

rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
// word count 예
val input = sc.textFile("s3://...")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
// combineByKey()를 사용한 키별 평균
val result = input.combineByKey(
  (v) => (v, 1),    // createConbiner
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), // mergeValue
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)  // mergeConbiners
).map { case(key, value) => (key, value._1 / value._2.toFloat) }
result.collectAsMap().map(println(_))
// 병렬화 직접 지정을 사용한 reduceByKey()
val data = Seq(("a", 3), ("b", 4), ("a", 1))
sc.parallelize(data).reduceByKey((x, y) => x + y) // 기본 병렬화 수준 사용.
sc.parallelize(data).reduceByKey((x, y) => x + y, 10)   // 병렬화 수준 지정.


sorting

val input: RDD[(Int, Venue)] = ...
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) = a.toString.compare(b.toString)
}
rdd.sortByKey(sortIntegersByString)


pair RDD Action

rdd example

{(1, 2), (3, 4), (3, 6)}

countByKey() : 각 키에 대한 값의 개수를 센다.

rdd.countByKey()    // result : {(1, 1), (3, 2)}

collectAsMap() : 쉬운 검색을 위해 결과를 맵 형태로 모은다.

rdd.collectAsMap()    // result : Map{(1, 2), (3, 4), (3, 6)}

lookup(key) : 들어온 키에 대한 모든 값을 되돌려 준다.

rdd.lookup(3)   // result : [4, 6]


Data Partitioning

// code initialization, read user information from hadoop sequence file
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()

// 지난 5분간의 이벤트 로그 파일을 처리하기 위해 주기적으로 불리는 함수.
// 여기서 처리하는 시퀀스 파일이 (UserId, LinkInfo) 쌍을 갖고 있다고 가정.
def processNewLogs(logFileName: String) {
  val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
  val joined = userData.join(events)  // (UserID, (UserInfo, LinkInfo))를 페어로 가지는 RDD

  val offTopicVisits = joined.filter {
    // 각 아이템을 그 컴포넌트들로 확장한다.
    case (userId, (userInfo, linkInfo)) => !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}


파티셔닝 연산

페이지 랭크 알고리즘

// 이웃 리스트는 스파크 오브젝트 파일에 저장되어 있다고 가정.
val links = sc.objectFile[(String, Seq[String])]("links").partitionBy(new HashPartitioner(100)).persist()

// 각 페이지의 랭크를 1.0으로 초기화. mapValues()를 사용하므로 결과 RDD는 links와 동일한 파티셔너를 가진다.
var ranks = links.mapValues(v => 1.0)

// 10회 반복하여 알고리즘 수행
for (i <- 0 until 10) {
  val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) =>
      links.map(dest => (dest, rank / links.size))
  }
  ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85 * v)
}

// 결과 기록
ranks.saveAsTextFile("ranks")