python数据源操作MySQL实例



本文来源:http://blog.52fq.net/post/97/,感谢dancebear的工作和分享,前往该页面可以查看彩色语法显示的版本!


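需要提前安装的Python扩展包:MySQLdb(即 MySQL-python,脚本中 `import MySQLdb`),以及提供 `DictIni` 类的 INI 配置解析模块(脚本以 `ini.DictIni` 引用,但并未导入 `ini`,使用前需自行补充相应的 import)。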



数据库配置文件:/usr/local/search/conf/article.conf(即脚本中 confFilePrivate 指向的文件):

[indexer]   
#每批次读取的id区间大小(每批最多读取这么多条记录)
perpage=1000  
[mysql]   
#MySQL服务器地址   
host= 127.0.0.1   
#MySQL用户名   
username=root  
#MySQL密码   
password=root  
#MySQL数据库名   
dbname=article  
#MySQL默认字符集   
charset=gbk  
#MySQL数据表前缀
tableprefix=cms_  
#MySQL服务器端口   
port=3306  




python数据源脚本:/usr/local/search/datasources/article.py:

#! /usr/bin/env python   
#coding=utf-8   
# coreseek3.2.13 python source文章读取   
# author: dancebear   
# date: 2010-07-02 11:46   
import MySQLdb,logging  
# Absolute path of the INI file with the [indexer] and [mysql] settings;
# both data-source classes load it in their constructors.
confFilePrivate = '/usr/local/search/conf/article.conf'
class mainSource(object):
    """Coreseek Python data source for the main (full) article index.

    Articles are read from ``{prefix}contentlist`` joined with
    ``{prefix}content`` in consecutive id ranges of ``indexer.perpage`` ids.
    Text columns are converted from the MySQL charset to UTF-8, and each row
    is exposed through the attributes named in GetScheme().
    """

    def __init__(self, cfg):
        # ``cfg`` is unused: settings are always read from confFilePrivate.
        self.cfg = ini.DictIni(confFilePrivate)
        self._rowCount = 0      # rows of the current batch not yet fetched
        self.m_cursor = None
        self.m_dbconn = None
        # Id range, filled in by OnBeforeIndex().
        self.m_minid = 0
        self.m_maxid = 0
        self.m_start_id = 0     # first id of the next batch to query
        # "SELECT ... WHERE <join>"; NextDocument() appends the id range.
        self.m_basesql_str = ''

    def GetScheme(self):
        """Return the document schema: the docid field, then text/integer fields."""
        return [
            ('id', {'docid': True, }),
            ('fid', {'type': 'integer'}),
            ('description', {'type': 'text'}),
            ('pink', {'type': 'integer'}),
            ('author', {'type': 'text'}),
            ('authorid', {'type': 'integer'}),
            ('subject', {'type': 'text'}),
            ('dateline', {'type': 'integer'}),
            ('state', {'type': 'integer'}),
            ('top', {'type': 'integer'}),
            ('content', {'type': 'text'}),
            ('views', {'type': 'integer'}),
            ('comments', {'type': 'integer'}),
        ]

    def GetFieldOrder(self):
        """Return the order of the full-text fields."""
        return [('subject', 'content', 'description', 'author')]

    def Connected(self):
        """Open the MySQL connection described by the [mysql] config section.

        Returns True on success. Returns False when the section is missing or
        the connection fails; the error is logged.
        """
        if not self.cfg.has_key('mysql'):
            logging.error('Not has MySQL info')
            return False
        mysql = self.cfg.mysql
        try:
            self.m_dbconn = MySQLdb.connect(host=mysql.host,
                                            port=int(mysql.port),
                                            user=mysql.username,
                                            passwd=mysql.password,
                                            db=mysql.dbname)
        except MySQLdb.Error as e:
            logging.error("Error %d: %s", e.args[0], e.args[1])
            return False
        return True

    def OnBeforeIndex(self):
        """Prepare the indexing run.

        Sets the connection charset, reads MIN(id)/MAX(id) of the article
        list to initialise the id range, and builds the base SELECT used by
        NextDocument(). Returns False if there is no connection or a query
        fails.
        """
        if self.m_dbconn is None:
            return False
        prefix = self.cfg.mysql.tableprefix
        try:
            self.m_cursor = self.m_dbconn.cursor()
            # Have MySQL send text in the configured charset (e.g. gbk);
            # _toUtf8() decodes it with the same charset.
            self.m_cursor.execute("SET NAMES " + self.cfg.mysql.charset)
        except MySQLdb.Error as e:
            logging.error("Error %d: %s", e.args[0], e.args[1])
            return False
        self.m_cursor.close()

        sql = "SELECT MIN(id),MAX(id) FROM {$prefix}contentlist"
        sql = sql.replace("{$prefix}", prefix)
        try:
            self.m_cursor = self.m_dbconn.cursor()
            rowCount = self.m_cursor.execute(sql)
        except MySQLdb.Error as e:
            logging.error("Error %d: %s", e.args[0], e.args[1])
            return False
        if rowCount == 0:
            return False
        tm_row = self.m_cursor.fetchone()
        # MIN/MAX are NULL on an empty table; the 0 defaults are kept then.
        if tm_row[0]:
            self.m_minid = tm_row[0]
            if tm_row[1]:
                self.m_maxid = tm_row[1]
                self.m_start_id = self.m_minid

        sql = """SELECT l.id AS id, l.CateID AS fid,
            l.Description AS description,l.pink AS pink,
            l.Author as author, l.AuthorID AS poster_id,
            l.Title AS title, l.PublicTime AS post_time,
            l.State as state,l.Top as top,
            c.content AS content,l.HitNum as views, l.CommentNum as comments
            FROM {$prefix}contentlist AS l, {$prefix}content AS c WHERE c.ContentID = l.id"""
        self.m_basesql_str = sql.replace("{$prefix}", prefix)
        return True

    def NextDocument(self):
        """Load the next document into the field attributes.

        Rows are queried in batches covering ``indexer.perpage`` consecutive
        ids. Id ranges without rows (e.g. deleted articles) are skipped until
        the range passes MAX(id), so a gap does not end indexing early.
        Returns True when a document was loaded. Returns False when there are
        no more documents or a query failed.
        """
        if self._rowCount > 0:
            # Rows of the current batch are still pending.
            self._rowCount -= 1
            return self._getRow()
        if self.m_dbconn is None:
            return False
        # The INI value may arrive as a string; the id arithmetic needs an int.
        perpage = int(self.cfg.indexer.perpage)
        if perpage < 1:
            logging.error('indexer.perpage must be a positive integer')
            return False
        try:
            while True:
                if self.m_cursor is not None:
                    self.m_cursor.close()
                self.m_cursor = self.m_dbconn.cursor(
                    cursorclass=MySQLdb.cursors.DictCursor)
                batch_start = self.m_start_id
                batch_end = batch_start + perpage
                rows = self.m_cursor.execute(
                    self.m_basesql_str
                    + " AND l.id>=%d AND l.id<%d" % (batch_start, batch_end))
                self.m_start_id = batch_end
                logging.info('Select data from %d to %d', batch_start, batch_end)
                if rows > 0 or batch_end > self.m_maxid:
                    break
        except MySQLdb.Error as e:
            logging.error("Error %d:%s", e.args[0], e.args[1])
            self.OnIndexFinished()
            return False
        # One row is returned now; the remaining ones are counted down above.
        self._rowCount = rows - 1
        return self._getRow()

    def _toUtf8(self, value):
        """Return a text column as UTF-8 bytes, or '' for NULL.

        Byte strings are decoded with the MySQL charset, dropping undecodable
        bytes. Text that is already decoded is encoded directly.
        """
        if value is None:
            return ''
        if isinstance(value, bytes):
            value = value.decode(self.cfg.mysql.charset, 'ignore')
        return value.encode('utf-8')

    def _getRow(self):
        """Fetch the next row of the current batch into the field attributes.

        Returns True if a row was loaded, False when the batch is exhausted.
        """
        m_row = self.m_cursor.fetchone()
        if not m_row:
            return False
        self.id = m_row['id']
        self.fid = m_row['fid']
        self.description = self._toUtf8(m_row['description'])
        self.pink = m_row['pink']
        self.author = self._toUtf8(m_row['author'])
        self.authorid = m_row['poster_id']
        self.subject = self._toUtf8(m_row['title'])
        self.dateline = m_row['post_time']
        self.state = m_row['state']
        self.top = m_row['top']
        self.content = self._toUtf8(m_row['content'])
        self.views = m_row['views']
        self.comments = m_row['comments']
        return True

    def OnIndexFinished(self):
        """Store MAX(id) as ``idx_postid`` in ``{prefix}settings``.

        Returns True on success. Returns False if no connection is available
        or the UPDATE fails; the error is logged.
        """
        sql = """UPDATE {$prefix}settings SET `value`='{$id}' WHERE variable='idx_postid'"""
        sql = sql.replace("{$prefix}", self.cfg.mysql.tableprefix)
        sql = sql.replace("{$id}", str(self.m_maxid))
        previous = self.m_dbconn
        # Write on a fresh connection (presumably so a connection that timed
        # out during a long indexing run is not reused) and release the old one.
        if self.Connected() and previous is not None:
            try:
                previous.close()
            except MySQLdb.Error:
                pass  # best effort: the old connection may already be closed
        if self.m_dbconn is None:
            logging.error('No MySQL connection; idx_postid not updated')
            return False
        try:
            cursor = self.m_dbconn.cursor()
            try:
                cursor.execute(sql)
            finally:
                cursor.close()
            # MySQLdb starts with autocommit off (DB-API), so commit explicitly
            # or the UPDATE is rolled back on transactional tables.
            self.m_dbconn.commit()
        except MySQLdb.Error as e:
            logging.error("Error %d: %s", e.args[0], e.args[1])
            return False
        logging.info('END fetch')
        return True
class deltaSource(object):
    """Coreseek Python data source for the delta article index.

    Same row layout as the main source, but the id range starts after the
    ``idx_postid`` value stored in ``{prefix}settings``. It falls back to
    MIN(id) when that value is missing or invalid.
    """

    def __init__(self, cfg):
        # ``cfg`` is unused: settings are always read from confFilePrivate.
        self.cfg = ini.DictIni(confFilePrivate)
        self._rowCount = 0      # rows of the current batch not yet fetched
        self.m_cursor = None
        self.m_dbconn = None
        # Id range, filled in by OnBeforeIndex().
        self.m_minid = 0
        self.m_maxid = 0
        self.m_start_id = 0     # first id of the next batch to query
        # "SELECT ... WHERE <join>"; NextDocument() appends the id range.
        self.m_basesql_str = ''

    def GetScheme(self):
        """Return the document schema: the docid field, then text/integer fields."""
        return [
            ('id', {'docid': True, }),
            ('fid', {'type': 'integer'}),
            ('description', {'type': 'text'}),
            ('pink', {'type': 'integer'}),
            ('author', {'type': 'text'}),
            ('authorid', {'type': 'integer'}),
            ('subject', {'type': 'text'}),
            ('dateline', {'type': 'integer'}),
            ('state', {'type': 'integer'}),
            ('top', {'type': 'integer'}),
            ('content', {'type': 'text'}),
            ('views', {'type': 'integer'}),
            ('comments', {'type': 'integer'}),
        ]

    def GetFieldOrder(self):
        """Return the order of the full-text fields."""
        return [('subject', 'content', 'description', 'author')]

    def Connected(self):
        """Open the MySQL connection described by the [mysql] config section.

        Returns True on success. Returns False when the section is missing or
        the connection fails; the error is logged.
        """
        if not self.cfg.has_key('mysql'):
            logging.error('Not has MySQL info')
            return False
        mysql = self.cfg.mysql
        try:
            self.m_dbconn = MySQLdb.connect(host=mysql.host,
                                            port=int(mysql.port),
                                            user=mysql.username,
                                            passwd=mysql.password,
                                            db=mysql.dbname)
        except MySQLdb.Error as e:
            logging.error("Error %d: %s", e.args[0], e.args[1])
            return False
        return True

    def OnBeforeIndex(self):
        """Prepare a delta run that starts after the last recorded id.

        Sets the connection charset and reads MIN(id)/MAX(id) of the article
        list. It then moves the start of the id range to ``idx_postid + 1``
        from ``{prefix}settings``; if that value is missing or not an integer,
        a warning is logged and the run starts at MIN(id). Finally it builds
        the base SELECT used by NextDocument(). Returns False if there is no
        connection or a query fails.
        """
        if self.m_dbconn is None:
            return False
        prefix = self.cfg.mysql.tableprefix
        try:
            self.m_cursor = self.m_dbconn.cursor()
            # Have MySQL send text in the configured charset (e.g. gbk);
            # _toUtf8() decodes it with the same charset.
            self.m_cursor.execute("SET NAMES " + self.cfg.mysql.charset)
        except MySQLdb.Error as e:
            logging.error("Error %d: %s", e.args[0], e.args[1])
            return False
        self.m_cursor.close()

        sql = "SELECT MIN(id),MAX(id) FROM {$prefix}contentlist"
        sql = sql.replace("{$prefix}", prefix)
        try:
            self.m_cursor = self.m_dbconn.cursor()
            rowCount = self.m_cursor.execute(sql)
        except MySQLdb.Error as e:
            logging.error("Error %d: %s", e.args[0], e.args[1])
            return False
        if rowCount == 0:
            return False
        tm_row = self.m_cursor.fetchone()
        # MIN/MAX are NULL on an empty table; the 0 defaults are kept then.
        if tm_row[0]:
            self.m_minid = tm_row[0]
            if tm_row[1]:
                self.m_maxid = tm_row[1]
                self.m_start_id = self.m_minid

        # Resume after the id written by the previous OnIndexFinished().
        sql = "SELECT value FROM {$prefix}settings WHERE variable='idx_postid'"
        sql = sql.replace("{$prefix}", prefix)
        try:
            cursor = self.m_dbconn.cursor()
            cursor.execute(sql)
            tm_row = cursor.fetchone()
            cursor.close()
        except MySQLdb.Error as e:
            logging.error("Error %d: %s", e.args[0], e.args[1])
            return False
        last_id = None
        if tm_row is not None:
            try:
                last_id = int(tm_row[0])
            except (TypeError, ValueError):
                last_id = None
        if last_id is None:
            logging.warning("idx_postid missing or invalid in %ssettings; "
                            "delta starts at MIN(id)", prefix)
        else:
            self.m_minid = last_id + 1
            self.m_start_id = self.m_minid

        sql = """SELECT l.id AS id, l.CateID AS fid,
            l.Description AS description,l.pink AS pink,
            l.Author as author, l.AuthorID AS poster_id,
            l.Title AS title, l.PublicTime AS post_time,
            l.State as state,l.Top as top,
            c.content AS content,l.HitNum as views, l.CommentNum as comments
            FROM {$prefix}contentlist AS l, {$prefix}content AS c WHERE c.ContentID = l.id"""
        self.m_basesql_str = sql.replace("{$prefix}", prefix)
        return True

    def NextDocument(self):
        """Load the next document into the field attributes.

        Rows are queried in batches covering ``indexer.perpage`` consecutive
        ids. Id ranges without rows (e.g. deleted articles) are skipped until
        the range passes MAX(id), so a gap does not end indexing early.
        Returns True when a document was loaded. Returns False when there are
        no more documents or a query failed.
        """
        if self._rowCount > 0:
            # Rows of the current batch are still pending.
            self._rowCount -= 1
            return self._getRow()
        if self.m_dbconn is None:
            return False
        # The INI value may arrive as a string; the id arithmetic needs an int.
        perpage = int(self.cfg.indexer.perpage)
        if perpage < 1:
            logging.error('indexer.perpage must be a positive integer')
            return False
        try:
            while True:
                if self.m_cursor is not None:
                    self.m_cursor.close()
                self.m_cursor = self.m_dbconn.cursor(
                    cursorclass=MySQLdb.cursors.DictCursor)
                batch_start = self.m_start_id
                batch_end = batch_start + perpage
                rows = self.m_cursor.execute(
                    self.m_basesql_str
                    + " AND l.id>=%d AND l.id<%d" % (batch_start, batch_end))
                self.m_start_id = batch_end
                logging.info('Select data from %d to %d', batch_start, batch_end)
                if rows > 0 or batch_end > self.m_maxid:
                    break
        except MySQLdb.Error as e:
            logging.error("Error %d:%s", e.args[0], e.args[1])
            self.OnIndexFinished()
            return False
        # One row is returned now; the remaining ones are counted down above.
        self._rowCount = rows - 1
        return self._getRow()

    def _toUtf8(self, value):
        """Return a text column as UTF-8 bytes, or '' for NULL.

        Byte strings are decoded with the MySQL charset, dropping undecodable
        bytes. Text that is already decoded is encoded directly.
        """
        if value is None:
            return ''
        if isinstance(value, bytes):
            value = value.decode(self.cfg.mysql.charset, 'ignore')
        return value.encode('utf-8')

    def _getRow(self):
        """Fetch the next row of the current batch into the field attributes.

        Returns True if a row was loaded, False when the batch is exhausted.
        """
        m_row = self.m_cursor.fetchone()
        if not m_row:
            return False
        self.id = m_row['id']
        self.fid = m_row['fid']
        self.description = self._toUtf8(m_row['description'])
        self.pink = m_row['pink']
        self.author = self._toUtf8(m_row['author'])
        self.authorid = m_row['poster_id']
        self.subject = self._toUtf8(m_row['title'])
        self.dateline = m_row['post_time']
        self.state = m_row['state']
        self.top = m_row['top']
        self.content = self._toUtf8(m_row['content'])
        self.views = m_row['views']
        self.comments = m_row['comments']
        return True

    def OnIndexFinished(self):
        """Store MAX(id) as ``idx_postid`` in ``{prefix}settings``.

        Returns True on success. Returns False if no connection is available
        or the UPDATE fails; the error is logged.
        """
        # NOTE(review): the delta also advances idx_postid, and the next delta
        # run starts after it. Unless the delta index is merged into main after
        # every run, the documents indexed here are dropped from the next
        # delta. Confirm the intended main/delta workflow.
        sql = """UPDATE {$prefix}settings SET `value`='{$id}' WHERE variable='idx_postid'"""
        sql = sql.replace("{$prefix}", self.cfg.mysql.tableprefix)
        sql = sql.replace("{$id}", str(self.m_maxid))
        previous = self.m_dbconn
        # Write on a fresh connection (presumably so a connection that timed
        # out during a long indexing run is not reused) and release the old one.
        if self.Connected() and previous is not None:
            try:
                previous.close()
            except MySQLdb.Error:
                pass  # best effort: the old connection may already be closed
        if self.m_dbconn is None:
            logging.error('No MySQL connection; idx_postid not updated')
            return False
        try:
            cursor = self.m_dbconn.cursor()
            try:
                cursor.execute(sql)
            finally:
                cursor.close()
            # MySQLdb starts with autocommit off (DB-API), so commit explicitly
            # or the UPDATE is rolled back on transactional tables.
            self.m_dbconn.commit()
        except MySQLdb.Error as e:
            logging.error("Error %d: %s", e.args[0], e.args[1])
            return False
        logging.info('END fetch')
        return True
if __name__ == "__main__":
    # Manual demo: connect, prepare the id range and base query with
    # OnBeforeIndex() (NextDocument() depends on both), then print each
    # document. Nothing is printed if connecting or preparing fails.
    conf = {}
    source = mainSource(conf)
    if source.Connected() and source.OnBeforeIndex():
        while source.NextDocument():
            print("id=%d, subject=%s ,content =%s" % (source.id, source.subject, source.content[0:20]))




coreseek配置:csft.conf

# python路径定义   
python   
{   
    path = /usr/local/search/datasources/   
}   
  
#源定义   
source python_main   
{   
    type = python  
    #此处的article为上面保存的python脚本的文件名(article.py)
    name = article.mainSource
}   
source python_delta   
{   
    type = python  
    name = article.deltaSource   
}   
#index定义   
index main   
{   
    source                  = python_main   
    path                    = /usr/local/search/data/article   
    docinfo                 = extern   
    stopwords               = /usr/local/search/dict/stopwords.txt   
    charset_dictpath        = /usr/local/search/dict/   
    charset_type            = zh_cn.utf-8   
    #source            = python             #对应的source名称   
    mlock            = 0   
    morphology            = none   
    min_word_len        = 1   
    html_strip            = 1   
}   
index delta   
{   
    source                  = python_delta   
    path                    = /usr/local/search/data/article_delta   
    docinfo                 = extern   
    stopwords               = /usr/local/search/dict/stopwords.txt   
    charset_dictpath        = /usr/local/search/dict/   
    charset_type            = zh_cn.utf-8   
    #source            = python             #对应的source名称   
    mlock            = 0   
    morphology            = none   
    min_word_len        = 1   
    html_strip            = 1   
}   
#全局index定义   
indexer   
{   
    mem_limit               = 128M   
}   
  
#searchd服务定义   
searchd   
{   
    listen                  =   30001   
    max_matches             = 1000   
}