bigdata/chapter1/crawler/taobao/pipelines.py

146 lines
5.1 KiB
Python

# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
import json
import pdb
import re
import happybase
from scrapy.conf import settings
from scrapy.pipelines.images import ImagesPipeline
from taobao.items import JDProductItem, JDCommentItem, JDCommentSummary,ProductSpecItem
from taobao.utils import check_alive_proxy
from scrapy.exceptions import DropItem
class CheckProxyPipeline(object):
    """
    Verify that a scraped proxy is usable before passing it downstream.

    Each item must provide ``ip`` and ``port``. The value returned by
    ``check_alive_proxy(ip, port)`` is stored in ``item['checktime']``.
    If anything in that process raises, the item is dropped.
    """

    def process_item(self, item, spider):
        """
        Run the liveness check and annotate the item with its result.

        :param item: proxy item with ``ip`` and ``port`` fields.
        :param spider: the running spider; its logger records failures.
        :returns: the same item, with ``checktime`` filled in.
        :raises DropItem: if reading ip/port, the check itself, or storing
            ``checktime`` raises.
        """
        try:
            # Named check_time (not `time`) to avoid shadowing the stdlib module name.
            check_time = check_alive_proxy(item["ip"], item["port"])
            # Kept inside the try: an Item without a `checktime` field raises
            # here, and such items were always dropped rather than crashing.
            item['checktime'] = check_time
        except Exception as e:
            # Route through Scrapy's logging (instead of print) so the failure
            # shows up in the crawl log alongside the offending proxy.
            spider.logger.warning(
                "Proxy check failed for %s:%s: %r",
                item.get("ip"), item.get("port"), e,
            )
            # Chain the cause so the original traceback is not lost.
            raise DropItem("校验出错") from e
        return item
class MyImagesPipeline(ImagesPipeline):
    """Project-level image pipeline; currently behaves exactly like Scrapy's ImagesPipeline."""
class ZhihuPipeline:
    """Pass-through pipeline for Zhihu items: every item is forwarded unchanged."""

    def process_item(self, item, spider):
        """Hand ``item`` to the next pipeline stage without modifying it."""
        forwarded = item
        return forwarded
class JsonWithEncodingCnblogsPipeline(object):
    """
    Write each item as one UTF-8 JSON object per line.

    The output file is ``../../datas/<spider.keyword>.json``. The path is
    relative, so it is resolved against the process's current working directory.
    """

    def open_spider(self, spider):
        """Open (and truncate) the output file named after ``spider.keyword``."""
        self.file = open('../../datas/%s.json' % spider.keyword, 'w', encoding='utf-8')

    def process_item(self, item, spider):
        """Append ``item`` as a single JSON line; non-ASCII text is written as-is."""
        line = json.dumps(dict(item), ensure_ascii=False) + "\n"
        self.file.write(line)
        return item

    def close_spider(self, spider):
        """
        Close the output file when the spider finishes.

        Scrapy invokes ``close_spider`` on item pipelines. The former
        ``spider_closed`` method is not a pipeline hook and was never connected
        to the signal, so the file was never closed or flushed.
        """
        self.file.close()

    # Backward-compatible alias for any code that called the old name directly.
    spider_closed = close_spider
class JsonWithEncodingCommentsPipeline(object):
    """
    Write each comment item as one UTF-8 JSON object per line.

    The output file is ``../../datas/comments/<spider.product_id>.json``. The
    path is relative, so it is resolved against the process's current working
    directory.
    """

    def open_spider(self, spider):
        """Open (and truncate) the output file named after ``spider.product_id``."""
        self.file = open('../../datas/comments/%s.json' % spider.product_id, 'w', encoding='utf-8')

    def process_item(self, item, spider):
        """Append ``item`` as a single JSON line; non-ASCII text is written as-is."""
        line = json.dumps(dict(item), ensure_ascii=False) + "\n"
        self.file.write(line)
        return item

    def close_spider(self, spider):
        """
        Close the output file when the spider finishes.

        Scrapy invokes ``close_spider`` on item pipelines. The former
        ``spider_closed`` method is not a pipeline hook and was never connected
        to the signal, so the file was never closed or flushed.
        """
        self.file.close()

    # Backward-compatible alias for any code that called the old name directly.
    spider_closed = close_spider
