Spark 3.x로 올라오면서 Structured Streaming(이하 s.stream)이 지원되면서 group by, window 등 강력하면서도 다양한 기능을 streaming 에서도 사용할 수 있게 되었다.

기존 Discretized Streaming(이하 d.stream)과 다른 점 중 두드러지는 변화는 바로 checkpoint를 저장한다는 것이다.

checkpoint란?

streaming의 metadata 또는 kafka offset 등을 저장하는 데이터로 streaming app의 진행상황을 기록해둘 수 있다.

streaming app의 진행상황을 기록해두면, app이 종료되더라도 해당 지점부터 재수행이 가능하기 때문이다.

동작 원리나 여러가지 부가적인 기능 및 내용들이 많이 있지만, 자세한 것은 공식문서를 참고하길 바란다.

(안타깝게도 언제나 그렇듯 공식문서의 설명은 충분하지 않긴 하다)

공식문서: https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing

checkpoint 지정 방법

간단하다. SparkContext 또는 DataStreamWriter의 option을 주면 된다.

// 1. SparkContext에 지정하는 방법
val sparkConf = new SparkConf().setAppName("Checkpoint Example")
val checkpointDirectory: String = "/my/directory/checkpoint"

val ssc = new StreamingContext(sparkConf, Seconds(1))  
ssc.checkpoint(checkpointDirectory)
...


// 2. DataStreamWriter의 option으로 주는 방법
val dataset: Dataset[Schema] = getDatasetExample()
val checkpointDirectory: String = "/my/directory/checkpoint"

