Public

Engine

By default, Bloop will build clients directly from boto3.client(). To customize the engine's connection, you can provide your own DynamoDB and DynamoDBStreams clients:

import bloop
import boto3

dynamodb_local = boto3.client("dynamodb", endpoint_url="http://127.0.0.1:8000")
streams_local = boto3.client("dynamodbstreams", endpoint_url="http://127.0.0.1:8001")

engine = bloop.Engine(
    dynamodb=dynamodb_local,
    dynamodbstreams=streams_local)
class bloop.engine.Engine(*, dynamodb=None, dynamodbstreams=None, table_name_template: Union[str, Callable[Any, str]] = '{table_name}')[source]

Primary means of interacting with DynamoDB.

To apply a prefix to each model's table name, you can use a simple format string:

>>> template = "my-prefix-{table_name}"
>>> engine = Engine(table_name_template=template)

For more complex table_name customization, you can provide a function:

>>> def reverse_name(model):
...     return model.Meta.table_name[::-1]
>>> engine = Engine(table_name_template=reverse_name)
Parameters
  • dynamodb -- DynamoDB client. Defaults to boto3.client("dynamodb").

  • dynamodbstreams -- DynamoDBStreams client. Defaults to boto3.client("dynamodbstreams").

  • table_name_template -- Customize the table name of each model bound to the engine. If a string is provided, string.format(table_name=model.Meta.table_name) will be called. If a function is provided, the function will be called with the model as its sole argument. Defaults to "{table_name}".

bind(model, *, skip_table_setup=False)[source]

Create backing tables for a model and its non-abstract subclasses.

Parameters
  • model -- Base model to bind. Can be abstract.

  • skip_table_setup -- Don't create or verify the table in DynamoDB. Default is False.

Raises

bloop.exceptions.InvalidModel -- if model is not a subclass of BaseModel.

delete(*objs, condition=None, sync=None)[source]

Delete one or more objects.

Parameters
  • objs -- objects to delete.

  • condition -- only perform each delete if this condition holds.

  • sync -- update objects after deleting. "old" loads attributes before the delete; None does not mutate the object locally. Default is None.

Raises

bloop.exceptions.ConstraintViolation -- if the condition (or atomic) is not met.

load(*objs, consistent=False)[source]

Populate objects from DynamoDB.

Parameters
Raises
query(model_or_index, key, filter=None, projection='all', consistent=False, forward=True)[source]

Create a reusable QueryIterator.

Parameters
  • model_or_index -- A model or index to query. For example, User or User.by_email.

  • key -- Key condition. This must include an equality against the hash key, and optionally one of a restricted set of conditions on the range key.

  • filter -- Filter condition. Only matching objects will be included in the results.

  • projection -- "all", "count", a set of column names, or a set of Column. When projection is "count", you must advance the iterator to retrieve the count.

  • consistent (bool) -- Use strongly consistent reads if True. Default is False.

  • forward (bool) -- Query in ascending or descending order. Default is True (ascending).

Returns

A reusable query iterator with helper methods.

Return type

QueryIterator

save(*objs, condition=None, sync=None)[source]

Save one or more objects.

Parameters
  • objs -- objects to save.

  • condition -- only perform each save if this condition holds.

  • sync -- update objects after saving. "new" loads attributes after the save; "old" loads attributes before the save; None does not mutate the object locally. Default is None.

Raises

bloop.exceptions.ConstraintViolation -- if the condition (or atomic) is not met.

scan(model_or_index, filter=None, projection='all', consistent=False, parallel=None)[source]

Create a reusable ScanIterator.

Parameters
  • model_or_index -- A model or index to scan. For example, User or User.by_email.

  • filter -- Filter condition. Only matching objects will be included in the results.

  • projection -- "all", "count", a list of column names, or a list of Column. When projection is "count", you must exhaust the iterator to retrieve the count.

  • consistent (bool) -- Use strongly consistent reads if True. Default is False.

  • parallel (tuple) -- Perform a parallel scan. A tuple of (Segment, TotalSegments) for this portion the scan. Default is None.

Returns

A reusable scan iterator with helper methods.

Return type

ScanIterator

stream(model, position)[source]

Create a Stream that provides approximate chronological ordering.

# Create a user so we have a record
>>> engine = Engine()
>>> user = User(id=3, email="user@domain.com")
>>> engine.save(user)
>>> user.email = "admin@domain.com"
>>> engine.save(user)

# First record lacks an "old" value since it's an insert
>>> stream = engine.stream(User, "trim_horizon")
>>> next(stream)
{'key': None,
 'old': None,
 'new': User(email='user@domain.com', id=3, verified=None),
 'meta': {
     'created_at': datetime.datetime(2016, 10, 23, ...),
     'event': {
         'id': '3fe6d339b7cb19a1474b3d853972c12a',
         'type': 'insert',
         'version': '1.1'},
     'sequence_number': '700000000007366876916'}
}
Parameters
  • model -- The model to stream records from.

  • position -- "trim_horizon", "latest", a stream token, or a datetime.datetime.

Returns

An iterator for records in all shards.

Return type

Stream

