Thursday, October 16, 2014

MySQL



Issues:
1) Can't connect to MySQL; it gives the following error:
    ERROR 2002 (HY000): Can't connect to local MySQL server through socket '/var/lib/mysql/mysql.sock' (2) 
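A typical first check for ERROR 2002 (a minimal troubleshooting sketch, assuming a RHEL/CentOS-style install where MySQL runs as the mysqld service) is to confirm the server is actually running and that the socket path matches what the client expects:

# Check whether the MySQL server is running; error 2002 usually means it is not
service mysqld status
# Start it if it is stopped
service mysqld start
# Confirm the socket path configured for the server matches the client's expectation
grep -i socket /etc/my.cnf
ls -l /var/lib/mysql/mysql.sock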


Backup the metastore database:
mysqldump -uroot -p****** metastore > /tmp/divakar.sql

Restore the metastore database (note: the restore uses the mysql client, not mysqldump):
mysql -uroot -p***** metastore < /tmp/divakar.sql

Hadoop Administration Part 15 : Sentry


Friday, October 3, 2014

Shell Scripting Part 1

[root@script]# cat first.sh
# my first program
#clear
echo "Knowledge is power"
[root@ebdp-wc-d01d script]#

./first.sh
Knowledge is power
-------------------------------------------
Script to print information about currently logged-in users, the current date & time
[root@ebdp- script]# cat userinfo.sh
# user info: who is logged into the system, date & time
echo "Hello $USER"
echo "Today's date is ";date
echo "Number of users logged in ";  who | wc -l
echo "Calendar"
cal
exit 0
[root@ebdp-script]#
-----------------------------------------
[root@ebdp-wc-d01d script]# cat math.sh
echo "your first name please:"
read fname
echo "Hello $fname, have a fun"
[root@ebdp-wc-d01d script]#
--------------------------------------
vi script.sh
#! /bin/bash
echo "Hello World"
chmod +x script.sh
./script.sh
output:
Hello World
---------------------------------------
-sh-4.1$ which bash
/bin/bash
-sh-4.1$
--------------------------
-sh-4.1$ cat sample1_string.sh
#! /bin/bash
STRING="Hellp World"
echo $STRING
-sh-4.1$
-sh-4.1$ chmod +x sample1_string.sh
-sh-4.1$ ./sample1_string.sh
Hello World
---------------------------------------

Tuesday, September 30, 2014

Hadoop Administration Part 12 : Job Scheduling

FIFO Scheduler: The default job scheduler is queue-based and uses FIFO (First In, First Out) ordering. The FIFO scheduler runs jobs in the order in which they were submitted. You can prioritize a job by changing the value of the mapred.job.priority property or by calling the setJobPriority() method.
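For example, with the classic MapReduce framework a job's priority can be set in its configuration or changed from the command line (the jar name, driver class and job ID below are placeholders):

# Set the property at submit time (requires the driver to use ToolRunner/GenericOptionsParser)
hadoop jar myjob.jar MyDriver -Dmapred.job.priority=HIGH input output
# Or change the priority of an already submitted job
hadoop job -set-priority job_201409300001_0042 HIGH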

Fair Scheduler: Gives multiple users of the cluster a fair share of resources simultaneously. Each job is assigned to a pool, and each pool is assigned an even share of the available task slots.

Capacity Scheduler: Supports multiple queues. Queues are guaranteed a fraction of the capacity of the grid (their 'guaranteed capacity') in the sense that a certain capacity of resources will be at their disposal. All jobs submitted to a queue have access to the capacity guaranteed to that queue.

yarn.scheduler.capacity.maximum-am-resource-percent=0.2
yarn.scheduler.capacity.maximum-applications=10000
yarn.scheduler.capacity.node-locality-delay=40
yarn.scheduler.capacity.root.acl_administer_queue=*
yarn.scheduler.capacity.root.capacity=100
yarn.scheduler.capacity.root.default.acl_administer_jobs=*
yarn.scheduler.capacity.root.default.acl_submit_applications=*
yarn.scheduler.capacity.root.default.capacity=50
yarn.scheduler.capacity.root.default.maximum-capacity=100
yarn.scheduler.capacity.root.default.minimum-user-limit-percent=10
yarn.scheduler.capacity.root.default.state=RUNNING
yarn.scheduler.capacity.root.default.user-limit-factor=1
yarn.scheduler.capacity.root.hiveserver.capacity=50
yarn.scheduler.capacity.root.hiveserver.hive1.capacity=50
yarn.scheduler.capacity.root.hiveserver.hive1.user-limit-factor=4
yarn.scheduler.capacity.root.hiveserver.hive2.capacity=50
yarn.scheduler.capacity.root.hiveserver.hive2.user-limit-factor=4
yarn.scheduler.capacity.root.hiveserver.queues=hive1,hive2
yarn.scheduler.capacity.root.queues=default,hiveserver

yarn.scheduler.capacity.root.unfunded.capacity=50

yarn.scheduler.maximum-allocation-mb=237568
yarn.scheduler.minimum-allocation-mb=5999
YARN Java heap size=4096
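After changing the Capacity Scheduler queue definitions above (capacity-scheduler.xml on a YARN cluster), the queues can usually be reloaded without restarting the ResourceManager:

yarn rmadmin -refreshQueues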

Important mapred-site.xml properties for the Fair Scheduler:
<property>
  <name>mapred.fairscheduler.assignmultiple</name>
  <value>true</value>
  <description> MapRConf </description>
</property>

<property>
  <name>mapred.fairscheduler.eventlog.enabled</name>
  <value>false</value>
  <description>Enable scheduler logging in ${HADOOP_LOG_DIR}/fairscheduler/
  MapRConf </description>
</property>

<property>
  <name>mapred.fairscheduler.smalljob.schedule.enable</name>
  <value>true</value>
  <description>Enable small-job fast scheduling inside the fair scheduler. TaskTrackers reserve a slot, called an ephemeral slot, which is used for small jobs when the cluster is busy. MapRConf </description>
</property>

<property>
  <name>mapred.fairscheduler.smalljob.max.maps</name>
  <value>10</value>
  <description>Small job definition. Max number of maps allowed in small job. MapRConf
  </description>
</property>

<property>
  <name>mapred.fairscheduler.smalljob.max.reducers</name>
  <value>10</value>
  <description>Small job definition. Max number of reducers allowed in small job. MapRConf
  </description>
</property>

<property>
  <name>mapred.fairscheduler.smalljob.max.inputsize</name>
  <value>10737418240</value>
  <description>Small job definition. Max input size in bytes allowed for a small job. Default is 10GB.
  MapRConf </description>
</property>

<property>
  <name>mapred.fairscheduler.smalljob.max.reducer.inputsize</name>
  <value>1073741824</value>
  <description>Small job definition. Max estimated input size for a reducer allowed in a small job. Default is 1 GB per reducer.  MapRConf </description>
</property>

<property>
  <name>mapred.cluster.ephemeral.tasks.memory.limit.mb</name>
  <value>200</value>
  <description>Small job definition. Max memory in MB reserved for an ephemeral slot.
  Default is 200 MB. This value must be the same on JobTracker and TaskTracker nodes. MapRConf
  </description>
</property>

<property>
  <name>mapreduce.jobtracker.node.labels.file</name>
  <value></value>
  <description>File on maprfs that has mapping of nodes and labels.</description>
</property>

<property>
  <name>mapred.tasktracker.ephemeral.tasks.maximum</name>
  <value>1</value>
  <description>Reserved slot for small job scheduling MapRConf </description>
</property> 

Monday, September 29, 2014

Hadoop Administration - Cluster Management


Basic knowledge on Slots





Each node contains 24 CPUs = 12 physical cores
Total nodes (masters & slaves) = 108
Number of nodes excluding masters (CLDB & JT) = 100
Total cores = 100 nodes * 12 cores = 1,200 cores
Total number of cores after enabling Hyper-Threading = 1,200 * 2 = 2,400 cores
Each node takes 2 cores, one for the OS and one for the filesystem, Warden and TT -> 100 nodes consume 200 cores
Total number of remaining cores = 2,400 - 200 = 2,200 cores
Roughly, each core handles 1.5 slots.
2,200 cores * 1.5 = 3,300 slots (map tasks & reduce tasks)

   Example 2:
   Total nodes = 13 data nodes
   Each data node = 22 physical cores (44 CPUs after Hyper-Threading is enabled)
   Total CPUs = 13 * 44 = 572 cores (1 CPU = 1 core)
   Each data node consumes 2 cores, one for the OS and one for Hadoop services:
   13 * 2 = 26
   Remaining CPUs = 572 - 26 = 546
   Total number of slots = 546 * 1.5 (roughly, each core handles 1.5 slots)
   Total slots = 819 slots for mappers and reducers (a bash sketch of this calculation follows)
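The same arithmetic as a small bash sketch (the node counts and the 1.5-slots-per-core rule of thumb are taken from Example 2 above):

#!/bin/bash
# Rough slot estimate for Example 2
NODES=13              # data nodes
CORES_PER_NODE=44     # logical cores after Hyper-Threading
RESERVED_PER_NODE=2   # one core for the OS, one for Hadoop services
TOTAL_CORES=$((NODES * CORES_PER_NODE))                     # 572
USABLE_CORES=$((TOTAL_CORES - NODES * RESERVED_PER_NODE))   # 546
SLOTS=$((USABLE_CORES * 15 / 10))                           # 1.5 slots per core => 819
echo "Usable cores: $USABLE_CORES, estimated map+reduce slots: $SLOTS"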


Disk Size on each node:

Master Node:
Memory = 94.4 GB
Disk Space = 883 GB / 3 disks
CPU = 32
Network I/O = 34.6 KB/sec
Disk I/O = 144 KB/sec

Slave Node:
Memory = 126 GB
Disk Space = 30.9 TB / 12 disks (2.5 TB/disk)
CPU = 24



Chunk Size:

Hadoop Administration Part 10 : RAID in Linux

RAID stands for Redundant Array of Inexpensive Disks. This is a solution where several physical hard disks (two or more) are governed by a unit called RAID controller, which turns them into a single, cohesive data storage block.

An example of a RAID configuration would be to take two hard disks, each 80GB in size, and RAID them into a single unit 160GB in size. Another example of RAID would be to take these two disks and write data to each, creating two identical copies of everything.


RAID controllers can be implemented in hardware, which makes the RAID completely transparent to the operating systems running on top of these disks, or it can be implemented in software, which is the case we are interested in.
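As a minimal sketch of the software case (device names are placeholders), Linux software RAID is typically managed with mdadm; for example, mirroring two disks into a RAID 1 array:

# Create a RAID 1 (mirror) array from two disks; /dev/sdb and /dev/sdc are placeholders
mdadm --create /dev/md0 --level=1 --raid-devices=2 /dev/sdb /dev/sdc
# Watch the initial sync and check array health
cat /proc/mdstat
mdadm --detail /dev/md0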

Saturday, September 27, 2014

Hadoop Administration 9 : Pre and Post System Validation

System Validation - Pre-Install:

Test Case                               Expected Output
Disk Check                             All the disks should pass the test
Memory                                All the nodes should pass memory test
RPC Test                                All the nodes should pass RPC Test
Network Test                        All the nodes should pass network test
Cluster Audit Script              All the nodes should report same configuration
CPU Test                                All the nodes should pass CPU test

Memory Check:
The free command allows you to display the amount of free and used memory on the system. To do so, type the following at a shell prompt:

The free command provides information about both the physical memory (Mem) and swap space (Swap). It displays the total amount of memory (total), as well as the amount of memory that is in use (used), free (free), shared (shared), in kernel buffers (buffers), and cached (cached). For example:

-sh-4.1$ free -m
                     total       used       free     shared    buffers     cached
Mem:         96709      90716       5993          0        742      85233
-/+ buffers/cache:       4740      91968
Swap:        32767        420      32347
-sh-4.1$



Post Validation:
Test Case                                Expected Output
DFSIO                                    Puts IO load on the cluster and completes successfully
TeraGen                                 Puts load on the cluster and jobs run successfully
TeraSort                                  Puts load on the cluster and jobs run successfully
Cloudera/MapR Audit
More Benchmarking
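As a hedged sketch, TeraGen/TeraSort are typically run from the MapReduce examples jar shipped with the distribution (the jar path, row count and output paths below are placeholders):

# Generate 10 million 100-byte rows (~1 GB) of test data
hadoop jar /opt/hadoop/hadoop-examples.jar teragen 10000000 /benchmarks/teragen
# Sort the generated data
hadoop jar /opt/hadoop/hadoop-examples.jar terasort /benchmarks/teragen /benchmarks/terasort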


Hardware Failures:
Ensure that "No Interruption to services" when you perform below activity:

Power off Data Node – Pulling the Plug
Stop Network Interface for Data Node
Disk controller Failure for Data Node – Pull the hard disk out of the box.
Disk Controller Failure for CLDB Node – Pull the hard disk out of the box.
Disk Controller Failure for MySQL Node – Pull the hard disk out of the box.
Network Interface Failure for Hive Metastore Node
Power off Hive Metastore Node – Pulling the Plug
Power off MySQL Node – Pulling the Plug
Power off entire RACK
Network Switch Failure – We need to engage Network team to figure out how to do this and if it is doable.

Friday, September 26, 2014

Hadoop Development Part 1 : Flume


Hadoop Administration Part 8 : Flume Installation

Flume:
Apache Flume is a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store.

The use of Apache Flume is not restricted to log data aggregation. Since data sources are customizable, Flume can be used to transport massive quantities of event data including, but not limited to, network traffic data, social-media-generated data, email messages and pretty much any possible data source.

Installing Flume:

System Requirements:

  1. Java Runtime Environment - Java 1.6 or later (Java 1.7 Recommended)
  2. Memory - Sufficient memory for configurations used by sources, channels or sinks
  3. Disk Space - Sufficient disk space for configurations used by channels or sinks
  4. Directory Permissions - Read/Write permissions for directories used by agent
Architecture:

Data Flow model:
A Flume event is defined as a unit of data flow having a byte payload and an optional set of string attributes. A Flume agent is a (JVM) process that hosts the components through which events flow from an external source to the next destination (hop).

A Flume source consumes events delivered to it by an external source like a web server. The external source sends events to Flume in a format that is recognized by the target Flume source. For example, an Avro Flume source can be used to receive Avro events from Avro clients or other Flume agents in the flow that send events from an Avro sink. A similar flow can be defined using a Thrift Flume Source to receive events from a Thrift Sink or a Flume Thrift Rpc Client or Thrift clients written in any language generated from the Flume thrift protocol. When a Flume source receives an event, it stores it into one or more channels. The channel is a passive store that keeps the event until it’s consumed by a Flume sink. The file channel is one example – it is backed by the local filesystem. The sink removes the event from the channel and puts it into an external repository like HDFS (via Flume HDFS sink) or forwards it to the Flume source of the next Flume agent (next hop) in the flow. The source and sink within the given agent run asynchronously with the events staged in the channel.

Install Flume:

yum install mapr-flume 

Configuration File:
# Name the sources, the channels and the sinks on this agent.
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure source1                                                                                                      
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /opt/mapr/logs/configure.log
# Describe sink1 
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.kerberosPrincipal = kuser1
agent1.sinks.sink1.hdfs.kerberosKeytab = /tmp/krb5new_0.keytab
agent1.sinks.sink1.hdfs.path = /user/root/flume/logtest4_kerberosTKT/
agent1.sinks.sink1.hdfs.filePrefix = LogCreateTest
# Number of seconds to wait before rolling current file (0 = never roll based on time interval)     
agent1.sinks.sink1.hdfs.rollInterval = 6
# File size to trigger roll, in bytes (0: never roll based on file size)                                                          
agent1.sinks.sink1.hdfs.rollSize = 0
#Number of events written to file before it rolled (0 = never roll based on number of events)       
agent1.sinks.sink1.hdfs.rollCount = 10000
# number of events written to file before it flushed to HDFS                                                                      
agent1.sinks.sink1.hdfs.batchSize = 10000
agent1.sinks.sink1.hdfs.txnEventMax = 40000
# -- Compression codec. one of following : gzip, bzip2, lzo, snappy                                                           
# hdfs.codeC = gzip                                                                                                               
# format: currently SequenceFile, DataStream or CompressedStream
# (1) DataStream will not compress the output file; don't set codeC
# (2) CompressedStream requires hdfs.codeC to be set to an available codec
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.maxOpenFiles=50
# -- "Text" or "Writable"                                                                                                         
#hdfs.writeFormat                                                                                                                 
agent1.sinks.sink1.hdfs.appendTimeout = 10000
agent1.sinks.sink1.hdfs.callTimeout = 10000
# Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)                                               
agent1.sinks.sink1.hdfs.threadsPoolSize=100
# Number of threads per HDFS sink for scheduling timed file rolling                                                     
agent1.sinks.sink1.hdfs.rollTimerPoolSize = 1
# hdfs.kerberosPrincipal Kerberos user principal for accessing secure HDFS
# hdfs.kerberosKeytab Kerberos keytab for accessing secure HDFS
# hdfs.round false Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
# hdfs.roundValue 1 Rounded down to the highest multiple of this (in the unit configured using
# hdfs.roundUnit), less than current time.
# hdfs.roundUnit second The unit of the round down value - second, minute or hour.
# serializer TEXT Other possible options include AVRO_EVENT or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.
# serializer.*                                                                                                                    
# Use a channel which buffers events to a file                                                                                    
# -- The component type name, needs to be FILE.                                                                                   
agent1.channels.channel1.type = FILE
# checkpointDir ~/.flume/file-channel/checkpoint The directory where checkpoint file will be stored                               
# dataDirs ~/.flume/file-channel/data The directory where log files will be stored                                                
# The maximum size of transaction supported by the channel                                                                 
agent1.channels.channel1.transactionCapacity = 1000000
# Amount of time (in millis) between checkpoints                                                                                  
agent1.channels.channel1.checkpointInterval = 30000
# Max size (in bytes) of a single log file                                                                                        
agent1.channels.channel1.maxFileSize = 2146435071
# Maximum capacity of the channel                                                                                                 
agent1.channels.channel1.capacity = 10000000
# keep-alive 3 Amount of time (in sec) to wait for a put operation
# write-timeout 3 Amount of time (in sec) to wait for a write operation
# Bind the source and sink to the channel                                                                                         
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
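With a configuration file like the one above, the agent is normally started with flume-ng, naming the agent defined in the file (the conf directory and file path below are placeholders):

flume-ng agent --conf /opt/mapr/flume/conf --conf-file /opt/mapr/flume/conf/flume-agent1.conf --name agent1 -Dflume.root.logger=INFO,console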

Thursday, September 25, 2014

Hadoop Administration Part 7 : Hadoop Performance Tuning

Performance Tuning:
By default, the MapR cluster is tuned to perform well for most workloads. However, in certain circumstances, you might want to manually tune the MapR Filesystem to provide higher performance.

This section includes the following topics:
1) Configure NFS Write Performance
2) Chunk Size
3) Increase Caching

Configure NFS Write Performance:
The kernel tunable sunrpc.tcp_slot_table_entries represents the number of simultaneous Remote Procedure Call (RPC) requests. Its default value is 16; increasing it to 128 may improve write speeds. Use the command sysctl -w sunrpc.tcp_slot_table_entries=128 to set the value, and add an entry to your sysctl.conf file to make the setting persist across reboots. NFS write performance varies between Linux distributions, so this suggested change may have no effect, or a negative effect, on your particular cluster.
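A minimal sketch of applying the change (128 is the suggested value from above; measure before and after on your own cluster):

# Apply immediately (runtime only)
sysctl -w sunrpc.tcp_slot_table_entries=128
# Persist across reboots
echo "sunrpc.tcp_slot_table_entries = 128" >> /etc/sysctl.conf
sysctl -p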

Chunk Size:
Chunk size affects parallel processing and random disk I/O during MapReduce jobs. A higher chunk size means less parallel processing because there are fewer map inputs, and therefore fewer mappers. A lower chunk size improves parallelism, but results in higher random disk I/O during shuffle because there are more map outputs. Set the io.sort.mb parameter to a value between 120% and 150% of the chunk size.
Here are general guidelines for chunk size:
For most purposes, set the chunk size to the default 256 MB and set the value of the io.sort.mb parameter to the default 380 MB.
On very small clusters or nodes without much RAM, set the chunk size to 128 MB and set the value of the io.sort.mb parameter to 190 MB.

If application-level compression is in use, the io.sort.mb parameter should be at least 380 MB.

Setting Chunk Size:
You can set the chunk size for a given directory in two ways:
Change the ChunkSize attribute in the .dfs_attributes file at the top level of the directory
Use the command hadoop mfs -setchunksize <size> <directory>

For example, if the volume test is NFS-mounted at /mapr/my.cluster.com/projects/test you can set the chunk size to 268,435,456 bytes by editing the file /mapr/my.cluster.com/projects/test/.dfs_attributes and setting ChunkSize=268435456. To accomplish the same thing from the hadoop shell, 
use the following command:
hadoop mfs -setchunksize 268435456 /mapr/my.cluster.com/projects/test

Increase Caching:
If you can give more memory to MapR-FS, performance improves due to greater data caching. If your main constraint is disk I/O, this is especially important. For the parameters that you can configure to give warden more memory, see Memory Allocation for Nodes.

Memory for the MapR-FS
By default, Warden allocates 35% of available memory to MapR-FS to allow adequate memory for MapR-DB. If you do not intend to use MapR-DB, you can set the -noDB option in configure.sh to specify that 20% of the memory available should be allocated to MapR-FS.
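For example (a sketch; the -R flag, which reuses the node's existing configuration, is an assumption here and should be checked against your MapR version):

# Re-run configure.sh without MapR-DB so MapR-FS gets ~20% of memory instead of 35%
/opt/mapr/server/configure.sh -R -noDB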

Hadoop Administration Part 6 : Topology & Data Protection

Topology:
While creating a volume, you can set the topology to specify which rack or nodes the volume will occupy. Topology scripts are used by Hadoop to determine the rack location of nodes. This information is used by Hadoop to replicate block data to redundant racks.

Setting Up Node Topology

Topology paths can be as simple or complex as needed to correspond to your cluster layout. In a simple cluster, each topology path might consist of the rack only (for example, /rack-1). In a deployment consisting of multiple large datacenters, each topology path can be much longer (for example, /europe/uk/london/datacenter2/room4/row22/rack5/). Establish a /data topology path to serve as the default topology path for the volumes in that cluster, and a /decommissioned topology path that is not assigned to any volumes.

When you need to migrate a data volume off a particular node, move that node from the /data path to the /decommissioned path. Since no data volumes are assigned to that topology path, standard data replication will migrate the data off that node to other nodes that are still in the /data topology path.
For large clusters, you can specify complex topologies in a text file or by using a script. Each line in the text file or script output specifies a single node and the full topology path for that node in the following format:
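The format itself was not captured above; as a hedged reconstruction, each line maps a node's IP address or hostname to its topology path, for example:

<IP-or-hostname> <topology path>
10.10.1.15  /datacenter1/rack1
10.10.1.16  /datacenter1/rack2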

The text file or script must be specified and available on the local filesystem on all CLDB nodes.

To set the topology with a text file, set net.topology.table.file.name in /opt/mapr/conf/cldb.conf to the text file name. If you specify both a script and a text file, the MapR system uses the topology specified by the script.

Setting up Topology in MapR:


Data Protection:
You can use MapR to protect your data from hardware failures, accidental overwrites, and natural disasters. MapR organizes data into volumes so that you can apply different data protection strategies to different types of data. The following scenarios describe a few common problems and how easily and effectively MapR protects your data from loss.
This page contains the following topics:

Scenario 1) : Hardware Failure
Solution: Topology and Replication Factor

Scenario 2) : Accidental Overwrite
Solution: Snapshots

Scenario 3) : Disaster Recovery
Solution: Mirroring to Another Cluster

Hadoop Administration Part 5 : Snapshot and Schedules

Snapshot:
Snapshots are a useful feature for backup/restore, data import/export between environments, etc.

A snapshot is a read-only image of a volume at a specific point in time. On clusters with an M5 or higher license, you can create a snapshot manually or automate the process with a schedule. Snapshots are useful any time you need to be able to roll back to a known good data set at a specific point in time. 

For example, before performing a risky operation on a volume, you can create a snapshot to enable rollback capability for the entire volume. A snapshot takes no time to create, and initially uses no disk space, because it stores only the incremental changes needed to roll the volume back to the state at the time the snapshot was created. The storage used by a volume's snapshots does not count against the volume's quota. When you view the list of volumes on your cluster in the MapR Control System, the value of the Snap Size column is the disk space used by all of the snapshots for that volume.

Creating a Volume Snapshot
You can create a snapshot manually or use a schedule to automate snapshot creation. Each snapshot has an expiration date that determines how long the snapshot will be retained:

1) When you create the snapshot manually, specify an expiration date.
2) When you schedule snapshots, the expiration date is determined by the Retain parameter of the schedule.

For more information about scheduling snapshots, see Scheduling a Snapshot.
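From the command line, a manual snapshot can typically be created with maprcli (the volume and snapshot names below are placeholders):

maprcli volume snapshot create -volume testvol -snapshotname testvol-2014-10-16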

Viewing the Contents of a Snapshot
At the top level of each volume is a directory called .snapshot containing all the snapshots for the volume. You can view the directory with hadoop fs commands or by mounting the cluster with NFS. To prevent recursion problems, ls and hadoop fs -ls do not show the .snapshot directory when the top-level volume directory contents are listed. You must navigate explicitly to the .snapshot directory to view and list the snapshots for the volume.
Example:
bash$/opt/mapr/bin# hadoop fs -ls /user/divakar/.snapshot
Found 1 items
drwxrwxrwx   - root root          1 2011-06-01 09:57 /user/divakar/.snapshot/2014-04-04.09-57-49
Copy from a snapshot back into the volume:
cp -rv /user/divakar/.snapshot/2014-04-04.09-57-49 /user/data/

Selecting Snapshot while Creating Volumes/Mirrors

Schedules:
A schedule is a group of rules that specify recurring points in time at which certain actions are determined to occur. You can use schedules to automate the creation of snapshots and mirrors; after you create a schedule, it appears as a choice in the scheduling menu when you are editing the properties of a task that can be scheduled:

1) To apply a schedule to snapshots, see Scheduling a Snapshot.
2) To apply a schedule to volume mirroring, see Creating Volumes.

Prod Data:

User Data:


Hadoop Administration Part 4 : Distcp and Mirroring

Distcp:

Hadoop distcp is a tool used for large inter- and intra-cluster copying. It uses MapReduce to effect its distribution, error handling, recovery and reporting. The work of copying is done by the 'maps' that run in parallel across the cluster; there are no reducers.

Syntax:
hadoop  [generic options]  distcp  <source>  <destination>

MapR to MapR:
hadoop distcp maprfs:///mapr/cluster1name/user/data/jobs/udf/ maprfs:///mapr/cluster2name/user/

MapR to Hortonworks:
hadoop distcp -i -p maprfs:///mapr/prodcluster/db/aps/base/ivr/ech_national /db/aps/base/ivr/ (this command needs to be run from the Hortonworks side)

Each file is copied by a single map, and distcp tries to give each map approximately the same amount of data by bucketing files into roughly equal allocations. The minimum amount of data per map is 256 MB; for example, 1 GB of data requires 4 maps to copy. Generally there will be a maximum of 20 maps per TaskTracker.

If there is 1,000 GB of data to be copied on a 100-node cluster, distcp will split the work into about 2,000 maps, with each map copying roughly 512 MB on average. We can reduce the number of maps with the -m option: giving -m 1000 in the above example allocates 1,000 maps, with each map copying about 1 GB on average.
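For example (the source and destination paths reuse the cluster names from the syntax example above):

# Cap the copy at 1,000 maps instead of letting distcp choose ~2,000
hadoop distcp -m 1000 maprfs:///mapr/cluster1name/user/data/ maprfs:///mapr/cluster2name/user/data/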

Some Error Messages:

Copy failed: java.io.IOException: Cluster comcaststcluster has no entry in /opt/mapr//conf/mapr-clusters.conf

Copy failed: org.apache.hadoop.mapred.InvalidInputException: Input source maprfs:/mapr/cluster/home/da001c/nivr_ndw_ivr_detail.java does not exist

mapr-clusters.conf file
The source and target cluster names and CLDB host information need to be added.

[root@ebdp-ch2-e001s mapr]# cat /opt/mapr//conf/mapr-clusters.conf
stgcluster ebdp-ch2-s.sys.net:7222 ebdp-ch2-c006s.sys..net:7222
[root@ebdp-ch2-e001s mapr]#

Errors:
15/01/27 22:16:33 ERROR tools.DistCp: Exception encountered
java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.

Mirroring:

A mirror volume is a read-only physical copy of another volume, the source volume. You can use mirror volumes in the same cluster (local mirroring) to provide local load balancing by using mirror volumes to serve read requests for the most frequently accessed data in the cluster. You can also mirror volumes on a separate cluster (remote mirroring) for backup and disaster readiness purposes.

When you create a mirror volume, you must specify a source volume that the mirror retrieves content from. This retrieval is called the mirroring operation. Like a normal volume, a mirror volume has a configurable replication factor.

The MapR system creates a temporary snapshot of the source volume at the start of a mirroring operation. The mirroring process reads content from the snapshot into the mirror volume and transmits only the differences between the source volume and the mirror. The initial mirroring operation copies the entire source volume, but subsequent mirroring operations can be extremely fast. You can automate mirror synchronization by setting a schedule, or use the volume mirror start command to synchronize data manually.
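For example, a manual sync can typically be started with maprcli (the mirror volume name is a placeholder):

maprcli volume mirror start -name testvol.mirror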

Creating New Mirror:

New Mirror Volume in MCS:


For large amounts of data, mirroring is much better than distcp. The reasons include:

a) network connections are better utilized
b) node and network failures are handled much better by mirrors
c) incremental copies are possible

With the most recent release, you can also promote a mirror to read-write (RW) status. For earlier versions, you had to copy data out of the mirror (use distcp for that part!), but mirroring is sufficiently better than distcp that even with the extra copy you often come out ahead of distcp, even on the first mirror.

For smaller amounts of data, up to, say, a few tens of GB, consider using rsync over NFS.

We have a customer who was able to move a massive amount of data (think PB) in less than a day using mirrors. Distcp is a nightmare at those volumes.

What can I do to back up my data on a MapR cluster? With Hadoop we have real problems with this, since copying large amounts of data to another cluster can take forever, and if the data changes during the copy then distcp can crash. What can I do?
MapR supports snapshots and mirrors. Snapshots are created in place with zero performance loss for new writes; snapshots also share data with the volume and use redirect-on-write for new data. Mirroring allows replication of data while maintaining consistency across clusters.
I'm looking for details on how remote mirroring is done at a low level. Does it leverage multiple nodes similar to distcp? Is it TCP/IP or something else entirely?

Mirroring in MapR is very much a parallel operation and is far better than "distcp". Mirroring moves data directly from the set of source servers to the set of destination servers. Distcp on the other hand is a "read into client memory and then write it to remote server's memory", which involves 2 hops. Secondly, the contents of the volume are mirrored consistently (even while files in the volume may be getting written into or deleted), whereas with distcp you are on your own to ensure that changes don't occur underneath distcp while it is running.

Remote mirroring is performed between the MFS nodes of one cluster and the remote cluster's nodes. The replication happens using TCP/IP over port 5660 between the two clusters. For remote replication to work, you need TCP ports 5660 (MFS), 5181 (ZooKeeper) and 7222 (CLDB) open between the two clusters. When a remote mirror is created on a cluster, the CLDB of the remote cluster checks for the existence of the volume and also looks up state information in the remote ZooKeeper, hence both of those ports are needed.

Hortonworks distcp issues:
1) Caused by:
org.apache.hadoop.tools.mapred.RetriableFileCopyCommand$CopyReadException:
 javax.net.ssl.SSLException: SSL peer shut down incorrectly
Caused by: org.apache.hadoop.tools.mapred.RetriableFileCopyCommand$CopyReadException: java.io.IOException: Got EOF but currentPos = 336175104 < filelength = 836475643
   at org.apache.hadoop.tools.mapred.RetriableFileCopyCommand.readBytes(RetriableFileCopyCommand.java:288)
   at org.apache.hadoop.tools.mapred.RetriableFileCopyCommand.copyBytes(RetriableFileCopyCommand.java:256)
    at