一个dataFrame表示是一个二维的表, 一个二维表, 必然存在 行 列 表结构描述信息
表结构描述信息(元数据): StructType
字段: StructField
定义: 字段的名称, 字段的类型, 字段是否可以为Null
认为: 在一个StructType对象下, 由多个StructField组成的, 构建了一个完整的元数据信息
行: Row对象
列: Column对象
注意: dataFrame本质上就是一个RDD, 只是对RDD进行包装, 在其基础上添加schema元数据信息,从而处理结构化数据
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import os# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("如何获取DF对象方式一: 通过RDD得到DF对象")# 1. 创建SparkSession对象spark = SparkSession.builder.master('local[*]').appName('get_df_01').getOrCreate()sc = spark.sparkContext# 2- 构建RDD的数据集rdd = sc.parallelize(['张三 20', '李四 18', '王五 23'])# 3. 对数据进行处理操作# [('张三', '20'), ('李四', '18'), ('王五', '23')]rdd_map = rdd.map(lambda name_age:(name_age.split()[0],int(name_age.split()[1])))# 4. 将RDD转换为DF# 4.1 方案一:# schema元数据定义方式一:schema = StructType()\.add('name', StringType())\.add('age', IntegerType())# schema元数据定义方式二:schema = StructType(fields=[StructField('name',StringType(),True),StructField('age',IntegerType(),False)])df = spark.createDataFrame(data=rdd_map,schema=schema)df.printSchema()df.show()df = spark.createDataFrame(data=rdd_map,schema='name string,age integer')df.printSchema()df.show()df = spark.createDataFrame(data=rdd_map, schema=['name','age']) # 自动类型推断, 但是一般推断类型比较大df.printSchema()df.show()# 4.2 方案二: df = rdd_map.toDF(schema=schema)df.printSchema()df.show()df = rdd_map.toDF(schema='name string,age integer')df.printSchema()df.show()df = rdd_map.toDF(schema=['name','age'])df.printSchema()df.show()
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import os# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("如何基于pandas的DF转换为Spark SQL的DF对象")# 1- 创建SparkSession对象spark = SparkSession.builder.appName('pd_df_spark_df').master('local[*]').getOrCreate()# 2- 基于pandas 构建一个DF对象pd_df = pd.DataFrame({'id':[1,2,3,4],'name': ['张三','李四','王五','赵六'],'address':['北京','上海','深圳','广州']})# 3- 将pandas df 转换为 spark df# 字段名可以自动识别到pandas的字段类型, 对于数据类型, 当不设置schema的时候, 会进行自动推断spark_df = spark.createDataFrame(pd_df,schema='id int,name string,address string')spark_df.printSchema()spark_df.show()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import os# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("方式三: 通过内部初始化, 直接创建DF对象")# 1. 创建SparkSession对象spark = SparkSession.builder.master('local[*]').appName('create_df').getOrCreate()# 2- 创建DFdf = spark.createDataFrame(data=[(1,'张三','北京'),(2,'李四','上海'),(3,'王五','广州'),(4,'赵六','深圳')],schema='id int,name string,address string')df.printSchema()df.show()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import os# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("演示: 读取外部文件--->text")# 1- 创建SparkSession对象spark = SparkSession.builder.appName('read_text').master('local[*]').getOrCreate()# 2- 读取外部文件数据集# 当采用Text读取方式来读取外部文件的数据, 仅会产生一列数据, 一行数据为一列, 默认的列名为value 支持修改列名, 数据类型必须为stringdf = spark.read\.format('text')\.schema('dept string')\.load(path='file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/dept.txt')# 3- 打印 查看结果df.printSchema()df.show()
下一篇:hibernate学习(六)