sgykfjsm.github.com

mesos上にHadoopをインストールする

前回、mesosをDEBパッケージを使ってインストールする方法を示した。これを前提としてmesosが動いているクラスタにHadoopをインストールし、wordcountをchrnosを通じて動かすところまでの手順を示す。

ちなみに見てくれている人の時間をムダにしないために言うと、一部を除けば普通の手順(Clouderaのチュートリアルとか)でやれば大抵は普通に動くはず。なのでHadoopに詳しい人は、「Hadoop mesos」でググったほうが良いと思う。ここではSparkの利用を見越しての設定を行なうのと、とHadoopもmesosもあまり詳しくない人向けに記述する。Chronosについては、まぁついでにというレベルだし、実際非常に導入が楽なので、公式のQuick Startでも見たほうが良い。

Hadoopの準備

Hadoopの作業場所を用意してあげる。実際は諸々インストールした後にmapred(hdfs?)ユーザのホームディレクトリに用意するのが正しい作法なのだろうと後から気づいた。

vagrant@master1:/tmp$ mkdir -p /home/vagrant/hadoop_home

1-click installerのダウンロードとインストール、リポジトリキーのインストールを行なう。

vagrant@master1:/tmp$ wget http://archive.cloudera.com/cdh4/one-click-install/precise/amd64/cdh4-repository_1.0_all.deb
--2014-01-01 08:51:10--  http://archive.cloudera.com/cdh4/one-click-install/precise/amd64/cdh4-repository_1.0_all.deb
Resolving archive.cloudera.com (archive.cloudera.com)... 184.73.217.71
Connecting to archive.cloudera.com (archive.cloudera.com)|184.73.217.71|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3304 (3.2K) [application/x-debian-package]
Saving to: `cdh4-repository_1.0_all.deb'

100%[============================================================>] 3,304       --.-K/s   in 0.001s

2014-01-01 08:51:12 (4.34 MB/s) - `cdh4-repository_1.0_all.deb' saved [3304/3304]

