19 min to read BigData
[Spark를 활용한 데이터분석] 4. 항공사들의 운항거리, 횟수 톺아보기
강의 링크: PySpark를 활용한 데이터분석
edwith의 PySpark를 활용한 데이터분석 강좌의 내용 정리.
PySpark이 아닌 Scala Spark으로 진행
키워드
– DataFrame.filter()
– DataFrame.count()
– DataFrame.describe()
– spark.sql.functions.col()
(Column 지정 메서드)
– spark.sql.functions.{min, max, mean}
이전 강의에선 어떤 항공사들이 있는지 살펴보았다.
이번엔 항공사들의 운항거리와 횟수를 살펴보자.
이전 강의: 3. 얼마나 많은 항공사가 있을까?
운항거리 & 횟수 톺아보기
– 1. DL 항공사의 1990년도 운항 횟수 알아보기
– 2. 운항 거리의 최소, 최대, 평균 알아보기
1. DL 항공사의 1990년도 운항 횟수는?
– filter()
메서드를 이용해 조건 설정
– col()
메서드로 column 지정
#!/usr/bin/env scala
import org.apache.spark.sql.functions.col
// DL 항공만 추출
us_carrier_df.filter(col("UniqueCarrier") === "DL").show()
// same As
// us_carrier_df.filter($"UniqueCarrier" === "DL").show()
// 1990년도 데이터만 추출
us_carrier_df.filter(col("Year") === 1990).show()
// same As
// us_carrier_df.filter($"UniqueCarrier" === "DL").show()
// 위 두 명령을 합쳐보자.
// us_carrier_df.filter(col("UniqueCarrier") === "DL").filter(col("Year") === 1990).show()
us_carrier_df.filter((col("UniqueCarrier") === "DL") && (col("Year") === 1990).show()
// same As
// us_carrier_df.filter(($"UniqueCarrier" === "DL") && ($"Year" === 1990)).show()
// SQL문으로 작성시
spark.sql("""
SELECT *
FROM global_temp.us_carrier
WHERE UniqueCarrier == 'DL' AND Year == 1990
""").show()
---------------------------------------------------------------
Output:
+----+-----+----------+---------+-------------+---------+-------+-----------------+--------------+------+----+--------+---------+--------+
|Year|Month|DayofMonth|DayOfWeek|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|Origin|Dest|Distance|Cancelled|Diverted|
+----+-----+----------+---------+-------------+---------+-------+-----------------+--------------+------+----+--------+---------+--------+
|1990| 1| 20| 6| DL| 229| null| 184| 181| ATL| DEN| 1199| false| false|
|1990| 1| 21| 7| DL| 229| null| 182| 181| ATL| DEN| 1199| false| false|
|1990| 1| 22| 1| DL| 229| null| 192| 181| ATL| DEN| 1199| false| false|
|1990| 1| 23| 2| DL| 229| null| 193| 181| ATL| DEN| 1199| false| false|
|1990| 1| 24| 3| DL| 229| null| 182| 181| ATL| DEN| 1199| false| false|
|1990| 1| 25| 4| DL| 229| null| 181| 181| ATL| DEN| 1199| false| false|
|1990| 1| 26| 5| DL| 229| null| 184| 181| ATL| DEN| 1199| false| false|
|1990| 1| 27| 6| DL| 229| null| 191| 181| ATL| DEN| 1199| false| false|
|1990| 1| 28| 7| DL| 229| null| 186| 181| ATL| DEN| 1199| false| false|
|1990| 1| 29| 1| DL| 229| null| 187| 181| ATL| DEN| 1199| false| false|
|1990| 1| 30| 2| DL| 229| null| 176| 181| ATL| DEN| 1199| false| false|
|1990| 1| 31| 3| DL| 229| null| 184| 181| ATL| DEN| 1199| false| false|
|1990| 1| 1| 1| DL| 229| null| 81| 91| RIC| ATL| 481| false| false|
|1990| 1| 2| 2| DL| 229| null| 88| 91| RIC| ATL| 481| false| false|
|1990| 1| 3| 3| DL| 229| null| 84| 91| RIC| ATL| 481| false| false|
|1990| 1| 4| 4| DL| 229| null| 96| 91| RIC| ATL| 481| false| false|
|1990| 1| 5| 5| DL| 229| null| 105| 91| RIC| ATL| 481| false| false|
|1990| 1| 6| 6| DL| 229| null| 91| 91| RIC| ATL| 481| false| false|
|1990| 1| 7| 7| DL| 229| null| 123| 91| RIC| ATL| 481| false| false|
|1990| 1| 8| 1| DL| 229| null| 88| 91| RIC| ATL| 481| false| false|
+----+-----+----------+---------+-------------+---------+-------+-----------------+--------------+------+----+--------+---------+--------+
only showing top 20 rows
– 이제 운항 횟수를 알아보자.
– 위에서 추출한 데이터 프레임의 row수가 우리가 원하는 그것이다.
#!/usr/bin/env scala
us_carrier_df.filter(($"UniqueCarrier" === "DL") && ($"Year") === 1990).count()
-------------------------------------
Output:
resN: Long = 824062
2. 운항 거리의 최소, 최대, 평균 알아보기
– 특정한 column값 전체에 수행되는 연산을 집계 연산(aggregation operation)
이라고 한다.
– 관련 기능들은 org.apache.spark.sql.functions
안에 모여있다.
– min(), max(), mean()
메서드들을 사용해서 집계 연산을 해보자.
#!/usr/bin/env scala
import org.apache.spark.sql.functions.{min, mean, max}
us_carrier_df.select(min($"distance"), max($"distance"), mean($"distance")).show()
-----------------------------------------------
Output:
+-------------+-------------+-----------------+
|min(distance)|max(distance)| avg(distance)|
+-------------+-------------+-----------------+
| 0| 4983|701.6985307716058|
+-------------+-------------+-----------------+
import org.apache.spark.sql.functions.{min, mean, max}
– 최대거리는 4983, 평균 701이 나왔다.
– 그런데 이상하게 최소거리가 0인 비행 기록이 있다.
– 어떤 비행인지 살펴보자.
#!/usr/bin/env scala
us_carrier_df.filter($"distance" === 0).show()
-----------------------------------------------
Output:
+----+-----+----------+---------+-------------+---------+-------+-----------------+--------------+------+----+--------+---------+--------+
|Year|Month|DayofMonth|DayOfWeek|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|Origin|Dest|Distance|Cancelled|Diverted|
+----+-----+----------+---------+-------------+---------+-------+-----------------+--------------+------+----+--------+---------+--------+
|1989| 10| 24| 2| US| 2503| null| 48| 61| LAX| LAX| 0| false| false|
|1989| 10| 3| 2| AA| 83| null| null| 245| ORD| ORD| 0| false| true|
|1987| 10| 25| 7| EA| 759| null| null| 20| FLL| FLL| 0| false| true|
|1987| 10| 14| 3| HP| 206| null| null| 50| LAS| LAS| 0| false| true|
|1987| 10| 5| 1| HP| 857| null| null| 60| PHX| PHX| 0| true| false|
|1987| 10| 4| 7| EA| 2| null| null| 20| LGA| LGA| 0| false| true|
|1987| 10| 18| 7| EA| 152| null| null| 25| JFK| JFK| 0| false| true|
+----+-----+----------+---------+-------------+---------+-------+-----------------+--------------+------+----+--------+---------+--------+
– 운항이 취소되거나(Cancelled), 이륙 후 회항(Diverted)한 항공편들이다.
ㄴ> 이 경우 출발지(Origin)와 도착지(Dest)가 같은 것도 특징이다.
– 그렇다면 가장 먼거리를 운항한 항공편들을 살펴보자.
#!/usr/bin/env scala
us_carrier_df.filter($"distance" === 4983).show()
-----------------------------------------------------
Output:
+----+-----+----------+---------+-------------+---------+-------+-----------------+--------------+------+----+--------+---------+--------+
|Year|Month|DayofMonth|DayOfWeek|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|Origin|Dest|Distance|Cancelled|Diverted|
+----+-----+----------+---------+-------------+---------+-------+-----------------+--------------+------+----+--------+---------+--------+
|1988| 1| 1| 5| PA (1)| 17| null| 689| 645| JFK| HNL| 4983| false| false|
|1988| 1| 2| 6| PA (1)| 17| null| 674| 645| JFK| HNL| 4983| false| false|
|1988| 1| 8| 5| PA (1)| 17| null| null| 645| JFK| HNL| 4983| true| false|
|1988| 1| 9| 6| PA (1)| 17| null| 692| 645| JFK| HNL| 4983| false| false|
|1988| 1| 15| 5| PA (1)| 17| null| 660| 645| JFK| HNL| 4983| false| false|
|1988| 1| 16| 6| PA (1)| 17| null| 690| 645| JFK| HNL| 4983| false| false|
|1988| 1| 22| 5| PA (1)| 17| null| 590| 645| JFK| HNL| 4983| false| false|
|1988| 1| 23| 6| PA (1)| 17| null| 643| 645| JFK| HNL| 4983| false| false|
|1988| 1| 29| 5| PA (1)| 17| null| 672| 645| JFK| HNL| 4983| false| false|
|1988| 1| 1| 5| PA (1)| 18| null| 501| 540| HNL| JFK| 4983| false| false|
|1988| 1| 2| 6| PA (1)| 18| null| 504| 540| HNL| JFK| 4983| false| false|
|1988| 1| 8| 5| PA (1)| 18| null| null| 540| HNL| JFK| 4983| true| false|
|1988| 1| 9| 6| PA (1)| 18| null| 517| 540| HNL| JFK| 4983| false| false|
|1988| 1| 15| 5| PA (1)| 18| null| 536| 540| HNL| JFK| 4983| false| false|
|1988| 1| 16| 6| PA (1)| 18| null| 520| 540| HNL| JFK| 4983| false| false|
|1988| 1| 22| 5| PA (1)| 18| null| 538| 540| HNL| JFK| 4983| false| false|
|1988| 1| 23| 6| PA (1)| 18| null| 548| 540| HNL| JFK| 4983| false| false|
|1988| 1| 29| 5| PA (1)| 18| null| 523| 540| HNL| JFK| 4983| false| false|
|1988| 2| 6| 6| PA (1)| 17| null| 643| 645| JFK| HNL| 4983| false| false|
|1988| 2| 12| 5| PA (1)| 17| null| 597| 645| JFK| HNL| 4983| false| false|
+----+-----+----------+---------+-------------+---------+-------+-----------------+--------------+------+----+--------+---------+--------+
only showing top 20 rows
– 뉴욕(JFK)에서 하와이(HNL)까지 바로 가는 항공편이다.
– 여태까지 살펴본 최소, 최댓값 등의 통계치는 연속형 데이터에서 자주 살펴보게 되는 값들이다.
ㄴ> spark는 내장 메서드로 DataFrame.describe()
메서드를 제공한다.
– 평균, 최소, 최대, 개수, 표준편차를 담고있는 DataFrame
을 return해준다.
#!/usr/bin/env scala
us_carrier_df.describe("distance").show()
------------------------------------------
Output:
+-------+-----------------+
|summary| distance|
+-------+-----------------+
| count| 123332969|
| mean|701.6985307716058|
| stddev|551.2531700322845|
| min| 0|
| max| 4983|
+-------+-----------------+
다음 강의: 5. 운항 노선 알아보기
전체 Code
#!/usr/bin/env scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark EDA")
.getOrCreate()
// RDD를 DataFrame으로 바꾸는 것과 같은 암시적 변환(implicit conversion)을 처리하기 위해
import spark.implicits._
val raw_df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("s3a://edwith-pyspark-dataset")
// OR
//
//val raw_df = spark.read
// .options(Map("header"-> "true", "inferSchema"-> "true"))
// .csv("s3a://edwith-pyspark-dataset")
import org.apache.spark.sql.functions.udf
// 'UDFs' 라는 이름의 싱글턴 객체 안에
// udf로 변환할 메서드들 정의
object UDFs {
def stringToInteger(value: String): Option[Int] = {
if ((value.isEmpty) || (value == "NA")) None
else Some(value.toInt)
}
def integerToBoolean(value: Int): Boolean ={
if (value == 0) false
else true
}
}
// udf 변환
val stringToIntegerFunction = udf(UDFs.stringToInteger _)
val integerToBooleanFunction = udf(UDFs.integerToBoolean _)
// udf을 활용한 데이터 처리
val us_carrier_df = raw_df
.drop(
// 사용하지 않을 column 삭제
"DepTime", "CRSDepTime", "ArrTime", "CRSArrTime", "AirTime", "ArrDelay", "DepDelay", "TaxiIn", "TaxiOut",
"CancellationCode", "CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay")
.withColumn(
// 'NA' to null & Integer type으로 변경
"ActualElapsedTime", stringToIntegerFunction(raw_df("ActualElapsedTime")))
.withColumn(
// 'NA' to null & Integer type으로 변경
"CRSElapsedTime", stringToIntegerFunction(raw_df("CRSElapsedTime")))
.withColumn(
// 'NA' to null & Integer type으로 변경
"TailNum", stringToIntegerFunction(raw_df("TailNum")))
.withColumn(
// 'NA' to null & Integer type으로 변경
"Distance", stringToIntegerFunction(raw_df("Distance")))
.withColumn(
// Boolean type으로 변경
"Cancelled", integerToBooleanFunction(raw_df("Cancelled")))
.withColumn(
// Boolean type으로 변경
"Diverted", integerToBooleanFunction(raw_df("Diverted")))
// Schema 확인
us_carrier_df.printSchema()
// 실제 데이터 확인
us_carrier_df.show()
// 캐싱
us_carrier_df.cache()
// 전역 임시 뷰 생성
us_carrier_df.createOrReplaceGlobalTempView("us_carrier")
// SQL문으로 조회
spark.sql("SELECT * FROM global_temp.us_carrier LIMIT 10")
// 'UniqueCarrier' column만을 가지는 DataFrame
val carriers_only_df = us_carrier_df.select("UniqueCarrier")
// distinct() 메서드를 이용한 유니크 값 추출 (시간 소요가 조금 있다.)
val carriers_only_distinct_df = carriers_only_df.distinct()
// same as
// spark.sql("SELECT DISTINCT UniqueCarrier FROM global_temp.us_carrier").show()
carriers_only_distinct_df.show()
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.{min, max, mean}
// DL 항공만 추출
us_carrier_df.filter(col("UniqueCarrier") === "DL").show()
// same As
// us_carrier_df.filter($"UniqueCarrier" === "DL").show()
// 1990년도 데이터만 추출
us_carrier_df.filter(col("Year") === 1990).show()
// same As
// us_carrier_df.filter($"UniqueCarrier" === "DL").show()
// 위 두 명령을 합쳐보자.
// us_carrier_df.filter(col("UniqueCarrier") === "DL").filter(col("Year") === 1990).show()
us_carrier_df.filter((col("UniqueCarrier") === "DL") && (col("Year") === 1990).show()
// same As
// us_carrier_df.filter(($"UniqueCarrier" === "DL") && ($"Year" === 1990)).show()
// SQL문으로 작성시
spark.sql("""
SELECT *
FROM global_temp.us_carrier
WHERE UniqueCarrier == 'DL' AND Year == 1990
""").show()
// DL항공의 1990년도 운항 횟수
us_carrier_df.filter(($"UniqueCarrier" === "DL") && ($"Year") === 1990).count()
// 운항거리 최소, 최대, 평균
us_carrier_df.select(min($"distance"), max($"distance"), mean($"distance")).show()
// 운항거리가 0(최소)인 데이터 조회
us_carrier_df.filter($"distance" === 0).show()
// 운항거리가 4983(최대)인 데이터 조회
us_carrier_df.filter($"distance" === 4983).show()
// DataFrame.describe() 메서드로 한번에 알아보자.
us_carrier_df.describe("distance").show()