准备工作:将mysql的jar包放在$SPARK_HOME/jars目录下

1.生成一个SparkSession()对象,并导入相关的库和接口

from pyspark.sql import SparkSession
from pyspark import SparkConf,SparkContext, SparkConf
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
from pyspark.sql import Row
from pyspark.sql.types import *

2.使用spark.read.format("jdbc")设置mysql的连接参数,并使用dataframe对象接收,再使用dataframe的show()方法查看结果:(*:表示部分为自己mysql的IP和密码)

jdbcDF = spark.read.format("jdbc")\
    .option("driver","com.mysql.jdbc.Driver")\
    .option("url","jdbc:mysql://***.***.**.***:3306/spark")\
    .option("dbtable","student")\
    .option("user","root")\
    .option("password","*******")\
    .load()
jdbcDF.show()

结果如下:

3.用列表 对象创建RDD,并设定模式信息

studentRDD = spark.sparkContext.parallelize([\
    "3 Xiaobao M 26",\
    "4 yixin M 27"]).map(lambda x:x.split(" "))
#转变为ROWRDD
rowRDD = studentRDD.map(lambda p:Row(int(p[0]),\
    p[1].strip(),p[2].strip(),int(p[3])))
#拼接ROW对象与模式schema
studentDF = spark.createDataFrame(rowRDD,schema)
#写入数据库
#studentDF.show()
prop = {}
prop['user'] = 'root'
prop['password'] = '*******'
prop['driver'] = 'com.mysql.jdbc.Driver'
studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append',prop)

4.使用spark.stop()结束

5.重新执行以下部分,惊醒查看,并结束:

from pyspark.sql import SparkSession
from pyspark import SparkConf,SparkContext, SparkConf
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
from pyspark.sql import Row
from pyspark.sql.types import *
# %%
jdbcDFs = spark.read.format("jdbc")\
    .option("driver","com.mysql.jdbc.Driver")\
    .option("url","jdbc:mysql://***.***.**.***:3306/spark")\
    .option("dbtable","student")\
    .option("user","root")\
    .option("password","*******")\
    .load()
jdbcDFs.show()
spark.stop()

 结果如下:

注:进行写入操作时,使用的是localhost,的原因是:本人使用的是直接连接服务器进行的spark上SQL的操作,具体情况应视具体问题而定。

关键字:

StructType,StructField,StringType,IntegerType,.read.format("xxx"),map 

 

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