storage Package

base Module

class stream_framework.storage.base.BaseActivityStorage(serializer_class=None, activity_class=None, **options)

Bases: stream_framework.storage.base.BaseStorage

The activity storage globally stores a key-value mapping between an activity_id and the actual activity object.

Example:

storage = BaseActivityStorage()
storage.add_many(activities)
storage.get_many(activity_ids)

The storage-specific functions are located in:

  • add_to_storage
  • get_from_storage
  • remove_from_storage
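The split between the public methods and the storage-specific hooks can be illustrated with a small, self-contained sketch. It is not stream_framework’s code: SketchActivityStorage, DictActivityStorage and the FakeActivity namedtuple are hypothetical, and json stands in for the configured serializer. The base class serializes and deserializes; the subclass only implements the three hooks listed above.

```python
import json
from collections import namedtuple

# Hypothetical stand-in for an activity; only `id` and `verb` are used here.
FakeActivity = namedtuple("FakeActivity", ["id", "verb"])


class SketchActivityStorage:
    """Hypothetical base class: public methods serialize, hooks do the I/O."""

    def add_many(self, activities):
        # Serialize first, then hand {id: serialized_activity} to the hook.
        serialized = {a.id: json.dumps(a._asdict()) for a in activities}
        return self.add_to_storage(serialized)

    def get_many(self, activity_ids):
        # The hook returns {id: serialized_activity}; deserialize each value.
        found = self.get_from_storage(activity_ids)
        return [FakeActivity(**json.loads(found[i])) for i in activity_ids if i in found]

    def remove_many(self, activities):
        # Work out the ids, then forward the removal to the hook.
        return self.remove_from_storage([a.id for a in activities])

    # Storage-specific hooks that a concrete backend must implement.
    def add_to_storage(self, serialized_activities):
        raise NotImplementedError

    def get_from_storage(self, activity_ids):
        raise NotImplementedError

    def remove_from_storage(self, activity_ids):
        raise NotImplementedError


class DictActivityStorage(SketchActivityStorage):
    """Hypothetical backend that keeps serialized activities in a plain dict."""

    def __init__(self):
        self._data = {}

    def add_to_storage(self, serialized_activities):
        self._data.update(serialized_activities)
        return len(serialized_activities)

    def get_from_storage(self, activity_ids):
        return {i: self._data[i] for i in activity_ids if i in self._data}

    def remove_from_storage(self, activity_ids):
        removed = [i for i in activity_ids if i in self._data]
        for i in removed:
            del self._data[i]
        return len(removed)


storage = DictActivityStorage()
storage.add_many([FakeActivity(1, "like"), FakeActivity(2, "follow")])
print(storage.get_many([1, 2]))  # [FakeActivity(id=1, verb='like'), FakeActivity(id=2, verb='follow')]
storage.remove_many([FakeActivity(1, "like")])
print(storage.get_many([1, 2]))  # [FakeActivity(id=2, verb='follow')]
```

In this pattern a new backend only supplies the three hooks, while the serialization logic stays in one place.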
add(activity, *args, **kwargs)
add_many(activities, *args, **kwargs)

Serializes the given activities and forwards them to add_to_storage.

Parameters: activities – the list of activities
add_to_storage(serialized_activities, *args, **kwargs)

Adds the serialized activities to the storage layer.

Parameters: serialized_activities – a dictionary mapping activity ids to serialized activities ({id: serialized_activity})
get(activity_id, *args, **kwargs)
get_from_storage(activity_ids, *args, **kwargs)

Retrieves the given activities from the storage layer.

Parameters: activity_ids – the list of activity ids
Returns: dict – a dictionary mapping activity ids to activities
get_many(activity_ids, *args, **kwargs)

Retrieves the given activities from storage and deserializes them.

Parameters: activity_ids – the list of activity ids
remove(activity, *args, **kwargs)
remove_from_storage(activity_ids, *args, **kwargs)

Removes the specified activities.

Parameters: activity_ids – the list of activity ids
remove_many(activities, *args, **kwargs)

Determines the ids of the given activities and forwards the removal to remove_from_storage.

Parameters: activities – the list of activities
class stream_framework.storage.base.BaseStorage(serializer_class=None, activity_class=None, **options)

Bases: object

The feed uses two storage classes:

  • Activity Storage
  • Timeline Storage

The process works as follows:

feed = BaseFeed()
# the activity storage stores the activity, keyed by its id
feed.insert_activity(activity)
# now the id is inserted into the timeline storage
feed.add(activity)
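The two-step process above can be sketched with plain Python containers. In this hypothetical sketch, activity_storage plays the role of the activity storage (id → activity) and timeline_storage the role of the timeline storage (feed key → ids, newest first); the function names mirror the example but are not stream_framework’s implementation.

```python
from collections import defaultdict

# Hypothetical shared stores: one activity storage, one timeline storage.
activity_storage = {}                  # activity id -> activity
timeline_storage = defaultdict(list)   # feed key -> activity ids, newest first


def insert_activity(activity_id, activity):
    # The activity storage keeps the full activity once, keyed by its id.
    activity_storage[activity_id] = activity


def add(feed_key, activity_id):
    # The timeline storage only records the id at the front of the feed.
    timeline_storage[feed_key].insert(0, activity_id)


def read_feed(feed_key, start, stop):
    # Reading a feed resolves the stored ids back into full activities.
    return [activity_storage[i] for i in timeline_storage[feed_key][start:stop]]


insert_activity(1, "alice liked a photo")
insert_activity(2, "bob followed carol")
for feed_key in ("feed:alice", "feed:dave"):  # fan-out: same ids, two feeds
    add(feed_key, 1)
    add(feed_key, 2)
print(read_feed("feed:dave", 0, 10))  # ['bob followed carol', 'alice liked a photo']
```

Each activity is stored once, while any number of feeds refer to it by id.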

Currently there are two activity storage classes ready for production:

  • Cassandra
  • Redis

The storage classes always receive a full activity object. The serializer class subsequently determines how to transform the activity into something the database can store.

activities_to_ids(activities_or_ids)

Utility function for lower levels that converts a list of activities or activity ids into a list of ids.
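A minimal sketch of the two helpers, assuming (as the parameter name activities_or_ids suggests) that values which are already ids pass through unchanged. The serialization_id attribute name and the FakeActivity class are assumptions made for illustration.

```python
class FakeActivity:
    """Hypothetical activity that exposes the id used for storage."""

    def __init__(self, serialization_id):
        self.serialization_id = serialization_id


def activity_to_id(activity):
    # Activities yield their id; anything without the attribute is assumed to be an id already.
    return getattr(activity, "serialization_id", activity)


def activities_to_ids(activities_or_ids):
    # Normalise a mixed list of activities and ids into a list of ids.
    return [activity_to_id(item) for item in activities_or_ids]


print(activities_to_ids([FakeActivity(13), 42, FakeActivity(7)]))  # [13, 42, 7]
```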

activity_class

alias of Activity

activity_to_id(activity)
aggregated_activity_class

alias of AggregatedActivity

default_serializer_class

The default serializer class to use

alias of DummySerializer

deserialize_activities(serialized_activities)

Deserializes the given serialized activities.

Parameters: serialized_activities – either a list of serialized activities or a dictionary mapping activity ids to serialized activities
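Because deserialize_activities accepts either form, the dictionary case can simply fall back to its values. A hypothetical sketch, with json.loads standing in for the configured serializer:

```python
import json


def deserialize_activities(serialized_activities, loads=json.loads):
    """Hypothetical sketch: accept a list or an {id: serialized} dict."""
    if isinstance(serialized_activities, dict):
        # Only the serialized values matter here; the ids are dropped.
        serialized_activities = serialized_activities.values()
    if serialized_activities is None:
        return []
    return [loads(item) for item in serialized_activities]


as_list = deserialize_activities(['{"id": 1}', '{"id": 2}'])
as_dict = deserialize_activities({1: '{"id": 1}', 2: '{"id": 2}'})
print(as_list == as_dict)  # True
```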
flush()

Flushes the entire storage

metrics = <stream_framework.metrics.base.Metrics object>
serialize_activities(activities)

Serializes the list of activities.

Parameters: activities – the list of activities
serialize_activity(activity)

Serializes the activity and returns the serialized activity.

Returns: str – the serialized activity
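A plausible shape for the output of serialize_activities is a dictionary keyed by activity id, matching the {id: serialized_activity} argument that add_to_storage expects. A hypothetical sketch, with json.dumps standing in for serialize_activity and a FakeActivity namedtuple standing in for a real activity:

```python
import json
from collections import namedtuple

# Hypothetical activity with an id and a payload.
FakeActivity = namedtuple("FakeActivity", ["id", "verb"])


def serialize_activity(activity):
    # json.dumps stands in for the configured serializer; the result is a str.
    return json.dumps(activity._asdict(), sort_keys=True)


def serialize_activities(activities):
    # Key each serialized activity by its id, the shape add_to_storage receives.
    return {activity.id: serialize_activity(activity) for activity in activities}


print(serialize_activities([FakeActivity(1, "like")]))
# {1: '{"id": 1, "verb": "like"}'}
```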
serializer

Returns an instance of the serializer class.

The serializer needs to know which activity and aggregated activity classes are in use.
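One way to express this is a property that builds the serializer and passes it the activity classes. All class names below (SketchSerializer, SketchActivity, SketchAggregatedActivity, SketchStorage) are hypothetical stand-ins; only the constructor parameters serializer_class and activity_class come from the class signatures documented above.

```python
class SketchActivity:
    """Hypothetical activity class."""


class SketchAggregatedActivity:
    """Hypothetical aggregated activity class."""


class SketchSerializer:
    """Hypothetical serializer that records which classes it was given."""

    def __init__(self, activity_class, aggregated_activity_class=None):
        self.activity_class = activity_class
        self.aggregated_activity_class = aggregated_activity_class


class SketchStorage:
    default_serializer_class = SketchSerializer
    activity_class = SketchActivity
    aggregated_activity_class = SketchAggregatedActivity

    def __init__(self, serializer_class=None, activity_class=None, **options):
        # Fall back to the class-level defaults when nothing is passed in.
        self.serializer_class = serializer_class or self.default_serializer_class
        if activity_class is not None:
            self.activity_class = activity_class
        self.options = options

    @property
    def serializer(self):
        # Build a serializer that knows which activity classes are in use.
        return self.serializer_class(
            activity_class=self.activity_class,
            aggregated_activity_class=self.aggregated_activity_class,
        )


storage = SketchStorage()
print(type(storage.serializer).__name__)  # SketchSerializer
```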

class stream_framework.storage.base.BaseTimelineStorage(serializer_class=None, activity_class=None, **options)

Bases: stream_framework.storage.base.BaseStorage

The Timeline storage class handles the sorted, per-key timeline part of storing a feed.

Example:

storage = BaseTimelineStorage()
storage.add_many(key, activities)
# get a sorted slice of the feed
storage.get_slice(key, start, stop)
storage.remove_many(key, activities)

The storage-specific functions are located in:

  • add_to_storage
  • get_slice_from_storage
  • remove_from_storage
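The sorted-timeline behaviour in the example above can be approximated in memory. This sketch is hypothetical: SketchTimelineStorage stores bare activity ids rather than serialized activities, and it assumes that a larger id means a newer activity, so each feed is kept in descending order.

```python
from collections import defaultdict


class SketchTimelineStorage:
    """Hypothetical timeline storage: per-key lists of ids kept newest first."""

    def __init__(self):
        # Each feed key maps to a list of activity ids sorted in descending order.
        self._timelines = defaultdict(list)

    def add_many(self, key, activity_ids):
        # Merge the new ids, drop duplicates and keep the list sorted newest first.
        merged = set(self._timelines[key]) | set(activity_ids)
        self._timelines[key] = sorted(merged, reverse=True)

    def get_slice(self, key, start, stop):
        # Python slice semantics: start is inclusive, stop is exclusive.
        return self._timelines[key][start:stop]

    def remove_many(self, key, activity_ids):
        to_remove = set(activity_ids)
        self._timelines[key] = [i for i in self._timelines[key] if i not in to_remove]

    def count(self, key):
        return len(self._timelines[key])


storage = SketchTimelineStorage()
storage.add_many("feed:1", [101, 103, 102])
print(storage.get_slice("feed:1", 0, 2))  # [103, 102]
storage.remove_many("feed:1", [103])
print(storage.get_slice("feed:1", 0, 2))  # [102, 101]
```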

add(key, activity, *args, **kwargs)
add_many(key, activities, *args, **kwargs)

Adds the activities to the feed stored at the given key. Serialization is handled by the serializer class.

Parameters:
  • key – the key at which the feed is stored
  • activities – the activities to store
count(key, *args, **kwargs)
default_serializer_class

alias of SimpleTimelineSerializer

delete(key, *args, **kwargs)
get_batch_interface()

Returns a context manager which ensures that all subsequent operations happen via a batch interface.

An example is redis.map.
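The batching idea can be sketched with contextlib.contextmanager: while the with block is open, writes are queued, and they are applied in a single call when it exits. SketchBatchedStore and its round_trips counter are hypothetical; a real backend would rely on its client’s own batching support, such as the redis.map example mentioned above.

```python
from contextlib import contextmanager


class SketchBatchedStore:
    """Hypothetical store that queues writes while a batch is open."""

    def __init__(self):
        self.data = {}
        self.round_trips = 0   # number of simulated calls to the backend
        self._pending = None   # queued writes while a batch is open

    def set(self, key, value):
        if self._pending is not None:
            self._pending.append((key, value))  # inside a batch: queue it
        else:
            self._apply([(key, value)])         # outside a batch: write now

    def _apply(self, operations):
        # One call per batch stands in for a single pipelined round trip.
        self.round_trips += 1
        for key, value in operations:
            self.data[key] = value

    @contextmanager
    def get_batch_interface(self):
        self._pending = []
        try:
            yield self
            operations = self._pending
        finally:
            # Always leave batch mode; on error the queued writes are dropped.
            self._pending = None
        if operations:
            self._apply(operations)


store = SketchBatchedStore()
with store.get_batch_interface() as batch:
    for i in range(3):
        batch.set(f"feed:{i}", [i])
print(store.round_trips)  # 1
```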

get_index_of(key, activity_id)
get_slice(key, start, stop, filter_kwargs=None, ordering_args=None)

Returns a sorted slice from the storage.

Parameters: key – the key at which the feed is stored
get_slice_from_storage(key, start, stop, filter_kwargs=None, ordering_args=None)
Parameters:
  • key – the key at which the feed is stored
  • start – the index at which the slice starts
  • stop – the index at which the slice stops
Returns: list – a list of (key, value) tuples
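Since get_slice_from_storage returns (key, value) tuples, the public get_slice presumably keeps only the values and deserializes them. A hypothetical sketch, with json.loads standing in for the serializer and a plain list of tuples standing in for a stored feed:

```python
import json


def get_slice_from_storage(timeline, start, stop):
    """Hypothetical backend hook: return (key, value) pairs for the slice."""
    return timeline[start:stop]


def get_slice(timeline, start, stop, loads=json.loads):
    """Sketch of the public wrapper: keep the values and deserialize them."""
    pairs = get_slice_from_storage(timeline, start, stop)
    # Each pair is (key, serialized_activity); only the value is deserialized.
    return [loads(value) for _key, value in pairs]


timeline = [(3, '{"id": 3}'), (2, '{"id": 2}'), (1, '{"id": 1}')]
print(get_slice(timeline, 0, 2))  # [{'id': 3}, {'id': 2}]
```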

index_of(key, activity_or_id)

Returns the activity’s index within a feed, or raises ValueError if it is not present.

Parameters:
  • key – the key at which the feed is stored
  • activity_or_id – the activity, or the activity’s id, to search for
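The ValueError contract of index_of maps naturally onto Python’s list.index, which raises ValueError for a missing element. A hypothetical in-memory sketch; the serialization_id attribute used to recognise activities is an assumption.

```python
def get_index_of(timeline, activity_id):
    # list.index already raises ValueError when the id is missing.
    return timeline.index(activity_id)


def index_of(timeline, activity_or_id):
    # Accept an activity or a bare id (the `serialization_id` attribute is hypothetical).
    activity_id = getattr(activity_or_id, "serialization_id", activity_or_id)
    return get_index_of(timeline, activity_id)


timeline = [30, 20, 10]  # newest first
print(index_of(timeline, 20))  # 1
try:
    index_of(timeline, 99)
except ValueError:
    print("not present")
```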
remove(key, activity, *args, **kwargs)
remove_from_storage(key, serialized_activities)
remove_many(key, activities, *args, **kwargs)

Removes the activities from the feed stored at the given key. Serialization is handled by the serializer class.

Parameters:
  • key – the key at which the feed is stored
  • activities – the activities to remove
trim(key, length)

Trims the feed to the given length.

Parameters:
  • key – the key at which the feed is stored
  • length – the length to which to trim
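Which end of the feed trim keeps depends on the sort order. Assuming a feed is stored newest first, trimming keeps the head of the list. The function below is a hypothetical illustration operating on a plain dict of lists.

```python
def trim(timelines, key, length):
    """Hypothetical sketch: keep only the newest `length` entries of a feed."""
    # The timeline is assumed to be sorted newest first, so the head is kept.
    timelines[key] = timelines[key][:length]


timelines = {"feed:1": [50, 40, 30, 20, 10]}
trim(timelines, "feed:1", 3)
print(timelines["feed:1"])  # [50, 40, 30]
```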

memory Module

class stream_framework.storage.memory.InMemoryActivityStorage(serializer_class=None, activity_class=None, **options)

Bases: stream_framework.storage.base.BaseActivityStorage

add_to_storage(activities, *args, **kwargs)
flush()
get_from_storage(activity_ids, *args, **kwargs)
remove_from_storage(activity_ids, *args, **kwargs)
class stream_framework.storage.memory.InMemoryTimelineStorage(serializer_class=None, activity_class=None, **options)

Bases: stream_framework.storage.base.BaseTimelineStorage

add_to_storage(key, activities, *args, **kwargs)
contains(key, activity_id)
count(key, *args, **kwargs)
delete(key, *args, **kwargs)
classmethod get_batch_interface()
get_index_of(key, activity_id)
get_slice_from_storage(key, start, stop, filter_kwargs=None, ordering_args=None)
remove_from_storage(key, activities, *args, **kwargs)
trim(key, length)
stream_framework.storage.memory.reverse_bisect_left(a, x, lo=0, hi=None)

Same as Python’s bisect.bisect_left, but for lists sorted in reverse (descending) order.
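bisect.bisect_left assumes ascending order, so a descending variant only has to flip the comparison. The following is a standalone sketch of that idea, not necessarily the library’s implementation. It returns the leftmost index i such that every element before i is strictly greater than x.

```python
def reverse_bisect_left(a, x, lo=0, hi=None):
    """Return the leftmost index at which x can be inserted into the
    descending list a while keeping it sorted in descending order."""
    if lo < 0:
        raise ValueError("lo must be non-negative")
    if hi is None:
        hi = len(a)
    while lo < hi:
        mid = (lo + hi) // 2
        if a[mid] > x:
            lo = mid + 1  # a[mid] is strictly greater: insertion point is to the right
        else:
            hi = mid      # a[mid] <= x: insertion point is at or before mid
    return lo


scores = [50, 40, 40, 10]
print(reverse_bisect_left(scores, 40))  # 1
print(reverse_bisect_left(scores, 60))  # 0
print(reverse_bisect_left(scores, 5))   # 4
scores.insert(reverse_bisect_left(scores, 45), 45)
print(scores)  # [50, 45, 40, 40, 10]
```

As with bisect_left, an element equal to existing entries is placed before them, and the list stays in descending order after list.insert.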