개발 기록장

03. Spark 프로그래밍: SQL 본문

데브코스(DE)/하둡과 Spark

03. Spark 프로그래밍: SQL

jxwxnk 2024. 6. 20. 16:53
반응형

학습 주제: Spark SQL, Aggregation, Join, UDF, Hive 메타스토어, 유닛 테스트

Spark SQL 소개

Spark SQL이란?

  • 구조화된 데이터 처리를 위한 Spark 모듈
  • 데이터프레임 작업을 SQL로 처리 가능
    • 데이터프레임에 테이블 이름 지정 후 sql함수 사용가능
    • HQL(Hive Query Language)와 호환 제공: Hive 테이블 읽고, 쓰기 가능(Hive Metastore)

Spark SQL vs. DataFrame

  • SQL로 가능한 작업이라면 DataFrame을 사용할 이유가 없음

    • 두 개를 동시에도 사용 가능
  • Familiarity/Readability

    • SQL이 가독성이 더 좋고, 더 많은 사람들이 사용 가능
  • Optimization

    • Spark SQL 엔진이 최적화하기 더 좋음(SQL은 Declarative)
    • Catalyst Optimizer와 Project Tungsten
  • Interoperability/Data Management

    • SQL이 포팅도 쉽고, 접근권한 체크도 쉬움

Spark SQL 사용법 - SQL 사용 방법

  • 데이터프레임을 기반으로 테이블 뷰 생성: 테이블이 만들어짐
    • createOrReplaceTempView: spark Session이 살아있는 동안 존재
    • createOrReplaceGlobalTempView: Spark 드라이버가 살아있는 동안 존재
  • Spark Session의 sql 함수로 SQL 결과를 데이터 프레임으로 받음
namegender_df.createOrReplaceTempView("namegender") 
namegender_group_df = spark.sql("""
    SELECT gender, count(1) FROM namegender GROUP BY 1 
    """)
print(namegender_group_df.collect())

SparkSession 사용 외부 데이터베이스 연결

  • Spark Session의 read 함수를 호출(로그인 관련 정보와 읽어오고자 하는 테이블 혹은 SQL 지정)
  • 결과가 데이터 프레임으로 리턴됨
df_user_session_channel = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift://HOST:PORT/DB?user=ID&password=PASSWORD") \ 
    .option("dbtable", "raw_data.user_session_channel") \     
    .load()

Aggregation, JOIN, UDF

Aggregation 함수

DataFrame이 아닌 SQL로 작성하는 것을 추천

  • Group By
  • Window
  • Rank

JOIN

  • SQL 조인은 두 개 혹은 그 이상의 테이블들을 공통 필드를 가지고 merge
  • 스타 스키마로 구성된 테이블들로 분산되어 있던 정보를 통합하는데 사용
  • 왼쪽 테이블을 LEFT라고 하고 오른쪽 테이블을 RIGHT이라고 하면
    • JOIN의 결과는 방식에 따라 양쪽의 필드를 모든 가진 새로운 테이블 생성
    • 조인 방식에 따라 다음 두가지가 달라짐
      • 어떤 레코드들이 선택되는가?
      • 어떤 필드들이 채워지는가?

JOIN의 종류

  • INNERJOIN
  • LEFT JOIN
  • RIGHT JOIN
  • FULL JOIN
  • CROSS JOIN
  • SELF JOIN

최적화 관점에서 본 조인 종류

Shuffle JOIN

  • 일반 조인 방식
  • Bucket JOIN: 조인키를 바탕으로 파티션을 새로 만들고 조인하는 방식
  • Broadcast JOIN*
  • 큰 데이터와 작은 데이터 간의 조인
  • 데이터프레임 하나가 충분히 작으면 작은 데이터 프레임을 다른 데이터 프레임이 있는 서버들로 뿌리는 것(broadcasting)
    • spark.sql.autoBroadcastJoinThreshold 파라미터로 충분히 작은지 여부 결정

UDF(User Defined Function)

  • DataFrame이나 SQK에서 적용할 수 있는 사용자 정의 함수
  • Scalar함수 vs. Aggregation 함수
    • Scalar 함수: UPPER, LOWER 등
    • Aggregation 함수(UDAF): SUM, MIN, MAX

