하둡 맵리듀스 원격에서 실행 하기 (How to Run Hadoop map-reduce job on cluster remotely)

1. Motivation

images

2013년 새해 첫 포스팅을 합니다. 해당 포스트를 보시는 분들은 새해 복많이 받으시고, 다들 대박 나시길 바랍니다.

하둡(Hadoop)의 핵심 코어 컴포넌트는 “HDFS (Hadoop Distributed File System)” 와
MR (MapReduce)” 입니다.
오늘 포스팅의 내용은 하둡 맵리듀스 원격에서 실행 하기 입니다.
기본적으로 MapReduce (이하 MR로 표기 하겠습니다.)의 실행은 크게 2가지일 경우 일 것입니다.

(1) “Name Node“에 “MR이 구현된 jar“를 배포 후에 “hadoop -jar …” 커맨드를 사용 하는 경우
(2) 직관적 “SQL“기반의 하둡 에코 시스템중 하나인 “Hive“를 통해서 실행 하는 경우

최근에는 후자인 “Hive“를 사용 경우가 대부분이 아닐까 조심 스럽게 예측을 합니다.
Hive“를 쓰는 몇몇 이유는 아래와 같습니다.

(1) 다소 복잡한 “MR 프로그래밍”이 보다 친근하고, 직관적인 “SQL”의 지원
(2) 다이나믹한 검색 조건 지정
(3) 매번 “Name Node” 배포 없이 원격(Remote)에서 “MR Job”를 실행 지원

하지만 모든 “MR“이 검색만 있는 것이 아니고, 다양하게 사용을 할수 있습니다.
예를 들어서 “서버 부하 테스트를 위한 클라이언트 역할”,
2012년 구매내역들에 대해서 동시에 병렬로 DB, ERP, 3-party 시스템에 전송
등이 있을 수 있습니다.

무엇보다 “Hive” 같은 경우는 검색은 유용하지만, 좀더 복잡하고, 디테일한
실무 Usecase에는 다소 미흡한 면이 있습니다.

그렇다고 직접 “MR”을 직접 만들어서 “Name Node”에 배포하는 것도
만만치 않을 것입니다.
(물론 요새는 다양한 빌드/ 배포툴이 있으니..)
또한 관리자 콘솔에서 직접 MR를 실행하는것 또한
애매한 부분들이 존재 합니다.

그래서 “만약 원격에서 MR Job을 실행하면 어떨까?” 하는 생각이 들었습니다.
만약 그렇다면 오히려 조금만 커스텀마이징 하면
Hive” 와 유사하게도 가능 하겠다는 생각이 들었습니다.
(개인적으로는 Velocity를 사용해 보고 싶기도 합니다. ^^)

올해 부터는 “포스팅 스타일“을 바꿔서 서두에는 제 스타일인 존댓말을 사용하고
본론 부터는 존댓말을 생략하는 스타일로 진행할 예정입니다.

또한 본 주제가 “하둡 맵리듀스 원격에서 실행 하기” 이기 때문에
하둡 과 MR“에 대해서는 아래의 링크로 대체를 하려고 합니다.

Hadoop & HDFS
MapReduce

2. MapReduce Job Flow

MapReduce“의 “Job Flow“는 대략 아래와 같이 진행 된다.

17583610504.gif

(1) “MR“실행하는 어플리케이션은 반드시 “Map 클래스“, “Reduce 클래스“,
Drvice 클래스“, “JobConf” 정보를 “JobClient“에게 전송을 한다.

(2) “JobClient“는 “Job Tracker“에게 “Job“를 위한 정보를 전달한다. 또한
실제 “MR“이 실행될 “.jar” 파일을 특정 “HDFS” 위치에 저장을 한다.
※ JobTracker는 대부분 “Name Node”에 위치하는 경우가 많다.

(3) “Job Tracker“는 각 “Data Node“에 있는 “Task Tracker“에 “Job“에
대한 실행을 요청 한다.

(4) “Task Tracker“는 실제로 일할 “Map Task Child 데몬” 과
“Reduce Task Child 데몬“을 생성 한다.

