# Copyright 2002 by Andrew Dalke.  All rights reserved.
# Revisions 2007-2016 copyright by Peter Cock.  All rights reserved.
# Revisions 2009 copyright by Cymon J. Cox.  All rights reserved.
# Revisions 2013-2014 copyright by Tiago Antao.  All rights reserved.
#
# This file is part of the Biopython distribution and governed by your
# choice of the "Biopython License Agreement" or the "BSD 3-Clause License".
# Please see the LICENSE file that should have been included as part of this
# package.
#
# Note that BioSQL (including the database schema and scripts) is
# available and licensed separately.  Please consult www.biosql.org
"""Connect with a BioSQL database and load Biopython like objects from it.

This provides interfaces for loading biological objects from a relational
database, and is compatible with the BioSQL standards.
"""
import os

from . import BioSeq
from . import Loader
from . import DBUtils


_POSTGRES_RULES_PRESENT = False  # Hack for BioSQL Bug 2839


def open_database(driver="MySQLdb", **kwargs):
    """Load an existing BioSQL-style database.

    This function is the easiest way to retrieve a connection to a
    database, doing something like::

        from BioSQL import BioSeqDatabase
        server = BioSeqDatabase.open_database(user="root", db="minidb")

    Arguments:
     - driver - The name of the database driver to use for connecting. The
       driver should implement the python DB API. By default, the MySQLdb
       driver is used.
     - user - the username to connect to the database with.
     - password, passwd - the password to connect with
     - host - the hostname of the database
     - database or db - the name of the database

    Returns a DBServer instance wrapping the new connection.

    Raises ValueError for the unsupported "psycopg" (version one) driver,
    and on Jython for any driver other than "MySQLdb" or "psycopg2".
    """
    global _POSTGRES_RULES_PRESENT

    if driver == "psycopg":
        raise ValueError(
            "Using BioSQL with psycopg (version one) is no "
            "longer supported. Use psycopg2 instead."
        )

    on_jython = os.name == "java"

    if on_jython:
        from com.ziclix.python.sql import zxJDBC

        module = zxJDBC
        if driver in ["MySQLdb"]:
            jdbc_driver = "com.mysql.jdbc.Driver"
            url_pref = "jdbc:mysql://" + kwargs["host"] + "/"
        elif driver in ["psycopg2"]:
            jdbc_driver = "org.postgresql.Driver"
            url_pref = "jdbc:postgresql://" + kwargs["host"] + "/"
        else:
            # Previously this fell through and failed later with an unbound
            # local ``conn``; fail early with a meaningful message instead.
            raise ValueError(f"Driver {driver!r} is not supported under Jython.")
    else:
        module = __import__(driver, fromlist=["connect"])
    connect = module.connect

    # Different drivers use different keywords...
    kw = kwargs.copy()
    if driver in ["MySQLdb", "mysql.connector"] and not on_jython:
        if "database" in kw:
            kw["db"] = kw.pop("database")
        if "password" in kw:
            kw["passwd"] = kw.pop("password")
    else:
        # DB-API recommendations
        if "db" in kw:
            kw["database"] = kw.pop("db")
        if "passwd" in kw:
            kw["password"] = kw.pop("passwd")
    if driver in ["psycopg2", "pgdb"] and not kw.get("database"):
        kw["database"] = "template1"

    if on_jython:
        if driver in ["MySQLdb"]:
            conn = connect(
                url_pref + kw.get("database", "mysql"),
                kw["user"],
                kw["password"],
                jdbc_driver,
            )
        else:  # psycopg2 (other drivers were rejected above)
            conn = connect(
                url_pref + kw.get("database", "postgresql") + "?stringtype=unspecified",
                kw["user"],
                kw["password"],
                jdbc_driver,
            )
    elif driver in ["sqlite3"]:
        # SQLite connect takes the database name as input
        conn = connect(kw["database"])
    else:
        conn = connect(**kw)

    if on_jython:
        # zxJDBC is shared by all back ends, so pass the driver name explicitly
        server = DBServer(conn, module, driver)
    else:
        server = DBServer(conn, module)

    # Sets MySQL to allow double quotes, rather than only backticks
    if driver in ["MySQLdb", "mysql.connector"]:
        server.adaptor.execute("SET sql_mode='ANSI_QUOTES';")

    # TODO - Remove the following once BioSQL Bug 2839 is fixed.
    # Test for RULES in PostgreSQL schema, see also Bug 2833.
    if driver in ["psycopg2", "pgdb"]:
        sql = (
            "SELECT ev_class FROM pg_rewrite WHERE "
            "rulename='rule_bioentry_i1' OR "
            "rulename='rule_bioentry_i2';"
        )
        if server.adaptor.execute_and_fetchall(sql):
            import warnings
            from Bio import BiopythonWarning

            warnings.warn(
                "Your BioSQL PostgreSQL schema includes some rules "
                "currently required for bioperl-db but which may "
                "cause problems loading data using Biopython (see "
                "BioSQL's RedMine Bug 2839 aka GitHub Issue 4 "
                "https://github.com/biosql/biosql/issues/4). "
                "If you do not use BioPerl, please remove these "
                "rules. Biopython should cope with the rules "
                "present, but with a performance penalty when "
                "loading new records.",
                BiopythonWarning,
            )
            _POSTGRES_RULES_PRESENT = True

    elif driver == "sqlite3":
        # Tell SQLite that we want to use foreign keys
        # https://www.sqlite.org/foreignkeys.html#fk_enable
        server.adaptor.execute("PRAGMA foreign_keys = ON")

    return server


