Friday, May 16, 2014

Chapter 2 Storing Data in Hadoop

HDFS

HDFS is implemented as a block-structured filesystem. Individual files are broken into blocks of a fixed size, which are stored across a Hadoop cluster. A file can be made up of several blocks, which are stored on different DataNodes (individual machines in the cluster) chosen randomly on a block-by-block basis. As a result, access to a file usually requires access to multiple DataNodes, which means that HDFS supports file sizes far larger than a single machine's disk capacity.

The DataNode stores each HDFS data block in a separate file on its local filesystem with no knowledge about the HDFS files themselves. To improve throughput even further, the DataNode does not create all files in the same directory. Instead, it uses heuristics to determine the optimal number of files per directory, and creates subdirectories appropriately.

One of the requirements for such a block-structured filesystem is the capability to store, manage, and access file metadata (information about files and blocks) reliably, and to provide fast access to the metadata store. Unlike HDFS files themselves (which are accessed in a write-once and read-many model), the metadata structures can be modified by a large number of clients concurrently. It is important that this information never gets out of sync. HDFS solves this problem by introducing a dedicated special machine, called the NameNode, which stores all the metadata for the filesystem across the cluster. This means that HDFS implements a master/slave architecture. A single NameNode (which is a master server) manages the filesystem namespace and regulates access to files by clients. The existence of a single master in a cluster greatly simplifies the architecture of the system. The NameNode serves as a single arbitrator and repository for all HDFS metadata.

Metadata storage is also persistent. The entire filesystem namespace (including the mapping of blocks to files and filesystem properties) is contained in a file called FsImage stored in the NameNode's local filesystem. The NameNode also uses a transaction log to persistently record every change that occurs in the filesystem metadata (metadata store). This log is stored in the edits log file on the NameNode's local filesystem.

The SecondaryNameNode is not a "backup NameNode": it cannot take over the primary NameNode's function. Rather, it serves as a checkpointing mechanism for the primary NameNode. The NameNode maintains two on-disk structures that persist the current filesystem state: an image file and an edit log. The image file represents the HDFS metadata state at a point in time, and the edit log is a transactional log (comparable to a log in a database architecture) of every filesystem metadata change since the image file was created.

When the NameNode (re)starts, the current state is reconstructed by reading the image file and then replaying the edit log. Obviously, the larger the edit log is, the longer it takes to replay it and, consequently, to start a NameNode. To improve NameNode startup performance, the edit log is periodically rolled, and a new image file is created by applying the edit log to the existing image. This operation can be fairly resource-intensive. To minimize the impact of checkpoint creation on NameNode functioning, checkpointing is performed by the SecondaryNameNode daemon, often on a separate machine.

As a result of checkpointing, the SecondaryNameNode holds an (out-of-date) copy of the primary's persistent state in the form of the last image file. In cases when the edit log is kept relatively small, the secondary node can be used to recover the filesystem's state. In this case, you must be aware of a certain amount of metadata (and corresponding data) loss, because the latest changes stored in the edit log are not available.

To keep the memory footprint of the NameNode manageable, the default size of an HDFS block is 64 MB -- orders of magnitude larger than the block size of most other block-structured filesystems. The additional benefit of the large data block is that it allows HDFS to keep large amounts of data stored sequentially on disk, which supports fast streaming reads.

One of the misconceptions about Hadoop is the assumption that files smaller than the block size will still use a whole block on the filesystem. This is not the case: smaller blocks occupy exactly as much disk space as they require.

But this does not mean that having many small files uses HDFS efficiently. Regardless of a block's size, its metadata occupies exactly the same amount of memory in the NameNode. As a result, a large number of small HDFS files (smaller than the block size) will use a lot of the NameNode's memory, thus negatively impacting HDFS scalability and performance.

The downside of HDFS file organization is that several DataNodes are involved in serving a file, which means that a file can become unavailable if any one of those machines is lost. To avoid this problem, HDFS replicates each block across a number of machines (three, by default).

Data replication in HDFS is implemented as part of a write operation in the form of a data pipeline. When a client is writing data to an HDFS file, this data is first written to a local file. When the local file accumulates a full block of data, the client consults the NameNode to get a list of DataNodes assigned to host replicas of that block. The client then writes the data block from its local storage to the first DataNode in 4 KB portions. The DataNode stores the received data in its local filesystem, and forwards each portion to the next DataNode in the list. The same operation is repeated by each receiving DataNode until the last node in the replica set receives the data. That last DataNode stores the data locally without sending it any further.

If one of the DataNodes fails while a block is being written, it is removed from the pipeline. In this case, when the write operation on the current block completes, the NameNode re-replicates the block to make up for the missing replica caused by the failed DataNode. When a file is closed, the remaining data in the temporary local file is pipelined to the DataNodes. The client then informs the NameNode that the file is closed. At this point, the NameNode commits the file creation operation into a persistent store. If the NameNode dies before the file is closed, the file is lost.

The default block size and replication factor are specified by the Hadoop configuration, but can be overridden on a per-file basis. An application can specify the block size and the replication factor for a specific file at creation time.
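
A per-file override can be sketched with the FileSystem API as follows. The path, replication factor, and block size here are illustrative assumptions for the example, not recommended settings:

```java
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path file = new Path("/data/example.bin");  // hypothetical file location
// create(path, overwrite, bufferSize, replication, blockSize)
FSDataOutputStream out = fs.create(file, true, 4096,
        (short) 2,            // two replicas instead of the default three
        128L * 1024 * 1024);  // 128 MB blocks instead of the default 64 MB
out.writeBytes("some data");
out.close();
```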

One of the most powerful features of HDFS is optimization of replica placement, which is crucial to HDFS reliability and performance. All decisions regarding replication of blocks are made by the NameNode, which periodically (every 3 seconds) receives a heartbeat and a block report from each of the DataNodes. A heartbeat is used to ensure proper functioning of DataNodes, and a block report allows verifying that the list of blocks on a DataNode corresponds to the NameNode's information. One of the first things that a DataNode does on startup is to send a block report to the NameNode, which allows the NameNode to rapidly form a picture of the block distribution across the cluster.

An important characteristic of data replication in HDFS is rack awareness. Large HDFS instances run on a cluster of computers that is commonly spread across many racks. Typically, network bandwidth (and consequently network performance) between machines in the same rack is greater than network bandwidth between machines in different racks.

The NameNode determines the rack ID that each DataNode belongs to via the Hadoop Rack Awareness process. A simple policy is to place replicas on unique racks. This policy prevents losing data when an entire rack is lost, and evenly distributes replicas in the cluster. It also allows using bandwidth from multiple racks when reading data. But because a write must, in this case, transfer blocks to multiple racks, the performance of writes suffers.

An optimization of the rack-aware policy is to cut inter-rack write traffic (and consequently improve write performance) by using fewer racks than the number of replicas. For example, when the replication factor is three, two replicas are placed on one rack, and the third on a different rack.

To minimize global bandwidth consumption and read latency, HDFS tries to satisfy a read request from the replica that is closest to the reader. If a replica exists on the same rack as the reader node, that replica is used to satisfy the read request.

As mentioned, each DataNode periodically sends a heartbeat message to the NameNode, which the NameNode uses to discover DataNode failures (based on missing heartbeats). The NameNode marks DataNodes without recent heartbeats as dead, and does not dispatch any new I/O requests to them. Because data located on a dead DataNode is no longer available to HDFS, the death of a DataNode may cause the replication factor of some blocks to fall below their specified values. The NameNode constantly tracks which blocks must be re-replicated, and initiates replication whenever necessary.

Using HDFS Files
When a file is opened for writing, the opening client is granted an exclusive writing lease for the file. This means that no other client can write to the file until this client completes the operation. To ensure that no "runaway" clients hold a lease indefinitely, the lease periodically expires. The use of leases effectively ensures that no two applications can simultaneously write to a given file (comparable to a write lock in a database).

The lease duration is bound by a soft limit and a hard limit. For the duration of the soft limit, a writer has exclusive access to the file. If the soft limit expires and the client fails to close the file or renew the lease (by sending a heartbeat to the NameNode), another client can preempt the lease. If the hard limit (one hour) expires and the client has failed to renew the lease, HDFS assumes that the client has quit, automatically closes the file on behalf of the writer, and then recovers the lease.

The writer's lease does not prevent other clients from reading the file. A file may have many concurrent readers.
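
The write and read paths just described can be sketched through the FileSystem API as follows (the file path is a hypothetical example):

```java
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/data/sample.txt");

// The writer holds the file lease until the stream is closed
FSDataOutputStream out = fs.create(path);
out.writeUTF("hello hdfs");
out.close();

// Readers are not blocked by the writer's lease; many can read concurrently
FSDataInputStream in = fs.open(path);
String content = in.readUTF();
in.close();
```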

Hadoop-Specific File Types
In addition to "ordinary" files, HDFS also introduces several specialized file types (such as SequenceFile, MapFile, SetFile, ArrayFile, and BloomMapFile) that provide much richer functionality, which often simplifies data processing.

SequenceFile provides a persistent data structure for binary key/value pairs. Here, different instances of both key and value must represent the same Java class, but can have different sizes. Similar to other Hadoop files, SequenceFiles are append-only.

When using an ordinary file (either text or binary) for storing key/value pairs (typical data structures for MapReduce), the data storage is not aware of the key and value layout, which must be implemented in the readers, on top of the generic storage. The use of SequenceFile provides a storage mechanism natively supporting the key/value structure, thus making implementations using this data layout much simpler.

SequenceFile has three available formats: Uncompressed, Record-Compressed, and Block-Compressed. The first two are stored in a record-based format, whereas the third uses a block-based format.

The choice of a specific format defines the size of the sequence file on the hard drive. Block-Compressed files are typically the smallest, while Uncompressed are the largest.

SequenceFile Header
FIELD DESCRIPTION
Version A 4-byte array containing three letters (SEQ) and a sequence file version number (either 4 or 6). The currently used version is 6. Version 4 is supported for backward compatibility.
Key Class Name of the key class, which is validated against the name of the key class provided by the reader.
Value Class Name of the value class, which is validated against the name of the value class provided by the reader.
Compression A key/value compression flag
Block Compression A block compression flag
Compression Codec CompressionCodec class. This class is used only if either the key/value or the block compression flag is true. Otherwise, this value is ignored.
Sync A sync marker

A sync is a specialized marker used for faster search inside a SequenceFile. Sync markers are also used in the MapReduce implementation - data splits are done only on sync boundaries.

Record Layout
FIELD DESCRIPTION
Record Length The length of the record (in bytes)
Key Length The length of the key (in bytes)
Key Byte array containing the record's key
Value Byte array containing the record's value

In the Block-Compressed format, the header and sync serve the same purposes as in the record-based SequenceFile formats, while the actual data is contained in blocks, laid out as follows.

Block Layout
FIELD DESCRIPTION
Keys lengths length All the keys for a given block are stored together. This field specifies the compressed key-lengths block size (in bytes)
Keys lengths Byte array containing the compressed key-lengths block
Keys length Compressed keys block size (in bytes)
Keys Byte array containing the compressed keys for a block
Values lengths length All the values for a given block are stored together. This field specifies the compressed value-lengths block size (in bytes)
Values lengths Byte array containing the compressed value-lengths block
Values length Compressed values block size (in bytes)
Values Byte array containing the compressed values for a block

All formats use the same header, which contains information allowing the reader to recognize the file. The header contains the key and value class names that are used by the reader to instantiate those classes, the version number, and compression information. If compression is enabled, the Compression Codec class name field is added to the header.

Metadata for the SequenceFile is a set of key/value text pairs that can contain additional information about the SequenceFile, and that can be used by the file reader/writer.

The implementation of write operations for the Uncompressed and Record-Compressed formats is very similar. Each call to the append() method adds a record containing the length of the whole record (key length plus value length), the length of the key, and the raw data of the key and value to the SequenceFile. The difference between the compressed and the uncompressed versions is whether or not the raw data is compressed with the specified codec.

The Block-Compressed format applies more aggressive compression. Data is not written until it reaches a threshold (block size), at which point all keys are compressed together. The same happens for the values and for the lists of key and value lengths.

A special reader (SequenceFile.Reader) and writer (SequenceFile.Writer) are provided by Hadoop for use with SequenceFiles.
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path path = new Path("filename");
SequenceFile.Writer sequenceWriter = new SequenceFile.Writer(fs, conf, path,
        Key.class, Value.class,
        fs.getConf().getInt("io.file.buffer.size", 4096),
        fs.getDefaultReplication(), 1073741824, null, new Metadata());

.......................................
sequenceWriter.append(bytesWritable, bytesWritable);
.......................................
IOUtils.closeStream(sequenceWriter);
A minimal SequenceFile writer constructor (SequenceFile.Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass)) requires the specification of the filesystem, the Hadoop configuration, the path (file location), and the definitions of both the key and value classes. The constructor used in the previous example enables you to specify additional file parameters, including the following:
  1. int bufferSize - The default buffer size (4096) is used if it is not defined.
  2. short replication - The default replication is used.
  3. long blockSize - The value of 1073741824 (1024 MB) is used.
  4. Progressable progress - None is used.
  5. SequenceFile.Metadata metadata - An empty metadata class is used.
Once the writer is created, it can be used to add key/record pairs to the file.
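
A matching read loop might look like the following sketch (Key and Value stand for the same placeholder classes used by the writer above):

```java
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path path = new Path("filename");
SequenceFile.Reader sequenceReader = new SequenceFile.Reader(fs, path, conf);
// Instantiate key/value holders from the classes recorded in the file header
Key key = (Key) ReflectionUtils.newInstance(sequenceReader.getKeyClass(), conf);
Value value = (Value) ReflectionUtils.newInstance(sequenceReader.getValueClass(), conf);
while (sequenceReader.next(key, value)) {
    // process the key/value pair
}
IOUtils.closeStream(sequenceReader);
```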

One of the limitations of SequenceFile is the inability to seek based on key values. Additional Hadoop file types (MapFile, SetFile, ArrayFile, and BloomMapFile) enable you to overcome this limitation by adding a key-based index on top of SequenceFile.

A MapFile is really not a file, but rather a directory containing two files - the data (sequence) file, containing all keys and values in the map, and a smaller index file, containing a fraction of the keys. Create MapFiles by adding entries in order. MapFiles are typically used to enable efficient search and retrieval of the file's contents by searching on the index.

The index file is populated with the key and a LongWritable that contains the starting byte position of the record corresponding to this key. The index file does not contain all the keys, but just a fraction of them. You can set the index interval using the setIndexInterval() method on the writer. The index is read entirely into memory, so for a large map, it is necessary to set an index skip value that makes the index file small enough to fit in memory completely.

Similar to the SequenceFile, a special reader(MapFile.Reader) and writer(MapFile.Writer) are provided by Hadoop for use with map files.
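
A minimal usage sketch, assuming Text keys and values and a hypothetical directory name:

```java
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);

MapFile.Writer mapWriter = new MapFile.Writer(conf, fs, "example.map",
        Text.class, Text.class);
mapWriter.setIndexInterval(128);  // keep only every 128th key in the index
mapWriter.append(new Text("key1"), new Text("value1"));  // keys must arrive in order
mapWriter.close();

MapFile.Reader mapReader = new MapFile.Reader(fs, "example.map", conf);
Text value = new Text();
mapReader.get(new Text("key1"), value);  // index-assisted lookup
mapReader.close();
```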

SetFile and ArrayFile are variations of MapFile for specialized implementations of key/value types. The SetFile is a MapFile for data represented as a set of keys with no values (a value is represented by a NullWritable instance). The ArrayFile deals with key/value pairs where the keys are just a sequential long. It keeps an internal counter, which is incremented as part of every append call, and the value of this counter is used as the key.

Both file types are useful for sorting keys, not values.
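
For example, an ArrayFile can be sketched as follows; the directory name is illustrative, and the long key is generated internally by the writer:

```java
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);

ArrayFile.Writer arrayWriter = new ArrayFile.Writer(conf, fs, "example.array",
        Text.class);
arrayWriter.append(new Text("first"));   // stored under key 0
arrayWriter.append(new Text("second"));  // stored under key 1
arrayWriter.close();

ArrayFile.Reader arrayReader = new ArrayFile.Reader(fs, "example.array", conf);
Text value = new Text();
arrayReader.get(1, value);  // retrieves the value stored under key 1
arrayReader.close();
```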

A bloom filter is a space-efficient, probabilistic data structure that is used to test whether an element is a member of a set. The result of the test is that the element either definitely is not in the set or may be in the set.
The base data structure of a bloom filter is a bit vector. The probability of false positives depends on the size of the element set and the size of the bit vector.

Although risking false positives, bloom filters have a strong space advantage over other data structures for representing sets, such as self-balancing binary search trees, tries, hash tables, or simple arrays or linked lists of the entries. Most of these require storing at least the data items themselves, which can require anywhere from a small number of bits (for small integers) to an arbitrary number of bits, such as for strings. (Tries are an exception, because they can share storage between elements with equal prefixes.)

This advantage of a bloom filter comes partly from its compactness (inherited from arrays), and partly from its probabilistic nature.
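
To make the idea concrete, here is a minimal, self-contained bloom filter sketch over a bit vector. The hash scheme and sizes are illustrative only; this is not Hadoop's implementation (Hadoop ships its own bloom filter classes):

```java
import java.util.BitSet;

public class SimpleBloomFilter {
    private final BitSet bits;
    private final int size;
    private final int hashes;

    public SimpleBloomFilter(int size, int hashes) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashes = hashes;
    }

    // Simple seeded polynomial hash; illustrative only
    private int hash(String item, int seed) {
        int h = seed;
        for (char c : item.toCharArray()) {
            h = h * 31 + c;
        }
        return Math.abs(h % size);
    }

    public void add(String item) {
        for (int i = 0; i < hashes; i++) {
            bits.set(hash(item, i + 1));
        }
    }

    // false => definitely not in the set; true => possibly in the set
    public boolean mightContain(String item) {
        for (int i = 0; i < hashes; i++) {
            if (!bits.get(hash(item, i + 1))) {
                return false;
            }
        }
        return true;
    }
}
```

Every added item sets a few bits; membership is claimed only when all of an item's bits are set, which is why a "yes" answer is only probable while a "no" answer is definite.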

Finally, the BloomMapFile extends the MapFile implementation by adding a dynamic bloom filter that provides a fast membership test for keys. It also offers a fast version of a key search operation, especially in the case of sparsely populated MapFiles. A writer's append() operation updates a DynamicBloomFilter, which is then serialized when the writer is closed. This filter is loaded in memory when a reader is created. A reader's get() operation first checks the filter for the key membership, and if the key is absent, it immediately returns null without doing any further I/O.
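
Usage mirrors MapFile; the sketch below assumes Text keys/values and a hypothetical directory name:

```java
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);

BloomMapFile.Writer bloomWriter = new BloomMapFile.Writer(conf, fs,
        "example.bloommap", Text.class, Text.class);
bloomWriter.append(new Text("key1"), new Text("value1"));  // also updates the bloom filter
bloomWriter.close();

BloomMapFile.Reader bloomReader = new BloomMapFile.Reader(fs, "example.bloommap", conf);
Text value = new Text();
// get() consults the bloom filter first; a definite miss returns null without disk I/O
bloomReader.get(new Text("missing"), value);
bloomReader.close();
```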

DATA COMPRESSION


