[Spark] cache()와 persist()의 차이

Spark

cache()와 persist()의 차이

Spark로 데이터를 다룰 때 Action수행 시점마다 로드되지 않고,
한번 로드한 데이터를 메모리상에 상주 시키는 메서드가 있으며,
그것이 cache()persist()이다.
둘의 차이를 알아보자.

1. 먼저 cache()

scala> val test = Seq(1, 2, 3)
test: Seq[Int] = List(1, 2, 3)

// to DF
scala> val test_df = test.toDF()
test_df: org.apache.spark.sql.DataFrame = [value: int]

// 캐싱
scala> test_df.cache()

scala> test_df.storageLevel
res0: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 1 replicas)

2. persist()

persist()메서드는 스토리지 레벨을 인자로 받아 저장 공간을 정할 수 있다.

scala> val test2 = Seq(1, 2, 3)
scala> val test2_df = test2.toDF()

scala> import org.apache.spark.storage.StorageLevel

// 캐싱
scala> test2_df.persist(StorageLevel.MEMORY_AND_DISK)

scala> test2_df.storageLevel
res1: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 1 replicas)

3. Storage Level

– 스토리지 레벨에는 여러 종류가 있다. spark_storage_level~_SER은 원본이 아닌 Serealize 된 형태로 저장한다는 것, 저장용량에서의 이점은 있지만, 저장할 때와 읽을 때 SER, deSER로 인한 CPU사용량 증가가 있다.

– DF.storageLevel메서드와 RDD.getStorageLevel을 사용해 스토리지 레벨을 확인할 수 있다.
아래 코드는 DFRDD에서 cache()메서드 기본값의 차이를 보여준다.

scala> import org.apache.spark.storage.StorageLevel

scala> val test3 = Seq(1, 2, 3)
scala> val test3_df = test3.toDF()

// RDD & DF 스토리지 레벨
scala> test3_df.rdd.getStorageLevel
res0: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)
scala> test3_df.storageLevel
res1: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)

// DF 캐싱
scala> test_df.cache()

// RDD & DF 스토리지 레벨
scala> test3_df.rdd.getStorageLevel
res3: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)
scala> test3_df.storageLevel
res4: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 1 replicas)

// RDD 캐싱
scala> test3_df.rdd.cache()

// RDD 스토리지 레벨
scala> test3_df.rdd.getStorageLevel
org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)

위 코드에서 보이듯이 cache()메서드는
RDD에선 persist(StorageLevel.MEMORY_ONLY)
DF에선 persist(StorageLevel.MEMORY_AND_DISK) 로 작동한다.

4. 언캐싱

– 캐싱의 반대
unpersist()메서드를 쓰며 DFRDD는 따로 관리된다.

// DF 생성
scala> val test4 = Seq(1, 2, 3)
scala> val test4_df = test4.toDF()

// RDD & DF 캐싱
scala> test4_df.rdd.cache()
scala> test4_df.cache()

// RDD & DF 스토리지 레벨 확인
scala> test4_df.rdd.getStorageLevel
res3: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)
scala> test4_df.storageLevel
res4: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 1 replicas)

// DF 언캐싱
scala> test4_df.unpersist()

// RDD & DF 스토리지 레벨 확인
scala> test4_df.rdd.getStorageLevel
res6: org.apache.spark.storage.StorageLevel = StorageLevel(memory, deserialized, 1 replicas)
scala> test4_df.storageLevel
res7: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)

// RDD 언캐싱
scala> test4_df.rdd.unpersist()

// RDD & DF 스토리지 레벨 확인
scala> test4_df.rdd.getStorageLevel
res9: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)
scala> test4_df.storageLevel
res10: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)

결론

cache()메서드와 persist()메서드는 둘 다 로드된 데이터를 저장공간상에 올려두는 작업을 하며,
RDD.cache()는 persist(StorageLevel.MEMORY_ONLY)
DF.cache()는 persist(StorageLevel.MEMORY_AND_DISK) 로 작동한다.

즉, val df = Seq(1, 2, 3).toDF()라는 DataFrame이 있을 때
df.cache()df.persist(StorageLevel.MEMORY_AND_DISK)와 같다고 볼 수 있다.


추가) Temp View 캐싱에 관하여

Spark에서 sql문으로 작업을 하기 위해 Temp View를 생성한다.
Temp View도 캐싱이 가능하다.
이전에 Temp View 관련 글에서 Temp View 삭제 메서드를 쓸 때
spark.catalog.dropTempView()메서드를 사용했다.
그렇다. Temp View나 Table등을 관리하는 메서드들은 spark.catalog에 모아져있다.
spark.catalog를 살펴보자.

Catalog

