# Source code for stream_framework.activity

from stream_framework import exceptions as stream_framework_exceptions
from stream_framework.utils import make_list_unique, datetime_to_epoch
from stream_framework.utils.five import long_t
import datetime
import uuid
import six


# Default cap on how many activities an AggregatedActivity keeps in its
# ``activities`` list. AggregatedActivity.append() drops the oldest entry once
# the list grows past this value and counts it in ``minimized_activities``.
MAX_AGGREGATED_ACTIVITIES_LENGTH = 15


class BaseActivity(object):
    '''
    Shared parent type of Activity and AggregatedActivity

    Use ``isinstance(something, BaseActivity)`` when you need to know whether
    something is an activity of either kind
    '''
class DehydratedActivity(BaseActivity):
    '''
    The dehydrated version of an :class:`Activity`

    The only data kept is the serialization_id of the original activity, so
    serializers can store this placeholder instead of the full activity
    '''

    def __init__(self, serialization_id):
        '''
        :param serialization_id: the serialization_id of the activity this
            placeholder stands for
        '''
        self.dehydrated = True
        self.serialization_id = serialization_id
        # single-element list holding the id of the original activity
        self._activity_ids = [serialization_id]

    def get_hydrated(self, activities):
        '''
        Looks up the full Activity this placeholder stands for

        :param activities: a dict {'activity_id': Activity}, it is indexed
            with ``int(self.serialization_id)``
        :returns: the matching activity, with its ``dehydrated`` flag set to
            False
        '''
        hydrated = activities[int(self.serialization_id)]
        hydrated.dehydrated = False
        return hydrated
class Activity(BaseActivity):
    '''
    Container for a single activity: an actor, a verb, an object and an
    optional target

    actor_id, target_id and object_id are always set
    actor, target and object are only set when something other than an
    integer id was handed to the constructor
    '''

    def __init__(self, actor, verb, object, target=None, time=None, extra_context=None):
        '''
        :param actor: integer id, or an object exposing ``.id``
        :param verb: the verb of the activity (its ``.id`` and
            ``.past_tense`` are read elsewhere in this class)
        :param object: integer id, or an object exposing ``.id``
        :param target: integer id, an object exposing ``.id``, or None
        :param time: when the activity happened, defaults to
            ``datetime.datetime.utcnow()`` when falsy
        :param extra_context: extra data stored on the activity, defaults to
            an empty dict when falsy
        '''
        self.verb = verb
        self.time = time or datetime.datetime.utcnow()
        # every field ends up as .<field>_id and, for real objects, .<field>
        for field_name, value in (('actor', actor), ('object', object), ('target', target)):
            self._set_object_or_id(field_name, value)
        # the extra context which gets serialized along with the activity
        self.extra_context = extra_context or {}
        self.dehydrated = False

    def get_dehydrated(self):
        '''
        Builds the dehydrated counterpart of this activity

        :returns: a DehydratedActivity carrying this activity's
            serialization_id
        '''
        return DehydratedActivity(serialization_id=self.serialization_id)

    def __eq__(self, other):
        '''
        Two activities are equal when their serialization_id is equal

        :raises ValueError: when other is not an Activity
        '''
        if isinstance(other, Activity):
            return self.serialization_id == other.serialization_id
        raise ValueError(
            'Can only compare to Activity not %r of type %s' % (other, type(other)))

    def __lt__(self, other):
        '''
        Orders activities by their serialization_id
        '''
        return self.serialization_id < other.serialization_id

    def __hash__(self):
        return hash(self.serialization_id)

    @property
    def serialization_id(self):
        '''
        serialization_id is used to keep items locally sorted and unique
        (eg. used redis sorted sets' score or cassandra column names)

        serialization_id is also used to select random activities from the
        feed (eg. remove activities from feeds must be fast operation)
        for this reason the serialization_id should be unique and not change
        over time

        The id is the decimal concatenation of three parts:

        - ``int(datetime_to_epoch(self.time) * 1000)``, eg. 1373266755000
        - object_id zero padded to 10 digits, eg. 0000000042
        - verb id zero padded to 3 digits, eg. 008

        :returns: int --the serialization id
        :raises TypeError: when object_id or the verb id has too many digits,
            or when the activity has no time
        '''
        too_many_digits = self.object_id >= 10 ** 10 or self.verb.id >= 10 ** 3
        if too_many_digits:
            raise TypeError('Fatal: object_id / verb have too many digits !')
        if not self.time:
            raise TypeError('Cant serialize activities without a time')
        epoch_part = str(int(datetime_to_epoch(self.time) * 1000))
        return int('%s%0.10d%0.3d' % (epoch_part, self.object_id, self.verb.id))

    def _set_object_or_id(self, field, object_):
        '''
        Stores object_ on the activity under field and/or field_id

        - an integer is written to field_id only
        - None is written to both field and field_id
        - anything else is written to field, and its ``.id`` to field_id
        '''
        id_field = '%s_id' % field
        if isinstance(object_, six.integer_types):
            setattr(self, id_field, object_)
            return
        if object_ is None:
            setattr(self, field, None)
            setattr(self, id_field, None)
            return
        setattr(self, field, object_)
        setattr(self, id_field, object_.id)

    def __getattr__(self, name):
        '''
        Fail early, with a helpful message, when actor, object or target is
        read while only the matching _id was stored
        '''
        lazy_fields = ('object', 'target', 'actor')
        if name in lazy_fields and name not in self.__dict__:
            raise AttributeError(
                'Field self.%s is not defined, use self.%s_id instead' % (name, name))
        return object.__getattribute__(self, name)

    def __repr__(self):
        return '%s(%s) %s %s' % (
            self.__class__.__name__,
            self.verb.past_tense,
            self.actor_id,
            self.object_id,
        )
