본문 바로가기

Spark

2. RDD

스파크에선 RDD 라는 개념이 굉장히 중요한듯..!!


RDD : Resilent Distributed Dataset


뭔가 .. 의역하면 회복력있는 분포된 데이터집합!!


스파크에서는 모든 작업은 새로운 RDD 를 만들거나, 존재하는 RDD를 변형하거나, 결과 계산을 위해 RDD에서 연산을 호출하는 것 중의 하나로 표현된다. 그리고 내부적으로 스파크는 자동으로 RDD에 있는 데이터들을 클러스터에 분배하여 클러스터 위에서 수행하는 연산들을 병렬화한다.




스파크의 RDD는 단순하게 말하면 분산되어 있는 변경 불가능한 객체 모음이다. RDD는 클러스터의 서로 다른 노드들에서 연산 가능하도록 여러개의 파티션( partition )으로 나뉜다.


이 멘트 엄청 핵심포인트 인듯...!! 분산된 데이터를 다시 파티션으로 나눠 분산 처리 할 수 있다..! 이런 말인듯 ㅎㅎ 



그니까 엄청 큰 데이터를 자기한테 가져와서 다시 다른 여러 노드들에게 "야! 이거 양 많으니까 나눠서 처리해!!" 라는 식으로 데이터를 파티셔닝해서 빠르게 처리 할 수 있다!! 이런 의미로 보면 될듯!! ㅎㅎ











RDD 의 연산 타입

  • 트랜스 포메이션 ( transformation )
 - 존재하는 RDD에서 새로운 RDD를 만들어 낸다.

>>> sc.textFile("README.md").filter(lambda line : "Python" in line)
16/04/05 16:30:54 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 127.4 KB, free 834.2 KB)
16/04/05 16:30:54 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 13.9 KB, free 848.2 KB)
16/04/05 16:30:54 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:53978 (size: 13.9 KB, free: 511.0 MB)
16/04/05 16:30:54 INFO SparkContext: Created broadcast 5 from textFile at NativeMethodAccessorImpl.java:-2
PythonRDD[14] at RDD at PythonRDD.scala:43


위에서 보듯이 PythonRDD 라는 Type 의 Object 로 나온것을 볼 수 있다. 트랜스 포메이션의 결과는 RDD 라는 말이 이런 뜻임!!



  • 액션 ( action )

 - RDD를 기초로 결과 값을 게산하며 그 값을 드라이버 프로그램에 되돌려 주거나 외부 스토리지에 저장하기도 한다.


>>> sc.textFile("README.md").first()
16/04/05 16:39:18 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.4 KB, free 127.4 KB)
16/04/05 16:39:18 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB, free 141.3 KB)
16/04/05 16:39:18 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:54457 (size: 13.9 KB, free: 511.1 MB)
16/04/05 16:39:18 INFO SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2
16/04/05 16:39:18 INFO FileInputFormat: Total input paths to process : 1
16/04/05 16:39:18 INFO SparkContext: Starting job: runJob at PythonRDD.scala:393
16/04/05 16:39:18 INFO DAGScheduler: Got job 0 (runJob at PythonRDD.scala:393) with 1 output partitions
16/04/05 16:39:18 INFO DAGScheduler: Final stage: ResultStage 0 (runJob at PythonRDD.scala:393)
16/04/05 16:39:18 INFO DAGScheduler: Parents of final stage: List()
16/04/05 16:39:18 INFO DAGScheduler: Missing parents: List()
16/04/05 16:39:18 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[2] at RDD at PythonRDD.scala:43), which has no missing parents
16/04/05 16:39:18 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.8 KB, free 146.1 KB)
16/04/05 16:39:18 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 3.0 KB, free 149.1 KB)
16/04/05 16:39:18 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:54457 (size: 3.0 KB, free: 511.1 MB)
16/04/05 16:39:18 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/04/05 16:39:18 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (PythonRDD[2] at RDD at PythonRDD.scala:43)
16/04/05 16:39:18 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/04/05 16:39:18 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2167 bytes)
16/04/05 16:39:18 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/04/05 16:39:18 INFO HadoopRDD: Input split: file:/Users/1002720/Documents/spark-1.6.1-bin-hadoop2.6/README.md:0+1679
16/04/05 16:39:18 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/04/05 16:39:18 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/04/05 16:39:18 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/04/05 16:39:18 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/04/05 16:39:18 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/04/05 16:39:19 INFO PythonRunner: Times: total = 207, boot = 200, init = 7, finish = 0
16/04/05 16:39:19 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2143 bytes result sent to driver
16/04/05 16:39:19 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 273 ms on localhost (1/1)
16/04/05 16:39:19 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/04/05 16:39:19 INFO DAGScheduler: ResultStage 0 (runJob at PythonRDD.scala:393) finished in 0.284 s
16/04/05 16:39:19 INFO DAGScheduler: Job 0 finished: runJob at PythonRDD.scala:393, took 0.344070 s
u'# Apache Spark'


트렌스포메이션(transformation) 처럼 RDD 가 아닌 스트링(String) 형태의 값이 리턴된것을 볼 수 있다!




RDD 관련한 method 로는 아래 링크를 참조하자!

http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD



그러면 만약 python 파일에서 실행하려면 어떻게 해야 할까??







from pyspark import SparkContext

logFile = "README.md"  # Should be some file on your system

sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))


위 처럼 py파일을 하나 작성하고 아래처럼 명령어를 실행시키면 결과를 볼 수 있다.



➜  spark-1.6.1-bin-hadoop2.6 bin/pyspark SimpleApp.py
Lines with a: 58, lines with b: 26





'Spark' 카테고리의 다른 글

개발환경 셋팅하기 with pycharm  (0) 2016.06.14
2-2.RDD - Operations  (0) 2016.06.03
2-1. RDD - Parallelized Collections, External Datasets  (0) 2016.06.03
1. spark 설치하기  (0) 2016.04.05
0. Spark ( Lightning-fast cluster computing )  (0) 2016.04.03