Internal¶
In addition to documenting internal classes, this section describes complex internal systems (such as Streams, tracking modified columns via weakrefs) and specific parameters and error handling that Bloop employs when talking to DynamoDB (such as SessionWrapper's error inspection, and partial table validation).
SessionWrapper¶
- class bloop.session.SessionWrapper(dynamodb=None, dynamodbstreams=None)[source]¶
Provides a consistent interface to DynamoDB and DynamoDBStreams clients.
If either client is None, that client is built using boto3.client().
- Parameters
dynamodb -- A boto3 client for DynamoDB. Defaults to boto3.client("dynamodb").
dynamodbstreams -- A boto3 client for DynamoDBStreams. Defaults to boto3.client("dynamodbstreams").
- create_table(table_name, model)[source]¶
Create the model's table. Returns True if the table is being created, False otherwise.
Does not wait for the table to create, and does not validate an existing table. Will not raise "ResourceInUseException" if the table exists or is being created.
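The "will not raise" behavior can be pictured with the sketch below. It is not Bloop's implementation: FakeClientError is a hypothetical stand-in shaped like botocore's ClientError (which exposes the error code under response["Error"]["Code"]), and fake_create_table is an invented substitute for a real client call.

```python
class FakeClientError(Exception):
    """Hypothetical stand-in for botocore.exceptions.ClientError."""

    def __init__(self, code):
        super().__init__(code)
        self.response = {"Error": {"Code": code}}


def create_table_quietly(create_fn, **table_kwargs):
    """Return True if creation started, False if the table already exists."""
    try:
        create_fn(**table_kwargs)
    except FakeClientError as error:
        if error.response["Error"]["Code"] == "ResourceInUseException":
            return False
        raise  # any other error code is unexpected and propagates
    return True


existing = set()


def fake_create_table(TableName):
    # Invented client call: fails the second time for the same name.
    if TableName in existing:
        raise FakeClientError("ResourceInUseException")
    existing.add(TableName)


first = create_table_quietly(fake_create_table, TableName="users")
second = create_table_quietly(fake_create_table, TableName="users")
```

Inspecting the error code, rather than catching every client error, keeps unrelated failures such as throttling or permission errors visible to the caller.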
- delete_item(item)[source]¶
Delete an object in DynamoDB.
Returns Optional[dict] of read attributes depending on the "ReturnValues" kwarg. Return value is None when no attributes were requested.
- Parameters
item -- Unpacked into kwargs for boto3.DynamoDB.Client.delete_item().
- Raises
bloop.exceptions.ConstraintViolation -- if the condition (or atomic) is not met.
- describe_stream(stream_arn, first_shard=None)[source]¶
Wraps boto3.DynamoDBStreams.Client.describe_stream(), handling continuation tokens.
- describe_table(table_name)[source]¶
Polls until the table is ready, then returns the first result when the table was ready.
The returned dict is standardized to ensure all fields are present, even when empty or across different DynamoDB API versions. TTL information is also inserted.
- Parameters
table_name -- The name of the table to describe
- Returns
The (sanitized) result of DescribeTable["Table"]
- Return type
- enable_backups(table_name, model)[source]¶
Calls UpdateContinuousBackups on the table according to model.Meta["continuous_backups"]
- Parameters
table_name -- The name of the table to enable Continuous Backups on
model -- The model to get Continuous Backups settings from
- enable_ttl(table_name, model)[source]¶
Calls UpdateTimeToLive on the table according to model.Meta["ttl"]
- Parameters
table_name -- The name of the table to enable the TTL setting on
model -- The model to get TTL settings from
- get_shard_iterator(*, stream_arn, shard_id, iterator_type, sequence_number=None)[source]¶
Wraps boto3.DynamoDBStreams.Client.get_shard_iterator().
- Parameters
stream_arn (str) -- Stream arn. Usually Shard.stream_arn.
shard_id (str) -- Shard identifier. Usually Shard.shard_id.
iterator_type (str) -- "sequence_at", "sequence_after", "trim_horizon", or "latest"
sequence_number --
- Returns
Iterator id, valid for 15 minutes.
- Return type
- Raises
bloop.exceptions.RecordsExpired -- Tried to get an iterator beyond the Trim Horizon.
- get_stream_records(iterator_id)[source]¶
Wraps boto3.DynamoDBStreams.Client.get_records().
- Parameters
iterator_id -- Iterator id. Usually Shard.iterator_id.
- Returns
Dict with "Records" list (may be empty) and "NextShardIterator" str (may not exist).
- Return type
- Raises
bloop.exceptions.RecordsExpired -- The iterator moved beyond the Trim Horizon since it was created.
bloop.exceptions.ShardIteratorExpired -- The iterator was created more than 15 minutes ago.
- load_items(items)[source]¶
Loads any number of items in chunks, handling continuation tokens.
- Parameters
items -- Unpacked in chunks into "RequestItems" for boto3.DynamoDB.Client.batch_get_item().
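The chunk-and-retry pattern that load_items describes might look like the sketch below. It assumes the BatchGetItem limit of 100 keys per request and the "UnprocessedKeys" response field; load_all, chunked, and fake_batch_get are hypothetical names, and the fake client only exists so the example runs without AWS.

```python
def chunked(keys, size):
    """Yield successive lists of at most `size` keys."""
    for start in range(0, len(keys), size):
        yield keys[start:start + size]


def load_all(batch_get, table_name, keys, chunk_size=100):
    """Fetch every key, re-requesting whatever the service leaves unprocessed."""
    loaded = []
    for chunk in chunked(keys, chunk_size):
        pending = {table_name: {"Keys": chunk}}
        while pending:
            response = batch_get(RequestItems=pending)
            loaded.extend(response["Responses"].get(table_name, []))
            # An empty or missing "UnprocessedKeys" ends the loop for this chunk.
            pending = response.get("UnprocessedKeys") or None
    return loaded


def fake_batch_get(RequestItems):
    # Invented stand-in: serves at most 2 keys per call and defers the rest.
    (table, request), = RequestItems.items()
    served, deferred = request["Keys"][:2], request["Keys"][2:]
    response = {"Responses": {table: served}, "UnprocessedKeys": {}}
    if deferred:
        response["UnprocessedKeys"] = {table: {"Keys": deferred}}
    return response


keys = [{"id": {"S": str(n)}} for n in range(7)]
items = load_all(fake_batch_get, "users", keys, chunk_size=3)
```

A production version would normally add a backoff between retries of unprocessed keys; the sketch omits it to keep the control flow visible.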
- query_items(request)[source]¶
Wraps boto3.DynamoDB.Client.query(). Response always includes "Count" and "ScannedCount".
- Parameters
request -- Unpacked into boto3.DynamoDB.Client.query()
- save_item(item)[source]¶
Save an object to DynamoDB.
Returns Optional[dict] of read attributes depending on the "ReturnValues" kwarg. Return value is None when no attributes were requested.
- Parameters
item -- Unpacked into kwargs for boto3.DynamoDB.Client.update_item().
- Raises
bloop.exceptions.ConstraintViolation -- if the condition (or atomic) is not met.
- scan_items(request)[source]¶
Wraps boto3.DynamoDB.Client.scan(). Response always includes "Count" and "ScannedCount".
- Parameters
request -- Unpacked into boto3.DynamoDB.Client.scan()
- search_items(mode, request)[source]¶
Invoke query/scan by name.
Response always includes "Count" and "ScannedCount"
- Parameters
mode (str) -- "query" or "scan"
request -- Unpacked into boto3.DynamoDB.Client.query() or boto3.DynamoDB.Client.scan()
- transaction_read(items)[source]¶
Wraps boto3.DynamoDB.Client.transact_get_items().
- Parameters
items -- Unpacked into "TransactItems" for boto3.DynamoDB.Client.transact_get_items()
- Raises
bloop.exceptions.TransactionCanceled -- if the transaction was canceled.
- Returns
Dict with "Records" list
- transaction_write(items, client_request_token)[source]¶
Wraps boto3.DynamoDB.Client.transact_write_items().
- Parameters
items -- Unpacked into "TransactItems" for boto3.DynamoDB.Client.transact_write_items()
client_request_token -- Idempotency token valid for 10 minutes from first use. Unpacked into "ClientRequestToken"
- Raises
bloop.exceptions.TransactionCanceled -- if the transaction was canceled.
- validate_table(table_name, model)[source]¶
Polls until a creating table is ready, then verifies the description against the model's requirements.
The model may have a subset of all GSIs and LSIs on the table, but the key structure must be exactly the same. The table must have a stream if the model expects one, but not the other way around. When read or write units are not specified for the model or any GSI, the existing values will always pass validation.
- Parameters
- Raises
bloop.exceptions.TableMismatch -- When the table does not meet the constraints of the model.
Modeling¶
IMeta¶
- class bloop.models.IMeta[source]¶
This class exists to provide autocomplete hints for computed variables on a model's Meta object.
Subclassing IMeta is OPTIONAL and rarely necessary; it is primarily available for users writing generic code over a class of models, e.g. transforms on all columns of a model or a Marshmallow adapter.
import bloop.models

class User(BaseModel):
    id = Column(String, hash_key=True)
    email = Column(String, dynamo_name="e")

    class Meta(bloop.models.IMeta):
        read_units = 500

User.Meta.co  # PyCharm renders:
#     +---------------------------+
#     | User.Meta.columns         |
#     | User.Meta.columns_by_name |
#     +---------------------------+
Index¶
- class bloop.models.Index(*, projection, hash_key=None, range_key=None, dynamo_name=None, **kwargs)[source]¶
Abstract base class for GSIs and LSIs.
An index must be bound to a model by calling bind_index(meta, model), which lets the index compute projected columns, validate hash and range keys, etc.
- Parameters
projection -- Either "keys", "all", or a list of column names or objects. Included columns will be projected into the index. Key columns are always included.
hash_key -- The column that the index can be queried against. Always the table hash_key for LSIs.
range_key -- The column that the index can be sorted on. Always required for an LSI. Default is None.
dynamo_name (str) -- (Optional) The index's name in DynamoDB. Defaults to the index's name in the model.
- hash_key¶
The column that the index can be queried against. (LSI's hash_key is always the table hash_key.)
- model¶
The model this index is attached to.
- name¶
The name of this index in the model. Set by bind_index() during __init_subclass__().
- projection¶
Computed during bind_index() during __init_subclass__().
{
    "available":  # Set of columns that can be returned from a query or search.
    "included":   # Set of columns that can be used in query and scan filters.
    "mode":       # "all", "keys", or "include"
    "strict":     # False if queries and scans can fetch non-included columns
}
- range_key¶
The column that the index can be sorted on.
- __copy__()[source]¶
Create a shallow copy of this Index. Primarily used when initializing models that subclass other abstract models or mixins (baseless classes that contain Columns and Indexes). You can override this method to change how derived models are created:
import copy

class MyIndex(Index):
    def __copy__(self):
        new = super().__copy__()
        new.derived = True
        return new

index = MyIndex(projection="keys", hash_key="some_column")
same = copy.copy(index)
assert same.derived  # True
- Returns
A shallow copy of this Index, with the model and _name attributes unset, and the computed projection invalidated.
Binding¶
- models.bind_column(model, name, column, force=False, recursive=False, copy=False) bloop.models.Column ¶
Bind a column to the model with the given name.
This method is primarily used during BaseModel.__init_subclass__, although it can be used to easily attach a new column to an existing model:
import bloop.models

class User(BaseModel):
    id = Column(String, hash_key=True)

email = Column(String, dynamo_name="e")
bound = bloop.models.bind_column(User, "email", email)
assert bound is email

# rebind with force, and use a copy
bound = bloop.models.bind_column(User, "email", email, force=True, copy=True)
assert bound is not email
If an existing index refers to this column, it will be updated to point to the new column using refresh_index(), including recalculating the index projection. Meta attributes including Meta.columns, Meta.hash_key, etc. will be updated if necessary.
If name or the column's dynamo_name conflicts with an existing column or index on the model, raises InvalidModel unless force is True. If recursive is True and there are existing subclasses of model, a copy of the column will attempt to bind to each subclass. The recursive calls will not force the bind, and will always use a new copy. If copy is True then a copy of the provided column is used. This uses a shallow copy via __copy__().
- Parameters
model -- The model to bind the column to.
name -- The name to bind the column as. In effect, used for setattr(model, name, column)
column -- The column to bind to the model.
force -- Unbind existing columns or indexes with the same name or dynamo_name. Default is False.
recursive -- Bind to each subclass of this model. Default is False.
copy -- Use a copy of the column instead of the column directly. Default is False.
- Returns
The bound column. This is a new column when copy is True, otherwise the input column.
- models.bind_index(model, name, index, force=False, recursive=True, copy=False) bloop.models.Index ¶
Bind an index to the model with the given name.
This method is primarily used during BaseModel.__init_subclass__, although it can be used to easily attach a new index to an existing model:
import bloop.models

class User(BaseModel):
    id = Column(String, hash_key=True)
    email = Column(String, dynamo_name="e")

by_email = GlobalSecondaryIndex(projection="keys", hash_key="email")
bound = bloop.models.bind_index(User, "by_email", by_email)
assert bound is by_email

# rebind with force, and use a copy
bound = bloop.models.bind_index(User, "by_email", by_email, force=True, copy=True)
assert bound is not by_email
If name or the index's dynamo_name conflicts with an existing column or index on the model, raises InvalidModel unless force is True. If recursive is True and there are existing subclasses of model, a copy of the index will attempt to bind to each subclass. The recursive calls will not force the bind, and will always use a new copy. If copy is True then a copy of the provided index is used. This uses a shallow copy via __copy__().
- Parameters
model -- The model to bind the index to.
name -- The name to bind the index as. In effect, used for setattr(model, name, index)
index -- The index to bind to the model.
force -- Unbind existing columns or indexes with the same name or dynamo_name. Default is False.
recursive -- Bind to each subclass of this model. Default is False.
copy -- Use a copy of the index instead of the index directly. Default is False.
- Returns
The bound index. This is a new index when copy is True, otherwise the input index.
- models.refresh_index(meta, index) None ¶
Recalculate the projection, hash_key, and range_key for the given index.
- Parameters
meta -- model.Meta to find columns by name
index -- The index to refresh
- models.unbind(meta, name=None, dynamo_name=None) None ¶
Unconditionally remove any columns or indexes bound to the given name or dynamo_name.
import bloop.models

class User(BaseModel):
    id = Column(String, hash_key=True)
    email = Column(String, dynamo_name="e")
    by_email = GlobalSecondaryIndex(projection="keys", hash_key=email)

for dynamo_name in ("id", "e", "by_email"):
    bloop.models.unbind(User.Meta, dynamo_name=dynamo_name)

assert not User.Meta.columns
assert not User.Meta.indexes
assert not User.Meta.keys
Warning
This method does not pre- or post- validate the model with the requested changes. You are responsible for ensuring the model still has a hash key, that required columns exist for each index, etc.
- Parameters
meta -- model.Meta to remove the columns or indexes from
name -- column or index name (as declared on the model) to unbind by. Default is None.
dynamo_name -- column or index dynamo_name to unbind by. Default is None.
Types¶
DynamicType¶
- class bloop.types.DynamicType[source]¶
Dynamically dumps a value based on its python type.
This is used by DynamicList and DynamicMap to handle path resolution before the value for an arbitrary path is known. For example, given the following model:
class UserUpload(BaseModel):
    id = Column(String, hash_key=True)
    doc = Column(DynamicMap)
And an instance as follows:
u = UserUpload(id="numberoverzero")
u.doc = {
    "foo": ["bar", {0: "a", 1: "b"}, True]
}
The renderer must know a type for UserUpload.doc["foo"][1][0] before the value is provided. An instance of this type will return itself for any value during __getitem__, and then inspects the value type during _dump to create the correct simple type.
Because DynamicType requires access to the DynamoDB type annotation, you must call _load and _dump, as dynamo_load and dynamo_dump can't be implemented. For example:
DynamicType.i._load({"S": "2016-08-09T01:16:25.322849+00:00"})
    -> "2016-08-09T01:16:25.322849+00:00"
DynamicType.i._load({"N": "3.14"})
    -> Decimal('3.14')
DynamicType.i._dump([1, True, "f"])
    -> {"L": [{"N": "1"}, {"BOOL": True}, {"S": "f"}]}
DynamicType.i._dump({b"1", b"2"})
    -> {"BS": ["MQ==", "Mg=="]}
- i¶
Singleton instance of the class.
- backing_type = None¶
- python_type = None¶
Actions¶
- class bloop.actions.Action(action_type: bloop.actions.ActionType, value)[source]¶
Encapsulates an update value and how Dynamo should apply the update.
Generally, you will only need to use the Action class if you are updating an atomic counter (ADD) or making additions and deletions from a set (ADD, DELETE).
You do not need to use an Action for SET or REMOVE updates.
>>> import bloop.actions
>>> from my_models import Website, User
>>> user = User()
>>> website = Website()
# SET and REMOVE don't need an explicit action
>>> user.verified = True
>>> del user.pw_hash
# ADD and DELETE need explicit actions
>>> website.view_count = bloop.actions.add(1)
>>> website.remote_addrs = bloop.actions.delete({"::0", "localhost"})
- class bloop.actions.ActionType(value)[source]¶
Represents how Dynamo should apply an update.
- Add = ('ADD', '{name_ref.name} {value_ref.name}', False)¶
- Delete = ('DELETE', '{name_ref.name} {value_ref.name}', False)¶
- Remove = ('REMOVE', '{name_ref.name}', True)¶
- Set = ('SET', '{name_ref.name}={value_ref.name}', True)¶
- new_action(value) bloop.actions.Action [source]¶
Convenience function to instantiate an Action with this type
- bloop.actions.unwrap(x: Union[bloop.actions.Action, Any]) Any [source]¶
return an action's inner value
- bloop.actions.wrap(x: Any) bloop.actions.Action [source]¶
return an action: REMOVE if x is None else SET
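The wrap and unwrap rules can be illustrated with simplified stand-ins. The Action and ActionType classes below are minimal placeholders rather than Bloop's own, and returning an existing action unchanged from wrap is an assumption of this sketch.

```python
from enum import Enum


class ActionType(Enum):
    SET = "SET"
    REMOVE = "REMOVE"
    ADD = "ADD"
    DELETE = "DELETE"


class Action:
    """Pairs a value with the way the update should be applied."""

    def __init__(self, action_type, value):
        self.type = action_type
        self.value = value


def wrap(x):
    """Leave actions alone; otherwise REMOVE for None and SET for anything else."""
    if isinstance(x, Action):
        return x  # assumption: already-wrapped values pass through
    return Action(ActionType.REMOVE if x is None else ActionType.SET, x)


def unwrap(x):
    """Return the inner value of an action, or the value itself."""
    return x.value if isinstance(x, Action) else x


counter = Action(ActionType.ADD, 1)
```

With these helpers, plain attribute assignment and deletion map onto SET and REMOVE, while ADD and DELETE only appear when the caller builds the action explicitly.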
Searching¶
Search¶
- class bloop.search.Search(mode=None, engine=None, model=None, index=None, key=None, filter=None, projection=None, consistent=False, forward=True, parallel=None)[source]¶
A user-created search object.
Used to prepare a PreparedSearch which builds search iterators.
- Parameters
mode (str) -- Search type, either "query" or "scan".
engine -- Engine to unpack models with.
model -- BaseModel being searched.
index -- Index to search, or None.
key -- (Query only) Key condition. This must include an equality against the hash key, and optionally one of a restricted set of conditions on the range key.
filter -- Filter condition. Only matching objects will be included in the results.
projection -- "all", "count", a set of column names, or a list of Column. When projection is "count", you must advance the iterator to retrieve the count.
consistent (bool) -- Use strongly consistent reads if True. Not applicable to GSIs. Default is False.
forward (bool) -- (Query only) Use ascending or descending order. Default is True (ascending).
parallel (tuple) -- (Scan only) A tuple of (Segment, TotalSegments) for this portion of a parallel scan. Default is None.
- prepare()[source]¶
Constructs a PreparedSearch.
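To make the parallel tuple concrete, the sketch below builds one Scan request per segment. It relies only on DynamoDB's "Segment" (zero-based) and "TotalSegments" Scan parameters; segment_requests and the "users" table name are hypothetical and not part of Bloop.

```python
def segment_requests(base_request, total_segments):
    """Build one Scan request dict per segment of a parallel scan."""
    requests = []
    for segment in range(total_segments):
        request = dict(base_request)  # shallow copy; the base stays untouched
        request["Segment"] = segment
        request["TotalSegments"] = total_segments
        requests.append(request)
    return requests


base = {"TableName": "users"}
requests = segment_requests(base, 4)
```

Each resulting dict corresponds to one Search created with parallel=(segment, total_segments); the segments can then be scanned by separate workers.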
PreparedSearch¶
- class bloop.search.PreparedSearch[source]¶
Mutable search object.
Creates SearchModelIterator objects which can be used to iterate the results of a query or scan multiple times.
SearchIterator¶
- class bloop.search.SearchIterator(*, session, model, index, request, projected)[source]¶
Reusable search iterator.
- Parameters
- all()[source]¶
Eagerly load all results and return a single list. If there are no results, the list is empty.
- Returns
A list of results.
- property count¶
Number of items that have been loaded from DynamoDB so far, including buffered items.
- property exhausted¶
True if there are no more results.
- first()[source]¶
Return the first result. If there are no results, raises ConstraintViolation.
- Returns
The first result.
- Raises
bloop.exceptions.ConstraintViolation -- No results.
- move_to(token)[source]¶
Restore an iterator to the state stored in a token. This will reset all iterator state, including count, scanned, and exhausted properties.
- Parameters
token -- a SearchIterator.token
- one()[source]¶
Return the unique result. If there is not exactly one result, raises ConstraintViolation.
- Returns
The unique result.
- Raises
bloop.exceptions.ConstraintViolation -- Not exactly one result.
- property scanned¶
Number of items that DynamoDB evaluated, before any filter was applied.
- property token¶
JSON-serializable representation of the current SearchIterator state.
Use iterator.move_to(token) to move an iterator to this position.
Implementations will always include an "ExclusiveStartKey" key but may include additional metadata. The iterator's count and scanned values are not preserved.
- Returns
Iterator state as a json-friendly dict
SearchModelIterator¶
- class bloop.search.SearchModelIterator(*, engine, model, index, request, projected)[source]¶
Reusable search iterator that unpacks result dicts into model instances.
- Parameters
- all()¶
Eagerly load all results and return a single list. If there are no results, the list is empty.
- Returns
A list of results.
- property count¶
Number of items that have been loaded from DynamoDB so far, including buffered items.
- property exhausted¶
True if there are no more results.
- first()¶
Return the first result. If there are no results, raises ConstraintViolation.
- Returns
The first result.
- Raises
bloop.exceptions.ConstraintViolation -- No results.
- move_to(token)¶
Restore an iterator to the state stored in a token. This will reset all iterator state, including count, scanned, and exhausted properties.
- Parameters
token -- a SearchIterator.token
- one()¶
Return the unique result. If there is not exactly one result, raises ConstraintViolation.
- Returns
The unique result.
- Raises
bloop.exceptions.ConstraintViolation -- Not exactly one result.
- reset()¶
Reset to the initial state, clearing the buffer and zeroing count and scanned.
- property scanned¶
Number of items that DynamoDB evaluated, before any filter was applied.
- property token¶
JSON-serializable representation of the current SearchIterator state.
Use iterator.move_to(token) to move an iterator to this position.
Implementations will always include an "ExclusiveStartKey" key but may include additional metadata. The iterator's count and scanned values are not preserved.
- Returns
Iterator state as a json-friendly dict
Streaming¶
Coordinator¶
- class bloop.stream.coordinator.Coordinator(*, session, stream_arn)[source]¶
Encapsulates the shard-level management for a whole Stream.
- Parameters
session (SessionWrapper) -- Used to make DynamoDBStreams calls.
stream_arn (str) -- Stream arn, usually from the model's Meta.stream["arn"].
- advance_shards()[source]¶
Poll active shards for records and insert them into the buffer. Rotate exhausted shards.
Returns immediately if the buffer isn't empty.
- heartbeat()[source]¶
Keep active shards with "trim_horizon" or "latest" iterators alive by advancing their iterators.
- move_to(position)[source]¶
Set the Coordinator to a specific endpoint or time, or load state from a token.
- Parameters
position -- "trim_horizon", "latest", datetime, or a Coordinator.token
- remove_shard(shard, drop_buffered_records=False)[source]¶
Remove a Shard from the Coordinator. Drops all buffered records from the Shard.
If the Shard is active or a root, it is removed and any children promoted to those roles.
- property token¶
JSON-serializable representation of the current Stream state.
Use Engine.stream(YourModel, token) to create an identical stream, or stream.move_to(token) to move an existing stream to this position.
- Returns
Stream state as a json-friendly dict
- Return type
RecordBuffer¶
- class bloop.stream.buffer.RecordBuffer[source]¶
Maintains a total ordering for records across any number of shards.
Methods are thin wrappers around heapq. Each buffer entry carries a total_ordering, which is a tuple of (created_at, sequence_number, monotonic_clock) created from each record as it is inserted.
- clock()[source]¶
Returns a monotonically increasing integer.
Do not rely on the clock using a fixed increment.
>>> buffer = RecordBuffer()
>>> buffer.clock()
3
>>> buffer.clock()
40
>>> buffer.clock()
41
>>> buffer.clock()
300
- Returns
A unique clock value guaranteed to be larger than every previous value
- Return type
- peek()[source]¶
A pop() without removing the (record, shard) from the buffer.
- Returns
Oldest (record, shard) tuple.
- pop()[source]¶
Pop the oldest (lowest total ordering) record and the shard it came from.
- Returns
Oldest (record, shard) tuple.
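A heap-backed buffer along these lines could be sketched as follows. TinyBuffer is a hypothetical class, not RecordBuffer itself; the entry layout (ordering, record, shard), the integer sequence numbers, and the record dict keys are simplifying assumptions.

```python
import heapq
import itertools


class TinyBuffer:
    """Orders (record, shard) pairs by (created_at, sequence_number, clock)."""

    def __init__(self):
        self._heap = []
        self._clock = itertools.count()

    def push(self, record, shard):
        # The clock value is unique, so tuple comparison never reaches the record dict.
        ordering = (record["created_at"], record["sequence_number"], next(self._clock))
        heapq.heappush(self._heap, (ordering, record, shard))

    def peek(self):
        """Oldest (record, shard) pair, left in the buffer."""
        return self._heap[0][1:]

    def pop(self):
        """Oldest (record, shard) pair, removed from the buffer."""
        return heapq.heappop(self._heap)[1:]


buffer = TinyBuffer()
buffer.push({"created_at": 5, "sequence_number": 2}, "shard-b")
buffer.push({"created_at": 3, "sequence_number": 9}, "shard-a")
buffer.push({"created_at": 3, "sequence_number": 9}, "shard-c")  # exact tie
peeked = buffer.peek()[1]
order = [buffer.pop()[1] for _ in range(3)]
```

The third tuple element matters only when creation time and sequence number are equal: it falls back to insertion order and prevents Python from trying to compare the record dicts.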
Transactions¶
- class bloop.transactions.Transaction(engine)[source]¶
Holds a collection of transaction items to be rendered into a PreparedTransaction.
If used as a context manager, calls prepare() and commit() when the outermost context exits.
>>> engine = Engine()
>>> tx = Transaction(engine)
>>> tx.mode = "w"
>>> p1 = tx.prepare()
>>> p2 = tx.prepare()  # different instances
>>> with tx:
...     pass
>>> # tx.prepare().commit() is called here
- prepare()[source]¶
Create a new PreparedTransaction that can be committed.
This is called automatically when exiting the transaction as a context:
>>> engine = Engine()
>>> tx = WriteTransaction(engine)
>>> prepared = tx.prepare()
>>> prepared.commit()
# automatically calls commit when exiting
>>> with WriteTransaction(engine) as tx:
...     # modify the transaction here
...     pass
>>> # tx commits here
- Returns
- class bloop.transactions.PreparedTransaction[source]¶
Transaction that can be committed one or more times.
Usually created from a Transaction instance.
- commit() None [source]¶
Commit the transaction with a fixed transaction id.
A read transaction can call commit() any number of times, while a write transaction can only use the same tx_id for 10 minutes from the first call.
- first_commit_at: Optional[datetime.datetime] = None¶
When the transaction was first committed. A prepared write transaction can only call commit again within 10 minutes of its first commit. This is None until commit() is called at least once.
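The 10-minute rule can be expressed as a small check. This is an illustration, not Bloop's code: can_commit_again is a hypothetical helper, and treating the boundary as inclusive is an assumption.

```python
from datetime import datetime, timedelta, timezone

WRITE_WINDOW = timedelta(minutes=10)


def can_commit_again(first_commit_at, now=None):
    """True if a write transaction may reuse its token at `now`."""
    if first_commit_at is None:
        return True  # never committed, so the window has not started
    if now is None:
        now = datetime.now(timezone.utc)
    return now - first_commit_at <= WRITE_WINDOW


start = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
inside = can_commit_again(start, start + timedelta(minutes=9))
outside = can_commit_again(start, start + timedelta(minutes=11))
```

Read transactions have no such window, so a check like this would only apply to prepared write transactions.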
- class bloop.transactions.TxItem(type: bloop.transactions.TxType, obj: Any, condition: Optional[Any])[source]¶
Includes the type, an object, and its condition settings.
The common way to construct an item is through the new method:
>>> get_item = TxItem.new("get", some_obj)
>>> save_item = TxItem.new("save", some_obj)
- property condition¶
An optional condition that constrains an update
- property is_update¶
Whether this should render an "UpdateExpression" in the TransactItem
- property obj¶
The object that will be modified, persisted, or referenced in a transaction
- property should_render_obj¶
Whether the object values should be rendered in the TransactItem
- property type¶
How this item will be used in a transaction
Conditions¶
ObjectTracking¶
ReferenceTracker¶
- class bloop.conditions.ReferenceTracker(engine)[source]¶
De-dupes reference names for the same path segments and generates unique placeholders for all names, paths, and values. The reference tracker can also forget references if, for example, a value fails to render but the rest of the condition should be left intact. This is primarily used when a value is unexpectedly dumped as None, or an expression uses another column as a value.
- Parameters
engine (Engine) -- Used to dump column values for value refs.
- any_ref(*, column, value=<Sentinel[missing]>, inner=False) bloop.conditions.Reference [source]¶
Returns a NamedTuple of (name, type, value) for any type of reference.
# Name ref
>>> tracker.any_ref(column=User.email)
Reference(name='email', type='name', value=None)
# Value ref
>>> tracker.any_ref(column=User.email, value='user@domain')
Reference(name='email', type='value', value={'S': 'user@domain'})
# Passed as value ref, but value is another column
>>> tracker.any_ref(column=User.email, value=User.other_column)
Reference(name='other_column', type='name', value=None)
- Parameters
column (ComparisonMixin) -- The column to reference. If value is None, this will render a name ref for this column.
value -- (Optional) If provided, this is likely a value ref. If value is also a column, this will render a name ref for that column (not the column parameter).
inner (bool) -- (Optional) True if this is a value ref and it should be dumped through a collection's inner type, and not the collection type itself. Default is False.
- Returns
A name or value reference
- Return type
bloop.conditions.Reference
ConditionRenderer¶
- class bloop.conditions.ConditionRenderer(engine)[source]¶
Renders collections of BaseCondition into DynamoDB's wire format for expressions, including:
"ConditionExpression" -- used in conditional operations
"FilterExpression" -- used in queries and scans to ignore results that don't match the filter
"KeyConditionExpression" -- used to describe a query's hash (and range) key(s)
"ProjectionExpression" -- used to include a subset of possible columns in the results of a query or scan
"UpdateExpression" -- used to save objects
Normally, you will only need to call render() to handle any combination of conditions. You can also call each individual render_* function to control how multiple conditions of each type are applied.
You can collect the rendered condition at any time through output.
>>> renderer.render(obj=user, atomic=True)
>>> renderer.output
{'ConditionExpression': '((#n0 = :v1) AND (attribute_not_exists(#n2)) AND (#n4 = :v5))',
 'ExpressionAttributeNames': {'#n0': 'age', '#n2': 'email', '#n4': 'id'},
 'ExpressionAttributeValues': {':v1': {'N': '3'}, ':v5': {'S': 'some-user-id'}}}
- Parameters
engine (Engine) -- Used to dump values in conditions into the appropriate wire format.
- property output¶
The wire format for all conditions that have been rendered. A new ConditionRenderer should be used for each operation.
- render(obj=None, condition=None, update=False, filter=None, projection=None, key=None)[source]¶
Main entry point for rendering multiple expressions. All parameters are optional, except obj when atomic or update are True.
- Parameters
obj -- (Optional) An object to render an atomic condition or update expression for. Required if update or atomic are true. Default is None.
condition (BaseCondition) -- (Optional) Rendered as a "ConditionExpression" for a conditional operation. If atomic is True, the two are rendered in an AND condition. Default is None.
update (bool) -- (Optional) True if an "UpdateExpression" should be rendered for obj. Default is False.
filter (BaseCondition) -- (Optional) A filter condition for a query or scan, rendered as a "FilterExpression". Default is None.
projection (set Column) -- (Optional) A set of Columns to include in a query or scan, rendered as a "ProjectionExpression". Default is None.
key (BaseCondition) -- (Optional) A key condition for queries, rendered as a "KeyConditionExpression". Default is None.
Built-in Conditions¶
Utilities¶
- class bloop.util.Sentinel(name, *args, **kwargs)[source]¶
Simple string-based placeholders for missing or special values.
Names are unique, and instances are re-used for the same name:
>>> from bloop.util import Sentinel
>>> empty = Sentinel("empty")
>>> empty
<Sentinel[empty]>
>>> same_token = Sentinel("empty")
>>> empty is same_token
True
This removes the need to import the same signal or placeholder value everywhere; two modules can create Sentinel("some-value") and refer to the same object. This is especially helpful where None is a possible value, and so can't be used to indicate omission of an optional parameter.
Implements __repr__ to render nicely in function signatures. Standard object-based sentinels:
>>> missing = object()
>>> def some_func(optional=missing):
...     pass
...
>>> help(some_func)
Help on function some_func in module __main__:
some_func(optional=<object object at 0x7f0f3f29e5d0>)
With the Sentinel class:
>>> from bloop.util import Sentinel
>>> missing = Sentinel("Missing")
>>> def some_func(optional=missing):
...     pass
...
>>> help(some_func)
Help on function some_func in module __main__:
some_func(optional=<Sentinel[Missing]>)
- Parameters
name (str) -- The name for this sentinel.
Implementation Details¶
Models must be Hashable¶
By default, Python makes all user classes hashable:
>>> class Dict: pass
>>> hash(Dict())
8771845190811
Classes are unhashable in two cases:
1. The class declares __hash__ = None.
2. The class implements __eq__ but not __hash__.
In either case, during __init_subclass__(), the ensure_hash() function will manually locate the closest __hash__ method in the model's base classes:
if getattr(cls, "__hash__", None) is not None:
    return
for base in cls.__mro__:
    hash_fn = getattr(base, "__hash__")
    if hash_fn:
        break
else:
    hash_fn = object.__hash__
cls.__hash__ = hash_fn
This is required because Python doesn't provide a default hash method when __eq__ is implemented,
and won't fall back to a parent class's definition:
>>> class Base:
...     def __hash__(self):
...         print("Base.__hash__")
...         return 0
...
>>> class Derived(Base):
...     def __eq__(self, other):
...         return True
...
>>> hash(Base())
Base.__hash__
0
>>> hash(Derived())
TypeError: unhashable type: 'Derived'
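Putting the two pieces together, the sketch below wraps the lookup in a standalone function and applies it to a class shaped like Derived. It is an illustration of the approach rather than Bloop's exact code; in particular, it passes a default to the inner getattr call.

```python
def ensure_hash(cls):
    """Give cls the nearest inherited __hash__ if Python set it to None."""
    if getattr(cls, "__hash__", None) is not None:
        return
    for base in cls.__mro__:
        hash_fn = getattr(base, "__hash__", None)
        if hash_fn:
            break
    else:
        hash_fn = object.__hash__
    cls.__hash__ = hash_fn


class Base:
    def __hash__(self):
        return 0


class Derived(Base):
    # Defining __eq__ without __hash__ makes Python set Derived.__hash__ = None.
    def __eq__(self, other):
        return True


was_unhashable = Derived.__hash__ is None
ensure_hash(Derived)
restored = hash(Derived())
```

The walk over `cls.__mro__` starts at the class itself, whose `__hash__` is None and therefore skipped, so the first usable method comes from the closest ancestor that defines one.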
Stream Ordering Guarantees¶
The DynamoDB Streams API exposes a limited amount of temporal information and few options for navigating
within a shard. Due to these constraints, it was hard to reduce the API down to a single __next__ call
without compromising performance or ordering.
The major challenges described below include:
- Creating a plausible total ordering across shards
- Managing an iterator:
  - Refreshing expired iterators without data loss
  - Preventing low-volume iterators without sequence numbers from expiring
  - Promoting children when a shard runs out of records
  - Distinguishing open shards from gaps between records
- Managing multiple shards:
  - Mapping stream "trim_horizon" and "latest" to a set of shards
  - Buffering records from multiple shards and applying a total ordering
- Loading and saving tokens:
  - Simplifying an entire stream into a human-readable json blob
  - Pruning old shards when loading
  - Inserting new shards when loading
  - Resolving TrimmedDataAccessException for old shards
The following sections use a custom notation to describe shards and records.
Sn and Rn represent shards and records, where n is an integer:
R11, R13, R32 # In general, RnX comes from Sn
S1, S12, S23 # In general, SnX is a child of Sn
< represents chronological ordering between records:
R12 < R13 # In general, RX < RY when X < Y
=> represents parent/child relationships between shards:
S1 => {} # S1 has no children
S2 => S21 # S2 has one child
# In general, SnX and SnY are adjacent children of Sn
S3 => {S31, S32}
~ represents two shards that are not within the same lineage:
S1 ~ S2 # Not related
S1 => S12 => S13; S4 => S41
# Both child shards, but of different lineages
S12 ~ S41
: represents a set of records from a single shard:
S1: R11, R12 # no guaranteed order
S2: R23 < R24 # guaranteed order
Merging Shards¶
Low-throughput tables will only have a single open shard at any time, and can rely on the first and second guarantees above for rebuilding the exact order of changes to the table.
For high throughput tables, there can be more than one root shard, and each shard lineage can have more than one
child open at once. In this case, Bloop's streaming interface can't guarantee ordering for all records in the
stream, because there is no absolute chronological ordering across a partitioned table. Instead, Bloop will fall
back to a total ordering scheme that uses each record's ApproximateCreationDateTime and, when two records have
the same creation time, a monotonically increasing integral clock to break ties.
Consider the following stream:
S0 => {S1, S2}
S0: R00
S1: R11 < R12 < R13
S2: R24 < R25 < R26
Where each record has the following (simplified) creation times:
Record | ApproximateCreationDateTime
---|---
R00 | 7 hours ago
R11 | 6 hours ago
R12 | 4 hours ago
R13 | 2 hours ago
R24 | 4 hours ago
R25 | 3 hours ago
R26 | 3 hours ago
Bloop performs the following in one step:
- The second guarantee says all records in S0 are before records in that shard's children:
  R00 < (R11, R12, R13, R24, R25, R26)
- The first guarantee says all records in the same shard are ordered:
  R00 < ((R11 < R12 < R13), (R24 < R25 < R26))
- Then, ApproximateCreationDateTime is used to partially merge S1 and S2 records:
  R00 < R11 < (R12, R24) < (R25 < R26) < R13
- There were still two collisions after using ApproximateCreationDateTime: (R12, R24) and (R25, R26).
  - To resolve (R12, R24) Bloop breaks the tie with an incrementing clock, and assigns R12 < R24.
  - (R25, R26) is resolved because the records are in the same shard.
The final ordering is:
R00 < R11 < R12 < R24 < R25 < R26 < R13
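The merge above can be approximated with a heap keyed on creation time and an incrementing clock. This is a minimal sketch, not Bloop's actual buffer: it assumes every record is already in memory, it pushes records shard by shard so the clock follows that order, and it relies on creation time alone to place S0 before its children. The shards dict and the use of "hours ago" as a stand-in timestamp are illustrative assumptions.

```python
import heapq
import itertools

# Hypothetical data mirroring the table above: (record name, hours ago) per shard.
shards = {
    "S0": [("R00", 7)],
    "S1": [("R11", 6), ("R12", 4), ("R13", 2)],
    "S2": [("R24", 4), ("R25", 3), ("R26", 3)],
}

clock = itertools.count()  # monotonically increasing tie-breaker
heap = []
for shard_id, records in shards.items():
    for name, hours_ago in records:
        # A larger hours_ago means an older record, so negate it to pop oldest first.
        # The clock value breaks ties between records with the same creation time.
        heapq.heappush(heap, (-hours_ago, next(clock), name))

ordered = [heapq.heappop(heap)[2] for _ in range(len(heap))]
print(" < ".join(ordered))
```

Because each record receives its clock value when it is buffered, two records with the same creation time come out in the order they were pushed, which is how R12 ends up ahead of R24 in this sketch.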
Record Gaps¶
Bloop initially performs up to 5 "catch up" calls to GetRecords when advancing an iterator. If a GetRecords call
returns a NextShardIterator but no records, it's either due to being nearly caught up to "latest" in an open
shard, or from traversing a period of time in the shard with no activity. Endlessly polling until a record comes back
would cause every open shard to hang for up to 4 hours, while only calling GetRecords once could desynchronize one
shard's iterator from others.
By retrying up to 5 times on an empty GetRecords response (that still has a NextShardIterator) Bloop is confident that the iterator has advanced past any gaps in the shard. This is because it takes approximately 4-5 calls to traverse an empty shard completely. In other words, the 6th empty response almost certainly indicates that the iterator is caught up to latest in an open shard, and it's safe to cut back to one call at a time.
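The catch-up behavior described above can be sketched as a bounded loop. This is an illustration under stated assumptions, not Bloop's implementation: catch_up and make_fake_get_records are hypothetical helpers, and the fake stands in for a GetRecords call that returns a dict with "Records" and "NextShardIterator" keys.

```python
CALLS_TO_REACH_HEAD = 5  # mirrors the limit described above


def catch_up(get_records, iterator_id, max_calls=CALLS_TO_REACH_HEAD):
    """Call get_records until records arrive, the shard closes, or max_calls is hit.

    Returns (records, next_iterator_id, calls_made).
    """
    records = []
    calls = 0
    while calls < max_calls and iterator_id is not None:
        response = get_records(ShardIterator=iterator_id)
        calls += 1
        records = response.get("Records", [])
        # A missing NextShardIterator means the shard is closed and exhausted.
        iterator_id = response.get("NextShardIterator")
        if records:
            break
    return records, iterator_id, calls


def make_fake_get_records(empty_responses, final_records):
    """Hypothetical stand-in: returns empty pages first, then final_records."""
    state = {"n": 0}

    def get_records(ShardIterator):
        state["n"] += 1
        next_id = f"iter-{state['n']}"
        if state["n"] <= empty_responses:
            return {"Records": [], "NextShardIterator": next_id}
        return {"Records": final_records, "NextShardIterator": next_id}

    return get_records


# A gap of three empty responses is crossed within the limit.
gap_records, gap_iter, gap_calls = catch_up(make_fake_get_records(3, ["R1"]), "iter-0")
# An idle open shard stops after the limit instead of polling forever.
idle_records, idle_iter, idle_calls = catch_up(make_fake_get_records(100, ["R1"]), "iter-0")
```

In the first case the loop returns as soon as a record appears; in the second it gives up after the configured number of empty responses and keeps the latest iterator for the next call.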
Why only 5 calls?¶
This number came from extensive testing which compared the number of empty responses returned for shards with various activity cadences. It's reasonable to assume that this number would only decrease with time, as advances in software and hardware would enable DynamoDB to cover larger periods in time with the same time investment. Because each call from a customer incurs overhead of creating and indexing each new iterator id, as well as the usual expensive signature-based authentication, it's in DynamoDB's interest to minimize the number of calls a customer needs to traverse a sparsely populated shard.
At worst DynamoDB starts requiring more calls to fully traverse an empty shard, which could result in reordering between records in shards with vastly different activity patterns. Since the creation-time-based ordering is approximate, this doesn't relax the guarantees that Bloop's streaming interface provides.
Changing the Limit¶
In general you should not need to worry about this value and can leave it alone. In the unlikely case that DynamoDB does increase the number of calls required to traverse an empty shard, Bloop will be updated soon after.
If you still need to tune this value:
import bloop.stream.shard
bloop.stream.shard.CALLS_TO_REACH_HEAD = 5
The exact value of this parameter will have almost no impact on performance in high-activity streams, and there are so few shards in low-activity streams that the total cost will be on par with the other calls to set up the stream.