dataset.writeStream
    .option("checkpointLocation", checkpointDirectory)
    .trigger(ProcessingTime("10 seconds")
    .foreachBatch(saveFunction _)
    .queryName("Checkpoint Example")
    .start()

그렇다면 checkpoint는 어디다가 저장하는 것이 좋을까?

만약 물리장비에서 spark application을 운영한다면(아마도 spark on yarn), local directory에 저장하면 된다.
제일 간편하고, 성능이슈도 없다.

문제는 k8s환경에서(spark on k8s) 운영할 때인데, k8s환경의 local에 checkpoint를 저장하게 되면 pod이 내려갔을 때 checkpoint 정보를 모두 잃고 pod이 내려갔다가 올라오는 동안의 데이터가 유실되게 된다.

1. hdfs

hdfs는 좋은 선택지다.

그러나 secure hdfs를 사용하는 경우 i/o 성능이 뒷받침되지 않으면서 walCommit 시간이 늘어나서, micro batch 간격이 5~10초인 경우 lag이 점점 증가하는 경우가 발생할 수 있다.
(walCommit관련 내용은 추후에 또 다뤄보겠다)

secure hdfs를 사용하지 않는 경우 보안 이슈가 발생할 수 있으니 추천하기는 어렵다.

2. fuse

fuse는 사용하기는 편하겠지만 여전히 성능상의 이슈를 가지고 있다.
여러 pod에서 mount하기 때문에 write할 때 일종의 mutex를 획득해야 하는 경우가 많은데, 이 때 i/o 성능이 눈에 띄게 하락하게 된다.

역시나 walCommit 시간이 증가하는 이슈가 발생한다.

3. rbd

rbd는 거의 정답에 가까운 선택지다.
i/o 성능이 준수하고 옵션을 통해 persistence까지 보장이 가능하다. 필자도 현재 rbd를 checkpoint를 저장하는 용도로 사용하고 있으며, 아직 이슈가 된 적이 없다.

k8s환경에서 s.stream app을 운영하게 된다면, 성능 및 보안 측면에서 흠잡을 데 없는 rbd를 checkpoint directory로 사용하는 것을 강력하게 추천한다.

repartition vs coalesce

repartition()은 파티션의 개수를 늘리거나 줄일 수 있지만 coalesce()는 줄이는 것 만 가능하다.

성능 차이

파티션의 개수를 줄이는 경우에는 coalesce()를 사용하는 것이 더 좋은데, 그 이유는 바로 셔플때문이다.

coalesce()는 셔플을 수행하지 않기 때문에 성능 측면에서 repartition()보다 더 좋다.

구현

coalesce()함수가 구현되어있고, repartition()에서 coalesce()를 호출하는 형태로 구성되어있다.

코드 링크


여기까지는 많이 알려진 내용인데, 개발하다보니 순서를 보장한 상태에서 하나의 파일로 파티션을 묶어야 할 때가 있었다.
repartition()을 사용해서 묶었더니 정렬이 깨지거나 안되는 상황이 발생해서, 관련 테스트 결과를 기록한다.

  1. orderBy().repartition(1).write
    • 이 경우에는 partition 내부의 block 단위로 정렬이 된다.
    • 전체 파일이 정렬되지 않고 일부 정렬된 block들이 모여있는 상태처럼 보인다. (셔플이 수행되지 않은 것일까?)
    • 가끔 정렬이 되어있는 경우도 있었다. 왜인지 아시는 분은 댓글로 알려주시면 정말 감사드립니다..
  2. repartition(1).orderBy().write
    • 이게 어이없는 부분이었는데, 기존 100개의 partition을 읽고 해당 함수를 수행하니 100개의 partition으로 write했다.
    • 각각의 partition은 잘 정렬되어있었다.
    • 아마 1.에서의 block 단위가 100개의 partition으로 나뉘어진것이 아닌가 유추해본다.
  3. orderBy().coalesce(1).write
    • 제일 추천하는 방법이다.
    • 의도한대로 전체적으로 정렬이 되어있고, 1개의 partition으로 잘 묶여있다.
    • repartition(1)으로 묶었을 때와 비교해서 잘 정렬이 된 것으로 보아 repartition(1)으로 묶을때는 셔플이 수행된 것 처럼 보인다.
  4. coalesce(1).orderBy().write
    • 잘 동작한다. 직관적으로 1개로 묶고 정렬하는 방법이니 의도대로 잘 동작한다.
    • 역시나 2. 항목과 비교해보면 repartition() 방식이 아직도 잘 이해가 되지 않는다. 관련해서는 더 공부해봐야겠다.

coalesce() 동작 방식이 그림으로 잘 표현된 이미지가 있어서 첨부한다.

pipenv 환경에서 mysqlclient를 설치하려다 만난 에러들을 해결한 내용을 정리한다.

pipenv install mysqlclient

ld: library not found for -lzstd
clang: error: linker command failed with exit code 1 (use -v to see invocation)
error: command 'xcrun' failed with exit status 1
----------------------------------------
ERROR: Failed building wheel for mysqlclient
ERROR: Command errored out with exit status 1:
  • xcrun을 못찾아서 발생한 에러다.
  • xcrun이란 xcode에서 사용하는 스크립트 실행 명령어로, xcode를 사용하지 않더라도 mac OS를 사용하는 경우 종종 사용된다.
  • xcode-select --install를 수행하면 해결되지만, 다음과 같은 에러를 또 만날 수 있다.
    xcode-select: error: command line tools are already installed, use "Software Update" to install updates

xcode-select: error: command line tools are already installed, use "Software Update" to install updates 해결 방법

  • https://developer.apple.com/download/all/
  • 위 링크에서 자신의 mac OS보다 낮거나 같은 버전의 Command Line Tools를 설치해준다.
  • 필자의 경우 10.15 버전의 mac OS를 사용중이어서, 10.15로 검색했지만 아무것도 나오지 않았다.
  • 그래서 10.14로 검색하니 다음과 같이 호환되는 버전이 나와서 해당 버전으로 설치했다.
    image
  • xcode를 사용하지 않는 필자로썬 뒤에 for Xcode가 거슬리지만, 무시하고 설치하자.

설치 후

  • 설치하고 나면 기존에 사용중인 개발 환경에서 몇몇 변화가 있을 수 있다.
  • 필자의 경우
    • pipenv관련 설정이 날라가거나 초기화되었다.
    • 기본 python3 사용 버전이 3.7이었는데, 3.9로 업데이트되었다.
  • 다음과 같이 해결했다.
    • zsh를 사용하고 있었기 때문에, ~/.zshrc에 다음 설정을 추가했다.
      export PYENV_ROOT={python3 기본 경로} // $ whereis python3의 결과
      export PIPENV_PYTHON=$PYENV_ROOT/shims/python
    • Pipfile에서 python 3.7버전을 사용중이었지만, 3.9로 수정해주었다.
  • 이후 pipenv 환경 삭제 후 재생성해주었다.

원래 하려던 건 mysqlclient 설치였다.

  • 참 길게 돌고돌았다. 이제 다음 명령을 수행해보자.
    $ pipenv install mysqlclient
    ... ✔ Success! ...
  • mysqlclient 설치가 완료되었다.

yum 사용

  • AMI2 이미지에는 apt-get 명령어가 먹히지 않는다. sudo yum update -yyum 명령을 업데이트 후 사용하자.
$ sudo yum update -y
$ sudo yum install python3
Loaded plugins: priorities, update-motd, upgrade-helper
No package python3 available.
Error: Nothing to do

image

aws 자체 이미지를 사용하자

  • python3 오타 없이 잘 썼는데 없다니, 말도 안된다.
  • 말이 된다. 기준이 aws이기 때문에 패키지 이름이 다르다. 다음 명령으로 aws용으로 생성된 python3 패키지를 확인할 수 있다.
$ sudo yum list | grep python3
...
(대충 python3가 포함된 yum 패키지 리스트)
...

// 해당 패키지만 설치를 원한다면 해당 패키지를, 특정 버전을 설치하고 싶다면 버전까지만 입력해도 무방하다
$ sudo yum install python37 // python3.7 설치 예시
$ python3 --version
Python 3.7.x
  • 설치하면 python3를 정상적으로 사용할 수 있다.
  • 유감스럽게도 pip3는 pip-3.7 이런식으로 걸어준다. 직접 pip3로 소프트 링크를 걸어주자.
$ cd /usr/bin
$ sudo ln -sfn pip-3.7 pip3 // pip-3.x 버전은 본인이 설치한 버전을 쓰면 된다
$ pip3 list
pip (9.0.3)
setuptools (36.2.7)
You are using pip version 9.0.3, however version 21.1.3 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.

aws에서 새로운 인스턴스를 만들고 git을 연동하다 보면 꼭 마주치는 에러다.

[ec2-user@doolda]$ git push origin master
Permission denied (publickey).
fatal: Could not read from remote repository.

Please make sure you have the correct access rights
and the repository exists.

간단히 git에 ssh key를 추가해줌으로써 손쉽게 해결할 수 있다.

ssh key 생성하기

$ ssh-keygen -t rsa -C "{your-github-email}"
/ / 이후 엔터 3번

정상적으로 생성되었다면 public key, fingerprint 등 이것저것 만들었다는 안내 문구가 출력될 것이다.

다음 명령을 통해 우리가 필요한 ssh key를 출력한다.

$ cat ~/.ssh/id_rsa.pub

출력된 정상적인 ssh key는 ssh-rsa로 시작해서 위에서 입력한 {your-github-email}로 끝날 것이다.

ssh-rsa 부터 이메일 끝까지 모두 복사해둔다.

github에 생성한 ssh key 등록하기

  1. 본인의 github 계정의 Settings로 들어간다.
    image
  2. `SSH and GPG keys 탭으로 이동
    image
  3. New SSH Key 클릭
    image
  4. Key에다 복사해둔 ssh key를 입력하고 Title은 맘에 드는 이름을 지어주자.
    image
    Add SSH Key를 누르고 비밀번호를 입력하라는 창이 뜨면 입력해준다.

끝났다. 이제 다시 (아마도)git push origin master 명령을 수행해보면 정상적으로 동작할 것이다.

+ Recent posts