• Apache Kafka는 링크드인에서 개발된 분산 메세지 시스템이다.
  • 2011년 오픈소스로 공개되었고 → 14년 11월 confluent가 창립된다.
  • 대용량의 실시간 데이터 처리에 특화된 아키텍쳐로 설계되었다.

탄생 배경

  • 링크드인 사이트가 급속도로 성장하면서 발생하는 실시간 이슈를 처리할 필요성이 대두되었다.
  • 실시간 트랜잭션 처리와 비동기 처리가 동시에 이뤄지지만 통합된 전송 영역이 없어서 복잡도가 증가했다.
  • 데이터 파이프라인 관리 복잡도(하둡, DB 등)가 심했다.

시스템 목표

  • 프로듀서와 컨슈머의 분리
  • 기존 메세징 시스템과 같이 영구적인 메세지 데이터를 여러 컨슈머에게 허용
  • 높은 처리량을 위한 메세지 최적화
  • 데이터가 증가함에 따라 스케일 아웃이 가능한 시스템

카프카 개발 전/후 데이터 처리 시스템 비교

Before

  • end-to-end 연결 방식의 문제점 발생
  • 구조가 복잡함

After

  • 이벤트/데이터의 흐름을 중앙에서 일괄적으로 관리
  • 구조가 훨씬 간결해졌다.

카프카의 지향점

카프카를 메세지 전달의 중앙 플랫폼으로 두고 필요한 모든 DB 시스템(실시간 분석 시스템, 하둡, MSA 등등)과 연결된 파이프라인을 만드는 것을 지향

카프카의 특징 및 데이터 모델

동작방식 및 원리

메세지 시스템

  • producer(publisher)
    • 데이터 단위를 보내는 부분
  • consumer(subscriber)
    • 토픽이라는 메세지 저장소에 저장된 데이터를 가져가는 부분
  • 중앙에 메세지 시스템 서버를 두고 메세지를 주고 받는 형태의 구독/배포 모델

구독/배포 모델

  • 발신자의 메세지엔 수신자가 정해져있지 않은 상태로 발행한다.
  • 구독을 신청한 수신자만 메세지를 받을 수 있다.
  • 일반적인 형태의 통신은 통신에 참여하는 개체끼리 모두 연결해야 해서 구조가 복잡해지고 확장성이 낮다.
  • 구독/배포 모델은 구조도 간단하고 확장성이 높다.
  • 데이터 유실이 적다.
  • 메세징 시스템이 중간에 있어서 전달 속도가 빠르지 않다.
    • 직접 전송하지 않기 때문
    • 중앙 카프카 시스템이 배포 메세지를 받고 구독 수신자에게 뿌려주는 형태로 구성

배경

  • 하둡의 서브 프로젝트중의 하나로 탄생했다.
  • 대용량 분산 처리 애플리케이션인 하둡은 중앙에서 분산 애플리케이션을 관리하는 코디네이션 애플리케이션이 필요했기 때문에 서브 프로젝트로 주키퍼 개발을 진행했었다.
  • 2011년 1월 하둡 서브 프로젝트 → 아파치의 탑 레벨 프로젝트로 승격한다. (wow)
  • storm, hbase, NiFi 등 많은 애플리케이션에서 사용중이다.

소개

  • 구성 정보를 유지 관리하고 이름을 지정하여 분산 동기화를 제공하고 그룹 서비스를 제공하는 중앙 집중식 서비스
    • 분산 코디네이터
  • 분산 시스템을 구성할 때 고려해야 할 사항
    • 네트워크의 신뢰성
    • 지연
    • 대역폭
    • 안정성
    • 토폴로지
    • 전송 비용
    • 네트워크 유형
  • 분산 서버들 간의 정보 공유, 동기화 분산 서버들의 상태 확인 또한 필요하다.
  • 역할
    • 그룹 멤버십
    • 잠금 제어
    • 구독/배포
    • 리더 선정
    • 동기화

카프카를 위한 주키퍼

데이터 모델

  • 데이터를 디렉토리 구조로 저장하고, 데이터가 변경되면 클라이언트에게 어떤 노드가 변경되었는지 콜백을 통해서 알려준다.
  • 클러스터 구성원 간의 데이터 공유가 가능하다(중앙화된 분산 코디네이터 클러스터).
  • 데이터 스토리지(영속/임시 데이터), 클러스터 멤버십 관리를 통한 데이터 변경 통지(watch), 마스터 및 분산 락 등에 활용되는 시퀀스 노드 등을 제공한다.