class DBServer:
    """Represents a BioSQL database containing namespaces (sub-databases).

    This acts like a Python dictionary, giving access to each namespace
    (defined by a row in the biodatabase table) as a BioSeqDatabase object.
    """

    def __init__(self, conn, module, module_name=None):
        """Create a DBServer object.

        Arguments:
         - conn - A database connection object
         - module - The module used to create the database connection
         - module_name - Optionally, the name of the module.
           Default: module.__name__

        Normally you would not want to create a DBServer object yourself.
        Instead use the open_database function, which returns an instance
        of DBServer.
        """
        self.module = module
        if module_name is None:
            module_name = module.__name__
        # Only mysql.connector gets its cursor wrapped (see _CursorWrapper)
        wrap_cursor = module_name == "mysql.connector"
        # Get module specific Adaptor or the base (general) Adaptor
        Adapt = _interface_specific_adaptors.get(module_name, Adaptor)
        self.adaptor = Adapt(
            conn, DBUtils.get_dbutils(module_name), wrap_cursor=wrap_cursor
        )
        self.module_name = module_name

    def __repr__(self):
        """Return a short description of the class name and database connection."""
        return f"{self.__class__.__name__}({self.adaptor.conn!r})"

    def __getitem__(self, name):
        """Return a BioSeqDatabase object.

        Arguments:
         - name - The name of the BioSeqDatabase

        """
        return BioSeqDatabase(self.adaptor, name)

    def __len__(self):
        """Return number of namespaces (sub-databases) in this database."""
        sql = "SELECT COUNT(name) FROM biodatabase;"
        return int(self.adaptor.execute_and_fetch_col0(sql)[0])

    def __contains__(self, value):
        """Check if a namespace (sub-database) is in this database."""
        sql = "SELECT COUNT(name) FROM biodatabase WHERE name=%s;"
        return bool(self.adaptor.execute_and_fetch_col0(sql, (value,))[0])

    def __iter__(self):
        """Iterate over namespaces (sub-databases) in the database."""
        # TODO - Iterate over the cursor, much more efficient
        return iter(self.adaptor.list_biodatabase_names())

    def keys(self):
        """Iterate over namespaces (sub-databases) in the database."""
        return iter(self)

    def values(self):
        """Iterate over BioSeqDatabase objects in the database."""
        for key in self:
            yield self[key]

    def items(self):
        """Iterate over (namespace, BioSeqDatabase) in the database."""
        for key in self:
            yield key, self[key]

    def __delitem__(self, name):
        """Remove a namespace and all its entries.

        Raises KeyError if no namespace called name exists.
        """
        if name not in self:
            raise KeyError(name)
        db_id = self.adaptor.fetch_dbid_by_dbname(name)
        remover = Loader.DatabaseRemover(self.adaptor, db_id)
        remover.remove()

    def new_database(self, db_name, authority=None, description=None):
        """Add a new database to the server and return it."""
        # make the database
        sql = (
            "INSERT INTO biodatabase (name, authority, description)"
            " VALUES (%s, %s, %s)"
        )
        self.adaptor.execute(sql, (db_name, authority, description))
        return BioSeqDatabase(self.adaptor, db_name)

    def load_database_sql(self, sql_file):
        """Load a database schema into the given database.

        This is used to create tables, etc when a database is first created.
        sql_file should specify the complete path to a file containing
        SQL entries for building the tables.

        Raises ValueError if the connection module is not one of the
        supported ones (psycopg2, pgdb, mysql.connector, MySQLdb, sqlite3).
        """
        # Read the file dropping comment lines ("--", or "#" for MySQL) and
        # blank lines, then flatten it into one space-separated string.
        with open(sql_file) as sql_handle:
            sql = "".join(
                line.strip() + " "
                for line in sql_handle
                if line.strip() and not line.startswith(("--", "#"))
            )

        # two ways to load the SQL
        # 1. PostgreSQL can load it all at once and actually needs to
        # due to FUNCTION defines at the end of the SQL which mess up
        # the splitting by semicolons
        if self.module_name in ["psycopg2", "pgdb"]:
            self.adaptor.cursor.execute(sql)
        # 2. MySQL needs the database loading split up into single lines of
        # SQL executed one at a time
        elif self.module_name in ["mysql.connector", "MySQLdb", "sqlite3"]:
            sql_parts = sql.split(";")  # one line per sql command
            # don't use the last item, it's blank
            for sql_line in sql_parts[:-1]:
                self.adaptor.cursor.execute(sql_line)
        else:
            raise ValueError(f"Module {self.module_name} not supported by the loader.")

    def commit(self):
        """Commit the current transaction to the database."""
        return self.adaptor.commit()

    def rollback(self):
        """Roll-back the current transaction."""
        return self.adaptor.rollback()

    def close(self):
        """Close the connection. No further activity possible."""
        return self.adaptor.close()


