import logging
from typing import Any, Callable, Union
from .conditions import render
from .exceptions import (
InvalidModel,
InvalidStream,
InvalidTemplate,
MissingObjects,
)
from .models import BaseModel, Index, subclassof, unpack_from_dynamodb
from .search import Search
from .session import SessionWrapper
from .signals import (
before_create_table,
model_bound,
model_validated,
object_deleted,
object_loaded,
object_saved,
)
from .stream import Stream
from .transactions import ReadTransaction, WriteTransaction
from .util import dump_key, extract_key, index_for, walk_subclasses
# Names exported by ``from <this module> import *``: only the Engine class.
__all__ = ["Engine"]
# Logger used by the Engine methods in this module, under the fixed name "bloop.engine".
logger = logging.getLogger("bloop.engine")
_sync_values = {
"save": {
None: "NONE",
"new": "ALL_NEW",
"old": "ALL_OLD"
},
"delete": {
None: "NONE",
"old": "ALL_OLD"
}
}
def validate_not_abstract(*objs):
    """Ensure none of the given model classes or instances is abstract.

    :param objs: model classes and/or model instances; each must expose ``Meta.abstract``.
    :raises bloop.exceptions.InvalidModel: for the first abstract entry, naming its class.
    """
    for candidate in objs:
        if not candidate.Meta.abstract:
            continue
        # Instances are reported by their class name, classes by their own name.
        offending_class = candidate if isinstance(candidate, type) else candidate.__class__
        raise InvalidModel("{!r} is abstract.".format(offending_class.__name__))
def validate_is_model(model):
    """Ensure ``model`` (a class or an instance) derives from :class:`~bloop.models.BaseModel`.

    :param model: model class or instance to check.
    :raises bloop.exceptions.InvalidModel: if it does not subclass BaseModel, naming its class.
    """
    if subclassof(model, BaseModel):
        return
    offending_class = model if isinstance(model, type) else model.__class__
    raise InvalidModel("{!r} does not subclass BaseModel.".format(offending_class.__name__))
def validate_sync(mode, value):
    """Translate a user-facing ``sync`` option into its DynamoDB ``ReturnValues`` string.

    :param mode: which operation's options to consult (a key of ``_sync_values``: "save" or "delete").
    :param value: the ``sync`` argument supplied by the caller.
    :return: the wire value, e.g. ``"ALL_NEW"``.
    :raises ValueError: if ``value`` is not an allowed option for ``mode``.
    """
    allowed = _sync_values[mode]
    wire_value = allowed.get(value)
    if wire_value is not None:
        return wire_value
    raise ValueError(f"Unrecognized option {value!r} for sync parameter, must be one of {set(allowed.keys())}")
# Signature of a table-name formatter: takes a model and returns the table name to use for it.
TableNameFormatter = Callable[[Any], str]


def create_get_table_name_func(table_name_template: Union[str, TableNameFormatter]) -> TableNameFormatter:
    """Build the function an Engine uses to compute each model's table name.

    :param table_name_template: Either a format string containing ``{table_name}``, which is rendered with
        ``table_name=model.Meta.table_name``, or a callable that takes the model and returns its table name.
    :return: A callable mapping a model to its table name.
    :raises bloop.exceptions.InvalidTemplate: if a string template does not contain ``{table_name}``,
        only contains it escaped (``{{table_name}}``), or uses any replacement field that can't be rendered
        from ``table_name`` alone.
    :raises ValueError: if ``table_name_template`` is neither a string nor a callable.
    """
    if isinstance(table_name_template, str):
        if "{table_name}" not in table_name_template:
            raise InvalidTemplate("table name template must contain '{table_name}'")
        # Render once with a marker so malformed templates fail here, when the Engine is constructed,
        # instead of with a KeyError/ValueError the first time a model is bound.
        marker = "\x00bloop-table-name\x00"
        try:
            rendered = table_name_template.format(table_name=marker)
        except (KeyError, IndexError, ValueError, AttributeError, TypeError) as error:
            raise InvalidTemplate(
                "table name template must only use the '{table_name}' replacement field"
            ) from error
        if marker not in rendered:
            # e.g. "{{table_name}}" is an escaped literal; every model would share the same table name.
            raise InvalidTemplate("table name template must contain '{table_name}'")
        return lambda o: table_name_template.format(table_name=o.Meta.table_name)
    elif callable(table_name_template):
        return table_name_template
    else:
        raise ValueError("table name template must be a string or function")
