Páginas

sábado, 14 de abril de 2018

Ejemplo de MapReduce con Java sobre HDFS

En esta entrada vamos a realizar una breve descripción de Apache Hadoop, del sistema de archivo distribuído HDFS y del algoritmo de procesamiento MapReduce

1. Introducción a Hadoop.

La utilidad de Hadoop radica en que nos permite procesar rápidamente un volumen inmenso de datos (Big Data), la clave está en que el procesamiento se produce en paralelo en múltiples nodos. Cada nodo procesa parte del trabajo y el resultado de todo ese procesamiento paralelo se combina en el resultado final.

Cuantos mas datos tengamos que procesar y/o cuanto mayor sea la velocidad de respuesta requerida, mas nodos paralelos deberemos tener instalada en la red de servidores Hadoop.

Hadoop se basa en dos 'patas':

1.1) Un sistema de archivos llamado HDFS distribuído en N máquinas o nodos

Ya hemos comentado que Hadoop es eficiente cuando opera sobre ficheros extremadamente grandes (petabytes), estos ficheros se trocean en bloques de 128MB que son almacenados en múltiples nodos de la red de servidores. Para minimizar problemas si un noco cae, cada uno de los trozos está disponible en al menos 3 servidores.

Hay un nodo maestro que gestiona qué trozos forman parte de cada fichero y en qué nodos está replicado. Este nodo maestro está también replicado para evitar caídas del sistema.

El sistema de Archivos HDFS corre sobre Java, por lo que puede ejecutar sobre cualquier arquitectura que cuente con una JVM.

1.2) Un algoritmo de procesamiento map-reduce basado en dos operaciones simples

El algoritmo base de Haddop se llama MapReduce. La base de MapReduce es muy simple:

  • Un algoritmo MapReduce recibe un conjunto de pares <Clave,Valor> y genera pares <Clave,Valor>
  • La operación Map recibe un conjunto de pares <Clave,Valor> y genera pares <Clave,Valor>
  • La operación Reduce recibe un conjunto de pares <Clave,Valor> y genera pares <Clave,Valor>

Supongamos que tenemos una ciudad con una red de miles sensores. Los sensores toman la temperatura de su ubicación cada 10 segundos y la envían a un almacén centralizado, los sensores que son nuestros productores de datos, podrían enviar sus lecturas a un topic de Apache Kafka y ese topic lo tendríamos enganchado a un consumidor HDFS mediante un conector.

Cada sensor enviaría un registro en formato JSON como el siguinte:

{"idsensor":"00000001", fechahora: "20180414214500", "longitud": -122.0829009197085, "latitud": 37.4238253802915, "temp": 11.2}

Si por ejemplo nos piden sacar la temperatura media de cada sensor para cada día del año pasado deberíamos programar la siguiente secuencia de procesos mapreduce:

1.2.1) Partimos de un fichero imaginario fichero-temperaturas-madrid-2017.json

El fichero no es otra cosa sino un conjunto de pares <Clave,Valor> donde las claves son los números de línea "numlinea" y los valores el contenido de cada línea "contenidolinea". Partiríamos pues de algo así:

[
{"numlinea": 1, "contenidolinea": {"idsensor":"00000001", "fechahora": "20180414214500", "longitud": -122.0829009197085, "latitud": 37.4238253802915, "temp": 11.2}},
{"numlinea": 2, "contenidolinea": {"idsensor":"00000001", "fechahora": "20180414214500", "longitud": -122.0829009197085, "latitud": 37.4238253802915, "temp": 11.2}},
{"numlinea": 3, "contenidolinea": {"idsensor":"00000001", "fechahora": "20180414214500", "longitud": -122.0829009197085, "latitud": 37.4238253802915, "temp": 11.2}},
....
{"numlinea": 999, "contenidolinea": {"idsensor":"00000001", "fechahora": "20180414214500", "longitud": -122.0829009197085, "latitud": 37.4238253802915, "temp": 11.2}}
]

