Source code for stream_framework.verbs

from stream_framework.utils import get_class_from_string


VERB_DICT = dict()


[docs]def get_verb_storage(): from stream_framework import settings if settings.STREAM_VERB_STORAGE == 'in-memory': return VERB_DICT else: return get_class_from_string(settings.STREAM_VERB_STORAGE)()
[docs]def register(verb): ''' Registers the given verb class ''' from stream_framework.verbs.base import Verb if not issubclass(verb, Verb): raise ValueError('%s doesnt subclass Verb' % verb) registered_verb = get_verb_storage().get(verb.id, verb) if registered_verb != verb: raise ValueError( 'cant register verb %r with id %s (clashing with verb %r)' % (verb, verb.id, registered_verb)) get_verb_storage()[verb.id] = verb
[docs]def get_verb_by_id(verb_id): if not isinstance(verb_id, int): raise ValueError('please provide a verb id, got %r' % verb_id) return get_verb_storage()[verb_id]