bigdata/chapter3/spark.py

182 lines
6.1 KiB
Python
Raw Normal View History

2019-01-17 20:15:22 +08:00
from pyspark.sql import SparkSession
from pyspark.sql import Row
# $example off:spark_hive$
import os
import pymysql
2019-01-18 11:31:19 +08:00
import datetime
2019-01-17 23:15:11 +08:00
import time
2019-01-18 15:16:16 +08:00
import json
2019-01-21 16:28:02 +08:00
from snownlp import SnowNLP
2019-01-17 20:15:22 +08:00
def mysql_query(sql, params=None):
    """Run a read query against the local ``sparkproject`` MySQL database.

    A fresh connection is opened for every call and is always closed, even
    when the query raises.

    :param sql: SQL text. Use ``%s`` placeholders together with ``params``
        when embedding values that are not fully trusted.
    :param params: optional sequence/mapping forwarded to
        ``cursor.execute`` so PyMySQL escapes the values. ``None`` (the
        default) sends ``sql`` unchanged, exactly as before.
    :return: the first result row as returned by ``cursor.fetchone()``
        (``None`` when the query produced no rows).
    """
    # Keyword arguments: PyMySQL >= 1.0 no longer accepts them positionally.
    db = pymysql.connect(host="localhost", user="root",
                         password="123456789", database="sparkproject")
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, params)
            return cursor.fetchone()
    finally:
        db.close()
def mysql_execute(sql, params=None):
    """Execute a data-modifying statement on the local MySQL database and commit.

    Best-effort by design: if the statement fails, the error is printed and
    the transaction is rolled back instead of raising, so the caller's
    polling loop keeps running. The connection is always closed.

    :param sql: SQL text; it is echoed to stdout before execution. Use
        ``%s`` placeholders together with ``params`` for untrusted values.
    :param params: optional sequence/mapping forwarded to
        ``cursor.execute`` for escaping. ``None`` (the default) sends
        ``sql`` unchanged, exactly as before.
    """
    print("execute: %s" % sql)
    # Keyword arguments: PyMySQL >= 1.0 no longer accepts them positionally.
    db = pymysql.connect(host="localhost", user="root",
                         password="123456789", database="sparkproject")
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, params)
        db.commit()
    except Exception as e:
        print(e)
        db.rollback()
    finally:
        db.close()
def today():
    """Return the current local date as a ``YYYY-MM-DD`` string."""
    date_format = '%Y-%m-%d'
    return time.strftime(date_format, time.localtime())
def getYesterday(day):
    """Return the local date ``day`` days before today as ``YYYY-MM-DD``.

    ``day=0`` yields today, ``day=1`` yesterday, and a negative value a date
    in the future.
    """
    target = datetime.date.today() - datetime.timedelta(days=day)
    return target.strftime('%Y-%m-%d')
def get_last_month(num, now=None):
    """Return the calendar month ``num`` months before ``now`` as ``YYYY-MM``.

    The subtraction is done on an absolute month index, so it rolls over any
    number of year boundaries correctly (e.g. 4 months before 2019-03 is
    2018-11, and 14 months before 2019-01 is 2017-11).

    :param num: how many months to go back; 0 means the reference month.
    :param now: optional ``date``/``datetime`` used as the reference point;
        defaults to the current local time.
    :return: a string such as ``"2018-12"``.
    """
    if now is None:
        now = datetime.datetime.now()
    # Months counted from year 0 with January as 0, so divmod by 12 yields
    # (year, zero-based month) directly, including negative carries.
    index = now.year * 12 + (now.month - 1) - num
    year, month_zero_based = divmod(index, 12)
    return "%d-%02d" % (year, month_zero_based + 1)
