Hadoop Ecosystem: Zookeeper – The distributed coordination server

Apache Zookeeper is a centralized service that gives distributed systems a hierarchical key-value store, which is used to provide a distributed configuration service, synchronization service, and naming registry for large distributed systems. In the words of the project itself:

“ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications. Each time they are implemented there is a lot of work that goes into fixing the bugs and race conditions that are inevitable. Because of the difficulty of implementing these kinds of services, applications initially usually skimp on them, which make them brittle in the presence of change and difficult to manage. Even when done correctly, different implementations of these services lead to management complexity when the applications are deployed.” [1]

At first it is hard to visualize the role of Zookeeper as a component in the Hadoop ecosystem, so let’s examine some of the services and constructs that it provides to distributed computing applications:

  • Locks: Zookeeper provides the building blocks to create and maintain globally distributed locks. This allows applications to maintain transaction atomicity for any kind of object by ensuring that at any point in time no two clients or transactions can hold a lock on the same resource.
  • Queues: Zookeeper allows distributed applications to maintain regular FIFO and priority-based queues, where a list of messages or objects is held by a Zookeeper node. Clients connect to that node to submit new queue members and to request the list of members pending processing. This allows applications to implement asynchronous processes where a unit of work is placed on a queue and processed whenever the next worker process is available.
  • Two-Phase Commit Coordination: Zookeeper allows applications that need to commit or abort a transaction across multiple processing nodes to coordinate the two-phase commit pattern through its infrastructure. Each client applies the transaction tentatively in the first commit phase and notifies the coordination node, which then lets all parties involved know whether the transaction was globally successful.
  • Barriers: Zookeeper supports the creation of synchronization points called Barriers. This is useful when multiple asynchronous processes need to converge on a common synchronization point once all worker processes have executed their independent units of work.
  • Leader Election: Zookeeper allows distributed applications to automate leader election across a list of available nodes. This helps applications running on a cluster optimize for locality and load balancing.
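
As a rough illustration of the lock idea, the lock recipe in the Zookeeper documentation [2] has each client create a sequentially numbered node under a shared lock node, and the client whose node carries the lowest sequence number holds the lock. The shell sketch below only simulates that selection rule locally. It does not talk to a Zookeeper server, and the node names and the lock_holder function are made up for the example.

```shell
#!/bin/bash
# Illustrative only: pick the lock holder from a list of sequential node names.
# Names such as "lock-0000000003" are hypothetical examples of the zero-padded
# counter that Zookeeper appends to nodes created with the sequential flag.
lock_holder() {
  # Zero-padded counters sort correctly as plain strings.
  printf '%s\n' "$@" | sort | head -n 1
}

# Three clients are waiting; the one with the lowest sequence number wins.
lock_holder lock-0000000007 lock-0000000003 lock-0000000012
```

In a real deployment each waiting client would watch the node just ahead of its own and re-check when that node disappears.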

As you can see, Zookeeper plays a vital role as a foundation service for distributed applications that need to coordinate independent, asynchronous processes across many computing nodes in a cluster environment.

References:

[1] Zookeeper Website, http://zookeeper.apache.org/

[2] Zookeeper Recipes, http://zookeeper.apache.org/doc/trunk/recipes.html

Google File System Design Assumptions

 

In today’s post I want to highlight the brilliance of the Google Research team. Their ability to step back and question old assumptions reminds me of the Wright brothers realizing that lift values from the 1700s and other widespread assumptions of the time were the main constraints holding them back from building the first airplane.

 

At Google Research something similar went on when they realized that traditional data storage and processing paradigms did not fit well with their applications’ processing workloads. Here are some of the design assumptions for the Google File System, straight from the published research paper, with my comments:

 

  1. Failure is an expectation, not an exception

    Google realized that the traditional way to address failure in the datacenter is to increase the sophistication of the hardware platforms involved. This approach increases cost both by using highly specialized hardware and by requiring system administrators with very sophisticated skills. The main innovation here is realizing that when dealing with massive datasets (e.g. downloading a copy of the entire web) hardware failure is a fact of life rather than an exception. Once this observation is incorporated into the design, costs can be decreased by storing and processing data on very large clusters of commodity hardware, where redundancy and replication across processing nodes and racks allow for seamless recovery from hardware failure.

  2. The system stores a modest number of large data files

    This observation is arrived at by looking at the nature of the data being processed, such as HTML markup from crawling a large number of websites. This is what we would call “unstructured data”, which is cleaned and serialized by the crawler before it is “batched” together into large files. Once again, by taking a step back and looking at the problem with fresh eyes, the researchers were able to realize that their design did not need to optimize for the storage of billions of small files. This is a great constraint to remove, as we will explore when we look at the ability of the GFS master server to keep the metadata for all files in a cluster in memory, which allows it to make very smart load balancing, placement and replication decisions.

  3. Workloads primarily consist of large streaming reads and small random reads

    By looking at actual application workloads the researchers found that they could generally group read operations into these two categories, and that successive read operations from the same client will often read contiguous regions of a file. Also, performance-minded applications will batch and sort their small reads so that they advance steadily through a file from beginning to end instead of going back and forth with random I/O operations.

  4. The workloads also have many large, sequential writes that append to data files

    Notice here how “delete” and “update” operations are extremely rare to non-existent. This frees the system design from the onerous task of maintaining locks to ensure the atomicity of these two operations.

  5. Atomicity with minimal synchronization is essential

    The system design focuses on supporting large writes by batch processes and “append” operations by a large number of concurrent clients, freeing itself from the constraints mentioned in the previous point.

  6. High sustained bandwidth is more important than low latency

    This is a good observation: when dealing with these large datasets, most applications are batch oriented and benefit the most from high processing throughput, unlike the traditional database application that places a premium on fast response times.

 

In hindsight these observations might seem obvious, especially as they have been incorporated into the design principles that drive other products such as Apache Hadoop. But Google’s decision to invest in a custom-made file system to fit their very specific needs, and the ability of the Google Research team to step back and start their design with fresh eyes, have truly revolutionized data processing. Cheers to them!

 

Reference:

“The Google File System”; Ghemawat, Gobioff, Leung; Google Research

How to send email from a unix / linux script

I like a lot of detail about my script runs when I am doing unit testing. I will usually append status and event messages to a summarized log file that I call my email file. This is the syntax I use to send an email with this summary when my script is done running:

mail -s "My script completed" [email protected] < $my_email_summary

So the general syntax for the command is:

mail -s "subject" recipient_address < file_name
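
To show how the summary file might be built up during a run, here is a minimal sketch. The log_event helper, the use of mktemp for the file location, and the recipient environment variable are all assumptions made for the example; only the mail invocation comes from the syntax above.

```shell
#!/bin/bash
# Sketch: collect status messages in a summary file, then mail it.
# The file location and the log_event helper are hypothetical.
my_email_summary="${my_email_summary:-$(mktemp)}"

log_event() {
  # Append a timestamped message to the summary file.
  printf '%s %s\n' "$(date '+%Y-%m-%d %H:%M:%S')" "$*" >> "$my_email_summary"
}

log_event "Step 1 completed"
log_event "Step 2 completed"

# Send the summary only when a recipient is set (hypothetical variable)
# and the mail command is available on this host.
if [ -n "${recipient:-}" ] && command -v mail >/dev/null 2>&1; then
  mail -s "My script completed" "$recipient" < "$my_email_summary"
fi
```

Guarding the send step this way lets the same script run on a workstation without a mail setup while still producing the summary file.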

 

What is the Hive Metastore URI address? hive-site.xml? hive config resources?

Apache Hive is a data warehouse software project built on top of Apache Hadoop for providing data summarization, query and analysis. Hive gives an SQL-like interface to query data stored in various databases and file systems that integrate with Hadoop.

While configuring an Apache NiFi Data Flow (within Hortonworks DataFlow) I ran into the need to configure the Hive Streaming component to connect to a Hive table. This personal knowledge base article documents the locations of the resources I needed.

What is my Hive Metastore URI?

The metastore runs on your Hive Metastore host, listens on port 9083 by default, and uses the Thrift protocol. An example URI looks like this:

thrift://<host_name>:9083
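
If you already have access to a hive-site.xml file, the same URI is normally recorded there under the hive.metastore.uris property. The sketch below is one hedged way to pull it out with grep and sed. The metastore_uri function, the sample file and the host name metastore.example.com are made up for illustration, and the function assumes the <value> element sits within two lines of the <name> element.

```shell
#!/bin/bash
# Hypothetical helper: print the first hive.metastore.uris value found in a
# hive-site.xml style file passed as the first argument.
metastore_uri() {
  # Take the <name> line plus the two lines after it, then extract <value>.
  grep -A 2 '<name>hive.metastore.uris</name>' "$1" \
    | sed -n 's:.*<value>\(.*\)</value>.*:\1:p' \
    | head -n 1
}

# Demo against a made-up sample file; the host name is a placeholder.
sample=$(mktemp)
cat > "$sample" <<'EOF'
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://metastore.example.com:9083</value>
  </property>
</configuration>
EOF
metastore_uri "$sample"
```

On a real node you would pass the path of your own hive-site.xml instead of the sample file.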

 

Where is my hive-site.xml file located? What should I enter under Hive Config Resources?

When configuring Apache NiFi to connect to a Hive table using Hive Streaming you will need to enter the location of your hive-site.xml file under Hive config resources. Below you can see the location on my Hadoop node. To find the location in your installation, look under the /etc/hive directory; the script below can help you with this:

 

#find the Hive folder
cd /etc/hive
#run a search for the hive-site.xml file, starting at the current location
find . -name hive-site.xml

#in my case after examining the results from the command the file is located at:

/etc/hive/2.6.5.0-292/0/hive-site.xml

What is an Apache Spark RDD?

Apache Spark Resilient Distributed Dataset (RDD)

Apache Spark Resilient Distributed Datasets (RDDs) are the main vehicle used by the processing engine to represent a dataset. Given that the name itself is pretty self-explanatory, let’s look into each of these attributes in additional detail:

  • Distributed: This is the key attribute of RDDs. An RDD is a collection of partitions or fragments distributed across processing nodes, which allows Spark to fit and process massive data sets in memory by distributing the workload in parallel across a collection of worker nodes.
  • Resilient: The ability to recover from failure during processing. Spark achieves this mainly by remembering the lineage of operations used to build each partition, so if a worker node goes offline the lost partitions can be recomputed on another node. An RDD can also be persisted with a replicated storage level, in which case a copy of the partition is already available on a second node.

 

I hope you enjoyed this introduction to Apache Spark Resilient Distributed Datasets (RDDs). Stay tuned for additional coverage on RDD operations and best practices, as well as for Apache Spark DataFrames.

 

Reference:
Apache Spark Programming Guide
http://spark.apache.org/docs/2.1.1/programming-guide.html

Ambari Agent Node OpenSSL / EOF / Failed to Connect Error

 

I recently ran into an issue when deploying Ambari Agent to a new host in my cluster. Here’s my personal Knowledge Base article on the issue.

Symptoms

When deploying Ambari Agent to a new node, the wizard fails. At the bottom of stderr I found the following:

 

INFO DataCleaner.py:122 - Data cleanup finished

INFO 11,947 hostname.py:67 - agent:hostname_script configuration not defined thus read hostname '<my host>' using socket.getfqdn().

INFO 11,952 PingPortListener.py:50 - Ping port listener started on port: 8670

INFO 11,954 main.py:439 - Connecting to Ambari server at https://<my host>:8440 (172.31.42.192)

INFO 955 NetUtil.py:70 - Connecting to https://<my host>:8440/ca

ERROR 11,958 NetUtil.py:96 - EOF occurred in violation of protocol (_ssl.c:579)

ERROR 11,958 NetUtil.py:97 - SSLError: Failed to connect. Please check openssl library versions.

Refer to: https://bugzilla.redhat.com/show_bug.cgi?id=1022468 for more details.

WARNING 11,958 NetUtil.py:124 - Server at https://<my host>:8440 is not reachable, sleeping for 10 seconds...


 

Root Cause

After solving the issue I can say that, in my case, the cause was previously installed versions of Java that conflicted with my preferred version, even after selecting my version using the alternatives command.

Working through the issue I also found a suggestion to disable certificate validation, which I implemented since this is not a production cluster. I am listing it as Solution 3 here.

Solution 1 – Deploy new hosts with no previous JDK

After much tinkering with the alternatives command to repair my JDK configuration I decided that it was easier to start with a new set of AWS nodes and ensure that no JDK was installed in my image before I began prepping each node. If you have nodes that are having the issue after an upgrade, read Solution 2.

I am including the script I used to download and configure the correct JDK pre-requisite for my version of Ambari and HDP below for your consumption:

 

#!/bin/bash
#Script Name: ignacio_install_jdk.scr
#Author: Ignacio de la Torre
#Independent Contractor Profile: https://linkedin.com/in/idelatorre
#################
##Install Oracle JDK
export hm=/home/ec2-user
cd /usr
sudo mkdir -p jdk64/jdk1.8.0_112
cd jdk64
sudo wget http://public-repo-1.hortonworks.com/ARTIFACTS/jdk-8u112-linux-x64.tar.gz
sudo gunzip jdk-8u112-linux-x64.tar.gz
sudo tar -xf jdk-8u112-linux-x64.tar

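#configure paths
chmod 666 $hm/.bash_profile
echo 'export JAVA_HOME=/usr/jdk64/jdk1.8.0_112' >>  $hm/.bash_profile
#single quotes keep $PATH unexpanded so it is evaluated at login time
echo 'export PATH=$PATH:/usr/jdk64/jdk1.8.0_112/bin' >>  $hm/.bash_profile
#restore restrictive permissions on the same profile file
chmod 640 $hm/.bash_profile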

#configure java version using alternatives
sudo alternatives --install /usr/bin/java java /usr/jdk64/jdk1.8.0_112/bin/java 1

#if the link to /usr/bin/java is broken (file displays red), rebuild using:
#ln -s -f /usr/jdk64/jdk1.8.0_112/bin/java /usr/bin/java

 

Solution 2 – Install new JDK with utilities

I realize scrapping nodes is not always an option, especially for those experiencing the issue after an install. Because of a tight deadline I did not try the solution displayed here, but it addresses what I think the issue is.

 

Scenario: On my original nodes that had a previous non-compatible version of JDK installed I issued the following command to select my new JDK as preferred:

#Select Oracle's JDK 1.8 as preferred after install (see install script on Solution 1)
sudo alternatives --install /usr/bin/java java /usr/jdk64/jdk1.8.0_112/bin/java 1

Issue: After selecting my new JDK I was able to see it listed in the configurations with the command below, BUT, all of the Java utilities such as jar, javadoc, etc. are pointing to null in my preferred JDK.

#list Java configured alternatives
sudo alternatives --display java

Proposed solution: Use the list of tools from the incompatible version of the JDK to install your new JDK with all the Java utilities as slaves. Please note that you cannot add slaves to an already installed alternative; you need to issue the install command with all the utilities at once. An example adding only the jar and javadoc utilities is displayed below:

#Select Oracle’s JDK 1.8 as preferred after install with a slave configuration for the JAR and javadoc utilities:

sudo alternatives --install "/usr/bin/java" "java" "/usr/jdk64/jdk1.8.0_112/bin/java" 1 \
--slave "/usr/bin/jar" "jar" "/usr/jdk64/jdk1.8.0_112/bin/jar" \
--slave "/usr/bin/javadoc" "javadoc" "/usr/jdk64/jdk1.8.0_112/bin/javadoc"
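
After running the install command, one way to check where each tool actually resolves is to follow its symlink chain. This is only a sketch: resolve_tool is a made-up helper name, it relies on readlink -f being available on the node, and the list of tools simply mirrors the ones used in the command above.

```shell
#!/bin/bash
# Hypothetical helper: follow the symlink chain for a command path and print
# the final target.
resolve_tool() {
  readlink -f "$1"
}

# Report where each tool under /usr/bin ends up, or flag it as missing.
for tool in java jar javadoc; do
  if [ -e "/usr/bin/$tool" ]; then
    echo "$tool -> $(resolve_tool "/usr/bin/$tool")"
  else
    echo "$tool -> not linked"
  fi
done
```

If a tool still resolves to the old JDK or shows as not linked, the slave entries were probably not registered with the install command.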

 

Solution 3 – Disable certificate validation

As I said before, my cluster is not a production one and will not contain sensitive or confidential data, so I opted to implement the suggestion to disable certificate validation as part of my troubleshooting. To do this you have to set verify=disable by editing the /etc/python/cert-verification.cfg file. Do this at your own risk.
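
For anyone who prefers to script the change, the sketch below shows one possible way to do it with sed. The disable_cert_verification function is a made-up name, and the sample file contents are an assumption about what the default file looks like. The demo runs against a throwaway copy rather than the real file.

```shell
#!/bin/bash
# Hypothetical helper: set verify=disable in a cert-verification.cfg style file.
disable_cert_verification() {
  # Rewrite any existing "verify=..." line; sed keeps a .bak copy of the original.
  sed -i.bak 's/^verify[[:space:]]*=.*/verify=disable/' "$1"
}

# Demo on a temporary file; the sample contents are assumed, not copied from a node.
cfg=$(mktemp)
printf '[https]\nverify=platform_default\n' > "$cfg"
disable_cert_verification "$cfg"
cat "$cfg"
```

On a real node the function would be pointed at /etc/python/cert-verification.cfg and run with sudo, and the .bak copy makes it easy to revert.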

 

 

The Google File System’s conscious design tradeoffs

This is my first post on the Google File System, where I will very briefly touch on a very specific feature set driven by the conscious design tradeoffs that have made GFS and derived systems so successful.

  1.  Highly Redundant Data vs. Highly Available Hardware

    When working with petabytes of data, hardware failure is the norm more than the exception. Expensive, highly redundant hardware is replaced with commodity components that allow the file system to store multiple copies of data across storage nodes and switches at a reasonable cost.

  2.  Store a small number of large files vs. millions of small individual documents

    With the need to store hundreds of terabytes composed of billions of small objects (e.g. email messages, web pages), GFS attempts to simplify file system design by serializing these small individual objects and grouping them together into larger files. Having a small number of large files allows GFS to keep all file and namespace metadata in memory on the GFS master, which in turn allows the master to leverage this global visibility to make smarter load balancing and redundancy decisions.

  3.  Generally Immutable data

    Once a serialized object or file record is written to disk it is rarely, if ever, updated again; as Google states in their research paper, random writes are practically non-existent. This is driven by application requirements where data is generally written once and then consumed by applications over time without alteration. Google describes the application data as mutating by either inserting new records or appending to the last “chunk” or block of a file, and applications are encouraged to constrain their update strategies to these two operations.

In my next series of posts I will analyze other architecture and performance characteristics that make the Google File System brilliantly innovative. Stay tuned!

 

Reference:

“The Google File System”; Ghemawat, Gobioff, Leung; Google Research

What is Apache Spark Streaming?

 

Continuing with the rapid innovation of the Apache Spark code base, the Spark Streaming API allows enterprises to leverage the full power of the Spark architecture to process real-time workloads.

Built upon the foundation of Core Spark, Spark Streaming is able to consume data from common real-time pipelines such as Apache Kafka, Apache Flume, Kinesis and TCP sockets, and to run complex algorithms (MLlib predictive models, GraphX algorithms) on it. Results can then be displayed in real-time dashboards or stored in HDFS.

Apache Spark Streaming Architecture
Reference:
  • Apache Spark Streaming Programming Guide:
    https://spark.apache.org/docs/2.1.0/streaming-programming-guide.html

What to enter under Hadoop config resources? Where to find core-site.xml and hdfs-site.xml?

Apache Hive is a data warehouse software project built on top of Apache Hadoop for providing data summarization, query and analysis. Hive gives an SQL-like interface to query data stored in various databases and file systems that integrate with Hadoop.

While configuring an Apache NiFi Data Flow (within Hortonworks DataFlow) I ran into the need to configure the PutHDFS component to connect to HDFS. This personal knowledge base article documents the locations of the resources I needed.

Where are my core-site.xml and hdfs-site.xml files located? What should I enter under Hadoop Config Resources?

When configuring Apache NiFi to connect to HDFS using the PutHDFS component you will need to enter the locations of your core-site.xml and hdfs-site.xml files under Hadoop config resources. Below you can see the locations on my Hadoop node. To find the location in your installation, look under the /etc/hadoop directory.

The script below can help you with this:

 

#find the hadoop folder
cd /etc/hadoop
#run a search for the core-site.xml file, starting at the current location
find . -name core-site.xml

#in my case after examining the results from the command the file is located at:
/etc/hadoop/2.6.5.0-292/0/core-site.xml

#I then went to the directory and listed its contents to find the location of my HDFS config file:
/etc/hadoop/2.6.5.0-292/0/hdfs-site.xml
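
Since the Hadoop config resources field in NiFi accepts a comma-separated list of files, the two paths can be assembled in one step. The sketch below is a hedged example: hadoop_config_resources is a made-up function name, and the demo runs against a temporary directory tree that merely mimics the layout shown above.

```shell
#!/bin/bash
# Hypothetical helper: find core-site.xml under a root directory and print it
# together with the hdfs-site.xml next to it, separated by a comma.
hadoop_config_resources() {
  # Locate the first core-site.xml under the given root directory.
  core=$(find "$1" -name core-site.xml | sort | head -n 1)
  [ -n "$core" ] || return 1
  hdfs="$(dirname "$core")/hdfs-site.xml"
  # Only print the pair when both files exist side by side.
  [ -f "$hdfs" ] && printf '%s,%s\n' "$core" "$hdfs"
}

# Demo on a temporary tree that mimics the layout shown above.
root=$(mktemp -d)
mkdir -p "$root/2.6.5.0-292/0"
touch "$root/2.6.5.0-292/0/core-site.xml" "$root/2.6.5.0-292/0/hdfs-site.xml"
hadoop_config_resources "$root"
```

On a real node you would call the function with /etc/hadoop and paste the printed line into the NiFi property.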