High Performance Spark를 읽고 간단히 정리해본 글입니다.

스키마의 기초

  • 스키마의 정보와 그로 인해 가능해지는 최적화는 스파크 SQL과 코어 스파크 사이의 핵심 차이점 중 하나
  • 스키마는 보통 스파크 SQL에 의해 자동으로 처리됨
    • 데이터를 로딩할 때 스키마를 추측
    • 부모 DataFrame과 적용된 트랜스포메이션에 의해 계산
  • printSchema() 를 통해 보통 많이 DataFrame의 스키마를 확인
  • 복합적인 SQL type
    스칼라 타입(scala type) SQL 타입 설명 예제
    Array[T] ArrayType(elementType, containsNull) 한 가지 타입으로 이루어진 배열, null을 하나라도 갖고 있으면 containsNull이 true Array[Int] ⇒ ArrayType(IntegerType, true)
    Map[K,V] MapType(elementType. valueType, valueContainsNull) 키/값의 맵. 값에 null이 존재하면 valueContinsNull이 true Map[String, Int] ⇒ MapType(StringType, IntegerType, true)
    case class StructType (List[StructField]) 다양한 타입들로 이루어진 이름이 붙은 필드들 - 케이스 클래스나 자바 빈과 유사 case class Panda(name: String, age: Int) ⇒ StructType(List(StructField("name", StringType, true), StructField("age", IntegerType, true)))

DataFrame과 Dataset에서의 데이터 표현

  • 텅스텐 데이터 타입은 스파크에 필요한 연산 형태에 따라 최적화된 특수한 메모리 데이터 구조 등의 장점
  • 텅스텐의 데이터 표현 방식은 자바/kryo 직렬화보다 용량이 훨씬 적음
    • RDD 33mb
    • DataFrame 7mb
  • 텅스텐은 자바 객체 기반이 아니기 때문에 온힙과 오프힙을 모두 지원
  • Spark SQL에서는 kryo 직렬화와 칼럼 기반 저장 포맷만 사용해도 저장비용 최소화가 가능

이전에 repartitioncoalesce를 비교해본 포스트가 있었다.
그 당시만 해도 성능적으로는 coalesce()가 무조건적으로 좋을 거라고 생각했었는데, 최근 그 반대의 경우를 만나서 포스팅한다.

repartition()에서는 shuffle을 수행하기 때문에 coalesce()가 성능적으로 더 좋다고 언급했었는데, 언제나 그런 것은 아니다.
shuffle을 수행하게 되면 executor들이 쌓아두었던 메모리를 refresh하게 되는데, spill이 발생할 정도로 메모리를 많이 사용하고 있었다면 shuffle을 통해 메모리를 한 번 비워주는 것이 성능적으로 도움이 될 수 있다.

물론 데이터에 따라 결과는 달라질 수 있지만, 무조건적으로 coalesce()repartition()보다 성능이 좋지 않은 경우가 생길 수 있다는 점을 강조하고 싶어서 추가로 포스팅을 하게 되었다.

역시 튜닝은 어렵다..

High Performance Spark를 읽고 간단히 정리해본 글입니다.
잘못된 정보나 오류가 있다면 언제나 지적 부탁드립니다 :)

  • 스파크는 스파크 JVM 위에서 연산을 수행할 뿐, 데이터 저장 솔루션은 아님
    • 로컬 모드에서는 하나의 단일 JVM에서 수행
  • RDD를 기반으로 만들어진 데이터 처리 프레임워크

Spark SQL

  • 스파크 SQL의 경우 스파크 코어와는 다른 질의 최적화 엔진을 갖고 있기 때문에 성능에 대해 스파크 코어와는 다르게 고려해야 함
    • 물론 스파크 코어에서 사용되는 최적화 기법도 활용가능한 부분이 있음

Spark Streaming

  • 데이터의 마이크로 배치에 대한 스트리밍 분석에 스파크 코어의 스케쥴링을 사용

