Package Dbase :: Module DbResultSet
[hide private]
[frames] | no frames]

Source Code for Module Dbase.DbResultSet

  1  # 
  2  #  Copyright (C) 2003  Greg Landrum and Rational Discovery LLC 
  3  # 
  4  """ defines class _DbResultSet_ for lazy interactions with Db query results 
  5   
  6  **Note** 
  7   
  8    this uses the Python iterator interface, so you'll need python 2.2 or above. 
  9   
 10  """ 
 11  import sys 
 12  from Dbase import DbInfo 
 13   
14 -class DbResultBase(object):
15 - def __init__(self,cursor,conn,cmd,removeDups=-1,transform=None,extras=None):
16 self.cursor = cursor 17 self.removeDups = removeDups 18 self.transform = transform 19 self.cmd = cmd 20 self.conn=conn 21 self.extras = extras 22 self.Reset() 23 self._initColumnNamesAndTypes()
24
25 - def Reset(self):
26 """ implement in subclasses 27 28 """ 29 try: 30 if not self.extras: 31 self.cursor.execute(self.cmd) 32 else: 33 self.cursor.execute(self.cmd,self.extras) 34 except: 35 sys.stderr.write('the command "%s" generated errors:\n'%(self.cmd)) 36 import traceback 37 traceback.print_exc()
38 39
40 - def __iter__(self):
41 self.Reset() 42 return self
43
44 - def _initColumnNamesAndTypes(self):
45 self.colNames = [] 46 self.colTypes = [] 47 for cName,cType in DbInfo.GetColumnInfoFromCursor(self.cursor): 48 self.colNames.append(cName) 49 self.colTypes.append(cType) 50 self.colNames = tuple(self.colNames) 51 self.colTypes = tuple(self.colTypes)
52
53 - def GetColumnNames(self):
54 return self.colNames
55 - def GetColumnTypes(self):
56 return self.colTypes
57 - def GetColumnNamesAndTypes(self):
58 res = [None]*len(self.colNames) 59 for i in range(len(self.colNames)): 60 res[i] = self.colNames[i],self.colTypes[i] 61 return tuple(res)
62 63
64 -class DbResultSet(DbResultBase):
65 """ Only supports forward iteration 66 67 """
68 - def __init__(self,*args,**kwargs):
69 DbResultBase.__init__(self,*args,**kwargs) 70 self.seen = [] 71 self._stopped = 0
72 - def Reset(self):
73 self._stopped = 0 74 DbResultBase.Reset(self)
75
76 - def next(self):
77 if self._stopped: 78 raise StopIteration 79 r = None 80 while r is None: 81 r = self.cursor.fetchone() 82 if not r: 83 self._stopped = 1 84 raise StopIteration 85 if self.transform is not None: 86 r = self.transform(r) 87 if self.removeDups>=0: 88 v = r[self.removeDups] 89 if v in self.seen: 90 r = None 91 else: 92 self.seen.append(v) 93 return r
94 95 96
97 -class RandomAccessDbResultSet(DbResultBase):
98 """ Supports random access 99 100 """
101 - def __init__(self,*args,**kwargs):
102 DbResultBase.__init__(self,*args,**kwargs) 103 self.results = [] 104 self.seen = [] 105 self._pos = -1
106
107 - def Reset(self):
108 self._pos = -1 109 if self.cursor is not None: 110 DbResultBase.Reset(self)
111
112 - def _finish(self):
113 if self.cursor: 114 #sys.stderr.write('_finish:\n') 115 r = self.cursor.fetchone() 116 while r: 117 if self.transform is not None: 118 r = self.transform(r) 119 if self.removeDups >=0: 120 v = r[self.removeDups] 121 if v not in self.seen: 122 self.seen.append(v) 123 self.results.append(r) 124 else: 125 self.results.append(r) 126 r = self.cursor.fetchone() 127 self.cursor = None
128 - def __getitem__(self,idx):
129 if idx < 0: raise IndexError,"negative indices not supported" 130 if self.cursor is None: 131 if len(self.results): 132 if idx >= len(self.results): 133 raise IndexError,'index %d too large (%d max)'%(idx,len(self.results)) 134 else: 135 raise ValueError,'Invalid cursor' 136 137 while idx >= len(self.results): 138 r = None 139 while r is None: 140 r = self.cursor.fetchone() 141 if not r: 142 self.cursor = None 143 raise IndexError,'index %d too large (%d max)'%(idx,len(self.results)) 144 145 if self.transform is not None: 146 r = self.transform(r) 147 if self.removeDups>=0: 148 v = r[self.removeDups] 149 if v in self.seen: 150 r = None 151 else: 152 self.results.append(r) 153 self.seen.append(v) 154 else: 155 self.results.append(r) 156 157 return self.results[idx]
158
159 - def __len__(self):
160 if self.results is None: 161 raise ValueError,"len() not supported for noMemory Results Sets" 162 self._finish() 163 return len(self.results)
164
165 - def next(self):
166 self._pos += 1 167 res = None 168 if self._pos < len(self): 169 res = self.results[self._pos] 170 else: 171 raise StopIteration 172 return res
173 174 if __name__ == '__main__': 175 from Dbase.DbConnection import DbConnect 176 conn = DbConnect('TEST.GDB') 177 curs = conn.GetCursor() 178 print 'curs:',repr(curs) 179 curs.execute('select * from ten_elements') 180 set = RandomAccessDbResultSet(curs) 181 for i in range(12): 182 try: 183 val = set[i] 184 except IndexError: 185 assert i >= 10 186 187 print 'use len' 188 curs = conn.GetCursor() 189 curs.execute('select * from ten_elements') 190 set = RandomAccessDbResultSet(curs) 191 for i in range(len(set)): 192 val = set[i] 193 194 print 'use iter' 195 curs = conn.GetCursor() 196 curs.execute('select * from ten_elements') 197 set = DbResultSet(curs) 198 for thing in set: 199 id,val = thing 200 201 print 'dups' 202 curs = conn.GetCursor() 203 curs.execute('select * from ten_elements_dups') 204 set = DbResultSet(curs) 205 r = [] 206 for thing in set: 207 r.append(thing) 208 assert len(r)==20 209 210 curs = conn.GetCursor() 211 curs.execute('select * from ten_elements_dups') 212 set = DbResultSet(curs,removeDups=0) 213 r = [] 214 for thing in set: 215 r.append(thing) 216 assert len(r)==10 217 218 curs = conn.GetCursor() 219 curs.execute('select * from ten_elements_dups') 220 set = RandomAccessDbResultSet(curs,removeDups=0) 221 assert len(set)==10 222 assert set[0] == (0,11) 223 224 curs = conn.GetCursor() 225 curs.execute('select * from ten_elements_dups') 226 set = RandomAccessDbResultSet(curs,removeDups=0) 227 assert set[0] == (0,11) 228 assert set[1] == (2,21) 229 assert set[5] == (10,61) 230 231 curs = conn.GetCursor() 232 curs.execute('select * from ten_elements_dups') 233 set = RandomAccessDbResultSet(curs) 234 assert set[0] == (0,11) 235 assert set[1] == (0,11) 236