class Engine:
    # noinspection PyUnresolvedReferences
    """Primary means of interacting with DynamoDB.

    To apply a prefix to each model's table name, you can use a simple format string:

    .. code-block:: pycon

        >>> template = "my-prefix-{table_name}"
        >>> engine = Engine(table_name_template=template)

    For more complex table_name customization, you can provide a function:

    .. code-block:: pycon

        >>> def reverse_name(model):
        ...     return model.Meta.table_name[::-1]
        >>> engine = Engine(table_name_template=reverse_name)

    :param dynamodb: DynamoDB client. Defaults to ``boto3.client("dynamodb")``.
    :param dynamodbstreams: DynamoDBStreams client. Defaults to ``boto3.client("dynamodbstreams")``.
    :param table_name_template: Customize the table name of each model bound to the engine. If a string
        is provided, string.format(table_name=model.Meta.table_name) will be called. If a function is provided,
        the function will be called with the model as its sole argument. Defaults to "{table_name}".
    """

    def __init__(
            self, *,
            dynamodb=None, dynamodbstreams=None,
            table_name_template: Union[str, TableNameFormatter] = "{table_name}"):
        self._compute_table_name = create_get_table_name_func(table_name_template)
        self.session = SessionWrapper(dynamodb=dynamodb, dynamodbstreams=dynamodbstreams)

    def bind(self, model, *, skip_table_setup=False):
        """Create backing tables for a model and its non-abstract subclasses.

        Sends ``before_create_table`` and ``model_bound`` for every concrete model; ``model_validated`` is
        only sent when table setup is not skipped.

        :param model: Base model to bind. Can be abstract.
        :param skip_table_setup: Don't create or verify the table in DynamoDB. Default is False.
        :raises bloop.exceptions.InvalidModel: if ``model`` is not a subclass of :class:`~bloop.models.BaseModel`.
        """
        # Make sure we're looking at models
        validate_is_model(model)

        concrete = {m for m in walk_subclasses(model) if not m.Meta.abstract}
        if not model.Meta.abstract:
            concrete.add(model)
        logger.debug("binding non-abstract models {}".format(
            sorted(c.__name__ for c in concrete)
        ))

        # create_table doesn't block until ACTIVE or validate.
        # It also doesn't throw when the table already exists, making it safe
        # to call multiple times for the same unbound model.
        if skip_table_setup:
            logger.info("skip_table_setup is True; not trying to create tables or validate models during bind")
        else:
            self.session.clear_cache()

        # table name -> result of session.create_table for that table. Several models may map to the
        # same table name; only the first one seen triggers create_table.
        is_creating = {}

        for concrete_model in concrete:
            table_name = self._compute_table_name(concrete_model)
            before_create_table.send(self, engine=self, model=concrete_model)
            if not skip_table_setup:
                if table_name in is_creating:
                    continue
                is_creating[table_name] = self.session.create_table(table_name, concrete_model)

        for concrete_model in concrete:
            if not skip_table_setup:
                table_name = self._compute_table_name(concrete_model)
                # NOTE(review): presumably truthy only when this bind issued the create; confirm in SessionWrapper
                if is_creating[table_name]:
                    # polls until table is active
                    self.session.describe_table(table_name)
                    if concrete_model.Meta.ttl:
                        self.session.enable_ttl(table_name, concrete_model)
                    if concrete_model.Meta.backups and concrete_model.Meta.backups["enabled"]:
                        self.session.enable_backups(table_name, concrete_model)
                self.session.validate_table(table_name, concrete_model)
                model_validated.send(self, engine=self, model=concrete_model)
            model_bound.send(self, engine=self, model=concrete_model)

        logger.info("successfully bound {} models to the engine".format(len(concrete)))

    def delete(self, *objs, condition=None, sync=None):
        """Delete one or more objects.

        :param objs: objects to delete.
        :param condition: only perform each delete if this condition holds.
        :param sync:
            update objects after deleting. "old" loads attributes before the delete;
            None does not mutate the object locally. Default is None.
        :raises bloop.exceptions.ConstraintViolation: if the condition (or atomic) is not met.
        :raises ValueError: if ``sync`` is not a recognized option.
        """
        objs = set(objs)
        validate_not_abstract(*objs)
        # Validate once, before any request is sent.
        return_values = validate_sync("delete", sync)
        for obj in objs:
            attrs = self.session.delete_item({
                "TableName": self._compute_table_name(obj.__class__),
                "Key": dump_key(self, obj),
                "ReturnValues": return_values,
                **render(self, obj=obj, condition=condition)
            })
            if attrs is not None:
                unpack_from_dynamodb(attrs=attrs, expected=obj.Meta.columns, engine=self, obj=obj)
            object_deleted.send(self, engine=self, obj=obj)
        logger.info("successfully deleted {} objects".format(len(objs)))

    def load(self, *objs, consistent=False):
        """Populate objects from DynamoDB.

        :param objs: objects to load.
        :param bool consistent: Use `strongly consistent reads`__ if True. Default is False.
        :raises bloop.exceptions.MissingKey: if any object doesn't provide a value for a key column.
        :raises bloop.exceptions.MissingObjects: if one or more objects aren't loaded.

        __ http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadConsistency.html
        """
        get_table_name = self._compute_table_name
        objs = set(objs)
        validate_not_abstract(*objs)

        # table_index:  table name -> sorted key column names, used to re-extract keys from the response
        # object_index: table name -> key index -> set of objects sharing that key
        # request:      table name -> {"Keys": [...], "ConsistentRead": ...}
        table_index, object_index, request = {}, {}, {}
        for obj in objs:
            table_name = get_table_name(obj.__class__)
            key = dump_key(self, obj)
            index = index_for(key)
            if table_name not in object_index:
                table_index[table_name] = sorted(key.keys())
                object_index[table_name] = {}
                request[table_name] = {"Keys": [], "ConsistentRead": consistent}
            if index not in object_index[table_name]:
                # Each distinct key is requested once; objects with the same key share the result.
                request[table_name]["Keys"].append(key)
                object_index[table_name][index] = set()
            object_index[table_name][index].add(obj)

        response = self.session.load_items(request)

        for table_name, list_of_attrs in response.items():
            key_shape = table_index[table_name]
            pending = object_index[table_name]
            for attrs in list_of_attrs:
                index = index_for(extract_key(key_shape, attrs))
                for obj in pending.pop(index):
                    unpack_from_dynamodb(
                        attrs=attrs, expected=obj.Meta.columns, engine=self, obj=obj)
                    object_loaded.send(self, engine=self, obj=obj)
            # Checked per table (not per item) so a fully-loaded table is dropped exactly once.
            if not pending:
                object_index.pop(table_name)

        # Anything left in object_index was requested but not returned.
        if object_index:
            not_loaded = set()
            for pending in object_index.values():
                for same_key_objs in pending.values():
                    not_loaded.update(same_key_objs)
            logger.info("loaded {} of {} objects".format(len(objs) - len(not_loaded), len(objs)))
            raise MissingObjects("Failed to load some objects.", objects=not_loaded)
        logger.info("successfully loaded {} objects".format(len(objs)))

    def query(self, model_or_index, key, filter=None, projection="all", consistent=False, forward=True):
        """Create a reusable :class:`~bloop.search.QueryIterator`.

        :param model_or_index: A model or index to query. For example, ``User`` or ``User.by_email``.
        :param key:
            Key condition. This must include an equality against the hash key, and optionally one
            of a restricted set of conditions on the range key.
        :param filter: Filter condition. Only matching objects will be included in the results.
        :param projection:
            "all", "count", a set of column names, or a set of :class:`~bloop.models.Column`. When projection is
            "count", you must advance the iterator to retrieve the count.
        :param bool consistent: Use `strongly consistent reads`__ if True. Default is False.
        :param bool forward: Query in ascending or descending order. Default is True (ascending).
        :return: A reusable query iterator with helper methods.
        :rtype: :class:`~bloop.search.QueryIterator`

        __ http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadConsistency.html
        """
        if isinstance(model_or_index, Index):
            model, index = model_or_index.model, model_or_index
        else:
            model, index = model_or_index, None
        validate_not_abstract(model)
        q = Search(
            mode="query", engine=self, model=model, index=index, key=key, filter=filter,
            projection=projection, consistent=consistent, forward=forward)
        return iter(q.prepare())

    def save(self, *objs, condition=None, sync=None):
        """Save one or more objects.

        :param objs: objects to save.
        :param condition: only perform each save if this condition holds.
        :param sync:
            update objects after saving. "new" loads attributes after the save;
            "old" loads attributes before the save; None does not mutate the object locally. Default is None.
        :raises bloop.exceptions.ConstraintViolation: if the condition (or atomic) is not met.
        :raises ValueError: if ``sync`` is not a recognized option.
        """
        objs = set(objs)
        validate_not_abstract(*objs)
        # Validate once, before any request is sent (and even when no objects were passed),
        # matching delete().
        return_values = validate_sync("save", sync)
        for obj in objs:
            attrs = self.session.save_item({
                "TableName": self._compute_table_name(obj.__class__),
                "Key": dump_key(self, obj),
                "ReturnValues": return_values,
                **render(self, obj=obj, condition=condition, update=True)
            })
            if attrs is not None:
                unpack_from_dynamodb(attrs=attrs, expected=obj.Meta.columns, engine=self, obj=obj)
            object_saved.send(self, engine=self, obj=obj)
        logger.info("successfully saved {} objects".format(len(objs)))

    def scan(self, model_or_index, filter=None, projection="all", consistent=False, parallel=None):
        """Create a reusable :class:`~bloop.search.ScanIterator`.

        :param model_or_index: A model or index to scan. For example, ``User`` or ``User.by_email``.
        :param filter: Filter condition. Only matching objects will be included in the results.
        :param projection:
            "all", "count", a list of column names, or a list of :class:`~bloop.models.Column`. When projection is
            "count", you must exhaust the iterator to retrieve the count.
        :param bool consistent: Use `strongly consistent reads`__ if True. Default is False.
        :param tuple parallel: Perform a `parallel scan`__. A tuple of (Segment, TotalSegments)
            for this portion of the scan. Default is None.
        :return: A reusable scan iterator with helper methods.
        :rtype: :class:`~bloop.search.ScanIterator`

        __ http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadConsistency.html
        __ http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/QueryAndScan.html#QueryAndScanParallelScan
        """
        if isinstance(model_or_index, Index):
            model, index = model_or_index.model, model_or_index
        else:
            model, index = model_or_index, None
        validate_not_abstract(model)
        s = Search(
            mode="scan", engine=self, model=model, index=index, filter=filter,
            projection=projection, consistent=consistent, parallel=parallel)
        return iter(s.prepare())

    def stream(self, model, position):
        # noinspection PyUnresolvedReferences
        """Create a :class:`~bloop.stream.Stream` that provides approximate chronological ordering.

        .. code-block:: pycon

            # Create a user so we have a record
            >>> engine = Engine()
            >>> user = User(id=3, email="user@domain.com")
            >>> engine.save(user)
            >>> user.email = "admin@domain.com"
            >>> engine.save(user)

            # First record lacks an "old" value since it's an insert
            >>> stream = engine.stream(User, "trim_horizon")
            >>> next(stream)
            {'key': None,
             'old': None,
             'new': User(email='user@domain.com', id=3, verified=None),
             'meta': {
                 'created_at': datetime.datetime(2016, 10, 23, ...),
                 'event': {
                     'id': '3fe6d339b7cb19a1474b3d853972c12a',
                     'type': 'insert',
                     'version': '1.1'},
                 'sequence_number': '700000000007366876916'}
            }

        :param model: The model to stream records from.
        :param position: "trim_horizon", "latest", a stream token, or a :class:`datetime.datetime`.
        :return: An iterator for records in all shards.
        :rtype: :class:`~bloop.stream.Stream`
        :raises bloop.exceptions.InvalidStream: if the model does not have a stream.
        """
        validate_not_abstract(model)
        if not model.Meta.stream or not model.Meta.stream.get("arn"):
            raise InvalidStream("{!r} does not have a stream arn".format(model))
        stream = Stream(model=model, engine=self)
        stream.move_to(position=position)
        return stream

    def transaction(self, mode="w"):
        # noinspection PyUnresolvedReferences
        """
        Create a new :class:`~bloop.transactions.ReadTransaction` or :class:`~bloop.transactions.WriteTransaction`.

        As a context manager, calling commit when the block exits:

        .. code-block:: pycon

            >>> engine = Engine()
            >>> user = User(id=3, email="user@domain.com")
            >>> tweet = Tweet(id=42, data="hello, world")
            >>> with engine.transaction("w") as tx:
            ...     tx.delete(user)
            ...     tx.save(tweet, condition=Tweet.id.is_(None))

        Or manually calling prepare and commit:

        .. code-block:: pycon

            >>> engine = Engine()
            >>> user = User(id=3, email="user@domain.com")
            >>> tweet = Tweet(id=42, data="hello, world")
            >>> tx = engine.transaction("w")
            >>> tx.delete(user)
            >>> tx.save(tweet, condition=Tweet.id.is_(None))
            >>> tx.prepare().commit()

        :param str mode: Either "r" or "w" to create a ReadTransaction or WriteTransaction. Default is "w"
        :return: A new transaction that can be committed.
        :rtype: :class:`~bloop.transactions.ReadTransaction` or :class:`~bloop.transactions.WriteTransaction`
        :raises ValueError: if ``mode`` is neither "r" nor "w".
        """
        if mode == "r":
            cls = ReadTransaction
        elif mode == "w":
            cls = WriteTransaction
        else:
            raise ValueError(f"unknown mode {mode}")
        return cls(self)