from stream_framework.serializers.dummy import DummySerializer
from stream_framework.serializers.simple_timeline_serializer import \
SimpleTimelineSerializer
from stream_framework.utils import get_metrics_instance
from stream_framework.activity import AggregatedActivity, Activity
import uuid
import six
class BaseStorage(object):
    '''
    The feed uses two storage classes, the

    - Activity Storage and the
    - Timeline Storage

    The process works as follows::

        feed = BaseFeed()
        # the activity storage is used to store the activity and mapped to an id
        feed.insert_activity(activity)
        # now the id is inserted into the timeline storage
        feed.add(activity)

    Currently there are two activity storage classes ready for production:

    - Cassandra
    - Redis

    The storage classes always receive a full activity object.
    The serializer class subsequently determines how to transform the activity
    into something the database can store.
    '''
    #: The default serializer class to use
    default_serializer_class = DummySerializer
    # metrics object obtained once, when the class body is executed
    metrics = get_metrics_instance()
    # classes handed to the serializer; both can be replaced per instance
    activity_class = Activity
    aggregated_activity_class = AggregatedActivity

    def __init__(self, serializer_class=None, activity_class=None, **options):
        '''
        :param serializer_class: allows you to overwrite the serializer class
        :param activity_class: optional replacement for the class level
            ``activity_class``
        :param options: remaining keyword options, kept on ``self.options``.
            An ``aggregated_activity_class`` entry, when present, is taken
            out of the options and stored as an attribute instead
        '''
        self.serializer_class = serializer_class or self.default_serializer_class
        self.options = options
        if activity_class is not None:
            self.activity_class = activity_class
        # pop() mutates the same dict that self.options points at, so the
        # key does not remain in self.options afterwards
        aggregated_activity_class = options.pop(
            'aggregated_activity_class', None)
        if aggregated_activity_class is not None:
            self.aggregated_activity_class = aggregated_activity_class

    def flush(self):
        '''
        Flushes the entire storage

        The base implementation does nothing.
        '''
        pass

    def activities_to_ids(self, activities_or_ids):
        '''
        Utility function for lower levels which accepts either activities
        or activity ids and turns all of them into ids

        :param activities_or_ids: an iterable with activities and/or ids
        :returns list: one id per entry, in the original order
        '''
        return [self.activity_to_id(entry) for entry in activities_or_ids]

    def activity_to_id(self, activity):
        '''
        Returns the ``serialization_id`` attribute of the given object, or
        the object itself when it has no such attribute (it is then treated
        as being an id already)
        '''
        return getattr(activity, 'serialization_id', activity)

    @property
    def serializer(self):
        '''
        Returns an instance of the serializer class

        The serializer needs to know about the activity and
        aggregated activity classes we're using

        Note that every access builds a fresh serializer instance.
        '''
        kwargs = {}
        # only hand over the aggregated class when one is actually set
        if getattr(self, 'aggregated_activity_class', None) is not None:
            kwargs['aggregated_activity_class'] = self.aggregated_activity_class
        return self.serializer_class(
            activity_class=self.activity_class, **kwargs)

    def serialize_activity(self, activity):
        '''
        Serialize the activity and returns the serialized activity

        :param activity: the activity to serialize
        :returns: whatever the serializer's ``dumps`` returns
        '''
        return self.serializer.dumps(activity)

    def serialize_activities(self, activities):
        '''
        Serializes the list of activities

        :param activities: the list of activities
        :returns dict: a dictionary with {id: serialized_activity}
        '''
        serialized_activities = {}
        for activity in activities:
            # goes through serialize_activity so that overrides of the
            # single item method are honoured
            serialized_activity = self.serialize_activity(activity)
            serialized_activities[
                self.activity_to_id(activity)] = serialized_activity
        return serialized_activities

    def deserialize_activities(self, serialized_activities):
        '''
        Deserializes the given serialized activities

        :param serialized_activities: an iterable of serialized activities,
            a dictionary with activity ids and serialized activities (only
            the values are used), or None
        :returns list: the deserialized activities, empty for None
        '''
        # handle the case where this is a dict
        if isinstance(serialized_activities, dict):
            serialized_activities = serialized_activities.values()
        if serialized_activities is None:
            return []
        # the serializer property creates a new instance on every access,
        # so build it once per call instead of once per activity
        loads = self.serializer.loads
        return [loads(serialized) for serialized in serialized_activities]