Raises

bloop.exceptions.InvalidStream -- if the model does not have a stream.

transaction(mode='w')[source]

Create a new ReadTransaction or WriteTransaction.

As a context manager, calling commit when the block exits:

>>> engine = Engine()
>>> user = User(id=3, email="user@domain.com")
>>> tweet = Tweet(id=42, data="hello, world")
>>> with engine.transaction("w") as tx:
...     tx.delete(user)
...     tx.save(tweet, condition=Tweet.id.is_(None))

Or manually calling prepare and commit:

>>> engine = Engine()
>>> user = User(id=3, email="user@domain.com")
>>> tweet = Tweet(id=42, data="hello, world")
>>> tx = engine.transaction("w")
>>> tx.delete(user)
>>> tx.save(tweet, condition=Tweet.id.is_(None))
>>> tx.prepare().commit()
Parameters

mode (str) -- Either "r" or "w" to create a ReadTransaction or WriteTransaction. Default is "w"

Returns

A new transaction that can be committed.

Return type

ReadTransaction or WriteTransaction

Models

See defining models in the User Guide.

BaseModel

class bloop.models.BaseModel(**attrs)[source]

Abstract base that all models derive from.

Provides a basic __init__ method that takes **kwargs whose keys are columns names:

class URL(BaseModel):
    id = Column(UUID, hash_key=True)
    ip = Column(IPv6)
    name = Column(String)

url = URL(id=uuid.uuid4(), name="google")

By default, the __init__ method is not called when new instances are required, for example when iterating results from Query, Scan or a Stream.

Meta[source]

Holds table configuration and computed properties of the model. See model meta in the User Guide.

Column

class bloop.models.Column(typedef, hash_key=False, range_key=False, dynamo_name=None, default=<Sentinel[missing]>)[source]
default

A no-arg function used during instantiation of the column's model. Returns bloop.util.missing when the column does not have a default. Defaults to lambda: bloop.util.missing.

dynamo_name

The name of this column in DynamoDB. Defaults to the column's name.

hash_key

True if this is the model's hash key.

model

The model this column is attached to.

name

The name of this column in the model. Not settable.

>>> class Document(BaseModel):
...     ...
...     cheat_codes = Column(Set(String), dynamo_name="cc")
...
>>> Document.cheat_codes.name
cheat_codes
>>> Document.cheat_codes.dynamo_name
cc
range_key

True if this is the model's range key.

__copy__()[source]

Create a shallow copy of this Column. Primarily used when initializing models that subclass other abstract models or mixins (baseless classes that contain Columns and Indexes). You can override this method to change how derived models are created:

import copy


class MyColumn(Column):
    def __copy__(self):
        new = super().__copy__()
        new.derived = True
        return new


column = MyColumn(Integer)
same = copy.copy(column)
assert same.derived  # True
Returns

A shallow copy of this Column, with the model and _name attributes unset.

GlobalSecondaryIndex

class bloop.models.GlobalSecondaryIndex(*, projection, hash_key, range_key=None, read_units=None, write_units=None, dynamo_name=None, **kwargs)[source]

See GlobalSecondaryIndex in the DynamoDB Developer Guide for details.

Parameters
  • projection -- Either "keys", "all", or a list of column name or objects. Included columns will be projected into the index. Key columns are always included.

  • hash_key -- The column that the index can be queried against.

  • range_key -- (Optional) The column that the index can be sorted on. Default is None.

  • read_units (int) -- (Optional) Provisioned read units for the index. Default is None. When no value is provided and the index does not exist, it will be created with 1 read unit. If the index already exists, it will use the actual index's read units.

  • write_units (int) -- (Optional) Provisioned write units for the index. Default is None. When no value is provided and the index does not exist, it will be created with 1 write unit. If the index already exists, it will use the actual index's write units.

  • dynamo_name (str) -- (Optional) The index's name in in DynamoDB. Defaults to the index’s name in the model.

dynamo_name

The name of this index in DynamoDB. Defaults to the index's name.

hash_key

The column that the index can be queried against.

model

The model this index is attached to.

name

The name of this index in the model. Not settable.

>>> class Document(BaseModel):
...     ...
...     by_email = GlobalSecondaryIndex(
...         projection="keys", dynamo_name="ind_e", hash_key="email")
...
>>> Document.by_email.name
by_email
>>> Document.by_email.dynamo_name
ind_e
projection
{
    "available":  # Set of columns that can be returned from a query or search.
    "included":   # Set of columns that can be used in query and scan filters.
    "mode":       # "all", "keys", or "include"
    "strict":     # False if queries and scans can fetch non-included columns
}

GSIs can't incur extra reads, so "strict" will always be true and "available" is always the same as "included".

range_key

The column that the index can be sorted on. May be None.

read_units

Provisioned read units for the index. GSIs have their own provisioned throughput.

write_units

Provisioned write units for the index. GSIs have their own provisioned throughput.

__copy__()

Create a shallow copy of this Index. Primarily used when initializing models that subclass other abstract models or mixins (baseless classes that contain Columns and Indexes). You can override this method to change how derived models are created:

import copy


