From 4a8a5288f436870a1b48784af725805ec6c603b1 Mon Sep 17 00:00:00 2001
From: guange <8863824@gmail.com>
Date: Thu, 17 Jan 2019 23:13:38 +0800
Subject: [PATCH] .

---
 chapter3/spark.py | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/chapter3/spark.py b/chapter3/spark.py
index d3de4e0..ced82ea 100644
--- a/chapter3/spark.py
+++ b/chapter3/spark.py
@@ -25,6 +25,9 @@ def mysql_execute(sql):
     finally:
         db.close()
 
+def today():
+    return time.strftime('%Y-%m-%d')
+
 
 if __name__ == "__main__":
     # $example on:spark_hive$
@@ -38,6 +41,22 @@ if __name__ == "__main__":
         .enableHiveSupport() \
         .getOrCreate()
 
-    df = spark.sql("select * from jd limit 1")
+    df = spark.sql("select count(*) as N from jd_comment")
+    jd_comment_count = df.rdd.collect()[0]["N"]
+
+    df = spark.sql("select count(*) as N from jd_comment where created_at like '"+today()+"%'")
+    jd_comment_today_count = df.rdd.collect()[0]["N"]
+
+    df = spark.sql("select count(*) as N from jd")
+    jd_count = df.rdd.collect()[0]["N"]
+
+    df = spark.sql("select count(*) as N from jd where created_at like '"+today()+"%'")
+    jd_today_count = df.rdd.collect()[0]["N"]
+
+    total_count = jd_comment_count + jd_count
+    today_total_count = jd_comment_today_count + jd_today_count
+
+    mysql_execute("insert into crawl_infos (total_count, today_total_count, product_count, today_product_count, comment_count, today_comment_count) values ({},{},{},{},{},{})".format(
+        total_count, today_total_count, jd_count, jd_today_count, jd_comment_count, jd_comment_today_count))
 
     spark.stop()
\ No newline at end of file
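
A minimal standalone sketch of the day-prefix counting this patch introduces, for reference. It assumes the Hive tables `jd` and `jd_comment` expose a string `created_at` column beginning with 'YYYY-MM-DD', as the LIKE-prefix queries above imply; the helper `count_rows` and the app name are illustrative and not part of the repository.

# Sketch only: the same count(*) queries the patch runs, via Spark SQL over Hive.
# Assumes `jd` and `jd_comment` have a string `created_at` column starting with
# 'YYYY-MM-DD'; `count_rows` is a hypothetical helper, not code from the repo.
import time

from pyspark.sql import SparkSession


def today():
    # Date prefix used for the LIKE match, e.g. '2019-01-17'.
    return time.strftime('%Y-%m-%d')


def count_rows(spark, table, only_today=False):
    # count(*) through Spark SQL; .first()["N"] reads the single result row.
    sql = "select count(*) as N from {}".format(table)
    if only_today:
        sql += " where created_at like '{}%'".format(today())
    return spark.sql(sql).first()["N"]


if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("crawl-stats-sketch") \
        .enableHiveSupport() \
        .getOrCreate()

    jd_count = count_rows(spark, "jd")
    jd_today_count = count_rows(spark, "jd", only_today=True)
    jd_comment_count = count_rows(spark, "jd_comment")
    jd_comment_today_count = count_rows(spark, "jd_comment", only_today=True)

    # Same totals the patch writes into crawl_infos.
    print(jd_count + jd_comment_count, jd_today_count + jd_comment_today_count)

    spark.stop()

Reading the result with .first()["N"] is equivalent to the patch's df.rdd.collect()[0]["N"] for a single-row count; it simply avoids going through the RDD API.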