Three key things to remember about Apache Spark RDD Operations

Three concepts are essential for the beginner Apache Spark developer, and we will cover them here.

The three key things to remember when you begin working with Spark RDDs are:

  • Creating RDDs does not need to be a hard, involved process: In your learning environment you can easily create an RDD from a collection or by loading a CSV file. This saves you the step of transferring files to Hadoop’s HDFS file system and makes you more productive in your sandbox environment.
  • Remember to persist your RDDs: Spark RDDs are lazy. Transformations are not executed as you define them, only once you ask Spark for a result. Experienced data scientists define a base RDD and then create different subsets through transformations. Every time you define one of these subsets as a new RDD, remember to persist it; otherwise its transformations will execute again and again every time you ask for the results of a downstream RDD.
  • Remember that RDDs in Spark are immutable: A big reason why the previous point is difficult to digest for new Spark developers is that we are not accustomed to the functional programming paradigm that underlies Spark. In our regular programming languages we create a variable that references a specific space in memory, and we can then assign distinct values to this variable in different parts of our program. In functional programming each object or variable is immutable. A transformation never changes an RDD in place; it defines a new RDD based on the results of an upstream RDD, and unless that upstream RDD has been persisted, Spark will execute all of the logic that led to its creation whenever a result is requested.

I hope you enjoyed this introduction to Apache Spark Resilient Distributed Dataset (RDD) operations. Stay tuned for additional coverage of best practices as well as of Apache Spark DataFrames.

 

 

Hadoop Ecosystem: Zookeeper – The distributed coordination server

Apache Zookeeper is a centralized service that offers distributed systems a hierarchical key-value store, which is used to provide a distributed configuration service, synchronization service, and naming registry for large distributed systems.

“ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications. Each time they are implemented there is a lot of work that goes into fixing the bugs and race conditions that are inevitable. Because of the difficulty of implementing these kinds of services, applications initially usually skimp on them, which make them brittle in the presence of change and difficult to manage. Even when done correctly, different implementations of these services lead to management complexity when the applications are deployed.” [1]

At first it is hard to visualize the role of Zookeeper as a component in the Hadoop ecosystem so let’s examine a couple of the services and constructs that it provides to distributed computing applications:

  • Locks: Zookeeper provides mechanisms to create and maintain globally distributed locks. This allows applications to maintain transaction atomicity for any kind of object by ensuring that at any point in time no two clients or transactions can hold a lock on the same resource.
  • Queues: Zookeeper allows distributed applications to maintain regular FIFO and priority-based queues, where a list of messages or objects is held by a Zookeeper node that clients connect to in order to submit new queue members as well as to request a list of the members pending processing. This allows applications to implement asynchronous processes where a unit of processing is placed on a queue and processed whenever the next worker process is available to take on the work.
  • Two-Phase Commit Coordination: Zookeeper allows applications that need to commit or abort a transaction across multiple processing nodes to coordinate the two-phase commit pattern through its infrastructure. Each client applies the transaction tentatively in the first commit phase and notifies the coordination node, which then lets all parties involved know whether or not the transaction was globally successful.
  • Barriers: Zookeeper supports the creation of synchronization points called barriers. This is useful when multiple asynchronous processes need to converge on a common synchronization point once all worker processes have executed their independent units of work.
  • Leader Election: Zookeeper allows distributed applications to automate leader election across a list of available nodes. This helps applications running on a cluster optimize for locality and load balancing.

As you can see, Zookeeper plays a vital role as a foundation service for distributed applications that need to coordinate independent, asynchronous processes across a large number of computing nodes in a cluster environment.

References:

[1] Zookeeper Website, http://zookeeper.apache.org/

[2] Zookeeper Recipes, http://zookeeper.apache.org/doc/trunk/recipes.html

Error deploying Hive using the Ambari agent (MySQL JAVA Connector JAR file missing)

Apache Hive is a data warehouse software project built on top of Apache Hadoop for providing data summarization, query and analysis. Hive gives an SQL-like interface to query data stored in various databases and file systems that integrate with Hadoop.

 

I recently ran into an issue when deploying Hive using Ambari. Here is my personal knowledge base article on the issue.

Symptoms

When deploying Hive and Hive Metastore to a new node, the “Hive Metastore Start” task fails. At the bottom of stderr I found the following message:

 

  File "/usr/lib/ambari-agent/lib/resource_management/core/source.py", line 52, in __call__
    return self.get_content()
  File "/usr/lib/ambari-agent/lib/resource_management/core/source.py", line 197, in get_content
    raise Fail("Failed to download file from {0} due to HTTP error: {1}".format(self.url, str(ex)))
resource_management.core.exceptions.Fail: Failed to download file from http://<my_node_host>:8080/resources/ mysql-connector-java.jar due to HTTP error: HTTP Error 404: Not Found

 

Root Cause

I had deployed the Ambari, HDP and HDP Utils repositories on a local mirror host. It seems that the Ambari Agent assumes the MySQL JAVA Connector JAR file is also hosted by my local mirror.

This could also happen if the repositories configured during deployment no longer host the MySQL JAVA Connector JAR file.

Solution

Once I figured the root cause out it was easy to solve the issue by using YUM to manually install the MySQL JAVA Connector JAR file:

 

#On the Linux/Unix host(s) experiencing the install error, issue the following command:

sudo yum -y -q install mysql-connector-java
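
If you want to confirm the install worked before retrying the Hive Metastore start, a quick check along these lines may help. This is only a sketch: the /usr/share/java directory is my assumption about where the package places the JAR on RHEL/CentOS-style hosts, and JAR_DIR and connector_jar_path are names made up for illustration.

```shell
#!/bin/bash
# Sketch: confirm the MySQL connector JAR exists after the yum install.
# JAR_DIR is an assumption: /usr/share/java is where the package usually
# places the JAR on RHEL/CentOS; adjust it for your distribution.
JAR_DIR="${JAR_DIR:-/usr/share/java}"

# Prints the JAR path and returns 0 if the JAR is found in directory $1,
# otherwise prints nothing and returns 1.
connector_jar_path() {
    local dir="$1"
    local jar="$dir/mysql-connector-java.jar"
    if [ -f "$jar" ]; then
        echo "$jar"
        return 0
    fi
    return 1
}

if jar=$(connector_jar_path "$JAR_DIR"); then
    echo "Connector found: $jar"
else
    echo "Connector not found in $JAR_DIR; re-run the yum install above."
fi
```

If the JAR lives somewhere else on your hosts, set JAR_DIR before running the check, for example JAR_DIR=/opt/jars.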

 

Hadoop Ecosystem: Hive – the Data Warehouse and SQL interface

The Apache Hive data warehouse software facilitates querying and managing large datasets residing in distributed storage. Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL. At the same time this language also allows traditional map/reduce programmers to plug in their custom mappers and reducers when it is inconvenient or inefficient to express this logic in HiveQL.

 

Hive is both a metadata layer on top of HDFS and a SQL interpreter. This allows companies to store structured or semi-structured data as files on Hadoop without a large initial data modeling effort. Once business requirements align with the need to extract new insights from the stored data, a development team can leverage the “schema on read” paradigm to create metadata about these files.

 

Having a SQL interpreter allows business analysts and power users to have access to terabytes or petabytes of information through a familiar query language. This is a dramatic departure from MapReduce where a very specialized skill set would be required to write multiple Map and Reduce functions in order to achieve the same results.

Hortonworks HDP / Ambari Install – Configure Open File Descriptors

This is a very simple script I run on the target hosts that I am prepping for Apache Ambari Server or Agent and Hadoop deployment. It displays the current settings for Linux open file descriptors and then lets the user indicate whether they are below Ambari’s and/or Hadoop’s minimum system requirements. If that is the case, the script updates the settings for you:

 

#!/bin/bash
#Script Name: ignacio_ofd.scr
#Author: Ignacio de la Torre
#Independent Contractor Profile: http://linkedin.com/in/idelatorre
#################
#Configure Maximum Open File Descriptors
echo ">>> Configure Maximum Open File Descriptors..."
echo "! ! ! Pay attention to the output below; if either of the two numbers displayed is less than 10,000, enter y at the prompt:"

#Display the current soft and hard limits for open file descriptors
ulimit -Sn
ulimit -Hn
echo "Enter y if the limits are below 10,000:"
read -r var_yesno
if [ "$var_yesno" = "y" ]
then
    echo "Updating /etc/security/limits.conf"
    #Append limits for the ubuntu and root users. With "sudo echo ... >> file"
    #the redirection runs as the calling user, so "sudo tee -a" is used here
    #instead of temporarily making the file world-writable.
    printf '%s\n' \
        "ubuntu    hard    nofile    10000" \
        "ubuntu    soft    nofile    10000" \
        "root    hard    nofile    100000" \
        "root    soft    nofile    100000" \
        | sudo tee -a /etc/security/limits.conf > /dev/null
else
    echo "ulimit not updated, not necessary"
fi
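
If you would rather not answer the prompt by hand on every host, the comparison against the minimum can be automated. The following is a minimal sketch of that idea, not part of my original script: MIN_NOFILE and limit_is_below are illustrative names, and the 10,000 threshold is simply the number the script above asks you to compare against.

```shell
#!/bin/bash
# Sketch: decide without a prompt whether a file-descriptor limit is too low.
# MIN_NOFILE and limit_is_below are illustrative names, not part of the
# original script.
MIN_NOFILE=10000

# Returns 0 (true) when the limit in $1 is below the minimum in $2.
# A limit reported as "unlimited" is never considered too low.
limit_is_below() {
    local current="$1" minimum="$2"
    if [ "$current" = "unlimited" ]; then
        return 1
    fi
    [ "$current" -lt "$minimum" ]
}

soft_limit=$(ulimit -Sn)
hard_limit=$(ulimit -Hn)

if limit_is_below "$soft_limit" "$MIN_NOFILE" || limit_is_below "$hard_limit" "$MIN_NOFILE"; then
    echo "Limits are below $MIN_NOFILE (soft=$soft_limit, hard=$hard_limit); an update is needed."
else
    echo "Limits already meet the minimum (soft=$soft_limit, hard=$hard_limit)."
fi
```

The check only reports what it finds; the update to /etc/security/limits.conf would still be done as in the script above.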

 

 

 

 

Hortonworks HDP / Ambari Install – Configure Network Time Protocol (NTP)

Apache Ambari aims at making Hadoop management simpler by developing software for provisioning, managing, and monitoring Apache Hadoop clusters. Ambari provides an intuitive, easy-to-use Hadoop management web UI backed by its RESTful APIs.

This is a small script I developed to configure NTP on my hosts before deploying the Ambari server or agent and Hadoop:

 

 

 

 

 

#!/bin/bash
#Script Name: ignacio_config_ntp.scr
#Author: Ignacio de la Torre
#Independent Contractor Profile: https://linkedin.com/in/idelatorre
#################
#Configure NTP and set it to auto-start at boot time
#Install NTP
sudo yum install -y -q ntp

#Disable autostart while the clock is set manually
sudo systemctl disable ntpd
sudo timedatectl set-ntp no

#Set the clock once from pool.ntp.org and set the time zone
sudo ntpdate pool.ntp.org
sudo timedatectl set-timezone America/Los_Angeles

#Re-enable NTP autostart
sudo systemctl enable ntpd
sudo timedatectl set-ntp on

Google File System Design Assumptions

 

The Google File System’s conscious design tradeoffs

In today’s post I want to highlight the brilliance of the Google Research team. Their ability to step back and look at old assumptions reminds me of the Wright brothers realizing that lift values from the 1700s and other widespread assumptions of the time were the main constraints holding them back from coming up with the first airplane.

 

At Google Research something similar went on when they realized that traditional data storage and processing paradigms did not fit well with their applications’ processing workloads. Here are some of the design assumptions for the Google File System, straight from the published research paper, with my comments:

 

  1. Failure is an expectation, not an exception

    Google realized that the traditional way to address failure in the datacenter is to increase the sophistication of the hardware platforms involved. This approach increases cost both by using highly specialized hardware and by requiring system administrators with very sophisticated skills. The main innovation here is realizing that when dealing with massive datasets (e.g. downloading a copy of the entire web) hardware failure is a fact of life rather than an exception. Once this observation is incorporated into the design, costs can be decreased by storing and processing data on very large clusters of commodity hardware, where redundancy and replication across processing nodes and racks allow for seamless recovery from hardware failure.

  2. The system stores a modest number of large data files

    This observation is arrived at by looking at the nature of the data being processed, such as HTML markup from crawling a large number of websites. This is what we would call “unstructured data”, which is cleaned and serialized by the crawler before it is “batched” together into large files. Once again, by taking a step back and looking at the problem with fresh eyes, the researchers were able to realize their design did not need to optimize for the storage of billions of small files. This is a great constraint to remove from the design, as we will explore when we look at the ability of the GFS master server to control and store metadata for all files in a cluster in memory, thus allowing it to make very smart load balancing, placement and replication decisions.

  3. Workloads primarily consist of large streaming reads and small random reads

    By looking at actual application workloads the researchers found that they could generally group read operations into these two categories, and that successive read operations from the same client will often read contiguous regions of a file. Also, performance-minded applications will batch and sort their reads so that their progress through a dataset is one-directional, moving from beginning to end instead of going back and forth with random I/O operations.

  4. The workloads also have many large, sequential writes that append to data files

    Notice here how “delete” and “update” operations are extremely rare to non-existent. This frees the system design from the onerous task of maintaining locks to ensure the atomicity of these two operations.

  5. Atomicity with minimal synchronization is essential

    The system design focuses on supporting large writes by batch processes and “append” operations by a large number of concurrent clients, freeing itself from the constraints mentioned in the previous point.

  6. High sustained bandwidth is more important than low latency

    A good observation: when dealing with these large datasets, most applications are batch oriented and benefit the most from high processing throughput, unlike the traditional database application that places a premium on fast response times.

 

In hindsight, these observations might seem obvious, especially as they have been incorporated into the design principles that drive other products such as Apache Hadoop. But Google’s decision to invest in a custom-made file system to fit their very specific needs, and the ability of the Google Research team to step back and start their design with fresh eyes, have truly revolutionized data processing. Cheers to them!

 

Reference:

“The Google File System”; Ghemawat, Gobioff, Leung; Google Research

Where to download older versions of Java?

Several times lately I have found myself asking where I can download old versions of Java. They are generally found on Oracle’s website, on a version archive page. To help with direct access, here is a list with a few versions:

 

Version         64-bit JDK   64-bit JRE   32-bit JDK   32-bit JRE
8u25 (1.8)      JDK          JRE          JDK          JRE
7u72 (1.7)      JDK          JRE          JDK          JRE
6u45 (1.6)      JDK          JRE          JDK          JRE
5.0u22 (1.5)    JDK          JRE          JDK          JRE

Issue/Error with ODI Studio right click

While working with the Oracle Business Intelligence Applications (OBIA) repository in ODI Studio, I recently noticed I was no longer able to right click on objects. I have found two solutions; the first one is a workaround:

 

Workaround:

Let’s assume you want to right click on a particular folder or scenario and you notice that the context menu does not come up. Go ahead and do the following:

  1. Select the object with a left click
  2. Move your mouse pointer outside the object’s boundary; I prefer a little bit to the right
  3. Right click; the context menu should come up now

This workaround helps if you are restricted from changing your installation’s settings or are using a hosted platform such as Citrix.

 

Solution:

In cases where you are allowed to install software on your system, you should look into the compatibility matrix for ODI Studio and the version of Java you are working with. In my case I noticed the hosting provider for my environment had set up JDK 1.7 64-bit. Since JDK 1.6 was required for some versions of ODI, I downloaded both the 32-bit and 64-bit versions and pointed my odi.conf file to them. The 64-bit version did solve my issue, which is great since I can allocate more memory to the client under this version.

 

ODI Tip: How to make sure a “Select distinct” is issued and an ODI interface returns a unique dataset with no duplicates

PROBLEM

 

As a developer I need to make sure that the subset of columns I am mapping from source to target in my ODI interface is unique. In other words, I want ODI to include a DISTINCT clause in the SELECT statement that will be issued on the source database.

 

SOLUTION

  • Open the interface in the ODI Interface designer
  • Click on the Flow tab on the bottom
  • Click on the Target object
  • On the Property Inspector, click on the “Distinct Rows” checkbox
