Package Dbase :: Module StorageUtils
[hide private]
[frames] | no frames]

Source Code for Module Dbase.StorageUtils

  1  # $Id: StorageUtils.py 331 2007-09-18 05:12:00Z glandrum $ 
  2  # 
  3  #  Copyright (C) 2003-2006 Rational Discovery LLC 
  4  # 
  5  #   @@ All Rights Reserved  @@ 
  6  # 
  7  """ Various storage (molecular and otherwise) functionality 
  8   
  9  """ 
 10  import RDConfig 
 11  from Dbase import DbModule 
 12  from Dbase.DbConnection import DbConnect 
 13   
14 -def ValidateRDId(id):
15 """ returns whether or not an RDId is valid 16 17 >>> ValidateRDId('RDCmpd-000-009-9') 18 1 19 >>> ValidateRDId('RDCmpd-009-000-009-8') 20 1 21 >>> ValidateRDId('RDCmpd-009-000-109-8') 22 0 23 >>> ValidateRDId('bogus') 24 0 25 26 """ 27 id = id.replace('_','-') 28 splitId = id.split('-') 29 if len(splitId)<4: 30 return 0 31 accum = 0 32 for entry in splitId[1:-1]: 33 for char in entry: 34 try: 35 v = int(char) 36 except: 37 return 0 38 accum += v 39 crc = int(splitId[-1]) 40 return accum%10 == crc
41
42 -def RDIdToInt(id,validate=1):
43 """ Returns the integer index for a given RDId 44 Throws a ValueError on error 45 46 >>> RDIdToInt('RDCmpd-000-009-9') 47 9 48 >>> RDIdToInt('RDCmpd-009-000-009-8') 49 9000009 50 >>> RDIdToInt('RDData_000_009_9') 51 9 52 >>> try: 53 ... RDIdToInt('RDCmpd-009-000-109-8') 54 ... except ValueError: 55 ... print 'ok' 56 ... else: 57 ... print 'failed' 58 ok 59 >>> try: 60 ... RDIdToInt('bogus') 61 ... except ValueError: 62 ... print 'ok' 63 ... else: 64 ... print 'failed' 65 ok 66 67 """ 68 if validate and not ValidateRDId(id): 69 raise ValueError,"Bad RD Id" 70 id = id.replace('_','-') 71 terms = id.split('-')[1:-1] 72 res = 0 73 factor = 1 74 terms.reverse() 75 for term in terms: 76 res += factor*int(term) 77 factor *= 1000 78 return res
79 80
81 -def IndexToRDId(idx,leadText='RDCmpd'):
82 """ Converts an integer index into an RDId 83 84 The format of the ID is: 85 leadText-xxx-xxx-xxx-y 86 The number blocks are zero padded and the the final digit (y) 87 is a checksum: 88 >>> str(IndexToRDId(9)) 89 'RDCmpd-000-009-9' 90 >>> str(IndexToRDId(9009)) 91 'RDCmpd-009-009-8' 92 93 A millions block is included if it's nonzero: 94 >>> str(IndexToRDId(9000009)) 95 'RDCmpd-009-000-009-8' 96 97 The text at the beginning can be altered: 98 >>> str(IndexToRDId(9,leadText='RDAlt')) 99 'RDAlt-000-009-9' 100 101 Negative indices are errors: 102 >>> try: 103 ... IndexToRDId(-1) 104 ... except ValueError: 105 ... print 'ok' 106 ... else: 107 ... print 'failed' 108 ok 109 110 """ 111 if idx < 0: 112 raise ValueError,'indices must be >= zero' 113 114 res = leadText+'-' 115 tmpIdx = idx 116 if idx>=1e6: 117 res += '%03d-'%(idx//1e6) 118 tmpIdx = idx % int(1e6) 119 if tmpIdx<1000: 120 res += '000-' 121 else: 122 res += '%03d-'%(tmpIdx//1000) 123 tmpIdx = tmpIdx % 1000 124 125 res += '%03d-'%(tmpIdx) 126 accum = 0 127 txt = str(idx) 128 for char in txt: 129 accum += int(char) 130 131 res += str(accum%10) 132 return res
133
134 -def GetNextId(conn,table,idColName='Id'):
135 """ returns the next available Id in the database 136 137 see RegisterItem for testing/documentation 138 139 """ 140 vals = conn.GetData(table=table,fields=idColName) 141 max = 0 142 for val in vals: 143 val = RDIdToInt(val[0],validate=0) 144 if val > max: max = val 145 max += 1 146 return max
147
148 -def GetNextRDId(conn,table,idColName='Id',leadText=''):
149 """ returns the next available RDId in the database 150 151 see RegisterItem for testing/documentation 152 153 """ 154 if not leadText: 155 val = conn.GetData(table=table,fields=idColName)[0][0] 156 val = val.replace('_','-') 157 leadText = val.split('-')[0] 158 159 id = GetNextId(conn,table,idColName=idColName) 160 return IndexToRDId(id,leadText=leadText)
161
162 -def RegisterItem(conn,table,value,columnName,data=None, 163 id='',idColName='Id',leadText='RDCmpd'):
164 """ 165 166 >>> dbName = RDConfig.RDTestDatabase 167 >>> conn = DbConnect(dbName) 168 >>> tblName = 'StorageTest' 169 >>> conn.AddTable(tblName,'id varchar(32) not null primary key,label varchar(40),val int') 170 >>> RegisterItem(conn,tblName,'label1','label',['label1',1])==(1, 'RDCmpd-000-001-1') 171 True 172 >>> RegisterItem(conn,tblName,'label2','label',['label2',1])==(1, 'RDCmpd-000-002-2') 173 True 174 >>> RegisterItem(conn,tblName,'label1','label',['label1',1])==(0, 'RDCmpd-000-001-1') 175 True 176 >>> str(GetNextRDId(conn,tblName)) 177 'RDCmpd-000-003-3' 178 >>> tuple(conn.GetData(table=tblName)[0])==('RDCmpd-000-001-1', 'label1', 1) 179 True 180 181 It's also possible to provide ids by hand: 182 >>> RegisterItem(conn,tblName,'label10','label',['label10',1],id='RDCmpd-000-010-1')==(1, 'RDCmpd-000-010-1') 183 True 184 >>> str(GetNextRDId(conn,tblName)) 185 'RDCmpd-000-011-2' 186 187 """ 188 curs = conn.GetCursor() 189 query = 'select %s from %s where %s=%s'%(idColName,table,columnName,DbModule.placeHolder) 190 curs.execute(query,(value,)) 191 tmp = curs.fetchone() 192 if tmp: 193 return 0,tmp[0] 194 if not id: 195 id = GetNextRDId(conn,table,idColName=idColName,leadText=leadText) 196 if data: 197 row = [id] 198 row.extend(data) 199 conn.InsertData(table,row) 200 conn.Commit() 201 return 1,id
202 203 204 205 206 207 #------------------------------------ 208 # 209 # doctest boilerplate 210 # 211 _roundtripTests = """ 212 >>> ValidateRDId(IndexToRDId(100)) 213 1 214 >>> ValidateRDId(IndexToRDId(10000,leadText='foo')) 215 1 216 >>> indices = [1,100,1000,1000000] 217 >>> vals = [] 218 >>> for idx in indices: 219 ... vals.append(RDIdToInt(IndexToRDId(idx))) 220 >>> vals == indices 221 1 222 223 """ 224 __test__ = {"roundtrip":_roundtripTests} 225
226 -def _test():
227 import doctest,sys 228 return doctest.testmod(sys.modules["__main__"])
229 230 if __name__ == '__main__': 231 import sys 232 failed,tried = _test() 233 sys.exit(failed) 234