Wednesday, January 26, 2011

Distributing Hadoop

As mentioned in the previous article, Hadoop Basics, the value of Hadoop lies in running it distributed across many machines.

In this article I will introduce how to configure Hadoop for distributed processing.
I’ll show how to do it with just two machines, but the process is the same for more, as one of the main values of Hadoop is its ability to scale easily.

1. First, we download Hadoop 0.21.0 from http://mirror.lividpenguin.com/pub/apache//hadoop/core/hadoop-0.21.0/hadoop-0.21.0.tar.gz
on both machines and uncompress the file.

2. We have two independent Hadoop installations right now, but we want them to run as a cluster, so we have to do some configuration.
Distributed Hadoop works with 5 different daemons that communicate with each other. The daemons are:

NameNode: The main controller of HDFS. It takes care of how files are broken into blocks, which nodes store each block, and the general tracking of the distributed filesystem.

DataNode: Serves the HDFS requirements of an individual slave node, communicating and coordinating with the NameNode.

Secondary NameNode: Takes snapshots of the NameNode for possible recoveries.

JobTracker: In charge of coordinating the submission of tasks to the different nodes.

TaskTracker: Present on each processing node, it is in charge of executing the tasks submitted by the JobTracker, communicating with it constantly.

All communication between the Hadoop nodes is done through ssh. We will designate a master node (which will contain the NameNode and JobTracker) and two slave nodes. The master node must be able to communicate with the slave nodes through ssh using the same username. (I’m using my username cscarioni, communicating without a passphrase using private/public key authentication.)
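
If passwordless ssh is not set up yet, something along these lines does it (assuming OpenSSH; the user and host names are the ones used later in this article):

ssh-keygen -t rsa
ssh-copy-id cscarioni@master-hadoop
ssh-copy-id cscarioni@carlo-netbook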

So, as we are using two machines, our architecture will look like this:

Machine 1 (Master): NameNode, JobTracker, Secondary NameNode, TaskTracker, DataNode

Machine 2 (Slave): TaskTracker, DataNode

We go to our master installation of Hadoop and enter the conf directory.

In core-site.xml we specify the NameNode information. We put the following:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://master-hadoop:9000</value>
</property>
</configuration>


In mapred-site.xml we specify where the JobTracker daemon is:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>master-hadoop:9001</value>
</property>
</configuration>

In hdfs-site.xml we specify the replication factor of the cluster, in our case 2:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
</configuration>

The masters and slaves files, as their names say, contain the names of the master and slave nodes. We have to modify them to include our master and slave nodes. (I defined the host names used below in the hosts file of both machines.)

So in the masters we put

master-hadoop


And in the slaves we put

master-hadoop
carlo-netbook

We now change hadoop-env.sh, uncommenting the JAVA_HOME line and pointing it to our JAVA_HOME.
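
For example, the uncommented line could end up looking like this (the path is only an illustration; point it at your own JDK installation):

export JAVA_HOME=/usr/lib/jvm/java-6-openjdk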


OK, these are all the files we need. We now distribute (copy) these files to both machines.


We now go to the bin directory on the master node and execute ./hadoop namenode -format to format the HDFS.

We then execute, in the same directory: ./start-all.sh.
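
To check that the daemons actually started, you can run the jps tool that ships with the JDK on each machine. On the master (which in this setup is also a slave) the output should list something like the following (process ids aside; the exact list depends on the node’s role):

NameNode
SecondaryNameNode
JobTracker
DataNode
TaskTracker
Jps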

That’s it, Hadoop is running. We now need to put some files into HDFS and submit a MapReduce job to it.

For this example I’ll use a custom-made file in which each line contains either the word God or the word Devil. I created the file with the following Groovy script:

def a  = new File("/tmp/biblia.txt")
random = new Random()
a.withWriter{
    for (i in (0..5000000)){
        if(random.nextInt(2)){
            it << "GOD\n"
        }else{
            it << "Devil\n"
        }
    }
}

From the master’s Hadoop bin directory, copy the file from the local file system into HDFS with:

./hadoop fs -put /home/cscarioni/downloads/bible.txt bible.txt

To see that the file has been created, do:

./hadoop fs -ls

I get the following output:

-rw-r--r-- 2 cscarioni supergroup 4445256 2011-01-24 18:25 /user/cscarioni/bible.txt

Now we create our MapReduce program (it just counts how many times the words GOD and Devil appear in the file):



import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class GodVsDevils
{
    public static class WordMapper extends Mapper<LongWritable, Text, Text, LongWritable>
    {
        private LongWritable word = new LongWritable();
        private Text theKey = new Text();
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            String who =value.toString();
            word.set(1);
            if(who.equals("GOD"))
            {
                theKey.set("God");
                context.write(theKey, word);
            }
            else if(who.equals("Devil"))
            {
                theKey.set("Devil");
                context.write(theKey, word);
            }
        }
    }
    public static class AllTranslationsReducer
    extends Reducer<Text,LongWritable,Text,LongWritable>
    {
        private LongWritable result = new LongWritable();
        public void reduce(Text key, Iterable<LongWritable> values,
        Context context
        ) throws IOException, InterruptedException
        {
            long count = 0;
            for (LongWritable val : values)
            {
                count += val.get();
            }
            result.set(count);
            context.write(key, result);
        }
    }
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf,"GodDevils");
        job.setJarByClass(GodVsDevils.class);
        job.setMapperClass(WordMapper.class);
        job.setReducerClass(AllTranslationsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path("/user/cscarioni"));
        FileOutputFormat.setOutputPath(job, new Path("output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

We compile it, jar it and then execute the following on the master node:

./hadoop jar god.jar GodVsDevils -fs master-hadoop:9000 -jt master-hadoop:9001

This will run our MapReduce job on the Hadoop cluster.
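
When the job finishes, the counts can be read back from HDFS. With a single reducer the output file is normally named part-r-00000, so something like this prints the result:

./hadoop fs -cat output/part-r-00000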

Monday, January 3, 2011

Building blocks of an Application Integration Framework

Based on my experience working with Mule ESB and Spring Integration, and on reading about Open ESB and ServiceMix, I realized that they are all based on almost the same concepts and ideas, ideas that come mainly from the book Enterprise Integration Patterns and its companion web site http://www.eaipatterns.com/

I’ll try to introduce here the main elements that constitute these integration frameworks (and ESBs), focusing mainly on the ones I know best, Mule and Spring Integration, but without implementation specifics.

I won’t go into detail defining what an Integration Framework or an ESB is, as that’s not the point of this post. I’ll just say that they allow heterogeneous applications to communicate with one another, and that’s the basic concept.

For the following explanations I will refer to the integration system in general as an ESB, although the two are not necessarily the same.

Here is a diagram from the EIP web site that shows the different elements I’m explaining later:




The main element is, of course, the Message. It’s what we want to send from one system to another. Inside the ESB, messages are usually normalized and encapsulated in a framework-specific message that normally carries the payload plus meta-information headers. The message can undergo multiple transformations as it moves through the ESB.

A basic interface for a Message in an ESB could be something like:


interface Message<T> {
    T getPayload();
    Object getHeader(String key);
    void addHeader(String key, Object value);
}
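
Just as a minimal sketch (not the actual class of any of the frameworks mentioned above), an implementation could simply hold the payload and a map of headers:

import java.util.HashMap;
import java.util.Map;

class SimpleMessage<T> implements Message<T> {
    private final T payload;
    private final Map<String, Object> headers = new HashMap<String, Object>();

    SimpleMessage(T payload){
        this.payload = payload;
    }
    public T getPayload(){
        return payload;
    }
    public Object getHeader(String key){
        return headers.get(key);
    }
    public void addHeader(String key, Object value){
        headers.put(key, value);
    }
}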





The Transformers are in charge of transforming messages. A transformation can be something simple, like adding new headers to the message, or something more complex, like totally changing the content format of the payload (for example, from an XML SOAP payload to an email message). Transformers can be chained to apply more than one transformation to a message, but as a good practice each transformer should do only one transformation.

A basic Transformer interface would be something like:


interface Transformer{
    Message transform(Message message);
}
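
For instance, a hypothetical transformer that only enriches a message with a timestamp header (using the Message interface sketched above) could be:

class TimestampTransformer implements Transformer {
    // adds a header and passes the message on; a payload-changing transformer
    // would instead build and return a new Message
    public Message transform(Message message){
        message.addHeader("timestamp", System.currentTimeMillis());
        return message;
    }
}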



The message channels are, of course, very important elements. Message channels are the links through which messages pass on their way from one point to the next. Messages are sent to channels, which transport them across the different stages of the ESB.
The two most common divisions of channels are synchronous and asynchronous channels. A channel will usually have two methods: one for sending messages and one for polling for messages:


interface Channel{
    void send(Message message);
    Message receive();
}
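
As an illustration only (this is a sketch, not how any particular ESB implements it), an asynchronous in-memory channel could be backed by a queue:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueChannel implements Channel {
    private final BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>();

    public void send(Message message){
        queue.offer(message);
    }
    public Message receive(){
        // simple polling semantics: returns null when the channel is empty
        return queue.poll();
    }
}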



The Endpoints (also referred to as gateways) can be seen as the connectors, or adapters, that plug the ESB into outside systems and protocols to enable communication. There are normally two types of endpoints: In-Endpoints, which allow external systems to connect to the ESB, and Out-Endpoints, which allow the ESB to connect to external systems.
In-Endpoints have the mission of taking incoming messages, normalizing them into ESB messages and putting them into the ESB flow.
Out-Endpoints take the message from the ESB flow, strip the ESB message information down to the payload and message type understood by the external system, and send the message out (of course they can wait for a response and process it further).

Endpoint implementations are specific to the kind of application they connect to. For example, an In-Endpoint for TCP would be totally different from an In-Endpoint for file systems: the TCP one would probably create a listening server socket and wait for connections, while the file system one would probably poll some folder looking for files.

However, they both must convert their messages into an ESB Message and put it in the ESB flow. So a simple interface for an endpoint would be something like:


interface Endpoint{
    /**
     * This method takes the original message as a parameter (for example an InputStream
     * from a socket), wraps it in an ESB Message and puts it into the ESB flow.
     */
    void send(Object object);
    /**
     * This method receives the ESB Message from the ESB, strips out the correct payload and
     * returns it so that it can be sent to the external system.
     */
    Object receive();
}
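
Just to make the idea concrete (all names here are made up, and SimpleMessage is the class from the earlier sketch), an In-Endpoint that accepts plain strings could wrap them into ESB messages and drop them on an inbound channel:

class StringInEndpoint implements Endpoint {
    private final Channel inboundChannel;

    StringInEndpoint(Channel inboundChannel){
        this.inboundChannel = inboundChannel;
    }
    public void send(Object object){
        // normalize the raw incoming data into an ESB Message and hand it to the flow
        Message<String> message = new SimpleMessage<String>(object.toString());
        message.addHeader("source", "string-endpoint");
        inboundChannel.send(message);
    }
    public Object receive(){
        // strip out the payload so it can be handed back to the external system
        return inboundChannel.receive().getPayload();
    }
}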




Filters and Routers control the flow in an ESB. A filter allows or stops messages from being further processed, based on arbitrary rules. A router redirects messages to different channels, depending on arbitrary rules.

A simple Filter Base Class would be:


abstract class Filter {
    protected Channel nextChannel;
    protected Channel discardChannel;

    public void filter(Message message){
        if(accept(message)){
            nextChannel.send(message);
        }
        else{
            discardChannel.send(message);
        }
    }
    protected abstract boolean accept(Message message);
}
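
A concrete filter, as a made-up example, could let through only messages that carry a given header:

class HeaderPresentFilter extends Filter {
    // accepts only messages that have already been marked as validated
    protected boolean accept(Message message){
        return message.getHeader("validated") != null;
    }
}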



A simple Router Base Class would be like


abstract class Router {
    public void route(Message message){
        getNextChannelForMessage(message).send(message);
    }
    protected abstract Channel getNextChannelForMessage(Message message);
}
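
And a concrete router, again purely illustrative, could pick the destination channel from a header value:

class ErrorAwareRouter extends Router {
    protected Channel errorChannel;
    protected Channel defaultChannel;

    // messages flagged with an "error" header go to the error channel
    protected Channel getNextChannelForMessage(Message message){
        return message.getHeader("error") != null ? errorChannel : defaultChannel;
    }
}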




That’s it. These are the main building blocks on which an integration solution is built; of course there is a lot more to learn from here. For more detailed information, refer to the great projects Spring Integration and Mule ESB, as well as the previously mentioned book Enterprise Integration Patterns.