class _CursorWrapper:
    """A wrapper for mysql.connector resolving bytestring representations."""

    def __init__(self, real_cursor):
        """Wrap the given DB-API cursor."""
        self.real_cursor = real_cursor

    def execute(self, operation, params=None, multi=False):
        """Execute a sql statement."""
        self.real_cursor.execute(operation, params, multi)

    def executemany(self, operation, params):
        """Execute many sql statements."""
        self.real_cursor.executemany(operation, params)

    def _convert_tuple(self, tuple_):
        """Decode any bytestrings present in the row (PRIVATE)."""
        return tuple(
            elem.decode("utf-8") if isinstance(elem, bytes) else elem
            for elem in tuple_
        )

    def _convert_list(self, lst):
        """Decode any bytestrings present in each row of a list (PRIVATE)."""
        return [self._convert_tuple(tuple_) for tuple_ in lst]

    def fetchall(self):
        """Return all remaining rows, with bytestrings decoded."""
        rv = self.real_cursor.fetchall()
        return self._convert_list(rv)

    def fetchone(self):
        """Return the next row with bytestrings decoded, or None if exhausted."""
        tuple_ = self.real_cursor.fetchone()
        if tuple_ is None:
            # DB-API cursors return None when no more rows are available;
            # passing that to _convert_tuple would raise TypeError.
            return None
        return self._convert_tuple(tuple_)