class MyIndex(Index):
    def __copy__(self):
        new = super().__copy__()
        new.derived = True
        return new


index = MyIndex(projection="keys", hash_key="some_column")
same = copy.copy(index)
assert same.derived  # True
Returns

A shallow copy of this Index, with the model and _name attributes unset, and the computed projection invalidated.

LocalSecondaryIndex

class bloop.models.LocalSecondaryIndex(*, projection, range_key, dynamo_name=None, strict=True, **kwargs)[source]

See LocalSecondaryIndex in the DynamoDB Developer Guide for details.

Unlike GlobalSecondaryIndex each LSI shares its throughput with the table and their hash key is always the table hash key.

Parameters
  • projection -- Either "keys", "all", or a list of column name or objects. Included columns will be projected into the index. Key columns are always included.

  • range_key -- The column that the index can be sorted against.

  • dynamo_name (str) -- (Optional) The index's name in in DynamoDB. Defaults to the index’s name in the model.

  • strict (bool) -- (Optional) Restricts queries and scans on the LSI to columns in the projection. When False, DynamoDB may silently incur additional reads to load results. You should not disable this unless you have an explicit need. Default is True.

dynamo_name

The name of this index in DynamoDB. Defaults to the index's name.

hash_key

LSI's hash_key is always the table hash_key.

model

The model this index is attached to.

name

The name of this index in the model. Not settable.

>>> class Document(BaseModel):
...     ...
...     by_date = LocalSecondaryIndex(
...         projection="keys", dynamo_name="ind_co", range_key="created_on")
...
>>> Document.by_date.name
by_date
>>> Document.by_date.dynamo_name
ind_co
projection
{
    "available":  # Set of columns that can be returned from a query or search.
    "included":   # Set of columns that can be used in query and scan filters.
    "mode":       # "all", "keys", or "include"
    "strict":     # False if queries and scans can fetch non-included columns
}

LSIs can incur extra reads, so "available" may be a superset of "included".

range_key

The column that the index can be sorted on. LSIs always have a range_key.

read_units

Provisioned read units for the index. LSIs share the table's provisioned throughput.

write_units

Provisioned write units for the index. LSIs share the table's provisioned throughput.

__copy__()

Create a shallow copy of this Index. Primarily used when initializing models that subclass other abstract models or mixins (baseless classes that contain Columns and Indexes). You can override this method to change how derived models are created:

import copy


class MyIndex(Index):
    def __copy__(self):
        new = super().__copy__()
        new.derived = True
        return new


index = MyIndex(projection="keys", hash_key="some_column")
same = copy.copy(index)
assert same.derived  # True
Returns

A shallow copy of this Index, with the model and _name attributes unset, and the computed projection invalidated.

Types

Most custom types only need to specify a backing_type (or subclass a built-in type) and override dynamo_dump() and dynamo_load():

class ReversedString(Type):
    python_type = str
    backing_type = "S"

    def dynamo_load(self, value, *, context, **kwargs):
        return str(value[::-1])

    def dynamo_dump(self, value, *, context, **kwargs):
        return str(value[::-1])

If a type's constructor doesn't have required args, a Column can use the class directly. The column will create an instance of the type by calling the constructor without any args:

class SomeModel(BaseModel):
    custom_hash_key = Column(ReversedString, hash_key=True)

In rare cases, complex types may need to implement _dump() or _load().

Type

class bloop.types.Type[source]

Abstract base type.

python_type

The type local values will have. Informational only, this is not used for validation.

backing_type

The DynamoDB type that Bloop will store values as.

One of:

  • "S" -- string

  • "N" -- number

  • "B" -- binary

  • "SS" -- string set

  • "NS" -- number set

  • "BS" -- binary set

  • "M" -- map

  • "L" -- list

  • "BOOL" -- boolean

See the DynamoDB API Reference for details.

supports_operation(operation: str) bool[source]

Used to ensure a conditional operation is supported by this type.

By default, uses a hardcoded table of operations that maps to each backing DynamoDB type.

You can override this method to implement your own conditional operators, or to dynamically adjust which operations your type supports.

dynamo_dump(value, *, context, **kwargs)[source]

Converts a local value into a DynamoDB value.

For example, to store a string enum as an integer:

def dynamo_dump(self, value, *, context, **kwargs):
    colors = ["red", "blue", "green"]
    return colors.index(value.lower())
dynamo_load(value, *, context, **kwargs)[source]

Converts a DynamoDB value into a local value.

For example, to load a string enum from an integer:

def dynamo_dump(self, value, *, context, **kwargs):
    colors = ["red", "blue", "green"]
    return colors[value]
_dump(value, **kwargs)[source]

Entry point for serializing values. Most custom types should use dynamo_dump().

This wraps the return value of dynamo_dump() in DynamoDB's wire format. For example, serializing a string enum to an int:

value = "green"
# dynamo_dump("green") = 2
_dump(value) == {"N": 2}

If a complex type calls this function with None, it will forward None to dynamo_dump(). This can happen when dumping eg. a sparse Map, or a missing (not set) value.

_load(value, **kwargs)[source]