UDF 사용 방법

  • 함수 구현
    • 파이썬 람다 함수
    • 파이썬 (보통) 함수
    • 파이썬 판다스 함수
      • pyspark.sql.functions.pandas_udf로 annotation
      • Apache Arrow를 사용해서 파이썬 객체를 자바 객치로 변환이 훨씬 더 효율적
  • 함수 등록
    • pyspark.sql.functions.udf: DataFrame에서만 사용 가능
    • spark.udf.register: SQL 모두에서 사용 가능
  • 함수 사용
    • .withColumn, .agg
    • SQL

만약 성능이 중요하다면?
Scala나 Java로 구현하는 것이 가장 좋음
파이썬을 사용해야한다면 Pandas UDF로 구현

UDF: DataFrame에 사용

import pyspark.sql.functions as F
import pyspark.sql.types import *

upperUDF = F.udf(lambda z:z.upper())
df.withColumn("Curated Name", upperUDF("Name"))

UDF: SQL에 사용

def upper(s):
    return s.upper()

# 먼저 테스트
upperUDF = spark.udf.register("upper", upper)
spark.sql("SELECT upper('aBcD')").show()

# DataFrame 기반 SQL에 적용
df.createOrReplaceTempView("test")
spark.sql("""SELECT name, upper(name) "Curated Name "FROM test""").show()

Hive 메타 스토어

Spark 데이터베이스와 테이블

  • 카탈로그: 테이블과 뷰에 관한 메타 데이터 관리

    • 기본으로 메모리 기반 카탈로그 제공: 세션이 끝나면 사라짐
    • Hive와 호환되는 카탈로그 제공: Persistent
  • 테이블 관리 방식

    • 테이블들은 데이터베이스라 부르는 폴더와 같은 구조로 관리(2단계)
  • 메모리 기반 테이블/ 뷰: 임시테이블

  • 스토리지 기반 테이블

    • 기본적으로 HDFS와 Parquet 포맷 사용
    • Hive와 호환되는 메타스토어 사용
    • 두 종류의 테이블 존재(Hive와 동일한 개념)
      • Managed Table: Spark이 실제 데이터와 메타 데이터 모두 관리
      • Unmanaged (External) Table: Spark이 메타 데이터만 관리

SparkSQL - 스토리지 기반 카탈로그 사용 방법

  • Hive와 호환되는 메타스토어 사용
  • SparkSession 생성 시 enableHiveSupport() 호출
    • 기본으로 "default"라는 이름의 데이터베이스 생성
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark Hive") \ 
    .enableHiveSupport() \ 
    .getOrCreate()

SparkSQL - Managed Table 사용 방법

  • 두 가지 테이블 생성방법
    • dataframe.saveAsTable("테이블이름")
    • SQL 문법 사용(CREATE TABLE, CTAS)
  • spark.sql.warehouse.dir가 가리키는 위치에 데이터 저장됨
    • PARQUET이 기본 데이터 포맷
  • 선호하는 테이블 타입
  • Spark 테이블로 처리하는 것의 장점(파일로 저장하는 것과 비교시)
    • JDBC/ODBC 등으로 Spark를 연결해서 접근 가능(태블로, power BI)

SparkSQL - External Table 사용 방법

  • 이미 HDFS에 존재하는 데이터에 스키마 정의해서 사용
    • LOCATION이란 프로퍼티 사용
  • 메타데이터만 카탈로그에 기록됨
    • 데이터는 이미 존재
    • External Table은 삭제되어도 데이터는 그대로임
CREATE TABLE table_name ( 
column1 type1,
column2 type2,
column3 type3,
... 
)
USING PARQUET
LOCATION 'hdfs_path';

유닛 테스트

  • 코드 상의 특정 기능(보통 메소드의 형태)을 테스트하기 위해 작성된 코드
  • 보통 정해진 입력을 주고 예상된 출력이 나오는지 형태로 테스트
  • CI/CD를 사용하려면 전체 코드의 테스트 커버리지가 매우 중요
  • 각 언어별로 정해진 테스트 프레임웍을 사용하는 것이 일반적
    • Java: JUnit
    • .NET: NUnit
    • Python: unittest
반응형

'데브코스(DE) > 하둡과 Spark' 카테고리의 다른 글

02. Spark 프로그래밍: DataFrame  (0) 2024.06.19
01. 빅데이터 처리와 Spark 소개  (1) 2024.06.18