• Apache Kafka는 링크드인에서 개발된 분산 메세지 시스템이다.
  • 2011년 오픈소스로 공개되었고 → 14년 11월 confluent가 창립된다.
  • 대용량의 실시간 데이터 처리에 특화된 아키텍쳐로 설계되었다.

탄생 배경

  • 링크드인 사이트가 급속도로 성장하면서 발생하는 실시간 이슈를 처리할 필요성이 대두되었다.
  • 실시간 트랜잭션 처리와 비동기 처리가 동시에 이뤄지지만 통합된 전송 영역이 없어서 복잡도가 증가했다.
  • 데이터 파이프라인 관리 복잡도(하둡, DB 등)가 심했다.

시스템 목표

  • 프로듀서와 컨슈머의 분리
  • 기존 메세징 시스템과 같이 영구적인 메세지 데이터를 여러 컨슈머에게 허용
  • 높은 처리량을 위한 메세지 최적화
  • 데이터가 증가함에 따라 스케일 아웃이 가능한 시스템

카프카 개발 전/후 데이터 처리 시스템 비교

Before

  • end-to-end 연결 방식의 문제점 발생
  • 구조가 복잡함

After

  • 이벤트/데이터의 흐름을 중앙에서 일괄적으로 관리
  • 구조가 훨씬 간결해졌다.

카프카의 지향점

카프카를 메세지 전달의 중앙 플랫폼으로 두고 필요한 모든 DB 시스템(실시간 분석 시스템, 하둡, MSA 등등)과 연결된 파이프라인을 만드는 것을 지향

카프카의 특징 및 데이터 모델

동작방식 및 원리

메세지 시스템

  • producer(publisher)
    • 데이터 단위를 보내는 부분
  • consumer(subscriber)
    • 토픽이라는 메세지 저장소에 저장된 데이터를 가져가는 부분
  • 중앙에 메세지 시스템 서버를 두고 메세지를 주고 받는 형태의 구독/배포 모델

구독/배포 모델

  • 발신자의 메세지엔 수신자가 정해져있지 않은 상태로 발행한다.
  • 구독을 신청한 수신자만 메세지를 받을 수 있다.
  • 일반적인 형태의 통신은 통신에 참여하는 개체끼리 모두 연결해야 해서 구조가 복잡해지고 확장성이 낮다.
  • 구독/배포 모델은 구조도 간단하고 확장성이 높다.
  • 데이터 유실이 적다.
  • 메세징 시스템이 중간에 있어서 전달 속도가 빠르지 않다.
    • 직접 전송하지 않기 때문
    • 중앙 카프카 시스템이 배포 메세지를 받고 구독 수신자에게 뿌려주는 형태로 구성

배경

  • 하둡의 서브 프로젝트중의 하나로 탄생했다.
  • 대용량 분산 처리 애플리케이션인 하둡은 중앙에서 분산 애플리케이션을 관리하는 코디네이션 애플리케이션이 필요했기 때문에 서브 프로젝트로 주키퍼 개발을 진행했었다.
  • 2011년 1월 하둡 서브 프로젝트 → 아파치의 탑 레벨 프로젝트로 승격한다. (wow)
  • storm, hbase, NiFi 등 많은 애플리케이션에서 사용중이다.

소개

  • 구성 정보를 유지 관리하고 이름을 지정하여 분산 동기화를 제공하고 그룹 서비스를 제공하는 중앙 집중식 서비스
    • 분산 코디네이터
  • 분산 시스템을 구성할 때 고려해야 할 사항
    • 네트워크의 신뢰성
    • 지연
    • 대역폭
    • 안정성
    • 토폴로지
    • 전송 비용
    • 네트워크 유형
  • 분산 서버들 간의 정보 공유, 동기화 분산 서버들의 상태 확인 또한 필요하다.
  • 역할
    • 그룹 멤버십
    • 잠금 제어
    • 구독/배포
    • 리더 선정
    • 동기화

카프카를 위한 주키퍼

데이터 모델

  • 데이터를 디렉토리 구조로 저장하고, 데이터가 변경되면 클라이언트에게 어떤 노드가 변경되었는지 콜백을 통해서 알려준다.
  • 클러스터 구성원 간의 데이터 공유가 가능하다(중앙화된 분산 코디네이터 클러스터).
  • 데이터 스토리지(영속/임시 데이터), 클러스터 멤버십 관리를 통한 데이터 변경 통지(watch), 마스터 및 분산 락 등에 활용되는 시퀀스 노드 등을 제공한다.