일반적인 동작 원리

  • driver에서 동작하는 프로그램을 사용자가 작성
  • 스파크가 RDD를 executor에 저장
    • RDD를 구성하는 객체를 partition이라고 부름
    • 저장된 executor가 아니라 다른 executor에서 계산될 수도 있음
  • 스파크 클러스터 manager는 스파크 application에서 executor들을 실행하고 분산해주는 역할을 맡음
  • 스파크 실행 엔진은 연산을 위해 executor들에게 데이터를 분산해주고 실행을 요청
  • 스파크는 driver가 RDD의 데이터 처리 단계를 결정하자마자 연산을 수행하지 않고 최종 RDD를 계산해야하는 시점에 RDD transform을 수행함
    • 보통 write작업 또는 결과 데이터를 driver에 보낼 때
  • 스파크는 더 빠른 접근/반복 연산을 위해 스파크 application이 동작하는 동안 로드된 RDD를 executor의 메모리에 가지고 있을 수 있음
    • 로드된 RDD는 immutable하기 때문에 데이터 transform 결과는 기존 RDD가 아니라 새로운 RDD를 리턴함

Spark Job 스케쥴링

  • 기본적으로 고수준의 스파크 로직이 작성되어있는 드라이버와 익스큐터 프로세스로 구성
  • 스파크 프로그램 자체는 드라이버에서 수행되며 명령들을 익스큐터에 보냄

Spark Application

  • 각 익스큐터는 각자의 자바 가상 머신(JVM)을 가짐
  • SparkConf 객체의 변수들을 통해 SparkContext 를 생성
  • RDD는 application끼리 공유할 수 없음
    • 따라서 join같이 여러 RDD를 쓰는 경우에는 동일한 SparkContext 를 가져야함
  • SparkContext 가 시작할 때 발생하는 일
    • 드라이버가 클러스터 매니저에게 ping을 보냄
    • 클러스터 매니저가 다수의 익스큐터들을 띄움
    • 하나의 RDD가 파티션이 존재하는 익스큐터들에 의해 수행됨
    • 익스큐터는 여러개의 파티션을 가질 수 있지만, 하나의 파티션을 다른 익스큐터와 나눌 수는 없음

기본 스파크 스케쥴러

  • 기본적으로 FIFO방식이 사용됨
  • round-robin 방식도 제공하기는 함
  • SparkContext 에서 해당 액션들이 호출된 순서대로 작업을 수행

스파크 잡의 해부

  • 스파크 애플리케이션은 드라이버가 액션을 호출하기 전 까지는 아무것도 하지 않음
  • 각 액션마다 스파크 스케쥴러는 실행 그래프를 만들고 스파크 잡을 수행
  • 각 잡은 최종 RDD를 만들기 위해 필요한 데이터 변환의 단계를 의미하는 stage로 구성
  • 각 stage는 병렬 연산의 한 단위를 의미
  • stage는 익스큐터 위에서 수행되는 다수의 task로 구성

Spark Application Tree

  • Spark Application (SparkContext/SparkSession)
    • Spark Job (collect, saveToEs, write)
      • Stage (넓은 transformation. sort, groupByKey, reduceBy)
        • Task (좁은 transformation)

DAG

  • 각 스파크 잡의 DAG를 만들기 위해 RDD 종속성 정보를 사용
    • Spark API에서는 DAG 스케쥴러라고 표현

Spark Job

  • 스파크 잡은 스파크의 실행 구성 중 가장 높은 단계의 요소
  • 각 스파크 잡은 하나의 액션과 대응
    • 액션은 드라이버에서 호출
  • 액션을 개념적으로 구체화하는 한 가지 정의는 데이터를 RDD 바깥으로 놓는 행위라고 표현 가능
    • collect 처럼 드라이버로 데이터를 보내거나
    • saveToEs 처럼 다른 데이터 저장 공간에 저장하거나
  • 액션은 언제나 DAG의 마지막 leaf node가 됨
    • 더이상 다른 RDD를 호출할 수 없기 때문

