Spark SQL——Data Sources(数据源)

2019-02-08

1 通用 加载/保存 功能

  • spark.read().load(filepath)
  • df.write().save(filename)
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");

usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

2 手动指定选项

  • 可以使用format(),通过指定文件格式完成对应文件格式的文件的读取和保存。
  • 主要的文件格式包含:json, parquet, jdbc, orc, libsvm, csv, text
Dataset<Row> peopleDF = spark.read().format("json").load("examples/src/main/resources/people.json");

peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

3 直接运行SQL

可以直接用 SQL 将查询结果加载到 DataFrame,该SQL也可以直接对文件操作。

Dataset<Row> sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");

4 Save Modes(保存模式)

Save operations (保存操作)可以选择使用 SaveMode , 它指定如何处理现有数据如果存在的话. 重要的是要意识到, 这些 save modes (保存模式)不使用任何 locking (锁定)并且不是 atomic (原子). 另外, 当执行 Overwrite 时, 数据将在新数据写出之前被删除.

savemode

5 保存到持久表

DataFrames 也可以使用saveAsTable()将dataframe持久化保存到Hive中. 作为内部表保存。

5.1 分桶, 排序和分区

peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");

usersDF
  .write()
  .partitionBy("favorite_color")
  .format("parquet")
  .save("namesPartByColor.parquet");
  
peopleDF
  .write()
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("people_partitioned_bucketed");

参考文献

Data Sources (数据源)