Package rdkit :: Package Dbase :: Module StorageUtils
[hide private]
[frames | no frames]

Source Code for Module rdkit.Dbase.StorageUtils

  1  # $Id: StorageUtils.py 997 2009-02-25 06:12:43Z glandrum $ 
  2  # 
  3  #  Copyright (C) 2003-2006 Rational Discovery LLC 
  4  # 
  5  #   @@ All Rights Reserved  @@ 
  6  # 
  7  """ Various storage (molecular and otherwise) functionality 
  8   
  9  """ 
 10  from rdkit import RDConfig 
 11  from rdkit.Dbase import DbModule 
 12  from rdkit.Dbase.DbConnection import DbConnect 
 13   
def ValidateRDId(id):
    """ returns whether or not an RDId is valid

    An RDId looks like leadText-xxx-xxx-y (underscores are accepted in
    place of the dashes).  It is considered valid when the last field
    equals the sum, modulo 10, of every digit found in the fields between
    the lead text and that last field.

    Arguments:
      - id: the RDId string to check

    Returns: a true value for a valid id, a false value otherwise.
      Malformed ids (fewer than four fields, a non-digit character in a
      number block, a non-numeric checksum field) are reported as invalid
      instead of raising.

    >>> ValidateRDId('RDCmpd-000-009-9')
    1
    >>> ValidateRDId('RDCmpd-009-000-009-8')
    1
    >>> ValidateRDId('RDCmpd-009-000-109-8')
    0
    >>> ValidateRDId('bogus')
    0
    >>> ValidateRDId('RDCmpd-000-009-x')
    0

    """
    fields = id.replace('_', '-').split('-')
    if len(fields) < 4:
        return False
    try:
        # the checksum is computed digit by digit over the number blocks
        digitSum = sum(int(char) for entry in fields[1:-1] for char in entry)
        # a non-numeric checksum field means "invalid", not an error
        crc = int(fields[-1])
    except ValueError:
        return False
    return digitSum % 10 == crc
41
def RDIdToInt(id, validate=1):
    """ Returns the integer index for a given RDId
    Throws a ValueError on error

    Arguments:
      - id: the RDId string (dashes or underscores as separators)
      - validate: if true, the id is checked with ValidateRDId() first
        and a ValueError is raised when that check fails.  When false,
        the lead text and the final (checksum) field are simply dropped
        and the remaining fields are combined as base-1000 digits.

    >>> RDIdToInt('RDCmpd-000-009-9')
    9
    >>> RDIdToInt('RDCmpd-009-000-009-8')
    9000009
    >>> RDIdToInt('RDData_000_009_9')
    9
    >>> try:
    ...   RDIdToInt('RDCmpd-009-000-109-8')
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok
    >>> try:
    ...   RDIdToInt('bogus')
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok

    """
    if validate and not ValidateRDId(id):
        raise ValueError("Bad RD Id")
    # drop the lead text (first field) and the checksum (last field)
    terms = id.replace('_', '-').split('-')[1:-1]
    res = 0
    # most significant block first: each block is one base-1000 "digit"
    for term in terms:
        res = res * 1000 + int(term)
    return res
79 80
def IndexToRDId(idx, leadText='RDCmpd'):
    """ Converts an integer index into an RDId

    Arguments:
      - idx: the non-negative integer index to convert
      - leadText: the text placed in front of the number blocks

    Returns: the RDId string.  Raises a ValueError for negative indices.

    The format of the ID is:
      leadText-xxx-xxx-xxx-y
    The number blocks are zero padded and the final digit (y)
    is a checksum (the sum of the decimal digits of idx, modulo 10):
    >>> str(IndexToRDId(9))
    'RDCmpd-000-009-9'
    >>> str(IndexToRDId(9009))
    'RDCmpd-009-009-8'

    A millions block is included if it's nonzero:
    >>> str(IndexToRDId(9000009))
    'RDCmpd-009-000-009-8'

    The text at the beginning can be altered:
    >>> str(IndexToRDId(9,leadText='RDAlt'))
    'RDAlt-000-009-9'

    Negative indices are errors:
    >>> try:
    ...   IndexToRDId(-1)
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok

    """
    if idx < 0:
        raise ValueError('indices must be >= zero')

    # pure integer arithmetic: going through floats (1e6) loses precision
    # for very large indices and can produce a wrong millions block
    millions, remainder = divmod(idx, 1000000)
    thousands, units = divmod(remainder, 1000)

    blocks = [leadText]
    if millions:
        blocks.append('%03d' % millions)
    blocks.append('%03d' % thousands)
    blocks.append('%03d' % units)

    checksum = sum(int(char) for char in str(idx)) % 10
    blocks.append(str(checksum))
    return '-'.join(blocks)
