En los post anteriores instalamos Hadoop en un nodo, tanto en modo standalone como pseudo-distribuido, y en un cluster de dos nodos, ahora revisaremos la creación de una aplicación MapReduce.

Hadoop Logo

Para estas pruebas configuraremos un ambiente de desarrollo usando una instalación en modo pseudo-distribuida, eclipse y el plugin de Hadoop para éste IDE. También se puede usar la maquina virtual de entrenamiento proveída por Cloudera, que incluye una instalación de Hadoop, ejemplos, eclipse, entre otras herramientas. Para ejecutar ésta máquina virtual se puede usar VMWare Player. Una vez descargada la imagen de la maquina virtual se descomprime, se abre el archivo .vmx con VMPlayer.

Configuración del entorno de desarrollo

Para nuestro ambiente de desarrollo iniciamos con una máquina con Hadoop configurado en modo pseudo-distribuido, descargamos e instalamos eclipse, y procedemos con la instalación del plugin de Hadoop:

  1. Establecer las variables de entorno JAVA_HOME y HADOOP_HOME, con la ruta a la carpeta raiz de Java y Hadoop respectivamente.
  2. Copiar el plugin de hadoop a la carpeta de plugins de eclipse.
    cp $HADOOP_HOME/contrib/eclipse-plugin/hadoop-0.20.2-eclipse-plugin.jar PATH_TO/eclipse/plugins/
  3. Iniciamos Eclipse.
  4. Abrimos la perspectiva Map/Reduce. Window->Open Perspective->Others->Map/Reduce
  5. Entramos en la vista MapReduce Locations.
  6. Clic derecho sobre la lista MapReduce Locations, seleccionar New Hadoop location.
  7. Le damos un nombre a la nueva ubicación, y configuramos el host y puerto tanto del MapReduce master(JobTracker) cómo del dfs.master (NameNode).
  8. Ahora en el explorador de proyectos podremos ver los archivos de nuestro sistema de archivos distribuido en DFS Locations.

Ahora tenemos listo nuestro ambiente de desarrollo, a continuación crearemos una aplicación MapReduce.

Crear una nueva aplicación

El plugin de Hadoop nos crea un nuevo tipo de proyecto en eclipse y tres tipos de archivo Mapper, Reducer y MapReduce Driver. Para crear nuestra nueva aplicación crearemos un proyecto MapReduce:

En la siguiente ventana seleccionamos el nombre de nuestro proyecto y configuramos el directorio de la instalación de Hadoop a utilizar.

Continuamos con el asistente hasta finalizar.

