I code it

Code and Life

Oracle通用查询服务器的设计与实现

公司目前使用C语言的结构体来进行网络数据传输,这样很容易将序列化部分的代码用程序自动生成。但是缺点也很明显,如果数据条数增多之后,在网络中传输的数据就会有许多空洞,这个问题可以通过压缩来解决。另外一个缺点是,如果查询仅需要结构中的部分字段,则处理起来会十分麻烦,需要建立新的结构体。

如果使用动态语言,则可以在消耗一定效率的情况下,很容易解决这个问题。可以使用json来进行数据传输,仅传输需要的字段,而且可以进行压缩来传输。

  1. avro文件模式
  2. zip的json模式
  3. 多线程模式
  4. 线程池模式
  5. 多线程+内存缓存模式

第一个版本,使用avro来进行序列化,avro采用压缩的json来存储,并且头部带有元数据的定义,实现起来也较简单。第一个版本采用twisted负责网络通信,twisted是基于事件的:

   1: class SuperQueryProtocol(protocol.Protocol):
   2:     def __init__(self, config):
   3:         self.config = config
   4:         self.constr = self.config.getValue('constr')
   5:  
   6:     def dataReceived(self, data):
   7:         dumper = DataDumper(self.constr, data)
   8:         self.transport.write(dumper.process())
   9:         self.transport.loseConnection()
  10:  
  11: class SuperQueryFactory(protocol.Factory):
  12:     def __init__(self, config):
  13:         self.config = config
  14:  
  15:     def buildProtocol(self, addr):
  16:         return SuperQueryProtocol(config)

此处的DataDumper使用avro,并以压缩方式返回一个字符串,并通过transport写会客户端,这个最明显的问题是,控制流会被阻塞在生成avro的过程中。可以另起一个单独的线程,然后将transport传递给此线程来处理。但是这样就无需使用twisted模块了,因此我将其改造为更简单的socket方式:

   1: if __name__ == "__main__":
   2:     config = Config('./superquery.conf')
   3:     config.load()
   4:  
   5:     logger = init_logger()
   6:  
   7:     conn = connect_db(config.getValue('constr'))
   8:     conn.outputtypehandler = outtypehandler
   9:  
  10:     listenfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  11:     listenfd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  12:     listenfd.bind((config.getValue('host'), int(config.getValue('port'))))
  13:     listenfd.listen(10)
  14:  
  15:     while True:
  16:         clientfd, addr = listenfd.accept()
  17:         logger.info('get connection from ' + str(addr))
  18:         datadumper = DataDumper(logger, conn, clientfd)
  19:         datadumper.start()
  20:  
  21:     listenfd.close()
  22:     conn.close()

 
并在一个独立的线程中执行数据的查询及写出:
   1: from threading import Thread
   2: class DataDumper(Thread):
   3:     def __init__(self, logger, conn, sock):
   4:         self.logger = logger
   5:         self.conn = conn
   6:         self.sock = sock
   7:         Thread.__init__(self)
   8:     
   9:     def run(self):
  10:         request = self.sock.recv(1024)
  11:         self.logger.info(request)
  12:         obj = json.loads(request)
  13:         self.compress = obj.has_key('compress')
  14:         self.sql = obj['sql']
  15:         self.sock.sendall(self.dump(self.sql))
  16:         self.sock.close()
  17:  
  18:     def makedict(self, cursor):
  19:         cols = [d[0] for d in cursor.description]
  20:  
  21:         def createrow(*args):
  22:             return dict(zip(cols, args))
  23:  
  24:         return createrow
  25:  
  26:     def dump(self, query):
  27:         cursor = self.conn.cursor()
  28:         value = []
  29:         try:
  30:             cursor.execute(query)
  31:             cursor.rowfactory = self.makedict(cursor)
  32:             while True:
  33:                 res =  cursor.fetchone()
  34:                 if not res:break
  35:                 value.append(res)
  36:         except oracle.DatabaseError, exc:
  37:             error, = exc.args
  38:             eobj = {}
  39:             eobj['code'] = error.code
  40:             eobj['message'] = error.message
  41:             return json.dumps(eobj) \
  42:             if not self.compress \
  43:             else zlib.compress(json.dumps(eobj), zlib.Z_BEST_COMPRESSION)
  44:  
  45:         cursor.close()
  46:  
  47:         return json.dumps(value) \
  48:         if not self.compress \
  49:         else zlib.compress(json.dumps(value), zlib.Z_BEST_COMPRESSION)
 
如果客户端要求压缩,则将结果压缩并返回。这个版本的服务性能得到了较好的提升,但是如果多个客户端执行相同的查询,每次都去查询数据库,则会造成很大的开销。如果将结果集缓存起来,则可以很好的解决这个问题,但是又引入了一个新的问题,假设用户A执行“select * from table1”,我们将数据集存储为dataset1,然后用户B对table1进行了几个写操作(如添加了数条记录),此时,用户A又一次执行“select * from table1”,如果将dataset1返回,是为脏数据。如果数据库提供对指定query的通知机制就好了,好在cx_oracle的5.0版本中,已经提供了这种通知机制,当数据库发生某些事件的时候,我们可以通过回调函数的方式得到通知:
   1: import cx_Oracle as oracle
   2:  
   3: def optostring(op):
   4:     oplist = []
   5:  
   6:     if op &; oracle.OPCODE_INSERT:
   7:         oplist.append("insert")
   8:     if op &; oracle.OPCODE_DELETE:
   9:         oplist.append("delete")
  10:     if op &; oracle.OPCODE_UPDATE:
  11:         oplist.append("update")
  12:     if op &; oracle.OPCODE_ALTER:
  13:         oplist.append("alter")
  14:     if op &; oracle.OPCODE_DROP:
  15:         oplist.append("drop")
  16:     if op &; oracle.OPCODE_ALLOPS:
  17:         oplist.append("all operations")
  18:  
  19:     return ", ".join(oplist)
  20:  
  21: def onchanges(message):
  22:     print "message received"
  23:     print "database : ", message.dbname
  24:     print "tables   : "
  25:     for table in message.tables:
  26:         print "name:", table.name,
  27:         print "op:",
  28:         print optostring(table.operation)
  29:         if table.rows is None \
  30:                 or table.operation &; oracle.OPCODE_ALLROWS:
  31:                     print "rows : all rows"
  32:         else:
  33:             print "rows : "
  34:             for row in table.rows:
  35:                 print "rowid : ", row.rowid
  36:                 print "op : ",
  37:                 print optostring(row.operation)
  38:  
  39:  
  40: if __name__ == "__main__":
  41:     con = oracle.Connection(constr, events=True)
  42:     sql = "select * from dpdm_cur_table"
  43:  
  44:     subwrite = con.subscribe(callback=onchanges, \
  45:             operations=oracle.OPCODE_INSERT | \
  46:             oracle.OPCODE_UPDATE | oracle.OPCODE_DELETE, rowids=True)
  47:     subwrite.registerquery(sql)
  48:  
  49:     raw_input("hit to terminate...\n")
  50:     con.close()
 
上边是一个oracle文档中提供的例子,注册onchanges来监听对’sql’的更新/插入/删除动作,我们可以通过此机制来完成:当关注的query中引用的表发生改变时,更新数据集,如将dataset1更新为dataset1`。此时客户端即可得到正确的数据。
 
后端可以使用一个sqlite来存储query和压缩过的data,如果客户端请求的query在sqlite中已经存在,则直接返回该条记录的data。当query对应的数据发生变更时,由另一个线程来修改sqlite中的记录。
 
现在此模块仍在在开发中,完成后进行一些测试,来验证加入缓存机制的效率。

Comments