High Performance Spark를 읽고 간단히 정리해본 글입니다.

스키마의 기초

  • 스키마의 정보와 그로 인해 가능해지는 최적화는 스파크 SQL과 코어 스파크 사이의 핵심 차이점 중 하나
  • 스키마는 보통 스파크 SQL에 의해 자동으로 처리됨
    • 데이터를 로딩할 때 스키마를 추측
    • 부모 DataFrame과 적용된 트랜스포메이션에 의해 계산
  • printSchema() 를 통해 보통 많이 DataFrame의 스키마를 확인
  • 복합적인 SQL type
    스칼라 타입(scala type) SQL 타입 설명 예제
    Array[T] ArrayType(elementType, containsNull) 한 가지 타입으로 이루어진 배열, null을 하나라도 갖고 있으면 containsNull이 true Array[Int] ⇒ ArrayType(IntegerType, true)
    Map[K,V] MapType(elementType. valueType, valueContainsNull) 키/값의 맵. 값에 null이 존재하면 valueContinsNull이 true Map[String, Int] ⇒ MapType(StringType, IntegerType, true)
    case class StructType (List[StructField]) 다양한 타입들로 이루어진 이름이 붙은 필드들 - 케이스 클래스나 자바 빈과 유사 case class Panda(name: String, age: Int) ⇒ StructType(List(StructField("name", StringType, true), StructField("age", IntegerType, true)))

DataFrame과 Dataset에서의 데이터 표현

  • 텅스텐 데이터 타입은 스파크에 필요한 연산 형태에 따라 최적화된 특수한 메모리 데이터 구조 등의 장점
  • 텅스텐의 데이터 표현 방식은 자바/kryo 직렬화보다 용량이 훨씬 적음
    • RDD 33mb
    • DataFrame 7mb
  • 텅스텐은 자바 객체 기반이 아니기 때문에 온힙과 오프힙을 모두 지원
  • Spark SQL에서는 kryo 직렬화와 칼럼 기반 저장 포맷만 사용해도 저장비용 최소화가 가능

이전에 repartitioncoalesce를 비교해본 포스트가 있었다.
그 당시만 해도 성능적으로는 coalesce()가 무조건적으로 좋을 거라고 생각했었는데, 최근 그 반대의 경우를 만나서 포스팅한다.

repartition()에서는 shuffle을 수행하기 때문에 coalesce()가 성능적으로 더 좋다고 언급했었는데, 언제나 그런 것은 아니다.
shuffle을 수행하게 되면 executor들이 쌓아두었던 메모리를 refresh하게 되는데, spill이 발생할 정도로 메모리를 많이 사용하고 있었다면 shuffle을 통해 메모리를 한 번 비워주는 것이 성능적으로 도움이 될 수 있다.

물론 데이터에 따라 결과는 달라질 수 있지만, 무조건적으로 coalesce()repartition()보다 성능이 좋지 않은 경우가 생길 수 있다는 점을 강조하고 싶어서 추가로 포스팅을 하게 되었다.

역시 튜닝은 어렵다..

High Performance Spark를 읽고 간단히 정리해본 글입니다.
잘못된 정보나 오류가 있다면 언제나 지적 부탁드립니다 :)

  • 스파크는 스파크 JVM 위에서 연산을 수행할 뿐, 데이터 저장 솔루션은 아님
    • 로컬 모드에서는 하나의 단일 JVM에서 수행
  • RDD를 기반으로 만들어진 데이터 처리 프레임워크

Spark SQL

  • 스파크 SQL의 경우 스파크 코어와는 다른 질의 최적화 엔진을 갖고 있기 때문에 성능에 대해 스파크 코어와는 다르게 고려해야 함
    • 물론 스파크 코어에서 사용되는 최적화 기법도 활용가능한 부분이 있음

Spark Streaming

  • 데이터의 마이크로 배치에 대한 스트리밍 분석에 스파크 코어의 스케쥴링을 사용

