Apache Spark 2 : Transformaciones (Map y filter )


filterandmap_sparkRecordemos que las transformaciones son operaciones sobre RDDs que darán como resultado un nuevo RDD, las transformaciones más comunes son las siguientes:

  • Filter: Toma una función que devuelve un nuevo RDD formado por los elementos que pasen una función filtro. Esta función se puede utilizar para limpiar un RDD de entrada, veamos un ejemplo:
public class FilterNamesExample {
	public static void main(String[] args) {
		JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("FilterNamesExample").setMaster("local[3]"));
		List names = Arrays.asList("Alex", "Pedro", "", "Juan", "Pancho", "");
		JavaRDD namesRdd = sc.parallelize(names);
		System.out.printf("Names before filter %d \n", namesRdd.count());
		namesRdd = namesRdd.filter(f->!f.isEmpty());
		System.out.printf("Names after filter %d \n", namesRdd.count());
		sc.close();
	}
}

En el código anterior podemos ver que tenemos una entrada de nombres y algunos de ellos están vacíos, lo cual sería una entrada inválida para nuestro sistema, haciendo uso de la función filter crearemos un rdd nuevo conteniendo solo los valores que son válidos.

  • Map : Toma una función y la aplica a cada elemento del RDD, el resultado de la función será utilizado para crear el nuevo elemento en el nuevo RDD, es importante comprender que el tipo de dato del RDD resultado no debe ser necesariamente del mismo tipo que el RDD de entrada, veamos el siguiente ejemplo:
class Person {
	private String name;
	private String lastName;

	public Person(String name, String lastName) {
		this.name = name;
		this.lastName = lastName;
	}

	//Getters and setters
}

Como vemos tenemos una clase llamada Person que contiene un nombre y un apellido.

public class MapPersonExample {
	public static void main(String[] args) {
		JavaSparkContext sc = new JavaSparkContext(
				new SparkConf().setAppName("FilterNamesExample").setMaster("local[3]"));
		List names = Arrays.asList("Alex,Bautista", "Pedro,Lopez", "Arturo,Mendez", "Juan,Sánchez",
				"Pancho,Pantera", "Jon,Smith");
		JavaRDD peopleRdd = sc.parallelize(names);
		JavaRDD customPeople = peopleRdd.map(p -> new Person(p.split(",")[0], p.split(",")[1]));
		System.out.println(customPeople);
		sc.close();
	}
}

El código anterior leerá un RDD de tipo String en el que cada elemento contendrá un nombre y un apellido, después utilizará la función map para transformarlo de un RDD de String a un RDD de tipo Person.

Cada vez veremos ejemplos más complejos, síguenos en nuestras redes sociales para enterarte sobre nuevo contenido https://www.facebook.com/devs4j/ y https://twitter.com/devs4j.

Puedes encontrar el código completo en el siguiente link https://github.com/raidentrance/spark-tutorial.

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com

Apache Spark 2 : Creando un RDD con Java


portada-createjavardd

En el post anterior aprendimos los conceptos básicos de un RDDs, en este post los implementaremos con Java y analizaremos su funcionamiento. En Spark tenemos tres formas de crear RDDs :

  • Paralelizando una colección existente en tu programa
  • Creando un dataset de un almacenamiento externo
  •  Creando un RDD de un RDD existente

A continuación veremos como realizar con Java cada una de las formas mencionadas.

Paralelizando una colección existente en tu programa

La primer forma será paralelizando una colección existente en tu programa, veamos un ejemplo:

public class CreateRDDParallelizing {
	public static void main(String[] args) {
		JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("WordCount").setMaster("local[3]"));

		List list = Arrays.asList(1,2,3,4,5,6);
		JavaRDD rdd = sc.parallelize(list);
		//Nuestras transformaciones y acciones
		sc.close();
	}
}

Como se puede ver creamos un objeto de tipo List de java y con spark lo paralelizamos para crear un JavaRDD, recordemos que el objeto JavaSparkContext representa una conexión a un cluster de Spark (Hablaremos más de este contexto en futuros posts) y lo utilizaremos para crear RDDs.

