Wednesday, May 14, 2014

Chapter 7 Tuning Hadoop Cluster for Best Performance

A Hadoop cluster is composed of many components. A systematic way of performance tuning is to tune the components based on their contribution to the cluster performance. Most big data applications are I/O bound, and so is Hadoop. So, configurations that are closely related to I/O requests should be the first priority for performance tuning. For example, suboptimal configuration of the data replication properties can cause a large number of data block copies over the network, which has a negative effect on the performance of a cluster. Similarly, improper JVM configuration can cause large data swaps for intermediate data. And unbalanced data block distribution on the DataNodes can cause suboptimal execution of map and reduce tasks.

Benchmarking and profiling a Hadoop cluster

Perform HDFS benchmarks:
  1. Test the filesystem:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar testfilesystem -files 10 -megaBytes 10
    This command will generate 10 files of 10 MB each for testing.
  2. Benchmark the distributed write consistency on the distributed filesystem:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar DistributedFSCheck -write -nrFiles 10 -fileSize 50
    This command will write 10 files (controlled by the -nrFiles option) of 50 MB each (controlled by the -fileSize option) with random content to HDFS.
  3. Similarly, we can benchmark the distributed read consistency on the distributed filesystem:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar DistributedFSCheck -read -nrFiles 10 -fileSize 50
    This command will read 10 files of 50 MB each from the cluster and will generate a result file.

From the output of the two commands (the read and write consistency checks), we know that writing is more expensive than reading for the HDFS cluster. This is because a write involves more operations, such as computing and recording the checksums of the data blocks.

Perform MapReduce cluster benchmarks:
  1. Benchmark MapReduce jobs:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar mapredtest 5 1000
    The mapredtest benchmark does a load test on the MapReduce computing framework. This benchmark is done with random integers, which are generated, written to files, read back from files, and compared with the original files.
  2. Test the reliability of the MapReduce distributed computing framework:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar MRReliabilityTest -libjars $HADOOP_HOME/hadoop-examples-*.jar
    This command will intentionally cause task and TaskTracker failures on a running job.
    The failed tasks will be re-executed on a different TaskTracker. So, killing one or a few tasks should not fail a job if the cluster is resilient to failures. If a job fails after a few killed tasks, it is possibly because MapReduce is not reliable enough or not resilient to failures, and hence reliability tuning (such as adding more TaskTrackers) is needed.
  3. Benchmark MapReduce with a large number of small jobs:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar mrbench -numRuns 20
    mrbench executes a small job a number of times (20 in this command) and checks whether these small jobs are responsive and can run efficiently on the cluster.
  4. Benchmark the MapReduce load generator:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar loadgen -m 100 -r 20 -keepmap 50 -keepred 50 -indir input -outdir output
  5. Do a stress test with the NameNode:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar nnbench -create_write
  6. Test the Hadoop performance with large non-splittable files:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar testbigmapoutput -input input -output output -create 2048
  7. Test thread map spills:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar threadedmapbench
    threadedmapbench is a MapReduce benchmark that compares the performance of maps with multiple spills to maps with one spill.
Sort is a typical operation of MapReduce jobs. By sorting random data, we can get a quick view of the health of our Hadoop cluster.
Benchmark Hadoop sort
  1. Generate some random text data:
    hadoop jar $HADOOP_HOME/hadoop-examples-*.jar randomwriter random.writer.out
  2. Sort the generated random data:
    hadoop jar $HADOOP_HOME/hadoop-examples-*.jar sort random.writer.out random.writer.out.sorted
  3. Validate the MapReduce sort algorithm:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar testmapredsort -m 50 -r 5 -sortInput random.writer.out -sortOutput random.writer.out.sorted
    This command will validate the accuracy of the sort algorithm.

Analyzing job history with Rumen

The current Rumen implementation includes two components: TraceBuilder and Folder. The TraceBuilder takes job history as input and generates easily parsed JSON files. The Folder is a utility for manipulating input traces, and, most of the time, it is used to scale the summarized job traces from the TraceBuilder.
  1. Use the TraceBuilder to extract the Gold Trace from the Hadoop job history files. The syntax of the command:
    hadoop org.apache.hadoop.tools.rumen.TraceBuilder [options]
    hadoop org.apache.hadoop.tools.rumen.TraceBuilder -recursive file:///tmp/jobtraces.json file:///tmp/topology.out file:///usr/local/hadoop/logs/history/done
  2. The second step of using Rumen is to scale the data generated from the previous step:
    hadoop org.apache.hadoop.tools.rumen.Folder [options] [input] [output]
    For example, to scale the runtime of the job trace generated in the previous step to 50 minutes:
    hadoop org.apache.hadoop.tools.rumen.Folder -output-duration 50m -input-cycle 20m file:///tmp/jobtraces.json file:///tmp/job-scaled-50min.json
PS: When testing Rumen on Hadoop 1.2.1, I kept getting an error saying that the Rumen library could not be found. After reading the GridMix3 prerequisites below, I copied hadoop-tools-*.jar to $HADOOP_HOME/lib and tested again, and it worked.

Benchmarking a Hadoop cluster with GridMix

GridMix is a tool for benchmarking Hadoop clusters. It generates a number of synthetic MapReduce jobs and builds a model based on the performance of these jobs. Resource profiles of the cluster are modeled based on the job execution metrics.
Log in to the master node (ssh hduser@master) and perform the following steps:
  1. cd $HADOOP_HOME/src/benchmarks/gridmix2
  2. ant
  3. cp build/gridmix.jar
  4. Open the gridmix-env-2 file and set the following environment variables:
    export HADOOP_VERSION=hadoop-1.1.2
    export HADOOP_HOME=/usr/local/hadoop
    export HADOOP_CONF_DIR=$HADOOP_HOME/conf
    export USE_REAL_DATASET=FALSE

    export APP_JAR=${HADOOP_HOME}/hadoop-test-*.jar
    export EXAMPLE_JAR=${HADOOP_HOME}/hadoop-examples-*.jar
    export STREAMING_JAR=${HADOOP_HOME}/contrib/streaming/hadoop-streaming-*.jar

  5. Open the gridmix_config.xml GridMix2 configuration file and change the benchmark configuration by changing the properties for the benchmark.
  6. For example, configure the small stream sort jobs with the following properties:
    <property>
      <name>streamSort.smallJobs.numOfJobs</name>
      <value>10,5</value>
    </property>
    <property>
      <name>streamSort.smallJobs.numOfReduces</name>
      <value>6,3</value>
    </property>
    These two properties specify that we will use 10 small stream sort jobs with 6 reducers, and 5 small stream sort jobs with 3 reducers.
  7. Make the generateGridmix2Data.sh script executable:
    chmod +x generateGridmix2Data.sh
  8. Generate data with the following command:
    ./generateGridmix2Data.sh
    This command will generate data on HDFS. By default the generated data will be compressed with a block compression ratio of 4. Three jobs will be started.
  9. chmod +x rungridmix_2
  10. ./rungridmix_2
GridMix is a benchmark for Hadoop clusters. It is generally used to model the performance profile of a Hadoop cluster by running a number of jobs.
The data required by GridMix is generated by running the generateGridmix2Data.sh script. We can configure this file to change, for example, the size of the generated data file. Then, when executing the rungridmix_2 script, a number of jobs will be generated and submitted in batch mode. In the end, the running time of these jobs will be computed. GridMix2 is shipped with the following representative jobs: streamSort, javaSort, webdataSort, combiner, and monsterSort. These jobs can be classified into the following categories:
  • A three-stage MapReduce job, which is motivated by multistage or pipelined MapReduce jobs.
  • Large sorts of variable key/value sizes, which are motivated by the processing of large datasets.
  • Reference select jobs, which are motivated by jobs that sample from a large reference dataset.
  • API text sort jobs, which are motivated by the application of MapReduce APIs for sorting.
A GridMix benchmark is a mix of a number of small, medium, and large jobs from different categories. We can specify the mix in the gridmix_config.xml file. Based on the specification, a number of jobs will be created and submitted to the Hadoop cluster until it finishes.
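
To make the pairing of the comma-separated values in gridmix_config.xml concrete, here is a minimal Python sketch. The helper name parse_job_mix is hypothetical and not part of GridMix; it only assumes, as described for the streamSort properties above, that the i-th entry of numOfJobs pairs with the i-th entry of numOfReduces.

```python
def parse_job_mix(num_of_jobs, num_of_reduces):
    """Pair each job count with its reducer count (hypothetical helper)."""
    jobs = [int(v) for v in num_of_jobs.split(",")]
    reduces = [int(v) for v in num_of_reduces.split(",")]
    if len(jobs) != len(reduces):
        raise ValueError("both properties need the same number of entries")
    return list(zip(jobs, reduces))


# Values taken from the streamSort.smallJobs example in this post.
for count, reducers in parse_job_mix("10,5", "6,3"):
    print(f"{count} small streamSort jobs with {reducers} reducers each")
```

The same pairing idea would apply to the medium and large job groups, if they follow the same comma-separated convention.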
Benchmarking Hadoop cluster with GridMix1
  1. cd $HADOOP_HOME/src/benchmarks/gridmix
  2. Change the configuration in the gridmix-env file:
    export HADOOP_HOME=/usr/local/hadoop
    export GRID_MIX_HOME=$HADOOP_HOME/src/benchmarks/gridmix
    export APP_JAR=${HADOOP_HOME}/hadoop-test-*.jar
    export EXAMPLE_JAR=${HADOOP_HOME}/hadoop-examples-*.jar
    export STREAMING_JAR=${HADOOP_HOME}/contrib/streaming/hadoop-streaming-*.jar
    export GRID_MIX_DATA=/gridmix1/data
    export GRID_MIX_PROG=/gridmix1/programs
    
    The last two environment variables GRID_MIX_DATA and GRID_MIX_PROG specify two directories on HDFS. So, the generated data and programs will be on HDFS.
  3. chmod +x generateData.sh
  4. sh ./generateData.sh
  5. Run a small javasort job:
    sh javasort/text-sort.small
Benchmarking Hadoop cluster with GridMix3
  1. Copy the required JAR files to the Hadoop lib directory:
    cp $HADOOP_HOME/hadoop-tools-*.jar $HADOOP_HOME/lib
    cp $HADOOP_HOME/contrib/gridmix/hadoop-gridmix-*.jar $HADOOP_HOME/lib
    The hadoop-tools-*.jar file contains tools such as Rumen, which is needed by GridMix3. And the hadoop-gridmix-*.jar file contains the GridMix3 benchmark tool. In addition, the GridMix3 job mix for a Hadoop cluster is typically described with a job trace file, which is generated from job configuration files using Rumen.
  2. Use Rumen to generate a job trace file:
    hadoop org.apache.hadoop.tools.rumen.TraceBuilder -recursive file:///tmp/jobtrace.json file:///tmp/topology.out file:///usr/local/hadoop/logs/history/done
  3. Run the GridMix3 benchmark:
    hadoop org.apache.hadoop.mapred.gridmix.Gridmix -generate 100m gridmixdata /tmp/jobtrace.json
  4. To display the usage and available parameters for GridMix3:
    hadoop org.apache.hadoop.mapred.gridmix.Gridmix

Using Hadoop Vaidya to identify performance problems

Hadoop Vaidya is an open source, rule-based performance diagnostic framework for Apache Hadoop. Each rule can identify a specific performance problem. For example, Hadoop cluster administrators can use Vaidya to identify slow progressing jobs that are wasting cluster resources. Hadoop users can use Vaidya to identify configuration mistakes in their submitted jobs.
Perform the following steps to use Hadoop Vaidya on the master node:
  1. Locate the directory for the job configuration file you want to analyze. The default location of this folder is $HADOOP_HOME/logs.
  2. Locate the job history files under the job history directory:
    find $HADOOP_HOME/logs -name 'job_201304012330_0001*'
  3. Use Vaidya to analyze the job trace files:
    sh $HADOOP_HOME/contrib/vaidya/bin/vaidya.sh -jobconf file:///usr/local/hadoop/logs/history/job_201304012330_0002_conf.xml -joblog file:///usr/local/hadoop/logs/history/job_201304012330_0002_1364874504561_hduser_TeraGen -report report.txt

HiBench is a benchmarking suite for Hadoop. It has nine typical workloads, including micro, HDFS, web search, machine learning, and data analytics benchmarks. For example, it supports benchmarks for Nutch (a text indexing software package), PageRank (the PageRank algorithm), the Mahout machine learning algorithms, and Hive queries.

Balancing data blocks for a Hadoop cluster

HDFS stores data blocks on DataNode machines. When Hadoop processes jobs, data is generated and deleted. Over time, some DataNodes can host many more data blocks than others. This unbalanced distribution of data on the cluster is called data skew.
Data skew is a big problem for a Hadoop cluster. We know that when the JobTracker assigns tasks to TaskTrackers, it follows the general rule of being data local, which means the map tasks will be assigned to the hosts where the data blocks reside. If the data block storage distribution is skewed, or in other words, the data blocks are located on only a small percentage of the DataNodes, only those nodes with data blocks can follow the data local rule. Also, if the JobTracker assigns tasks to other nodes that do not have the data hosted locally, the data needs to be transferred from remote machines to the TaskTracker machine. The data transfer will cost a large amount of network bandwidth, degrading the overall performance of the cluster.
Balance HDFS data blocks with balancer:
  1. Check the data skew through the web UI.
  2. Use the following command to balance the data blocks on the DataNode machines:
    hadoop balancer -threshold 0.2
    This command will take some time to finish depending on the state of the distributed filesystem as well as the value of the -threshold option. The -threshold option specifies the threshold for whether the cluster is balanced. It is a percentage of disk capacity in the range [0, 100] with a default value of 10, so the command above asks for a threshold of 0.2 percent. A smaller value for this option leads to a more even distribution of the data blocks. On the other hand, it will require more time to finish. Setting this option to 0 is not recommended because it is not practical to achieve an ideal balance.
  3. Alternatively, we can start the Hadoop balancer daemon to automatically balance the data blocks on HDFS.
    start-balancer.sh
    The balancer will move data blocks among the DataNodes according to the space utilization.
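    PS:
    start-balancer.sh accepts the -threshold parameter. The -threshold parameter specifies the balancing threshold. The default is 10, that is, the actual HDFS storage usage of each DataNode divided by the HDFS storage capacity of the cluster.
    e.g. the HDFS usage of a DataNode is 1.2 GB and the total HDFS storage of the cluster is 10 TB, that is, 10000 GB; then the value t is 1.2/10000 = 0.00012. When the -threshold parameter passed to the balancer is less than 0.00012, the cluster is rebalanced, so the command can be: start-balancer.sh -threshold 0.0001
    start-balancer.sh can be run on the NameNode or on a DataNode, and it can be stopped at any time with stop-balancer.sh. The default bandwidth is 1 MB/s.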

    The Hadoop balancer balances data blocks on HDFS according to a preconfigured threshold value, which sets the target for whether the cluster is balanced or not. A node is considered balanced if the difference between the space utilization of the node and the space utilization of the cluster is less than the threshold.
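
The balancing rule above can be sketched in a few lines of Python. This is only an illustration of the rule as stated here, not the balancer's actual code; the function name is_balanced is made up, and it assumes that both the utilization values and the threshold are expressed in percent.

```python
def is_balanced(node_used, node_capacity, cluster_used, cluster_capacity, threshold):
    """Return True when the node's utilization is within `threshold`
    percentage points of the cluster-wide utilization (illustrative only)."""
    node_util = 100.0 * node_used / node_capacity
    cluster_util = 100.0 * cluster_used / cluster_capacity
    return abs(node_util - cluster_util) < threshold


# The cluster is 50% full; one node is 65% full and another is 55% full.
print(is_balanced(650, 1000, 5000, 10000, threshold=10))  # 15 points away -> False
print(is_balanced(550, 1000, 5000, 10000, threshold=10))  # 5 points away -> True
```

With a smaller threshold, more nodes fall outside the band, which is why the balancer then has more blocks to move and takes longer.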

    Sometimes, we want to limit the percentage of bandwidth used by the balancer. By default, Hadoop defines a property dfs.balance.bandwidthPerSec, which determines the maximum speed that a data block will be moved from one DataNode to another. Its default value is 1MB/s. By configuring this property to be a higher value, the balancing speed will be faster, but more resources will be used. For example, to change the value of this property to be 10MB/s, we can change this in the $HADOOP_HOME/conf/hdfs-site.xml.
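
As a rough sketch of what that hdfs-site.xml entry could look like, the following Python snippet builds the property element for 10 MB/s. The helper name bandwidth_property is hypothetical; the only facts it relies on are the property name given above and that the value is expressed in bytes per second.

```python
import xml.etree.ElementTree as ET


def bandwidth_property(mb_per_sec):
    """Build a <property> element for dfs.balance.bandwidthPerSec.
    The value is written in bytes per second (hypothetical helper)."""
    prop = ET.Element("property")
    ET.SubElement(prop, "name").text = "dfs.balance.bandwidthPerSec"
    ET.SubElement(prop, "value").text = str(mb_per_sec * 1024 * 1024)
    return prop


# Print the element that would go inside <configuration> in hdfs-site.xml.
print(ET.tostring(bandwidth_property(10), encoding="unicode"))
```

The printed element can be pasted into the configuration file on the master and then synced to the slave nodes in the same way as the other configuration files in this post.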

Choosing a Proper Block Size

HDFS stores data as data blocks distributed on multiple machines. So, when a large file is put onto HDFS, it will first be split into a number of data blocks. These data blocks are then distributed by the NameNode to the DataNodes in the cluster. The granularity of the data blocks can affect the distribution and parallel execution of the tasks.
Based on the property of the jobs being executed, one block size might result in better performance than others.
Configure the proper HDFS block size:
  1. Run a typical job on the configured cluster.
    hadoop jar $HADOOP_HOME/hadoop-examples-*.jar terasort input output
  2. Use Rumen to generate job traces from the job history file and the job logfile:
    hadoop org.apache.hadoop.tools.rumen.TraceBuilder file:///tmp/jobtraces.json file:///tmp/topology.out file:///usr/local/hadoop/logs/history/done/job_201304012206_0002_conf.xml file:///usr/local/hadoop/logs/history/done/job_201304012206_0002_log
  3. Use GridMix3 to generate a Hadoop cluster benchmark with different block size:
    hadoop org.apache.hadoop.mapred.gridmix.Gridmix -generate 10m input jobtraces.json
  4. stop-all.sh
  5. Change the dfs.block.size property in the $HADOOP_HOME/conf/hdfs-site.xml file.
  6. start-all.sh
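
To see why the block size affects parallelism, the following Python sketch counts the blocks of a file for a few block sizes. It is a back-of-the-envelope calculation with a hypothetical helper name (num_blocks), and it assumes one map task per block, which only holds for splittable input with the default split settings.

```python
def num_blocks(file_size_bytes, block_size_bytes):
    """Number of HDFS blocks needed for a file (ceiling division)."""
    return -(-file_size_bytes // block_size_bytes)


MB = 1024 ** 2
GB = 1024 ** 3

# A 10 GB input file with three candidate values for dfs.block.size.
for block_mb in (64, 128, 256):
    blocks = num_blocks(10 * GB, block_mb * MB)
    print(f"block size {block_mb} MB -> {blocks} blocks (about {blocks} map tasks)")
```

Smaller blocks give more, shorter map tasks and more scheduling overhead; larger blocks give fewer, longer tasks. Which side wins depends on the job, which is why the steps above replay a recorded trace against each candidate block size.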

Using Compression for input and output

A typical MapReduce job uses parallel mapper tasks to load data from external storage devices, such as hard drives, into the main memory. When a job finishes, the reduce tasks write the result data back to the hard drive. In this way, during the life cycle of a MapReduce job, many data copies are created when data is relayed between the hard drive and the main memory. Sometimes, the data is copied over the network from a remote node.
Copying data from and to hard drives and transferring it over the network are expensive operations. To reduce the cost of these operations, Hadoop introduced compression of the data.
Data compression in Hadoop is done by a compression codec, which is a program that encodes and decodes data streams. Although compression and decompression add cost to the system, the advantages far outweigh the disadvantages.
Configure input and output data compression in the $HADOOP_HOME/conf/mapred-site.xml on the Hadoop master machine.
  1. stop-all.sh
  2. Enable output compression:
    <property>
      <name>mapred.output.compress</name>
      <value>true</value>
    </property>
  3. Specify the output compression codec:
    <property>
      <name>mapred.output.compression.codec</name>
      <value>org.apache.hadoop.io.compress.GzipCodec</value>
    </property>
  4. Change the output compression type for sequence file output:
    <property>
      <name>mapred.output.compression.type</name>
      <value>BLOCK</value>
    </property>
  5. Configure the map output compression:
    <property>
      <name>mapred.compress.map.output</name>
      <value>true</value>
    </property>
  6. Similar to the codec configuration for the MapReduce job output, we can configure the compression codec for the map task output, the default of which is org.apache.hadoop.io.compress.DefaultCodec:
    <property>
      <name>mapred.map.output.compression.codec</name>
      <value>org.apache.hadoop.io.compress.GzipCodec</value>
    </property>
  7. Copy the configuration file from the master node to all the slave nodes:
    for host in `cat $HADOOP_HOME/conf/slaves`
    do
     echo 'Copying mapred-site.xml file to host:' $host
     scp $HADOOP_HOME/conf/mapred-site.xml $host:$HADOOP_HOME/conf/
    done
    
  8. start-all.sh
Available compression codecs:
  Codec name     Java class
  DefaultCodec   org.apache.hadoop.io.compress.DefaultCodec
  GzipCodec      org.apache.hadoop.io.compress.GzipCodec
  BZip2Codec     org.apache.hadoop.io.compress.BZip2Codec
  SnappyCodec    org.apache.hadoop.io.compress.SnappyCodec
  LzoCodec       org.apache.hadoop.io.compress.LzoCodec
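
To get a feeling for the trade-off between codecs, the following Python sketch compresses the same repetitive sample data with three algorithms from the Python standard library. These are stand-ins only: zlib, gzip, and bz2 use the same families of algorithms as DefaultCodec, GzipCodec, and BZip2Codec, but this is not Hadoop's codec code, and the sample log line is made up.

```python
import bz2
import gzip
import zlib

# Made-up, highly repetitive sample data, similar in spirit to log output.
sample = b"2014-05-14 INFO mapred.JobClient: map 100% reduce 100%\n" * 20000


def compressed_sizes(data):
    """Return the compressed size in bytes for each stand-in algorithm."""
    return {
        "zlib (DEFLATE)": len(zlib.compress(data)),
        "gzip": len(gzip.compress(data)),
        "bz2": len(bz2.compress(data)),
    }


print(f"original: {len(sample)} bytes")
for name, size in compressed_sizes(sample).items():
    print(f"{name}: {size} bytes")
```

Real job data compresses far less than this artificial sample, so it is worth measuring both the size reduction and the extra CPU time on a representative dataset before choosing a codec.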

Configuring Speculative Execution

Speculative execution is a proactive performance boosting strategy used by the JobTracker to execute one task on two TaskTracker instances. When either of the two task instances finishes, the other one will be killed. By default, speculative execution is on.
Speculative execution can help to improve the performance of MapReduce jobs by reducing the execution time of slowly progressing tasks. For example, on a heterogeneous Hadoop cluster with different hardware configurations, low performance computing nodes can greatly prolong the execution time of a MapReduce job. Speculative execution can remedy this problem by prioritizing the high performance nodes for MapReduce task execution. Hence, the MapReduce execution time can be shortened.
On the other hand, speculative execution can negatively affect the performance of the cluster when a lot of resources are used for speculative execution. For example, many tasks will have to wait for slots that are used for speculative execution.
Configure Hadoop speculative execution in the $HADOOP_HOME/conf/mapred-site.xml:
  1. stop-mapred.sh
  2. Disable the map task speculative execution:
    <property>
      <name>mapred.map.tasks.speculative.execution</name>
      <value>false</value>
    </property>
  3. Disable the reduce task speculative execution:
    <property>
      <name>mapred.reduce.tasks.speculative.execution</name>
      <value>false</value>
    </property>
  4. Configure the maximum percentage of concurrently running speculative tasks:
    <property>
      <name>mapreduce.job.speculative.speculativecap</name>
      <value>0.2</value>
    </property>
  5. Configure the job speculative execution threshold for slow tasks:
    <property>
      <name>mapreduce.job.speculative.slowtaskthreshold</name>
      <value>1.0</value>
    </property>
  6. Configure the threshold for a TaskTracker to speculatively execute slow tasks:
    <property>
      <name>mapreduce.job.speculative.slownodethreshold</name>
      <value>1.0</value>
    </property>
  7. Sync the configuration to the slaves.
  8. start-mapred.sh
If speculative execution has been enabled for a Hadoop cluster, we can still disable it for specific jobs. For example, when we write MapReduce jobs using the Java programming language, we can use the following code snippet to disable speculative execution for this job:
import org.apache.hadoop.conf.Configuration;

// Disable speculative execution for the map and reduce tasks of this job only.
Configuration conf = new Configuration();
conf.set("mapred.map.tasks.speculative.execution", "false");
conf.set("mapred.reduce.tasks.speculative.execution", "false");
The three properties mapreduce.job.speculative.speculativecap, mapreduce.job.speculative.slowtaskthreshold, and mapreduce.job.speculative.slownodethreshold control when the JobTracker should start a speculative task. Specifically, a speculative task for a regular task will start when the following conditions are met:
  • Speculative execution is enabled.
  • The completion rate, in percentage, of the regular task is less than mapreduce.job.speculative.slowtaskthreshold times the mean completion rate of all other tasks.
  • The completion rate, in percentage, of the regular task is less than mapreduce.job.speculative.slownodethreshold times the mean completion rate of all other tasks on the current TaskTracker.
  • The number of launched speculative tasks is smaller than the configured speculative cap.
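
The four conditions above can be written down as a small Python predicate. This is a sketch of the rules as they are described in this post, not the JobTracker's real implementation; the function and parameter names are made up, and it assumes that the speculative cap is a fraction of the currently running tasks.

```python
def should_speculate(enabled, task_rate, mean_rate_all, mean_rate_node,
                     launched_speculative, running_tasks,
                     slow_task_threshold=1.0, slow_node_threshold=1.0,
                     speculative_cap=0.2):
    """Return True when all conditions for starting a speculative task hold
    (illustrative model of the rules described above)."""
    if not enabled:
        return False
    # Slower than the other tasks of the job.
    slow_vs_job = task_rate < slow_task_threshold * mean_rate_all
    # Slower than the other tasks on the same TaskTracker.
    slow_vs_node = task_rate < slow_node_threshold * mean_rate_node
    # Assumption: the cap is a fraction of the running tasks.
    under_cap = launched_speculative < speculative_cap * running_tasks
    return slow_vs_job and slow_vs_node and under_cap


# A task progressing at 0.2 while the job averages 0.5 and the node averages 0.4.
print(should_speculate(True, 0.2, 0.5, 0.4, launched_speculative=1, running_tasks=10))
print(should_speculate(True, 0.2, 0.5, 0.4, launched_speculative=3, running_tasks=10))
```

The first call passes every check; the second one is rejected because three speculative tasks already exceed the cap of 0.2 times 10 running tasks.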

Setting proper number of map and reduce slots for the TaskTracker

The number of map and reduce slots determines the number of concurrent map/reduce tasks for a TaskTracker, which forks multiple JVMs to run these tasks.

Configure map/reduce slots for a TaskTracker:
  1. stop-mapred.sh
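  2. Configure the map slots in the $HADOOP_HOME/conf/mapred-site.xml:
    <property>
      <name>mapred.tasktracker.map.tasks.maximum</name>
      <value>4</value>
    </property>
  3. Similarly, configure the number of reduce slots for a TaskTracker:
    <property>
      <name>mapred.tasktracker.reduce.tasks.maximum</name>
      <value>4</value>
    </property>
  4. Configure the memory usage for each slot:
    <property>
      <name>mapred.child.java.opts</name>
      <value>-Xmx1024m</value>
    </property>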
  5. Sync the configuration to all the slave nodes
  6. start-mapred.sh
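
Before picking slot counts, it helps to check that the child JVM heaps fit into the memory of a node. The following Python sketch does that arithmetic for the values used above. The helper names and the node sizes (16 GB of RAM, 2 GB reserved for the operating system and the Hadoop daemons) are assumptions for illustration only.

```python
def task_heap_budget_mb(map_slots, reduce_slots, child_heap_mb):
    """Worst-case heap used when every slot runs a task at the same time."""
    return (map_slots + reduce_slots) * child_heap_mb


def fits_in_node(budget_mb, node_ram_mb, reserved_mb):
    """True if the task heaps fit after reserving memory for OS and daemons."""
    return budget_mb <= node_ram_mb - reserved_mb


# 4 map slots, 4 reduce slots, -Xmx1024m per child JVM, as configured above.
budget = task_heap_budget_mb(4, 4, 1024)
print(f"heap budget: {budget} MB")
print("fits on a 16 GB node:", fits_in_node(budget, 16 * 1024, 2 * 1024))
print("fits on an 8 GB node:", fits_in_node(budget, 8 * 1024, 2 * 1024))
```

A JVM also uses memory outside its heap, so the real footprint per slot is somewhat larger than the -Xmx value; treat the result as a lower bound.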

Tuning the JobTracker Configuration

Configure the JobTracker in the $HADOOP_HOME/conf/mapred-site.xml:
  1. stop-mapred.sh
  2. Configure the maximum number of tasks for a job:
    <property>
      <name>mapred.jobtracker.maxtasks.per.job</name>
      <value>3000</value>
    </property>
  3. Configure the JobTracker to recover upon restart:
    <property>
      <name>mapred.jobtracker.restart.recover</name>
      <value>true</value>
    </property>
  4. Configure the block size for the job history file:
    <property>
      <name>mapred.jobtracker.job.history.block.size</name>
      <value>3145728</value>
    </property>
  5. Configure the task scheduler for the JobTracker:
    <property>
      <name>mapred.jobtracker.taskScheduler</name>
      <value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value>
    </property>
  6. Configure the maximum running tasks for a job:
    <property>
      <name>mapred.jobtracker.taskScheduler.maxRunningTasksPerJob</name>
      <value>20</value>
    </property>

Tuning the TaskTracker Configuration

Configure the TaskTracker property in the $HADOOP_HOME/conf/mapred-site.xml:
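  1. stop-mapred.sh
  2. Configure the TaskTracker expiry interval, in milliseconds, after which a TaskTracker that has sent no heartbeat is considered lost:
    <property>
      <name>mapred.tasktracker.expiry.interval</name>
      <value>600000</value>
    </property>
  3. Configure the sleep time before sending the SIGKILL signal:
    <property>
      <name>mapred.tasktracker.tasks.sleeptime-before-sigkill</name>
      <value>6000</value>
    </property>
  4. Enable the TaskTracker memory management:
    <property>
      <name>mapred.tasktracker.tasks.maxmemory</name>
      <value>true</value>
    </property>
  5. Configure the TaskTracker index cache size to be 20 MB:
    <property>
      <name>mapred.tasktracker.indexcache.mb</name>
      <value>20</value>
    </property>
  6. Configure the monitoring interval for the TaskTracker's task memory manager:
    <property>
      <name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name>
      <value>5000</value>
    </property>
  7. Configure the TaskTracker to send an out-of-band heartbeat on task completion:
    <property>
      <name>mapreduce.tasktracker.outofband.heartbeat</name>
      <value>true</value>
    </property>
  8. Configure the maximum number of retries for a map task:
    <property>
      <name>mapred.map.max.attempts</name>
      <value>4</value>
    </property>
  9. Configure the maximum number of retries for a failed reduce task:
    <property>
      <name>mapred.reduce.max.attempts</name>
      <value>4</value>
    </property>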
  10. Sync the configuration from the master node to all the slave nodes.
  11. start-mapred.sh

Tuning shuffle, merge, and sort parameters

In a MapReduce job, map task outputs are aggregated into JVM buffers. The size of the in-memory buffer determines how much data can be merged and sorted at once. Too small a buffer size can cause a large number of spill operations to disk, incurring a big overhead.

Configure the sorting parameters in the $HADOOP_HOME/conf/mapred-site.xml:
  1. stop-mapred.sh
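  2. Configure the buffer size, in megabytes, for sorting by changing the property:
    <property>
      <name>io.sort.mb</name>
      <value>100</value>
    </property>
  3. Configure the merge factor:
    <property>
      <name>io.sort.factor</name>
      <value>100</value>
    </property>
  4. Change the percentage of buffer dedicated for record collection:
    <property>
      <name>io.sort.record.percent</name>
      <value>0.05</value>
    </property>
  5. Change the spill factor for buffers:
    <property>
      <name>io.sort.spill.percent</name>
      <value>0.8</value>
    </property>
  6. Configure the in-memory merge threshold:
    <property>
      <name>mapred.inmem.merge.threshold</name>
      <value>1000</value>
    </property>
  7. Configure the percentage of memory to be allocated from the maximum heap size to storing map outputs during the shuffle:
    <property>
      <name>mapred.job.shuffle.buffer.percent</name>
      <value>0.70</value>
    </property>
  8. Configure the threshold to start the in-memory merge:
    <property>
      <name>mapred.job.shuffle.merge.percent</name>
      <value>0.66</value>
    </property>
  9. Configure the percentage of memory to retain map outputs during the reduce phase:
    <property>
      <name>mapred.job.reduce.input.buffer.percent</name>
      <value>0.0</value>
    </property>
  10. Configure the maximum retries in case of fetch failures:
    <property>
      <name>mapreduce.reduce.shuffle.maxfetchfailures</name>
      <value>10</value>
    </property>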
  11. Sync the configuration from the master node to all the slave nodes.
  12. start-mapred.sh
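
To see how io.sort.mb and io.sort.spill.percent interact, the following Python sketch estimates how many times a map task spills to disk. It is a simplified model under stated assumptions: the function name is made up, and the model ignores the share of the buffer reserved by io.sort.record.percent as well as any serialization overhead.

```python
import math


def estimate_spills(map_output_mb, io_sort_mb=100, spill_percent=0.8):
    """Rough number of spills: the buffer is flushed to disk each time it
    fills up to io.sort.mb * io.sort.spill.percent (simplified model)."""
    spill_threshold_mb = io_sort_mb * spill_percent
    return math.ceil(map_output_mb / spill_threshold_mb)


# With the values above, a spill happens roughly every 80 MB of map output.
for output_mb in (50, 200, 1000):
    print(f"{output_mb} MB of map output -> about {estimate_spills(output_mb)} spill(s)")

# Doubling io.sort.mb roughly halves the number of spills for a large output.
print(estimate_spills(1000, io_sort_mb=200))
```

When the estimate is well above one spill per map task, increasing io.sort.mb (within the child JVM heap) is usually the first parameter to try.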

Configure memory for a Hadoop cluster

Configure memory properties for a Hadoop Cluster in the $HADOOP_HOME/conf/mapred-site.xml:
  1. stop-mapred.sh
  2. Configure the virtual memory size, in megabytes, for a map task used by a scheduler:
    <property>
      <name>mapred.cluster.map.memory.mb</name>
      <value>200</value>
    </property>
  3. Similarly, we can configure the virtual memory size, in megabytes, for a reduce task used by a scheduler:
    <property>
      <name>mapred.cluster.reduce.memory.mb</name>
      <value>512</value>
    </property>
  4. Configure the maximum virtual memory size for a map task used by a scheduler:
    <property>
      <name>mapred.cluster.max.map.memory.mb</name>
      <value>512</value>
    </property>
  5. Configure the maximum virtual memory size for a reduce task used by a scheduler:
    <property>
      <name>mapred.cluster.max.reduce.memory.mb</name>
      <value>512</value>
    </property>
  6. Configure the maximum virtual memory size for a single map task for the job used by a scheduler:
    <property>
      <name>mapred.job.map.memory.mb</name>
      <value>0.8</value>
    </property>
  7. Configure the maximum virtual memory size for a single reduce task for the job used by a scheduler:
    <property>
      <name>mapred.job.reduce.memory.mb</name>
      <value>0.8</value>
    </property>
  8. Sync the configuration from the master to all the slave nodes.
  9. start-mapred.sh
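
The relationship between the cluster-level and job-level memory properties can be sketched as follows. This is my understanding of how a memory-aware scheduler uses these values, written as a hypothetical Python helper: a task that asks for more memory than one slot provides occupies several slots, and a request above the cluster maximum is rejected. Treat it as an assumption and check the behavior of your scheduler.

```python
def slots_per_task(job_task_mb, cluster_slot_mb, cluster_max_mb):
    """Slots one task would occupy under the assumed scheduler behavior."""
    if job_task_mb > cluster_max_mb:
        raise ValueError("requested memory exceeds the cluster maximum")
    # Ceiling division: a partially used slot still counts as a whole slot.
    return -(-job_task_mb // cluster_slot_mb)


# Map slot size 200 MB and map maximum 512 MB, as configured above.
print(slots_per_task(200, cluster_slot_mb=200, cluster_max_mb=512))
print(slots_per_task(512, cluster_slot_mb=200, cluster_max_mb=512))
```

Under this model, a job that requests 512 MB per map task on 200 MB slots uses three slots per task, so fewer of its tasks can run in parallel on each TaskTracker.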

Setting proper number of parallel copies

When all or part of the map tasks finish, map outputs will be copied from the map task nodes to the reduce task nodes. The parallel copying strategy is used to increase the transfer throughput.

Configure the number of parallel copies in the $HADOOP_HOME/conf/mapred-site.xml:
  1. stop-mapred.sh
  2. Configure the number of parallel copies:
    <property>
      <name>mapred.reduce.parallel.copies</name>
      <value>20</value>
    </property>
  3. Sync the configuration file from master node to slave nodes.
  4. start-mapred.sh

Tuning JVM parameters

Configure JVM parameters in the $HADOOP_HOME/conf/mapred-site.xml:
  1. stop-all.sh
  2. Configure the JVM options for the child tasks:
    <property>
      <name>mapred.child.java.opts</name>
      <value>-Xmx512M</value>
    </property>
  3. Sync the configuration file from master node to slave nodes.
  4. start-all.sh

Configuring JVM Reuse

MapReduce tasks are executed by JVM processes, which are forked by the TaskTracker. The creation of a JVM, which includes the initialization of the execution environment, is costly, especially when the number of tasks is large. In the default configuration, the number of JVMs needed to finish a job is equal to the number of tasks. In other words, the default setting uses one JVM to execute one task. When the execution of a task completes, its JVM will be killed by the TaskTracker.

JVM Reuse is an optimization of reusing JVMs for multiple tasks. If it is enabled, multiple tasks can be executed sequentially with one JVM.

Configure JVM Reuse in the $HADOOP_HOME/conf/mapred-site.xml:
  1. stop-mapred.sh
  2. Configure the number of tasks to run per JVM:
    <property>
      <name>mapred.job.reuse.jvm.num.tasks</name>
      <value>2</value>
    </property>
  3. Sync the configuration file from master node to slave nodes.
  4. start-mapred.sh
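
The saving from JVM reuse can be estimated with a small Python sketch. The helper name jvms_needed is made up, and the model is deliberately simple: it assumes that all tasks belong to the same job and run one after another in a single slot. A value of -1 for mapred.job.reuse.jvm.num.tasks means that there is no limit on the number of tasks per JVM.

```python
def jvms_needed(num_tasks, reuse_limit=1):
    """JVM launches needed to run num_tasks sequentially in one slot."""
    if num_tasks == 0:
        return 0
    if reuse_limit == -1:  # -1 means no limit on tasks per JVM
        return 1
    return -(-num_tasks // reuse_limit)  # ceiling division


for limit in (1, 2, -1):
    print(f"reuse limit {limit}: {jvms_needed(100, limit)} JVM launch(es) for 100 tasks")
```

The benefit is largest for jobs with many short tasks, where JVM startup is a noticeable part of each task's runtime.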

Configuring the reducer initialization time

Reduce tasks can be started when a certain percentage of map tasks has finished. By setting this property to a smaller number, the reduce tasks will start earlier, occupying the computing slots. On the other hand, if the number is set too large, for example, very close to 1, the reduce tasks will have to wait for the majority of the map tasks to finish, prolonging the job execution time.

Configure reducer initialization time in the $HADOOP_HOME/conf/mapred-site.xml:
  1. stop-mapred.sh
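  2. Configure the fraction of map tasks that must finish before reduce tasks are started:
    <property>
      <name>mapred.reduce.slowstart.completed.maps</name>
      <value>0.05</value>
    </property>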
  3. Sync the configuration file to all the slave nodes.
  4. start-mapred.sh
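
As a quick illustration of what the slow start value means in practice, the following Python sketch computes how many map tasks have to finish before reducers may be scheduled. The function name is hypothetical, and rounding up to a whole task is an assumption about how the fraction is applied.

```python
import math


def maps_before_reducers(total_maps, slowstart=0.05):
    """Map tasks that must complete before reduce tasks may start
    (assumes the fraction is rounded up to a whole task)."""
    return math.ceil(slowstart * total_maps)


# A job with 210 map tasks and three candidate slow start values.
for fraction in (0.05, 0.5, 1.0):
    needed = maps_before_reducers(210, fraction)
    print(f"slowstart {fraction}: reducers may start after {needed} of 210 maps")
```

On a busy cluster, a higher value keeps reduce slots free for other jobs while the maps are still running; on an idle cluster, a lower value lets the shuffle overlap with the map phase.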