일반적인 동작 원리

  • driver에서 동작하는 프로그램을 사용자가 작성
  • 스파크가 RDD를 executor에 저장
    • RDD를 구성하는 객체를 partition이라고 부름
    • 저장된 executor가 아니라 다른 executor에서 계산될 수도 있음
  • 스파크 클러스터 manager는 스파크 application에서 executor들을 실행하고 분산해주는 역할을 맡음
  • 스파크 실행 엔진은 연산을 위해 executor들에게 데이터를 분산해주고 실행을 요청
  • 스파크는 driver가 RDD의 데이터 처리 단계를 결정하자마자 연산을 수행하지 않고 최종 RDD를 계산해야하는 시점에 RDD transform을 수행함
    • 보통 write작업 또는 결과 데이터를 driver에 보낼 때
  • 스파크는 더 빠른 접근/반복 연산을 위해 스파크 application이 동작하는 동안 로드된 RDD를 executor의 메모리에 가지고 있을 수 있음
    • 로드된 RDD는 immutable하기 때문에 데이터 transform 결과는 기존 RDD가 아니라 새로운 RDD를 리턴함

Spark Job 스케쥴링

  • 기본적으로 고수준의 스파크 로직이 작성되어있는 드라이버와 익스큐터 프로세스로 구성
  • 스파크 프로그램 자체는 드라이버에서 수행되며 명령들을 익스큐터에 보냄

Spark Application

  • 각 익스큐터는 각자의 자바 가상 머신(JVM)을 가짐
  • SparkConf 객체의 변수들을 통해 SparkContext 를 생성
  • RDD는 application끼리 공유할 수 없음
    • 따라서 join같이 여러 RDD를 쓰는 경우에는 동일한 SparkContext 를 가져야함
  • SparkContext 가 시작할 때 발생하는 일
    • 드라이버가 클러스터 매니저에게 ping을 보냄
    • 클러스터 매니저가 다수의 익스큐터들을 띄움
    • 하나의 RDD가 파티션이 존재하는 익스큐터들에 의해 수행됨
    • 익스큐터는 여러개의 파티션을 가질 수 있지만, 하나의 파티션을 다른 익스큐터와 나눌 수는 없음

기본 스파크 스케쥴러

  • 기본적으로 FIFO방식이 사용됨
  • round-robin 방식도 제공하기는 함
  • SparkContext 에서 해당 액션들이 호출된 순서대로 작업을 수행

스파크 잡의 해부

  • 스파크 애플리케이션은 드라이버가 액션을 호출하기 전 까지는 아무것도 하지 않음
  • 각 액션마다 스파크 스케쥴러는 실행 그래프를 만들고 스파크 잡을 수행
  • 각 잡은 최종 RDD를 만들기 위해 필요한 데이터 변환의 단계를 의미하는 stage로 구성
  • 각 stage는 병렬 연산의 한 단위를 의미
  • stage는 익스큐터 위에서 수행되는 다수의 task로 구성

Spark Application Tree

  • Spark Application (SparkContext/SparkSession)
    • Spark Job (collect, saveToEs, write)
      • Stage (넓은 transformation. sort, groupByKey, reduceBy)
        • Task (좁은 transformation)

DAG

  • 각 스파크 잡의 DAG를 만들기 위해 RDD 종속성 정보를 사용
    • Spark API에서는 DAG 스케쥴러라고 표현

Spark Job

  • 스파크 잡은 스파크의 실행 구성 중 가장 높은 단계의 요소
  • 각 스파크 잡은 하나의 액션과 대응
    • 액션은 드라이버에서 호출
  • 액션을 개념적으로 구체화하는 한 가지 정의는 데이터를 RDD 바깥으로 놓는 행위라고 표현 가능
    • collect 처럼 드라이버로 데이터를 보내거나
    • saveToEs 처럼 다른 데이터 저장 공간에 저장하거나
  • 액션은 언제나 DAG의 마지막 leaf node가 됨
    • 더이상 다른 RDD를 호출할 수 없기 때문

Stage

  • transformation은 액션이 호출되기 전까지는 실행되지 않음
  • 각 stage는 스파크에서 넓은 transformation에 의해 생성되는 셔플 의존성에 대응함
  • 높은 차원에서 보면 하나의 stage는 다륵 익스큐터나 드라이버와 통신하지 않고 익스큐터에서 계산가능한 task들의 집합으로 볼 수도 있음
  • 새로운 스테이지는 언제든지 작업 node들 사이에서 네트워크 통신이 요구될 때마다 시작됨(ex. 셔플)

