Using the Engine¶
The Engine is the main way you'll interact with DynamoDB (and DynamoDBStreams). Once you've defined some models, you're ready to start loading, saving, and querying.
Attention
This section uses the same User model from the previous section. If you haven't already done so, go back and set that up.
Configuration¶
Engines expose a small number of configuration options. On __init__, there are three optional kwargs:
dynamodb, a DynamoDB client defaulting to boto3.client("dynamodb")
dynamodbstreams, a DynamoDBStreams client defaulting to boto3.client("dynamodbstreams")
table_name_template, a format string containing "{table_name}" or a function that takes a model and returns a table name for the engine.
You will rarely need to modify the first two, except when you are constructing multiple engines (e.g. for cross-region replication) or connecting to DynamoDBLocal. For examples of both, see Bloop Patterns.
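For instance, pointing both clients at a local endpoint is enough to develop against DynamoDBLocal. This is a sketch: the port and region are assumptions; adjust them to match your local setup.

```python
import boto3
from bloop import Engine

# Sketch: wire an Engine to DynamoDBLocal. The endpoint URL and region
# below are assumptions, not defaults.
endpoint = "http://localhost:8000"
engine = Engine(
    dynamodb=boto3.client(
        "dynamodb", endpoint_url=endpoint, region_name="us-west-2"),
    dynamodbstreams=boto3.client(
        "dynamodbstreams", endpoint_url=endpoint, region_name="us-west-2"),
)
```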
Most of the time, you will use table_name_template to inject configuration into your model/table bindings. For example, the following will prefix every table name with "dev-" for local development:
engine = Engine(table_name_template="dev-{table_name}")
Meanwhile, the following function will suffix the table name with a random int:
import random

def with_nonce(model):
    return f"{model.Meta.table_name}-{random.randint(0, 10)}"

engine = Engine(table_name_template=with_nonce)
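A template function can also pull from the environment, so the same code binds to dev- or prod-prefixed tables. This is a sketch; the APP_ENV variable name is an assumption:

```python
import os

# Hypothetical helper: derive the table prefix from an environment variable,
# so one codebase binds "dev-users" locally and "prod-users" in production.
def environment_table_name(model):
    env = os.environ.get("APP_ENV", "dev")
    return f"{env}-{model.Meta.table_name}"

# engine = Engine(table_name_template=environment_table_name)
```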
Bind¶
As noted in the previous section, every model must first be bound to a backing table with Engine.bind before we can interact with instances in DynamoDB.
Note
Starting with 1.1.0, the skip_table_setup parameter is available to bypass the create/verify calls to DynamoDB. This is not recommended except in situations where models are bound frequently, e.g. in a high-volume Lambda function. See Issue #83.
When an engine binds a model, it also binds all non-abstract subclasses. This means you can bind all models in one call, centralizing any error handling or table correction. For example, you may have specialized models for users, notifications, and impressions. Each of these can be grouped with an abstract base, and then all specialized models created at once:
class BaseUser(BaseModel):
    class Meta:
        abstract = True

class BaseNotification(BaseModel):
    class Meta:
        abstract = True

...

class Admin(BaseUser):
    ...

class Moderator(BaseUser):
    ...

class PriorityNotification(BaseNotification):
    ...

class EmailNotification(BaseNotification):
    ...
try:
    engine.bind(BaseUser)
except TableMismatch:
    print("Failed to bind all user models")

try:
    engine.bind(BaseNotification)
except TableMismatch:
    print("Failed to bind all notification models")
Now you can import a single base (BaseModel or a subclass) from your models.py module and automatically bind any dynamic models created from that base.
Save¶
Save is performed with UpdateItem since absolute overwrites (such as PutItem) are rarely desired in a distributed, optimistic concurrency system. This is the central decision that enables a table to back multiple models. A partial save allows a model to update an item in the table without accidentally clearing the columns that model doesn't know about.
Saving an item or items is very simple:
>>> from datetime import datetime, timezone
>>> now = datetime.now(timezone.utc)
>>> user = User(...)
>>> engine.save(user)
>>> tweet = Tweet(...)
>>> user.last_activity = now
>>> engine.save(user, tweet)
Save Conditions¶
You can perform optimistic saves with a condition. If a condition is not met when DynamoDB tries to apply the update, the update fails and bloop immediately raises ConstraintViolation. Conditions are specified on columns using the standard <, >=, ==, ... operators, as well as begins_with, between, contains, in_. Conditions can be chained together and combined with the bitwise operators &, |, ~:
>>> user = User(username="numberoverzero")
>>> username_available = User.username.is_(None)
>>> engine.save(user, condition=username_available)
# Success
>>> engine.save(user, condition=username_available)
Traceback (most recent call last):
...
ConstraintViolation: The condition was not met.
Return Values¶
You can optionally specify sync="old" or sync="new" to update the saved objects with the last-seen or most recent values when the save completes. This saves a read unit, is strongly consistent, and can be useful to, e.g., read the last value before you overwrote an attr, or fetch attributes you didn't modify:
>>> user = User(username="n0", email="user@n0.dev")
>>> engine.save(user, sync="new")
>>> if not user.verified:
... helpers.send_verification_reminder(user.email, since=user.created_on)
In a highly concurrent environment the sync="old" option is very useful to capture the last value a field held before overwriting; then you can safely clean up any cascading references. For example, if you store an S3 object key that points to the latest revision of some document, you might model it as follows:
class Document(BaseModel):
    name = Column(String, hash_key=True)
    location = Column(String)
The following could cause dangling objects if two updates occur at the same time:
def wrong_update(name, new_location):
    doc = Document(name=name)
    engine.load(doc)
    if doc.location != new_location:
        delete_s3_object(doc.location)
        doc.location = new_location
        engine.save(doc)
Instead, you should read the previous values when you perform the write, and then clean up the location:
def correct_update(name, new_location):
    doc = Document(name=name, location=new_location)
    engine.save(doc, sync="old")
    if doc.location != new_location:
        delete_s3_object(doc.location)
Actions¶
Most changes you make to modeled objects fall into two update categories: SET and REMOVE. Any time a value serializes as None or you call del myobj.some_attr it will likely be a remove, while myobj.attr = value will be a set. (This is up to the column's type, so you can override this behavior to use your own sentinel values.)
Warning
As mentioned in Issue #136 and the DynamoDB Developer Guide, an atomic counter is not appropriate unless you can tolerate overcounting or undercounting. AWS explicitly discourages using add or delete in general.
Dynamo exposes two additional update types: ADD and DELETE. These allow you to specify relative changes without knowing the current value stored in Dynamo. One of the most common examples is a website view count: for a popular website the optimistic concurrency model will cause a lot of write contention and cap your throughput, since each change requires a read, modify, save. If there's a conflict you'll need to do all three again, for each writer. Instead of reading the value and using a conditional save, you can wrap the offset in bloop.actions.add() and tell bloop to apply the desired change. Compare the following two options:
# Option 1) conditional write, wrap in retries
website = Website("google.com")
engine.load(website)
website.views += 1
# raises ConstraintViolation most of the time due to write contention
engine.save(website, condition=Website.views==(website.views-1))
# Option 2) add instead of set
website = Website("google.com")
website.views = bloop.actions.add(1)
# no contention
engine.save(website)
When combined with return values above, we can add 1 and see the new value all in one call:
website = Website("google.com")
website.views = bloop.actions.add(1)
engine.save(website, sync="new")
print(f"views after save: {website.views}")
Note that bloop.actions.set() and bloop.actions.remove() are assumed if you don't set a column to an explicit action:
# both equivalent
website.views = 21
website.views = bloop.actions.set(21)
# all equivalent
website.views = None
del website.views
website.views = bloop.actions.remove(None)
Finally, the bloop.actions.add() action only supports the Number and Set data types, and can only be used on top-level attributes, not nested attributes. Meanwhile, bloop.actions.delete() only supports the Set data type, and can likewise only be used on top-level attributes.
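The server-side semantics for sets can be illustrated locally. This sketch mimics what DynamoDB does when it applies the two actions; it is not part of bloop's API:

```python
# Local illustration of DynamoDB's ADD / DELETE semantics for the Set type.
# This mimics the server-side merge; it is not bloop code.
def apply_add(current: set, operand: set) -> set:
    # ADD unions the operand's elements into the stored set
    return current | operand

def apply_delete(current: set, operand: set) -> set:
    # DELETE removes matching elements; elements not present are ignored
    return current - operand
```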
Delete¶
Delete has the same signature as save(). Both operations are mutations on an object that may or may not exist, and simply map to two different APIs (Delete calls DeleteItem). You can delete multiple objects at once, specify a condition, and use sync="old" to update local objects with their last values before deletion.
>>> from datetime import datetime, timedelta, timezone
>>> engine.delete(user, tweet)
>>> now = datetime.now(timezone.utc)
>>> cutoff = now - timedelta(days=730)  # timedelta has no "years" argument
>>> engine.delete(
... account,
... condition=Account.last_login < cutoff)
>>> banned_account = Account(id="user@n0.dev")
>>> engine.delete(banned_account, sync="old")
>>> last_email = banned_account.email
>>> helpers.notify_acct_change(last_email, reason="spamming")
Load¶
Unlike most existing DynamoDB object mappers, Bloop does not create new instances when loading objects. This improves performance and allows you to use thick or thin models by minimizing how many times the constructor is invoked for effectively the same object (same hash/range keys).
Like save() and delete() above, Engine.load takes a variable number of objects to load from DynamoDB:
>>> user = User(id="some-id")
>>> tweet = Tweet(user="some-id", id="some-tweet")
>>> engine.load(user, tweet)
If consistent is True, then strongly consistent reads will be used:
>>> objs = user, tweet
>>> engine.load(*objs, consistent=True)
If any objects aren't loaded, Bloop raises MissingObjects:
>>> user = User(username="not-real")
>>> engine.load(user)
Traceback (most recent call last):
...
MissingObjects: Failed to load some objects.
You can access MissingObjects.objects to see which objects failed to load.
Query¶
This section defines a new model to demonstrate the various filtering and conditions available:
class Account(BaseModel):
    name = Column(String, hash_key=True)
    number = Column(Integer, range_key=True)
    created_on = Column(DateTime)
    balance = Column(Number)
    level = Column(Integer)
    by_level = GlobalSecondaryIndex(
        projection="all", hash_key="level")
    by_balance = LocalSecondaryIndex(
        projection={"created_on"}, range_key="balance")

engine = Engine()
engine.bind(Account)
All¶
Bloop's query and scan iterators are lazy, fetching only as many pages as needed to advance when you call next(). If you want to eagerly load all results, you can use all() to load all results into a single list. Note that calling all() will reset the query, and will return an empty list if there are no results.
>>> q = engine.query(Account,
... key=Account.name == "numberoverzero")
>>> q.all()
[Account(name='numberoverzero', number=21623)]
>>> q.exhausted
True
>>> q.all()
[Account(name='numberoverzero', number=21623)]
First¶
Often, you'll only need a single result from the query; with the correct sorting and indexes, the first result can be used to get a maximum or minimum. Use first() to get the first result, if it exists. If there are no results, it raises ConstraintViolation.
>>> q = engine.query(Account,
... key=Account.name == "numberoverzero")
>>> q.first()
Account(name='numberoverzero', number=21623)
One¶
Similar to first(), you can get the unique result of a query with one(). If there are no results, or more than one result, it raises ConstraintViolation.
>>> q = engine.query(Account,
... key=Account.name == "numberoverzero")
>>> q.one()
Traceback (most recent call last):
...
ConstraintViolation: Query found more than one result.
Count¶
To get a count of items that match some query, use the "count" projection.
>>> q = engine.query(
...     Account.by_level,
...     key=Account.level == 3,
...     projection="count")
>>> q.count
256
Both count and scanned are calculated only when the query is executed, so you must call QueryIterator.reset() to see changes take effect.
>>> new = Account(...)
>>> engine.save(new)
>>> q.count
256
>>> q.reset()
>>> q.count
257
Key Conditions¶
Queries can be performed against a Model or an Index. You must specify at least a hash key equality condition; a range key condition is optional.
>>> owned_by_stacy = Account.name == "Stacy"
>>> q = engine.query(Account, key=owned_by_stacy)
>>> for account in q:
... print(account)
...
Here, the query uses the Index's range_key to narrow the range of accounts to find:
>>> owned_by_stacy = Account.name == "Stacy"
>>> at_least_one_mil = Account.balance >= 1000000
>>> q = engine.query(Account.by_balance,
... key=owned_by_stacy & at_least_one_mil)
>>> for account in q:
... print(account.balance)
Note
A query must always include an equality check (== or is_) against the model or index's hash key. If you want to include a condition on the range key, it can be one of ==, <, <=, >, >=, between, begins_with. See the KeyConditionExpression parameter of the Query operation in the DynamoDB Developer Guide.
Filtering¶
If you provide a filter condition, DynamoDB only returns items that match the filter. Conditions can be on any column projected into the Index, except the hash and range key being queried. All non-key columns are available for queries against a model. A filter condition can use any of the condition operations.
Here is the same LSI query as above, but now excluding accounts created in the last 30 days:
>>> from datetime import datetime, timedelta, timezone
>>> now = datetime.now(timezone.utc)
>>> recent = now - timedelta(days=30)
>>> key_condition = owned_by_stacy & at_least_one_mil
>>> exclude_recent = Account.created_on < recent
>>> q = engine.query(Account.by_balance,
... key=key_condition,
... filter=exclude_recent)
Warning
Trying to use a column that's not part of an Index's projection will raise InvalidFilterCondition, since the value can't be loaded. This does not apply to queries against an LSI with strict=False, which will consume additional reads to apply the filter.
>>> q = engine.query(Account.by_balance,
... key=key_condition,
... filter=Account.level == 3)
Traceback (most recent call last):
...
InvalidFilterCondition: <Column[Account.level]> is not available for the projection.
Projections¶
By default, queries return all columns projected into the index or model. You can use the projection parameter to control which columns are returned for each object. This must be "all" to include everything in the index or model's projection, or a set of columns or column names to include.
>>> q = engine.query(Account,
... key=key_condition,
... projection={"email", "balance"})
>>> account = q.first()
>>> account.email
'user@domain.com'
>>> account.balance
Decimal('3400')
>>> account.level
Traceback (most recent call last):
...
AttributeError: ...
Because the projection did not include Account.level, it was not loaded on the account object.
Configuration Options¶
The remaining options are consistent and forward. When consistent is True, strongly consistent reads are used; by default, consistent is False. Use forward to query in ascending or descending order; by default forward is True, i.e. ascending.
Iterator State¶
The QueryIterator exposes a number of properties to inspect its current progress:
count -- the number of items loaded from DynamoDB so far, including buffered items.
exhausted -- True if there are no more results.
scanned -- the number of items DynamoDB evaluated, before applying any filter condition.
To restart a query, use QueryIterator.reset():
>>> query = engine.query(...)
>>> unique = query.one()
>>> query.exhausted
True
>>> query.reset()
>>> query.exhausted
False
>>> same = query.one()
>>> unique == same # Assume we implemented __eq__
True
Continuation Tokens¶
It is possible to record the state of an iterator and recreate that state in a separate thread or process using a continuation token. Use the token property to retrieve a continuation token describing the current state of the iterator. When recreating the iterator, pass the token to the QueryIterator.move_to() method to restore the previous state:
>>> query = engine.query(...)
>>> for _ in range(10):
... next(query) # read the first ten records.
...
>>> token = query.token
>>> resumed = engine.query(...)
>>> resumed.move_to(token)
>>> for _ in range(10):
...     next(resumed)  # read the next ten records.
Scan¶
Scan and Query share a very similar interface. Unlike Query, Scan does not have a key condition and can't be performed in descending order. Scans can be performed in parallel, however.
Using the same model from Query, we can scan the model or an index:
>>> for account in engine.scan(Account):
...     print(account.name)
...
>>> for account in engine.scan(Account.by_level):
...     print(account.level)
And get the first, or unique result:
>>> some_account = engine.scan(Account).first()
>>> one_account = engine.scan(Account).one()
Traceback (most recent call last):
...
ConstraintViolation: Scan found more than one result.
Use filter and projection to exclude items and control which columns are included in results:
>>> scan = engine.scan(Account,
...     filter=Account.level >= 2,
...     projection={"level", "name"})
And consistent to use strongly consistent reads:
>>> scan = engine.scan(Account.by_balance, consistent=True)
Parallel Scans¶
Scans can be performed in parallel, using the parallel parameter. To specify which segment you are constructing the scan for, pass a tuple of (Segment, TotalSegments):
>>> first_segment = engine.scan(Account, parallel=(0, 2))
>>> second_segment = engine.scan(Account, parallel=(1, 2))
You can easily construct a parallel scan with s segments by calling engine.scan in a loop:
def parallelize(s, engine, *args, **kwargs):
    for i in range(s):
        kwargs["parallel"] = (i, s)
        yield engine.scan(*args, **kwargs)

workers = scan_workers(n=10)
scans = parallelize(10, engine, Account, filter=...)
for worker, scan in zip(workers, scans):
    worker.process(scan)
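The standard library's ThreadPoolExecutor can drain the segments concurrently. This is a sketch, not bloop API: any engine-like object with a scan method accepting parallel=(Segment, TotalSegments) works.

```python
from concurrent.futures import ThreadPoolExecutor

def parallel_scan(engine, model, segments, **kwargs):
    # One lazy scan iterator per segment, then drain each in its own thread.
    scans = [
        engine.scan(model, parallel=(i, segments), **kwargs)
        for i in range(segments)
    ]
    with ThreadPoolExecutor(max_workers=segments) as pool:
        results = pool.map(list, scans)
    # Flatten the per-segment result lists into one list
    return [obj for segment in results for obj in segment]
```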
Transactions¶
Note
For a detailed guide to using transactions, see the Transactions section of the User Guide.
You can construct a read or write transaction by passing the corresponding mode:
>>> read_tx = engine.transaction(mode="r")
>>> write_tx = engine.transaction(mode="w") # defaults to write
You can also use the transaction as a context manager:
>>> with engine.transaction() as tx:
... tx.save(user, condition=User.id.is_(None))
... tx.delete(tweet)
... tx.check(meta, Metadata.verified.is_(True))
...
>>> # tx is committed or raises TransactionCanceled
To manually commit a transaction, call prepare() and commit():
>>> tx = engine.transaction(mode="r")
>>> tx.load(user, tweet)
>>> prepared = tx.prepare()
>>> prepared.commit()
>>> prepared.commit() # subsequent commits on a ReadTransaction re-load the objects
Stream¶
Note
Before you can create a stream on a model, you need to enable it in the model's Meta. For a detailed guide to using streams, head over to the Streams section of the User Guide.
To start from the beginning or end of the stream, use "trim_horizon" and "latest":
>>> stream = engine.stream(User, position="trim_horizon")
>>> stream = engine.stream(Account, "latest")
Alternatively, you can use an existing stream token to reload its previous state:
>>> same_stream = engine.stream(
... Impression, previous_stream.token)
Lastly, you can use a datetime. This is an expensive call, and walks the entire stream from the trim horizon until it finds the first record in each shard after the target datetime.
>>> from datetime import datetime, timedelta, timezone
>>> now = datetime.now(timezone.utc)
>>> yesterday = now - timedelta(hours=12)
>>> stream = engine.stream(User, yesterday)