在前面的演示案例里面,我们介绍了编写一个简单的spark sql,同时也介绍过读取csv或者json格式很方便,所以这篇文章我们介绍下spark sql读取csv和json格式的数据源。
一、读取json格式
在spark sql中读取json格式的文件非常容易,因为json自带有key键,因此直接读取整个json,进行反序列化即可,所以在编写应用程序的时候,我们只需要填写读取文件的路径即可,例如:
val dataframeRdd = session.read.json("hdfs://43.134.170.235:9000/usersource/user.txt")
二、读取csv格式
在spark sql读取csv格式的数据源的时候会稍微复杂一点,需要定义读取模式,schema等等,先来个案例:
//CSV 格式数据文本文件数据 -> 依据 CSV文件首行是否是列名称,决定读取数据方式不一样的 val dataframeRdd = session.read.format("csv") .option("mode","FAILFAST") //读取模式 .option("sep", "\\t") //每一行的分隔符 .option("header", "true") //文本里面是否包含header头 .option("inferSchema", "true") // 是否自动推断 schema .option("path","hdfs://43.134.170.235:9000/usersource/u.txt") // 文件路径 .load() dataframeRdd.show()
在读取csv格式的时候,我们需要定义很多的option参数,下面分别介绍下:
序号 | 参数 | 解释 |
1 | sep | 每一行的分隔符 |
2 | encoding | 默认是uft-8通过给定的编码类型进行解码 |
3 | quote | 默认是,其中分隔符可以是值的一部分,设置用于转义带引号的值的单个字符。如果您想关闭引号,则需要设置一个空字符串,而不是null。 |
4 | escape | 默认(\)设置单个字符用于在引号里面转义引号 |
5 | charToEscapeQuoteEscaping | 默认是转义字符(上面的escape)或者\0,当转义字符和引号(quote)字符不同的时候,默认是转义字符(escape),否则为0 |
6 | comment | 默认是空值,设置用于跳过行的单个字符,以该字符开头。默认情况下,它是禁用的 |
7 | header | 默认是false,将第一行作为列名 |
8 | enforceSchema | 默认是true, 如果将其设置为true,则指定或推断的模式将强制应用于数据源文件,而CSV文件中的标头将被忽略。 如果选项设置为false,则在header选项设置为true的情况下,将针对CSV文件中的所有标题验证模式。模式中的字段名称和CSV标头中的列名称是根据它们的位置检查的,并考虑了*spark.sql.caseSensitive。虽然默认值为true,但是建议禁用 enforceSchema选项,以避免产生错误的结果 |
9 | inferSchema | inferSchema(默认为false`):从数据自动推断输入模式。 需要对数据进行一次额外的传递 |
10 | samplingRatio | 默认为1.0,定义用于模式推断的行的分数 |
11 | ignoreLeadingWhiteSpace | 默认为false,一个标志,指示是否应跳过正在读取的值中的前导空格 |
12 | ignoreTrailingWhiteSpace | 默认为false一个标志,指示是否应跳过正在读取的值的结尾空格 |
13 | nullValue | 默认是空的字符串,设置null值的字符串表示形式。从2.0.1开始,这适用于所有支持的类型,包括字符串类型 |
14 | emptyValue | 默认是空字符串,设置一个空值的字符串表示形式 |
15 | nanValue | 默认是Nan,设置非数字的字符串表示形式 |
16 | positiveInf | 默认是Inf |
17 | negativeInf | 默认是-Inf 设置负无穷值的字符串表示形式 |
18 | dateFormat | 默认是yyyy-MM-dd,设置指示日期格式的字符串。自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于日期类型 |
19 | timestampFormat | 默认是yyyy-MM-dd'T'HH:mm:ss.SSSXXX,设置表示时间戳格式的字符串。自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于时间戳记类型 |
20 | maxColumns | 默认是20480定义多少列数目的硬性设置 |
21 | maxCharsPerColumn | 默认是-1定义读取的任何给定值允许的最大字符数。默认情况下为-1,表示长度不受限制 |
22 | mode | 默认(允许)允许一种在解析过程中处理损坏记录的模式。它支持以下不区分大小写的模式。请注意,Spark尝试在列修剪下仅解析CSV中必需的列。因此,损坏的记录可以根据所需的字段集而有所不同。可以通过spark.sql.csv.parser.columnPruning.enabled(默认启用)来控制此行为。 |
23 | columnNameOfCorruptRecord | 默认值指定在spark.sql.columnNameOfCorruptRecord,允许重命名由PERMISSIVE模式创建的格式错误的新字段。这会覆盖spark.sql.columnNameOfCorruptRecord |
24 | multiLine | 默认是false,解析一条记录,该记录可能跨越多行 |
备注:
读取模式,这里我们主要有3种模式,分别是:
PERMISSIVE
当它遇到损坏的记录时,将格式错误的字符串放入由“ columnNameOfCorruptRecord”配置的*字段中,并将其他字段设置为“ null”。为了保留损坏的记录,用户可以在用户定义的模式中设置一个名为columnNameOfCorruptRecord
DROPMALFORMED
忽略整个损坏的记录
FAILFAST
遇到损坏的记录时引发异常
三、写入json格式
在写入的时候比较简单,代码没太多的配置,写入json格式的文件,代码示例如下:
dataframeRdd.where("name == zhangsan").write .mode(SaveMode.Overwrite).json("hdfs://43.134.170.235:9000/usersource/u1.json")
四、写入csv格式
dataframeRdd.where("name == zhangsan").write .mode(SaveMode.Overwrite).csv("hdfs://43.134.170.235:9000/usersource/u1.csv")
备注:
1、在读取csv格式的时候添加的参数会比较多一点,我们根据实际情况和自身需求进行设置就可以了。
2、在写入的时候不管是哪种文件都提供有对应的API函数,也是直接拿过来开箱即用。
还没有评论,来说两句吧...