# bigdata/chapter3/spark.py
#
# 43 lines
# 1.1 KiB
# Python

from pyspark.sql import SparkSession
from pyspark.sql import Row
# $example off:spark_hive$
import os
import pymysql
def mysql_query(sql, args=None):
    """Execute ``sql`` on the local MySQL database and return the first row.

    Args:
        sql: SQL statement to execute.
        args: Optional parameters handed to ``cursor.execute`` so the driver
            binds them instead of the caller formatting them into ``sql``.
            With the default ``None`` the statement is executed as given.

    Returns:
        Whatever ``cursor.fetchone()`` yields for the statement: the first
        row of the result set, or ``None`` when there is no row.

    Raises:
        Any exception raised by ``pymysql`` while connecting or executing;
        the connection is closed before the exception propagates.
    """
    # Keyword arguments: PyMySQL >= 1.0 rejects positional connect() arguments.
    # NOTE(review): credentials are hard-coded here; consider moving them to
    # configuration.
    db = pymysql.connect(
        host="localhost",
        user="root",
        password="123456789",
        database="sparkproject",
    )
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, args)
            return cursor.fetchone()
    finally:
        # Always release the connection, even when the query fails.
        db.close()
def mysql_execute(sql, args=None):
    """Execute a statement on the local MySQL database and commit it.

    This is a best-effort helper: if executing or committing fails, the
    exception is printed, the transaction is rolled back, and the function
    returns normally.

    Args:
        sql: SQL statement to execute.
        args: Optional parameters handed to ``cursor.execute`` so the driver
            binds them instead of the caller formatting them into ``sql``.
            With the default ``None`` the statement is executed as given.

    Returns:
        None.

    Raises:
        Exceptions raised while connecting or creating the cursor propagate
        to the caller; failures of the statement itself do not.
    """
    # Keyword arguments: PyMySQL >= 1.0 rejects positional connect() arguments.
    # NOTE(review): credentials are hard-coded here; consider moving them to
    # configuration.
    db = pymysql.connect(
        host="localhost",
        user="root",
        password="123456789",
        database="sparkproject",
    )
    try:
        # Cursor creation is inside the outer try so the connection is closed
        # even if it fails.
        with db.cursor() as cursor:
            try:
                cursor.execute(sql, args)
                db.commit()
            except Exception as e:
                # Deliberately swallowed: report the error and undo the
                # partial transaction instead of crashing the caller.
                print(e)
                db.rollback()
    finally:
        db.close()
if __name__ == "__main__":
    # warehouse_location points to the default location for managed databases
    # and tables; it resolves to ./spark-warehouse under the current directory.
    warehouse_location = os.path.abspath('spark-warehouse')

    # Build (or reuse) a Hive-enabled session that uses that warehouse dir.
    spark = (
        SparkSession.builder
        .appName("Python Spark SQL Hive integration example")
        .config("spark.sql.warehouse.dir", warehouse_location)
        .enableHiveSupport()
        .getOrCreate()
    )
    try:
        # Only defines the DataFrame; no action (show/collect/write) is
        # invoked on it in this script.
        df = spark.sql("select * from jd limit 1")
    finally:
        # Stop the session even if the query above raises (e.g. the `jd`
        # table does not exist), so the Spark context is not left running.
        spark.stop()