Source code for pydal.adapters.mongo

# -*- coding: utf-8 -*-
import datetime
import re

from .._globals import IDENTITY
from .._compat import integer_types, basestring
from ..objects import Table, Query, Field, Expression
from ..helpers.classes import SQLALL, Reference
from ..helpers.methods import use_common_filters, xorify
from .base import NoSQLAdapter
try:
    from bson import Binary
    from bson.binary import USER_DEFINED_SUBTYPE
except ImportError:
    # bson is optional at import time: fall back to minimal stand-ins so
    # that this module can still be imported without it. Only a missing
    # package is tolerated; any other failure is allowed to propagate.
    class Binary(object):
        """Placeholder base class used when bson cannot be imported."""
        pass
    USER_DEFINED_SUBTYPE = 0

# Integer type used below when ObjectIds are converted to plain integers
# (the last entry of the compat ``integer_types`` tuple).
long = integer_types[-1]
class MongoDBAdapter(NoSQLAdapter):
    """NoSQL adapter that talks to MongoDB through the pymongo driver."""

    drivers = ('pymongo',)
    driver_auto_json = ['loads', 'dumps']

    uploads_in_blob = False

    # field type name -> Python type used for values of that type
    types = {
        'boolean': bool,
        'string': str,
        'text': str,
        'json': str,
        'password': str,
        'blob': str,
        'upload': str,
        'integer': long,
        'bigint': long,
        'float': float,
        'double': float,
        'date': datetime.date,
        'time': datetime.time,
        'datetime': datetime.datetime,
        'id': long,
        'reference': long,
        'list:string': list,
        'list:integer': list,
        'list:reference': list,
    }

    # message raised by the arithmetic/alias operators that are not
    # implemented for this backend
    error_messages = {"javascript_needed": "This must yet be replaced" +
                      " with javascript in order to work."}

    def __init__(self, db, uri='mongodb://127.0.0.1:5984/db',
                 pool_size=0, folder=None, db_codec='UTF-8',
                 credential_decoder=IDENTITY, driver_args={},
                 adapter_args={}, do_connect=True, after_connection=None):
        """Store the connection settings and open the connection.

        ``adapter_args`` is consulted for two optional keys:
        ``minimumreplication`` (default 0) and ``safe`` (default True).

        Raises:
            SyntaxError: when the URI does not name a database.
        """
        self.db = db
        self.uri = uri
        # NOTE(review): layout reconstructed from a flattened listing;
        # presumably only the driver lookup is guarded by do_connect --
        # confirm against upstream.
        if do_connect: self.find_driver(adapter_args)
        # driver pieces are imported here and kept on the instance so the
        # other methods can reach them without importing pymongo/bson
        import random
        from bson.objectid import ObjectId
        from bson.son import SON
        import pymongo.uri_parser
        from pymongo.write_concern import WriteConcern

        m = pymongo.uri_parser.parse_uri(uri)

        self.SON = SON
        self.ObjectId = ObjectId
        self.random = random
        self.WriteConcern = WriteConcern

        self.dbengine = 'mongodb'
        self.folder = folder
        db['_lastsql'] = ''
        # NOTE(review): the db_codec argument is not used here; the codec
        # is always set to 'UTF-8'.
        self.db_codec = 'UTF-8'
        self._after_connection = after_connection
        self.pool_size = pool_size
        self.find_or_make_work_folder()
        # minimum number of replicas to wait for on insert/update
        self.minimumreplication = adapter_args.get('minimumreplication', 0)
        # writes are synchronous by default; this can be overruled by the
        # 'safe' adapter argument or by the ``safe`` parameter of the
        # individual write methods
        self.safe = 1 if adapter_args.get('safe', True) else 0

        # a tuple result from parse_uri is reduced to its second item,
        # which is taken as the database name
        if isinstance(m, tuple):
            m = {"database": m[1]}
        if m.get('database') is None:
            raise SyntaxError("Database is required!")

        def connector(uri=self.uri, m=m):
            # returns the database object selected by the URI
            return self.driver.MongoClient(uri, w=self.safe)[m.get('database')]

        self.reconnect(connector, cursor=False)