Stage

  • transformation은 액션이 호출되기 전까지는 실행되지 않음
  • 각 stage는 스파크에서 넓은 transformation에 의해 생성되는 셔플 의존성에 대응함
  • 높은 차원에서 보면 하나의 stage는 다륵 익스큐터나 드라이버와 통신하지 않고 익스큐터에서 계산가능한 task들의 집합으로 볼 수도 있음
  • 새로운 스테이지는 언제든지 작업 node들 사이에서 네트워크 통신이 요구될 때마다 시작됨(ex. 셔플)

Task

  • 하나의 stage는 여러개의 task로 이루어짐
  • task는 실행 계층에서 가장 작은 단위
  • 한 stage에서 모든 task들은 서로 다른 데이터를 대상으로 같은 코드를 실행
  • 한 task는 오직 하나의 executor에서만 실행됨
    • executor에서 여러개의 슬룻만큼 task를 동시에 수행할 수 있음
  • stage당 task의 개수는 해당 stage의 결과 파티션 개수와 같음

High Performance Spark를 읽고 간단히 정리해본 글입니다.
잘못된 정보나 오류가 있다면 언제나 지적 부탁드립니다 :)

why Spark?

  • Apache Spark는 범용 목적의 고성능 분산 처리 시스템
  • 하나 이상의 서버에서 처리할 수 있는 수준 이상의 대용량 데이터 처리 가능

최적화하지 않으면 매우 느리거나 불안정할 수 있다.

  • 최적화를 통해 동일 클러스터에서 동일 작업을 100배이상 빠르게 실행되도록 개선도 가능
  • 그러나 모든 case에 모든 기법을 적용할 수 있는 것은 아님
  • 특히 Spark는 유사한 종류의 다른 프레임워크에 비해 세밀한 설정이 가능하기 때문에 데이터의 구조와 형태에 대해 잘 파악하고 최적화를 적용하는 것이 중요
  • 시스템 및 데이터의 특징을 통해 Spark가 어떻게 동작하는지를 이해하는 것이 가장 어려운 수준의 데이터 engineering 이슈를 해결하는 방법

why Scala?

  • 스칼라는 파이썬보다 빠름
  • 스칼라는 자바 API보다 훨씬 사용하기 쉬움
    • 인라인 함수나, Spark API를 사용하는것이나 자바보다 더 편함
    • Spark Shell은 정말 강력한 도구인데, 자바에서는 미지원

DATA+AI SUMMIT 2021 에서 발표한 Monitor Apache Spark 3 on Kubernetes using Metrics and Plugins 세션을 나름대로 정리해보았습니다. 잘못된 정보나 오류가 있다면 언제나 지적 부탁드립니다 :)


Spark 모니터링 생태계

Web UI

  • 제일 쉽게 접근이 가능하면서 유용한 모니터링이 가능
  • job, stage, stage, SQL, streaming, ... 등등의 세부사항 확인 가능
  • default URL: http://driverhost:4040

Spark REST API + Spark Listener

  • task와 executor의 metric들을 확인 가능
  • web UI보다 더욱 세부적인 metric들을 확인 가능

Spark Metrics System

  • Dropwizard metric 라이브러리를 통해 구현
  • driver, executor의 여러 metric을 알 수 있음
    • active task 개수
    • job/stage complete/fail 여부
    • executor의 cpu 사용량
    • executor running time
    • gc time
    • shuffle metrics
    • I/O metrics
    • ...
  • default URL: http://driver:4040/metrics/json
  • executor의 경우 Spark의 local mode인경우에만 사용가능
  • prometheus에서 사용하고있음
  • pipeline
  • spark 2.x~3.0 에서 k8s를 사용하는 경우는 다음과 같이 사용
      $SPARK_HOME/bin/spark-shell \
          --conf "spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.GraphiteSink" \
          --conf "spark.metrics.conf.*.sink.graphite.host"="graphiteEndPoint_influxDB_hostname" \
          ...

Spark 3 metrics system with Plugins

  • K8S pod들의 resource 관련 metric 등을 모니터링 가능
    • nsight에서의 cpu 사용량, network in/out 같은 metric
    • RSS, Swap, Cache 등의 세부 metric 확인 가능
  • 사용자가 직접 커스터마이징해서 metric을 모니터링 가능