(5) 각 “Map Task 데몬“들은 “Job Client“가 저장했던 “.jar“파일 다운로드 하고,
Input Data“를 읽어서 “Map” 작업을 수행한다.

(6) “Map“작업이 끝나면 결과 데이터를 를 “Reduce Task 데몬“으로 전달 한다.

(7) “Reduce” 작업을 수행하고 결과를 “Job Tracker“에게 전달 한다.

예컨데 아래와 같이 “Name Node“에서 실행을 하면 위와 같은 Flow가 진행 된다.

~/hadoop/bin/hadoop jar  mr.jar pe.beyondj2ee.hadoop.mr.WordCount /user/mspider/wd /user/mspider/wordcount

여기서 우리가 집중해야할 포인트 구간은 “Job Client”이며, 특히 중요한 포인트는
동시에 “HDFS” 와 “Job Tracker”에 요청을 할때 HDFS 에게는 “.jar” 파일을
“Job Tracker”에는 “Job에 대한 config 정보”를 요청 한다는 것이다.

3. MapReduce Class Diagram

※ 기본적으로 “MR” 로직은 가장 유명한 샘플인  “WordCount”를 기준으로 한다.
또한 MapReduce 프로그래밍에 대한 부분은 생략하고, 컬러로 표시된 부분만
설명하도록 한다.

이미지 2

Tool.java : Job를 실행 시 설정에 대한 옵션을 셋업하는 인터페이스
구현체 클래스는 “run(String[] args)” 메서드를 구현해야 하며 주로 설정
초기화 로직이 구현이 된다. “MR 실행시 필요한 Driver 클래스 와 유사한 역할을 한다“.

WordCount.java : 실제 MR에 대한 핵심 로직을 담고 있다. Mapper 로직 과 Reduce 로직이 있는
클래스 들이 inner 클래스로 정의되어 있다. 또한 “Tool 인터페이스“를 구현함으로써
MR 설정 옵션“를 초기화 한다.

MRLauncher.java : MR를 실행하기 위한 라우팅 클래스 이다.
Job Name“에 따라서 거기에 해당 “MR”를 선택 후 “Job Tracker“에게 “Job”을 요청하는
Launcher” 클래스 이다.

MRLauncherTest.java : 본 예제를 테스트하는 “Junit 클래스” 이다.
파라미터로 “로컬 Jar 경로”, “input file“, “output file“를 정의 후
MRLauncher” 인스턴스를 실행 한다.

4. Test Enviroment

본 포스팅의 예제를 실행 했던 환경은 아래와 같다.

<<Server Side>>

(1) JDK : Java HotSpot(TM) 64-Bit
(2) Hadoop : CDH 3U5 (0.20.2) / Hadoop 1.0.4
※ 0.20.2 , 1.0.4 모두 테스트

<<Client Side>>

(1) JDK : Java HotSpot(TM) 32-Bit
(2) Maven pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>pe.beyondj2ee.hadoop</groupId>
	<artifactId>mapred</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<properties>

		<!-- maven build -->
		<compile.source.encoding>UTF-8</compile.source.encoding>
		<compile.jdk.version>1.6</compile.jdk.version>
		<maven.compiler.plugin.version>2.5.1</maven.compiler.plugin.version>
		<maven.resources.plugin.version>2.5</maven.resources.plugin.version>
		<maven.surefire.plugin.version>2.12</maven.surefire.plugin.version>
		<maven.enforcer.plugin>1.0.1</maven.enforcer.plugin>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-core</artifactId>
			<version>0.20.205.0</version>
		</dependency>
		<dependency>
			<groupId>org.codehaus.jackson</groupId>
			<artifactId>jackson-core-asl</artifactId>
			<version>1.9.11</version>
		</dependency>
		<dependency>
			<groupId>org.codehaus.jackson</groupId>
			<artifactId>jackson-mapper-asl</artifactId>
			<version>1.9.11</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.11</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>${maven.compiler.plugin.version}</version>
				<configuration>
					<source>${compile.jdk.version}</source>
					<target>${compile.jdk.version}</target>
					<encoding>${compile.source.encoding}</encoding>
				</configuration>
			</plugin>

			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-resources-plugin</artifactId>
				<version>${maven.resources.plugin.version}</version>
				<configuration>
					<encoding>${compile.source.encoding}</encoding>
				</configuration>
			</plugin>

			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-surefire-plugin</artifactId>
				<version>${maven.surefire.plugin.version}</version>
				<configuration>
					<skipTests>true</skipTests>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>

