FlumeNG + Kafka Plugin (플럼+카프카 플러그인)

Facebook Server Side Architecture Group #SSAG
https://www.facebook.com/groups/serverside/

flume-logo images--2kafka_logo

현재 근무 중인 유누스 빅데이터 솔루션 팀에서 FlumeNG + Kafka를 검토중
해당 플러그인이 있는지 찾고 있었습니다.

그래서 구글링을 해보니 한개가 검색이 되었습니다. (https://github.com/ewhauser/flume-kafka-plugin)
그런데 FlumeOG 버전 이고, 현재 제가 사용하는 버전과 틀려습니다.
(실행도 잘안되는 것 같고 ㅡㅡ+)

그래서 혹시나 저처럼 FlumeNG + Kafka 최신버전을 사용하는 user가 있을 않을까 해서
플러그인을 만들었습니다.

개인적으로는 아쉬운 부분도 있지만  그래도 Github에 공유하는게 더 좋을듯 싶어서 올렸습니다.
입맞에 맞는 수정은 알아서들 하시길 바랍니다. ^^

그리고 본 포스팅의 목적은 FlumeNG + Kafka 플러그인 사용법이기 때문에
Flume 과 Kafka의 설명은 생략하고 기회가 될때 따로 포스팅을 하겠습니다.

<< Big Picture >>

이미지 2

product 환경에서는 좀더 복잡해 보일수 있지만 기본적으로 위에 있는 다이어그램의 흐름은 같습니다.
중요 컴포넌트는 아래와 같습니다.

  • Producer Agent (JVM instance) : 소스 데이터를 수집해서 Kafka에 전달 합니다.
  • Zookeeper : Kafka의 모든 메타 데이터(상태 관리, topic 정보, consumer..등) 를 관리 합니다.
  • Kafka : Message Queue(MQ) 이며, 소스 데이터를 중앙 집중하는 역할을 합니다.
  • Consumer Agent (JVM instance): Kafka의 topic(Queue)를 폴링하고 있다가 데이터가 소비하는 역할을 합니다.

SequenceGenerateSource에서 지속적으로 임으로 생성한 숫자(“1,2,3,4,5…” )를 내부 Channel로 전달 합니다.
KafkaSink는 내부 Channel를 폴링하고 있다가 데이터가 들어 오면 take해서 Kafka 서버로 전송 합니다.
Consumer AgentKafkaSourceKafkatopic(Queue)에 데이터가 들어 오면
내부 Channel로 데이터를 전송 합니다.(전송시 멀티 쓰레드로 처리 됩니다.)
최종 Log4jSinkconsole(화면)에 출력을 합니다.

product 환경에서는 아마도 SequenceGenerateSource는 로그 수집하는 Source로, Log4jSink
HDFS 또는 DBMS Sink를 사용 할 것입니다.

<< Technical Specifications >>

플러그인이 동작가능한 기술 스펙은 아래와 같습니다.

  • Flume 1.4
  • Kafka 0.8.0 Beta

<< Plugin Develop Lifecycle >>

플러그인 적용 하는 방법은 아래와 같습니다.

  1. Flume Agent 설치 및 plugins.d 디렉토리 생성
  2. Kafka 설치
  3. Plugin 프로젝트 생성
  4. 소스  및 config 정보 수정
  5. build 및 플러그인 , 3rd 라이브러리 배포
  6. Flume Agent 기동하기

<< Flume & Kafka Installation >>

– flume , kafka 설치 파일을 다운로드 합니다.

# download flume
$ wget http://apache.mirror.cdnetworks.com/flume/1.4.0/apache-flume-1.4.0-bin.tar.gz
# download kafka
$ wget https://dist.apache.org/repos/dist/release/kafka/kafka_2.8.0-0.8.0-beta1.tgz

– tarball 파일을 적당한 위치에 풀어서 심볼릭 링크를 설정 합니다.

$ tar xvfz <strong>apache-flume-1.4.0-bin.tar.gz</strong>
$ tar xvfz <strong>kafka_2.8.0-0.8.0-beta1.tgz</strong>
$ ln -s apache-flume-1.4.0 flume
$ ln -s kafka_2.8.0-0.8.0 kafka

– 최종 설치된 디렉토리는 아래와 같습니다.

※ 디렉토리 경로는 각자 환경에 따라서 다를수 있습니다.

flume 홈 디렉토리 : /home/hadoop/flume
kafka 홈 디렉토리 : /home/hadoop/kafka

<< Flume Plugin Directory >>

flume에서 플러그인을 등록하는 방법은 몇가지가 있습니다.
여기서는 그들이 권고하는 방법인 “plugins.d” 디렉토리 방법을 사용 하겠습니다.

$ mkdir -p /home/hadoop/flume/plugins.d/agent/lib
$ mkdir -p /home/hadoop/flume/plugins.d/agent/libext

lib” 디렉토리에는 실제로 만든 플러그인 jar를 배포하는 곳입니다.
libext” 디렉토리는 플러그인 jar에서 사용하는 3rd 라이브러리를 배포 하는 곳입니다.

즉, flume agent 기동시 클래스 로더는 “lib” 와 “libext” 디렉토리의
라이브러리들을 로딩을 시키는 것입니다.


<< Kafka 기동 하기 >>

Kafka는 기본적으로 zookeeper를 사용 합니다.  product 환경에서는 zookeeper
외부에 별도로 구성을 하지만, 여기서는 플러그인 테스트가 목적이라서 생략하겠습니다.
하지만 kafka 배포 버전에 같이 포함되어 있기 때문에 손쉽게 zookeeper를 실행 할수 있습니다.

#zookeeper 기동 하기
$ ~/kafka/bin/zookeeper-server-start.sh ~/kafka/config/zookeeper.properties

#kafka 기동 하기
$ ~/kafka/bin/kafka-server-start.sh ~/kafka/config/server.properties

Note That : 반드시 zookeeper – kafka 순으로 기동 해야 합니다.

<< Clone Plugin >>

지금까지 기본 설정 부분은 끝낫습니다. 이제 플러그인을  build & deploy하면
끝입니다.

그전에 소스를 다운로드 받고, plugin 설정 하는 방법에 대해서 말씀 드리겠습니다.

이미지 3--

제가 Github에 배포한 소스를 clone 합니다.

Github URLhttps://github.com/beyondj2ee/flumeng-kafka-plugin
Git Repository URL : https://github.com/beyondj2ee/flumeng-kafka-plugin.git

$ git clone https://github.com/beyondj2ee/flumeng-kafka-plugin.git

프로젝트 구조를 maven으로 되어 있기 때문에 이클립스 또는 IntelliJ에서 쉽게
import 하실수 있습니다.

<< Project Directory Structure >>

이미지 1@@

flumeng-kafka : Maven 최상위 부모 디렉토리 입니다.
flumeng-kafka-plugin : 실제 플러그인 프로젝트 디렉토리 입니다.
flumeng-kafka-plugin/conf : flume 설정 파일을 관리하는 디렉토리 입니다.
flumeng-kafka-plugin/libs : 플러그인에서 사용하는 3rd 라이브러리 입니다.
flumeng-kafka-plugin/package : build 후에 생성되는 플러그인 라이브러리 디렉토리 입니다.
flumeng-kafka-plugin/source : 소스 디렉토리 입니다.

실제로 배포되는 파일들은 conf, libs, package 디렉토리에 있는 파일들 입니다.

<< flume-conf.properties >>

flume-conf.properties” 설정 파일은 flume agent 입장에서 핵심 설정 파일 입니다.
어떤 곳에서 수집하고, 어떤 채널을 사용하며, 어디로 보낼지등에 대한
event flow” 정보 입니다.

KafkaSink 플러그인에서는 “Kafka“에서 제공하는 정보 와 제가 추가한
2개의 설정 정보로 구분 됩니다.

custom.” 포함된 속성정보는 제가 추가한 속성 정보 입니다.

이미지 3@@2

Property Default Description
metadata.broker.list This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.

자세한 정보는 http://kafka.apache.org/documentation.html 참고 하시기 바랍니다.
여기서 제일 중요한 것은 metadata.broker.list 입니다.  접속할 kafka 서버를 설정 하면 됩니다.

KafkaSource도 동일하게 구성이 되어 있어서 설명은 생략 하도록 하겠습니다.

<< Junit Test >>

Junit 테스트를 하실 경우 KafkaSinkTest, KafkaSourceTest를 사용하면 됩니다.

이미지 4@@@

“JunitConstans” 클래스는 Junit 테스트를 위한 상수들 입니다.

이미지 5@@
각자 환경에 맞게 서버 정보를 수정 하면 됩니다.
그리고 쓰레드를 사용하기 때문에 Junit을 rich하게 만들지 않고, 눈으로 검증 하도록
로그만 출력 되도록 했습니다.

이미지 6@@
이미지 7@@

<< Build >>

프로젝트가 maven으로 구성이 되어 있기 때문에 IDE를 사용할 경우는
flumeng-kafka/flumeng-kafka-plugin/pom.xml에서 “package“를 실행 하면
위에서 설명한 “package” 디렉토리에 생성이 됩니다.

※ Maven 설치가 되어 있어야 합니다. ^^

이미지 8@@

이미지 9@@

만약 IDE를 사용하지 않을 경우는 해당 디렉토리 밑에서 “커맨드“를 사용해서
빌드 할 수 있습니다.

이미지 10@@@
이미지 11@@

<< Deploy >>

Build 작업이 완료되면 Flume Agent에 배포를 하면 됩니다.

flumeng-kafka/flumeng-kafka-plugin/confg/flume-config.properties ==> ~/flume/conf

flumeng-kafka/flumeng-kafka-plugin/libs/*.jar ==> ~/flume/plugins.d/agent/libext

flumeng-kafka/flumeng-kafka-plugin/package/flumeng-kafka-plugin.jar ==>  ~/flume/plugins.d/agent/lib

<< Startup Flume Agent >>

Flume Agent의 기동 순서는 먼저 “Consumer Agent” – “Producer Agent” 순으로 합니다.

#Consumer Agent 기동
$ ~/flume/bin/flume-ng agent -n consumer  -c ~/flume/conf -f ~/flume/conf/flume-conf.properties

이미지 12@@

#Producer Agent 기동
$ ~/flume/bin/flume-ng agent -n producer  -c ~/flume/conf -f ~/flume/conf/flume-conf.properties

이미지 13@@

<< Conclusion >>

지금까지 flume-kafka plugin에 대해서 설명 했습니다.  사실은 제가 전달하고 싶은 내용은
flume에서 어떻게 플러그인을 만들어서 빌드/배포 하는 방법 입니다.

flume, kafka를 하나씩 설명하기에는 하나의 포스팅으로는 무리가 따릅니다.
하지만 전체적으로 어떤 컨셉으로 진행하는지에 대해서는 도움이 되지 않았을까 합니다.

다소 아쉬운점은 KafkaSinkText 데이터만 전송 되도록 구현 했다는 것입니다.
좀더 보완해서 올리려고 생각을 했었는데 그렇게 미루다 보면 포스팅을 하지 않을 것
같아서 차라리 현재 버전을 먼저 공유하는게 낫다고 생각을 했습니다.

소스는 생각보다 어렵지 않기 때문에 각자 환경에 맞춰서 수정을 하시면 될것 같고,
만약 적용만 테스트 하고 싶은 분들은 “build” 세션부터 보셔서 적용을 하시면 될것 같습니다.

꼭 저렇게 flume + kafka 아키텍쳐링 할 필요는 없습니다. flume에서 바로
storage로 저장 할 수도 있습니다.

하지만 flumengkafka는 상당히 좋은 tool 이고 생각 합니다.
한번에 모든걸 적용하시지 마시고 flume 부터 하나씩 step by step으로
적용을 하시면 될것 같습니다.

flumeng + kafka 플러그인은 Github에도 없는데 허접하던 안하던
존재 하지 않는 무언가를 만드는 작업은 개발자로써 상당히 의미 있는 일 같습니다.

앞으로 한국의 유능한 엔지니어들이 Github에 프로젝트들이 많이 생겼으면 하는 바램 입니다.

마지막으로 flumeng – kafka 플러그인 데모 동영상을 보여 드리겠습니다.

왼쪽 커맨드 창은 “flume producer” 이며, 오른쪽 커맨드 창은 “flume consumer” 입니다.
consumer“의 로그가 가만히 있다가 “producer“가 기동하면 메세지를 받았기 때문에
로그가 계속 콘솔에 out 됩니다.

그리고 잠시 “producer” 인스턴스를 내리면, “consumer” 로그가 다시 조용하다가
producer“를 재기동 하면 다시 콘솔에 out 됩니다.

답글 남기기

아래 항목을 채우거나 오른쪽 아이콘 중 하나를 클릭하여 로그 인 하세요:

WordPress.com 로고

WordPress.com의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

Twitter 사진

Twitter의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

Facebook 사진

Facebook의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

Google+ photo

Google+의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

%s에 연결하는 중