1. Best Practices for Using Parquet with Spark SQL

1) In the past, the industry's big-data analytics pipelines generally took one of two forms:
a) Data Source -> HDFS -> MR/Hive/Spark (effectively the ETL stage) -> HDFS Parquet -> Spark SQL/Impala -> Result Service (the results may be stored in a database, or exposed as a data service over JDBC/ODBC);
b) Data Source -> real-time updates into HBase/DB -> export to Parquet -> Spark SQL/Impala -> Result Service (again, stored in a database or exposed over JDBC/ODBC).
The second form can be replaced entirely by Kafka + Spark Streaming + Spark SQL (and internally, storing the data as Parquet is strongly recommended as well).
2) The recommended pipeline: Data Source -> Kafka -> Spark Streaming -> Parquet -> Spark SQL (plus MLlib, GraphX, etc.) -> Parquet -> various further data-mining workloads. A minimal sketch of the Streaming-to-Parquet step is given below.
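The following sketch only illustrates the Kafka + Spark Streaming + Parquet part of that pipeline, written against Spark 1.6-era APIs and assuming the spark-streaming-kafka dependency is on the classpath; the broker address, topic name, output path, and object name are placeholders rather than anything from the original text. Each micro-batch is appended to a Parquet directory that Spark SQL can query afterwards.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object StreamingToParquet {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingToParquet")
    val ssc = new StreamingContext(conf, Seconds(30))

    // Placeholder broker list and topic name -- replace with your own.
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val topics = Set("events")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Append each non-empty micro-batch to a Parquet directory that
    // Spark SQL (or MLlib/GraphX jobs) can query later.
    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        import sqlContext.implicits._
        rdd.map(_._2).toDF("value")
          .write.mode(SaveMode.Append).parquet("data/streaming_events")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Writing with SaveMode.Append produces a new set of Parquet files per batch interval, so in a real deployment the interval and output layout would normally be tuned (and small files compacted) before heavy Spark SQL use; the sketch is only meant to show how the streaming stage hands Parquet data to Spark SQL.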
3. Hands-on Code

Java version:

package com.dt.spark.SparkApps.sql;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class SparkSQLParquetOps {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLParquetOps");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        DataFrame usersDF = sqlContext.read().parquet(
                "E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\src\\main\\resources\\users.parquet");

        /**
         * Register the DataFrame as a temporary table for subsequent SQL queries.
         */
        usersDF.registerTempTable("users");

        /**
         * Perform multi-dimensional analysis of the data.
         */
        DataFrame result = sqlContext.sql("select * from users");

        JavaRDD<String> resultRDD = result.javaRDD().map(new Function<Row, String>() {
            @Override
            public String call(Row row) throws Exception {
                return "The name is : " + row.getAs("name");
            }
        });

        /**
         * Process the result: convert the DataFrame to an RDD, collect it, and print (or persist) it.
         */
        List<String> listRow = resultRDD.collect();
        for (String row : listRow) {
            System.out.println(row);
        }
    }
}

Schema Merging (Java version):

package com.dt.spark.SparkApps.sql;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public class SchemaOps {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SchemaOps");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Create a simple DataFrame, stored into a partition directory
        JavaRDD<Integer> lines = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
        PairFunction<Integer, Integer, Integer> df2 = new PairFunction<Integer, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Integer x) throws Exception {
                return new Tuple2<Integer, Integer>(x, x * 2);
            }
        };
        JavaPairRDD<Integer, Integer> pairs = lines.mapToPair(df2);

        /**
         * Step 1: create an RDD of type Row on top of the existing RDD.
         */
        JavaRDD<Row> personsRDD = pairs.map(new Function<Tuple2<Integer, Integer>, Row>() {
            @Override
            public Row call(Tuple2<Integer, Integer> tuple) throws Exception {
                return RowFactory.create(tuple._1, tuple._2);
            }
        });

        /**
         * Step 2: dynamically construct the DataFrame metadata. In general, the number of
         * columns and the concrete type of each column may come from a JSON file or from a database.
         */
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("single", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("double", DataTypes.IntegerType, true));

        /**
         * Build the StructType that describes the DataFrame metadata.
         */
        StructType structType = DataTypes.createStructType(structFields);

        /**
         * Step 3: build the DataFrame from the metadata above and the Row RDD.
         */
        DataFrame personsDF = sqlContext.createDataFrame(personsRDD, structType);
        personsDF.write().parquet("data/test_table/key=1");

        // Create another DataFrame, stored into a second partition directory,
        // adding a new column ("triple") and dropping an existing one ("double")
        JavaRDD<Integer> lines1 = sc.parallelize(Arrays.asList(6, 7, 8, 9, 10));
        PairFunction<Integer, Integer, Integer> df3 = new PairFunction<Integer, Integer, Integer>() {
            @Override
            public Tuple2<Integer, Integer> call(Integer x) throws Exception {
                return new Tuple2<Integer, Integer>(x, x * 3);
            }
        };
        JavaPairRDD<Integer, Integer> pairs1 = lines1.mapToPair(df3);

        JavaRDD<Row> personsRDD1 = pairs1.map(new Function<Tuple2<Integer, Integer>, Row>() {
            @Override
            public Row call(Tuple2<Integer, Integer> tuple) throws Exception {
                return RowFactory.create(tuple._1, tuple._2);
            }
        });

        // Same three steps as above, but with a "triple" column instead of "double"
        List<StructField> structFields1 = new ArrayList<StructField>();
        structFields1.add(DataTypes.createStructField("single", DataTypes.IntegerType, true));
        structFields1.add(DataTypes.createStructField("triple", DataTypes.IntegerType, true));

        StructType structType1 = DataTypes.createStructType(structFields1);

        DataFrame personsDF1 = sqlContext.createDataFrame(personsRDD1, structType1);
        personsDF1.write().parquet("data/test_table/key=2");

        // Read the partitioned table with schema merging enabled
        DataFrame df4 = sqlContext.read().option("mergeSchema", "true").parquet("data/test_table");
        df4.printSchema();
    }
}

The output is as follows:

root
 |-- single: integer (nullable = true)
 |-- double: integer (nullable = true)
 |-- triple: integer (nullable = true)
 |-- key: integer (nullable = true)

Scala version:

// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")

// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column that appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key: int (nullable = true)
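As a small follow-up to the merged-schema example above (a sketch reusing the same names; the temporary-table name is an assumption), the partition column key that Spark discovers from the directory paths can be queried like any ordinary column, and schema merging can also be switched on globally via the spark.sql.parquet.mergeSchema option instead of per read:

// Alternative to the per-read option: enable schema merging globally.
sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")

// Query the merged, partitioned table; "merged_table" is an assumed temp-table name.
// In the merged table, rows from key=1 have a null "triple" and rows from key=2 a null "double".
df3.registerTempTable("merged_table")
sqlContext.sql("SELECT single, triple, key FROM merged_table WHERE key = 2").show()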