133
def GetNextId(conn, table, idColName='Id'):
    """ returns the next available Id in the database

    see RegisterItem for testing/documentation

    Arguments:
      - conn: the connection object; its GetData() method is used to
        read the id column
      - table: name of the table to look at
      - idColName: name of the column holding the RDIds

    Returns: one more than the largest integer index found among the
      stored ids (1 if no rows come back).  Ids are converted with
      RDIdToInt() without validation.

    """
    rows = conn.GetData(table=table, fields=idColName)
    highest = 0
    for row in rows:
        idx = RDIdToInt(row[0], validate=0)
        if idx > highest:
            highest = idx
    return highest + 1
147
def GetNextRDId(conn, table, idColName='Id', leadText=''):
    """ returns the next available RDId in the database

    see RegisterItem for testing/documentation

    Arguments:
      - conn: the connection object (its GetData() method is used)
      - table: name of the table to look at
      - idColName: name of the column holding the RDIds
      - leadText: text to put at the front of the new id.  If this is
        empty, the lead text is taken from the id in the first row that
        GetData() returns, so the table needs at least one row in that
        case.

    Returns: the RDId string built by IndexToRDId() from the index that
      GetNextId() reports.

    """
    if not leadText:
        # borrow the prefix of an id that is already stored
        firstId = conn.GetData(table=table, fields=idColName)[0][0]
        leadText = firstId.replace('_', '-').split('-')[0]

    nextIdx = GetNextId(conn, table, idColName=idColName)
    return IndexToRDId(nextIdx, leadText=leadText)
161
def RegisterItem(conn, table, value, columnName, data=None,
                 id='', idColName='Id', leadText='RDCmpd'):
    """ Registers a single value in a table, reusing the id of a match

    Arguments:
      - conn: the connection object
      - table: name of the table to register into
      - value: the value to look for in column columnName
      - columnName: name of the column searched for value
      - data: the remaining column values for the new row; the row is
        only inserted (and committed) when this is non-empty
      - id: id to use for a new entry; when empty one is generated with
        GetNextRDId()
      - idColName: name of the column holding the ids
      - leadText: lead text for a generated id

    Returns: a 2-tuple:
      - 0 and the stored id if value was already present
      - 1 and the id used for the new entry otherwise

    >>> dbName = RDConfig.RDTestDatabase
    >>> conn = DbConnect(dbName)
    >>> tblName = 'StorageTest'
    >>> conn.AddTable(tblName,'id varchar(32) not null primary key,label varchar(40),val int')
    >>> RegisterItem(conn,tblName,'label1','label',['label1',1])==(1, 'RDCmpd-000-001-1')
    True
    >>> RegisterItem(conn,tblName,'label2','label',['label2',1])==(1, 'RDCmpd-000-002-2')
    True
    >>> RegisterItem(conn,tblName,'label1','label',['label1',1])==(0, 'RDCmpd-000-001-1')
    True
    >>> str(GetNextRDId(conn,tblName))
    'RDCmpd-000-003-3'
    >>> tuple(conn.GetData(table=tblName)[0])==('RDCmpd-000-001-1', 'label1', 1)
    True

    It's also possible to provide ids by hand:
    >>> RegisterItem(conn,tblName,'label10','label',['label10',1],id='RDCmpd-000-010-1')==(1, 'RDCmpd-000-010-1')
    True
    >>> str(GetNextRDId(conn,tblName))
    'RDCmpd-000-011-2'

    """
    cursor = conn.GetCursor()
    # only the value is bound as a parameter; the table and column names
    # are pasted into the statement text
    lookup = 'select %s from %s where %s=%s' % (idColName, table, columnName,
                                                DbModule.placeHolder)
    cursor.execute(lookup, (value,))
    match = cursor.fetchone()
    if match:
        # already registered: hand back the stored id
        return 0, match[0]

    if not id:
        id = GetNextRDId(conn, table, idColName=idColName, leadText=leadText)
    if data:
        newRow = [id]
        newRow.extend(data)
        conn.InsertData(table, newRow)
        conn.Commit()
    return 1, id