5. Sequence Diagram

이미지 3

6. Into the Source

<<WordCount.java>>

/* Copyright (c) 2013
 * All right reserved.
 * blog : http://beyondj2ee.wordpress.com
 * twitter : http://www.twitter.com/beyondj2ee
 * facebook : https://www.facebook.com/beyondj2ee
 * mail: beyondj2ee@gmail.com
 * Revision History
 * Author              Date                         Description
 * ------------------   --------------                  ------------------
 *   beyondj2ee         2013. 1. 11
 */
package pe.beyondj2ee.hadoop.mr;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;

public class WordCount extends Configured implements Tool {

	//Map Class.
	public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

		static enum Counters {

			INPUT_WORDS
		}

		private final static IntWritable one = new IntWritable(1);

		private Text word = new Text();

		private boolean caseSensitive = true;

		private Set patternsToSkip = new HashSet();

		private long numRecords = 0;

		private String inputFile;

		public void configure(JobConf job) {
			caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
			inputFile = job.get("map.input.file");

			if (job.getBoolean("wordcount.skip.patterns", false)) {
				Path[] patternsFiles = new Path[0];
				try {
					patternsFiles = DistributedCache.getLocalCacheFiles(job);
				}
				catch (IOException ioe) {
					System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe));
				}
				for (Path patternsFile : patternsFiles) {
					parseSkipFile(patternsFile);
				}
			}
		}

		private void parseSkipFile(Path patternsFile) {
			try {
				BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString()));
				String pattern = null;
				while ((pattern = fis.readLine()) != null) {
					patternsToSkip.add(pattern);
				}
			}
			catch (IOException ioe) {
				System.err.println("Caught exception while parsing the cached file '" + patternsFile + "' : "
						+ StringUtils.stringifyException(ioe));
			}
		}

		public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
			String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();

			for (String pattern : patternsToSkip) {
				line = line.replaceAll(pattern, "");
			}

			StringTokenizer tokenizer = new StringTokenizer(line);
			while (tokenizer.hasMoreTokens()) {
				word.set(tokenizer.nextToken());
				output.collect(word, one);
				reporter.incrCounter(Counters.INPUT_WORDS, 1);
			}

			if ((++numRecords % 100) == 0) {
				reporter.setStatus("Finished processing " + numRecords + " records " + "from the input file: " + inputFile);
			}
		}
	}

	//Reduce Class.
	public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

		public void reduce(Text key, Iterator values, OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			int sum = 0;
			while (values.hasNext()) {
				sum += values.next().get();
			}
			output.collect(key, new IntWritable(sum));
		}
	}

	//Run method.
	public int run(String[] args) throws Exception {
		JobConf conf = new JobConf(getConf(), WordCount.class);
		conf.setJobName(WordCount.class.getName());

		conf.setJarByClass(WordCount.class);
		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);

		conf.setMapperClass(Map.class);
		conf.setCombinerClass(Reduce.class);
		conf.setReducerClass(Reduce.class);

		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);

		// conf.setp
		conf.setJar(args[0]);

		FileInputFormat.setInputPaths(conf, new Path(args[1]));
		FileOutputFormat.setOutputPath(conf, new Path(args[2]));

		JobClient.runJob(conf);
		return 0;
	}
}

※ 소스의 나머지 부분은 기본적인 “WordCount“의 로직임으로 여기서는 생략 한다.
131 Line : “JobConf” 의 역할은 “Configuration” 객체를 이용해서
하둡 정보 + 입력 파라미터“를 설정을 참조해 모든 task들에게 정보를 제공 한다.

