Friday, September 14, 2012

JRuby transparently running methods asynchronously. Combining Ruby metaprogramming techniques and Java's concurrent Future

JRuby is great: it offers the opportunity to combine my two favorite languages, and here is another nice way of combining Java's power with Ruby's beauty and convenience.

In this case I created a small gem in a couple of hours (still not thoroughly tested, just some simple unit tests) that uses some of Ruby's nice metaprogramming techniques to transparently execute methods asynchronously, wrapping their return value in a java.util.concurrent.Future. When we access any method of the returned object, the Future's get method is called, so we only block for the value when we really need it.

What follows is the source code of the main file in the gem, where all the relevant logic lives:

require 'java'
java_import 'java.util.concurrent.ExecutorService'
java_import 'java.util.concurrent.Executors'
java_import 'java.util.concurrent.Future'
java_import 'java.util.concurrent.TimeUnit'
java_import 'java.util.concurrent.Callable'

module Futurizeit
  module ClassMethods
    def futurize(*methods)
      Futurizeit.futurize(self, *methods)
    end
  end

  def self.included(klass)
    klass.extend(ClassMethods)
  end

  def self.executor
    @executor ||= Executors.newFixedThreadPool(10)
  end

  def self.futurize(klass, *methods)
    klass.class_eval do
      methods.each do |method|
        alias :"non_futurized_#{method}" :"#{method}"
        define_method :"#{method}" do |*args|
          @future = Futurizeit.executor.submit(CallableRuby.new { self.send(:"non_futurized_#{method}", *args) })
          Futuwrapper.new(@future)
        end
      end
    end
  end
end

module Futurizeit
  class Futuwrapper < BasicObject
    def initialize(future)
      @future = future
    end

    def method_missing(method, *params)
      instance = @future.get
      instance.send(method, *params)
    end
  end

  class CallableRuby
    include Callable

    def initialize(&block)
      @block = block
    end

    def call
      @block.call
    end
  end
end
The functionality can be used in two ways: by including the module in a class and calling the futurize macro method on that class, or from the outside, by calling Futurizeit.futurize directly and passing the class and the instance methods of that class that we want to run asynchronously.
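
For illustration, here is a minimal sketch of the two styles (the Job class and its crunch method are made up for this example; the two styles are alternatives, either one on its own is enough):

class Job
  include Futurizeit

  def crunch
    sleep 3
    "crunched"
  end

  # Style 1: the futurize macro, available because the module is included.
  futurize :crunch
end

# Style 2: futurize an existing class from the outside (no include needed).
# Futurizeit.futurize(Job, :crunch)

result = Job.new.crunch   # returns a Futuwrapper immediately; the work runs in the thread pool
result.to_s               # blocks here, on first access, until the real value is ready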

The way it works is straightforward:

First it creates an alias for the original instance method, called "non_futurized_xxx", where xxx is the name of the original method. Then it defines a new method with the original name. This new method creates a CallableRuby object, which implements the Java Callable interface (by including the module).

This CallableRuby instance is then submitted to a preconfigured ExecutorService. The ExecutorService will create a Future internally and return it immediately. We then wrap this Future in a Futuwrapper instance.

The Futuwrapper is the object returned by the method. When we call any method on this wrapper, it internally calls the Future's get method, which in turn returns the actual instance that the original method would have returned without the futurizing feature, and the call is then forwarded to that instance.

The following RSpec test exercises the current functionality:

require '../lib/futurizeit'

class Futurized
  def do_something_long
    sleep 3
    "Done!"
  end
end

class FuturizedWithModuleIncluded
  include Futurizeit
  def do_something_long
    sleep 3
    "Done!"
  end
  futurize :do_something_long
end


describe "Futurizer" do
  before(:all) do
    Futurizeit::futurize(Futurized, :do_something_long)
  end

  it "should wrap methods in futures and return correct values" do
    object = Futurized.new
    start_time = Time.now.sec
    value = object.do_something_long
    end_time = Time.now.sec
    (end_time - start_time).should < 2
    value.to_s.should == 'Done!'
  end

  it "should allow calling the value twice" do
     object = Futurized.new
     value = object.do_something_long
     value.to_s.should == 'Done!'
     value.to_s.should == 'Done!'
   end

  it "should increase performance a lot parallelizing work" do
    object1 = Futurized.new
    object2 = Futurized.new
    object3 = Futurized.new
    start_time = Time.now.sec
    value1 = object1.do_something_long
    value2 = object2.do_something_long
    value3 = object3.do_something_long
    value1.to_s.should == 'Done!'
    value2.to_s.should == 'Done!'
    value3.to_s.should == 'Done!'
    end_time = Time.now.sec
    (end_time - start_time).should < 4
  end

  it "should work with class including module" do
      object = FuturizedWithModuleIncluded.new
      start_time = Time.now.sec
      value = object.do_something_long
      end_time = Time.now.sec
      (end_time - start_time).should < 2
      value.to_s.should == 'Done!'
    end

  after(:all) do
    Futurizeit.executor.shutdown
  end
end


All the code is in https://github.com/calo81/futurizeit

Friday, September 7, 2012

Setting up a Hadoop virtual cluster with Vagrant

Usually, for testing and using virtual machines, I go online, download the ISO image of the machine I want to install, start VirtualBox, tell it to boot from the ISO, install the OS manually, and then install the applications I want to use. It is a boring and tedious process, but I never really cared much about it. However, I recently discovered the power of Vagrant and Puppet, which allow me to automate all the steps I used to do manually.

Here I test-drive the process of automatically configuring a Hadoop cluster on virtual machines in fully distributed mode.

First of all, make sure you have Ruby installed; I'm testing with Ruby 1.9.3. You should also have VirtualBox installed; I have version 4.1.

Then from the command line install the vagrant gem:

gem install vagrant

Vagrant is a great tool that allows us to manage our VirtualBox machines using the command line and simple configuration files.

First we will install an Ubuntu Linux virtual machine (or a "box", as it is called in Vagrant):

vagrant box add base-hadoop http://files.vagrantup.com/lucid64.box

Then we go to the directory we want to use as our "workspace" and initialize the Vagrant configuration for our new box. This creates a Vagrantfile with the Vagrant configuration:

vagrant init base-hadoop

The virtual machine is ready to be started up now. You can start it by doing:

vagrant up

The virtual machine is now running. You can connect to it with SSH by typing:

vagrant ssh

The next step is to download Puppet. Do that by going to http://puppetlabs.com/misc/download-options/

Puppet is a tool that allows us to automate the process of provisioning servers. We will use it to manage our virtual machines, installing the required software on them and running the required services.

So we create a directory where we are going to put our manifests (Puppet configuration files):

mkdir manifests

In that new directory we create a file called base-hadoop.pp with the following content:

group { "puppet":
  ensure => "present",
}
 
In the Vagrantfile that was created previously, we uncomment the lines that look like this:

config.vm.provision :puppet do |puppet|
     puppet.manifests_path = "manifests"
     puppet.manifest_file  = "base-hadoop.pp"
  end


The next thing we need to do is tell Puppet to install Java on our servers. For that we open the base-hadoop.pp file and add the following:

exec { 'apt-get update':
  command => '/usr/bin/apt-get update',
}

package { "openjdk-6-jdk" :
   ensure => present
  require => Exec['apt-get update']
}


Next we need to install Hadoop. For this we will create a new Puppet module. A Puppet module is used to encapsulate resources that belong to the same component.

We execute

mkdir -p modules/hadoop/manifests

Then we create an init.pp in this new manifests directory with the following content:

class hadoop {
  $hadoop_home = "/opt/hadoop"

  exec { "download_hadoop":
    command => "wget -O /tmp/hadoop.tar.gz http://apache.mirrors.timporter.net/hadoop/common/hadoop-1.0.3/hadoop-1.0.3.tar.gz",
    path    => $path,
    unless  => "ls /opt | grep hadoop-1.0.3",
    require => Package["openjdk-6-jdk"]
  }

  exec { "unpack_hadoop":
    command => "tar -zxf /tmp/hadoop.tar.gz -C /opt",
    path    => $path,
    creates => "${hadoop_home}-1.0.3",
    require => Exec["download_hadoop"]
  }
}


We have done a few things here, and they are almost self-explanatory. We set a variable pointing to our Hadoop installation, download Hadoop's binaries from an Apache mirror, and extract them into /opt (which gives us ${hadoop_home}-1.0.3). The unless and creates guards keep these execs from running again once Hadoop is already in place.

We need to add our new module to the main puppet configuration file. We add the following line at the top of the base-hadoop.pp file:

include hadoop

Then we add the new module path to our Vagrantfile, so now our Puppet section looks like this:

config.vm.provision :puppet do |puppet|
     puppet.manifests_path = "manifests"
     puppet.manifest_file  = "base-hadoop.pp"
     puppet.module_path = "modules"
  end


We execute the following to reload the vagrant machine:

vagrant reload

That command will reload the Vagrant machine and run the Puppet recipes, which will install the required software.
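
If we want to double-check that the recipe did its job, we can ssh into the box and look for the unpacked distribution (this is the same check the unless and creates guards in the manifest rely on):

vagrant ssh
ls /opt | grep hadoop-1.0.3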

We will need a cluster of virtual machines, and Vagrant supports that. We open our Vagrantfile and replace its content with the following:

Vagrant::Config.run do |config|
  config.vm.box = "base-hadoop"
  config.vm.provision :puppet do |puppet|
     puppet.manifests_path = "manifests"
     puppet.manifest_file  = "base-hadoop.pp"
     puppet.module_path = "modules"
  end
 
  config.vm.define :master do |master_config|
    master_config.vm.network :hostonly, "192.168.1.10"
  end

  config.vm.define :backup do |backup_config|
    backup_config.vm.network :hostonly, "192.168.1.11"
  end
 
  config.vm.define :hadoop1 do |hadoop1_config|
    hadoop1_config.vm.network :hostonly, "192.168.1.12"
  end
 
  config.vm.define :hadoop2 do |hadoop2_config|
    hadoop2_config.vm.network :hostonly, "192.168.1.13"
  end
 
  config.vm.define :hadoop3 do |hadoop3_config|
    hadoop3_config.vm.network :hostonly, "192.168.1.14"
  end
end


After this we execute:

vagrant up

That will start and provision all the servers. It will take a while.

But we are not done yet: next we need to configure the Hadoop cluster. In the directory modules/hadoop we create another directory called files, where we will put the configuration files our Hadoop cluster needs.
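
From our workspace directory that is simply:

mkdir -p modules/hadoop/files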

We create the following files:

core-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 <configuration>
  <property>
   <name>fs.default.name</name>
   <value>hdfs://master:9000</value>
   <description>The name of the default file system. A URI whose scheme and authority determine the FileSystem implementation.</description>
  </property>
 </configuration>


hdfs-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
 <property>
  <name>dfs.replication</name>
  <value>3</value>
  <description>The actual number of replications can be specified when the file is created.</description>
 </property>
</configuration>
 


mapred-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
 <property>
  <name>mapred.job.tracker</name>
  <value>master:9001</value>
  <description>The host and port that the MapReduce job tracker runs at.</description>
 </property>
</configuration>
 


masters

192.168.1.11

slaves

192.168.1.12
192.168.1.13
192.168.1.14

We then need to tell Puppet to copy these files to our cluster, so we modify the init.pp file in the hadoop Puppet module to contain the following:

class hadoop {
  $hadoop_home = "/opt/hadoop"

  exec { "download_hadoop":
    command => "wget -O /tmp/hadoop.tar.gz http://apache.mirrors.timporter.net/hadoop/common/hadoop-1.0.3/hadoop-1.0.3.tar.gz",
    path    => $path,
    unless  => "ls /opt | grep hadoop-1.0.3",
    require => Package["openjdk-6-jdk"]
  }

  exec { "unpack_hadoop":
    command => "tar -zxf /tmp/hadoop.tar.gz -C /opt",
    path    => $path,
    creates => "${hadoop_home}-1.0.3",
    require => Exec["download_hadoop"]
  }

  file { "${hadoop_home}-1.0.3/conf/slaves":
    source  => "puppet:///modules/hadoop/slaves",
    mode    => 644,
    owner   => root,
    group   => root,
    require => Exec["unpack_hadoop"]
  }

  file { "${hadoop_home}-1.0.3/conf/masters":
    source  => "puppet:///modules/hadoop/masters",
    mode    => 644,
    owner   => root,
    group   => root,
    require => Exec["unpack_hadoop"]
  }

  file { "${hadoop_home}-1.0.3/conf/core-site.xml":
    source  => "puppet:///modules/hadoop/core-site.xml",
    mode    => 644,
    owner   => root,
    group   => root,
    require => Exec["unpack_hadoop"]
  }

  file { "${hadoop_home}-1.0.3/conf/mapred-site.xml":
    source  => "puppet:///modules/hadoop/mapred-site.xml",
    mode    => 644,
    owner   => root,
    group   => root,
    require => Exec["unpack_hadoop"]
  }

  file { "${hadoop_home}-1.0.3/conf/hdfs-site.xml":
    source  => "puppet:///modules/hadoop/hdfs-site.xml",
    mode    => 644,
    owner   => root,
    group   => root,
    require => Exec["unpack_hadoop"]
  }
}
 


We then execute:

vagrant provision

And we get these files copied to all our servers.

We need to set up passwordless SSH communication between our servers. We modify our base-hadoop.pp, adding the following:

file { "/root/.ssh/id_rsa":
  source  => "puppet:///modules/hadoop/id_rsa",
  mode    => 600,
  owner   => root,
  group   => root,
  require => Exec['apt-get update']
}

file { "/root/.ssh/id_rsa.pub":
  source  => "puppet:///modules/hadoop/id_rsa.pub",
  mode    => 644,
  owner   => root,
  group   => root,
  require => Exec['apt-get update']
}

ssh_authorized_key { "ssh_key":
    ensure => "present",
    key    => "AAAAB3NzaC1yc2EAAAADAQABAAABAQCeHdBPVGuSPVOO+n94j/Y5f8VKGIAzjaDe30hu9BPetA+CGFpszw4nDkhyRtW5J9zhGKuzmcCqITTuM6BGpHax9ZKP7lRRjG8Lh380sCGA/691EjSVmR8krLvGZIQxeyHKpDBLEmcpJBB5yoSyuFpK+4RhmJLf7ImZA7mtxhgdPGhe6crUYRbLukNgv61utB/hbre9tgNX2giEurBsj9CI5yhPPNgq6iP8ZBOyCXgUNf37bAe7AjQUMV5G6JMZ1clEeNPN+Uy5Yrfojrx3wHfG40NuxuMrFIQo5qCYa3q9/SVOxsJILWt+hZ2bbxdGcQOd9AXYFNNowPayY0BdAkSr",
    type   => "ssh-rsa",
    user   => "root",
    require => File['/root/.ssh/id_rsa.pub']
}
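
Note that the id_rsa and id_rsa.pub files referenced above have to exist in modules/hadoop/files so that Puppet can serve them. One way to generate such a keypair (without a passphrase, which is only acceptable for a throwaway test cluster like this one) is:

ssh-keygen -t rsa -N "" -f modules/hadoop/files/id_rsa

The key in the ssh_authorized_key resource should then be the public key from the generated id_rsa.pub.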
 


We are ready to run our Hadoop cluster now. For that, once again we modify the init.pp file in the hadoop Puppet module, adding the following at the end, just before closing the hadoop class:

file { "${hadoop_home}-1.0.3/conf/hadoop-env.sh":
  source  => "puppet:///modules/hadoop/hadoop-env.sh",
  mode    => 644,
  owner   => root,
  group   => root,
  require => Exec["unpack_hadoop"]
}
 


The hadoop-env.sh file is the original one, but we have uncommented the JAVA_HOME setting and pointed it at the correct Java installation.
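
On this 64-bit Lucid box with the openjdk-6-jdk package, the uncommented line ends up looking roughly like this (the exact path can vary between systems):

export JAVA_HOME=/usr/lib/jvm/java-6-openjdk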

We can give a different hostname to each machine in the Vagrantfile. For that we replace its contents with the following:


Vagrant::Config.run do |config|
  config.vm.box = "base-hadoop"
  config.vm.provision :puppet do |puppet|
     puppet.manifests_path = "manifests"
     puppet.manifest_file  = "base-hadoop.pp"
     puppet.module_path = "modules"
  end
 
  config.vm.define :backup do |backup_config|
    backup_config.vm.network :hostonly, "192.168.1.11"
    backup_config.vm.host_name = "backup"
  end
 
  config.vm.define :hadoop1 do |hadoop1_config|
    hadoop1_config.vm.network :hostonly, "192.168.1.12"
    hadoop1_config.vm.host_name = "hadoop1"
  end
 
  config.vm.define :hadoop2 do |hadoop2_config|
    hadoop2_config.vm.network :hostonly, "192.168.1.13"
    hadoop2_config.vm.host_name = "hadoop2"
  end
 
  config.vm.define :hadoop3 do |hadoop3_config|
    hadoop3_config.vm.network :hostonly, "192.168.1.14"
    hadoop3_config.vm.host_name = "hadoop3"
  end

  config.vm.define :master do |master_config|
    master_config.vm.network :hostonly, "192.168.1.10"
    master_config.vm.host_name = "master"
  end

end


Let’s do “vagrant reload” and wait for all systems to reload.

We have provisioned our systems. Let's go to the master node and start everything:

vagrant ssh master

Then, once we are logged in, we go to /opt/hadoop-1.0.3/bin

and do:

sudo ./hadoop namenode -format

sudo ./start-all.sh

We have now started our Hadoop cluster. We can visit http://192.168.1.10:50070/ to access the master node and see that our Hadoop cluster is indeed running.

All the files for this example (except for the box itself) are available at git@github.com:calo81/vagrant-hadoop-cluster.git for free use.