Source code for alkali.query

# -*- coding: utf-8 -*-


    from alkali import Database, Model

    class MyModel( Model ):
        id = fields.IntField(primary_key=True)
        title = fields.StringField()

    db = Database( models=[MyModel] )

    # create 10 instances and save them
    for i in range(10):
        MyModel(id=i, title='number %d' % i).save()

    assert MyModel.objects.count == 10
    assert MyModel.objects.filter(id__gt=5).count == 4
    assert MyModel.objects.filter(id__gt=5, id__le=7).count == 2
    assert MyModel.objects.get(pk=1).title == 'number 1'
    assert MyModel.objects.order_by('id')[0].id == 0
    assert MyModel.objects.order_by('-id')[0].id == 9

import types
import operator
import collections
import copy
import re

import logging
logger = logging.getLogger(__name__)

[docs]class Aggregate: """ A reducing function that returns a single value """ def __init__(self, field): """ :param field str: """ self.field = field
[docs]class Count(Aggregate): """ number of objects in query """ def __call__(self, query): return len( query )
[docs]class Sum(Aggregate): """ sum of given field (numeric field required) """ def __call__(self, query): return sum( query.values_list(self.field, flat=True) )
[docs]class Max(Aggregate): """ largest field (numeric field required) """ def __call__(self, query): return max( query.values_list(self.field, flat=True) )
[docs]class Min(Aggregate): """ smallest field (numeric field required) """ def __call__(self, query): return min( query.values_list(self.field, flat=True) )
# def copy_instances(func): # def wrapper(*args, **kw): # return map( copy.copy, func(*args, **kw) ) # return wrapper
[docs]def as_list(func): def wrapper(*args, **kw): ret = func(*args, **kw) if type(ret) in [map, filter]: ret = list(ret) return ret return wrapper
[docs]class Query: """ this class performs queries on manager instances returns lists of model instances this class is one of the main reasons to use alkali the Django docs at will be fairly relevant to alkali, except for anything related to foreign or many2many fields. """ def __init__( self, manager): """ this is an internal class so you shouldn't have to create it directly. create via Manager. ``MyModel.objects`` :param Manager manager: """ self.manager = manager # THINK: Query should work on the keys of manager instances, # this might save a copy or two, then only dereferencing a query # should return a copy. If we ever get multiple indexes then # that might also dramatically speed up queries # THINK: I tried really hard to make Query._instances an # manager._instances.itervalues() but this is fraught with # peril. You have to convert to a list to get it's length, you # have to convert to a list to get __getitem__, and if you # iterate over it once then it's "empty" if you need to do so # again. I'm afraid that for now each Query object needs to get # a copy of the Manager values. Note, you still need to copy the # individual elements as they leave the Query. self._instances = list(manager._instances.values()) self.order_by('pk') def __len__(self): return len(self._instances) def __iter__(self): for elem in self._instances: yield copy.copy(elem) def __getitem__(self, i): return copy.copy(self._instances[i]) def __str__(self): return "<Query: {}>".format(", ".join([str(q) for q in self])) @property def count(self): """ **property**: number of model instances we are currently tracking """ return len(self) @property def fields(self): """ **property**: helper function to get dict of model fields :rtype: ``dict`` """ return self.model_class.Meta.fields @property def model_class(self): """ **property**: return our managers model class """ return self.manager.model_class @property def field_names(self): """ **property**: return our model field names :rtype: ``list`` of ``str`` """ return self.fields.keys()
[docs] def all(self): return self
[docs] def filter(self, **kw): """ :param kw: ``field_name__op=value``, note: ``field_name`` can be a ``property`` :rtype: Query perform a query, keeping model instances that pass the criteria specified in the ``kw`` parameter. see example code above. see Django page for very thorough docs on this functionality. basically, its field_name '__' operation = value. :: # field/property f is 'foo' or 'bar' MyModel.objects.filter( f__in=['foo','bar'] ) # 'foo' is in field/property myset MyModel.objects.filter( myset__rin='foo' ) """ for field, query in kw.items(): try: field, oper = field.split('__') oper = oper or 'eq' except ValueError: # no __ in field name field = field oper = 'eq' self._instances = self._filter(field, oper, query, self._instances) return self
@as_list def _filter(self, field, oper, value, instances): """ helper function that does the actual work of filtering out instances """ def in_(coll, val): if not isinstance(coll, str) \ and isinstance(coll, return bool( set(coll) & set(val) ) # intersection else: return coll in val def rin_(coll, val): if not isinstance(val, str) \ and isinstance(val, return bool( set(coll) & set(val) ) # intersection else: return val in coll def regex(coll, val): return, coll, re.UNICODE) def regexi(coll, val): return, coll, re.UNICODE | re.IGNORECASE) if oper == 'in': assert isinstance(value, oper = in_ elif oper == 'rin': assert isinstance(field, oper = rin_ elif oper == 're': oper = regex elif oper == 'rei': oper = regexi else: oper = getattr(operator, oper) # TODO: exact, iexact, (i)contains == rin, (i)startswith, (i)endswith, # range (for dates), date (return datetime as date), year/month/day, # hour/minute/second, week_day (sun=1, sat=7) return filter( lambda e: oper(getattr(e, field), value), instances)
[docs] def order_by(self, *fields): """ change order of self.instances :param str fields: field names, prefixed with optional '-' to indicate reverse order :rtype: Query **warning**: because this isn't a real database and we don't have grouping, passing in multiple fields will very possibly sort on the last field only. python sorting is stable however, so a multiple field sort may work as intended. """ def _order_by( field ): "return reversed, field_name" if field.startswith('-'): return True, field[1:] else: return False, field if fields == ('pk',): fields = self.model_class.Meta.pk_fields.keys() for field in fields: reverse, field = _order_by( field ) key = operator.attrgetter(field) self._instances = sorted(self._instances, key=key, reverse=reverse) return self
[docs] def group_by(self, field): """ returns a dict of distinct values and Query objects :param field: field name :rtype: ``dict`` :: MyModel.objects.group_by('str_type') { 's1': <Query MyModel(1), MyModel(3)> 's2': <Query MyModel(2)> } """ def _filter(value): # need to start with a fresh query object to work with all objects return Query(self.manager).filter(**{field: value}) values = self.distinct(field)[0] groups = { value: _filter(value) for value in values } return groups
[docs] @as_list def limit(self, n): """ return first(+) or last(-) n elements this has to be the last call during a query since it returns a list of instances and not a Query. passing in 0 is a no-op and returns all instances :param int n: non-zero integer :rtype: ``list`` """ if n > 0: return map(copy.copy, self._instances[:n]) elif n < 0: return map(copy.copy, self._instances[n:]) else: # n == 0, return all instead of [] because why not? return map(copy.copy, self._instances)
[docs] def first(self): """ return first object from query, depends on ordering raise if query is empty """ try: return self[0] except IndexError: raise self.model_class.DoesNotExist()
[docs] @as_list def values(self, *fields): """ returns list of dicts, each sub-list contains (field_name, field_value) :rtype: list of OrderedDict :: MyModel.objects.values('int_type','str_type') # [ OrderedDict([('int_type', 10), ('str_type', u'hi')]), # OrderedDict([('int_type', 11), ('str_type', u'hi')]) ] """ if not fields: fields = self.field_names def _mk_dict( obj, fields ): vals = [ (field, getattr(obj, field)) for field in fields ] return collections.OrderedDict(vals) return map(lambda obj: _mk_dict(obj, fields), self._instances)
[docs] def values_list(self, *fields, **kw): """ returns nested list of values in given ``fields`` order if flat=True, returns single list :param str fields: field names or all fields if empty :param bool kw: ``flat`` :rtye: ``list`` see :func:`alkali.query.Query.annotate` for example """ flat = kw.pop('flat', False) assert len(kw) == 0, "extra kwargs passed to values_list" if not fields: fields = self.field_names if flat: return [ getattr(e, field) for field in fields for e in self._instances ] else: return [ [getattr(e, field) for field in fields] for e in self._instances ]
[docs] def exists(self): """ does the current query hold any elements :rtype: int """ return len(self) > 0
[docs] def aggregate(self, *args, **kw): """ Aggregate (aka reduce) the query via the given function. Each callable ``Aggregate`` object takes a field/property name as a parameter. The returned dictionary has key ``<field_name>__<agg function>`` unless keyword is given. :param Aggregate args: ``Count`` ``Sum`` ``Max`` ``Min`` :param kw: ``key_value=Aggregate``, note: ``field_name`` can be a ``property`` :rtype: ``dict`` :: MyModel.objects.aggregate( the_count=Count('id'), Sum('size') ) # { 'the_count': 12, 'size__sum': 24957 } """ ret = {} for agg in args: key = '{}__{}'.format(agg.field, agg.__class__.__name__.lower()) ret[key] = agg(self) for field, agg in kw.items(): ret[field] = agg(self) return ret
[docs] def annotate(self, **kw): """ add a variable to each model instance currently in the query each element in query is passed into function. :param kw: variable=function :rtype: Query :: c = itertools.count() Counter = lambda elem, c=c: MyModel(int_type=10).save() MyModel(int_type=11).save() MyModel.objects.annotate(counter=Counter).values_list('int_type','counter') # [[10, 1], [11, 2]] MyModel.objects.annotate(counter=Counter).values_list('int_type','counter') # [[10, 3], [11, 4]] """ # make sure instances are a copy so we don't annotate the originals self._instances = [copy.copy(obj) for obj in self._instances] for name, func in kw.items(): if not callable(func): func = lambda elem, val=func: val for elem in self._instances: setattr( elem, name, func(elem) ) return self
[docs] def distinct(self, *fields): """ returns a list of lists, each sub-list contains distinct values of the given field :param str fields: field names :rtype: ``list`` :: MyModel(int_type=10, str_type='hi').save() MyModel(int_type=11, str_type='hi').save() MyModel(int_type=12, str_type='there').save() MyModel.objects.distinct('int_type','str_type') # [[10, 11, 12], [u'there', u'hi']] MyModel.objects.distinct('str_type') # [[u'there', u'hi']] """ ret = [] for field in fields: distinct = {getattr(elem, field) for elem in self._instances} # set ret.append( list(distinct) ) return ret