Entry point for deserializing values. Most custom types should use dynamo_load().

This unpacks DynamoDB's wire format and calls dynamo_load() on the inner value. For example, deserializing an int to a string enum:

value = {"N": 2}
# dynamo_load(2) = "green"
_load(value) == "green"

If a complex type calls this function with None, it will forward None to dynamo_load(). This can happen when loading eg. a sparse Map.

String

class bloop.types.String[source]
backing_type = "S"
python_type = str

Number

You should use decimal.Decimal instances to avoid rounding errors:

>>> from bloop import BaseModel, Engine, Column, Number, Integer
>>> class Product(BaseModel):
...     id = Column(Integer, hash_key=True)
...     rating = Column(Number)

>>> engine = Engine()
>>> engine.bind(Rating)

>>> product = Product(id=0, rating=3.14)
>>> engine.save(product)
# Long traceback
Inexact: [<class 'decimal.Inexact'>, <class 'decimal.Rounded'>]

>>> from decimal import Decimal
>>> product.rating = Decimal('3.14')
>>> engine.save(product)
>>> # Success!
class bloop.types.Number(context=None)[source]

Base for all numeric types.

Parameters

context -- (Optional) decimal.Context used to translate numbers. Default is a context that matches DynamoDB's stated limits, taken from boto3.

See also

If you don't want to deal with decimal.Decimal, see the Float type in the patterns section.

backing_type = "N"
python_type = decimal.Decimal
context = decimal.Context

The context used to transfer numbers to DynamoDB.

Binary

class bloop.types.Binary[source]
backing_type = "B"
python_type = bytes

Boolean

class bloop.types.Boolean[source]
backing_type = "BOOL"
python_type = bool

UUID

class bloop.types.UUID[source]
backing_type = "S"
python_type = uuid.UUID

DateTime

bloop.types.FIXED_ISO8601_FORMAT

DateTimes must be stored in DynamoDB in UTC with this exact format, and a +00:00 suffix. This is necessary for using comparison operators such as > and <= on DateTime instance.

You must not use "Z" or any other suffix than "+00:00" to indicate UTC. You must not omit the timezone specifier.

class bloop.types.DateTime[source]

Always stored in DynamoDB using the FIXED_ISO8601_FORMAT format.

Naive datetimes (tzinfo is None) are not supported, and trying to use one will raise ValueError.

from datetime import datetime, timedelta, timezone

class Model(Base):
    id = Column(Integer, hash_key=True)
    date = Column(DateTime)
engine.bind()

obj = Model(id=1, date=datetime.now(timezone.utc))
engine.save(obj)

one_day_ago = datetime.now(timezone.utc) - timedelta(days=1)

query = engine.query(
    Model,
    key=Model.id==1,
    filter=Model.date >= one_day_ago)

query.first().date

Note

To use common datetime libraries such as arrow, delorean, or pendulum, see DateTime and Timestamp Extensions in the user guide. These are drop-in replacements and support non-utc timezones:

from bloop import DateTime  # becomes:
from bloop.ext.pendulum import DateTime
backing_type = "S"
python_type = datetime.datetime

Timestamp

class bloop.types.Timestamp(context=None)[source]

Stores the unix (epoch) time in seconds. Milliseconds are truncated to 0 on load and save.

Naive datetimes (tzinfo is None) are not supported, and trying to use one will raise ValueError.

from datetime import datetime, timedelta, timezone

class Model(Base):
    id = Column(Integer, hash_key=True)
    date = Column(Timestamp)
engine.bind()

obj = Model(id=1, date=datetime.now(timezone.utc))
engine.save(obj)

one_day_ago = datetime.now(timezone.utc) - timedelta(days=1)

query = engine.query(
    Model,
    key=Model.id==1,
    filter=Model.date >= one_day_ago)

query.first().date

Note

To use common datetime libraries such as arrow, delorean, or pendulum, see DateTime and Timestamp Extensions in the user guide. These are drop-in replacements and support non-utc timezones:

from bloop import Timestamp  # becomes:
from bloop.ext.pendulum import Timestamp
backing_type = "N"
python_type = datetime.datetime

Integer

class bloop.types.Integer(context=None)[source]

Truncates values when loading or dumping.

For example, 3.14 in DynamoDB is loaded as 3. If a value is 7.5 locally, it's stored in DynamoDB as 7.

backing_type = "N"
python_type = int
context = decimal.Context

The context used to transfer numbers to DynamoDB.

Set

class bloop.types.Set(typedef)[source]

Generic set type. Must provide an inner type.

class Customer(BaseModel):
    id = Column(Integer, hash_key=True)
    account_ids = Column(Set(UUID))
Parameters

typedef -- The type to use when loading and saving values in this set. Must have a backing_type of "S", "N", or "B".

backing_type = "SS", "NS", or "BS"

Set is not a standalone type; its backing type depends on the inner type its constructor receives. For example, Set(DateTime) has backing type "SS" because DateTime has backing type "S".

python_type = set
inner_typedef = Type

The typedef for values in this Set. Has a backing type of "S", "N", or "B".

List

class bloop.types.List(typedef)[source]

