Tuesday, May 20, 2014

Chapter 2 - Storing Data in Hadoop

HDFS

HDFS is implemented as a block-structured filesystem. Individual files are broken into blocks of a fixed size, which are stored across a Hadoop cluster. A file can be made up of several blocks, which are stored on different DataNodes (individual machines in the cluster) chosen randomly on a block-by-block basis. As a result, access to a file usually requires access to multiple DataNodes, which means that HDFS supports file sizes far larger than a single machine's disk capacity.

The DataNode stores each HDFS data block in a separate file on its local filesystem with no knowledge about the HDFS files themselves. To improve throughput even further, the DataNode does not create all files in the same directory. Instead, it uses heuristics to determine the optimal number of files per directory, and creates subdirectories appropriately.

One of the requirements for such a block-structured filesystem is the capability to store, manage, and access file metadata (information about files and blocks) reliably, and to provide fast access to the metadata store. Unlike HDFS files themselves (which are accessed in a write-once, read-many model), the metadata structures can be modified by a large number of clients concurrently. It is important that this information never gets out of sync. HDFS solves this problem by introducing a dedicated special machine, called the NameNode, which stores all the metadata for the filesystem across the cluster. This means that HDFS implements a master/slave architecture. A single NameNode (which is a master server) manages the filesystem namespace and regulates access to files by clients. The existence of a single master in a cluster greatly simplifies the architecture of the system. The NameNode serves as a single arbitrator and repository for all HDFS metadata.

Metadata storage is also persistent. The entire filesystem namespace (including the mapping of blocks to files and filesystem properties) is contained in a file called FsImage, stored in the NameNode's local filesystem. The NameNode also uses a transaction log to persistently record every change that occurs in filesystem metadata (the metadata store). This log is stored in the edits log file on the NameNode's local filesystem.

The SecondaryNameNode is not a "backup NameNode". It cannot take over the primary NameNode's function. It serves as a checkpointing mechanism for the primary NameNode. In addition to storing the state of the HDFS NameNode, it maintains two on-disk structures that persist the current filesystem state: an image file and an edit log. The image file represents the HDFS metadata state at a point in time, and the edit log is a transaction log (comparable to a log in a database architecture) of every filesystem metadata change since the image file was created.

When the NameNode (re)starts, the current state is reconstructed by reading the image file and then replaying the edit log. Obviously, the larger the edit log is, the longer it takes to replay it and consequently to start a NameNode. To improve NameNode startup performance, the edit log is periodically rolled, and a new image file is created by applying the edit log to the existing image. This operation can be fairly resource-intensive. To minimize the impact of checkpoint creation on the NameNode's functioning, checkpointing is performed by the SecondaryNameNode daemon, often on a separate machine.
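
The restart and checkpoint steps described above can be sketched with a toy model. This is a minimal illustration, not the NameNode implementation: the class CheckpointSketch, the Edit type, and the idea of modeling the namespace as a sorted set of paths are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

/** Toy model of an image file plus an edit log (hypothetical names, not Hadoop code). */
class CheckpointSketch {
    /** One logged namespace change: create or delete a path. */
    static final class Edit {
        final boolean create;
        final String path;
        Edit(boolean create, String path) { this.create = create; this.path = path; }
    }

    /** Rebuilds the namespace by loading the image and replaying the edits in order. */
    static TreeSet<String> replay(TreeSet<String> image, List<Edit> editLog) {
        TreeSet<String> namespace = new TreeSet<>(image);
        for (Edit e : editLog) {
            if (e.create) namespace.add(e.path); else namespace.remove(e.path);
        }
        return namespace;
    }

    public static void main(String[] args) {
        TreeSet<String> image = new TreeSet<>(Arrays.asList("/a", "/b"));
        List<Edit> editLog = new ArrayList<>(Arrays.asList(new Edit(true, "/c"), new Edit(false, "/a")));

        // Checkpoint: fold the edits into a new image, then start with an empty log.
        TreeSet<String> newImage = replay(image, editLog);
        editLog.clear();

        // A restart after the checkpoint has nothing left to replay.
        System.out.println(replay(newImage, editLog));
    }
}
```

The longer the edit list, the more work replay() does, which is why folding the log into a new image shortens startup.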

As a result of checkpointing, the SecondaryNameNode holds an (out-of-date) copy of the primary's persistent state in the form of the last image file. In cases where the edit file is kept relatively small, a secondary node can be used to recover the filesystem's state. In this case, you must be aware of a certain amount of metadata (and corresponding data) loss, because the latest changes stored in the edit log are not available.

To keep the memory footprint of the NameNode manageable, the default size of an HDFS block is 64MB -- orders of magnitude larger than the block size of most other block-structured filesystems. The additional benefit of the larger data block is that it allows HDFS to keep large amounts of data stored on the disk sequentially, which supports fast streaming reads of data.

One of the misconceptions about Hadoop is the assumption that smaller blocks (less than the block size) will still use the whole block on the filesystem. This is not the case. The smaller blocks occupy exactly as much disk space as they require.

But this does not mean that having many small files will use HDFS efficiently. Regardless of the size of a block, its metadata occupies exactly the same amount of memory in the NameNode. As a result, a large number of small HDFS files (smaller than the block size) will use a lot of the NameNode's memory, thus negatively impacting HDFS scalability and performance.
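
The arithmetic behind the last two paragraphs can be sketched as follows. The class and method names are hypothetical, and the count of "one namespace object per file plus one per block" is a simplifying assumption for illustration (directories and per-object byte costs are ignored).

```java
/** Back-of-the-envelope block arithmetic (hypothetical helper, not Hadoop code). */
class SmallFilesSketch {
    /** Number of blocks needed for a file; the last block may be partial. */
    static long blockCount(long fileLength, long blockSize) {
        return (fileLength + blockSize - 1) / blockSize;   // ceiling division
    }

    /** Bytes occupied on disk by the last block: only what the data needs. */
    static long lastBlockLength(long fileLength, long blockSize) {
        long remainder = fileLength % blockSize;
        return (remainder == 0 && fileLength > 0) ? blockSize : remainder;
    }

    /** Assumed metadata cost: one object per file plus one per block. */
    static long namespaceObjects(long fileCount, long fileLength, long blockSize) {
        return fileCount * (1 + blockCount(fileLength, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 64L << 20;                                  // 64 MB
        // The same 1 GB of data, stored two different ways:
        System.out.println(namespaceObjects(1, 1L << 30, blockSize));       // one 1 GB file
        System.out.println(namespaceObjects(16384, 64L << 10, blockSize));  // 16,384 files of 64 KB
        System.out.println(lastBlockLength(64L << 10, blockSize));          // disk bytes for a 64 KB block
    }
}
```

Under these assumptions the single large file needs 17 objects, while the small files need 32,768 for the same amount of data.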

The downside of HDFS file organization is that several DataNodes are involved in the serving of a file, which means that a file can become unavailable if any one of those machines is lost. To avoid this problem, HDFS replicates each block across a number of machines (three, by default).

Data replication in HDFS is implemented as part of a write operation in the form of a data pipeline. When a client is writing data to an HDFS file, this data is first written to a local file. When the local file accumulates a full block of data, the client consults the NameNode to get a list of DataNodes that are assigned to host replicas of that block. The client then writes the data block from its local storage to the first DataNode in 4K portions. The DataNode stores each received portion in its local filesystem, and forwards that portion of data to the next DataNode in the list. The same operation is repeated by the next receiving DataNode until the last node in the replica set receives the data. This DataNode stores the data locally without sending it any further.
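
The store-and-forward behavior of the pipeline can be sketched in a few lines. This is an in-memory illustration only: PipelineSketch and Node are hypothetical names, and networking, acknowledgements, and failure handling are deliberately left out.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

/** In-memory model of a replication pipeline (hypothetical, not the HDFS client). */
class PipelineSketch {
    /** Stand-in for a DataNode: stores what it receives, then forwards it downstream. */
    static final class Node {
        final ByteArrayOutputStream stored = new ByteArrayOutputStream();
        final Node next;                          // null for the last node in the pipeline
        Node(Node next) { this.next = next; }

        void receive(byte[] portion) {
            stored.write(portion, 0, portion.length);   // store locally
            if (next != null) next.receive(portion);    // forward to the next replica
        }
    }

    /** Streams one block to the head of the pipeline in fixed-size portions. */
    static void writeBlock(byte[] block, Node first, int portionSize) {
        for (int off = 0; off < block.length; off += portionSize) {
            int end = Math.min(off + portionSize, block.length);
            first.receive(Arrays.copyOfRange(block, off, end));
        }
    }

    public static void main(String[] args) {
        Node third = new Node(null);
        Node second = new Node(third);
        Node first = new Node(second);
        writeBlock(new byte[10000], first, 4096);   // 4K portions, as in the text
        System.out.println(third.stored.size());    // the last replica holds the whole block
    }
}
```

The client only ever talks to the first node; every later replica is fed by its predecessor.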

If one of the DataNodes fails while a block is being written, it is removed from the pipeline. In this case, when the write operation on the current block completes, the NameNode re-replicates it to make up for the missing replica caused by the failed DataNode. When a file is closed, the remaining data in the temporary local file is pipelined to the DataNodes. The client then informs the NameNode that the file is closed. At this point, the NameNode commits the file creation operation into a persistent store. If the NameNode dies before the file is closed, the file is lost.

The default block size and replication factor are specified by Hadoop configuration, but can be overridden on a per-file basis. An application can specify the block size and the replication factor (the number of replicas) for a specific file at its creation time.

One of the most powerful features of HDFS is optimization of replica placement, which is crucial to HDFS reliability and performance. All decisions regarding replication of blocks are made by the NameNode, which periodically receives a heartbeat (every 3 seconds) and a block report from each of the DataNodes. A heartbeat is used to ensure proper functioning of DataNodes, and a block report allows verifying that the list of blocks on a DataNode corresponds to the NameNode information. One of the first things that a DataNode does on startup is send a block report to the NameNode. This allows the NameNode to rapidly form a picture of the block distribution across the cluster.

An important characteristic of data replication in HDFS is rack awareness. Large HDFS instances run on a cluster of computers that is commonly spread across many racks. Typically, network bandwidth (and consequently network performance) between machines in the same rack is greater than network bandwidth between machines in different racks.

The NameNode determines the rack ID that each DataNode belongs to via the Hadoop Rack Awareness process. A simple policy is to place replicas on unique racks. This policy prevents losing data when an entire rack is lost, and evenly distributes replicas in the cluster. It also allows using bandwidth from multiple racks when reading data. But because a write must, in this case, transfer blocks to multiple racks, the performance of writes suffers.

An optimization of a Rack Aware policy is to cut inter-rack write traffic (and consequently improve write performance) by using a number of racks that is less than the number of replicas. For example, when the replication factor is three, two replicas are placed on one rack, and the third one is placed on a different rack.
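
A minimal sketch of the "two replicas on one rack, the third on a different rack" policy follows. It is an assumption-laden illustration: RackPlacementSketch is a hypothetical name, and the selection here is deterministic (first match wins), whereas a real placement policy also weighs the writer's location, load, and free space.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified replica placement for a replication factor of three (hypothetical). */
class RackPlacementSketch {
    /** nodeToRack maps a DataNode name to its rack ID; returns three target nodes. */
    static List<String> chooseTargets(Map<String, String> nodeToRack) {
        // Group the nodes by rack, keeping the input order.
        Map<String, List<String>> byRack = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : nodeToRack.entrySet()) {
            byRack.computeIfAbsent(e.getValue(), r -> new ArrayList<>()).add(e.getKey());
        }
        // Pick the first rack that can host two replicas, and the first other rack.
        String pairRack = null;
        String otherRack = null;
        for (Map.Entry<String, List<String>> e : byRack.entrySet()) {
            if (pairRack == null && e.getValue().size() >= 2) pairRack = e.getKey();
        }
        for (String rack : byRack.keySet()) {
            if (!rack.equals(pairRack)) { otherRack = rack; break; }
        }
        if (pairRack == null || otherRack == null) {
            throw new IllegalStateException("need one rack with two nodes and a second rack");
        }
        List<String> targets = new ArrayList<>(byRack.get(pairRack).subList(0, 2));
        targets.add(byRack.get(otherRack).get(0));
        return targets;
    }

    public static void main(String[] args) {
        Map<String, String> cluster = new LinkedHashMap<>();
        cluster.put("dn1", "rack1");
        cluster.put("dn2", "rack2");
        cluster.put("dn3", "rack2");
        cluster.put("dn4", "rack3");
        System.out.println(chooseTargets(cluster));
    }
}
```

Only one of the three writes crosses a rack boundary, yet losing either rack still leaves at least one replica.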

To minimize global bandwidth consumption and read latency, HDFS tries to satisfy a read request from the replica that is closest to the reader. If a replica exists on the same rack as the reader node, that replica is used to satisfy the read request.

As mentioned, each DataNode periodically sends a heartbeat message to the NameNode, which is used by the NameNode to discover DataNode failures (based on missing heartbeats). The NameNode marks DataNodes without recent heartbeats as dead, and does not dispatch any new I/O requests to them. Because data located at a dead DataNode is no longer available to HDFS, DataNode death may cause the replication factor of some blocks to fall below their specified values. The NameNode constantly tracks which blocks must be re-replicated, and initiates replication whenever necessary.
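
The two bookkeeping steps above (spotting dead DataNodes, then finding blocks that need re-replication) can be sketched as follows. All names are hypothetical, and the timeout is a parameter of the sketch rather than a value taken from HDFS.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy failure detection and under-replication check (hypothetical, not NameNode code). */
class ReplicationMonitorSketch {
    /** Nodes whose last heartbeat is older than timeoutMs are considered dead. */
    static Set<String> deadNodes(Map<String, Long> lastHeartbeatMs, long nowMs, long timeoutMs) {
        Set<String> dead = new TreeSet<>();
        for (Map.Entry<String, Long> e : lastHeartbeatMs.entrySet()) {
            if (nowMs - e.getValue() > timeoutMs) dead.add(e.getKey());
        }
        return dead;
    }

    /** Blocks whose live replica count has fallen below the replication factor. */
    static Set<String> underReplicated(Map<String, Set<String>> blockToNodes,
                                       Set<String> dead, int replication) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Set<String>> e : blockToNodes.entrySet()) {
            Set<String> live = new HashSet<>(e.getValue());
            live.removeAll(dead);                 // replicas on dead nodes do not count
            if (live.size() < replication) result.add(e.getKey());
        }
        return result;
    }
}
```

A caller would run deadNodes() on a timer and schedule a new copy for every block returned by underReplicated().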

Using HDFS Files
When a file is opened for writing, the opening client is granted an exclusive writing lease for the file. This means that no other client can write to this file until this client completes the operation. To ensure that no "runaway" clients are holding a lease, the lease periodically expires. The use of leases effectively ensures that no two applications can simultaneously write to a given file (comparable to a write lock in a database).

The lease duration is bound by a soft limit and a hard limit. For the duration of the soft limit, a writer has exclusive access to the file. If the soft limit expires and the client fails to close the file or renew the lease (by sending a heartbeat to the NameNode), another client can preempt the lease. If the hard limit (one hour) expires and the client has failed to renew the lease, HDFS assumes that the client has quit, automatically closes the file on behalf of the writer, and then recovers the lease.
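
The soft/hard limit rules reduce to a small decision function. The sketch below uses hypothetical names; the one-hour hard limit comes from the text, while the 60-second soft limit used in main() is an assumption for the example.

```java
/** Lease state as a function of time since the last renewal (hypothetical sketch). */
class LeaseSketch {
    enum State { EXCLUSIVE, PREEMPTIBLE, EXPIRED }

    static State stateAt(long lastRenewalMs, long nowMs, long softLimitMs, long hardLimitMs) {
        long idle = nowMs - lastRenewalMs;
        if (idle <= softLimitMs) return State.EXCLUSIVE;     // the writer still owns the file
        if (idle <= hardLimitMs) return State.PREEMPTIBLE;   // another client may take over
        return State.EXPIRED;                                // the file is closed for the writer
    }

    public static void main(String[] args) {
        long soft = 60_000L;        // assumed soft limit for this example
        long hard = 3_600_000L;     // one hour, as stated in the text
        System.out.println(stateAt(0, 30_000L, soft, hard));
        System.out.println(stateAt(0, 120_000L, soft, hard));
        System.out.println(stateAt(0, 3_600_001L, soft, hard));
    }
}
```

Renewing the lease simply resets lastRenewalMs, which moves the writer back to EXCLUSIVE.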

The writer's lease does not prevent other clients from reading the file. A file may have many concurrent readers.

Hadoop-Specific File Types
In addition to "ordinary" files, HDFS also introduced several specialized file types (such as SequenceFile, MapFile, SetFile, ArrayFile, and BloomMapFile) that provide much richer functionality, which often simplifies data processing.

SequenceFile provides a persistent data structure for binary key/value pairs. Here, different instances of both key and value must represent the same Java class, but can have different sizes. Similar to other Hadoop files, SequenceFiles are append-only.

When using an ordinary file (either text or binary) for storing key/value pairs (typical data structures for MapReduce), data storage is not aware of the key and value layout, which must be implemented in the readers on top of generic storage. The use of SequenceFile provides a storage mechanism natively supporting key/value structure, thus making implementations using this data layout much simpler.

SequenceFile has three available formats: Uncompressed, Record-Compressed, and Block-Compressed. The first two are stored in a record-based format, whereas the third one uses a block-based format.

The choice of a specific format for a sequence file defines the length of the file on the hard drive. Block-Compressed files typically are the smallest, while Uncompressed files are the largest.

SequenceFile Header
FIELD               DESCRIPTION
Version             A 4-byte array containing three letters (SEQ) and a sequence file version number (either 4 or 6). The currently used version is 6. Version 4 is supported for backward compatibility.
Key Class           Name of the key class, which is validated against the name of the key class provided by the reader.
Value Class         Name of the value class, which is validated against the name of the value class provided by the reader.
Compression         A key/value compression flag
Block Compression   A block compression flag
Compression Codec   CompressionCodec class. This class is used only if either the key/value or the block compression flag is true. Otherwise, this value is ignored.
Sync                A sync marker

A sync is a specialized marker, which is used for faster search inside a SequenceFile. A sync marker is also used in the MapReduce implementation - data splits are done only on sync boundaries.

Record Layout
FIELD           DESCRIPTION
Record Length   The length of the record (bytes)
Key Length      The length of the key (bytes)
Key             Byte array, containing the record's key
Value           Byte array, containing the record's value

In the block-based format, the header and sync markers serve the same purposes as in the record-based SequenceFile format. The actual data is contained in the blocks.

Block Layout
FIELD                   DESCRIPTION
Keys Lengths Length     In this case, all the keys for a given block are stored together. This field specifies the compressed key-lengths block size (in bytes)
Keys Lengths            Byte array, containing the compressed key-lengths block
Keys Length             Compressed keys size (in bytes)
Keys                    Byte array, containing compressed keys for a block
Values Lengths Length   In this case, all the values for a given block are stored together. This field specifies the compressed value-lengths block size (in bytes)
Values Lengths          Byte array, containing the compressed value-lengths block
Values Length           Compressed values size (in bytes)
Values                  Byte array, containing compressed values for a block

All formats use the same header, which contains information allowing the reader to recognize it. The header contains the key and value class names that are used by the reader to instantiate those classes, the version number, and compression information. If compression is enabled, the Compression Codec class name field is added to the header.

Metadata for the SequenceFile is a set of key/value text pairs that can contain additional information about the SequenceFile for use by the file reader/writer.

Implementation of write operations for the Uncompressed and Record-Compressed formats is very similar. Each call to an append() method adds a record containing the length of the whole record (key length plus value length), the length of the key, and the raw data of the key and value to the SequenceFile. The difference between the compressed and the uncompressed version is whether or not the raw data is compressed with the specified codec.
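
The record layout that append() produces can be illustrated by encoding one record by hand. This is a simplified sketch with a hypothetical class name: it assumes 4-byte big-endian lengths and raw byte arrays, and it omits the header, sync markers, compression, and Writable serialization that a real SequenceFile involves.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hand-rolled encoding of one uncompressed record (illustrative only). */
class RecordLayoutSketch {
    static byte[] encode(byte[] key, byte[] value) {
        ByteBuffer record = ByteBuffer.allocate(4 + 4 + key.length + value.length);
        record.putInt(key.length + value.length);   // record length = key length + value length
        record.putInt(key.length);                  // key length
        record.put(key);                            // raw key bytes
        record.put(value);                          // raw value bytes
        return record.array();
    }

    public static void main(String[] args) {
        byte[] key = "abc".getBytes(StandardCharsets.UTF_8);
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(encode(key, value).length);   // 8 bytes of lengths + 3 + 5
    }
}
```

The value length is never stored explicitly, because a reader can derive it as record length minus key length.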

The Block-Compressed format applies a more aggressive compression. Data is not written until it reaches a threshold (block size), at which point all keys are compressed together. The same thing happens for the values and the lists of key and value lengths.

A special reader (SequenceFile.Reader) and writer (SequenceFile.Writer) are provided by Hadoop for use with SequenceFiles.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Metadata;

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path path = new Path("filename");
// Key and Value stand for the application's key and value classes.
SequenceFile.Writer sequenceWriter = new SequenceFile.Writer(fs, conf, path, Key.class, Value.class, fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(), 1073741824, null, new Metadata());

// ...
sequenceWriter.append(bytesWritable, bytesWritable);
// ...
IOUtils.closeStream(sequenceWriter);
A minimal SequenceFile writer constructor (SequenceFile.Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass)) requires the specification of the filesystem, Hadoop configuration, path (file location), and definition of both key and value classes. The constructor that is used in the previous example enables you to specify additional file parameters, including the following:
  1. int bufferSize - The default buffer size (4096) is used if it is not defined.
  2. short replication - Default replication is used.
  3. long blockSize - The value of 1073741824 (1024 MB) is used.
  4. Progressable progress - None is used.
  5. SequenceFile.Metadata metadata - An empty metadata class is used.
Once the writer is created, it can be used to add key/value pairs to the file.

One of the limitations of SequenceFile is the inability to seek based on key values. Additional Hadoop file types (MapFile, SetFile, ArrayFile, and BloomMapFile) enable you to overcome this limitation by adding a key-based index on top of SequenceFile.

A MapFile is really not a file, but rather a directory containing two files - the data(sequence) file, containing all keys and values in the map, and a smaller index file containing a fraction of the keys. Create MapFiles by adding entries in order. MapFiles are typically used to enable an efficient search and retrieval of the contents of the file by searching on their index.

The index file is populated with the key and a LongWritable that contains the starting byte position of the record corresponding to this key. An index file does not contain all the keys, but just a fraction of them. You can set the index interval using the setIndexInterval() method on the writer. The index is read entirely into memory, so for a large map, it is necessary to set an index skip value that makes the index small enough to fit in memory completely.
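
The idea of a sparse index (keep every Nth key, then scan forward from the nearest indexed key) can be sketched with standard collections. The names below are hypothetical, and list positions stand in for the byte offsets that a real MapFile index stores.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sparse index over a sorted key list (illustrative, not the MapFile implementation). */
class SparseIndexSketch {
    /** Indexes every indexInterval-th key, mapping it to its position in the data. */
    static TreeMap<String, Integer> buildIndex(List<String> sortedKeys, int indexInterval) {
        TreeMap<String, Integer> index = new TreeMap<>();
        for (int i = 0; i < sortedKeys.size(); i += indexInterval) {
            index.put(sortedKeys.get(i), i);
        }
        return index;
    }

    /** Returns the position of key in sortedKeys, or -1 if it is absent. */
    static int find(List<String> sortedKeys, TreeMap<String, Integer> index, String key) {
        Map.Entry<String, Integer> start = index.floorEntry(key);   // nearest indexed key <= key
        if (start == null) return -1;                                // key sorts before all data
        for (int i = start.getValue(); i < sortedKeys.size(); i++) {
            int cmp = sortedKeys.get(i).compareTo(key);
            if (cmp == 0) return i;
            if (cmp > 0) break;                                      // passed where the key would be
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
        TreeMap<String, Integer> index = buildIndex(keys, 3);
        System.out.println(index.keySet());           // only a fraction of the keys is indexed
        System.out.println(find(keys, index, "e"));
    }
}
```

A larger interval shrinks the in-memory index at the cost of a longer forward scan per lookup.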

Similar to the SequenceFile, a special reader(MapFile.Reader) and writer(MapFile.Writer) are provided by Hadoop for use with map files.

SetFile and ArrayFile are variations of MapFile for specialized implementations of key/value types. The SetFile is a MapFile for the data represented as a set of keys with no values(a value is represented by NullWritable instance). The ArrayFile deals with key/value pairs where keys are just a sequential long. It keeps an internal counter, which is incremented as part of every append call. The value of this counter is used as a key.

Both file types are useful for sorting keys, not values.

A bloom filter is a space-efficient, probabilistic data structure that is used to test whether an element is a member of a set. The result of the test is that the element either definitely is not in the set or may be in the set.
The base data structure of a bloom filter is a bit vector. The probability of false positives depends on the number of elements in the set and the size of the bit vector.
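
A minimal bloom filter over a bit vector might look like the sketch below. The class name and the way bit positions are derived from String.hashCode() are assumptions chosen for brevity; this is not the DynamicBloomFilter that Hadoop ships.

```java
import java.util.BitSet;

/** Minimal bloom filter over a bit vector (illustrative sketch). */
class BloomFilterSketch {
    private final BitSet bits;
    private final int size;
    private final int hashCount;

    BloomFilterSketch(int size, int hashCount) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashCount = hashCount;
    }

    /** i-th bit position for a key, derived from two base hashes. */
    private int position(String key, int i) {
        int h1 = key.hashCode();
        int h2 = (h1 >>> 16) | 1;                  // second hash, forced to be odd
        return Math.floorMod(h1 + i * h2, size);   // always in [0, size)
    }

    void add(String key) {
        for (int i = 0; i < hashCount; i++) bits.set(position(key, i));
    }

    /** false means "definitely not in the set"; true means "may be in the set". */
    boolean mightContain(String key) {
        for (int i = 0; i < hashCount; i++) {
            if (!bits.get(position(key, i))) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        BloomFilterSketch filter = new BloomFilterSketch(1024, 3);
        filter.add("row-1");
        System.out.println(filter.mightContain("row-1"));   // true: added keys are never missed
    }
}
```

Added keys always test positive; a key that was never added usually tests negative, but can collide with bits set by other keys.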

Although they risk false positives, bloom filters have a strong space advantage over other data structures for representing sets, such as self-balancing binary search trees, tries, hash tables, or simple arrays or linked lists of the entries. Most of these require storing at least the data items themselves, which can require anywhere from a small number of bits (for small integers) to an arbitrary number of bits, such as for strings. (Tries are an exception, because they can share storage between elements with equal prefixes.)

This advantage of a bloom filter comes partly from its compactness (inherited from arrays), and partly from its probabilistic nature.

Finally, the BloomMapFile extends the MapFile implementation by adding a dynamic bloom filter that provides a fast membership test for keys. It also offers a fast version of a key search operation, especially in the case of sparsely populated MapFiles. A writer's append() operation updates a DynamicBloomFilter, which is then serialized when the writer is closed. This filter is loaded in memory when a reader is created. A reader's get() operation first checks the filter for the key membership, and if the key is absent, it immediately returns null without doing any further I/O.

DATA COMPRESSION

An important consideration for storing data in HDFS files is data compression, which shifts the computation load in data processing from I/O to CPU. Several publications provide a systematic evaluation of the compute versus I/O trade-offs when using compression for MapReduce implementations, and they show that the benefits of data compression depend on the type of data-processing job. For read-heavy (I/O-bound) applications (for example, text data processing), compression gives 35 to 60 percent performance savings. On the other hand, for compute-intensive (CPU-bound) applications, performance gains from data compression are negligible.

This does not mean that data compression is not advantageous for such applications. Hadoop clusters are shared resources, and as a result, a diminishing I/O load for one application increases the capability of other applications to use this I/O.

Does this mean that data compression is always desirable? The answer is "no". For example, if you are using text or custom binary input files, data compression might be undesirable, because compressed files are not splittable. On the other hand, in the case of SequenceFiles and their derivatives, compression is always desirable. Finally, it always makes sense to compress the intermediate files used for shuffle and sort.

Keep in mind that the results of data compression depend greatly on the type of data being compressed and the compression algorithm.

HDFS Federation and High Availability
The main benefits of HDFS Federation:
  • Namespace scalability - HDFS cluster storage scales horizontally, but the namespace does not. Large deployments (or deployments using a lot of small files) benefit from scaling the namespace by adding more NameNodes to the cluster.
  • Performance - Filesystem operation throughput is limited by a single NameNode. Adding more NameNodes to the cluster scales the filesystem read/write operation throughput.
  • Isolation - A single NameNode offers no isolation in a multi-user environment. An experimental application can overload the NameNode and slow down production-critical applications. With multiple NameNodes, different categories of applications and users can be isolated to different namespaces.
Implementation of HDFS Federation is based on a collection of independent NameNodes that don't require coordination with each other. The DataNodes are used as storage for blocks by all the NameNodes. Each DataNode registers with all the NameNodes in the cluster. DataNodes send periodic heartbeats and block reports, and handle commands from the NameNodes.

A namespace operates on a set of blocks - a block pool. Although a pool is dedicated to a specific namespace, the actual data can be allocated on any of the DataNodes in the cluster. Each block pool is managed independently, which allows a namespace to generate block IDs for new blocks without the need for coordination with the other namespaces. The failure of a NameNode does not prevent the DataNodes from serving other NameNodes in the cluster.

A namespace and its block pool together are called a namespace volume. This is a self-contained unit of management. When a NameNode/namespace is deleted, the corresponding block pool at the DataNodes is deleted. Each namespace volume is upgraded as a unit during a cluster upgrade.

HDFS Federation configuration is backward-compatible, and allows an existing single-NameNode configuration to work without any change. The new configuration is designed such that all the nodes in the cluster have the same configuration, without the need for deploying a different configuration based on the type of the node in the cluster.

Although HDFS Federation solves the problem of HDFS scalability, it does not solve the NameNode reliability issue.(In reality, it makes it worse - a probability of one NameNode failure in this case is higher).

The new HDFS high-availability architecture contains two separate machines configured as NameNodes, with exactly one of them in an active state at any point in time. The active NameNode is responsible for all client operations in the cluster, while the other one (the standby) simply acts as a slave, maintaining enough state to provide a fast failover if necessary. To keep the state of both nodes synchronized, the implementation requires that both nodes have access to a directory on a shared storage device.

When any namespace modification is performed by the active node, it durably logs a record of the modification to a log file located in the shared directory. The standby node is constantly watching this directory for changes, and applies them to its own namespace. In the event of a failover, the standby ensures that it has read all of the changes before transitioning to the active state.

To provide a fast failover, it is also necessary for the standby node to have up-to-date information regarding the location of blocks in the cluster. This is achieved by configuring DataNodes to send block location information and heartbeats to both NameNodes.


HBASE

HBase Architecture
HBase supports a very loose schema definition, and does not provide any joins, query language, or SQL.

Although HBase does not support real-time joins and queries, batch joins and/or queries via MapReduce can be easily implemented. In fact, they are well-supported by high-level systems such as Pig and Hive, which use a limited SQL dialect to execute operations.

The main focus of HBase is on Create, Read, Update, and Delete (CRUD) operations on wide sparse tables. Currently, HBase does not support transactions (but provides limited locking support and some atomic operations) or secondary indexing (several community projects are trying to implement this functionality, but they are not part of the core HBase implementation). As a result, most HBase-based implementations use highly denormalized data.

HBase leverages HDFS for its persistent data storage. This allows HBase to use all the advanced features that HDFS provides, including checksums, replication, and failover. HBase data management is implemented by distributed region servers, which are managed by the HBase master (HMaster).

A region server's implementation consists of the following major components:
  • memstore is HBase's implementation of an in-memory data cache, which improves the overall performance of HBase by serving as much data as possible directly from memory. The memstore holds in-memory modifications to the store in the form of key/value pairs.
  • A write-ahead log (WAL) records all changes to the data. This is important in case something happens to the primary storage. If the server crashes, it can effectively replay that log to get everything up to where the server should have been just before the crash. It also means that if writing the record to the WAL fails, the whole operation must be considered a failure.
    One of the HBase optimization techniques is disabling writes to the WAL. This represents a trade-off between performance and reliability. Disabling writes to the WAL prevents recovery when a region server fails before a write operation completes. You should use such an optimization with care, and only in cases when either data loss is acceptable, or a write operation can be "replayed" based on an additional data source.
  • HFile is a specialized HDFS file format for HBase. The implementation of HFile in a region server is responsible for reading and writing HFiles to and from HDFS.
    ZOOKEEPER
    Zookeeper is a replicated synchronization service with eventual consistency. It is robust, because the persisted data is distributed between multiple nodes(this set of nodes is called an ensemble) and a client connects to any of them(that is, a specific "server"), migrating if a given server fails. As long as a strict majority of nodes are working, the ensemble of Zookeeper nodes is alive.

    Zookeeper's master node is dynamically chosen by consensus within the ensemble. If the master node fails, the remainder of nodes picks a new master. The master is the authority for writes. This guarantees that the writes are persisted in-order(that is, writes are linear). Each time a client writes to the ensemble, a majority of nodes persist the information. This means that each write makes the server up-to-date with the master.

    A canonical example of Zookeeper usage is distributed-memory computation, where some data is shared between client nodes, and must be accessed/updated in a very careful way to account for synchronization. Zookeeper offers the library to construct custom synchronization primitives, while the capability to run a distributed server avoids the single-point-of-failure issue you have when using a centralized message repository.

A distributed HBase instance depends on a running Zookeeper cluster. All participating nodes and clients must be able to access the running Zookeeper instances. By default, HBase manages a Zookeeper "cluster" - it starts and stops the Zookeeper processes as part of the HBase start/stop process. Because the HBase master may be relocated, clients bootstrap by looking up the current location of the HBase master and the -ROOT- table in Zookeeper.

HBase uses an auto-sharding and distribution approach to cope with a large data size(compared to HDFS block-based design and data access).

To store a table of arbitrary length, HBase partitions this table into regions, with every region containing a sorted (by primary key), continuous range of rows. The term "continuous" here does not mean that a region contains all the keys from the given interval. Rather, it means that all keys from an interval are guaranteed to be partitioned to the same region, which can have any number of holes in its key space.

The way regions are split depends not on the key space, but rather on the data size. The size of a data partition for a given table can be configured during table creation. These regions are "randomly" spread across region servers. (A single region server can serve any number of regions for a given table.) They can also be moved around for load balancing and failover.

When a new record is inserted in the table, HBase decides which region server it should go to (based on the key value) and inserts it there. If the region's size exceeds the predefined one, the region is automatically split. A region split is a fairly expensive operation. To avoid some of this cost, the table can also be pre-split during creation, or split manually at any point.

When a record (or set of records) is read/updated, HBase decides which regions should contain the data, and directs the client to the appropriate ones. From this point, region servers implement the actual read/update operation.

HBase leverages a specialized table (.META.) to resolve a specific key/value pair to the specific region server. This table contains a list of available region servers, and a list of descriptors for user tables. Each descriptor specifies a key range for a given table, contained in a given region.

A .META. table is discovered using another specialized HBase table(-ROOT-), which contains a list of descriptors for a .META. table. A location of the -ROOT- table is held in Zookeeper.
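
Resolving a row key to the region that holds it is essentially a "greatest start key not larger than the row key" lookup. The sketch below illustrates that idea with a sorted map; the class name, the use of String keys, and the server names are assumptions, and it does not reproduce the actual .META. table layout.

```java
import java.util.Map;
import java.util.TreeMap;

/** Key-range lookup in the spirit of a region catalog (hypothetical sketch). */
class RegionLookupSketch {
    /**
     * regionStartToServer maps each region's start key to the server hosting it.
     * The first region is assumed to start at the empty key "".
     */
    static String locate(TreeMap<String, String> regionStartToServer, String rowKey) {
        Map.Entry<String, String> region = regionStartToServer.floorEntry(rowKey);
        if (region == null) {
            throw new IllegalStateException("no region covers key " + rowKey);
        }
        return region.getValue();
    }

    public static void main(String[] args) {
        TreeMap<String, String> catalog = new TreeMap<>();
        catalog.put("", "rs1");     // rows before "g"
        catalog.put("g", "rs2");    // rows from "g" up to (not including) "p"
        catalog.put("p", "rs3");    // rows from "p" onward
        System.out.println(locate(catalog, "grape"));
    }
}
```

Because regions hold sorted, continuous key ranges, one ordered lookup is enough; no per-key directory is needed.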

An HBase table is a sparse, distributed, persistent multidimensional sorted map. The first map level is the row key. As mentioned, row keys are always sorted, which is the foundation of a table's sharding and of efficient reads and scans - reading of the key/value pairs in sequence.

One thing to be aware of is the fact that HBase operates on byte arrays. All of the components of HBase data - keys, column family names, and column names - are treated by HBase as arrays of uninterpreted bytes. This means that all internal value comparisons and, consequently, sorting are done in lexicographical order. This is very important to remember, especially for row key design, to avoid unpleasant surprises. A typical example is the usage of integer keys. If they are not left-padded (for example, with zeros) to the same length, then, as a result of HBase sorting, key 11, for example, will appear before key 5.
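
The ordering surprise is easy to reproduce with a plain byte-wise comparison. The helper names below are hypothetical; the comparison treats bytes as unsigned, which is the usual meaning of lexicographical order on byte arrays.

```java
import java.nio.charset.StandardCharsets;

/** Lexicographical ordering of row keys stored as bytes (illustrative sketch). */
class RowKeyOrderSketch {
    /** Compares two byte arrays lexicographically, treating bytes as unsigned. */
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) return diff;
        }
        return a.length - b.length;   // a shorter key sorts before its extensions
    }

    static byte[] key(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    /** Left-pads a non-negative number with zeros to a fixed width. */
    static byte[] paddedKey(long n, int width) {
        return key(String.format("%0" + width + "d", n));
    }

    public static void main(String[] args) {
        System.out.println(compare(key("11"), key("5")) < 0);                  // true: "11" sorts first
        System.out.println(compare(paddedKey(5, 4), paddedKey(11, 4)) < 0);    // true: "0005" < "0011"
    }
}
```

With a fixed width, byte order and numeric order agree, so scans return rows in the expected sequence.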

The second map level used by HBase is based on a column family. (Column families were originally introduced by columnar databases for fast analytical queries. In this case, data is stored not row by row as in a traditional RDBMS, but rather by column families.) Column families are used by HBase for separation of data based on access patterns (and size).

Column families play a special role in the HBase implementation. They define how HBase data is stored and accessed. Every column family is stored in a separate HFile. This is an important consideration to keep in mind during table design. It is recommended that you create a column family per data access type - that is, data typically read/written together should be placed in the same column family. (In short: one HFile per column family, so keep data that is read and written together in the same family.)

A set of column families is defined during table creation (although it can be altered at a later point). Different column families can also use different compression mechanisms, which might be an important factor, for example, when separate column families are created for metadata and data (a common design pattern). In this case, metadata is often relatively small and does not require compression, whereas data can be sufficiently large that compression often improves HBase throughput.

Because of such a storage organization, HBase implements merged reads. For a given row, it reads all of the files of the column families and combines them before sending the result back to the client. As a result, if the whole row is always processed together, a single column family will typically provide the best performance. (In short: if whole rows are read frequently, a single column family performs best.)

The last map level is based on columns. HBase treats columns as a dynamic map of key/value pairs. This means that columns are not defined during table creation, but are dynamically populated during write/update operations. As a result, every row/column family in an HBase table can contain an arbitrary set of columns. The columns contain the actual values.

Technically, there is one more map level supported by HBase - versioning of every column value. HBase does not distinguish between writes and updates - an update is effectively a write with a new version. By default, HBase stores the last three versions of a given column value (while automatically deleting older versions). The depth of versioning can be controlled during table creation. The default version is the timestamp of the data insertion, but it can easily be overridden by a custom version value.
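The map levels described so far (row, column family, column, version) can be modeled as nested dictionaries. The following is a toy model only: `SketchTable` and its methods are invented for illustration, it keeps everything in memory, and it assumes the caller supplies the version number explicitly instead of using a timestamp.

```python
from collections import defaultdict


class SketchTable:
    """Toy model of the map levels: row -> family -> column -> {version: value}."""

    def __init__(self, max_versions: int = 3):
        # Assumes max_versions >= 1; three mirrors the default mentioned above.
        self.max_versions = max_versions
        self.rows = defaultdict(lambda: defaultdict(lambda: defaultdict(dict)))

    def put(self, row, family, column, value, version):
        """A write and an update are the same thing: a new version of the cell."""
        cell = self.rows[row][family][column]
        cell[version] = value
        # Keep only the newest max_versions entries, dropping older ones.
        for old in sorted(cell)[:-self.max_versions]:
            del cell[old]

    def get(self, row, family, column, version=None):
        """Return the requested version, or the newest one if none is given."""
        cell = self.rows[row][family][column]
        if not cell:
            return None
        if version is None:
            version = max(cell)
        return cell.get(version)
```

Writing four versions of the same cell leaves only the newest three retrievable, which matches the default depth described above.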

Following are the four primary data operations supported by HBase:
  • Get returns values of column families/columns/versions for a specified row (or rows). It can be further narrowed to apply to a specific column family/column/version. It is important to realize that if Get is narrowed to a single column family, HBase does not have to implement a merged read. (Note: a Get on one or more rows can be restricted to a specific column family/column/version.)
  • Put either adds a new row (or rows) to a table if the key does not exist, or updates an existing row (or rows) if the key already exists. Similar to Get, Put can be limited to apply to a specific column family/column. (Note: an insert or update of one or more rows can likewise be restricted to a specific column family/column.)
  • Scan allows iteration over multiple rows (a key range) for specified values, which can include the whole row or any subset of it.
  • Delete removes a row (or rows), or any part thereof, from a table. HBase does not modify data in place and, as a result, Deletes are handled by creating new markers called tombstones. These tombstones, along with the dead values, are cleaned up on major compactions.

Get and Scan operations on HBase can optionally be configured with filters, which are applied on the region server. They provide an HBase read optimization technique, enabling you to improve the performance of Get/Scan operations. Filters are effectively conditions that are applied to the read data on the region server, and only the rows (or parts of the rows) that pass filtering are delivered back to the client.

HBase supports a wide range of filters, from row key to column family, and from column to value filters. Additionally, filters can be combined into chains using boolean operations. Keep in mind that filtering does not reduce the amount of data read (and still often requires a full table scan), but it can significantly reduce network traffic. Filter implementations must be deployed on the servers, which then require a restart. (In short: filters do not reduce the total amount of data read and usually still need a full table scan, but they greatly reduce network transfer.)
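The trade-off can be made concrete with a small simulation. The sketch below is not the HBase filter API; `scan_with_filter` is a stand-in that counts how many rows the server reads versus how many it sends back.

```python
def scan_with_filter(region_rows, start_key, stop_key, row_filter):
    """Simulate a region-server-side scan over sorted (key, value) pairs.

    Returns (rows_sent_to_client, rows_read_on_server).
    """
    rows_read = 0
    sent = []
    for key, value in region_rows:
        if key < start_key or key >= stop_key:
            continue
        rows_read += 1  # the server still reads every row in the key range
        if row_filter(key, value):
            sent.append((key, value))  # only matching rows cross the network
    return sent, rows_read
```

With a value filter that matches few rows, `rows_read` stays equal to the size of the scanned range while the list sent to the client shrinks, which is the effect described above.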

In addition to general-purpose Put/Get operations, HBase supports the following specialized operations:
  • Atomic conditional operations, including atomic compare-and-set (executing a server-side update guarded by a check) and atomic compare-and-delete (executing a server-side guarded Delete).
  • Atomic "counters" increment operations, which guarantee synchronized operations on them. Synchronization, in this case, is done within a region server, not on the client.
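Both operations boil down to doing the check and the write under one lock on the server. The sketch below models that idea with a Python lock; `SketchRegion` and its method names are hypothetical and only mimic the behavior, not the HBase client API.

```python
import threading


class SketchRegion:
    """Toy region that serializes conditional updates and counter increments."""

    def __init__(self):
        self._lock = threading.Lock()
        self._cells = {}

    def check_and_put(self, key, expected, new_value):
        """Write new_value only if the current value equals expected."""
        with self._lock:
            if self._cells.get(key) != expected:
                return False
            self._cells[key] = new_value
            return True

    def increment(self, key, amount=1):
        """Atomically add amount to a counter and return the new total."""
        with self._lock:
            self._cells[key] = self._cells.get(key, 0) + amount
            return self._cells[key]
```

Because the lock lives with the data, several clients can increment the same counter concurrently without coordinating among themselves.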

The table below describes the format that HBase uses to store the table data.

Table Data Layout in the File
KEY     | TIMESTAMP                    | COLUMN FAMILY:COLUMN NAME | VALUE
Row key | Timestamp of the last update | Family:column             | Value

This table explains the mechanics used by HBase to deal with sparsely populated tables and arbitrary column names. HBase explicitly stores each column defined for a specific key in the appropriate column family file.

Because HBase uses HDFS as a persistence mechanism, it never overwrites data.(HDFS does not support updates.) As a result, every time memstore is flushed to disk, it does not overwrite an existing store file, but rather creates a new one. To avoid proliferation of the store files, HBase implements a process known as compaction.

Two types of compaction exist: minor and major. Minor compactions usually pick up a couple of the smaller adjacent store files and rewrite them as one. Minor compactions do not drop Deletes or expired cells; only major compactions do this. Sometimes a minor compaction picks up all the store files, in which case it actually promotes itself to being a major compaction.

After a major compaction runs, there will be a single store file per store, which usually improves performance. Compactions will not perform region merges.
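The difference between the two compaction types can be sketched as follows. This is a simplified model: store files are lists of (row key, sequence number, value) entries, `TOMBSTONE` stands in for a delete marker, and versions and expiry are ignored.

```python
TOMBSTONE = object()  # stand-in for the marker written by a Delete


def compact(store_files, major):
    """Merge several store files into one sorted list of entries.

    Each store file is a list of (row_key, sequence, value) tuples.
    """
    merged = sorted(
        (entry for store_file in store_files for entry in store_file),
        key=lambda entry: (entry[0], entry[1]),
    )
    if not major:
        # Minor compaction: rewrite as a single file, keep everything.
        return merged
    newest = {}
    for key, seq, value in merged:
        # Entries are sorted, so a higher sequence replaces a lower one.
        newest[key] = (key, seq, value)
    # Major compaction: drop tombstones together with the values they hide.
    return [entry for entry in newest.values() if entry[2] is not TOMBSTONE]
```

A minor run over two files keeps the tombstone and the value it shadows, while a major run over all files leaves only live rows.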

For faster key search within the files, HBase leverages bloom filters, which enable you to check on the row or row/column level, and potentially filter an entire store file from a read. This filtering is especially useful in the case of a population of sparse keys. Bloom filters are generated when a file is persisted, and are stored at the end of each file.
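A bloom filter answers "definitely not present" or "possibly present" for a key, which is what lets a whole store file be skipped. The sketch below is a generic bloom filter, not HBase's implementation; the bit count, the number of hashes, and the use of SHA-256 are all assumptions chosen for brevity.

```python
import hashlib


class SketchBloomFilter:
    """Minimal bloom filter backed by a Python integer used as a bit array."""

    def __init__(self, size_bits: int = 1024, hash_count: int = 3):
        self.size_bits = size_bits
        self.hash_count = hash_count
        self.bits = 0

    def _positions(self, key: bytes):
        # Derive hash_count bit positions by salting the key with an index.
        for i in range(self.hash_count):
            digest = hashlib.sha256(bytes([i]) + key).digest()
            yield int.from_bytes(digest[:8], "big") % self.size_bits

    def add(self, key: bytes) -> None:
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key: bytes) -> bool:
        """False means the key was never added; True may be a false positive."""
        return all((self.bits >> pos) & 1 for pos in self._positions(key))
```

A reader would consult the filter of each store file first and open only those files for which `might_contain` returns True.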

HBase Schema Design
Guidelines:
  • The most effective access to HBase data is the usage of Get or Scan operations based on the row key. HBase does not support any secondary keys/indexes. This means that, ideally, a row key should be designed to accommodate all of the access patterns required for a specific table. This often means using a composite row key to accommodate more data access patterns.
  • A general guideline is to limit the number of column families per table to no more than 10 to 15. (Remember that every column family is stored by HBase in a separate file, so a large number of column families means that many files must be read and merged.) Also, because the column family name is stored explicitly with every column name, you should minimize the length of the column family name. Single-letter names are often recommended if the amount of data in the column is small.
  • Although HBase does not impose any limitation on the size of columns in a given row, you should consider the following:
    1. Rows are not splittable. As a result, a huge column data size (close to the region size) typically indicates that this type of data should not be stored in HBase.
    2. Each column value is stored along with its metadata (row key, column family name, column name), which means that a very small column data size leads to a very inefficient use of storage (that is, more space is occupied by metadata than by the actual table data). It also implies that you should not use long column names.
  • When deciding between a tall-narrow (millions of keys with a limited number of columns) and a flat-wide (a limited number of keys with millions of columns) table design, the former is generally recommended. This is because of the following:
    1. In the extreme cases, a flat-wide table might end up with one row per region, which is bad for performance and scalability.
    2. Table scans are typically more efficient compared to massive reads. As a result, assuming that only a subset of the row data is needed, a tall-narrow design provides better overall performance.
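The storage guideline about small values and long names can be checked with simple arithmetic. The sketch below is a rough estimate only: it counts the row key, family name, column name, and an assumed 8-byte timestamp as metadata, and ignores any other per-cell overhead of the real file format.

```python
TIMESTAMP_BYTES = 8  # assumption: the version is a 64-bit timestamp


def metadata_ratio(row_key: bytes, family: bytes, column: bytes, value: bytes) -> float:
    """Fraction of a stored cell taken up by metadata rather than by the value."""
    metadata = len(row_key) + len(family) + len(column) + TIMESTAMP_BYTES
    return metadata / (metadata + len(value))


# Same 18-byte value, short names versus long names.
short_names = metadata_ratio(b"user0001", b"d", b"n", b"x" * 18)
long_names = metadata_ratio(b"user0001", b"user_details", b"display_name", b"x" * 18)
```

In this example `short_names` is 0.5, and `long_names` is noticeably higher, which is why short family and column names are recommended for small values.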

One of the main strengths of the HBase design is the distributed execution of requests between multiple region servers. However, taking advantage of this design and ensuring that there are no "hot" (overloaded) servers during an application's execution might require special design approaches for the row key. A general recommendation is to avoid a monotonically increasing sequence (for example, 1, 2, 3, or timestamps) as the row key for massive Put operations. You can mitigate any congestion in a single region brought on by monotonically increasing keys by randomizing the key values so that they are not in sorted order. (In short: for write-heavy tables, avoid monotonically increasing integers or timestamps as row keys.)

Data locality is also an important design consideration for HBase Get/Scan operations. Rows that are often retrieved together should be co-located, which means that they must have adjacent keys.

A general rule is to use sequential keys in Scan-heavy cases, especially if you can leverage bulk imports for data population. Random keys are recommended for massive parallel writes with random, single-key access. (In short: use ordered row keys for Scan-heavy tables, and random row keys for tables dominated by parallel Puts or single-key access.)


Some specific row key design patterns:
  • Key "salting" - This entails prefixing a sequential key with a random value, which allows for "bucketing" sequential keys across multiple regions.
  • The swap/promotion of key fields (for example, "reverse domains") - A common design pattern in web analytics is to use a domain name as a row key. Using a reverse domain name as a key helps in this case by keeping information about pages from the same site close to each other.
  • Complete randomization of keys - An example of this would be to use an MD5 hash.
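The three patterns above can be sketched as small key-building helpers. The function names are invented for this illustration. Note one deliberate substitution: the salt here is derived from a hash of the key rather than drawn at random, so that a reader can recompute the prefix when looking the row up; that choice is an assumption of this sketch, not something the text prescribes.

```python
import hashlib


def salted_key(key: str, buckets: int = 16) -> str:
    """Prefix a sequential key with a bucket number to spread writes."""
    bucket = hashlib.md5(key.encode("utf-8")).digest()[0] % buckets
    return "%02d-%s" % (bucket, key)


def reversed_domain_key(domain: str) -> str:
    """Turn 'mail.example.com' into 'com.example.mail' so one site sorts together."""
    return ".".join(reversed(domain.split(".")))


def md5_key(key: str) -> str:
    """Fully randomize key order by replacing the key with its MD5 hex digest."""
    return hashlib.md5(key.encode("utf-8")).hexdigest()
```

For example, `reversed_domain_key("mail.example.com")` and `reversed_domain_key("www.example.com")` both start with `com.example.`, so the two rows end up adjacent in a sorted table.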



Programming for HBase
One thing to keep in mind is that the HTable implementation is single-threaded. If it is necessary to access HBase from multiple threads, every thread must create its own instance of the HTable class. One of the ways to solve this problem is to use the HTablePool class. This class serves one purpose - pooling client API (HTableInterface) instances for the HBase cluster.
......

New HBase Features
HFile V2 Format
The problem with the current HFile format is that it causes high memory usage and slow startup times for the region server because of large filters and block index sizes.

In the current HFile format, there is a single index that must always be held in memory. This can result in gigabytes of memory per server consumed by block indexes, which has a significant negative impact on region server scalability and performance. Additionally, because a region is not considered opened until all of its block index data is loaded, a large block index can significantly slow down region startup.

To solve this problem, the HFile v2 format breaks a block index into a root index block and leaf blocks. Only the root index(indexing data blocks) must always be kept in memory. A leaf index is stored on the level of blocks, which means that its presence in memory depends on the presence of blocks in memory. A leaf index is loaded in memory only when the block is loaded, and is evicted when the block is evicted from memory. Additionally, leaf-level indexes are structured in a way to allow a binary search on the key without deserializing.
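The root/leaf split can be pictured as a two-step binary search. The sketch below is a simplified model of that idea, not the HFile v2 layout: `TwoLevelIndex` is hypothetical, leaf index blocks are plain lists, and a leaf is "loaded" on first use instead of together with its data block.

```python
import bisect


class TwoLevelIndex:
    """Root index always in memory; leaf index blocks loaded only when needed."""

    def __init__(self, leaf_blocks):
        # leaf_blocks: list of sorted lists of (first_key, data_block_id).
        self._leaf_blocks = leaf_blocks  # stands in for leaf blocks on disk
        self.root = [leaf[0][0] for leaf in leaf_blocks]  # first key of each leaf
        self.loaded_leaves = {}  # leaf number -> leaf currently in memory

    def find_data_block(self, key):
        """Return the id of the data block that may contain key."""
        leaf_no = max(bisect.bisect_right(self.root, key) - 1, 0)
        leaf = self.loaded_leaves.setdefault(leaf_no, self._leaf_blocks[leaf_no])
        first_keys = [first_key for first_key, _ in leaf]
        entry = max(bisect.bisect_right(first_keys, key) - 1, 0)
        return leaf[entry][1]

    def evict(self, leaf_no):
        """Drop a leaf index block from memory."""
        self.loaded_leaves.pop(leaf_no, None)
```

Only the small `root` list is resident at all times; memory used by leaves grows and shrinks with the working set.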

A similar approach is taken by HFile v2 implementers for bloom filters. Every data block effectively uses its own bloom filter, which is written to disk as soon as a block is filled. At read time, the appropriate bloom filter block is located by a binary search on the key, then loaded, cached, and queried. These compound bloom filters do not rely on an estimate of how many keys will be added to the bloom filter, so they can hit the target false positive rate much more precisely.

Following are some additional enhancements of HFile v2:
  • A unified HFile block format enables you to seek to the previous block efficiently without using a block index.
  • The HFile refactoring into a reader and writer hierarchy allows for significant improvements in code maintainability.
  • A sparse lock implementation simplifies synchronization of block operations for hierarchical block index implementation.
An important feature of the current HFile v2 reader implementation is that it is capable of reading both HFile v1 and v2. The writer implementation, on the other hand, only writes HFile v2. This allows for seamless transition of the existing HBase installations from HFile v1 to HFile v2. The use of HFile v2 leads to noticeable improvements in HBase scalability and performance.

Coprocessors
HBase coprocessors were inspired by Google's BigTable coprocessors, and are designed to support efficient computational parallelism - beyond what Hadoop MapReduce can provide. In addition, coprocessors can be used for implementation of new features - for example, secondary indexing, complex filtering (push down predicates), and access control.

Although inspired by BigTable, HBase coprocessors diverge in implementation detail. They implement a framework that provides a library and runtime environment for executing user code within the HBase region server (that is, the same Java Virtual Machine, or JVM) and master processes. In contrast, Google coprocessors do not run inside the tablet server (the equivalent of an HBase region server), but rather outside of its address space. (HBase developers are also considering an option for deployment of coprocessor code external to the server process for future implementations.)

HBase defines two types of coprocessors:
  • System coprocessors are loaded globally on all tables and regions hosted by a region server.
  • Table coprocessors are loaded on all regions for a table on a per-table basis.

The framework for coprocessors is very flexible, and allows implementing two basic coprocessor types:
  • Observer(which is similar to triggers in conventional databases)
  • Endpoint(which is similar to conventional database stored procedures)

Observers allow inserting user's code in the execution of HBase calls. This code is invoked by the core HBase code. The coprocessor framework handles all of the details of invoking the user's code. The coprocessor implementation need only contain the desired functionality.

  • RegionObserver - This provides hooks for data access operations(Get, Delete, Scan, and so on), and provides a way for supplementing these operations with custom user's code. An instance of RegionObserver coprocessor is loaded to every table region. Its scope is limited to the region in which it is present. A RegionObserver needs to be loaded into every HBase region server.
  • WALObserver - This provides hooks for write-ahead-log operations. This is a way to enhance WAL writing and reconstruction events with custom user's code. A WALObserver runs on a region server in the context of WAL processing. A WALObserver needs to be loaded into every HBase region server.
  • MasterObserver - This provides hooks for table management operations(that is, create, delete, modify table, and so on) and provides a way for supplementing these operations with custom user's code. The MasterObserver runs within the context of the HBase master.

Observers of a given type can be chained together to execute sequentially in order of assigned priorities. Coprocessors in a given chain can additionally communicate with each other by passing information through the execution.
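Chaining by priority can be sketched as an ordered list of hooks that share a context object. This is a conceptual model only: `ObserverChain`, the `pre_put` hook signature, and the rule that a lower number runs first are assumptions made for the illustration, not the HBase coprocessor API.

```python
class ObserverChain:
    """Runs registered pre-Put hooks in priority order, sharing one context."""

    def __init__(self):
        self._observers = []  # list of (priority, hook)

    def register(self, priority, pre_put):
        self._observers.append((priority, pre_put))
        # Assumption of this sketch: a lower number means earlier execution.
        self._observers.sort(key=lambda item: item[0])

    def run_pre_put(self, row, value):
        context = {}  # lets observers in the chain pass information along
        for _, hook in self._observers:
            value = hook(row, value, context)
        return value, context
```

Each hook receives the row, the current value, and the shared context, and returns the (possibly modified) value that the next hook will see.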

Endpoint is an interface for dynamic remote procedure call (RPC) extension. The endpoint implementation is installed on the server side, and can then be invoked with HBase RPC. The client library provides convenient methods for invoking such dynamic interfaces.

The sequence of steps for building a custom endpoint is as follows:
  1. Create a new interface extending CoprocessorProtocol and supporting a data exchange required for RPC implementation. Data transfer must be implemented as byte arrays.
  2. Implement the endpoint interface by extending the abstract class BaseEndpointCoprocessor, which hides some internal implementation details (such as coprocessor framework class loading). The implementation must contain all the required coprocessor functionality, and will be loaded into and executed from the region context. There is nothing that prevents this implementation from issuing HBase operations, which might involve additional region servers.

On the client side, the endpoint can be invoked by new HBase client APIs that allow executing it on either a single region server, or a range of region servers.
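The division of labor between an endpoint and its caller resembles a scatter/gather computation. The sketch below models a row-count endpoint in plain Python; the function names are hypothetical, regions are just lists of row keys, and no RPC is involved.

```python
def row_count_endpoint(region_rows, start_key, stop_key):
    """Runs 'inside' one region: count the rows in [start_key, stop_key)."""
    return sum(1 for key in region_rows if start_key <= key < stop_key)


def invoke_on_regions(regions, start_key, stop_key):
    """Client side: call the endpoint on every region and combine the partial results."""
    partial_counts = [
        row_count_endpoint(rows, start_key, stop_key) for rows in regions
    ]
    return sum(partial_counts)
```

Only one small number per region travels back to the caller, instead of the rows themselves, which is the main appeal of pushing the computation to the data.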

The current implementation provides two options for deploying a custom coprocessor:
  • Load from configuration(which happens when the master or region servers start up)
  • Load from a table attribute(that is, dynamic loading when the table is(re)opened)

When considering the use of coprocessors for your own development, be aware of the following:
  • Because, in the current implementation, coprocessors are executed within a region server execution context, badly behaving coprocessors can take down a region server.
  • Coprocessor execution is non-transactional, which means that if a Put coprocessor that is supplementing this Put with additional write operations fails, the Put is still in place.

Although HBase provides a much richer data access model and typically better data access performance, it has limitations when it comes to the data size per row.