Task

  • 하나의 stage는 여러개의 task로 이루어짐
  • task는 실행 계층에서 가장 작은 단위
  • 한 stage에서 모든 task들은 서로 다른 데이터를 대상으로 같은 코드를 실행
  • 한 task는 오직 하나의 executor에서만 실행됨
    • executor에서 여러개의 슬룻만큼 task를 동시에 수행할 수 있음
  • stage당 task의 개수는 해당 stage의 결과 파티션 개수와 같음

High Performance Spark를 읽고 간단히 정리해본 글입니다.
잘못된 정보나 오류가 있다면 언제나 지적 부탁드립니다 :)

why Spark?

  • Apache Spark는 범용 목적의 고성능 분산 처리 시스템
  • 하나 이상의 서버에서 처리할 수 있는 수준 이상의 대용량 데이터 처리 가능

최적화하지 않으면 매우 느리거나 불안정할 수 있다.

  • 최적화를 통해 동일 클러스터에서 동일 작업을 100배이상 빠르게 실행되도록 개선도 가능
  • 그러나 모든 case에 모든 기법을 적용할 수 있는 것은 아님
  • 특히 Spark는 유사한 종류의 다른 프레임워크에 비해 세밀한 설정이 가능하기 때문에 데이터의 구조와 형태에 대해 잘 파악하고 최적화를 적용하는 것이 중요
  • 시스템 및 데이터의 특징을 통해 Spark가 어떻게 동작하는지를 이해하는 것이 가장 어려운 수준의 데이터 engineering 이슈를 해결하는 방법

why Scala?

  • 스칼라는 파이썬보다 빠름
  • 스칼라는 자바 API보다 훨씬 사용하기 쉬움
    • 인라인 함수나, Spark API를 사용하는것이나 자바보다 더 편함
    • Spark Shell은 정말 강력한 도구인데, 자바에서는 미지원

DATA+AI SUMMIT 2021 에서 발표한 Monitor Apache Spark 3 on Kubernetes using Metrics and Plugins 세션을 나름대로 정리해보았습니다. 잘못된 정보나 오류가 있다면 언제나 지적 부탁드립니다 :)


Spark 모니터링 생태계

Web UI

  • 제일 쉽게 접근이 가능하면서 유용한 모니터링이 가능
  • job, stage, stage, SQL, streaming, ... 등등의 세부사항 확인 가능
  • default URL: http://driverhost:4040

Spark REST API + Spark Listener

  • task와 executor의 metric들을 확인 가능
  • web UI보다 더욱 세부적인 metric들을 확인 가능

Spark Metrics System

  • Dropwizard metric 라이브러리를 통해 구현
  • driver, executor의 여러 metric을 알 수 있음
    • active task 개수
    • job/stage complete/fail 여부
    • executor의 cpu 사용량
    • executor running time
    • gc time
    • shuffle metrics
    • I/O metrics
    • ...
  • default URL: http://driver:4040/metrics/json
  • executor의 경우 Spark의 local mode인경우에만 사용가능
  • prometheus에서 사용하고있음
  • pipeline
  • spark 2.x~3.0 에서 k8s를 사용하는 경우는 다음과 같이 사용
      $SPARK_HOME/bin/spark-shell \
          --conf "spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.GraphiteSink" \
          --conf "spark.metrics.conf.*.sink.graphite.host"="graphiteEndPoint_influxDB_hostname" \
          ...

Spark 3 metrics system with Plugins

  • K8S pod들의 resource 관련 metric 등을 모니터링 가능
    • nsight에서의 cpu 사용량, network in/out 같은 metric
    • RSS, Swap, Cache 등의 세부 metric 확인 가능
  • 사용자가 직접 커스터마이징해서 metric을 모니터링 가능

출처: https://databricks.com/session_na21/monitor-apache-spark-3-on-kubernetes-using-metrics-and-plugins

  • Apache Kafka는 링크드인에서 개발된 분산 메세지 시스템이다.
  • 2011년 오픈소스로 공개되었고 → 14년 11월 confluent가 창립된다.
  • 대용량의 실시간 데이터 처리에 특화된 아키텍쳐로 설계되었다.

탄생 배경

  • 링크드인 사이트가 급속도로 성장하면서 발생하는 실시간 이슈를 처리할 필요성이 대두되었다.
  • 실시간 트랜잭션 처리와 비동기 처리가 동시에 이뤄지지만 통합된 전송 영역이 없어서 복잡도가 증가했다.
  • 데이터 파이프라인 관리 복잡도(하둡, DB 등)가 심했다.

