High Performance Spark를 읽고 간단히 정리해본 글입니다.

스키마의 기초

  • 스키마의 정보와 그로 인해 가능해지는 최적화는 스파크 SQL과 코어 스파크 사이의 핵심 차이점 중 하나
  • 스키마는 보통 스파크 SQL에 의해 자동으로 처리됨
    • 데이터를 로딩할 때 스키마를 추측
    • 부모 DataFrame과 적용된 트랜스포메이션에 의해 계산
  • printSchema() 를 통해 보통 많이 DataFrame의 스키마를 확인
  • 복합적인 SQL type
    스칼라 타입(scala type) SQL 타입 설명 예제
    Array[T] ArrayType(elementType, containsNull) 한 가지 타입으로 이루어진 배열, null을 하나라도 갖고 있으면 containsNull이 true Array[Int] ⇒ ArrayType(IntegerType, true)
    Map[K,V] MapType(elementType. valueType, valueContainsNull) 키/값의 맵. 값에 null이 존재하면 valueContinsNull이 true Map[String, Int] ⇒ MapType(StringType, IntegerType, true)
    case class StructType (List[StructField]) 다양한 타입들로 이루어진 이름이 붙은 필드들 - 케이스 클래스나 자바 빈과 유사 case class Panda(name: String, age: Int) ⇒ StructType(List(StructField("name", StringType, true), StructField("age", IntegerType, true)))

DataFrame과 Dataset에서의 데이터 표현

  • 텅스텐 데이터 타입은 스파크에 필요한 연산 형태에 따라 최적화된 특수한 메모리 데이터 구조 등의 장점
  • 텅스텐의 데이터 표현 방식은 자바/kryo 직렬화보다 용량이 훨씬 적음
    • RDD 33mb
    • DataFrame 7mb
  • 텅스텐은 자바 객체 기반이 아니기 때문에 온힙과 오프힙을 모두 지원
  • Spark SQL에서는 kryo 직렬화와 칼럼 기반 저장 포맷만 사용해도 저장비용 최소화가 가능

이전에 repartitioncoalesce를 비교해본 포스트가 있었다.
그 당시만 해도 성능적으로는 coalesce()가 무조건적으로 좋을 거라고 생각했었는데, 최근 그 반대의 경우를 만나서 포스팅한다.

repartition()에서는 shuffle을 수행하기 때문에 coalesce()가 성능적으로 더 좋다고 언급했었는데, 언제나 그런 것은 아니다.
shuffle을 수행하게 되면 executor들이 쌓아두었던 메모리를 refresh하게 되는데, spill이 발생할 정도로 메모리를 많이 사용하고 있었다면 shuffle을 통해 메모리를 한 번 비워주는 것이 성능적으로 도움이 될 수 있다.

물론 데이터에 따라 결과는 달라질 수 있지만, 무조건적으로 coalesce()repartition()보다 성능이 좋지 않은 경우가 생길 수 있다는 점을 강조하고 싶어서 추가로 포스팅을 하게 되었다.

역시 튜닝은 어렵다..

High Performance Spark를 읽고 간단히 정리해본 글입니다.
잘못된 정보나 오류가 있다면 언제나 지적 부탁드립니다 :)

  • 스파크는 스파크 JVM 위에서 연산을 수행할 뿐, 데이터 저장 솔루션은 아님
    • 로컬 모드에서는 하나의 단일 JVM에서 수행
  • RDD를 기반으로 만들어진 데이터 처리 프레임워크

Spark SQL

  • 스파크 SQL의 경우 스파크 코어와는 다른 질의 최적화 엔진을 갖고 있기 때문에 성능에 대해 스파크 코어와는 다르게 고려해야 함
    • 물론 스파크 코어에서 사용되는 최적화 기법도 활용가능한 부분이 있음

Spark Streaming

  • 데이터의 마이크로 배치에 대한 스트리밍 분석에 스파크 코어의 스케쥴링을 사용