노드

  • persistent(영속) node
    • 한번 저장되고 나면 세션이 종료되어도 삭제되지 않고 유지되는 노드
    • 명시적으로 삭제하지 않는 한 해당 데이터는 삭제 및 변경되지 않는다.
  • ephemeral(임시) node
    • 특정 노드를 생성한 세션이 유효한 동안 데이터가 유효한 노드
    • 주키퍼 서버에 접속한 클라이언트가 특정 노드를 emphermeral node로 생성했다면 세션이 끊어질 때 해당 노드는 자동으로 삭제된다.
    • 클라이언트가 동작하는지의 여부를 쉽게 판단 가능하다.
  • sequence(순서) node
    • 노드 생성 시 sequence number가 자동으로 붙는 노드
    • 이 기능을 활용하여 분산 락 등을 구현할 수 있다

Watcher

  • 주키퍼 클라이언트가 특정 znode에 watch를 걸어놓을 경우 해당 znode가 변경되었을 때 클라이언트로 콜백 호출을 날려 해당 클라이언트가 변경됨을 알 수 있다.

znode

  • 주키퍼가 제공해주는 파일시스템에 저장되는 파일 하나를 znode라고 한다.
  • unix의 파일시스템처럼 node간에 hierarchy namespace를 가진다.
  • /(슬래시)를 사용
  • 기존 파일 시스템과 다르게 주키퍼는 file과 directory의 개념이 없어 znode하나만 쓰인다.

보통 zookeeper는 분산 작업을 제어하기 위한 트리 형태의 데이터 저장소로 kafka broker를 관리하는 용도로 많이 사용한다.

하지만 이런 기능 외에도 semaphore를 관리하는 서버의 용도로도 사용할 수 있다.

예제는 다음과 같다.

val zookeeperClient: CuratorFramework = CuratorFrameworkFactory.newClient(s"${conf.host}:${conf.port}", new ExponentialBackoffRetry(1000, 3))
client.start()

val maxSemaphoreNum = 5 // 5개 까지 허용
val semaphore: InterProcessSemaphoreV2 = new InterProcessSemaphoreV2(zookeeperClient, "/my/semaphore/directory", ${maxSemaphoreNum})

val lease: Lease = semaphore.acquire()

... // semaphore 획득 후 수행할 task

semaphore.returnLease(lease)
zookeeperClient.close()

웹 기반 노트북을 사용한 데이터 시각화 tool

  • spark, sparkSQL의 결과를 바로 차트로 그릴 수 있음
  • 물론 다른 언어와의 호환성이 매우 뛰어남

기존의 workflow

  • 다양한 제품을 조합하여 데이터 분석을 하는 것이 일반적
  • 많은 엔지니어링 필요
  • 다방면에 경험 많은 능력자의 전유물
  • 파이프라인이 복잡하기 때문에 고장나기 쉽고 유지보수가 어려움

Zeppelin!

spark를 통해 데이터 정제, 처리, 요약 데이터 시각화, 고급 분석까지 전부 spark와 zeppelin으로 해결 가능!

Notebook

  • 소스코드 작성, 수정, 자동 저장, 실행
  • scala(spark), spark SQL, markdown 등 지원

Visualization

  • spark SQL 수행 결과를 table, line chart, pie chart 등 다양한 형태로 시각화
  • spark의 좋은 성능 덕분에 대부분 코드가 즉시 실행되므로 interactive하게 데이터를 다룰 수 있음
  • HTML을 표현 가능하므로 테이블에 이미지를 표시하거나 link를 넣거나 하는 등의 동작 가능

분명 Serializer에 `id` 필드를 명시했다!

AttributeError at /users
Got AttributeError when attempting to get a value for field `id` on serializer `UserSerializer`.
The serializer field might be named incorrectly and not match any attribute or key on the `QuerySet` instance.
Original exception text was: 'QuerySet' object has no attribute 'id'.
Request Method:    GET
Request URL:    http://127.0.0.1:8000/users
Django Version:    3.2.7
Exception Type:    AttributeError
Exception Value:    
Got AttributeError when attempting to get a value for field `id` on serializer `UserSerializer`.
The serializer field might be named incorrectly and not match any attribute or key on the `QuerySet` instance.
Original exception text was: 'QuerySet' object has no attribute 'id'.
  • serializer는 한 개의 객체만 이해할 수 있고 리스트는 이해할 수 없기 때문이다.
  • objects.all() 은 객체들의 리스트를 리턴하므로 many=True 옵션을 추가해줌으로써 해결
...
users = User.objects.all()
serializer = UserSerializer(users, many=True)
...

새 repository를 만들때 ignore 또는 readme를 추가하고 바로

git pull origin master

를 하고 프로젝트를 시작한다면 문제가 없지만, 이미 진행중이던 프로젝트가 있는 경우 충돌이 발생한다.