노드

  • persistent(영속) node
    • 한번 저장되고 나면 세션이 종료되어도 삭제되지 않고 유지되는 노드
    • 명시적으로 삭제하지 않는 한 해당 데이터는 삭제 및 변경되지 않는다.
  • ephemeral(임시) node
    • 특정 노드를 생성한 세션이 유효한 동안 데이터가 유효한 노드
    • 주키퍼 서버에 접속한 클라이언트가 특정 노드를 emphermeral node로 생성했다면 세션이 끊어질 때 해당 노드는 자동으로 삭제된다.
    • 클라이언트가 동작하는지의 여부를 쉽게 판단 가능하다.
  • sequence(순서) node
    • 노드 생성 시 sequence number가 자동으로 붙는 노드
    • 이 기능을 활용하여 분산 락 등을 구현할 수 있다

Watcher

  • 주키퍼 클라이언트가 특정 znode에 watch를 걸어놓을 경우 해당 znode가 변경되었을 때 클라이언트로 콜백 호출을 날려 해당 클라이언트가 변경됨을 알 수 있다.

znode

  • 주키퍼가 제공해주는 파일시스템에 저장되는 파일 하나를 znode라고 한다.
  • unix의 파일시스템처럼 node간에 hierarchy namespace를 가진다.
  • /(슬래시)를 사용
  • 기존 파일 시스템과 다르게 주키퍼는 file과 directory의 개념이 없어 znode하나만 쓰인다.

보통 zookeeper는 분산 작업을 제어하기 위한 트리 형태의 데이터 저장소로 kafka broker를 관리하는 용도로 많이 사용한다.

하지만 이런 기능 외에도 semaphore를 관리하는 서버의 용도로도 사용할 수 있다.

예제는 다음과 같다.

val zookeeperClient: CuratorFramework = CuratorFrameworkFactory.newClient(s"${conf.host}:${conf.port}", new ExponentialBackoffRetry(1000, 3))
client.start()

val maxSemaphoreNum = 5 // 5개 까지 허용
val semaphore: InterProcessSemaphoreV2 = new InterProcessSemaphoreV2(zookeeperClient, "/my/semaphore/directory", ${maxSemaphoreNum})

val lease: Lease = semaphore.acquire()

... // semaphore 획득 후 수행할 task

semaphore.returnLease(lease)
zookeeperClient.close()

웹 기반 노트북을 사용한 데이터 시각화 tool

  • spark, sparkSQL의 결과를 바로 차트로 그릴 수 있음
  • 물론 다른 언어와의 호환성이 매우 뛰어남

기존의 workflow

  • 다양한 제품을 조합하여 데이터 분석을 하는 것이 일반적
  • 많은 엔지니어링 필요
  • 다방면에 경험 많은 능력자의 전유물
  • 파이프라인이 복잡하기 때문에 고장나기 쉽고 유지보수가 어려움

Zeppelin!

spark를 통해 데이터 정제, 처리, 요약 데이터 시각화, 고급 분석까지 전부 spark와 zeppelin으로 해결 가능!

Notebook

  • 소스코드 작성, 수정, 자동 저장, 실행
  • scala(spark), spark SQL, markdown 등 지원

Visualization

  • spark SQL 수행 결과를 table, line chart, pie chart 등 다양한 형태로 시각화
  • spark의 좋은 성능 덕분에 대부분 코드가 즉시 실행되므로 interactive하게 데이터를 다룰 수 있음
  • HTML을 표현 가능하므로 테이블에 이미지를 표시하거나 link를 넣거나 하는 등의 동작 가능

분명 Serializer에 `id` 필드를 명시했다!

AttributeError at /users
Got AttributeError when attempting to get a value for field `id` on serializer `UserSerializer`.
The serializer field might be named incorrectly and not match any attribute or key on the `QuerySet` instance.
Original exception text was: 'QuerySet' object has no attribute 'id'.
Request Method:    GET
Request URL:    http://127.0.0.1:8000/users
Django Version:    3.2.7
Exception Type:    AttributeError
Exception Value:    
Got AttributeError when attempting to get a value for field `id` on serializer `UserSerializer`.
The serializer field might be named incorrectly and not match any attribute or key on the `QuerySet` instance.
Original exception text was: 'QuerySet' object has no attribute 'id'.
  • serializer는 한 개의 객체만 이해할 수 있고 리스트는 이해할 수 없기 때문이다.
  • objects.all() 은 객체들의 리스트를 리턴하므로 many=True 옵션을 추가해줌으로써 해결
...
users = User.objects.all()
serializer = UserSerializer(users, many=True)
...

+ Recent posts