146 Line : “.jar” 파일에 경로를 입력 한다. 여기서 말하는 경로는 “HDFS“의 경로가
아니라 클라이언트의 로컬 경로를 말한다.
예를 들어서 “c:/mr.jar”에 라이브러리가 있을 경우 conf.setjar("c:/mr.jar")으로 선언 한다.
상당히 중요한 부분이며, 여기에 설정된 “.jar“가 “JobClient“를 통해서 “HDFS“에 저장되고,
Job Tracker“에게 저장된 경로 정보를 전달 한다.

args[0] : “.jar”가 저장된 로컬 경로
args[1] :  Map Task 데몬이 읽어 드릴 하둡 파일 경로
args[2] :  Reduce Task 데몬이 실행후 결과를 저장하는 디렉토리 경로

151 Line : “JobConf” 오브젝트에서 최종 설정 및 옵션 정보를 취합해서
Name Node“, “Job Tracker“로 “.jar” 파일 과 함께 전송 한다.


<<MRLauncher.java>>

/* Copyright (c) 2013
 * All right reserved.
 * blog : http://beyondj2ee.wordpress.com
 * twitter : http://www.twitter.com/beyondj2ee
 * facebook : https://www.facebook.com/beyondj2ee
 * Revision History
 * Author              Date                         Description
 * ------------------   --------------                  ------------------
 *   beyondj2ee         2013. 1. 11
 */
package pe.beyondj2ee.hadoop.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class MRLauncher {

	static final String FS_DEFAULT_NAME = "hdfs://192.168.0.1:9000";

	static final String MAPRED_JOB_TRACKER = "hdfs://192.168.0.1:9001";

	static final String HADOOP_USER = "mspider";

	public int invokeMR(String jobName, String[] args) throws Exception {

		int res = 1;

		if (WordCount.class.getName().equals(jobName)) {

			Configuration conf = new Configuration();
			conf.set("fs.default.name", FS_DEFAULT_NAME);
			conf.set("mapred.job.tracker", MAPRED_JOB_TRACKER);
			conf.set("hadoop.job.ugi", HADOOP_USER);

			res = ToolRunner.run(conf, new WordCount(), args);
		}

		return res;
	}

}

31 Line : “.jar“를 배포할 “Name Node” 서버 정보를 설정 한다.
32 Line : “JobConf” 정보를 전송할 “Job Tracker” 서버 정보를 설정 한다.
34 Line : “ToolRunner” 오브젝트는 “Tool 인터페이스“를 구현한 클래스의 “run” 메서드
를 호출한다. 여기서는 “WordCount” 오브젝트를 파라미터로 설정 한다.

<<MRLauncherTest.java>>

/* Copyright (c) 2013
 * All right reserved.
 * blog : http://beyondj2ee.wordpress.com
 * twitter : http://www.twitter.com/beyondj2ee
 * facebook : https://www.facebook.com/beyondj2ee
 * Revision History
 * Author              Date                         Description
 * ------------------   --------------                  ------------------
 *   beyondj2ee         2013. 1. 11
 */
package pe.beyondj2ee.hadoop.mr.test;

import org.junit.Test;

import pe.beyondj2ee.hadoop.mr.MRLauncher;
import pe.beyondj2ee.hadoop.mr.WordCount;

public class MRLauncherTest {

	@Test
	public void testWordCount () throws Exception {

		ClassLoader cl = MRLauncher.class.getClassLoader();
		String jarAbsolutePath = cl.getResource("mapred-0.0.1-SNAPSHOT.jar").toString();

		MRLauncher mrl = new MRLauncher();
		String [] args = new String[3];

		args[0] = jarAbsolutePath;
		args[1] = "/user/mspider/wd";
		args[2] = "/user/mspider/wordcount_output58";

		mrl.invokeMR(WordCount.class.getName(), args);
	}

}

29 Line : 배포할 “.jar“의 물리적 경로를 설정 한다. 추후 빌드/배포를 고려해서
classpath” 에 저장을 했다.