class BaseActivityStorage(BaseStorage):
    '''
    The Activity storage globally stores a key value mapping.
    This is used to store the mapping between an activity_id and the actual
    activity object.

    **Example**::

        storage = BaseActivityStorage()
        storage.add_many(activities)
        storage.get_many(activity_ids)

    The storage specific functions are located in

    - add_to_storage
    - get_from_storage
    - remove_from_storage
    '''

    def add_to_storage(self, serialized_activities, *args, **kwargs):
        '''
        Adds the serialized activities to the storage layer

        :param serialized_activities: a dictionary with {id: serialized_activity}
        '''
        raise NotImplementedError()

    def get_from_storage(self, activity_ids, *args, **kwargs):
        '''
        Retrieves the given activities from the storage layer

        :param activity_ids: the list of activity ids
        :returns dict: a dictionary mapping activity ids to activities
        '''
        raise NotImplementedError()

    def remove_from_storage(self, activity_ids, *args, **kwargs):
        '''
        Removes the specified activities

        :param activity_ids: the list of activity ids
        '''
        raise NotImplementedError()

    def get_many(self, activity_ids, *args, **kwargs):
        '''
        Gets many activities and deserializes them

        :param activity_ids: the list of activity ids
        :returns list: the deserialized activities
        '''
        # the number of requested ids is reported, not the number found
        self.metrics.on_feed_read(self.__class__, len(activity_ids))
        activities_data = self.get_from_storage(activity_ids, *args, **kwargs)
        return self.deserialize_activities(activities_data)

    def get(self, activity_id, *args, **kwargs):
        '''
        Gets a single activity

        :param activity_id: the id of the activity
        :returns: the first result of get_many, or None when it is empty
        '''
        results = self.get_many([activity_id], *args, **kwargs)
        return results[0] if results else None

    def add(self, activity, *args, **kwargs):
        '''
        Adds a single activity, see add_many
        '''
        return self.add_many([activity], *args, **kwargs)

    def add_many(self, activities, *args, **kwargs):
        '''
        Adds many activities and serializes them before forwarding
        this to add_to_storage

        :param activities: the list of activities
        '''
        self.metrics.on_feed_write(self.__class__, len(activities))
        serialized_activities = self.serialize_activities(activities)
        return self.add_to_storage(serialized_activities, *args, **kwargs)

    def remove(self, activity, *args, **kwargs):
        '''
        Removes a single activity (or activity id), see remove_many
        '''
        return self.remove_many([activity], *args, **kwargs)

    def remove_many(self, activities, *args, **kwargs):
        '''
        Figures out the ids of the given activities and forwards
        the removal to the remove_from_storage function

        :param activities: the list of activities, or the list of their ids
        '''
        self.metrics.on_feed_remove(self.__class__, len(activities))
        id_types = (six.string_types, six.integer_types, uuid.UUID)
        if activities and isinstance(activities[0], id_types):
            # already ids (judged by the first entry): forward untouched
            activity_ids = activities
        else:
            # Only the ids are needed for a removal, so look them up directly
            # rather than serializing every activity and discarding the
            # payload. dict.fromkeys keeps each id once, like the keys of the
            # serialized dictionary did.
            activity_ids = list(
                dict.fromkeys(self.activities_to_ids(activities)))
        return self.remove_from_storage(activity_ids, *args, **kwargs)
