# Source code for the stream Coordinator: shard-level management for a whole DynamoDB Stream.

import collections
import collections.abc
import datetime
import heapq
import logging
from typing import Dict, List

from ..exceptions import InvalidPosition, InvalidStream, RecordsExpired
from .buffer import RecordBuffer
from .shard import Shard, unpack_shards

# Module-level logger named after this module.  getLogger("") would return the *root* logger,
# which makes this library's messages impossible to configure or silence separately.
logger = logging.getLogger(__name__)

class Coordinator:
    """Encapsulates the shard-level management for a whole Stream.

    :param session: Used to make DynamoDBStreams calls.
    :type session: :class:`~bloop.session.SessionWrapper`
    :param str stream_arn: Stream arn, usually taken from the model's stream configuration
        (its ``"arn"`` entry).
    """
    def __init__(self, *, session, stream_arn):
        self.session = session

        # The stream that's being coordinated
        self.stream_arn = stream_arn

        # The oldest shards in each shard tree (no parents)
        self.roots = []

        # Shards being iterated right now
        self.active: List[Shard] = []

        # Closed shards that still have buffered records
        # shard -> buffered count
        self.closed: Dict[Shard, int] = {}

        # Single buffer for the lifetime of the Coordinator, but mutates frequently.
        # Records in the buffer aren't considered read.  When a Record popped from the buffer is
        # consumed, the Coordinator MUST notify the Shard by updating the sequence_number and
        # iterator_type.  The new values are (see __next__):
        #   shard.sequence_number = record["meta"]["sequence_number"]
        #   shard.iterator_type = "after_sequence"
        # Holds records from advancing all active shard iterators.
        # Shards aren't advanced again until the buffer drains completely.
        self.buffer = RecordBuffer()

    def __repr__(self):
        # <Coordinator[.../StreamCreation-travis-661.2/stream/2016-10-03T06:17:12.741]>
        return "<{}[{}]>".format(self.__class__.__name__, self.stream_arn)

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next buffered record, polling the active shards first if the buffer is empty.

        Returns ``None`` (rather than raising :exc:`StopIteration`) when no record is available.
        """
        if not self.buffer:
            self.advance_shards()

        if self.buffer:
            record, shard = self.buffer.pop()

            # Now that the record is "consumed", advance the shard's checkpoint
            shard.sequence_number = record["meta"]["sequence_number"]
            shard.iterator_type = "after_sequence"
            # A closed shard is only tracked until its last buffered record is consumed
            if shard in self.closed:
                self.closed[shard] -= 1
                if self.closed[shard] == 0:
                    self.closed.pop(shard)
            return record

        # No records :(
        return None

    def advance_shards(self):
        """Poll active shards for records and insert them into the buffer.  Rotate exhausted shards.

        Returns immediately if the buffer isn't empty.
        """
        # Don't poll shards when there are pending records.
        if self.buffer:
            return

        # 0) Collect new records from all active shards.
        record_shard_pairs = []
        for shard in self.active:
            records = next(shard)
            if records:
                record_shard_pairs.extend((record, shard) for record in records)
        self.buffer.push_all(record_shard_pairs)

        self.migrate_closed_shards()

    def heartbeat(self):
        """Keep active shards with "trim_horizon", "latest" iterators alive by advancing their iterators."""
        for shard in self.active:
            if shard.sequence_number is None:
                records = next(shard)
                # Success!  This shard now has an ``at_sequence`` iterator
                if records:
                    self.buffer.push_all((record, shard) for record in records)
        self.migrate_closed_shards()

    def migrate_closed_shards(self):
        """Replace exhausted active shards with their children.

        Each exhausted shard is removed (promoting its children into its roles) and every child is
        moved to ``trim_horizon``.  A removed shard that still has records in the buffer is kept in
        :attr:`closed` with its buffered-record count, so it remains part of :attr:`token` until
        those records are consumed.
        """
        # 1) Clean up exhausted Shards.  Collect them first: remove_shard mutates self.active.
        #    dict.fromkeys de-duplicates like a set but keeps self.active's order, so children are
        #    promoted deterministically.
        to_migrate = list(dict.fromkeys(shard for shard in self.active if shard.exhausted))

        # Early exit since this method will iterate the entire buffer.
        if not to_migrate:
            return

        # Build the count once, rather than look for each shard as we process it below.
        # Each heap entry is (ordering, record, shard).
        buffered_count = collections.Counter(shard for *_, shard in self.buffer.heap)

        for shard in to_migrate:
            shard.load_children()
            # This call also promotes children to the shard's previous roles
            self.remove_shard(shard)
            for child in shard.children:
                child.jump_to(iterator_type="trim_horizon")
            # May still need to track this shard
            if shard in buffered_count:
                self.closed[shard] = buffered_count[shard]

    @property
    def token(self):
        """JSON-serializable representation of the current Stream state.

        Pass this token to :meth:`move_to` to move a Coordinator back to this exact position.

        :returns: Stream state as a json-friendly dict
        :rtype: dict
        """
        # 0) Trace roots and active shards
        active_ids = []
        shard_tokens = []
        for root in self.roots:
            for shard in root.walk_tree():
                shard_tokens.append(shard.token)
                # dedupe, stream_arn will be in the root token
                shard_tokens[-1].pop("stream_arn")
        active_ids.extend(shard.shard_id for shard in self.active)

        # 1) Inject closed shards.  They're reported as active so that restoring the token
        #    resumes them after their last consumed record and re-reads the unconsumed ones.
        for shard in self.closed:
            active_ids.append(shard.shard_id)
            shard_tokens.append(shard.token)
            shard_tokens[-1].pop("stream_arn")

        return {
            "stream_arn": self.stream_arn,
            "active": active_ids,
            "shards": shard_tokens
        }

    def remove_shard(self, shard, drop_buffered_records=False):
        """Remove a Shard from the Coordinator.

        If the Shard is active or a root, it is removed and any children promoted to those roles.

        :param shard: The shard to remove
        :type shard: :class:`Shard`
        :param bool drop_buffered_records:
            Whether buffered records from this shard should also be removed (and the shard no
            longer tracked as closed).  Default is False.
        """
        try:
            self.roots.remove(shard)
        except ValueError:
            # Wasn't a root Shard
            pass
        else:
            self.roots.extend(shard.children)

        try:
            self.active.remove(shard)
        except ValueError:
            # Wasn't an active Shard
            pass
        else:
            self.active.extend(shard.children)

        if drop_buffered_records:
            # Each heap entry is (ordering, record, shard).  Filter in one O(n) pass and rebuild
            # the *same* list object in place (the buffer keeps a reference to it).  Calling
            # list.remove() on arbitrary entries was O(n) per removal and broke the heap
            # invariant, so later pops could return records out of order.
            # NOTE(review): assumes RecordBuffer maintains ``heap`` with heapq.
            heap = self.buffer.heap
            heap[:] = [entry for entry in heap if entry[2] is not shard]
            heapq.heapify(heap)
            # No buffered records remain for this shard, so a closed-shard count would be stale.
            self.closed.pop(shard, None)

    def move_to(self, position):
        """Set the Coordinator to a specific endpoint or time, or load state from a token.

        :param position: "trim_horizon", "latest" (any casing), :class:`~datetime.datetime`,
            or a :attr:`Coordinator.token`
        :raises InvalidPosition: if ``position`` is none of the above.
        """
        if isinstance(position, collections.abc.Mapping):
            move = _move_stream_token
        elif hasattr(position, "timestamp") and callable(position.timestamp):
            move = _move_stream_time
        elif isinstance(position, str) and position.lower() in ["latest", "trim_horizon"]:
            move = _move_stream_endpoint
            # The endpoint check is a case-sensitive comparison; normalize so that e.g.
            # "TRIM_HORIZON" doesn't silently fall through to "latest".
            position = position.lower()
        else:
            raise InvalidPosition("Don't know how to move to position {!r}".format(position))
        move(self, position)
def _move_stream_endpoint(coordinator, position):
    """Move to the "trim_horizon" or "latest" of the entire stream.

    All of the coordinator's shard state (roots, active and closed shards) and its buffered
    records are discarded and rebuilt from a DescribeStream call.

    :param coordinator: The Coordinator to move.
    :param str position: ``"trim_horizon"`` (any casing); any other value moves to ``"latest"``.
    """
    # 0) Everything will be rebuilt from DescribeStream.
    stream_arn = coordinator.stream_arn
    coordinator.roots.clear()
    coordinator.active.clear()
    # The buffer is emptied below, so no closed shard has buffered records left to wait on.
    # Without this, stale closed shards would keep being reported by Coordinator.token.
    coordinator.closed.clear()
    coordinator.buffer.clear()

    # 1) Build a Dict[str, Shard] of the current Stream from a DescribeStream call
    current_shards = coordinator.session.describe_stream(stream_arn=stream_arn)["Shards"]
    current_shards = unpack_shards(current_shards, stream_arn, coordinator.session)

    # 2) Roots are any shards without parents.
    coordinator.roots.extend(shard for shard in current_shards.values() if not shard.parent)

    # 3.0) Stream trim_horizon is the combined trim_horizon of all roots.
    #      Case-insensitive, matching the endpoint names Coordinator.move_to accepts.
    if position.lower() == "trim_horizon":
        for shard in coordinator.roots:
            shard.jump_to(iterator_type="trim_horizon")
        coordinator.active.extend(coordinator.roots)
    # 3.1) Stream latest is the combined latest of all shards without children.
    else:
        for root in coordinator.roots:
            for shard in root.walk_tree():
                if not shard.children:
                    shard.jump_to(iterator_type="latest")
                    coordinator.active.append(shard)


def _move_stream_time(coordinator, time):
    """Scan through the *entire* Stream for the first record after ``time``.

    This is an extremely expensive, naive algorithm that starts at trim_horizon and simply dumps
    records into the void until the first hit.  General improvements in performance are tough;
    we can use the fact that Shards have a max life of 24hr to pick a pretty-good starting point
    for any Shard trees with 6 generations.  Even then we can't know how close the oldest one is
    to rolling off so we either hit trim_horizon, or iterate an extra Shard more than we need to.

    The corner cases are worse; short trees, recent splits, trees with different branch heights.

    ``time`` is compared against the current UTC time, so it must be timezone-aware; a naive
    datetime makes that comparison raise :exc:`TypeError`.  A time in the future moves the
    stream to "latest".
    """
    if time > datetime.datetime.now(datetime.timezone.utc):
        _move_stream_endpoint(coordinator, "latest")
        return

    _move_stream_endpoint(coordinator, "trim_horizon")
    shard_trees = collections.deque(coordinator.roots)
    while shard_trees:
        shard = shard_trees.popleft()
        records = shard.seek_to(time)

        # Success!  This section of some Shard tree is at the desired time.
        if records:
            coordinator.buffer.push_all((record, shard) for record in records)

        # Closed shard, keep searching its children.
        elif shard.exhausted:
            coordinator.remove_shard(shard, drop_buffered_records=True)
            shard_trees.extend(shard.children)


def _move_stream_token(coordinator, token):
    """Move to the Stream position described by the token.

    The following rules are applied when interpolation is required:

    - If a shard does not exist (past the trim_horizon) it is ignored.  If that shard had
      children, its children are also checked against the existing shards.
    - If none of the shards in the token exist, then InvalidStream is raised.
    - If a Shard expects its iterator to point to a SequenceNumber that is now past that
      Shard's trim_horizon, the Shard instead points to trim_horizon.
    """
    stream_arn = coordinator.stream_arn = token["stream_arn"]
    # 0) Everything will be rebuilt from the DescribeStream masked by the token.
    coordinator.roots.clear()
    coordinator.active.clear()
    coordinator.closed.clear()
    coordinator.buffer.clear()

    # Injecting the token gives us access to the standard shard management functions
    token_shards = unpack_shards(token["shards"], stream_arn, coordinator.session)
    coordinator.roots = [shard for shard in token_shards.values() if not shard.parent]
    coordinator.active.extend(token_shards[shard_id] for shard_id in token["active"])

    # 1) Build a Dict[str, Shard] of the current Stream from a DescribeStream call
    current_shards = coordinator.session.describe_stream(stream_arn=stream_arn)["Shards"]
    current_shards = unpack_shards(current_shards, stream_arn, coordinator.session)

    # 2) Trying to find an intersection with the actual Stream by walking each root shard's tree.
    #    Prune any Shard with no children that's not part of the actual Stream.
    #    Raise InvalidStream if the entire token is pruned.
    unverified = collections.deque(coordinator.roots)
    while unverified:
        shard = unverified.popleft()
        if shard.shard_id not in current_shards:
            logger.info("Unknown or expired shard \"{}\" - pruning from stream token".format(shard.shard_id))
            # Also promotes the pruned shard's children into its root/active roles
            coordinator.remove_shard(shard, drop_buffered_records=True)
            unverified.extend(shard.children)

    # 3) Everything was pruned, so the token describes an unknown stream.
    if not coordinator.roots:
        raise InvalidStream("This token has no relation to the actual Stream.")

    # 4) Now that everything's verified, grab new iterators for the coordinator's active Shards.
    for shard in coordinator.active:
        try:
            if shard.iterator_type is None:
                # Descendant of an unknown shard
                shard.iterator_type = "trim_horizon"
            # Move back to the token's specified position
            shard.jump_to(iterator_type=shard.iterator_type, sequence_number=shard.sequence_number)
        except RecordsExpired:
            # This token shard's sequence_number is beyond the trim_horizon.
            # The next closest record is at trim_horizon.
            msg = "SequenceNumber \"{}\" in shard \"{}\" beyond trim horizon: jumping to trim_horizon"
            logger.info(msg.format(shard.sequence_number, shard.shard_id))
            shard.jump_to(iterator_type="trim_horizon")