1.2.2) Programamos una operación MAP

La operación MAP transformará los pares <numlinea, contenidolinea> en pares <idsensorFecha,temp>, que serían del tipo:

[
{"idsensorFecha":"00000001-20180414", "temp": 11.2},
{"idsensorFecha":"00000001-20180414", "temp": 11.3},
{"idsensorFecha":"00000001-20180414", "temp": 11.4},
....
{"idsensorFecha":"99999999-20180414", "temp": 9.0},
{"idsensorFecha":"99999999-20180414", "temp": 9.2},
{"idsensorFecha":"99999999-20180414", "temp": 9.2},
]

1.2.3) Mezcla y ordenación de la salida del MAP

La salida de la operación MAP realizada en cada trozo del fichero de cada uno de los nodos de Hadoop, se mezcla y ordena en un único fichero que contiene las líneas de todos los ficheros ordenadas por la clave "idsensorFecha" y agrupando las temperaturas en una matriz <idsensorFecha,[temp]>, con la pinta siguiente:

[
{"idsensorFecha":"00000001-20180414", "temperaturas": [{"temp": 11.2},{"temp": 11.3}, ...., {"temp": 11.4}]},
        ....
{"idsensorFecha":"99999999-20180414", "temperaturas": [{"temp": 11.2},{"temp": 11.3}, ...., {"temp": 11.4}]}
]

1.2.4) Programamos una operación REDUCE.

La operación REDUCE recibe pares del tipo <idsensorFecha,[temp]> y genera un pares del tipo <idsensorFecha,tempMedia>, donde hemos sacado la media de todas las temperaturas de un sensor en una cierta fecha, generamos por tanto pares del tipo:

[
{"idsensorFecha":"00000001-20180414", "tempMedia": 13.2},
        ....
{"idsensorFecha":"99999999-20180414", "tempMedia": 17.1}
]

2. Instalar Hadoop

Accedemos a la zona de descargas de Apache Hadoop:

http://hadoop.apache.org/releases.html#Download

Nos bajamos la versión ejecutable mas actualizada, en este caso: 

http://apache.rediris.es/hadoop/common/hadoop-3.1.0/hadoop-3.1.0.tar.gz

Vamos ahora a instalar Hadoop en modo autónomo:

  • Descomprimimos en nuestro $HOME
  • Definimos la variable HADOOP_HOME ajustando su valor en el .profile

HADOOP_HOME=/home/egdepedro/hadoop-3.1.0
export HADOOP_HOME
PATH="$HADOOP_HOME/bin:$PATH"

  • Reiniciamos la sesión o recargamos en bash el contenido del .profile con el comando:

$ . ./.profile

  • Verificamos que todo se ha instalado correctamente lanzando el comando

egdepedro@OBERON:~$ hadoop version
Hadoop 3.1.0
Source code repository https://github.com/apache/hadoop -r 16b70619a24cdcf5d3b0fcf4b58ca77238ccbe6d
Compiled by centos on 2018-03-30T00:00Z
Compiled with protoc 2.5.0
From source with checksum 14182d20c972b3e2105580a1ad6990
This command was run using /home/egdepedro/hadoop-3.1.0/share/hadoop/common/hadoop-common-3.1.0.jar

3. Obtención de datos y definición del ejemplo

Hoy en día muchos organismos y ayuntamientos publican sus datos en una 'iniciativa' conocida como 'Open Data' cuyo objetivo final es que cualquier ciudadano/empresa sea capaz de explotar esos datos de forma productiva. Los datos están siempre convenientemente anonimizados, es decir, no se publica nada sobre nadie que permita identificar al afectado.

Vamos a descargar por ejemplo los datos del Ayuntamiento de Madrid:  https://datos.madrid.es/portal/site/egob/ y descargamos por ejemplo los accidentes de tráfico con bicicleta del 2017. El enlace es: https://datos.madrid.es/egob/catalogo/300110-0-accidentes-bicicleta.csv

