# -*- coding: utf-8 -*-
"""
::
from alkali import Database, Model
class MyModel( Model ):
id = fields.IntField(primary_key=True)
title = fields.StringField()
db = Database( models=[MyModel] )
# create 10 instances and save them
for i in range(10):
MyModel(id=i, title='number %d' % i).save()
assert MyModel.objects.count == 10
assert MyModel.objects.filter(id__gt=5).count == 4
assert MyModel.objects.filter(id__gt=5, id__le=7).count == 2
assert MyModel.objects.get(pk=1).title == 'number 1'
assert MyModel.objects.order_by('id')[0].id == 0
assert MyModel.objects.order_by('-id')[0].id == 9
"""
import types
import operator
import collections
import copy
import re
import logging
logger = logging.getLogger(__name__)
[docs]class Aggregate:
"""
A reducing function that returns a single value
"""
def __init__(self, field):
"""
:param field str:
"""
self.field = field
[docs]class Count(Aggregate):
"""
number of objects in query
"""
def __call__(self, query):
return len( query )
[docs]class Sum(Aggregate):
"""
sum of given field (numeric field required)
"""
def __call__(self, query):
return sum( query.values_list(self.field, flat=True) )
[docs]class Max(Aggregate):
"""
largest field (numeric field required)
"""
def __call__(self, query):
return max( query.values_list(self.field, flat=True) )
[docs]class Min(Aggregate):
"""
smallest field (numeric field required)
"""
def __call__(self, query):
return min( query.values_list(self.field, flat=True) )
# def copy_instances(func):
# def wrapper(*args, **kw):
# return map( copy.copy, func(*args, **kw) )
# return wrapper
[docs]def as_list(func):
def wrapper(*args, **kw):
ret = func(*args, **kw)
if type(ret) in [map, filter]:
ret = list(ret)
return ret
return wrapper
[docs]class Query:
"""
this class performs queries on manager instances
returns lists of model instances
this class is one of the main reasons to use alkali
the Django docs at https://docs.djangoproject.com/en/1.10/topics/db/queries/
will be fairly relevant to alkali, except for anything related to
foreign or many2many fields.
"""
def __init__( self, manager):
"""
this is an internal class so you shouldn't have to create it directly. create
via Manager. ``MyModel.objects``
:param Manager manager:
"""
self.manager = manager
# THINK: Query should work on the keys of manager instances,
# this might save a copy or two, then only dereferencing a query
# should return a copy. If we ever get multiple indexes then
# that might also dramatically speed up queries
# THINK: I tried really hard to make Query._instances an
# manager._instances.itervalues() but this is fraught with
# peril. You have to convert to a list to get it's length, you
# have to convert to a list to get __getitem__, and if you
# iterate over it once then it's "empty" if you need to do so
# again. I'm afraid that for now each Query object needs to get
# a copy of the Manager values. Note, you still need to copy the
# individual elements as they leave the Query.
self._instances = list(manager._instances.values())
self.order_by('pk')
def __len__(self):
return len(self._instances)
def __iter__(self):
for elem in self._instances:
yield copy.copy(elem)
def __getitem__(self, i):
return copy.copy(self._instances[i])
def __str__(self):
return "<Query: {}>".format(", ".join([str(q) for q in self]))
@property
def count(self):
"""
**property**: number of model instances we are currently tracking
"""
return len(self)
@property
def fields(self):
"""
**property**: helper function to get dict of model fields
:rtype: ``dict``
"""
return self.model_class.Meta.fields
@property
def model_class(self):
"""
**property**: return our managers model class
"""
return self.manager.model_class
@property
def field_names(self):
"""
**property**: return our model field names
:rtype: ``list`` of ``str``
"""
return self.fields.keys()
[docs] def all(self):
return self
[docs] def filter(self, **kw):
"""
:param kw: ``field_name__op=value``, note: ``field_name`` can be a ``property``
:rtype: Query
perform a query, keeping model instances that pass the criteria specified
in the ``kw`` parameter.
see example code above. see Django page for very thorough docs on
this functionality. basically, its field_name '__' operation = value.
::
# field/property f is 'foo' or 'bar'
MyModel.objects.filter( f__in=['foo','bar'] )
# 'foo' is in field/property myset
MyModel.objects.filter( myset__rin='foo' )
"""
for field, query in kw.items():
try:
field, oper = field.split('__')
oper = oper or 'eq'
except ValueError: # no __ in field name
field = field
oper = 'eq'
self._instances = self._filter(field, oper, query, self._instances)
return self
@as_list
def _filter(self, field, oper, value, instances):
"""
helper function that does the actual work of filtering out instances
"""
def in_(coll, val):
if not isinstance(coll, str) \
and isinstance(coll, collections.abc.Iterable):
return bool( set(coll) & set(val) ) # intersection
else:
return coll in val
def rin_(coll, val):
if not isinstance(val, str) \
and isinstance(val, collections.abc.Iterable):
return bool( set(coll) & set(val) ) # intersection
else:
return val in coll
def regex(coll, val):
return re.search(val, coll, re.UNICODE)
def regexi(coll, val):
return re.search(val, coll, re.UNICODE | re.IGNORECASE)
if oper == 'in':
assert isinstance(value, collections.abc.Iterable)
oper = in_
elif oper == 'rin':
assert isinstance(field, collections.abc.Iterable)
oper = rin_
elif oper == 're':
oper = regex
elif oper == 'rei':
oper = regexi
else:
oper = getattr(operator, oper)
# TODO: exact, iexact, (i)contains == rin, (i)startswith, (i)endswith,
# range (for dates), date (return datetime as date), year/month/day,
# hour/minute/second, week_day (sun=1, sat=7)
return filter( lambda e: oper(getattr(e, field), value), instances)
[docs] def order_by(self, *fields):
"""
change order of self.instances
:param str fields: field names, prefixed with optional '-' to
indicate reverse order
:rtype: Query
**warning**: because this isn't a real database and we don't have
grouping, passing in multiple fields will very possibly sort
on the last field only. python sorting is stable however, so a
multiple field sort may work as intended.
"""
def _order_by( field ):
"return reversed, field_name"
if field.startswith('-'):
return True, field[1:]
else:
return False, field
if fields == ('pk',):
fields = self.model_class.Meta.pk_fields.keys()
for field in fields:
reverse, field = _order_by( field )
key = operator.attrgetter(field)
self._instances = sorted(self._instances, key=key, reverse=reverse)
return self
[docs] def group_by(self, field):
"""
returns a dict of distinct values and Query objects
:param field: field name
:rtype: ``dict``
::
MyModel.objects.group_by('str_type')
{ 's1': <Query MyModel(1), MyModel(3)>
's2': <Query MyModel(2)> }
"""
def _filter(value):
# need to start with a fresh query object to work with all objects
return Query(self.manager).filter(**{field: value})
values = self.distinct(field)[0]
groups = { value: _filter(value) for value in values }
return groups
[docs] @as_list
def limit(self, n):
"""
return first(+) or last(-) n elements
this has to be the last call during a query since it returns a
list of instances and not a Query. passing in 0 is a no-op and
returns all instances
:param int n: non-zero integer
:rtype: ``list``
"""
if n > 0:
return map(copy.copy, self._instances[:n])
elif n < 0:
return map(copy.copy, self._instances[n:])
else: # n == 0, return all instead of [] because why not?
return map(copy.copy, self._instances)
[docs] def first(self):
"""
return first object from query, depends on ordering
raise if query is empty
"""
try:
return self[0]
except IndexError:
raise self.model_class.DoesNotExist()
[docs] @as_list
def values(self, *fields):
"""
returns list of dicts, each sub-list contains (field_name, field_value)
:rtype: list of OrderedDict
::
MyModel.objects.values('int_type','str_type')
# [ OrderedDict([('int_type', 10), ('str_type', u'hi')]),
# OrderedDict([('int_type', 11), ('str_type', u'hi')]) ]
"""
if not fields:
fields = self.field_names
def _mk_dict( obj, fields ):
vals = [ (field, getattr(obj, field)) for field in fields ]
return collections.OrderedDict(vals)
return map(lambda obj: _mk_dict(obj, fields), self._instances)
[docs] def values_list(self, *fields, **kw):
"""
returns nested list of values in given ``fields`` order
if flat=True, returns single list
:param str fields: field names or all fields if empty
:param bool kw: ``flat``
:rtye: ``list``
see :func:`alkali.query.Query.annotate` for example
"""
flat = kw.pop('flat', False)
assert len(kw) == 0, "extra kwargs passed to values_list"
if not fields:
fields = self.field_names
if flat:
return [
getattr(e, field) for field in fields
for e in self._instances
]
else:
return [
[getattr(e, field) for field in fields]
for e in self._instances
]
[docs] def exists(self):
"""
does the current query hold any elements
:rtype: int
"""
return len(self) > 0
[docs] def aggregate(self, *args, **kw):
"""
Aggregate (aka reduce) the query via the given function. Each callable
``Aggregate`` object takes a field/property name as a parameter.
The returned dictionary has key ``<field_name>__<agg function>`` unless
keyword is given.
:param Aggregate args: ``Count`` ``Sum`` ``Max`` ``Min``
:param kw: ``key_value=Aggregate``, note: ``field_name`` can be a ``property``
:rtype: ``dict``
::
MyModel.objects.aggregate( the_count=Count('id'), Sum('size') )
# { 'the_count': 12, 'size__sum': 24957 }
"""
ret = {}
for agg in args:
key = '{}__{}'.format(agg.field, agg.__class__.__name__.lower())
ret[key] = agg(self)
for field, agg in kw.items():
ret[field] = agg(self)
return ret
[docs] def annotate(self, **kw):
"""
add a variable to each model instance currently in the query
each element in query is passed into function.
:param kw: variable=function
:rtype: Query
::
c = itertools.count()
Counter = lambda elem, c=c: c.next()
MyModel(int_type=10).save()
MyModel(int_type=11).save()
MyModel.objects.annotate(counter=Counter).values_list('int_type','counter')
# [[10, 1], [11, 2]]
MyModel.objects.annotate(counter=Counter).values_list('int_type','counter')
# [[10, 3], [11, 4]]
"""
# make sure instances are a copy so we don't annotate the originals
self._instances = [copy.copy(obj) for obj in self._instances]
for name, func in kw.items():
if not callable(func):
func = lambda elem, val=func: val
for elem in self._instances:
setattr( elem, name, func(elem) )
return self
[docs] def distinct(self, *fields):
"""
returns a list of lists, each sub-list contains distinct values
of the given field
:param str fields: field names
:rtype: ``list``
::
MyModel(int_type=10, str_type='hi').save()
MyModel(int_type=11, str_type='hi').save()
MyModel(int_type=12, str_type='there').save()
MyModel.objects.distinct('int_type','str_type')
# [[10, 11, 12], [u'there', u'hi']]
MyModel.objects.distinct('str_type')
# [[u'there', u'hi']]
"""
ret = []
for field in fields:
distinct = {getattr(elem, field) for elem in self._instances} # set
ret.append( list(distinct) )
return ret