30 Line : 입력 디렉토리 정보를 설정 한다. (“HDFS”의 경로)
31 Line : MR이 수행 완료후 결과 디렉토리를 설정 한다.
33 Line : “MRLauncher” 오브젝트를 수행 한다.

7. Project Structure

이미지 4

8. Build & Run

<<input file 내용>>

[mspider@mspider1 conf]$ ~/hadoop/bin/hadoop fs -cat  /user/mspider/wd/wordcount.txt
hello java
i am beyondj2ee

<<project build>>

메이븐으로 프로젝트를 빌드 한다.

이미지 5

이미지 6

결과 파일을 “classpath“로 복사를 한다.

이미지 7

“Junit”을 실행 한다. 콘솔 화면을 보면 실제 “맵리듀스” 진행 상황을 볼 수 있다.

이미지 8

결과를 확인 한다.

이미지 9

9.Project Refactoring

현재 프로젝트는 “MR 소스” 와 “실행 소스” 같이 존재 함으로서 다소 헷갈린 측면이 있다.
그래서 아래와 같이 “MR 소스”  와 “어플리케이션 소스“를 분리 해서 관리 한다.
추후 “어플리케이션 소스” 빌드 시 “dependency“를 하기 때문에 자동으로 빌드가
되며, 위에서 언급 했듯이 “classpath“로 설정이 잡히기 때문에
한번만 경로 설정을 하면 이후 수정할 필요가 없다.

이미지 10

10. Real Architect

현재 구축된 시스템 과 추후 시도할만 아키텍쳐링을 마지막으로 마무리 하겠습니다.

이미지 11

 

예제 소스 다운받기

 

About these ads

3 thoughts on “하둡 맵리듀스 원격에서 실행 하기 (How to Run Hadoop map-reduce job on cluster remotely)

  1. 안녕하세요. 글 잘읽었습니다.

    그런데 보통 MR job 을 실행할 때 말씀하신 것처럼 name node에 jar 파일을 배포해서 실행하지는 않습니다. 별도 서버에서도 클러스터와 동일하게 하둡 설정을 해 놓으면 hadoop jar 명령어를 이용해서 MR job을 실행할 수 있습니다.

    그리고 name node와 job tracker는 같은 서버에서 실행하지 않습니다. 왜냐하면 둘 다 memory를 많이 필요로 하기 때문에 같은 곳에 두면 확장성이 좋지 않기 때문입니다.
    참고가 되셨길 바랍니다.

    • 좋은 의견 감사 합니다.
      namenode, jobtracker, secondary name node 다 분리하는게 당연히 좋겠죠^^
      그리고 제가 저렇게 하는 이유는 저희 요청 사항이 MR을 돌려서 카운트를
      가져온다든지, 결과 값을 받아서 다른 어플리케이션 과 연동을 해야 하거든여
      그럴경우 jobclient에서 다양한 정보를 얻을수 있어요…
      또한 workflow도 엮을수 있구여..

      단순히 리모트 서버에서 cli로 실행할수도 있지만 아무래도
      파워풀하게 핸드링을 하려면 저런 시나리오가 필요하거든여..

      • hadoop jar 를 실행하면 내부적으로 org.apache.hadoop.util.RunJar 라는 클래스가 실행되는데 해당 로직이 위 예제에서 작성하신 MRLauncher이 하고자 하는 동작과 크게 다르지 않습니다. 따라서 필요하다면 hadoop 의 RunJar를 직접 사용하는 방법도 고려할 수 있지 않을까 싶네요.

        그리고 여러 개의 MR 혹은 다른 어플리케이션과 연동하거나 workflow를 엮어서 실행해야 하는 이슈가 있다면 oozie나 cascading 을 검토해 보시는 것도 좋아 보입니다(저희 회사에서는 ETL 작업을 이렇게 하고 있습니다).

댓글 남기기

아래 항목을 채우거나 오른쪽 아이콘 중 하나를 클릭하여 로그 인 하세요:

WordPress.com 로고

WordPress.com의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

Twitter 사진

Twitter의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

Facebook 사진

Facebook의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

Google+ photo

Google+의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

%s에 연결하는 중