python数据源操作MySQL实例
本文来源:http://blog.52fq.net/post/97/,感谢dancebear的工作和分享,前往该页面可以查看彩色语法显示的版本!
需要提前安装的Python扩展包:
- MySQLdb(MySQL-python):http://mysql-python.sourceforge.net/
- dict4ini:http://code.google.com/p/dict4ini/
数据库配置文件:/usr/local/search/conf/article.conf(即脚本中 confFilePrivate 指向的路径):
[indexer]
#每批次读取的文档ID跨度(按id范围分批读取)
perpage=1000
[mysql]
#MySQL服务器地址
host=127.0.0.1
#MySQL用户名
username=root
#MySQL密码
password=root
#MySQL数据库名
dbname=article
#MySQL默认字符集
charset=gbk
#MySQL数据表表前缀
tableprefix=cms_
#MySQL服务器端口
port=3306
python数据源脚本:/usr/local/search/datasources/article.py:
#! /usr/bin/env python
#coding=utf-8
# coreseek3.2.13 python source文章读取
# author: dancebear
# date: 2010-07-02 11:46
import MySQLdb,logging
# Path of the INI file holding the [indexer] and [mysql] sections; both
# data-source classes load it with ini.DictIni(confFilePrivate).
confFilePrivate='/usr/local/search/conf/article.conf'
class mainSource(object):
    """Coreseek Python data source for the full ("main") article index.

    Reads MySQL settings from confFilePrivate, walks the joined
    contentlist/content tables in windows of ids, and exposes each row
    through the attributes listed in GetScheme(). csft.conf refers to this
    class as ``article.mainSource``.
    """

    def __init__(self, cfg):
        # NOTE(review): `cfg` is ignored; settings always come from
        # confFilePrivate. `ini` is not imported anywhere in this file --
        # presumably `import dict4ini as ini` is required; confirm.
        self.cfg = ini.DictIni(confFilePrivate)
        self._rowCount = 0  # rows of the current batch not yet fetched
        self.m_cursor = None
        self.m_dbconn = None
        # id range to index, filled in by OnBeforeIndex
        self.m_minid = 0
        self.m_maxid = 0
        self.m_start_id = 0  # first id of the next window to query
        # SELECT statement that NextDocument extends with an id-range condition
        self.m_basesql_str = ''

    def GetScheme(self):
        """Return the document schema: doc id, text fields and integer attributes."""
        return [
            ('id', {'docid': True, }),
            ('fid', {'type': 'integer'}),
            ('description', {'type': 'text'}),
            ('pink', {'type': 'integer'}),
            ('author', {'type': 'text'}),
            ('authorid', {'type': 'integer'}),
            ('subject', {'type': 'text'}),
            ('dateline', {'type': 'integer'}),
            ('state', {'type': 'integer'}),
            ('top', {'type': 'integer'}),
            ('content', {'type': 'text'}),
            ('views', {'type': 'integer'}),
            ('comments', {'type': 'integer'}),
        ]

    def GetFieldOrder(self):
        """Return the priority order of the full-text fields."""
        return [('subject', 'content', 'description', 'author')]

    def Connected(self):
        """Open the MySQL connection described by the [mysql] config section.

        Returns True on success, False if the section is missing or the
        connection attempt raises MySQLdb.Error.
        """
        if not self.cfg.has_key('mysql'):
            logging.error('Not has MySQL info')
            return False
        mysql = self.cfg.mysql
        try:
            self.m_dbconn = MySQLdb.connect(host=mysql.host,
                                            port=int(mysql.port),
                                            user=mysql.username,
                                            passwd=mysql.password,
                                            db=mysql.dbname)
        except MySQLdb.Error as e:
            # Previously `logginf.error`, a NameError that masked the DB error.
            logging.error("Error %d: %s", e.args[0], e.args[1])
            return False
        return True

    def OnBeforeIndex(self):
        """Set the connection charset, read MIN/MAX(id) and build the base SELECT.

        Returns False when there is no connection or a query fails.
        """
        if self.m_dbconn is None:
            return False
        prefix = self.cfg.mysql.tableprefix
        try:
            cursor = self.m_dbconn.cursor()
            # Ask MySQL to send text in the configured charset (e.g. gbk);
            # _getRow re-encodes those values to UTF-8.
            cursor.execute("SET NAMES " + self.cfg.mysql.charset)
            cursor.close()
        except MySQLdb.Error as e:
            logging.error("Error %d: %s", e.args[0], e.args[1])
            return False
        sql = """SELECT MIN(id),MAX(id) FROM {$prefix}contentlist"""
        sql = sql.replace("{$prefix}", prefix)
        try:
            cursor = self.m_dbconn.cursor()
            rowCount = cursor.execute(sql)
            tm_row = cursor.fetchone() if rowCount else None
            cursor.close()
        except MySQLdb.Error as e:
            logging.error("Error %d: %s", e.args[0], e.args[1])
            return False
        if not tm_row:
            return False
        # MIN/MAX are NULL on an empty table; keep the 0 defaults then.
        if tm_row[0]:
            self.m_minid = tm_row[0]
        if tm_row[1]:
            self.m_maxid = tm_row[1]
        self.m_start_id = self.m_minid
        sql = """SELECT l.id AS id, l.CateID AS fid,
            l.Description AS description,l.pink AS pink,
            l.Author as author, l.AuthorID AS poster_id,
            l.Title AS title, l.PublicTime AS post_time,
            l.State as state,l.Top as top,
            c.content AS content,l.HitNum as views, l.CommentNum as comments
            FROM {$prefix}contentlist AS l, {$prefix}content AS c WHERE c.ContentID = l.id"""
        self.m_basesql_str = sql.replace("{$prefix}", prefix)
        return True

    def NextDocument(self):
        """Load the next document into the field attributes.

        Returns True when a row was loaded, False when the id range up to
        MAX(id) is exhausted or a query fails.
        """
        if self._rowCount > 0:
            # Rows of the current batch are still buffered in the cursor.
            self._rowCount -= 1
            return self._getRow()
        # Guard against a zero/negative window, which would never advance.
        perpage = max(1, int(self.cfg.indexer.perpage))
        try:
            # Walk [m_start_id, m_maxid] in windows of `perpage` ids. A window
            # that falls into a gap of the id sequence returns no rows; skip
            # it instead of ending the run (which dropped every later row).
            while self.m_start_id <= self.m_maxid:
                start = self.m_start_id
                # Never read past MAX(id): OnIndexFinished records m_maxid as
                # the indexed high-water mark, so later ids belong to the delta.
                end = min(start + perpage, self.m_maxid + 1)
                self.m_start_id = end
                logging.info('Select data from %d to %d', start, end)
                if self.m_cursor is not None:
                    self.m_cursor.close()
                self.m_cursor = self.m_dbconn.cursor(cursorclass=MySQLdb.cursors.DictCursor)
                sql_condition = " AND l.id>=" + str(start) + " AND l.id<" + str(end)
                rows = self.m_cursor.execute(self.m_basesql_str + sql_condition)
                if rows > 0:
                    self._rowCount = rows - 1
                    return self._getRow()
        except MySQLdb.Error as e:
            logging.error("Error %d:%s", e.args[0], e.args[1])
            self.OnIndexFinished()
            return False
        return False

    def _to_utf8(self, value):
        """Re-encode a text column from the [mysql] charset to UTF-8; None -> ''."""
        if value is None:
            return ''
        return value.decode(self.cfg.mysql.charset, 'ignore').encode('utf-8')

    def _getRow(self):
        """Copy the next fetched row onto the attributes named in GetScheme.

        Returns True when a row was read, False when the cursor is exhausted.
        """
        m_row = self.m_cursor.fetchone()
        if not m_row:
            return False
        self.id = m_row['id']
        self.fid = m_row['fid']
        self.description = self._to_utf8(m_row['description'])
        self.pink = m_row['pink']
        self.author = self._to_utf8(m_row['author'])
        self.authorid = m_row['poster_id']
        self.subject = self._to_utf8(m_row['title'])
        self.dateline = m_row['post_time']
        self.state = m_row['state']
        self.top = m_row['top']
        self.content = self._to_utf8(m_row['content'])
        self.views = m_row['views']
        self.comments = m_row['comments']
        return True

    def OnIndexFinished(self):
        """Store m_maxid as `idx_postid` in the settings table.

        deltaSource reads this value back to start right after the last
        indexed id. Returns True on success, False on a database error.
        """
        sql = """UPDATE {$prefix}settings SET `value`='{$id}' WHERE variable='idx_postid'"""
        sql = sql.replace("{$prefix}", self.cfg.mysql.tableprefix)
        sql = sql.replace("{$id}", str(self.m_maxid))
        # Use a fresh connection (presumably so a connection left idle during
        # a long run is not reused), but close the old one instead of leaking it.
        if self.m_dbconn is not None:
            try:
                self.m_dbconn.close()
            except MySQLdb.Error:
                pass  # best effort: the old connection may already be gone
            self.m_dbconn = None
        if not self.Connected():
            return False
        try:
            cursor = self.m_dbconn.cursor()
            try:
                cursor.execute(sql)
            finally:
                cursor.close()
            # DB-API connections do not autocommit by default; without this
            # the UPDATE is rolled back on transactional (e.g. InnoDB) tables.
            self.m_dbconn.commit()
        except MySQLdb.Error as e:
            logging.error("Error %d: %s", e.args[0], e.args[1])
            return False
        logging.info('END fetch')
        return True
