Scrapy: saving data to MySQL asynchronously and writing items to a JSON file

Scrapy framework · 2019-01-07

import codecs
import json

#  Scrapy provides several ways to export items; press Ctrl+B to jump to the exporter source
from scrapy.exporters import JsonItemExporter  
from scrapy.pipelines.images import ImagesPipeline  
#  asynchronous database API
from twisted.enterprise import adbapi  

import MySQLdb  
import MySQLdb.cursors  


#  Save items to a file
class JsonWithEncodingPipeline(object):
    """
    Custom JSON file export
    """
    def __init__(self):
        #  Open the file; codecs avoids encoding problems
        self.file = codecs.open('article.json', 'w', encoding='utf-8')

    def process_item(self, item, spider):  
        lines = json.dumps(dict(item), ensure_ascii=False) + "\n"  
        self.file.write(lines)  
        return item  

    def close_spider(self, spider):
        #  Scrapy calls close_spider automatically when the spider finishes
        self.file.close()


class JsonExporterPipeline(object):
    #  Use the JsonItemExporter provided by Scrapy to export a JSON file
    def __init__(self):  
        self.file = open('articleexport.json', 'wb')  
        self.exporter = JsonItemExporter(self.file, encoding='utf-8', ensure_ascii=False)  
        self.exporter.start_exporting()  

    def close_spider(self, spider):  
        self.exporter.finish_exporting()  
        self.file.close()  

    def process_item(self, item, spider):  
        self.exporter.export_item(item)  
        return item  

#  Items can also be saved through the Django ORM with scrapy-djangoitem (see the commented sketch below)
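#  A minimal sketch of that approach, kept as comments because it assumes
#  scrapy-djangoitem is installed and a Django project with an Article model is
#  already configured (the blog.models import and the class names are hypothetical):
#
#  from scrapy_djangoitem import DjangoItem
#  from blog.models import Article              # hypothetical Django model
#
#  class ArticleItem(DjangoItem):
#      django_model = Article                   # item fields mirror the model fields
#
#  class DjangoOrmPipeline(object):
#      def process_item(self, item, spider):
#          item.save()                          # persist through the Django ORM
#          return item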


class MysqlPipeline(object):
    """
    Save data to the database, method 1: a blocking insert for every item
    """
    def __init__(self):  
        self.conn = MySQLdb.connect('localhost', 'root', 'hy1680456489', 'article_spider', charset='utf8',  
                                    use_unicode=True)  
        self.cursor = self.conn.cursor()  

    def process_item(self, item, spider):  
        insert_sql = """  
            insert into jobbole_article(title, url, create_date, zan)  
            VALUES (%s, %s, %s, %s)  
        """  
        self.cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["zan"]))
        self.conn.commit()
        return item


class MysqlTwistedPipeline(object):
    """
    Method 2: asynchronous storage through Twisted's adbapi connection pool
    """
    def __init__(self, dbpool):  
        self.dbpool = dbpool  

    @classmethod  
    def from_settings(cls, settings):  
        dbparms = dict(  
            host=settings["MYSQL_HOST"],  
            db=settings["MYSQL_DBNAME"],  
            user=settings['MYSQL_USER'],  
            passwd=settings['MYSQL_PASSWORD'],  
            charset='utf8',  
            cursorclass=MySQLdb.cursors.DictCursor,  
            use_unicode=True,  
        )  
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)  

        return cls(dbpool)  

    def process_item(self, item, spider):
        """
        Use Twisted to run the MySQL insert asynchronously
        """
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error)  # handle exceptions
        return item

    def handle_error(self, failure):
        #  Handle exceptions raised by the asynchronous insert
        print(failure)

    def do_insert(self, cursor, item):
        # Perform the actual insert
        insert_sql = """
            insert into jobbole_article(title, url, create_date, zan)
            VALUES (%s, %s, %s, %s)
        """
        cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["zan"]))
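
Neither version does anything until it is registered in the project settings, and MysqlTwistedPipeline.from_settings also reads its connection parameters from there. A minimal settings.py sketch (the article_spider.pipelines module path, the priority numbers and the placeholder password are assumptions; adapt them to your own project):

# settings.py -- sketch only; adjust module path, priorities and credentials
ITEM_PIPELINES = {
    'article_spider.pipelines.JsonExporterPipeline': 200,
    'article_spider.pipelines.MysqlTwistedPipeline': 300,
}

# Connection keys read by MysqlTwistedPipeline.from_settings()
MYSQL_HOST = 'localhost'
MYSQL_DBNAME = 'article_spider'
MYSQL_USER = 'root'
MYSQL_PASSWORD = '********'  # placeholder, fill in your own password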

Alternatively, the same two variants with pymysql:

import pymysql  
from pymysql import cursors  
# adbapi is used specifically for database handling
from twisted.enterprise import adbapi

class JianshuPipeline(object):  
    def __init__(self):  
        dbparams = {  
            'host': '127.0.0.1',  
            'port': 3306,  
            'user': 'root',  
            'password': 'hy1680456489',  
            'database': 'jianshu',  
            'charset': 'utf8'  
        }  
        self.conn = pymysql.connect(**dbparams)  
        self.cursor = self.conn.cursor()  
        self._sql = None  

    def process_item(self, item, spider):  
        self.cursor.execute(self.sql,(item['title'], item['content'], item['author'], item['avatar'], item['pub_time'],  
                                      item['origin_url'], item['article_id']))  
        self.conn.commit()  
        return item  

    @property
    def sql(self):
        if not self._sql:
            self._sql = """
                insert into article(id, title, content, author, avatar, pub_time,
                origin_url, article_id) values (null, %s, %s, %s, %s, %s, %s, %s)
            """
        return self._sql


class JianshuTwistedPipeline(object):  
    def __init__(self):  
        dbparams = {  
            'host': '127.0.0.1',  
            'port': 3306,  
            'user': 'root',  
            'password': 'hy1680456489',  
            'database': 'jianshu',  
            'charset': 'utf8',  
            'cursorclass': cursors.DictCursor  
        }  
        self.dbpool = adbapi.ConnectionPool('pymysql', **dbparams)  
        self._sql = None  

    def process_item(self, item, spider):
        defer = self.dbpool.runInteraction(self.insert_item, item)
        defer.addErrback(self.handle_error, item, spider)
        return item

    @property
    def sql(self):
        if not self._sql:
            self._sql = """
                insert into article(id, title, content, author, avatar, pub_time,
                origin_url, article_id) values (null, %s, %s, %s, %s, %s, %s, %s)
            """
        return self._sql

    def insert_item(self, cursor, item):  
        cursor.execute(self.sql, (item['title'], item['content'], item['author'], item['avatar'], item['pub_time'],  
                                       item['origin_url'], item['article_id']))  

    # Error handling
    def handle_error(self, error, item, spider):  
        print(error)  
        print('-'*10+"error"+'-'*10)  
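
Neither Twisted pipeline shuts its connection pool down when the crawl ends; adbapi.ConnectionPool provides close() for that. An optional addition (not part of the original code) that can be dropped into either Twisted pipeline class:

    def close_spider(self, spider):
        # Optional: shut down the adbapi connection pool when the spider finishes
        self.dbpool.close()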

 
