일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
Tags
- Hive
- 컨테이너 삭제
- airflow.cfg
- airflow
- 웹 스크래핑
- truncate
- dag
- ELT
- docker hub
- 알고리즘
- Django
- AWS
- dag 작성
- 데이터파이프라인
- Serializer
- 데이터마트
- ETL
- Kafka
- snowflake
- spark
- 데이터 웨어하우스
- selenium
- Django Rest Framework(DRF)
- docker
- SQL
- docker-compose
- 웹 크롤링
- yarn
- redshift
- 데이터레이크
Archives
- Today
- Total
개발 기록장
03. Spark 프로그래밍: SQL 본문
반응형
학습 주제: Spark SQL, Aggregation, Join, UDF, Hive 메타스토어, 유닛 테스트
Spark SQL 소개
Spark SQL이란?
- 구조화된 데이터 처리를 위한 Spark 모듈
- 데이터프레임 작업을 SQL로 처리 가능
- 데이터프레임에 테이블 이름 지정 후 sql함수 사용가능
- HQL(Hive Query Language)와 호환 제공: Hive 테이블 읽고, 쓰기 가능(Hive Metastore)
Spark SQL vs. DataFrame
SQL로 가능한 작업이라면 DataFrame을 사용할 이유가 없음
- 두 개를 동시에도 사용 가능
Familiarity/Readability
- SQL이 가독성이 더 좋고, 더 많은 사람들이 사용 가능
Optimization
- Spark SQL 엔진이 최적화하기 더 좋음(SQL은 Declarative)
- Catalyst Optimizer와 Project Tungsten
Interoperability/Data Management
- SQL이 포팅도 쉽고, 접근권한 체크도 쉬움
Spark SQL 사용법 - SQL 사용 방법
- 데이터프레임을 기반으로 테이블 뷰 생성: 테이블이 만들어짐
- createOrReplaceTempView: spark Session이 살아있는 동안 존재
- createOrReplaceGlobalTempView: Spark 드라이버가 살아있는 동안 존재
- Spark Session의 sql 함수로 SQL 결과를 데이터 프레임으로 받음
namegender_df.createOrReplaceTempView("namegender")
namegender_group_df = spark.sql("""
SELECT gender, count(1) FROM namegender GROUP BY 1
""")
print(namegender_group_df.collect())
SparkSession 사용 외부 데이터베이스 연결
- Spark Session의 read 함수를 호출(로그인 관련 정보와 읽어오고자 하는 테이블 혹은 SQL 지정)
- 결과가 데이터 프레임으로 리턴됨
df_user_session_channel = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", "jdbc:redshift://HOST:PORT/DB?user=ID&password=PASSWORD") \
.option("dbtable", "raw_data.user_session_channel") \
.load()
Aggregation, JOIN, UDF
Aggregation 함수
DataFrame이 아닌 SQL로 작성하는 것을 추천
- Group By
- Window
- Rank
JOIN
- SQL 조인은 두 개 혹은 그 이상의 테이블들을 공통 필드를 가지고 merge
- 스타 스키마로 구성된 테이블들로 분산되어 있던 정보를 통합하는데 사용
- 왼쪽 테이블을 LEFT라고 하고 오른쪽 테이블을 RIGHT이라고 하면
- JOIN의 결과는 방식에 따라 양쪽의 필드를 모든 가진 새로운 테이블 생성
- 조인 방식에 따라 다음 두가지가 달라짐
- 어떤 레코드들이 선택되는가?
- 어떤 필드들이 채워지는가?
JOIN의 종류
- INNERJOIN
- LEFT JOIN
- RIGHT JOIN
- FULL JOIN
- CROSS JOIN
- SELF JOIN
최적화 관점에서 본 조인 종류
Shuffle JOIN
- 일반 조인 방식
- Bucket JOIN: 조인키를 바탕으로 파티션을 새로 만들고 조인하는 방식
- Broadcast JOIN*
- 큰 데이터와 작은 데이터 간의 조인
- 데이터프레임 하나가 충분히 작으면 작은 데이터 프레임을 다른 데이터 프레임이 있는 서버들로 뿌리는 것(broadcasting)
- spark.sql.autoBroadcastJoinThreshold 파라미터로 충분히 작은지 여부 결정
- spark.sql.autoBroadcastJoinThreshold 파라미터로 충분히 작은지 여부 결정
UDF(User Defined Function)
- DataFrame이나 SQK에서 적용할 수 있는 사용자 정의 함수
- Scalar함수 vs. Aggregation 함수
- Scalar 함수: UPPER, LOWER 등
- Aggregation 함수(UDAF): SUM, MIN, MAX
UDF 사용 방법
- 함수 구현
- 파이썬 람다 함수
- 파이썬 (보통) 함수
- 파이썬 판다스 함수
- pyspark.sql.functions.pandas_udf로 annotation
- Apache Arrow를 사용해서 파이썬 객체를 자바 객치로 변환이 훨씬 더 효율적
- 함수 등록
- pyspark.sql.functions.udf: DataFrame에서만 사용 가능
- spark.udf.register: SQL 모두에서 사용 가능
- 함수 사용
- .withColumn, .agg
- SQL
만약 성능이 중요하다면?
Scala나 Java로 구현하는 것이 가장 좋음
파이썬을 사용해야한다면 Pandas UDF로 구현
UDF: DataFrame에 사용
import pyspark.sql.functions as F
import pyspark.sql.types import *
upperUDF = F.udf(lambda z:z.upper())
df.withColumn("Curated Name", upperUDF("Name"))
UDF: SQL에 사용
def upper(s):
return s.upper()
# 먼저 테스트
upperUDF = spark.udf.register("upper", upper)
spark.sql("SELECT upper('aBcD')").show()
# DataFrame 기반 SQL에 적용
df.createOrReplaceTempView("test")
spark.sql("""SELECT name, upper(name) "Curated Name "FROM test""").show()
Hive 메타 스토어
Spark 데이터베이스와 테이블
카탈로그: 테이블과 뷰에 관한 메타 데이터 관리
- 기본으로 메모리 기반 카탈로그 제공: 세션이 끝나면 사라짐
- Hive와 호환되는 카탈로그 제공: Persistent
테이블 관리 방식
- 테이블들은 데이터베이스라 부르는 폴더와 같은 구조로 관리(2단계)
- 테이블들은 데이터베이스라 부르는 폴더와 같은 구조로 관리(2단계)
메모리 기반 테이블/ 뷰: 임시테이블
스토리지 기반 테이블
- 기본적으로 HDFS와 Parquet 포맷 사용
- Hive와 호환되는 메타스토어 사용
- 두 종류의 테이블 존재(Hive와 동일한 개념)
- Managed Table: Spark이 실제 데이터와 메타 데이터 모두 관리
- Unmanaged (External) Table: Spark이 메타 데이터만 관리
SparkSQL - 스토리지 기반 카탈로그 사용 방법
- Hive와 호환되는 메타스토어 사용
- SparkSession 생성 시 enableHiveSupport() 호출
- 기본으로 "default"라는 이름의 데이터베이스 생성
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark Hive") \
.enableHiveSupport() \
.getOrCreate()
SparkSQL - Managed Table 사용 방법
- 두 가지 테이블 생성방법
- dataframe.saveAsTable("테이블이름")
- SQL 문법 사용(CREATE TABLE, CTAS)
- spark.sql.warehouse.dir가 가리키는 위치에 데이터 저장됨
- PARQUET이 기본 데이터 포맷
- 선호하는 테이블 타입
- Spark 테이블로 처리하는 것의 장점(파일로 저장하는 것과 비교시)
- JDBC/ODBC 등으로 Spark를 연결해서 접근 가능(태블로, power BI)
SparkSQL - External Table 사용 방법
- 이미 HDFS에 존재하는 데이터에 스키마 정의해서 사용
- LOCATION이란 프로퍼티 사용
- 메타데이터만 카탈로그에 기록됨
- 데이터는 이미 존재
- External Table은 삭제되어도 데이터는 그대로임
CREATE TABLE table_name (
column1 type1,
column2 type2,
column3 type3,
...
)
USING PARQUET
LOCATION 'hdfs_path';
유닛 테스트
- 코드 상의 특정 기능(보통 메소드의 형태)을 테스트하기 위해 작성된 코드
- 보통 정해진 입력을 주고 예상된 출력이 나오는지 형태로 테스트
- CI/CD를 사용하려면 전체 코드의 테스트 커버리지가 매우 중요
- 각 언어별로 정해진 테스트 프레임웍을 사용하는 것이 일반적
- Java: JUnit
- .NET: NUnit
- Python: unittest
반응형
'데브코스(DE) > 하둡과 Spark' 카테고리의 다른 글
02. Spark 프로그래밍: DataFrame (0) | 2024.06.19 |
---|---|
01. 빅데이터 처리와 Spark 소개 (1) | 2024.06.18 |