def object_id(self, arg=None):
    """Convert ``arg`` to an ObjectId built through ``self.ObjectId``.

    Accepted inputs:
      * a falsy value (None, 0, "") -> the all-zero id
      * an integer -> rendered as 24 zero-padded hex digits
      * a string of decimal digits -> parsed base 10, unless its payload
        is exactly 24 characters long, in which case it is read as hex
      * "<random>" -> 24 random hex digits (not guaranteed unique)
      * any other alphanumeric string, with or without a "0x" prefix ->
        parsed as hexadecimal
      * an existing ObjectId -> returned unchanged

    Raises:
        ValueError: for a string that is not a valid number.
        TypeError: for any other unsupported input type.
    """
    if not arg:
        arg = 0
    if isinstance(arg, basestring):
        # a 24 character payload is always treated as raw hex, even when
        # it happens to consist of decimal digits only
        rawhex = len(arg.replace("0x", "").replace("L", "")) == 24
        if arg.isdigit() and (not rawhex):
            arg = int(arg)
        elif arg == "<random>":
            # parse the digits directly as base 16; the previous
            # "0x...L" spelling is rejected by int() on Python 3
            digits = "".join(self.random.choice("0123456789abcdef")
                             for _ in range(24))
            arg = int(digits, 16)
        elif arg.isalnum():
            if not arg.startswith("0x"):
                arg = "0x%s" % arg
            try:
                arg = int(arg, 0)
            except ValueError as e:
                raise ValueError(
                    "invalid objectid argument string: %s" % e)
        else:
            raise ValueError("Invalid objectid argument string. " +
                             "Requires an integer or base 16 value")
    elif isinstance(arg, self.ObjectId):
        return arg

    if not isinstance(arg, (int, long)):
        raise TypeError("object_id argument must be of type " +
                        "ObjectId or an objectid representable integer")
    # rstrip('L') drops the suffix Python 2 appends to long literals
    hexvalue = hex(arg)[2:].rstrip('L').zfill(24)
    return self.ObjectId(hexvalue)
def parse_reference(self, value, field_type):
    """Parse a reference value, accepting ObjectIds as well.

    An ObjectId is first turned into the integer its hex string
    represents; everything else is handed to the base parser unchanged.
    """
    if isinstance(value, self.ObjectId):
        hex_text = str(value)
        value = long(hex_text, 16)
    base = super(MongoDBAdapter, self)
    return base.parse_reference(value, field_type)
def parse_id(self, value, field_type):
    """Parse an id value, accepting ObjectIds as well.

    An ObjectId is first turned into the integer its hex string
    represents; everything else is handed to the base parser unchanged.
    """
    if isinstance(value, self.ObjectId):
        hex_text = str(value)
        value = long(hex_text, 16)
    base = super(MongoDBAdapter, self)
    return base.parse_id(value, field_type)
def represent(self, obj, fieldtype):
    """Convert ``obj`` into the value stored for a field of ``fieldtype``.

    ObjectIds bypass the base adapter; every other value goes through
    ``NoSQLAdapter.represent`` first and is then adjusted:

      * 'date' / 'time' values are combined into a datetime, because
        they are stored as datetimes (None is passed through)
      * 'blob' values are wrapped with ``MongoBlob``
      * 'list:reference', 'reference...' and 'id' values, as well as
        values whose field type is a Table, become ObjectIds
      * 'string' values are passed through ``str``
    """
    if isinstance(obj, self.ObjectId):
        # the base adapter does not know about ObjectId
        value = obj
    else:
        value = NoSQLAdapter.represent(self, obj, fieldtype)

    if fieldtype == 'date':
        if value is None:
            return value
        # pad the date with a midnight time component
        midnight = datetime.time(0, 0, 0)
        return datetime.datetime.combine(value, midnight)

    if fieldtype == 'time':
        if value is None:
            return value
        # pad the time with a fixed placeholder date
        placeholder_day = datetime.date(2000, 1, 1)
        return datetime.datetime.combine(placeholder_day, value)

    if fieldtype == "blob":
        return MongoBlob(value)

    if isinstance(fieldtype, basestring):
        if fieldtype.startswith('list:'):
            if fieldtype.startswith('list:reference'):
                value = [self.object_id(entry) for entry in value]
        elif fieldtype.startswith("reference") or fieldtype == "id":
            value = self.object_id(value)
        elif fieldtype == "string":
            value = str(value)
    elif isinstance(fieldtype, Table):
        value = self.object_id(value)
    return value