일반적인 동작 원리

  • driver에서 동작하는 프로그램을 사용자가 작성
  • 스파크가 RDD를 executor에 저장
    • RDD를 구성하는 객체를 partition이라고 부름
    • 저장된 executor가 아니라 다른 executor에서 계산될 수도 있음
  • 스파크 클러스터 manager는 스파크 application에서 executor들을 실행하고 분산해주는 역할을 맡음
  • 스파크 실행 엔진은 연산을 위해 executor들에게 데이터를 분산해주고 실행을 요청
  • 스파크는 driver가 RDD의 데이터 처리 단계를 결정하자마자 연산을 수행하지 않고 최종 RDD를 계산해야하는 시점에 RDD transform을 수행함
    • 보통 write작업 또는 결과 데이터를 driver에 보낼 때
  • 스파크는 더 빠른 접근/반복 연산을 위해 스파크 application이 동작하는 동안 로드된 RDD를 executor의 메모리에 가지고 있을 수 있음
    • 로드된 RDD는 immutable하기 때문에 데이터 transform 결과는 기존 RDD가 아니라 새로운 RDD를 리턴함

Spark Job 스케쥴링

  • 기본적으로 고수준의 스파크 로직이 작성되어있는 드라이버와 익스큐터 프로세스로 구성
  • 스파크 프로그램 자체는 드라이버에서 수행되며 명령들을 익스큐터에 보냄

Spark Application

  • 각 익스큐터는 각자의 자바 가상 머신(JVM)을 가짐
  • SparkConf 객체의 변수들을 통해 SparkContext 를 생성
  • RDD는 application끼리 공유할 수 없음
    • 따라서 join같이 여러 RDD를 쓰는 경우에는 동일한 SparkContext 를 가져야함
  • SparkContext 가 시작할 때 발생하는 일
    • 드라이버가 클러스터 매니저에게 ping을 보냄
    • 클러스터 매니저가 다수의 익스큐터들을 띄움
    • 하나의 RDD가 파티션이 존재하는 익스큐터들에 의해 수행됨
    • 익스큐터는 여러개의 파티션을 가질 수 있지만, 하나의 파티션을 다른 익스큐터와 나눌 수는 없음

기본 스파크 스케쥴러

  • 기본적으로 FIFO방식이 사용됨
  • round-robin 방식도 제공하기는 함
  • SparkContext 에서 해당 액션들이 호출된 순서대로 작업을 수행

스파크 잡의 해부

  • 스파크 애플리케이션은 드라이버가 액션을 호출하기 전 까지는 아무것도 하지 않음
  • 각 액션마다 스파크 스케쥴러는 실행 그래프를 만들고 스파크 잡을 수행
  • 각 잡은 최종 RDD를 만들기 위해 필요한 데이터 변환의 단계를 의미하는 stage로 구성
  • 각 stage는 병렬 연산의 한 단위를 의미
  • stage는 익스큐터 위에서 수행되는 다수의 task로 구성

Spark Application Tree

  • Spark Application (SparkContext/SparkSession)
    • Spark Job (collect, saveToEs, write)
      • Stage (넓은 transformation. sort, groupByKey, reduceBy)
        • Task (좁은 transformation)

DAG

  • 각 스파크 잡의 DAG를 만들기 위해 RDD 종속성 정보를 사용
    • Spark API에서는 DAG 스케쥴러라고 표현

Spark Job

  • 스파크 잡은 스파크의 실행 구성 중 가장 높은 단계의 요소
  • 각 스파크 잡은 하나의 액션과 대응
    • 액션은 드라이버에서 호출
  • 액션을 개념적으로 구체화하는 한 가지 정의는 데이터를 RDD 바깥으로 놓는 행위라고 표현 가능
    • collect 처럼 드라이버로 데이터를 보내거나
    • saveToEs 처럼 다른 데이터 저장 공간에 저장하거나
  • 액션은 언제나 DAG의 마지막 leaf node가 됨
    • 더이상 다른 RDD를 호출할 수 없기 때문

Stage

  • transformation은 액션이 호출되기 전까지는 실행되지 않음
  • 각 stage는 스파크에서 넓은 transformation에 의해 생성되는 셔플 의존성에 대응함
  • 높은 차원에서 보면 하나의 stage는 다륵 익스큐터나 드라이버와 통신하지 않고 익스큐터에서 계산가능한 task들의 집합으로 볼 수도 있음
  • 새로운 스테이지는 언제든지 작업 node들 사이에서 네트워크 통신이 요구될 때마다 시작됨(ex. 셔플)

