From 01ee70d42019380ca36084a7f03c055d07c83b35 Mon Sep 17 00:00:00 2001 From: Tim Gerla Date: Tue, 25 Jun 2013 13:08:00 -0700 Subject: [PATCH] README reformatting/editing, minor style tweaks --- hadoop/README.md | 259 +++++++++++++++------ hadoop/group_vars/all | 10 +- hadoop/hosts | 5 + hadoop/roles/common/tasks/common.yml | 8 +- hadoop/roles/hadoop_primary/tasks/main.yml | 2 +- hadoop/site.yml | 5 +- 6 files changed, 203 insertions(+), 86 deletions(-) diff --git a/hadoop/README.md b/hadoop/README.md index 9d208a1..5b6f1f4 100644 --- a/hadoop/README.md +++ b/hadoop/README.md @@ -1,43 +1,69 @@ -## Deploying Hadoop Clusters using Ansible. +# Deploying Hadoop Clusters using Ansible ## Preface -The Playbooks in this example are made to deploy Hadoop Clusters for users, these playbooks can be used to: +The playbooks in this example are designed to deploy a Hadoop cluster on a +CentOS 6 or RHEL 6 environment using Ansible. The playbooks can: -1) Deploy a fully functional Hadoop Cluster wth HA and automatic failover. +1) Deploy a fully functional Hadoop cluster with HA and automatic failover. -2) Deploy a fully functional hadoop cluster with no HA. +2) Deploy a fully functional Hadoop cluster with no HA. -3) Deploy Additional nodes to scale the cluster +3) Deploy additional nodes to scale the cluster +These playbooks require Ansible 1.2, CentOS 6 or RHEL 6 target machines, and install +the open-source Cloudera Hadoop Distribution (CDH) version 4. -## Brief introduction to diffrent components of Hadoop Cluster. +## Hadoop Components -Hadoop is framework that allows processing of large datasets across large clusters. The two main components that make up a Hadoop cluster are HDFS Filesystem -and the MapReduce framework. -Briefly HDFS filesystem is responisble for storing data across the cluster nodes on it's local disks. -While MapReduce is the jobs that would run on these nodes to get a meaningful result using the data stored on these hdfs filesystem. 
+Hadoop is a framework that allows processing of large datasets across large +clusters. The two main components that make up a Hadoop cluster are the HDFS +Filesystem and the MapReduce framework. Briefly, the HDFS filesystem is responsible +for storing data across the cluster nodes on its local disks. The MapReduce +jobs are the tasks that would run on these nodes to get a meaningful result +using the data stored on the HDFS filesystem. -Lets have a closer look at each of these components. +Let's have a closer look at each of these components. ## HDFS ![Alt text](/images/hdfs.png "HDFS") -The above diagram illustrates a hdfs filesystem, The cluster consists of three DataNodes who are responsible for storing/replicating data, while the NameNode is a process which is responsible for storing the metadata for the entire Filesystem. -As the example illustrates above when a client wants to write a file to the HDFS cluster it first contacts the namenode and let's it know that it want to write a file. The namenode then decides where and how the file should be saved and notifies the client about it's decision. +The above diagram illustrates an HDFS filesystem. The cluster consists of three +DataNodes which are responsible for storing/replicating data, while the NameNode +is a process which is responsible for storing the metadata for the entire +filesystem. As the example illustrates above, when a client wants to write a +file to the HDFS cluster it first contacts the NameNode and lets it know that +it wants to write a file. The NameNode then decides where and how the file +should be saved and notifies the client about its decision. -In the given example "File1" has a size of 128MB and the block size of the HDFS filesystem is 64 MB, Hence the namenode instructs the client to break down the file into two diffrent blocks and write the first block to Datanode1 and the second block to Datanode2. 
Upon recieving the notification from NameNode the client contacts the Datanode1 and Datanode2 directly and writes the data. Once the data is recieved by the datanodes the datanodes replicates the the block cross other nodes, the number of nodes across which the data would be replicated is based on the dfs configuration, the default value is 3. -Meanwhile the NameNode updates it metadata with the entry of the new file "File1" and the locations where they are stored. +In the given example "File1" has a size of 128 MB and the block size of the HDFS +filesystem is 64 MB. Hence, the NameNode instructs the client to break down the +file into two different blocks and write the first block to DataNode 1 and the +second block to DataNode 2. Upon receiving the notification from the NameNode, +the client contacts DataNode 1 and DataNode 2 directly to write the data. +Once the data is received by the DataNodes, they replicate the block across the +other nodes. The number of nodes across which the data would be replicated is +based on the HDFS configuration, the default value being 3. Meanwhile the +NameNode updates its metadata with the entry of the new file "File1" and the +locations where the parts are stored. -##MapReduce +## MapReduce -MapReduce is mostly a java application that utilizes the data stored in the hdfs filesystem to get some useful/meaningful result. The whole job/application is split into two the "Map" job and the "Reduce" Job. +MapReduce is a Java application that utilizes the data stored in the +HDFS filesystem to get some useful and meaningful result. The whole job is +split into two parts: the "Map" job and the "Reduce" Job. -Let's consider an example, In the previous step we had uploaded the "File1" into the hdfs filesystem and the file broken down into two diffrent blocks, let's consider that the first block had the data "black sheep" in it and the second block has data "white sheep" in it. 
Now let's assume a client want to get count of all the words occuring in "File1". -Inorder to get the count, the first thing the client would have to do is write a "map" program then a "reduce" program. -Here's a psudeo code of how the map and reduce job might look like: +Let's consider an example. In the previous step we had uploaded the "File1" +into the HDFS filesystem and the file was broken down into two different +blocks. Let's assume that the first block has the data "black sheep" in it and +the second block has the data "white sheep" in it. Now let's assume a client +wants to get a count of all the words occurring in "File1". In order to get the +count, the first thing the client would have to do is write a "Map" program +then a "Reduce" program. + +Here's some pseudocode showing how the Map and Reduce jobs might look: mapper (File1, file-contents): for each word in file-contents: @@ -49,18 +75,31 @@ Here's a psudeo code of how the map and reduce job might look like: sum = sum + value emit (word, sum) -The work of the mapper job is to go through all the words in the file and emit a key,value pair, in this case the key is the word itself and value is always 1. -The reducer is quite simple increment the value of sum by 1, for each value it gets. +The work of the Map job is to go through all the words in the file and emit +a key/value pair. In this case the key is the word itself and value is always +1. -Once the map and reduce jobs is ready the client would instruct the "JobTracker" ( The process resposible for scheduling the jobs on the cluster) to run the mapreduce job on "File1" +The Reduce job is quite simple: it increments the value of sum by 1, for each +value it gets. + +Once the Map and Reduce jobs are ready, the client would instruct the +"JobTracker" (the process responsible for scheduling the jobs on the cluster) +to run the MapReduce job on "File1". -Let have closer look at the anotomy of a Map Job. +Let's have a closer look at the anatomy of a Map job. 
![Alt text](/images/map.png "Map job") -As the Figure above shows when the client instructs the jobtracker to run a job on File1, the jobtracker first contacts the namenode to determine where the blocks of the File1 are, Then the jobtracker sends down the map jobs jar file down to the nodes having the blocks and the tasktracker process in those nodes run those jar/java files. -In the above example datanode1 and datanode2 had the blocks so the tasktrackers on those nodes run the map jobs, Once the jobs are completed the two nodes would have key,value results as below: +As the figure above shows, when the client instructs the JobTracker to run a +job on File1, the JobTracker first contacts the NameNode to determine where the +blocks of the File1 are. Then the JobTracker sends the Map job's JAR file down +to the nodes having the blocks, and the TaskTracker processes on those nodes run +the application. + +In the above example, DataNode 1 and DataNode 2 have the blocks, so the +TaskTrackers on those nodes run the Map jobs. Once the jobs are completed the +two nodes would have key/value results as below: MapJob Results: @@ -73,77 +112,129 @@ MapJob Results: "Sheep: 1" -Once the Map Phase is completed the jobtracker process initiates the Shuffle and Reduce process. -Let's have closer look at the shuffle-reduce job. +Once the Map phase is completed the JobTracker process initiates the Shuffle +and Reduce process. + +Let's have a closer look at the Shuffle-Reduce job. ![Alt text](/images/reduce.png "Reduce job") -As the figure above demostrates the first thing that jobtracker does is that it spawns a reducer job on the datanode/tasktracker nodes for each "key" in the job results. In this case we have three keys "black,white,sheep" in our results, so three reducers are spawned one for each key and the map jobs shuffles/ or give out thier keys to the respective reduce jobs who owns that key. 
Then as per the reduce jobs code the sum is calculated and the result is written into the HDFS filesystem in a common directory. In the above example the output directory is specified as "/home/ben/oputput" so all the reducers will write thier results into this directory under diffrent files, the file names being "part-00xx", where x being the reducer/partition number. +As the figure above demonstrates, the first thing that the JobTracker does is +spawn a Reducer job on the DataNode/TaskTracker nodes for each "key" in the job +result. In this case we have three keys: "black, white, sheep" in our result, +so three Reducers are spawned: one for each key. The Map jobs shuffle the keys +out to the respective Reduce jobs. Then the Reduce job code runs and the sum is +calculated, and the result is written into the HDFS filesystem in a common +directory. In the above example the output directory is specified as +"/home/ben/output" so all the Reducers will write their results into this +directory under different filenames; the file names being "part-00xx", where x +is the Reducer/partition number. -## Hadoop Deployment. +## Hadoop Deployment ![Alt text](/images/hadoop.png "Reduce job") -The above diagram depicts a typical hadoop deployment, the namenode and jobtracker usually resides on the same node, though it can on seperate node. The datanodes and tasktrackers run on the same node. The size of the cluster can be scaled to thousands of node with petabytes of storage. -The above deployment model provides redundancy for data as the hdfs filesytem takes care of the data replication, The only single point of failure are the NameNode and the TaskTracker. If any of these components fail the cluster wont be usable. +The above diagram depicts a typical Hadoop deployment. The NameNode and +JobTracker usually reside on the same machine, though they can run on separate +machines. The DataNodes and TaskTrackers run on the same node. 
The size of the +cluster can be scaled to thousands of nodes with petabytes of storage. -## Making Hadoop HA. +The above deployment model provides redundancy for data as the HDFS filesystem +takes care of the data replication. The only single points of failure are the +NameNode and the TaskTracker. If either of these components fails the cluster will +not be usable. -To make Hadoop Cluster Highly Available we would have to add another set of Jobtracker/Namenode and make sure that the data updated by the master is also somehow also updated by the Client, and incase of the failure of the primary nodes/process the Seconday node/process takes over that role. -The First things that has be taken care is the data held/updated by the NameNode, As we recall NameNode holds all the metadata about the filesytem so any update to the metadata should also be reflected on the secondary namenode's metadata copy. -This syncroniztion of the primary and secondary namenode metadata is handled by the Quorum Journal Manager. +## Making Hadoop HA -###Quorum Journal Manager. +To make the Hadoop cluster highly available we would have to add another set of +JobTracker/NameNodes, and make sure that the data updated by the master is +somehow also updated by the client. In case of failure of the primary node, the +secondary node takes over that role. +The first thing that has to be dealt with is the data held by the NameNode. As +we recall, the NameNode holds all of the metadata about the filesystem, so any +update to the metadata should also be reflected on the secondary NameNode's +metadata copy. The synchronization of the primary and secondary NameNode +metadata is handled by the Quorum Journal Manager. + + +### Quorum Journal Manager ![Alt text](/images/qjm.png "QJM") -As the figure above shows the Quorum Journal manager consists of the journal manager client and journal manager nodes. 
The journal manager clients resides on the same node as the namenodes, and in case of primary collects all the edits logs happening on the namenode and sends it out to the Journal nodes. The journal manager client residing on the secondary namenode regurlary contacts the journal nodes and updates it's local metadata to be consistant with the master node. Incase of primary node failure the the seconday namenode updates itself to the lastest edit logs and takes over as the primary namenode. +As the figure above shows, the Quorum Journal Manager consists of the journal +manager client and journal manager nodes. The journal manager clients reside +on the same node as the NameNodes. The client on the primary node collects all the +edit logs happening on the NameNode and sends them out to the journal nodes. The +journal manager client residing on the secondary NameNode regularly contacts +the journal nodes and updates its local metadata to be consistent with the +master node. In case of primary node failure the secondary NameNode updates +itself to the latest edit logs and takes over as the primary NameNode. -###Zookeeper -Apart from the data consistentcy, a distrubuted/cluster system would also need mechanism for centralized co-ordination, for example there should be a way for secondary node to tell if the primary node is running properly, and if not it has to take up the act of becoming the primary. +### Zookeeper -Zookeeper provides Hadoop with a mechanism to co-ordinate with each other. +Apart from data consistency, a distributed cluster system also needs a +mechanism for centralized coordination. For example, there should be a way for +the secondary node to tell if the primary node is running properly, and if not +it has to take up the role of the primary. Zookeeper provides Hadoop with a +mechanism to coordinate in this way. 
![Alt text](/images/zookeeper.png "Zookeeper") -As the figure above shows the the zookeeper services are a client server based service, The server service itself is replicated over a set of machines that comprise the service, in short HA is built inbuilt for Zookeeper servers. +As the figure above shows, the Zookeeper service is a client/server based +service. The server component itself is replicated over a set of machines that +comprise the service. In short, high availability is built into the Zookeeper +servers. -For hadoop two zookeeper client have been built, zkfc (zookeeper failover controller ) for namenode and jobtracker, which runs on the same machines as the namenode/jobtracker themselves. - -When a zkfc client is started it establishes a connection with one of the zookeeper nodes and obtians a session id. The Client then keeps a health check on the namenode/jobtracker and keeps sending heartbeats to the zookeeper. -If the zkfc client detects a failure of the namenode/jobtracker it removes itself from the zookeeper active/stanby election, and the other zkfc client fences this node/service and takes over the primary role. +For Hadoop, two Zookeeper clients have been built: ZKFC (Zookeeper Failover +Controller), one for the NameNode and one for JobTracker. These clients run on +the same machines as the NameNode/JobTrackers themselves. -## Hadoop HA Deployment. +When a ZKFC client is started, it establishes a connection with one of the +Zookeeper nodes and obtains a session ID. The client then keeps a health check +on the NameNode/JobTracker and sends heartbeats to the ZooKeeper. + +If the ZKFC client detects a failure of the NameNode/JobTracker, it removes +itself from the ZooKeeper active/standby election, and the other ZKFC client +fences the node/service and takes over the primary role. 
+ + +## Hadoop HA Deployment ![Alt text](/images/hadoopha.png "Hadoop_HA") -The above diagram depicts a fully HA Hadoop Cluster with no single point of Failure and automated failover. +The above diagram depicts a fully HA Hadoop Cluster with no single point of +failure and automated failover. -## Deploying Hadoop Clusters with Ansible. +## Deploying Hadoop Clusters with Ansible -Setting up a hadoop cluster without HA itself can be a bit task and time consuming, and come HA things would be a bit more difficult to get the configurations and sequences proper and get the cluster up. +Setting up a Hadoop cluster without HA itself can be a challenging and +time-consuming task, and with HA, things become even more difficult. -Ansible can automate the whole process deploying a Hadoop Cluster with HA or without HA. (Yes, with the same playbook), and all this in matter of minutes. This can be really handy if you need to build environments frequently, or in case of disaster or node failures recovery time can be greatly reduced by automating deployments with Ansible. +Ansible can automate the whole process of deploying a Hadoop cluster with or +without HA with the same playbook, in a matter of minutes. This can be used for +quick environment rebuilds, or in case of disaster or node failures, recovery +time can be greatly reduced with Ansible automation. -let's have look how these can be done using Ansible. +Let's have a look to see how this is done. +## Deploying a Hadoop cluster with HA -## Deploying a Hadoop Cluster with HA +### Prerequisites -####Pre-requesite's +These playbooks have been tested using Ansible v1.2 and CentOS 6.x (64 bit). -The Playbooks have been tested using Ansible v1.2, and Centos 6.x (64 bit) +Modify group_vars/all to choose the network interface for Hadoop communication. -Modify group_vars/all to choose the interface for hadoop communication. +Optionally, you can change the Hadoop-specific parameters like ports or directories +by editing the group_vars/all file. 
-Optionally you change the hadoop specific parameter like port's or directories by editing group_vars/all file. - -Before launching the deployment playbook make sure the inventory file ( hosts ) have be setup properly, Here's a sample: +Before launching the deployment playbook make sure the inventory file (hosts) +is set up properly. Here's a sample: [hadoop_master_primary] zhadoop1 @@ -170,34 +261,45 @@ Before launching the deployment playbook make sure the inventory file ( hosts ) zhadoop2 zoo_id=2 zhadoop3 zoo_id=3 -Once the inventory is setup the Hadoop cluster can be setup using the following command +Once the inventory is set up, the Hadoop cluster can be set up using the following +command: ansible-playbook -i hosts site.yml -Once deployed we can check the cluster sanity in difrent ways, to check the status of the hdfs filesystem and a report on all the datanodes login as hdfs useron any hadoop master servers, and issue the following command to get the report. +Once deployed, we can check the cluster sanity in different ways. To check the +status of the HDFS filesystem and a report on all the DataNodes, log in as the +'hdfs' user on any Hadoop master server, and issue the following command to get +the report: hadoop dfsadmin -report -To check the sanity of HA, first login as hdfs user on any hadoop master server and get the current active/standby namenode servers. 
+To check the sanity of HA, first log in as the 'hdfs' user on any Hadoop master +server and get the current active/standby NameNode servers this way: -bash-4.1$ hdfs haadmin -getServiceState zhadoop1 active -bash-4.1$ hdfs haadmin -getServiceState zhadoop2 standby -To get the state of the Jobtracker process login as mapred user in any hadoop master server and issue the following command: +To get the state of the JobTracker process, log in as the 'mapred' user on any +Hadoop master server and issue the following command: -bash-4.1$ hadoop mrhaadmin -getServiceState hadoop1 standby -bash-4.1$ hadoop mrhaadmin -getServiceState hadoop2 active -Once the active and the standby has been detected kill the namenode/jobtracker process in the server listed as active and issue the same commands as above -and you should get a result where the standby has been promoted to the active state. Later you can start the killed process and see those processes listed as the passive processes. +Once you have determined which server is active and which is standby, you can +kill the NameNode/JobTracker process on the server listed as active and issue +the same commands again, and you should see that the standby has been promoted +to the active state. Later, you can restart the killed process and see that node +listed as standby. -### Running a mapreduce job on the cluster. +### Running a MapReduce Job -To deploy the mapreduce job run the following script from any of the hadoop master nodes as user 'hdfs'. The job would count the number of occurance of the word 'hello' in the given inputfile. Eg: su - hdfs -c "/tmp/job.sh" +To deploy the MapReduce job, run the following script from any of the Hadoop +master nodes as user 'hdfs'. The job would count the number of occurrences of the +word 'hello' in the given inputfile. 
Eg: su - hdfs -c "/tmp/job.sh" #!/bin/bash cat > /tmp/inputfile << EOF @@ -212,18 +314,21 @@ To deploy the mapreduce job run the following script from any of the hadoop mast hadoop jar /usr/lib/hadoop-0.20-mapreduce/hadoop-examples.jar grep /inputfile /outputfile 'hello' hadoop fs -get /outputfile /tmp/outputfile/ -to verify the result read the file on server located at /tmp/outputfile/part-00000, which should give you the count. +To verify the result, read the file on the server located at +/tmp/outputfile/part-00000, which should give you the count. -##Scale the Cluster +## Scale the Cluster -The Hadoop cluster when reaches it's maximum capacity it can be scaled by adding nodes, this can be easily accomplished by adding the node entry in the invetory file (hosts) under the hadoop_slaves group and running the following command. +When the Hadoop cluster reaches its maximum capacity, it can be scaled by +adding nodes. This can be easily accomplished by adding the node hostname to +the Ansible inventory under the hadoop_slaves group, and running the following +command: ansible-playbook -i hosts site.yml --tags=slaves -## Deploy a non HA Hadoop Cluster - -The following diagram illustrates a standalone hadoop cluster. +## Deploy a non-HA Hadoop Cluster +The following diagram illustrates a standalone Hadoop cluster. To deploy this cluster fill in the inventory file as follows: @@ -241,12 +346,14 @@ To deploy this cluster fill in the inventory file as follows: hadoop2 hadoop3 -and edit the group_vars/all file to disable HA: +Edit the group_vars/all file to disable HA: ha_enabled: False -and run the following command: +Then run the following command: ansible-playbook -i hosts site.yml -The validity of the cluster can be checked by running the same mapreduce job that has documented above for an HA Hadoop Cluster +The validity of the cluster can be checked by running the same MapReduce job +that has been documented above for an HA Hadoop cluster. 
+ diff --git a/hadoop/group_vars/all b/hadoop/group_vars/all index 4c3886d..019dc13 100644 --- a/hadoop/group_vars/all +++ b/hadoop/group_vars/all @@ -1,6 +1,12 @@ -iface: eth1 +# Defaults to the interface that carries the default IPv4 route. Change this to: +# +# iface: eth1 +# +# ...to override. +# +iface: '{{ ansible_default_ipv4.interface }}' -ha_enabled: True +ha_enabled: False hadoop: diff --git a/hadoop/hosts b/hadoop/hosts index e87b3e2..06cc524 100644 --- a/hadoop/hosts +++ b/hadoop/hosts @@ -1,3 +1,8 @@ +[hadoop_all:children] +hadoop_masters +hadoop_slaves +qjournal_servers +zookeeper_servers [hadoop_master_primary] hadoop1 diff --git a/hadoop/roles/common/tasks/common.yml b/hadoop/roles/common/tasks/common.yml index 08a8bed..d5f089e 100644 --- a/hadoop/roles/common/tasks/common.yml +++ b/hadoop/roles/common/tasks/common.yml @@ -7,10 +7,10 @@ - name: Install the openjdk package yum: name=java-1.6.0-openjdk state=installed -- name: create a directory for java - file: state=directory path=/usr/java/ +- name: Create a directory for java + file: state=directory path=/usr/java/ -- name: create a link for java +- name: Create a link for java file: src=/usr/lib/jvm/java-1.6.0-openjdk-1.6.0.0.x86_64/jre state=link path=/usr/java/default - name: Create the hosts file for all machines @@ -19,7 +19,7 @@ - name: Disable SELinux in conf file lineinfile: dest=/etc/sysconfig/selinux regexp='^SELINUX=' line='SELINUX=disabled' state=present -- name: Disabel selinux dynamically +- name: Disable SELinux dynamically shell: creates=/etc/sysconfig/selinux.disabled setenforce 0 ; touch /etc/sysconfig/selinux.disabled - name: Create the iptables file for all machines diff --git a/hadoop/roles/hadoop_primary/tasks/main.yml b/hadoop/roles/hadoop_primary/tasks/main.yml index bfc20ca..9693414 100644 --- a/hadoop/roles/hadoop_primary/tasks/main.yml +++ b/hadoop/roles/hadoop_primary/tasks/main.yml @@ -1,5 +1,5 @@ --- -# Playbook for Hadoop master primary servers +# Playbook for Hadoop master primary 
servers - include: hadoop_master.yml when: ha_enabled diff --git a/hadoop/site.yml b/hadoop/site.yml index 59a075f..cbe33a9 100644 --- a/hadoop/site.yml +++ b/hadoop/site.yml @@ -1,8 +1,7 @@ --- -# The main file to delpoy the site +# The main playbook to deploy the site - -- hosts: all +- hosts: hadoop_all roles: - common