class AggregatedActivity(BaseActivity):
    '''
    Object to store aggregated activities

    An instance is either hydrated (``activities`` holds the activity
    objects) or dehydrated (``_activity_ids`` holds their serialization ids
    and ``activities`` is empty), see get_dehydrated / get_hydrated
    '''
    # append() drops the oldest activity once the list grows past this length
    max_aggregated_activities_length = MAX_AGGREGATED_ACTIVITIES_LENGTH

    def __init__(self, group, activities=None, created_at=None, updated_at=None):
        '''
        :param group: the group this aggregation belongs to
        :param activities: initial list of activities, defaults to a new
            empty list when falsy
        :param created_at: time of the first activity, filled in by append()
            when None
        :param updated_at: time of the most recent activity, maintained by
            append() and remove()
        '''
        self.group = group
        self.activities = activities or []
        self.created_at = created_at
        self.updated_at = updated_at
        # if the user opened the notification window and browsed over the
        # content
        self.seen_at = None
        # if the user engaged with the content
        self.read_at = None
        # number of activities dropped from self.activities by append() to
        # keep the list bounded
        self.minimized_activities = 0
        self.dehydrated = False
        self._activity_ids = []

    @property
    def serialization_id(self):
        '''
        serialization_id is used to keep items locally sorted and unique
        (eg. used redis sorted sets' score or cassandra column names)

        Unlike Activity.serialization_id this value only depends on
        updated_at: it is ``int(datetime_to_epoch(self.updated_at)) * 1000``
        rendered as a string. The epoch value is truncated to an int before
        the multiplication.

        NOTE(review): the value is a str, not an int, and the truncation
        happens before scaling; both are kept as they are because changing
        them would change ids that may already be stored.

        :returns: str --the serialization id
        '''
        milliseconds = str(int(datetime_to_epoch(self.updated_at)) * 1000)
        return milliseconds

    def get_dehydrated(self):
        '''
        Dehydrates this aggregated activity in place: the serialization ids
        of the activities are kept in _activity_ids and the activities list
        is emptied

        :returns: self
        :raises ValueError: when the instance is already dehydrated
        '''
        if self.dehydrated is True:
            raise ValueError('already dehydrated')
        self._activity_ids = [
            activity.serialization_id for activity in self.activities]
        self.activities = []
        self.dehydrated = True
        return self

    def get_hydrated(self, activities):
        '''
        Hydrates this aggregated activity in place

        expects activities to be a dict like this {'activity_id': Activity}

        :returns: self
        :raises AssertionError: when the instance is not dehydrated
        :raises KeyError: when one of the stored ids is missing from
            activities
        '''
        # explicit raise instead of ``assert`` so the check is not stripped
        # when python runs with -O; the exception type stays the same
        if not self.dehydrated:
            raise AssertionError('not dehydrated yet')
        for activity_id in self._activity_ids:
            self.activities.append(activities[activity_id])
        self._activity_ids = []
        self.dehydrated = False
        return self

    def __len__(self):
        '''
        Works on both hydrated and not hydrated activities
        '''
        if self._activity_ids:
            length = len(self.activity_ids)
        else:
            length = len(self.activities)
        return length

    @property
    def activity_ids(self):
        '''
        Returns a list of activity ids, taken from _activity_ids when that
        list is not empty and from the hydrated activities otherwise
        '''
        if self._activity_ids:
            activity_ids = self._activity_ids
        else:
            activity_ids = [a.serialization_id for a in self.activities]
        return activity_ids

    def __eq__(self, other):
        '''
        Aggregated activities are equal when their activities are equal and
        their date fields match; two datetimes are considered matching when
        they are at most 10 seconds apart

        :raises ValueError: when other is not an AggregatedActivity
        '''
        if not isinstance(other, AggregatedActivity):
            raise ValueError(
                'I can only compare aggregated activities to other aggregated activities')
        equal = True
        date_fields = ['created_at', 'updated_at', 'seen_at', 'read_at']
        for field in date_fields:
            current = getattr(self, field)
            other_value = getattr(other, field)
            if isinstance(current, datetime.datetime) and isinstance(other_value, datetime.datetime):
                # tolerate small differences between two datetimes
                delta = abs(current - other_value)
                if delta > datetime.timedelta(seconds=10):
                    equal = False
                    break
            else:
                if current != other_value:
                    equal = False
                    break

        if self.activities != other.activities:
            equal = False

        return equal

    def __hash__(self):
        return hash(self.serialization_id)

    def contains(self, activity):
        '''
        Checks if activity is present in this aggregated

        Only the hydrated ``activities`` list is searched

        :param activity: an Activity, or a serialization id (long or UUID)
        :raises ValueError: when activity is of another type
        '''
        if not isinstance(activity, (Activity, long_t, uuid.UUID)):
            # interpolate the offending value into the message, it used to
            # be passed as a second exception argument and never formatted
            raise ValueError(
                'contains needs an activity or long not %s' % (activity,))
        activity_id = getattr(activity, 'serialization_id', activity)
        return activity_id in {a.serialization_id for a in self.activities}

    def append(self, activity):
        '''
        Adds activity to this aggregation and updates created_at /
        updated_at from its time

        :raises DuplicateActivityException: when the activity is already
            part of this aggregation
        '''
        if self.contains(activity):
            raise stream_framework_exceptions.DuplicateActivityException()

        # append the activity
        self.activities.append(activity)

        # set the first seen
        if self.created_at is None:
            self.created_at = activity.time

        # set the last seen
        if self.updated_at is None or activity.time > self.updated_at:
            self.updated_at = activity.time

        # ensure that our memory usage, and pickling overhead don't go up
        # endlessly
        if len(self.activities) > self.max_aggregated_activities_length:
            self.activities.pop(0)
            self.minimized_activities += 1

    def remove(self, activity):
        '''
        Removes activity (an Activity or a serialization id) from this
        aggregation and sets updated_at to the time of the last remaining
        activity

        :raises ActivityNotFound: when the activity is not part of this
            aggregation
        :raises ValueError: when it is the only activity left
        '''
        if not self.contains(activity):
            raise stream_framework_exceptions.ActivityNotFound()

        if len(self.activities) == 1:
            raise ValueError(
                'removing this activity would leave an empty aggregation')

        # remove the activity
        activity_id = getattr(activity, 'serialization_id', activity)
        self.activities = [
            a for a in self.activities if a.serialization_id != activity_id]

        # now time to update the times
        self.updated_at = self.last_activity.time

        # adjust the count
        if self.minimized_activities:
            self.minimized_activities -= 1

    def remove_many(self, activities):
        '''
        Removes every given activity that is part of this aggregation,
        activities which are not found are skipped

        :returns: the list of activities which were actually removed
        '''
        removed_activities = []
        for activity in activities:
            try:
                self.remove(activity)
            except stream_framework_exceptions.ActivityNotFound:
                pass
            else:
                removed_activities.append(activity)
        return removed_activities

    @property
    def actor_count(self):
        '''
        Returns a count of the number of actors
        When dealing with large lists only approximate the number of actors
        (every minimized activity is counted as one actor)
        '''
        base = self.minimized_activities
        actor_id_count = len(self.actor_ids)
        base += actor_id_count
        return base

    @property
    def other_actor_count(self):
        '''
        Returns actor_count minus one
        '''
        actor_count = self.actor_count
        return actor_count - 1

    @property
    def activity_count(self):
        '''
        Returns the number of activities, including the minimized ones
        '''
        base = self.minimized_activities
        base += len(self.activities)
        return base

    @property
    def last_activity(self):
        '''
        Returns the last item of the activities list
        '''
        activity = self.activities[-1]
        return activity

    @property
    def last_activities(self):
        '''
        Returns a reversed copy of the activities list
        '''
        activities = self.activities[::-1]
        return activities

    @property
    def verb(self):
        '''
        Returns the verb of the first activity
        '''
        return self.activities[0].verb

    @property
    def verbs(self):
        '''
        Returns the verbs of the activities, passed through make_list_unique
        '''
        return make_list_unique([a.verb for a in self.activities])

    @property
    def actor_ids(self):
        '''
        Returns the actor ids of the activities, passed through
        make_list_unique
        '''
        return make_list_unique([a.actor_id for a in self.activities])

    @property
    def object_ids(self):
        '''
        Returns the object ids of the activities, passed through
        make_list_unique
        '''
        return make_list_unique([a.object_id for a in self.activities])

    def is_seen(self):
        '''
        Returns if the activity should be considered as seen at this moment,
        ie. seen_at is set and not older than updated_at
        '''
        seen = self.seen_at is not None and self.seen_at >= self.updated_at
        return seen

    def update_seen_at(self):
        '''
        A hook method that updates the seen_at to current date
        '''
        self.seen_at = datetime.datetime.utcnow()

    def is_read(self):
        '''
        Returns if the activity should be considered as read at this moment,
        ie. read_at is set and not older than updated_at
        '''
        read = self.read_at is not None and self.read_at >= self.updated_at
        return read

    def update_read_at(self):
        '''
        A hook method that updates the read_at to current date
        '''
        self.read_at = datetime.datetime.utcnow()

    def __repr__(self):
        if self.dehydrated:
            message = 'Dehydrated AggregatedActivity (%s)' % self._activity_ids
            return message
        verbs = [v.past_tense for v in self.verbs]
        actor_ids = self.actor_ids
        object_ids = self.object_ids
        actors = ','.join(map(str, actor_ids))
        message = 'AggregatedActivity(%s-%s) Actors %s: Objects %s' % (
            self.group, ','.join(verbs), actors, object_ids)
        return message
class NotificationActivity(AggregatedActivity):
    '''
    Aggregated activity on which is_seen and is_read are plain boolean
    attributes, both starting out as False
    '''

    def __init__(self, *args, **kwargs):
        super(NotificationActivity, self).__init__(*args, **kwargs)
        # these instance attributes shadow the is_seen / is_read methods
        # defined on AggregatedActivity
        self.is_seen = False
        self.is_read = False