bigdata/chapter3/spark.py

79 lines
2.4 KiB
Python

from pyspark.sql import SparkSession
from pyspark.sql import Row
# $example off:spark_hive$
import os
import pymysql
import time
def mysql_query(sql):
db = pymysql.connect("localhost","root","123456789","sparkproject" )
cursor = db.cursor()
cursor.execute(sql)
data = cursor.fetchone()
db.close()
return data
def mysql_execute(sql):
db = pymysql.connect("localhost","root","123456789","sparkproject" )
cursor = db.cursor()
try:
cursor.execute(sql)
db.commit()
except Exception as e:
print(e)
db.rollback()
finally:
db.close()
def today():
return time.strftime('%Y-%m-%d')
def collect_crawl_info(spark):
df = spark.sql("select count(*) as N from jd_comment")
jd_comment_count = df.rdd.collect()[0]["N"]
df = spark.sql("select count(*) as N from jd_comment where created_at like '"+today()+"%'")
jd_comment_today_count = df.rdd.collect()[0]["N"]
df = spark.sql("select count(*) as N from jd")
jd_count = df.rdd.collect()[0]["N"]
df = spark.sql("select count(*) as N from jd where created_at like '"+today()+"%'")
jd_today_count = df.rdd.collect()[0]["N"]
total_count = jd_comment_count + jd_count
today_total_count = jd_comment_today_count + jd_today_count
mysql_execute("insert into crawl_infos (total_count, today_total_count, product_count, today_product_count, comment_count, today_comment_count) values ({},{},{},{},{},{})".format(
total_count, today_total_count, jd_count,jd_today_count, jd_comment_count, jd_comment_today_count) )
def collect_news(spark):
df = spark.sql("select * from jd_comment order by created_at desc limit 20")
mysql_execute("delete from news")
for row in df.rdd.collect():
mysql_execute("insert into news (comment_time, content, comment_id) values ('{}', '{}', '{}')".format(
row["comment_time"], row["content"], row["id"]))
if __name__ == "__main__":
# $example on:spark_hive$
# warehouse_location points to the default location for managed databases and tables
warehouse_location = os.path.abspath('spark-warehouse')
spark = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.config("spark.sql.warehouse.dir", warehouse_location) \
.enableHiveSupport() \
.getOrCreate()
while True:
collect_crawl_info(spark)
collect_news(spark)
time.sleep(10)
spark.stop()