[Spark를 활용한 데이터분석] 2. 데이터 전처리


PySpark이 아닌 Scala Spark으로 진행


데이터 전처리
UDF(User Define Function)
– DataFrame.drop()
– DataFrame.withColumn()
– Temp View 생성 (for SparkSession.sql())

이전 강의에서 Load한 데이터를 클렌징할 작정이다.

데이터 전처리

– 데이터 클렌징 이라고도 불림
– 데이터 전처리란 데이터를 가공해 분석에 용이하게 변경하는 과정
– 결측값 처리, 이상값 처리 등의 작업들을 말한다.

전처리 작업목록

– 필요없는 column 제거
– NA 문자열을 null로 바꾸기
– 몇몇 column들의 type 바꾸기

위 과정을 처리하기 위해
spark 기본 제공 함수가 아닌 유저가 직접 정의한 함수(UDF)를 사용할 예정이다.
UDF(User Define Function)를 만들어보자.

1. 일단 UDF함수로 변환할 함수 정의

#!/usr/bin/env scala

object UDFs {
    def stringToInteger(value: String): Option[Int] = {
        if ((value.isEmpty) || (value == "NA")) None
        else Some(value.toInt)
    def integerToBoolean(value: Int): Boolean = {
        if (value == 0) false 
        else true

2. udf함수로 변환

#!/usr/bin/env scala

import org.apache.spark.sql.functions.udf

val stringToIntegerFunction = udf(UDFs.stringToInteger _)
val integerToBooleanFunction = udf(UDFs.integerToBoolean _)

3. 데이터 처리

– udf함수를 이용한 처리
– 처리한 데이터 프레임을 us_carrier_df에 저장

#!/usr/bin/env scala

val us_carrier_df = raw_df
        // 사용하지 않을 column 삭제
        "DepTime", "CRSDepTime", "ArrTime", "CRSArrTime", "AirTime", "ArrDelay", "DepDelay", "TaxiIn", "TaxiOut",
        "CancellationCode", "CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay")
        // 'NA' to null & Integer type으로 변경
        "ActualElapsedTime", stringToIntegerFunction(raw_df("ActualElapsedTime")))
	// 'NA' to null & Integer type으로 변경
        "CRSElapsedTime", stringToIntegerFunction(raw_df("CRSElapsedTime")))
	// 'NA' to null & Integer type으로 변경
        "TailNum", stringToIntegerFunction(raw_df("TailNum")))
	// 'NA' to null & Integer type으로 변경
        "Distance", stringToIntegerFunction(raw_df("Distance")))
        // Boolean type으로 변경
        "Cancelled", integerToBooleanFunction(raw_df("Cancelled")))
	// Boolean type으로 변경
        "Diverted", integerToBooleanFunction(raw_df("Diverted")))

4. us_carrier_df 살펴보기

#!/usr/bin/env scala

// Schema 확인

// 실제 데이터 확인


 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: integer (nullable = true)
 |-- ActualElapsedTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)

5. us_carrier_df 캐싱

– 자주 쓸 데이터이므로 클러스터 메모리상에 올리자.

#!/usr/bin/env scala


데이터 전처리도 끝났고,
그 데이터를 us_carrier_df에 데이터프레임 형태로 저장도했고, 캐싱도 마쳤다.
이제 us_carrier_df를 SQL문으로 톺아보자

1. ‘전역 임시 뷰’ 생성

createOrReplaceGlobalTempView() 메서드로 생성
createOrReplaceTempView() 와의 차이점은 전역 임시 뷰는 다른 세션에서도 사용할 수 있음.

#!/usr/bin/env scala


2. SQL문으로 조회

전역 임시 뷰는 시스템 데이터베이스에서 global_temp로 저장되므로 이를 참조하기 위해 전체 이름을 지정해야 함 (e.g. SELECT * FROM global_temp.view1)

#!/usr/bin/env scala

spark.sql("SELECT * FROM global_temp.us_carrier LIMIT 10")


|1988|    1|         9|        6|           PI|      942|   null|               70|            64|   SYR| BWI|     273|    false|   false|
|1988|    1|        10|        7|           PI|      942|   null|               69|            64|   SYR| BWI|     273|    false|   false|
|1988|    1|        11|        1|           PI|      942|   null|               67|            64|   SYR| BWI|     273|    false|   false|
|1988|    1|        12|        2|           PI|      942|   null|               64|            64|   SYR| BWI|     273|    false|   false|
|1988|    1|        13|        3|           PI|      942|   null|               82|            64|   SYR| BWI|     273|    false|   false|
|1988|    1|        14|        4|           PI|      942|   null|               75|            64|   SYR| BWI|     273|    false|   false|
|1988|    1|        15|        5|           PI|      942|   null|               63|            64|   SYR| BWI|     273|    false|   false|
|1988|    1|        16|        6|           PI|      942|   null|               60|            64|   SYR| BWI|     273|    false|   false|
|1988|    1|        17|        7|           PI|      942|   null|               69|            64|   SYR| BWI|     273|    false|   false|
|1988|    1|        18|        1|           PI|      942|   null|               90|            64|   SYR| BWI|     273|    false|   false|

다음 글 부터는 본격적으로 분석을 해보겠다.

전체 Code

#!/usr/bin/env scala

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .appName("Spark EDA")

// RDD를 DataFrame으로 바꾸는 것과 같은 암시적 변환(implicit conversion)을 처리하기 위해                                                    
import spark.implicits._

val raw_df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")

// OR
//val raw_df = spark.read             
//  .options(Map("header"-> "true", "inferSchema"-> "true"))
//  .csv("s3a://edwith-pyspark-dataset")

import org.apache.spark.sql.functions.udf

// 'UDFs' 라는 이름의 싱글턴 객체 안에
// udf로 변환할 메서드들 정의
object UDFs {                                                         
    def stringToInteger(value: String): Option[Int] = {
        if ((value.isEmpty) || (value == "NA")) None
        else Some(value.toInt)
    def integerToBoolean(value: Int): Boolean ={
        if (value == 0) false
        else true

// udf 변환
val stringToIntegerFunction = udf(UDFs.stringToInteger _)
val integerToBooleanFunction = udf(UDFs.integerToBoolean _)

// udf을 활용한 데이터 처리
val us_carrier_df = raw_df
        // 사용하지 않을 column 삭제
        "DepTime", "CRSDepTime", "ArrTime", "CRSArrTime", "AirTime", "ArrDelay", "DepDelay", "TaxiIn", "TaxiOut",
        "CancellationCode", "CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay")
        // 'NA' to null & Integer type으로 변경
        "ActualElapsedTime", stringToIntegerFunction(raw_df("ActualElapsedTime")))
	// 'NA' to null & Integer type으로 변경
        "CRSElapsedTime", stringToIntegerFunction(raw_df("CRSElapsedTime")))
	// 'NA' to null & Integer type으로 변경
        "TailNum", stringToIntegerFunction(raw_df("TailNum")))
	// 'NA' to null & Integer type으로 변경
        "Distance", stringToIntegerFunction(raw_df("Distance")))
        // Boolean type으로 변경
        "Cancelled", integerToBooleanFunction(raw_df("Cancelled")))
	// Boolean type으로 변경
        "Diverted", integerToBooleanFunction(raw_df("Diverted")))

// Schema 확인

// 실제 데이터 확인

// 캐싱

// 전역 임시 뷰 생성

// SQL문으로 조회
spark.sql("SELECT * FROM global_temp.us_carrier LIMIT 10")

