끄적끄적

스파크 9장 본문

스파크 스터디/스파크 9장

스파크 9장

monkeydev 2019. 6. 8. 14:29

9.5 ORC 파일

ORC는 하듭 워크로드를 위해 설계된 자기 기술적이며, 데이터 타입을 인식할 수 있는 컬럼 기반의 파일 포맷이다.

파케이처럼 별도의 옵션 지정 없이 데이터를 읽을 수 있다.

파케이, ORC 두 포맷은 매우 유사하지만 근본적인 차이점이 있다.

파케이는 스파크에 최적화된 포맷이고, ORC는 하이브에 최적화되어 있다.

 

9.5.1 ORC 파일 읽기

ORC 파일 읽기 예시

spark.read.format("orc").load("/data/flight-data/orc/2010-summary.orc").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+

 

9.5.2 ORC 파일 쓰기

ORC 파일 쓰기 예시

val csvFile = spark.read.format("csv").option("header", "true").load("/data/flight-data/csv/2010-summary.csv")
csvFile.write.format("orc").mode("overwrite").save("my-orc-file.orc")

 

9.6 SQL 데이터베이스

SQL 데이터 소스는 매우 강력한 데이터 소스 중 하나이다.

이 장의 예제는 책의 조언에 따라 MySQL을 사용한다.

 

9.6.1, 9.6.2 SQL 데이터베이스 읽기 및 쿼리 푸시다운

Spark에서는 JDBC(JDBC(Java Database Connectivity)는 자바에서 데이터베이스에 접속할 수 있도록 하는 자바 API이다. JDBC는 데이터베이스에서 자료를 쿼리하거나 업데이트하는 방법을 제공한다.)를 이용한 여러 RDB와의 연결이 가능하다.

 

RDB와의 연결을 통해서 RDB의 테이블을 읽을 수 있고, RDB에 filter 과정을 위임할 수도 있다.

실행 계획의 PushedFilters 부분에서 관련 내용을 확인할 수 있다.

 

<MySQL mysql 데이터베이스의 ailab 테이블>

mysql> select * from ailab;
+---------+-----+
| name    | age |
+---------+-----+
| jaehyun |  31 |
| kyungah |  27 |
| sungil  |  28 |
| wonhyuk |  29 |
| wooseok |  29 |
+---------+-----+

 

<explain을 통한 PushedFilters 확인>

scala> val driver = "com.mysql.jdbc.Driver"
driver: String = com.mysql.jdbc.Driver

# 맨 끝에 mysql은 DataBase 이름입니다. 
scala> val url = "jdbc:mysql://172.17.0.3/mysql"
url: String = jdbc:mysql://172.17.0.3/mysql

# mysql 데이터 베이스의 ailab이라는 테이블의 전체 데이터를 load한다.
scala> val dbDataFrame = spark.read.format("jdbc").option("url", url).option("dbtable", "ailab").option("driver", driver).option("user", "root").option("password", "hellworld").load()

scala> dbDataFrame.filter(col("age") > 28).explain
== Physical Plan ==
*(1) Scan JDBCRelation(ailab) [numPartitions=1] [name#0,age#1] PushedFilters: [*IsNotNull(age), *GreaterThan(age,28)], ReadSchema: struct<name:string,age:int>

 

전체 쿼리 위임

때로는 전체 쿼리를 RDB에 전달해 결과를 DataFrame으로 받아야 하는 경우도 있다.

테이블명 대신 SQL 쿼리를 명시해 주면 된다.

scala> val pushdownQuery = """(SELECT DISTINCT(age) FROM ailab) AS ailab"""
pushdownQuery: String = (SELECT DISTINCT(age) FROM ailab) AS ailab

scala> val dbDataFrame = spark.read.format("jdbc").option("url", url).option("dbtable", pushdownQuery).option("driver", driver).option("user", "root").option("password", "tjddlf6277").load()
dbDataFrame: org.apache.spark.sql.DataFrame = [age: int]

scala> dbDataFrame.explain()
== Physical Plan ==
*(1) Scan JDBCRelation((SELECT DISTINCT(age) FROM ailab) AS ailab) [numPartitions=1] [age#30] PushedFilters: [], ReadSchema: struct<age:int>

 

스파크 자체 파티션에 결과 데이터를 저장함으로서 더 많은 처리를 할 수도 있다.

scala> val props = new java.util.Properties
props: java.util.Properties = {}

scala> props.setProperty("driver", "com.mysql.jdbc.Driver")
res13: Object = null

scala> props.setProperty("user", "root")
res14: Object = null

scala> props.setProperty("password", "tjddlf6277")
res15: Object = null

scala> val predicates = Array("age = 28 OR age = 29")
predicates: Array[String] = Array(age = 28 OR age = 29)

scala> spark.read.jdbc(url, "ailab", predicates, props).show()
+-------+---+
|   name|age|
+-------+---+
| sungil| 28|
|wonhyuk| 29|
|wooseok| 29|
+-------+---+

# rdd로 만들 수 있음.
scala> spark.read.jdbc(url, "ailab", predicates, props).rdd.getNumPartitions
res19: Int = 1

 

RDB를 spark에서 parquet로 저장

scala> val df = spark.read.jdbc(url, "ailab", predicates, props)
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)


scala> df.show()
+-------+---+
|   name|age|
+-------+---+
| sungil| 28|
|wonhyuk| 29|
|wooseok| 29|
+-------+---+

scala> df.write.format("parquet").save("./parquetFile")

scala> spark.read.load("./parquetFile").show()
+-------+---+
|   name|age|
+-------+---+
| sungil| 28|
|wonhyuk| 29|
|wooseok| 29|
+-------+---+


 

RDB의 CSV를 이용하여 RDB에 write

# spark 영역
scala> val pf = spark.read.format("parquet").load("./parquetFile")
pf: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> pf.show()
+-------+---+
|   name|age|
+-------+---+
| sungil| 28|
|wonhyuk| 29|
|wooseok| 29|
+-------+---+


scala> pf.write.jdbc(url, "newnewailab", props)


# mysql 영역
mysql> select * from newnewailab;
+---------+------+
| name    | age  |
+---------+------+
| sungil  |   28 |
| wonhyuk |   29 |
| wooseok |   29 |
+---------+------+
3 rows in set (0.00 sec)

 

9.8.5 파일 크기 관리

파일 크기를 관리하는 것은 저장할때는 중요하지 않지만, 읽을 때는 중요한 요소다.

작은 파일을 많이 생성하면 메타데이터에 엄청난 관리 부하가 발생한다.

HDFS, 스파크는 작은 크기의 파일을 잘 다루지 못한다.

이런 상황을 '작은 크기의 파일 문제' 라고 한다.

 

하지만 반대로 너무 큰 크기의 파일은 몇 개의 로우가 필요하더라도 전체 데이터 블록을 읽어야 하기 때문에 비효율적이다.

앞서 9.8.5 이전에, 결과 파일 수는 파일을 쓰는 시점에서의 파티션 수에서 파생된다고 말했다.

(9.8.3 병렬로 데이터 쓰기 챕터에서 repartition의 조정으로 파일의 수를 조정할 수 있었다. 그럼 파일 당 크기도 마음대로 지정할 수 있나?)

 

책에 의하면 DataFrameWriter에 다음의 옵션을 지정하면 파일당 최대 5,000개의 로우를 포함하도록 보장할 수 있다고 한다.

// df는 데이터프레임이라고 가정한다.
df.write.option("maxRecordsPerFile", 5000)
Comments