class deltaSource(object):
    """Coreseek Python data source for the incremental ("delta") article index.

    Same schema and fetch logic as the main source, but indexing starts at
    the `idx_postid` value stored in the settings table plus one. csft.conf
    refers to this class as ``article.deltaSource``.
    """

    def __init__(self, cfg):
        # NOTE(review): `cfg` is ignored; settings always come from
        # confFilePrivate. `ini` is not imported anywhere in this file --
        # presumably `import dict4ini as ini` is required; confirm.
        self.cfg = ini.DictIni(confFilePrivate)
        self._rowCount = 0  # rows of the current batch not yet fetched
        self.m_cursor = None
        self.m_dbconn = None
        # id range to index, filled in by OnBeforeIndex
        self.m_minid = 0
        self.m_maxid = 0
        self.m_start_id = 0  # first id of the next window to query
        # SELECT statement that NextDocument extends with an id-range condition
        self.m_basesql_str = ''

    def GetScheme(self):
        """Return the document schema: doc id, text fields and integer attributes."""
        return [
            ('id', {'docid': True, }),
            ('fid', {'type': 'integer'}),
            ('description', {'type': 'text'}),
            ('pink', {'type': 'integer'}),
            ('author', {'type': 'text'}),
            ('authorid', {'type': 'integer'}),
            ('subject', {'type': 'text'}),
            ('dateline', {'type': 'integer'}),
            ('state', {'type': 'integer'}),
            ('top', {'type': 'integer'}),
            ('content', {'type': 'text'}),
            ('views', {'type': 'integer'}),
            ('comments', {'type': 'integer'}),
        ]

    def GetFieldOrder(self):
        """Return the priority order of the full-text fields."""
        return [('subject', 'content', 'description', 'author')]

    def Connected(self):
        """Open the MySQL connection described by the [mysql] config section.

        Returns True on success, False if the section is missing or the
        connection attempt raises MySQLdb.Error.
        """
        if not self.cfg.has_key('mysql'):
            logging.error('Not has MySQL info')
            return False
        mysql = self.cfg.mysql
        try:
            self.m_dbconn = MySQLdb.connect(host=mysql.host,
                                            port=int(mysql.port),
                                            user=mysql.username,
                                            passwd=mysql.password,
                                            db=mysql.dbname)
        except MySQLdb.Error as e:
            # Previously `logginf.error`, a NameError that masked the DB error.
            logging.error("Error %d: %s", e.args[0], e.args[1])
            return False
        return True

    def OnBeforeIndex(self):
        """Set the charset, compute the id range to index and build the base SELECT.

        The range starts at settings.idx_postid + 1 and ends at MAX(id).
        Returns False when there is no connection, a query fails, or the
        idx_postid marker is missing or not numeric.
        """
        if self.m_dbconn is None:
            return False
        prefix = self.cfg.mysql.tableprefix
        try:
            cursor = self.m_dbconn.cursor()
            # Ask MySQL to send text in the configured charset (e.g. gbk);
            # _getRow re-encodes those values to UTF-8.
            cursor.execute("SET NAMES " + self.cfg.mysql.charset)
            cursor.close()
        except MySQLdb.Error as e:
            logging.error("Error %d: %s", e.args[0], e.args[1])
            return False
        sql = """SELECT MIN(id),MAX(id) FROM {$prefix}contentlist"""
        sql = sql.replace("{$prefix}", prefix)
        try:
            cursor = self.m_dbconn.cursor()
            rowCount = cursor.execute(sql)
            tm_row = cursor.fetchone() if rowCount else None
            cursor.close()
        except MySQLdb.Error as e:
            logging.error("Error %d: %s", e.args[0], e.args[1])
            return False
        if not tm_row:
            return False
        # MIN/MAX are NULL on an empty table; keep the 0 defaults then.
        if tm_row[0]:
            self.m_minid = tm_row[0]
        if tm_row[1]:
            self.m_maxid = tm_row[1]
        # Read the id recorded by the last OnIndexFinished.
        sql = "SELECT value FROM {$prefix}settings WHERE variable='idx_postid'"
        sql = sql.replace("{$prefix}", prefix)
        try:
            cursor = self.m_dbconn.cursor()
            try:
                cursor.execute(sql)
                tm_row = cursor.fetchone()
            finally:
                cursor.close()
        except MySQLdb.Error as e:
            # Previously printed to stdout; log like every other error path.
            logging.error("Error %d: %s", e.args[0], e.args[1])
            return False
        # A missing or malformed marker used to crash with TypeError/ValueError.
        if tm_row is None or tm_row[0] is None:
            logging.error("idx_postid not found in settings table")
            return False
        try:
            last_indexed_id = int(tm_row[0])
        except ValueError:
            logging.error("idx_postid is not an integer: %r", tm_row[0])
            return False
        self.m_minid = last_indexed_id + 1
        self.m_start_id = self.m_minid
        sql = """SELECT l.id AS id, l.CateID AS fid,
            l.Description AS description,l.pink AS pink,
            l.Author as author, l.AuthorID AS poster_id,
            l.Title AS title, l.PublicTime AS post_time,
            l.State as state,l.Top as top,
            c.content AS content,l.HitNum as views, l.CommentNum as comments
            FROM {$prefix}contentlist AS l, {$prefix}content AS c WHERE c.ContentID = l.id"""
        self.m_basesql_str = sql.replace("{$prefix}", prefix)
        return True

    def NextDocument(self):
        """Load the next document into the field attributes.

        Returns True when a row was loaded, False when the id range up to
        MAX(id) is exhausted or a query fails.
        """
        if self._rowCount > 0:
            # Rows of the current batch are still buffered in the cursor.
            self._rowCount -= 1
            return self._getRow()
        # Guard against a zero/negative window, which would never advance.
        perpage = max(1, int(self.cfg.indexer.perpage))
        try:
            # Walk [m_start_id, m_maxid] in windows of `perpage` ids. A window
            # that falls into a gap of the id sequence returns no rows; skip
            # it instead of ending the run (which dropped every later row).
            while self.m_start_id <= self.m_maxid:
                start = self.m_start_id
                # Never read past MAX(id): OnIndexFinished records m_maxid as
                # the indexed high-water mark, so later ids are left for the
                # next delta run.
                end = min(start + perpage, self.m_maxid + 1)
                self.m_start_id = end
                logging.info('Select data from %d to %d', start, end)
                if self.m_cursor is not None:
                    self.m_cursor.close()
                self.m_cursor = self.m_dbconn.cursor(cursorclass=MySQLdb.cursors.DictCursor)
                sql_condition = " AND l.id>=" + str(start) + " AND l.id<" + str(end)
                rows = self.m_cursor.execute(self.m_basesql_str + sql_condition)
                if rows > 0:
                    self._rowCount = rows - 1
                    return self._getRow()
        except MySQLdb.Error as e:
            logging.error("Error %d:%s", e.args[0], e.args[1])
            self.OnIndexFinished()
            return False
        return False

    def _to_utf8(self, value):
        """Re-encode a text column from the [mysql] charset to UTF-8; None -> ''."""
        if value is None:
            return ''
        return value.decode(self.cfg.mysql.charset, 'ignore').encode('utf-8')

    def _getRow(self):
        """Copy the next fetched row onto the attributes named in GetScheme.

        Returns True when a row was read, False when the cursor is exhausted.
        """
        m_row = self.m_cursor.fetchone()
        if not m_row:
            return False
        self.id = m_row['id']
        self.fid = m_row['fid']
        self.description = self._to_utf8(m_row['description'])
        self.pink = m_row['pink']
        self.author = self._to_utf8(m_row['author'])
        self.authorid = m_row['poster_id']
        self.subject = self._to_utf8(m_row['title'])
        self.dateline = m_row['post_time']
        self.state = m_row['state']
        self.top = m_row['top']
        self.content = self._to_utf8(m_row['content'])
        self.views = m_row['views']
        self.comments = m_row['comments']
        return True

    def OnIndexFinished(self):
        """Store m_maxid as `idx_postid` so the next delta run starts after it.

        Returns True on success, False on a database error.
        """
        sql = """UPDATE {$prefix}settings SET `value`='{$id}' WHERE variable='idx_postid'"""
        sql = sql.replace("{$prefix}", self.cfg.mysql.tableprefix)
        sql = sql.replace("{$id}", str(self.m_maxid))
        # Use a fresh connection (presumably so a connection left idle during
        # a long run is not reused), but close the old one instead of leaking it.
        if self.m_dbconn is not None:
            try:
                self.m_dbconn.close()
            except MySQLdb.Error:
                pass  # best effort: the old connection may already be gone
            self.m_dbconn = None
        if not self.Connected():
            return False
        try:
            cursor = self.m_dbconn.cursor()
            try:
                cursor.execute(sql)
            finally:
                cursor.close()
            # DB-API connections do not autocommit by default; without this
            # the UPDATE is rolled back on transactional (e.g. InnoDB) tables.
            self.m_dbconn.commit()
        except MySQLdb.Error as e:
            logging.error("Error %d: %s", e.args[0], e.args[1])
            return False
        logging.info('END fetch')
        return True