시스템 목표

  • 프로듀서와 컨슈머의 분리
  • 기존 메세징 시스템과 같이 영구적인 메세지 데이터를 여러 컨슈머에게 허용
  • 높은 처리량을 위한 메세지 최적화
  • 데이터가 증가함에 따라 스케일 아웃이 가능한 시스템

카프카 개발 전/후 데이터 처리 시스템 비교

Before

  • end-to-end 연결 방식의 문제점 발생
  • 구조가 복잡함

After

  • 이벤트/데이터의 흐름을 중앙에서 일괄적으로 관리
  • 구조가 훨씬 간결해졌다.

카프카의 지향점

카프카를 메세지 전달의 중앙 플랫폼으로 두고 필요한 모든 DB 시스템(실시간 분석 시스템, 하둡, MSA 등등)과 연결된 파이프라인을 만드는 것을 지향

카프카의 특징 및 데이터 모델

동작방식 및 원리

메세지 시스템

  • producer(publisher)
    • 데이터 단위를 보내는 부분
  • consumer(subscriber)
    • 토픽이라는 메세지 저장소에 저장된 데이터를 가져가는 부분
  • 중앙에 메세지 시스템 서버를 두고 메세지를 주고 받는 형태의 구독/배포 모델

구독/배포 모델

  • 발신자의 메세지엔 수신자가 정해져있지 않은 상태로 발행한다.
  • 구독을 신청한 수신자만 메세지를 받을 수 있다.
  • 일반적인 형태의 통신은 통신에 참여하는 개체끼리 모두 연결해야 해서 구조가 복잡해지고 확장성이 낮다.
  • 구독/배포 모델은 구조도 간단하고 확장성이 높다.
  • 데이터 유실이 적다.
  • 메세징 시스템이 중간에 있어서 전달 속도가 빠르지 않다.
    • 직접 전송하지 않기 때문
    • 중앙 카프카 시스템이 배포 메세지를 받고 구독 수신자에게 뿌려주는 형태로 구성

배경

  • 하둡의 서브 프로젝트중의 하나로 탄생했다.
  • 대용량 분산 처리 애플리케이션인 하둡은 중앙에서 분산 애플리케이션을 관리하는 코디네이션 애플리케이션이 필요했기 때문에 서브 프로젝트로 주키퍼 개발을 진행했었다.
  • 2011년 1월 하둡 서브 프로젝트 → 아파치의 탑 레벨 프로젝트로 승격한다. (wow)
  • storm, hbase, NiFi 등 많은 애플리케이션에서 사용중이다.

소개

  • 구성 정보를 유지 관리하고 이름을 지정하여 분산 동기화를 제공하고 그룹 서비스를 제공하는 중앙 집중식 서비스
    • 분산 코디네이터
  • 분산 시스템을 구성할 때 고려해야 할 사항
    • 네트워크의 신뢰성
    • 지연
    • 대역폭
    • 안정성
    • 토폴로지
    • 전송 비용
    • 네트워크 유형
  • 분산 서버들 간의 정보 공유, 동기화 분산 서버들의 상태 확인 또한 필요하다.
  • 역할
    • 그룹 멤버십
    • 잠금 제어
    • 구독/배포
    • 리더 선정
    • 동기화

카프카를 위한 주키퍼

데이터 모델

  • 데이터를 디렉토리 구조로 저장하고, 데이터가 변경되면 클라이언트에게 어떤 노드가 변경되었는지 콜백을 통해서 알려준다.
  • 클러스터 구성원 간의 데이터 공유가 가능하다(중앙화된 분산 코디네이터 클러스터).
  • 데이터 스토리지(영속/임시 데이터), 클러스터 멤버십 관리를 통한 데이터 변경 통지(watch), 마스터 및 분산 락 등에 활용되는 시퀀스 노드 등을 제공한다.

