Thursday, July 31, 2014

Removing Nodes from a Running Hadoop Cluster

Add the following property to core-site.xml:

  <property>
     <name>dfs.hosts.exclude</name>
     <value>excludes</value>
  </property>

excludes is a file under the Hadoop installation directory; list in it the hostname or ip:port of each node to be removed (hostname is recommended).
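For example, a hypothetical excludes file retiring two DataNodes would contain one entry per line:

datanode03
datanode07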

Then run:
$ hadoop dfsadmin -refreshNodes 
$ hadoop dfsadmin -report

After hadoop dfsadmin -report runs, the Decommission Status of each node listed in excludes shows Decommission In Progress, or the node has disappeared from the report altogether (once enough time has passed).

Log in to each node that was removed from the cluster and run:
$ hadoop-daemon.sh stop datanode
$ hadoop-daemon.sh stop tasktracker


PS: You would normally expect any dfs.* setting to live in hdfs-site.xml, but not this one. Also, if excludes is placed under the conf directory and dfs.hosts.exclude points at its absolute path, it surprisingly does not work. Both are traps.

Tuesday, July 29, 2014

MongoDB cursor timeout

In PHP's MongoDB extension, set the query timeout like this:
MongoCursor::$timeout = -1;


I used to assume that setting timeout to -1 on the cursor returned by find meant the query would never time out, but iterating over the result set, or calling count, still raised cursor timed out (timeout: 30000, time left: 0:0, status: 0). Following 记一次MongoDB性能问题 to track down the cause got me nowhere. Only after reading PHP Mongo's driver has no effects on Mongo Cursor timeout did I learn that the timeout must be set as shown above, on the static property rather than on an individual cursor.

Appendix: the troubleshooting method mentioned in 火丁笔记:
Tracing the PHP script with strace showed the process stuck on a recvfrom call:
shell> strace -f -r -p 
recvfrom(,
Look up what recvfrom means with:
shell> apropos recvfrom
receive a message from a socket
Or confirm it as follows:
shell> lsof -p 
shell> ls -l /proc//fd/

Thursday, July 17, 2014

libboost_python-py27.so.1.53.0: No such file or directory

After a system upgrade I never re-ran my old Python scripts that talk to Hadoop, until the day before yesterday, when testing AWS EMR, I picked the code up again. On the first run the problem appeared: a shared library could not be found.
libboost_python-py27.so.1.53.0: No such file or directory

After sudo apt-get install libboost-all-dev, the error persisted.
I then suspected a version mismatch: find libboost_python-py27* and create a symlink named libboost_python-py27.so.1.53.0, and that should fix it too. But searching all of /usr/lib and /usr/local/lib turned up not a single .so starting with libboost. dpkg -S libboost-python listed only a pile of documentation under /usr/share; still no .so anywhere.

Googling led to Installing POV-Ray: configuration error, which mentions that after apt-get install, the libboost* libraries live under /usr/lib/x86_64-linux-gnu/. My system is 32-bit; ls -al /usr/lib/i386-linux-gnu/libboost* confirmed the libboost libraries were indeed there.
I added /usr/lib/i386-linux-gnu to the loader configuration, ran ldconfig, and the script ran through cleanly.
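For reference, the loader-configuration step looks roughly like this (the boost.conf file name is arbitrary; run as root):
shell> echo "/usr/lib/i386-linux-gnu" > /etc/ld.so.conf.d/boost.conf
shell> ldconfig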

Sure enough, system upgrades carry risk.

Thursday, May 22, 2014

On Redis Data Loss

FROM: 从Redis的数据丢失说起 (reposted below)

A miserable incident: a Redis server with 4 cores and 16 GB of RAM and no hardware problems of any kind ran under sustained heavy load for about three months, holding roughly 14 GB of data, with fairly thorough save parameters configured. Yet after a single restart, this very machine lost most of its data: of the 14 GB, only a few hundred megabytes were recovered.

Normally, for a memory database like Redis that writes back to disk periodically, losing a few records is to be expected, but a data-loss rate above 80 percent is absurd. After ruling out operator error, the hunt for the cause began.

The log from the restart:
[26641] 21 Dec 09:46:34 * Slave ask for synchronization
[26641] 21 Dec 09:46:34 * Starting BGSAVE for SYNC
[26641] 21 Dec 09:46:34 # Can’t save in background: fork: Cannot allocate memory
[26641] 21 Dec 09:46:34 * Replication failed, can’t BGSAVE
[26641] 21 Dec 09:46:34 # Received SIGTERM, scheduling shutdown…
[26641] 21 Dec 09:46:34 # User requested shutdown…
One problem is obvious: the system could not save in the background because forking a process failed.

Going back through several months of logs, the system had been reporting errors constantly:
[26641] 18 Dec 04:02:14 * 1 changes in 900 seconds. Saving…
[26641] 18 Dec 04:02:14 # Can’t save in background: fork: Cannot allocate memory
The system cannot save in the background: it cannot allocate memory when forking a process.

Tracing the source code located the error in src/rdb.c:
int rdbSaveBackground(char *filename) {
    pid_t childpid;
    long long start;

    if (server.bgsavechildpid != -1) return REDIS_ERR;
    if (server.vm_enabled) waitEmptyIOJobsQueue();
    server.dirty_before_bgsave = server.dirty;
    start = ustime();
    if ((childpid = fork()) == 0) {
        /* Child */
        if (server.vm_enabled) vmReopenSwapFile();
        if (server.ipfd > 0) close(server.ipfd);
        if (server.sofd > 0) close(server.sofd);
        if (rdbSave(filename) == REDIS_OK) {
            _exit(0);
        } else {
            _exit(1);
        }
    } else {
        /* Parent */
        server.stat_fork_time = ustime()-start;
        if (childpid == -1) {
            redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }
        redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
        server.bgsavechildpid = childpid;
        updateDictResizePolicy();
        return REDIS_OK;
    }
    return REDIS_OK; /* unreached */
}

The data loss is finally explained!
Redis has two disk write-back mechanisms, synchronous and asynchronous:
  • Synchronous write-back, the SAVE command: the main process writes the data back to disk itself. With a large dataset this makes the system appear frozen for a long time, so it is generally not recommended.
  • Asynchronous write-back, the BGSAVE command: the main process forks a copy of itself, the new process writes the data back to disk, and the new process exits when the write finishes. Because the main process does not block, the system does not freeze; this is generally the default method.

Personally I find method 2's fork of the main process rather crude, but it seems to be the only way. Hot data in memory can change at any moment, and saving a point-in-time memory image to disk requires freezing writes; freezing makes the system appear dead. Forking a new process amounts to copying the memory image of that instant, so the main process never needs to freeze; all the work happens in the child.

Forking a process with a small memory footprint does not take many resources, but once the process's memory is measured in gigabytes, fork becomes a terrifying operation. Let alone forking a 14 GB process on a host with 16 GB of RAM: the memory allocation is bound to fail. Worse still, the more frequently a host is modified, the more frequently it forks, and the cost of the fork itself is hardly any better than the freeze.

Having found the cause, the fix is to set the kernel parameter vm.overcommit_memory = 1 in /etc/sysctl.conf.
The Linux kernel decides whether to grant a virtual memory request according to vm.overcommit_memory:
  • vm.overcommit_memory = 1: always grant.
  • vm.overcommit_memory = 0: compare the size of the requested virtual memory against the currently free physical memory plus swap, and grant accordingly.
  • vm.overcommit_memory = 2: compare the process's total allocated virtual memory plus this request against the currently free physical memory plus swap, and grant accordingly.
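To apply the change, the usual sysctl steps are (run as root):
shell> echo "vm.overcommit_memory = 1" >> /etc/sysctl.conf
shell> sysctl -p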


That is the end of the reposted article.

Yesterday the Redis instance we use as a cache went down again; I only found out this morning while checking the service. The log was full of one message: Can't save in background: fork: Cannot allocate memory
[10337] 22 May 14:28:40.307 * 1 changes in 900 seconds. Saving... The article above explains it all in detail, which is why I reposted it here.

I had figured that with 32 GB of RAM on the server and only a dozen or so GB of cached data, memory could never run short, so I did not set vm.overcommit_memory to 1. I had not reckoned with fork: once it enters the picture, no amount of memory is enough. This time I truly appreciate what the author says here about setting vm.overcommit_memory=1 and having a large swap. One more lesson: never assume.

Tuesday, May 20, 2014

Chapter 2 - Storing Data in Hadoop

HDFS

HDFS is implemented as a block-structured filesystem. Individual files are broken into blocks of a fixed size, which are stored across a Hadoop cluster. A file can be made up of several blocks, which are stored on different DataNodes (individual machines in the cluster) chosen randomly on a block-by-block basis. As a result, access to a file usually requires access to multiple DataNodes, which means that HDFS supports file sizes far larger than a single machine's disk capacity.

The DataNode stores each HDFS data block in a separate file on its local filesystem with no knowledge about the HDFS files themselves. To improve throughput even further, the DataNode does not create all files in the same directory. Instead, it uses heuristics to determine the optimal number of files per directory, and creates subdirectories appropriately.

One of the requirements for such a block-structured filesystem is the capacity to store, manage, and access file metadata (information about files and blocks) reliably, and to provide fast access to the metadata store. Unlike HDFS files themselves (which are accessed in a write-once and read-many model), the metadata structures can be modified by a large number of clients concurrently. It is important that this information never gets out of sync. HDFS solves this problem by introducing a dedicated special machine, called the NameNode, which stores all the metadata for the filesystem across the cluster. This means that HDFS implements a master/slave architecture. A single NameNode (which is a master server) manages the filesystem namespace and regulates access to files by clients. The existence of a single master in a cluster greatly simplifies the architecture of the system. The NameNode serves as a single arbitrator and repository for all HDFS metadata.

Metadata storage is also persistent. The entire filesystem namespace (including the mapping of blocks to files and filesystem properties) is contained in a file called FsImage stored in the NameNode's local filesystem. The NameNode also uses a transaction log to persistently record every change that occurs in the filesystem metadata (metadata store). This log is stored in the edits log file on the NameNode's local filesystem.

The SecondaryNameNode is not a "backup NameNode". It cannot take over the primary NameNode's function. It serves as a checkpointing mechanism for the primary NameNode. In addition to storing the state of the HDFS NameNode, it maintains two on-disk structures that persist the current filesystem state: an image file and an edit log. The image file represents an HDFS metadata state at a point in time, and the edit log is a transactional log(compare to a log in a database architecture) of every filesystem metadata change since the image file was created.

When the NameNode (re)starts, the current state is reconstructed by reading the image file and then replaying the edit log. Obviously, the larger the edit log is, the longer it takes to replay it and consequently to start a NameNode. To improve NameNode startup performance, the edit log is periodically rolled, and a new image file is created by applying the edit log to the existing image. This operation can be fairly resource-intensive. To minimize the impact of checkpoint creation on the NameNode's functioning, checkpointing is performed by the SecondaryNameNode daemon, often on a separate machine.

As a result of checkpointing, the SecondaryNameNode holds a copy (out-of-date) of the primary's persistent state in the form of the last image file. In cases when the edit file is kept relatively small, the secondary node can be used to recover the filesystem's state. In this case, you must be aware of a certain amount of metadata (and corresponding data) loss, because the latest changes stored in the edit log are not available.

To keep the memory footprint of the NameNode manageable, the default size of an HDFS block is 64MB -- orders of magnitude larger than the block size of most other block-structured filesystems. The additional benefit of the large data block is that it allows HDFS to keep large amounts of data stored sequentially on disk, which supports fast streaming reads of data.

One of the misconceptions about Hadoop is the assumption that smaller blocks (less than the block size) will still use the whole block on the filesystem. This is not the case. Smaller blocks occupy exactly as much disk space as they require.

But this does not mean that having many small files uses HDFS efficiently. Regardless of the block size, its metadata occupies exactly the same amount of memory in the NameNode. As a result, a large number of small HDFS files (smaller than the block size) will use a lot of the NameNode's memory, thus negatively impacting HDFS scalability and performance.

The downside of HDFS file organization is that several DataNodes are involved in the serving of a file, which means that a file can become unavailable if any one of those machines is lost. To avoid this problem, HDFS replicates each block across a number of machines (three, by default).

Data replication in HDFS is implemented as part of a write operation in the form of a data pipeline. When a client is writing data to an HDFS file, this data is first written to a local file. When the local file accumulates a full block of data, the client consults the NameNode to get a list of DataNodes that are assigned to host replicas of that block. The client then writes the data block from its local storage to the first DataNode in 4K portions. The DataNode stores the received blocks in its local filesystem, and forwards that portion of data to the next DataNode in the list. The same operation is repeated by each receiving DataNode until the last node in the replica set receives the data. This last DataNode stores the data locally without sending it any further.

If one of the DataNodes fails while a block is being written, it is removed from the pipeline. In this case, when the write operation on the current block completes, the NameNode re-replicates the block to make up for the missing replica caused by the failed DataNode. When a file is closed, the remaining data in the temporary local file is pipelined to the DataNodes. The client then informs the NameNode that the file is closed. At this point, the NameNode commits the file creation operation into its persistent store. If the NameNode dies before the file is closed, the file is lost.

The default block size and replication factor are specified in the Hadoop configuration, but can be overridden on a per-file basis. An application can specify the block size and the replication factor for a specific file at its creation time.
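A minimal sketch of such a per-file override through the Java FileSystem API (the path and values here are illustrative; the create() overload shown is the long-standing one taking replication and block size):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PerFileSettings {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Create one file with a 128MB block size and a replication factor of 2,
        // overriding the cluster-wide defaults for this file only.
        FSDataOutputStream out = fs.create(new Path("/data/example.bin"),
                true,                 // overwrite if the file exists
                4096,                 // I/O buffer size
                (short) 2,            // replication factor
                128L * 1024 * 1024);  // block size in bytes
        out.writeUTF("hello");
        out.close();
    }
}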

One of the most powerful features of HDFS is optimization of replica placement, which is crucial to HDFS reliability and performance. All decisions regarding replication of blocks are made by the NameNode, which periodically (every 3 seconds) receives a heartbeat and a block report from each of the DataNodes. A heartbeat is used to ensure proper functioning of DataNodes, and a block report allows verifying that the list of blocks on a DataNode corresponds to the NameNode's information. One of the first things that a DataNode does on startup is send a block report to the NameNode, which allows the NameNode to rapidly form a picture of the block distribution across the cluster.

An important characteristic of data replication in HDFS is rack awareness. Large HDFS instances run on a cluster of computers that is commonly spread across many racks. Typically, network bandwidth (and consequently network performance) between machines in the same rack is greater than network bandwidth between machines in different racks.

The NameNode determines the rack ID that each DataNode belongs to via the Hadoop Rack Awareness process. A simple policy is to place replicas on unique racks. This policy prevents losing data when an entire rack is lost, and evenly distributes replicas in the cluster. It also allows using bandwidth from multiple racks when reading data. But because a write must, in this case, transfer blocks to multiple racks, the performance of writes suffers.

An optimization of the rack-aware policy is to cut inter-rack write traffic (and consequently improve write performance) by using a number of racks smaller than the number of replicas. For example, when the replication factor is three, two replicas are placed on one rack, and the third on a different rack.

To minimize global bandwidth consumption and read latency, HDFS tries to satisfy a read request from a replica that is closest to the reader. If a replica exists on the same rack as the reader node, that replica is used to satisfy the read request.

As mentioned, each DataNode periodically sends a heartbeat message to the NameNode, which the NameNode uses to discover DataNode failures (based on missing heartbeats). The NameNode marks DataNodes without recent heartbeats as dead, and does not dispatch any new I/O requests to them. Because data located on a dead DataNode is no longer available to HDFS, DataNode death may cause the replication factor of some blocks to fall below their specified values. The NameNode constantly tracks which blocks must be re-replicated, and initiates replication whenever necessary.

Using HDFS Files
When a file is opened for writing, the opening client is granted an exclusive writing lease for the file. This means that no other client can write to this file until this client completes the operation. To ensure that no "runaway" clients hold a lease, the lease periodically expires. The use of leases effectively ensures that no two applications can simultaneously write to a given file (compare it to a write lock in a database).

The lease duration is bound by a soft limit and a hard limit. For the duration of the soft limit, a writer has exclusive access to the file. If the soft limit expires and the client fails to close the file or renew the lease (by sending a heartbeat to the NameNode), another client can preempt the lease. If the hard limit (one hour) expires and the client has failed to renew the lease, HDFS assumes that the client has quit, automatically closes the file on behalf of the writer, and then recovers the lease.

The writer's lease does not prevent other clients from reading the file. A file may have many concurrent readers.

Hadoop-Specific File Types
In addition to "ordinary" files, HDFS also introduced several specialized file types(such as SequenceFile, MapFile, SetFile, ArrayFile, and BloomMapFile)that provide much richer functionality, which often simplifies data processing.

SequenceFile provides a persistent data structure for binary key/value pairs. Here, different instances of both key and value must represent the same Java class, but can have different sizes. Similar to other Hadoop files, SequenceFiles are append-only.

When using an ordinary file (either text or binary) for storing key/value pairs (typical data structures for MapReduce), the data storage is not aware of the key and value layout, which must be implemented in the readers on top of the generic storage. The use of SequenceFile provides a storage mechanism natively supporting the key/value structure, thus making implementations using this data layout much simpler.

SequenceFile has three available formats: Uncompressed, Record-Compressed, and Block-Compressed. The first two are stored in a record-based format, whereas the third one uses a block-based format.

The choice of a specific format for a sequence file determines the size of the file on the hard drive. Block-Compressed files are typically the smallest, while Uncompressed are the largest.

SequenceFile Header
FIELD DESCRIPTION
Version A 4-byte array containing the three letters SEQ and a sequence file version number (either 4 or 6). The currently used version is 6. Version 4 is supported for backward compatibility.
Key Class Name of the key class, which is validated against the name of the key class provided by the reader.
Value Class Name of the value class, which is validated against the name of the value class provided by the reader.
Compression A key/value compression flag
Block Compression A block compression flag
Compression Codec CompressionCodec class. This class is used only if either the key/value or block compression flag is true. Otherwise, this value is ignored.
Sync A sync marker

A sync is a specialized marker used for faster searching inside a SequenceFile. Sync markers are also used by the MapReduce implementation - data splits are made only on sync boundaries.

Record Layout
FIELD DESCRIPTION
Record Length The length of the record (bytes)
Key Length The length of the key (bytes)
Key Byte array containing the record's key
Value Byte array containing the record's value
For the Block-Compressed format, the header and sync serve the same purposes as in the record-based SequenceFile formats. The actual data is contained in blocks.

Block Layout
FIELD DESCRIPTION
Keys lengths length All the keys for a given block are stored together. This field specifies the compressed key-lengths block size (in bytes)
Keys lengths Byte array containing the compressed key-lengths block
Keys length Compressed keys size (in bytes)
Keys Byte array containing the compressed keys for a block
Values lengths length All the values for a given block are stored together. This field specifies the compressed value-lengths block size (in bytes)
Values lengths Byte array containing the compressed value-lengths block
Values length Compressed values size (in bytes)
Values Byte array containing the compressed values for a block

All formats use the same header, which contains information allowing the reader to recognize the file. The header contains the key and value class names that are used by the reader to instantiate those classes, the version number, and compression information. If compression is enabled, the Compression Codec class name field is added to the header.

Metadata for the SequenceFile is a set of key/value text pairs that can contain additional information about the SequenceFile to be used by the file reader/writer.

The implementation of write operations for the Uncompressed and Record-Compressed formats is very similar. Each call to an append() method adds a record containing the length of the whole record (key length plus value length), the length of the key, and the raw data of the key and value to the SequenceFile. The difference between the compressed and the uncompressed version is whether or not the raw data is compressed with the specified codec.

The Block-Compressed format applies more aggressive compression. Data is not written until it reaches a threshold (block size), at which point all keys are compressed together. The same thing happens for the values and the lists of key and value lengths.

A special reader(SequenceFile.Reader) and writer(SequenceFile.Writer) are provided by Hadoop for use with SequenceFiles.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path path = new Path("filename");
SequenceFile.Writer sequenceWriter = new SequenceFile.Writer(fs, conf, path,
        Key.class, Value.class,
        fs.getConf().getInt("io.file.buffer.size", 4096),
        fs.getDefaultReplication(), 1073741824, null, new Metadata());

.......................................
sequenceWriter.append(bytesWritable, bytesWritable);
.......................................
IOUtils.closeStream(sequenceWriter);
A minimal SequenceFile writer constructor (SequenceFile.Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass)) requires the specification of the filesystem, the Hadoop configuration, the path (file location), and definitions of both key and value classes. The constructor used in the previous example enables you to specify additional file parameters, including the following:
  1. int bufferSize - The default buffer size (4096) is used if it is not defined.
  2. short replication - Default replication is used.
  3. long blockSize - The value of 1073741824 (1024 MB) is used.
  4. Progressable progress - None is used.
  5. SequenceFile.Metadata metadata - An empty metadata class is used.
Once the writer is created, it can be used to add key/value pairs to the file.
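A minimal read-back sketch for such a file (assuming it was written with the writer above; the next(key, value) loop is the standard pattern):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceReadBack {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path("filename"), conf);
        // Key/value classes are read from the file header and instantiated reflectively.
        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
        Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
        while (reader.next(key, value)) {
            // process the pair; next() returns false at end of file
        }
        IOUtils.closeStream(reader);
    }
}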

One of the limitations of SequenceFile is the inability to seek based on key values. Additional Hadoop file types (MapFile, SetFile, ArrayFile, and BloomMapFile) enable you to overcome this limitation by adding a key-based index on top of SequenceFile.

A MapFile is really not a file, but rather a directory containing two files - the data (sequence) file, containing all keys and values in the map, and a smaller index file, containing a fraction of the keys. Create MapFiles by adding entries in order. MapFiles are typically used to enable efficient searching and retrieval of the file's contents by searching on their index.

The index file is populated with the key and a LongWritable that contains the starting byte position of the record corresponding to this key. The index file does not contain all the keys, but just a fraction of them. You can set the indexInterval using the setIndexInterval() method on the writer. The index is read entirely into memory, so for a large map it is necessary to set an index skip value that keeps the index file small enough to fit in memory completely.

Similar to the SequenceFile, a special reader(MapFile.Reader) and writer(MapFile.Writer) are provided by Hadoop for use with map files.
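A minimal writer/reader sketch (the directory name and entries here are hypothetical; keys must be appended in sorted order):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

public class MapFileSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        MapFile.Writer writer = new MapFile.Writer(conf, fs, "lookup.map",
                Text.class, Text.class);
        writer.setIndexInterval(128);                     // index every 128th key
        writer.append(new Text("alpha"), new Text("1"));  // keys in sorted order
        writer.append(new Text("beta"), new Text("2"));
        writer.close();

        // Index-assisted random access by key.
        MapFile.Reader reader = new MapFile.Reader(fs, "lookup.map", conf);
        Text value = new Text();
        reader.get(new Text("beta"), value);              // value now holds "2"
        reader.close();
    }
}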

SetFile and ArrayFile are variations of MapFile for specialized implementations of key/value types. SetFile is a MapFile for data represented as a set of keys with no values (a value is represented by a NullWritable instance). ArrayFile deals with key/value pairs where the keys are just sequential longs. It keeps an internal counter, which is incremented as part of every append call, and the value of this counter is used as the key.

Both file types are useful for storing keys, not values.

A bloom filter is a space-efficient, probabilistic data structure that is used to test whether an element is a member of a set. The result of the test is that the element either definitely is not in the set or may be in the set.
The base data structure of a bloom filter is a bit vector. The probability of false positives depends on the size of the element's set and size of the bit vector.

Although they risk false positives, bloom filters have a strong space advantage over other data structures for representing sets, such as self-balancing binary search trees, tries, hash tables, or simple arrays or linked lists of the entries. Most of these require storing at least the data items themselves, which can require anywhere from a small number of bits (for small integers) to an arbitrary number of bits, such as for strings. (Tries are an exception, because they can share storage between elements with equal prefixes.)

This advantage of a bloom filter comes partly from its compactness (inherited from arrays), and partly from its probabilistic nature.

Finally, the BloomMapFile extends the MapFile implementation by adding a dynamic bloom filter that provides a fast membership test for keys. It also offers a fast version of a key search operation, especially in the case of sparsely populated MapFiles. A writer's append() operation updates a DynamicBloomFilter, which is then serialized when the writer is closed. This filter is loaded in memory when a reader is created. A reader's get() operation first checks the filter for the key membership, and if the key is absent, it immediately returns null without doing any further I/O.

DATA COMPRESSION

An important consideration for storing data in HDFS files is data compression, which shifts the computational load in data processing from I/O to CPU. Several publications provide systematic evaluations of the compute-versus-I/O trade-offs of compression in MapReduce implementations, and they show that the benefits of data compression depend on the type of data-processing job. For read-heavy (I/O-bound) applications (for example, text data processing), compression gives 35 to 60 percent performance savings. For compute-intensive (CPU-bound) applications, on the other hand, performance gains from data compression are negligible.

This does not mean that data compression is not advantageous for such applications. Hadoop clusters are shared resources, and as a result, a diminishing I/O load for one application increases the capability of other applications to use this I/O.

Does this mean that data compression is always desirable? The answer is "no." For example, if you are using text or custom binary input files, data compression might be undesirable, because compressed files are not splittable. On the other hand, in the case of SequenceFiles and their derivatives, compression is always desirable. Finally, it always makes sense to compress the intermediate files used for shuffle and sort.

Keep in mind that the results of data compression depend greatly on the type of data being compressed and the compression algorithm.

HDFS Federation and High Availability
The main benefits of HDFS Federation:
  • Namespace scalability - HDFS cluster storage scales horizontally, but the namespace does not. Large deployments (or deployments using a lot of small files) benefit from scaling the namespace by adding more NameNodes to the cluster.
  • Performance - Filesystem operation throughput is limited by a single NameNode. Adding more NameNodes to the cluster scales the filesystem read/write throughput.
  • Isolation - A single NameNode offers no isolation in a multi-user environment. An experimental application can overload the NameNode and slow down production-critical applications. With multiple NameNodes, different categories of applications and users can be isolated to different namespaces.
The implementation of HDFS Federation is based on a collection of independent NameNodes that don't require coordination with each other. The DataNodes are used as common storage for blocks by all the NameNodes. Each DataNode registers with all the NameNodes in the cluster. DataNodes send periodic heartbeats and block reports, and handle commands from the NameNodes.

A namespace operates on a set of blocks - a block pool. Although a pool is dedicated to a specific namespace, the actual data can be allocated on any of the DataNodes in the cluster. Each block pool is managed independently, which allows a namespace to generate block IDs for new blocks without coordinating with the other namespaces. The failure of one NameNode does not prevent the DataNodes from serving the other NameNodes in the cluster.

A namespace and its block pool together are called a namespace volume. This is a self-contained unit of management. When a NameNode/namespace is deleted, the corresponding block pool at the DataNodes is deleted. Each namespace volume is upgraded as a unit during a cluster upgrade.

The HDFS Federation configuration is backward-compatible, and allows an existing single-NameNode configuration to work without any change. The new configuration is designed so that all the nodes in the cluster have the same configuration, without the need to deploy a different configuration based on the type of the node in the cluster.

Although HDFS Federation solves the problem of HDFS scalability, it does not solve the NameNode reliability issue. (In reality, it makes it worse - the probability that one NameNode fails is higher.)

The new HDFS high-availability architecture contains two separate machines configured as NameNodes, with exactly one of them in an active state at any point in time. The active NameNode is responsible for all client operations in the cluster, while the other one (the standby) simply acts as a slave, maintaining enough state to provide a fast failover if necessary. To keep the state of both nodes synchronized, the implementation requires that both nodes have access to a directory on a shared storage device.

When any namespace modification is performed by the active node, it durably logs a record of the modification to a log file located in the shared directory. The standby node is constantly watching this directory for changes, and applies them to its own namespace. In the event of a failover, the standby ensures that it has read all of the changes before transitioning to the active state.

To provide a fast failover, it is also necessary for the standby node to have up-to-date information regarding the location of blocks in the cluster. This is achieved by configuring DataNodes to send block location information and heartbeats to both NameNodes.


HBASE

HBase Architecture
HBase supports a very loose schema definition, and does not provide any joins, query language, or SQL.

Although HBase does not support real-time joins and queries, batch joins and/or queries via MapReduce can be easily implemented. In fact, they are well supported by higher-level systems such as Pig and Hive, which use a limited SQL dialect to execute those operations.

The main focus of HBase is on Create, Read, Update, and Delete (CRUD) operations on wide sparse tables. Currently, HBase does not support transactions (though it provides limited locking support and some atomic operations) or secondary indexing (several community projects are trying to implement this functionality, but they are not part of the core HBase implementation). As a result, most HBase-based implementations use highly denormalized data.

HBase leverages HDFS for its persistent data storage. This allows HBase to leverage all the advanced features that HDFS provides, including checksums, replication, and failover. HBase data management is implemented by distributed region servers, which are managed by the HBase master (HMaster).

A region server's implementation consists of the following major components:
  • memstore is HBase's implementation of an in-memory data cache, which improves the overall performance of HBase by serving as much data as possible directly from memory. The memstore holds in-memory modifications to the store in the form of key/value pairs. A write-ahead log (WAL) records all changes to the data. This is important in case something happens to the primary storage. If the server crashes, it can effectively replay the log to get everything up to where the server should have been just before the crash. It also means that if writing the record to the WAL fails, the whole operation must be considered a failure.
    One HBase optimization technique is disabling the writes to the WAL. This represents a trade-off between performance and reliability. Disabling writes to the WAL prevents recovery when a region server fails before a write operation completes. You should use such an optimization with care, and only in cases when either data loss is acceptable, or a write operation can be "replayed" based on an additional data source (see the sketch after this list).
  • HFile is a specialized HDFS file format for HBase. The implementation of HFile in a region server is responsible for reading and writing HFiles to and from HDFS.
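A minimal sketch of that WAL trade-off, assuming the 0.94-era client API where the switch is Put.setWriteToWAL (the table and column names are hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class UnsafeFastWrite {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "metrics");   // hypothetical table
        Put put = new Put(Bytes.toBytes("row-1"));
        put.add(Bytes.toBytes("d"), Bytes.toBytes("value"), Bytes.toBytes("42"));
        // Skip the write-ahead log: faster, but this edit is lost if the
        // region server crashes before the memstore is flushed.
        put.setWriteToWAL(false);
        table.put(put);
        table.close();
    }
}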
ZOOKEEPER
Zookeeper is a replicated synchronization service with eventual consistency. It is robust, because the persisted data is distributed between multiple nodes (this set of nodes is called an ensemble) and a client connects to any of them (that is, a specific "server"), migrating if a given server fails. As long as a strict majority of nodes are working, the ensemble of Zookeeper nodes is alive.

Zookeeper's master node is dynamically chosen by consensus within the ensemble. If the master node fails, the remaining nodes pick a new master. The master is the authority for writes. This guarantees that writes are persisted in order (that is, writes are linear). Each time a client writes to the ensemble, a majority of nodes persist the information. This means that each write makes the server up to date with the master.

A canonical example of Zookeeper usage is distributed-memory computation, where some data is shared between client nodes, and must be accessed/updated in a very careful way to account for synchronization. Zookeeper offers the library for constructing custom synchronization primitives, while the capability to run a distributed server avoids the single-point-of-failure issue you have when using a centralized message repository.

A distributed HBase instance depends on a running Zookeeper cluster. All participating nodes and clients must be able to access the running Zookeeper instances. By default, HBase manages a Zookeeper "cluster" - it starts and stops the Zookeeper processes as part of the HBase start/stop process. Because the HBase master may be relocated, clients bootstrap by looking for the current location of the HBase master and the -ROOT- table.

HBase uses an auto-sharding and distribution approach to cope with a large data size(compared to HDFS block-based design and data access).

To store a table of arbitrary length, HBase partitions the table into regions, with every region containing a sorted (by primary key), continuous range of rows. The term "continuous" here does not mean that a region contains all the keys from the given interval. Rather, it means that all keys from an interval are guaranteed to be partitioned to the same region, which can have any number of holes in its key space.

The way regions are split depends not on the key space, but rather on the data size. The size of the data partitions for a given table can be configured during table creation. The regions are "randomly" spread across region servers. (A single region server can serve any number of regions for a given table.) Regions can also be moved around for load balancing and failover.

When a new record is inserted into the table, HBase decides which region server it should go to (based on the key value) and inserts it there. If the region's size exceeds the predefined one, the region automatically splits. The region split is a fairly expensive operation. To avoid some of them, a table can also be pre-split during creation, or manually split at any point.
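A minimal pre-split sketch, assuming the 0.94-era admin API (table name, column family, and split points are hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class PreSplitTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor desc = new HTableDescriptor("events"); // hypothetical table
        desc.addFamily(new HColumnDescriptor("d"));
        // Pre-create four regions so that initial bulk writes do not all hit
        // one region and trigger expensive splits.
        byte[][] splitKeys = {
            Bytes.toBytes("2"), Bytes.toBytes("5"), Bytes.toBytes("8")
        };
        admin.createTable(desc, splitKeys);
        admin.close();
    }
}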

When a record (or set of records) is read/updated, HBase decides which regions should contain the data, and directs the client to the appropriate region servers. From this point, the region servers implement the actual read/update operation.

HBase leverages a specialized table (.META.) to resolve a specific key/value pair to a specific region server. This table contains a list of available region servers, and a list of descriptors for user tables. Each descriptor specifies a key range for a given table, contained in a given region.

The .META. table is discovered using another specialized HBase table (-ROOT-), which contains a list of descriptors for the .META. table. The location of the -ROOT- table is held in Zookeeper.

An HBase table is a sparse, distributed, persistent, multidimensional sorted map. The first map level is the key/row value. As mentioned, row keys are always sorted, which is the foundation of a table's sharding and of efficient reads and scans - reading key/value pairs in sequence.

One thing to be aware of is the fact that HBase operates on byte arrays. All of the components of HBase data - keys, column family names, and column names - are treated by HBase as arrays of uninterpreted bytes. This means that all internal value comparisons, and consequently sorting, are done in lexicographical order. This is very important to remember, especially in row key design, to avoid unpleasant surprises. A typical example is the use of integer keys. If they are not left-padded to the same length, then, as a result of HBase sorting, a key of 11, for example, will appear before 5. (Keys sort lexicographically, so integer keys that are not left-padded with zeros have different string lengths, and key 11 then sorts before key 5.)
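A tiny illustration of the padding fix, using HBase's Bytes utility (the keys are hypothetical):

import org.apache.hadoop.hbase.util.Bytes;

public class KeyPadding {
    public static void main(String[] args) {
        // Unpadded integer keys compare lexicographically: "11" sorts before "5".
        System.out.println(Bytes.compareTo(Bytes.toBytes("11"), Bytes.toBytes("5")) < 0);  // true
        // Left-padding to a fixed width restores numeric order: "05" sorts before "11".
        String k5 = String.format("%02d", 5);
        String k11 = String.format("%02d", 11);
        System.out.println(Bytes.compareTo(Bytes.toBytes(k5), Bytes.toBytes(k11)) < 0);    // true
    }
}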

The second map level used by HBase is based on the column family. (Column families were originally introduced by columnar databases for fast analytical queries. In those databases, data is stored not row by row as in a traditional RDBMS, but rather by column families.) Column families are used by HBase for separation of data based on access patterns (and size).

Column families play a special role in the HBase implementation. They define how HBase data is stored and accessed. Every column family is stored in a separate HFile. This is an important consideration to keep in mind during table design. It is recommended that you create a column family per data access type - that is, data typically read/written together should be placed in the same column family. (Each column family lives in its own HFile, so data that is read and written together belongs in the same column family.)

The set of column families is defined during table creation (although it can be altered at a later point). Different column families can also use different compression mechanisms, which might be an important factor, for example, when separate column families are created for metadata and data (a common design pattern). In this case, the metadata is often relatively small and does not require compression, whereas the data can be fairly large, and compression often improves HBase throughput.

Because of such a storage organization, HBase implements merged reads. For a given row, it reads the files of all column families and combines them before sending the result back to the client. As a result, if the whole row is always processed together, a single column family will typically provide the best performance. (If you routinely read entire rows, a single column family is usually best for performance.)

The last map level is based on columns. HBase treats columns as a dynamic map of key/value pairs. This means that columns are not defined during table creation, but are dynamically populated during write/update operations. As a result, every row/column family in an HBase table can contain an arbitrary set of columns. The columns contain the actual values.

Technically, there is one more map level supported by HBase - versioning of every column value. HBase does not distinguish between writes and updates - an update is effectively a write with a new version. By default, HBase stores the last three versions of a given column value (while automatically deleting older versions). The depth of versioning can be controlled during table creation. The default implementation of the version is the timestamp of the data insertion, but it can easily be overridden by a custom version value.

Following are the four primary data operations supported by HBase (a client sketch follows this list):
  • Get returns the values of column families/columns/versions for a specified row (or rows). It can be further narrowed to apply to a specific column family/column/version. It is important to realize that if a Get is narrowed to a single column family, HBase does not have to implement a merged read. (When returning rows, you can ask for specific column family/column/version values.)
  • Put either adds a new row (or rows) to a table if the key does not exist, or updates an existing row (or rows) if the key already exists. Similar to Get, a Put can be limited to apply to a specific column family/column. (Inserts/updates can likewise target a specific column family/column.)
  • Scan allows iteration over multiple rows (a key range) for specified values, which can include the whole row or any subset of it.
  • Delete removes a row (or rows), or any part thereof, from a table. HBase does not modify data in place, and, as a result, Deletes are handled by creating new markers called tombstones. These tombstones, along with the dead values, are cleaned up on major compactions.
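A compact client sketch of the four operations, assuming the 0.94-era API (table, family, and qualifier names are hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class CrudSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "users");   // hypothetical table
        byte[] fam = Bytes.toBytes("d");

        // Put: insert or update a row.
        Put put = new Put(Bytes.toBytes("row-1"));
        put.add(fam, Bytes.toBytes("name"), Bytes.toBytes("alice"));
        table.put(put);

        // Get: narrowed to one column family, so no merged read is needed.
        Get get = new Get(Bytes.toBytes("row-1"));
        get.addFamily(fam);
        Result result = table.get(get);
        System.out.println(Bytes.toString(result.getValue(fam, Bytes.toBytes("name"))));

        // Scan: iterate over a key range.
        Scan scan = new Scan(Bytes.toBytes("row-0"), Bytes.toBytes("row-9"));
        ResultScanner scanner = table.getScanner(scan);
        for (Result r : scanner) {
            System.out.println(Bytes.toString(r.getRow()));
        }
        scanner.close();

        // Delete: writes a tombstone; data is physically removed at major compaction.
        table.delete(new Delete(Bytes.toBytes("row-1")));
        table.close();
    }
}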

Get and Scan operations on HBase can optionally be configured with filters, which are applied on the region server. They provide an HBase read optimization technique, enabling you to improve the performance of Get/Scan operations. Filters are effectively conditions that are applied to the data read on the region server, and only the rows (or parts of the rows) that pass the filtering are delivered back to the client.

HBase supports a wide range of filters, from row key to column family, and from columns to value filters. Additionally, filters can be combined into chains using boolean operations. Keep in mind that filtering does not reduce the amount of data read (and often still requires a full table scan), but it can significantly reduce network traffic. Filter implementations must be deployed on the servers, and require a server restart. (Filters do not reduce the total data read - a full table scan is often still needed - but they greatly reduce network transfer.)
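A small server-side filtering sketch (hypothetical names; SingleColumnValueFilter is one of the standard bundled filters):

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class FilterSketch {
    public static Scan activeUsersScan() {
        // Rows are filtered on the region server; only matches cross the network.
        SingleColumnValueFilter filter = new SingleColumnValueFilter(
                Bytes.toBytes("d"),       // column family
                Bytes.toBytes("status"),  // column
                CompareOp.EQUAL,
                Bytes.toBytes("active"));
        filter.setFilterIfMissing(true);  // drop rows that lack the column
        Scan scan = new Scan();
        scan.setFilter(filter);
        return scan;
    }
}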

In addition to general-purpose Put/Get operations, HBase supports the following specialized operations (a short sketch follows the list):
  • Atomic conditional operations, including atomic compare-and-set (executing a server-side update guarded by a check) and atomic compare-and-delete (executing a server-side guarded Delete)
  • Atomic "counter" increment operations, which guarantee synchronized operations on counters. Synchronization, in this case, is done within a region server, not on the client.

The table below describes the format that HBase uses to store table data.

Table Data Layout in the File
KEY TIMESTAMP COLUMN FAMILY:COLUMN NAME VALUE
Row key Timestamp of the last update Family:column Value

This table explains the mechanics used by HBase to deal with sparsely populated tables and arbitrary column names. HBase explicitly stores each column defined for a specific key in the appropriate column family file.

Because HBase uses HDFS as a persistence mechanism, it never overwrites data. (HDFS does not support updates.) As a result, every time the memstore is flushed to disk, it does not overwrite an existing store file, but rather creates a new one. To avoid a proliferation of store files, HBase implements a process known as compaction.

Two types of compaction exist: minor and major. Minor compactions usually pick up a couple of the smaller adjacent store files and rewrite them as one. Minor compactions do not drop Deletes or expired cells; only major compactions do this. Sometimes a minor compaction picks up all the store files, in which case it actually promotes itself to being a major compaction.

After a major compaction runs, there will be a single store file per store, which usually improves performance. Compactions will not perform region merges.

For faster key searches within the files, HBase leverages bloom filters, which enable you to check at the row or row/column level, and potentially skip an entire store file during a read. This filtering is especially useful for sparsely populated keys. Bloom filters are generated when a file is persisted, and are stored at the end of each file.

HBase Schema Design
Guidelines:
  • The most effective access to HBase data is the use of Get or Scan operations based on the row key. HBase does not support any secondary keys/indexes. This means that, ideally, a row key should be designed to accommodate all of the access patterns required for a specific table. This often means using a composite row key to accommodate more data access patterns.
  • A general guideline is to limit the number of column families per table to no more than 10 to 15. (Remember that every column family is stored by HBase in a separate file, so a large number of column families requires reading and merging multiple files.) Also, because the column family name is stored explicitly with every column name, you should minimize the size of the column family names. Single-letter names are often recommended if the amount of data in the column is small.
  • Although HBase does not impose any limitation on the size of columns in a given row, you should consider the following:
    1. Rows are not splittable. As a result, a huge column data size (close to the region size) typically indicates that this type of data should not be stored in HBase.
    2. Each column value is stored along with its metadata (row key, column family name, column name), which means that a very small column data size leads to a very inefficient use of storage (that is, more space is occupied by metadata than by the actual table data). It also implies that you should not use long column names.
  • When deciding between a tall-narrow (millions of keys with a limited number of columns) versus a flat-wide (a limited number of keys with millions of columns) table design, the former is generally recommended. This is because of the following:
    1. In the extreme case, a flat-wide table might end up with one row per region, which is bad for performance and scalability.
    2. Table scans are typically more efficient than massive reads. As a result, assuming that only a subset of the row data is needed, a tall-narrow design provides better overall performance.

One of the main strengths of the HBase design is the distributed execution of requests between multiple region servers. However, taking advantage of this design and ensuring that there are no "hot" (overloaded) servers during an application's execution might require special design approaches for the row key. A general recommendation is to avoid a monotonically increasing sequence (for example, 1, 2, 3, or timestamps) as the row key for massive Put operations. You can mitigate congestion in a single region brought on by monotonically increasing keys by randomizing the key values so that they are not in sorted order. (For write-heavy tables, avoid using incrementing integers or timestamps as the row key.)

Data locality is also an important design consideration for HBase Get/Scan operations. Rows that are often retrieved together should be co-located, which means that they must have adjacent keys.

A general rule is to use sequential keys in Scan-heavy cases, especially if you can leverage bulk imports for data population. Random keys are recommended for massive parallel writes with random, single-key access. (In short: sequential row keys for scan-heavy tables; random row keys for tables with heavy parallel Puts or mostly single-key access.)


Some specific row key design patterns:
  • Key "salting" - This entails prefixing a sequential key with a random value, which allows for "bucketing" sequential keys across multiple regions (see the sketch after this list).
  • The swap/promotion of key fields (for example, "reverse domains") - A common design pattern in web analytics is to use a domain name as a row key. Using a reverse domain name as a key helps in this case by keeping information about the same domain close together.
  • Complete randomization of keys - An example of this would be using an MD5 hash.
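A minimal salting sketch (the bucket count is arbitrary; any stable function of the key works as the prefix):

import org.apache.hadoop.hbase.util.Bytes;

public class SaltedKey {
    private static final int BUCKETS = 16;

    // Prefix a sequential id with a stable bucket number so that consecutive
    // ids spread across up to BUCKETS regions instead of hammering one region.
    static byte[] salt(long sequentialId) {
        int bucket = (int) (sequentialId % BUCKETS);
        return Bytes.toBytes(String.format("%02d-%016d", bucket, sequentialId));
    }
}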



Programming for HBase
One thing to keep in mind is that the HTable implementation is single-threaded. If access to HBase from multiple threads is necessary, every thread must create its own instance of the HTable class. One way to solve this problem is to use the HTablePool class. This class serves one purpose - pooling client API (HTableInterface) instances for the HBase cluster.
......
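A minimal pooled-access sketch, assuming the 0.94-era API (table and column names are hypothetical; on those versions, closing a pooled table returns it to the pool):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.util.Bytes;

public class PooledAccess {
    private static final Configuration CONF = HBaseConfiguration.create();
    private static final HTablePool POOL = new HTablePool(CONF, 10); // max 10 cached refs

    // Safe to call from many threads: each call takes its own HTableInterface.
    static byte[] fetchName(String row) throws Exception {
        HTableInterface table = POOL.getTable("users");
        try {
            return table.get(new Get(Bytes.toBytes(row)))
                        .getValue(Bytes.toBytes("d"), Bytes.toBytes("name"));
        } finally {
            table.close(); // returns the instance to the pool
        }
    }
}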

New HBase Features
HFile V2 Format
The problem with the current HFile format is that it causes high memory usage and slow startup times for the region server because of large filters and block index sizes.

In the current HFile format, there is a single index that always must be stored in memory. This can result in gigabytes of memory per server consumed by block indexes, which has a significant negative impact on region server scalability and performance. Additionally, because a region is not considered opened until all of its block index data is loaded, large block indexes can significantly slow down region startup.

To solve this problem, the HFile v2 format breaks a block index into a root index block and leaf blocks. Only the root index(indexing data blocks) must always be kept in memory. A leaf index is stored on the level of blocks, which means that its presence in memory depends on the presence of blocks in memory. A leaf index is loaded in memory only when the block is loaded, and is evicted when the block is evicted from memory. Additionally, leaf-level indexes are structured in a way to allow a binary search on the key without deserializing.

A similar approach is taken by the HFile v2 implementers for bloom filters. Every data block effectively uses its own bloom filter, which is written to disk as soon as the block is filled. At read time, the appropriate bloom filters do not rely on an estimate of how many keys will be added to the filter, so they can hit the target false-positive rate much more precisely.

Following are some additional enhancements of HFile v2:
  • A unified HFile block format enables you to seek to the previous block efficiently without using a block index.
  • The HFile refactoring into a reader and writer hierarchy allows for significant improvements in code maintainability.
  • A sparse lock implementation simplifies synchronization of block operations for the hierarchical block index implementation.
An important feature of the current HFile v2 reader implementation is that it is capable of reading both HFile v1 and v2. The writer implementation, on the other hand, only writes HFile v2. This allows for seamless transition of the existing HBase installations from HFile v1 to HFile v2. The use of HFile v2 leads to noticeable improvements in HBase scalability and performance.

Coprocessors
HBase coprocessors were inspired by Google's BigTable coprocessors, and are designed to support efficient computational parallelism - beyond what Hadoop MapReduce can provide. In addition, coprocessors can be used to implement new features - for example, secondary indexing, complex filtering (push-down predicates), and access control.

Although inspired by BigTable, HBase coprocessors diverge in implementation details. They implement a framework that provides a library and runtime environment for executing user code within the HBase region server (that is, the same Java Virtual Machine, or JVM) and master processes. In contrast, Google coprocessors do not run inside the tablet server (the equivalent of an HBase region server), but rather outside of its address space. (HBase developers are also considering an option for deploying coprocessor code external to the server process in future implementations.)

HBase defines two types of coprocessors:
  • System coprocessors are loaded globally on all tables and regions hosted by a region server.
  • Table coprocessors are loaded on all regions for a table on a per-table basis.

The framework for coprocessors is very flexible, and allows implementing two basic coprocessor types:
  • Observer(which is similar to triggers in conventional databases)
  • Endpoint(which is similar to conventional database stored procedures)

Observers allow inserting user code into the execution of HBase calls. This code is invoked by the core HBase code. The coprocessor framework handles all of the details of invoking the user code. The coprocessor implementation need only contain the desired functionality.

  • RegionObserver - This provides hooks for data access operations (Get, Delete, Scan, and so on), and provides a way to supplement these operations with custom user code. An instance of the RegionObserver coprocessor is loaded into every table region, and its scope is limited to the region in which it is present. A RegionObserver needs to be loaded into every HBase region server.
  • WALObserver - This provides hooks for write-ahead-log operations, and is a way to enhance WAL writing and reconstruction events with custom user code. A WALObserver runs on a region server in the context of WAL processing. A WALObserver needs to be loaded into every HBase region server.
  • MasterObserver - This provides hooks for table management operations (that is, create, delete, modify table, and so on), and provides a way to supplement these operations with custom user code. The MasterObserver runs within the context of the HBase master.

Observers of a given type can be chained together to execute sequentially in order of assigned priorities. Coprocessors in a given chain can additionally communicate with each other by passing information through the execution.

Endpoint is an interface for dynamic remote procedure call (RPC) extension. The endpoint implementation is installed on the server side, and can then be invoked with HBase RPC. The client library provides convenient methods for invoking such dynamic interfaces.

The sequence of steps for building a custom endpoint is as follows:
  1. Create a new interface extending CoprocessorProtocol and supporting the data exchange required for the RPC implementation. Data transfer must be implemented as byte arrays.
  2. Implement the endpoint interface by extending the abstract class BaseEndpointCoprocessor, which hides some internal implementation details (such as coprocessor framework class loading). The implementation must contain all the required coprocessor functionality, and is loaded into and executed from the region context. Nothing prevents this implementation from issuing HBase operations, which might involve additional region servers.

On the client side, the endpoint can be invoked by new HBase client APIs that allow executing it on either a single region server, or a range of region servers.
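A hedged sketch of a client-side invocation, assuming the 0.92/0.94-era CoprocessorProtocol API (this API was replaced in later HBase versions; the RowCountProtocol interface and table name are hypothetical):

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;

public class EndpointClient {
    // Hypothetical endpoint interface, whose implementation is deployed on the region servers.
    public interface RowCountProtocol extends CoprocessorProtocol {
        long getRowCount() throws IOException;
    }

    public static void main(String[] args) throws Throwable {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "users");
        // Invoke the endpoint on every region in the key range (null = whole table)
        // and collect one result per region.
        Map<byte[], Long> results = table.coprocessorExec(
                RowCountProtocol.class, null, null,
                new Batch.Call<RowCountProtocol, Long>() {
                    public Long call(RowCountProtocol instance) throws IOException {
                        return instance.getRowCount();
                    }
                });
        long total = 0;
        for (long regionCount : results.values()) {
            total += regionCount;
        }
        System.out.println("rows: " + total);
        table.close();
    }
}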

The current implementation provides two options for deploying a custom coprocessor:
  • Load from configuration(which happens when the master or region servers start up)
  • Load from a table attribute(that is, dynamic loading when the table is(re)opened)

When considering the use of coprocessors for your own development, be aware of the following:
  • Because, in the current implementation, coprocessors are executed within the region server execution context, badly behaving coprocessors can take down a region server.
  • Coprocessor execution is non-transactional, which means that if a Put coprocessor that is supplementing this Put with additional write operations fails, the Put is still in place.

Although HBase provides a much richer data access model and typically better data access performance, it has limitations when it comes to the data size per row.





2014年5月16日星期五

Chapter 2 Storing Data in Hadoop

HDFS

HDFS is implemented as a block-structured filesystem. individual fiels are broken into blocks of a fixed size, which are stored across an Hadoop cluster. A file can be mand up of several blocks, which are stroed on different DataNode(indiidual machine in the cluster) chosen randomly on a block-by-block basis. As a result, access to a file usually requires access to multiple DataNodes, which means that HDFS supports file sizes far larger than a single-machine disk capacity.

The DataNode stores each HDFS data block in a separate file on its local filesystem with no knowledge about the HDFS files themselves. To improve throughput ever futher, the DataNode does not create all files in the same directory. In stead, it uses heuristics to determine the optimal number of files per directory, and created subdirectories appropriately.

One of the requirements for such a block-structured filesystem is the capacity to store, manage, and access file metadata(information about files and blocks) reliably, and to provide fast access to the metadata store. Unlike HDFS files themselves(which are accessed in a write-once and read-many model), the metadata structures can be modified by a large number of clients concurrently. It is important that this information never gets out of sync. HDFS solves this problem by introducing a dedicated special machine, called the NameNode, which stores all the metadata for the filesystem across the cluster. This means that HDFS implements a master/slave architecture. A single NameNode(which is a master server) manages the filesystem namespace and regulates access to files by clients. The existence of a single master in a cluster greatly simplifies the architecture of the system. The NameNode serves as a single arbitrator and respository for all HDFS matadata.

Metadata storage is also perssitent. The entire filesystem namespace(including the mapping of blocks to files and filesystem properties) is contained in a file called FsImage stored as a file in the NameNode's local filesystem. The NameNode also uses a transaction log to persistently record every change that occurs in filesystem metadata(metadata store). This log is stored in the edits log file on the NameNode's local filesystem.

The SecondaryNameNode is not a "backup NameNode": it cannot take over the primary NameNode's function. It serves as a checkpointing mechanism for the primary NameNode. To store the state of HDFS, the NameNode maintains two on-disk structures that persist the current filesystem state: an image file and an edit log. The image file represents the HDFS metadata state at a point in time, and the edit log is a transactional log (comparable to a log in a database architecture) of every filesystem metadata change since the image file was created.

When the NameNode (re)starts, the current state is reconstructed by reading the image file and then replaying the edit log. Obviously, the larger the edit log is, the longer it takes to replay it, and consequently the longer it takes to start the NameNode. To improve NameNode startup performance, the edit log is periodically rolled, and a new image file is created by applying the edit log to the existing image. This operation can be fairly resource-intensive. To minimize the impact of checkpoint creation on the functioning NameNode, checkpointing is performed by the SecondaryNameNode daemon, often on a separate machine.

As a result of checkpointing, the SecondaryNameNode holds an (out-of-date) copy of the primary's persistent state in the form of the last image file. In cases where the edit file is kept relatively small, the secondary node can be used to recover the filesystem's state. In this case, you must be aware of a certain amount of metadata (and corresponding data) loss, because the latest changes stored in the edit log are not available.

To keep the memory footprint of the NameNode manageable, the default size of an HDFS block is 64MB -- orders of magnitude larger than the block size of most other block-structured filesystems. The additional benefit of the larger data block is that it allows HDFS to keep large amounts of data stored sequentially on disk, which supports fast streaming reads of data.

One of the misconceptions about Hadoop is the assumption that files smaller than the block size will still use a whole block on the filesystem. This is not the case: smaller blocks occupy exactly as much disk space as they require.

But this does not mean that having many small files uses HDFS efficiently. Regardless of the block size, a block's metadata occupies exactly the same amount of memory in the NameNode. As a result, a large number of small HDFS files (smaller than the block size) will use a lot of the NameNode's memory, thus negatively impacting HDFS scalability and performance.

The downside of HDFS file organization is that several DataNodes are involved in serving a file, which means that a file can become unavailable if any one of those machines is lost. To avoid this problem, HDFS replicates each block across a number of machines (three, by default).

Data replication in HDFS is implemented as part of a write operation in the form of a data pipeline. When a client is writing data to an HDFS file, this data is first written to a local file. When the local file accumulates a full block of data, the client consults the NameNode to get a list of DataNodes assigned to host replicas of that block. The client then writes the data block from its local storage to the first DataNode in 4K portions. The DataNode stores the received data in its local filesystem and forwards that portion of data to the next DataNode in the list. The same operation is repeated by each receiving DataNode until the last node in the replica set receives the data. This DataNode stores the data locally without sending it any further.

If one of the DataNodes fails while a block is being written, it is removed from the pipeline. In this case, when the write operation on the current block completes, the NameNode re-replicates the block to make up for the missing replica caused by the failed DataNode. When a file is closed, the remaining data in the temporary local file is pipelined to the DataNodes. The client then informs the NameNode that the file is closed. At this point, the NameNode commits the file creation operation into its persistent store. If the NameNode dies before the file is closed, the file is lost.

The default block size and replication factor are specified by the Hadoop configuration, but can be overridden on a per-file basis. An application can specify the block size and the number of replicas for a specific file at its creation time.
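For example, a minimal sketch of overriding both at file creation time through the FileSystem API (the path and values here are purely illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
// create(path, overwrite, bufferSize, replication, blockSize)
FSDataOutputStream out = fs.create(new Path("/data/example.bin"),
    true,                                    // overwrite if the file exists
    conf.getInt("io.file.buffer.size", 4096),
    (short) 2,                               // replication factor for this file only
    128L * 1024 * 1024);                     // 128MB block size for this file only
out.writeBytes("hello HDFS");
out.close();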

One of the most powerful features of HDFS is the optimization of replica placement, which is crucial to HDFS reliability and performance. All decisions regarding replication of blocks are made by the NameNode, which periodically (every 3 seconds) receives a heartbeat and a block report from each of the DataNodes. A heartbeat is used to ensure proper functioning of DataNodes, and a block report allows verifying that the list of blocks on a DataNode corresponds to the NameNode's information. One of the first things a DataNode does on startup is send a block report to the NameNode, which allows the NameNode to rapidly form a picture of the block distribution across the cluster.

An important characteristic of data replication in HDFS is rack awareness. Large HDFS instances run on a cluster of computers that is commonly spread across many racks. Typically, network bandwidth (and consequently network performance) between machines in the same rack is greater than network bandwidth between machines in different racks.

The NameNode determines the rack ID that each DataNode belongs to via the Hadoop Rack Awareness process. A simple policy is to place replicas on unique racks. This policy prevents losing data when an entire rack is lost, and evenly distributes replicas in the cluster. It also allows using bandwidth from multiple racks when reading data. But because a write must, in this case, transfer blocks to multiple racks, the performance of writes suffers.

An optimization of the rack-aware policy is to cut inter-rack write traffic (and consequently improve write performance) by using fewer racks than replicas. For example, when the replication factor is three, two replicas are placed on one rack, and the third on a different rack.

To minimize global bandwidth consumption and read latency, HDFS tries to satisfy a read request from the replica that is closest to the reader. If a replica exists on the same rack as the reader node, that replica is used to satisfy the read request.

As mentioned, each DataNode periodically sends a heartbeat message to the NameNode, which the NameNode uses to discover DataNode failures (based on missing heartbeats). The NameNode marks DataNodes without recent heartbeats as dead, and does not dispatch any new I/O requests to them. Because data located on a dead DataNode is no longer available to HDFS, a DataNode's death may cause the replication factor of some blocks to fall below their specified values. The NameNode constantly tracks which blocks must be re-replicated, and initiates replication whenever necessary.

Using HDFS Files
When a file is opened for writing, the opening client is granted an exclusive write lease for the file. This means that no other client can write to this file until this client completes the operation. To ensure that no "runaway" clients hold a lease, the lease periodically expires. The use of leases effectively ensures that no two applications can simultaneously write to a given file (comparable to a write lock in a database).

The lease duration is bound by a soft limit and a hard limit. For the duration of the soft limit, the writer has exclusive access to the file. If the soft limit expires and the client fails to close the file or renew the lease (by sending a heartbeat to the NameNode), another client can preempt the lease. If the hard limit (one hour) expires and the client has failed to renew the lease, HDFS assumes the client has quit, automatically closes the file on behalf of the writer, and then recovers the lease.

The writer's lease does not prevent other clients from reading the file. A file may have many concurrent readers.

Hadoop-Specific File Types
In addition to "ordinary" files, HDFS also introduced several specialized file types(such as SequenceFile, MapFile, SetFile, ArrayFile, and BloomMapFile)that provide much richer functionality, which often simplifies data processing.

SequenceFile provides a persistent data structure for binary key/value pairs. Here, different instances of both key and value must represent the same Java class, but can have different sizes. Similar to other Hadoop files, SequenceFiles are append-only.

When using an ordinary file (either text or binary) for storing key/value pairs (a typical data structure for MapReduce), the data storage is not aware of the key and value layout, which must be implemented in the readers on top of the generic storage. The use of SequenceFile provides a storage mechanism natively supporting the key/value structure, thus making implementations using this data layout much simpler.

SequenceFile has three available formats: Uncompressed, Record-Compressed, and Block-Compressed. The first two are stored in a record-based format, whereas the third one uses a block-based format.

The choice of a specific format determines the size of the sequence file on the hard drive. Block-Compressed files are typically the smallest, while Uncompressed files are the largest.

SequenceFile Header
FIELD               DESCRIPTION
Version             A 4-byte array containing three letters (SEQ) and a sequence file version number (either 4 or 6). The currently used version is 6. Version 4 is supported for backward compatibility.
Key Class           Name of the key class, which is validated against the name of the key class provided by the reader.
Value Class         Name of the value class, which is validated against the name of the value class provided by the reader.
Compression         A key/value compression flag.
Block Compression   A block compression flag.
Compression Codec   CompressionCodec class. This class is used only if either the key/value or block compression flag is true. Otherwise, this value is ignored.
Sync                A sync marker.

A sync is a specialized marker used for faster search inside a SequenceFile. Sync markers are also used by the MapReduce implementation: data splits are done only on sync boundaries.

Record Layout
FIELD           DESCRIPTION
Record Length   The length of the whole record (bytes)
Key Length      The length of the key (bytes)
Key             Byte array containing the record's key
Value           Byte array containing the record's value

In the block-compressed format, the header and sync serve the same purposes as in the record-based SequenceFile formats. The actual data is contained in the blocks.

Block Layout
FIELD                   DESCRIPTION
Keys lengths length     All the key lengths for a given block are stored together. This field specifies the compressed key-lengths block size (in bytes)
Keys lengths            Byte array containing the compressed key-lengths block
Keys length             Compressed keys size (in bytes)
Keys                    Byte array containing the compressed keys for a block
Values lengths length   All the value lengths for a given block are stored together. This field specifies the compressed value-lengths block size (in bytes)
Values lengths          Byte array containing the compressed value-lengths block
Values length           Compressed values size (in bytes)
Values                  Byte array containing the compressed values for a block

All formats use the same header, which contains information allowing the reader to recognize the format. The header contains the key and value class names that are used by the reader to instantiate those classes, the version number, and compression information. If compression is enabled, the Compression Codec class name field is added to the header.

Metadata for the SequenceFile is a set of key/value text pairs that can contain additional information about the SequenceFile and that can be used by the file reader/writer.

The implementation of write operations for the Uncompressed and Record-Compressed formats is very similar. Each call to the append() method adds a record containing the length of the whole record (key length plus value length), the length of the key, and the raw key and value data to the SequenceFile. The difference between the compressed and the uncompressed versions is whether or not the raw data is compressed with the specified codec.

The Block-Compressed format applies more aggressive compression. Data is not written until it reaches a threshold (block size), at which point all keys are compressed together. The same thing happens for the values and for the lists of key and value lengths.

A special reader (SequenceFile.Reader) and writer (SequenceFile.Writer) are provided by Hadoop for use with SequenceFiles.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path path = new Path("filename");
SequenceFile.Writer sequenceWriter = new SequenceFile.Writer(fs, conf, path,
    Key.class, Value.class,
    fs.getConf().getInt("io.file.buffer.size", 4096),
    fs.getDefaultReplication(), 1073741824, null, new Metadata());

.......................................
sequenceWriter.append(bytesWritable, bytesWritable);
.......................................
IOUtils.closeStream(sequenceWriter);
A minimal SequenceFile writer constructor (SequenceFile.Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass)) requires the specification of the filesystem, the Hadoop configuration, the path (file location), and the definitions of both the key and value classes. The constructor used in the previous example enables you to specify additional file parameters, including the following:
  1. int bufferSize - The default buffer size (4096) is used if it is not defined.
  2. short replication - The default replication is used.
  3. long blockSize - The value of 1073741824 (1024 MB) is used.
  4. Progressable progress - None is used.
  5. SequenceFile.Metadata metadata - An empty metadata class is used.
Once the writer is created, it can be used to add key/record pairs to the file.
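A companion sketch for reading such a file back with SequenceFile.Reader; the key and value classes are instantiated from the information stored in the header (fs, path, and conf as in the writer example above):

import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
while (reader.next(key, value)) {
    // process the key/value pair
}
IOUtils.closeStream(reader);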

One of the limitations of SequenceFile is the inability to seek based on key values. Additional Hadoop file types (MapFile, SetFile, ArrayFile, and BloomMapFile) enable you to overcome this limitation by adding a key-based index on top of SequenceFile.

A MapFile is really not a file, but rather a directory containing two files: the data (sequence) file, containing all keys and values in the map, and a smaller index file, containing a fraction of the keys. Create MapFiles by adding entries in order. MapFiles are typically used to enable efficient search and retrieval of the file's contents by searching on the index.

The index file is populated with the key and a LongWritable that contains the starting byte position of the record corresponding to this key. The index file does not contain all the keys, but just a fraction of them. You can set the indexInterval using the setIndexInterval() method on the writer. The index is read entirely into memory, so for a large map it is necessary to set an index skip value that makes the index file small enough to fit into memory completely.

Similar to the SequenceFile, a special reader (MapFile.Reader) and writer (MapFile.Writer) are provided by Hadoop for use with map files.
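A minimal sketch of both (the directory name and classes are illustrative; note that keys must be appended in sorted order):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;

// Write: creates the data and index files under the "lookup" directory
MapFile.Writer writer = new MapFile.Writer(conf, fs, "lookup", Text.class, IntWritable.class);
writer.setIndexInterval(128);                     // index every 128th key
writer.append(new Text("apple"), new IntWritable(1));
writer.append(new Text("banana"), new IntWritable(2));
writer.close();

// Read: index-assisted lookup by key
MapFile.Reader reader = new MapFile.Reader(fs, "lookup", conf);
IntWritable value = new IntWritable();
reader.get(new Text("banana"), value);            // returns null if the key is absent
reader.close();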

SetFile and ArrayFile are variations of MapFile for specialized implementations of key/value types. The SetFile is a MapFile for data represented as a set of keys with no values (a value is represented by a NullWritable instance). The ArrayFile deals with key/value pairs whose keys are just sequential longs. It keeps an internal counter, which is incremented as part of every append call; the value of this counter is used as the key.

Both file types are useful for sorting keys, not values.

A bloom filter is a space-efficient, probabilistic data structure that is used to test whether an element is a member of a set. The result of the test is that the element either definitely is not in the set or may be in the set.
The base data structure of a bloom filter is a bit vector. The probability of false positives depends on the size of the set of elements and the size of the bit vector.

Although it risks false positives, a bloom filter has a strong space advantage over other data structures for representing sets, such as self-balancing binary search trees, tries, hash tables, or simple arrays or linked lists of the entries. Most of these require storing at least the data items themselves, which can require anywhere from a small number of bits (for small integers) to an arbitrary number of bits, such as for strings. (Tries are an exception, because they can share storage between elements with equal prefixes.)

This advantage of a bloom filter comes partly from its compactness (inherited from arrays), and partly from its probabilistic nature.

Finally, the BloomMapFile extends the MapFile implementation by adding a dynamic bloom filter that provides a fast membership test for keys. It also offers a fast version of a key search operation, especially in the case of sparsely populated MapFiles. A writer's append() operation updates a DynamicBloomFilter, which is then serialized when the writer is closed. This filter is loaded in memory when a reader is created. A reader's get() operation first checks the filter for the key membership, and if the key is absent, it immediately returns null without doing any further I/O.
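Usage mirrors MapFile; a brief sketch of the read side (Hadoop 1.x-era API; the directory name is illustrative):

import org.apache.hadoop.io.BloomMapFile;
import org.apache.hadoop.io.Text;

BloomMapFile.Reader reader = new BloomMapFile.Reader(fs, "bloom-lookup", conf);
Text value = new Text();
// get() consults the in-memory bloom filter first; a definite miss
// returns null without touching the data file
reader.get(new Text("some-key"), value);
reader.close();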

DATA COMPRESSION



Wednesday, May 14, 2014

Hadoop 1.x.x SecondaryNameNode Configuration

FROM: Two ways to configure the Hadoop SecondaryNameNode

master1 NameNode (NN)
master2 SecondaryNameNode (2NN)
  1. hdfs-site.xml
    <property>
        <name>dfs.http.address</name>
        <value>master1:50070</value>
    </property>
    <property>
        <name>dfs.secondary.http.address</name>
        <value>master2:50070</value>
    </property>
    <property>
        <name>fs.checkpoint.period</name>
        <value>3600</value>
    </property>
    <property>
        <name>fs.checkpoint.size</name>
        <value>67108864</value>
    </property>
    <property>
        <name>fs.checkpoint.dir</name>
        <value>/hadoop/dfs/namesecondary</value>
    </property>
    
    The 2NN fetches the image file from the NN at http://${dfs.http.address}:50070/getimage?putimage=1&port=50090&machine=0.0.0.0&token=x&newChecksum=x. The default value of dfs.http.address is 0.0.0.0:50070; when the 2NN and the NN run on different hosts, this value must be set explicitly (namenode-ip:50070 or namenode-hostname:50070), otherwise fetching the image file fails.

    dfs.secondary.http.address configures the address for viewing the 2NN's status on the web. The default is 0.0.0.0:50090; when the 2NN and the NN are on different hosts, the same port can be used for both.

    fs.checkpoint.period configures the interval between image fetches.

    fs.checkpoint.size configures the size threshold for edits (${dfs.name.dir}/current/edits): a checkpoint is performed whenever the edits size exceeds this value. The default is 64MB.

    fs.checkpoint.dir configures the 2NN's directory. The default is ${hadoop.tmp.dir}/dfs/namesecondary.
  2. masters
    master2
    
    The masters file is actually used to specify the SecondaryNameNode, so its content is master2. Its name probably misleads most Hadoop beginners.
  3. Sync the configuration files to the slave nodes.

Chapter 7 Tuning Hadoop Cluster for Best Performance

A Hadoop cluster is composed of many components. A systematic way of performance tuning is to tune the components based on their contribution to the cluster performance. Most BigData applications are I/O bound, and so is Hadoop. So, configurations that are closely related to I/O requests should be the first priority for performance tuning. For example, a suboptimal configuration of the data replication properties can cause a large number of data block copies over the network, which has a negative effect on the performance of the cluster. Similarly, an improper JVM configuration can cause large data swaps for intermediate data. And an unbalanced data block distribution on the DataNodes can cause suboptimal execution of map and reduce tasks.

Benchmarking and profiling a Hadoop cluster

Perform HDFS benchmarks:
  1. hadoop jar $HADOOP_HOME/hadoop-test-*.jar testfilesystem -files 10 -megaBytes 10:
    This command will generate 10 files with 10MB each for testing.
  2. Benchmark the distributed write consistency on the distributed filesystem:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar DistributedFSCheck -write -nrFiles 10 -fileSize 50
This command will write 10 (controlled by the -nrFiles option) files of 50 MB (controlled by the -fileSize option) with random content to HDFS.
  3. Similarly, we can benchmark the distributed read consistency on the distributed filesystem:
hadoop jar $HADOOP_HOME/hadoop-test-*.jar DistributedFSCheck -read -nrFiles 10 -fileSize 50
    This command will read 10 files with the size of 50MB from the cluster and will generate a result file.

From the output of the two (read/write consistency check) commands, we can see that writing is more expensive than reading for the HDFS cluster. This is because write operations involve more work, such as computing and recording the checksums of the data blocks, and many more.

Perform MapReduce cluster benchmarks:
  1. Benchmark MapReduce jobs:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar mapredtest 5 1000
The mapredtest benchmark does a load test on the MapReduce computing framework. The benchmark uses random integers, which are generated, written to files, read back from the files, and compared with the originals.
  2. Test the reliability of the MapReduce distributing computing framework:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar MRReliabilityTest -libjars $HADOOP_HOME/hadoop-examples-*.jar
    This command will intentionally cause task and TaskTracker failures on a running job.
The failed tasks will be re-executed on a different TaskTracker. So, killing one or a few tasks should not fail a job if the cluster is resilient to failures. If a job fails after a few killed tasks, it is possibly because MapReduce is not reliable enough or not resilient to failures, and hence reliability tuning (such as adding more computing TaskTrackers) is needed.
  3. Benchmark MapReduce to deal with a large number of small jobs:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar mrbench -numRuns 20
mrbench executes a small job a number of times (20 in this command) and checks whether these small jobs are responsive and can run efficiently on the cluster.
  4. Benchmark the MapReduce load generator:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar loadgen -m 100 -r 20 -keepmap 50 -keepred 50 -indir input -outdir output
  5. Do a stress test with the NameNode:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar nnbench -create_write
  6. Test the Hadoop performance with large non-splittable files:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar testbigmapoutput -input input -output output -create 2048
  7. Test thread map spills:
    hadoop jar $HADOOP_HOME/hadoop-test-*.jar threadedmapbench
threadedmapbench is a MapReduce benchmark that compares the performance of maps with multiple spills to maps with one spill.
Sort is a typical operation of MapReduce jobs. By sorting random data, we can check the health of our Hadoop cluster.
Benchmark Hadoop sort
  1. Generate some random text data:
    hadoop jar $HADOOP_HOME/hadoop-examples-*.jar randomwriter random.writer.out
  2. Sort the generated random data:
    hadoop jar $HADOOP_HOME/hadoop-examples-*.jar sort random.writer.out random.writer.out.sorted
  3. Validate the MapReduce sort algorithm:
hadoop jar $HADOOP_HOME/hadoop-test-*.jar testmapredsort -m 50 -r 5 -sortInput random.writer.out -sortOutput random.writer.out.sorted
    This command will validate the accuracy of the sort algorithm.

Analyzing job history with Rumen

The current Rumen implementation includes two components: TraceBuilder and Folder. The TraceBuilder takes job history as input and generates easily parsed JSON files. The Folder is a utility for manipulating input traces; most of the time, it is used to scale the summarized job traces produced by the TraceBuilder.
  1. Use the TraceBuilder to extract the Gold Trace from the Hadoop job history files. The syntax of the command:
    hadoop org.apache.hadoop.tools.rumen.TraceBuilder [options]
    hadoop org.apache.hadoop.tools.rumen.TraceBuilder -recursive file:///tmp/jobtraces.json file:///tmp/topology.out file:///usr/local/hadoop/logs/history/done
  2. The second step of using Rumen is to scale the data generated from the previous step:
hadoop org.apache.hadoop.tools.rumen.Folder [options] [input] [output]
    For example, to scale the runtime of the job trace generated in the previous step to 50 minutes:
    hadoop org.apache.hadoop.tools.rumen.Folder -output-duration 50m -input-cycle 20m file:///tmp/jobtraces.json file:///tmp/job-scaled-50min.json
PS: When testing Rumen under Hadoop 1.2.1, it kept reporting that the rumen library could not be found. After reading the prerequisites for running GridMix3 below, I copied hadoop-tools-*.jar into $HADOOP_HOME/lib and tested again; sure enough, it worked.

Benchmarking a Hadoop cluster with GridMix

GridMix is a tool for benchmarking Hadoop clusters. It generates a number of synthetic MapReduce jobs and builds a model based on the performance of these jobs. Resource profiles of the cluster are modeled based on the job execution metrics. First, log in to the master node: ssh hduser@master
  1. cd $HADOOP_HOME/src/benchmark/gridmix2
  2. ant
  3. cp build/gridmix.jar
  4. Open the gridmix-env-2 file:
    export HADOOP_VERSION=hadoop-1.1.2
    export HADOOP_HOME=/usr/local/hadoop
    export HADOOP_CONF_DIR=$HADOOP_HOME/conf
    export USE_REAL_DATASET=FALSE
    
export APP_JAR=${HADOOP_HOME}/hadoop-test-*.jar
    export EXAMPLE_JAR=${HADOOP_HOME}/hadoop-examples-*.jar
    export STREAMING_JAR=${HADOOP_HOME}/contrib/streaming/hadoop-streaming-*.jar
    
  5. Open the gridmix_config.xml GridMix2 configuration file and change the benchmark configuration by editing its properties.
  6. For example:
    <property>
     <name>streamSort.smallJobs.numOfJobs</name>
     <value>10,5</value>
    </property>
    <property>
     <name>streamSort.smallJobs.numOfReduces</name>
     <value>6,3</value>
    </property>
    These two properties specify that we will use 10 small stream sort jobs with 6 reducers, and 5 small stream sort jobs with 3 reducers.
  7. Make the generateGridmix2Data.sh script executable:
    chmod +x generateGridmix2Data.sh
  8. Generate data with the following command:
    ./generateGridmix2Data.sh
    This command will generate data on HDFS. By default the generated data will be compressed with a block compression ratio of 4. Three jobs will be started.
  9. chmod +x rungridmix_2
  10. ./rungridmix_2
GridMix is a benchmark for Hadoop clusters. It is generally used to model the performance profile of a Hadoop cluster by running a number of jobs.
The data required by GridMix is generated by running the generateGridmix2Data.sh script. We can configure this file to change, for example, the size of the generated data file. Then, when executing the rungridmix_2 script, a number of jobs will be generated and submitted in batch mode. In the end, the running time of these jobs will be computed. GridMix2 is shipped with the following representative jobs: streamSort, javaSort, webdataSort, combiner, and monsterSort. These jobs can be classified into the following categories:
  • Three-stage MapReduce jobs, motivated by multistage or pipelined MapReduce jobs.
  • Large sorts of variable key/value sizes, motivated by the processing of large datasets.
  • Reference select jobs, motivated by jobs that sample from a large reference dataset.
  • API text sort jobs, motivated by the use of the MapReduce APIs for sorting.
A GridMix benchmark is a mix of a number of small, medium, and large jobs from different categories. We can specify the mix in the gridmix_config.xml file. Based on the specification, a number of jobs will be created and submitted to the Hadoop cluster until it finishes.
Benchmarking a Hadoop cluster with GridMix1
  1. cd $HADOOP_HOME/src/benchmarks/gridmix
  2. Change the configuration in the gridmix-env file:
    export HADOOP_HOME=/usr/local/hadoop
    export GRID_MIX_HOME=$HADOOP_HOME/src/benchmarks/gridmix
    export APP_JAR=${HADOOP_HOME}/hadoop-test-*.jar
    export EXAMPLE_JAR=${HADOOP_HOME}/hadoop-examples-*.jar
    export STREAMING_JAR=${HADOOP_HOME}/contrib/streaming/hadoop-streaming-*.jar
    export GRID_MIX_DATA=/gridmix1/data
    export GRID_MIX_PROG=/gridmix1/programs
    
    The last two environment variables GRID_MIX_DATA and GRID_MIX_PROG specify two directories on HDFS. So, the generated data and programs will be on HDFS.
  3. chmod +x generateData.sh
  4. sh ./generateData.sh
  5. Run a small javasort job:
    sh javasort/text-sort.small
Benchmarking a Hadoop cluster with GridMix3
  1. Copy the required JAR files to the Hadoop lib directory:
    cp $HADOOP_HOME/hadoop-tools-*.jar $HADOOP_HOME/lib
cp $HADOOP_HOME/contrib/gridmix/hadoop-gridmix-*.jar $HADOOP_HOME/lib
    The hadoop-tools-*.jar file contains tools such as Rumen, which is needed by GridMix3. And the hadoop-gridmix-*.jar file contains the GridMix3 benchmark tool. In addition, the GridMix3 job mix for a Hadoop cluster is typically described with a job trace file, which is generated from job configuration files using Rumen.
  2. Use Rumen to generate a job trace file:
hadoop org.apache.hadoop.tools.rumen.TraceBuilder -recursive file:///tmp/jobtrace.json file:///tmp/topology.out file:///usr/local/hadoop/logs/history/done
  3. Run the GridMix3 benchmark:
hadoop org.apache.hadoop.mapred.gridmix.Gridmix -generate 100m gridmixdata /tmp/jobtrace.json
  4. To acquire the usage and available parameters for GridMix3:
hadoop org.apache.hadoop.mapred.gridmix.Gridmix

Using Hadoop Vaidya to identify performance problems

Hadoop Vaidya is an open source, rule-based performance diagnostic framework for Apache Hadoop. Each rule can identify a specific performance problem. For example, Hadoop cluster administrators can use Vaidya to identify slow-progressing jobs that are wasting cluster resources, and Hadoop users can use Vaidya to identify configuration mistakes in their submitted jobs.
Perform the following steps to use Hadoop Vaidya on the master:
  1. Locate the directory for the job configuration file you want to analyze. The default location of this folder is $HADOOP_HOME/logs
  2. Locate the job history files under the job history directory:
    find $HADOOP_HOME/logs -name 'job_201304012330_0001*'
  3. Use Vaidya to analyze the job trace files:
sh $HADOOP_HOME/contrib/vaidya/bin/vaidya.sh -jobconf file:///usr/local/hadoop/logs/history/job_20130412330_0002_conf.xml -joblog file:///usr/local/hadoop/logs/history/job_20130412330_002_1364874504561_hduser_TeraGen -report report.txt

HiBench is a benchmarking suite for Hadoop. It has nine typical workloads, including micro benchmarks and HDFS, web search, machine learning, and data analytics benchmarks. For example, it supports benchmarks for Nutch (a text indexing software package), PageRank (the PageRank algorithm), the Mahout machine learning algorithms, and Hive queries.

Balancing data blocks for a Hadoop cluster

HDFS stores data blocks on DataNode machines. As Hadoop processes jobs, data is generated and deleted. Over time, some DataNodes can host many more data blocks than others. This unbalanced distribution of data across the cluster is called data skew.
Data skew is a big problem for a Hadoop cluster. We know that when the JobTracker assigns tasks to TaskTrackers, it follows the general rule of data locality, which means map tasks will be assigned to the hosts where the data blocks reside. If the data block distribution is skewed, or in other words, the data blocks are located on only a small percentage of the DataNodes, only those nodes with data blocks can follow the data-local rule. And if the JobTracker assigns tasks to other nodes that do not host the data locally, the data needs to be transferred from remote machines to the TaskTracker machine. The data transfer consumes a large amount of network bandwidth, degrading the overall performance of the cluster.
Balance HDFS data blocks with balancer:
  1. Check the data skew through the web UI.
  2. Use the following command to balance the data blocks on the DataNode machines:
    hadoop balancer -threshold 0.2
This command will take some time to finish, depending on the status of the distributed filesystem as well as the value of the -threshold option. The -threshold option specifies the threshold for whether the cluster is balanced. It is a real number within the range [0, 1], with a default value of 0.1. A smaller value for this option leads to a more even distribution of the data blocks; on the other hand, it requires more time to finish. Setting this option to 0 is not recommended, because it is not practical to achieve an ideal balance.
  3. Alternatively, we can start the Hadoop balancer daemon to automatically balance the data blocks on HDFS.
    start-balancer.sh
    The balancer will move data blocks among the DataNodes according to the space utilization.
    PS:
    start-balancer.sh also accepts the -threshold parameter, which specifies the balancing threshold. The default is 10, measured against each DataNode's actual HDFS usage divided by the cluster's total HDFS capacity.
    E.g.: a DataNode's HDFS usage is 1.2GB and the cluster's total HDFS capacity is 10TB (10000GB); the ratio is then 1.2/10000 = 0.00012. Balancing takes place only when the -threshold parameter is smaller than 0.00012, so the command could be: start-balancer.sh -threshold 0.0001
    start-balancer.sh can be started on the NameNode or a DataNode, and it can be stopped at any time with stop-balancer.sh. The default bandwidth is 1MB/s.

The Hadoop balancer balances data blocks on HDFS according to a preconfigured threshold value, which sets the target for whether the cluster is balanced or not. A node is considered balanced if the difference between the node's space utilization and the cluster's space utilization is less than the threshold.

    Sometimes, we want to limit the percentage of bandwidth used by the balancer. By default, Hadoop defines a property dfs.balance.bandwidthPerSec, which determines the maximum speed that a data block will be moved from one DataNode to another. Its default value is 1MB/s. By configuring this property to be a higher value, the balancing speed will be faster, but more resources will be used. For example, to change the value of this property to be 10MB/s, we can change this in the $HADOOP_HOME/conf/hdfs-site.xml.
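For example, an hdfs-site.xml entry raising the limit to 10MB/s could look like the following (the value is given in bytes per second):

    <property>
     <name>dfs.balance.bandwidthPerSec</name>
     <value>10485760</value>
    </property>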

Choosing a Proper Block Size

HDFS stores data as data blocks distributed on multiple machines. So, when a large file is put onto HDFS, it will first be split into a number of data blocks. These data blocks are then distributed by the NameNode to the DataNodes in the cluster. The granularity of the data blocks can affect the distribution and parallel execution of the tasks.
Based on the property of the jobs being executed, one block size might result in better performance than others.
Configure the proper HDFS block size:
  1. Run a typical job on the configured cluster.
    hadoop jar $HADOOP_HOME/hadoop-examples-*.jar terasort input output
  2. Use Rumen to generate job traces from the job history file and the job logfile:
    hadoop org.apache.hadoop.tools.rumen.TraceBuilder file:///tmp/jobtraces.json file:///tmp/topology.out file:///usr/local/hadoop/logs/history/done/job_201304012206_0002_conf.xml file:///usr/local/hadoop/logs/history/done/job_201304012206_0002_log
  3. Use GridMix3 to generate a Hadoop cluster benchmark with different block size:
    hadoop org.apache.hadoop.mapred.gridmix.Gridmix -generate 10m input jobtraces.json
  4. stop-all.sh
  5. Change dfs.block.size in the $HADOOP_HOME/conf/hdfs-site.xml.
  6. start-all.sh

Using Compression for input and output

A typical MapReduce job uses parallel map tasks to load data from external storage devices, such as hard drives, into main memory. When a job finishes, the reduce tasks write the result data back to the hard drive. In this way, during the life cycle of a MapReduce job, many data copies are created as data is relayed between the hard drive and main memory. Sometimes, the data is also copied over the network from a remote node.
Copying data from and to hard drives and transferring it over the network are expensive operations. To reduce the cost of these operations, Hadoop introduced compression on the data.
Data compression in Hadoop is done by a compression codec, which is a program that encodes and decodes data streams. Although compression and decompression add cost to the system, the advantages usually far outweigh the disadvantages.
Configure input and output data compression in the $HADOOP_HOME/conf/mapred-site.xml on the Hadoop master machine.
  1. stop-all.sh
  2. Enable output compression:
    <property>
     <name>mapred.output.compress</name>
     <value>true</value>
    </property>
  3. Specify the output compression codec:
    <property>
     <name>mapred.output.compression.codec</name>
     <value>org.apache.hadoop.io.compress.GzipCodec</value>
    </property>
  4. Change the output compression type for sequence file output:
    <property>
     <name>mapred.output.compression.type</name>
     <value>BLOCK</value>
    </property>
  5. Configure the map output compression:
    <property>
     <name>mapred.compress.map.output</name>
     <value>true</value>
    </property>
  6. Similar to the codec configuration for the MapReduce job output, we can configure the compression codec for the map task output, the default of which is org.apache.hadoop.io.compress.DefaultCodec:
    <property>
     <name>mapred.map.output.compression.codec</name>
     <value>org.apache.hadoop.io.compress.GzipCodec</value>
    </property>
  7. Copy the configuration file from the master node to all the slave nodes:
    for host in `cat $HADOOP_HOME/conf/slaves`
    do
     echo 'Copying mapred-site.xml file to host:' $host
     scp $HADOOP_HOME/conf/mapred-site.xml $host:$HADOOP_HOME/conf/
    done
  8. start-all.sh
Available compression codecs:
Codec name Java class
DefaultCodec org.apache.hadoop.io.compress.DefaultCodec
GzipCodec org.apache.hadoop.io.compress.GzipCodec
BZip2Codec org.apache.hadoop.io.compress.BZip2Codec
SnappyCodec org.apache.hadoop.io.compress.SnappyCodec
LzoCodec org.apache.hadoop.io.compress.LzoCodec
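
The same settings can also be applied per job through the old mapred API; a sketch, assuming a JobConf built for the job at hand:

import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;

JobConf job = new JobConf();
FileOutputFormat.setCompressOutput(job, true);                    // mapred.output.compress
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);  // mapred.output.compression.codec
SequenceFileOutputFormat.setOutputCompressionType(job,            // mapred.output.compression.type
    SequenceFile.CompressionType.BLOCK);
job.setCompressMapOutput(true);                                   // mapred.compress.map.output
job.setMapOutputCompressorClass(GzipCodec.class);                 // mapred.map.output.compression.codec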

Configuring Speculative Execution

Speculative execution is a proactive performance-boosting strategy used by the JobTracker to execute one task on two TaskTracker instances. When either of these tasks finishes, the other is killed. By default, speculative execution is on.
Speculative execution can help improve the performance of MapReduce jobs by reducing the execution time of slowly progressing tasks. For example, on a heterogeneous Hadoop cluster with different hardware configurations, low-performance computing nodes can greatly prolong the execution time of a MapReduce job. Speculative execution can remedy this problem by prioritizing the high-performance nodes for MapReduce task execution, so the MapReduce execution time can be shortened.
On the other hand, speculative execution can negatively affect the performance of the cluster when a lot of resources are used for it. For example, many tasks will have to wait for slots that are occupied by speculative execution.
Configure Hadoop speculative execution in the $HADOOP_HOME/conf/mapred-site.xml:
  1. stop-mapred.sh
  2. Disable the map task speculative execution:
    <property>
     <name>mapred.map.tasks.speculative.execution</name>
     <value>false</value>
    </property>
  3. Disable the reduce task speculative execution:
    <property>
     <name>mapred.reduce.tasks.speculative.execution</name>
     <value>false</value>
    </property>
  4. Configure the maximum percentage of concurrently running speculative tasks:
    <property>
     <name>mapreduce.job.speculative.speculativecap</name>
     <value>0.2</value>
    </property>
  5. Configure the job speculative execution threshold for slow tasks:
    <property>
     <name>mapreduce.job.speculative.slowtaskthreshold</name>
     <value>1.0</value>
    </property>
  6. Configure the threshold for a TaskTracker to speculatively execute slow tasks:
    <property>
     <name>mapreduce.job.speculative.slownodethreshold</name>
     <value>1.0</value>
    </property>
  7. Sync the configuration to the slaves.
  8. start-mapred.sh
If speculative execution has been enabled for a Hadoop cluster, we can still disable it for specific jobs. For example, when we write MapReduce jobs in the Java programming language, we can use the following code snippet to disable speculative execution for the job:
Configuration conf = new Configuration();
conf.set("mapred.map.tasks.speculative.execution", "false");
conf.set("mapred.reduce.tasks.speculative.execution", "false");
The three properties mapreduce.job.speculative.speculativecap, mapreduce.job.speculative.slowtaskthreshold, and mapreduce.job.speculative.slownodethreshold control when the JobTracker should start a speculative task. Specifically, a speculative task for a regular task will start when the following conditions are met:
  • Speculative execution is enabled.
  • The completion rate, in percentage, of the regular task is less than mapreduce.job.speculative.slowtaskthreshold times the mean completion rate of all other tasks.
  • The completion rate, in percentage, of the regular task is less than mapreduce.job.speculative.slownodethreshold times the mean completion rate of all other tasks on the current TaskTracker.
  • The number of launched speculative tasks is smaller than the configured speculative cap.

Setting proper number of map and reduce slots for the TaskTracker

The number of map and reduce slots determines the number of concurrent map/reduce tasks for a TaskTracker, which forks multiple JVMs to run these tasks.

Configure map/reduce slots for a TaskTracker:
  1. stop-mapred.sh
  2. Configure the map slots in the $HADOOP_HOME/conf/mapred-site.xml:
    <property>
     <name>mapred.tasktracker.map.tasks.maximum</name>
     <value>4</value>
    </property>
  3. Similarly, configure the number of reduce slots for a TaskTracker:
    <property>
     <name>mapred.tasktracker.reduce.tasks.maximum</name>
     <value>4</value>
    </property>
  4. Configure the memory usage for each slot:
    <property>
     <name>mapred.child.java.opts</name>
     <value>-Xmx1024m</value>
    </property>
  5. Sync the configuration to all the slave nodes
  6. start-mapred.sh

Tuning the JobTracker Configuration

Configure the JobTracker in the $HADOOP_HOME/conf/mapred-site.xml:
  1. stop-mapred.sh
  2. Configure the maximum number of tasks for a job:
    <property>
     <name>mapred.jobtracker.maxtasks.per.job</name>
     <value>3000</value>
    </property>
  3. Configure the JobTracker to recover upon restart:
    <property>
     <name>mapred.jobtracker.restart.recover</name>
     <value>true</value>
    </property>
  4. Configure the block size for the job history file:
    <property>
     <name>mapred.jobtracker.job.history.block.size</name>
     <value>3145728</value>
    </property>
  5. Configure the task scheduler for the JobTracker:
    <property>
     <name>mapred.jobtracker.taskScheduler</name>
     <value>org.apache.hadoop.mapred.JobQueueTaskScheduler</value>
    </property>
  6. Configure the maximum running tasks for a job:
    <property>
     <name>mapred.jobtracker.taskScheduler.maxRunningTasksPerJob</name>
     <value>20</value>
    </property>

Tuning the TaskTracker Configuration

Configure the TaskTracker property in the $HADOOP_HOME/conf/mapred-site.xml:
  1. stop-mapred.sh
  2. Configure the MapReduce cluster heartbeat interval:
    <property>
     <name>mapred.tasktracker.expiry.interval</name>
     <value>600000</value>
    </property>
  3. Configure the sleep time before sending the SIGKILL signal:
    <property>
     <name>mapred.tasktracker.tasks.sleeptime-before-sigkill</name>
     <value>6000</value>
    </property>
  4. Enable the TaskTracker memory management:
    <property>
     <name>mapred.tasktracker.tasks.maxmemory</name>
     <value>true</value>
    </property>
  5. Configure the TaskTracker index cache size to be 20MB:
    <property>
     <name>mapred.tasktracker.indexcache.mb</name>
     <value>20</value>
    </property>
  6. Configure the monitoring interval for the TaskTracker's task memory manager:
    <property>
     <name>mapred.tasktracker.taskmemorymanager.monitoring-interval</name>
     <value>5000</value>
    </property>
  7. Configure the TaskTracker to send an out-of-band heartbeat on task completion:
    <property>
     <name>mapreduce.tasktracker.outofband.heartbeat</name>
     <value>true</value>
    </property>
  8. Configure the maximum number of retries for a map task:
    <property>
     <name>mapred.map.max.attempts</name>
     <value>4</value>
    </property>
  9. Configure the maximum number of retries for a failed reduce task:
    <property>
     <name>mapred.reduce.max.attempts</name>
     <value>4</value>
    </property>
  10. Sync the configuration from the master node to all the slave nodes.
  11. start-mapred.sh

Tuning shuffle, merge, and sort parameters

In a MapReduce job, map task outputs are aggregated into JVM buffers. The size of the in-memory buffer determines how much data can be merged and sorted at once. Too small a buffer causes a large number of spill operations, incurring big overhead.

Configure the sorting parameters in the $HADOOP_HOME/conf/mapred-site.xml:
  1. stop-mapred.sh
  2. Configure the buffer size, in megabytes, for sorting:
    <property>
     <name>io.sort.mb</name>
     <value>100</value>
    </property>
  3. Configure the merge factor:
    <property>
     <name>io.sort.factor</name>
     <value>100</value>
    </property>
  4. Change the percentage of the buffer dedicated to record collection:
    <property>
     <name>io.sort.record.percent</name>
     <value>0.05</value>
    </property>
  5. Change the spill factor for buffers:
    <property>
     <name>io.sort.spill.percent</name>
     <value>0.8</value>
    </property>
  6. Configure the in-memory merge threshold:
    <property>
     <name>mapred.inmem.merge.threshold</name>
     <value>1000</value>
    </property>
  7. Configure the percentage of memory to be allocated from the maximum heap size for storing map outputs during the shuffle:
    <property>
     <name>mapred.job.shuffle.buffer.percent</name>
     <value>0.70</value>
    </property>
  8. Configure the threshold to start the in-memory merge:
    <property>
     <name>mapred.job.shuffle.merge.percent</name>
     <value>0.66</value>
    </property>
  9. Configure the percentage of memory used to retain map outputs during the reduce phase:
    <property>
     <name>mapred.job.reduce.input.buffer.percent</name>
     <value>0.0</value>
    </property>
  10. Configure the maximum number of retries in case of fetch failures:
    <property>
     <name>mapreduce.reduce.shuffle.maxfetchfailures</name>
     <value>10</value>
    </property>
    
  11. Sync the configuration from the master node to all the slave nodes.
  12. start-mapred.sh

Configure memory for a Hadoop cluster

Configure memory properties for a Hadoop Cluster in the $HADOOP_HOME/conf/mapred-site.xml:
  1. stop-mapred.sh
  2. Configure the virtual memory size, in megabytes, for a map task used by a scheduler:
    <property>
     <name>mapred.cluster.map.memory.mb</name>
     <value>200</value>
    </property>
  3. Similarly, we can configure the virtual memory size, in megabytes, for a reduce task used by a scheduler:
    <property>
     <name>mapred.cluster.reduce.memory.mb</name>
     <value>512</value>
    </property>
  4. Configure the maximum virtual memory size for a map task used by a scheduler:
    <property>
     <name>mapred.cluster.max.map.memory.mb</name>
     <value>512</value>
    </property>
  5. Configure the maximum virtual memory size for a reduce task used by a scheduler:
    <property>
     <name>mapred.cluster.max.reduce.memory.mb</name>
     <value>512</value>
    </property>
  6. Configure the maximum virtual memory size of a single map task for the job, used by a scheduler:
    <property>
     <name>mapred.job.map.memory.mb</name>
     <value>0.8</value>
    </property>
  7. Configure the maximum virtual memory size of a single reduce task for the job, used by a scheduler:
    <property>
     <name>mapred.job.reduce.memory.mb</name>
     <value>0.8</value>
    </property>
  8. Sync the configuration from the master to all the slave nodes.
  9. start-mapred.sh

Setting proper number of parallel copies

When all or part of the map tasks finish, map outputs will be copied from the map task nodes to the reduce task nodes. The parallel copying strategy is used to increase the transfer throughput.

Configure the number of parallel copies in the $HADOOP_HOME/conf/mapred-site.xml:
  1. stop-mapred.sh
  2. Configure the number of parallel copies:
    <property>
     <name>mapred.reduce.parallel.copies</name>
     <value>20</value>
    </property>
    
  3. Sync the configuration file from master node to slave nodes.
  4. start-mapred.sh

Tuning JVM parameters

Configure JVM parameters in the $HADOOP_HOME/conf/mapred-site.xml:
  1. stop-all.sh
  2. Configure the JVM options for child tasks:
    <property>
     <name>mapred.child.java.opts</name>
     <value>-Xmx512M</value>
    </property>
  3. Sync the configuration file from the master node to the slave nodes.
  4. start-all.sh

Configuring JVM Reuse

MapReduce tasks are executed by JVM processes/threads, which are forked by the TaskTracker. The creation of a JVM, which includes the initialization of the execution environment, is costly, especially when the number of tasks is large. In the default configuration, the number of JVMs needed to finish a job equals the number of tasks. In other words, the default setting uses one JVM to execute one task. When the execution of a task completes, its JVM is killed by the TaskTracker.

JVM Reuse is an optimization of reusing JVMs for multiple tasks. If it is enabled, multiple tasks can be executed sequentially with one JVM.

Configure JVM Reuse in the $HADOOP_HOME/conf/mapred-site.xml:
  1. stop-mapred.sh
  2. Enable JVM reuse by setting the number of tasks per JVM:
    <property>
     <name>mapred.job.reuse.jvm.num.tasks</name>
     <value>2</value>
    </property>
    
  3. Sync the configuration file from master node to slave nodes.
  4. start-mapred.sh
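JVM reuse can also be controlled per job; a sketch using the old mapred API (a value of -1 means there is no limit on the number of tasks a JVM may run):

import org.apache.hadoop.mapred.JobConf;

JobConf job = new JobConf();
job.setNumTasksToExecutePerJvm(-1);   // equivalent to setting mapred.job.reuse.jvm.num.tasks to -1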

Configuring the reducer initialization time

Reduce tasks can be started when a certain percentage of the map tasks have finished. By setting this property to a smaller number, the reduce tasks start earlier, occupying computing slots. On the other hand, if the number is set too large, for example, very close to 1, the reduce tasks have to wait for the majority of the map tasks to finish, prolonging the job execution time.

Configure reducer initialization time in the $HADOOP_HOME/conf/mapred-site.xml:
  1. stop-mapred.sh
  2. Configure the fraction of map tasks that must complete before reducers start:
    <property>
     <name>mapred.reduce.slowstart.completed.maps</name>
     <value>0.05</value>
    </property>
    
  3. Sync the configuration file to all the slave nodes.
  4. start-mapred.sh