Source code for pydal.adapters.couchdb

# -*- coding: utf-8 -*-
import datetime

from .._globals import IDENTITY
from .._compat import integer_types
from ..drivers import couchdb
from ..objects import Field, Query
from ..helpers.classes import SQLALL
from ..helpers.methods import uuid2int
from ..helpers.serializers import serializers
from .base import BaseAdapter, NoSQLAdapter, SELECT_ARGS

long = integer_types[-1]


[docs]class CouchDBAdapter(NoSQLAdapter): drivers = ('couchdb',) uploads_in_blob = True types = { 'boolean': bool, 'string': str, 'text': str, 'json': str, 'password': str, 'blob': str, 'upload': str, 'integer': long, 'bigint': long, 'float': float, 'double': float, 'date': datetime.date, 'time': datetime.time, 'datetime': datetime.datetime, 'id': long, 'reference': long, 'list:string': list, 'list:integer': list, 'list:reference': list, }
[docs] def file_exists(self, filename): pass
[docs] def file_open(self, filename, mode='rb', lock=True): pass
[docs] def file_close(self, fileobj): pass
[docs] def expand(self,expression,field_type=None): if isinstance(expression,Field): if expression.type=='id': return "%s._id" % expression.tablename return BaseAdapter.expand(self,expression,field_type)
[docs] def AND(self,first,second): return '(%s && %s)' % (self.expand(first),self.expand(second))
[docs] def OR(self,first,second): return '(%s || %s)' % (self.expand(first),self.expand(second))
[docs] def EQ(self,first,second): if second is None: return '(%s == null)' % self.expand(first) return '(%s == %s)' % (self.expand(first),self.expand(second,first.type))
[docs] def NE(self,first,second): if second is None: return '(%s != null)' % self.expand(first) return '(%s != %s)' % (self.expand(first),self.expand(second,first.type))
[docs] def COMMA(self,first,second): return '%s + %s' % (self.expand(first),self.expand(second))
[docs] def represent(self, obj, fieldtype): value = NoSQLAdapter.represent(self, obj, fieldtype) if fieldtype == 'id': return repr(str(long(value))) elif fieldtype in ('date', 'time', 'datetime', 'boolean'): return serializers.json(value) return repr(not isinstance(value, unicode) and value or value and value.encode('utf8'))
def __init__(self,db,uri='couchdb://127.0.0.1:5984', pool_size=0,folder=None,db_codec ='UTF-8', credential_decoder=IDENTITY, driver_args={}, adapter_args={}, do_connect=True, after_connection=None): super(CouchDBAdapter, self).__init__( db=db, uri=uri, pool_size=pool_size, folder=folder, db_codec=db_codec, credential_decoder=credential_decoder, driver_args=driver_args, adapter_args=adapter_args, do_connect=do_connect, after_connection=after_connection) if do_connect: self.find_driver(adapter_args) self.dbengine = 'couchdb' db['_lastsql'] = '' self.db_codec = 'UTF-8' url='http://'+uri[10:] def connector(url=url,driver_args=driver_args): driver = self.driver.Server(url,**driver_args) driver.cursor = lambda : self.fake_cursor driver.close = lambda : None driver.commit = lambda : None return driver self.reconnect(connector)
[docs] def create_table(self, table, migrate=True, fake_migrate=False, polymodel=None): if migrate: try: self.connection.create(table._tablename) except: pass
[docs] def insert(self,table,fields): id = uuid2int(self.db.uuid()) ctable = self.connection[table._tablename] values = dict((k.name,self.represent(v,k.type)) for k,v in fields) values['_id'] = str(id) ctable.save(values) return id
def _select(self,query,fields,attributes): if not isinstance(query,Query): raise SyntaxError("Not Supported") for key in set(attributes.keys())-SELECT_ARGS: raise SyntaxError('invalid select attribute: %s' % key) new_fields=[] for item in fields: if isinstance(item,SQLALL): new_fields += item._table else: new_fields.append(item) def uid(fd): return fd=='id' and '_id' or fd def get(row,fd): return fd=='id' and long(row['_id']) or row.get(fd,None) fields = new_fields tablename = self.get_table(query) fieldnames = [f.name for f in (fields or self.db[tablename])] colnames = ['%s.%s' % (tablename,k) for k in fieldnames] fields = ','.join(['%s.%s' % (tablename,uid(f)) for f in fieldnames]) fn="(function(%(t)s){if(%(query)s)emit(%(order)s,[%(fields)s]);})" %\ dict(t=tablename, query=self.expand(query), order='%s._id' % tablename, fields=fields) return fn, colnames
[docs] def select(self,query,fields,attributes): if not isinstance(query,Query): raise SyntaxError("Not Supported") fn, colnames = self._select(query,fields,attributes) tablename = colnames[0].split('.')[0] ctable = self.connection[tablename] rows = [cols['value'] for cols in ctable.query(fn)] processor = attributes.get('processor',self.parse) return processor(rows,fields,colnames,False)
[docs] def delete(self,tablename,query): if not isinstance(query,Query): raise SyntaxError("Not Supported") if query.first.type=='id' and query.op==self.EQ: id = query.second tablename = query.first.tablename assert(tablename == query.first.tablename) ctable = self.connection[tablename] try: del ctable[str(id)] return 1 except couchdb.http.ResourceNotFound: return 0 else: tablename = self.get_table(query) rows = self.select(query,[self.db[tablename]._id],{}) ctable = self.connection[tablename] for row in rows: del ctable[str(row.id)] return len(rows)
[docs] def update(self,tablename,query,fields): if not isinstance(query,Query): raise SyntaxError("Not Supported") if query.first.type=='id' and query.op==self.EQ: id = query.second tablename = query.first.tablename ctable = self.connection[tablename] try: doc = ctable[str(id)] for key,value in fields: doc[key.name] = self.represent(value,self.db[tablename][key.name].type) ctable.save(doc) return 1 except couchdb.http.ResourceNotFound: return 0 else: tablename = self.get_table(query) rows = self.select(query,[self.db[tablename]._id],{}) ctable = self.connection[tablename] table = self.db[tablename] for row in rows: doc = ctable[str(row.id)] for key,value in fields: doc[key.name] = self.represent(value,table[key.name].type) ctable.save(doc) return len(rows)
[docs] def count(self,query,distinct=None): if distinct: raise RuntimeError("COUNT DISTINCT not supported") if not isinstance(query,Query): raise SyntaxError("Not Supported") tablename = self.get_table(query) rows = self.select(query,[self.db[tablename]._id],{}) return len(rows)