Apache Spark 2 : Flat map con Java


A diferencia de Map, FlatMap nos permitirá generar una lista de elementos de uno solo, en Map teníamos un elemento de entrada y teníamos uno de salida, con FlatMap tendremos un elemento de entrada y a partir de este generaremos un conjunto de elementos de salida.

En el post Apache Spark 2 : Conceptos básicos utilizamos Flat map para realizar un conteo de palabras, en este post haremos una variación del programa word count, veamos el siguiente código:


import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * @author maagapi
 *
 */
public class FlatMapExample {
	public static void main(String[] args) {
		JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("FlatMapExample").setMaster("local[3]"));
		List phrases = Arrays.asList("This is the first set of words",
							"This is the second set of words",
							"This is the third set of words",
							"This is the fourth set of words",
							"This is the last set of words");
		JavaRDD phrasesRdd = sc.parallelize(phrases);
		System.out.printf("The program has %d phrases \n",phrasesRdd.count());

		JavaRDD words = phrasesRdd.flatMap(f->Arrays.asList(f.split(" ")).iterator());
		System.out.printf("The program has %d words \n",words.count());

		sc.close();

	}
}

Analicemos el código anterior:

  • Primero creamos nuestro contexto de Spark
  • Tomamos como entrada un conjunto de frases
  • Transformamos la lista de frases en un RDD
  • Imprimimos el número de frases en nuestro RDD, en este caso imprimirá 5
  • Después haciendo uso de flatMap partiremos nuestro RDD de frases y lo transformaremos en un RDD de palabras
  • Una vez que tenemos nuestro RDD de palabras imprimimos el conteo, en este caso imprimirá 35
  • Terminamos nuestro contexto de Spark

Como vemos, la operación Flat map nos permitió tomar una frase de entrada y generar muchas palabras de salida, esto es diferente a map ya que map solo permite tener por cada valor de entrada, uno de salida.

Puedes encontrar el código completo en el siguiente enlace https://github.com/raidentrance/spark-tutorial.

En próximos posts veremos ejemplos cada vez más complejos, síguenos en nuestras redes sociales para enterarte sobre nuevo contenido https://www.facebook.com/devs4j/ y https://twitter.com/devs4j.

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com

Apache Spark 2 : Transformaciones (Map y filter )


filterandmap_sparkRecordemos que las transformaciones son operaciones sobre RDDs que darán como resultado un nuevo RDD, las transformaciones más comunes son las siguientes:

  • Filter: Toma una función que devuelve un nuevo RDD formado por los elementos que pasen una función filtro. Esta función se puede utilizar para limpiar un RDD de entrada, veamos un ejemplo:
public class FilterNamesExample {
	public static void main(String[] args) {
		JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("FilterNamesExample").setMaster("local[3]"));
		List names = Arrays.asList("Alex", "Pedro", "", "Juan", "Pancho", "");
		JavaRDD namesRdd = sc.parallelize(names);
		System.out.printf("Names before filter %d \n", namesRdd.count());
		namesRdd = namesRdd.filter(f->!f.isEmpty());
		System.out.printf("Names after filter %d \n", namesRdd.count());
		sc.close();
	}
}

En el código anterior podemos ver que tenemos una entrada de nombres y algunos de ellos están vacíos, lo cual sería una entrada inválida para nuestro sistema, haciendo uso de la función filter crearemos un rdd nuevo conteniendo solo los valores que son válidos.

  • Map : Toma una función y la aplica a cada elemento del RDD, el resultado de la función será utilizado para crear el nuevo elemento en el nuevo RDD, es importante comprender que el tipo de dato del RDD resultado no debe ser necesariamente del mismo tipo que el RDD de entrada, veamos el siguiente ejemplo:
class Person {
	private String name;
	private String lastName;

	public Person(String name, String lastName) {
		this.name = name;
		this.lastName = lastName;
	}

	//Getters and setters
}

