# code adapted from https://github.com/MongoEngine/mongoengine/blob/master/mongoengine/connection.py
# code adapted from https://github.com/heynemann/aiomotorengine/blob/master/motorengine/connection.py
import sys
try:
import six
except ImportError:
pass
try:
from motor.motor_asyncio import (
AsyncIOMotorClient as MotorClient,
AsyncIOMotorReplicaSetClient as MotorReplicaSetClient
)
except ImportError:
pass
from aiomotorengine.database import Database
DEFAULT_CONNECTION_NAME = 'default'
class ConnectionError(Exception):
pass
_connection_settings = {}
_connections = {}
_default_dbs = {}
def register_connection(db, alias, **kwargs):
global _connection_settings
global _default_dbs
_connection_settings[alias] = kwargs
_default_dbs[alias] = db
def cleanup():
global _connections
global _connection_settings
global _default_dbs
_connections = {}
_connection_settings = {}
_default_dbs = {}
def disconnect(alias=DEFAULT_CONNECTION_NAME):
global _connections
global _connections_settings
global _default_dbs
if alias in _connections:
get_connection(alias=alias).disconnect()
del _connections[alias]
del _connection_settings[alias]
del _default_dbs[alias]
def get_connection(alias=DEFAULT_CONNECTION_NAME, db=None):
global _connections
global _default_dbs
if alias not in _connections:
conn_settings = _connection_settings[alias].copy()
db = conn_settings.pop('name', None)
connection_class = MotorClient
if 'replicaSet' in conn_settings:
connection_class = MotorReplicaSetClient
conn_settings['hosts_or_uri'] = conn_settings.pop('host', None)
# Discard port since it can't be used on MongoReplicaSetClient
conn_settings.pop('port', None)
# Discard replicaSet if not base string
if not isinstance(conn_settings['replicaSet'], six.string_types):
conn_settings.pop('replicaSet', None)
try:
_connections[alias] = connection_class(**conn_settings)
except Exception:
exc_info = sys.exc_info()
err = ConnectionError("Cannot connect to database %s :\n%s" % (alias, exc_info[1]))
raise six.reraise(ConnectionError, err, exc_info[2])
try:
if not _connections[alias].connected:
_connections[alias].open_sync()
except Exception:
exc_info = sys.exc_info()
err = ConnectionError("Cannot connect to database %s :\n%s" % (alias, exc_info[1]))
raise six.reraise(ConnectionError, err, exc_info[2])
database = None
if db is None:
database = getattr(_connections[alias], _default_dbs[alias])
else:
database = getattr(_connections[alias], db)
return Database(_connections[alias], database)
[docs]def connect(db, alias=DEFAULT_CONNECTION_NAME, **kwargs):
"""Connect to the database specified by the 'db' argument.
Connection settings may be provided here as well if the database is not
running on the default port on localhost. If authentication is needed,
provide username and password arguments as well.
Multiple databases are supported by using aliases. Provide a separate
`alias` to connect to a different instance of :program:`mongod`.
Extra keyword-arguments are passed to Motor when connecting to the database.
"""
global _connections
if alias not in _connections:
kwargs['name'] = db
register_connection(db, alias, **kwargs)
return get_connection(alias, db=db)