Wednesday, January 31, 2018

Sqoop Imports from Oracle

Sqoop is a tool designed to transfer data between Hadoop and relational databases. You can use Sqoop to import data from a relational database management system (RDBMS) such as MySQL or Oracle into the Hadoop Distributed File System (HDFS), transform the data in Hadoop MapReduce, and then export the data back into an RDBMS.

JDBC ORACLE: Examples for Import:


sqoop-import --connect jdbc:oracle:thin:@db.test.com:PORT:INSTANCE_NAME --table DW_DATAMART.HCM_EMPLOYEE_D --fields-terminated-by '\t' --lines-terminated-by '\n' --username RAHUL -P
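When the same import options are reused across many tables, it can help to assemble the command in a script. The following is a minimal Python sketch of that idea. The helper name build_sqoop_import, the port 1521 and the SID ORCL are placeholders assumed for illustration; they are not values from the command above. The sketch only builds and prints the argument list; it does not run Sqoop.

```python
import shlex


def build_sqoop_import(host, port, sid, table, username):
    """Return the sqoop-import argument list for an Oracle thin-driver import.

    Hypothetical helper: it only assembles arguments and never runs Sqoop.
    """
    return [
        "sqoop-import",
        "--connect", f"jdbc:oracle:thin:@{host}:{port}:{sid}",
        "--table", table,
        "--fields-terminated-by", "\\t",   # backslash-t, interpreted by Sqoop
        "--lines-terminated-by", "\\n",
        "--username", username,
        "-P",                              # prompt for the password on the console
    ]


# Placeholder connection details (assumptions, not real values).
args = build_sqoop_import("db.test.com", 1521, "ORCL",
                          "DW_DATAMART.HCM_EMPLOYEE_D", "RAHUL")
print(shlex.join(args))
```

Keeping the arguments as a list avoids shell-quoting mistakes if the list is later passed to subprocess.run.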



JDBC ORACLE: Example for Select:


The sqoop-eval tool allows users to quickly run simple SQL queries against a database; results are printed to the console. This allows users to preview their import queries to ensure they import the data they expect.



sqoop-eval --connect jdbc:oracle:thin:@db.test.com:PORT:INSTANCE_NAME --username RAHUL -P --query "SELECT * FROM DW_DATAMART.HCM_COMPANY_D"

Sqoop Import to HBASE table:


sqoop-import --connect jdbc:oracle:thin:@db.test.com:PORT:INSTANCE_NAME --username RAHUL -P --table DW_DATAMART.PAY_PAY_CHK_OPTION_D --hbase-table DW_DATAMART.PAY_PAY_CHK_OPTION_D --column-family cf1 --hbase-create-table

If no primary key is defined in the Oracle table, name the split column with --split-by and the row key column with --hbase-row-key:

sqoop-import --connect jdbc:oracle:thin:@db.test.com:1725:hrlites --username RAHUL -P --table PSMERCHANTID --hbase-table PSMERCHANTID --column-family cf --hbase-row-key MERCHANTID --hbase-create-table --split-by MERCHANTID

sqoop-import --connect jdbc:oracle:thin:@db.test.com:PORT:INSTANCE_NAME --username RAHUL -P --table DW_DATAMART.PAY_PAYGROUP_D --hbase-table DW_DATAMART.PAY_PAYGROUP_D --column-family cf1 --hbase-create-table


sqoop-import --connect jdbc:oracle:thin:@db.test.com:1725:hrlites --username RAHUL -P --table PSMERCHANTID --hbase-table PSMERCHANTID --column-family cf --hbase-create-table --split-by MERCHANTID

DistCp (Distributed Copy)

Hadoop DistCp can be used to copy data between Hadoop clusters (and also within a Hadoop cluster). DistCp uses MapReduce to implement its distribution, error handling and reporting. It expands a list of files and directories into map tasks, each of which copies a partition of the files specified in the source list.

The most common use of DistCp is an inter-cluster copy:


hadoop distcp hdfs://nn1:8020/source hdfs://nn2:8020/destination 

Specify multiple source directories:


hadoop distcp hdfs://nn1:8020/source/a hdfs://nn1:8020/source/b hdfs://nn2:8020/destination

Specify multiple source directories from a file with the -f option:


hadoop distcp -f hdfs://nn1:8020/srclist hdfs://nn2:8020/destination

Note: srclist contains:
hdfs://nn1:8020/source/a
hdfs://nn1:8020/source/b

DistCp from HDP-1.3.x to HDP-2.x:


hadoop distcp hftp://<hdp 1.3.x namenode host>:50070/<folder path of source> hdfs://<hdp 2.x namenode host>/<folder path of target>

DistCp copy from HDP 1.3.0 to HDP-2.0:


hadoop distcp hftp://namenodehdp130.test.com:50070/apps/hive/warehouse/db/ hdfs://namenodehdp20.test.com/data/raw/

Update & Overwrite:


  • The DistCp -update option copies only files that do not exist at the target, or that differ from the target version.
  • The DistCp -overwrite option overwrites files that already exist at the target, even if their contents are the same as at the source.
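The difference between the default behaviour, -update and -overwrite is easy to mix up, so here is a toy Python model of which files each mode would write. It is a sketch under simplifying assumptions: files are dictionary entries, and "different" means different content, whereas real DistCp compares attributes such as file size and checksum. The function name files_to_copy is made up for this illustration.

```python
def files_to_copy(source, target, mode="default"):
    """Return the sorted relative paths that a copy would write.

    source and target map relative path -> file content. This is a toy
    model: real DistCp compares file size and checksum, not raw content.
    """
    if mode == "overwrite":
        # -overwrite: every source file is written, even if identical.
        return sorted(source)
    if mode == "update":
        # -update: copy files missing at the target or differing from it.
        return sorted(p for p, data in source.items() if target.get(p) != data)
    # Default: files that already exist at the target are skipped.
    return sorted(p for p in source if p not in target)


source = {"a.txt": "v2", "b.txt": "same", "c.txt": "new"}
target = {"a.txt": "v1", "b.txt": "same"}

for mode in ("default", "update", "overwrite"):
    print(mode, files_to_copy(source, target, mode))
```

In this model the changed file a.txt is refreshed only by -update and -overwrite, while the unchanged b.txt is rewritten only by -overwrite.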


Tuesday, January 30, 2018

Daily Activities of Hadoop Admin

1. Working with data delivery teams to setup new Hadoop users. This job includes setting up Linux users, setting up Kerberos principals and testing HDFS, Hive, Pig and MapReduce access for the new users.

2. Cluster maintenance, including adding and removing nodes, using tools such as Ambari.

3. Monitoring Hadoop cluster job performance and capacity planning.

4. Monitor Hadoop cluster connectivity and security

5. Manage and review Hadoop log files.

6. File system management and monitoring.

7. Collaborating with application teams to install operating system and Hadoop updates, patches, version upgrades when required.

8. Performance tuning of Hadoop clusters and Hadoop MapReduce routines.

9. HDFS support and maintenance.

10. Disk space management.

11. Working with HDFS snapshots.

12. Transferring data using DistCp.

Saturday, January 27, 2018

Resource Manager High Availability

This post provides instructions on setting up the ResourceManager (RM) High Availability (HA) feature in a Hadoop (YARN) cluster. The Active and Standby ResourceManagers embed the ZooKeeper-based ActiveStandbyElector to determine which ResourceManager should be active.

Note: This guide assumes that an existing HDP cluster has been manually installed and deployed. It provides instructions on how to manually enable ResourceManager HA on top of the existing cluster.

Before HA, the ResourceManager was a single point of failure (SPOF) in a YARN cluster. Each cluster had a single ResourceManager, and if that machine or process became unavailable, the entire cluster would be unavailable until the ResourceManager was either restarted or started on a separate machine. This situation impacted the total availability of the cluster in two major ways:

• In the case of an unplanned event such as a machine crash, the cluster would be unavailable until an operator restarted the ResourceManager.

• Planned maintenance events such as software or hardware upgrades on the ResourceManager machine would result in windows of cluster downtime.

The ResourceManager HA feature addresses these problems. This feature enables you to run redundant ResourceManagers in the same cluster in an Active/Passive configuration with a hot standby. This mechanism allows either a fast failover to the standby ResourceManager after a machine crash or a graceful administrator-initiated failover during planned maintenance.

Hardware Resources


Ensure that you prepare the following hardware resources:

• ResourceManager machines: The machines where you run Active and Standby ResourceManagers should have exactly the same hardware. For recommended hardware for ResourceManagers, see "Hardware for Master Nodes" in the Cluster Planning Guide.

• ZooKeeper machines: For automated failover functionality, there must be an existing ZooKeeper cluster available. The ZooKeeper service nodes can be co-located with other Hadoop daemons.

Deploy ResourceManager HA Cluster

HA configuration is backwards-compatible and works with your existing single ResourceManager configuration.

As described in the following sections, first configure manual or automatic ResourceManager failover. Then deploy the ResourceManager HA cluster.

Configure Manual or Automatic ResourceManager Failover

Complete the following prerequisites:

• Make sure that you have a working ZooKeeper service. If you have an Ambari-deployed HDP cluster with ZooKeeper, you can use that ZooKeeper service. If not, deploy ZooKeeper using the instructions provided in the Non-Ambari Cluster Installation Guide.

Note: In a typical deployment, ZooKeeper daemons are configured to run on three or five nodes. It is, however, acceptable to co-locate the ZooKeeper nodes on the same hardware as the HDFS NameNode and Standby Node. Many operators choose to deploy the third ZooKeeper process on the same node as the YARN ResourceManager. For better performance and isolation, Hortonworks recommends configuring the ZooKeeper nodes so that the ZooKeeper data and the HDFS metadata are stored on separate disk drives.

• Shut down the cluster using the instructions provided in "Controlling HDP Services Manually," in the HDP Reference Guide.

Set Common ResourceManager HA Properties

The following properties are required for both manual and automatic ResourceManager HA. Add these properties to the /etc/hadoop/conf/yarn-site.xml file:

Note: You can also set values for each of the following properties in yarn-site.xml:

yarn.resourcemanager.address.<rm-id>
yarn.resourcemanager.scheduler.address.<rm-id>
yarn.resourcemanager.admin.address.<rm-id>
yarn.resourcemanager.resource-tracker.address.<rm-id>
yarn.resourcemanager.webapp.address.<rm-id>

If these addresses are not explicitly set, each of these properties defaults to yarn.resourcemanager.hostname.<rm-id>:default_port, where default_port is the matching built-in default such as DEFAULT_RM_PORT or DEFAULT_RM_SCHEDULER_PORT.
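The fallback rule above can be sketched in a few lines of Python. This is only an illustration of the hostname:default_port logic, not YARN's own code. The port numbers are the stock YARN defaults (8032, 8030, 8031, 8033, 8088); the host name rm1.example.com and the function name effective_addresses are assumptions made for the example.

```python
# Stock YARN default ports for the ResourceManager services.
DEFAULT_PORTS = {
    "yarn.resourcemanager.address": 8032,
    "yarn.resourcemanager.scheduler.address": 8030,
    "yarn.resourcemanager.resource-tracker.address": 8031,
    "yarn.resourcemanager.admin.address": 8033,
    "yarn.resourcemanager.webapp.address": 8088,
}


def effective_addresses(conf, rm_id):
    """Resolve each per-RM address, falling back to hostname:default_port."""
    hostname = conf[f"yarn.resourcemanager.hostname.{rm_id}"]
    resolved = {}
    for base, port in DEFAULT_PORTS.items():
        key = f"{base}.{rm_id}"
        resolved[key] = conf.get(key, f"{hostname}:{port}")
    return resolved


conf = {
    "yarn.resourcemanager.hostname.rm1": "rm1.example.com",  # hypothetical host
    "yarn.resourcemanager.webapp.address.rm1": "rm1.example.com:9088",
}
for key, value in effective_addresses(conf, "rm1").items():
    print(key, "=", value)
```

Only the web UI address is set explicitly here, so the other four addresses are derived from the hostname.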

The following is a sample yarn-site.xml file with these common ResourceManager
HA properties configured:

<!-- RM HA Configurations-->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>

<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>

<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>${rm1 address}</value>
</property>

<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>${rm2 address}</value>
</property>

<property>
<name>yarn.resourcemanager.webapp.address.rm1</name>
<value>rm1_web_address:port_num</value>
<description>We can set rm1_web_address separately.
If not, it will use
${yarn.resourcemanager.hostname.rm1}:DEFAULT_RM_WEBAPP_PORT
</description>
</property>

<property>
<name>yarn.resourcemanager.webapp.address.rm2</name>
<value>rm2_web_address:port_num</value>
</property>

<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>

<property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>

<property>
<name>yarn.resourcemanager.zk-address</name>
<value>${zk1.address,zk2.address}</value>
</property>

<property>
<name>yarn.client.failover-proxy-provider</name>
<value>org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider</value>
</property>
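Before restarting services it can be useful to check that a yarn-site.xml actually contains the HA properties. The following Python sketch parses Hadoop-style property entries and reports missing keys. The list of keys treated as required is an assumption taken from the sample above, and the function names and the shortened SAMPLE configuration are invented for this illustration.

```python
import xml.etree.ElementTree as ET

# Keys treated as required here (an assumption based on the sample above).
REQUIRED = [
    "yarn.resourcemanager.ha.enabled",
    "yarn.resourcemanager.ha.rm-ids",
    "yarn.resourcemanager.recovery.enabled",
    "yarn.resourcemanager.store.class",
    "yarn.resourcemanager.zk-address",
]


def load_properties(xml_text):
    """Parse Hadoop-style <property> entries into a name -> value dict."""
    root = ET.fromstring(xml_text)
    return {
        prop.findtext("name").strip(): (prop.findtext("value") or "").strip()
        for prop in root.iter("property")
    }


def missing_ha_properties(props):
    """List required HA keys that are absent, including per-RM hostnames."""
    required = list(REQUIRED)
    for rm_id in props.get("yarn.resourcemanager.ha.rm-ids", "").split(","):
        if rm_id.strip():
            required.append(f"yarn.resourcemanager.hostname.{rm_id.strip()}")
    return [key for key in required if key not in props]


# Deliberately incomplete configuration used only for the demo.
SAMPLE = """
<configuration>
  <property><name>yarn.resourcemanager.ha.enabled</name><value>true</value></property>
  <property><name>yarn.resourcemanager.ha.rm-ids</name><value>rm1,rm2</value></property>
  <property><name>yarn.resourcemanager.hostname.rm1</name><value>host1</value></property>
</configuration>
"""

print(missing_ha_properties(load_properties(SAMPLE)))
```

For a real file, read its contents and pass the text to load_properties in place of SAMPLE.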


Configure Manual ResourceManager Failover

Automatic ResourceManager failover is enabled by default, so it must be disabled for manual failover.

To configure manual failover for ResourceManager HA, add the yarn.resourcemanager.ha.automatic-failover.enabled configuration property to the /etc/hadoop/conf/yarn-site.xml file, and set its value to "false":

<property>
<name>yarn.resourcemanager.ha.automatic-failover.enabled</name>
<value>false</value>
</property>

Configure Automatic ResourceManager Failover

The preceding section described how to configure manual failover. In that mode, the system will not automatically trigger a failover from the active to the standby ResourceManager, even if the active node has failed. This section describes how to configure automatic failover.

1. Add the following configuration options to the yarn-site.xml file:

Example:

<property>
<name>yarn.resourcemanager.ha.automatic-failover.zk-base-path</name>
<value>/yarn-leader-election</value>
<description>Optional setting. The default value is
/yarn-leader-election</description>
</property>

<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>yarn-cluster</value>
</property>

2. Automatic ResourceManager failover is enabled by default.

If you previously configured manual ResourceManager failover by setting the value of
yarn.resourcemanager.ha.automatic-failover.enabled to "false", you must
delete this property to return automatic failover to its default enabled state.

Deploy the ResourceManager HA Cluster

1. Copy the /etc/hadoop/conf/yarn-site.xml file from the primary ResourceManager host to the standby ResourceManager host.

2. Make sure that the clientPort value set in /etc/zookeeper/conf/zoo.cfg matches the port set in the following yarn-site.xml property:

<property>
<name>yarn.resourcemanager.zk-state-store.address</name>
<value>localhost:2181</value>
</property>

3. Start ZooKeeper. Execute this command on the ZooKeeper host machine(s):
su - zookeeper -c "export ZOOCFGDIR=/usr/hdp/current/zookeeper-server/conf ; export ZOOCFG=zoo.cfg; source /usr/hdp/current/zookeeper-server/conf/zookeeper-env.sh ; /usr/hdp/current/zookeeper-server/bin/zkServer.sh start"

4. Start HDFS using the instructions provided in "Controlling HDP Services Manually".

5. Start YARN using the instructions provided in "Controlling HDP Services Manually".

6. Set the active ResourceManager:

MANUAL FAILOVER ONLY: If you configured manual ResourceManager failover, you
must transition one of the ResourceManagers to Active mode. Execute the following CLI
command to transition ResourceManager "rm1" to Active:

yarn rmadmin -transitionToActive rm1

You can use the following CLI command to transition ResourceManager "rm1" to Standby mode:

yarn rmadmin -transitionToStandby rm1

AUTOMATIC FAILOVER: If you configured automatic ResourceManager failover, no
action is required -- the Active ResourceManager will be chosen automatically.

7. Start all remaining unstarted cluster services using the instructions provided in
"Controlling HDP Services Manually," in the HDP Reference Guide.

Minimum Settings for Automatic ResourceManager HA Configuration

The minimum yarn-site.xml configuration settings for ResourceManager HA with automatic failover are as follows:

<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>

<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>

<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>192.168.1.9</value>
</property>

<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>192.168.1.10</value>
</property>

<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>

<property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>

<property>
<name>yarn.resourcemanager.zk-address</name>
<value>192.168.1.9:2181,192.168.1.10:2181</value>
<description>For multiple ZooKeeper services, separate them with commas</description>
</property>

<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>yarn-cluster</value>
</property>

Testing ResourceManager HA on a Single Node

If you would like to test ResourceManager HA on a single node (that is, launch more than one
ResourceManager on a single node), you need to add the following settings in yarn-site.xml.

To enable ResourceManager "rm1" to launch:

<property>
<name>yarn.resourcemanager.ha.id</name>
<value>rm1</value>
<description>If we want to launch more than one RM in single node, we need
this configuration</description>
</property>

To enable ResourceManager rm2 to launch:

<property>
<name>yarn.resourcemanager.ha.id</name>
<value>rm2</value>
<description>If we want to launch more than one RM in single node, we need
this configuration</description>
</property>

You should also explicitly set values specific to each ResourceManager for the following properties separately in yarn-site.xml:

• yarn.resourcemanager.address.<rm-id>
• yarn.resourcemanager.scheduler.address.<rm-id>
• yarn.resourcemanager.admin.address.<rm-id>
• yarn.resourcemanager.resource-tracker.address.<rm-id>
• yarn.resourcemanager.webapp.address.<rm-id>

For example:

<!-- RM1 Configs -->
<property>
<name>yarn.resourcemanager.address.rm1</name>
<value>localhost:23140</value>
</property>

<property>
<name>yarn.resourcemanager.scheduler.address.rm1</name>
<value>localhost:23130</value>
</property>

<property>
<name>yarn.resourcemanager.webapp.address.rm1</name>
<value>localhost:23188</value>
</property>

<property>
<name>yarn.resourcemanager.resource-tracker.address.rm1</name>
<value>localhost:23125</value>
</property>

<property>
<name>yarn.resourcemanager.admin.address.rm1</name>
<value>localhost:23141</value>
</property>

<!-- RM2 configs -->
<property>
<name>yarn.resourcemanager.address.rm2</name>
<value>localhost:33140</value>
</property>

<property>
<name>yarn.resourcemanager.scheduler.address.rm2</name>
<value>localhost:33130</value>
</property>

<property>
<name>yarn.resourcemanager.webapp.address.rm2</name>
<value>localhost:33188</value>
</property>

<property>
<name>yarn.resourcemanager.resource-tracker.address.rm2</name>
<value>localhost:33125</value>
</property>

<property>
<name>yarn.resourcemanager.admin.address.rm2</name>
<value>localhost:33141</value>
</property>
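When two ResourceManagers share one host, every address must use a distinct port, and a copy-paste slip is easy to make. The Python sketch below flags addresses that are assigned to more than one property. It is a hypothetical helper for illustration only; the function name find_address_conflicts and the deliberately broken sample values are not part of any Hadoop tooling.

```python
from collections import defaultdict


def find_address_conflicts(props):
    """Return {address: [property names]} for addresses used more than once."""
    users = defaultdict(list)
    for name, address in props.items():
        # Match per-RM keys such as yarn.resourcemanager.address.rm1.
        if name.startswith("yarn.resourcemanager.") and ".address." in name:
            users[address].append(name)
    return {addr: names for addr, names in users.items() if len(names) > 1}


props = {
    "yarn.resourcemanager.address.rm1": "localhost:23140",
    "yarn.resourcemanager.scheduler.address.rm1": "localhost:23130",
    "yarn.resourcemanager.address.rm2": "localhost:33140",
    # Deliberate mistake for the demo: rm2 reuses rm1's scheduler port.
    "yarn.resourcemanager.scheduler.address.rm2": "localhost:23130",
}
print(find_address_conflicts(props))
```

With the example configuration above (ports 231xx for rm1 and 331xx for rm2) the function returns an empty dictionary.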

HiveServer2 Architecture

HiveServer2 (HS2) is a service that enables clients to execute queries against Hive. HiveServer2 is the successor to HiveServer1 which has been deprecated. HS2 supports multi-client concurrency and authentication. It is designed to provide better support for open API clients like JDBC and ODBC.

HS2 is a single process running as a composite service, which includes the Thrift-based Hive service (TCP or HTTP) and a Jetty web server for the web UI.

HS2 Architecture

The Thrift-based Hive service is the core of HS2 and is responsible for servicing Hive queries (e.g., from Beeline). Thrift is an RPC framework for building cross-platform services. Its stack consists of 4 layers: Server, Transport, Protocol, and Processor. You can find more details about the layers in the Apache Thrift documentation.

The usage of those layers in the HS2 implementation is described below.

Server

HS2 uses a TThreadPoolServer (from Thrift) for TCP mode, or a Jetty server for the HTTP mode.

The TThreadPoolServer allocates one worker thread per TCP connection. Each thread stays associated with its connection even if the connection is idle, so a large number of concurrent connections means a large number of threads, which is a potential performance issue. In the future, HS2 might switch to another server type for TCP mode, for example, TThreadedSelectorServer.

Transport

HTTP mode is required when a proxy is needed between the client and server (for example, for load balancing or security reasons). That is why it is supported, as well as TCP mode. You can specify the transport mode of the Thrift service through the Hive configuration property hive.server2.transport.mode.

Protocol

The Protocol implementation is responsible for serialization and deserialization. HS2 is currently using TBinaryProtocol as its Thrift protocol for serialization. In the future, other protocols may be considered, such as TCompactProtocol, based on more performance evaluation.

Processor

The Processor implementation is the application logic that handles requests. For example, the ThriftCLIService.ExecuteStatement() method implements the logic to compile and execute a Hive query.

Dependencies of HS2

Metastore

The metastore can be configured as embedded (in the same process as HS2) or as a remote server (which is a Thrift-based service as well). HS2 talks to the metastore for the metadata required for query compilation. 

Hadoop cluster

HS2 prepares physical execution plans for various execution engines (MapReduce/Tez/Spark) and submits jobs to the Hadoop cluster for execution.

JDBC Client

The JDBC driver is recommended for the client side to interact with HS2. Note that there are some use cases (e.g., Hadoop Hue) where the Thrift client is used directly and JDBC is bypassed.

The following sequence of API calls is involved in making the first query:

  • The JDBC client (e.g., Beeline) creates a HiveConnection by initiating a transport connection (e.g., TCP connection) followed by an OpenSession API call to get a SessionHandle. The session is created on the server side.
  • The HiveStatement is executed (following JDBC standards) and an ExecuteStatement API call is made from the Thrift client. In the API call, SessionHandle information is passed to the server along with the query information.
  • The HS2 server receives the request and asks the driver (which is a CommandProcessor) for query parsing and compilation. The driver kicks off a background job that will talk to Hadoop and then immediately returns a response to the client. This is the asynchronous design of the ExecuteStatement API. The response contains an OperationHandle created on the server side.
  • The client uses the OperationHandle to talk to HS2 to poll the status of the query execution.
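The asynchronous pattern in the steps above (open a session, submit, get a handle back at once, then poll) can be modelled with a small Python toy. Nothing here is Hive code or the Hive Thrift API: the class ToyHiveServer, its method names and the status strings are assumptions chosen to mirror the call names in the text.

```python
import itertools
import threading
import time


class ToyHiveServer:
    """Toy stand-in for HS2; it mirrors the call names but is not Hive code."""

    def __init__(self):
        self._ids = itertools.count(1)
        self._sessions = set()
        self._operations = {}
        self._lock = threading.Lock()

    def open_session(self):
        handle = f"session-{next(self._ids)}"
        self._sessions.add(handle)
        return handle

    def execute_statement(self, session_handle, query):
        if session_handle not in self._sessions:
            raise ValueError("unknown session")
        op_handle = f"operation-{next(self._ids)}"
        with self._lock:
            self._operations[op_handle] = "RUNNING"
        # Run the query in the background and return the handle immediately.
        threading.Thread(target=self._run, args=(op_handle, query)).start()
        return op_handle

    def _run(self, op_handle, query):
        time.sleep(0.05)  # pretend to compile and run the query
        with self._lock:
            self._operations[op_handle] = "FINISHED"

    def get_operation_status(self, op_handle):
        with self._lock:
            return self._operations[op_handle]


def run_query(server, query, poll_interval=0.01):
    """Client side: open a session, submit, then poll until the query is done."""
    session = server.open_session()
    operation = server.execute_statement(session, query)
    polls = 0
    while server.get_operation_status(operation) != "FINISHED":
        polls += 1
        time.sleep(poll_interval)
    return operation, polls


operation, polls = run_query(ToyHiveServer(), "SELECT 1")
print(operation, "finished after", polls, "polls")
```

The point of the design is that execute_statement never blocks on the query itself, so a slow query does not tie up the request that submitted it.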

Tuesday, January 23, 2018

YARN Application Execution

YARN application execution consists of the following steps:
  • Application submission.
  • Bootstrapping the Application Master instance for the application.
  • Application execution managed by the Application Master instance.


Let’s walk through an application execution sequence (steps are illustrated in the diagram):
  1. A client program submits the application, including the necessary specifications to launch the application-specific Application Master.
  2. The Resource Manager assumes responsibility for negotiating a specified Container in which to start the Application Master, and then launches the Application Master.
  3. On boot-up, the Application Master registers with the Resource Manager. The registration allows the client program to query the Resource Manager for details, which allows it to directly communicate with its own Application Master.
  4. During normal operation, the Application Master negotiates appropriate resource Containers via the resource-request protocol.
  5. Upon successful Container allocations, the Application Master launches the Container by providing the Container launch specification to the Node Manager. The launch specification typically includes the necessary information to allow the Container to communicate with the Application Master.
  6. The application code executing within the Container then provides necessary information (progress, status etc.) to its Application Master via an application-specific protocol.
  7. During the application execution, the client that submitted the program communicates directly with the Application Master to get status, progress updates, etc. via an application-specific protocol.
  8. Once the application is complete, the Application Master de-registers with the Resource Manager and shuts down, allowing its own Container to be repurposed.

Monday, January 22, 2018

Snapshots


HDFS Snapshots are read-only point-in-time copies of the file system. Snapshots can be taken on a subtree of the file system or the entire file system. Some common use cases of snapshots are data backup, protection against user errors and disaster recovery.

The implementation of HDFS Snapshots is efficient:
  • Snapshot creation is instantaneous: the cost is O(1) excluding the inode lookup time.
  • Additional memory is used only when modifications are made relative to a snapshot: memory usage is O(M), where M is the number of modified files/directories.
  • Blocks in datanodes are not copied: the snapshot files record the block list and the file size. There is no data copying.
  • Snapshots do not adversely affect regular HDFS operations: modifications are recorded in reverse chronological order so that the current data can be accessed directly. The snapshot data is computed by subtracting the modifications from the current data.
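To make the "subtract the modifications from the current data" idea concrete, here is a toy in-memory model in Python. It is a sketch of the general technique only, not the HDFS implementation: the class SnapshottableDir and its methods are invented for this example, and files are plain dictionary entries.

```python
_ABSENT = object()  # marks "file did not exist when the snapshot was taken"


class SnapshottableDir:
    """Toy model of a snapshottable directory: file name -> content."""

    def __init__(self):
        self.current = {}
        self._snapshots = []  # list of (name, undo_record), oldest first

    def create_snapshot(self, name):
        # O(1): nothing is copied, only an empty undo record is added.
        self._snapshots.append((name, {}))

    def _remember(self, path):
        # Record the pre-modification value once, in the newest snapshot only.
        if self._snapshots:
            undo = self._snapshots[-1][1]
            undo.setdefault(path, self.current.get(path, _ABSENT))

    def write(self, path, data):
        self._remember(path)
        self.current[path] = data

    def delete(self, path):
        self._remember(path)
        self.current.pop(path, None)

    def read_snapshot(self, name):
        # Start from current data and undo modifications, newest first.
        view = dict(self.current)
        for snap_name, undo in reversed(self._snapshots):
            for path, old in undo.items():
                if old is _ABSENT:
                    view.pop(path, None)
                else:
                    view[path] = old
            if snap_name == name:
                return view
        raise KeyError(name)


d = SnapshottableDir()
d.write("bar", "v1")
d.create_snapshot("s0")
d.write("bar", "v2")
d.write("baz", "new")
print(d.read_snapshot("s0"))
print(d.current)
```

Reads of the current data never touch the undo records, and memory grows only with the number of entries modified after a snapshot, which matches the O(1) and O(M) claims in the list above.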

Snapshottable Directories

Snapshots can be taken on any directory once the directory has been set as snapshottable. A snapshottable directory is able to accommodate 65,536 simultaneous snapshots. There is no limit on the number of snapshottable directories. Administrators may set any directory to be snapshottable. If there are snapshots in a snapshottable directory, the directory can be neither deleted nor renamed before all the snapshots are deleted.

Nested snapshottable directories are currently not allowed. In other words, a directory cannot be set to snapshottable if one of its ancestors/descendants is a snapshottable directory.

Snapshot Paths

For a snapshottable directory, the path component “.snapshot” is used for accessing its snapshots. Suppose /foo is a snapshottable directory, /foo/bar is a file/directory in /foo, and /foo has a snapshot s0. Then, the path /foo/.snapshot/s0/bar refers to the snapshot copy of /foo/bar. The usual API and CLI can work with the “.snapshot” paths. The following are some examples.
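The path mapping described above is mechanical, so a short Python sketch may help when scripting restores. The function name snapshot_path is an assumption for this illustration; only the ".snapshot" path component comes from HDFS.

```python
import posixpath


def snapshot_path(snapshottable_dir, snapshot_name, path):
    """Map a path under a snapshottable directory to its copy in a snapshot."""
    relative = posixpath.relpath(path, snapshottable_dir)
    if relative == ".." or relative.startswith("../"):
        raise ValueError(f"{path} is not under {snapshottable_dir}")
    return posixpath.normpath(
        posixpath.join(snapshottable_dir, ".snapshot", snapshot_name, relative)
    )


print(snapshot_path("/foo", "s0", "/foo/bar"))
```

The resulting path can be passed to the usual commands, for example as the source of an hdfs dfs -cp when restoring a file.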

Allow Snapshots

Allow snapshots of a directory to be created. If the operation completes successfully, the directory becomes snapshottable.

Command:
hdfs dfsadmin -allowSnapshot <path>

Arguments:
path: The path of the snapshottable directory.

Disallow Snapshots

Disallow snapshots of a directory from being created. All snapshots of the directory must be deleted before disallowing snapshots.

Command:
hdfs dfsadmin -disallowSnapshot <path>

Arguments:
path: The path of the snapshottable directory.

Create Snapshots

Create a snapshot of a snapshottable directory. This operation requires owner privilege of the snapshottable directory.

Command:
hdfs dfs -createSnapshot <path> [<snapshotName>]

Arguments:
path: The path of the snapshottable directory.
snapshotName: The snapshot name, which is an optional argument. When it is omitted, a default name is generated using a timestamp with the format "'s'yyyyMMdd-HHmmss.SSS", e.g. "s20130412-151029.033".
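If you need to predict or parse default snapshot names in a script, the pattern can be reproduced in Python. This is a small sketch; the function name default_snapshot_name is an assumption, and it formats whatever datetime you pass in rather than asking HDFS for anything.

```python
from datetime import datetime


def default_snapshot_name(moment):
    """Format a datetime like the 's'yyyyMMdd-HHmmss.SSS pattern."""
    millis = moment.microsecond // 1000
    return f"s{moment:%Y%m%d-%H%M%S}.{millis:03d}"


# 12 April 2013, 15:10:29 and 33 milliseconds.
print(default_snapshot_name(datetime(2013, 4, 12, 15, 10, 29, 33000)))
# prints s20130412-151029.033
```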

Delete Snapshots

Delete a snapshot from a snapshottable directory. This operation requires owner privilege of the snapshottable directory.

Command:
hdfs dfs -deleteSnapshot <path> <snapshotName>

Arguments:
path: The path of the snapshottable directory.
snapshotName: The snapshot name.


Rename Snapshots

Rename a snapshot. This operation requires owner privilege of the snapshottable directory.

Command:
hdfs dfs -renameSnapshot <path> <oldName> <newName>

Arguments:
path: The path of the snapshottable directory.
oldName: The old snapshot name.
newName: The new snapshot name.

Get Snapshottable Directory Listing

Get all the snapshottable directories where the current user has permission to take snapshots.

Command:
hdfs lsSnapshottableDir

Get Snapshots Difference Report

Get the differences between two snapshots. This operation requires read access privilege for all files/directories in both snapshots.

Command:
hdfs snapshotDiff <path> <fromSnapshot> <toSnapshot>

General HBase Tuning

When you tune HBase, you can improve the performance and balance the memory usage.

Updating environment variables (hbase-env.sh)

Depending on the availability of memory on the cluster nodes, you can use environment variables to tune the memory that is available to the HBase master server and the HBase region servers. You can also configure the garbage collector. As part of the HBase tuning process, consider the MapReduce workload and the memory that is allocated to the MapReduce JVMs.

The environment variables that help you control performance in HBase are in file $BIGINSIGHTS_HOME/hdm/components/hbase/conf/hbase-env.sh. After any changes to this file, run the following command to synchronize the HBase configuration across all nodes of the cluster:
$BIGINSIGHTS_HOME/bin/syncconf.sh hbase

Then stop and restart HBase with the following commands:

$BIGINSIGHTS_HOME/bin/stop.sh hbase
$BIGINSIGHTS_HOME/bin/start.sh hbase

Master and region server memory

Each region server contains regions that contain all of the data in a key range.

The HBASE_HEAPSIZE value is the maximum amount of heap to use, in MB. The default is 1000, which is small for an HBase system that is used regularly in your cluster. For good performance, give HBase as much memory as you can without causing swapping. The example uses a value of 8000, but you should tune the size based on your environment and workloads.

You can increase the HBase master server JVM heap size with the following command:

export HBASE_HEAPSIZE=8000

Then, increase the JVM heap size for the region servers with this command:

export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -Xms8g -Xmx8g"

Garbage collection

HBase uses the JVM garbage collection subsystem, which reduces some memory management issues. Garbage collection is an automated system that handles both the allocation and reclamation of memory for Java objects.

For a JVM that has less than 4 GB of memory, use -Xgcpolicy:gencon. A suggested best practice is the following setting:
-Xms3000m -Xmx3000m -Xgcpolicy:gencon 

The -Xms<size> option sets the initial size of the heap. The -Xmx<size> option sets the maximum size of the heap.
For a JVM that has more than 4 GB of memory, use -Xgcpolicy:balanced. With this policy, you do not need to set anything beyond the initial size and the maximum size of the heap.

-Xms8192m -Xmx8192m -Xgcpolicy:balanced
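The rule of thumb above (gencon below 4 GB, balanced above) can be written as a tiny Python helper for generating the option string. This is a sketch: the function name heap_options is made up, and because the text does not say what to do at exactly 4 GB, treating that case as balanced is an assumption.

```python
def heap_options(heap_mb):
    """Build heap and -Xgcpolicy flags following the rule of thumb above.

    The text does not cover a heap of exactly 4 GB; treating it as
    'balanced' is an assumption of this sketch.
    """
    policy = "gencon" if heap_mb < 4096 else "balanced"
    return f"-Xms{heap_mb}m -Xmx{heap_mb}m -Xgcpolicy:{policy}"


print(heap_options(3000))
print(heap_options(8192))
```

The initial and maximum heap sizes are set to the same value, as in the two settings shown above.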

You can manipulate the garbage collection options in HBASE_OPTS:
export HBASE_OPTS="$HBASE_OPTS -Xgcthreads2 -Xgcpolicy:gencon -Xalwaysclassgc" 

Updating configuration values (hbase-site.xml)

HBase site-specific customizations are in the file hbase-site.xml. Navigate to the $BIGINSIGHTS_HOME/hdm/components/hbase/conf directory and edit the hbase-site.xml file.

hbase.regionserver.handler.count

This parameter defines the number of threads that are kept open to answer incoming requests to user tables. The default value is 30.

A rule of thumb is to keep the value low when the payload for each request is large, and keep the value high when the payload is small. Increase the hbase.regionserver.handler.count to a value that is approximately the number of CPUs on the region servers.

<property>
  <name>hbase.regionserver.handler.count</name>
  <value>64</value>
</property>

hbase.hregion.max.filesize

This parameter is the maximum HStoreFile size; a region is split once one of its store files grows past this size. The default value is 10737418240 (10 GB). Decrease this value to get smaller, more numerous regions: Big SQL determines the number of mappers from the regions, with one mapper for each region.

<property>     
  <name>hbase.hregion.max.filesize</name>
  <value>1073741824</value>
</property>

hbase.client.write.buffer

This parameter is the size of the HTable client write buffer in bytes. The default value is 2097152.
A bigger buffer takes more memory on both the client and the server side, but it reduces the number of remote procedure calls that are made. Increase the hbase.client.write.buffer value:

<property>     
  <name>hbase.client.write.buffer</name>     
  <value>8388608</value>  
</property>

hbase.client.scanner.caching

This parameter is the number of rows that are fetched when calling next on a scanner, if it is not served from memory. The default value is 100.

A higher caching value enables faster scanners but uses more memory, and some calls to next can take longer when the cache is empty. Increase the scanner cache size to improve the performance of large reads.

<property>    
  <name>hbase.client.scanner.caching</name>    
  <value>10000</value>  
</property>
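A quick back-of-the-envelope calculation shows why the caching value matters for large reads. The Python sketch below estimates how many fetches from the region server a full scan needs for a given caching value. It is an approximation for illustration (it ignores the final call that detects the end of the scan), and the function name scanner_round_trips and the one-million-row table are assumptions.

```python
import math


def scanner_round_trips(total_rows, caching):
    """Approximate number of fetches needed to scan total_rows rows."""
    if caching <= 0:
        raise ValueError("caching must be positive")
    return math.ceil(total_rows / caching)


# Hypothetical table of one million rows.
for caching in (100, 10000):
    print(caching, scanner_round_trips(1_000_000, caching))
```

Going from a caching value of 100 to 10000 cuts the estimate from 10000 fetches to 100, at the cost of holding up to 10000 rows in client memory per fetch.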

Saturday, January 20, 2018

Hive Architecture

The figure shows the major components of Hive and their interactions with Hadoop. As shown in the figure, the main components of Hive are:

UI – The user interface for users to submit queries and other operations to the system. As of 2011 the system had a command line interface and a web based GUI was being developed.

Driver – The component which receives the queries. This component implements the notion of session handles and provides execute and fetch APIs modeled on JDBC/ODBC interfaces.

Compiler – The component that parses the query, does semantic analysis on the different query blocks and query expressions and eventually generates an execution plan with the help of the table and partition metadata looked up from the metastore.

Metastore – The component that stores all the structure information of the various tables and partitions in the warehouse including column and column type information, the serializers and deserializers necessary to read and write data and the corresponding HDFS files where the data is stored.

Execution Engine – The component which executes the execution plan created by the compiler. The plan is a DAG of stages. The execution engine manages the dependencies between these different stages of the plan and executes these stages on the appropriate system components.

How a typical query flows through the system

  1. The UI calls the execute interface to the Driver (step 1 in Figure). 
  2. The Driver creates a session handle for the query and sends the query to the compiler to generate an execution plan (step 2). 
  3. The compiler gets the necessary metadata from the metastore (steps 3 and 4). 
  4. This metadata is used to typecheck the expressions in the query tree as well as to prune partitions based on query predicates. 
  5. The plan generated by the compiler (step 5) is a DAG of stages with each stage being either a map/reduce job, a metadata operation or an operation on HDFS. For map/reduce stages, the plan contains map operator trees (operator trees that are executed on the mappers) and a reduce operator tree (for operations that need reducers). 
  6. The execution engine submits these stages to appropriate components (steps 6, 6.1, 6.2 and 6.3). 
  7. In each task (mapper/reducer) the deserializer associated with the table or intermediate outputs is used to read the rows from HDFS files and these are passed through the associated operator tree. 
  8. Once the output is generated, it is written to a temporary HDFS file through the serializer (this happens in the mapper in case the operation does not need a reduce).
  9. The temporary files are used to provide data to subsequent map/reduce stages of the plan. For DML operations the final temporary file is moved to the table's location.
  10. This scheme is used to ensure that dirty data is not read (file rename being an atomic operation in HDFS). For queries, the contents of the temporary file are read by the execution engine directly from HDFS as part of the fetch call from the Driver (steps 7, 8 and 9).
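
The idea of a plan being a DAG of stages, run in dependency order, can be sketched in a few lines of Python. This is only an illustration and not Hive's actual code: the stage names and the `plan` dictionary are invented, and the ordering uses the standard library's `graphlib` (Python 3.9+).

```python
from graphlib import TopologicalSorter

# Hypothetical plan: each stage maps to the set of stages it depends on.
plan = {
    "scan_a": set(),            # map/reduce stage
    "scan_b": set(),            # map/reduce stage
    "join": {"scan_a", "scan_b"},  # map/reduce stage reading the temporary files
    "move_to_table": {"join"},  # HDFS operation: move the final temporary file
}


def execution_order(stages):
    """Return the stages in an order that respects every dependency."""
    return list(TopologicalSorter(stages).static_order())


if __name__ == "__main__":
    for stage in execution_order(plan):
        print("run", stage)
```

A stage is only started once everything it depends on has finished, which is what lets later stages read the temporary files written by earlier ones.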

Kerberos Architecture

Here are the key things to remember:

1. There are three parties involved in the overall process:
  1. Client: you, the principal who wants to access the FileServer
  2. KDC (made up of two components)
    • Authentication Service
    • Ticket Granting Service
  3. FileServer: the actual resource you want to access

2. There are three secret keys in total (one for the client, one for the file server and one for the KDC itself), which never travel over the network.
  1. The client key resides on the client machine as well as on the KDC.
  2. The server key resides on the server machine as well as on the KDC.
  3. The KDC key resides only on the KDC machine.

Key        | Client Machine | File Server Machine | KDC Machine
Client Key | Yes            | -                   | Yes
Server Key | -              | Yes                 | Yes
KDC Key    | -              | -                   | Yes

3. Two session keys are generated during the process, and each is valid only for an 8-hour session. (Unlike the secret keys, they do travel over the network, inside encrypted bundles, and they encrypt the communication between client and KDC and between client and FileServer.)

  1. Client and KDC communication (encrypted with session key 1)
  2. Client and FileServer communication (encrypted with session key 2)


How the overall process works:


  1. You (the client) want to access the FileServer on the network, but you have not been authenticated yet.
  2. You send a request to the KDC for a “Ticket Granting Ticket” (TGT).
    1. The request is encrypted with the client secret key, which is known only to you and the KDC.
    2. The KDC checks its database to confirm that you are a real user and looks up the secret key to decrypt your message.
  3. The KDC now creates a TGT for you (encrypted with the KDC key, so only the KDC can read it) and a session key (S1).
    1. It bundles TGT + S1, encrypts the bundle with the client key and sends it to the client.
  4. On the client side, the bundle is decrypted using the client key. The TGT itself cannot be decrypted, because it is encrypted with the KDC key.
  5. The client must now request a Service Ticket so that it can access the FileServer.
    1. The client creates an Authenticator object and encrypts it with session key S1.
    2. It bundles the TGT + Authenticator + request for the FileServer and sends the bundle to the KDC.
  6. The KDC checks the bundle and finds that:
    1. The TGT was not touched or altered in transit.
    2. The Authenticator can be decrypted with session key S1, which verifies the client.
    3. The user needs access to the FileServer.
  7. The KDC creates a Service Ticket (encrypted with the server key, which is known only to the FileServer and the KDC) and another session key, S2. The bundle now contains the Service Ticket + session key S2.
    1. This bundle is encrypted with S1 (the first session key) and sent to the client.
  8. On the client side, the bundle is decrypted using the first session key, S1. It contains:
    1. The Service Ticket, which the client cannot decrypt because it is encrypted with the server key, which the client does not have.
    2. Session key S2, which will be used for communication between client and server.
  9. The client now has the ticket it needs to contact the file server.
    1. The client prepares a bundle (Service Ticket + Authenticator encrypted with session key S2; the server recovers S2 from the Service Ticket).
    2. It sends this bundle to the file server.
  10. On the server side, the bundle is opened.
    1. The server checks that the Service Ticket is encrypted with the server key and has not been tampered with.
    2. It authenticates the user and sends an acknowledgement to the client.
  11. The client and server are now authenticated to each other, and all further communication between them is encrypted with session key S2.
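
The exchange above can be traced with a small toy model in Python. This is only a sketch to show who can open which bundle: `Sealed` is a made-up stand-in that remembers which key locked a payload and does no real encryption, and the key values, user name and function names are all hypothetical. It also assumes, as in standard Kerberos, that each ticket carries the session key inside it.

```python
from dataclasses import dataclass


@dataclass
class Sealed:
    """Toy stand-in for ciphertext: remembers which key locked the payload."""
    key: str
    payload: object

    def open(self, key: str):
        if key != self.key:
            raise PermissionError("wrong key, cannot open bundle")
        return self.payload


# Long-term secret keys (hypothetical values); these never travel on the network.
CLIENT_KEY, SERVER_KEY, KDC_KEY = "client-secret", "server-secret", "kdc-secret"


def kdc_issue_tgt(user):
    """Steps 2-3: the KDC returns TGT + S1, sealed with the client key."""
    s1 = "session-key-1"
    tgt = Sealed(KDC_KEY, {"user": user, "session_key": s1})
    return Sealed(CLIENT_KEY, {"tgt": tgt, "session_key": s1})


def kdc_issue_service_ticket(tgt, authenticator, service):
    """Steps 6-7: the KDC checks TGT and Authenticator, returns ticket + S2."""
    ticket = tgt.open(KDC_KEY)
    s1 = ticket["session_key"]
    if authenticator.open(s1)["user"] != ticket["user"]:
        raise PermissionError("authenticator does not match TGT")
    s2 = "session-key-2"
    service_ticket = Sealed(
        SERVER_KEY, {"user": ticket["user"], "service": service, "session_key": s2}
    )
    return Sealed(s1, {"service_ticket": service_ticket, "session_key": s2})


def file_server_accept(service_ticket, authenticator):
    """Step 10: the server opens the ticket with its own key and verifies the user."""
    ticket = service_ticket.open(SERVER_KEY)
    s2 = ticket["session_key"]
    if authenticator.open(s2)["user"] != ticket["user"]:
        raise PermissionError("authenticator does not match service ticket")
    return Sealed(s2, "ack")


def client_login_and_access(user="alice"):
    """Run the whole flow from the client's point of view."""
    reply = kdc_issue_tgt(user).open(CLIENT_KEY)                  # step 4
    tgt, s1 = reply["tgt"], reply["session_key"]
    authenticator = Sealed(s1, {"user": user})                    # step 5
    reply = kdc_issue_service_ticket(tgt, authenticator, "FileServer").open(s1)  # step 8
    service_ticket, s2 = reply["service_ticket"], reply["session_key"]
    ack = file_server_accept(service_ticket, Sealed(s2, {"user": user}))  # steps 9-10
    return ack.open(s2)                                           # step 11


if __name__ == "__main__":
    print(client_login_and_access())
```

Note that the client passes the TGT and the Service Ticket along without ever being able to open them; trying `tgt.open(CLIENT_KEY)` in this model raises `PermissionError`.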

Thursday, January 18, 2018

TaskTracker Blacklisting

In the event that a TaskTracker is not performing properly, it can be blacklisted so that no jobs will be scheduled to run on it. There are two types of TaskTracker blacklisting:

  • Per-job blacklisting, which prevents scheduling new tasks from a particular job
  • Cluster-wide blacklisting, which prevents scheduling new tasks from all jobs.

Per-Job Blacklisting


The configuration value mapred.max.tracker.failures in mapred-site.xml (MapReduce v1) specifies a number of task failures in a specific job after which the TaskTracker is blacklisted for that job. The TaskTracker can still accept tasks from other jobs, as long as it is not blacklisted cluster-wide (see below).
A job can only blacklist up to 25% of TaskTrackers in the cluster.

Cluster-Wide Blacklisting


A TaskTracker is blacklisted cluster-wide when any of the following conditions is met:

  • The number of blacklists from successful jobs (the fault count) exceeds mapred.max.tracker.blacklists.
    The parameter mapred.job.impact.blacklisting in the mapred-site.xml file lets you specify whether job failures should count toward the threshold set with mapred.max.tracker.blacklists. This parameter can be helpful when you are testing and know that jobs are likely to fail.
  • The TaskTracker has been manually blacklisted using hadoop job -blacklist-tracker <host>
  • The status of the TaskTracker (as reported by a user-provided health-check script) is not healthy.

If a TaskTracker is blacklisted, any currently running tasks are allowed to finish, but no further tasks are scheduled. If a TaskTracker has been blacklisted due to mapred.max.tracker.blacklists or using the hadoop job -blacklist-tracker <host> command, un-blacklisting requires a TaskTracker restart.

Only 50% of the TaskTrackers in a cluster can be blacklisted at any one time.

After 24 hours, the TaskTracker is automatically removed from the blacklist and can accept jobs again.

Blacklisting a TaskTracker Manually


To blacklist a TaskTracker manually, run the following command as the administrative user:

hadoop job -blacklist-tracker <hostname>

Manually blacklisting a TaskTracker prevents additional tasks from being scheduled on the TaskTracker. Any currently running tasks are allowed to finish.

Un-blacklisting a TaskTracker Manually


If a TaskTracker is blacklisted per job, you can un-blacklist it by running the following command as the administrative user:

hadoop job -unblacklist <jobid> <hostname>

If a TaskTracker has been blacklisted cluster-wide due to mapred.max.tracker.blacklists or using the hadoop job -blacklist-tracker <host> command, use the following command as the administrative user to remove that node from the blacklist:

hadoop job -unblacklist-tracker <hostname>

If a TaskTracker has been blacklisted cluster-wide due to a non-healthy status, correct the indicated problem and run the health check script again. When the script picks up the health status, the TaskTracker is un-blacklisted.

Wednesday, January 17, 2018

Disaster - Recovery using Secondary Namenode

Disaster:

1. Shut down the secondary namenode

/etc/init.d/hadoop-hdfs-secondarynamenode stop

2. Force a checkpoint on secondary namenode

hdfs secondarynamenode -checkpoint force

3. Shut down the namenode

/etc/init.d/hadoop-hdfs-namenode stop

4. On namenode, move dfs.namenode.name.dir to a different location, and create an empty directory.

[root@hdm name]# pwd
/data/nn/dfs/name
[root@hdm name]# mv current /tmp/backup_nn_current
[root@hdm name]# mkdir current
[root@hdm name]# chown hdfs:hadoop current

5. The namenode will now fail to start.

Recovery:

1. Create an empty directory specified in the dfs.namenode.checkpoint.dir configuration variable.

mkdir -p /data/secondary_nn/dfs/namesecondary
chown hdfs:hadoop /data/secondary_nn/dfs/namesecondary

2. Copy (scp) the fsimage and edit logs from the secondary namenode to the namenode's dfs.namenode.checkpoint.dir.

[root@hdw3 namesecondary]# pwd
/data/secondary_nn/dfs/namesecondary
[root@hdw3 namesecondary]# scp -r current hdm:/data/secondary_nn/dfs/namesecondary/

3. Change owner and group on namenode

chown -R hdfs:hadoop /data/secondary_nn/dfs/namesecondary/*

4. Import the checkpoint on the namenode

hdfs namenode -importCheckpoint

5. Restart HDFS cluster.

Tuesday, January 16, 2018

Planning of Hadoop Cluster

Many customers ask what kind of machine to purchase to be used in a Hadoop environment, and what configuration to use. The answer to this can be essentially derived from some simple calculations that I want to write about and demonstrate.

Sizing a Hadoop cluster is important, as the right resources will allow you to optimize the environment for your purpose.  However, this is no simple task as optimizing a distributed environment and its related software can have its complexities.

The number of machines, and specs of the machines, depends on a few factors:
  • The volume of data (obviously)
  • The data retention policy (how much can you afford to keep before throwing away)
  • The type of workload you have (data science/CPU driven vs “vanilla” use case/IO-bound)
  • Also the data storage mechanism (data container, type of compression used if any)
We have to make some assumptions from the beginning; otherwise there are just too many parameters to deal with. These assumptions drive the data node configuration. The other types of machines in Hadoop (NameNode/JobTracker) need different specs, and are generally more straightforward.

Hardware Requirement for NameNodes:

Both Active and Passive NameNode servers should have highly reliable storage for namespace storage and edit-log journaling. 4-6 1TB SAS disks are recommended: one kept separate for the OS [RAID 1] and 2 for the FS image [RAID 5/6]. Please remember that the disks on which you plan to store the file system image and edit log should be RAID. JournalNodes also need reliable storage for the edit log. If your cluster is an HA cluster, plan it so that the JNs are configured on the master nodes. At least 3 JNs are required for a typical HA cluster.

Hardware Requirement for JobTracker/ResourceManager:

JobTracker/ResourceManager servers do not need RAID storage, because they save their persistent state to HDFS. The JobTracker/ResourceManager server can also run on a slave node with a bit of extra RAM, so commodity hardware can be used for it. If you want to run the NameNode and JobTracker/ResourceManager on the same server, then reliable hardware is recommended.

  • Memory sizing:
The amount of memory required for the master nodes depends on the number of file system objects (files and block replicas) to be created and tracked by the NameNode. Memory typically ranges from 24 GB to 64 GB.
  • Processors:
The NameNode and its clients are very “chatty”. We therefore recommend providing 2 CPUs (with 8 or even 12 cores each) to handle messaging traffic for the master nodes. A single CPU with 16 or even 24 cores can also be used. The CPU clock speed should be at least 2-2.5GHz.

Hardware Requirement for Slave Nodes: In general, when choosing between higher-performance and lower-performance components:
“Save the money, buy more nodes!”

HDFS is usually configured to replicate the data 3 ways, so you will need 3x the actual storage capacity for your data. In addition, you need to reserve machine capacity for temporary storage used during computation (storage for transient Map outputs stays local to the machine and does not get stored on HDFS; local storage for compression is also needed). A good rule of thumb is to keep the disks at 70% capacity. We also need to take the compression ratio into account.

Let’s take an example:

Say we have 70 TB of raw data to store on a yearly basis (i.e. a moving window of 1 year). Assuming compression (say, with Gzip) saves 60%, we are left with 70 - (70 * 60%) = 28 TB. With 3x replication this becomes 28 * 3 = 84 TB. To keep the disks at 70% capacity we need x such that x * 70% = 84 TB, so x = 84 / 70% = 120 TB is the value we need for capacity planning.
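
The same calculation can be written as a small Python helper so that you can plug in your own numbers. This is just a sketch of the arithmetic above; the function and parameter names are my own, and the 60% saving, 3x replication and 70% fill level are the assumptions from the example.

```python
def required_capacity_tb(raw_tb, compression_saving=0.60, replication=3, max_fill=0.70):
    """Disk capacity to provision, following the rule of thumb in the example."""
    compressed_tb = raw_tb * (1 - compression_saving)  # data left after compression
    replicated_tb = compressed_tb * replication        # HDFS keeps 3 copies
    return replicated_tb / max_fill                    # keep disks at most 70% full


if __name__ == "__main__":
    print(round(required_capacity_tb(70), 1), "TB")
```

Changing any one assumption (for example a weaker compression codec) shows quickly how sensitive the final figure is to it.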

Number of nodes: Here are the recommended specifications for DataNode/TaskTrackers in a balanced Hadoop cluster from Cloudera:

  • 12-24 1-4TB hard disks in a JBOD (Just a Bunch Of Disks) configuration (no RAID, please!)
  • multi-core CPUs, running at least 2-2.5GHz
  • So let’s divide the capacity-planning value by the disk capacity of one node in a way that makes sense: 120 TB / (12 disks x 1 TB) = 10 nodes.
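
As a quick sketch in Python, the node count is the planned capacity divided by the disk capacity of one node, rounded up. The function name is hypothetical; the defaults of 12 disks of 1 TB each are the low end of the recommendation above.

```python
import math


def data_nodes_needed(capacity_tb, disks_per_node=12, disk_size_tb=1):
    """Number of data nodes needed to provide `capacity_tb` of raw disk."""
    per_node_tb = disks_per_node * disk_size_tb
    return math.ceil(capacity_tb / per_node_tb)


if __name__ == "__main__":
    print(data_nodes_needed(120), "nodes")
```

Rounding up matters in practice: a requirement of 121 TB would already need an eleventh node with this disk layout.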
Number of tasks per node: First, let’s figure out the number of tasks per node. Usually count 1 core per task. If the job is not too heavy on CPU, then the number of tasks can be greater than the number of cores.

Example: 12 cores, jobs use ~75% of CPU
Let’s assign free slots= 14 (slightly > # of cores is a good rule of thumb), maxMapTasks=8, maxReduceTasks=6.

Memory: Now let’s figure out the memory we can assign to these tasks. By default, the TaskTracker and DataNode each take up 1 GB of RAM. Each task uses the amount of RAM set in mapred.child.java.opts (200 MB by default). In addition, count 2 GB for the OS. So with 24 GB of memory available, 24 - 2 = 22 GB is left for our 14 tasks, and we can assign about 1.5 GB to each task (14 * 1.5 = 21 GB). Note that this estimate does not subtract the 2 GB for the TaskTracker and DataNode daemons; accounting for them leaves 20 GB, or roughly 1.4 GB per task.
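
Here is the memory budget as a small Python sketch. The function and parameter names are invented for illustration; `daemons_gb` is an optional allowance for the TaskTracker and DataNode (1 GB each by default, as mentioned above), which you can set to 2 if you want to reserve that memory before dividing it among the tasks.

```python
def memory_per_task_gb(total_gb, task_slots, os_gb=2, daemons_gb=0):
    """RAM available per task slot after reserving memory for the OS and daemons."""
    available_gb = total_gb - os_gb - daemons_gb
    if available_gb <= 0 or task_slots <= 0:
        raise ValueError("not enough memory or no task slots")
    return available_gb / task_slots


if __name__ == "__main__":
    # 24 GB node with 14 task slots, as in the example
    print(round(memory_per_task_gb(24, 14), 2), "GB per task (OS reserved)")
    print(round(memory_per_task_gb(24, 14, daemons_gb=2), 2), "GB per task (OS and daemons reserved)")
```

Whatever figure you settle on would then be set as the heap size in mapred.child.java.opts, rounded down to leave some headroom.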

The memory requirement depends on the type of workload your Hadoop cluster runs, such as a balanced workload or a compute-intensive workload. If your cluster is compute-intensive, then more memory and CPU cores are required on the slaves for faster processing.

All slave machines should allow additional CPUs, disks and RAM to be added in the future. If you plan to expand your Hadoop cluster later, this helps you grow an existing cluster without adding more racks or making network changes. Alternatively, you can also add additional machines.

Network Considerations

Hadoop is very bandwidth-intensive! Often, all nodes are communicating with each other at the same time.
  • Use dedicated switches for your Hadoop cluster
  • Nodes are connected to a top-of-rack switch
  • Nodes should be connected at a minimum speed of 1Gb/sec
  • For clusters where large amounts of intermediate data are generated, consider 10Gb/sec connections
    • Expensive
    • Alternative: bond two 1Gb/sec connections to each node
  • Racks are interconnected via core switches
  • Core switches should connect to top-of-rack switches at 10Gb/ sec or faster
  • Beware of oversubscription in top-of-rack and core switches
  • Consider bonded Ethernet to mitigate against failure
  • Consider redundant top-of-rack and core switches
Operating System Recommendations:

Always choose an OS that you are comfortable administering.
  • CentOS: geared towards servers rather than individual workstations
    • Conservative about package versions
    • Very widely used in production
  • RedHat Enterprise Linux (RHEL): RedHat-supported analog to CentOS
    • Includes support contracts, for a price
  • In production, we often see a mixture of RHEL and CentOS machines
    • Often RHEL on master nodes, CentOS on slaves
  • Fedora Core: geared towards individual workstations
    • Includes newer versions of software, at the expense of some stability
    • We recommend server-based, rather than workstation-based, Linux distributions
  • Ubuntu: Very popular distribution, based on Debian
    • Both desktop and server versions available
    • Try to use an LTS (Long Term Support) version
  • SuSE: popular distribution, especially in Europe
    • Cloudera provides CDH packages for SuSE
  • Solaris, OpenSolaris: not commonly seen in production clusters
Conclusion:

Apache Hadoop cluster design is a serious platform-engineering project, and design decisions need to be well understood. What you can get away with in a small cluster may cause issues as the cluster grows. I have tried to explain the basics of Hadoop cluster design, and I hope it will be helpful to you, but I always suggest discussing your plans with your distribution provider whenever you set up a cluster.

Kafka Architecture

Apache Kafka is a distributed publish-subscribe messaging system and a robust queue that can handle a high volume of data and enables you t...