Como vemos tenemos una clase llamada Person que contiene un nombre y un apellido.

public class MapPersonExample {
	public static void main(String[] args) {
		JavaSparkContext sc = new JavaSparkContext(
				new SparkConf().setAppName("FilterNamesExample").setMaster("local[3]"));
		List names = Arrays.asList("Alex,Bautista", "Pedro,Lopez", "Arturo,Mendez", "Juan,Sánchez",
				"Pancho,Pantera", "Jon,Smith");
		JavaRDD peopleRdd = sc.parallelize(names);
		JavaRDD customPeople = peopleRdd.map(p -> new Person(p.split(",")[0], p.split(",")[1]));
		System.out.println(customPeople);
		sc.close();
	}
}

El código anterior leerá un RDD de tipo String en el que cada elemento contendrá un nombre y un apellido, después utilizará la función map para transformarlo de un RDD de String a un RDD de tipo Person.

Cada vez veremos ejemplos más complejos, síguenos en nuestras redes sociales para enterarte sobre nuevo contenido https://www.facebook.com/devs4j/ y https://twitter.com/devs4j.

Puedes encontrar el código completo en el siguiente link https://github.com/raidentrance/spark-tutorial.

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com

Apache Spark 2 : Creando un RDD con Java


portada-createjavardd

En el post anterior aprendimos los conceptos básicos de un RDDs, en este post los implementaremos con Java y analizaremos su funcionamiento. En Spark tenemos tres formas de crear RDDs :

  • Paralelizando una colección existente en tu programa
  • Creando un dataset de un almacenamiento externo
  •  Creando un RDD de un RDD existente

A continuación veremos como realizar con Java cada una de las formas mencionadas.

Paralelizando una colección existente en tu programa

La primer forma será paralelizando una colección existente en tu programa, veamos un ejemplo:

public class CreateRDDParallelizing {
	public static void main(String[] args) {
		JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("WordCount").setMaster("local[3]"));

		List list = Arrays.asList(1,2,3,4,5,6);
		JavaRDD rdd = sc.parallelize(list);
		//Nuestras transformaciones y acciones
		sc.close();
	}
}

Como se puede ver creamos un objeto de tipo List de java y con spark lo paralelizamos para crear un JavaRDD, recordemos que el objeto JavaSparkContext representa una conexión a un cluster de Spark (Hablaremos más de este contexto en futuros posts) y lo utilizaremos para crear RDDs.

Todos los elementos de la lista serán copiados a un dataset distribuido donde se podrán operar de forma paralela, esto lo hace muy práctico para realizar pruebas pero no para escenarios reales productivos ya que debe cargar en memoria de un nodo todos los datos antes de distribuirlos a lo largo de todo el cluster.

Creando un dataset de un almacenamiento externo

La siguiente forma es cargando un dataset de un almacenamiento externo, en este ejemplo utilizaremos un archivo de texto, veamos el código:

public class CreateRDDFromExternalStorage {
	public static void main(String[] args) {
		JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("WordCount").setMaster("local[3]"));
		JavaRDD rdd = sc.textFile("src/main/resources/words.txt");
		// Use your RDD here
		System.out.println(rdd);
		sc.close();
	}
}

Utilizaremos el método textFile(String file) para leer los datos del archivo y crear un rdd a partir de ellos. Normalmente se utilizarán almacenamientos externos como Amazon s3, HDFS, SFTP, etc. Recordemos que no estamos limitados a archivos sino que también podremos leer datos de JDBC, Cassandra, ElasticSearch, etc.

Creando un RDD de un RDD existente

La última forma será crear un RDD de un RDD existente, veamos el siguiente ejemplo:

public class CreateRDDFromExistingRDD {
	public static void main(String[] args) {
		JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("WordCount").setMaster("local[3]"));

		List list = Arrays.asList(1, 2, 3, 4, 5, 6);
		JavaRDD rdd = sc.parallelize(list);
		JavaRDD rddString = rdd.map(f->f.toString());
		// Use your RDD here
		sc.close();
	}
}