def parse_blob(self, value, field_type):
    """Decode a stored blob value via ``MongoBlob.decode``.

    ``field_type`` is accepted for interface compatibility and not used.
    """
    decoded = MongoBlob.decode(value)
    return decoded
def _expand_query(self, query, tablename=None, safe=None): """ Return a tuple containing query and ctable """ if not tablename: tablename = self.get_table(query) ctable = self._get_collection(tablename, safe) _filter = None if query: if use_common_filters(query): query = self.common_filter(query,[tablename]) _filter = self.expand(query) return (ctable, _filter) def _get_collection(self, tablename, safe=None): ctable = self.connection[tablename] if safe is not None and safe != self.safe: wc = self.WriteConcern(w=self._get_safe(safe)) ctable = ctable.with_options(write_concern=wc) return ctable def _get_safe(self, val=None): if val is None: return self.safe return 1 if val else 0
def create_table(self, table, migrate=True, fake_migrate=False,
                 polymodel=None, isCapped=False):
    """Register ``table`` with the adapter; no backend call is made.

    Only ``table._dbt`` is reset to None.

    Raises:
        RuntimeError: when a capped collection is requested.
    """
    if not isCapped:
        table._dbt = None
        return
    raise RuntimeError("Not implemented")
def expand(self, expression, field_type=None):
    """Translate ``expression`` into its MongoDB representation.

    * Query on an 'id' or reference field: the right-hand side is cast
      to ObjectId(s) in place, and an 'id' field is renamed to '_id'
      (note that this mutates the query and the field).
    * Field: its name ('_id' for the id field).
    * Expression / Query: the result of calling its operator.
    * Anything else: represented for ``field_type`` when one is given,
      element-wise represented when it is a list/tuple, else returned
      unchanged.
    """
    if isinstance(expression, Query):
        target = expression.first
        if isinstance(target, Field) and \
                (target.type == 'id' or "reference" in target.type):
            if target.type == 'id':
                # MongoDB names its primary key '_id'
                target.name = '_id'
            operand = expression.second
            if isinstance(operand, (tuple, list, set)):
                expression.second = [self.object_id(item)
                                     for item in operand]
            else:
                expression.second = self.object_id(operand)

    if isinstance(expression, Field):
        return "_id" if expression.type == 'id' else expression.name

    if isinstance(expression, (Expression, Query)):
        first, second = expression.first, expression.second
        op = expression.op
        optional_args = expression.optional_args or {}
        if second is not None:
            return op(first, second, **optional_args)
        if first is not None:
            return op(first, **optional_args)
        return op if isinstance(op, str) else op(**optional_args)

    if field_type:
        return self.represent(expression, field_type)
    if isinstance(expression, (list, tuple)):
        return [self.represent(item, field_type) for item in expression]
    return expression
def drop(self, table, mode=''):
    """Drop the collection behind ``table`` and run the drop cleanup.

    ``mode`` is accepted for interface compatibility and not used.
    """
    collection = self.connection[table._tablename]
    collection.drop()
    self._drop_cleanup(table)
def truncate(self, table, mode, safe=None):
    """Remove every document from the collection behind ``table``.

    The collection is obtained through ``_get_collection`` so that
    ``safe`` is honoured the same way as in ``insert``, and the
    documents are removed with ``delete_many`` like in ``delete``
    (instead of the legacy ``Collection.remove`` call).

    ``mode`` is accepted for interface compatibility and not used.
    """
    ctable = self._get_collection(table._tablename, safe)
    # an empty filter matches every document
    ctable.delete_many({})
def count(self, query, distinct=None, snapshot=True):
    """Return the number of documents matching ``query``.

    ``snapshot`` is accepted for interface compatibility and not used.

    Raises:
        RuntimeError: when ``distinct`` is requested.
        SyntaxError: when ``query`` is not a Query.
    """
    if distinct:
        raise RuntimeError("COUNT DISTINCT not supported")
    if not isinstance(query, Query):
        raise SyntaxError("Not Supported")
    collection, mongo_filter = self._expand_query(query)
    return collection.count(filter=mongo_filter)
