Internal

In addition to documenting internal classes, this section describes complex internal systems (such as Streams, tracking modified columns via weakrefs) and specific parameters and error handling that Bloop employs when talking to DynamoDB (such as SessionWrapper's error inspection, and partial table validation).

SessionWrapper

class bloop.session.SessionWrapper(dynamodb=None, dynamodbstreams=None)[source]

Provides a consistent interface to DynamoDB and DynamoDBStreams clients.

If either client is None, that client is built using boto3.client().

Parameters
  • dynamodb -- A boto3 client for DynamoDB. Defaults to boto3.client("dynamodb").

  • dynamodbstreams -- A boto3 client for DynamoDBStreams. Defaults to boto3.client("dynamodbstreams").
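
For example, a minimal sketch that wraps explicit clients (the region below is purely illustrative):

import boto3
from bloop.session import SessionWrapper

# Pin both clients to an explicit region (illustrative)
dynamodb = boto3.client("dynamodb", region_name="us-west-2")
streams = boto3.client("dynamodbstreams", region_name="us-west-2")
session = SessionWrapper(dynamodb=dynamodb, dynamodbstreams=streams)

# Or let the wrapper build default clients
session = SessionWrapper()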

clear_cache()[source]

Clear all cached table descriptions.

create_table(table_name, model)[source]

Create the model's table. Returns True if the table is being created, False otherwise.

Does not wait for the table to finish creating, and does not validate an existing table. Will not raise "ResourceInUseException" if the table exists or is being created.

Parameters
  • table_name (str) -- The name of the table to create for the model.

  • model -- The BaseModel to create the table for.

Returns

True if the table is being created, False if the table exists

Return type

bool

delete_item(item)[source]

Delete an object in DynamoDB.

Returns Optional[dict] of read attributes depending on the "ReturnValues" kwarg. Return value is None when no attributes were requested.

Parameters

item -- Unpacked into kwargs for boto3.DynamoDB.Client.delete_item().

Raises

bloop.exceptions.ConstraintViolation -- if the condition (or atomic) is not met.
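
As a sketch, the item dict mirrors the kwargs of boto3.DynamoDB.Client.delete_item(); the session variable, table, and key below are assumptions:

# Hypothetical item dict; keys mirror boto3.DynamoDB.Client.delete_item() kwargs
item = {
    "TableName": "User",
    "Key": {"id": {"S": "numberoverzero"}},
    "ExpressionAttributeNames": {"#n0": "id"},
    "ConditionExpression": "attribute_exists(#n0)",
    "ReturnValues": "ALL_OLD",
}
previous = session.delete_item(item)  # dict of the old attributes, or None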

describe_stream(stream_arn, first_shard=None)[source]

Wraps boto3.DynamoDBStreams.Client.describe_stream(), handling continuation tokens.

Parameters
  • stream_arn (str) -- Stream arn, usually from the model's Meta.stream["arn"].

  • first_shard (str) -- (Optional) If provided, only shards after this shard id will be returned.

Returns

All shards in the stream, or a subset if first_shard is provided.

Return type

dict

describe_table(table_name)[source]

Polls until the table is ready, then returns the first description for which the table was ready.

The returned dict is standardized so that all fields are present, even when empty or omitted across different DynamoDB API versions. TTL information is also inserted.

Parameters

table_name -- The name of the table to describe

Returns

The (sanitized) result of DescribeTable["Table"]

Return type

dict

enable_backups(table_name, model)[source]

Calls UpdateContinuousBackups on the table according to model.Meta["continuous_backups"]

Parameters
  • table_name -- The name of the table to enable Continuous Backups on

  • model -- The model to get Continuous Backups settings from

enable_ttl(table_name, model)[source]

Calls UpdateTimeToLive on the table according to model.Meta["ttl"]

Parameters
  • table_name -- The name of the table to enable the TTL setting on

  • model -- The model to get TTL settings from
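
A sketch of the Meta settings these methods read; the model, column names, session variable, and the exact shape of the continuous_backups value are assumptions:

from bloop import BaseModel, Column, String, Timestamp

class User(BaseModel):
    id = Column(String, hash_key=True)
    expiry = Column(Timestamp)

    class Meta:
        ttl = {"column": "expiry"}
        continuous_backups = {"enabled": True}  # shape is an assumption

session.enable_ttl("User", User)
session.enable_backups("User", User)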

get_shard_iterator(*, stream_arn, shard_id, iterator_type, sequence_number=None)[source]

Wraps boto3.DynamoDBStreams.Client.get_shard_iterator().

Parameters
  • stream_arn (str) -- Stream arn. Usually Shard.stream_arn.

  • shard_id (str) -- Shard identifier. Usually Shard.shard_id.

  • iterator_type (str) -- "trim_horizon", "at_sequence", "after_sequence", or "latest"

  • sequence_number (str) -- (Optional) Sequence number to use with the "at_sequence" or "after_sequence" iterator types. Default is None.

Returns

Iterator id, valid for 15 minutes.

Return type

str

Raises

bloop.exceptions.RecordsExpired -- Tried to get an iterator beyond the Trim Horizon.

get_stream_records(iterator_id)[source]

Wraps boto3.DynamoDBStreams.Client.get_records().

Parameters

iterator_id -- Iterator id. Usually Shard.iterator_id.

Returns

Dict with "Records" list (may be empty) and "NextShardIterator" str (may not exist).

Return type

dict

Raises
  • bloop.exceptions.RecordsExpired -- The iterator moved beyond the trim horizon since it was created.

  • bloop.exceptions.ShardIteratorExpired -- The iterator was created more than 15 minutes ago.
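
A sketch of walking a single shard with these two calls; the stream_arn, shard_id, and process() names are assumptions:

iterator_id = session.get_shard_iterator(
    stream_arn=stream_arn,
    shard_id=shard_id,
    iterator_type="trim_horizon",
)
response = session.get_stream_records(iterator_id)
for record in response["Records"]:
    process(record)  # hypothetical handler
iterator_id = response.get("NextShardIterator")  # absent once a closed shard is fully read
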
load_items(items)[source]

Loads any number of items in chunks, handling continuation tokens.

Parameters

items -- Unpacked in chunks into "RequestItems" for boto3.DynamoDB.Client.batch_get_item().

query_items(request)[source]

Wraps boto3.DynamoDB.Client.query().

Response always includes "Count" and "ScannedCount"

Parameters

request -- Unpacked into boto3.DynamoDB.Client.query()
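
For example, a hand-built request dict (assuming a session SessionWrapper; table and key names are illustrative) is unpacked directly into the call:

request = {
    "TableName": "User",
    "KeyConditionExpression": "#n0 = :v1",
    "ExpressionAttributeNames": {"#n0": "id"},
    "ExpressionAttributeValues": {":v1": {"S": "numberoverzero"}},
}
response = session.query_items(request)
print(response["Count"], response["ScannedCount"])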

save_item(item)[source]

Save an object to DynamoDB.

Returns Optional[dict] of read attributes depending on the "ReturnValues" kwarg. Return value is None when no attributes were requested.

Parameters

item -- Unpacked into kwargs for boto3.DynamoDB.Client.update_item().

Raises

bloop.exceptions.ConstraintViolation -- if the condition (or atomic) is not met.

scan_items(request)[source]

Wraps boto3.DynamoDB.Client.scan().

Response always includes "Count" and "ScannedCount"

Parameters

request -- Unpacked into boto3.DynamoDB.Client.scan()

search_items(mode, request)[source]

Invoke query/scan by name.

Response always includes "Count" and "ScannedCount"

Parameters
  • mode (str) -- "query" or "scan"

  • request -- Unpacked into boto3.DynamoDB.Client.query() or boto3.DynamoDB.Client.scan()

transaction_read(items)[source]

Wraps boto3.DynamoDB.Client.transact_get_items().

Parameters

items -- Unpacked into "TransactItems" for boto3.DynamoDB.Client.transact_get_items()

Raises

bloop.exceptions.TransactionCanceled -- if the transaction was canceled.

Returns

Dict with "Responses" list

transaction_write(items, client_request_token)[source]

Wraps boto3.DynamoDB.Client.transact_write_items().

Parameters
  • items -- Unpacked into "TransactItems" for boto3.DynamoDB.Client.transact_write_items()

  • client_request_token -- Idempotency token valid for 10 minutes from first use. Unpacked into "ClientRequestToken"

Raises

bloop.exceptions.TransactionCanceled -- if the transaction was canceled.

validate_table(table_name, model)[source]

Polls until a creating table is ready, then verifies the description against the model's requirements.

The model may have a subset of all GSIs and LSIs on the table, but the key structure must be exactly the same. The table must have a stream if the model expects one, but not the other way around. When read or write units are not specified for the model or any GSI, the existing values will always pass validation.

Parameters
  • table_name (str) -- The name of the table to validate the model against.

  • model -- The BaseModel to validate the table of.

Raises

bloop.exceptions.TableMismatch -- When the table does not meet the constraints of the model.
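
Together with create_table(), a typical bootstrap sketch (assuming a session and a User model):

session.create_table("User", User)    # returns immediately; False if the table already exists
session.validate_table("User", User)  # polls until ACTIVE, then raises TableMismatch on drift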

Modeling

IMeta

class bloop.models.IMeta[source]

This class exists to provide autocomplete hints for computed variables on a model's Meta object.

Subclassing IMeta is OPTIONAL and rarely necessary; it is primarily available for users writing generic code over a class of models, e.g. transforms on all columns of a model or a Marshmallow adapter.

import bloop.models


class User(BaseModel):
    id = Column(String, hash_key=True)
    email = Column(String, dynamo_name="e")

    class Meta(bloop.models.IMeta):
        read_units = 500

User.Meta.co  # PyCharm renders:
              #     +---------------------------+
              #     | User.Meta.columns         |
              #     | User.Meta.columns_by_name |
              #     +---------------------------+

Index

class bloop.models.Index(*, projection, hash_key=None, range_key=None, dynamo_name=None, **kwargs)[source]

Abstract base class for GSIs and LSIs.

An index must be bound to a model by calling bind_index(meta, model), which lets the index compute projected columns, validate hash and range keys, etc.

Parameters
  • projection -- Either "keys", "all", or a list of column names or objects. Included columns will be projected into the index. Key columns are always included.

  • hash_key -- The column that the index can be queried against. Always the table hash_key for LSIs.

  • range_key -- The column that the index can be sorted on. Always required for an LSI. Default is None.

  • dynamo_name (str) -- (Optional) The index's name in DynamoDB. Defaults to the index's name in the model.

dynamo_name

The name of this index in DynamoDB. Defaults to the index's name.

hash_key

The column that the index can be queried against. (LSI's hash_key is always the table hash_key.)

model

The model this index is attached to.

name

The name of this index in the model. Set by bind_index() during __init_subclass__().

projection

Computed by bind_index() during __init_subclass__().

{
    "available":  # Set of columns that can be returned from a query or search.
    "included":   # Set of columns that can be used in query and scan filters.
    "mode":       # "all", "keys", or "include"
    "strict":     # False if queries and scans can fetch non-included columns
}
range_key

The column that the index can be sorted on.

__copy__()[source]

Create a shallow copy of this Index. Primarily used when initializing models that subclass other abstract models or mixins (baseless classes that contain Columns and Indexes). You can override this method to change how derived models are created:

import copy


class MyIndex(Index):
    def __copy__(self):
        new = super().__copy__()
        new.derived = True
        return new


index = MyIndex(projection="keys", hash_key="some_column")
same = copy.copy(index)
assert same.derived  # True
Returns

A shallow copy of this Index, with the model and _name attributes unset, and the computed projection invalidated.

Binding

models.bind_column(model, name, column, force=False, recursive=False, copy=False) → bloop.models.Column

Bind a column to the model with the given name.

This method is primarily used during BaseModel.__init_subclass__, although it can be used to easily attach a new column to an existing model:

import bloop.models

class User(BaseModel):
    id = Column(String, hash_key=True)


email = Column(String, dynamo_name="e")
bound = bloop.models.bind_column(User, "email", email)
assert bound is email

# rebind with force, and use a copy
bound = bloop.models.bind_column(User, "email", email, force=True, copy=True)
assert bound is not email

If an existing index refers to this column, it will be updated to point to the new column using refresh_index(), including recalculating the index projection. Meta attributes including Meta.columns, Meta.hash_key, etc. will be updated if necessary.

If name or the column's dynamo_name conflicts with an existing column or index on the model, raises InvalidModel unless force is True. If recursive is True and there are existing subclasses of model, a copy of the column will attempt to bind to each subclass. The recursive calls will not force the bind, and will always use a new copy. If copy is True then a copy of the provided column is used. This uses a shallow copy via __copy__().

Parameters
  • model -- The model to bind the column to.

  • name -- The name to bind the column as. In effect, used for setattr(model, name, column)

  • column -- The column to bind to the model.

  • force -- Unbind existing columns or indexes with the same name or dynamo_name. Default is False.

  • recursive -- Bind to each subclass of this model. Default is False.

  • copy -- Use a copy of the column instead of the column directly. Default is False.

Returns

The bound column. This is a new column when copy is True, otherwise the input column.

models.bind_index(model, name, index, force=False, recursive=False, copy=False) → bloop.models.Index

Bind an index to the model with the given name.

This method is primarily used during BaseModel.__init_subclass__, although it can be used to easily attach a new index to an existing model:

import bloop.models

class User(BaseModel):
    id = Column(String, hash_key=True)
    email = Column(String, dynamo_name="e")


by_email = GlobalSecondaryIndex(projection="keys", hash_key="email")
bound = bloop.models.bind_index(User, "by_email", by_email)
assert bound is by_email

# rebind with force, and use a copy
bound = bloop.models.bind_index(User, "by_email", by_email, force=True, copy=True)
assert bound is not by_email

If name or the index's dynamo_name conflicts with an existing column or index on the model, raises InvalidModel unless force is True. If recursive is True and there are existing subclasses of model, a copy of the index will attempt to bind to each subclass. The recursive calls will not force the bind, and will always use a new copy. If copy is True then a copy of the provided index is used. This uses a shallow copy via __copy__().

Parameters
  • model -- The model to bind the index to.

  • name -- The name to bind the index as. In effect, used for setattr(model, name, index)

  • index -- The index to bind to the model.

  • force -- Unbind existing columns or indexes with the same name or dynamo_name. Default is False.

  • recursive -- Bind to each subclass of this model. Default is False.

  • copy -- Use a copy of the index instead of the index directly. Default is False.

Returns

The bound index. This is a new index when copy is True, otherwise the input index.

models.refresh_index(meta, index) → None

Recalculate the projection, hash_key, and range_key for the given index.

Parameters
  • meta -- model.Meta to find columns by name

  • index -- The index to refresh

models.unbind(meta, name=None, dynamo_name=None) → None

Unconditionally remove any columns or indexes bound to the given name or dynamo_name.

import bloop.models


class User(BaseModel):
    id = Column(String, hash_key=True)
    email = Column(String, dynamo_name="e")
    by_email = GlobalSecondaryIndex(projection="keys", hash_key=email)


for dynamo_name in ("id", "e", "by_email"):
    bloop.models.unbind(User.Meta, dynamo_name=dynamo_name)

assert not User.Meta.columns
assert not User.Meta.indexes
assert not User.Meta.keys

Warning

This method does not pre- or post-validate the model with the requested changes. You are responsible for ensuring the model still has a hash key, that required columns exist for each index, etc.

Parameters
  • meta -- model.Meta to remove the columns or indexes from

  • name -- column or index name to unbind by. Default is None.

  • dynamo_name -- The DynamoDB name of the column or index to unbind by. Default is None.

Types

DynamicType

class bloop.types.DynamicType[source]

Dynamically dumps a value based on its python type.

This is used by DynamicList and DynamicMap to handle path resolution before the value for an arbitrary path is known. For example, given the following model:

class UserUpload(BaseModel):
    id = Column(String, hash_key=True)
    doc = Column(DynamicMap)

And an instance as follows:

u = UserUpload(id="numberoverzero")
u.doc = {
    "foo": ["bar", {0: "a", 1: "b"}, True]
}

The renderer must know a type for UserUpload.doc["foo"][1][0] before the value is provided. An instance of this type will return itself for any value during __getitem__, and then inspects the value type during _dump to create the correct simple type.

Because DynamicType requires access to the DynamoDB type annotation, you must call _load and _dump, as dynamo_load and dynamo_dump can't be implemented. For example:

DynamicType.i._load({"S": "2016-08-09T01:16:25.322849+00:00"})
    -> "2016-08-09T01:16:25.322849+00:00"
DynamicType.i._load({"N": "3.14"}) -> Decimal('3.14')

DynamicType.i._dump([1, True, "f"])
    -> {"L": [{"N": "1"}, {"BOOL": true}, {"S": "f"}]}
DynamicType.i._dump({b"1", b"2"}) -> {"BS": ["MQ==", b"Mg=="]}
i

Singleton instance of the class.

backing_type = None
python_type = None

Actions

class bloop.actions.Action(action_type: bloop.actions.ActionType, value)[source]

Encapsulates an update value and how Dynamo should apply the update.

Generally, you will only need to use the Action class if you are updating an atomic counter (ADD) or making additions and deletions from a set (ADD, DELETE).

You do not need to use an Action for SET or REMOVE updates.

>>> import bloop.actions
>>> from my_models import Website, User
>>> user = User()
>>> website = Website()
# SET and REMOVE don't need an explicit action
>>> user.verified = True
>>> del user.pw_hash
# ADD and DELETE need explicit actions
>>> website.view_count = bloop.actions.add(1)
>>> website.remote_addrs = bloop.actions.delete({"::0", "localhost"})
class bloop.actions.ActionType(value)[source]

Represents how Dynamo should apply an update.

Add = ('ADD', '{name_ref.name} {value_ref.name}', False)
Delete = ('DELETE', '{name_ref.name} {value_ref.name}', False)
Remove = ('REMOVE', '{name_ref.name}', True)
Set = ('SET', '{name_ref.name}={value_ref.name}', True)
new_action(value) → bloop.actions.Action[source]

Convenience function to instantiate an Action with this type

render(name_ref, value_ref)[source]

name_ref, value_ref should be instances of bloop.conditions.Reference or None

bloop.actions.unwrap(x: Union[bloop.actions.Action, Any]) → Any[source]

Return an action's inner value.

bloop.actions.wrap(x: Any) → bloop.actions.Action[source]

Return an action: REMOVE if x is None, else SET.
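
A short sketch of the helpers above; the value is illustrative:

import bloop.actions

# wrap picks the action type from the value: None -> REMOVE, anything else -> SET
set_action = bloop.actions.wrap("new@email")
remove_action = bloop.actions.wrap(None)

# unwrap recovers the inner value from an action
assert bloop.actions.unwrap(set_action) == "new@email"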

Searching

PreparedSearch

class bloop.search.PreparedSearch[source]

Mutable search object.

Creates SearchModelIterator objects which can be used to iterate the results of a query or scan multiple times.

prepare(engine=None, mode=None, model=None, index=None, key=None, filter=None, projection=None, consistent=None, forward=None, parallel=None)[source]

Validates the search parameters and builds the base request dict for each Query/Scan call.

SearchIterator

class bloop.search.SearchIterator(*, session, model, index, request, projected)[source]

Reusable search iterator.

Parameters
  • session -- SessionWrapper to make Query, Scan calls.

  • model -- BaseModel for repr only.

  • index -- Index to search, or None.

  • request (dict) -- The base request dict for each search.

  • projected (set) -- Set of Column that should be included in each result.

all()[source]

Eagerly load all results and return a single list. If there are no results, the list is empty.

Returns

A list of results.

property count

Number of items that have been loaded from DynamoDB so far, including buffered items.

property exhausted

True if there are no more results.

first()[source]

Return the first result. If there are no results, raises ConstraintViolation.

Returns

The first result.

Raises

bloop.exceptions.ConstraintViolation -- No results.

move_to(token)[source]

Restore an iterator to the state stored in a token. This will reset all iterator state, including count, scanned, and exhausted properties.

Parameters

token -- a SearchIterator.token

one()[source]

Return the unique result. If there is not exactly one result, raises ConstraintViolation.

Returns

The unique result.

Raises

bloop.exceptions.ConstraintViolation -- Not exactly one result.

reset()[source]

Reset to the initial state, clearing the buffer and zeroing count and scanned.

property scanned

Number of items that DynamoDB evaluated, before any filter was applied.

property token

JSON-serializable representation of the current SearchIterator state.

Use iterator.move_to(token) to move an iterator to this position.

Implementations will always include an "ExclusiveStartKey" key but may include additional metadata. The iterator's count and scanned values are not preserved.

Returns

Iterator state as a json-friendly dict
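
A sketch of pausing and resuming a search; the iterator variable is assumed:

token = iterator.token   # json-friendly dict, includes "ExclusiveStartKey"
# ... persist the token somewhere, then later:
iterator.move_to(token)  # count, scanned, and exhausted are reset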

SearchModelIterator

class bloop.search.SearchModelIterator(*, engine, model, index, request, projected)[source]

Reusable search iterator that unpacks result dicts into model instances.

Parameters
  • engine -- Engine to unpack models with.

  • model -- BaseModel being searched.

  • index -- Index to search, or None.

  • request (dict) -- The base request dict for each search call.

  • projected (set) -- Set of Column that should be included in each result.

all()

Eagerly load all results and return a single list. If there are no results, the list is empty.

Returns

A list of results.

property count

Number of items that have been loaded from DynamoDB so far, including buffered items.

property exhausted

True if there are no more results.

first()

Return the first result. If there are no results, raises ConstraintViolation.

Returns

The first result.

Raises

bloop.exceptions.ConstraintViolation -- No results.

move_to(token)

Restore an iterator to the state stored in a token. This will reset all iterator state, including count, scanned, and exhausted properties.

Parameters

token -- a SearchIterator.token

one()

Return the unique result. If there is not exactly one result, raises ConstraintViolation.

Returns

The unique result.

Raises

bloop.exceptions.ConstraintViolation -- Not exactly one result.

reset()

Reset to the initial state, clearing the buffer and zeroing count and scanned.

property scanned

Number of items that DynamoDB evaluated, before any filter was applied.

property token

JSON-serializable representation of the current SearchIterator state.

Use iterator.move_to(token) to move an iterator to this position.

Implementations will always include an "ExclusiveStartKey" key but may include additional metadata. The iterator's count and scanned values are not preserved.

Returns

Iterator state as a json-friendly dict

Streaming

Coordinator

class bloop.stream.coordinator.Coordinator(*, session, stream_arn)[source]

Encapsulates the shard-level management for a whole Stream.

Parameters
  • session (SessionWrapper) -- Used to make DynamoDBStreams calls.

  • stream_arn (str) -- Stream arn, usually from the model's Meta.stream["arn"].

advance_shards()[source]

Poll active shards for records and insert them into the buffer. Rotate exhausted shards.

Returns immediately if the buffer isn't empty.

heartbeat()[source]

Keep active shards with "trim_horizon" or "latest" iterators alive by advancing their iterators.

move_to(position)[source]

Set the Coordinator to a specific endpoint or time, or load state from a token.

Parameters

position -- "trim_horizon", "latest", datetime, or a Coordinator.token

remove_shard(shard, drop_buffered_records=False)[source]

Remove a Shard from the Coordinator. Optionally drops all buffered records from the Shard.

If the Shard is active or a root, it is removed and any children promoted to those roles.

Parameters
  • shard (Shard) -- The shard to remove.

  • drop_buffered_records (bool) -- Whether records from this shard should be removed. Default is False.

property token

JSON-serializable representation of the current Stream state.

Use Engine.stream(YourModel, token) to create an identical stream, or stream.move_to(token) to move an existing stream to this position.

Returns

Stream state as a json-friendly dict

Return type

dict
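
For example, using the engine-level stream interface mentioned above (the engine and YourModel names are assumptions):

stream = engine.stream(YourModel, "trim_horizon")
record = next(stream)  # a record dict, or None if nothing is buffered
token = stream.token   # json-friendly dict
# ... later, resume from the same position
stream = engine.stream(YourModel, token)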

Shard

class bloop.stream.shard.Shard(*, stream_arn, shard_id, iterator_id=None, iterator_type=None, sequence_number=None, parent=None, session=None)[source]

Encapsulates the record-level iterator management for a single Shard.

Parameters
  • stream_arn (str) -- Stream arn, usually from the model's Meta.stream["arn"].

  • shard_id (str) -- Shard id, usually from a DescribeStream call.

  • iterator_id (str) -- (Optional) An existing Shard iterator id. Default is None.

  • iterator_type (str) -- (Optional) The shard's iterator type, usually when loading from a token. One of "trim_horizon", "at_sequence", "after_sequence", or "latest". Default is None.

  • sequence_number (str) -- (Optional) SequenceNumber for an "at_sequence" or "after_sequence" iterator type. Default is None.

  • parent (Shard) -- (Optional) This shard's parent. Default is None.

  • session (SessionWrapper) -- Used to make DynamoDBStreams calls.

property exhausted

True if the shard is closed and there are no additional records to get.

get_records()[source]

Get the next set of records in this shard. An empty list doesn't guarantee the shard is exhausted.

Returns

A list of reformatted records. May be empty.

jump_to(*, iterator_type, sequence_number=None)[source]

Move to a new position in the shard using the standard parameters to GetShardIterator.

Parameters
  • iterator_type (str) -- "trim_horizon", "at_sequence", "after_sequence", "latest"

  • sequence_number (str) -- (Optional) Sequence number to use with at/after sequence. Default is None.

load_children()[source]

If the Shard doesn't have any children, tries to find some from DescribeStream.

If the Shard is open this won't find any children, so an empty response doesn't mean the Shard will never have children.

seek_to(position)[source]

Move the Shard's iterator to the earliest record after the given position in time.

Returns the first records at or past position. If the list is empty, the seek failed to find records, either because the Shard is exhausted or it reached the HEAD of an open Shard.

Parameters

position (datetime) -- The position in time to move to.

Returns

A list of the first records found after position. May be empty.
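
A sketch combining the two methods; the shard variable is assumed:

from datetime import datetime, timedelta, timezone

# Start at the oldest available record, then seek forward to ~5 minutes ago
shard.jump_to(iterator_type="trim_horizon")
records = shard.seek_to(datetime.now(timezone.utc) - timedelta(minutes=5))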

shard_id

The shard id is set once on creation and never changes

stream_arn

The stream arn is set once on creation and never changes

property token

JSON-serializable representation of the current Shard state.

The token is enough to rebuild the Shard as part of rebuilding a Stream.

Returns

Shard state as a json-friendly dict

Return type

dict

walk_tree()[source]

Generator that yields each Shard by walking the shard's children in order.

RecordBuffer

class bloop.stream.buffer.RecordBuffer[source]

Maintains a total ordering for records across any number of shards.

Methods are thin wrappers around heapq. Buffer entries have the form:

(total_ordering, record, shard)

where total_ordering is a tuple of (created_at, sequence_number, monotonic_clock) created from each record as it is inserted.

clear()[source]

Drop the entire buffer.

clock()[source]

Returns a monotonically increasing integer.

Do not rely on the clock using a fixed increment.

>>> buffer = RecordBuffer()
>>> buffer.clock()
3
>>> buffer.clock()
40
>>> buffer.clock()
41
>>> buffer.clock()
300
Returns

A unique clock value guaranteed to be larger than every previous value

Return type

int

peek()[source]

A pop() without removing the (record, shard) from the buffer.

Returns

Oldest (record, shard) tuple.

pop()[source]

Pop the oldest (lowest total ordering) record and the shard it came from.

Returns

Oldest (record, shard) tuple.

push(record, shard)[source]

Push a new record into the buffer

Parameters
  • record (dict) -- new record

  • shard (Shard) -- Shard the record came from

push_all(record_shard_pairs)[source]

Push multiple (record, shard) pairs at once, with only one heapq.heapify() call to maintain order.

Parameters

record_shard_pairs -- list of (record, shard) tuples (see push()).
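
A minimal sketch of the buffer's ordering behavior; record, shard, and more_pairs are assumptions:

buffer = RecordBuffer()
buffer.push(record, shard)
buffer.push_all(more_pairs)   # one heapify for the whole batch
oldest = buffer.peek()        # oldest (record, shard), not removed
record, shard = buffer.pop()  # ordered by (created_at, sequence_number, clock)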

Transactions

class bloop.transactions.Transaction(engine)[source]

Holds a collection of transaction items to be rendered into a PreparedTransaction.

If used as a context manager, calls prepare() and commit() when the outermost context exits.

>>> engine = Engine()
>>> tx = Transaction(engine)
>>> tx.mode = "w"
>>> p1 = tx.prepare()
>>> p2 = tx.prepare()  # different instances

>>> with tx:
...     pass
>>> #  tx.prepare().commit() is called here
prepare()[source]

Create a new PreparedTransaction that can be committed.

This is called automatically when exiting the transaction as a context:

>>> engine = Engine()
>>> tx = WriteTransaction(engine)
>>> prepared = tx.prepare()
>>> prepared.commit()

# automatically calls commit when exiting
>>> with WriteTransaction(engine) as tx:
...     # modify the transaction here
...     pass
>>> # tx commits here
Returns

A new PreparedTransaction that can be committed.

class bloop.transactions.PreparedTransaction[source]

Transaction that can be committed once or more.

Usually created from a Transaction instance.

commit() → None[source]

Commit the transaction with a fixed transaction id.

A read transaction can call commit() any number of times, while a write transaction can only use the same tx_id for 10 minutes from the first call.

first_commit_at: Optional[datetime.datetime] = None

The time when the transaction was first committed. A prepared write transaction can only call commit() again within 10 minutes of its first commit. This is None until commit() is called at least once.

prepare(engine, mode, items) → None[source]

Creates a unique transaction id and dumps the items into a cached request object.

tx_id: str

Unique id used as the "ClientRequestToken" for write transactions. This is generated but not sent with a read transaction, since reads don't require idempotency.

class bloop.transactions.TxItem(type: bloop.transactions.TxType, obj: Any, condition: Optional[Any])[source]

Includes the type, an object, and its condition settings.

The common way to construct an item is through the new() method:

>>> get_item = TxItem.new("get", some_obj)
>>> save_item = TxItem.new("save", some_obj)
property condition

An optional condition that constrains an update

property is_update

Whether this should render an "UpdateExpression" in the TransactItem

property obj

The object that will be modified, persisted, or referenced in a transaction

property should_render_obj

Whether the object values should be rendered in the TransactItem

property type

How this item will be used in a transaction

class bloop.transactions.TxType(value)[source]

Enum whose value is the wire format of its name

classmethod by_alias(name: str) → bloop.transactions.TxType[source]

get a type by the common bloop operation name: get/check/delete/save

Conditions

ObjectTracking

class bloop.conditions.ObjectTracking(dict=None)[source]

ReferenceTracker

class bloop.conditions.ReferenceTracker(engine)[source]

De-dupes reference names for the same path segments and generates unique placeholders for all names, paths, and values. The reference tracker can also forget references if, for example, a value fails to render but the rest of the condition should be left intact. This is primarily used when a value is unexpectedly dumped as None, or an expression uses another column as a value.

Parameters

engine (Engine) -- Used to dump column values for value refs.

any_ref(*, column, value=<Sentinel[missing]>, inner=False) → bloop.conditions.Reference[source]

Returns a NamedTuple of (name, type, value) for any type of reference.

# Name ref
>>> tracker.any_ref(column=User.email)
Reference(name='email', type='name', value=None)

# Value ref
>>> tracker.any_ref(column=User.email, value='user@domain')
Reference(name='email', type='value', value={'S': 'user@domain'})

# Passed as value ref, but value is another column
>>> tracker.any_ref(column=User.email, value=User.other_column)
Reference(name='other_column', type='name', value=None)
Parameters
  • column (ComparisonMixin) -- The column to reference. If value is None, this will render a name ref for this column.

  • value -- (Optional) If provided, this is likely a value ref. If value is also a column, this will render a name ref for that column (not the column parameter).

  • inner (bool) -- (Optional) True if this is a value ref and it should be dumped through a collection's inner type, and not the collection type itself. Default is False.

Returns

A name or value reference

Return type

bloop.conditions.Reference

pop_refs(*refs)[source]

Decrement the usage of each ref by 1.

If this was the last use of a ref, remove it from attr_names or attr_values.

ConditionRenderer

class bloop.conditions.ConditionRenderer(engine)[source]

Renders collections of BaseCondition into DynamoDB's wire format for expressions, including:

  • "ConditionExpression" -- used in conditional operations

  • "FilterExpression" -- used in queries and scans to ignore results that don't match the filter

  • "KeyConditionExpressions" -- used to describe a query's hash (and range) key(s)

  • "ProjectionExpression" -- used to include a subset of possible columns in the results of a query or scan

  • "UpdateExpression" -- used to save objects

Normally, you will only need to call render() to handle any combination of conditions. You can also call each individual render_* function to control how multiple conditions of each type are applied.

You can collect the rendered conditions at any time through the output property.

>>> renderer.render(obj=user, atomic=True)
>>> renderer.output
{'ConditionExpression': '((#n0 = :v1) AND (attribute_not_exists(#n2)) AND (#n4 = :v5))',
 'ExpressionAttributeNames': {'#n0': 'age', '#n2': 'email', '#n4': 'id'},
 'ExpressionAttributeValues': {':v1': {'N': '3'}, ':v5': {'S': 'some-user-id'}}}
Parameters

engine (Engine) -- Used to dump values in conditions into the appropriate wire format.

property output

The wire format for all conditions that have been rendered. A new ConditionRenderer should be used for each operation.

render(obj=None, condition=None, atomic=False, update=False, filter=None, projection=None, key=None)[source]

Main entry point for rendering multiple expressions. All parameters are optional, except obj when atomic or update are True.

Parameters
  • obj -- (Optional) An object to render an atomic condition or update expression for. Required if update or atomic are True. Default is None.

  • condition (BaseCondition) -- (Optional) Rendered as a "ConditionExpression" for a conditional operation. If atomic is True, the two are rendered in an AND condition. Default is None.

  • update (bool) -- (Optional) True if an "UpdateExpression" should be rendered for obj. Default is False.

  • filter (BaseCondition) -- (Optional) A filter condition for a query or scan, rendered as a "FilterExpression". Default is None.

  • projection (set of Column) -- (Optional) A set of Columns to include in a query or scan, rendered as a "ProjectionExpression". Default is None.

  • key (BaseCondition) -- (Optional) A key condition for queries, rendered as a "KeyConditionExpression". Default is None.

Built-in Conditions

class bloop.conditions.BaseCondition(operation, *, column=None, values=None)[source]
class bloop.conditions.AndCondition(*values)[source]
class bloop.conditions.OrCondition(*values)[source]
class bloop.conditions.NotCondition(value)[source]
class bloop.conditions.ComparisonCondition(operation, column, value)[source]
class bloop.conditions.BeginsWithCondition(column, value)[source]
class bloop.conditions.BetweenCondition(column, lower, upper)[source]
class bloop.conditions.ContainsCondition(column, value)[source]
class bloop.conditions.InCondition(column, values)[source]
class bloop.conditions.ComparisonMixin[source]
is_(value)

Return self==value.

is_not(value)

Return self!=value.

Utilities

class bloop.util.Sentinel(name, *args, **kwargs)[source]

Simple string-based placeholders for missing or special values.

Names are unique, and instances are re-used for the same name:

>>> from bloop.util import Sentinel
>>> empty = Sentinel("empty")
>>> empty
<Sentinel[empty]>
>>> same_token = Sentinel("empty")
>>> empty is same_token
True

This removes the need to import the same signal or placeholder value everywhere; two modules can create Sentinel("some-value") and refer to the same object. This is especially helpful where None is a possible value, and so can't be used to indicate omission of an optional parameter.

Implements __repr__ to render nicely in function signatures. Standard object-based sentinels:

>>> missing = object()
>>> def some_func(optional=missing):
...     pass
...
>>> help(some_func)
Help on function some_func in module __main__:

some_func(optional=<object object at 0x7f0f3f29e5d0>)

With the Sentinel class:

>>> from bloop.util import Sentinel
>>> missing = Sentinel("Missing")
>>> def some_func(optional=missing):
...     pass
...
>>> help(some_func)
Help on function some_func in module __main__:

some_func(optional=<Sentinel[Missing]>)
Parameters

name (str) -- The name for this sentinel.

Implementation Details

Models must be Hashable

By default, Python makes all user classes hashable:

>>> class Dict: pass
>>> hash(Dict())
8771845190811

Classes are unhashable in two cases:

  1. The class declares __hash__ = None.

  2. The class implements __eq__ but not __hash__.

In either case, during __init_subclass__(), the ensure_hash() function will manually locate the closest __hash__ method in the model's base classes:

# Nothing to do if a usable __hash__ already resolves on the class
if getattr(cls, "__hash__", None) is not None:
    return
# Otherwise find the closest __hash__ in the MRO, falling back to object.__hash__
for base in cls.__mro__:
    hash_fn = getattr(base, "__hash__")
    if hash_fn:
        break
else:
    hash_fn = object.__hash__
cls.__hash__ = hash_fn

This is required because Python doesn't provide a default hash method when __eq__ is implemented, and won't fall back to a parent class's definition:

>>> class Base:
...     def __hash__(self):
...         print("Base.__hash__")
...         return 0
...
>>> class Derived(Base):
...     def __eq__(self, other):
...         return True
...

>>> hash(Base())
Base.__hash__
>>> hash(Derived())
TypeError: unhashable type: 'Derived'

Stream Ordering Guarantees

The DynamoDB Streams API exposes a limited amount of temporal information and few options for navigating within a shard. Due to these constraints, it was hard to reduce the API down to a single __next__ call without compromising performance or ordering.

The major challenges described below include:

  • Creating a plausible total ordering across shards

  • Managing an iterator:

    • Refreshing expired iterators without data loss

    • Preventing low-volume iterators without sequence numbers from expiring

    • Promoting children when a shard runs out of records

    • Distinguishing open shards from gaps between records

  • Managing multiple shards:

    • Mapping stream "trim_horizon" and "latest" to a set of shards

    • Buffering records from multiple shards and applying a total ordering

  • Loading and saving tokens:

    • Simplifying an entire stream into a human-readable json blob

    • Pruning old shards when loading

    • Inserting new shards when loading

    • Resolving TrimmedDataAccessException for old shards

The following sections use a custom notation to describe shards and records.

Sn and Rn represent shards and records, where n is an integer:

R11, R13, R32  # In general, RnX comes from Sn
S1, S12, S23   # In general, SnX is a child of Sn

< represents chronological ordering between records:

R12 < R13  # In general, RX < RY when X < Y

=> represents parent/child relationships between shards:

S1 => {}          # S1 has no children
S2 => S21         # S2 has one child
# In general, SnX and SnY are adjacent children of Sn
S3 => {S31, S32}

~ represents two shards that are not within the same lineage:

S1 ~ S2  # Not related

S1 => S12 => S13; S4 => S41
# Both child shards, but of different lineages
S12 ~ S41

: represents a set of records from a single shard:

S1: R11, R12   # no guaranteed order
S2: R23 < R24  # guaranteed order

Shards and Lineage

DynamoDB only offers three guarantees for chronological ordering:

  1. All records within a single Shard.

  2. All parent shard records are before all child shard records.

  3. Changes to the same hash will always go to the same shard. When a parent splits, further changes to that hash will go to only one child of that shard, and always the same child.

Given the following:

S1 ~ S2
S1: R11 < R12 < R13
S2: R24 < R25 < R26

The first rule offers no guarantees between R1x and R2x for any x.

Given the following:

S1 => {S12, S13}
S1:  R111 < R112
S12: R124 < R125
S13: R136 < R137

The second rule guarantees both of the following:

R111 < R112 < R124 < R125
R111 < R112 < R136 < R137

but does not guarantee any ordering between R12x and R13x for any x.

Given the following:

S1 => {S2, S3}
R40, R41, R42  # all modify the same hash key
R5, R7, R9     # modify different hash keys

S1: R40, R5

The third rule guarantees that R41 and R42 will both be in either S2 or S3. Meanwhile, it offers no guarantee about where R7 and R9 will be. Both of the following are possible:

S1: R40, R5
S2: R41, R42, R7
S3: R9

S1: R40, R5
S2: R7, R9
S3: R41, R42

But the following is not possible:

S1: R40, R5
S2: R41, R7
S3: R42, R9

Merging Shards

Low-throughput tables will only have a single open shard at any time, and can rely on the first and second guarantees above for rebuilding the exact order of changes to the table.

For high throughput tables, there can be more than one root shard, and each shard lineage can have more than one child open at once. In this case, Bloop's streaming interface can't guarantee ordering for all records in the stream, because there is no absolute chronological ordering across a partitioned table. Instead, Bloop will fall back to a total ordering scheme that uses each record's ApproximateCreationDateTime and, when two records have the same creation time, a monotonically increasing integral clock to break ties.

Consider the following stream:

S0 => {S1, S2}
S0: R00
S1: R11 < R12 < R13
S2: R24 < R25 < R26

Where each record has the following (simplified) creation times:

Record    ApproximateCreationDateTime
------    ---------------------------
R00       7 hours ago
R11       6 hours ago
R12       4 hours ago
R13       2 hours ago
R24       4 hours ago
R25       3 hours ago
R26       3 hours ago

Bloop performs the following steps in a single pass:

  1. The second guarantee says all records in S0 are before records in that shard's children:

    R00 < (R11, R12, R13, R24, R25, R26)
    
  2. The first guarantee says all records in the same shard are ordered:

    R00 < ((R11 < R12 < R13), (R24 < R25 < R26))
    
  3. Then, ApproximateCreationDateTime is used to partially merge S1 and S2 records:

    R00 < R11 < (R12, R24) < (R25 < R26) < R13
    
  4. There were still two collisions after using ApproximateCreationDateTime: R12, R24 and R25, R26.

    1. To resolve (R12, R24) Bloop breaks the tie with an incrementing clock, and assigns R12 < R24.

    2. (R25, R26) is resolved because the records are in the same shard.

The final ordering is:

R00 < R11 < R12 < R24 < R25 < R26 < R13

Record Gaps

Bloop initially performs up to 5 "catch up" calls to GetRecords when advancing an iterator. If a GetRecords call returns a NextShardIterator but no records, it's either because the iterator is nearly caught up to "latest" in an open shard, or it is traversing a period of the shard with no activity. Endlessly polling until a record comes back would cause every open shard to hang for up to 4 hours, while only calling GetRecords once could desynchronize one shard's iterator from others.

By retrying up to 5 times on an empty GetRecords response (that still has a NextShardIterator) Bloop is confident that any gaps in the shard have been advanced. This is because it takes approximately 4-5 calls to traverse an empty shard completely. In other words, the 6th empty response almost certainly indicates that the iterator is caught up to latest in an open shard, and it's safe to cut back to one call at a time.

Why only 5 calls?

This number came from extensive testing which compared the number of empty responses returned for shards with various activity cadences. It's reasonable to assume that this number would only decrease with time, as advances in software and hardware would enable DynamoDB to cover larger periods of time with the same investment. Because each call from a customer incurs the overhead of creating and indexing each new iterator id, as well as the usual expensive signature-based authentication, it's in DynamoDB's interest to minimize the number of calls a customer needs to traverse a sparsely populated shard.

At worst DynamoDB starts requiring more calls to fully traverse an empty shard, which could result in reordering between records in shards with vastly different activity patterns. Since the creation-time-based ordering is approximate, this doesn't relax the guarantees that Bloop's streaming interface provides.

Changing the Limit

In general you should not need to worry about this value; leave it alone. In the unlikely case that DynamoDB does increase the number of calls required to traverse an empty shard, Bloop will be updated soon after.

If you still need to tune this value:

import bloop.stream.shard
bloop.stream.shard.CALLS_TO_REACH_HEAD = 5

The exact value of this parameter will have almost no impact on performance in high-activity streams, and there are so few shards in low-activity streams that the total cost will be on par with the other calls to set up the stream.