Como vemos crearemos el rddString a partir del rdd utilizando la función map.

En próximos posts veremos ejemplos cada vez más complejos, síguenos en nuestras redes sociales para enterarte sobre nuevo contenido https://www.facebook.com/devs4j/ y https://twitter.com/devs4j.

Puedes encontrar el código completo en el siguiente link https://github.com/raidentrance/spark-tutorial.

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com

 

Apache Spark 2 : RDD(Resilient distributed dataset)


portada spark rdd

Todo Apache Spark fue construido alrededor de RDD(Resilient distributed dataset)  y es el objeto principal cuando construimos aplicaciones con Spark. Pueden contener cualquier tipo de objetos incluyendo clases definidas por el desarrollador, en Spark todo el trabajo se expresa en crear, transformar o llamar operaciones que darán como resultado RDD’s.

Spark distribuirá los datos contenidos en los RDDs a lo largo del cluster y paralelizará las operaciones que se realicen sobre ellos de forma automática y sin que siquiera lo notes.

Dataset

Un dataset es una colección de datos, puede ser una lista de Strings, Integers o incluso registros de una base de datos.

¿Qué se puede hacer con un dataset?

Es posible realizar 2 tipos de operaciones con los RDDs :

  • Transformaciones: Se utiliza para aplicar funciones a los datos que darán como resultado un nuevo RDD, algunas de las transformaciones más comunes son filter, map, reduceByKey, join, cogroup, randomSplit, etc.
  • Acciones: Devuelven un resultado basado en un RDD, por ejemplo una aplicación contiene un dataset de tweets y aplica la transformación filter para obtener solo los tweets que contengan un hashtag determinado, a través de una acción podremos tomar ese RDD y transformarlo en otro tipo de dato para utilizarlo en nuestra aplicación, algunas de las acciones más comunes son firs, aggregate, collect, count, max, min, reduce, etc.

Conclusión

En conclusión, cuando trabajamos con Apache Spark nuestras aplicaciones seguirán un flujo como el siguiente:

  • Generarán RDDs con información obtenida de fuentes externas (archivos, bases de datos, brokers,etc.)
  • Aplicarán transformaciones
  • Se ejecutarán acciones para producir resultados.

En los siguientes posts veremos ejemplos de transformaciones y acciones aplicadas en ejemplos reales.

Si quieres seguir aprendiendo Apache Spark, en futuros posts veremos ejemplos cada vez más complejos, síguenos en nuestras redes sociales para enterarte sobre nuevo contenido https://www.facebook.com/devs4j/ y https://twitter.com/devs4j.

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com

Apache Spark 2 : Conceptos básicos


portada_spark

Apache Spark es un cluster dedicado al procesamiento de información de forma muy rápida, provee soporte para el desarrollo de aplicaciones con Java, Scala, Python y R. Su engine cuenta con soporte para SQL, Machine Learning, Streaming, GraphX, etc.

Características

Cuando se procesan grandes datasets uno de los aspectos más importantes es la velocidad,  por esto Spark cuenta con un motor llamado DAG que permite ejecutar tareas incluso 100 veces más rápido que Hadoop.

Es posible escribir aplicaciones y comunicarlas en diferentes lenguajes como Java, Scala, Python, R y SQL.

Provee soporte para diferentes tipos de problemas, por esto es posible combinarlos para realizar soluciones más complejas.

Captura de pantalla 2018-08-31 a las 1.39.37 p.m.

Puedes ejecutarlo donde sea Apache Mesos, Kubernetes, standalone o incluso en la nube, además de que puede acceder a diferentes fuentes de datos.

Todo lo anterior lo convierte a una plataforma que permite resolver diferentes tipos de problemas, de diferentes fuentes de información y utilizando diferentes lenguajes de programación. Es importante entender que puedes usar uno o varios módulos de Spark para construir tu aplicación.