Task

  • 하나의 stage는 여러개의 task로 이루어짐
  • task는 실행 계층에서 가장 작은 단위
  • 한 stage에서 모든 task들은 서로 다른 데이터를 대상으로 같은 코드를 실행
  • 한 task는 오직 하나의 executor에서만 실행됨
    • executor에서 여러개의 슬룻만큼 task를 동시에 수행할 수 있음
  • stage당 task의 개수는 해당 stage의 결과 파티션 개수와 같음

High Performance Spark를 읽고 간단히 정리해본 글입니다.
잘못된 정보나 오류가 있다면 언제나 지적 부탁드립니다 :)

why Spark?

  • Apache Spark는 범용 목적의 고성능 분산 처리 시스템
  • 하나 이상의 서버에서 처리할 수 있는 수준 이상의 대용량 데이터 처리 가능

최적화하지 않으면 매우 느리거나 불안정할 수 있다.

  • 최적화를 통해 동일 클러스터에서 동일 작업을 100배이상 빠르게 실행되도록 개선도 가능
  • 그러나 모든 case에 모든 기법을 적용할 수 있는 것은 아님
  • 특히 Spark는 유사한 종류의 다른 프레임워크에 비해 세밀한 설정이 가능하기 때문에 데이터의 구조와 형태에 대해 잘 파악하고 최적화를 적용하는 것이 중요
  • 시스템 및 데이터의 특징을 통해 Spark가 어떻게 동작하는지를 이해하는 것이 가장 어려운 수준의 데이터 engineering 이슈를 해결하는 방법

why Scala?

  • 스칼라는 파이썬보다 빠름
  • 스칼라는 자바 API보다 훨씬 사용하기 쉬움
    • 인라인 함수나, Spark API를 사용하는것이나 자바보다 더 편함
    • Spark Shell은 정말 강력한 도구인데, 자바에서는 미지원

DATA+AI SUMMIT 2021 에서 발표한 Monitor Apache Spark 3 on Kubernetes using Metrics and Plugins 세션을 나름대로 정리해보았습니다. 잘못된 정보나 오류가 있다면 언제나 지적 부탁드립니다 :)


Spark 모니터링 생태계

Web UI

  • 제일 쉽게 접근이 가능하면서 유용한 모니터링이 가능
  • job, stage, stage, SQL, streaming, ... 등등의 세부사항 확인 가능
  • default URL: http://driverhost:4040

Spark REST API + Spark Listener

  • task와 executor의 metric들을 확인 가능
  • web UI보다 더욱 세부적인 metric들을 확인 가능

Spark Metrics System

  • Dropwizard metric 라이브러리를 통해 구현
  • driver, executor의 여러 metric을 알 수 있음
    • active task 개수
    • job/stage complete/fail 여부
    • executor의 cpu 사용량
    • executor running time
    • gc time
    • shuffle metrics
    • I/O metrics
    • ...
  • default URL: http://driver:4040/metrics/json
  • executor의 경우 Spark의 local mode인경우에만 사용가능
  • prometheus에서 사용하고있음
  • pipeline
  • spark 2.x~3.0 에서 k8s를 사용하는 경우는 다음과 같이 사용
      $SPARK_HOME/bin/spark-shell \
          --conf "spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.GraphiteSink" \
          --conf "spark.metrics.conf.*.sink.graphite.host"="graphiteEndPoint_influxDB_hostname" \
          ...

Spark 3 metrics system with Plugins

  • K8S pod들의 resource 관련 metric 등을 모니터링 가능
    • nsight에서의 cpu 사용량, network in/out 같은 metric
    • RSS, Swap, Cache 등의 세부 metric 확인 가능
  • 사용자가 직접 커스터마이징해서 metric을 모니터링 가능

출처: https://databricks.com/session_na21/monitor-apache-spark-3-on-kubernetes-using-metrics-and-plugins

+ Recent posts