출처: https://databricks.com/session_na21/monitor-apache-spark-3-on-kubernetes-using-metrics-and-plugins

Spark 3.x로 올라오면서 Structured Streaming(이하 s.stream)이 지원되면서 group by, window 등 강력하면서도 다양한 기능을 streaming 에서도 사용할 수 있게 되었다.

기존 Discretized Streaming(이하 d.stream)과 다른 점 중 두드러지는 변화는 바로 checkpoint를 저장한다는 것이다.

checkpoint란?

streaming의 metadata 또는 kafka offset 등을 저장하는 데이터로 streaming app의 진행상황을 기록해둘 수 있다.

streaming app의 진행상황을 기록해두면, app이 종료되더라도 해당 지점부터 재수행이 가능하기 때문이다.

동작 원리나 여러가지 부가적인 기능 및 내용들이 많이 있지만, 자세한 것은 공식문서를 참고하길 바란다.

(안타깝게도 언제나 그렇듯 공식문서의 설명은 충분하지 않긴 하다)

공식문서: https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing

checkpoint 지정 방법

간단하다. SparkContext 또는 DataStreamWriter의 option을 주면 된다.

// 1. SparkContext에 지정하는 방법
val sparkConf = new SparkConf().setAppName("Checkpoint Example")
val checkpointDirectory: String = "/my/directory/checkpoint"

val ssc = new StreamingContext(sparkConf, Seconds(1))  
ssc.checkpoint(checkpointDirectory)
...


// 2. DataStreamWriter의 option으로 주는 방법
val dataset: Dataset[Schema] = getDatasetExample()
val checkpointDirectory: String = "/my/directory/checkpoint"