vagrant@master1:/tmp$ sudo dpkg -i cdh4-repository_1.0_all.deb
(Reading database ... 72497 files and directories currently installed.)
Preparing to replace cdh4-repository 1.0 (using cdh4-repository_1.0_all.deb) ...
Unpacking replacement cdh4-repository ...
Setting up cdh4-repository (1.0) ...
gpg: keyring `/etc/apt/trusted.gpg.d/cloudera-cdh4.gpg' created
gpg: key 02A818DD: public key "Cloudera Apt Repository" imported
gpg: Total number processed: 1
gpg:               imported: 1
vagrant@master1:/tmp$ curl -s http://archive.cloudera.com/cdh4/ubuntu/precise/amd64/cdh/archive.key | sudo apt-key add -
OK

CDH4 with MRv1をインストールする。

# マスターノードのみ(jobtrackerはいらないかも?というか、多分いらない)
vagrant@master1:/tmp$ sudo apt-get -y install hadoop-0.20-mapreduce-jobtracker hadoop-hdfs-namenode hadoop-client
# マスターノードとスレーブノードの両方
vagrant@master1:/tmp$ sudo apt-get -y install hadoop-0.20-mapreduce-tasktracker hadoop-hdfs-datanode hadoop-client

設定ファイルを修正する。細かい点については多々不適切というかチューニングの余地があると思うけど、とりあえず動かすならこの設定で大丈夫だと思う。

core-site.xml / link
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://master1:8020/</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/tmp/hadoop</value>
  </property>
  <property>
    <name>hadoop.security.group.mapping</name>
    <value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
  </property>
</configuration>
hdfs-site.xml / link
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <property>
     <name>dfs.name.dir</name>
     <value>/var/lib/hadoop-hdfs/cache/hdfs/dfs/name</value>
  </property>
  <property>
    <name>dfs.permissions.superusergroup</name>
    <value>hadoop</value>
  </property>
  <property>
    <name>dfs.name.dir</name>
    <value>/home/vagrant/hadoop_home/data/1/dfs/nn</value>
  </property>
  <property>
    <name>dfs.webhdfs.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>/home/vagrant/hadoop_home/data/1/dfs/dn,/home/vagrant/hadoop_home/data/2/dfs/dn,/home/vagrant/hadoop_home/data/3/dfs/dn</value>
  </property>
</configuration>
mapred-site.xml / link
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <property>
   <name>mapred.job.tracker</name>
   <value>master1:8021</value>
  </property>
  <property>
   <name>mapred.local.dir</name>
   <value>/home/vagrant/hadoop_home/data/1/mapred/local,/home/vagrant/hadoop_home/data/2/mapred/local,/home/vagrant/hadoop_home/data/3/mapred/local</value>

  </property>
  <property>
   <name>mapreduce.jobtracker.restart.recover</name>
   <value>true</value>
  </property>
  <!-- The web UI will bind -->
  <property>
      <name>mapred.job.tracker.http.address</name>
      <value></value>
  </property>

  <!-- This identifies the mesos-master. E.g. zk://1.1.1.1:2181,2.2.2.2:2181,3.3.3.3:2181/mesos -->
  <property>
      <name>mapred.mesos.master</name>
      <value>zk://master1:2181,slave1:2181,slave2:2181/mesos</value>
  </property>
    <!--
      This property identifies the location of the modified hadoop distribution containing this XML file.
      The mesos slave will download this distribution if a hadoop job is launched, extract the file and use the hadoop binary
      to start the task tracker.
      Sample hdfs://<hdfs-namenode-host & optional port>/hadoop-2.0.0-mr1-cdh4.2.1.tgz -> hdfs://namenode.mesosphere.io:9000/hadoop-2.0.0-mr1-cdh4.2.1.tgz
    -->
    <property>
        <name>mapred.mesos.executor.uri</name>
        <value>hdfs://master1/hadoop-2.0.0-mr1-cdh4.2.1.tgz</value>
    </property>
    <!--
      The remaining properties do not require adjustment, but for running production jobs it's recommended to modify them
      to optimize for different cluster & machine sizes.
    -->
    <property>
        <name>mapred.mesos.slot.cpus</name>
        <value>0.20</value>
    </property>
    <property>
        <name>mapred.mesos.slot.disk</name>
        <!-- The value is in MB. -->
        <value>512</value>
    </property>
    <property>
        <name>mapred.mesos.slot.mem</name>
        <!-- Note that this is the total memory required for
         JVM overhead (256 MB) and the heap (-Xmx) of the task.
         The value is in MB. -->
        <value>368</value>
    </property>
    <property>
        <name>mapred.mesos.tasktracker.mem</name>
        <value>368</value>
    </property>
    <property>
        <name>mapred.mesos.total.map.slots.minimum</name>
        <value>1</value>
    </property>
    <property>
        <name>mapred.mesos.total.reduce.slots.minimum</name>
        <value>1</value>
    </property>
    <!-- The values below should work out of the box but you might want to optimize some of them for running production jobs -->
    <property>
        <name>mapred.jobtracker.taskScheduler</name>
        <value>org.apache.hadoop.mapred.MesosScheduler</value>
    </property>
    <property>
        <name>mapred.mesos.taskScheduler</name>
        <value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value>
    </property>
    <!-- The MesosScheduler will record some stats in this file -->
    <property>
        <name>mapred.mesos.state.file</name>
        <value>/tmp/jobtracker-state</value>
    </property>
    <!-- This is only relevant if a fixed slot policy is used -->
    <property>
        <name>mapred.tasktracker.map.tasks.maximum</name>
        <value>10</value>
    </property>
    <!-- This is only relevant if a fixed slot policy is used -->
    <property>
        <name>mapred.tasktracker.reduce.tasks.maximum</name>
        <value>10</value>
    </property>
    <property>
        <name>mapreduce.jobtracker.expire.trackers.interval</name>
        <value>60000</value>
    </property>
    <property>
        <name>mapred.tasktracker.expiry.interval</name>
        <value>60000</value>
    </property>
    <property>
        <name>mapreduce.jobtracker.restart.recover</name>
        <value>true</value>
    </property>
    <property>
        <name>mapred.child.java.opts</name>
        <value>-XX:+UseParallelGC -Xmx256m</value>
    </property>
    <property>
        <name>mapreduce.tasktracker.dns.interface</name>
        <value>eth0</value>
    </property>
    <!-- The reduce tasks start when 60% of the maps are done -->
    <property>
        <name>mapreduce.job.reduce.slowstart.completedmaps</name>
        <value>0.60</value>
    </property>
    <property>
        <name>mapred.reduce.slowstart.completed.maps</name>
        <value>0.60</value>
    </property>
    <!-- This is important when the tasktracker serves tons of maps, TODO(*) templetize -->
    <property>
        <name>mapreduce.tasktracker.http.threads</name>
        <value>8</value>
    </property>

    <property>
        <name>tasktracker.http.threads</name>
        <value>8</value>
    </property>
    <property>
        <name>mapreduce.reduce.shuffle.parallelcopies</name>
        <value>20</value>
    </property>
    <property>
        <name>mapred.reduce.parallel.copies</name>
        <value>20</value>
    </property>
    <property>
        <name>mapreduce.jobtracker.handler.count</name>
        <value>70</value>
    </property>
    <property>
        <name>mapred.job.tracker.handler.count</name>
        <value>70</value>
    </property>
    <property>
        <name>mapreduce.reduce.shuffle.retry-delay.max.ms</name>
        <value>10000</value>
    </property>
    <property>
        <name>mapreduce.reduce.shuffle.connect.timeout</name>
        <value>10000</value>
    </property>
    <property>
        <name>mapreduce.reduce.shuffle.read.timeout</name>
        <value>10000</value>
    </property>
    <property>
        <name>mapreduce.reduce.shuffle.maxfetchfailures</name>
        <value>4</value>
    </property>
    <property>
        <name>mapreduce.reduce.shuffle.notify.readerror</name>
        <value>true</value>
    </property>
    <property>
        <name>mapreduce.map.output.compress</name>
        <value>true</value>
    </property>
    <property>
        <name>mapreduce.task.io.sort.mb</name>
        <value>30</value>
    </property>
    <property>
        <name>io.sort.mb</name>
        <value>30</value>
    </property>
    <property>
        <name>mapreduce.task.io.sort.factor</name>
        <value>10</value>
    </property>
    <property>
        <name>io.sort.factor</name>
        <value>10</value>
    </property>
    <property>
        <name>mapreduce.job.jvm.numtasks</name>
        <value>-1</value>
    </property>
    <property>
        <name>mapred.job.reuse.jvm.num.tasks</name>
        <value>-1</value>
    </property>
    <property>
        <name>mapreduce.job.ubertask.enable</name>
        <value>true</value>
    </property>
    <property>
        <name>mapreduce.job.speculative.speculativecap</name>
        <value>0.01</value>
    </property>
    <property>
        <name>webinterface.private.actions</name>
        <value>true</value>
    </property>
    <property>
        <name>mapreduce.jobtracker.webinterface.trusted</name>
        <value>true</value>
    </property>
    <property>
        <name>mapred.reduce.max.attempts</name>
        <value>6</value>
    </property>
    <property>
        <name>mapred.map.max.attempts</name>
        <value>6</value>
    </property>

    <property>
        <name>mapreduce.map.maxattempts</name>
        <value>6</value>
    </property>
    <property>
        <name>mapreduce.reduce.maxattempts</name>
        <value>6</value>
    </property>
    <property>
        <name>mapred.max.tracker.failures</name>
        <value>6</value>
    </property>
    <property>
        <name>mapreduce.job.maxtaskfailures.per.tracker</name>
        <value>6</value>
    </property>
    <property>
        <name>mapreduce.reduce.merge.memtomem.enabled</name>
        <value>true</value>
    </property>
    <property>
        <name>mapred.skip.map.max.skip.records</name>
        <value>10</value>
    </property>
    <property>
        <name>mapreduce.map.skip.maxrecords</name>
        <value>10</value>
    </property>
    <property>
        <name>mapreduce.reduce.skip.maxgroups</name>
        <value>2</value>
    </property>
    <property>
        <name>mapred.skip.reduce.max.skip.groups</name>
        <value>2</value>
    </property>
    <property>
        <name>mapreduce.fileoutputcommitter.marksuccessfuljobs</name>
        <value>false</value>
    </property>
    <property>
        <name>mapred.mesos.tasktracker.cpus</name>
        <!-- This is the number of CPUs reserved for the container.-->
        <value>0.15</value>
    </property>
</configuration>

この設定について、nameノード1台、taskノード2台で試す場合

<property>
    <name>mapred.mesos.total.map.slots.minimum</name>
    <value>1</value>
</property>
<property>
    <name>mapred.mesos.total.reduce.slots.minimum</name>
    <value>1</value>
</property>

デフォルトだと、それぞれvalueはゼロとなっているが、その設定のままだと、例えばこの後に何らかのJobを動かしてMapperが2つ動いてしまい、Reducerが動かなくなってしまう(Mapper 100%, Reducer 0%でPendingしてしまう)。なので、それぞれ(Reducerだけでいいかもしれない)のminimumを1にしておく必要がある。

設定ファイルの修正が済んだら、必要なディレクトリを用意する。

vagrant@master1:/tmp$ sudo mkdir -p /home/vagrant/hadoop_home/data/{1,2,3}/mapred/local
vagrant@master1:/tmp$ sudo chown -R mapred:hadoop /home/vagrant/hadoop_home

ここまでが通常のHadoopの準備。設定ファイルは全ノードにばらまいてdatanodeを再起動しておく。もちろんマスターノードはnamenodeも再起動しておく。

vagrat:master1:~/hadoop_home$ for x in `cd /etc/init.d/; ls hadoop-*`;do sudo service ${x} restart ;done

次がHadoop on mesosの準備。

Hadoop on mesosとは?(自分なりの理解)

ここで改めてHadoop on mesosと先ほど準備したHadoopとの違い(?)について、自分なりに理解のおさらいをする。
mesosって要は散在しているマシンリソースを束ねてクラスタ化する、みたいなイメージなんだけど、そこにHadoopが乗っかることでスケーラビリティだったり、バッチ処理における耐障害性の向上をもたらすことができるようになる。
もう少し整理して言うと、Hadoopのマスターノードがスレーブノードにjobtrackerを通じて仕事を渡して、スレーブノードはtasktrackerを通じて成果を返す。Hadoopのバッチ処理の肝はここにあると思っていて、ばら撒いた仕事がちゃんと期待した成果として戻ってくることを保証してくれることにある。なので、渡した仕事が返ってこなかったり、仕事中にスレーブノードが死んだりしたときに、その仕事を別のスレーブノードに適切に渡してあげないといけないし、マスターノードが死んでた時には新しいマスターノードがちゃんとこれまでの成果を引き継いでおいて、スレーブノードは新しいマスターノードに成果を渡せるようになっておかないといけない。つまり、jobtrackerとtasktrackerの間のコミュケーションを適切に維持していく必要がある。mesosはこの点を強化させていくことができる、のだと思う…。あまり自信がない…。まぁ大筋では外していないと思う。

Hadoop on mesosの設定をする。

なので、Hadoopのjobtrackerやtasktrackerをうまくmesosと融合というか連携というか、アレする必要があるので以下のようにする。以降の手順はmesosphereのRun Apache Hadoop on Apache Mesosを踏襲しているので、そちらも参考にされたし。

vagrat:master1:~/hadoop_home$ wget http://downloads.mesosphere.io/dcc/apps/hadoop/distros/mr1-2.0.0-mr1-cdh4.2.1.tar.gz
vagrat:master1:~/hadoop_home$ tar xzf mr1-2.0.0-mr1-cdh4.2.1.tar.gz
vagrat:master1:~/hadoop_home$ cd hadoop-2.0.0-mr1-cdh4.2.1
vagrat:master1:~/hadoop_home/hadoop-2.0.0-mr1-cdh4.2.1 $ wget -O lib/ http://downloads.mesosphere.io/dcc/apps/hadoop/hadoop-mesos/hadoop-mesos-0.0.3.jar
vagrat:master1:~/hadoop_home/hadoop-2.0.0-mr1-cdh4.2.1 $ wget -O lib/ wget http://downloads.mesosphere.io/maven/org/apache/mesos/mesos/0.14.0-rc4/mesos-0.14.0-rc4.jar

ここで取得した2つのjarをHadoopのlibディレクトリに配置して、それをtarで梱包したら出来上がり。あとはこの梱包したmesos用Hadoopの実行モジュールをスレーブノードが使えるようにHDFS上にアップする。試してはいないけど、これって多分何らかの方法で実行モジュールが取得できればいいと思うので、S3とかにアップロードしてもいいんじゃないかと思う。もちろんその場合は、mapred.mesos.executor.uriのvalueをS3のURLにしておく必要がある。はず。

vagrat:master1:~/hadoop_home/hadoop-2.0.0-mr1-cdh4.2.1 $ cd ../
vagrat:master1:~/hadoop_home$ tar czf hadoop-2.0.0-mr1-cdh4.2.1.tar.gz hadoop-2.0.0-mr1-cdh4.2.1
vagrat:master1:~/hadoop_home$ sudo -u mapred /usr/bin/hadoop dfs -copyFromLocal ./hadoop-2.0.0-mr1-cdh4.2.1.tgz /

これでおおよその準備は整った。で、バッチ処理を走らせるにあたってjobtrackerを起動させるわけだけど、先ほど触れたようにmesosがjobtrackerやtasktrackerをよろしくやってくれる(と思っている)ので、これらはHDFS上にアップした実行モジュールのjobtrackerを用いる。マスターノードで起動させるので、梱包前のディレクトリに移動し、以下の様な感じで起動させれば良い。

launch_hadoop_jobtracker.sh / link
1
2
3
4
5
6
7
8
#!/bin/bash

sudo -u mapred -g hadoop \
env JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk-amd64 \
HADOOP_HOME=`pwd`/hadoop-2.0.0-mr1-cdh4.2.1 \
HADOOP_CONF_DIR=/etc/hadoop/conf \
JAVA_LIBRARY_PATH=/usr/local/lib/ \
`pwd`/hadoop-2.0.0-mr1-cdh4.2.1/bin/hadoop jobtracker

上記のスクリプトで起動されるjobtrackerはserviceスクリプトみたいにデーモン化はされなくて、そのままコンソール上にログを垂れ流す。止めたい場合はCtrl + Cとすると、でjobtrackerがシャットダウンされる。デーモン化させたい場合は、${HADOOP_HOME}/bin/hadoop_daemon.shとかに渡してあげればいいんじゃないかと。試してないけど。あと、上記スクリプトはsudo bash launch_hadoop_jobtracker.shとしてあげてください。

実際に起動してみると、以下の様なログが出て、ジョブ待ちの状態となる。

14/01/10 16:38:00 INFO mapred.ResourcePolicy: JobTracker Status
      Pending Map Tasks: 0
   Pending Reduce Tasks: 0
      Running Map Tasks: 0
   Running Reduce Tasks: 0
         Idle Map Slots: 1
      Idle Reduce Slots: 1
     Inactive Map Slots: 0 (launched but no hearbeat yet)
  Inactive Reduce Slots: 0 (launched but no hearbeat yet)
       Needed Map Slots: 0
    Needed Reduce Slots: 0
     Unhealthy Trackers: 0
14/01/10 16:38:00 INFO mapred.ResourcePolicy: Satisfied map and reduce slots needed.

これだけみてもつまらないのだけど、この状態でmesosのWebUIにアクセスしてみるとActive Frameworksとして起動中のjobtrackerが登録されているのがわかる。

ここからIDのリンクをクリックすると、以下の画面が出てスレーブノードでtasktrackerが起動していることがわかる。

さらにTask_Tracker_1のリンクをクリックしたところ。

そしてSandboxのリンクをクリックしたところ。HDFS上にアップした実行モジュールがダウンロードされて展開されていることがわかる。ちなみにこのリンクはローカルで名前解決できるようにしておかないと見れないので注意。

ちなみにjobtrackerを終わらせるとTerminated Frameworksに移動する。

これらのことから、HadoopのjobtrackerとtasktrackerはFrameworkとしてmesosに管理されていることがわかる。つまり、mesosのリソースとしてスレーブノードが管理されることでその上で走るjobtrackerなりtasktrackerも同様に管理されることになる?という理解でいいのだろうか?mesosを利用することでDCをまたいだHadoopジョブが可能になったりしてスケーラビリティが向上するとかの効用もきっとあるのだと思う。

んーでもこのへんの理解はちゃんとmesosの論文を読むなりアーキテクチャの理解を深めないとダメだ。

で。結構長くなったのでChronosを用いたHadoopジョブの実行は次回に持ち越し。