Source code for bloop.stream.stream

from ..models import unpack_from_dynamodb
from ..signals import object_loaded
from .coordinator import Coordinator


[docs]class Stream: """Iterator over all records in a stream. :param model: The model to stream records from. :param engine: The engine to load model objects through. :type engine: :class:`~bloop.engine.Engine` """ def __init__(self, *, model, engine): self.model = model self.engine = engine self.coordinator = Coordinator( session=engine.session, stream_arn=model.Meta.stream["arn"]) def __repr__(self): # <Stream[User]> return "<{}[{}]>".format(self.__class__.__name__, self.model.__name__) def __iter__(self): return self def __next__(self): record = next(self.coordinator) if record: meta = self.model.Meta for key, expected in [("new", meta.columns), ("old", meta.columns), ("key", meta.keys)]: if key not in meta.stream["include"]: record[key] = None else: self._unpack(record, key, expected) return record
[docs] def heartbeat(self): """Refresh iterators without sequence numbers so they don't expire. Call this at least every 14 minutes. """ self.coordinator.heartbeat()
[docs] def move_to(self, position): """Move the Stream to a specific endpoint or time, or load state from a token. Moving to an endpoint with "trim_horizon" or "latest" and loading from a previous token are both very efficient. In contrast, seeking to a specific time requires iterating **all records in the stream up to that time**. This can be **very expensive**. Once you have moved a stream to a time, you should save the :attr:`Stream.token <bloop.stream.stream.Stream.token>` so reloading will be extremely fast. :param position: "trim_horizon", "latest", :class:`~datetime.datetime`, or a :attr:`Stream.token <bloop.stream.stream.Stream.token>` """ self.coordinator.move_to(position)
@property def token(self): """JSON-serializable representation of the current Stream state. Use :func:`Engine.stream(YourModel, token) <bloop.engine.Engine.stream>` to create an identical stream, or :func:`stream.move_to(token) <bloop.stream.Stream.move_to>` to move an existing stream to this position. :returns: Stream state as a json-friendly dict :rtype: dict """ return self.coordinator.token def _unpack(self, record, key, expected): """Replaces the attr dict at the given key with an instance of a Model""" attrs = record.get(key) if attrs is None: return obj = unpack_from_dynamodb( attrs=attrs, expected=expected, model=self.model, engine=self.engine ) object_loaded.send(self.engine, engine=self.engine, obj=obj) record[key] = obj