본문 바로가기

Spark

3. Spark SQL, DataFrames and Datasets

Spark 에서도 데이터를 SQL 문을 이용해서 데이터를 Query 할 수 있는데, 그 부분에 대해서 간단하게 얘기해 보려고 한다.


기본적으로 SQL 로 Query 한 데이터는 DataFrame 이라는 데이터 형태로 데이터를 Return 한다. DataFrame 은 스파크에서 분산처리를 위해 만든 컬럼이 존재하는 분산 콜렉션이다. 


사실 뭔가 업무할 때 ( 코딩할 때 ) 필요한 것만 적을거라 더이상의 자세한 설명은 빼고 아래 코드로 대체 하겠다. 

# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are RDDs and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print(teenName)


Parquet 파일로부터 데이터 소스를 로드 하는 법도 있는데 그건 빼고 +_+... (왜냐면 우리 회사에서는 이렇게는 잘 안쓰는... 나만 그런가... 아님 아직 내가 초짜라... ㅠㅠ )

스파크에서는 Hive 로부터 데이터를 읽고 쓰는 것이 가능하다.  


아래 URL 을 따라 가보면 Spark 에서 Hive 를 사용시에 Spark conf 파일에 들어가야 할 내용이 명시되어 있다. 보통 conf/spark-env.sh 에 HADOOP_CONF 경로가 적혀 있다.

링크 :http://spark.apache.org/docs/1.6.1/sql-programming-guide.html#hive-tables

물론 spark-submit 시에 물고 올라가는 방법도 있긴하다.

다 설정이 되고 나면 아래와 같은 명령이 실행 가능하다고 한다.

# sc is an existing SparkContext.
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results = sqlContext.sql("FROM src SELECT key, value").collect()


Tuning 을 위해 sqlContext.cacheTable("tableName") or dataFrame.cache() 과 같은 로직도 사용이 가능한데, 반드시 사용후에는 sqlContext.uncacheTable("tableName") 을 해줘야 한다.