def select(self, query, fields, attributes, snapshot=False):
    """Run ``query`` and return the parsed rows.

    Args:
        query: a Query, or a falsy value to select every document of
            the table the first field belongs to.
        fields: the fields to fetch; SQLALL entries are expanded to all
            fields of their table, and an empty list selects all fields
            of the table.
        attributes: select attributes. 'orderby', 'limitby' and
            'processor' are honoured, 'for_update' is reported as
            unsupported, and any other non-None attribute is reported
            as not implemented.
        snapshot: forwarded to ``find`` through its modifiers.

    Returns:
        Whatever the processor (``self.parse`` by default) returns for
        the fetched rows.

    Raises:
        SyntaxError: when the table cannot be determined.
    """
    mongofields_dict = self.SON()
    new_fields, mongosort_list = [], []
    orderby = attributes.get('orderby', False)
    limitby = attributes.get('limitby', False)

    if 'for_update' in attributes:
        self.db.logger.warning('mongodb does not support for_update')
    # 'processor' is consumed at the end of this method, so it must not
    # be reported as unimplemented
    handled = ('limitby', 'orderby', 'for_update', 'processor')
    for key in set(attributes.keys()) - set(handled):
        if attributes[key] is not None:
            self.db.logger.warning(
                'select attribute not implemented: %s' % key)

    fetch_nothing = False
    if limitby:
        # limitby is (min, max): skip ``min`` documents and return the
        # next ``max - min`` of them
        limitby_skip = int(limitby[0])
        limitby_limit = int(limitby[1]) - limitby_skip
        # a limit of 0 means "no limit" to MongoDB, so an empty window
        # must not reach the driver at all
        fetch_nothing = limitby_limit <= 0
    else:
        limitby_skip = limitby_limit = 0

    if orderby:
        if isinstance(orderby, (list, tuple)):
            orderby = xorify(orderby)
        # !!!! need to add 'random'
        for f in self.expand(orderby).split(','):
            # COMMA joins with ', ' -- drop the padding around the names
            f = f.strip()
            if f.startswith('-'):
                mongosort_list.append((f[1:], -1))
            else:
                mongosort_list.append((f, 1))

    for item in fields:
        if isinstance(item, SQLALL):
            new_fields += item._table
        else:
            new_fields.append(item)
    fields = new_fields

    if isinstance(query, Query):
        tablename = self.get_table(query)
    elif len(fields) != 0:
        tablename = fields[0].tablename
    else:
        raise SyntaxError("The table name could not be found in " +
                          "the query nor from the select statement.")

    # None selects every document when there is no query
    mongoqry_dict = None
    if query:
        if use_common_filters(query):
            query = self.common_filter(query, [tablename])
        mongoqry_dict = self.expand(query)

    fields = fields or self.db[tablename]
    for field in fields:
        mongofields_dict[field.name] = 1

    ctable = self.connection[tablename]
    modifiers = {'snapshot': snapshot}
    if fetch_nothing:
        mongo_list_dicts = []
    else:
        mongo_list_dicts = ctable.find(
            mongoqry_dict, mongofields_dict, skip=limitby_skip,
            limit=limitby_limit, sort=mongosort_list, modifiers=modifiers)

    # Work out, once per column, the name reported to the caller and the
    # key under which the value is found in the returned documents.
    newnames = []
    record_keys = []
    for field in fields:
        table_part, field_part = str(field).split(".")
        if field_part == "_id":
            # expand() may have renamed the id field to '_id'; restore
            # the standard name
            field.name = "id"
        newnames.append(".".join((table_part, field.name)))
        record_keys.append("_id" if field_part == "id" else field_part)

    rows = [[record.get(key) for key in record_keys]
            for record in mongo_list_dicts]

    processor = attributes.get('processor', self.parse)
    result = processor(rows, fields, newnames, blob_decode=True)
    return result
def insert(self, table, fields, safe=None):
    """Insert one record and return a Reference to it.

    ``safe`` selects the write concern of the collection used (see
    ``_get_collection``). Entries for the fields named 'id' and 'safe'
    are skipped. Returns None when the write is not acknowledged.
    """
    ctable = self._get_collection(table._tablename, safe)
    values = {}
    for field, raw in fields:
        if field.name in ["id", "safe"]:
            continue
        values[field.name] = self.represent(raw, table[field.name].type)

    result = ctable.insert_one(values)
    if not result.acknowledged:
        return None
    # the record id is the integer value of the ObjectId's hex string
    rid = Reference(long(str(result.inserted_id), 16))
    rid._table, rid._record = table, None
    return rid
