1
2
3
4
5
6
7 """ Various storage (molecular and otherwise) functionality
8
9 """
10 from rdkit import RDConfig
11 from rdkit.Dbase import DbModule
12 from rdkit.Dbase.DbConnection import DbConnect
13
def ValidateRDId(id):
    """ returns whether or not an RDId is valid

    The id is split on '-' (underscores are treated as dashes).  It is
    valid when it has at least four fields, every character of the fields
    between the lead text and the final field is a digit, and the sum of
    those digits modulo 10 equals the final (checksum) field.

    Returns a true value for valid ids and a false value otherwise; it
    never raises for malformed string input.

    >>> ValidateRDId('RDCmpd-000-009-9')
    1
    >>> ValidateRDId('RDCmpd-009-000-009-8')
    1
    >>> ValidateRDId('RDCmpd-009-000-109-8')
    0
    >>> ValidateRDId('bogus')
    0

    A non-numeric checksum is simply invalid:
    >>> ValidateRDId('RDCmpd-000-009-x')
    0

    """
    splitId = id.replace('_', '-').split('-')
    if len(splitId) < 4:
        return 0
    accum = 0
    for entry in splitId[1:-1]:
        for char in entry:
            try:
                accum += int(char)
            except ValueError:
                return 0
    # a malformed checksum field makes the id invalid rather than an error
    try:
        crc = int(splitId[-1])
    except ValueError:
        return 0
    return accum % 10 == crc
41
def RDIdToInt(id, validate=1):
    """ Returns the integer index for a given RDId
    Throws a ValueError on error

    Arguments:
      - id: the RDId string; '_' is accepted in place of '-'
      - validate: when true, the id is first checked with ValidateRDId
        and a ValueError is raised if that check fails

    The lead text (first field) and the checksum (last field) are
    dropped; the remaining fields are read as base-1000 digits, most
    significant first.

    >>> RDIdToInt('RDCmpd-000-009-9')
    9
    >>> RDIdToInt('RDCmpd-009-000-009-8')
    9000009
    >>> RDIdToInt('RDData_000_009_9')
    9
    >>> try:
    ...   RDIdToInt('RDCmpd-009-000-109-8')
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok
    >>> try:
    ...   RDIdToInt('bogus')
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok

    """
    if validate and not ValidateRDId(id):
        raise ValueError("Bad RD Id")
    terms = id.replace('_', '-').split('-')[1:-1]
    res = 0
    for term in terms:
        # each block carries three decimal digits
        res = res * 1000 + int(term)
    return res
79
80
def IndexToRDId(idx, leadText='RDCmpd'):
    """ Converts an integer index into an RDId

    The format of the ID is:
      leadText-xxx-xxx-xxx-y
    The number blocks are zero padded and the final digit (y)
    is a checksum (the sum of the decimal digits of the index, modulo 10):
    >>> str(IndexToRDId(9))
    'RDCmpd-000-009-9'
    >>> str(IndexToRDId(9009))
    'RDCmpd-009-009-8'

    A millions block is included if it's nonzero:
    >>> str(IndexToRDId(9000009))
    'RDCmpd-009-000-009-8'

    The text at the beginning can be altered:
    >>> str(IndexToRDId(9,leadText='RDAlt'))
    'RDAlt-000-009-9'

    Negative indices are errors:
    >>> try:
    ...   IndexToRDId(-1)
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok

    """
    if idx < 0:
        raise ValueError('indices must be >= zero')

    # pure integer arithmetic: no float rounding for very large indices
    millions, rest = divmod(idx, 1000000)
    thousands, units = divmod(rest, 1000)

    blocks = []
    if millions:
        blocks.append('%03d' % millions)
    blocks.append('%03d' % thousands)
    blocks.append('%03d' % units)

    checksum = sum(int(char) for char in str(idx)) % 10
    return '%s-%s-%d' % (leadText, '-'.join(blocks), checksum)
133
def GetNextId(conn, table, idColName='Id'):
    """ returns the next available Id in the database

    Reads the idColName column of every row in table, converts each RDId
    to its integer index (without validating it) and returns the largest
    index plus one.  An empty table yields 1.

    see RegisterItem for testing/documentation

    """
    vals = conn.GetData(table=table, fields=idColName)
    largest = 0
    for val in vals:
        idx = RDIdToInt(val[0], validate=0)
        if idx > largest:
            largest = idx
    return largest + 1
147
def GetNextRDId(conn, table, idColName='Id', leadText=''):
    """ returns the next available RDId in the database

    When leadText is empty, the prefix is copied from the id stored in
    the first row returned for the table (text before the first '-' or
    '_'), so the table must already contain at least one row in that case.

    see RegisterItem for testing/documentation

    """
    prefix = leadText
    if not prefix:
        firstId = conn.GetData(table=table, fields=idColName)[0][0]
        prefix = firstId.replace('_', '-').split('-')[0]

    nextIdx = GetNextId(conn, table, idColName=idColName)
    return IndexToRDId(nextIdx, leadText=prefix)