Requerimientos para nuestros ejemplos

En esta serie de posts utilizaremos el lenguaje Java para nuestros ejemplos, para esto es necesario tener instalado Java 8, Maven, Git y el IDE de tu preferencia en tu computadora.

Primer ejemplo con Java

Cuando trabajas con streaming y procesamiento en tiempo real el Hola mundo se convierte en el WordCount, un programa para contar palabras, veamos como hacerlo con Apache Spark y java.

Paso 1 Configurando nuestro projecto

Es posible iniciar un cluster standalone de spark solo definiendo la dependencia de maven de spark-core_2.11 y la versión de java a utilizar como se muestra a continuación:

Captura de pantalla 2018-08-31 a las 2.11.14 p.m.

Paso 2: Creando nuestro archivo de prueba

Para este ejemplo leeremos un texto de un archivo, en este caso lo crearemos en el folder src/main/resources y lo nombraremos words.txt en nuestro caso colocaremos información sobre Xochimilco.

Paso 3: Escribiendo la lógica de nuestra aplicación

El siguiente paso será escribir el código que realizará el conteo de palabras:


import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
 * @author raidentrance
 *
 */
public class WordCount {
	public static void main(String[] args) {
		JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("WordCount").setMaster("local[3]"));
		JavaRDD lines = sc.textFile("src/main/resources/words.txt");
		JavaRDD words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
		Map wordCounts = words.countByValue();

		for (Entry wordCount : wordCounts.entrySet()) {
			System.out.printf("%s - %d \n", wordCount.getKey(), wordCount.getValue());
		}
		sc.close();
	}
}

Como vemos leeremos el archivo y realizaremos el conteo de palabras utilizando Apache spark e imprimiremos el resultado, en el siguiente post explicaremos en detalle el significado de cada línea.

Paso 4 : Ejecutando la aplciación

Para ejecutar la aplicación no es necesario descargar nada, solo ejecutaremos nuestra clase Main y veremos la salida como la siguiente en la consola:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/08/31 14:06:40 INFO SparkContext: Running Spark version 2.0.0
18/08/31 14:06:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/08/31 14:06:40 INFO SecurityManager: Changing view acls to: maagapi
18/08/31 14:06:40 INFO SecurityManager: Changing modify acls to: maagapi
18/08/31 14:06:40 INFO SecurityManager: Changing view acls groups to:
18/08/31 14:06:40 INFO SecurityManager: Changing modify acls groups to:
18/08/31 14:06:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls dis
.......
estado - 1
'flor', - 1
deterioro - 1
obras - 3
Noria" - 1
Xochimilco, - 1
Xochimilco - 10
quien - 1
pueblo - 1
cual - 1
......
18/08/31 14:06:42 INFO SparkUI: Stopped Spark web UI at http://192.168.14.141:4040
18/08/31 14:06:42 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/08/31 14:06:42 INFO MemoryStore: MemoryStore cleared
18/08/31 14:06:42 INFO BlockManager: BlockManager stopped
18/08/31 14:06:42 INFO BlockManagerMaster: BlockManagerMaster stopped
18/08/31 14:06:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/08/31 14:06:42 INFO SparkContext: Successfully stopped SparkContext
18/08/31 14:06:42 INFO ShutdownHookManager: Shutdown hook called
18/08/31 14:06:42 INFO ShutdownHookManager: Deleting directory /private/var/folders/w6/3vf48tdj3fq0lchyt9d1yxyxhrgdrj/T/spark-cdf54202-63a3-4e4b-bb66-2dc386cd245d

Con esto tenemos nuestra primera aplicación con Apache Spark, en futuros posts veremos ejemplos cada vez más complejos, síguenos en nuestras redes sociales para enterarte sobre nuevo contenido https://www.facebook.com/devs4j/ y https://twitter.com/devs4j.

Puedes encontrar el código completo en el siguiente link https://github.com/raidentrance/spark-tutorial.

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com