def update(self, tablename, query, fields, safe=None):
    """Update the documents matching ``query``; return how many matched.

    Args:
        tablename: name of the table to update.
        query: the Query selecting the documents.
        fields: iterable of ``(field, value)`` pairs; id fields are
            skipped to avoid backend errors.
        safe: optional write-concern override.

    Returns:
        The matched count of the update when the write is acknowledged,
        otherwise the count obtained before the update was issued.

    Raises:
        RuntimeError: when ``query`` is not a Query, or when the update
            itself fails.
    """
    if not isinstance(query, Query):
        raise RuntimeError("Not implemented")
    # fallback figure, reported only when the write is not acknowledged
    amount = self.count(query, False)
    (ctable, _filter) = self._expand_query(query, tablename, safe)
    modify = {'$set': dict((k.name, self.represent(v, k.type))
                           for k, v in fields
                           if k.name not in ("_id", "id"))}
    try:
        result = ctable.update_many(filter=_filter, update=modify)
        if result.acknowledged:
            amount = result.matched_count
        return amount
    except Exception as e:
        # TODO Reverse update query to verify that the query succeeded
        raise RuntimeError("uncaught exception when updating rows: %s" % e)
def delete(self, tablename, query, safe=None):
    """Delete the documents matching ``query``; return how many.

    Args:
        tablename: name of the table to delete from.
        query: the Query selecting the documents.
        safe: optional write-concern override.

    Returns:
        The deleted count when the write is acknowledged, otherwise the
        count obtained before the delete was issued.

    Raises:
        RuntimeError: when ``query`` is not a Query (``count`` may
            reject such a query first with its own exception).
    """
    # fallback figure, reported only when the write is not acknowledged
    amount = self.count(query, False)
    if not isinstance(query, Query):
        raise RuntimeError("query type %s is not supported" %
                           type(query))
    # tablename and safe must both be passed: _expand_query takes the
    # table name as its second positional argument
    (ctable, _filter) = self._expand_query(query, tablename, safe)
    result = ctable.delete_many(_filter)
    if result.acknowledged:
        return result.deleted_count
    return amount
def bulk_insert(self, table, items):
    """Insert every entry of ``items`` one by one.

    Returns the list of ``insert`` results, in input order.
    """
    inserted = []
    for entry in items:
        inserted.append(self.insert(table, entry))
    return inserted

## OPERATORS
def INVERT(self, first):
    """Return the expanded expression prefixed with '-'.

    ``select`` reads the leading '-' as a descending sort marker.
    """
    expanded = self.expand(first)
    return '-%s' % expanded
def NOT(self, first):
    """Return the negation of the expanded expression ``first``.

    The expanded form is a single-key dict and is negated as follows:

      * ``{'$and'/'$or': [...]}`` -> De Morgan's law is applied to the
        two operands of ``first``
      * ``{field: {'$ne': x}}``   -> ``{field: x}``
      * ``{field: {op: x}}``      -> ``{field: {'$not': {op: x}}}``
      * ``{field: plain_value}``  -> ``{field: {'$ne': plain_value}}``
    """
    op = self.expand(first)
    op_k = list(op)[0]
    op_body = op[op_k]
    r = None
    if type(op_body) is list:
        # apply De Morgan law for and/or
        # not(A and B) -> not(A) or not(B)
        # not(A or B)  -> not(A) and not(B)
        not_op = '$and' if op_k == '$or' else '$or'
        r = {not_op: [self.NOT(first.first), self.NOT(first.second)]}
    else:
        try:
            sub_ops = list(op_body.keys())
        except AttributeError:
            # a plain value (no operator dict): negate the equality test
            r = {op_k: {'$ne': op_body}}
        else:
            if len(sub_ops) == 1 and sub_ops[0] == '$ne':
                # a double negation collapses to the plain comparison
                r = {op_k: op_body['$ne']}
        if r is None:
            r = {op_k: {'$not': op_body}}
    return r
def AND(self, first, second):
    """Combine two expanded expressions with ``$and``.

    pymongo expects: .find({'$and': [{'x':'1'}, {'y':'2'}]})
    """
    left = self.expand(first)
    right = self.expand(second)
    return {'$and': [left, right]}
