1
2
3
""" Defines the class _DbResultSet_ for lazy interaction with Db query results

**Note**

  This uses the Python iterator interface, so you'll need Python 2.2 or above.

"""
11 import sys
12 from Dbase import DbInfo
13
15 - def __init__(self,cursor,conn,cmd,removeDups=-1,transform=None,extras=None):
24
26 """ implement in subclasses
27
28 """
29 try:
30 if not self.extras:
31 self.cursor.execute(self.cmd)
32 else:
33 self.cursor.execute(self.cmd,self.extras)
34 except:
35 sys.stderr.write('the command "%s" generated errors:\n'%(self.cmd))
36 import traceback
37 traceback.print_exc()
38
39
41 self.Reset()
42 return self
43
45 self.colNames = []
46 self.colTypes = []
47 for cName,cType in DbInfo.GetColumnInfoFromCursor(self.cursor):
48 self.colNames.append(cName)
49 self.colTypes.append(cType)
50 self.colNames = tuple(self.colNames)
51 self.colTypes = tuple(self.colTypes)
52
58 res = [None]*len(self.colNames)
59 for i in range(len(self.colNames)):
60 res[i] = self.colNames[i],self.colTypes[i]
61 return tuple(res)
62
63
65 """ Only supports forward iteration
66
67 """
75
77 if self._stopped:
78 raise StopIteration
79 r = None
80 while r is None:
81 r = self.cursor.fetchone()
82 if not r:
83 self._stopped = 1
84 raise StopIteration
85 if self.transform is not None:
86 r = self.transform(r)
87 if self.removeDups>=0:
88 v = r[self.removeDups]
89 if v in self.seen:
90 r = None
91 else:
92 self.seen.append(v)
93 return r
94
95
96
98 """ Supports random access
99
100 """
102 DbResultBase.__init__(self,*args,**kwargs)
103 self.results = []
104 self.seen = []
105 self._pos = -1
106
111
113 if self.cursor:
114
115 r = self.cursor.fetchone()
116 while r:
117 if self.transform is not None:
118 r = self.transform(r)
119 if self.removeDups >=0:
120 v = r[self.removeDups]
121 if v not in self.seen:
122 self.seen.append(v)
123 self.results.append(r)
124 else:
125 self.results.append(r)
126 r = self.cursor.fetchone()
127 self.cursor = None
129 if idx < 0: raise IndexError,"negative indices not supported"
130 if self.cursor is None:
131 if len(self.results):
132 if idx >= len(self.results):
133 raise IndexError,'index %d too large (%d max)'%(idx,len(self.results))
134 else:
135 raise ValueError,'Invalid cursor'
136
137 while idx >= len(self.results):
138 r = None
139 while r is None:
140 r = self.cursor.fetchone()
141 if not r:
142 self.cursor = None
143 raise IndexError,'index %d too large (%d max)'%(idx,len(self.results))
144
145 if self.transform is not None:
146 r = self.transform(r)
147 if self.removeDups>=0:
148 v = r[self.removeDups]
149 if v in self.seen:
150 r = None
151 else:
152 self.results.append(r)
153 self.seen.append(v)
154 else:
155 self.results.append(r)
156
157 return self.results[idx]
158
160 if self.results is None:
161 raise ValueError,"len() not supported for noMemory Results Sets"
162 self._finish()
163 return len(self.results)
164
166 self._pos += 1
167 res = None
168 if self._pos < len(self):
169 res = self.results[self._pos]
170 else:
171 raise StopIteration
172 return res
173
if __name__ == '__main__':
    # Self-test. Needs a TEST.GDB database holding the tables
    # `ten_elements` and `ten_elements_dups` queried below.
    from Dbase.DbConnection import DbConnect

    conn = DbConnect('TEST.GDB')

    def _query(klass, cmd, **kwargs):
        """ Executes cmd on a fresh cursor and wraps it in a klass result set.

        The result-set constructor takes (cursor, conn, cmd, ...), so the
        connection and the command are passed along with the cursor; passing
        the cursor alone raises a TypeError.

        """
        cursor = conn.GetCursor()
        cursor.execute(cmd)
        return klass(cursor, conn, cmd, **kwargs)

    # Random access by index: rows 0-9 exist, anything beyond must raise.
    cmd = 'select * from ten_elements'
    curs = conn.GetCursor()
    print('curs: ' + repr(curs))
    curs.execute(cmd)
    results = RandomAccessDbResultSet(curs, conn, cmd)
    for i in range(12):
        try:
            results[i]
        except IndexError:
            assert i >= 10

    print('use len')
    results = _query(RandomAccessDbResultSet, 'select * from ten_elements')
    for i in range(len(results)):
        results[i]

    print('use iter')
    results = _query(DbResultSet, 'select * from ten_elements')
    for row in results:
        # unpacking checks that every row has exactly two columns
        rowId, val = row

    print('dups')
    # without duplicate removal every row comes back
    results = _query(DbResultSet, 'select * from ten_elements_dups')
    rows = [row for row in results]
    assert len(rows) == 20

    # removeDups=0 keeps only the first row for each value of column 0
    results = _query(DbResultSet, 'select * from ten_elements_dups',
                     removeDups=0)
    rows = [row for row in results]
    assert len(rows) == 10

    results = _query(RandomAccessDbResultSet,
                     'select * from ten_elements_dups', removeDups=0)
    assert len(results) == 10
    assert results[0] == (0, 11)

    results = _query(RandomAccessDbResultSet,
                     'select * from ten_elements_dups', removeDups=0)
    assert results[0] == (0, 11)
    assert results[1] == (2, 21)
    assert results[5] == (10, 61)

    # with duplicates kept, the first two rows are identical
    results = _query(RandomAccessDbResultSet,
                     'select * from ten_elements_dups')
    assert results[0] == (0, 11)
    assert results[1] == (0, 11)
236