jpayne@68
|
1 # Copyright 2002 by Andrew Dalke. All rights reserved.
|
jpayne@68
|
2 # Revisions 2007-2010 copyright by Peter Cock. All rights reserved.
|
jpayne@68
|
3 # Revisions 2009 copyright by Brad Chapman. All rights reserved.
|
jpayne@68
|
4 # Revisions 2013 copyright by Tiago Antao. All rights reserved.
|
jpayne@68
|
5 #
|
jpayne@68
|
6 # This file is part of the Biopython distribution and governed by your
|
jpayne@68
|
7 # choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
|
jpayne@68
|
8 # Please see the LICENSE file that should have been included as part of this
|
jpayne@68
|
9 # package.
|
jpayne@68
|
10 #
|
jpayne@68
|
11 # Note that BioSQL (including the database schema and scripts) is
|
jpayne@68
|
12 # available and licensed separately. Please consult www.biosql.org
|
jpayne@68
|
13 """Helper code for Biopython's BioSQL code (for internal use)."""
|
jpayne@68
|
14
|
jpayne@68
|
15 import os
|
jpayne@68
|
16 from typing import Dict, Type
|
jpayne@68
|
17
|
jpayne@68
|
18
|
jpayne@68
|
# Maps a database driver module name to the dbutils class that handles it.
_dbutils: Dict[str, Type["Generic_dbutils"]] = {}


class Generic_dbutils:
    """Driver-independent database helpers; adaptor classes override as needed."""

    def __init__(self):
        """Initialise the helper object (there is no state to set up)."""

    def tname(self, table):
        """Return the table name to use for ``table``.

        ``"biosequence"`` is mapped to ``"bioentry"``; any other name is
        returned unchanged.
        """
        return "bioentry" if table == "biosequence" else table

    def last_id(self, cursor, table):
        """Return the largest ``<table>_id`` value currently in the table.

        The table name is first passed through ``tname``.
        """
        # XXX: Unsafe without transactions isolation
        name = self.tname(table)
        cursor.execute(f"select max({name}_id) from {name}")
        return cursor.fetchone()[0]

    def execute(self, cursor, sql, args=None):
        """Run one SQL statement; a falsy ``args`` is sent as an empty tuple."""
        params = args or ()
        cursor.execute(sql, params)

    def executemany(self, cursor, sql, seq):
        """Hand ``sql`` and the parameter sequence ``seq`` to ``cursor.executemany``."""
        cursor.executemany(sql, seq)

    def autocommit(self, conn, y=1):
        """Set autocommit on the connection (does nothing in this base class)."""
        # Let's hope it was not really needed
|
jpayne@68
|
55
|
jpayne@68
|
56
|
jpayne@68
|
class Sqlite_dbutils(Generic_dbutils):
    """Database utilities that adapt SQL placeholders for SQLite."""

    def _sub_placeholder(self, sql):
        """Return ``sql`` with every ``%s`` replaced by ``?`` (PRIVATE)."""
        return sql.replace("%s", "?")

    def execute(self, cursor, sql, args=None):
        """Execute one SQL command on ``cursor``.

        Each ``%s`` in ``sql`` is rewritten to ``?`` for variable substitution
        in sqlite3; a falsy ``args`` is sent as an empty tuple.
        """
        statement = self._sub_placeholder(sql)
        cursor.execute(statement, args or ())

    def executemany(self, cursor, sql, seq):
        """Execute ``sql`` for the parameter sets in ``seq`` after the ``%s`` -> ``?`` rewrite."""
        statement = self._sub_placeholder(sql)
        cursor.executemany(statement, seq)


# Register the SQLite helper under its driver module name.
_dbutils["sqlite3"] = Sqlite_dbutils
|
jpayne@68
|
79
|
jpayne@68
|
80
|
jpayne@68
|
class Mysql_dbutils(Generic_dbutils):
    """Database utilities specialised for MySQL."""

    def last_id(self, cursor, table):
        """Return the last used id for a table.

        When ``os.name`` is ``"java"`` the generic ``select max(...)`` query is
        used. Otherwise the id comes from the cursor itself:
        ``cursor.insert_id()`` if available, else ``cursor.lastrowid``.
        """
        if os.name == "java":
            return Generic_dbutils.last_id(self, cursor, table)
        try:
            # This worked on older versions of MySQL
            row_id = cursor.insert_id()
        except AttributeError:
            # See bug 2390
            # Google suggests this is the new way,
            # same fix also suggested by Eric Gibert:
            row_id = cursor.lastrowid
        return row_id


# Register the MySQL helper under its driver module name.
_dbutils["MySQLdb"] = Mysql_dbutils
|
jpayne@68
|
99
|
jpayne@68
|
100
|
jpayne@68
|
class _PostgreSQL_dbutils(Generic_dbutils):
    """Common behaviour shared by all PostgreSQL adaptors."""

    def next_id(self, cursor, table):
        """Return the result of ``nextval`` on the table's ``<table>_pk_seq`` sequence.

        The table name is first passed through ``tname``.
        """
        sequence = f"{self.tname(table)}_pk_seq"
        cursor.execute(f"SELECT nextval('{sequence}')")
        return cursor.fetchone()[0]

    def last_id(self, cursor, table):
        """Return the result of ``currval`` on the table's ``<table>_pk_seq`` sequence.

        The table name is first passed through ``tname``.
        """
        sequence = f"{self.tname(table)}_pk_seq"
        cursor.execute(f"SELECT currval('{sequence}')")
        return cursor.fetchone()[0]
|
jpayne@68
|
117
|
jpayne@68
|
118
|
jpayne@68
|
class Psycopg2_dbutils(_PostgreSQL_dbutils):
    """Database utilities for the Psycopg2 PostgreSQL driver."""

    def autocommit(self, conn, y=True):
        """Switch autocommit on (``y`` truthy) or off (``y`` falsy) for ``conn``.

        When ``os.name`` is ``"java"`` the ``conn.autocommit`` attribute is set
        to 1 or 0. Otherwise ``conn.set_isolation_level`` is called with 0 to
        enable autocommit and 1 to disable it.
        """
        if os.name == "java":
            conn.autocommit = 1 if y else 0
        else:
            conn.set_isolation_level(0 if y else 1)


# Register the Psycopg2 helper under its driver module name.
_dbutils["psycopg2"] = Psycopg2_dbutils
|
jpayne@68
|
137
|
jpayne@68
|
138
|
jpayne@68
|
class Pgdb_dbutils(_PostgreSQL_dbutils):
    """Database utilities for the pgdb driver (aka PyGreSQL, for PostgreSQL)."""

    def autocommit(self, conn, y=True):
        """Always raise NotImplementedError; autocommit is not implemented for pgdb."""
        raise NotImplementedError("pgdb does not support this!")


# Register the pgdb helper under its driver module name.
_dbutils["pgdb"] = Pgdb_dbutils
|
jpayne@68
|
148
|
jpayne@68
|
149
|
jpayne@68
|
def get_dbutils(module_name):
    """Return a dbutils instance matching the database driver module name.

    Names without an entry in the ``_dbutils`` registry fall back to a plain
    ``Generic_dbutils`` instance.
    """
    try:
        helper = _dbutils[module_name]()
    except KeyError:
        helper = Generic_dbutils()
    return helper
|