Using the Engine

The Engine is the main way you'll interact with DynamoDB (and DynamoDBStreams). Once you've defined some models, you're ready to start loading, saving and querying.

Attention

This section uses the same User model from the previous section. If you haven't already done so, go back and set that up.

Configuration

Engines expose a small number of configuration options. On __init__, there are three optional kwargs:

  • dynamodb, a DynamoDB client defaulting to boto3.client("dynamodb")

  • dynamodbstreams, a DynamoDBStreams client defaulting to boto3.client("dynamodbstreams")

  • table_name_template, a format string containing "{table_name}" or a function that takes a model and returns a table name for the engine.

You will rarely need to modify the first two, except when you are constructing multiple engines (eg. cross-region replication) or connecting to DynamoDBLocal. For examples of both, see Bloop Patterns.
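For instance, here is a minimal sketch that points both clients at a DynamoDBLocal endpoint (the endpoint URL is an assumption; adjust it for your setup):

import boto3
from bloop import Engine

dynamodb = boto3.client("dynamodb", endpoint_url="http://localhost:8000")
dynamodbstreams = boto3.client("dynamodbstreams", endpoint_url="http://localhost:8000")

engine = Engine(dynamodb=dynamodb, dynamodbstreams=dynamodbstreams)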

Most of the time, you will use table_name_template to inject configuration into your model/table bindings. For example, the following will prefix every table name with "dev-" for local development:

engine = Engine(table_name_template="dev-{table_name}")

Meanwhile, the following function will suffix the table name with a random int:

import random

def with_nonce(model):
    return f"{model.Meta.table_name}-{random.randint(0, 10)}"

engine = Engine(table_name_template=with_nonce)

Bind

As noted in the previous section, every model must first be bound to a backing table with Engine.bind before we can interact with instances in DynamoDB.

Note

Starting with 1.1.0, the skip_table_setup parameter is available to bypass the create/verify calls to DynamoDB. This is not recommended except in situations where models are bound frequently, ie. a high-volume Lambda function. See Issue #83.
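For example, a frequently-invoked handler whose tables are known to exist might bind with table setup skipped (a minimal sketch):

from bloop import BaseModel

engine.bind(BaseModel, skip_table_setup=True)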

When an engine binds a model, it also binds all non-abstract subclasses. This means you can bind all models in one call, centralizing any error handling or table correction. For example, you may have specialized models for users, notifications, and impressions. Each of these can be grouped with an abstract base, and then all specialized models created at once:

class BaseUser(BaseModel):
    class Meta:
        abstract = True

class BaseNotification(BaseModel):
    class Meta:
        abstract = True

...

class Admin(BaseUser):
    ...

class Moderator(BaseUser):
    ...

class PriorityNotification(BaseNotification):
    ...

class EmailNotification(BaseNotification):
    ...


try:
    engine.bind(BaseUser)
except TableMismatch:
    print("Failed to bind all user models")

try:
    engine.bind(BaseNotification)
except TableMismatch:
    print("Failed to bind all notification models")

Now you can import a single base (BaseModel or a subclass) from your models.py module and automatically bind any dynamic models created from that base.
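As a sketch (the models module and the Auditor class are hypothetical), a subclass defined at runtime is still picked up when the shared base is bound:

from bloop import Column, String
from models import BaseUser  # hypothetical module holding the abstract bases above

class Auditor(BaseUser):  # defined later, e.g. by a plugin
    id = Column(String, hash_key=True)

engine.bind(BaseUser)  # binds Auditor along with Admin, Moderator, ...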

Save

Save is performed with UpdateItem since absolute overwrites (such as PutItem) are rarely desired in a distributed, optimistic concurrency system. This is the central decision that enables a table to back multiple models. A partial save allows a model to update an item in the table without accidentally clearing the columns that model doesn't know about.

Saving an item or items is very simple:

>>> from datetime import datetime, timezone
>>> now = datetime.now(timezone.utc)
>>> user = User(...)
>>> engine.save(user)
>>> tweet = Tweet(...)
>>> user.last_activity = now
>>> engine.save(user, tweet)

Save Conditions

You can perform optimistic saves with a condition. If a condition is not met when DynamoDB tries to apply the update, the update fails and bloop immediately raises ConstraintViolation. Conditions are specified on columns using the standard <, >=, ==, ... operators, as well as begins_with, between, contains, in_. Conditions can be chained together and combined with bitwise operators &, |, ~:

>>> user = User(username="numberoverzero")
>>> username_available = User.username.is_(None)
>>> engine.save(user, condition=username_available)
# Success
>>> engine.save(user, condition=username_available)
Traceback (most recent call last):
  ...
ConstraintViolation: The condition was not met.
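Conditions can also reference multiple columns. As a sketch (assuming the User model's last_activity and verified columns used elsewhere in this section), the following only saves when the existing item is stale and unverified:

>>> from datetime import datetime, timedelta, timezone
>>> cutoff = datetime.now(timezone.utc) - timedelta(days=30)
>>> stale = User.last_activity < cutoff
>>> unverified = ~User.verified.is_(True)
>>> engine.save(user, condition=stale & unverified)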

Return Values

You can optionally specify sync="old" or sync="new" to update the saved objects with the last seen or most recent values when the save completes. This saves a read unit, is strongly consistent, and can be useful to eg. read the last value of an attribute before you overwrote it, or to fetch attributes you didn't modify:

>>> user = User(username="n0", email="user@n0.dev")
>>> engine.save(user, sync="new")
>>> if not user.verified:
...     helpers.send_verification_reminder(user.email, since=user.created_on)

In a highly concurrent environment the sync="old" option is very useful to capture the last value a field held before overwriting; then you can safely clean up any cascading references. For example, if you store an S3 object key that points to the latest revision of some document, you might model it as follows:

class Document(BaseModel):
    name = Column(String, hash_key=True)
    location = Column(String)

The following could cause dangling objects if two updates occur at the same time:

def wrong_update(name, new_location):
    doc = Document(name=name)
    engine.load(doc)
    if doc.location != new_location:
        delete_s3_object(doc.location)
    doc.location = new_location
    engine.save(doc)

Instead, you should read the previous value when you perform the write, and then clean up the old location:

def correct_update(name, new_location):
    doc = Document(name=name, location=new_location)
    engine.save(doc, sync="old")
    if doc.location != new_location:
        delete_s3_object(doc.location)

Actions

Most changes you make to modeled objects fall into two update categories: SET and REMOVE. Any time a value serializes as None, or you call del myobj.some_attr, it will likely be a remove, while myobj.attr = value will be a set. (This is up to the column's type, so you can override this behavior to use your own sentinel values.)
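For example, here is a hedged sketch of a custom type (the NonEmptyString name is hypothetical) that serializes empty strings to None, so assigning "" behaves like a remove:

from bloop import String

class NonEmptyString(String):
    def dynamo_dump(self, value, *, context, **kwargs):
        # serializing to None tells bloop to REMOVE the attribute
        if not value:
            return None
        return super().dynamo_dump(value, context=context, **kwargs)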

Warning

As mentioned in Issue #136 and the DynamoDB Developer Guide, an atomic counter is not appropriate unless you can tolerate overcounting or undercounting. AWS explicitly discourages using add or delete in general.

Dynamo exposes two additional update types: ADD and DELETE. These allow you to specify relative changes without knowing the current value stored in Dynamo. One of the most common examples is a website view count: for a popular website, the optimistic concurrency model will cause a lot of write contention and cap your throughput, since each change requires a read, modify, and save. If there's a conflict, you'll need to do all three again, for each writer.

Instead of reading the value and using a conditional save, you can wrap the offset in a bloop.actions.add() and tell bloop to apply the desired change. Compare the following two approaches:

# Option 1) conditional write, wrap in retries
website = Website("google.com")
engine.load(website)
website.views += 1
# raises ConstraintViolation most of the time due to write contention
engine.save(website, condition=Website.views == (website.views - 1))


# Option 2) add instead of set
website = Website("google.com")
website.views = bloop.actions.add(1)
# no contention
engine.save(website)

When combined with the return values above, we can add 1 and see the new value, all in one call:

website = Website("google.com")
website.views = bloop.actions.add(1)
engine.save(website, sync="new")
print(f"views after save: {website.views}")

Note that bloop.actions.set() and bloop.actions.remove() are assumed if you don't set a column to an explicit action:

# both equivalent
website.views = 21
website.views = bloop.actions.set(21)

# all equivalent
website.views = None
del website.views
website.views = bloop.actions.remove(None)

Finally, the bloop.actions.add() action only supports the Number and Set data types, while bloop.actions.delete() only supports the Set data type. Both can only be used on top-level attributes, not nested attributes.
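As a sketch, if Website also had a tags column typed as a set of strings (a hypothetical Column(Set(String))), you could modify individual elements without reading the item first:

# add elements to the set
website.tags = bloop.actions.add({"storefront", "catalog"})
engine.save(website)

# remove elements from the set
website.tags = bloop.actions.delete({"deprecated"})
engine.save(website)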

Delete

Delete has the same signature as save(). Both operations are mutations on an object that may or may not exist, and simply map to two different APIs (Delete calls DeleteItem). You can delete multiple objects at once; specify a condition; and use sync="old" to update local objects with their last values before deletion.

>>> from datetime import datetime, timedelta, timezone
>>> engine.delete(user, tweet)
>>> now = datetime.now(timezone.utc)
>>> cutoff = now - timedelta(days=2 * 365)
>>> engine.delete(
...     account,
...     condition=Account.last_login < cutoff)
>>> banned_account = Account(id="user@n0.dev")
>>> engine.delete(banned_account, sync="old")
>>> last_email = banned_account.email
>>> helpers.notify_acct_change(last_email, reason="spamming")

Load

Unlike most existing DynamoDB object mappers, Bloop does not create new instances when loading objects. This improves performance and allows you to use thick or thin models by minimizing how many times the constructor is invoked for effectively the same object (same hash/range keys).

Like save() and delete() above, Engine.load takes a variable number of objects to load from DynamoDB:

>>> user = User(id="some-id")
>>> tweet = Tweet(user="some-id", id="some-tweet")
>>> engine.load(user, tweet)

If consistent is True, then strongly consistent reads will be used:

>>> objs = user, tweet
>>> engine.load(*objs, consistent=True)

If any objects aren't loaded, Bloop raises MissingObjects:

>>> user = User(username="not-real")
>>> engine.load(user)
Traceback (most recent call last):
  ...
MissingObjects: Failed to load some objects.

You can access MissingObjects.objects to see which objects failed to load.
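For example, a sketch that inspects the failed objects:

>>> from bloop import MissingObjects
>>> try:
...     engine.load(user, tweet)
... except MissingObjects as exc:
...     print("not loaded:", exc.objects)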

Query

This section defines a new model to demonstrate the various filtering and conditions available:

class Account(BaseModel):
    name = Column(String, hash_key=True)
    number = Column(Integer, range_key=True)
    created_on = Column(DateTime)
    balance = Column(Number)
    level = Column(Integer)
    email = Column(String)

    by_level = GlobalSecondaryIndex(
        projection="all", hash_key="level")

    by_email = GlobalSecondaryIndex(
        projection="keys", hash_key="email")

    by_balance = LocalSecondaryIndex(
        projection={"created_on"}, range_key="balance")

engine = Engine()
engine.bind(Account)

All

Bloop's query and scan iterators are lazy, fetching only as many pages as needed to advance when you call next(). If you want to eagerly load all results, you can use all() to load all results into a single list. Note that calling all() will reset the query, and will return an empty list if there are no results.

>>> q = engine.query(Account,
...     key=Account.name == "numberoverzero")
>>> q.all()
[Account(name='numberoverzero', number=21623)]
>>> q.exhausted
True
>>> q.all()
[Account(name='numberoverzero', number=21623)]

First

Often, you'll only need a single result from the query; with the correct sorting and indexes, the first result can be used to get a maximum or minimum. Use first() to get the first result, if it exists. If there are no results, raises ConstraintViolation.

>>> q = engine.query(Account,
...     key=Account.name == "numberoverzero")
>>> q.first()
Account(name='numberoverzero', number=21623)

One

Similar to first(), you can get the unique result of a query with one(). If there are no results, or more than one result, raises ConstraintViolation.

>>> q = engine.query(Account,
...     key=Account.name == "numberoverzero")
>>> q.one()
Traceback (most recent call last):
    ...
ConstraintViolation: Query found more than one result.

Count

To get a count of items that match some query, use the "count" projection.

>>> q = engine.query(
...         Account.by_email,
...         key=Account.email == "foo@bar.com",
...         projection="count")
>>> q.count
256

Both count and scanned are calculated only when the query is executed, so you must call QueryIterator.reset() to see changes take effect.

>>> new = Account(...)
>>> engine.save(new)
>>> q.count
256
>>> q.reset()
>>> q.count
257

Key Conditions

Queries can be performed against a Model or an Index. You must specify at least a hash key equality condition; a range key condition is optional.

>>> owned_by_stacy = Account.name == "Stacy"
>>> q = engine.query(Account, key=owned_by_stacy)
>>> for account in q:
...     print(account)
...

Here, the query uses the Index's range_key to narrow the range of accounts to find:

>>> owned_by_stacy = Account.name == "Stacy"
>>> at_least_one_mil = Account.balance >= 1000000
>>> q = engine.query(Account.by_balance,
...     key=owned_by_stacy & at_least_one_mil)
>>> for account in q:
...     print(account.balance)

Note

A query must always include an equality check == or is_ against the model or index's hash key. If you want to include a condition on the range key, it can be one of ==, <, <=, >, >=, between, begins_with.

See the KeyConditionExpression parameter of the Query operation in the Developer's Guide.

Filtering

If you provide a filter condition, DynamoDB only returns items that match the filter. Conditions can be on any column -- except the hash and range key being queried -- projected into the Index. All non-key columns are available for queries against a model. A filter condition can use any condition operations. Here is the same LSI query as above, but now excluding accounts created in the last 30 days:

>>> from datetime import datetime, timedelta, timezone
>>> now = datetime.now(timezone.utc)
>>> recent = now - timedelta(days=30)
>>> key_condition = owned_by_stacy & at_least_one_mil
>>> exclude_recent = Account.created_on < recent
>>> q = engine.query(Account.by_balance,
...     key=key_condition,
...     filter=exclude_recent)

Warning

Trying to use a column that's not part of an Index's projection will raise InvalidFilterCondition, since the value can't be loaded. This does not apply to queries against an LSI with strict=False, which will consume additional reads to apply the filter.

>>> q = engine.query(Account.by_balance,
...     key=key_condition,
...     filter=Account.level == 3)
Traceback (most recent call last):
  ...
InvalidFilterCondition: <Column[Account.level]> is not available for the projection.

Projections

By default, queries return all columns projected into the index or model. You can use the projection parameter to control which columns are returned for each object. This must be "all" to include everything in the index or model's projection, or a set of columns or column model names to include.

>>> q = engine.query(Account,
...     key=key_condition,
...     projection={"email", "balance"})
>>> account = q.first()
>>> account.email
'user@domain.com'
>>> account.balance
Decimal('3400')
>>> account.level
Traceback (most recent call last):
    ...
AttributeError: ...

Because the projection did not include Account.level, it was not loaded on the account object.

Configuration Options

The remaining options are consistent and forward. When consistent is True, strongly consistent reads are used; by default, consistent is False. Use forward to query in ascending or descending order; by default, forward is True (ascending).
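For example, here is a sketch that returns Stacy's accounts from highest to lowest balance using strongly consistent reads (consistent reads apply to the table and LSIs, not GSIs):

>>> q = engine.query(Account.by_balance,
...     key=Account.name == "Stacy",
...     forward=False,      # descending by the index's range key (balance)
...     consistent=True)    # strongly consistent reads
>>> for account in q:
...     print(account.balance)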

Iterator State

The QueryIterator exposes a number of properties to inspect its current progress (see the sketch after this list):

  • count -- the number of items loaded from DynamoDB so far, including buffered items.

  • exhausted -- True if there are no more results

  • scanned -- the number of items DynamoDB evaluated, before applying any filter condition.
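For example (a sketch with illustrative numbers), after draining a filtered query, scanned can exceed count because DynamoDB evaluated items that the filter later excluded:

>>> q = engine.query(Account,
...     key=Account.name == "Stacy",
...     filter=Account.level >= 3)
>>> q.all()
[...]
>>> q.count      # items returned after the filter
3
>>> q.scanned    # items DynamoDB evaluated before the filter
10
>>> q.exhausted
True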

To restart a query, use QueryIterator.reset():

>>> query = engine.query(...)
>>> unique = query.one()
>>> query.exhausted
True
>>> query.reset()
>>> query.exhausted
False
>>> same = query.one()
>>> unique == same  # Assume we implemented __eq__
True

Continuation Tokens

It is possible to record the state of an iterator and recreate that state in a separate thread or process using a continuation token. Use the token property to retrieve a continuation token describing the current state of the iterator. When recreating the iterator, pass the token to the QueryIterator.move_to() method to restore the previous state:

>>> query = engine.query(...)
>>> for _ in range(10):
...     next(query) # read the first ten records.
...
>>> token = query.token
>>> resumed = engine.query(...)
>>> resumed.move_to(token)
>>> for _ in range(10):
...     next(resumed) # read the next ten records.

Scan

Scan and Query share a very similar interface. Unlike Query, Scan does not have a key condition and can't be performed in descending order. Scans can be performed in parallel, however.

Using the same model from Query, we can scan the model or an index:

>>> for account in engine.scan(Account):
...     print(account.email)
...
>>> for account in engine.scan(Account.by_email):
...     print(account.email)

And get the first, or unique result:

>>> some_account = engine.scan(Account).first()
>>> one_account = engine.scan(Account).one()
Traceback (most recent call last):
    ...
ConstraintViolation: Scan found more than one result.

Use filter and projection to exclude items and control which columns are included in results:

>>> scan = engine.scan(Account,
...     filter=Account.email.contains("@"),
...     projection={"level", "email"})

And consistent to use strongly consistent reads:

>>> scan = engine.scan(Account.by_balance, consistent=True)

Parallel Scans

Scans can be performed in parallel, using the parallel parameter. To specify which segment you are constructing the scan for, pass a tuple of (Segment, TotalSegments):

>>> first_segment = engine.scan(Account, parallel=(0, 2))
>>> second_segment = engine.scan(Account, parallel=(1, 2))

You can easily construct a parallel scan with s segments by calling engine.scan in a loop:

def parallelize(s, engine, *args, **kwargs):
    for i in range(s):
        kwargs["parallel"] = (i, s)
        yield engine.scan(*args, **kwargs)

workers = scan_workers(n=10)
scans = parallelize(10, engine, Account, filter=...)
for worker, scan in zip(workers, scans):
    worker.process(scan)

Transactions

Note

For a detailed guide to using transactions, see the Transactions section of the User Guide.

You can construct a read or write transaction by passing the mode:

>>> read_tx = engine.transaction(mode="r")
>>> write_tx = engine.transaction(mode="w")  # defaults to write

You can also use the transaction as a context manager:

>>> with engine.transaction() as tx:
...     tx.save(user, condition=User.id.is_(None))
...     tx.delete(tweet)
...     tx.check(meta, Metadata.verified.is_(True))
...
>>> # tx is committed or raises TransactionCanceled

To manually commit a transaction, call prepare() and commit():

>>> tx = engine.transaction(mode="r")
>>> tx.load(user, tweet)
>>> prepared = tx.prepare()
>>> prepared.commit()
>>> prepared.commit()  # subsequent commits on a ReadTransaction re-load the objects

Stream

Note

Before you can create a stream on a model, you need to enable it in the model's Meta. For a detailed guide to using streams, head over to the Streams section of the User Guide.
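As a minimal sketch, streaming both new and old images might be enabled like this (pare the model down or extend it as needed):

class User(BaseModel):
    class Meta:
        stream = {
            "include": ["new", "old"]
        }
    id = Column(String, hash_key=True)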

To start from the beginning or end of the stream, use "trim_horizon" and "latest":

>>> stream = engine.stream(User, position="trim_horizon")
>>> stream = engine.stream(Account, "latest")

Alternatively, you can use an existing stream token to reload its previous state:

>>> same_stream = engine.stream(
...     Impression, previous_stream.token)

Lastly, you can use a datetime. This is an expensive call, and walks the entire stream from the trim horizon until it finds the first record in each shard after the target datetime.

>>> from datetime import datetime, timedelta, timezone
>>> now = datetime.now(timezone.utc)
>>> yesterday = now - timedelta(hours=12)
>>> stream = engine.stream(User, yesterday)