diff --git a/chapter3/spark.py b/chapter3/spark.py
index 4ffc4de..750e795 100644
--- a/chapter3/spark.py
+++ b/chapter3/spark.py
@@ -6,6 +6,7 @@ import pymysql
 import datetime
 import time
 import json
+from snownlp import SnowNLP
 
 def mysql_query(sql):
     db = pymysql.connect("localhost","root","123456789","sparkproject" )
@@ -127,6 +128,40 @@ def collect_from_type(spark):
     mysql_execute("update datas set data = '{}' where `key` = 'from_type'".format(
         json.dumps(data,ensure_ascii=False) ))
 
+def _get_sentiments(df):
+    """Return the mean SnowNLP sentiment score of the comments in *df*.
+
+    Each row's ``content`` text is scored with ``SnowNLP(...).sentiments``.
+    The rows are collected once and counted locally, so the query is not
+    run a second time just to count them. Returns ``None`` for no rows.
+    """
+    rows = df.rdd.collect()
+    if not rows:
+        # No comments matched: avoid dividing by zero.
+        return None
+    total = sum(SnowNLP(row.content).sentiments for row in rows)
+    return total / len(rows)
+
+
+def collect_sentiments(spark):
+    """Compute the mean comment sentiment for two products and store it.
+
+    The result is written as JSON to the ``datas`` row whose key is
+    ``sentiments``; a product with no matching comments is stored as null.
+    """
+    # Xiaomi 8
+    df1 = spark.sql("select * from jd_comment where id like '7437788%'")
+    # Honor 10
+    df2 = spark.sql("select * from jd_comment where id like '7081550%'")
+
+    xiaomi_s = _get_sentiments(df1)
+    huawei_s = _get_sentiments(df2)
+
+    data = {"xiaomi_s": xiaomi_s, "huawei_s": huawei_s}
+    mysql_execute("update datas set data = '{}' where `key` = 'sentiments'".format(
+        json.dumps(data,ensure_ascii=False) ))
+
+
 if __name__ == "__main__":
     # $example on:spark_hive$
     # warehouse_location points to the default location for managed databases and tables
@@ -150,5 +185,6 @@ if __name__ == "__main__":
         collect_from_type(spark)
         collect_last_month_sells(spark)
         collect_hour_sells(spark)
+        collect_sentiments(spark)
         count = 1
         time.sleep(10)