The Google File System’s conscious design tradeoffs

This is my first post on the Google File System, in which I briefly touch on a specific set of features driven by the conscious design tradeoffs that have made GFS and its derived systems so successful.

  1.  Highly Redundant Data vs. Highly Available Hardware

    When working with petabytes of data, hardware failure is the norm rather than the exception. Instead of relying on expensive, highly redundant hardware, GFS uses commodity components and stores multiple copies of the data across storage nodes and switches at a reasonable cost.

  2.  Store a small number of large files vs. millions of small individual documents

    With the need to store hundreds of terabytes composed of billions of small objects (e.g., email messages, web pages), GFS simplifies file system design by serializing these small objects and grouping them into larger files. Having a small number of large files allows GFS to keep all file and namespace metadata in memory on the GFS master, which in turn lets the master use this global visibility to make smarter load-balancing and redundancy decisions.

  3.  Generally Immutable Data

    Once a serialized object or file record is written to disk, it is generally never updated again; as Google states in their research paper, random writes are practically non-existent. This is driven by application requirements: data is generally written once and then consumed by applications over time without alteration. Google describes application data as being mutated either by inserting new records or by appending to the last “chunk” (block) of a file, and applications are encouraged to restrict their update strategies to these two operations.

In my next series of posts I will analyze other architectural and performance characteristics that make the Google File System so brilliantly innovative. Stay tuned!

Reference:

“The Google File System”; Ghemawat, Gobioff, Leung; Google Research

What is Apache Spark Streaming?

Continuing the rapid innovation of the Apache Spark code base, the Spark Streaming API allows enterprises to leverage the full power of the Spark architecture to process real-time workloads.

Built on the foundation of Spark Core, Spark Streaming can consume data from common real-time sources such as Apache Kafka, Apache Flume, Amazon Kinesis and TCP sockets, and run complex algorithms on it (MLlib predictive models, GraphX algorithms). Results can then be displayed in real-time dashboards or stored in HDFS.

[Figure: Apache Spark Streaming Architecture]
Reference:
  • Apache Spark Streaming Programming Guide:
    https://spark.apache.org/docs/2.1.0/streaming-programming-guide.html

What to enter under Hadoop Config Resources? Where to find core-site.xml and hdfs-site.xml?

While configuring an Apache NiFi data flow (within Hortonworks DataFlow), I needed to configure the PutHDFS processor to connect to HDFS. This personal knowledge base article documents the locations of the resources I needed.

Where are my core-site.xml and hdfs-site.xml files located? What should I enter under Hadoop Config Resources?

When configuring Apache NiFi to connect to HDFS using the PutHDFS processor, you need to enter the locations of your core-site.xml and hdfs-site.xml files, as a comma-separated list, under Hadoop Config Resources. Below you can see the locations on my Hadoop node; to find them in your installation, look under the /etc/hadoop directory.

The script below can help you with this:

# Go to the Hadoop configuration folder
cd /etc/hadoop
# Search for core-site.xml, starting at the current location
find . -name core-site.xml

# In my case, after examining the results of the command, the file is located at:
# /etc/hadoop/2.6.5.0-292/0/core-site.xml

# I then listed the contents of that directory to find my HDFS config file:
ls /etc/hadoop/2.6.5.0-292/0/
# /etc/hadoop/2.6.5.0-292/0/hdfs-site.xml
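
Since Hadoop Config Resources takes a comma-separated list of file paths, the manual search above can be wrapped in a small helper that prints that value directly. This is a minimal sketch, assuming both files exist somewhere under the search root and that the first match (in sorted order) is the one you want; the function name hadoop_config_resources is hypothetical.

```bash
#!/bin/bash
# Hypothetical helper: print "<core-site.xml path>,<hdfs-site.xml path>"
# for the first match of each file under a root directory (default /etc/hadoop).
# The output is meant to be pasted into PutHDFS "Hadoop Config Resources".
hadoop_config_resources() {
    local root="${1:-/etc/hadoop}"
    local core hdfs
    # find does not follow symlinks by default; sort makes the choice deterministic
    core=$(find "$root" -name core-site.xml 2>/dev/null | sort | head -n 1)
    hdfs=$(find "$root" -name hdfs-site.xml 2>/dev/null | sort | head -n 1)
    if [ -z "$core" ] || [ -z "$hdfs" ]; then
        echo "core-site.xml or hdfs-site.xml not found under $root" >&2
        return 1
    fi
    echo "${core},${hdfs}"
}
```

On a node laid out like mine, running hadoop_config_resources /etc/hadoop would print the two paths listed above, separated by a comma. Always review the result: a host with several versioned configuration directories may contain more than one match.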

Three key things to remember about Apache Spark RDD Operations

There are three key concepts that are essential for the beginning Apache Spark developer, and we will cover them here. If you want to receive a condensed summary of the most relevant news in Big Data, Data Science and Advanced Analytics, do not forget to subscribe to our newsletter. We send it only once a month, so you get just the very best.

All right, getting back to our topic, the three key things to remember when you begin working with Spark RDDs are:

  • Creating RDDs does not need to be a hard, involved process: for your learning environment you can easily create an RDD from a local collection (for example with SparkContext.parallelize) or by loading a CSV file from your local file system. This saves you the step of transferring files to Hadoop’s HDFS file system and makes you more productive in your sandbox environment.
  • Remember to persist your RDDs: Spark RDDs are lazy, so transformations are not executed when you define them, only when you ask Spark for a result through an action such as count() or collect(). Experienced data scientists define a base RDD and then create different subsets through transformations. Whenever you define one of these subsets as a new RDD that you will reuse, remember to persist it (for example with cache() or persist()); otherwise its whole chain of transformations is executed again and again every time you ask for the results of a downstream RDD.
  • Remember that RDDs in Spark are immutable: a big reason why the previous point is difficult to digest for new Spark developers is that we are not accustomed to the functional programming paradigm that underlies Spark. In our regular programming languages we create a variable that references a specific space in memory, and we can then assign different values to this variable in different parts of our program. In functional programming each object is immutable, so every time you create a new RDD from the results of an upstream RDD and ask for a result, Spark will re-execute all of the logic that led to the creation of the source RDD, unless that RDD has been persisted.

I hope you enjoyed this introduction to Apache Spark Resilient Distributed Dataset (RDD) operations. Stay tuned for additional coverage of best practices as well as of Apache Spark DataFrames.

Hadoop Ecosystem: Zookeeper – The distributed coordination server

Apache Zookeeper is a centralized service that provides distributed systems with a hierarchical key-value store, which is used to provide a distributed configuration service, synchronization service, and naming registry for large distributed systems. In the words of the project website:

“ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications. Each time they are implemented there is a lot of work that goes into fixing the bugs and race conditions that are inevitable. Because of the difficulty of implementing these kinds of services, applications initially usually skimp on them, which make them brittle in the presence of change and difficult to manage. Even when done correctly, different implementations of these services lead to management complexity when the applications are deployed.” [1]

At first it is hard to visualize the role of Zookeeper as a component in the Hadoop ecosystem, so let’s examine a few of the services and constructs that it provides to distributed computing applications:

  • Locks: Zookeeper provides the building blocks to create and maintain globally distributed locks (the lock recipe in [2] builds them from ephemeral, sequential znodes). This allows applications to maintain transaction atomicity for any kind of object by ensuring that at any point in time no two clients or transactions can hold a lock on the same resource.
  • Queues: Zookeeper allows distributed applications to maintain regular FIFO and priority-based queues, where a list of messages or objects is held under a Zookeeper node (znode) that clients use to submit new queue members as well as to request the list of members pending processing. This allows applications to implement asynchronous processes, where a unit of work is placed on a queue and processed whenever the next worker process is available to take it on.
  • Two-Phase Commit Coordination: Zookeeper allows applications that need to commit or abort a transaction across multiple processing nodes to coordinate the two-phase commit pattern through its infrastructure. In the first phase, each participant applies the transaction tentatively and reports its vote; the coordinator then lets all parties involved know whether the transaction succeeded globally and should be committed, or must be aborted.
  • Barriers: Zookeeper supports the creation of synchronization points called barriers. This is useful when multiple asynchronous processes must wait at a common synchronization point until all worker processes have finished their independent units of work.
  • Leader Election: Zookeeper allows distributed applications to automate leader election across a list of available nodes, so that exactly one node acts as the coordinator at any time and a new leader is chosen automatically if the current one fails.

As you can see, Zookeeper plays a vital role as a foundational service for distributed applications that need to coordinate independent, asynchronous processes across many computing nodes in a cluster environment.

References:

[1] Zookeeper Website, http://zookeeper.apache.org/

[2] Zookeeper Recipes, http://zookeeper.apache.org/doc/trunk/recipes.html

Error deploying Hive using the Ambari agent (MySQL Java Connector JAR file missing)

Apache Hive is a data warehouse software project built on top of Apache Hadoop for providing data summarization, query and analysis. Hive gives an SQL-like interface to query data stored in various databases and file systems that integrate with Hadoop.

I recently ran into an issue when deploying Hive using Ambari. Here is my personal knowledge base article on the issue.

Symptoms

When deploying Hive and Hive Metastore to a new node, the “Hive Metastore Start” task fails. At the bottom of stderr I found the following message:

  File "/usr/lib/ambari-agent/lib/resource_management/core/source.py", line 52, in __call__
    return self.get_content()
  File "/usr/lib/ambari-agent/lib/resource_management/core/source.py", line 197, in get_content
    raise Fail("Failed to download file from {0} due to HTTP error: {1}".format(self.url, str(ex)))
resource_management.core.exceptions.Fail: Failed to download file from http://<my_node_host>:8080/resources/mysql-connector-java.jar due to HTTP error: HTTP Error 404: Not Found

Root Cause

I had deployed the Ambari, HDP and HDP Utils repositories on a local mirror host. It seems that the Ambari Agent assumed the MySQL Java Connector JAR file would also be hosted by my local mirror.

This could also happen if the repositories configured during deployment no longer host the MySQL Java Connector JAR file.

Solution

Once I figured out the root cause, it was easy to solve the issue by using yum to manually install the MySQL Java Connector JAR file:

#On the Linux/Unix host(s) experiencing the install error, issue the following command:

sudo yum -y -q install mysql-connector-java
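
Before retrying the failed “Hive Metastore Start” task, it can help to confirm that the JAR actually landed on the host. This is a minimal sketch, assuming the package places the JAR under /usr/share/java (the usual location for the RHEL/CentOS package; confirm the exact path on your system with rpm -ql mysql-connector-java). The function name find_mysql_connector_jar is hypothetical.

```bash
#!/bin/bash
# Hypothetical helper: print the path of the first MySQL Connector/J JAR
# found in a directory (default /usr/share/java, an assumed install location).
# Returns 1 and prints a message to stderr if no JAR is found.
find_mysql_connector_jar() {
    local dir="${1:-/usr/share/java}"
    local jar
    for jar in "$dir"/mysql-connector-java*.jar; do
        # With no match, bash leaves the pattern unexpanded, so test for a real file
        if [ -f "$jar" ]; then
            echo "$jar"
            return 0
        fi
    done
    echo "No mysql-connector-java JAR found in $dir" >&2
    return 1
}
```

For example, find_mysql_connector_jar && echo "JAR present, retry the task from Ambari" only prints the reminder when the JAR was found.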

Hadoop Ecosystem: Hive – the Data Warehouse and SQL interface

The Apache Hive data warehouse software facilitates querying and managing large datasets residing in distributed storage. Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL. At the same time this language also allows traditional map/reduce programmers to plug in their custom mappers and reducers when it is inconvenient or inefficient to express this logic in HiveQL.

Hive is both a metadata layer on top of HDFS and a SQL interpreter. This allows companies to store structured or semi-structured data as files on Hadoop without a large up-front data modeling effort. Once business requirements call for extracting new insights from the stored data, a development team can leverage the “schema on read” paradigm to create metadata describing these files.

Having a SQL interpreter gives business analysts and power users access to terabytes or petabytes of information through a familiar query language. This is a dramatic departure from raw MapReduce, where a very specialized skill set is required to write the multiple Map and Reduce functions needed to achieve the same results.

Hortonworks HDP / Ambari Install – Configure Open File Descriptors

This is a very simple script I run on the target hosts that I am prepping for an Apache Ambari Server or Agent and Hadoop deployment. It displays the current soft and hard limits for Linux open file descriptors, then asks you whether they are below Ambari’s and/or Hadoop’s minimum system requirements; if they are, the script updates the settings for you:

#!/bin/bash
#Script Name: ignacio_ofd.scr
#Author: Ignacio de la Torre
#Independent Contractor Profile: http://linkedin.com/in/idelatorre
#################
#Configure Maximum Open File Descriptors
echo ">>> Configure Maximum Open File Descriptors..."
echo "! ! ! Pay attention to the output below, if either of the two numbers displayed is less than 10,000, enter y at the prompt:"

#Display the current soft and hard limits for this user
ulimit -Sn
ulimit -Hn
echo "Enter y if the limits are below 10,000:"
read -r var_yesno
if [ "$var_yesno" = "y" ]
then
    echo "Updating /etc/security/limits.conf"
    #Append per-user limits for the ubuntu and root accounts.
    #"sudo echo ... >> file" would perform the redirection as the current
    #(unprivileged) user; "sudo tee -a" appends as root, so there is no
    #need to make limits.conf world-writable with chmod.
    printf '%s\n' \
        "ubuntu    hard    nofile    10000" \
        "ubuntu    soft    nofile    10000" \
        "root    hard    nofile    100000" \
        "root    soft    nofile    100000" | sudo tee -a /etc/security/limits.conf > /dev/null
    #The new limits take effect at the next login session
else
    echo "ulimit not updated, not necessary"
fi
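
The script above relies on the operator comparing the two numbers by eye. If you prefer a non-interactive check, the comparison can be automated with a small function. This is a minimal sketch, assuming the same 10,000 threshold the script uses; the function name nofile_needs_update is hypothetical.

```bash
#!/bin/bash
# Hypothetical helper: decide whether the soft or hard open-file limit is
# below a minimum (default 10000, the threshold used in the script above).
# Returns 0 when an update is needed, 1 otherwise.
nofile_needs_update() {
    local soft="$1" hard="$2" min="${3:-10000}" limit
    for limit in "$soft" "$hard"; do
        # ulimit can report "unlimited", which never needs raising
        if [ "$limit" != "unlimited" ] && [ "$limit" -lt "$min" ]; then
            return 0
        fi
    done
    return 1
}
```

It would replace the y/n prompt with something like: if nofile_needs_update "$(ulimit -Sn)" "$(ulimit -Hn)"; then (append to limits.conf as above) fi. Keeping the check in a function also makes it easy to reuse the same threshold in other host-preparation scripts.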

Hortonworks HDP / Ambari Install – Configure Network Time Protocol (NTP)

Apache Ambari aims at making Hadoop management simpler by developing software for provisioning, managing, and monitoring Apache Hadoop clusters. Ambari provides an intuitive, easy-to-use Hadoop management web UI backed by its RESTful APIs.

This is a small script I developed to configure NTP on my hosts before deploying the Ambari server or agent and Hadoop:

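#!/bin/bash
#Script Name: ignacio_config_ntp.scr
#Author: Ignacio de la Torre
#Independent Contractor Profile: https://linkedin.com/in/idelatorre
#################
#Configure NTP and set it to auto-start at boot time
#Install NTP
sudo yum install -y -q ntp

#Stop ntpd and disable autostart while the clock is set manually
#(ntpdate refuses to run while ntpd is holding the NTP port)
sudo systemctl stop ntpd
sudo systemctl disable ntpd
sudo timedatectl set-ntp no

#Set the clock once from the public NTP pool and set the time zone
sudo ntpdate pool.ntp.org
sudo timedatectl set-timezone America/Los_Angeles

#Re-enable NTP autostart and start the daemon now
sudo systemctl enable ntpd
sudo systemctl start ntpd
sudo timedatectl set-ntp on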

Google File System Design Assumptions

In today’s post I want to highlight the brilliance of the Google Research team. Their ability to step back and re-examine old assumptions reminds me of the Wright brothers realizing that lift values dating from the 1700s, along with other widespread assumptions of the time, were the main constraints holding them back from building the first airplane.

At Google Research something similar happened when they realized that traditional data storage and processing paradigms did not fit their applications’ processing workloads well. Here are some of the design assumptions for the Google File System, straight from the published research paper, with my comments:

  1. Failure is an expectation, not an exception

    Google realized that the traditional way to address failure in the datacenter is to increase the sophistication of the hardware platforms involved. This approach increases cost, both by requiring highly specialized hardware and by requiring system administrators with very sophisticated skills. The main innovation here is realizing that when dealing with massive datasets (e.g., downloading a copy of the entire web), hardware failure is a fact of life rather than an exception. Once this observation is incorporated into the design, costs can be decreased by storing and processing data on very large clusters of commodity hardware, where redundancy and replication across processing nodes and racks allow for seamless recovery from hardware failure.

  2. The system stores a modest number of large data files

    This observation comes from looking at the nature of the data being processed, such as HTML markup from crawling a large number of websites. This is what we would call “unstructured data”; it is cleaned and serialized by the crawler before being “batched” together into large files. Once again, by taking a step back and looking at the problem with fresh eyes, the researchers realized their design did not need to optimize for the storage of billions of small files. This is a great constraint to remove from the design, as we will see when we look at the ability of the GFS master server to control and store the metadata for all files in a cluster in memory, which allows it to make very smart load-balancing, placement and replication decisions.

  3. Workloads primarily consist of large streaming reads and small random reads

    By looking at actual application workloads, the researchers found that read operations generally fall into these two categories, and that successive read operations from the same client often read contiguous regions of a file. Also, performance-minded applications batch and sort their small reads so that they advance steadily through a dataset from beginning to end, instead of going back and forth with random I/O operations.

  4. The workloads also have many large, sequential writes that append to data files

    Notice how “delete” and “update” operations are rare to non-existent; this frees the system design from the onerous task of maintaining locks to ensure the atomicity of these two operations.

  5. Atomicity with minimal synchronization is essential

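    The system design focuses on supporting large writes by batch processes and “append” operations by a large number of concurrent clients, freeing itself from the constraints mentioned in the previous point. Because many clients may append to the same file at the same time (for example, when a file is used as a producer-consumer queue), each append must be atomic without forcing those clients to coordinate through locks.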

  6. High sustained bandwidth is more important than low latency

    A good observation: when dealing with these large datasets, most applications are batch-oriented and benefit most from high processing throughput, unlike the traditional database application that places a premium on fast response times.

In hindsight these observations might seem obvious, especially as they have been incorporated into the design principles that drive other products such as Apache Hadoop. But Google’s decision to invest in a custom-made file system to fit their very specific needs, and the Google Research team’s ability to step back and start their design with fresh eyes, have truly revolutionized data processing. Cheers to them!

Reference:

“The Google File System”; Ghemawat, Gobioff, Leung; Google Research