Revisando el fichero y limpiando un poco espacios en blanco inútiles he generado el fichero 'accidentes-bicicletas-madrid-2017.csv' que tiene la siguiente estructura:

Fecha;TRAMO HORARIO;Nm Tot Victimas;DISTRITO;Lugar;Numero;Tipo Accidente;Tipo Vehiculo

01/01/2017;DE 6:00 A 6:59;1;ARGANZUELA;CALLE DE TOLEDO;120;CHOQUE CON OBJETO FIJO;BICICLETA           
02/01/2017;DE 21:00 A 21:59;1;SAN BLAS;CALLE DE MEQUINENZA;14;CAÍDA BICICLETA;BICICLETA           
03/01/2017;DE 19:00 A 19:59;1;CENTRO;CALLE DE LA ESCALINATA;8;CAÍDA BICICLETA;BICICLETA           
04/01/2017;DE 21:00 A 21:59;1;CENTRO;CALLE DE LA CAVA DE SAN MIGUEL;13;CAÍDA BICICLETA;BICICLETA           
05/01/2017;DE 8:00 A 8:59;1;MONCLOA-ARAVACA;PUENTE DE LOS FRANCESES;;CAÍDA BICICLETA;BICICLETA           
05/01/2017;DE 12:00 A 12:59;1;LATINA;CARRETERA DE BOADILLA DEL MONTE;27;CAÍDA BICICLETA;BICICLETA         

4. Definimos el caso de uso

Supondremos que alguien nos solicita extraer el 'Número total de muertos por distrito', algo del estilo siguiente:

ARGANZUELA 11
SAN BLAS 15
....

5. Cargamos los datos en una carpeta del HDFS (sistema de ficheros distribuído de Hadoop) que llamaremos 'bicicletas:

egdepedro@OBERON:~/Escritorio$ hadoop fs -mkdir bicicletas
egdepedro@OBERON:~/Escritorio$ hadoop fs -put ./accidentes-bicicletas-madrid-2017.csv bicicletas

Podemos comprobar si se ha subido bien mediante el comando:
 
egdepedro@OBERON:~/Escritorio$ hadoop fs -ls bicicletas
Found 1 items
-rw-r--r--   1 egdepedro egdepedro      83474 2018-04-14 09:59 bicicletas/accidentes-bicicletas-madrid-2017.csv

6. Montamos los programas mapreduce para los dos casos de uso

6.1) Creamos un proyecto Maven y definimos las dependencias con el siguiente pom.xml 

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.casa</groupId>
<artifactId>TestMapReduce</artifactId>
<version>1.0</version>
<properties>
<slf4j.version>1.7.25</slf4j.version>
<proyecto.build.jdk_version>1.8</proyecto.build.jdk_version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop-common.version>3.1.0</hadoop-common.version>
<hadoop-client.version>3.1.0</hadoop-client.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>${proyecto.build.jdk_version}</source>
<target>${proyecto.build.jdk_version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop-common.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop-client.version}</version>
</dependency>
</dependencies>
</project>

6.2) Para cada cado de uso vamos a crear 3 clases:

Clase Driver. Donde figura el Main que define el trabajo a realizar: 