dataset.writeStream
    .option("checkpointLocation", checkpointDirectory)
    .trigger(ProcessingTime("10 seconds")
    .foreachBatch(saveFunction _)
    .queryName("Checkpoint Example")
    .start()

그렇다면 checkpoint는 어디다가 저장하는 것이 좋을까?

만약 물리장비에서 spark application을 운영한다면(아마도 spark on yarn), local directory에 저장하면 된다.
제일 간편하고, 성능이슈도 없다.

문제는 k8s환경에서(spark on k8s) 운영할 때인데, k8s환경의 local에 checkpoint를 저장하게 되면 pod이 내려갔을 때 checkpoint 정보를 모두 잃고 pod이 내려갔다가 올라오는 동안의 데이터가 유실되게 된다.

1. hdfs

hdfs는 좋은 선택지다.

그러나 secure hdfs를 사용하는 경우 i/o 성능이 뒷받침되지 않으면서 walCommit 시간이 늘어나서, micro batch 간격이 5~10초인 경우 lag이 점점 증가하는 경우가 발생할 수 있다.
(walCommit관련 내용은 추후에 또 다뤄보겠다)

secure hdfs를 사용하지 않는 경우 보안 이슈가 발생할 수 있으니 추천하기는 어렵다.

2. fuse

fuse는 사용하기는 편하겠지만 여전히 성능상의 이슈를 가지고 있다.
여러 pod에서 mount하기 때문에 write할 때 일종의 mutex를 획득해야 하는 경우가 많은데, 이 때 i/o 성능이 눈에 띄게 하락하게 된다.

역시나 walCommit 시간이 증가하는 이슈가 발생한다.

3. rbd

rbd는 거의 정답에 가까운 선택지다.
i/o 성능이 준수하고 옵션을 통해 persistence까지 보장이 가능하다. 필자도 현재 rbd를 checkpoint를 저장하는 용도로 사용하고 있으며, 아직 이슈가 된 적이 없다.

k8s환경에서 s.stream app을 운영하게 된다면, 성능 및 보안 측면에서 흠잡을 데 없는 rbd를 checkpoint directory로 사용하는 것을 강력하게 추천한다.

repartition vs coalesce

repartition()은 파티션의 개수를 늘리거나 줄일 수 있지만 coalesce()는 줄이는 것 만 가능하다.

성능 차이

파티션의 개수를 줄이는 경우에는 coalesce()를 사용하는 것이 더 좋은데, 그 이유는 바로 셔플때문이다.

coalesce()는 셔플을 수행하지 않기 때문에 성능 측면에서 repartition()보다 더 좋다.

구현

coalesce()함수가 구현되어있고, repartition()에서 coalesce()를 호출하는 형태로 구성되어있다.

코드 링크


여기까지는 많이 알려진 내용인데, 개발하다보니 순서를 보장한 상태에서 하나의 파일로 파티션을 묶어야 할 때가 있었다.
repartition()을 사용해서 묶었더니 정렬이 깨지거나 안되는 상황이 발생해서, 관련 테스트 결과를 기록한다.

  1. orderBy().repartition(1).write
    • 이 경우에는 partition 내부의 block 단위로 정렬이 된다.
    • 전체 파일이 정렬되지 않고 일부 정렬된 block들이 모여있는 상태처럼 보인다. (셔플이 수행되지 않은 것일까?)
    • 가끔 정렬이 되어있는 경우도 있었다. 왜인지 아시는 분은 댓글로 알려주시면 정말 감사드립니다..
  2. repartition(1).orderBy().write
    • 이게 어이없는 부분이었는데, 기존 100개의 partition을 읽고 해당 함수를 수행하니 100개의 partition으로 write했다.
    • 각각의 partition은 잘 정렬되어있었다.
    • 아마 1.에서의 block 단위가 100개의 partition으로 나뉘어진것이 아닌가 유추해본다.
  3. orderBy().coalesce(1).write
    • 제일 추천하는 방법이다.
    • 의도한대로 전체적으로 정렬이 되어있고, 1개의 partition으로 잘 묶여있다.
    • repartition(1)으로 묶었을 때와 비교해서 잘 정렬이 된 것으로 보아 repartition(1)으로 묶을때는 셔플이 수행된 것 처럼 보인다.
  4. coalesce(1).orderBy().write
    • 잘 동작한다. 직관적으로 1개로 묶고 정렬하는 방법이니 의도대로 잘 동작한다.
    • 역시나 2. 항목과 비교해보면 repartition() 방식이 아직도 잘 이해가 되지 않는다. 관련해서는 더 공부해봐야겠다.

coalesce() 동작 방식이 그림으로 잘 표현된 이미지가 있어서 첨부한다.

Spark-AI SUMMIT 2020 에서 발표한 Fine Tuning And Enhancing Performance Of Apache Spark Jobs 세션을 나름대로 정리해보았습니다. 잘못된 정보나 오류가 있다면 언제나 지적 부탁드립니다 :)

튜닝에 영향을 줄 수 있는 요소들

모든 요소는 무조건적으로 많을수록 좋아지지 않고 적을수록 좋아지지 않는다.

각 요소들은 다른 요소의 performance에 영향을 준다.

Stage를 skip하는 경우는 좋다. Tuning을 잘 한 결과물.

Core 개수

많을 경우

  • 병렬성을 증가시킬 수 있음
  • 스케쥴링 시간이 늘어남

메모리

많을 경우

  • GC 시간이 늘어남

파티션

너무 많을 경우

  • 스케쥴링 issue 발생

너무 적을 경우

  • 병렬성 감소

데이터 쏠림 현상 (skew)

  • 성능 저하 발생
  • 클러스터에서의 불균형 발생

확인 방법

  • spark UI에서는 모든 task를 보여주지 않음
  • 메모리를 과도하게 사용하는 경우 의심 가능
    • 첫 job을 시작할 때는 data ingestion을 수행하느라 메모리를 많이 사용하기 때문에 skew로 판단하지 말 것
  • executor에 heartbeat가 끊긴 경우
  • RDD의 파티션 크기를 확인

해결 방법

