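"""Periodically aggregate crawl statistics from the Hive tables `jd` and
`jd_comment` with Spark SQL and write the results into MySQL.

The script runs forever: the lightweight counters are refreshed every 10
seconds, the heavier aggregations roughly every 100 iterations (see the main
loop at the bottom).
"""
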
from pyspark.sql import SparkSession
from pyspark.sql import Row
# $example off:spark_hive$
import os
import pymysql
import datetime
import time
import json


def mysql_query(sql):
    """Run a read-only query against MySQL and return the first result row."""
    db = pymysql.connect(host="localhost", user="root",
                         password="123456789", database="sparkproject")
    cursor = db.cursor()
    cursor.execute(sql)
    data = cursor.fetchone()
    db.close()
    return data


def mysql_execute(sql):
    """Execute a write statement against MySQL, rolling back on error."""
    print("execute: %s" % sql)
    db = pymysql.connect(host="localhost", user="root",
                         password="123456789", database="sparkproject")
    cursor = db.cursor()

    try:
        cursor.execute(sql)
        db.commit()
    except Exception as e:
        print(e)
        db.rollback()
    finally:
        db.close()


def today():
    """Return today's date as 'YYYY-MM-DD'."""
    return time.strftime('%Y-%m-%d')


def getYesterday(day):
    """Return the date `day` days ago as 'YYYY-MM-DD'."""
    today = datetime.date.today()
    oneday = datetime.timedelta(days=day)
    yesterday = today - oneday
    return yesterday.strftime('%Y-%m-%d')


def get_last_month(num):
    """Return the month `num` months before the current one as 'YYYY-MM'."""
    date = datetime.datetime.now()
    year = date.year
    month = date.month
    month = month - num
    if month <= 0:
        # Wrap around into the previous year.
        month += 12
        year -= 1
    return "%d-%02d" % (year, month)


def collect_last_month_sells(spark):
    """Count comments per month for the last 5 months and store the result as
    JSON under the `last_month_sell` key of the `datas` table."""
    data = {}
    for i in range(5):
        month = get_last_month(i)
        df = spark.sql("select count(*) as N from jd_comment where comment_time like '" + month + "%' ")
        jd_comment_count = df.rdd.collect()[0]["N"]
        data[month] = jd_comment_count
    mysql_execute("update datas set data = '{}' where `key` = 'last_month_sell'".format(json.dumps(data, ensure_ascii=False)))


def collect_hour_sells(spark):
    """Count comments per hour of day (00-23) and store the result as JSON
    under the `hour_sell` key of the `datas` table."""
    data = {}
    for i in range(24):
        hour = "%02d" % (i)
        df = spark.sql("select count(*) as N from jd_comment where comment_time like '% " + hour + ":%'")
        jd_comment_count = df.rdd.collect()[0]["N"]
        data[hour] = jd_comment_count

    mysql_execute("update datas set data = '{}' where `key` = 'hour_sell'".format(json.dumps(data, ensure_ascii=False)))


def collect_crawl_info(spark):
    """Insert overall and today's crawl counts into the `crawl_infos` table."""
    df = spark.sql("select count(*) as N from jd_comment")
    jd_comment_count = df.rdd.collect()[0]["N"]

    df = spark.sql("select count(*) as N from jd_comment where created_at like '" + today() + "%'")
    jd_comment_today_count = df.rdd.collect()[0]["N"]

    df = spark.sql("select count(*) as N from jd")
    jd_count = df.rdd.collect()[0]["N"]

    df = spark.sql("select count(*) as N from jd where created_at like '" + today() + "%'")
    jd_today_count = df.rdd.collect()[0]["N"]

    total_count = jd_comment_count + jd_count
    today_total_count = jd_comment_today_count + jd_today_count

    mysql_execute("insert into crawl_infos (total_count, today_total_count, product_count, today_product_count, comment_count, today_comment_count) values ({},{},{},{},{},{})".format(
        total_count, today_total_count, jd_count, jd_today_count, jd_comment_count, jd_comment_today_count))


def collect_news(spark):
    """Copy the 20 most recently collected comments into the `news` table."""
    df = spark.sql("select * from jd_comment order by created_at desc limit 20")

    for row in df.rdd.collect():
        mysql_execute("insert into news (comment_time, content, comment_id) values ('{}', '{}', '{}')".format(
            row["comment_time"], row["content"], row["id"]))

    # Keep only the 20 most recent rows.
    mysql_execute("delete from news where id not in ( select x.id from (select id from news order by id desc limit 20) as x)")


def get_last_day_count(spark):
    """Collect per-day product and comment counts for the past 5 days and
    update the `last_day_counts` table."""
    for i in range(5):
        df = spark.sql("select count(*) as N from jd where created_at like '" + getYesterday(i) + "%'")
        jd_last_count = df.rdd.collect()[0]["N"]

        df = spark.sql("select count(*) as N from jd_comment where created_at like '" + getYesterday(i) + "%'")
        jd_comment_last_count = df.rdd.collect()[0]["N"]

        mysql_execute("update last_day_counts set product_c = {}, comment_c = {} where last_day = {}".format(
            jd_last_count, jd_comment_last_count, i + 1))


def collect_top10_sells(spark):
    """Store the 10 products with the highest `good_count` in `top10_sells`."""
    df = spark.sql("select * from jd order by good_count desc limit 10")
    i = 1
    for row in df.rdd.collect():
        mysql_execute("update top10_sells set product_name = '{}', good_c = {}, price={} where order_n = {} ".format(
            row["name"], row["good_count"], int(float(row["price"])), i))
        i += 1


def collect_from_type(spark):
    """Count comments per source (`from_type`) and store the result as JSON
    under the `from_type` key of the `datas` table."""
    df = spark.sql("select from_type, count(*) N from jd_comment group by from_type")
    data = {}
    for row in df.rdd.collect():
        if row["from_type"]:
            data[row["from_type"]] = row["N"]
    mysql_execute("update datas set data = '{}' where `key` = 'from_type'".format(json.dumps(data, ensure_ascii=False)))


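# The MySQL tables written above are assumed to already exist with at least
# the columns referenced in the SQL: `datas(key, data)`,
# `crawl_infos(total_count, today_total_count, product_count,
# today_product_count, comment_count, today_comment_count)`,
# `news(id, comment_time, content, comment_id)`,
# `last_day_counts(last_day, product_c, comment_c)` and
# `top10_sells(order_n, product_name, good_c, price)`.
# Their exact DDL is not part of this script.
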
if __name__ == "__main__":
    # $example on:spark_hive$
    # warehouse_location points to the default location for managed databases and tables
    warehouse_location = os.path.abspath('spark-warehouse')

    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL Hive integration example") \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .enableHiveSupport() \
        .getOrCreate()

    count = 0
    while True:
        # Refresh the lightweight counters on every pass.
        collect_crawl_info(spark)
        collect_news(spark)

        # Run the heavier aggregations on the first pass and then roughly
        # every 100 iterations (about every 1000 seconds).
        if count == 0 or count > 100:
            get_last_day_count(spark)
            collect_top10_sells(spark)
            collect_from_type(spark)
            collect_last_month_sells(spark)
            collect_hour_sells(spark)
            count = 1

        time.sleep(10)
        count += 1

    spark.stop()