spark_catalog 여러 메서드들 중에 간단히 몇가지만 살펴보겠다. 자세한 내용은 여기
cacheTable, uncacheTable : View 캐싱, 언캐싱
dropTempView, dropGlobalTempView : Drop View & Global View
isCached : 캐싱 상태확인
clearCache : 모든 View 언캐싱
tableExists : View 존재확인
위 메서드들 말고도 테이블이나 칼럼의 리스트를 본다거나 존재여부 등을 체크하는 메서드들이 있다.

아래는 간단한 DF의 TempView를 만들어서 캐싱 해보았다.

// DF 생성
scala> val test1_df = Seq(1, 2, 3).toDF()

// View 생성
scala> test1_df.createTempView("test1_tempView")

// DF & Temp View 스토리지 레벨 체크
scala> test1_df.storageLevel
scala> spark.catalog.isCached("test1_tempView")
resN: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)
resN: Boolean = false

// Temp View Caching
scala> spark.catalog.cacheTable("test1_tempView")

// DF & Temp View 스토리지 레벨 체크
scala> test1_df.storageLevel
scala> spark.catalog.isCached("test1_tempView")
resN: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 1 replicas)
resN: Boolean = true

위 코드에서 뭔가 이상한게 하나있다.
Temp View만 캐싱했는데 DF까지 캐싱이 되었다.
DF와 그 DF로 생성한 View, Table 등은 스토리지 레벨을 공유한다.
즉, 위 코드에서 DF나 TempView를 언캐싱하면 둘 다 언캐싱 된다는 소리다.
아래 코드는 위 코드의 test1_tempView를 언캐싱해보겠다.

#!/usr/bin/env spark-shell
// DF 언캐싱
scala> spark.catalog.uncacheTable("test1_tempView")

// DF & Temp View 스토리지 레벨 체크
scala> test1_df.storageLevel
scala> spark.catalog.isCached("test1_tempView")
resN: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)
resN: Boolean = false

예상대로이다.
test1_df.unpersist()를 (또는 spark.catalog.clearCache()) 실행해도 결과는 마찬가지
그렇다면 기존 DF가 캐싱이 된 상태에서 View를 생성한다면?
View와 DF는 스토리지 레벨을 공유하므로 생성된 View도 캐싱이 자동으로 될것이다.
확인해보자.

// DF 캐싱 상태 확인
scala> test1_df.storageLevel
resN: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)

// DF 캐싱
scala> test1_df.cache()
resN: test1_df.type = [value: int]

// View 생성
scala> test1_df.createTempView("abc")

// View 캐싱 상태 확인
scala> spark.catalog.isCached("abc")
resN: Boolean = true

예상대로이다.
그렇다면 또 다른 궁금증.
현재 View가 test1_tempViewabc가 있는데 여기서 하나의 View를 언캐싱 한다면??
두 View 모두 같은 test1_df로 만들어졌으므로 하나의 View를 언캐싱하면 DF도 언캐싱되고
그것과 스토리지레벨을 공유하는 또다른 View도 언캐싱될것으로 예상된다. 살펴보자.

scala> spark.catalog.listTables().show()
+--------------+--------+-----------+---------+-----------+
|          name|database|description|tableType|isTemporary|
+--------------+--------+-----------+---------+-----------+
|test1_tempview|    null|       null|TEMPORARY|       true|
|           abc|    null|       null|TEMPORARY|       true|
+--------------+--------+-----------+---------+-----------+

// DF & Views 스토리지 레벨 체크
scala> test1_df.storageLevel
scala> spark.catalog.isCached("test1_tempView")
scala> spark.catalog.isCached("abc")
resN: org.apache.spark.storage.StorageLevel = StorageLevel(disk, memory, deserialized, 1 replicas)
resN: Boolean = true
resN: Boolean = true

// 하나의 View 언캐싱
scala> spark.catalog.uncacheTable("abc")

// DF & Views 스토리지 레벨 체크
scala> test1_df.storageLevel
scala> spark.catalog.isCached("test1_tempView")
scala> spark.catalog.isCached("abc")
resN: org.apache.spark.storage.StorageLevel = StorageLevel(1 replicas)
resN: Boolean = false
resN: Boolean = false

예상대로이다.

이번엔 View를 Drop해보자.

// View 존재 확인
scala> spark.catalog.tableExists("test1_tempView")
resN: Boolean = true

// Drop View
scala> spark.catalog.dropTempView("test1_tempView")
resN: Boolean = true

// View 존재 확인
scala> spark.catalog.tableExists("test1_tempView")
resN: Boolean = false

scala> spark.catalog.listTables().show()
+--------------+--------+-----------+---------+-----------+
|          name|database|description|tableType|isTemporary|
+--------------+--------+-----------+---------+-----------+
|           abc|    null|       null|TEMPORARY|       true|
+--------------+--------+-----------+---------+-----------+

참고자료

링크: databricks Cache()

Spark docs: object StorageLevel

Spark docs: abstract class Catalog

Blog: 조대협님의 블로그 글