repartition은 별게없으면서도 가장 효과적인 방법

  • 파티션을 narrow하게 만들기 좋음
  • 파티션의 개수를 늘려야 할 때는 repartition
  • 파티션의 개수를 줄여야 할 때는 repartition대신 coalesce를 사용

ingestion

파티션을 읽을 때 JDBC를 통해 spark option 사용

  • partitionColumn
    • 쏠린 파티션이 아니어야 함 (primary key 처럼)
    • MOD를 활용해보는 것도 좋음
  • lower/upperBound
  • numPartitions
    • 파티션의 최대 개수 지정

Cache / Persist

  • transformation한 뒤 DF를 재사용
  • cache를 사용하지 않으면 DF가 매번 생성됨
  • 너무 많이 cache를 사용하게 되면
    • 메모리 performance 저하
    • GC가 커짐
    • 오히려 성능 감소 가능성이 있음

그 외의 trick들

seq.par.foreach 사용하기

  • seq.foreach 보다 나은 성능
  • 병렬성 증가
  • race condition 또는 비결정적 결과(non-deterministic results)가 도출될 수 있음
    • accumulator 또는 synchronization을 사용해서 방지

UDF의 사용은 가능한 자제하기

  • UDF의 동작 원리
    • 모든 row를 object로 비직렬화
    • lambda 적용
    • 다시 직렬화
  • 더 많은 garbage 생성

스케쥴링

Round Robin

  • 쓰레드를 통해 하나의 application에서 여러 job을 동시에 돌릴 수 있음
  • 리소스 utilization에 더 좋음
  • 디버그와 튜닝을 더 어렵게 만듦

spark.scheduler.modeFAIR 로 설정하면

  • job들을 priority에 따라 pool로 묶음
  • pool에 속하지 않은 thread로부터 생성된 job은 default pool로 들어감
  • pool, weight, minShare의 config는 fairscheduler.xml에 있음

직렬화

Java Serializer

  • 대부분의 타입에서의 default로 사용됨
  • 모든 class와 호환 가능
  • 매우 유연한 편
  • 느림

Kyro Serializer

  • RDD shuffling과 간단한 type의 default로 사용됨
  • 빠름
  • SparkConf 에서 org.apache.spark.serializer.KyroSerializer 설정으로 사용 가능
  • 사용하고 싶은 class를 등록해서 원하는 만큼 적용 가능

GC 튜닝

GC 튜닝이 필요할 때

  • 너무 많은 객체가 할당된 경우
  • task가 끝나기 전에 Full GC 발생하는 경우
  • 너무 많은 RDD 캐시가 있는 경우
  • GC가 너무 빈번히 일어나는 경우
  • GC에 너무 많은 시간이 소요되는 경우
  • heartbeat가 끊긴 경우

Parrallel GC (spark의 default GC)

  • Heap size가 Young / Old generation으로 나뉨

각 상황 별 추천 해결 방법

  • minor GC가 자주 발생하는 경우
    • Eden과 Survivor size를 늘려줌
  • major GC가 자주 발생하는 경우
    • 너무 많은 객체가 생성된 경우 young size를 늘려줌
    • old size를 늘려줌
    • spark.memory.fraction 을 줄여줌
  • task가 끝나기 전에 Full GC가 발생하는 경우
    • GC를 통해 young space를 clear하는 것을 추천
    • memory를 늘려줌

G1GC

Appilication scale이 크거나 Heap size가 8GB보다 큰 경우 추천

  • 다음 옵션으로 설정 가능
    • -XX:ParrallelGCThreads = n
    • -XX:ConcGCThreads = [n, 2n]
    • -XX:InitiatingHeapOccupancyPercent = 35

마치며..

  • 성능 튜닝은 계속 반복되는 것이 정상
  • 튜닝은 상황에 따라 매번 달라짐
  • Spark UI, 로그, 사용가능한 모니터링 툴 등을 반드시 활용할 것
  • 대충 아무 trick이나 쓰지 말고 성능 저하의 주요 원인에 집중할 것
  • 완벽한 튜닝을 하기란 거의 불가능하다

+ Recent posts