if __name__ == "__main__":  # demo: run the main source directly, outside coreseek
    conf = {}
    source = mainSource(conf)
    # OnBeforeIndex must run before NextDocument: it sets the charset, reads
    # the id range and builds the base SELECT that NextDocument appends to.
    # Without it the demo sent a bare " AND l.id>=..." and always failed.
    if source.Connected() and source.OnBeforeIndex():
        while source.NextDocument():
            print("id=%d, subject=%s ,content =%s" % (source.id, source.subject, source.content[0:20]))
coreseek配置:csft.conf
# python路径定义
python
{
path = /usr/local/search/datasources/
}
#源定义
source python_main
{
type = python
#此处的article为上面保存的python脚本的文件名(article.py),mainSource为类名
name = article.mainSource
}
source python_delta
{
type = python
name = article.deltaSource
}
#index定义
index main
{
source = python_main
path = /usr/local/search/data/article
docinfo = extern
stopwords = /usr/local/search/dict/stopwords.txt
charset_dictpath = /usr/local/search/dict/
charset_type = zh_cn.utf-8
#source = python #对应的source名称
mlock = 0
morphology = none
min_word_len = 1
html_strip = 1
}
index delta
{
source = python_delta
path = /usr/local/search/data/article_delta
docinfo = extern
stopwords = /usr/local/search/dict/stopwords.txt
charset_dictpath = /usr/local/search/dict/
charset_type = zh_cn.utf-8
#source = python #对应的source名称
mlock = 0
morphology = none
min_word_len = 1
html_strip = 1
}
#全局index定义
indexer
{
mem_limit = 128M
}
#searchd服务定义
searchd
{
listen = 30001
max_matches = 1000
}
上一篇: 【万能】数据源:python数据源
下一篇: 数据源配置:pgsql数据源