Holds values of a single type.

Similar to Set because it requires a single type. However, that type can be another List, or Map, or Boolean. This is restricted to a single type even though DynamoDB is not because there is no way to know which Type to load a DynamoDB value with.

For example, {"S": "6d8b54a2-fa07-47e1-9305-717699459293"} could be loaded with UUID, String, or any other class that is backed by "S".

SingleQuizAnswers = List(String)

class AnswerBook(BaseModel):
    ...
    all_answers = Column(List(SingleQuizAnswers))

See also

To store arbitrary lists, see DynamicList.

Parameters

typedef -- The type to use when loading and saving values in this list.

backing_type = "L"
python_type = list
inner_typedef = Type

The typedef for values in this List. All types supported.

Map

class bloop.types.Map(**types)[source]

Mapping of fixed keys and their Types.

Metadata = Map(**{
    "created": DateTime,
    "referrer": UUID,
    "cache": String
})

Product = Map(
    id=Integer,
    metadata=Metadata,
    price=Number
)

class ProductCatalog(BaseModel):
    ...
    all_products = Column(List(Product))

See also

To store arbitrary documents, see DynamicMap.

Parameters

types -- (Optional) specifies the keys and their Types when loading and dumping the Map. Any keys that aren't specified in types are ignored when loading and dumping.

backing_type = "M"
python_type = dict
types = dict

Specifies the Type for each key in the Map. For example, a Map with two keys "id" and "rating" that are a UUID and Number respectively would have the following types:

{
    "id": UUID(),
    "rating": Number()
}

DynamicList

class bloop.types.DynamicList[source]

Holds a list of arbitrary values, including other DynamicLists and DynamicMaps.

Similar to List but is not constrained to a single type.

value = [1, True, "f"]
DynamicList()._dump(value)
    -> {"L": [{"N": "1"}, {"BOOL": true}, {"S": "f"}]}

Note

Values will only be loaded and dumped as their DynamoDB backing types. This means datetimes and uuids are stored and loaded as strings, and timestamps are stored and loaded as integers. For more information, see Dynamic Documents.

backing_type = "L"
python_type = list

DynamicMap

class bloop.types.DynamicMap[source]

Holds a dictionary of arbitrary values, including other DynamicLists and DynamicMaps.

Similar to Map but is not constrained to a single type.

value = {"f": 1, "in": [True]]
DynamicMap()._dump(value)
    -> {"M": {"f": {"N": 1}, "in": {"L": [{"BOOL": true}]}}}

Note

Values will only be loaded and dumped as their DynamoDB backing types. This means datetimes and uuids are stored and loaded as strings, and timestamps are stored and loaded as integers. For more information, see Dynamic Documents.

backing_type = "M"
python_type = dict

Actions

In most cases you do not need an action. However, you can use bloop.actions.add() to change a numeric value or a set's members without reading it, or bloop.actions.delete() to change a set's members without reading it.

As mentioned in the Atomic Counters section of the DynamoDB Developer Guide, you should understand the limitations of atomic counters and be

bloop.actions.add(value)[source]

Create a new ADD action.

The ADD action only supports Number and Set data types. In addition, ADD can only be used on top-level attributes, not nested attributes.

>>> import bloop.actions
>>> from my_models import Website
>>> website = Website(...)
>>> website.views = bloop.actions.add(1)
>>> website.remote_addrs = bloop.actions.add({"::0", "localhost"})
bloop.actions.delete(value)[source]

Create a new DELETE action.

The DELETE action only supports Set data types. In addition, DELETE can only be used on top-level attributes, not nested attributes.

>>> import bloop.actions
>>> from my_models import Website
>>> website = Website(...)
>>> website.remote_addrs = bloop.actions.delete({"::0", "localhost"})
bloop.actions.remove(value=None)[source]

Create a new REMOVE action.

Most types automatically create this action when you use del obj.some_attr or obj.some_attr = None

>>> import bloop.actions
>>> from my_models import User
>>> user = User(...)
# equivalent
>>> user.shell = None
>>> user.shell = bloop.actions.remove(None)
bloop.actions.set(value)[source]

Create a new SET action.

Most types automatically create this action when you use obj.some_attr = value

>>> import bloop.actions
>>> from my_models import User
>>> user = User(...)
# equivalent
>>> user.shell = "/bin/sh"
>>> user.shell = bloop.actions.set("/bin/sh")

Query

class bloop.search.QueryIterator(*, engine, model, index, request, projected)[source]

Reusable query iterator that unpacks result dicts into model instances.

Returned from Engine.query.

Parameters
  • engine -- Engine to unpack models with.

  • model -- BaseModel being queried.

  • index -- Index to query, or None.

  • request (dict) -- The base request dict for each Query call.

  • projected (set) -- Set of Column that should be included in each result.

all()

Eagerly load all results and return a single list. If there are no results, the list is empty.

Returns

A list of results.

property count

Number of items that have been loaded from DynamoDB so far, including buffered items.

property exhausted

True if there are no more results.

first()

Return the first result. If there are no results, raises ConstraintViolation.

Returns

The first result.

Raises

