# Source code for alkali.storage
import os
import types
import fcntl
from contextlib import contextmanager
#from zope.interface import Interface, Attribute, implements
import json
import csv
from .peekorator import Peekorator
import logging
logger = logging.getLogger(__name__)
class FileAlreadyLocked(Exception):
    """
    raised when a storage instance attempts to lock its data file
    but fails because the lock is already held
    """
    pass
# class IStorage( Interface ):
# extension = Attribute("class level attr of desired filename extension. eg. json")
# def read(model_class):
# """
# yield (or return a list) of instantiated model_class objects or dicts
# up to implementer but likely you want to read filename
# """
# def write(iterator):
# """
# accept an iterator that yields elements
# up to implementer but likely you want to write out to filename
# """
class Storage(object):
    """
    common base class for the Storage object hierarchy
    """

    def __init__(self, *args, **kw):
        pass

    @property
    def _name(self):
        # convenience accessor: the concrete subclass name,
        # handy for log and error messages
        return type(self).__name__
class FileStorage(Storage):
    """
    this helper class determines the on-disk representation of the database. it
    could write out objects as json or plain txt or binary, that's up to
    the implementation and should be transparent to any models/database.
    """
    #implements(IStorage)

    # desired filename extension for this format, eg. json
    extension = 'raw'

    def __init__(self, filename=None, *args, **kw):
        super(FileStorage, self).__init__(*args, **kw)
        self._fhandle = None
        self.filename = filename

    def __del__(self):
        # __del__ may run on a partially constructed instance (if
        # __init__ raised before _fhandle was set) or after the handle
        # was closed; never let the destructor propagate an exception
        try:
            self.unlock()
        except Exception:
            pass

    @property
    def filename(self):
        # the path of the currently open data file, or None
        if self._fhandle is None:
            return None
        return self._fhandle.name

    @filename.setter
    def filename(self, filename):
        """
        when setting the filename, immediately open and lock the file handle

        accepts a path (str) or an already-open file-like object;
        pass None to close and release the current file

        :raises FileAlreadyLocked: if another process holds the lock
        """
        self.unlock()

        # bug fix: always close the previously held handle, not only
        # when the new filename is None -- otherwise re-assigning the
        # filename leaked the old descriptor (and its lock)
        if self._fhandle is not None:
            self._fhandle.close()
            self._fhandle = None

        if filename is None:
            return

        if isinstance(filename, str):
            filename = os.path.expanduser(filename)
            if os.path.exists(filename):
                assert os.path.isfile(filename)
                self._fhandle = open(filename, 'r+')
            else:
                self._fhandle = open(filename, 'w+')
        else:  # assuming file type
            self._fhandle = filename

        self.lock()

    def lock(self):
        """
        take an exclusive, non-blocking flock on the data file;
        a no-op when no file is open

        :raises FileAlreadyLocked: if the lock is held elsewhere
        """
        if not self._fhandle:
            return
        try:
            fcntl.flock(self._fhandle, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise FileAlreadyLocked("can't lock: {}".format(self.filename))

    def unlock(self):
        """
        release the flock; a no-op when no file is open
        """
        # bug fix: flock() on a closed file raises ValueError, so also
        # skip handles that have already been closed
        if not self._fhandle or self._fhandle.closed:
            return
        fcntl.flock(self._fhandle, fcntl.LOCK_UN)

    def read(self, model_class):
        """
        helper function that just reads the whole file and returns it as
        a string; model_class is unused here but kept for interface
        compatibility with subclasses
        """
        self._fhandle.seek(0)
        return self._fhandle.read()

    def _write(self, iterator):
        """
        helper function that writes each element of iterator (via str())
        to the file

        returns False if iterator is None, True otherwise
        """
        if iterator is None:
            return False

        self._fhandle.seek(0)
        for data in iterator:
            self._fhandle.write(str(data))

        # the file may have shrunk (records deleted); truncate at the
        # current position so stale bytes don't survive to the next load
        self._fhandle.truncate()
        self._fhandle.flush()
        return True

    def write(self, iterator):
        """
        write out the iterator's elements, see :meth:`_write`
        """
        return self._write(iterator)
class JSONStorage(FileStorage):
    """
    save models in json format
    """
    extension = 'json'

    def read(self, model_class):
        """
        yield the decoded elements of the json list stored in the file;
        yields nothing when the file is empty
        """
        raw = super(JSONStorage, self).read(model_class)
        if not raw:
            return
        for entry in json.loads(raw):
            yield entry

    def write(self, iterator):
        """
        serialize the iterator's elements as a json list of their
        ``.dict`` representations; returns False when given None,
        True otherwise
        """
        if iterator is None:
            return False

        fhandle = self._fhandle
        fhandle.seek(0)
        fhandle.write('[\n')

        peeker = Peekorator(iter(iterator))
        for elem in peeker:
            fhandle.write(json.dumps(elem.dict))
            # comma-separate all but the final element
            if not peeker.is_last():
                fhandle.write(',\n')

        fhandle.write('\n]')

        # since the file may shrink (we've deleted records) then
        # we must truncate the file at our current position to avoid
        # stale data being present on the next load
        fhandle.truncate()
        fhandle.flush()
        return True
class CSVStorage(FileStorage):
    """
    load models in csv format

    first line assumed to be column headers (aka: field names)

    use `remap_fieldnames` to change column headers into model field names
    """
    extension = 'csv'

    def read(self, model_class):
        """
        yield an instance of model_class per csv row; column headers
        become keyword arguments after passing through
        :meth:`remap_fieldnames`
        """
        self._fhandle.seek(0)
        reader = csv.DictReader(self._fhandle)

        for row in reader:
            row = self.remap_fieldnames(model_class, row)
            yield model_class(**row)

    def remap_fieldnames(self, model_class, row):
        """
        example of remap_fieldnames that could be defined
        in derived class or as a stand-alone function.

        warning: make sure your header row that contains field
        names has no spaces in it

        ::

            def remap_fieldnames(self, model_class, row):
                fields = model_class.Meta.fields.keys()

                for k in row.keys():
                    results_key = k.lower().replace(' ', '_')

                    if results_key not in fields:
                        if k == 'Some Wierd Name':
                            results_key = 'good_name'
                        else:
                            raise RuntimeError( "unknown field: {}".format(k) )

                    row[results_key] = row.pop(k)

                return row
        """
        # default implementation: pass rows through unchanged
        return row

    def write(self, iterator):
        """
        write the iterator's elements as csv rows, emitting a header
        line derived from the first element's Meta.fields

        warning: if ``remap_fieldnames`` changes names then saved file
        will have a different header line than original file

        returns False when given None, True otherwise
        """
        if iterator is None:
            return False

        f = self._fhandle
        f.seek(0)

        _peek = Peekorator(iter(iterator))
        writer = None

        for e in _peek:
            if _peek.is_first():
                # first element: emit the header line before any rows
                fieldnames = e.Meta.fields.keys()
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
            # bug fix: write each row exactly once here instead of
            # duplicating the writerow() call in both branches
            writer.writerow(e.dict)

        # drop stale trailing data and flush -- the flush was missing,
        # inconsistent with the other storage backends' write paths
        f.truncate()
        f.flush()
        return True