# Source code for bloop.stream.buffer

import heapq


def heap_item(clock, record, shard):
    """Create a ``(total_ordering, record, shard)`` entry for use in a RecordBuffer.

    :param clock: zero-argument callable used as the final tie-breaker; it is
        called exactly once per entry and should return a value larger than
        every previous call (e.g. :meth:`RecordBuffer.clock`).
    :param dict record: record exposing ``record["meta"]["created_at"]`` and
        ``record["meta"]["sequence_number"]``.
    :param shard: the shard the record came from; carried through unchanged.
    :return: flat 3-tuple ``(total_ordering, record, shard)`` where
        ``total_ordering`` is ``(created_at, int(sequence_number), clock())``.
    """
    meta = record["meta"]
    # Primary ordering is by event creation time.
    # However, creation time is *approximate* and has whole-second resolution.
    # This means two events in the same shard within one second can't be ordered.
    ordering = meta["created_at"]
    # From testing, SequenceNumber isn't a guaranteed ordering either.  However,
    # it is guaranteed to be unique within a shard.  This will be tie-breaker
    # for multiple records within the same shard, within the same second.
    second_ordering = int(meta["sequence_number"])
    # It's possible though unlikely, that sequence numbers will collide across
    # multiple shards, within the same second.  The final tie-breaker is
    # a monotonically increasing integer from the buffer.
    total_ordering = (ordering, second_ordering, clock())
    # Flat tuple (not nested): RecordBuffer.pop/peek strip the ordering with [1:].
    return total_ordering, record, shard


class RecordBuffer:
    """Maintains a total ordering for records across any number of shards.

    Methods are thin wrappers around :mod:`heapq`.  Buffer entries have the form:

    .. code-block:: python

        (total_ordering, record, shard)

    where ``total_ordering`` is a tuple of ``(created_at, sequence_number, monotonic_clock)``
    created from each record as it is inserted.  The clock component is strictly
    increasing within a buffer, so no two entries share a ``total_ordering`` and heap
    comparisons never fall through to the ``record`` or ``shard`` elements.
    """

    def __init__(self):
        self.heap = []
        # Used by the total ordering clock
        self.__monotonic_integer = 0

    def push(self, record, shard):
        """Push a new record into the buffer.

        :param dict record: new record
        :param shard: Shard the record came from
        :type shard: :class:`~bloop.stream.shard.Shard`
        """
        heapq.heappush(self.heap, heap_item(self.clock, record, shard))

    def push_all(self, record_shard_pairs):
        """Push multiple (record, shard) pairs at once, with only one :func:`heapq.heapify`
        call to maintain order.

        :param record_shard_pairs: iterable of ``(record, shard)`` tuples
            (see :func:`~bloop.stream.buffer.RecordBuffer.push`).
        """
        # Append every entry first, then restore the heap invariant once,
        # instead of sifting after each individual insert.
        self.heap.extend(
            heap_item(self.clock, record, shard)
            for record, shard in record_shard_pairs
        )
        heapq.heapify(self.heap)

    def pop(self):
        """Pop the oldest (lowest total ordering) record and the shard it came from.

        :return: Oldest ``(record, shard)`` tuple.
        :raises IndexError: if the buffer is empty.
        """
        # Drop the ordering key; callers only see (record, shard).
        return heapq.heappop(self.heap)[1:]

    def peek(self):
        """A :func:`~bloop.stream.buffer.RecordBuffer.pop` without removing the
        (record, shard) from the buffer.

        :return: Oldest ``(record, shard)`` tuple.
        :raises IndexError: if the buffer is empty.
        """
        # heap[0] is always the smallest entry under the heapq invariant.
        return self.heap[0][1:]

    def clear(self):
        """Drop the entire buffer."""
        self.heap.clear()

    def __len__(self):
        """Number of (record, shard) entries currently buffered."""
        return len(self.heap)

    def clock(self):
        """Returns a strictly increasing integer.

        **Do not rely on the clock using a fixed increment.**  The values below
        are illustrative only:

        .. code-block:: python

            >>> buffer = RecordBuffer()
            >>> buffer.clock()
            3
            >>> buffer.clock()
            40
            >>> buffer.clock()
            41
            >>> buffer.clock()
            300

        :return: A unique clock value guaranteed to be larger than every previous value
        :rtype: int
        """
        # Try to prevent collisions from someone accessing the underlying int.
        # This offset ensures _RecordBuffer__monotonic_integer will never have
        # the same value as any call to clock().
        value = self.__monotonic_integer + 1
        self.__monotonic_integer += 2
        return value