bloop.exceptions.ConstraintViolation -- No results.

move_to(token)

Restore an iterator to the state stored in a token. This will reset all iterator state, including count, scanned, and exhausted properties.

Parameters

token -- a SearchIterator.token

one()

Return the unique result. If there is not exactly one result, raises ConstraintViolation.

Returns

The unique result.

Raises

bloop.exceptions.ConstraintViolation -- Not exactly one result.

reset()

Reset to the initial state, clearing the buffer and zeroing count and scanned.

property scanned

Number of items that DynamoDB evaluated, before any filter was applied.

property token

JSON-serializable representation of the current SearchIterator state.

Use iterator.move_to(token) to move an iterator to this position.

Implementations will always include a "ExclusiveStartKey" key but may include additional metadata. The iterator's count and scanned values are not preserved.

Returns

Iterator state as a json-friendly dict

Scan

class bloop.search.ScanIterator(*, engine, model, index, request, projected)[source]

Reusable scan iterator that unpacks result dicts into model instances.

Returned from Engine.scan.

Parameters
  • engine -- Engine to unpack models with.

  • model -- BaseModel being scanned.

  • index -- Index to scan, or None.

  • request (dict) -- The base request dict for each Scan call.

  • projected (set) -- Set of Column that should be included in each result.

all()

Eagerly load all results and return a single list. If there are no results, the list is empty.

Returns

A list of results.

property count

Number of items that have been loaded from DynamoDB so far, including buffered items.

property exhausted

True if there are no more results.

first()

Return the first result. If there are no results, raises ConstraintViolation.

Returns

The first result.

Raises

bloop.exceptions.ConstraintViolation -- No results.

move_to(token)

Restore an iterator to the state stored in a token. This will reset all iterator state, including count, scanned, and exhausted properties.

Parameters

token -- a SearchIterator.token

one()

Return the unique result. If there is not exactly one result, raises ConstraintViolation.

Returns

The unique result.

Raises

bloop.exceptions.ConstraintViolation -- Not exactly one result.

reset()

Reset to the initial state, clearing the buffer and zeroing count and scanned.

property scanned

Number of items that DynamoDB evaluated, before any filter was applied.

property token

JSON-serializable representation of the current SearchIterator state.

Use iterator.move_to(token) to move an iterator to this position.

Implementations will always include a "ExclusiveStartKey" key but may include additional metadata. The iterator's count and scanned values are not preserved.

Returns

Iterator state as a json-friendly dict

Stream

Engine.stream() is the recommended way to create a stream. If you manually create a stream, you will need to call move_to() before iterating the Stream.

Warning

Chronological order is not guaranteed for high throughput streams.

DynamoDB guarantees ordering:

  • within any single shard

  • across shards for a single hash/range key

There is no way to exactly order records from adjacent shards. High throughput streams provide approximate ordering using each record's "ApproximateCreationDateTime".

Tables with a single partition guarantee order across all records.

See Stream Internals for details.

class bloop.stream.Stream(*, model, engine)[source]

Iterator over all records in a stream.

Parameters
  • model -- The model to stream records from.

  • engine (Engine) -- The engine to load model objects through.

heartbeat()[source]

Refresh iterators without sequence numbers so they don't expire.

Call this at least every 14 minutes.

move_to(position)[source]

Move the Stream to a specific endpoint or time, or load state from a token.

Moving to an endpoint with "trim_horizon" or "latest" and loading from a previous token are both very efficient.

In contrast, seeking to a specific time requires iterating all records in the stream up to that time. This can be very expensive. Once you have moved a stream to a time, you should save the Stream.token so reloading will be extremely fast.

Parameters

position -- "trim_horizon", "latest", datetime, or a Stream.token

property token

JSON-serializable representation of the current Stream state.

Use Engine.stream(YourModel, token) to create an identical stream, or stream.move_to(token) to move an existing stream to this position.

Returns

Stream state as a json-friendly dict

Return type

dict

Transactions

class bloop.transactions.ReadTransaction(engine)[source]

Loads all items in the same transaction. Items can be from different models and tables.

load(*objs) bloop.transactions.ReadTransaction[source]

Add one or more objects to be loaded in this transaction.

At most 10 items can be loaded in the same transaction. All objects will be loaded each time you call commit().

Parameters

objs -- Objects to add to the set that are loaded in this transaction.

Returns

this transaction for chaining

Raises

bloop.exceptions.MissingObjects -- if one or more objects aren't loaded.

prepare()

Create a new PreparedTransaction that can be committed.

This is called automatically when exiting the transaction as a context:

>>> engine = Engine()
>>> tx = WriteTransaction(engine)
>>> prepared = tx.prepare()
>>> prepared.commit()

# automatically calls commit when exiting
>>> with WriteTransaction(engine) as tx:
...     # modify the transaction here
...     pass
>>> # tx commits here
Returns

class bloop.transactions.WriteTransaction(engine)[source]

Applies all updates in the same transaction. Items can be from different models and tables.

As with an engine, you can apply conditions to each object that you save or delete, or a condition for the entire transaction that won't modify the specified object:

# condition on some_obj
>>> tx.save(some_obj, condition=SomeModel.name.begins_with("foo"))
# condition on the tx, based on the values of some_other_obj
>>> tx.check(some_other_obj, condition=ThatModel.capacity >= 100)
check(obj, condition) bloop.transactions.WriteTransaction[source]

Add a condition which must be met for the transaction to commit.

While the condition is checked against the provided object, that object will not be modified. It is only used to provide the hash and range key to apply the condition to.

At most 10 items can be checked, saved, or deleted in the same transaction. The same idempotency token will be used for a single prepared transaction, which allows you to safely call commit on the PreparedCommit object multiple times.

Parameters
  • obj -- The object to use for the transaction condition. This object will not be modified.

  • condition -- A condition on an object which must hold for the transaction to commit.

Returns

this transaction for chaining

delete(*objs, condition=None) bloop.transactions.WriteTransaction[source]

Add one or more objects to be deleted in this transaction.

At most 10 items can be checked, saved, or deleted in the same transaction. The same idempotency token will be used for a single prepared transaction, which allows you to safely call commit on the PreparedCommit object multiple times.

Parameters
  • objs -- Objects to add to the set that are deleted in this transaction.

  • condition -- A condition for these objects which must hold for the transaction to commit.

Returns

this transaction for chaining

prepare()

Create a new PreparedTransaction that can be committed.

This is called automatically when exiting the transaction as a context:

>>> engine = Engine()
>>> tx = WriteTransaction(engine)
>>> prepared = tx.prepare()
>>> prepared.commit()

# automatically calls commit when exiting
>>> with WriteTransaction(engine) as tx:
...     # modify the transaction here
...     pass
>>> # tx commits here
Returns

save(*objs, condition=None) bloop.transactions.WriteTransaction[source]

Add one or more objects to be saved in this transaction.

At most 10 items can be checked, saved, or deleted in the same transaction. The same idempotency token will be used for a single prepared transaction, which allows you to safely call commit on the PreparedCommit object multiple times.

Parameters
  • objs -- Objects to add to the set that are updated in this transaction.

  • condition -- A condition for these objects which must hold for the transaction to commit.

Returns

this transaction for chaining

Conditions

The only public class the conditions system exposes is the empty condition, Condition. The rest of the conditions system is baked into Column and consumed by the various Engine functions like Engine.save().

This function creates a condition for any model that can be used when saving to ensure you don't overwrite an existing value. The model's Meta attribute describes the required keys:

from bloop import Condition

def ensure_unique(model):
    condition = Condition()
    for key in model.Meta.keys:
        condition &= key.is_(None)
    return condition

See also

Conditions in the User Guide describes the possible conditions, and when and how to use them.

class bloop.conditions.Condition[source]

An empty condition.

combined = Condition()

for each_condition in get_conditions_list():
    combined &= each_condition

if not combined:
    print("Conditions list only had empty conditions, or no conditions")

Useful for iteratively building complex conditions, you can concatenate multiple conditions together without finding an initial condition in a possibly-empty list.

An empty condition is equivalent to omitting a condition:

engine.save(some_user)
engine.save(some_user, condition=Condition())

Signals

bloop.signals.before_create_table

Sent by engine before a model's backing table is created.

# Nonce table names to avoid testing collisions
@before_create_table.connect
def apply_table_nonce(_, model, **__):
    nonce = datetime.now().isoformat()
    model.Meta.table_name += "-test-{}".format(nonce)
Parameters
  • engine -- Engine creating the model's table.

  • model -- The BaseModel class to create a table for.

bloop.signals.object_loaded

Sent by engine after an object is loaded from DynamoDB.

# Track objects "checked out" locally
local_objects = {}

def key(obj):
    meta = obj.Meta
    return (getattr(obj, k.name) for k in meta.keys)

@object_loaded.connect
def on_loaded(_, obj, **__):
    local_objects[key(obj)] = obj
Parameters
  • engine -- The Engine that loaded the object.

  • obj -- The BaseModel loaded from DynamoDB.

bloop.signals.object_saved

Sent by engine after an object is saved to DynamoDB.

# Track objects "checked out" locally
local_objects = {}

def key(obj):
    meta = obj.Meta
    return (getattr(obj, k.name) for k in meta.keys)

@object_saved.connect
def on_saved(_, obj, **__):
    local_objects.pop(key(obj))
Parameters
  • engine -- The Engine that saved the object.

  • obj -- The BaseModel saved to DynamoDB.

bloop.signals.object_deleted

Sent by engine after an object is deleted from DynamoDB.

# Track objects "checked out" locally
local_objects = {}

def key(obj):
    meta = obj.Meta
    return (getattr(obj, k.name) for k in meta.keys)

@object_deleted.connect
def on_deleted(_, obj, **__):
    local_objects.pop(key(obj))
Parameters
  • engine -- The Engine that deleted the object.

  • obj -- The BaseModel deleted from DynamoDB.

bloop.signals.object_modified

Sent by column after an object's attribute is set or deleted.

This is sent on __set__ if an exception isn't raised, and on __del__ regardless of exceptions.

# Account balance can't be less than 0

