from pyspark.sql import SparkSession
from pyspark.sql import Row
# $example off:spark_hive$
import os
import pymysql
import datetime
import time
import json

from snownlp import SnowNLP


def mysql_query(sql):
    """Run a read-only query against MySQL and return the first row."""
    db = pymysql.connect(host="localhost", user="root",
                         password="123456789", database="sparkproject")
    cursor = db.cursor()
    cursor.execute(sql)
    data = cursor.fetchone()
    db.close()
    return data


def mysql_execute(sql):
    """Run a write statement against MySQL, committing on success and rolling back on error."""
    print("execute: %s" % sql)
    db = pymysql.connect(host="localhost", user="root",
                         password="123456789", database="sparkproject")
    cursor = db.cursor()
    try:
        cursor.execute(sql)
        db.commit()
    except Exception as e:
        print(e)
        db.rollback()
    finally:
        db.close()


def today():
    """Today's date as 'YYYY-MM-DD'."""
    return time.strftime('%Y-%m-%d')


def getYesterday(day):
    """The date `day` days ago, as 'YYYY-MM-DD'."""
    base = datetime.date.today()
    delta = datetime.timedelta(days=day)
    return (base - delta).strftime('%Y-%m-%d')


def get_last_month(num):
    """The month `num` months before the current one, as 'YYYY-MM'."""
    date = datetime.datetime.now()
    year = date.year
    month = date.month - num
    # Wrap into the previous year when the subtraction goes past January.
    while month <= 0:
        month += 12
        year -= 1
    return "%d-%02d" % (year, month)


def collect_last_month_sells(spark):
    """Count comments per month for the last five months and store the JSON in `datas`."""
    data = {}
    for i in range(5):
        month = get_last_month(i)
        df = spark.sql("select count(*) as N from jd_comment where comment_time like '" + month + "%' ")
        jd_comment_count = df.rdd.collect()[0]["N"]
        data[month] = jd_comment_count
    mysql_execute("update datas set data = '{}' where `key` = 'last_month_sell'".format(
        json.dumps(data, ensure_ascii=False)))


def collect_hour_sells(spark):
    """Count comments per hour of day; comment_time is 'YYYY-MM-DD HH:MM:SS', so '% HH:%' matches the hour."""
    data = {}
    for i in range(24):
        hour = "%02d" % i
        df = spark.sql("select count(*) as N from jd_comment where comment_time like '% " + hour + ":%'")
        jd_comment_count = df.rdd.collect()[0]["N"]
        data[hour] = jd_comment_count
    mysql_execute("update datas set data = '{}' where `key` = 'hour_sell'".format(
        json.dumps(data, ensure_ascii=False)))


def collect_crawl_info(spark):
    """Snapshot overall and per-day crawl counts for products (jd) and comments (jd_comment)."""
    df = spark.sql("select count(*) as N from jd_comment")
    jd_comment_count = df.rdd.collect()[0]["N"]

    df = spark.sql("select count(*) as N from jd_comment where created_at like '" + today() + "%'")
    jd_comment_today_count = df.rdd.collect()[0]["N"]

    df = spark.sql("select count(*) as N from jd")
    jd_count = df.rdd.collect()[0]["N"]

    df = spark.sql("select count(*) as N from jd where created_at like '" + today() + "%'")
    jd_today_count = df.rdd.collect()[0]["N"]

    total_count = jd_comment_count + jd_count
    today_total_count = jd_comment_today_count + jd_today_count

    mysql_execute("insert into crawl_infos (total_count, today_total_count, product_count, today_product_count, comment_count, today_comment_count) values ({},{},{},{},{},{})".format(
        total_count, today_total_count, jd_count, jd_today_count, jd_comment_count, jd_comment_today_count))


def collect_news(spark):
    """Insert the 20 most recently crawled comments and keep only the latest 20 rows in `news`."""
    df = spark.sql("select * from jd_comment order by created_at desc limit 20")
    for row in df.rdd.collect():
        # NOTE: values are interpolated directly into the SQL string, so a quote
        # inside the comment content will break the statement (see the
        # parameterized sketch at the end of this file).
        mysql_execute("insert into news (comment_time, content, comment_id) values ('{}', '{}', '{}')".format(
            row["comment_time"], row["content"], row["id"]))
    mysql_execute("delete from news where id not in ( select x.id from (select id from news order by id desc limit 20) as x)")


def get_last_day_count(spark):
    """Collect the crawl volume for each of the past few days."""
    for i in range(5):
        df = spark.sql("select count(*) as N from jd where created_at like '" + getYesterday(i) + "%'")
        jd_last_count = df.rdd.collect()[0]["N"]

        df = spark.sql("select count(*) as N from jd_comment where created_at like '" + getYesterday(i) + "%'")
        jd_comment_last_count = df.rdd.collect()[0]["N"]

        mysql_execute("update last_day_counts set product_c = {}, comment_c = {} where last_day = {}".format(
            jd_last_count, jd_comment_last_count, i + 1))


def collect_top10_sells(spark):
    """Store the ten products with the most positive reviews into `top10_sells`."""
    df = spark.sql("select * from jd order by good_count desc limit 10")
    for i, row in enumerate(df.rdd.collect(), start=1):
        mysql_execute("update top10_sells set product_name = '{}', good_c = {}, price={} where order_n = {} ".format(
            row["name"], row["good_count"], int(float(row["price"])), i))


def collect_from_type(spark):
    """Count comments per client/source type and store the JSON in `datas`."""
    df = spark.sql("select from_type, count(*) N from jd_comment group by from_type")
    data = {}
    for row in df.rdd.collect():
        if row["from_type"]:
            data[row["from_type"]] = row["N"]
    mysql_execute("update datas set data = '{}' where `key` = 'from_type'".format(
        json.dumps(data, ensure_ascii=False)))


def _get_sentiments(df):
    """Average SnowNLP sentiment score (0..1) over all comments in the DataFrame."""
    rows = df.rdd.collect()
    if not rows:
        return 0
    total = 0
    for row in rows:
        total += SnowNLP(row.content).sentiments
    return total / len(rows)


def collect_sentiments(spark):
    """Sentiment analysis for two products, keyed by comment id prefix."""
    # Xiaomi Mi 8
    df1 = spark.sql("select * from jd_comment where id like '7437788%'")
    # Honor 10
    df2 = spark.sql("select * from jd_comment where id like '7081550%'")
    xiaomi_s = _get_sentiments(df1)
    huawei_s = _get_sentiments(df2)
    data = {"xiaomi_s": xiaomi_s, "huawei_s": huawei_s}
    mysql_execute("update datas set data = '{}' where `key` = 'sentiments'".format(
        json.dumps(data, ensure_ascii=False)))


if __name__ == "__main__":
    # $example on:spark_hive$
    # warehouse_location points to the default location for managed databases and tables
    warehouse_location = os.path.abspath('spark-warehouse')

    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL Hive integration example") \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .enableHiveSupport() \
        .getOrCreate()

    # Fast-changing stats are refreshed every 10 seconds; the heavier
    # aggregations run on the first pass and then roughly every 100 iterations.
    count = 0
    while True:
        collect_crawl_info(spark)
        collect_news(spark)
        if count == 0 or count > 100:
            get_last_day_count(spark)
            collect_top10_sells(spark)
            collect_from_type(spark)
            collect_last_month_sells(spark)
            collect_hour_sells(spark)
            collect_sentiments(spark)
            count = 1
        time.sleep(10)
        count += 1

    spark.stop()  # unreachable while the loop above runs forever; kept for symmetry
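

# ---------------------------------------------------------------------------
# A minimal sketch (not wired into the loop above) of a parameterized variant
# of mysql_execute. Passing values separately to cursor.execute() lets pymysql
# quote them, so comment text containing quote characters does not break the
# statement. The function name and the example call are illustrative
# assumptions, not part of the original pipeline.
# ---------------------------------------------------------------------------
def mysql_execute_params(sql, args=None):
    db = pymysql.connect(host="localhost", user="root",
                         password="123456789", database="sparkproject")
    try:
        with db.cursor() as cursor:
            cursor.execute(sql, args)  # pymysql escapes each value in args
        db.commit()
    except Exception as e:
        print(e)
        db.rollback()
    finally:
        db.close()

# Hypothetical usage, equivalent to the string-formatted insert in collect_news():
#   mysql_execute_params(
#       "insert into news (comment_time, content, comment_id) values (%s, %s, %s)",
#       (row["comment_time"], row["content"], row["id"]))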