公司目前使用C语言的结构体来进行网络数据传输,这样很容易将序列化部分的代码用程序自动生成。但是缺点也很明显,如果数据条数增多之后,在网络中传输的数据就会有许多空洞,这个问题可以通过压缩来解决。另外一个缺点是,如果查询仅需要结构中的部分字段,则处理起来会十分麻烦,需要建立新的结构体。
如果使用动态语言,则可以在消耗一定效率的情况下,很容易解决这个问题。可以使用json来进行数据传输,仅传输需要的字段,而且可以进行压缩来传输。
- avro文件模式
- zip的json模式
- 多线程模式
- 线程池模式
- 多线程+内存缓存模式
第一个版本,使用avro来进行序列化,avro采用压缩的json来存储,并且头部带有元数据的定义,实现起来也较简单。第一个版本采用twisted负责网络通信,twisted是基于事件的:
1: class SuperQueryProtocol(protocol.Protocol):
2: def __init__(self, config):
3: self.config = config
4: self.constr = self.config.getValue('constr')
5:
6: def dataReceived(self, data):
7: dumper = DataDumper(self.constr, data)
8: self.transport.write(dumper.process())
9: self.transport.loseConnection()
10:
11: class SuperQueryFactory(protocol.Factory):
12: def __init__(self, config):
13: self.config = config
14:
15: def buildProtocol(self, addr):
16: return SuperQueryProtocol(config)
此处的DataDumper使用avro,并以压缩方式返回一个字符串,并通过transport写会客户端,这个最明显的问题是,控制流会被阻塞在生成avro的过程中。可以另起一个单独的线程,然后将transport传递给此线程来处理。但是这样就无需使用twisted模块了,因此我将其改造为更简单的socket方式:
1: if __name__ == "__main__":
2: config = Config('./superquery.conf')
3: config.load()
4:
5: logger = init_logger()
6:
7: conn = connect_db(config.getValue('constr'))
8: conn.outputtypehandler = outtypehandler
9:
10: listenfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
11: listenfd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
12: listenfd.bind((config.getValue('host'), int(config.getValue('port'))))
13: listenfd.listen(10)
14:
15: while True:
16: clientfd, addr = listenfd.accept()
17: logger.info('get connection from ' + str(addr))
18: datadumper = DataDumper(logger, conn, clientfd)
19: datadumper.start()
20:
21: listenfd.close()
22: conn.close()
并在一个独立的线程中执行数据的查询及写出:
1: from threading import Thread
2: class DataDumper(Thread):
3: def __init__(self, logger, conn, sock):
4: self.logger = logger
5: self.conn = conn
6: self.sock = sock
7: Thread.__init__(self)
8:
9: def run(self):
10: request = self.sock.recv(1024)
11: self.logger.info(request)
12: obj = json.loads(request)
13: self.compress = obj.has_key('compress')
14: self.sql = obj['sql']
15: self.sock.sendall(self.dump(self.sql))
16: self.sock.close()
17:
18: def makedict(self, cursor):
19: cols = [d[0] for d in cursor.description]
20:
21: def createrow(*args):
22: return dict(zip(cols, args))
23:
24: return createrow
25:
26: def dump(self, query):
27: cursor = self.conn.cursor()
28: value = []
29: try:
30: cursor.execute(query)
31: cursor.rowfactory = self.makedict(cursor)
32: while True:
33: res = cursor.fetchone()
34: if not res:break
35: value.append(res)
36: except oracle.DatabaseError, exc:
37: error, = exc.args
38: eobj = {}
39: eobj['code'] = error.code
40: eobj['message'] = error.message
41: return json.dumps(eobj) \
42: if not self.compress \
43: else zlib.compress(json.dumps(eobj), zlib.Z_BEST_COMPRESSION)
44:
45: cursor.close()
46:
47: return json.dumps(value) \
48: if not self.compress \
49: else zlib.compress(json.dumps(value), zlib.Z_BEST_COMPRESSION)
如果客户端要求压缩,则将结果压缩并返回。这个版本的服务性能得到了较好的提升,但是如果多个客户端执行相同的查询,每次都去查询数据库,则会造成很大的开销。如果将结果集缓存起来,则可以很好的解决这个问题,但是又引入了一个新的问题,假设用户A执行“select * from table1”,我们将数据集存储为dataset1,然后用户B对table1进行了几个写操作(如添加了数条记录),此时,用户A又一次执行“select * from table1”,如果将dataset1返回,是为脏数据。如果数据库提供对指定query的通知机制就好了,好在cx_oracle的5.0版本中,已经提供了这种通知机制,当数据库发生某些事件的时候,我们可以通过回调函数的方式得到通知:
1: import cx_Oracle as oracle
2:
3: def optostring(op):
4: oplist = []
5:
6: if op &; oracle.OPCODE_INSERT:
7: oplist.append("insert")
8: if op &; oracle.OPCODE_DELETE:
9: oplist.append("delete")
10: if op &; oracle.OPCODE_UPDATE:
11: oplist.append("update")
12: if op &; oracle.OPCODE_ALTER:
13: oplist.append("alter")
14: if op &; oracle.OPCODE_DROP:
15: oplist.append("drop")
16: if op &; oracle.OPCODE_ALLOPS:
17: oplist.append("all operations")
18:
19: return ", ".join(oplist)
20:
21: def onchanges(message):
22: print "message received"
23: print "database : ", message.dbname
24: print "tables : "
25: for table in message.tables:
26: print "name:", table.name,
27: print "op:",
28: print optostring(table.operation)
29: if table.rows is None \
30: or table.operation &; oracle.OPCODE_ALLROWS:
31: print "rows : all rows"
32: else:
33: print "rows : "
34: for row in table.rows:
35: print "rowid : ", row.rowid
36: print "op : ",
37: print optostring(row.operation)
38:
39:
40: if __name__ == "__main__":
41: con = oracle.Connection(constr, events=True)
42: sql = "select * from dpdm_cur_table"
43:
44: subwrite = con.subscribe(callback=onchanges, \
45: operations=oracle.OPCODE_INSERT | \
46: oracle.OPCODE_UPDATE | oracle.OPCODE_DELETE, rowids=True)
47: subwrite.registerquery(sql)
48:
49: raw_input("hit to terminate...\n")
50: con.close()
上边是一个oracle文档中提供的例子,注册onchanges来监听对’sql’的更新/插入/删除动作,我们可以通过此机制来完成:当关注的query中引用的表发生改变时,更新数据集,如将dataset1更新为dataset1`。此时客户端即可得到正确的数据。
后端可以使用一个sqlite来存储query和压缩过的data,如果客户端请求的query在sqlite中已经存在,则直接返回该条记录的data。当query对应的数据发生变更时,由另一个线程来修改sqlite中的记录。
现在此模块仍在在开发中,完成后进行一些测试,来验证加入缓存机制的效率。