How to use JDBC source to write and read data in (Py)Spark?


Question:


The goal of this question is to document:

  • steps required to read and write data using JDBC connections in PySpark

  • possible issues with JDBC sources and known solutions

With small changes these methods should work with other supported languages including Scala and R.

Asked By: zero323


Answer #1:

Writing data

  1. Include the applicable JDBC driver when you submit the application or start the shell. You can use, for example, --packages:

    bin/pyspark --packages group:name:version  
    

    or combine driver-class-path and jars:

    bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
    

    These properties can also be set using the PYSPARK_SUBMIT_ARGS environment variable before the JVM instance has been started, or by using conf/spark-defaults.conf to set spark.jars.packages or spark.jars / spark.driver.extraClassPath.

  2. Choose the desired mode. The Spark JDBC writer supports the following modes:

    • append: Append contents of this DataFrame to existing data.
    • overwrite: Overwrite existing data.
    • ignore: Silently ignore this operation if data already exists.
    • error (default case): Throw an exception if data already exists.

    Upserts or other fine-grained modifications are not supported.

    mode = ...
    
  3. Prepare JDBC URI, for example:

    # You can encode credentials in URI or pass
    # separately using properties argument
    # of jdbc method or options
    
    url = "jdbc:postgresql://localhost/foobar"
    
  4. (Optional) Create a dictionary of JDBC arguments.

    properties = {
        "user": "foo",
        "password": "bar"
    }
    

    properties / options can also be used to set supported JDBC connection properties.

  5. Use DataFrame.write.jdbc

    df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
    

    to save the data (see pyspark.sql.DataFrameWriter for details); a consolidated sketch of steps 1–5 follows below.
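
Putting steps 1–5 together: the sketch below assumes Spark 2.x (SparkSession as the entry point) and that the PostgreSQL driver has been made available as in step 1 (for instance via --packages, or once and for all through spark.jars.packages in conf/spark-defaults.conf). The data, host, table, and credentials are placeholders to adapt.

    from pyspark.sql import SparkSession

    # Assumes the PostgreSQL JDBC driver is on the classpath,
    # e.g. started with --packages org.postgresql:postgresql:<version>
    spark = SparkSession.builder.appName("jdbc-write-sketch").getOrCreate()

    # Placeholder DataFrame; replace with your own data
    df = spark.createDataFrame([(1, "foo"), (2, "bar")], ["id", "value"])

    url = "jdbc:postgresql://localhost/foobar"    # step 3
    mode = "append"                               # step 2
    properties = {                                # step 4
        "user": "foo",
        "password": "bar",
        "driver": "org.postgresql.Driver",        # see known issues below
    }

    df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)  # step 5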

Known issues:

  • Suitable driver cannot be found when the driver has been included using --packages (java.sql.SQLException: No suitable driver found for jdbc: ...)

    Assuming there is no driver version mismatch, you can solve this by adding the driver class to the properties. For example:

    properties = {
        ...
        "driver": "org.postgresql.Driver"
    }
    
  • using df.write.format("jdbc").options(...).save() may result in:

    java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.

    Solution unknown.

  • In PySpark 1.3 you can try calling the Java method directly:

    df._jdf.insertIntoJDBC(url, "baz", True)
    

Reading data

  1. Follow steps 1-4 from Writing data
  2. Use sqlContext.read.jdbc:

    sqlContext.read.jdbc(url=url, table="baz", properties=properties)
    

    or sqlContext.read.format("jdbc"):

    (sqlContext.read.format("jdbc")
        .options(url=url, dbtable="baz", **properties)
        .load())
    

Known issues and gotchas:
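
One gotcha worth flagging here: by default Spark loads the whole table through a single JDBC connection into a single partition, which can be slow (or overload a single executor) for large tables. The read can be split across executors with the partitioning arguments of the jdbc method; a minimal sketch, where the partition column, bounds, and partition count are assumptions to adapt to your own table:

    # url and properties as prepared in the writing steps; sqlContext as in
    # step 2 of Reading data. "id", the bounds and numPartitions are
    # assumptions - use a numeric column and bounds from your own table.
    df = sqlContext.read.jdbc(
        url=url,
        table="baz",
        column="id",
        lowerBound=1,
        upperBound=100000,
        numPartitions=10,
        properties=properties,
    )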

Where to find suitable drivers: JDBC drivers for most databases are published to Maven Central, so the group:name:version coordinates found there can be passed directly to --packages.

Other options

Depending on the database, a specialized source might exist and be preferred in some cases:
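
For illustration only (an assumption, not part of the original answer): with the DataStax Spark Cassandra Connector on the classpath, a Cassandra table can be read through its own data source rather than JDBC:

    # Hypothetical illustration: reading a Cassandra table with the
    # spark-cassandra-connector data source; the connector package and
    # spark.cassandra.connection.host have to be configured separately,
    # and spark is a SparkSession as above.
    df = (spark.read
          .format("org.apache.spark.sql.cassandra")
          .options(keyspace="test", table="kv")
          .load())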

Answered By: zero323

Answer #2:

Download the mysql-connector-java driver and keep it in the Spark jars folder. Observe the Python code below: it writes data into "actor1", so the actor1 table structure has to be created in the MySQL database first.

    from pyspark.sql import SparkSession
    from pyspark.sql import SQLContext

    # Point the driver classpath at the MySQL connector jar
    spark = (SparkSession.builder
             .appName("prasadad")
             .master("local")
             .config("spark.driver.extraClassPath",
                     "D:\\spark-2.1.0-bin-hadoop2.7\\jars\\mysql-connector-java-5.1.41-bin.jar")
             .getOrCreate())

    sc = spark.sparkContext
    sqlContext = SQLContext(sc)

    # Read the existing "actor" table from the sakila database
    df = (sqlContext.read.format("jdbc")
          .options(url="jdbc:mysql://localhost:3306/sakila",
                   driver="com.mysql.jdbc.Driver",
                   dbtable="actor",
                   user="root",
                   password="****")
          .load())

    # Write into the pre-created "actor1" table
    mysql_url = "jdbc:mysql://localhost:3306/sakila?user=root&password=****"
    df.write.jdbc(mysql_url, table="actor1", mode="append")
Answered By: zero323

Answer #3:

Refer to this link to download the JDBC driver for Postgres and follow its steps to get the jar file:

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/exercises/spark-exercise-dataframe-jdbc-postgresql.html

The jar file will be downloaded to a path like this:
"/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar"

If your Spark version is 2.x:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("sparkanalysis")
         .config("spark.driver.extraClassPath",
                 "/home/anand/.ivy2/jars/org.postgresql_postgresql-42.1.1.jar")
         .getOrCreate())

# for a localhost database
pgDF = (spark.read
        .format("jdbc")
        .option("url", "jdbc:postgresql:postgres")
        .option("dbtable", "public.user_emp_tab")
        .option("user", "postgres")
        .option("password", "Jonsnow@100")
        .load())

print(pgDF)

pgDF.filter(pgDF["user_id"] > 5).show()

Save the file as a Python script and run it with "python respectivefilename.py".

Answered By: y durga prasad