def collect_last_month_sells(spark):
    """Store per-month comment counts for the current month and the 4 before it.

    Each month's count is the number of ``jd_comment`` rows whose
    ``comment_time`` starts with that ``YYYY-MM`` prefix. The resulting
    month -> count mapping is written as JSON into the ``datas`` row whose
    key is ``last_month_sell``.
    """
    counts = {}
    for offset in range(5):
        month = get_last_month(offset)
        query = "select count(*) as N from jd_comment where comment_time like '{}%' ".format(month)
        counts[month] = spark.sql(query).rdd.collect()[0]["N"]
    payload = json.dumps(counts, ensure_ascii=False)
    mysql_execute("update datas set data = '{}' where `key` = 'last_month_sell'".format(payload))
def collect_hour_sells(spark):
    """Store the number of ``jd_comment`` rows for each hour of the day.

    Hour ``HH`` (``00``..``23``) matches any ``comment_time`` containing
    `` HH:``, which presumes the time part follows a space after the date
    (NOTE(review): confirm the stored timestamp format). The hour -> count
    mapping is written as JSON into the ``datas`` row keyed ``hour_sell``.
    """
    template = "select count(*) as N from jd_comment where comment_time like '% {}:%'"
    counts = {
        hour: spark.sql(template.format(hour)).rdd.collect()[0]["N"]
        for hour in ("%02d" % h for h in range(24))
    }
    payload = json.dumps(counts, ensure_ascii=False)
    mysql_execute("update datas set data = '{}' where `key` = 'hour_sell'".format(payload))
def collect_crawl_info(spark):
    """Append a snapshot of crawl statistics to the ``crawl_infos`` table.

    Counts all rows and today's rows (by ``created_at`` date prefix) in the
    ``jd`` table (stored as product counts) and the ``jd_comment`` table
    (stored as comment counts), plus their sums, and inserts one new row.
    """
    def count(sql):
        # Every query here returns a single row with one column named N.
        return spark.sql(sql).rdd.collect()[0]["N"]

    # Resolve the date once so both "today" counts refer to the same day
    # even if this runs across midnight.
    day = today()
    jd_comment_count = count("select count(*) as N from jd_comment")
    jd_comment_today_count = count(
        "select count(*) as N from jd_comment where created_at like '" + day + "%'")
    jd_count = count("select count(*) as N from jd")
    jd_today_count = count(
        "select count(*) as N from jd where created_at like '" + day + "%'")

    total_count = jd_comment_count + jd_count
    today_total_count = jd_comment_today_count + jd_today_count
    mysql_execute("insert into crawl_infos (total_count, today_total_count, product_count, today_product_count, comment_count, today_comment_count) values ({},{},{},{},{},{})".format(
        total_count, today_total_count, jd_count, jd_today_count, jd_comment_count, jd_comment_today_count))
def collect_news(spark):
    """Copy the latest 20 crawled comments into the ``news`` table.

    The 20 ``jd_comment`` rows with the newest ``created_at`` are appended to
    ``news``; afterwards every ``news`` row except the 20 with the highest
    ``id`` is deleted, so the table only keeps the most recent entries.
    """
    # Comment text is scraped user content and may contain quotes or
    # backslashes; escape every value so it cannot break (or inject into)
    # the single-quoted SQL literals below.
    from pymysql.converters import escape_string

    df = spark.sql("select * from jd_comment order by created_at desc limit 20")
    for row in df.rdd.collect():
        mysql_execute("insert into news (comment_time, content, comment_id) values ('{}', '{}', '{}')".format(
            escape_string(str(row["comment_time"])),
            escape_string(str(row["content"])),
            escape_string(str(row["id"]))))
    mysql_execute("delete from news where id not in ( select x.id from (select id from news order by id desc limit 20) as x)")
def get_last_day_count(spark):
    """Record product and comment crawl counts for each of the last five days.

    For day offset ``i`` (0 = today, 1 = yesterday, ...), counts ``jd`` and
    ``jd_comment`` rows whose ``created_at`` starts with that date and
    updates the ``last_day_counts`` row where ``last_day = i + 1``.
    """
    for offset in range(5):
        # Resolve the date once per iteration so both counts refer to the
        # same day even if the job runs across midnight.
        day = getYesterday(offset)
        df = spark.sql("select count(*) as N from jd where created_at like '" + day + "%'")
        jd_last_count = df.rdd.collect()[0]["N"]
        df = spark.sql("select count(*) as N from jd_comment where created_at like '" + day + "%'")
        jd_comment_last_count = df.rdd.collect()[0]["N"]
        mysql_execute("update last_day_counts set product_c = {}, comment_c = {} where last_day = {}".format(
            jd_last_count, jd_comment_last_count, offset + 1))