class BaseTimelineStorage(BaseStorage):
    '''
    The Timeline storage class handles the feed/timeline sorted part of storing
    a feed.

    **Example**::

        storage = BaseTimelineStorage()
        storage.add_many(key, activities)
        # get a sorted slice of the feed
        storage.get_slice(key, start, stop)
        storage.remove_many(key, activities)

    The storage specific functions are located in

    - add_to_storage (called by add_many; not declared on this base class,
      so the concrete storage has to provide it)
    - remove_from_storage
    - get_slice_from_storage
    - get_index_of
    - get_batch_interface
    - count
    - delete
    - trim (optional, does nothing by default)
    '''

    default_serializer_class = SimpleTimelineSerializer

    def add(self, key, activity, *args, **kwargs):
        '''
        Adds a single activity to the feed on the given key, see add_many
        '''
        return self.add_many(key, [activity], *args, **kwargs)

    def add_many(self, key, activities, *args, **kwargs):
        '''
        Adds the activities to the feed on the given key
        (The serialization is done by the serializer class)

        :param key: the key at which the feed is stored
        :param activities: the activities which to store
        '''
        self.metrics.on_feed_write(self.__class__, len(activities))
        serialized_activities = self.serialize_activities(activities)
        return self.add_to_storage(key, serialized_activities, *args, **kwargs)

    def remove(self, key, activity, *args, **kwargs):
        '''
        Removes a single activity from the feed on the given key,
        see remove_many
        '''
        return self.remove_many(key, [activity], *args, **kwargs)

    def remove_many(self, key, activities, *args, **kwargs):
        '''
        Removes the activities from the feed on the given key
        (The serialization is done by the serializer class)

        :param key: the key at which the feed is stored
        :param activities: the activities which to remove, or their ids
        '''
        self.metrics.on_feed_remove(self.__class__, len(activities))
        if activities and isinstance(activities[0], (six.string_types, six.integer_types, uuid.UUID)):
            # ids were given (judged by the first entry): map each id onto
            # itself so the storage layer still receives a dictionary
            serialized_activities = {a: a for a in activities}
        else:
            serialized_activities = self.serialize_activities(activities)
        return self.remove_from_storage(key, serialized_activities, *args, **kwargs)

    def get_index_of(self, key, activity_id):
        '''
        Storage specific lookup used by index_of

        :param key: the key at which the feed is stored
        :param activity_id: the activity's id to search
        '''
        raise NotImplementedError()

    def remove_from_storage(self, key, serialized_activities, *args, **kwargs):
        '''
        Storage specific removal used by remove_many

        Accepts the extra positional and keyword arguments which
        remove_many forwards.

        :param key: the key at which the feed is stored
        :param serialized_activities: a dictionary keyed by activity id
        '''
        raise NotImplementedError()

    def index_of(self, key, activity_or_id):
        '''
        Returns activity's index within a feed or raises ValueError if not present

        :param key: the key at which the feed is stored
        :param activity_or_id: the activity, or the activity's id, to search
        '''
        activity_id = self.activities_to_ids([activity_or_id])[0]
        return self.get_index_of(key, activity_id)

    def get_slice_from_storage(self, key, start, stop, filter_kwargs=None, ordering_args=None):
        '''
        :param key: the key at which the feed is stored
        :param start: start
        :param stop: stop
        :param filter_kwargs: passed through from get_slice
        :param ordering_args: passed through from get_slice
        :returns list: Returns a list with tuples of key,value pairs
        '''
        raise NotImplementedError()

    def get_slice(self, key, start, stop, filter_kwargs=None, ordering_args=None):
        '''
        Returns a sorted slice from the storage

        :param key: the key at which the feed is stored
        :param start: start
        :param stop: stop
        :param filter_kwargs: forwarded to get_slice_from_storage
        :param ordering_args: forwarded to get_slice_from_storage
        :returns list: the deserialized activities
        '''
        activities_data = self.get_slice_from_storage(
            key, start, stop, filter_kwargs=filter_kwargs, ordering_args=ordering_args)
        activities = []
        if activities_data:
            # transpose the (key, value) pairs and keep the values only
            serialized_activities = list(zip(*activities_data))[1]
            activities = self.deserialize_activities(serialized_activities)
        # the number of activities actually returned is what gets reported
        self.metrics.on_feed_read(self.__class__, len(activities))
        return activities

    def get_batch_interface(self):
        '''
        Returns a context manager which ensure all subsequent operations
        Happen via a batch interface

        An example is redis.map
        '''
        raise NotImplementedError()

    def trim(self, key, length):
        '''
        Trims the feed to the given length

        The base implementation does nothing.

        :param key: the key location
        :param length: the length to which to trim
        '''
        pass

    def count(self, key, *args, **kwargs):
        '''
        Storage specific, not implemented on the base class

        :param key: the key at which the feed is stored
        '''
        raise NotImplementedError()

    def delete(self, key, *args, **kwargs):
        '''
        Storage specific, not implemented on the base class

        :param key: the key at which the feed is stored
        '''
        raise NotImplementedError()