class Adaptor:
    """High level wrapper for a database connection and cursor.

    Most database calls in BioSQL are done indirectly though this adaptor
    class. This provides helper methods for fetching data and executing
    sql.
    """

    def __init__(self, conn, dbutils, wrap_cursor=False):
        """Create an Adaptor object.

        Arguments:
         - conn - A database connection
         - dbutils - A BioSQL.DBUtils object
         - wrap_cursor - Optional, whether to wrap the cursor object

        """
        self.conn = conn
        if wrap_cursor:
            self.cursor = _CursorWrapper(conn.cursor())
        else:
            self.cursor = conn.cursor()
        self.dbutils = dbutils

    def last_id(self, table):
        """Return the last row id for the selected table."""
        return self.dbutils.last_id(self.cursor, table)

    def autocommit(self, y=True):
        """Set the autocommit mode. True values enable; False value disable."""
        return self.dbutils.autocommit(self.conn, y)

    def commit(self):
        """Commit the current transaction."""
        return self.conn.commit()

    def rollback(self):
        """Roll-back the current transaction."""
        return self.conn.rollback()

    def close(self):
        """Close the connection. No further activity possible."""
        return self.conn.close()

    def fetch_dbid_by_dbname(self, dbname):
        """Return the internal id for the sub-database using its name.

        Raises KeyError if there is no biodatabase row with that name.
        """
        self.execute(
            "select biodatabase_id from biodatabase where name = %s", (dbname,)
        )
        rv = self.cursor.fetchall()
        if not rv:
            raise KeyError(f"Cannot find biodatabase with name {dbname!r}")
        return rv[0][0]

    def _fetch_bioentry_id(self, sql, fields, dbid, label, value, unique=True):
        """Run a bioentry_id query and return the first column of row one (PRIVATE).

        Arguments:
         - sql - SELECT statement using %s placeholders
         - fields - list of values for the placeholders (extended in place
           with dbid when dbid is true)
         - dbid - internal sub-database id; when true the query is
           restricted to that sub-database
         - label, value - used to build the IndexError messages
         - unique - if true, more than one matching row is an error

        Raises IndexError if nothing matches, or (with unique) if several
        rows match.
        """
        if dbid:
            sql += " and biodatabase_id = %s"
            fields.append(dbid)
        self.execute(sql, fields)
        rv = self.cursor.fetchall()
        if not rv:
            raise IndexError(f"Cannot find {label} {value!r}")
        if unique and len(rv) > 1:
            raise IndexError(f"More than one entry with {label} {value!r}")
        return rv[0][0]

    def fetch_seqid_by_display_id(self, dbid, name):
        """Return the internal id for a sequence using its display id.

        Arguments:
         - dbid - the internal id for the sub-database
         - name - the name of the sequence. Corresponds to the
           name column of the bioentry table of the SQL schema

        Raises IndexError if there is no match or more than one match.
        """
        sql = "select bioentry_id from bioentry where name = %s"
        return self._fetch_bioentry_id(sql, [name], dbid, "display id", name)

    def fetch_seqid_by_accession(self, dbid, name):
        """Return the internal id for a sequence using its accession.

        Arguments:
         - dbid - the internal id for the sub-database
         - name - the accession of the sequence. Corresponds to the
           accession column of the bioentry table of the SQL schema

        Raises IndexError if there is no match or more than one match.
        """
        sql = "select bioentry_id from bioentry where accession = %s"
        return self._fetch_bioentry_id(sql, [name], dbid, "accession", name)

    def fetch_seqids_by_accession(self, dbid, name):
        """Return a list of internal ids using an accession.

        Arguments:
         - dbid - the internal id for the sub-database
         - name - the accession of the sequence. Corresponds to the
           accession column of the bioentry table of the SQL schema

        """
        sql = "select bioentry_id from bioentry where accession = %s"
        fields = [name]
        if dbid:
            sql += " and biodatabase_id = %s"
            fields.append(dbid)
        return self.execute_and_fetch_col0(sql, fields)

    def fetch_seqid_by_version(self, dbid, name):
        """Return the internal id for a sequence using its accession and version.

        Arguments:
         - dbid - the internal id for the sub-database
         - name - the accession of the sequence containing a version number.
           Must correspond to <accession>.<version>; if the version part is
           omitted, version "0" is looked up.

        Raises IndexError if name contains more than one ".", if there is
        no match, or if there is more than one match.
        """
        acc_version = name.split(".")
        if len(acc_version) > 2:
            raise IndexError(f"Bad version {name!r}")
        acc = acc_version[0]
        version = acc_version[1] if len(acc_version) == 2 else "0"
        sql = "SELECT bioentry_id FROM bioentry WHERE accession = %s AND version = %s"
        return self._fetch_bioentry_id(sql, [acc, version], dbid, "version", name)

    def fetch_seqid_by_identifier(self, dbid, identifier):
        """Return the internal id for a sequence using its identifier.

        Arguments:
         - dbid - the internal id for the sub-database
         - identifier - the identifier of the sequence. Corresponds to
           the identifier column of the bioentry table in the SQL schema.

        Raises IndexError if there is no match. If several rows match, the
        first one returned by the database is used.
        """
        # YB: was fetch_seqid_by_seqid
        sql = "SELECT bioentry_id FROM bioentry WHERE identifier = %s"
        return self._fetch_bioentry_id(
            sql, [identifier], dbid, "identifier", identifier, unique=False
        )

    def list_biodatabase_names(self):
        """Return a list of all of the sub-databases."""
        return self.execute_and_fetch_col0("SELECT name FROM biodatabase")

    def list_bioentry_ids(self, dbid):
        """Return a list of internal ids for all of the sequences in a sub-database.

        Arguments:
         - dbid - The internal id for a sub-database

        """
        return self.execute_and_fetch_col0(
            "SELECT bioentry_id FROM bioentry WHERE biodatabase_id = %s", (dbid,)
        )

    def list_bioentry_display_ids(self, dbid):
        """Return a list of all sequence names in a sub-database.

        Arguments:
         - dbid - The internal id for a sub-database

        """
        return self.execute_and_fetch_col0(
            "SELECT name FROM bioentry WHERE biodatabase_id = %s", (dbid,)
        )

    def list_any_ids(self, sql, args):
        """Return ids given a SQL statement to select for them.

        This assumes that the given SQL does a SELECT statement that
        returns a list of items. This parses them out of the 2D list
        they come as and just returns them in a list.
        """
        return self.execute_and_fetch_col0(sql, args)

    def execute_one(self, sql, args=None):
        """Execute sql that returns 1 record, and return the record.

        Raises ValueError if the query does not return exactly one row.
        """
        self.execute(sql, args or ())
        rv = self.cursor.fetchall()
        if len(rv) != 1:
            raise ValueError(f"Expected 1 response, got {len(rv)}.")
        return rv[0]

    def execute(self, sql, args=None):
        """Just execute an sql command."""
        if os.name == "java":
            # Under Jython the "%s" placeholders are rewritten as "?"
            sql = sql.replace("%s", "?")
        self.dbutils.execute(self.cursor, sql, args)

    def executemany(self, sql, args):
        """Execute many sql commands."""
        if os.name == "java":
            # Under Jython the "%s" placeholders are rewritten as "?"
            sql = sql.replace("%s", "?")
        self.dbutils.executemany(self.cursor, sql, args)

    def get_subseq_as_string(self, seqid, start, end):
        """Return a substring of a sequence.

        Arguments:
         - seqid - The internal id for the sequence
         - start - The start position of the sequence; 0-indexed
         - end - The end position of the sequence

        """
        length = end - start
        # XXX Check this on MySQL and PostgreSQL. substr should be general,
        # does it need dbutils? (An alternative spelling would be
        # "SUBSTRING(seq FROM %s FOR %s)".)
        # SQL SUBSTR is 1-indexed, hence start + 1.
        return self.execute_one(
            "SELECT SUBSTR(seq, %s, %s) FROM biosequence WHERE bioentry_id = %s",
            (start + 1, length, seqid),
        )[0]

    def execute_and_fetch_col0(self, sql, args=None):
        """Return a list of values from the first column in the row."""
        self.execute(sql, args or ())
        return [field[0] for field in self.cursor.fetchall()]

    def execute_and_fetchall(self, sql, args=None):
        """Return a list of tuples of all rows."""
        self.execute(sql, args or ())
        return self.cursor.fetchall()


