Bloop: DynamoDB Modeling

DynamoDB's concurrency model is great, but using it correctly is tedious and unforgiving. Bloop manages that complexity for you.

Requires Python 3.6+

Features

  • Simple declarative modeling

  • Stream interface that makes sense

  • Easy transactions

  • Extensible type system, useful built-in types

  • Secure expression-based wire format

  • Expressive conditions

  • Model composition

  • Diff-based saves

  • Server-Side Encryption

  • Time-To-Live

  • Continuous Backups

  • On-Demand Billing

Ergonomics

The basics:

class Account(BaseModel):
    id = Column(UUID, hash_key=True)
    name = Column(String)
    email = Column(String)
    by_email = GlobalSecondaryIndex(
        projection='keys', hash_key='email')

engine.bind(Account)

some_account = Account(id=uuid.uuid4(), email='foo@bar.com')
engine.save(some_account)

q = engine.query(Account.by_email, key=Account.email == 'foo@bar.com')
same_account = q.one()

print(same_account.id)

Iterate over a stream:

template = "old: {old}\nnew: {new}\ndetails: {meta}"

stream = engine.stream(User, 'trim_horizon')
while True:
    record = next(stream)
    if not record:
        time.sleep(0.5)
        continue
    print(template.format(**record))

Use transactions:

with engine.transaction() as tx:
    tx.save(account)
    tx.delete(update_token, condition=Token.until <= now())

What's Next

Get started by installing Bloop, or check out a larger example.

Installation

pip install bloop

# or

git clone git://github.com/numberoverzero/bloop.git
cd bloop && python setup.py install

Quickstart

First define a model and create the backing table in DynamoDB:

>>> import uuid
>>> from bloop import (
...     BaseModel, Boolean, Column, Engine,
...     GlobalSecondaryIndex, String, UUID)
...
>>> class Account(BaseModel):
...     id = Column(UUID, hash_key=True)
...     name = Column(String)
...     email = Column(String)
...     by_email = GlobalSecondaryIndex(
...         projection='keys',
...         hash_key='email')
...     verified = Column(Boolean, default=False)
...
>>> engine = Engine()
>>> engine.bind(Account)

To create an instance and save it in DynamoDB:

>>> account = Account(
...     id=uuid.uuid4(),
...     name='username',
...     email='foo@bar.com')
...
>>> engine.save(account)

You can load the account by id, or query the GSI by email:

>>> same_account = Account(id=account.id)
>>> engine.load(same_account)
>>> q = engine.query(
...     Account.by_email,
...     key=Account.email == 'foo@bar.com')
...
>>> also_same_account = q.first()

Kick it up a notch with conditional operations:

# Only save if the account doesn't already exist
>>> if_not_exist = Account.id.is_(None)
>>> engine.save(account, condition=if_not_exist)

# Only update the account if the name hasn't changed
>>> account.email = 'new@email.com'
>>> engine.save(account, condition=Account.name == 'username')

# Only delete the account if the email hasn't changed since we last saved
>>> engine.delete(account, condition=Account.email == "new@email.com")

Or load the last state of an object before it was deleted:

>>> engine.delete(account, sync="old")
>>> print(f"last email was {account.email}")

Define Models

A Basic Definition

Every model inherits from BaseModel, and needs at least a hash key:

>>> from bloop import BaseModel, Column, UUID

>>> class User(BaseModel):
...     id = Column(UUID, hash_key=True)
...
>>> User
<Model[User]>
>>> User.id
<Column[User.id=hash]>

Let's add some columns, a range key, and a GSI:

>>> from bloop import (
...     BaseModel, Boolean, Column, DateTime,
...     GlobalSecondaryIndex, String, UUID)
...
>>> class User(BaseModel):
...     id = Column(UUID, hash_key=True)
...     version = Column(String, range_key=True)
...     email = Column(String)
...     created_on = Column(DateTime)
...     verified = Column(Boolean)
...     profile = Column(String)
...     by_email = GlobalSecondaryIndex(projection="keys", hash_key="email")
...
>>> User
<Model[User]>
>>> User.by_email
<GSI[User.by_email=keys]>

Then create the table in DynamoDB:

>>> from bloop import Engine
>>> engine = Engine()
>>> engine.bind(User)

Hint

Alternatively, we could have called engine.bind(BaseModel) to bind all non-abstract models that subclass BaseModel. If any model doesn't match its backing table, TableMismatch is raised.

Note

Models must be hashable. If you implement __eq__ without __hash__, Bloop will inject the first __hash__ method it finds by walking the model's method resolution order (mro()).
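
For example, a minimal sketch of a model that defines equality without a hash; the inherited __hash__ found on the mro is reused:

class User(BaseModel):
    id = Column(UUID, hash_key=True)

    # defining __eq__ normally sets __hash__ to None on this class;
    # bloop walks the mro and injects the first __hash__ it finds
    def __eq__(self, other):
        return isinstance(other, User) and self.id == other.id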

Creating Instances

The default __init__ takes **kwargs and applies them by each column's model name:

>>> import datetime, uuid
>>> now = datetime.datetime.now(datetime.timezone.utc)
>>> user = User(
...     id=uuid.uuid4(),
...     version="1",
...     email="user@domain.com",
...     created_on=now)
>>> user.email
'user@domain.com'
>>> user
User(created_on=datetime.datetime(2016, 10, 29, ...), ...)

A local object's hash and range keys don't need values until you're ready to interact with DynamoDB:

>>> user = User(email="u@d.com", version="1")
>>> engine.save(user)
MissingKey: User(email='u@d.com') is missing hash_key: 'id'
>>> user.id = uuid.uuid4()
>>> engine.save(user)

Metadata: Table Configuration

You can provide an inner Meta class to configure the model's DynamoDB table:

>>> class Tweet(BaseModel):
...     class Meta:
...         table_name = "custom-table-name"
...         read_units = 200
...     user = Column(Integer, hash_key=True)
...
>>> Tweet.Meta.read_units
200
>>> Tweet.Meta.keys
{<Column[Tweet.user=hash]>}
>>> Tweet.Meta.indexes
set()

Table configuration defaults are:

class Meta:
    abstract = False
    table_name = __name__  # model class name
    read_units = None  # uses DynamoDB value, or 1 for new tables
    write_units = None  # uses DynamoDB value, or 1 for new tables
    stream = None
    ttl = None
    encryption = None
    backups = None
    billing = None

abstract

If abstract is true, no backing table will be created in DynamoDB. Instances of abstract models can't be saved or loaded. You can use abstract models, or even plain classes with Columns and Indexes, as mixins. Derived models never copy their parents' Meta value. For more information, see the Inheritance and Mixins section.

table_name

The default table_name is simply the model's __name__. This property is useful for mapping a model to an existing table, or mapping multiple models to the same table:

class Employee(BaseModel):
    class Meta:
        table_name = "employees-uk"
    ...

Changed in version 2.0.0: Engines can customize table names using table_name_template. This does not change the value of Meta.table_name. For example, the template "dev-{table_name}" would cause the Employee model above to use the table "dev-employees-uk".
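
For example, binding the model above through an engine configured with that template:

engine = Engine(table_name_template="dev-{table_name}")
engine.bind(Employee)  # table name resolves to "dev-employees-uk"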

read_units, write_units

Default read_units and write_units are None. These do not include provisioned throughput for any GlobalSecondaryIndex, which has its own read and write units.

If you do not specify the read or write units of a table or GSI, the existing values in DynamoDB are used. When the table or GSI does not exist, they fall back to 1.

Changed in version 1.2.0: Previously, read_units and write_units defaulted to 1. This was inconvenient when throughput is controlled by an external script, and totally broken with the new auto-scaling features.

backups

You can use backups to enable Continuous Backups and Point-in-Time Recovery. By default continuous backups are not enabled, and this is None. To enable continuous backups, use:

class Meta:
    backups = {
        "enabled": True
    }

billing

You can use billing to enable On-Demand Billing or explicitly require provisioned throughput. By default billing is None.

If you do not specify the billing mode, the existing configuration in DynamoDB is used. When the table does not exist and billing mode is None, the table is created using provisioned throughput.

class Meta:
    billing = {
        "mode": "on_demand"
    }

class Meta:
    billing = {
        "mode": "provisioned"  # if not specified, provisioned billing is used for new tables
    }

encryption

You can use encryption to enable Server-Side Encryption. By default encryption is not enabled, and this is None. To enable server-side encryption, use:

class Meta:
    encryption = {
        "enabled": True
    }

stream

You can use stream to enable DynamoDBStreams on the table. By default streaming is not enabled, and this is None. To enable a stream with both new and old images, use:

class Meta:
    stream = {
        "include": ["new", "old"]
    }

See the Streams section of the user guide to get started. Streams are awesome.

ttl

You can use ttl to enable the TTL feature on the table. By default a TTL attribute is not set, and this is None. To enable a ttl on the attribute "delete_after", use:

class Meta:
    ttl = {
        "column": "delete_after"
    }

The Column.typedef of the ttl column must be Number and, per the DynamoDB documentation, must represent the deletion time as a number of seconds since the epoch. The Timestamp type is provided for your convenience, and behaves as a datetime.datetime:

class TemporaryPaste(BaseModel):
    id = Column(UUID, hash_key=True)
    private = Column(Boolean)
    delete_after = Column(Timestamp)

    class Meta:
        ttl = {"column": "delete_after"}

Like DateTime, bloop.ext exposes drop-in replacements for Timestamp for each of three popular python datetime libraries: arrow, delorean, and pendulum.
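
For example, assuming pendulum is installed, the drop-in replacement can be used exactly like the built-in type:

from bloop.ext.pendulum import Timestamp

class TemporaryPaste(BaseModel):
    class Meta:
        ttl = {"column": "delete_after"}

    id = Column(UUID, hash_key=True)
    delete_after = Column(Timestamp)  # loads and dumps pendulum datetimes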

Metadata: Model Introspection

When a new model is created, a number of attributes are computed and stored in Meta. These can be used to generalize conditions for any model, or find columns by their name in DynamoDB.

These top-level properties can be used to describe the model in broad terms:

  • model -- The model this Meta is attached to

  • columns -- The set of all columns in the model

  • columns_by_name -- Dictionary of model Column objects by their name attribute.

  • keys -- The set of all table keys in the model (hash key, or hash and range keys)

  • indexes -- The set of all indexes (gsis, lsis) in the model

Additional properties break down the broad categories, such as splitting indexes into gsis and lsis:

  • hash_key -- The table hash key

  • range_key -- The table range key or None

  • gsis -- The set of all GlobalSecondaryIndex in the model

  • lsis -- The set of all LocalSecondaryIndex in the model

  • projection -- A pseudo-projection for the table, providing API parity with an Index

Here's the User model we just defined:

>>> User.Meta.hash_key
<Column[User.id=hash]>
>>> User.Meta.gsis
{<GSI[User.by_email=keys]>}
>>> User.Meta.keys
{<Column[User.version=range]>,
 <Column[User.id=hash]>}
>>> User.Meta.columns
{<Column[User.created_on]>,
 <Column[User.profile]>,
 <Column[User.verified]>,
 <Column[User.id=hash]>,
 <Column[User.version=range]>,
 <Column[User.email]>}
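
columns_by_name is handy when you only have a column's model name as a string:

>>> User.Meta.columns_by_name["email"]
<Column[User.email]>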

Metadata: Using Generic Models

A common pattern involves saving an item only if it doesn't exist. Instead of creating a specific condition for every model, we can use Meta.keys to make a function for any model:

from bloop import Condition

def if_not_exist(obj):
    condition = Condition()
    for key in obj.Meta.keys:
        condition &= key.is_(None)
    return condition

Now, saving only when an object doesn't exist is as simple as:

engine.save(some_obj, condition=if_not_exist(some_obj))

(This is also available in the patterns section of the user guide)

Columns

Every Column must have a Type that is used to load and dump values to and from DynamoDB. The typedef argument can be a type class, or a type instance. When you provide a class, the Column will create an instance by calling the constructor without args. This is a convenience for common types that do not require much configuration. The following are functionally equivalent:

Column(Integer)
Column(Integer())

Some types require an argument, such as Set. Sets must have an inner type so they can map to a string set, number set, or binary set. For example:

# FAILS: Set must have a type
Column(Set)

# GOOD: Set will instantiate the inner type
Column(Set(Integer))
Column(Set(Integer()))

To make a column the model's hash or range key, use hash_key=True or range_key=True. The usual rules apply: a column can't be both, there can't be more than one of each, and there must be a hash key.

class Impression(BaseModel):
    referrer = Column(String, hash_key=True)
    version = Column(Integer, range_key=True)

By default values will be stored in DynamoDB under the name of the column in the model definition (its name). If you want to conserve read and write units, you can use shorter names for attributes in DynamoDB (attribute names are counted against your provisioned throughput). Like the table_name in Meta, the optional dynamo_name parameter lets you use descriptive model names without binding you to those names in DynamoDB. This is also convenient when mapping an existing table, or multi-model tables where an attribute can be interpreted multiple ways.

The following model is identical to the one just defined, except that each attribute is stored using a short name:

class Impression(BaseModel):
    referrer = Column(String, hash_key=True, dynamo_name="ref")
    version = Column(Integer, range_key=True, dynamo_name="v")

Locally, the model names "referrer" and "version" are still used. An instance would be constructed as usual:

>>> click = Impression(
...     referrer="google.com",
...     version=get_current_version())
>>> engine.save(click)

Default Values

You can provide a default value or a no-arg function that returns a default value when specifying a Column:

class User(BaseModel):
    id = Column(UUID)
    verified = Column(Boolean, default=False)
    created = Column(DateTime, default=lambda: datetime.datetime.now())

Defaults are only applied when new instances are created locally by the default BaseModel.__init__ method. When new instances are created as part of a Query, Scan, or iterating a Stream, defaults are not applied. This is because a projection query may not include an existing value; applying the default would locally overwrite the previous value in DynamoDB.

import datetime
import uuid

def two_days_later():
    offset = datetime.timedelta(days=2)
    now = datetime.datetime.now()
    return now + offset


class TemporaryPaste(BaseModel):
    class Meta:
        ttl = {"column": "delete_after"}

    id = Column(UUID, hash_key=True, default=uuid.uuid4)
    delete_after = Column(Timestamp, default=two_days_later)
    verified = Column(Boolean, default=False)
    views = Column(Integer, default=1)

Like default function arguments in python, the provided value is not copied but used directly. For example, a default value of [1, 2, 3] will use the same list object on each new instance of the model. If you want a copy of a mutable value, you should wrap it in a lambda: lambda: [1, 2, 3].
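
A short sketch of the difference, assuming a List(String) column for illustration:

# every instance shares this single list object
tags = Column(List(String), default=["new"])

# each instance gets a fresh copy
tags = Column(List(String), default=lambda: ["new"])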

If you don't want to set a default value, you can return the special sentinel bloop.missing from your function:

import datetime
import random
from bloop import missing

specials = [
    "one free latte",
    "50% off chai for a month",
    "free drip coffee for a year",
]

offer_ends = datetime.datetime.now() + datetime.timedelta(hours=8)


def limited_time_offer():
    now = datetime.datetime.now()
    if now < offer_ends:
        return random.choice(specials)
    return missing


class User(BaseModel):
    id = Column(UUID, hash_key=True)
    active_coupon = Column(String, default=limited_time_offer)

In this example, a random special is applied to new users for the next 8 hours. Afterwards, the limited_time_offer function will return bloop.missing and the user won't have an active coupon.

Returning bloop.missing tells Bloop not to set the value, which is different than setting the value to None. An explicit None will clear any existing value on save, while not setting it leaves the value as-is.

Indexes

Indexes provide additional ways to query and scan your data. If you have not used indexes, you should first read the Developer's Guide on Improving Data Access with Secondary Indexes.

A single GSI or LSI can be used by two models with different projections, so long as the projections that each model expects are a subset of the actual projection. This can be a useful way to restrict which columns are loaded by eg. a partially hydrated version of a model, while the table's underlying index still provides access to all attributes.
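
As a sketch, assuming a shared "users" table whose by_email GSI actually projects "all", a second model can declare the smaller projection it expects:

class FullUser(BaseModel):
    class Meta:
        table_name = "users"
    id = Column(UUID, hash_key=True)
    email = Column(String)
    profile = Column(String)
    by_email = GlobalSecondaryIndex(projection="all", hash_key="email")


class ThinUser(BaseModel):
    class Meta:
        table_name = "users"
    id = Column(UUID, hash_key=True)
    email = Column(String)
    # "keys" is a subset of the index's actual "all" projection, so queries
    # through ThinUser.by_email only load key columns
    by_email = GlobalSecondaryIndex(projection="keys", hash_key="email")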

GlobalSecondaryIndex

Every GlobalSecondaryIndex must declare a projection, which describes the columns projected into the index. Only projected columns are loaded from queries and scans on the index, and non-projected columns can't be used in filter expressions. A projection can be "all" for all columns in the model; "keys" for the hash and range columns of the model and the index; or a set of Column objects or their model names. If you specify a set of columns, key columns will always be included.

class HeavilyIndexed(BaseModel):
    ...
    by_email = GlobalSecondaryIndex("all", hash_key="email")
    by_username = GlobalSecondaryIndex("keys", hash_key="username")
    by_create_date = GlobalSecondaryIndex(
        {"email", "username"}, hash_key="created_on")

A GlobalSecondaryIndex must have a hash_key, and can optionally have a range_key. This can either be the name of a column, or the column object itself:

class Impression(BaseModel):
    id = Column(UUID, hash_key=True)
    referrer = Column(String)
    version = Column(Integer)
    created_on = Column(DateTime)

    by_referrer = GlobalSecondaryIndex("all", hash_key=referrer)
    by_version = GlobalSecondaryIndex("keys", hash_key="version")

Unlike LocalSecondaryIndex, a GSI does not share its throughput with the table. You can specify the read_units and write_units of the GSI. If you don't specify the throughput and the GSI already exists, the values will be read from DynamoDB. If the table doesn't exist, the GSI's read and write units will instead default to 1.

GlobalSecondaryIndex("all", hash_key=version, read_units=500, write_units=20)

As with Column you can provide a dynamo_name for the GSI in DynamoDB. This can be used to map to an existing index while still using a pythonic model name locally:

class Impression(BaseModel):
    ...
    by_email = GlobalSecondaryIndex("keys", hash_key=email, dynamo_name="index_email")

See also

Global Secondary Indexes in the DynamoDB Developer Guide

LocalSecondaryIndex

LocalSecondaryIndex is similar to GlobalSecondaryIndex in its use, but has different requirements. LSIs always have the same hash key as the model, and it can't be changed. The model must have a range key, and the LSI must specify a range_key:

LocalSecondaryIndex("all", range_key=created_on)

You can specify a name to use in DynamoDB, just like Column and GSI:

class Impression(BaseModel):
    url = Column(String, hash_key=True)
    user_agent = Column(String, range_key=True, dynamo_name="ua")
    visited_at = Column(DateTime, dynamo_name="at")

    by_date = LocalSecondaryIndex(
        "keys", range_key=visited_at, dynamo_name="index_date")

The final optional parameter is strict, which defaults to True. This controls whether DynamoDB may incur additional reads on the table when querying the LSI for columns outside the projection. Bloop enforces this by evaluating the key, filter, and projection conditions against the index's allowed columns and raises an exception if it finds any non-projected columns.

It is recommended that you leave strict=True, to prevent accidentally consuming twice as many read units with an errant projection or filter condition. Since this is local to Bloop and not part of the index definition in DynamoDB, you can always disable and re-enable it in the future.
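
For example, disabling the check on the index from the model above:

by_date = LocalSecondaryIndex(
    "keys", range_key="visited_at", strict=False, dynamo_name="index_date")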

See also

Local Secondary Indexes in the DynamoDB Developer Guide

Inheritance and Mixins

Your models will often have identical constructs, especially when sharing a table. Rather than define these repeatedly in each model, Bloop provides the ability to derive Columns and Indexes from base classes. Consider a set of models that each has a UUID and sorts on a DateTime:

class HashRangeBase(BaseModel):
    id = Column(UUID, hash_key=True, dynamo_name="i")
    date = Column(DateTime, range_key=True, dynamo_name="d")

    class Meta:
        abstract = True


class User(HashRangeBase):
    pass


class Upload(HashRangeBase):
    class Meta:
        write_units = 50
        read_units = 10

Subclassing BaseModel is optional, and provides early validation against missing columns/indexes. Mixins do not need to be specified in any particular order:

class IndexedEmail:
    by_email = GlobalSecondaryIndex(projection="keys", hash_key="email")


class WithEmail:
    email = Column(String)


class User(BaseModel, IndexedEmail, WithEmail):
    id = Column(Integer, hash_key=True)


assert User.by_email.hash_key is User.email  # True
assert User.email is not WithEmail.email  # True

Even though the by_email Index requires the email Column to exist, the IndexedEmail mixin can appear before WithEmail in User's bases.

Modify Derived Columns

Bloop uses the __copy__ method to create shallow copies of the base Columns and Indexes. You can override this to modify derived Columns and Indexes:

class MyColumn(Column):
    def __copy__(self):
        copy = super().__copy__()
        copy.derived = True
        return copy


class WithEmail:
    email = MyColumn(String)


class User(BaseModel, WithEmail):
    id = Column(String, hash_key=True)


assert User.email.derived  # True
assert not hasattr(WithEmail.email, "derived")  # True

Conflicting Derived Values

A model cannot derive from two base models or mixins that define the same column or index, or that have an overlapping dynamo_name. Consider the following mixins:

class Id:
    id = Column(String)

class AlsoId:
    id = Column(String, dynamo_name="shared-id")

class AnotherId:
    some_id = Column(String, dynamo_name="shared-id")

Each of the following are invalid, and will fail:

# Id, AlsoId have the same column name "id"
class Invalid(BaseModel, Id, AlsoId):
    hash = Column(String, hash_key=True)

# AlsoId, AnotherId have same column dynamo_name "shared-id"
class AlsoInvalid(BaseModel, AlsoId, AnotherId):
    hash = Column(String, hash_key=True)

For simplicity, Bloop also disallows subclassing more than one model or mixin that defines a hash key, a range key, or an Index (either by name or dynamo_name).

However, a derived class may always overwrite an inherited column or index. The following is valid:

class SharedIds:
    hash = Column(String, hash_key=True)
    range = Column(Integer, range_key=True)


class CustomHash(BaseModel, SharedIds):
    hash = Column(Integer, hash_key=True)


assert CustomHash.hash.typedef is Integer  # True
assert SharedIds.hash.typedef is String  # True  # mixin column is unchanged
assert CustomHash.range.typedef is Integer  # Still inherited

This also allows you to hide or omit a derived column:

class SharedColumns:
    foo = Column(String)
    bar = Column(String)


class MyModel(BaseModel, SharedColumns):
    id = Column(Integer, hash_key=True)

    foo = None


assert MyModel.foo is None  # True
assert MyModel.bar.typedef is String  # True
assert {MyModel.id, MyModel.bar} == MyModel.Meta.columns  # True

Using the Engine

The Engine is the main way you'll interact with DynamoDB (and DynamoDBStreams). Once you've defined some models, you're ready to start loading, saving and querying.

Attention

This section uses the same User model from the previous section. If you haven't already done so, go back and set that up.

Configuration

Engines expose a small number of configuration options. On __init__, there are three optional kwargs:

  • dynamodb, a DynamoDB client defaulting to boto3.client("dynamodb")

  • dynamodbstreams, a DynamoDBStreams client defaulting to boto3.client("dynamodbstreams")

  • table_name_template, a format string containing "{table_name}" or a function that takes a model and returns a table name for the engine.

You will rarely need to modify the first two, except when you are constructing multiple engines (eg. cross-region replication) or connecting to DynamoDBLocal. For examples of both, see Bloop Patterns.

Most of the time, you will use table_name_template to inject configuration into your model/table bindings. For example, the following will prefix every table name with "dev-" for local development:

engine = Engine(table_name_template="dev-{table_name}")

Meanwhile, the following function will suffix the table name with a random int:

import random

def with_nonce(model):
    return f"{model.Meta.table_name}-{random.randint(0, 10)}"

engine = Engine(table_name_template=with_nonce)

Bind

As noted in the previous section, every model must first be bound to a backing table with Engine.bind before we can interact with instances in DynamoDB.

Note

Starting with 1.1.0, the skip_table_setup parameter is available to bypass the create/verify calls to DynamoDB. This is not recommended except in situations where models are bound frequently, ie. a high-volume Lambda function. See Issue #83.
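
A sketch of that use, assuming the backing tables already exist and match their models:

# skip the create/verify calls on every cold start
engine.bind(BaseModel, skip_table_setup=True)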

When an engine binds a model, it also binds all non-abstract subclasses. This means you can bind all models in one call, centralizing any error handling or table correction. For example, you may have specialized models for users, notifications, and impressions. Each of these can be grouped with an abstract base, and then all specialized models created at once:

class BaseUser(BaseModel):
    class Meta:
        abstract = True

class BaseNotification(BaseModel):
    class Meta:
        abstract = True

...

class Admin(BaseUser):
    ...

class Moderator(BaseUser):
    ...

class PriorityNotification(BaseNotification):
    ...

class EmailNotification(BaseNotification):
    ...


try:
    engine.bind(BaseUser)
except TableMismatch:
    print("Failed to bind all user models")

try:
    engine.bind(BaseNotification)
except TableMismatch:
    print("Failed to bind all notification models")

Now you can import a single base (BaseModel or a subclass) from your models.py module and automatically bind any dynamic models created from that base.

Save

Save is performed with UpdateItem since absolute overwrites (such as PutItem) are rarely desired in a distributed, optimistic concurrency system. This is the central decision that enables a table to back multiple models. A partial save allows a model to update an item in the table without accidentally clearing the columns that model doesn't know about.

Saving an item or items is very simple:

>>> from datetime import datetime, timezone
>>> now = datetime.now(timezone.utc)
>>> user = User(...)
>>> engine.save(user)
>>> tweet = Tweet(...)
>>> user.last_activity = now
>>> engine.save(user, tweet)

Save Conditions

You can perform optimistic saves with a condition. If a condition is not met when DynamoDB tries to apply the update, the update fails and bloop immediately raises ConstraintViolation. Conditions are specified on columns using the standard <, >=, ==, ... operators, as well as begins_with, between, contains, in_. Conditions can be chained together and combined with bitwise operators &, |, ~:

>>> user = User(username="numberoverzero")
>>> username_available = User.username.is_(None)
>>> engine.save(user, condition=username_available)
# Success
>>> engine.save(user, condition=username_available)
Traceback (most recent call last):
  ...
ConstraintViolation: The condition was not met.
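
Since the exception is raised immediately, a contended save is easy to handle:

from bloop import ConstraintViolation

try:
    engine.save(user, condition=username_available)
except ConstraintViolation:
    print("username is taken")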

Return Values

You can optionally specify sync="old" or sync="new" to update the saved objects with the last seen or most recent values when the save completes. This saves a read unit and is strongly consistent, and can be useful to eg. read the last value before you overwrote an attr or fetch attributes you didn't modify:

>>> user = User(username="n0", email="user@n0.dev")
>>> engine.save(user, sync="new")
>>> if not user.verified:
...     helpers.send_verification_reminder(user.email, since=user.created_on)

In a highly concurrent environment the sync="old" option is very useful to capture the last value a field held before overwriting; then you can safely clean up any cascading references. For example, if you store an s3 object key that points to the latest revision of some document you might model it as follows:

class Document(BaseModel):
    name = Column(String, hash_key=True)
    location = Column(String)

The following could cause dangling objects if two updates occur at the same time:

def wrong_update(name, new_location):
    doc = Document(name=name)
    engine.load(doc)
    if doc.location != new_location:
        delete_s3_object(doc.location)
    doc.location = new_location
    engine.save(doc)

Instead, you should read the previous values when you perform the write, and then clean up the location:

def correct_update(name, new_location):
    doc = Document(name=name, location=new_location)
    engine.save(doc, sync="old")
    if doc.location != new_location:
        delete_s3_object(doc.location)

Actions

Most changes you make to modeled objects fall into two update categories: SET and REMOVE. Any time a value serializes as None or you call del myobj.some_attr it will likely be a remove, while myobj.attr = value will be a set. (This is up to the column's type, so you can override this behavior to use your own sentinel values).

Warning

As mentioned in Issue #136 and the DynamoDB Developer Guide, an atomic counter is not appropriate unless you can tolerate overcounting or undercounting. AWS explicitly discourages using add or delete in general.

Dynamo exposes two additional update types: ADD and DELETE. These allow you to specify relative changes without knowing the current value stored in Dynamo. One of the most common examples is a website view count: for a popular website the optimistic concurrency model will cause a lot of write contention and cap your throughput since each change requires a read, modify, save. If there's a conflict you'll need to do all three again, for each writer.

Instead of reading the value and using a conditional save, you can instead wrap the offset in a bloop.actions.add() and tell bloop to apply the desired change. Compare the two following:

# Option 1) conditional write, wrap in retries
website = Website("google.com")
engine.load(website)
website.views += 1
# raises ConstraintViolation most of the time due to write contention
engine.save(website, condition=Website.views == (website.views - 1))


# Option 2) add instead of set
website = Website("google.com")
website.views = bloop.actions.add(1)
# no contention
engine.save(website)

When combined with return values above, we can add 1 and see the new value all in one call:

website = Website("google.com")
website.views = bloop.actions.add(1)
engine.save(website, sync="new")
print(f"views after save: {website.views}")

Note that bloop.actions.set() and bloop.actions.remove() are assumed if you don't set a column to an explicit action:

# both equivalent
website.views = 21
website.views = bloop.actions.set(21)

# all equivalent
website.views = None
del website.views
website.views = bloop.actions.remove(None)

Finally, the bloop.actions.add() action only supports Number and Set data types. In addition, add can only be used on top-level attributes, not nested attributes.

Meanwhile bloop.actions.delete() only supports the Set data type. It can also only be used on top-level attributes.
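
A sketch of both actions, assuming a Website.tags column modeled as Set(String):

import bloop.actions

website.tags = bloop.actions.add({"launch", "beta"})  # add elements to the set
engine.save(website)

website.tags = bloop.actions.delete({"beta"})  # remove elements from the set
engine.save(website)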

Delete

Delete has the same signature as save(). Both operations are mutations on an object that may or may not exist, and simply map to two different APIs (Delete calls DeleteItem). You can delete multiple objects at once; specify a condition; and use sync="old" to update local objects with their last values before deletion.

>>> from datetime import datetime, timedelta, timezone
>>> engine.delete(user, tweet)
>>> now = datetime.now(timezone.utc)
>>> cutoff = now - timedelta(days=365 * 2)
>>> engine.delete(
...     account,
...     condition=Account.last_login < cutoff)
>>> banned_account = Account(id="user@n0.dev")
>>> engine.delete(banned_account, sync="old")
>>> last_email = banned_account.email
>>> helpers.notify_acct_change(last_email, reason="spamming")

Load

Unlike most existing DynamoDB object mappers, Bloop does not create new instances when loading objects. This improves performance and allows you to use thick or thin models by minimizing how many times the constructor is invoked for effectively the same object (same hash/range keys).

Like save() and delete() above, Engine.load takes a variable number of objects to load from DynamoDB:

>>> user = User(id="some-id")
>>> tweet = Tweet(user="some-id", id="some-tweet")
>>> engine.load(user, tweet)

If consistent is True, then strongly consistent reads will be used:

>>> objs = user, tweet
>>> engine.load(*objs, consistent=True)

If any objects aren't loaded, Bloop raises MissingObjects:

>>> user = User(username="not-real")
>>> engine.load(user)
Traceback (most recent call last):
  ...
MissingObjects: Failed to load some objects.

You can access MissingObjects.objects to see which objects failed to load.

Query

This section defines a new model to demonstrate the various filtering and conditions available:

class Account(BaseModel):
    name = Column(String, hash_key=True)
    number = Column(Integer, range_key=True)
    created_on = Column(DateTime)
    balance = Column(Number)
    level = Column(Integer)

    by_level = GlobalSecondaryIndex(
        projection="all", hash_key=level)

    by_balance = LocalSecondaryIndex(
        projection={"created_on"}, range_key="balance")

engine = Engine()
engine.bind(Account)

All

Bloop's query and scan iterators are lazy, fetching only as many pages as needed to advance when you call next(). If you want to eagerly load all results, you can use all() to load all results into a single list. Note that calling all() will reset the query, and will return an empty list if there are no results.

>>> q = engine.query(Account,
...     key=Account.name == "numberoverzero")
>>> q.all()
[Account(name='numberoverzero', number=21623)]
>>> q.exhausted
True
>>> q.all()
[Account(name='numberoverzero', number=21623)]

First

Often, you'll only need a single result from the query; with the correct sorting and indexes, the first result can be used to get a maximum or minimum. Use first() to get the first result, if it exists. If there are no results, raises ConstraintViolation.

>>> q = engine.query(Account,
...     key=Account.name == "numberoverzero")
>>> q.first()
Account(name='numberoverzero', number=21623)

One

Similar to first(), you can get the unique result of a query with one(). If there are no results, or more than one result, raises ConstraintViolation.

>>> q = engine.query(Account,
...     key=Account.name == "numberoverzero")
>>> q.one()
Traceback (most recent call last):
    ...
ConstraintViolation: Query found more than one result.

Count

To get a count of items that match some query, use the "count" projection.

>>> q = engine.query(
...         Account.by_level,
...         key=Account.level == 3,
...         projection="count")
>>> q.count
256

Both count and scanned are calculated only when the query is executed, so you must call QueryIterator.reset() to see changes take effect.

>>> new = Account(...)
>>> engine.save(new)
>>> q.count
256
>>> q.reset()
>>> q.count
257

Key Conditions

Queries can be performed against a Model or an Index. You must specify at least a hash key equality condition; a range key condition is optional.

>>> owned_by_stacy = Account.name == "Stacy"
>>> q = engine.query(Account, key=owned_by_stacy)
>>> for account in q:
...     print(account)
...

Here, the query uses the Index's range_key to narrow the range of accounts to find:

>>> owned_by_stacy = Account.name == "Stacy"
>>> at_least_one_mil = Account.balance >= 1000000
>>> q = engine.query(Account.by_balance,
...     key=owned_by_stacy & at_least_one_mil)
>>> for account in q:
...     print(account.balance)

Note

A query must always include an equality check == or is_ against the model or index's hash key. If you want to include a condition on the range key, it can be one of ==, <, <=, >, >=, between, begins_with.

See the KeyConditionExpression parameter of the Query operation in the Developer's Guide.

Filtering

If you provide a filter condition, DynamoDB only returns items that match the filter. Conditions can be on any column -- except the hash and range key being queried -- projected into the Index. All non-key columns are available for queries against a model. A filter condition can use any condition operations. Here is the same LSI query as above, but now excluding accounts created in the last 30 days:

>>> from datetime import datetime, timedelta, timezone
>>> now = datetime.now(timezone.utc)
>>> recent = now - timedelta(days=30)
>>> key_condition = owned_by_stacy & at_least_one_mil
>>> exclude_recent = Account.created_on < recent
>>> q = engine.query(Account.by_balance,
...     key=key_condition,
...     filter=exclude_recent)

Warning

Trying to use a column that's not part of an Index's projection will raise InvalidFilterCondition, since the value can't be loaded. This does not apply to queries against an LSI with strict=False, which will consume additional reads to apply the filter.

>>> q = engine.query(Account.by_balance,
...     key=key_condition,
...     filter=Account.level == 3)
Traceback (most recent call last):
  ...
InvalidFilterCondition: <Column[Account.level]> is not available for the projection.

Projections

By default, queries return all columns projected into the index or model. You can use the projection parameter to control which columns are returned for each object. This must be "all" to include everything in the index or model's projection, or a set of columns or column model names to include.

>>> q = engine.query(Account,
...     key=key_condition,
...     projection={"email", "balance"})
>>> account = q.first()
>>> account.email
'user@domain.com'
>>> account.balance
Decimal('3400')
>>> account.level
Traceback (most recent call last):
    ...
AttributeError: ...

Because the projection did not include Account.level, it was not loaded on the account object.

Configuration Options

The remaining options are consistent and forward. When consistent is True, strongly consistent reads are used. By default, consistent is False. Use forward to query ascending or descending. By default forward is True, or ascending.
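
For example, a strongly consistent query that returns accounts in descending order of the number range key:

q = engine.query(
    Account,
    key=Account.name == "numberoverzero",
    consistent=True,
    forward=False)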

Iterator State

The QueryIterator exposes a number of properties to inspect its current progress:

  • count -- the number of items loaded from DynamoDB so far, including buffered items.

  • exhausted -- True if there are no more results

  • scanned -- the number of items DynamoDB evaluated, before applying any filter condition.

To restart a query, use QueryIterator.reset():

>>> query = engine.query(...)
>>> unique = query.one()
>>> query.exhausted
True
>>> query.reset()
>>> query.exhausted
False
>>> same = query.one()
>>> unique == same  # Assume we implemented __eq__
True

Continuation Tokens

It is possible to record the state of an iterator and recreate that state in a separate thread or process using a continuation token. Use the token property to retrieve a continuation token describing the current state of the iterator. When recreating the iterator, pass the token to the QueryIterator.move_to() method to restore the previous state:

>>> query = engine.query(...)
>>> for _ in range(10):
...     next(query) # read the first ten records.
...
>>> token = query.token
>>> resumed = engine.query(...)
>>> resumed.move_to(token)
>>> for _ in range(10):
...     next(resumed) # read the next ten records.

Scan

Scan and Query share a very similar interface. Unlike Query, Scan does not have a key condition and can't be performed in descending order. Scans can be performed in parallel, however.

Using the same model from Query, we can scan the model or an index:

>>> for account in engine.scan(Account):
...     print(account.name)
...
>>> for account in engine.scan(Account.by_level):
...     print(account.name)

And get the first, or unique result:

>>> some_account = engine.scan(Account).first()
>>> one_account = engine.scan(Account).one()
Traceback (most recent call last):
    ...
ConstraintViolation: Scan found more than one result.

Use filter and projection to exclude items and control which columns are included in results:

>>> scan = engine.scan(Account,
...     filter=Account.name.contains("a"),
...     projection={"level", "name"})

And consistent to use strongly consistent reads:

>>> scan = engine.scan(Account.by_balance, consistent=True)

Parallel Scans

Scans can be performed in parallel, using the parallel parameter. To specify which segment you are constructing the scan for, pass a tuple of (Segment, TotalSegments):

>>> first_segment = engine.scan(Account, parallel=(0, 2))
>>> second_segment = engine.scan(Account, parallel=(1, 2))

You can easily construct a parallel scan with s segments by calling engine.scan in a loop:

def parallelize(s, engine, *args, **kwargs):
    for i in range(s):
        kwargs["parallel"] = (i, s)
        yield engine.scan(*args, **kwargs)

workers = scan_workers(n=10)
scans = parallelize(10, engine, Account, filter=...)
for worker, scan in zip(workers, scans):
    worker.process(scan)

Transactions

Note

For a detailed guide to using transactions, see the Transactions section of the User Guide.

You can construct a read or write transaction by passing the corresponding mode:

>>> read_tx = engine.transaction(mode="r")
>>> write_tx = engine.transaction(mode="w")  # defaults to write

You can also use the transaction as a context manager:

>>> with engine.transaction() as tx:
...     tx.save(user, condition=User.id.is_(None))
...     tx.delete(tweet)
...     tx.check(meta, Metadata.verified.is_(True))
...
>>> # tx is committed or raises TransactionCanceled

To manually commit a transaction, call prepare() and commit():

>>> tx = engine.transaction(mode="r")
>>> tx.load(user, tweet)
>>> prepared = tx.prepare()
>>> prepared.commit()
>>> prepared.commit()  # subsequent commits on a ReadTransaction re-load the objects

Stream

Note

Before you can create a stream on a model, you need to enable it in the model's Meta. For a detailed guide to using streams, head over to the Streams section of the User Guide.

To start from the beginning or end of the stream, use "trim_horizon" and "latest":

>>> stream = engine.stream(User, position="trim_horizon")
>>> stream = engine.stream(Account, "latest")

Alternatively, you can use an existing stream token to reload its previous state:

>>> same_stream = engine.stream(
...     Impression, previous_stream.token)

Lastly, you can use a datetime. This is an expensive call, and walks the entire stream from the trim horizon until it finds the first record in each shard after the target datetime.

>>> from datetime import datetime, timedelta, timezone
>>> now = datetime.now(timezone.utc)
>>> yesterday = now - timedelta(hours=12)
>>> stream = engine.stream(User, yesterday)

Transactions

Bloop supports reading and updating items in transactions similar to the way you already load, save, and delete items using an engine. A single read or write transaction can have at most 10 items.

To create a new transaction, call Engine.transaction and specify a mode:

wx = engine.transaction(mode="w")
rx = engine.transaction(mode="r")

When used as a context manager the transaction will call commit() on exit if no exception occurs:

# mode defaults to "w"
with engine.transaction() as tx:
    tx.save(some_obj)
    tx.delete(other_obj)


# read transaction loads all objects at once
user = User(id="numberoverzero")
meta = Metadata(id=user.id)
with engine.transaction(mode="r") as tx:
    tx.load(user, meta)

You may also call prepare() and commit() yourself:

import bloop

tx = engine.transaction()
tx.save(some_obj)
p = tx.prepare()
try:
    p.commit()
except bloop.TransactionCanceled:
    print("failed to commit")

See TransactionCanceled for the conditions that can cause each type of transaction to fail.

Write Transactions

A write transaction can save and delete items, and specify additional conditions on objects not being modified.

As with Engine.save and Engine.delete you can provide multiple objects to each WriteTransaction.save() or WriteTransaction.delete() call:

with engine.transaction() as tx:
    tx.delete(*old_tweets)
    tx.save(new_user, new_tweet)

Item Conditions

You can specify a condition with each save or delete call:

with engine.transaction() as tx:
    tx.delete(auth_token, condition=Token.last_used <= now())

Transaction Conditions

In addition to specifying conditions on the objects being modified, you can also specify a condition for the transaction on an object that won't be modified. This can be useful if you want to check another table without changing its value:

user_meta = Metadata(id="numberoverzero")

with engine.transaction() as tx:
    tx.save(new_tweet)
    tx.check(user_meta, condition=Metadata.verified.is_(True))

In the above example the transaction doesn't modify the user metadata. If we want to modify that object we should instead use a condition on the object being modified:

user_meta = Metadata(id="numberoverzero")
engine.load(user_meta)
user_meta.tweets += 1

with engine.transaction() as tx:
    tx.save(new_tweet)
    tx.save(user_meta, condition=Metadata.tweets <= 500)

Idempotency

Bloop automatically generates timestamped unique tokens (tx_id and first_commit_at) to guard against committing a write transaction twice or accidentally committing a transaction that was prepared a long time ago. While these are generated for both read and write commits, only TransactWriteItems respects the "ClientRequestToken" stored in tx_id.

When the first_commit_at value is too old, committing will raise TransactionTokenExpired.
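
A sketch of handling an expired token:

import bloop

tx = engine.transaction()
tx.save(some_obj)
prepared = tx.prepare()
# ... much later ...
try:
    prepared.commit()
except bloop.TransactionTokenExpired:
    # first_commit_at is too old; build and commit a fresh transaction
    retry = engine.transaction()
    retry.save(some_obj)
    retry.prepare().commit()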

Read Transactions

By default engine.transaction(mode="w") will create a WriteTransaction. To create a ReadTransaction pass mode="r":

with engine.transaction(mode="r") as rx:
    rx.load(user, tweet)
    rx.load(meta)

All objects in the read transaction will be loaded at the same time, when commit() is called or the transaction context closes.

Multiple Commits

Every time you call commit on the prepared transaction, the objects will be loaded again:

rx = engine.transaction(mode="r")
rx.load(user, tweet)
prepared = rx.prepare()

prepared.commit()  # first load
prepared.commit()  # second load

Missing Objects

As with Engine.load if any objects in the transaction are missing when commit is called, bloop will raise MissingObjects with the list of objects that were not found:

import bloop

engine = bloop.Engine()
...


def tx_load(*objs):
    with engine.transaction(mode="r") as rx:
        rx.load(*objs)

...

try:
    tx_load(user, tweet)
except bloop.MissingObjects as exc:
    missing = exc.objects
    print(f"failed to load {len(missing)} objects: {missing}")

Streams

Bloop provides a simple, pythonic interface to DynamoDB's complex Streams API. This abstracts away the minutiae of managing and refreshing iterators, tracking sequence numbers and shard splits, merging records from adjacent shards, and saving and loading processing state.

Warning

Chronological order is not guaranteed for high throughput streams.

DynamoDB guarantees ordering:

  • within any single shard

  • across shards for a single hash/range key

There is no way to exactly order records from adjacent shards. High throughput streams provide approximate ordering using each record's "ApproximateCreationDateTime".

Tables with a single partition guarantee order across all records.

See Stream Internals for details.

Enable Streaming

Add the following to a model's Meta to enable a stream with new and old objects in each record:

class User(BaseModel):
    class Meta:
        stream = {
            "include": ["new", "old"]
        }
    id = Column(Integer, hash_key=True)
    email = Column(String)
    verified = Column(Boolean)

engine.bind(User)

"include" has four possible values, matching StreamViewType:

{"keys"}, {"new"}, {"old"}, {"new", "old"}

Create a Stream

Next, create a stream on the model. This example starts at "trim_horizon" to get all records from the last 24 hours, but could also be "latest" to only return records created after the stream was instantiated.

>>> stream = engine.stream(User, "trim_horizon")

If you want to start at a certain point in time, you can also use a datetime.datetime. Creating streams at a specific time is very expensive, and will iterate all records since the stream's trim_horizon until the target time.

>>> stream = engine.stream(User, datetime.now() - timedelta(hours=12))

If you are trying to resume processing from the same position as another stream, you should load from a persisted Stream.token instead of using a specific time. See Pausing and Resuming for an example of a stream token.

>>> import json
>>> original_stream = engine.stream(User, "trim_horizon")
>>> with open("/tmp/state", "w") as f:
...     json.dump(original_stream.token, f)
...
# Some time later
>>> with open("/tmp/state", "r") as f:
...     token = json.load(f)
...
>>> stream = engine.stream(User, token)

Retrieve Records

You only need to call next() on a Stream to get the next record:

>>> record = next(stream)

If there are no records at the current position, record will be None. A common pattern is to poll immediately when a record is found, but to wait a small amount when no record is found.

>>> while True:
...     record = next(stream)
...     if not record:
...         time.sleep(0.2)
...     else:
...         process(record)

Record Structure

Each record is a dict with instances of the model in one or more of "key", "old", and "new". These are populated according to the stream's "include" above, as well as the event type. A key-only stream will never have new or old objects. If a stream includes new and old objects and the event type is delete, new will be None.

Save a new user, and then update the email address:

>>> user = User(id=3, email="user@domain.com")
>>> engine.save(user)
>>> user.email = "admin@domain.com"
>>> engine.save(user)

The first record won't have an old value, since it was the first time this item was saved:

>>> next(stream)
{'key': None,
 'old': None,
 'new': User(email='user@domain.com', id=3, verified=None),
 'meta': {
     'created_at': datetime.datetime(2016, 10, 23, ...),
     'event': {
         'id': '3fe6d339b7cb19a1474b3d853972c12a',
         'type': 'insert',
         'version': '1.1'},
     'sequence_number': '700000000007366876916'}
}

The second record shows the change to email, and has both old and new objects:

>>> next(stream)
{'key': None,
 'old': User(email='user@domain.com', id=3, verified=None),
 'new': User(email='admin@domain.com', id=3, verified=None),
 'meta': {
     'created_at': datetime.datetime(2016, 10, 23, ...),
     'event': {
         'id': '73a4b8568a85a0bcac25799f806df239',
         'type': 'modify',
         'version': '1.1'},
     'sequence_number': '800000000007366876936'}
}

Periodic Heartbeats

You should call Stream.heartbeat() at least every 14 minutes in your processing loop.

Iterators only last 15 minutes, and need to be refreshed periodically. There's no way to safely refresh an iterator that hasn't found a record. For example, refreshing an iterator at "latest" could miss records since the time that the previous iterator was at "latest". If you call this every 15 minutes, an iterator may expire due to clock skew or processing time.

Only iterators without sequence numbers will be refreshed. Once a shard finds a record it's skipped on every subsequent heartbeat. For a moderately active stream, heartbeat will make about one call per shard.

The following pattern will call heartbeat every 12 minutes (if record processing is quick):

>>> from datetime import datetime, timedelta
>>> now = datetime.now
>>> future = lambda: datetime.now() + timedelta(minutes=12)
>>>
>>> next_heartbeat = now()
>>> while True:
...     record = next(stream)
...     process(record)
...     if now() > next_heartbeat:
...         next_heartbeat = future()
...         stream.heartbeat()

Pausing and Resuming

Use Stream.token to save the current state and resume processing later:

>>> with open("/tmp/stream-token", "w") as f:
...     json.dump(stream.token, f, indent=2)
>>> with open("/tmp/stream-token", "r") as f:
...     token = json.load(f)
>>> stream = engine.stream(User, token)

When reloading from a token, Bloop will automatically prune shards that have expired, and extend the state to include new shards. Any iterators that fell behind the current trim_horizon will be moved to each of their children's trim_horizons.

Here's a token from a new stream. After 8-12 hours there will be one active shard, but also a few closed shards that form the lineage of the stream.

{
    "active": [
        "shardId-00000001477207595861-d35d208d"
    ],
    "shards": [
        {
            "iterator_type": "after_sequence",
            "sequence_number": "800000000007366876936",
            "shard_id": "shardId-00000001477207595861-d35d208d"
        }
    ],
    "stream_arn": "arn:.../stream/2016-10-23T07:26:33.312"
}

Moving Around

Stream.move_to() takes the same position argument as Engine.stream:

# Any stream token; this one rebuilds the
# stream in its current location
>>> stream.move_to(stream.token)

# Jump back in time 2 hours
>>> stream.move_to(datetime.now() - timedelta(hours=2))

# Move to the oldest record in the stream
>>> stream.move_to("trim_horizon")

As noted above, moving to a specific time is very expensive.

Types

Types are used when defining Columns and are responsible for translating between local values and their DynamoDB representations. For example, DateTime maps between datetime.now(timezone.utc) and "2016-08-09T01:16:25.322849+00:00".

DynamoDB is split into scalar types ("S", "N", "B", "BOOL") and vector types ("SS", "NS", "BS", "L", "M"). Bloop provides corresponding types, as well as a handful of useful derived types, such as DateTime and UUID.

For the full list of built-in types, see the Public API Reference.

Backing Types

In bloop, each Type must have a backing_type that is one of the DynamoDB types (except NULL). The valid DynamoDB types are:

  • "S" -- string

  • "N" -- number

  • "B" -- binary

  • "SS" -- string set

  • "NS" -- number set

  • "BS" -- binary set

  • "M" -- map

  • "L" -- list

  • "BOOL" -- boolean

Most types have a fixed backing_type, such as String and Map. Others like Set construct the backing_type when a new instance is created, based on the inner typedef.
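
For example:

>>> from bloop import Integer, Set, String
>>> String.backing_type
'S'
>>> Set(Integer).backing_type
'NS'
>>> Set(String).backing_type
'SS'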

Instantiation

In many cases, a Column will use a Type class. For example, this and similar constructs have been used throughout the User Guide:

>>> from bloop import Column, Number
>>> balance = Column(Number)

This is syntactic sugar for a common pattern, and the column is actually creating an instance of the Number type:

>>> balance = Column(Number())

Most types are simply a binding between a local python format and DynamoDB's wire format, and won't have any parameters. Some types have optional parameters that configure their behavior, such as Number:

>>> from bloop import Number
>>> from decimal import Context
>>> context = Context(Emin=-128, Emax=126, rounding=None, prec=38, traps=[...])
>>> created_at = Column(Number(context=context))

Finally, some types have required parameters and can't be instantiated by the Column directly:

>>> from bloop import Set
>>> Column(Set)
Traceback (most recent call last):
  ...
TypeError: __init__() missing 1 required positional argument: 'typedef'

These types must be instantiated when defining a column:

>>> from bloop import Integer
>>> Column(Set(Integer))

Note that Set is providing the same sugar, and actually creates an instance of its inner type:

>>> Column(Set(Integer()))

Scalar Types

Bloop provides four primitive scalar types: String, Number, Binary, and Boolean.

These can be used without constructor arguments, and map to the expected python types:

from bloop import BaseModel, Column, String, Number, Binary, Boolean

class Account(BaseModel):
    email = Column(String, hash_key=True)
    balance = Column(Number)
    public_key = Column(Binary)
    verified = Column(Boolean)

account = Account(
    email="user@domain.com",
    balance=4100,
    public_key=public_bytes(some_key),
    verified=False
)

Bloop also includes a handful of common scalar types that are built on top of the primitive types. The following demonstrates that hash and range key columns can be any Type that is backed by "S", "N", or "B" and not just the primitive types above.

import uuid
from datetime import datetime, timedelta, timezone
from bloop import DateTime, Timestamp, UUID, Integer


class Tweet(BaseModel):
    account_id = Column(Integer, hash_key=True)
    tweet_id = Column(UUID, range_key=True)
    created_at = Column(DateTime)
    delete_after = Column(Timestamp)

now = datetime.now(timezone.utc)
tomorrow = now + timedelta(days=1)
tweet = Tweet(
    account_id=3,
    tweet_id=uuid.uuid4(),
    created_at=now,
    delete_after=tomorrow
)

Note

Bloop's Number type uses a decimal.Context to control rounding and exactness. When exactness is not required, many people find the default context too conservative for practical use. For example, the default context can't save float('3.14') due to inexactness.

As noted in the Public API Reference, you can provide your own context or use an existing pattern such as the Float type in the patterns section. Keep in mind that the convenience comes at the expense of exactness.

Sets

Bloop exposes a single Set type for all three DynamoDB set types. The particular set type is determined by the Set's inner type. For example, Set(Integer) has backing_type "NS" and Set(DateTime) has backing_type "SS".

The inner type must have a backing type of "S", "N", or "B". When Bloop loads or dumps a set, it defers to the inner type for each value in the set. Using the enum example below, a set of enums can be stored as follows:

>>> from bloop import BaseModel, Column, Set, Integer
>>> from my_types import StringEnum
>>> import enum
>>> class Colors(enum.Enum):
...     red = 1
...     green = 2
...     blue = 3
...
>>> class Palette(BaseModel):
...     id = Column(Integer, hash_key=True)
...     colors = Column(Set(StringEnum(Colors)))
...
>>> palette = Palette(id=0, colors={Colors.red, Colors.green})

The palette.colors value would be persisted in DynamoDB as:

{"SS": ["red", "green"]}

Structured Documents

Bloop provides two types for each of DynamoDB's document types: List and DynamicList for lists, and Map and DynamicMap for maps.

When you know your document's types up front, Map and List are the best choice. Use these when your document is highly structured but you still want to use a DynamoDB document. You will need to declare the types of each key (or the list's single type) when you create the type:

MyDocument = Map(**{
    "first": String,
    "last": String,
    "age": Integer,
    "stuffed_animals": List(String),
    "nested": Map(**{
        "bs": Set(Binary),
        "ns": Set(Timestamp)
    })
})

Modeling your documents up front provides earlier validation of condition arguments, especially when using paths:

MyDocument["nested"]["ns"].contains(3) | MyDocument["stuffed_animals"][2].begins_with("BoatyMc")

If you want to store arbitrary lists and dicts without specifying types up front, see Dynamic Documents.

List

Unlike Set, a List's inner type can be anything, including other Lists, Sets, and Maps. Due to the lack of type information when loading values, Bloop's built-in List can only hold one type of value:

>>> from bloop import BaseModel, Column, List, Set, Integer
>>> exam_scores = Set(Integer)  # Unique scores for one student
>>> class Semester(BaseModel):
...     id = Column(Integer, hash_key=True)
...     scores = Column(List(exam_scores))  # All student scores
...
>>> semester = Semester(id=0, scores=[
...     {95, 98, 64, 32},
...     {0},
...     {64, 73, 75, 50, 52}
... ])

The semester's scores would be saved as (formatted for readability):

{"L": [
    {"NS": ['95', '98', '64', '32']},
    {"NS": ['0']},
    {"NS": ['64', '73', '75', '50', '52']},
]}
Map

As stated, Map doesn't support arbitrary types out of the box. Instead, you must provide the type to use for each key in the Map:

# Using kwargs directly
Map(username=String, wins=Integer)

# Unpacking from a dict
Metadata = Map(**{
    "created": DateTime,
    "referrer": UUID,
    "cache": String
})

Only defined keys will be loaded or saved. In the following, the impression's "version" metadata will not be saved:

class Impression(BaseModel):
    id = Column(UUID, hash_key=True)
    metadata = Column(Metadata)

impression = Impression(id=uuid.uuid4())
impression.metadata = {
    "created": datetime.now(timezone.utc),
    "referrer": referrer.id,
    "cache": "https://img-cache.s3.amazonaws.com/" + img.filename,
    "version": 1.1  # NOT SAVED
}

Warning

Saving a Map ("M") in DynamoDB fully replaces the existing value.

Despite my desire to support partial updates, DynamoDB does not expose a way to reliably update a path within a Map. There is no way to upsert along a path:

I attempted a few other approaches, like having two update statements - first setting it to an empty map with the if_not_exists function, and then adding the child element, but that doesn't work because paths cannot overlap between expressions.

DavidY@AWS (emphasis added)

If DynamoDB ever allows overlapping paths in expressions, Bloop will be refactored to use partial updates for arbitrary types.

Given the thread's history, it doesn't look promising.

Dynamic Documents

As an alternative to Structured Documents, you can use a DynamicList or DynamicMap when your data is unstructured:

class User(BaseModel):
    ...
    bio = Column(DynamicMap)

user.bio = {
    "foo": [1, True, {b"23", b"24"}],
    "in": {"j": "k"}
}

Unfortunately, DynamicMap and DynamicList can only store the most direct python type for each DynamoDB backing type -- for example, str for "S" and bytes for "B", never DateTime or UUID.

This is because Bloop uses the type information from DynamoDB to load a python value. For example, when loading this value, it could be a String or a DateTime (or another custom type):

{"S": "2016-08-09T01:16:25.322849+00:00"}

Therefore dynamic types will only load the most direct corresponding type for each backing type.
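
For example, here's a sketch (using the User model above) of storing a datetime by hand; the value loads back as a plain number, so you convert it yourself:

from datetime import datetime, timezone

now = datetime.now(timezone.utc)

# store the datetime manually, eg. as an epoch number
user.bio = {"joined": now.timestamp()}
engine.save(user)

# "N" loads back as a plain number, never a datetime
engine.load(user)
joined = datetime.fromtimestamp(float(user.bio["joined"]), timezone.utc)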

Custom Types

Creating new types is straightforward. Most of the time, you'll only need to implement dynamo_dump() and dynamo_load(). Here's a type that stores a PIL.Image.Image as bytes:

import io
import bloop
from PIL import Image

class ImageType(bloop.Binary):
    python_type = Image.Image

    def __init__(self, fmt="JPEG"):
        self.fmt = fmt
        super().__init__()

    def dynamo_dump(self, image, *, context, **kwargs):
        if image is None:
            return None
        buffer = io.BytesIO()
        image.save(buffer, format=self.fmt)
        return super().dynamo_dump(
            buffer.getvalue(), context=context, **kwargs)

    def dynamo_load(self, value, *, context, **kwargs):
        image_bytes = super().dynamo_load(
            value, context=context, **kwargs)
        if image_bytes is None:
            return None
        buffer = io.BytesIO(image_bytes)
        image = Image.open(buffer)
        return image

Now the model doesn't need to know how to load or save the image bytes, and just interacts with instances of Image:

class User(BaseModel):
    name = Column(String, hash_key=True)
    profile_image = Column(ImageType("PNG"))
engine.bind(User)

user = User(name="numberoverzero")
engine.load(user)

user.profile_image = user.profile_image.rotate(90)
engine.save(user)
Missing and None

When there's no value for a Column that's being loaded, your type will need to handle None. For many types, None is the best sentinel to return for "this has no value" -- most of the built-in types use None.

Bloop will transparently map None to empty values for some types. For example, Set returns an empty set, so you'll never need to check for None before adding or removing elements. Map loads None through the type associated with each of its keys and inserts the results into the dict. String and Binary replace None with "" and b"", respectively.

You will also need to handle None when dumping values to DynamoDB. This can happen when a value is deleted from a Model instance, or it's explicitly set to None. In almost all cases, your dynamo_dump function should simply return None to signal omission (or deletion, depending on the context).

You should return None when dumping empty values like list(), or DynamoDB will complain about setting something to an empty list or set. By returning None, Bloop will know to put that column in the DELETE section of the UpdateItem.
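
As a sketch, here's a hypothetical OptionalString type (the name is illustrative) that treats the empty string as missing, so it's omitted or deleted instead of saved:

import bloop

class OptionalString(bloop.String):
    def dynamo_dump(self, value, *, context, **kwargs):
        # None or "" -> return None so Bloop omits/deletes the attribute
        if not value:
            return None
        return super().dynamo_dump(value, context=context, **kwargs)

    def dynamo_load(self, value, *, context, **kwargs):
        # mirror the built-in behavior: replace a missing value with ""
        if value is None:
            return ""
        return super().dynamo_load(value, context=context, **kwargs)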

Example: String Enum

This is a simple Type that stores an enum.Enum member by its string name.

class StringEnum(bloop.String):
    def __init__(self, enum_cls):
        self.enum_cls = enum_cls
        super().__init__()

    def dynamo_dump(self, value, *, context, **kwargs):
        if value is None:
            return value
        value = value.name
        return super().dynamo_dump(value, context=context, **kwargs)

    def dynamo_load(self, value, *, context, **kwargs):
        if value is None:
            return value
        value = super().dynamo_load(value, context=context, **kwargs)
        return self.enum_cls[value]

That's it! To see it in action, here's an enum:

import enum
class Color(enum.Enum):
    red = 1
    green = 2
    blue = 3

And using that in a model:

class Shirt(BaseModel):
    id = Column(String, hash_key=True)
    color = Column(StringEnum(Color))
engine.bind(Shirt)

shirt = Shirt(id="t-shirt", color=Color.red)
engine.save(shirt)
Example: Integer Enum

To instead store enums by their integer values, we can modify the StringEnum class above:

class IntEnum(bloop.Integer):
    def __init__(self, enum_cls):
        self.enum_cls = enum_cls
        super().__init__()

    def dynamo_dump(self, value, *, context, **kwargs):
        if value is None:
            return value
        value = value.value
        return super().dynamo_dump(value, context=context, **kwargs)

    def dynamo_load(self, value, *, context, **kwargs):
        if value is None:
            return value
        value = super().dynamo_load(value, context=context, **kwargs)
        return self.enum_cls(value)
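
Usage is identical to the StringEnum example; the color is now persisted as its integer value:

class Shirt(BaseModel):
    id = Column(String, hash_key=True)
    color = Column(IntEnum(Color))
engine.bind(Shirt)

shirt = Shirt(id="t-shirt", color=Color.red)
engine.save(shirt)  # color is stored as {"N": "1"}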

Type Validation

By default, Bloop does not verify that each model's values have the correct types. For example, consider this model:

class Appointment(BaseModel):
    id = Column(UUID, hash_key=True)
    date = Column(DateTime)
    location = Column(String)

The following code won't throw type errors until we try to persist to DynamoDB:

>>> engine.bind(Appointment)
>>> a = Appointment(id="not-a-uuid")
>>> a.location = 421
>>> a
Appointment(id='not-a-uuid', location=421)

>>> engine.save(a)
ParamValidationError: ...

This is because Bloop is designed to be maximally customizable and to extend your existing object model framework easily. There's also no built-in way to specify that a column is non-nullable. For an example of adding both of these constraints to your Column, see Customizing the Column Class. Alternatively, consider a more robust option such as the exceptional marshmallow; an example integrating marshmallow and flask appears below in Integrating with Marshmallow.

Conditions

Conditions are used for:

  • optimistic concurrency when saving or deleting objects (the condition= argument)

  • a query's key condition (the key= argument)

  • filtering the results of queries and scans (the filter= argument)

Built-In Conditions

There is no DynamoDB type that supports all of the conditions. For example, contains does not work with a numeric type "N" such as Number or Integer. DynamoDB's ConditionExpression Reference has the full specification.

class Model(BaseModel):
    column = Column(SomeType)

# Comparisons
Model.column < value
Model.column <= value
Model.column == value
Model.column >= value
Model.column > value
Model.column != value

Model.column.begins_with(value)
Model.column.between(low, high)
Model.column.contains(value)
Model.column.in_([foo, bar, baz])
Model.column.is_(None)
Model.column.is_not(False)

# bitwise operators combine conditions
not_none = Model.column.is_not(None)
in_the_future = Model.column > now

in_the_past = ~in_the_future
either = not_none | in_the_future
both = not_none & in_the_future

Chained Conditions (AND, OR)

Bloop overloads the & and | operators for conditions, allowing you to more easily construct compound conditions. Some libraries allow you to chain filters with .filter(c1).filter(c2) or pass a list of conditions .filter([c1, c2]) but both of these forms struggle to express nested conditions, especially when expressing an OR operation.

For example, consider a query to find popular articles. We want new articles with at least 100 likes, recent articles with at least 500 likes, or older articles with at least 1000 likes. We're also running a spotlight on editor of the month "Nancy Stevens", so let's include her articles as well.

from datetime import datetime, timedelta, timezone
now = datetime.now(timezone.utc)
yesterday = now - timedelta(hours=12)
last_week = now - timedelta(days=7)
last_year = now - timedelta(weeks=52)

popular = (
    ((Article.likes >= 100) & (Article.publish_date >= yesterday)) |
    ((Article.likes >= 500) & (Article.publish_date >= last_week)) |
    ((Article.likes >= 1000) & (Article.publish_date >= last_year))
)
spotlight = Article.editor == "nstevens"

articles = engine.scan(Article, filter=popular|spotlight)

We can programmatically build conditions from a base of bloop.Condition, which is an empty condition. In the following example, editors may have come from a query param or form submission:

editors = ["nstevens", "jsmith", "bholly"]
condition = bloop.Condition()

for editor in editors:
    condition |= Article.editor == editor

articles = engine.scan(Article, filter=condition)

Although less frequently used, there is also the ~ operator to negate an existing condition. This is useful to flip a compound condition, rather than trying to invert all the intermediate operators. To find all the unpopular or non-spotlighted articles, we'll use the variables from the first example above:

popular = (...)  # see first example
spotlight = ...

popular_articles = engine.scan(Article, filter=popular|spotlight)
unpopular_articles = engine.scan(Article, filter=~(popular|spotlight))

Document Paths

You can construct conditions against individual elements of List and Map types with the usual indexing notation.

Item = Map(
    name=String,
    price=Number,
    quantity=Integer)
Metrics = Map(**{
    "payment-duration": Number,
    "coupons.used": Integer,
    "coupons.available": Integer
})
class Receipt(BaseModel):
    transaction_id = Column(UUID, hash_key=True)
    total = Column(Integer)

    items = Column(List(Item))
    metrics = Column(Metrics)

Here are some basic conditions using paths:

Receipt.metrics["payment-duration"] > 30000
Receipt.items[0]["name"].begins_with("deli:salami:")

Signals

Signals (powered by blinker) allow you to easily respond to events. Bloop exposes a number of signals during model creation, validation, and as objects are loaded and saved.

>>> from bloop import model_created
>>> models = []
>>> @model_created.connect
... def on_new_model(_, *, model, **__):
...     models.append(model)
...

To disconnect a receiver:

>>> model_created.disconnect(on_new_model)

You can specify a sender to restrict who you receive notifications from. This simplifies many cross-region tasks, where multiple engines are sending the same type of notifications. For example, you can automatically bind and save models to a second region:

>>> @model_created.connect(sender=primary_engine)
... def on_new_model(_, model, **__):
...     secondary_engine.bind(model)
...
>>> @object_saved.connect(sender=primary_engine)
... def on_save(_, obj, **__):
...     secondary_engine.save(obj)

Parameters

Your receiver must accept **kwargs, and should only use _ or sender for the positional argument. The following templates are recommended for all receivers:

def receiver(_, *, kwarg1, kwarg2, **__):

def receiver(sender, *, kwarg1, kwarg2, **__):

Instead of forcing you to remember which parameter the sender is (engine? model?), Bloop sends every parameter as a kwarg. This means your receiver can always ignore the positional argument and cherry-pick the parameters you care about. The sender is accessed the same way as all other parameters.

You can still specify a sender when you connect, but you should not use that parameter name in your function signature. For example, model_bound is sent by engine and includes engine and model. If you set up a receiver that names its first positional arg "engine", this causes a TypeError:

>>> @model_bound.connect
... def wrong_receiver(engine, model, **__):
...     pass
...
>>> model_bound.send("engine", model="model", engine="engine")
TypeError: wrong_receiver() got multiple values for argument 'engine'

Here's the correct version, which also filters on sender:

>>> @model_bound.connect(sender="engine")
... def correct_receiver(_, model, engine, **__):
...     print("Called!")
...
>>> model_bound.send("engine", model="model", engine="engine")
Called!

Note

  • New parameters can be added in a minor version.

  • A sender can be added to an anonymous signal in a minor version.

  • A major version can remove a parameter and remove or replace a sender.

Built-in Signals

See the Public API for a list of available signals.

Bloop Patterns

DynamoDB Local

Connect to a local DynamoDB instance. As of 2018-08-29, DynamoDBLocal still does not support features like TTL or ContinuousBackups (even in a stubbed capacity), which means you will need to patch the client for local testing.

import boto3
import bloop

dynamodb = boto3.client("dynamodb", endpoint_url="http://127.0.0.1:8000")
dynamodbstreams = boto3.client("dynamodbstreams", endpoint_url="http://127.0.0.1:8000")
engine = bloop.Engine(dynamodb=dynamodb, dynamodbstreams=dynamodbstreams)

To resolve missing features in DynamoDBLocal, you can patch the client (see below) or use an alternative to DynamoDBLocal such as localstack. Localstack isn't recommended until Issue #728 is addressed.

The following code is designed to be easily copied and pasted. When you set up your engine for local testing, just import and call patch_engine to stub responses to the missing methods. By default, describe_time_to_live and describe_continuous_backups will return "DISABLED" for every table. You can use client.mock_ttl["my-table-name"] = True or client.mock_backups["my-table-name"] = True to return "ENABLED" instead.

The original patching code comes from bloop's integration tests; historical context on using DynamoDBLocal with bloop can be found in Issue #117.

# patch_local.py
import bloop


class PatchedDynamoDBClient:
    def __init__(self, real_client):
        self.__client = real_client
        self.mock_ttl = {}
        self.mock_backups = {}

    def describe_time_to_live(self, TableName, **_):
        r = "ENABLED" if self.mock_ttl.get(TableName) else "DISABLED"
        return {"TimeToLiveDescription": {"TimeToLiveStatus": r}}

    def describe_continuous_backups(self, TableName, **_):
        r = "ENABLED" if self.mock_backups.get(TableName) else "DISABLED"
        return {"ContinuousBackupsDescription": {"ContinuousBackupsStatus": r}}

    # TODO override any other methods that DynamoDBLocal doesn't provide

    def __getattr__(self, name):
        # use the original client for everything else
        return getattr(self.__client, name)


def patch_engine(engine):
    client = PatchedDynamoDBClient(engine.session.dynamodb_client)
    engine.session.dynamodb_client = client
    return client

And its usage, assuming you've saved the file as patch_local.py:

from .patch_local import patch_engine

# same 3 lines from above
dynamodb = boto3.client("dynamodb", endpoint_url="http://127.0.0.1:8000")
dynamodbstreams = boto3.client("dynamodbstreams", endpoint_url="http://127.0.0.1:8000")
engine = bloop.Engine(dynamodb=dynamodb, dynamodbstreams=dynamodbstreams)

client = patch_engine(engine)

client.mock_ttl["MyTableName"] = True
client.mock_backups["MyTableName"] = False

Generic "if not exist"

Create a condition for any model or object that fails the operation if the item already exists.

from bloop import Condition

def if_not_exist(obj):
    condition = Condition()
    for key in obj.Meta.keys:
        condition &= key.is_(None)
    return condition

tweet = Tweet(account=uuid.uuid4(), id="numberoverzero")

engine.save(tweet, condition=if_not_exist(tweet))
# or
engine.save(tweet, condition=if_not_exist(Tweet))
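
If the object already exists, the save raises ConstraintViolation, which you can catch:

from bloop.exceptions import ConstraintViolation

try:
    engine.save(tweet, condition=if_not_exist(tweet))
except ConstraintViolation:
    print("tweet already exists, not overwritten")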

Snapshot Condition

Creates a condition that ensures the object hasn't changed in DynamoDB since you loaded it. You need to create the condition before you modify the object locally.

from bloop import Condition
from copy import deepcopy

def snapshot(obj):
    condition = Condition()
    for col in obj.Meta.columns:
        value = getattr(obj, col.name, None)
        # use a deep copy here for nested dicts, lists
        condition &= (col == deepcopy(value))
    return condition

And to use it:

from bloop_patterns import snapshot
from my_models import User

user = User(name="n/0")
engine.load(user)

# snapshot before any modifications
last_seen = snapshot(user)

# modify the object locally
user.verified = True

# save only if the state matches what was loaded
engine.save(user, condition=last_seen)

Float Type

A number type that loads values as floats but preserves the Decimal context recommended by DynamoDB when saving. While you could specify a relaxed decimal.Context in the constructor, that is strongly discouraged as it will cause issues comparing values.

from bloop import Number

class Float(Number):
    def dynamo_load(self, *args, **kwargs):
        return float(super().dynamo_load(*args, **kwargs))
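
A sketch of the pattern in use; writes still go through the conservative default context, but loaded values come back as float:

from decimal import Decimal

class Product(BaseModel):
    id = Column(Integer, hash_key=True)
    rating = Column(Float)
engine.bind(Product)

product = Product(id=0, rating=Decimal("3.14"))
engine.save(product)

engine.load(product)
assert isinstance(product.rating, float)  # 3.14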

Sharing Tables and Indexes

Bloop allows you to map multiple models to the same table. You can rename columns during init with the dynamo_name= param, change column types across models, and still use conditional operations. This flexibility extends to GSIs and LSIs as long as a Model's Index projects a subset of the actual Index. On shared tables, a shared index provides tighter query validation and reduces consumed throughput.

In the following (very contrived) example, the employees-uk table is used for both employees and managers. Queries against by_level provide emails for employees at a certain level, and all direct reports for managers at a certain level.

class Employee(BaseModel):
    class Meta:
        table_name = "employees-uk"
    id = Column(UUID, hash_key=True)
    level = Column(Integer)
    email = Column(String)
    manager_id = Column(UUID)

    by_level = GlobalSecondaryIndex(
        projection={email}, hash_key=level)


class Manager(BaseModel):
    class Meta:
        table_name = "employees-uk"
    id = Column(UUID, hash_key=True)
    level = Column(Integer)
    email = Column(String)
    manager_id = Column(UUID)
    directs = Column(Set(UUID))

    by_level = GlobalSecondaryIndex(
        projection={directs}, hash_key=level)

Note

If you try to create these tables by binding the models, one of them will fail. If Employee is bound first, Manager won't see directs in the by_level GSI. You must create the indexes through the console, or use a dummy model.

def build_indexes(engine):
    """Call before binding Employee or Manager"""
    class _(BaseModel):
        class Meta:
            table_name = "employees-uk"
        id = Column(UUID, hash_key=True)
        level = Column(Integer)
        email = Column(String)
        manager_id = Column(UUID)
        directs = Column(Set(UUID))
        by_level = GlobalSecondaryIndex(
            projection={directs, email},
            hash_key=level)
    engine.bind(_)

Cross-Region Replication

Replicating the same model across multiple regions using streams is straightforward. We'll need one engine per region, which can be instantiated with the following helper:

import boto3
import bloop


def engine_for_region(region):
    dynamodb = boto3.client("dynamodb", region_name=region)
    dynamodbstreams = boto3.client("dynamodbstreams", region_name=region)
    return bloop.Engine(dynamodb=dynamodb, dynamodbstreams=dynamodbstreams)


src_engine = engine_for_region("us-west-2")
dst_engine = engine_for_region("us-east-1")

And here's our replication. This assumes that the model has been bound to both engines. Although this starts at the trim horizon, we'd usually keep track of progress somewhere else using Stream.token to avoid replicating stale changes (every run would start at trim_horizon).

import time

stream = src_engine.stream(MyModelHere, "trim_horizon")
while True:
    record = next(stream)
    if not record:
        time.sleep(0.5)  # avoid a busy loop while the stream is quiet
        continue
    old, new = record["old"], record["new"]
    if new:
        dst_engine.save(new)
    else:
        dst_engine.delete(old)

This is a simplified example; see Periodic Heartbeats for automatically managing shard iterator expiration.
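
Here's a sketch of that bookkeeping, persisting Stream.token (which is json-serializable) to a local file between runs; the file name is illustrative:

import json

TOKEN_PATH = "replication-token.json"

def resume_stream(engine, model):
    try:
        with open(TOKEN_PATH) as f:
            position = json.load(f)  # previously saved Stream.token
    except FileNotFoundError:
        position = "trim_horizon"  # first run
    return engine.stream(model, position)

def checkpoint(stream):
    with open(TOKEN_PATH, "w") as f:
        json.dump(stream.token, f)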

Customizing the Column Class

As mentioned in Type Validation, Bloop intentionally does not impose its own concept of type validation or a nullable constraint on columns. Instead, these can be trivially added to the existing Column class:

import bloop

class Column(bloop.Column):

    def __init__(self, *args, nullable=True, check_type=True, **kwargs):
        super().__init__(*args, **kwargs)
        self.nullable = nullable
        self.check_type = check_type

    def __set__(self, obj, value):
        if value is None:
            if not self.nullable:
                raise ValueError(
                    f"Tried to set {self.name} to None but column is not nullable")
        elif self.check_type and not isinstance(value, self.typedef.python_type):
            msg = "Tried to set {} with invalid type {} (expected {})"
            raise TypeError(msg.format(
                self.name, type(value),
                self.typedef.python_type
            ))
        super().__set__(obj, value)

Using this class, a type failure looks like:

>>> class Appointment(BaseModel):
...     id = Column(UUID, hash_key=True, nullable=False)
...     date = Column(DateTime)
...     location = Column(String, check_type=True)
>>> engine.bind(Appointment)
>>> appt = Appointment(id=uuid.uuid4())

>>> appt.id = None
ValueError: Tried to set id to None but column is not nullable
>>> appt.location = 3
TypeError: Tried to set location with invalid type <class 'int'> (expected <class 'str'>)

JSON Serialization

When you're ready to serialize your objects for use in other systems, you should reach for marshmallow. Marshmallow's context-specific serialization is useful for excluding fields for different consumers, such as internal account notes. You can also specify multiple formats and switch based on use, eg. base64 to send bytes over the wire, or raw bytes to write to disk.

But when you want to quickly send something over the wire, marshmallow can be heavy. The following is a drop-in function for the default argument to json.dumps.

It is not intended for production use. For historical discussion, see Issue #135.

# bloop_serializer.py
import base64
import datetime
import decimal
import uuid
from bloop import BaseModel

def serialize(use_float: bool = True, explicit_none: bool = False):
    def default(obj):
        # bloop.Set[T]
        if isinstance(obj, set):
            return list(obj)
        # bloop.{Datetime,Timestamp}
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        # bloop.UUID
        elif isinstance(obj, uuid.UUID):
            return str(obj)
        # bloop.Number
        elif isinstance(obj, decimal.Decimal):
            if use_float:
                return float(obj)
            return str(obj)
        # bloop.Binary
        elif isinstance(obj, bytes):
            return base64.b64encode(obj).decode("utf-8")
        # bloop.BaseModel
        elif isinstance(obj, BaseModel):
            return {
                c.name: getattr(obj, c.name, None)
                for c in obj.Meta.columns
                if hasattr(obj, c.name) or explicit_none
            }
        raise TypeError(f"Type {type(obj)} is not serializable")
    return default

To use the serializer, simply pass it to json.dumps:

import json
from bloop_serializer import serialize

user = User(...)
json.dumps(
    user,
    default=serialize(),
    indent=True, sort_keys=True
)

# render None/empty values as null instead of omitting
json.dumps(
    user,
    default=serialize(explicit_none=True),
    indent=True, sort_keys=True
)

Integrating with Marshmallow

Instead of adding your own validation layer to the Column class as detailed above, you can leverage powerful libraries such as marshmallow and flask-marshmallow. Here's a self-contained example that uses flask and marshmallow to expose get and list operations for a User class:

from flask import Flask, jsonify
from flask_marshmallow import Marshmallow
from bloop import BaseModel, Column, Engine, String, DateTime
from datetime import datetime, timezone

app = Flask(__name__)
ma = Marshmallow(app)
engine = Engine()


class User(BaseModel):
    email = Column(String, hash_key=True)
    password = Column(String)
    date_created = Column(DateTime, default=lambda: datetime.now(timezone.utc))

engine.bind(User)


class UserSchema(ma.Schema):
    class Meta:
        # Fields to expose
        fields = ["_links"]
        fields += [column.name for column in User.Meta.columns]
    # Smart hyperlinking
    _links = ma.Hyperlinks({
        'self': ma.URLFor('user_detail', email='<email>'),
        'collection': ma.URLFor('users')
    })

user_schema = UserSchema()
users_schema = UserSchema(many=True)


@app.route('/api/users/')
def users():
    all_users = list(engine.scan(User))
    result = users_schema.dump(all_users)
    return jsonify(result.data)

@app.route('/api/users/<email>')
def user_detail(email):
    user = User(email=email)
    engine.load(user)
    return user_schema.jsonify(user)

Bloop Extensions

Extension dependencies aren't installed with Bloop, because they would pull in a large number of libraries that Bloop itself doesn't depend on. For example, two extensions could provide automatic mapping to Django or SQLAlchemy models; most users would never need either, since Bloop doesn't require them for normal usage.

Bloop extensions are part of the Public API, and subject to its versioning policy.

DateTime and Timestamp

Working with python's datetime.datetime is tedious, but there are a number of popular libraries that improve the situation. Bloop includes drop-in replacements for the basic DateTime and Timestamp types for arrow, delorean, and pendulum through the extensions module. For example, let's swap out some code using the built-in DateTime:

import datetime
from bloop import DateTime
from bloop import BaseModel, Column, Integer

class User(BaseModel):
    id = Column(Integer, hash_key=True)
    created_on = Column(DateTime)

utc = datetime.timezone.utc
now = datetime.datetime.now(utc)

user = User(id=0, created_on=now)

Now, using pendulum:

import pendulum
from bloop.ext.pendulum import DateTime
from bloop import BaseModel, Column, Integer

class User(BaseModel):
    id = Column(Integer, hash_key=True)
    created_on = Column(DateTime)

now = pendulum.now("utc")

user = User(id=0, created_on=now)

Now, using arrow:

import arrow
from bloop.ext.arrow import DateTime
from bloop import BaseModel, Column, Integer

class User(BaseModel):
    id = Column(Integer, hash_key=True)
    created_on = Column(DateTime)

now = arrow.now("utc")

user = User(id=0, created_on=now)

Public

Engine

By default, Bloop will build clients directly from boto3.client(). To customize the engine's connection, you can provide your own DynamoDB and DynamoDBStreams clients:

import bloop
import boto3

dynamodb_local = boto3.client("dynamodb", endpoint_url="http://127.0.0.1:8000")
streams_local = boto3.client("dynamodbstreams", endpoint_url="http://127.0.0.1:8001")

engine = bloop.Engine(
    dynamodb=dynamodb_local,
    dynamodbstreams=streams_local)
class bloop.engine.Engine(*, dynamodb=None, dynamodbstreams=None, table_name_template: Union[str, Callable[[Any], str]] = '{table_name}')[source]

Primary means of interacting with DynamoDB.

To apply a prefix to each model's table name, you can use a simple format string:

>>> template = "my-prefix-{table_name}"
>>> engine = Engine(table_name_template=template)

For more complex table_name customization, you can provide a function:

>>> def reverse_name(model):
...     return model.Meta.table_name[::-1]
>>> engine = Engine(table_name_template=reverse_name)
Parameters
  • dynamodb -- DynamoDB client. Defaults to boto3.client("dynamodb").

  • dynamodbstreams -- DynamoDBStreams client. Defaults to boto3.client("dynamodbstreams").

  • table_name_template -- Customize the table name of each model bound to the engine. If a string is provided, string.format(table_name=model.Meta.table_name) will be called. If a function is provided, the function will be called with the model as its sole argument. Defaults to "{table_name}".

bind(model, *, skip_table_setup=False)[source]

Create backing tables for a model and its non-abstract subclasses.

Parameters
  • model -- Base model to bind. Can be abstract.

  • skip_table_setup -- Don't create or verify the table in DynamoDB. Default is False.

Raises

bloop.exceptions.InvalidModel -- if model is not a subclass of BaseModel.

delete(*objs, condition=None, sync=None)[source]

Delete one or more objects.

Parameters
  • objs -- objects to delete.

  • condition -- only perform each delete if this condition holds.

  • sync -- update objects after deleting. "old" loads attributes before the delete; None does not mutate the object locally. Default is None.

Raises

bloop.exceptions.ConstraintViolation -- if the condition (or atomic) is not met.

load(*objs, consistent=False)[source]

Populate objects from DynamoDB.

Parameters
  • objs -- objects to load.

  • consistent (bool) -- Use strongly consistent reads if True. Default is False.

Raises

bloop.exceptions.MissingObjects -- if one or more objects aren't loaded.

query(model_or_index, key, filter=None, projection='all', consistent=False, forward=True)[source]

Create a reusable QueryIterator.

Parameters
  • model_or_index -- A model or index to query. For example, User or User.by_email.

  • key -- Key condition. This must include an equality against the hash key, and optionally one of a restricted set of conditions on the range key.

  • filter -- Filter condition. Only matching objects will be included in the results.

  • projection -- "all", "count", a set of column names, or a set of Column. When projection is "count", you must advance the iterator to retrieve the count.

  • consistent (bool) -- Use strongly consistent reads if True. Default is False.

  • forward (bool) -- Query in ascending or descending order. Default is True (ascending).

Returns

A reusable query iterator with helper methods.

Return type

QueryIterator

save(*objs, condition=None, sync=None)[source]

Save one or more objects.

Parameters
  • objs -- objects to save.

  • condition -- only perform each save if this condition holds.

  • sync -- update objects after saving. "new" loads attributes after the save; "old" loads attributes before the save; None does not mutate the object locally. Default is None.

Raises

bloop.exceptions.ConstraintViolation -- if the condition (or atomic) is not met.

scan(model_or_index, filter=None, projection='all', consistent=False, parallel=None)[source]

Create a reusable ScanIterator.

Parameters
  • model_or_index -- A model or index to scan. For example, User or User.by_email.

  • filter -- Filter condition. Only matching objects will be included in the results.

  • projection -- "all", "count", a list of column names, or a list of Column. When projection is "count", you must exhaust the iterator to retrieve the count.

  • consistent (bool) -- Use strongly consistent reads if True. Default is False.

  • parallel (tuple) -- Perform a parallel scan. A tuple of (Segment, TotalSegments) for this portion of the scan. Default is None.

Returns

A reusable scan iterator with helper methods.

Return type

ScanIterator
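
For example, a sketch that splits a scan between two workers using the parallel parameter:

# each worker scans one of two disjoint segments
first_half = engine.scan(User, parallel=(0, 2))
second_half = engine.scan(User, parallel=(1, 2))

for user in first_half:
    ...  # process segment 0; another worker iterates second_half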

stream(model, position)[source]

Create a Stream that provides approximate chronological ordering.

# Create a user so we have a record
>>> engine = Engine()
>>> user = User(id=3, email="user@domain.com")
>>> engine.save(user)
>>> user.email = "admin@domain.com"
>>> engine.save(user)

# First record lacks an "old" value since it's an insert
>>> stream = engine.stream(User, "trim_horizon")
>>> next(stream)
{'key': None,
 'old': None,
 'new': User(email='user@domain.com', id=3, verified=None),
 'meta': {
     'created_at': datetime.datetime(2016, 10, 23, ...),
     'event': {
         'id': '3fe6d339b7cb19a1474b3d853972c12a',
         'type': 'insert',
         'version': '1.1'},
     'sequence_number': '700000000007366876916'}
}
Parameters
  • model -- The model to stream records from.

  • position -- "trim_horizon", "latest", a stream token, or a datetime.datetime.

Returns

An iterator for records in all shards.

Return type

Stream

Raises

bloop.exceptions.InvalidStream -- if the model does not have a stream.

transaction(mode='w')[source]

Create a new ReadTransaction or WriteTransaction.

As a context manager, the transaction calls commit when the block exits:

>>> engine = Engine()
>>> user = User(id=3, email="user@domain.com")
>>> tweet = Tweet(id=42, data="hello, world")
>>> with engine.transaction("w") as tx:
...     tx.delete(user)
...     tx.save(tweet, condition=Tweet.id.is_(None))

Or manually calling prepare and commit:

>>> engine = Engine()
>>> user = User(id=3, email="user@domain.com")
>>> tweet = Tweet(id=42, data="hello, world")
>>> tx = engine.transaction("w")
>>> tx.delete(user)
>>> tx.save(tweet, condition=Tweet.id.is_(None))
>>> tx.prepare().commit()
Parameters

mode (str) -- Either "r" or "w" to create a ReadTransaction or WriteTransaction. Default is "w"

Returns

A new transaction that can be committed.

Return type

ReadTransaction or WriteTransaction

Models

See defining models in the User Guide.

BaseModel
class bloop.models.BaseModel(**attrs)[source]

Abstract base that all models derive from.

Provides a basic __init__ method that takes **kwargs whose keys are column names:

class URL(BaseModel):
    id = Column(UUID, hash_key=True)
    ip = Column(IPv6)
    name = Column(String)

url = URL(id=uuid.uuid4(), name="google")

By default, the __init__ method is not called when new instances are required, for example when iterating results from a Query, Scan, or Stream.

Meta[source]

Holds table configuration and computed properties of the model. See model meta in the User Guide.

Column
class bloop.models.Column(typedef, hash_key=False, range_key=False, dynamo_name=None, default=<Sentinel[missing]>)[source]
default

A no-arg function used during instantiation of the column's model. Returns bloop.util.missing when the column does not have a default. Defaults to lambda: bloop.util.missing.
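
For example, a sketch of a column that generates its own value during instantiation:

import uuid

class Paste(BaseModel):
    id = Column(String, hash_key=True, default=lambda: uuid.uuid4().hex)

paste = Paste()  # id is filled in by the default function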

dynamo_name

The name of this column in DynamoDB. Defaults to the column's name.

hash_key

True if this is the model's hash key.

model

The model this column is attached to.

name

The name of this column in the model. Not settable.

>>> class Document(BaseModel):
...     ...
...     cheat_codes = Column(Set(String), dynamo_name="cc")
...
>>> Document.cheat_codes.name
cheat_codes
>>> Document.cheat_codes.dynamo_name
cc
range_key

True if this is the model's range key.

__copy__()[source]

Create a shallow copy of this Column. Primarily used when initializing models that subclass other abstract models or mixins (baseless classes that contain Columns and Indexes). You can override this method to change how derived models are created:

import copy


class MyColumn(Column):
    def __copy__(self):
        new = super().__copy__()
        new.derived = True
        return new


column = MyColumn(Integer)
same = copy.copy(column)
assert same.derived  # True
Returns

A shallow copy of this Column, with the model and _name attributes unset.

GlobalSecondaryIndex
class bloop.models.GlobalSecondaryIndex(*, projection, hash_key, range_key=None, read_units=None, write_units=None, dynamo_name=None, **kwargs)[source]

See GlobalSecondaryIndex in the DynamoDB Developer Guide for details.

Parameters
  • projection -- Either "keys", "all", or a list of column names or objects. Included columns will be projected into the index. Key columns are always included.

  • hash_key -- The column that the index can be queried against.

  • range_key -- (Optional) The column that the index can be sorted on. Default is None.

  • read_units (int) -- (Optional) Provisioned read units for the index. Default is None. When no value is provided and the index does not exist, it will be created with 1 read unit. If the index already exists, it will use the actual index's read units.

  • write_units (int) -- (Optional) Provisioned write units for the index. Default is None. When no value is provided and the index does not exist, it will be created with 1 write unit. If the index already exists, it will use the actual index's write units.

  • dynamo_name (str) -- (Optional) The index's name in DynamoDB. Defaults to the index's name in the model.

dynamo_name

The name of this index in DynamoDB. Defaults to the index's name.

hash_key

The column that the index can be queried against.

model

The model this index is attached to.

name

The name of this index in the model. Not settable.

>>> class Document(BaseModel):
...     ...
...     by_email = GlobalSecondaryIndex(
...         projection="keys", dynamo_name="ind_e", hash_key="email")
...
>>> Document.by_email.name
by_email
>>> Document.by_email.dynamo_name
ind_e
projection
{
    "available":  # Set of columns that can be returned from a query or search.
    "included":   # Set of columns that can be used in query and scan filters.
    "mode":       # "all", "keys", or "include"
    "strict":     # False if queries and scans can fetch non-included columns
}

GSIs can't incur extra reads, so "strict" will always be true and "available" is always the same as "included".

range_key

The column that the index can be sorted on. May be None.

read_units

Provisioned read units for the index. GSIs have their own provisioned throughput.

write_units

Provisioned write units for the index. GSIs have their own provisioned throughput.

__copy__()

Create a shallow copy of this Index. Primarily used when initializing models that subclass other abstract models or mixins (baseless classes that contain Columns and Indexes). You can override this method to change how derived models are created:

import copy


class MyIndex(Index):
    def __copy__(self):
        new = super().__copy__()
        new.derived = True
        return new


index = MyIndex(projection="keys", hash_key="some_column")
same = copy.copy(index)
assert same.derived  # True
Returns

A shallow copy of this Index, with the model and _name attributes unset, and the computed projection invalidated.

LocalSecondaryIndex
class bloop.models.LocalSecondaryIndex(*, projection, range_key, dynamo_name=None, strict=True, **kwargs)[source]

See LocalSecondaryIndex in the DynamoDB Developer Guide for details.

Unlike GlobalSecondaryIndex, each LSI shares its throughput with the table, and its hash key is always the table's hash key.

Parameters
  • projection -- Either "keys", "all", or a list of column names or objects. Included columns will be projected into the index. Key columns are always included.

  • range_key -- The column that the index can be sorted against.

  • dynamo_name (str) -- (Optional) The index's name in DynamoDB. Defaults to the index's name in the model.

  • strict (bool) -- (Optional) Restricts queries and scans on the LSI to columns in the projection. When False, DynamoDB may silently incur additional reads to load results. You should not disable this unless you have an explicit need. Default is True.

dynamo_name

The name of this index in DynamoDB. Defaults to the index's name.

hash_key

LSI's hash_key is always the table hash_key.

model

The model this index is attached to.

name

The name of this index in the model. Not settable.

>>> class Document(BaseModel):
...     ...
...     by_date = LocalSecondaryIndex(
...         projection="keys", dynamo_name="ind_co", range_key="created_on")
...
>>> Document.by_date.name
by_date
>>> Document.by_date.dynamo_name
ind_co
projection
{
    "available":  # Set of columns that can be returned from a query or search.
    "included":   # Set of columns that can be used in query and scan filters.
    "mode":       # "all", "keys", or "include"
    "strict":     # False if queries and scans can fetch non-included columns
}

LSIs can incur extra reads, so "available" may be a superset of "included".

range_key

The column that the index can be sorted on. LSIs always have a range_key.

read_units

Provisioned read units for the index. LSIs share the table's provisioned throughput.

write_units

Provisioned write units for the index. LSIs share the table's provisioned throughput.

__copy__()

Create a shallow copy of this Index. Primarily used when initializing models that subclass other abstract models or mixins (baseless classes that contain Columns and Indexes). You can override this method to change how derived models are created:

import copy


class MyIndex(Index):
    def __copy__(self):
        new = super().__copy__()
        new.derived = True
        return new


index = MyIndex(projection="keys", hash_key="some_column")
same = copy.copy(index)
assert same.derived  # True
Returns

A shallow copy of this Index, with the model and _name attributes unset, and the computed projection invalidated.

Types

Most custom types only need to specify a backing_type (or subclass a built-in type) and override dynamo_dump() and dynamo_load():

from bloop.types import Type

class ReversedString(Type):
    python_type = str
    backing_type = "S"

    def dynamo_load(self, value, *, context, **kwargs):
        return str(value[::-1])

    def dynamo_dump(self, value, *, context, **kwargs):
        return str(value[::-1])

If a type's constructor doesn't have required args, a Column can use the class directly. The column will create an instance of the type by calling the constructor without any args:

class SomeModel(BaseModel):
    custom_hash_key = Column(ReversedString, hash_key=True)

In rare cases, complex types may need to implement _dump() or _load().

Type
class bloop.types.Type[source]

Abstract base type.

python_type

The type local values will have. Informational only, this is not used for validation.

backing_type

The DynamoDB type that Bloop will store values as.

One of:

  • "S" -- string

  • "N" -- number

  • "B" -- binary

  • "SS" -- string set

  • "NS" -- number set

  • "BS" -- binary set

  • "M" -- map

  • "L" -- list

  • "BOOL" -- boolean

See the DynamoDB API Reference for details.

supports_operation(operation: str) → bool[source]

Used to ensure a conditional operation is supported by this type.

By default, uses a hardcoded table of operations that maps to each backing DynamoDB type.

You can override this method to implement your own conditional operators, or to dynamically adjust which operations your type supports.
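
For example, a hypothetical subclass of the built-in String could refuse an operation that would be misleading for its wire format:

class CaseInsensitiveString(String):
    """Hypothetical type stored lowercased; begins_with against
    mixed-case input would be misleading, so refuse it."""

    def supports_operation(self, operation: str) -> bool:
        if operation == "begins_with":
            return False
        return super().supports_operation(operation)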

dynamo_dump(value, *, context, **kwargs)[source]

Converts a local value into a DynamoDB value.

For example, to store a string enum as an integer:

def dynamo_dump(self, value, *, context, **kwargs):
    colors = ["red", "blue", "green"]
    return colors.index(value.lower())
dynamo_load(value, *, context, **kwargs)[source]

Converts a DynamoDB value into a local value.

For example, to load a string enum from an integer:

def dynamo_load(self, value, *, context, **kwargs):
    colors = ["red", "blue", "green"]
    return colors[value]
_dump(value, **kwargs)[source]

Entry point for serializing values. Most custom types should use dynamo_dump().

This wraps the return value of dynamo_dump() in DynamoDB's wire format. For example, serializing a string enum to an int:

value = "green"
# dynamo_dump("green") = 2
_dump(value) == {"N": 2}

If a complex type calls this function with None, it will forward None to dynamo_dump(). This can happen when dumping eg. a sparse Map, or a missing (not set) value.

_load(value, **kwargs)[source]

Entry point for deserializing values. Most custom types should use dynamo_load().

This unpacks DynamoDB's wire format and calls dynamo_load() on the inner value. For example, deserializing an int to a string enum:

value = {"N": 2}
# dynamo_load(2) = "green"
_load(value) == "green"

If a complex type calls this function with None, it will forward None to dynamo_load(). This can happen when loading eg. a sparse Map.

String
class bloop.types.String[source]
backing_type = "S"
python_type = str
Number

You should use decimal.Decimal instances to avoid rounding errors:

>>> from bloop import BaseModel, Engine, Column, Number, Integer
>>> class Product(BaseModel):
...     id = Column(Integer, hash_key=True)
...     rating = Column(Number)

>>> engine = Engine()
>>> engine.bind(Product)

>>> product = Product(id=0, rating=3.14)
>>> engine.save(product)
# Long traceback
Inexact: [<class 'decimal.Inexact'>, <class 'decimal.Rounded'>]

>>> from decimal import Decimal
>>> product.rating = Decimal('3.14')
>>> engine.save(product)
>>> # Success!
class bloop.types.Number(context=None)[source]

Base for all numeric types.

Parameters

context -- (Optional) decimal.Context used to translate numbers. Default is a context that matches DynamoDB's stated limits, taken from boto3.

See also

If you don't want to deal with decimal.Decimal, see the Float type in the patterns section.

backing_type = "N"
python_type = decimal.Decimal
context = decimal.Context

The context used to transfer numbers to DynamoDB.

Binary
class bloop.types.Binary[source]
backing_type = "B"
python_type = bytes
Boolean
class bloop.types.Boolean[source]
backing_type = "BOOL"
python_type = bool
UUID
class bloop.types.UUID[source]
backing_type = "S"
python_type = uuid.UUID
DateTime
bloop.types.FIXED_ISO8601_FORMAT

DateTimes must be stored in DynamoDB in UTC with this exact format, including the +00:00 suffix. This is necessary for comparison operators such as > and <= to work on DateTime instances.

You must not use "Z" or any other suffix than "+00:00" to indicate UTC. You must not omit the timezone specifier.

class bloop.types.DateTime[source]

Always stored in DynamoDB using the FIXED_ISO8601_FORMAT format.

Naive datetimes (tzinfo is None) are not supported, and trying to use one will raise ValueError.

from datetime import datetime, timedelta, timezone

class Model(BaseModel):
    id = Column(Integer, hash_key=True)
    date = Column(DateTime)
engine.bind(Model)

obj = Model(id=1, date=datetime.now(timezone.utc))
engine.save(obj)

one_day_ago = datetime.now(timezone.utc) - timedelta(days=1)

query = engine.query(
    Model,
    key=Model.id==1,
    filter=Model.date >= one_day_ago)

query.first().date

Note

To use common datetime libraries such as arrow, delorean, or pendulum, see DateTime and Timestamp Extensions in the user guide. These are drop-in replacements and support non-utc timezones:

from bloop import DateTime  # becomes:
from bloop.ext.pendulum import DateTime
backing_type = "S"
python_type = datetime.datetime
Timestamp
class bloop.types.Timestamp(context=None)[source]

Stores the unix (epoch) time in seconds. Milliseconds are truncated to 0 on load and save.

Naive datetimes (tzinfo is None) are not supported, and trying to use one will raise ValueError.

from datetime import datetime, timedelta, timezone

class Model(BaseModel):
    id = Column(Integer, hash_key=True)
    date = Column(Timestamp)
engine.bind(Model)

obj = Model(id=1, date=datetime.now(timezone.utc))
engine.save(obj)

one_day_ago = datetime.now(timezone.utc) - timedelta(days=1)

query = engine.query(
    Model,
    key=Model.id==1,
    filter=Model.date >= one_day_ago)

query.first().date

Note

To use common datetime libraries such as arrow, delorean, or pendulum, see DateTime and Timestamp Extensions in the user guide. These are drop-in replacements and support non-utc timezones:

from bloop import Timestamp  # becomes:
from bloop.ext.pendulum import Timestamp
backing_type = "N"
python_type = datetime.datetime
Integer
class bloop.types.Integer(context=None)[source]

Truncates values when loading or dumping.

For example, 3.14 in DynamoDB is loaded as 3. If a value is 7.5 locally, it's stored in DynamoDB as 7.
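
A sketch of the truncation, assuming a bound engine and a hypothetical model:

class Metric(BaseModel):
    id = Column(String, hash_key=True)
    count = Column(Integer)

m = Metric(id="clicks", count=7.5)
engine.save(m)   # stored in DynamoDB as 7

engine.load(m)
assert m.count == 7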

backing_type = "N"
python_type = int
context = decimal.Context

The context used to transfer numbers to DynamoDB.

Set
class bloop.types.Set(typedef)[source]

Generic set type. Must provide an inner type.

class Customer(BaseModel):
    id = Column(Integer, hash_key=True)
    account_ids = Column(Set(UUID))
Parameters

typedef -- The type to use when loading and saving values in this set. Must have a backing_type of "S", "N", or "B".

backing_type = "SS", "NS", or "BS"

Set is not a standalone type; its backing type depends on the inner type its constructor receives. For example, Set(DateTime) has backing type "SS" because DateTime has backing type "S".

python_type = set
inner_typedef = Type

The typedef for values in this Set. Has a backing type of "S", "N", or "B".

List
class bloop.types.List(typedef)[source]

Holds values of a single type.

Similar to Set, List requires a single inner type; however, that type can be another List, a Map, or any other type. Bloop restricts List to a single type even though DynamoDB does not, because there is no way to know which Type to load a DynamoDB value with.

For example, {"S": "6d8b54a2-fa07-47e1-9305-717699459293"} could be loaded with UUID, String, or any other class that is backed by "S".

SingleQuizAnswers = List(String)

class AnswerBook(BaseModel):
    ...
    all_answers = Column(List(SingleQuizAnswers))

See also

To store arbitrary lists, see DynamicList.

Parameters

typedef -- The type to use when loading and saving values in this list.

backing_type = "L"
python_type = list
inner_typedef = Type

The typedef for values in this List. All types supported.

Map
class bloop.types.Map(**types)[source]

Mapping of fixed keys and their Types.

Metadata = Map(**{
    "created": DateTime,
    "referrer": UUID,
    "cache": String
})

Product = Map(
    id=Integer,
    metadata=Metadata,
    price=Number
)

class ProductCatalog(BaseModel):
    ...
    all_products = Column(List(Product))

See also

To store arbitrary documents, see DynamicMap.

Parameters

types -- (Optional) specifies the keys and their Types when loading and dumping the Map. Any keys that aren't specified in types are ignored when loading and dumping.

backing_type = "M"
python_type = dict
types = dict

Specifies the Type for each key in the Map. For example, a Map with two keys "id" and "rating" that are a UUID and Number respectively would have the following types:

{
    "id": UUID(),
    "rating": Number()
}
DynamicList
class bloop.types.DynamicList[source]

Holds a list of arbitrary values, including other DynamicLists and DynamicMaps.

Similar to List but is not constrained to a single type.

value = [1, True, "f"]
DynamicList()._dump(value)
    -> {"L": [{"N": "1"}, {"BOOL": true}, {"S": "f"}]}

Note

Values will only be loaded and dumped as their DynamoDB backing types. This means datetimes and uuids are stored and loaded as strings, and timestamps are stored and loaded as integers. For more information, see Dynamic Documents.

backing_type = "L"
python_type = list
DynamicMap
class bloop.types.DynamicMap[source]

Holds a dictionary of arbitrary values, including other DynamicLists and DynamicMaps.

Similar to Map but is not constrained to a single type.

value = {"f": 1, "in": [True]]
DynamicMap()._dump(value)
    -> {"M": {"f": {"N": 1}, "in": {"L": [{"BOOL": true}]}}}

Note

Values will only be loaded and dumped as their DynamoDB backing types. This means datetimes and uuids are stored and loaded as strings, and timestamps are stored and loaded as integers. For more information, see Dynamic Documents.

backing_type = "M"
python_type = dict

Actions

In most cases you do not need an action. However, you can use bloop.actions.add() to change a numeric value or add members to a set without reading it first, and bloop.actions.delete() to remove members from a set without reading it first.

As mentioned in the Atomic Counters section of the DynamoDB Developer Guide, you should understand the limitations of atomic counters and be sure your application can tolerate them; for example, a retried update could apply an increment more than once.

bloop.actions.add(value)[source]

Create a new ADD action.

The ADD action only supports Number and Set data types. In addition, ADD can only be used on top-level attributes, not nested attributes.

>>> import bloop.actions
>>> from my_models import Website
>>> website = Website(...)
>>> website.views = bloop.actions.add(1)
>>> website.remote_addrs = bloop.actions.add({"::0", "localhost"})
bloop.actions.delete(value)[source]

Create a new DELETE action.

The DELETE action only supports Set data types. In addition, DELETE can only be used on top-level attributes, not nested attributes.

>>> import bloop.actions
>>> from my_models import Website
>>> website = Website(...)
>>> website.remote_addrs = bloop.actions.delete({"::0", "localhost"})
bloop.actions.remove(value=None)[source]

Create a new REMOVE action.

Most types automatically create this action when you use del obj.some_attr or obj.some_attr = None

>>> import bloop.actions
>>> from my_models import User
>>> user = User(...)
# equivalent
>>> user.shell = None
>>> user.shell = bloop.actions.remove(None)
bloop.actions.set(value)[source]

Create a new SET action.

Most types automatically create this action when you use obj.some_attr = value

>>> import bloop.actions
>>> from my_models import User
>>> user = User(...)
# equivalent
>>> user.shell = "/bin/sh"
>>> user.shell = bloop.actions.set("/bin/sh")

Query

class bloop.search.QueryIterator(*, engine, model, index, request, projected)[source]

Reusable query iterator that unpacks result dicts into model instances.

Returned from Engine.query.

Parameters
  • engine -- Engine to unpack models with.

  • model -- BaseModel being queried.

  • index -- Index to query, or None.

  • request (dict) -- The base request dict for each Query call.

  • projected (set) -- Set of Column that should be included in each result.

all()

Eagerly load all results and return a single list. If there are no results, the list is empty.

Returns

A list of results.

property count

Number of items that have been loaded from DynamoDB so far, including buffered items.

property exhausted

True if there are no more results.

first()

Return the first result. If there are no results, raises ConstraintViolation.

Returns

The first result.

Raises

bloop.exceptions.ConstraintViolation -- No results.

move_to(token)

Restore an iterator to the state stored in a token. This will reset all iterator state, including count, scanned, and exhausted properties.

Parameters

token -- a SearchIterator.token

one()

Return the unique result. If there is not exactly one result, raises ConstraintViolation.

Returns

The unique result.

Raises

bloop.exceptions.ConstraintViolation -- Not exactly one result.

reset()

Reset to the initial state, clearing the buffer and zeroing count and scanned.

property scanned

Number of items that DynamoDB evaluated, before any filter was applied.

property token

JSON-serializable representation of the current SearchIterator state.

Use iterator.move_to(token) to move an iterator to this position.

Implementations will always include an "ExclusiveStartKey" key but may include additional metadata. The iterator's count and scanned values are not preserved.

Returns

Iterator state as a json-friendly dict

Scan

class bloop.search.ScanIterator(*, engine, model, index, request, projected)[source]

Reusable scan iterator that unpacks result dicts into model instances.

Returned from Engine.scan.

Parameters
  • engine -- Engine to unpack models with.

  • model -- BaseModel being scanned.

  • index -- Index to scan, or None.

  • request (dict) -- The base request dict for each Scan call.

  • projected (set) -- Set of Column that should be included in each result.

all()

Eagerly load all results and return a single list. If there are no results, the list is empty.

Returns

A list of results.

property count

Number of items that have been loaded from DynamoDB so far, including buffered items.

property exhausted

True if there are no more results.

first()

Return the first result. If there are no results, raises ConstraintViolation.

Returns

The first result.

Raises

bloop.exceptions.ConstraintViolation -- No results.

move_to(token)

Restore an iterator to the state stored in a token. This will reset all iterator state, including count, scanned, and exhausted properties.

Parameters

token -- a SearchIterator.token

one()

Return the unique result. If there is not exactly one result, raises ConstraintViolation.

Returns

The unique result.

Raises

bloop.exceptions.ConstraintViolation -- Not exactly one result.

reset()

Reset to the initial state, clearing the buffer and zeroing count and scanned.

property scanned

Number of items that DynamoDB evaluated, before any filter was applied.

property token

JSON-serializable representation of the current SearchIterator state.

Use iterator.move_to(token) to move an iterator to this position.

Implementations will always include an "ExclusiveStartKey" key but may include additional metadata. The iterator's count and scanned values are not preserved.

Returns

Iterator state as a json-friendly dict

Stream

Engine.stream() is the recommended way to create a stream. If you manually create a stream, you will need to call move_to() before iterating the Stream.

Warning

Chronological order is not guaranteed for high throughput streams.

DynamoDB guarantees ordering:

  • within any single shard

  • across shards for a single hash/range key

There is no way to exactly order records from adjacent shards. High throughput streams provide approximate ordering using each record's "ApproximateCreationDateTime".

Tables with a single partition guarantee order across all records.

See Stream Internals for details.

class bloop.stream.Stream(*, model, engine)[source]

Iterator over all records in a stream.

Parameters
  • model -- The model to stream records from.

  • engine (Engine) -- The engine to load model objects through.

heartbeat()[source]

Refresh iterators without sequence numbers so they don't expire.

Call this at least every 14 minutes.

move_to(position)[source]

Move the Stream to a specific endpoint or time, or load state from a token.

Moving to an endpoint with "trim_horizon" or "latest" and loading from a previous token are both very efficient.

In contrast, seeking to a specific time requires iterating all records in the stream up to that time. This can be very expensive. Once you have moved a stream to a time, you should save the Stream.token so reloading will be extremely fast.

Parameters

position -- "trim_horizon", "latest", datetime, or a Stream.token

property token

JSON-serializable representation of the current Stream state.

Use Engine.stream(YourModel, token) to create an identical stream, or stream.move_to(token) to move an existing stream to this position.

Returns

Stream state as a json-friendly dict

Return type

dict

Transactions

class bloop.transactions.ReadTransaction(engine)[source]

Loads all items in the same transaction. Items can be from different models and tables.

load(*objs) → bloop.transactions.ReadTransaction[source]

Add one or more objects to be loaded in this transaction.

At most 10 items can be loaded in the same transaction. All objects will be loaded each time you call commit().

Parameters

objs -- Objects to add to the set that are loaded in this transaction.

Returns

this transaction for chaining

Raises

bloop.exceptions.MissingObjects -- if one or more objects aren't loaded.

prepare()

Create a new PreparedTransaction that can be committed.

This is called automatically when exiting the transaction as a context:

>>> engine = Engine()
>>> tx = WriteTransaction(engine)
>>> prepared = tx.prepare()
>>> prepared.commit()

# automatically calls commit when exiting
>>> with WriteTransaction(engine) as tx:
...     # modify the transaction here
...     pass
>>> # tx commits here
Returns

A new PreparedTransaction.

class bloop.transactions.WriteTransaction(engine)[source]

Applies all updates in the same transaction. Items can be from different models and tables.

As with an engine, you can apply conditions to each object that you save or delete, or use check() to attach a condition to the entire transaction based on an object that the transaction won't modify:

# condition on some_obj
>>> tx.save(some_obj, condition=SomeModel.name.begins_with("foo"))
# condition on the tx, based on the values of some_other_obj
>>> tx.check(some_other_obj, condition=ThatModel.capacity >= 100)
check(obj, condition) → bloop.transactions.WriteTransaction[source]

Add a condition which must be met for the transaction to commit.

While the condition is checked against the provided object, that object will not be modified. It is only used to provide the hash and range key to apply the condition to.

At most 10 items can be checked, saved, or deleted in the same transaction. The same idempotency token will be used for a single prepared transaction, which allows you to safely call commit on the PreparedTransaction object multiple times.

Parameters
  • obj -- The object to use for the transaction condition. This object will not be modified.

  • condition -- A condition on an object which must hold for the transaction to commit.

Returns

this transaction for chaining

delete(*objs, condition=None) → bloop.transactions.WriteTransaction[source]

Add one or more objects to be deleted in this transaction.

At most 10 items can be checked, saved, or deleted in the same transaction. The same idempotency token will be used for a single prepared transaction, which allows you to safely call commit on the PreparedTransaction object multiple times.

Parameters
  • objs -- Objects to add to the set that are deleted in this transaction.

  • condition -- A condition for these objects which must hold for the transaction to commit.

Returns

this transaction for chaining

prepare()

Create a new PreparedTransaction that can be committed.

This is called automatically when exiting the transaction as a context:

>>> engine = Engine()
>>> tx = WriteTransaction(engine)
>>> prepared = tx.prepare()
>>> prepared.commit()

# automatically calls commit when exiting
>>> with WriteTransaction(engine) as tx:
...     # modify the transaction here
...     pass
>>> # tx commits here
Returns

A new PreparedTransaction.

save(*objs, condition=None) → bloop.transactions.WriteTransaction[source]

Add one or more objects to be saved in this transaction.

At most 10 items can be checked, saved, or deleted in the same transaction. The same idempotency token will be used for a single prepared transaction, which allows you to safely call commit on the PreparedTransaction object multiple times.

Parameters
  • objs -- Objects to add to the set that are updated in this transaction.

  • condition -- A condition for these objects which must hold for the transaction to commit.

Returns

this transaction for chaining

Conditions

The only public class the conditions system exposes is the empty condition, Condition. The rest of the conditions system is baked into Column and consumed by the various Engine functions like Engine.save().

For example, the following function creates a condition for any model that can be used when saving to ensure you don't overwrite an existing value. The model's Meta attribute describes the required keys:

from bloop import Condition

def ensure_unique(model):
    condition = Condition()
    for key in model.Meta.keys:
        condition &= key.is_(None)
    return condition

See also

Conditions in the User Guide describes the possible conditions, and when and how to use them.

class bloop.conditions.Condition[source]

An empty condition.

combined = Condition()

for each_condition in get_conditions_list():
    combined &= each_condition

if not combined:
    print("Conditions list only had empty conditions, or no conditions")

This is useful for iteratively building complex conditions: you can combine any number of conditions without first finding a non-empty initial condition in a possibly-empty list.

An empty condition is equivalent to omitting a condition:

engine.save(some_user)
engine.save(some_user, condition=Condition())

Signals

bloop.signals.before_create_table

Sent by engine before a model's backing table is created.

# Nonce table names to avoid testing collisions
@before_create_table.connect
def apply_table_nonce(_, model, **__):
    nonce = datetime.now().isoformat()
    model.Meta.table_name += "-test-{}".format(nonce)
Parameters
  • engine -- Engine creating the model's table.

  • model -- The BaseModel class to create a table for.

bloop.signals.object_loaded

Sent by engine after an object is loaded from DynamoDB.

# Track objects "checked out" locally
local_objects = {}

def key(obj):
    meta = obj.Meta
    return tuple(getattr(obj, k.name) for k in meta.keys)

@object_loaded.connect
def on_loaded(_, obj, **__):
    local_objects[key(obj)] = obj
Parameters
  • engine -- The Engine that loaded the object.

  • obj -- The BaseModel loaded from DynamoDB.

bloop.signals.object_saved

Sent by engine after an object is saved to DynamoDB.

# Track objects "checked out" locally
local_objects = {}

def key(obj):
    meta = obj.Meta
    return tuple(getattr(obj, k.name) for k in meta.keys)

@object_saved.connect
def on_saved(_, obj, **__):
    local_objects.pop(key(obj))
Parameters
  • engine -- The Engine that saved the object.

  • obj -- The BaseModel saved to DynamoDB.

bloop.signals.object_deleted

Sent by engine after an object is deleted from DynamoDB.

# Track objects "checked out" locally
local_objects = {}

def key(obj):
    meta = obj.Meta
    return tuple(getattr(obj, k.name) for k in meta.keys)

@object_deleted.connect
def on_deleted(_, obj, **__):
    local_objects.pop(key(obj))
Parameters
  • engine -- The Engine that deleted the object.

  • obj -- The BaseModel deleted from DynamoDB.

bloop.signals.object_modified

Sent by column after an object's attribute is set or deleted.

This is sent on __set__ if an exception isn't raised, and on __del__ regardless of exceptions.

# Account balance can't be less than 0

@object_modified.connect
def enforce_positive_balance(_, obj, column, value, **__):
    if column is Account.balance and value < 0:
        # Danger: careful around infinite loops!
        setattr(obj, column.name, 0)
Parameters
  • column -- The Column that corresponds to the modified attribute.

  • obj -- The BaseModel that was modified.

  • value -- The new value of the attribute.

bloop.signals.model_bound

Sent by engine after a model is bound to that Engine.

This signal is sent after model_validated.

Parameters
  • engine -- The Engine that the model was bound to.

  • model -- The BaseModel class that was bound.

bloop.signals.model_created

Sent by None after a new model is defined.

While this signal is sent when the BaseModel is created, the BaseModel is created so early in Bloop's import order that no handlers will be connected when it occurs.

You can manually send the BaseModel through your handler with:

model_created.send(model=BaseModel)
Parameters

model -- The subclass of BaseModel that was created.

bloop.signals.model_validated

Sent by engine after a model is validated.

This signal is sent before model_bound.

Parameters
  • engine -- The Engine that validated the model.

  • model -- The BaseModel class that was validated.

Exceptions

Except to configure sessions, Bloop aims to completely abstract the boto3/botocore layers. If you encounter an exception from either boto3 or botocore, please open an issue. Bloop's exceptions are broadly divided into two categories: unexpected state, and invalid input.

To catch any exception from Bloop, use BloopException:

try:
    engine.stream(User, "latest")
except BloopException:
    print("Didn't expect an exception, but Bloop raised:")
    raise
class bloop.exceptions.BloopException[source]

An unexpected exception occurred.

Unexpected state

These are exceptions that you should be ready to handle in the normal course of using DynamoDB. For example, failing to load objects will raise MissingObjects, while conditional operations may fail with ConstraintViolation.

class bloop.exceptions.ConstraintViolation[source]

A required condition was not met.

class bloop.exceptions.MissingObjects(*args, objects=None)[source]

Some objects were not found.

objects: list

The objects that failed to load

class bloop.exceptions.RecordsExpired[source]

The requested stream records are beyond the trim horizon.

class bloop.exceptions.ShardIteratorExpired[source]

The shard iterator is past its expiration date.

class bloop.exceptions.TableMismatch[source]

The expected and actual tables for this Model do not match.

class bloop.exceptions.TransactionCanceled[source]

The transaction was canceled.

A WriteTransaction is canceled when:
  • A condition in one of the condition expressions is not met.

  • A table in the TransactWriteItems request is in a different account or region.

  • More than one action in the TransactWriteItems operation targets the same item.

  • There is insufficient provisioned capacity for the transaction to be completed.

  • An item size becomes too large (larger than 400 KB), or a local secondary index (LSI) becomes too large, or a similar validation error occurs because of changes made by the transaction.

A ReadTransaction is canceled when:
  • There is an ongoing TransactGetItems operation that conflicts with a concurrent PutItem, UpdateItem, DeleteItem or TransactWriteItems request.

  • A table in the TransactGetItems request is in a different account or region.

  • There is insufficient provisioned capacity for the transaction to be completed.

  • There is a user error, such as an invalid data format.

See also

The API reference for TransactionCanceledException

class bloop.exceptions.TransactionTokenExpired[source]

The transaction's tx_id (ClientRequestToken) was first used more than 10 minutes ago.

Bad Input

These are thrown when an option is invalid or missing, such as forgetting a key condition for a query, or trying to use an unknown projection type.

class bloop.exceptions.InvalidCondition[source]

This is not a valid Condition.

class bloop.exceptions.InvalidModel[source]

This is not a valid Model.

class bloop.exceptions.InvalidPosition[source]

This is not a valid position for a Stream.

class bloop.exceptions.InvalidSearch[source]

The search was malformed.

class bloop.exceptions.InvalidShardIterator[source]

This is not a valid shard iterator.

class bloop.exceptions.InvalidStream[source]

This is not a valid stream definition.

class bloop.exceptions.InvalidTemplate[source]

This is not a valid template string.

class bloop.exceptions.MissingKey[source]

The instance must provide values for its key columns.

Extensions

DateTime
class DateTime(timezone=datetime.timezone.utc)

Drop-in replacement for the built-in DateTime type. Support for arrow, delorean, and pendulum:

from bloop.ext.arrow import DateTime
from bloop.ext.delorean import DateTime
from bloop.ext.pendulum import DateTime
backing_type = "S"
python_type

Depending on where it's imported from, one of:

  • arrow.Arrow

  • delorean.Delorean

  • pendulum.DateTime

timezone = tzinfo

The timezone that values loaded from DynamoDB will use. Note that DateTimes are always stored in DynamoDB according to FIXED_ISO8601_FORMAT.

Timestamp
class Timestamp(timezone=datetime.timezone.utc)

Drop-in replacement for the built-in Timestamp type. Support for arrow, delorean, and pendulum:

from bloop.ext.arrow import Timestamp
from bloop.ext.delorean import Timestamp
from bloop.ext.pendulum import Timestamp
backing_type = "N"
python_type

Depending on where it's imported from, one of:

  • arrow.Arrow

  • delorean.Delorean

  • pendulum.DateTime

timezone = tzinfo

The timezone that values loaded from DynamoDB will use.

Internal

In addition to documenting internal classes, this section describes complex internal systems (such as Streams, tracking modified columns via weakrefs) and specific parameters and error handling that Bloop employs when talking to DynamoDB (such as SessionWrapper's error inspection, and partial table validation).

SessionWrapper

class bloop.session.SessionWrapper(dynamodb=None, dynamodbstreams=None)[source]

Provides a consistent interface to DynamoDb and DynamoDbStreams clients.

If either client is None, that client is built using boto3.client().

Parameters
  • dynamodb -- A boto3 client for DynamoDB. Defaults to boto3.client("dynamodb").

  • dynamodbstreams -- A boto3 client for DynamoDbStreams. Defaults to boto3.client("dynamodbstreams").

clear_cache()[source]

Clear all cached table descriptions.

create_table(table_name, model)[source]

Create the model's table. Returns True if the table is being created, False otherwise.

Does not wait for the table to create, and does not validate an existing table. Will not raise "ResourceInUseException" if the table exists or is being created.

Parameters
  • table_name (str) -- The name of the table to create for the model.

  • model -- The BaseModel to create the table for.

Returns

True if the table is being created, False if the table exists

Return type

bool

delete_item(item)[source]

Delete an object in DynamoDB.

Returns Optional[dict] of read attributes depending on the "ReturnValues" kwarg. Return value is None when no attributes were requested.

Parameters

item -- Unpacked into kwargs for boto3.DynamoDB.Client.delete_item().

Raises

bloop.exceptions.ConstraintViolation -- if the condition (or atomic) is not met.

describe_stream(stream_arn, first_shard=None)[source]

Wraps boto3.DynamoDBStreams.Client.describe_stream(), handling continuation tokens.

Parameters
  • stream_arn (str) -- Stream arn, usually from the model's Meta.stream["arn"].

  • first_shard (str) -- (Optional) If provided, only shards after this shard id will be returned.

Returns

All shards in the stream, or a subset if first_shard is provided.

Return type

dict

describe_table(table_name)[source]

Polls until the table is ready, then returns the first description received after the table became ready.

The returned dict is standardized to ensure all fields are present, even when empty or across different DynamoDB API versions. TTL information is also inserted.

Parameters

table_name -- The name of the table to describe

Returns

The (sanitized) result of DescribeTable["Table"]

Return type

dict

enable_backups(table_name, model)[source]

Calls UpdateContinuousBackups on the table according to model.Meta.continuous_backups.

Parameters
  • table_name -- The name of the table to enable Continuous Backups on

  • model -- The model to get Continuous Backups settings from

enable_ttl(table_name, model)[source]

Calls UpdateTimeToLive on the table according to model.Meta.ttl.

Parameters
  • table_name -- The name of the table to enable the TTL setting on

  • model -- The model to get TTL settings from

get_shard_iterator(*, stream_arn, shard_id, iterator_type, sequence_number=None)[source]

Wraps boto3.DynamoDBStreams.Client.get_shard_iterator().

Parameters
  • stream_arn (str) -- Stream arn. Usually Shard.stream_arn.

  • shard_id (str) -- Shard identifier. Usually Shard.shard_id.

  • iterator_type (str) -- "at_sequence", "after_sequence", "trim_horizon", or "latest"

  • sequence_number (str) -- (Optional) Sequence number for an "at_sequence" or "after_sequence" iterator type. Default is None.

Returns

Iterator id, valid for 15 minutes.

Return type

str

Raises

bloop.exceptions.RecordsExpired -- Tried to get an iterator beyond the Trim Horizon.

get_stream_records(iterator_id)[source]

Wraps boto3.DynamoDBStreams.Client.get_records().

Parameters

iterator_id -- Iterator id. Usually Shard.iterator_id.

Returns

Dict with "Records" list (may be empty) and "NextShardIterator" str (may not exist).

Return type

dict

Raises
  • bloop.exceptions.RecordsExpired -- The requested records are beyond the trim horizon.

  • bloop.exceptions.ShardIteratorExpired -- The shard iterator is past its expiration date.

load_items(items)[source]

Loads any number of items in chunks, handling continuation tokens.

Parameters

items -- Unpacked in chunks into "RequestItems" for boto3.DynamoDB.Client.batch_get_item().

query_items(request)[source]

Wraps boto3.DynamoDB.Client.query().

Response always includes "Count" and "ScannedCount"

Parameters

request -- Unpacked into boto3.DynamoDB.Client.query()

save_item(item)[source]

Save an object to DynamoDB.

Returns Optional[dict] of read attributes depending on the "ReturnValues" kwarg. Return value is None when no attributes were requested.

Parameters

item -- Unpacked into kwargs for boto3.DynamoDB.Client.update_item().

Raises

bloop.exceptions.ConstraintViolation -- if the condition (or atomic) is not met.

scan_items(request)[source]

Wraps boto3.DynamoDB.Client.scan().

Response always includes "Count" and "ScannedCount"

Parameters

request -- Unpacked into boto3.DynamoDB.Client.scan()

search_items(mode, request)[source]

Invoke query/scan by name.

Response always includes "Count" and "ScannedCount"

Parameters
  • mode (str) -- "query" or "scan"

  • request -- Unpacked into boto3.DynamoDB.Client.query() or boto3.DynamoDB.Client.scan()

transaction_read(items)[source]

Wraps boto3.DynamoDB.Client.transact_get_items().

Parameters

items -- Unpacked into "TransactItems" for boto3.DynamoDB.Client.transact_get_items()

Raises

bloop.exceptions.TransactionCanceled -- if the transaction was canceled.

Returns

Dict with "Responses" list

transaction_write(items, client_request_token)[source]

Wraps boto3.DynamoDB.Client.transact_write_items().

Parameters
  • items -- Unpacked into "TransactItems" for boto3.DynamoDB.Client.transact_write_items()

  • client_request_token -- Idempotency token valid for 10 minutes from first use. Unpacked into "ClientRequestToken"

Raises

bloop.exceptions.TransactionCanceled -- if the transaction was canceled.

validate_table(table_name, model)[source]

Polls until a creating table is ready, then verifies the description against the model's requirements.

The model may have a subset of all GSIs and LSIs on the table, but the key structure must be exactly the same. The table must have a stream if the model expects one, but not the other way around. When read or write units are not specified for the model or any GSI, the existing values will always pass validation.

Parameters
  • table_name (str) -- The name of the table to validate the model against.

  • model -- The BaseModel to validate the table of.

Raises

bloop.exceptions.TableMismatch -- When the table does not meet the constraints of the model.

Modeling

IMeta
class bloop.models.IMeta[source]

This class exists to provide autocomplete hints for computed variables on a model's Meta object.

Subclassing IMeta is OPTIONAL and rarely necessary; it is primarily available for users writing generic code over a class of models, eg. transforms on all columns of a model or a Marshmallow adapter.

import bloop.models


class User(BaseModel):
    id = Column(String, hash_key=True)
    email = Column(String, dynamo_name="e")

    class Meta(bloop.models.IMeta):
        read_units = 500

User.Meta.co  # PyCharm renders:
              #     +---------------------------+
              #     | User.Meta.columns         |
              #     | User.Meta.columns_by_name |
              #     +---------------------------+
Index
class bloop.models.Index(*, projection, hash_key=None, range_key=None, dynamo_name=None, **kwargs)[source]

Abstract base class for GSIs and LSIs.

An index must be bound to a model by calling bind_index(meta, model), which lets the index compute projected columns, validate hash and range keys, etc.

Parameters
  • projection -- Either "keys", "all", or a list of column name or objects. Included columns will be projected into the index. Key columns are always included.

  • hash_key -- The column that the index can be queried against. Always the table hash_key for LSIs.

  • range_key -- The column that the index can be sorted on. Always required for an LSI. Default is None.

  • dynamo_name (str) -- (Optional) The index's name in DynamoDB. Defaults to the index's name in the model.

dynamo_name

The name of this index in DynamoDB. Defaults to the index's name.

hash_key

The column that the index can be queried against. (LSI's hash_key is always the table hash_key.)

model

The model this index is attached to.

name

The name of this index in the model. Set by bind_index() during __init_subclass__().

projection

Computed by bind_index() during __init_subclass__().

{
    "available":  # Set of columns that can be returned from a query or search.
    "included":   # Set of columns that can be used in query and scan filters.
    "mode":       # "all", "keys", or "include"
    "strict":     # False if queries and scans can fetch non-included columns
}
range_key

The column that the index can be sorted on.

__copy__()[source]

Create a shallow copy of this Index. Primarily used when initializing models that subclass other abstract models or mixins (baseless classes that contain Columns and Indexes). You can override this method to change how derived models are created:

import copy


class MyIndex(Index):
    def __copy__(self):
        new = super().__copy__()
        new.derived = True
        return new


index = MyIndex(projection="keys", hash_key="some_column")
same = copy.copy(index)
assert same.derived  # True
Returns

A shallow copy of this Index, with the model and _name attributes unset, and the computed projection invalidated.

Binding
models.bind_column(model, name, column, force=False, recursive=False, copy=False) → bloop.models.Column

Bind a column to the model with the given name.

This method is primarily used during BaseModel.__init_subclass__, although it can be used to easily attach a new column to an existing model:

import bloop.models

class User(BaseModel):
    id = Column(String, hash_key=True)


email = Column(String, dynamo_name="e")
bound = bloop.models.bind_column(User, "email", email)
assert bound is email

# rebind with force, and use a copy
bound = bloop.models.bind_column(User, "email", email, force=True, copy=True)
assert bound is not email

If an existing index refers to this column, it will be updated to point to the new column using refresh_index(), including recalculating the index projection. Meta attributes including Meta.columns, Meta.hash_key, etc. will be updated if necessary.

If name or the column's dynamo_name conflicts with an existing column or index on the model, raises InvalidModel unless force is True. If recursive is True and there are existing subclasses of model, a copy of the column will attempt to bind to each subclass. The recursive calls will not force the bind, and will always use a new copy. If copy is True then a copy of the provided column is used. This uses a shallow copy via __copy__().

Parameters
  • model -- The model to bind the column to.

  • name -- The name to bind the column as. In effect, used for setattr(model, name, column)

  • column -- The column to bind to the model.

  • force -- Unbind existing columns or indexes with the same name or dynamo_name. Default is False.

  • recursive -- Bind to each subclass of this model. Default is False.

  • copy -- Use a copy of the column instead of the column directly. Default is False.

Returns

The bound column. This is a new column when copy is True, otherwise the input column.

models.bind_index(model, name, index, force=False, recursive=False, copy=False) → bloop.models.Index

Bind an index to the model with the given name.

This method is primarily used during BaseModel.__init_subclass__, although it can be used to easily attach a new index to an existing model:

import bloop.models

class User(BaseModel):
    id = Column(String, hash_key=True)
    email = Column(String, dynamo_name="e")


by_email = GlobalSecondaryIndex(projection="keys", hash_key="email")
bound = bloop.models.bind_index(User, "by_email", by_email)
assert bound is by_email

# rebind with force, and use a copy
bound = bloop.models.bind_index(User, "by_email", by_email, force=True, copy=True)
assert bound is not by_email

If name or the index's dynamo_name conflicts with an existing column or index on the model, raises InvalidModel unless force is True. If recursive is True and there are existing subclasses of model, a copy of the index will attempt to bind to each subclass. The recursive calls will not force the bind, and will always use a new copy. If copy is True then a copy of the provided index is used. This uses a shallow copy via __copy__().

Parameters
  • model -- The model to bind the index to.

  • name -- The name to bind the index as. In effect, used for setattr(model, name, index)

  • index -- The index to bind to the model.

  • force -- Unbind existing columns or indexes with the same name or dynamo_name. Default is False.

  • recursive -- Bind to each subclass of this model. Default is False.

  • copy -- Use a copy of the index instead of the index directly. Default is False.

Returns

The bound index. This is a new index when copy is True, otherwise the input index.

models.refresh_index(meta, index) → None

Recalculate the projection, hash_key, and range_key for the given index.

Parameters
  • meta -- model.Meta to find columns by name

  • index -- The index to refresh

models.unbind(meta, name=None, dynamo_name=None) → None

Unconditionally remove any columns or indexes bound to the given name or dynamo_name.

import bloop.models


class User(BaseModel):
    id = Column(String, hash_key=True)
    email = Column(String, dynamo_name="e")
    by_email = GlobalSecondaryIndex(projection="keys", hash_key=email)


for dynamo_name in ("id", "e", "by_email"):
    bloop.models.unbind(User.Meta, dynamo_name=dynamo_name)

assert not User.Meta.columns
assert not User.Meta.indexes
assert not User.Meta.keys

Warning

This method does not pre- or post- validate the model with the requested changes. You are responsible for ensuring the model still has a hash key, that required columns exist for each index, etc.

Parameters
  • meta -- model.Meta to remove the columns or indexes from

  • name -- column or index name to unbind by. Default is None.

  • dynamo_name -- column or index name to unbind by. Default is None.

Types

DynamicType
class bloop.types.DynamicType[source]

Dynamically dumps a value based on its python type.

This is used by DynamicList, DynamicMap to handle path resolution before the value for an arbitrary path is known. For example, given the following model:

class UserUpload(BaseModel):
    id = Column(String, hash_key=True)
    doc = Column(DynamicMap)

And an instance as follows:

u = UserUpload(id="numberoverzero")
u.doc = {
    "foo": ["bar", {0: "a", 1: "b"}, True]
}

The renderer must know a type for UserUpload.doc["foo"][1][0] before the value is provided. An instance of this type will return itself for any value during __getitem__, and then inspects the value type during _dump to create the correct simple type.

Because DynamicType requires access to the DynamoDB type annotation, you must call _load and _dump, as dynamo_load and dynamo_dump can't be implemented. For example:

DynamicType.i._load({"S": "2016-08-09T01:16:25.322849+00:00"})
    -> "2016-08-09T01:16:25.322849+00:00"
DynamicType.i._load({"N": "3.14"}) -> Decimal('3.14')

DynamicType.i._dump([1, True, "f"])
    -> {"L": [{"N": "1"}, {"BOOL": true}, {"S": "f"}]}
DynamicType.i._dump({b"1", b"2"}) -> {"BS": ["MQ==", b"Mg=="]}
i

Singleton instance of the class.

backing_type = None
python_type = None

Actions

class bloop.actions.Action(action_type: bloop.actions.ActionType, value)[source]

Encapsulates an update value and how Dynamo should apply the update.

Generally, you will only need to use the Action class if you are updating an atomic counter (ADD) or making additions and deletions from a set (ADD, DELETE).

You do not need to use an Action for SET or REMOVE updates.

>>> import bloop.actions
>>> from my_models import Website, User
>>> user = User()
>>> website = Website()
# SET and REMOVE don't need an explicit action
>>> user.verified = True
>>> del user.pw_hash
# ADD and DELETE need explicit actions
>>> website.view_count = bloop.actions.add(1)
>>> website.remote_addrs = bloop.actions.delete({"::0", "localhost"})
class bloop.actions.ActionType(value)[source]

Represents how Dynamo should apply an update.

Add = ('ADD', '{name_ref.name} {value_ref.name}', False)
Delete = ('DELETE', '{name_ref.name} {value_ref.name}', False)
Remove = ('REMOVE', '{name_ref.name}', True)
Set = ('SET', '{name_ref.name}={value_ref.name}', True)
new_action(value) → bloop.actions.Action[source]

Convenience function to instantiate an Action with this type.

render(name_ref, value_ref)[source]

name_ref, value_ref should be instances of bloop.conditions.Reference or None

bloop.actions.unwrap(x: Union[bloop.actions.Action, Any]) → Any[source]

Return an action's inner value.

bloop.actions.wrap(x: Any) → bloop.actions.Action[source]

Return an action: REMOVE if x is None, else SET.

Searching

PreparedSearch
class bloop.search.PreparedSearch[source]

Mutable search object.

Creates SearchModelIterator objects which can be used to iterate the results of a query or scan multiple times.

prepare(engine=None, mode=None, model=None, index=None, key=None, filter=None, projection=None, consistent=None, forward=None, parallel=None)[source]

Validates the search parameters and builds the base request dict for each Query/Scan call.

SearchIterator
class bloop.search.SearchIterator(*, session, model, index, request, projected)[source]

Reusable search iterator.

Parameters
  • session -- SessionWrapper to make Query, Scan calls.

  • model -- BaseModel for repr only.

  • index -- Index to search, or None.

  • request (dict) -- The base request dict for each search.

  • projected (set) -- Set of Column that should be included in each result.

all()[source]

Eagerly load all results and return a single list. If there are no results, the list is empty.

Returns

A list of results.

property count

Number of items that have been loaded from DynamoDB so far, including buffered items.

property exhausted

True if there are no more results.

first()[source]

Return the first result. If there are no results, raises ConstraintViolation.

Returns

The first result.

Raises

bloop.exceptions.ConstraintViolation -- No results.

move_to(token)[source]

Restore an iterator to the state stored in a token. This will reset all iterator state, including count, scanned, and exhausted properties.

Parameters

token -- a SearchIterator.token

one()[source]

Return the unique result. If there is not exactly one result, raises ConstraintViolation.

Returns

The unique result.

Raises

bloop.exceptions.ConstraintViolation -- Not exactly one result.

reset()[source]

Reset to the initial state, clearing the buffer and zeroing count and scanned.

property scanned

Number of items that DynamoDB evaluated, before any filter was applied.

property token

JSON-serializable representation of the current SearchIterator state.

Use iterator.move_to(token) to move an iterator to this position.

Implementations will always include an "ExclusiveStartKey" key but may include additional metadata. The iterator's count and scanned values are not preserved.

Returns

Iterator state as a json-friendly dict

SearchModelIterator
class bloop.search.SearchModelIterator(*, engine, model, index, request, projected)[source]

Reusable search iterator that unpacks result dicts into model instances.

Parameters
  • engine -- Engine to unpack models with.

  • model -- BaseModel being searched.

  • index -- Index to search, or None.

  • request (dict) -- The base request dict for each search call.

  • projected (set) -- Set of Column that should be included in each result.

all()

Eagerly load all results and return a single list. If there are no results, the list is empty.

Returns

A list of results.

property count

Number of items that have been loaded from DynamoDB so far, including buffered items.

property exhausted

True if there are no more results.

first()

Return the first result. If there are no results, raises ConstraintViolation.

Returns

The first result.

Raises

bloop.exceptions.ConstraintViolation -- No results.

move_to(token)

Restore an iterator to the state stored in a token. This will reset all iterator state, including count, scanned, and exhausted properties.

Parameters

token -- a SearchIterator.token

one()

Return the unique result. If there is not exactly one result, raises ConstraintViolation.

Returns

The unique result.

Raises

bloop.exceptions.ConstraintViolation -- Not exactly one result.

reset()

Reset to the initial state, clearing the buffer and zeroing count and scanned.

property scanned

Number of items that DynamoDB evaluated, before any filter was applied.

property token

JSON-serializable representation of the current SearchIterator state.

Use iterator.move_to(token) to move an iterator to this position.

Implementations will always include an "ExclusiveStartKey" key but may include additional metadata. The iterator's count and scanned values are not preserved.

Returns

Iterator state as a json-friendly dict

Streaming

Coordinator
class bloop.stream.coordinator.Coordinator(*, session, stream_arn)[source]

Encapsulates the shard-level management for a whole Stream.

Parameters
  • session (SessionWrapper) -- Used to make DynamoDBStreams calls.

  • stream_arn (str) -- Stream arn, usually from the model's Meta.stream["arn"].

advance_shards()[source]

Poll active shards for records and insert them into the buffer. Rotate exhausted shards.

Returns immediately if the buffer isn't empty.

heartbeat()[source]

Keep active shards with "trim_horizon", "latest" iterators alive by advancing their iterators.

move_to(position)[source]

Set the Coordinator to a specific endpoint or time, or load state from a token.

Parameters

position -- "trim_horizon", "latest", datetime, or a Coordinator.token

remove_shard(shard, drop_buffered_records=False)[source]

Remove a Shard from the Coordinator. Drops all buffered records from the Shard.

If the Shard is active or a root, it is removed and any children promoted to those roles.

Parameters
  • shard (Shard) -- The shard to remove.

  • drop_buffered_records (bool) -- Whether records from this shard should be removed. Default is False.

property token

JSON-serializable representation of the current Stream state.

Use Engine.stream(YourModel, token) to create an identical stream, or stream.move_to(token) to move an existing stream to this position.

Returns

Stream state as a json-friendly dict

Return type

dict

Shard
class bloop.stream.shard.Shard(*, stream_arn, shard_id, iterator_id=None, iterator_type=None, sequence_number=None, parent=None, session=None)[source]

Encapsulates the record-level iterator management for a single Shard.

Parameters
  • stream_arn (str) -- Stream arn, usually from the model's Meta.stream["arn"].

  • shard_id (str) -- Shard id, usually from a DescribeStream call.

  • iterator_id (str) -- (Optional) An existing Shard iterator id. Default is None.

  • iterator_type (str) -- (Optional) The shard's iterator type, usually when loading from a token. One of "trim_horizon", "at_sequence", "after_sequence", or "latest". Default is None.

  • sequence_number (str) -- (Optional) SequenceNumber for an "at_sequence" or "after_sequence" iterator type. Default is None.

  • parent (Shard) -- (Optional) This shard's parent. Default is None.

  • session (SessionWrapper) -- Used to make DynamoDBStreams calls.

property exhausted

True if the shard is closed and there are no additional records to get.

get_records()[source]

Get the next set of records in this shard. An empty list doesn't guarantee the shard is exhausted.

Returns

A list of reformatted records. May be empty.

jump_to(*, iterator_type, sequence_number=None)[source]

Move to a new position in the shard using the standard parameters to GetShardIterator.

Parameters
  • iterator_type (str) -- "trim_horizon", "at_sequence", "after_sequence", "latest"

  • sequence_number (str) -- (Optional) Sequence number to use with at/after sequence. Default is None.

load_children()[source]

If the Shard doesn't have any children, tries to find some from DescribeStream.

If the Shard is open this won't find any children, so an empty response doesn't mean the Shard will never have children.

seek_to(position)[source]

Move the Shard's iterator to the earliest record after the given position (a datetime).

Returns the first records at or past position. If the list is empty, the seek failed to find records, either because the Shard is exhausted or it reached the HEAD of an open Shard.

Parameters

position (datetime) -- The position in time to move to.

Returns

A list of the first records found after position. May be empty.

shard_id

The shard id is set once on creation and never changes.

stream_arn

The stream arn is set once on creation and never changes.

property token

JSON-serializable representation of the current Shard state.

The token is enough to rebuild the Shard as part of rebuilding a Stream.

Returns

Shard state as a json-friendly dict

Return type

dict

walk_tree()[source]

Generator that yields each Shard by walking the shard's children in order.

RecordBuffer
class bloop.stream.buffer.RecordBuffer[source]

Maintains a total ordering for records across any number of shards.

Methods are thin wrappers around heapq. Buffer entries have the form:

(total_ordering, record, shard)

where total_ordering is a tuple of (created_at, sequence_number, monotonic_clock) created from each record as it is inserted.

clear()[source]

Drop the entire buffer.

clock()[source]

Returns a monotonically increasing integer.

Do not rely on the clock using a fixed increment.

>>> buffer = RecordBuffer()
>>> buffer.clock()
3
>>> buffer.clock()
40
>>> buffer.clock()
41
>>> buffer.clock()
300
Returns

A unique clock value guaranteed to be larger than every previous value

Return type

int

peek()[source]

A pop() without removing the (record, shard) from the buffer.

Returns

Oldest (record, shard) tuple.

pop()[source]

Pop the oldest (lowest total ordering) record and the shard it came from.

Returns

Oldest (record, shard) tuple.

push(record, shard)[source]

Push a new record into the buffer

Parameters
  • record (dict) -- new record

  • shard (Shard) -- Shard the record came from

push_all(record_shard_pairs)[source]

Push multiple (record, shard) pairs at once, with only one heapq.heapify() call to maintain order.

Parameters

record_shard_pairs -- list of (record, shard) tuples (see push()).

Transactions

class bloop.transactions.Transaction(engine)[source]

Holds a collection of transaction items to be rendered into a PreparedTransaction.

If used as a context manager, calls prepare() and commit() when the outermost context exits.

>>> engine = Engine()
>>> tx = Transaction(engine)
>>> tx.mode = "w"
>>> p1 = tx.prepare()
>>> p2 = tx.prepare()  # different instances

>>> with tx:
...     pass
>>> #  tx.prepare().commit() is called here
prepare()[source]

Create a new PreparedTransaction that can be committed.

This is called automatically when exiting the transaction as a context:

>>> engine = Engine()
>>> tx = WriteTransaction(engine)
>>> prepared = tx.prepare()
>>> prepared.commit()

# automatically calls commit when exiting
>>> with WriteTransaction(engine) as tx:
...     # modify the transaction here
...     pass
>>> # tx commits here
Returns

A new PreparedTransaction.

class bloop.transactions.PreparedTransaction[source]

Transaction that can be committed once or more.

Usually created from a Transaction instance.

commit() → None[source]

Commit the transaction with a fixed transaction id.

A read transaction can call commit() any number of times, while a write transaction can only use the same tx_id for 10 minutes from the first call.

first_commit_at: Optional[datetime.datetime] = None

When the transaction was first committed. A prepared write transaction can only call commit again within 10 minutes of its first commit. This is None until commit() is called at least once.

prepare(engine, mode, items) → None[source]

Creates a unique transaction id and dumps the items into a cached request object.

tx_id: str

Unique id used as the "ClientRequestToken" for write transactions. This is generated but not sent with a read transaction, since reads are not idempotent.

class bloop.transactions.TxItem(type: bloop.transactions.TxType, obj: Any, condition: Optional[Any])[source]

Includes the type, an object, and its condition settings.

The common way to construct an item is through the new() method:

>>> get_item = TxItem.new("get", some_obj)
>>> save_item = TxItem.new("save", some_obj)
property condition

An optional condition that constrains an update

property is_update

Whether this should render an "UpdateExpression" in the TransactItem

property obj

The object that will be modified, persisted, or referenced in a transaction

property should_render_obj

Whether the object values should be rendered in the TransactItem

property type

How this item will be used in a transaction

class bloop.transactions.TxType(value)[source]

Enum whose value is the wire format of its name

classmethod by_alias(name: str) → bloop.transactions.TxType[source]

Get a type by the common bloop operation name: get/check/delete/save.

Conditions

ObjectTracking
class bloop.conditions.ObjectTracking(dict=None)[source]
ReferenceTracker
class bloop.conditions.ReferenceTracker(engine)[source]

De-dupes reference names for the same path segments and generates unique placeholders for all names, paths, and values. The reference tracker can also forget references if, for example, a value fails to render but the rest of the condition should be left intact. This is primarily used when a value is unexpectedly dumped as None, or an expression uses another column as a value.

Parameters

engine (Engine) -- Used to dump column values for value refs.

any_ref(*, column, value=<Sentinel[missing]>, inner=False) → bloop.conditions.Reference[source]

Returns a NamedTuple of (name, type, value) for any type of reference.

# Name ref
>>> tracker.any_ref(column=User.email)
Reference(name='email', type='name', value=None)

# Value ref
>>> tracker.any_ref(column=User.email, value='user@domain')
Reference(name='email', type='value', value={'S': 'user@domain'})

# Passed as value ref, but value is another column
>>> tracker.any_ref(column=User.email, value=User.other_column)
Reference(name='other_column', type='name', value=None)
Parameters
  • column (ComparisonMixin) -- The column to reference. If value is None, this will render a name ref for this column.

  • value -- (Optional) If provided, this is likely a value ref. If value is also a column, this will render a name ref for that column (not the column parameter).

  • inner (bool) -- (Optional) True if this is a value ref and it should be dumped through a collection's inner type, and not the collection type itself. Default is False.

Returns

A name or value reference

Return type

bloop.conditions.Reference

pop_refs(*refs)[source]

Decrement the usage of each ref by 1.

If this was the last use of a ref, remove it from attr_names or attr_values.

ConditionRenderer
class bloop.conditions.ConditionRenderer(engine)[source]

Renders collections of BaseCondition into DynamoDB's wire format for expressions, including:

  • "ConditionExpression" -- used in conditional operations

  • "FilterExpression" -- used in queries and scans to ignore results that don't match the filter

  • "KeyConditionExpressions" -- used to describe a query's hash (and range) key(s)

  • "ProjectionExpression" -- used to include a subset of possible columns in the results of a query or scan

  • "UpdateExpression" -- used to save objects

Normally, you will only need to call render() to handle any combination of conditions. You can also call each individual render_* function to control how multiple conditions of each type are applied.

You can collect the rendered conditions at any time through output.

>>> renderer.render(obj=user, atomic=True)
>>> renderer.output
{'ConditionExpression': '((#n0 = :v1) AND (attribute_not_exists(#n2)) AND (#n4 = :v5))',
 'ExpressionAttributeNames': {'#n0': 'age', '#n2': 'email', '#n4': 'id'},
 'ExpressionAttributeValues': {':v1': {'N': '3'}, ':v5': {'S': 'some-user-id'}}}
Parameters

engine (Engine) -- Used to dump values in conditions into the appropriate wire format.

property output

The wire format for all conditions that have been rendered. A new ConditionRenderer should be used for each operation.

render(obj=None, condition=None, update=False, filter=None, projection=None, key=None)[source]

Main entry point for rendering multiple expressions. All parameters are optional, except obj when atomic or update are True.

Parameters
  • obj -- (Optional) An object to render an atomic condition or update expression for. Required if update or atomic are true. Default is False.

  • condition (BaseCondition) -- (Optional) Rendered as a "ConditionExpression" for a conditional operation. If atomic is True, the two are rendered in an AND condition. Default is None.

  • update (bool) -- (Optional) True if an "UpdateExpression" should be rendered for obj. Default is False.

  • filter (BaseCondition) -- (Optional) A filter condition for a query or scan, rendered as a "FilterExpression". Default is None.

  • projection (set Column) -- (Optional) A set of Columns to include in a query or scan, rendered as a "ProjectionExpression". Default is None.

  • key (BaseCondition) -- (Optional) A key condition for queries, rendered as a "KeyConditionExpression". Default is None.

Built-in Conditions
class bloop.conditions.BaseCondition(operation, *, column=None, values=None)[source]
class bloop.conditions.AndCondition(*values)[source]
class bloop.conditions.OrCondition(*values)[source]
class bloop.conditions.NotCondition(value)[source]
class bloop.conditions.ComparisonCondition(operation, column, value)[source]
class bloop.conditions.BeginsWithCondition(column, value)[source]
class bloop.conditions.BetweenCondition(column, lower, upper)[source]
class bloop.conditions.ContainsCondition(column, value)[source]
class bloop.conditions.InCondition(column, values)[source]
class bloop.conditions.ComparisonMixin[source]
is_(value)

Return self==value.

is_not(value)

Return self!=value.

Utilities

class bloop.util.Sentinel(name, *args, **kwargs)[source]

Simple string-based placeholders for missing or special values.

Names are unique, and instances are re-used for the same name:

>>> from bloop.util import Sentinel
>>> empty = Sentinel("empty")
>>> empty
<Sentinel[empty]>
>>> same_token = Sentinel("empty")
>>> empty is same_token
True

This removes the need to import the same signal or placeholder value everywhere; two modules can create Sentinel("some-value") and refer to the same object. This is especially helpful where None is a possible value, and so can't be used to indicate omission of an optional parameter.

Implements __repr__ to render nicely in function signatures. Standard object-based sentinels:

>>> missing = object()
>>> def some_func(optional=missing):
...     pass
...
>>> help(some_func)
Help on function some_func in module __main__:

some_func(optional=<object object at 0x7f0f3f29e5d0>)

With the Sentinel class:

>>> from bloop.util import Sentinel
>>> missing = Sentinel("Missing")
>>> def some_func(optional=missing):
...     pass
...
>>> help(some_func)
Help on function some_func in module __main__:

some_func(optional=<Sentinel[Missing]>)
Parameters

name (str) -- The name for this sentinel.

Implementation Details

Models must be Hashable

By default, Python makes all user classes hashable:

>>> class Dict: pass
>>> hash(Dict())
8771845190811

Classes are unhashable in two cases:

  1. The class declares __hash__ = None.

  2. The class implements __eq__ but not __hash__.

In either case, during __init_subclass__(), the ensure_hash() function will manually locate the closest __hash__ method in the model's base classes:

if getattr(cls, "__hash__", None) is not None:
    return
for base in cls.__mro__:
    hash_fn = getattr(base, "__hash__")
    if hash_fn:
        break
else:
    hash_fn = object.__hash__
cls.__hash__ = hash_fn

This is required because Python doesn't provide a default hash method when __eq__ is implemented, and won't fall back to a parent class's definition:

>>> class Base:
...     def __hash__(self):
...         print("Base.__hash__")
...         return 0
...
>>> class Derived(Base):
...     def __eq__(self, other):
...         return True
...

>>> hash(Base())
Base.__hash__
>>> hash(Derived())
TypeError: unhashable type: 'Derived'
Stream Ordering Guarantees

The DynamoDB Streams API exposes a limited amount of temporal information and few options for navigating within a shard. Due to these constraints, it was hard to reduce the API down to a single __next__ call without compromising performance or ordering.

The major challenges described below include:

  • Creating a plausible total ordering across shards

  • Managing an iterator:

    • Refreshing expired iterators without data loss

    • Preventing low-volume iterators without sequence numbers from expiring

    • Promoting children when a shard runs out of records

    • Distinguishing open shards from gaps between records

  • Managing multiple shards:

    • Mapping stream "trim_horizon" and "latest" to a set of shards

    • Buffering records from multiple shards and applying a total ordering

  • Loading and saving tokens:

    • Simplifying an entire stream into a human-readable json blob

    • Pruning old shards when loading

    • Inserting new shards when loading

    • Resolving TrimmedDataAccessException for old shards

The following sections use a custom notation to describe shards and records.

Sn and Rn represent shards and records, where n is an integer:

R11, R13, R32  # In general, RnX comes from Sn
S1, S12, S23   # In general, SnX is a child of Sn

< represents chronological ordering between records:

R12 < R13  # In general, RX < RY when X < Y

=> represents parent/child relationships between shards:

S1 => {}          # S1 has no children
S2 => S21         # S2 has one child
# In general, SnX and SnY are adjacent children of Sn
S3 => {S31, S32}

~ represents two shards that are not within the same lineage:

S1 ~ S2  # Not related

S1 => S12 => S13; S4 => S41
# Both child shards, but of different lineages
S12 ~ S41

: represents a set of records from a single shard:

S1: R11, R12   # no guaranteed order
S2: R23 < R24  # guaranteed order
Shards and Lineage

DynamoDB only offers three guarantees for chronological ordering:

  1. All records within a single Shard.

  2. All parent shard records are before all child shard records.

  3. Changes to the same hash will always go to the same shard. When a parent splits, further changes to that hash will go to only one child of that shard, and always the same child.

Given the following:

S1 ~ S2
S1: R11 < R12 < R13
S2: R24 < R25 < R26

The first rule offers no guarantees between R1x and R2x for any x.

Given the following:

S1 => {S12, S13}
S1:  R111 < R112
S12: R124 < R125
S13: R136 < R137

The second rule guarantees both of the following:

R111 < R112 < R124 < R125
R111 < R112 < R136 < R137

but does not guarantee any ordering between R12x and R13x for any x.

Given the following:

S1 => {S2, S3}
R40, R41, R42  # all modify the same hash key
R5, R7, R9     # modify different hash keys

S1: R40, R5

The third rule guarantees that R41 and R42 will both be in either S2 or S3. Meanwhile, it offers no guarantee about where R7 and R9 will be. Both of the following are possible:

S1: R40, R5
S2: R41, R42, R7
S3: R9

S1: R40, R5
S2: R7, R9
S3: R41, R42

But the following is not possible:

S1: R40, R5
S2: R41, R7
S3: R42, R9
Merging Shards

Low-throughput tables will only have a single open shard at any time, and can rely on the first and second guarantees above for rebuilding the exact order of changes to the table.

For high throughput tables, there can be more than one root shard, and each shard lineage can have more than one child open at once. In this case, Bloop's streaming interface can't guarantee ordering for all records in the stream, because there is no absolute chronological ordering across a partitioned table. Instead, Bloop will fall back to a total ordering scheme that uses each record's ApproximateCreationDateTime and, when two records have the same creation time, a monotonically increasing integral clock to break ties.

Consider the following stream:

S0 => {S1, S2}
S0: R00
S1: R11 < R12 < R13
S2: R24 < R25 < R26

Where each record has the following (simplified) creation times:

Record    ApproximateCreationDateTime
------    ---------------------------
R00       7 hours ago
R11       6 hours ago
R12       4 hours ago
R13       2 hours ago
R24       4 hours ago
R25       3 hours ago
R26       3 hours ago

Bloop merges these records in a single pass:

  1. The second guarantee says all records in S0 are before records in that shard's children:

    R00 < (R11, R12, R13, R24, R25, R26)
    
  2. The first guarantee says all records in the same shard are ordered:

    R00 < ((R11 < R12 < R13), (R24 < R25 < R26))
    
  3. Then, ApproximateCreationDateTime is used to partially merge S1 and S2 records:

    R00 < R11 < (R12, R24) < (R25 < R26) < R13
    
  4. There were still two collisions after using ApproximateCreationDateTime: R12, R24 and R25, R26.

    1. To resolve (R12, R24) Bloop breaks the tie with an incrementing clock, and assigns R12 < R24.

    2. (R25, R26) is resolved because the records are in the same shard.

The final ordering is:

R00 < R11 < R12 < R24 < R25 < R26 < R13
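
A minimal sketch of this total ordering, keyed on (creation time, clock): it assumes each buffered record exposes its ApproximateCreationDateTime as record["meta"]["created_at"], and is illustrative rather than Bloop's actual RecordBuffer:

import heapq
import itertools

clock = itertools.count()  # monotonically increasing tie-breaker
buffer = []

def push(record):
    # sort by creation time first; the clock breaks ties across shards.
    # clock values are unique, so records themselves are never compared.
    created = record["meta"]["created_at"]
    heapq.heappush(buffer, (created, next(clock), record))

def pop():
    _, _, record = heapq.heappop(buffer)
    return record
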
Record Gaps

Bloop initially performs up to 5 "catch up" calls to GetRecords when advancing an iterator. If a GetRecords call returns a NextShardIterator but no records, it's either because the iterator is nearly caught up to "latest" in an open shard, or because it's traversing a period of the shard with no activity. Endlessly polling until a record comes back would cause every open shard to hang for up to 4 hours, while only calling GetRecords once could desynchronize one shard's iterator from the others.

By retrying up to 5 times on an empty GetRecords response (that still has a NextShardIterator), Bloop is confident that any gaps in the shard have been traversed. This is because it takes approximately 4-5 calls to traverse an empty shard completely. In other words, the 6th empty response almost certainly indicates that the iterator is caught up to "latest" in an open shard, and it's safe to cut back to one call at a time.
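
The heuristic can be sketched as a bounded retry loop. The names below are illustrative rather than Bloop's internals; get_records stands in for the DynamoDBStreams GetRecords call (boto3 exposes it as client.get_records(ShardIterator=...)):

CALLS_TO_REACH_HEAD = 5

def advance_iterator(get_records, iterator_id):
    """Bounded retry on empty GetRecords responses (illustrative sketch)."""
    for _ in range(CALLS_TO_REACH_HEAD):
        response = get_records(ShardIterator=iterator_id)
        if response.get("Records"):
            return iterator_id, response["Records"]
        iterator_id = response.get("NextShardIterator")
        if iterator_id is None:
            return None, []  # the shard is closed and fully consumed
        # empty response but the shard continues: keep advancing the gap
    # repeated empty responses: almost certainly caught up to "latest"
    # in an open shard, so it's safe to fall back to one call at a time
    return iterator_id, []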

Why only 5 calls?

This number came from extensive testing which compared the number of empty responses returned for shards with various activity cadences. It's reasonable to assume this number will only decrease with time, as advances in software and hardware enable DynamoDB to cover larger periods of time with the same number of calls. Because each call from a customer incurs the overhead of creating and indexing each new iterator id, as well as the usual expensive signature-based authentication, it's in DynamoDB's interest to minimize the number of calls a customer needs to traverse a sparsely populated shard.

At worst DynamoDB starts requiring more calls to fully traverse an empty shard, which could result in reordering between records in shards with vastly different activity patterns. Since the creation-time-based ordering is approximate, this doesn't relax the guarantees that Bloop's streaming interface provides.

Changing the Limit

In general you should not need to worry about this value; leave it alone. In the unlikely case that DynamoDB does increase the number of calls required to traverse an empty shard, Bloop will be updated soon after.

If you still need to tune this value:

import bloop.stream.shard
bloop.stream.shard.CALLS_TO_REACH_HEAD = 5

The exact value of this parameter will have almost no impact on performance in high-activity streams, and there are so few shards in low-activity streams that the total cost will be on par with the other calls to set up the stream.

Versions

This document provides migration instructions for each major version, as well as the complete changelog for versions dating back to v0.9.0 from December 2015. The migration guides provide detailed examples and tips for migrating from the previous major version (excluding the 1.0.0 guide, which only covers migration from 0.9.0 and newer).

Migrating to 3.0.0

The 3.0.0 release includes two API changes from 2.4.0 that may require you to update your code.

  • The atomic= kwarg to Engine.save and Engine.delete was deprecated in 2.4.0 and is removed in 3.0.0.

  • The return type of Type._dump must now be a bloop.actions.Action instance, even when the value is None. This does not impact custom types that only implement dynamo_load and dynamo_dump.

atomic keyword

The atomic keyword to Engine.save and Engine.delete has been removed in favor of a user pattern. This offers a reasonable performance improvement for users that never used the atomic keyword, and addresses ambiguity related to per-row atomic vs transactional atomic operations. For context on the deprecation, see Issue #138. For the equivalent user pattern, see Snapshot Condition. To migrate your existing code, you can use the following:

# pre-3.0 code to migrate:
engine.load(some_object)
some_object.some_attr = "new value"
engine.save(some_object, atomic=True)

# post-3.0 code:

# https://bloop.readthedocs.io/en/latest/user/patterns.html#snapshot-condition
from your_patterns import snapshot

engine.load(some_object)
condition = snapshot(some_object)
some_object.some_attr = "new value"
engine.save(some_object, condition=condition)
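
For reference, the snapshot helper imported above can be sketched as AND-ing together equality conditions over the object's current values. This is a hedged approximation of the linked pattern, not its exact code:

from bloop import Condition

def snapshot(obj) -> Condition:
    """Condition that the row in DynamoDB still matches the loaded values."""
    condition = Condition()  # start from the empty condition
    for column in sorted(obj.Meta.columns, key=lambda c: c.name):
        value = getattr(obj, column.name, None)
        # column == None renders as attribute_not_exists
        condition &= (column == value)
    return condition
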
Type._dump

Bloop now allows users to specify how a value should be applied in an UpdateExpression by wrapping it in a bloop.actions.Action object. This is done transparently for raw values, which are interpreted as either bloop.actions.set or bloop.actions.remove. Starting in 2.4, and to support Issue #136, you can also specify an add or delete action:

my_user.aliases = bloop.actions.add("new_alias")
my_website.views = bloop.actions.add(1)

To maintain flexibility, the bloop Type class has the final say as to which action a value should use. This allows e.g. the List type to take a literal [] and change the action from actions.set to actions.remove(None), indicating that the value should be cleared. This also means your custom type could see an actions.delete and instead express the value in an actions.set.

If your custom types only override dynamo_dump or dynamo_load, you don't need to do anything for this migration. However, if you currently override _dump, you should update your function to (1) handle input that may or may not be an action, and (2) always return an action instance. In general, you should not modify an input action; instead, return a new instance (possibly with the same action_type).

Here's the migration of the base Type._dump:

# pre-3.0 code to migrate:
def _dump(self, value, **kwargs):
    value = self.dynamo_dump(value, **kwargs)
    if value is None:
        return None
    return {self.backing_type: value}


# post-3.0 code:
from bloop import actions

def _dump(self, value, **kwargs):
    wrapped = actions.wrap(value)  # [1]
    value = self.dynamo_dump(wrapped.value, **kwargs)
    if value is None:
        return actions.wrap(None)  # [2]
    else:
        value = {self.backing_type: value}
        return wrapped.type.new_action(value)  # [3]

# [1] always wrap the input value to ensure you're working with an Action instance
# [2] returns actions.remove(None) which will remove the value like None previously did
# [3] new_action uses the **same action type** as the input.
#         If you want to always return a SET action instead use: return actions.set(value)

Migrating to 2.0.0

The 2.0.0 release includes a number of API changes and new features.

  • The largest functional change is the ability to compose models through subclassing; this is referred to as Abstract Inheritance and Mixins throughout the User Guide.

  • Python 3.6.0 is the minimum required version.

  • Meta.init now defaults to cls.__new__(cls) instead of cls.__init__(); when model instances are created as part of engine.query, engine.stream, etc., your model's __init__ method will not be called. The default BaseModel.__init__ is not meant for use outside of local instantiation.

  • The Column and Index kwarg name was renamed to dynamo_name to accurately reflect how the value was used: Column(SomeType, name="foo") becomes Column(SomeType, dynamo_name="foo"). Additionally, the column and index attribute model_name was renamed to name; dynamo_name is unchanged and reflects the kwarg value, if provided.

Engine

A new Engine kwarg table_name_template can be used to modify the table name used per-engine, as documented in the new Engine Configuration section of the User Guide. Previously, you may have used the before_create_table signal as follows:

# Nonce table names to avoid testing collisions
@before_create_table.connect
def apply_table_nonce(_, model, **__):
    nonce = datetime.now().isoformat()
    model.Meta.table_name += "-test-{}".format(nonce)

This will modify the actual model's Meta.table_name, whereas the new kwarg only modifies the bound table name for a single engine. The same nonce can be applied to a single Engine as follows:

def apply_nonce(model):
    nonce = datetime.now().isoformat()
    return f"{model.Meta.table_name}-test-{nonce}"

engine = Engine(table_name_template=apply_nonce)
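
The template can also be a plain string containing the "{table_name}" formatting key (see the 2.0.0 changelog entry below):

engine = Engine(table_name_template="prod-{table_name}")
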
Inheritance

You can now use abstract base models to more easily compose common models. For example, you may use the same id structure across several classes. Previously, this would look like the following:

class User(BaseModel):
    id = Column(String, hash_key=True)
    version = Column(Integer, range_key=True)
    data = Column(Binary)

class Profile(BaseModel):
    id = Column(String, hash_key=True)
    version = Column(Integer, range_key=True)
    summary = Column(String)

Now, you can define an abstract base and re-use the id and version columns in both:

class MyBase(BaseModel):
    class Meta:
        abstract = True
    id = Column(String, hash_key=True)
    version = Column(Integer, range_key=True)

class User(MyBase):
    data = Column(Binary)

class Profile(MyBase):
    summary = Column(String)

You can use multiple inheritance to compose models from multiple mixins; base classes do not need to subclass BaseModel. Here's the same two models as above, but the hash and range keys are defined across two mixins:

class StringHash:
    id = Column(String, hash_key=True)

class IntegerRange:
    version = Column(Integer, range_key=True)


class User(StringHash, IntegerRange, BaseModel):
    data = Column(Binary)

class Profile(StringHash, IntegerRange, BaseModel):
    summary = Column(String)

Mixins may also contain GlobalSecondaryIndex and LocalSecondaryIndex, even if their hash/range keys aren't defined in that mixin:

class ByEmail:
    by_email = GlobalSecondaryIndex(projection="keys", hash_key="email")


class User(StringHash, IntegerRange, ByEmail, BaseModel):
    email = Column(String)
Meta.init

With the addition of column defaults (see below), Bloop needed to differentiate local model instantiation from remote model instantiation. Local model instantiation still uses __init__, as in:

user = User(email="me@gmail.com", verified=False)

Unlike Engine.load, which takes existing model instances, all of Engine.query, Engine.scan, and Engine.stream create their own instances. These methods use the model's Meta.init to create new instances. Previously this defaulted to __init__. However, with the default __init__ applying column defaults in 2.0.0, this is no longer acceptable for remote instantiation. Instead, cls.__new__(cls) is used by default to create instances during query/scan/stream.

This is an important distinction that Bloop should have made early on, but it was only forced by the addition of defaults. For example, imagine querying an index that doesn't project a column with a default. If the base __init__ was still used, the Column's default would be applied to the non-projected column even if there was already a value in DynamoDB. Here's one model that would have the problem:

class User(BaseModel):
    id = Column(UUID, hash_key=True)
    created = Column(DateTime, default=datetime.datetime.now)
    email = Column(String)
    by_email = GlobalSecondaryIndex(projection="keys", hash_key=email)

user = User(id=uuid.uuid4(), email="me@gmail.com")
engine.save(user)
print(user.created)  # Some datetime T1


query = engine.query(User.by_email, key=User.email == "me@gmail.com")
partial_user = query.first()

partial_user.created  # This column isn't part of the index's projection!

If User.Meta.init was still User.__init__ then partial_user.created would invoke the default function for User.created and give us the current datetime. Instead, Bloop 2.0.0 will call User.__new__(User) and we'll get an AttributeError because partial_user doesn't have a created value.

Column Defaults

Many columns have the same initialization value, even across models. For example, all but one of the following columns will be set to the same value or computed with the same logic every time:

class User(BaseModel):
    email = Column(String, hash_key=True)
    id = Column(UUID)
    verified = Column(Boolean)
    created = Column(DateTime)
    followers = Column(Integer)

Previously, you might apply defaults by creating a simple function:

def new_user(email) -> User:
    return User(
        email=email,
        id=uuid.uuid4(),
        verified=False,
        created=datetime.datetime.now(),
        followers=0)

You'll still need a function for related initialization (eg. across fields or model instances) but for simple defaults, you can now specify them with the Column:

class User(BaseModel):
    email = Column(String, hash_key=True)
    id = Column(UUID, default=uuid.uuid4)
    verified = Column(Boolean, default=False)
    created = Column(DateTime, default=datetime.datetime.now)
    followers = Column(Integer, default=0)


def new_user(email) -> User:
    return User(email=email)

Defaults are only applied when creating new local instances inside the default BaseModel.__init__; they are not evaluated when loading objects with Engine.load, Engine.query, Engine.stream, etc. If you define a custom __init__ that doesn't call super().__init__(...), defaults will not be applied.

In a related change (see Meta.init above), Bloop now uses cls.__new__(cls) by default to create new instances of your models during Engine.scan and Engine.query, instead of the previous default of __init__. This is intentional, to avoid applying unnecessary defaults to partially-loaded objects.

TTL

DynamoDB introduced the ability to specify a TTL column, which holds a date (in seconds since the epoch) after which the row may be automatically (and eventually) cleaned up. This column must be a Number, and Bloop exposes the Timestamp type, which is used as a datetime.datetime in Python. Like the DynamoDBStreams feature, TTL is configured on a model's Meta attribute:

class TemporaryPaste(BaseModel):
    class Meta:
        ttl = {
            "column": "delete_after"
        }
    id = Column(String, hash_key=True)
    s3_location = Column(String, dynamo_name="s3")
    delete_after = Column(Timestamp)

Remember that it can take up to 24 hours for the row to be deleted; you should guard your reads by comparing the current time against the cleanup time, or by adding a filter to your queries:

# made up index
query = engine.query(
    TemporaryPaste.by_email,
    key=TemporaryPaste.email == "me@gmail.com",
    filter=TemporaryPaste.delete_after > datetime.datetime.now())
print(query.first())

Bloop still refuses to update existing tables, so TTL will only be enabled on tables if they are created by Bloop during Engine.bind. Otherwise, the declaration exists exclusively to verify configuration.

Types

A new type Timestamp was added for use with the new TTL feature (see above). This is a datetime.datetime in Python just like the DateTime type, but is stored as an integer (whole seconds since epoch) instead of an ISO 8601 string. As with DateTime, drop-in replacements are available for arrow, delorean, and pendulum.
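
As with the DateTime extensions described below, swapping in a drop-in Timestamp implementation is a one-line import change, e.g.:

# from bloop import Timestamp
from bloop.ext.pendulum import Timestamp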

Exceptions
  • InvalidIndex was replaced by the existing InvalidModel

  • InvalidSearchMode, InvalidKeyCondition, InvalidFilterCondition, and InvalidProjection were replaced by InvalidSearch

  • UnboundModel was removed without replacement; Engine.bind was refactored so that it would never be raised.

  • InvalidComparisonOperator was removed without replacement; it was never raised.

Migrating to 1.0.0

The 1.0.0 release includes a number of API changes, although functionally not much has changed since 0.9.6. The biggest changes are to Query and Scan syntax, which have moved from a builder pattern to a single call. The remaining changes are mostly resolved through a different import or parameter/attribute name.

Session, Client

In 1.0.0 the Engine wraps two clients: one for DynamoDB, and one for DynamoDBStreams. Bloop will create default clients for any missing parameters using boto3.client:

import boto3
from bloop import Engine

ddb = boto3.client("dynamodb")
streams = boto3.client("dynamodbstreams")
engine = Engine(dynamodb=ddb, dynamodbstreams=streams)
Before 0.9.11

Prior to 0.9.11, you could customize the session that an Engine used to talk to DynamoDB by creating an instance of a boto3.session.Session and passing it to the Engine during instantiation. This allowed you to use a different profile name:

from boto3 import Session
from bloop import Engine

session = Session(profile_name="my-profile")
engine = Engine(session=session)

Now, you will need to create client instances from that session:

from boto3 import Session
from bloop import Engine

session = Session(profile_name="my-profile")
engine = Engine(
    dynamodb=session.client("dynamodb"),
    dynamodbstreams=session.client("dynamodbstreams")
)
After 0.9.11

In 0.9.11, the Engine changed to take a bloop.Client which wrapped a boto3 client. This allowed you to connect to a different endpoint, such as a DynamoDBLocal instance:

import boto3
from bloop import Client, Engine

boto_client = boto3.client("dynamodb", endpoint_url="http://localhost:8000")
bloop_client = Client(boto_client=boto_client)
engine = Engine(client=bloop_client)

The intermediate bloop Client is no longer necessary, but a dynamodbstreams client can be provided:

import boto3
from bloop import Engine

ddb = boto3.client("dynamodb", endpoint_url="http://localhost:8000")
streams = boto3.client("dynamodbstreams", endpoint_url="http://localhost:8000")
engine = Engine(dynamodb=ddb, dynamodbstreams=streams)
Engine
Config

Prior to 1.0.0, Engine took a number of configuration options. These have all been removed: some were baked into existing structures, and others are now specified per operation. Engine no longer takes **config kwargs.

  • atomic controlled the default value for delete and save operations. If your engine had a default atomic of True, you must now explicitly specify that with each delete and save. The same is true for consistent, which controlled the default for load, query, and scan.

  • prefetch controlled the default number of items that Bloop would fetch for a query or scan. Bloop now uses the built-in pagination controls, and will fetch the next page when the currently buffered page has been iterated. There is no way to control the number of items loaded into the buffer at once.

  • strict controlled the default setting for query and scan against an LSI. This is now part of the declaration of an LSI: by_create = LocalSecondaryIndex(projection="all", range_key="created", strict=False). By default an LSI is strict, which matches the default configuration option. This change means an LSI must be accessed by every caller the same way. You can't have one caller use strict=True while another uses strict=False.

EngineView and context

Because there are no more engine.config values, there is no point to using engines as context managers. Previously, you could use an EngineView to change one config option of an engine for a local command, without changing the underlying engine's configuration:

with engine.context(atomic=True) as atomic:
    atomic.save(...)
    # a bunch of operations that perform atomic saves

Engine.context and the EngineView class have been removed since there is no longer an Engine.config.

Engine.save, Engine.delete

These functions take *objs instead of objs, which makes passing a small number of items more comfortable.

user = User(...)
tweet = Tweet(...)

# Old: explicit list required
engine.save([user, tweet])

# 1.0.0: *varargs
engine.save(user, tweet)

# 1.0.0: save a list
some_users = get_modified()
engine.save(*some_users)
Query, Scan

Queries and Scans are now created in a single call, instead of using an ambiguous builder pattern. This will simplify most calls, but will be disruptive if you rely on partially building queries in different parts of your code.

Creating Queries

The most common issue with the builder pattern was creating multi-condition filters. Each call would replace the existing filter, not append to it. For example:

# This only checks the date, NOT the count
q = engine.query(User).key(User.id == 0)
q = q.filter(User.friends >= 3)
q = q.filter(User.created >= arrow.now().replace(years=-1))

# 1.0.0 only has one filter option
q = engine.query(
    User, key=User.id == 0,
    filter=(
        (User.friends >= 3) &
        (User.created >= ...)
    )
)

The other query controls have been baked in, including projection, consistent, and forward. Previously, you changed the forward option through the properties ascending and descending. Use forward=False to sort descending.

Here is a query with all options before and after. The structure is largely the same, with a lower symbolic overhead:

# Pre 1.0.0
q = (
    engine.query(User)
        .key(User.id == 0)
        .projection("all")
        .descending
        .consistent
        .filter(User.name.begins_with("a"))
)

# 1.0.0
q = engine.query(
    User,
    key=User.id == 0,
    projection="all",
    forward=False,
    consistent=True,
    filter=User.name.begins_with("a")
)

The same changes apply to Engine.scan, although Scans can't be performed in descending order.

Parallel Scans

1.0.0 allows you to create a parallel scan by specifying the segment that this scan covers. This is just a tuple of (Segment, TotalSegments). For example, to scan Users in three pieces:

scans = [
    engine.scan(User, parallel=(0, 3)),
    engine.scan(User, parallel=(1, 3)),
    engine.scan(User, parallel=(2, 3))
]

for worker, scan in zip(workers, scans):
    worker.process(scan)
Iteration and Properties

The all method and prefetch and limit options have been removed. Each call to Engine.query() or Engine.scan() will create a new iterator that tracks its progress and can be reset. To create different iterators over the same parameters, you must call Engine.query() multiple times.

# All the same iterator
>>> scan = engine.scan(User, filter=...)
>>> it_one = iter(scan)
>>> it_two = iter(scan)
>>> it_one is it_two is scan
True

Query and Scan no longer buffer their results, and you will need to reset the query to execute it again.

>>> scan = engine.scan(User)
>>> for result in scan:
...     pass
...
>>> scan.exhausted
True
>>> scan.reset()
>>> for result in scan:
...     print(result.id)
...
0
1
2
  • The complete property has been renamed to exhausted to match the new Stream interface.

  • The results property has been removed.

  • count, scanned, one(), and first() are unchanged.

Models
Base Model and abstract

Model declaration is largely unchanged, except for the model hierarchy. Early versions tied one base model to one engine; later versions required a function to create each new base. In 1.0.0, every model inherits from a single abstract model, BaseModel:

from bloop import BaseModel, Column, Integer


class User(BaseModel):
    id = Column(Integer, hash_key=True)
    ...

Additionally, any model can be an abstract base for a number of other models (to simplify binding subsets of all models) by setting the Meta attribute abstract to True:

from bloop import BaseModel

class AbstractUser(BaseModel):
    class Meta:
        abstract = True

    @property
    def is_email_verified(self):
        return bool(getattr(self, "verified", False))
Before 0.9.6

Models were tied to a single Engine, and so the base class for any model had to come from that Engine:

from bloop import Engine

primary = Engine()
secondary = Engine()

class User(primary.model):
    ...

# secondary can't save or load instances of User!

Now that models are decoupled from engines, any engine can bind and load any model:

from bloop import BaseModel, Engine

primary = Engine()
secondary = Engine()

class User(BaseModel):
    ...

primary.bind(User)
secondary.bind(User)
After 0.9.6

After models were decoupled from Engines, Bloop still used some magic to create base models that didn't have hash keys but still passed model-creation validation. This meant you had to get a base model from new_base():

from bloop import Engine, new_base

primary = Engine()
secondary = Engine()

Base = new_base()

class User(Base):
    ...

primary.bind(User)
secondary.bind(User)

Now, the base model is imported directly. You can simplify the transition using an alias import. To adapt the above code, we would alias BaseModel to Base:

from bloop import Engine
from bloop import BaseModel as Base

primary = Engine()
secondary = Engine()

class User(Base):
    ...

primary.bind(User)
secondary.bind(User)
Binding

Engine.bind has undergone a few stylistic tweaks, and started offering recursive binding. The parameter base is no longer keyword-only.

To bind all concrete (Meta.abstract=False) models from a single base, pass the base model:

from bloop import BaseModel, Engine

class AbstractUser(BaseModel):
    class Meta:
        abstract = True

class AbstractDataBlob(BaseModel):
    class Meta:
        abstract = True

class User(AbstractUser):
    ...

class S3Blob(AbstractDataBlob):
    ...

engine = Engine()
engine.bind(AbstractUser)

This will bind User but not S3Blob.

Indexes
Projection is Required

In 1.0.0, projection is required for both GlobalSecondaryIndex and LocalSecondaryIndex. This is because Bloop now supports binding multiple models to the same table, and the "all" projection is not really DynamoDB's all, but instead an INCLUDE with all columns that the model defines.

Previously:

from bloop import new_base, Column, Integer, GlobalSecondaryIndex

class MyModel(new_base()):
    id = Column(Integer, hash_key=True)
    data = Column(Integer)

    # implicit "keys"
    by_data = GlobalSecondaryIndex(hash_key="data")

Now, this must explicitly state that the projection is "keys":

from bloop import BaseModel, Column, Integer, GlobalSecondaryIndex

class MyModel(BaseModel):
    id = Column(Integer, hash_key=True)
    data = Column(Integer)

    by_data = GlobalSecondaryIndex(
        projection="keys", hash_key="data")
Hash and Range Key

1.0.0 also lets you use the Column object (and not just its model name) as the parameter to hash_key and range_key:

class MyModel(BaseModel):
    id = Column(Integer, hash_key=True)
    data = Column(Integer)

    by_data = GlobalSecondaryIndex(
        projection="keys", hash_key=data)
__set__ and __del__

Finally, Bloop disallows setting and deleting attributes on objects with the same name as an index. Previously, it would simply set that value on the object and silently ignore it when loading or saving. It wasn't clear that the value wasn't applied to the Index's hash or range key.

>>> class MyModel(BaseModel):
...     id = Column(Integer, hash_key=True)
...     data = Column(Integer)
...     by_data = GlobalSecondaryIndex(
...         projection="keys", hash_key=data)
...
>>> obj = MyModel()
>>> obj.by_data = "foo"
Traceback (most recent call last):
  ...
AttributeError: MyModel.by_data is a GlobalSecondaryIndex
Types
DateTime

Previously, DateTime was backed by arrow. Instead of forcing a particular library on users -- and there are a number of high-quality choices -- Bloop's built-in datetime type is now backed by the standard library's datetime.datetime. This type only loads and dumps values in UTC, and uses a fixed ISO 8601 format string which always uses +00:00 for the timezone. DateTime converts the timezone when saving to DynamoDB with datetime.datetime.astimezone(), and explicitly rejects naive datetime objects. For this reason, you must specify a timezone when using this type.

Most users are expected to have a preferred datetime library, and so Bloop now includes implementations of DateTime in a new extensions module bloop.ext for the three most popular datetime libraries: arrow, delorean, and pendulum. These expose the previous interface, which allows you to specify a local timezone to apply when loading values from DynamoDB. It still defaults to UTC.

To swap out an existing DateTime class and continue using arrow objects:

# from bloop import DateTime
from bloop.ext.arrow import DateTime

To use delorean instead:

# from bloop import DateTime
from bloop.ext.delorean import DateTime

Future extensions will also be grouped by external package, and are not limited to types. For example, an alternate Engine implementation could be provided in bloop.ext.sqlalchemy that binds SQLAlchemy's ORM models and transparently maps Bloop types to SQLAlchemy types.

Float

Float has been renamed to Number and now takes an optional decimal.Context to use when translating numbers to DynamoDB's wire format. The same context used in previous versions (which comes from the specifications in DynamoDB's User Guide) is used as the default; existing code only needs to use the new name or alias it on import:

# from bloop import Float
from bloop import Number as Float

A new pattern has been added that provides a less restrictive type which always loads and dumps float instead of decimal.Decimal. This comes at the expense of exactness, since Float's decimal context does not trap Rounding or Inexact signals. This is a common request for boto3; keep its limitations in mind when storing and loading values. It's probably fine for a cached version of a product rating, but you're playing with fire storing account balances with it.
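
A hedged sketch of such a type, subclassing Number with a permissive decimal context; the pattern in the docs may differ in its details:

import decimal

from bloop import Number

class Float(Number):
    """Number variant that loads and dumps float; inexact by design."""
    def __init__(self):
        # permissive context: Inexact and Rounded are not trapped
        super().__init__(context=decimal.Context(
            Emin=-128, Emax=126, prec=38,
            traps=[decimal.Clamped, decimal.Overflow, decimal.Underflow]))

    def dynamo_load(self, value, *, context, **kwargs):
        value = super().dynamo_load(value, context=context, **kwargs)
        return None if value is None else float(value)

    def dynamo_dump(self, value, *, context, **kwargs):
        if value is None:
            return None
        # round-trip through Decimal for DynamoDB's wire format
        return super().dynamo_dump(
            decimal.Decimal(str(value)), context=context, **kwargs)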

String

A minor change, String no longer calls str(value) when dumping to DynamoDB. This was obscuring cases where the wrong value was provided, but the type silently coerced a string using that object's __str__. Now, you will need to manually call str on objects, or boto3 will complain of an incorrect type.

>>> from bloop import BaseModel, Column, Engine, String

>>> class MyModel(BaseModel):
...     id = Column(String, hash_key=True)
...
>>> engine = Engine()
>>> engine.bind(MyModel)

>>> not_a_str = object()
>>> obj = MyModel(id=not_a_str)

# previously, this would store "<object object at 0x7f92a5a2f680>"
# since that is str(not_a_str).
>>> engine.save(obj)

# now, this raises (newlines for readability)
Traceback (most recent call last):
  ...
ParamValidationError: Parameter validation failed:
Invalid type for
    parameter Key.id.S,
    value: <object object at 0x7f92a5a2f680>,
    type: <class 'object'>,
    valid types: <class 'str'>
Exceptions

NotModified was raised by Engine.load when some objects were not found. This has been renamed to MissingObjects and is otherwise unchanged.

Exceptions for unknown or abstract models have changed slightly. When an Engine fails to load or dump a model, it will raise UnboundModel. When a value fails to load or dump but isn't a subclass of BaseModel, the engine raises UnknownType. When you attempt to perform a mutating operation (load, save, ...) on an abstract model, the engine raises InvalidModel.

Changelog

This changelog structure is based on Keep a Changelog v0.3.0. Bloop follows Semantic Versioning 2.0.0 and a draft appendix for its Public API.

Unreleased

(no unreleased changes)

3.1.0 - 2021-11-11

Fixed an issue where copying an Index would lose projection information when the projection mode was "include". This fix should have no effect for most users; you would only run into this issue if you were manually calling bind_index with copy=True on an index whose projection mode is "include", or if you subclass a model that has such an index. This does not require a major version change since there is no reasonable workaround that would be broken by making this fix. For example, a user might decide to monkeypatch Index.__copy__, bind_index, or refresh_index to preserve the projection information; those workarounds will not be broken by this change. For an example of the issue, see Issue #147.

[Changed]
  • Index.projection is now a set instead of a list. Since Column implements __hash__, this won't affect any existing calls that pass in lists. To remain consistent, this change is reflected in Engine.search, Search.__init__, Index.__init__, and any docs or examples that refer to passing lists/sets of Columns.

[Fixed]
  • Index.__copy__ preserves Index.projection["included"] when projection mode is "include".

3.0.0 - 2019-10-11

Removes the deprecated keyword atomic= from Engine.save and Engine.delete; Type._dump must now return a bloop.actions.Action instance. See the Migration Guide for context on these changes, and for sample code to easily migrate your existing custom Types.

[Added]
  • (internal) util.default_context can be used to create a new load/dump context and respects existing dict objects and keys (even if empty).

[Changed]
  • Type._dump must return a bloop.actions.Action now. Most users won't need to change any code since custom types usually override dynamo_dump. If you have implemented your own _dump function, you can probably just use actions.wrap and actions.unwrap to migrate:

    def _dump(self, value, *, context, **kwargs):
        value = actions.unwrap(value)
        # the rest of your function here
        return actions.wrap(value)
    
[Removed]
  • The deprecated atomic= keyword has been removed from Engine.save and Engine.delete.

  • The exception bloop.exceptions.UnknownType is no longer raised and has been removed.

  • (internal) BaseModel._load and BaseModel._dump have been removed. These were not documented or used anywhere in the code base, and unpack_from_dynamodb should be used where _load was anyway.

  • (internal) Engine._load and Engine._dump have been removed. These were not documented and are trivially replaced with calls to typedef._load and typedef._dump instead.

  • (internal) The dumped attr for Conditions is no longer needed since there's no need to dump objects except at render time.

2.4.1 - 2019-10-11

Bug fix. Thanks to @wilfre in PR #141!

[Fixed]
  • bloop.stream.shard.unpack_shards no longer raises when a Shard in the DescribeStream response has a ParentId that is not also present in the response (because the parent shard has been deleted). Previously the code would raise while trying to link the two shard objects in memory. Now, the shard will have a ParentId of None.

2.4.0 - 2019-06-13

The atomic= keyword for Engine.save and Engine.delete is deprecated and will be removed in 3.0. In 2.4 your code will continue to work but will raise DeprecationWarning when you specify a value for atomic=.

The Type._dump function return value is changing to Union[Any, bloop.Action] in 2.4 to prepare for the change in 3.0 to exclusively returning a bloop.Action. For built-in types and most custom types that only override dynamo_dump this is a no-op, but if you call Type._dump you can use bloop.actions.unwrap() on the result to get the inner value. If you have a custom Type._dump method it must return an action in 3.0. For ease of use you can use bloop.actions.wrap() which will specify either the SET or REMOVE action to match existing behavior. Here's an example of how you can quickly modify your code:

# current pre-2.4 method, continues to work until 3.0
def _dump(self, value, **kwargs):
    value = self.dynamo_dump(value, **kwargs)
    if value is None:
        return None
    return {self.backing_type: value}

# works in 2.4 and 3.0
from bloop import actions
def _dump(self, value, **kwargs):
    value = actions.unwrap(value)
    value = self.dynamo_dump(value, **kwargs)
    return actions.wrap(value)

Note that this is backwards compatible in 2.4: Type._dump will not change unless you opt to pass the new Action object to it.

[Added]
  • SearchIterator.token provides a way to start a new Query or Scan from a previous query/scan's state. See Issue #132.

  • SearchIterator.move_to takes a token to update the search state. Count/ScannedCount state are lost when moving to a token.

  • Engine.delete and Engine.save take an optional argument sync= which can be used to update objects with the old or new values from DynamoDB after saving or deleting. See the Return Values section of the User Guide and Issue #137.

  • bloop.actions expose a way to manipulate atomic counters and sets. See the Atomic Counters section of the User Guide and Issue #136.

[Changed]
  • The atomic= keyword for Engine.save and Engine.delete emits DeprecationWarning and will be removed in 3.0.

  • Type._dump will return a bloop.actions.Action object if one is passed in, in preparation for the change in 3.0.

2.3.3 - 2019-01-27

Engine.bind is much faster for multi-model tables. See Issue #130.

[Changed]
  • (internal) SessionWrapper caches DescribeTable responses. You can clear these with SessionWrapper.clear_cache; mutating calls such as .enable_ttl will invalidate the cached description.

  • (internal) Each Engine.bind will call CreateTable at most once per table. Subsequent calls to bind will call CreateTable again.

2.3.2 - 2019-01-27

Minor bug fix.

[Fixed]
  • (internal) bloop.conditions.iter_columns no longer yields None on Condition() (or any other condition whose .column attribute is None).

2.3.0 - 2019-01-24

This release adds support for Transactions and On-Demand Billing. Transactions can include changes across tables, and provide ACID guarantees at a 2x throughput cost and a limit of 10 items per transaction. See the User Guide for details.

with engine.transaction() as tx:
    tx.save(user, tweet)
    tx.delete(event, task)
    tx.check(meta, condition=Metadata.worker_id == current_worker)
[Added]
  • Engine.transaction(mode="w") returns a transaction object which can be used directly or as a context manager. By default this creates a WriteTransaction, but you can pass mode="r" to create a read transaction.

  • WriteTransaction and ReadTransaction can be prepared for committing with .prepare() which returns a PreparedTransaction which can be committed with .commit() some number of times. These calls are usually handled automatically when using the read/write transaction as a context manager:

    # manual calls
    tx = engine.transaction()
    tx.save(user)
    p = tx.prepare()
    p.commit()
    
    # equivalent functionality
    with engine.transaction() as tx:
        tx.save(user)
    
  • Meta supports On-Demand Billing:

    class MyModel(BaseModel):
        id = Column(String, hash_key=True)
        class Meta:
            billing = {"mode": "on_demand"}
    
  • (internal) bloop.session.SessionWrapper.transaction_read and bloop.session.SessionWrapper.transaction_write can be used to call TransactGetItems and TransactWriteItems with fully serialized request objects. The write api requires a client request token to provide idempotency guards, but does not provide temporal bounds checks for those tokens.

[Changed]
  • Engine.load now logs at INFO instead of WARNING when failing to load some objects.

  • Meta.ttl["enabled"] will now be a literal True or False after binding the model, rather than the string "enabled" or "disabled".

  • If Meta.encryption or Meta.backups is None or missing, it will now be set after binding the model.

  • Meta and GSI read/write units are not validated if billing mode is "on_demand" since they will be 0 and the provided setting is ignored.

2.2.0 - 2018-08-30
[Added]
  • DynamicList and DynamicMap types can store arbitrary values, although they will only be loaded as their primitive, direct mapping to DynamoDB backing types. For example:

    class MyModel(BaseModel):
        id = Column(String, hash_key=True)
        blob = Column(DynamicMap)
    i = MyModel(id="i")
    i.blob = {"foo": "bar", "inner": [True, {1, 2, 3}, b""]}
    
  • Meta supports Continuous Backups for Point-In-Time Recovery:

    class MyModel(BaseModel):
        id = Column(String, hash_key=True)
        class Meta:
            backups = {"enabled": True}
    
  • SearchIterator exposes an all() method which eagerly loads all results and returns a single list. Note that the query or scan is reset each time the method is called, discarding any previously buffered state.

[Changed]
  • String and Binary types load None as "" and b"" respectively.

  • Saving an empty String or Binary ("" or b"") will no longer throw a botocore exception, and will instead be treated as None. This brings behavior in line with the Set, List, and Map types.

2.1.0 - 2018-04-07

Added support for Server-Side Encryption. This uses an AWS-managed Customer Master Key (CMK) stored in KMS which is managed for free: "You are not charged for the following: AWS-managed CMKs, which are automatically created on your behalf when you first attempt to encrypt a resource in a supported AWS service."

[Added]
  • Meta supports Server Side Encryption:

    class MyModel(BaseModel):
        id = Column(String, hash_key=True)
        class Meta:
            encryption = {"enabled": True}
    
2.0.1 - 2018-02-03

Fix a bug where the last records in a closed shard in a Stream were dropped. See Issue #87 and PR #112.

[Fixed]
  • Stream no longer drops the last records from a closed Shard when moving to the child shard.

2.0.0 - 2017-11-27

2.0.0 introduces 4 significant new features:

  • Model inheritance and mixins

  • Table name templates: table_name_template="prod-{table_name}"

  • TTL support: ttl = {"column": "not_after"}

  • Column defaults:

    verified=Column(Boolean, default=False)
    not_after = Column(
        Timestamp,
        default=lambda: (
            datetime.datetime.now() +
            datetime.timedelta(days=30)
        )
    )
    

Python 3.6.0 is now the minimum required version, as Bloop takes advantage of __set_name__ and __init_subclass__ to avoid the need for a Metaclass.

A number of internal-only and rarely-used external methods have been removed, as the processes which required them have been simplified:

  • Column.get, Column.set, Column.delete in favor of their descriptor protocol counterparts

  • bloop.Type._register is no longer necessary before using a custom Type

  • Index._bind is replaced by helpers bind_index and refresh_index. You should not need to call these.

  • A number of overly-specific exceptions have been removed.

[Added]
  • Engine takes an optional keyword-only arg "table_name_template" which takes either a string used to format each name, or a function which will be called with the model to get its table name. This removes the need to connect to the before_create_table signal, which also could not handle multiple table names for the same model. With this change BaseModel.Meta.table_name will no longer be authoritative, and the engine must be consulted to find a given model's table name. An internal function Engine._compute_table_name is available, and the per-engine table names may be added to the model.Meta in the future. (see Issue #96)

  • A new exception InvalidTemplate is raised when an Engine's table_name_template is a string but does not contain the required "{table_name}" formatting key.

  • You can now specify a TTL (see Issue #87) on a model much like a Stream:

    class MyModel(BaseModel):
        class Meta:
            ttl = {
                "column": "expire_after"
            }
    
    
        id = Column(UUID, hash_key=True)
        expire_after = Column(Timestamp)
    
  • A new type, Timestamp was added. This stores a datetime.datetime as a unix timestamp in whole seconds.

  • Corresponding Timestamp types were added to the following extensions, mirroring the DateTime extension: bloop.ext.arrow.Timestamp, bloop.ext.delorean.Timestamp, and bloop.ext.pendulum.Timestamp.

  • Column takes an optional kwarg default, either a single value or a no-arg function that returns a value. Defaults are applied only during BaseModel.__init__ and not when loading objects from a Query, Scan, or Stream. If your function returns bloop.util.missing, no default will be applied. (see PR #90, PR #105 for extensive discussion)

  • (internal) A new abstract interface, bloop.models.IMeta was added to assist with code completion. This fully describes the contents of a BaseModel.Meta instance, and can safely be subclassed to provide hints to your editor:

    class MyModel(BaseModel):
        class Meta(bloop.models.IMeta):
            table_name = "my-table"
        ...
    
  • (internal) bloop.session.SessionWrapper.enable_ttl can be used to enable a TTL on a table. This SHOULD NOT be called unless the table was just created by bloop.

  • (internal) helpers for dynamic model inheritance have been added to the bloop.models package:

    • bloop.models.bind_column

    • bloop.models.bind_index

    • bloop.models.refresh_index

    • bloop.models.unbind

    Direct use is discouraged without a strong understanding of how binding and inheritance work within bloop.

[Changed]
  • Python 3.6 is the minimum supported version.

  • BaseModel no longer requires a Metaclass, which allows it to be used as a mixin to an existing class which may have a Metaclass.

  • BaseModel.Meta.init no longer defaults to the model's __init__ method, and will instead use cls.__new__(cls) to obtain an instance of the model. You can still specify a custom initialization function:

    class MyModel(BaseModel):
        class Meta:
            @classmethod
            def init(_):
                instance = MyModel.__new__(MyModel)
                instance.created_from_init = True
                return instance
        id = Column(...)
    
  • Column and Index support the shallow copy method __copy__ to simplify inheritance with custom subclasses. You may override this to change how your subclasses are inherited.

  • DateTime explicitly guards against tzinfo is None, since datetime.astimezone started silently allowing this in Python 3.6 -- you should not use a naive datetime for any reason.

  • Column.model_name is now Column.name, and Index.model_name is now Index.name.

  • Column(name=) is now Column(dynamo_name=) and Index(name=) is now Index(dynamo_name=)

  • The exception InvalidModel is raised instead of InvalidIndex.

  • The exception InvalidSearch is raised instead of the following: InvalidSearchMode, InvalidKeyCondition, InvalidFilterCondition, and InvalidProjection.

  • (internal) bloop.session.SessionWrapper methods now require an explicit table name, which is not read from the model name. This exists to support different computed table names per engine. The following methods now require a table name: create_table, describe_table (new), validate_table, and enable_ttl (new).

[Removed]
  • bloop no longer supports Python versions below 3.6.0

  • bloop no longer depends on declare

  • Column.get, Column.set, and Column.delete helpers have been removed in favor of using the Descriptor protocol methods directly: Column.__get__, Column.__set__, and Column.__delete__.

  • bloop.Type no longer exposes a _register method; there is no need to register types before using them, and you can remove the call entirely.

  • Column.model_name, Index.model_name, and the kwargs Column(name=), Index(name=) (see above)

  • The exception InvalidIndex has been removed.

  • The exception InvalidComparisonOperator was unused and has been removed.

  • The exception UnboundModel is no longer raised during Engine.bind and has been removed.

  • The exceptions InvalidSearchMode, InvalidKeyCondition, InvalidFilterCondition, and InvalidProjection have been removed.

  • (internal) Index._bind has been replaced with the more complete solutions in bloop.models.bind_column and bloop.models.bind_index.

1.3.0 - 2017-10-08

This release is exclusively to prepare users for the name/model_name/dynamo_name changes coming in 2.0; your 1.2.0 code will continue to work as usual but will raise DeprecationWarning when accessing model_name on a Column or Index, or when specifying the name= kwarg in the __init__ method of Column, GlobalSecondaryIndex, or LocalSecondaryIndex.

Previously it was unclear if Column.model_name was the name of this column in its model, or the name of the model it is attached to (eg. a shortcut for Column.model.__name__). Additionally the name= kwarg actually mapped to the object's .dynamo_name value, which was not obvious.

Now the Column.name attribute will hold the name of the column in its model, while Column.dynamo_name will hold the name used in DynamoDB, and is passed during initialization as dynamo_name=. Accessing model_name or passing name= during __init__ will raise deprecation warnings, and bloop 2.0.0 will remove the deprecated properties and ignore the deprecated kwargs.

[Added]
  • Column.name is the new home of the Column.model_name attribute. The same is true for Index, GlobalSecondaryIndex, and LocalSecondaryIndex.

  • The __init__ method of Column, Index, GlobalSecondaryIndex, and LocalSecondaryIndex now takes dynamo_name= in place of name=.

[Changed]
  • Accessing Column.model_name raises DeprecationWarning, and the same for Index/GSI/LSI.

  • Providing Column(name=) raises DeprecationWarning, and the same for Index/GSI/LSI.

1.2.0 - 2017-09-11
[Changed]
  • When a Model's Meta does not explicitly set read_units and write_units, it will only default to 1/1 if the table does not exist and needs to be created. If the table already exists, any throughput will be considered valid. This will still ensure new tables have 1/1 iops as a default, but won't fail if an existing table has more than one of either.

    There is no behavior change for explicit integer values of read_units and write_units: if the table does not exist it will be created with those values, and if it does exist then validation will fail if the actual values differ from the modeled values.

    An explicit None for either read_units or write_units is equivalent to omitting the value, but allows for a more explicit declaration in the model.

    Because this is a relaxing of a default only within the context of validation (creation has the same semantics) the only users that should be impacted are those that do not declare read_units and write_units and rely on the built-in validation failing to match on values != 1. Users that rely on the validation to succeed on tables with values of 1 will see no change in behavior. This fits within the extended criteria of a minor release since there is a viable and obvious workaround for the current behavior (declare 1/1 and ensure failure on other values).

  • When a Query or Scan has projection type "count", accessing the count or scanned properties will immediately execute and exhaust the iterator to provide the count or scanned count. This simplifies the previous workaround of calling next(query, None) before using query.count.

[Fixed]
  • Fixed a bug where a Query or Scan with projection "count" would always raise KeyError (see Issue #95)

  • Fixed a bug where resetting a Query or Scan would cause __next__ to raise botocore.exceptions.ParamValidationError (see Issue #95)

1.1.0 - 2017-04-26
[Added]
  • Engine.bind takes optional kwarg skip_table_setup to skip CreateTable and DescribeTable calls (see Issue #83)

  • Index validates against a superset of the projection (see Issue #71)

1.0.3 - 2017-03-05

Bug fix.

[Fixed]
  • Stream orders records on the integer of SequenceNumber, not the lexicographical sorting of its string representation. This is an annoying bug, because as documented we should be using lexicographical sorting on the opaque string. However, without leading 0s that sort fails, and we must assume the string represents an integer to sort on. Particularly annoying, tomorrow the SequenceNumber could start with non-numeric characters and still conform to the spec, but the sorting-as-int assumption breaks. However, we can't properly sort without making that assumption.

1.0.2 - 2017-03-05

Minor bug fix.

[Fixed]
  • extension types in ext.arrow, ext.delorean, and ext.pendulum now load and dump None correctly.

1.0.1 - 2017-03-04

Bug fixes.

[Changed]
  • The arrow, delorean, and pendulum extensions now have a default timezone of "utc" instead of datetime.timezone.utc. There are open issues for both projects to verify if that is the expected behavior.

[Fixed]
  • DynamoDBStreams return a Timestamp for each record's ApproximateCreationDateTime, which botocore is translating into a real datetime.datetime object. Previously, the record parser assumed an int was used. While this fix is a breaking change for an internal API, this bug broke the Stream iterator interface entirely, which means no one could have been using it anyway.

1.0.0 - 2016-11-16

1.0.0 is the culmination of just under a year of redesigns, bug fixes, and new features. Over 550 commits, more than 60 issues closed, over 1200 new unit tests. At an extremely high level:

  • The query and scan interfaces have been polished and simplified. Extraneous methods and configuration settings have been cut out, while ambiguous properties and methods have been merged into a single call.

  • A new, simple API exposes DynamoDBStreams with just a few methods; no need to manage individual shards, maintain shard hierarchies and open/closed polling. I believe this is a first since the Kinesis Adapter and KCL, although they serve different purposes. When a single worker can keep up with a model's stream, Bloop's interface is immensely easier to use.

  • Engine's methods are more consistent with each other and across the code base, and all of the configuration settings have been made redundant. This removes the need for EngineView and its associated temporary config changes.

  • Blinker-powered signals make it easy to plug in additional logic when certain events occur: before a table is created; after a model is validated; whenever an object is modified.

  • Types have been pared down while their flexibility has increased significantly. It's possible to create a type that loads another object as a column's value, using the engine and context passed into the load and dump functions. Be careful with this; transactions on top of DynamoDB are very hard to get right.

See the Migration Guide above for specific examples of breaking changes and how to fix them, or the User Guide for a tour of the new Bloop. Lastly, the Public and Internal API References are finally available and should cover everything you need to extend or replace whole subsystems in Bloop (if not, please open an issue).

[Added]
  • bloop.signals exposes Blinker signals which can be used to monitor object changes, when instances are loaded from a query, before models are bound, etc. A connection sketch follows this list:

    • before_create_table

    • object_loaded

    • object_saved

    • object_deleted

    • object_modified

    • model_bound

    • model_created

    • model_validated
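
    Connecting a handler is a one-liner with Blinker (a sketch; the handler name and printed payload are illustrative):

      from bloop.signals import object_saved

      @object_saved.connect
      def on_saved(sender, **kwargs):
          # payload varies by signal; the saved instance arrives in kwargs
          print("saved:", kwargs.get("obj"))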

  • Engine.stream can be used to iterate over all records in a stream, with a total ordering over approximate record creation time. Use engine.stream(model, "trim_horizon") to get started. See the User Guide for details.

  • New exceptions RecordsExpired and ShardIteratorExpired for errors in stream state

  • New Invalid* exceptions for bad input; they subclass both BloopException and ValueError

  • DateTime types for the three most common datetime libraries:

    • bloop.ext.arrow.DateTime

    • bloop.ext.delorean.DateTime

    • bloop.ext.pendulum.DateTime

  • model.Meta has a new optional attribute stream which can be used to enable a stream on the model's table.
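    For example (a sketch; see the User Guide for the available stream options):

      from bloop import BaseModel, Column, UUID

      class User(BaseModel):
          class Meta:
              # enable a stream with both old and new images
              stream = {"include": ["new", "old"]}
          id = Column(UUID, hash_key=True)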

  • model.Meta exposes the same projection attribute as Index so that (index or model.Meta).projection can be used interchangeably

  • New Stream class exposes DynamoDBStreams API as a single iterable with powerful seek/jump options, and simple json-friendly tokens for pausing and resuming iteration.

  • Over 1200 unit tests added

  • Initial integration tests added

  • (internal) bloop.conditions.ReferenceTracker handles building #n0, :v1, and associated values. Use any_ref to build a reference to a name/path/value, and pop_refs when backtracking (eg. when a value is actually another column, or when correcting a partially valid condition)

  • (internal) bloop.conditions.render is the preferred entry point for rendering, and handles all permutations of conditions, filters, projections. Use over ConditionRenderer unless you need very specific control over rendering sequencing.

  • (internal) bloop.session.SessionWrapper exposes DynamoDBStreams operations in addition to previous bloop.Client wrappers around DynamoDB client

  • (internal) New supporting classes streams.buffer.RecordBuffer, streams.shard.Shard, and streams.coordinator.Coordinator to encapsulate the hell^Wjoy that is working with DynamoDBStreams

  • (internal) New class util.Sentinel for placeholder values like missing and last_token that provide clearer docstrings: instead of showing func(..., default=object<0x...>), these will show func(..., default=Sentinel<[Missing]>)

[Changed]
  • bloop.Column emits object_modified on __set__ and __del__

  • Conditions now check if they can be used with a column's typedef and raise InvalidCondition when they can't. For example, contains can't be used on Number, nor > on Set(String)
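    For example (a sketch; MyModel and its columns are hypothetical):

      # assuming MyModel.count is Column(Number) and MyModel.tags is Column(Set(String))
      MyModel.count.contains(3)   # raises InvalidCondition
      MyModel.tags > {"a", "b"}   # raises InvalidCondition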

  • bloop.Engine no longer takes an optional bloop.Client but instead optional dynamodb and dynamodbstreams clients (usually created from boto3.client("dynamodb") etc.)
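    For example:

      import boto3
      from bloop import Engine

      engine = Engine(
          dynamodb=boto3.client("dynamodb"),
          dynamodbstreams=boto3.client("dynamodbstreams"))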

  • Engine no longer takes **config -- its settings have been dispersed to their local touch points

    • atomic is a parameter of save and delete and defaults to False

    • consistent is a parameter of load, query, scan and defaults to False

    • prefetch has no equivalent, and is baked into the new Query/Scan iterator logic

    • strict is a parameter of a LocalSecondaryIndex, defaults to True
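
    In practice each setting simply moves to the call site (a sketch; user is a saved instance):

      engine.save(user, atomic=True)      # previously an engine-wide config setting
      engine.load(user, consistent=True)  # same for consistent reads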

  • Engine no longer has a context to create temporary views with different configuration

  • Engine.bind is no longer keyword-only: engine.bind(MyBase) is acceptable in addition to engine.bind(base=MyBase)

  • Engine.bind emits new signals before_create_table, model_validated, and model_bound

  • Engine.delete and Engine.save take *objs instead of objs to easily save/delete small multiples of objects (engine.save(user, tweet) instead of engine.save([user, tweet]))

  • Engine guards against loading, saving, querying, etc. on abstract models

  • Engine.load raises MissingObjects instead of NotModified (exception rename)

  • Engine.scan and Engine.query take all query and scan arguments immediately, instead of using the builder pattern. For example, engine.scan(model).filter(Model.x==3) has become engine.scan(model, filter=Model.x==3).

  • bloop.exceptions.NotModified renamed to bloop.exceptions.MissingObjects

  • Any code that raised AbstractModelException now raises UnboundModel

  • bloop.types.DateTime is now backed by datetime.datetime instead of arrow. Only supports UTC now, no local timezone. Use the bloop.ext.arrow.DateTime class to continue using arrow.

  • The query and scan interfaces have been entirely refactored: count, consistent, ascending and other properties are part of the Engine.query(...) parameters. all() is no longer needed, as Engine.scan and .query immediately return an iterable object. There is no prefetch setting or limit.

  • The complete property for Query and Scan has been replaced with exhausted, to be consistent with the Stream module

  • The query and scan iterators no longer cache results

  • The projection parameter is now required for GlobalSecondaryIndex and LocalSecondaryIndex

  • Calling Index.__set__ or Index.__del__ will raise AttributeError. For example, some_user.by_email = 3 raises if User.by_email is a GSI

  • bloop.Number replaces bloop.Float and takes an optional decimal.Context for converting numbers. For a less strict, lossy Float type see the Patterns section of the User Guide

  • bloop.String.dynamo_dump no longer calls str() on the value, which was hiding bugs where a non-string object was passed (eg. some_user.name = object() would save with a name of <object <0x...>)

  • bloop.DateTime is now backed by datetime.datetime and only knows UTC in a fixed format. Adapters for arrow, delorean, and pendulum are available in bloop.ext

  • bloop.DateTime does not support naive datetimes; they must always have a tzinfo

  • docs:

    • use RTD theme

    • rewritten three times

    • now includes public and internal api references

  • (internal) Path lookups on Column (eg. User.profile["name"]["last"]) use simpler proxies

  • (internal) Proxy behavior split out from Column's base class bloop.conditions.ComparisonMixin for a cleaner namespace

  • (internal) bloop.conditions.ConditionRenderer rewritten, uses a new bloop.conditions.ReferenceTracker with a much clearer api

  • (internal) ConditionRenderer can backtrack references and handles columns as values (eg. User.name.in_([User.email, "literal"]))

  • (internal) _MultiCondition logic rolled into bloop.conditions.BaseCondition, AndCondition and OrCondition no longer have intermediate base class

  • (internal) AttributeExists logic rolled into bloop.conditions.ComparisonCondition

  • (internal) bloop.tracking rolled into bloop.conditions and is hooked into the object_* signals. Methods are no longer called directly (eg. no need for tracking.sync(some_obj, engine))

  • (internal) update condition is built from a set of columns, not a dict of updates to apply

  • (internal) bloop.conditions.BaseCondition is a more comprehensive base class, and handles all manner of out-of-order merges (and(x, y) vs and(y, x) where x is an and condition and y is not)

  • (internal) almost all *Condition classes simply implement __repr__ and render; BaseCondition takes care of everything else

  • (internal) bloop.Client became bloop.session.SessionWrapper

  • (internal) Engine._dump takes an optional context, **kwargs, matching the signature of Engine._load

  • (internal) BaseModel no longer implements __hash__, __eq__, or __ne__ but ModelMetaclass will always ensure a __hash__ function when the subclass is created

  • (internal) Filter and FilterIterator rewritten entirely in the bloop.search module across multiple classes

[Removed]
  • AbstractModelException has been rolled into UnboundModel

  • The all() method has been removed from the query and scan iterator interface. Simply iterate with next(query) or for result in query:

  • Query.results and Scan.results have been removed and results are no longer cached. You can begin the search again with query.reset()
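    For example (a sketch; User and some_id are placeholders):

      q = engine.query(User, key=User.id == some_id)
      first_pass = list(q)   # exhausts the iterator; nothing is cached
      q.reset()              # begin the search again
      second_pass = list(q)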

  • The new_base() function has been removed in favor of subclassing BaseModel directly

  • bloop.Float has been replaced by bloop.Number

  • (internal) bloop.engine.LoadManager logic was rolled into bloop.engine.load(...)

  • EngineView has been removed since engines no longer have a baseline config and don't need a context to temporarily modify it

  • (internal) Engine._update has been removed in favor of util.unpack_from_dynamodb

  • (internal) Engine._instance has been removed in favor of directly creating instances from model.Meta.init() in unpack_from_dynamodb

[Fixed]
  • Column.contains(value) now renders value with the column typedef's inner type. Previously, the container type was used, so Data.some_list.contains("foo") would render as (contains(some_list, ["f", "o", "o"])) instead of (contains(some_list, "foo"))

  • Set renders correct wire format -- previously, it incorrectly sent {"SS": [{"S": "h"}, {"S": "i"}]} instead of the correct {"SS": ["h", "i"]}

  • (internal) Set and List expose an inner_typedef for conditions to force rendering of inner values (currently only used by ContainsCondition)

0.9.13 - 2016-10-31
[Fixed]
  • Set was rendering an invalid wire format, and now renders the correct "SS", "NS", or "BS" values.

  • Set and List were rendering contains conditions incorrectly, by trying to dump each value in the value passed to contains. For example, MyModel.strings.contains("foo") would render contains(#n0, :v1) where :v1 was {"SS": [{"S": "f"}, {"S": "o"}, {"S": "o"}]}. Now, non-iterable values are rendered singularly, so :v1 would be {"S": "foo"}. This is a temporary fix, and only works for simple cases. For example, List(List(String)) will still break when performing a contains check. This is fixed correctly in 1.0.0 and you should migrate as soon as possible.

0.9.12 - 2016-06-13
[Added]
  • model.Meta now exposes gsis and lsis, in addition to the existing indexes. This simplifies code that needs to iterate over each type of index and not all indexes.
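    For example (a sketch):

      for gsi in MyModel.Meta.gsis:   # only the GlobalSecondaryIndexes
          print(gsi)
      for lsi in MyModel.Meta.lsis:   # only the LocalSecondaryIndexes
          print(lsi)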

[Removed]
  • engine_for_profile is no longer necessary, since client instances can simply be created with a given profile.

0.9.11 - 2016-06-12
[Changed]
  • bloop.Client now takes boto_client, which should be an instance of boto3.client("dynamodb") instead of a boto3.session.Session. This lets you specify endpoints and other configuration only exposed during the client creation process.

  • Engine no longer uses "session" from the config, and instead takes a client param which should be an instance of bloop.Client. bloop.Client will be going away in 1.0.0 and Engine will simply take the boto3 clients directly.

0.9.10 - 2016-06-07
[Added]
  • New exception AbstractModelException is raised when attempting to perform an operation that requires a table on an abstract model. Raised by all Engine functions as well as bloop.Client operations.

[Changed]
  • Engine operations raise AbstractModelException when attempting to perform operations on abstract models.

  • Previously, models were considered non-abstract if model.Meta.abstract was False, or there was no value. Now, ModelMetaclass will explicitly set abstract to False so that model.Meta.abstract can be used everywhere, instead of getattr(model.Meta, "abstract", False).

0.9.9 - 2016-06-06
[Added]
  • Column has a new attribute model, the model it is bound to. This is set during the model's creation by the ModelMetaclass.

[Changed]
  • Engine.bind will now skip intermediate models that are abstract. This makes it easier to pass abstract models, or models whose subclasses may be abstract (and have non-abstract grandchildren).

0.9.8 - 2016-06-05

(no public changes)

0.9.7 - 2016-06-05
[Changed]
  • Conditions implement __eq__ for checking if two conditions will evaluate the same. For example:

    >>> large = Blob.size > 1024**2
    >>> small = Blob.size < 1024**2
    >>> large == small
    False
    >>> also_large = Blob.size > 1024**2
    >>> large == also_large
    True
    >>> large is also_large
    False
    
0.9.6 - 2016-06-04

0.9.6 is the first significant change to how Bloop binds models, engines, and tables. There are a few breaking changes, although they should be easy to update.

Where you previously created a model from the Engine's model:

from bloop import Engine

engine = Engine()

class MyModel(engine.model):
    ...

You'll now create a base without any relation to an engine, and then bind it to any engines you want:

from bloop import Engine, new_base

BaseModel = new_base()

class MyModel(BaseModel):
    ...

engine = Engine()
engine.bind(base=MyModel)  # or base=BaseModel

[Added]
  • A new function engine_for_profile takes a profile name for the config file and creates an appropriate session. This is a temporary utility, since Engine will eventually take instances of dynamodb and dynamodbstreams clients. This will be going away in 1.0.0.

  • A new base exception BloopException which can be used to catch anything thrown by Bloop.

  • A new function new_base() creates an abstract base for models. This replaces Engine.model now that multiple engines can bind the same model. This will be going away in 1.0.0 which will provide a BaseModel class.

[Changed]
  • The session parameter to Engine is now part of the config kwargs. The underlying bloop.Client is no longer created in Engine.__init__, which provides an opportunity to swap out the client entirely before the first Engine.bind call. The semantics of session and client are unchanged.

  • Engine._load, Engine._dump, and all Type signatures now pass an engine explicitly through the context parameter. This was mentioned in 0.9.2 and context is now required.

  • Engine.bind now binds the given class and all subclasses. This simplifies most workflows, since you can now create a base with MyBase = new_base() and then bind every model you create with engine.bind(base=MyBase).

  • All exceptions now subclass a new base exception BloopException instead of Exception.

  • Vector types Set, List, Map, and TypedMap accept a typedef of None so they can raise a more helpful error message. This will be reverted in 1.0.0 and will once again be a required parameter.

[Removed]
  • Engine no longer has model, unbound_models, or models attributes. Engine.model has been replaced by the new_base() function, and models are bound directly to the underlying type engine without tracking on the Engine instance itself.

  • EngineView dropped the corresponding attributes above.

0.9.5 - 2016-06-01
[Changed]
  • EngineView attributes are now properties, and point to the underlying engine's attributes; this includes client, model, type_engine, and unbound_models. This fixed an issue when using with engine.context(...) as view: to perform operations on models bound to the engine but not the engine view. EngineView will be going away in 1.0.0.

0.9.4 - 2015-12-31
[Added]
  • Engine functions now take optional config parameters to override the engine's config. You should update your code to use these values instead of engine.config, since engine.config is going away in 1.0.0. Engine.delete and Engine.save expose the atomic parameter, while Engine.load exposes consistent.

  • Added the TypedMap class, which provides dict mapping for a single typedef over any number of keys. This differs from Map, which must know all keys ahead of time and can use different types. TypedMap only supports a single type, but can have arbitrary keys. This will be going away in 1.0.0.
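    For example, arbitrary keys mapping to a single type (a sketch, in the engine.model style of the 0.9.x series):

      from bloop import Column, Float, TypedMap, UUID

      class GameScores(engine.model):
          id = Column(UUID, hash_key=True)
          ratings = Column(TypedMap(Float))  # any keys, all values are Float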

0.9.2 - 2015-12-11
[Changed]
  • Type functions _load, _dump, dynamo_load, dynamo_dump now take an optional keyword-only arg context. This dict will become required in 0.9.6, and contains the engine instance that should be used for recursive types. If your type currently uses cls.Meta.bloop_engine, you should start using context["engine"] in the next release. The bloop_engine attribute is being removed, since models will be able to bind to multiple engines.
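    A recursive type would now pull the engine from context instead of cls.Meta.bloop_engine (a sketch; RelatedUser is hypothetical):

      from bloop import String

      class RelatedUser(String):
          def dynamo_load(self, value, *, context=None):
              engine = context["engine"]  # replaces cls.Meta.bloop_engine
              # sketch: a real type might use the engine to load another object here
              return value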

0.9.1 - 2015-12-07

(no public changes)

0.9.0 - 2015-12-07

About

Contributing

Thanks for contributing! Feel free to open an issue for any bugs, typos, unhelpful docs, or general unhappiness you encounter while using Bloop. If you want to create a pull request, even more awesome! Please make sure all the tox environments pass.

To start developing Bloop first create a fork, then clone and run the tests:

git clone git@github.com:[YOU]/bloop.git
cd bloop
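# installs tox plus an editable copy of bloop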
pip install tox -e .
tox

Note

The integration tests use docker to run a local instance of DynamoDB. The tests automatically start and tear down a container named "ddb-local" that uses port 8000. You can pass --skip-cleanup to leave the container running after the tests finish.

Versioning

Public API

Bloop follows Semantic Versioning 2.0.0 and a draft appendix for its Public API.

The following are enforced:

  • Backwards incompatible changes in major version only

  • New features in minor version or higher

  • Backwards compatible bug fixes in patch version or higher (see appendix)

Internal API

The Internal API is not versioned and may make backwards incompatible changes at any time. When a class or function is not explicitly documented as part of the public or internal API, it is part of the internal API. Still, please open an issue so it can be appropriately documented.

License

The MIT License (MIT)

Copyright (c) 2021 Joe Cross

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.