Ahora crearemos la primera aplicación, con el ejemplo de batalla, el WordCount, utilizando las facilidades que nos da el plugin:

  1. Creamos un nuevo Mapper: file->new->Mapper
    package ejemplos.wordcount;
    
    import java.io.IOException;
    import java.util.*;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    
    public class WordCountMapper extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable>  {
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
           word.set(tokenizer.nextToken());
           output.collect(word, one);
        }
      }
    }
    
  2. Si el IDE no encuentra la libreria de Hadoop, agregar hadoop<version>-core.jar al proyecto.
  3. Creamos un nuevo Reducer: file->new->Reducer
    package ejemplos.wordcount;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;
    
    public class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    
    	public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    		int sum = 0;
    		while (values.hasNext()) {
    			sum += values.next().get();
    		}
    		output.collect(key, new IntWritable(sum));
    	}
    }
    
  4. Creamos el MapReduce Driver, seleccionando las clases que generamos anteriormente como Mapper y Reducer.
    Modificamos las propiedades del Job:

    package ejemplos.wordcount;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapred.*;
    
    public class WordCount {
    
    	public static void main(String[] args) {
    		JobClient client = new JobClient();
    		JobConf conf = new JobConf(ejemplos.wordcount.WordCount.class);
    		conf.setJobName("wordcount");
    
    		//  specify output types
    		conf.setOutputKeyClass(Text.class);
    		conf.setOutputValueClass(IntWritable.class);
    
    		//  specify input and output Format and DIRECTORIES (not files)
    		conf.setInputFormat(TextInputFormat.class);
    		conf.setOutputFormat(TextOutputFormat.class);
    
    		FileInputFormat.setInputPaths(conf, new Path("input"));
    		FileOutputFormat.setOutputPath(conf, new Path("output"));
    
    		conf.setMapperClass(ejemplos.wordcount.WordCountMapper.class);
    		            conf.setCombinerClass(ejemplos.wordcount.WordCountReducer.class);
    		conf.setReducerClass(ejemplos.wordcount.WordCountReducer.class);
    
    		client.setConf(conf);
    		try {
    			JobClient.runJob(conf);
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    
    }
    
    
  5. Subimos algunos datos al sistema de archivos distribuido, lo podemos hacer desde el DFS location en el project explorer o desde una terminal ejecutando:
    $HADOOP_HOME/bin/hadoop fs -put conf input
  6. Ahora ejecutamos la aplicación, si el plugin funciona se puede ejecutar haciendo clic derecho sobre la clase que actua como MapReduce Driver y Run As->Run on Hadoop aparecerá una ventana preguntando en cual ubicación de Hadoop ejecutar la aplicación. Si no funciona, generamos el jar y lo ejecutamos:
    $HADOOP_HOME/bin/hadoop jar wordcount.jar input output

    y podemos ver el progreso de la tarea:

10/10/17 23:27:31 INFO mapred.FileInputFormat: Total input paths to process : 16
10/10/17 23:27:32 INFO mapred.JobClient: Running job: job_201010172123_0001
10/10/17 23:27:33 INFO mapred.JobClient:  map 0% reduce 0%
10/10/17 23:27:54 INFO mapred.JobClient:  map 12% reduce 0%
10/10/17 23:28:06 INFO mapred.JobClient:  map 25% reduce 0%
10/10/17 23:28:09 INFO mapred.JobClient:  map 25% reduce 4%
10/10/17 23:28:15 INFO mapred.JobClient:  map 37% reduce 4%
10/10/17 23:28:18 INFO mapred.JobClient:  map 37% reduce 8%
10/10/17 23:28:24 INFO mapred.JobClient:  map 50% reduce 12%
10/10/17 23:28:33 INFO mapred.JobClient:  map 50% reduce 16%
10/10/17 23:28:36 INFO mapred.JobClient:  map 62% reduce 16%
10/10/17 23:28:45 INFO mapred.JobClient:  map 75% reduce 16%
10/10/17 23:28:48 INFO mapred.JobClient:  map 75% reduce 20%
10/10/17 23:28:55 INFO mapred.JobClient:  map 87% reduce 22%
10/10/17 23:28:58 INFO mapred.JobClient:  map 87% reduce 25%
10/10/17 23:29:01 INFO mapred.JobClient:  map 87% reduce 29%
10/10/17 23:29:04 INFO mapred.JobClient:  map 100% reduce 29%
10/10/17 23:29:13 INFO mapred.JobClient:  map 100% reduce 100%
10/10/17 23:29:15 INFO mapred.JobClient: Job complete: job_201010172123_0001
10/10/17 23:29:15 INFO mapred.JobClient: Counters: 18
10/10/17 23:29:15 INFO mapred.JobClient:   Job Counters
10/10/17 23:29:15 INFO mapred.JobClient:     Launched reduce tasks=1
10/10/17 23:29:15 INFO mapred.JobClient:     Launched map tasks=16
10/10/17 23:29:15 INFO mapred.JobClient:     Data-local map tasks=16
10/10/17 23:29:15 INFO mapred.JobClient:   FileSystemCounters
10/10/17 23:29:15 INFO mapred.JobClient:     FILE_BYTES_READ=16565
10/10/17 23:29:15 INFO mapred.JobClient:     HDFS_BYTES_READ=18792
10/10/17 23:29:15 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=33732
10/10/17 23:29:15 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=10740
10/10/17 23:29:15 INFO mapred.JobClient:   Map-Reduce Framework
10/10/17 23:29:15 INFO mapred.JobClient:     Reduce input groups=586
10/10/17 23:29:15 INFO mapred.JobClient:     Combine output records=820
10/10/17 23:29:15 INFO mapred.JobClient:     Map input records=594
10/10/17 23:29:15 INFO mapred.JobClient:     Reduce shuffle bytes=16655
10/10/17 23:29:15 INFO mapred.JobClient:     Reduce output records=586
10/10/17 23:29:15 INFO mapred.JobClient:     Spilled Records=1640
10/10/17 23:29:15 INFO mapred.JobClient:     Map output bytes=25180
10/10/17 23:29:15 INFO mapred.JobClient:     Map input bytes=18792
10/10/17 23:29:15 INFO mapred.JobClient:     Combine input records=1819
10/10/17 23:29:15 INFO mapred.JobClient:     Map output records=1819
10/10/17 23:29:15 INFO mapred.JobClient:     Reduce input records=820

Cuando termine el trabajo podremos ver la salida en la carpeta Output del DFS, tanto a través del plugin (probablemente necesite un refresh o un reconect para actualizarse) como por linea de comandos:

$HADOOP_HOME/bin/hadoop fs -ls output
$HADOOP_HOME/bin/hadoop fs -cat output/part*

En un próximo artículo revisaremos la integración de Hadoop con otras fuentes de datos. Mientras tanto les dejo algunas referencias útiles sobre Hadoop:

Un enfoque práctico: Pro Hadoop. En especial el capítulo 2, para entender las bases de un Job MapReduce.
La guía definitiva: Hadoop: The Definitive Guide. Es la guía recomendada por Cloudera.

package ejemplos.wordcount;import java.io.IOException;
import java.util.Iterator;import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}

4 thoughts on “Mordiendo Hadoop: Desarrollo de aplicaciones MapReduce

  1. Hola Christian!! Muy bueno tu post y muy bien explicado.
    Te comento mi problema. Estoy haciendo un proyecto y necesito utilizar hadoop, pero no consigo hacerlo funcionar con Eclipse. Mi principal problema es que no consigo configurar el entorno de desarrollo. Me explico: cuando estoy en la vista MapReduce Locations y hago clic derecho sobre la lista MapReduce Locations y selecciono New Hadoop location pero no hace nada, no sale ninguna ventana. Tampoco me sale nada cuado le doy a Edit Hadoop location. Me estoy volviendo loca, no se que puede ser.
    He probado con tres versiones de eclipse distintas, con tres versiones de hadoop distintas, en windows y en linux y nada, no hay forma de configurarlo en Eclipse.
    Alguna idea?
    Veo que tu usas la version hadoop-0.20.2, es por algo en especial? Supongo que sera porque es la estable, pero podria usar la hadoop-0.21.0? Que versión de Eclise usas tu?
    Muchas gracias por tus post. Me están siendo de gran ayuda.

    Saludos

  2. Para el ejemplo estoy utilizando eclipse Helios, con la 0.21.0 te debería funcionar, yo uso la 0.20.2 solo por ser la versión “estable”. En éste hilo mencionan el problema con el plugin y hay un parche que lo corrige, puedes compilar el plugin después de aplicar el parche o puedes probar usar una versión de eclipse anterior (Europa, o cualquier otra anterior a la 3.4, dado que el problema parece venir desde Ganymede).
    Me cuentas como te va 😛

  3. Vale, me he bajado eclipse Europa y la versión 0.20.2 y ya funciona. Juraría que esa prueba ya la había hecho, pero parece ser que no… jejeje. He probado tantas versiones distintas que ya no se si había probado esa combinación.
    Con Helios me sigue sin salir. Me conformaré con Europa. Muchas gracias!!

  4. Pingback: Mordiendo Hadoop – Instalación con Ambari « Christian Ariza

Deja un comentario