@object_modified.connect
def enforce_positive_balance(_, obj, column, value, **__):
    if column is Account.balance and value < 0:
        # Danger: careful around infinite loops!
        setattr(obj, column.name, 0)
Parameters
  • column -- The Column that corresponds to the modified attribute.

  • obj -- The BaseModel that was modified.

  • value -- The new value of the attribute.

bloop.signals.model_bound

Sent by engine after a model is bound to that Engine.

This signal is sent after model_validated.

Parameters
  • engine -- The Engine that the model was bound to.

  • model -- The BaseModel class that was bound.

bloop.signals.model_created

Sent by None after a new model is defined.

While this signal is sent when the BaseModel is created, the BaseModel is created so early in Bloop's import order that no handlers will be connected when it occurs.

You can manually send the BaseModel through your handler with:

model_created.send(model=BaseModel)
Parameters

model -- The subclass of BaseModel that was created.

bloop.signals.model_validated

Sent by engine after a model is validated.

This signal is sent before model_bound.

Parameters
  • engine -- The Engine that validated the model.

  • model -- The BaseModel class that was validated.

Exceptions

Except to configure sessions, Bloop aims to completely abstract the boto3/botocore layers. If you encounter an exception from either boto3 or botocore, please open an issue. Bloop's exceptions are broadly divided into two categories: unexpected state, and invalid input.

To catch any exception from Bloop, use BloopException:

try:
    engine.stream(User, "latest")
except BloopException:
    print("Didn't expect an exception, but Bloop raised:")
    raise
class bloop.exceptions.BloopException[source]

An unexpected exception occurred.

Unexpected state

These are exceptions that you should be ready to handle in the normal course of using DynamoDB. For example, failing to load objects will raise MissingObjects, while conditional operations may fail with :exc`~bloop.exceptions.ConstraintViolation`.

class bloop.exceptions.ConstraintViolation[source]

A required condition was not met.

class bloop.exceptions.MissingObjects(*args, objects=None)[source]

Some objects were not found.

objects: list

The objects that failed to load

class bloop.exceptions.RecordsExpired[source]

The requested stream records are beyond the trim horizon.

class bloop.exceptions.ShardIteratorExpired[source]

The shard iterator is past its expiration date.

class bloop.exceptions.TableMismatch[source]

The expected and actual tables for this Model do not match.

class bloop.exceptions.TransactionCanceled[source]

The transaction was canceled.

A WriteTransaction is canceled when:
  • A condition in one of the condition expressions is not met.

  • A table in the TransactWriteItems request is in a different account or region.

  • More than one action in the TransactWriteItems operation targets the same item.

  • There is insufficient provisioned capacity for the transaction to be completed.

  • An item size becomes too large (larger than 400 KB), or a local secondary index (LSI) becomes too large, or a similar validation error occurs because of changes made by the transaction.

A ReadTransaction is canceled when:
  • There is an ongoing TransactGetItems operation that conflicts with a concurrent PutItem, UpdateItem, DeleteItem or TransactWriteItems request.

  • A table in the TransactGetItems request is in a different account or region.

  • There is insufficient provisioned capacity for the transaction to be completed.

  • There is a user error, such as an invalid data format.

See also

The API reference for TransactionCanceledException

class bloop.exceptions.TransactionTokenExpired[source]

The transaction's tx_id (ClientRequestToken) was first used more than 10 minutes ago

Bad Input

These are thrown when an option is invalid or missing, such as forgetting a key condition for a query, or trying to use an unknown projection type.

class bloop.exceptions.InvalidCondition[source]

This is not a valid Condition.

class bloop.exceptions.InvalidModel[source]

This is not a valid Model.

class bloop.exceptions.InvalidPosition[source]

This is not a valid position for a Stream.

class bloop.exceptions.InvalidSearch[source]

The search was malformed

class bloop.exceptions.InvalidShardIterator[source]

This is not a valid shard iterator.

class bloop.exceptions.InvalidStream[source]

This is not a valid stream definition.

class bloop.exceptions.InvalidTemplate[source]

This is not a valid template string.

class bloop.exceptions.MissingKey[source]

The instance must provide values for its key columns.

Extensions

DateTime

class DateTime(timezone=datetime.timezone.utc)

Drop-in replacement for DateTime. Support for arrow, delorean, and pendulum:

from bloop.ext.arrow import DateTime
from bloop.ext.delorean import DateTime
from bloop.ext.pendulum import DateTime
backing_type = "S"
python_type

Depending on where it's imported from, one of:

timezone = tzinfo

The timezone that values loaded from DynamoDB will use. Note that DateTimes are always stored in DynamoDB according to FIXED_ISO8601_FORMAT.

Timestamp

class Timestamp(timezone=datetime.timezone.utc)

Drop-in replacement for Timestamp. Support for arrow, delorean, and pendulum:

from bloop.ext.arrow import Timestamp
from bloop.ext.delorean import Timestamp
from bloop.ext.pendulum import Timestamp
backing_type = "N"
python_type

Depending on where it's imported from, one of:

timezone = tzinfo

The timezone that values loaded from DynamoDB will use.