class MysqlConnectorAdaptor(Adaptor):
    """A BioSQL Adaptor class with fixes for the MySQL interface.

    BioSQL was failing due to returns of bytearray objects from
    the mysql-connector-python database connector. This adaptor
    class scrubs returns of bytearrays and of byte strings converting
    them to string objects instead. This adaptor class was made in
    response to backwards incompatible changes added to
    mysql-connector-python in release 2.0.0 of the package.
    """

    @staticmethod
    def _bytearray_to_str(s):
        """If s is bytes or bytearray, convert to a string (PRIVATE)."""
        if isinstance(s, (bytes, bytearray)):
            return s.decode()
        return s

    def execute_one(self, sql, args=None):
        """Execute sql that returns 1 record, and return the record."""
        out = super().execute_one(sql, args)
        return tuple(self._bytearray_to_str(v) for v in out)

    def execute_and_fetch_col0(self, sql, args=None):
        """Return a list of values from the first column in the row."""
        out = super().execute_and_fetch_col0(sql, args)
        return [self._bytearray_to_str(column) for column in out]

    def execute_and_fetchall(self, sql, args=None):
        """Return a list of tuples of all rows."""
        out = super().execute_and_fetchall(sql, args)
        return [tuple(self._bytearray_to_str(v) for v in o) for o in out]


_interface_specific_adaptors = {
    # If SQL interfaces require a specific adaptor, use this to map the adaptor
    "mysql.connector": MysqlConnectorAdaptor,
    "MySQLdb": MysqlConnectorAdaptor,
}