package org.casa;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AccidentesPorDistritoDriver {

private static final Logger LOGGER = LoggerFactory.getLogger(AccidentesPorDistritoDriver.class);

public static void main(String[] args) throws IOException {

LOGGER.info("Se crea el trabajo y su configuración, asignándole un nombre");
final JobClient miClienteHadoop = new JobClient();
final JobConf configTrabajo = new JobConf(AccidentesPorDistritoDriver.class);
configTrabajo.setJobName("AccidentesPorDistrito");

LOGGER.info("Se definen los tipos de los pares <clave,valor> de salida <texto, entero>");
configTrabajo.setOutputKeyClass(Text.class);
configTrabajo.setOutputValueClass(IntWritable.class);

LOGGER.info("Se define las clases mapper y reducer");
configTrabajo.setMapperClass(AccidentesPorDistritoMapper.class);
configTrabajo.setReducerClass(AccidentesPorDistritoReducer.class);

LOGGER.info("Se definen los tipos de los ficheros de entrada y salida (en ambos casos ficheros de texto plano)");
configTrabajo.setInputFormat(TextInputFormat.class);
configTrabajo.setOutputFormat(TextOutputFormat.class);

LOGGER.info("Se definen los directorios de entrada/salida");
FileInputFormat.setInputPaths(configTrabajo, new Path(args[0]));
FileOutputFormat.setOutputPath(configTrabajo, new Path(args[1]));

LOGGER.info("Cargamos el trabajo en el cliente Hadoop");
miClienteHadoop.setConf(configTrabajo);

try {
LOGGER.info("Ejecutamos el trabajo");
JobClient.runJob(configTrabajo);
} catch (Exception e) {
e.printStackTrace();
}
}
}

Clase Mapper. Donde se implementa la rutina de mapeo.

package org.casa;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Procesamos las líneas de un fichero CSV que contienen:
 * 
 *   fecha;tramo-horario;num-victimas;distrito;lugar;numero;tipo-accidente;tipo-vehiculo
 *   
 * Extraemos de cada línea el par <distrito, num-victimas>
 */
public class AccidentesPorDistritoMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

private static final Logger LOGGER = LoggerFactory.getLogger(AccidentesPorDistritoMapper.class);

public void map(LongWritable numLinea, Text txtLinea, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

LOGGER.info(String.format("Recibimos el par <%s,%s>", numLinea.toString(), txtLinea));
final StringTokenizer s = new StringTokenizer(txtLinea.toString(),";");
s.nextToken();
s.nextToken();
final int numMuertos = Integer.parseInt(s.nextToken());
final String distrito = s.nextToken();

LOGGER.info(String.format("Generamos el par <%s,%d>", distrito, numMuertos));
output.collect(new Text(distrito), new IntWritable(numMuertos));
}
}

Clase Reducer. Donde se implementa la rutina de reduccion.

package org.casa;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AccidentesPorDistritoReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

private static final Logger LOGGER = LoggerFactory.getLogger(AccidentesPorDistritoReducer.class);

@Override
public void reduce(Text distrito, Iterator<IntWritable> muertosDistrito, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

LOGGER.info(String.format("Recibimos el par <%s,[VALORES]>", distrito.toString()));
final Text key = distrito;
int totalMuertosDistrito = 0;
int parcialMuertosDistrito = 0;
while (muertosDistrito.hasNext()) {
parcialMuertosDistrito = ((IntWritable)muertosDistrito.next()).get();
totalMuertosDistrito += parcialMuertosDistrito;
}
LOGGER.info(String.format("Generamos el par <%s,%d>", key, totalMuertosDistrito));
output.collect(key, new IntWritable(totalMuertosDistrito));
}
}

6.3) Ejecutamos el caso de uso


  • Nos posicionamos en el directorio raiz del proyecto, donde está el fichero pom.xml
  • Lanzamos el comando maven que genera el jar: mvn clean package
  • Lanzamos el comando Hadoop para que se elecute

    $ hadoop jar TestMapReduce-1.0.jar org.casa.AccidentesPorDistritoDriver bicicletas/accidentes-bicicletas-madrid-2017.csv salida

  • Obtendremos una consola como:

2018-04-14 13:19:17,464 INFO casa.AccidentesPorDistritoDriver: Se crea el trabajo y su configuración, asignándole un nombre
2018-04-14 13:19:17,689 INFO casa.AccidentesPorDistritoDriver: Se definen los tipos de los pares <clave,valor> de salida <texto, entero>
2018-04-14 13:19:17,691 INFO casa.AccidentesPorDistritoDriver: Se define las clases mapper y reducer
2018-04-14 13:19:17,694 INFO casa.AccidentesPorDistritoDriver: Se definen los tipos de los ficheros de entrada y salida (en ambos casos ficheros de texto plano)
2018-04-14 13:19:17,697 INFO casa.AccidentesPorDistritoDriver: Se definen los directorios de entrada/salida
2018-04-14 13:19:18,098 INFO casa.AccidentesPorDistritoDriver: Cargamos el trabajo en el cliente Hadoop
2018-04-14 13:19:18,098 INFO casa.AccidentesPorDistritoDriver: Ejecutamos el trabajo
....
....
2018-04-14 13:19:19,103 INFO casa.AccidentesPorDistritoMapper: Recibimos el par <0,01/01/2017;DE 6:00 A 6:59;1;ARGANZUELA;CALLE DE TOLEDO;120;CHOQUE CON OBJETO FIJO;BICICLETA>
2018-04-14 13:19:19,103 INFO casa.AccidentesPorDistritoMapper: Generamos el par <ARGANZUELA,1>
2018-04-14 13:19:19,104 INFO casa.AccidentesPorDistritoMapper: Recibimos el par <93,02/01/2017;DE 21:00 A 21:59;1;SAN BLAS;CALLE DE MEQUINENZA;14;CAÍDA BICICLETA;BICICLETA>
2018-04-14 13:19:19,104 INFO casa.AccidentesPorDistritoMapper: Generamos el par <SAN BLAS,1>
2018-04-14 13:19:19,104 INFO casa.AccidentesPorDistritoMapper: Recibimos el par <183,03/01/2017;DE 19:00 A 19:59;1;CENTRO;CALLE DE LA ESCALINATA;8;CAÍDA BICICLETA;BICICLETA>
2018-04-14 13:19:19,104 INFO casa.AccidentesPorDistritoMapper: Generamos el par <CENTRO,1>
2018-04-14 13:19:19,104 INFO casa.AccidentesPorDistritoMapper: Recibimos el par <273,04/01/2017;DE 21:00 A 21:59;1;CENTRO;CALLE DE LA CAVA DE SAN MIGUEL;13;CAÍDA BICICLETA;BICICLETA>
2018-04-14 13:19:19,104 INFO casa.AccidentesPorDistritoMapper: Generamos el par <CENTRO,1>
....
....
2018-04-14 13:19:19,685 INFO casa.AccidentesPorDistritoReducer: Recibimos el par <ARGANZUELA,[VALORES]>
2018-04-14 13:19:19,686 INFO casa.AccidentesPorDistritoReducer: Generamos el par <ARGANZUELA,[VALORES]>
2018-04-14 13:19:19,687 INFO casa.AccidentesPorDistritoReducer: Recibimos el par <BARAJAS,[VALORES]>
2018-04-14 13:19:19,688 INFO casa.AccidentesPorDistritoReducer: Generamos el par <BARAJAS,[VALORES]>
2018-04-14 13:19:19,688 INFO casa.AccidentesPorDistritoReducer: Recibimos el par <CARABANCHEL,[VALORES]>
2018-04-14 13:19:19,688 INFO casa.AccidentesPorDistritoReducer: Generamos el par <CARABANCHEL,[VALORES]>
2018-04-14 13:19:19,688 INFO casa.AccidentesPorDistritoReducer: Recibimos el par <CENTRO,[VALORES]>
2018-04-14 13:19:19,690 INFO casa.AccidentesPorDistritoReducer: Generamos el par <CENTRO,[VALORES]>
....
....
2018-04-14 13:19:19,946 INFO mapreduce.Job: Job job_local296713609_0001 completed successfully
2018-04-14 13:19:19,953 INFO mapreduce.Job: Counters: 30
....
....

  • Obtendremos, el la carpeta salida, un fichero 'part-00000' con un contenido ordenado como el siguiente:

ARGANZUELA 60
BARAJAS 18
CARABANCHEL 44
CENTRO 149
CHAMARTIN 24
CHAMBERI 56
...

REFERENCIAS:

https://www.guru99.com/create-your-first-hadoop-program.html
https://datos.madrid.es/portal/site/egob/