161
def RegisterItem(conn, table, value, columnName, data=None,
                 id='', idColName='Id', leadText='RDCmpd'):
    """ registers a single item, reusing the id of an existing match

    If a row of table already has value in columnName, nothing is
    changed and (0, existing id) is returned.  Otherwise the id is the
    one passed in, or the next free RDId when none was given; if data is
    provided, the row [id]+data is inserted and committed.  The return
    value is then (1, id).

    >>> dbName = RDConfig.RDTestDatabase
    >>> conn = DbConnect(dbName)
    >>> tblName = 'StorageTest'
    >>> conn.AddTable(tblName,'id varchar(32) not null primary key,label varchar(40),val int')
    >>> RegisterItem(conn,tblName,'label1','label',['label1',1])==(1, 'RDCmpd-000-001-1')
    True
    >>> RegisterItem(conn,tblName,'label2','label',['label2',1])==(1, 'RDCmpd-000-002-2')
    True
    >>> RegisterItem(conn,tblName,'label1','label',['label1',1])==(0, 'RDCmpd-000-001-1')
    True
    >>> str(GetNextRDId(conn,tblName))
    'RDCmpd-000-003-3'
    >>> tuple(conn.GetData(table=tblName)[0])==('RDCmpd-000-001-1', 'label1', 1)
    True

    It's also possible to provide ids by hand:
    >>> RegisterItem(conn,tblName,'label10','label',['label10',1],id='RDCmpd-000-010-1')==(1, 'RDCmpd-000-010-1')
    True
    >>> str(GetNextRDId(conn,tblName))
    'RDCmpd-000-011-2'

    """
    cursor = conn.GetCursor()
    lookup = 'select %s from %s where %s=%s' % (idColName, table, columnName,
                                                DbModule.placeHolder)
    cursor.execute(lookup, (value,))
    existing = cursor.fetchone()
    if existing:
        # already registered: hand back the stored id untouched
        return 0, existing[0]

    newId = id or GetNextRDId(conn, table, idColName=idColName, leadText=leadText)
    if data:
        conn.InsertData(table, [newId] + list(data))
        conn.Commit()
    return 1, newId
202
def RegisterItems(conn, table, values, columnName, rows,
                  startId='', idColName='Id', leadText='RDCmpd'):
    """ registers a batch of items, reusing the ids of those already present

    Arguments:
      - conn: the database connection
      - table: name of the table to register into
      - values: the values to look up in columnName, one per item
      - columnName: the column the values are matched against
      - rows: per-item row data (without the id); if empty, ids are
        assigned but nothing is inserted
      - startId: RDId to start numbering new items from; defaults to
        the next free RDId in the table
      - idColName: name of the id column
      - leadText: lead text used for newly generated ids

    Returns a 2-tuple: the number of values that were not yet in the
    table, and the list of ids, in the same order as values.

    Raises a ValueError if rows is provided and its length differs from
    that of values.

    NOTE: values are assumed to be unique (and hashable); with repeated
    values only the last occurrence is matched to an existing row.

    """
    if rows and len(rows) != len(values):
        raise ValueError("length mismatch between rows and values")
    nVals = len(values)
    origOrder = {}
    for i, v in enumerate(values):
        origOrder[v] = i

    curs = conn.GetCursor()
    placeHolder = DbModule.placeHolder
    # table and column names cannot be bound as parameters, so they are
    # interpolated; the values themselves always go through placeholders
    curs.execute('create temporary table regitemstemp (%s)' % columnName)
    curs.executemany('insert into regitemstemp values (%s)' % placeHolder,
                     [(x,) for x in values])
    query = ('select %s,%s from %s where %s in (select * from regitemstemp)'
             % (columnName, idColName, table, columnName))
    curs.execute(query)
    dbData = curs.fetchall()
    # drop the scratch table so the next call on this connection can recreate it
    curs.execute('drop table regitemstemp')

    # existing ids go into the slot of the value they belong to
    ids = [None] * nVals
    for val, existingId in dbData:
        ids[origOrder[val]] = existingId
    if dbData and None not in ids:
        return 0, ids

    if not startId:
        startId = GetNextRDId(conn, table, idColName=idColName, leadText=leadText)
    nextIdx = RDIdToInt(startId)

    nNew = 0
    rowsToInsert = []
    for i in range(nVals):
        if ids[i] is not None:
            continue
        newId = IndexToRDId(nextIdx, leadText=leadText)
        nextIdx += 1
        ids[i] = newId
        nNew += 1
        if rows:
            rowsToInsert.append([newId] + list(rows[i]))
    if rowsToInsert:
        nCols = len(rowsToInsert[0])
        # join a list of placeholders, not the characters of a repeated string
        qs = ','.join([placeHolder] * nCols)
        curs.executemany('insert into %s values (%s)' % (table, qs), rowsToInsert)
        conn.Commit()
    return nNew, ids
249
250
251
252
253
254
255
256
257
258 _roundtripTests = """
259 >>> ValidateRDId(IndexToRDId(100))
260 1
261 >>> ValidateRDId(IndexToRDId(10000,leadText='foo'))
262 1
263 >>> indices = [1,100,1000,1000000]
264 >>> vals = []
265 >>> for idx in indices:
266 ... vals.append(RDIdToInt(IndexToRDId(idx)))
267 >>> vals == indices
268 1
269
270 """
271 __test__ = {"roundtrip":_roundtripTests}
272
274 import doctest,sys
275 return doctest.testmod(sys.modules["__main__"])
276
if __name__ == '__main__':
    import sys
    # the process exit status is the number of failed doctests
    nFailed, nTried = _test()
    sys.exit(nFailed)
281