노드

  • persistent(영속) node
    • 한번 저장되고 나면 세션이 종료되어도 삭제되지 않고 유지되는 노드
    • 명시적으로 삭제하지 않는 한 해당 데이터는 삭제 및 변경되지 않는다.
  • ephemeral(임시) node
    • 특정 노드를 생성한 세션이 유효한 동안 데이터가 유효한 노드
    • 주키퍼 서버에 접속한 클라이언트가 특정 노드를 emphermeral node로 생성했다면 세션이 끊어질 때 해당 노드는 자동으로 삭제된다.
    • 클라이언트가 동작하는지의 여부를 쉽게 판단 가능하다.
  • sequence(순서) node
    • 노드 생성 시 sequence number가 자동으로 붙는 노드
    • 이 기능을 활용하여 분산 락 등을 구현할 수 있다

Watcher

  • 주키퍼 클라이언트가 특정 znode에 watch를 걸어놓을 경우 해당 znode가 변경되었을 때 클라이언트로 콜백 호출을 날려 해당 클라이언트가 변경됨을 알 수 있다.

znode

  • 주키퍼가 제공해주는 파일시스템에 저장되는 파일 하나를 znode라고 한다.
  • unix의 파일시스템처럼 node간에 hierarchy namespace를 가진다.
  • /(슬래시)를 사용
  • 기존 파일 시스템과 다르게 주키퍼는 file과 directory의 개념이 없어 znode하나만 쓰인다.

보통 zookeeper는 분산 작업을 제어하기 위한 트리 형태의 데이터 저장소로 kafka broker를 관리하는 용도로 많이 사용한다.

하지만 이런 기능 외에도 semaphore를 관리하는 서버의 용도로도 사용할 수 있다.

예제는 다음과 같다.

val zookeeperClient: CuratorFramework = CuratorFrameworkFactory.newClient(s"${conf.host}:${conf.port}", new ExponentialBackoffRetry(1000, 3))
client.start()

val maxSemaphoreNum = 5 // 5개 까지 허용
val semaphore: InterProcessSemaphoreV2 = new InterProcessSemaphoreV2(zookeeperClient, "/my/semaphore/directory", ${maxSemaphoreNum})

val lease: Lease = semaphore.acquire()

... // semaphore 획득 후 수행할 task

semaphore.returnLease(lease)
zookeeperClient.close()

웹 기반 노트북을 사용한 데이터 시각화 tool

  • spark, sparkSQL의 결과를 바로 차트로 그릴 수 있음
  • 물론 다른 언어와의 호환성이 매우 뛰어남

기존의 workflow

  • 다양한 제품을 조합하여 데이터 분석을 하는 것이 일반적
  • 많은 엔지니어링 필요
  • 다방면에 경험 많은 능력자의 전유물
  • 파이프라인이 복잡하기 때문에 고장나기 쉽고 유지보수가 어려움

Zeppelin!

spark를 통해 데이터 정제, 처리, 요약 데이터 시각화, 고급 분석까지 전부 spark와 zeppelin으로 해결 가능!

Notebook

  • 소스코드 작성, 수정, 자동 저장, 실행
  • scala(spark), spark SQL, markdown 등 지원

Visualization

  • spark SQL 수행 결과를 table, line chart, pie chart 등 다양한 형태로 시각화
  • spark의 좋은 성능 덕분에 대부분 코드가 즉시 실행되므로 interactive하게 데이터를 다룰 수 있음
  • HTML을 표현 가능하므로 테이블에 이미지를 표시하거나 link를 넣거나 하는 등의 동작 가능

분명 Serializer에 `id` 필드를 명시했다!

AttributeError at /users
Got AttributeError when attempting to get a value for field `id` on serializer `UserSerializer`.
The serializer field might be named incorrectly and not match any attribute or key on the `QuerySet` instance.
Original exception text was: 'QuerySet' object has no attribute 'id'.
Request Method:    GET
Request URL:    http://127.0.0.1:8000/users
Django Version:    3.2.7
Exception Type:    AttributeError
Exception Value:    
Got AttributeError when attempting to get a value for field `id` on serializer `UserSerializer`.
The serializer field might be named incorrectly and not match any attribute or key on the `QuerySet` instance.
Original exception text was: 'QuerySet' object has no attribute 'id'.
  • serializer는 한 개의 객체만 이해할 수 있고 리스트는 이해할 수 없기 때문이다.
  • objects.all() 은 객체들의 리스트를 리턴하므로 many=True 옵션을 추가해줌으로써 해결
...
users = User.objects.all()
serializer = UserSerializer(users, many=True)
...

+ Recent posts