202
def RegisterItems(conn, table, values, columnName, rows,
                  startId='', idColName='Id', leadText='RDCmpd'):
    """ Registers a batch of values, assigning new RDIds to the unknown ones

    Arguments:
      - conn: the connection object
      - table: name of the table to register into
      - values: sequence of (hashable) values to look for in columnName
      - columnName: name of the column searched for the values
      - rows: sequence, parallel to values, with the remaining column
        values of each new row.  If this is empty, ids are assigned but
        nothing is inserted.
      - startId: first id to hand out, either as an RDId string or as an
        integer index; when empty it is obtained from GetNextRDId()
      - idColName: name of the column holding the ids
      - leadText: lead text for the generated ids

    Returns: a 2-tuple:
      - the number of values that were not yet present in the table
      - the list of ids, in the same order as values

    Raises a ValueError if rows is non-empty and its length differs from
    that of values.

    Notes:
      - table, columnName and idColName are pasted directly into the SQL
        text (identifiers cannot be bound as parameters), so they must
        not come from untrusted input.
      - a value that shows up more than once in values gets a single id
        and, at most, a single new row.

    """
    if rows and len(rows) != len(values):
        raise ValueError("length mismatch between rows and values")
    if not values:
        return 0, []

    curs = conn.GetCursor()
    # stage the values in a temporary table so that a single query finds
    # the ones that are already registered
    curs.execute("create temporary table regitemstemp (%s)" % columnName)
    curs.executemany("insert into regitemstemp values (%s)" % DbModule.placeHolder,
                     [(x,) for x in values])
    query = 'select %s,%s from %s where %s in (select * from regitemstemp)' % (
        columnName, idColName, table, columnName)
    curs.execute(query)
    dbData = curs.fetchall()
    # the staging table would otherwise still exist on the next call made
    # through the same connection
    curs.execute("drop table regitemstemp")

    known = dict((val, rdId) for val, rdId in dbData)
    if all(val in known for val in values):
        return 0, [known[val] for val in values]

    if not startId:
        startId = GetNextRDId(conn, table, idColName=idColName, leadText=leadText)
    if hasattr(startId, 'replace'):
        # string-like: an RDId that still has to be converted
        nextIdx = RDIdToInt(startId)
    else:
        nextIdx = startId

    ids = []
    rowsToInsert = []
    nRegistered = 0
    for i, val in enumerate(values):
        if val not in known:
            known[val] = IndexToRDId(nextIdx, leadText=leadText)
            nextIdx += 1
            nRegistered += 1
            if rows:
                newRow = [known[val]]
                newRow.extend(rows[i])
                rowsToInsert.append(newRow)
        ids.append(known[val])

    if rowsToInsert:
        # repeat the whole placeholder token, not its individual characters
        qs = ','.join([DbModule.placeHolder] * len(rowsToInsert[0]))
        curs.executemany('insert into %s values (%s)' % (table, qs), rowsToInsert)
        conn.Commit()
    return nRegistered, ids
#------------------------------------
#
#  doctest boilerplate
#
# Extra doctests, registered through the module-level __test__ dictionary,
# checking that IndexToRDId() output validates and converts back.
_roundtripTests = """
>>> ValidateRDId(IndexToRDId(100))
1
>>> ValidateRDId(IndexToRDId(10000,leadText='foo'))
1
>>> indices = [1,100,1000,1000000]
>>> vals = []
>>> for idx in indices:
...   vals.append(RDIdToInt(IndexToRDId(idx)))
>>> vals == indices
1

"""
__test__ = {"roundtrip":_roundtripTests}
def _test():
    """ runs doctest over the module registered as __main__

    Returns whatever doctest.testmod() returns.

    """
    import sys
    import doctest
    mainModule = sys.modules["__main__"]
    return doctest.testmod(mainModule)
if __name__ == '__main__':
    import sys
    # the number of failed doctests becomes the exit status
    nFailed, nTried = _test()
    sys.exit(nFailed)