def collect_top10_sells(spark):
    """Refresh ``top10_sells`` with the 10 ``jd`` products having the highest ``good_count``.

    Rows are UPDATEd in place by rank (``order_n`` = 1, 2, ...), so the
    ranked rows must already exist in the table. Prices are truncated to an
    integer. Product names come from crawled data and are escaped before
    being embedded in the SQL literal.
    """
    from pymysql.converters import escape_string

    df = spark.sql("select * from jd order by good_count desc limit 10")
    for rank, row in enumerate(df.rdd.collect(), start=1):
        mysql_execute("update top10_sells set product_name = '{}', good_c = {}, price={} where order_n = {} ".format(
            escape_string(str(row["name"])), row["good_count"], int(float(row["price"])), rank))
def collect_from_type(spark):
    """Store the number of ``jd_comment`` rows per ``from_type`` value as JSON.

    Groups with an empty/NULL ``from_type`` are skipped. The mapping is
    written into the ``datas`` row keyed ``from_type``. The JSON text is
    escaped before being embedded in the SQL literal: the keys are crawled
    values that may contain quotes, and MySQL would otherwise consume JSON's
    own backslash escapes and store corrupted JSON.
    """
    from pymysql.converters import escape_string

    df = spark.sql("select from_type, count(*) N from jd_comment group by from_type")
    data = {row["from_type"]: row["N"] for row in df.rdd.collect() if row["from_type"]}
    payload = escape_string(json.dumps(data, ensure_ascii=False))
    mysql_execute("update datas set data = '{}' where `key` = 'from_type'".format(payload))
def _get_sentiments(df):
n = 0
for row in df.rdd.collect():
s = SnowNLP(row.content)
n += s.sentiments
return n/df.count()
def collect_sentiments(spark):
    """Sentiment analysis: compare average comment sentiment of two phones.

    Comments are selected by ``id`` prefix: 7437788 (Xiaomi 8) and 7081550
    (Honor 10). Each group's average score from ``_get_sentiments`` is
    stored as JSON in the ``datas`` row keyed ``sentiments``.
    """
    id_prefixes = (
        ("xiaomi_s", "7437788"),  # Xiaomi 8
        ("huawei_s", "7081550"),  # Honor 10
    )
    scores = {}
    for label, prefix in id_prefixes:
        comments = spark.sql("select * from jd_comment where id like '" + prefix + "%'")
        scores[label] = _get_sentiments(comments)
    payload = json.dumps(scores, ensure_ascii=False)
    mysql_execute("update datas set data = '{}' where `key` = 'sentiments'".format(payload))
if __name__ == "__main__":
    # warehouse_location points to the default location for managed databases and tables
    warehouse_location = os.path.abspath('spark-warehouse')
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL Hive integration example") \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .enableHiveSupport() \
        .getOrCreate()

    # Cheap collectors run on every poll; the expensive ones run on the
    # first poll and then once every HEAVY_EVERY polls.
    POLL_SECONDS = 10
    HEAVY_EVERY = 100
    iteration = 0
    try:
        while True:
            collect_crawl_info(spark)
            collect_news(spark)

            if iteration % HEAVY_EVERY == 0:
                get_last_day_count(spark)
                collect_top10_sells(spark)
                collect_from_type(spark)
                collect_last_month_sells(spark)
                collect_hour_sells(spark)
                collect_sentiments(spark)

            time.sleep(POLL_SECONDS)
            iteration += 1
    finally:
        # Runs on Ctrl+C or when a collector raises; a stop() placed after
        # the infinite loop could never be reached.
        spark.stop()