diff --git a/chapter3/spark.py b/chapter3/spark.py index 6f1bda8..b6e0d7e 100644 --- a/chapter3/spark.py +++ b/chapter3/spark.py @@ -90,11 +90,13 @@ def collect_crawl_info(spark): def collect_news(spark): df = spark.sql("select * from jd_comment order by created_at desc limit 20") - mysql_execute("delete from news") + for row in df.rdd.collect(): mysql_execute("insert into news (comment_time, content, comment_id) values ('{}', '{}', '{}')".format( row["comment_time"], row["content"], row["id"])) + mysql_execute("delete from news where id not in ( select x.id from (select id from news order by id desc limit 20) as x);") + def get_last_day_count(spark): """获取过去几天的采集量""" for i in range(5):