def OR(self, first, second):
    """Combine two expanded expressions with ``$or``.

    pymongo expects: .find({'$or': [{'name':'1'}, {'name':'2'}]})
    """
    left = self.expand(first)
    right = self.expand(second)
    return {'$or': [left, right]}
def BELONGS(self, first, second):
    """Build an ``$in`` filter: ``first`` must be one of ``second``.

    Raises:
        RuntimeError: when ``second`` is a string. That only happens
            for a nested select that was already rendered to SQL, which
            cannot be used here.
    """
    if isinstance(second, str):
        raise RuntimeError("nested queries not supported")
    candidates = []
    for item in second:
        candidates.append(self.expand(item, first.type))
    key = self.expand(first)
    return {key: {"$in": candidates}}
def EQ(self, first, second=None):
    """Build an equality filter ``{field: value}``."""
    # the value is expanded before the key, as in a subscript assignment
    value = self.expand(second, first.type)
    key = self.expand(first)
    return {key: value}
def NE(self, first, second=None):
    """Build an inequality filter ``{field: {'$ne': value}}``."""
    # the value is expanded before the key, as in a subscript assignment
    condition = {'$ne': self.expand(second, first.type)}
    key = self.expand(first)
    return {key: condition}
def LT(self, first, second=None):
    """Build a ``$lt`` filter.

    Raises:
        RuntimeError: when ``second`` is None.
    """
    if second is None:
        raise RuntimeError("Cannot compare %s < None" % first)
    # the value is expanded before the key, as in a subscript assignment
    condition = {'$lt': self.expand(second, first.type)}
    key = self.expand(first)
    return {key: condition}
def LE(self, first, second=None):
    """Build a ``$lte`` filter.

    Raises:
        RuntimeError: when ``second`` is None.
    """
    if second is None:
        raise RuntimeError("Cannot compare %s <= None" % first)
    # the value is expanded before the key, as in a subscript assignment
    condition = {'$lte': self.expand(second, first.type)}
    key = self.expand(first)
    return {key: condition}
def GT(self, first, second=None):
    """Build a ``$gt`` filter.

    Mirrors ``LT``, ``LE`` and ``GE``: ``second`` defaults to None and
    a comparison against None is rejected instead of being sent to the
    backend.

    Raises:
        RuntimeError: when ``second`` is None.
    """
    if second is None:
        raise RuntimeError("Cannot compare %s > None" % first)
    result = {}
    result[self.expand(first)] = {'$gt': self.expand(second, first.type)}
    return result
def GE(self, first, second=None):
    """Build a ``$gte`` filter.

    Raises:
        RuntimeError: when ``second`` is None.
    """
    if second is None:
        raise RuntimeError("Cannot compare %s >= None" % first)
    # the value is expanded before the key, as in a subscript assignment
    condition = {'$gte': self.expand(second, first.type)}
    key = self.expand(first)
    return {key: condition}
def ADD(self, first, second):
    """Addition is not implemented for this backend.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError(self.error_messages["javascript_needed"])
def SUB(self, first, second):
    """Subtraction is not implemented for this backend.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError(self.error_messages["javascript_needed"])
def MUL(self, first, second):
    """Multiplication is not implemented for this backend.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError(self.error_messages["javascript_needed"])
def DIV(self, first, second):
    """Division is not implemented for this backend.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError(self.error_messages["javascript_needed"])
def MOD(self, first, second):
    """Modulo is not implemented for this backend.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError(self.error_messages["javascript_needed"])
def AS(self, first, second):
    """Aliasing is not implemented for this backend.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError(self.error_messages["javascript_needed"])

