From 411e14684f25ea0696e2f14a5f1151f8d73a7a13 Mon Sep 17 00:00:00 2001
From: guange <8863824@gmail.com>
Date: Fri, 18 Jan 2019 10:23:18 +0800
Subject: [PATCH] .

---
 chapter2/mysite/myapp/views.py |  2 +-
 chapter3/spark.py              | 47 +++++++++++++++++++++-------------
 2 files changed, 30 insertions(+), 19 deletions(-)

diff --git a/chapter2/mysite/myapp/views.py b/chapter2/mysite/myapp/views.py
index f398e96..9193f07 100644
--- a/chapter2/mysite/myapp/views.py
+++ b/chapter2/mysite/myapp/views.py
@@ -13,7 +13,7 @@ def crawl(request):
     # info = scrapy_client.get_scrapy_info()
     crawl_info = CrawlInfos.objects.order_by('-id').first()
     platform_info = get_platform_info()
-    news = News.objects.all()
+    news = News.objects.order_by('-id')[0:20]
     return render(request, 'myapp/crawl.html', {"crawl_info": crawl_info,
                                                 "platform_info":json.dumps(platform_info),
                                                 "news": news})

diff --git a/chapter3/spark.py b/chapter3/spark.py
index a31777f..b5c0b71 100644
--- a/chapter3/spark.py
+++ b/chapter3/spark.py
@@ -30,6 +30,33 @@ def today():
     return time.strftime('%Y-%m-%d')
 
 
+def collect_crawl_info(spark):
+    df = spark.sql("select count(*) as N from jd_comment")
+    jd_comment_count = df.rdd.collect()[0]["N"]
+
+    df = spark.sql("select count(*) as N from jd_comment where created_at like '"+today()+"%'")
+    jd_comment_today_count = df.rdd.collect()[0]["N"]
+
+    df = spark.sql("select count(*) as N from jd")
+    jd_count = df.rdd.collect()[0]["N"]
+
+    df = spark.sql("select count(*) as N from jd where created_at like '"+today()+"%'")
+    jd_today_count = df.rdd.collect()[0]["N"]
+
+    total_count = jd_comment_count + jd_count
+    today_total_count = jd_comment_today_count + jd_today_count
+
+    mysql_execute("insert into crawl_infos (total_count, today_total_count, product_count, today_product_count, comment_count, today_comment_count) values ({},{},{},{},{},{})".format(
+        total_count, today_total_count, jd_count, jd_today_count, jd_comment_count, jd_comment_today_count))
+
+
+def collect_news(spark):
+    df = spark.sql("select * from jd_comment order by created_at desc limit 20")
+    for row in df.rdd.collect():
+        mysql_execute("insert into news (comment_time, content, comment_id) values ('{}', '{}', '{}')".format(
+            row["comment_time"], row["content"], row["id"]))
+
+
 if __name__ == "__main__":
     # $example on:spark_hive$
     # warehouse_location points to the default location for managed databases and tables
@@ -43,24 +70,8 @@ if __name__ == "__main__":
         .getOrCreate()
 
     while True:
-        df = spark.sql("select count(*) as N from jd_comment")
-        jd_comment_count = df.rdd.collect()[0]["N"]
-
-        df = spark.sql("select count(*) as N from jd_comment where created_at like '"+today()+"%'")
-        jd_comment_today_count = df.rdd.collect()[0]["N"]
-
-        df = spark.sql("select count(*) as N from jd")
-        jd_count = df.rdd.collect()[0]["N"]
-
-        df = spark.sql("select count(*) as N from jd where created_at like '"+today()+"%'")
-        jd_today_count = df.rdd.collect()[0]["N"]
-
-        total_count = jd_comment_count + jd_count
-        today_total_count = jd_comment_today_count + jd_today_count
-
-        mysql_execute("insert into crawl_infos (total_count, today_total_count, product_count, today_product_count, comment_count, today_comment_count) values ({},{},{},{},{},{})".format(
-            total_count, today_total_count, jd_count,jd_today_count, jd_comment_count, jd_comment_today_count) )
-
+        collect_crawl_info(spark)
+        collect_news(spark)
         time.sleep(10)
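
Note on the insert statements above: both collect_crawl_info and collect_news build their SQL
by formatting values directly into the string, so a comment whose text contains a single quote
will break the statement, and the pattern is open to SQL injection. A parameterized variant is
safer. The sketch below is a minimal assumption of what mysql_execute could look like as a thin
wrapper over pymysql; the patch does not show its real definition, and the connection settings
here are placeholders.

import pymysql

def mysql_execute(sql, args=None):
    # Hypothetical sketch of a parameterized helper. Host, credentials and
    # database name are assumptions; adjust them to the project's settings.
    conn = pymysql.connect(host="127.0.0.1", user="root", password="",
                           db="crawler", charset="utf8mb4")
    try:
        with conn.cursor() as cursor:
            # The driver escapes args itself, so quotes inside comment
            # text cannot terminate the SQL string early.
            cursor.execute(sql, args)
        conn.commit()
    finally:
        conn.close()

Used this way, the insert in collect_news would become:

    mysql_execute(
        "insert into news (comment_time, content, comment_id) values (%s, %s, %s)",
        (row["comment_time"], row["content"], row["id"]))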
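
A second caveat: the main loop runs every 10 seconds and collect_news re-reads the latest 20
comments on each pass, so the news table accumulates duplicate rows. One way to make the insert
idempotent, assuming a unique index on news.comment_id (not shown in the patch), is MySQL's
insert ignore, combined with the parameterized helper sketched above:

def collect_news(spark):
    df = spark.sql("select * from jd_comment order by created_at desc limit 20")
    for row in df.rdd.collect():
        # "insert ignore" silently skips rows whose comment_id already
        # exists; this relies on a unique key on news.comment_id (an
        # assumption about the schema, which the patch does not show).
        mysql_execute(
            "insert ignore into news (comment_time, content, comment_id) values (%s, %s, %s)",
            (row["comment_time"], row["content"], row["id"]))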