Todos los elementos de la lista serán copiados a un dataset distribuido donde se podrán operar de forma paralela, esto lo hace muy práctico para realizar pruebas pero no para escenarios reales productivos ya que debe cargar en memoria de un nodo todos los datos antes de distribuirlos a lo largo de todo el cluster.

Creando un dataset de un almacenamiento externo

La siguiente forma es cargando un dataset de un almacenamiento externo, en este ejemplo utilizaremos un archivo de texto, veamos el código:

public class CreateRDDFromExternalStorage {
	public static void main(String[] args) {
		JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("WordCount").setMaster("local[3]"));
		JavaRDD rdd = sc.textFile("src/main/resources/words.txt");
		// Use your RDD here
		System.out.println(rdd);
		sc.close();
	}
}

Utilizaremos el método textFile(String file) para leer los datos del archivo y crear un rdd a partir de ellos. Normalmente se utilizarán almacenamientos externos como Amazon s3, HDFS, SFTP, etc. Recordemos que no estamos limitados a archivos sino que también podremos leer datos de JDBC, Cassandra, ElasticSearch, etc.

Creando un RDD de un RDD existente

La última forma será crear un RDD de un RDD existente, veamos el siguiente ejemplo:

public class CreateRDDFromExistingRDD {
	public static void main(String[] args) {
		JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("WordCount").setMaster("local[3]"));

		List list = Arrays.asList(1, 2, 3, 4, 5, 6);
		JavaRDD rdd = sc.parallelize(list);
		JavaRDD rddString = rdd.map(f->f.toString());
		// Use your RDD here
		sc.close();
	}
}

Como vemos crearemos el rddString a partir del rdd utilizando la función map.

En próximos posts veremos ejemplos cada vez más complejos, síguenos en nuestras redes sociales para enterarte sobre nuevo contenido https://www.facebook.com/devs4j/ y https://twitter.com/devs4j.

Puedes encontrar el código completo en el siguiente link https://github.com/raidentrance/spark-tutorial.

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com

 

Apache Spark 2 : RDD(Resilient distributed dataset)


portada spark rdd

Todo Apache Spark fue construido alrededor de RDD(Resilient distributed dataset)  y es el objeto principal cuando construimos aplicaciones con Spark. Pueden contener cualquier tipo de objetos incluyendo clases definidas por el desarrollador, en Spark todo el trabajo se expresa en crear, transformar o llamar operaciones que darán como resultado RDD’s.

Spark distribuirá los datos contenidos en los RDDs a lo largo del cluster y paralelizará las operaciones que se realicen sobre ellos de forma automática y sin que siquiera lo notes.

Dataset

Un dataset es una colección de datos, puede ser una lista de Strings, Integers o incluso registros de una base de datos.

¿Qué se puede hacer con un dataset?

Es posible realizar 2 tipos de operaciones con los RDDs :

  • Transformaciones: Se utiliza para aplicar funciones a los datos que darán como resultado un nuevo RDD, algunas de las transformaciones más comunes son filter, map, reduceByKey, join, cogroup, randomSplit, etc.
  • Acciones: Devuelven un resultado basado en un RDD, por ejemplo una aplicación contiene un dataset de tweets y aplica la transformación filter para obtener solo los tweets que contengan un hashtag determinado, a través de una acción podremos tomar ese RDD y transformarlo en otro tipo de dato para utilizarlo en nuestra aplicación, algunas de las acciones más comunes son firs, aggregate, collect, count, max, min, reduce, etc.

Conclusión

En conclusión, cuando trabajamos con Apache Spark nuestras aplicaciones seguirán un flujo como el siguiente:

  • Generarán RDDs con información obtenida de fuentes externas (archivos, bases de datos, brokers,etc.)
  • Aplicarán transformaciones
  • Se ejecutarán acciones para producir resultados.

En los siguientes posts veremos ejemplos de transformaciones y acciones aplicadas en ejemplos reales.

Si quieres seguir aprendiendo Apache Spark, en futuros posts veremos ejemplos cada vez más complejos, síguenos en nuestras redes sociales para enterarte sobre nuevo contenido https://www.facebook.com/devs4j/ y https://twitter.com/devs4j.

Autor: Alejandro Agapito Bautista

Twitter: @raidentrance

Contacto:raidentrance@gmail.com