에러 코드는 다음과 같다.

! [rejected] master -> master (non-fast-forward)
error: failed to push some refs to 'https://github.com/?????/server-study.git'
hint: Updates were rejected because the tip of your current branch is behind its remote counterpart. Integrate the remote changes (e.g. 'git pull ...') before pushing again.
hint: See the 'Note about fast-forwards' in 'git push --help' for details.

다음 명령어로 해결할 수 있다.

git pull origin master --allow-unrelated-histories

다만 이 방법은 강제로 merge시키는 방법이기 때문에 처음 git과 연동할 때 충돌로 인한 문제를 해결할 때 외에는 사용을 추천하지 않는다.

안녕하세요,


개인 프로젝트를 진행하다가 블로깅을 해보면서 그 과정을 남기면 어떨까...하는 심정에서 블로그를 만들게 되었습니다.


이 글이 그 첫 발걸음인데요, 먼저 구글 api를 사용하면서 겪었던 과정들을 글로 남기려고 합니다.


저와 같은 오류를 겪으신 분들에게 힘이 되기를 바라며, 글을, 블로깅을 시작하도록 하겠습니다.


https://console.cloud.google.com/

위 링크를 따라가시면 구글 api를 사용할 수 있는 Google Cloud Platform 페이지가 열립니다.


다들 구글 계정은 있으시겠죠?

로그인 하시고 좌측 상단의 메뉴 버튼(가로로 세줄)을 누르시면 위 화면과 같은 창이 뜹니다.


좌측 메뉴 중에서 '결제' 카테고리를 클릭하신 후 무료 체험 비슷한 걸 신청하시면 됩니다.


현재 저는 이미 신청한 상태라 화면을 긁어올 수 없네요ㅠㅠ


알아서 잘 하시리라 믿고 궁금하신점 있으시면 댓글로 남겨주시면 최대한 빨리 답변 드리겠습니다.




Google Cloud Platform 사용 권한을 얻으셨다면 이제 사용할 api를 선택해봅시다.

좌측 메뉴에서 API 관리자를 클릭합니다.


\



클릭 후 형광펜 칠한 'API 사용 설정' 을 클릭합니다.



사용하려는 API 관련 키워드를 입력하고 검색하여 사용하려는 API를 클릭합니다.


저같은 경우는 Speech 관련 API를 사용하기 위해서 speech를 검색하여 Google Cloud Speech API를 클릭했습니다.



저는 이미 해당 API를 사용중이기 때문에 위 화면과 같이 나타나지만, 사용 전에는 '사용' 같은 버튼이 있을겁니다.


그걸 클릭해주시면 제 화면처럼 바뀌고, 해당 계정에서 선택한 API를 사용할 수 있게 되었습니다!


*한가지 중요한 점은, API를 사용하기 전에



미리 만들어 놓은 프로젝트에 한해서만 선택한 API 사용 권한이 적용된다는 점입니다.


이 룰을 지키지 않으시면 추후의 과정에서 오류가 나타날 수 있습니다ㅠㅠ


역시나 관련 질문은 댓글 남겨주시면 최대한 빨리 답변드리겠습니다.



이제 다시 API 관리자로 돌아와서, 형광색칠 된 '사용자 인증 정보'를 클릭합니다.



그리고 '사용자 인증 정보 만들기' 를 클릭합니다.


이 '사용자 인증 정보' 는 json 파일로 이루어진, 개발자가 구글 api를 사용하기 위한 권한을 인증해주는 파일의 정보 라고 이해하시면 좋습니다.


'서비스 계정 키' 를 선택합니다.



서비스 계정 이름을 설정하고(사용자 임의), 역할은 개발에 필요한 옵션을 선택하시면 됩니다만, 가급적 '소유자'를 선택하는 것이 편할 것 같습니다. 물론 필수는 아닙니다.


키 유형은 json파일을 추천합니다. 보편적으로 많이 사용하거든요.


이제 [생성] 버튼을 눌러주시면 

다음과 같은 팝업이 뜹니다. 디폴트 다운로드 경로로 json파일이 저장됩니다. 이제 해당 json파일을 GOOGLE_APPLICATION_CREDENTIALS 환경변수로 지정하여 인증하면 선택한 구글 API를 사용할 수 있습니다!


저같은 경우 라즈베리파이에서 음성인식을 하기 위하여 Speech api를 사용했는데요, 그 과정은 따로 포스팅 하도록 하겠습니다.


여기까지 구글 api 사용을 위한 간단한 과정을 소개하는 포스트였습니다.


도움이 되셨길 바라며, 더 좋은 포스팅으로 돌아오겠습니다!

+ Recent posts