_allowed_lookups = {
    # Lookup name / function name to get id, function to list all ids
    "primary_id": "fetch_seqid_by_identifier",
    "gi": "fetch_seqid_by_identifier",
    "display_id": "fetch_seqid_by_display_id",
    "name": "fetch_seqid_by_display_id",
    "accession": "fetch_seqid_by_accession",
    "version": "fetch_seqid_by_version",
}


class BioSeqDatabase:
    """Represents a namespace (sub-database) within the BioSQL database.

    i.e. One row in the biodatabase table, and all rows in the bioentry
    table associated with it.
    """

    def __init__(self, adaptor, name):
        """Create a BioDatabase object.

        Arguments:
         - adaptor - A BioSQL.Adaptor object
         - name - The name of the sub-database (namespace)

        Raises KeyError (from the adaptor) if no such namespace exists.
        """
        self.adaptor = adaptor
        self.name = name
        self.dbid = self.adaptor.fetch_dbid_by_dbname(name)

    def __repr__(self):
        """Return a short summary of the BioSeqDatabase."""
        return f"BioSeqDatabase({self.adaptor!r}, {self.name!r})"

    def get_Seq_by_id(self, name):
        """Get a DBSeqRecord object by its name.

        Example: seq_rec = db.get_Seq_by_id('ROA1_HUMAN')

        The name of this method is misleading since it returns a DBSeqRecord
        rather than a Seq object, and presumably was to mirror BioPerl.
        """
        seqid = self.adaptor.fetch_seqid_by_display_id(self.dbid, name)
        return BioSeq.DBSeqRecord(self.adaptor, seqid)

    def get_Seq_by_acc(self, name):
        """Get a DBSeqRecord object by accession number.

        Example: seq_rec = db.get_Seq_by_acc('X77802')

        The name of this method is misleading since it returns a DBSeqRecord
        rather than a Seq object, and presumably was to mirror BioPerl.
        """
        seqid = self.adaptor.fetch_seqid_by_accession(self.dbid, name)
        return BioSeq.DBSeqRecord(self.adaptor, seqid)

    def get_Seq_by_ver(self, name):
        """Get a DBSeqRecord object by version number.

        Example: seq_rec = db.get_Seq_by_ver('X77802.1')

        The name of this method is misleading since it returns a DBSeqRecord
        rather than a Seq object, and presumably was to mirror BioPerl.
        """
        seqid = self.adaptor.fetch_seqid_by_version(self.dbid, name)
        return BioSeq.DBSeqRecord(self.adaptor, seqid)

    def get_Seqs_by_acc(self, name):
        """Get a list of DBSeqRecord objects by accession number.

        Example: seq_recs = db.get_Seqs_by_acc('X77802')

        The name of this method is misleading since it returns a list of
        DBSeqRecord objects rather than a list of Seq objects, and presumably
        was to mirror BioPerl.
        """
        seqids = self.adaptor.fetch_seqids_by_accession(self.dbid, name)
        return [BioSeq.DBSeqRecord(self.adaptor, seqid) for seqid in seqids]

    def __getitem__(self, key):
        """Return a DBSeqRecord for one of the sequences in the sub-database.

        Arguments:
         - key - The internal id for the sequence

        Raises KeyError if the entry exists but belongs to another namespace.
        """
        record = BioSeq.DBSeqRecord(self.adaptor, key)
        if record._biodatabase_id != self.dbid:
            raise KeyError(f"Entry {key!r} does exist, but not in current name space")
        return record

    def __delitem__(self, key):
        """Remove an entry and all its annotation.

        Raises KeyError if key is not an entry of this namespace.
        """
        if key not in self:
            raise KeyError(
                f"Entry {key!r} cannot be deleted. It was not found or is invalid"
            )
        # Assuming this will automatically cascade to the other tables...
        sql = "DELETE FROM bioentry WHERE biodatabase_id=%s AND bioentry_id=%s;"
        self.adaptor.execute(sql, (self.dbid, key))

    def __len__(self):
        """Return number of records in this namespace (sub database)."""
        sql = "SELECT COUNT(bioentry_id) FROM bioentry WHERE biodatabase_id=%s;"
        return int(self.adaptor.execute_and_fetch_col0(sql, (self.dbid,))[0])

    def __contains__(self, value):
        """Check if a primary (internal) id is in this namespace (sub database).

        Values that cannot be converted to an integer are never contained.
        """
        sql = (
            "SELECT COUNT(bioentry_id) FROM bioentry "
            "WHERE biodatabase_id=%s AND bioentry_id=%s;"
        )
        # The bioentry_id field is an integer in the schema.
        # PostgreSQL will throw an error if we use a non integer in the query.
        try:
            bioentry_id = int(value)
        except (TypeError, ValueError):
            # TypeError covers values such as None, which int() rejects
            return False
        return bool(
            self.adaptor.execute_and_fetch_col0(sql, (self.dbid, bioentry_id))[0]
        )

    def __iter__(self):
        """Iterate over ids (which may not be meaningful outside this database)."""
        # TODO - Iterate over the cursor, much more efficient
        return iter(self.adaptor.list_bioentry_ids(self.dbid))

    def keys(self):
        """Iterate over ids (which may not be meaningful outside this database)."""
        return iter(self)

    def values(self):
        """Iterate over DBSeqRecord objects in the namespace (sub database)."""
        for key in self:
            yield self[key]

    def items(self):
        """Iterate over (id, DBSeqRecord) for the namespace (sub database)."""
        for key in self:
            yield key, self[key]

    def lookup(self, **kwargs):
        """Return a DBSeqRecord using an acceptable identifier.

        Arguments:
         - kwargs - A single key-value pair where the key is one
           of primary_id, gi, display_id, name, accession, version

        Raises TypeError if not exactly one keyword is given, or if the
        keyword is not one of the supported lookup names.
        """
        if len(kwargs) != 1:
            raise TypeError("single key/value parameter expected")
        ((k, v),) = kwargs.items()
        if k not in _allowed_lookups:
            raise TypeError(
                f"lookup() expects one of {list(_allowed_lookups.keys())!r}, not {k!r}"
            )
        lookup_name = _allowed_lookups[k]
        lookup_func = getattr(self.adaptor, lookup_name)
        seqid = lookup_func(self.dbid, v)
        return BioSeq.DBSeqRecord(self.adaptor, seqid)

    def load(self, record_iterator, fetch_NCBI_taxonomy=False):
        """Load a set of SeqRecords into the BioSQL database.

        record_iterator is either a list of SeqRecord objects, or an
        Iterator object that returns SeqRecord objects (such as the
        output from the Bio.SeqIO.parse() function), which will be
        used to populate the database.

        fetch_NCBI_taxonomy is boolean flag allowing or preventing
        connection to the taxonomic database on the NCBI server
        (via Bio.Entrez) to fetch a detailed taxonomy for each
        SeqRecord.

        Example::

            from Bio import SeqIO
            count = db.load(SeqIO.parse(open(filename), format))

        Returns the number of records loaded.
        """
        db_loader = Loader.DatabaseLoader(self.adaptor, self.dbid, fetch_NCBI_taxonomy)
        num_records = 0
        for cur_record in record_iterator:
            num_records += 1
            # Hack to work around BioSQL Bug 2839 - If using PostgreSQL and
            # the RULES are present check for a duplicate record before loading
            if _POSTGRES_RULES_PRESENT:
                # Recreate what the Loader's _load_bioentry_table will do:
                if cur_record.id.count(".") == 1:
                    accession, version = cur_record.id.split(".")
                    try:
                        version = int(version)
                    except ValueError:
                        accession = cur_record.id
                        version = 0
                else:
                    accession = cur_record.id
                    version = 0
                gi = cur_record.annotations.get("gi")
                # Pass the values as query parameters rather than formatting
                # them into the SQL text, so ids containing quotes cannot
                # break (or inject into) the statement. The identifier is
                # sent as text (or NULL when there is no GI).
                identifier = None if gi is None else str(gi)
                sql = (
                    "SELECT bioentry_id FROM bioentry "
                    "WHERE (identifier = %s AND biodatabase_id = %s) "
                    "OR (accession = %s AND version = %s AND biodatabase_id = %s)"
                )
                self.adaptor.execute(
                    sql, (identifier, self.dbid, accession, version, self.dbid)
                )
                if self.adaptor.cursor.fetchone():
                    raise self.adaptor.conn.IntegrityError(
                        "Duplicate record detected: record has not been inserted"
                    )
            # End of hack
            db_loader.load_seqrecord(cur_record)
        return num_records