class JDCleanDataPipeline(object):
    """
    Clean JD (京东) data.

    For ``JDProductItem``:
      * strips surrounding whitespace and embedded newlines from ``name``;
      * derives ``id`` from a product URL shaped like ``.../<digits>.html``;
      * drops the item if it still has no string ``id`` afterwards.
    All other item types are passed through unchanged.
    """

    # Captures the run of digits right before ".html" at the end of the URL path.
    _ID_FROM_URL = re.compile(r'.+\/(\d+)\.html')

    def process_item(self, item, spider):
        """
        Normalise a product item in place.

        :returns: the (possibly modified) item.
        :raises DropItem: if a ``JDProductItem`` ends up without a string ``id``.
            Scrapy requires pipelines to return an item or raise ``DropItem``.
            The old code returned ``None``, which handed ``None`` to the next
            pipeline stage.
        """
        if isinstance(item, JDProductItem):
            # .get(): unset Item fields raise KeyError on [] access.
            name = item.get("name")
            if isinstance(name, str):
                item["name"] = name.strip().replace('\n', '')
            url = item.get("url")
            match = self._ID_FROM_URL.match(url) if isinstance(url, str) else None
            if match:
                item["id"] = match.group(1)
            if not isinstance(item.get("id"), str):
                raise DropItem("JD product without a valid id (url=%r)" % (url,))
        return item
class JDHbasePipeline(object):
    """
    Store JD data in HBase.

    Routing by item type:
      * ``JDProductItem``    -> product table, ``product:*`` columns, row key ``id``
      * ``JDCommentItem``    -> ``jd_comment`` table, ``comment:*`` columns, row key ``id``
      * ``JDCommentSummary`` -> product table, ``comment:*`` columns, row key ``id``
      * ``ProductSpecItem``  -> product table, ``spec:*`` columns, row key ``pid``
    Any other item is returned untouched. The product table name comes from
    the ``HBASE_TABLE`` setting.
    """

    def __init__(self, hbase_settings=None):
        """
        Open the HBase connection and look up the target tables.

        :param hbase_settings: mapping providing ``HBASE_HOST``, ``HBASE_PORT``
            and ``HBASE_TABLE``. When omitted, the module-level ``settings``
            (from the legacy ``scrapy.conf``) is used, so existing no-argument
            construction keeps working.
        """
        if hbase_settings is None:
            hbase_settings = settings
        host = hbase_settings['HBASE_HOST']
        table_name = hbase_settings['HBASE_TABLE']
        # Kept on the instance so close_spider can release the socket.
        self.connection = happybase.Connection(host, port=hbase_settings["HBASE_PORT"])
        self.product_table = self.connection.table(table_name)
        self.comment_table = self.connection.table('jd_comment')

    @classmethod
    def from_crawler(cls, crawler):
        """Scrapy factory hook: build the pipeline from the crawler's settings."""
        return cls(crawler.settings)

    def close_spider(self, spider):
        """Close the HBase connection when the spider finishes (previously leaked)."""
        self.connection.close()

    def process_item(self, item, spider):
        """Write ``item`` to the HBase table and columns chosen by its type; return it."""
        if isinstance(item, JDProductItem):
            # NOTE(review): values here are written as-is, unlike the branches
            # below which str() them; presumably the spider already yields
            # strings for these fields — confirm.
            self.product_table.put(item["id"],
                                   {"product:name": item["name"],
                                    "product:type": item["type"],
                                    "product:price": item["price"],
                                    "product:url": item["url"],
                                    "product:img": item["img"],
                                    "product:shop": item["shop"],
                                    "product:comment_num": item["comment_num"],
                                    "product:created_at": item["created_at"]
                                    })
        elif isinstance(item, JDCommentItem):
            self.comment_table.put(item["id"],
                                   {
                                       "comment:user_name": item["user_name"],
                                       "comment:from_type": item["from_type"],
                                       "comment:content": item["content"],
                                       "comment:comment_time": item["comment_time"],
                                       "comment:created_at": item["created_at"]
                                   })
        elif isinstance(item, JDCommentSummary):
            # Summary counters are stored on the product row, stringified.
            self.product_table.put(item["id"],
                                   {"comment:good_count": str(item["good_count"]),
                                    "comment:general_count": str(item["general_count"]),
                                    "comment:poor_count": str(item["poor_count"]),
                                    "comment:comment_count": str(item["comment_count"]),
                                    "comment:default_good_count": str(item["default_good_count"]),
                                    "comment:good_rate": str(item["good_rate"])
                                    })
        elif isinstance(item, ProductSpecItem):
            # Spec items identify their product via `pid`, not `id`.
            self.product_table.put(item["pid"],
                                   {
                                       "spec:cpu": str(item["cpu"]),
                                       "spec:rom": str(item['rom']),
                                       "spec:ram": str(item["ram"]),
                                       "spec:resolution": str(item["resolution"]),
                                       "spec:charge": str(item["charge"]),
                                       "spec:weight": str(item["weight"]),
                                       "spec:brand": str(item["brand"]),
                                   }
                                   )
        return item