# We could implement an option that simulates a full featured SQL
# database. But I think the option should be set explicitly or
# implemented as another library.
def ON(self, first, second):
    """Joins are not available on this backend.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError("This is not possible in NoSQL" +
                              " but can be simulated with a wrapper.")
def COMMA(self, first, second):
    """Join two expanded expressions with ', '."""
    left = self.expand(first)
    right = self.expand(second)
    return '%s, %s' % (left, right)

#TODO verify full compatibility with official SQL Like operator
def _build_like_regex(self, arg, case_sensitive=True, ends_with=False, starts_with=False, whole_string=True, like_wildcards=False): import re base = self.expand(arg,'string') need_regex = (whole_string or not case_sensitive or starts_with or ends_with or like_wildcards and ('_' in base or '%' in base)) if not need_regex: return base else: expr = re.escape(base) if like_wildcards: expr = expr.replace('\\%','.*') expr = expr.replace('\\_','.').replace('_','.') if starts_with: pattern = '^%s' elif ends_with: pattern = '%s$' elif whole_string: pattern = '^%s$' else: pattern = '%s' regex = { '$regex': pattern % expr } if not case_sensitive: regex['$options'] = 'i' return regex
def LIKE(self, first, second, case_sensitive=True):
    """Build a LIKE filter; '%' and '_' in ``second`` act as wildcards."""
    matcher = self._build_like_regex(
        second, case_sensitive=case_sensitive, like_wildcards=True)
    key = self.expand(first)
    return {key: matcher}
def ILIKE(self, first, second):
    """Case-insensitive variant of ``LIKE``."""
    insensitive = self.LIKE(first, second, case_sensitive=False)
    return insensitive
def STARTSWITH(self, first, second):
    """Build a filter matching values that start with ``second``."""
    matcher = self._build_like_regex(second, starts_with=True)
    key = self.expand(first)
    return {key: matcher}
def ENDSWITH(self, first, second):
    """Build a filter matching values that end with ``second``."""
    matcher = self._build_like_regex(second, ends_with=True)
    key = self.expand(first)
    return {key: matcher}

#TODO verify full compatibility with official oracle contains operator
def CONTAINS(self, first, second, case_sensitive=True):
    """Build a "contains" filter.

    * ``second`` is an ObjectId: matched as-is.
    * ``first`` is a 'list:string' field: compared against another
      'string' field through a ``$where`` clause, or against a value
      with a whole-string regex.
    * otherwise: matched with a regex that is not anchored.
    """
    if isinstance(second, self.ObjectId):
        val = second
    elif isinstance(first, Field) and first.type == 'list:string':
        if isinstance(second, Field) and second.type == 'string':
            clause = "this.%s.indexOf(this.%s) > -1" % (first.name,
                                                         second.name)
            return {'$where': clause}
        val = self._build_like_regex(
            second, case_sensitive=case_sensitive, whole_string=True)
    else:
        val = self._build_like_regex(
            second, case_sensitive=case_sensitive, whole_string=False)
    return {self.expand(first): val}
class MongoBlob(Binary):
    """Blob wrapper that remembers how a value has to be decoded.

    The Binary subtype records the original kind of the value:
    ``MONGO_BLOB_BYTES`` for a bytearray and ``MONGO_BLOB_NON_UTF8_STR``
    for a string that cannot be encoded as UTF-8.
    """
    MONGO_BLOB_BYTES = USER_DEFINED_SUBTYPE
    MONGO_BLOB_NON_UTF8_STR = USER_DEFINED_SUBTYPE + 1

    def __new__(cls, value):
        """Wrap ``value`` for storage.

        None, Binary instances and UTF-8 encodable strings are returned
        unchanged, so the result is not always a MongoBlob.
        """
        # return None and Binary() unmolested
        if value is None or isinstance(value, Binary):
            return value

        # bytearray is marked as MONGO_BLOB_BYTES
        if isinstance(value, bytearray):
            return Binary.__new__(cls, bytes(value),
                                  MongoBlob.MONGO_BLOB_BYTES)

        # return non-strings as Binary(), eg: PY3 bytes()
        if not isinstance(value, basestring):
            return Binary(value)

        # if string is encodable as UTF-8, then return as string
        try:
            value.encode('utf-8')
        except UnicodeError:
            # string which can not be UTF-8 encoded, eg: pickle strings.
            # Only encoding failures are handled here; anything else is
            # a genuine error and is allowed to propagate.
            return Binary.__new__(cls, value,
                                  MongoBlob.MONGO_BLOB_NON_UTF8_STR)
        return value

    def __repr__(self):
        return repr(MongoBlob.decode(self))

    @staticmethod
    def decode(value):
        """Undo ``MongoBlob(value)`` based on the Binary subtype.

        Values that are not Binary, or that carry another subtype, are
        returned unchanged.
        """
        if isinstance(value, Binary):
            if value.subtype == MongoBlob.MONGO_BLOB_BYTES:
                return bytearray(value)
            if value.subtype == MongoBlob.MONGO_BLOB_NON_UTF8_STR:
                return str(value)
        return value