Module aws_lambda_powertools.utilities.feature_flags
Advanced feature flags utility
Sub-modules
- aws_lambda_powertools.utilities.feature_flags.appconfig
- aws_lambda_powertools.utilities.feature_flags.base
- aws_lambda_powertools.utilities.feature_flags.comparators
- aws_lambda_powertools.utilities.feature_flags.constants
- aws_lambda_powertools.utilities.feature_flags.exceptions
- aws_lambda_powertools.utilities.feature_flags.feature_flags
- aws_lambda_powertools.utilities.feature_flags.schema
- aws_lambda_powertools.utilities.feature_flags.types
Classes
- class AppConfigStore (environment: str,
 application: str,
 name: str,
 max_age: int = 5,
 sdk_config: Config | None = None,
 envelope: str | None = '',
 jmespath_options: dict | None = None,
 logger: logging.Logger | Logger | None = None,
 boto_config: Config | None = None,
 boto3_session: boto3.session.Session | None = None,
 boto3_client: AppConfigDataClient | None = None)
- 
Expand source codeclass AppConfigStore(StoreProvider): def __init__( self, environment: str, application: str, name: str, max_age: int = 5, sdk_config: Config | None = None, envelope: str | None = "", jmespath_options: dict | None = None, logger: logging.Logger | Logger | None = None, boto_config: Config | None = None, boto3_session: boto3.session.Session | None = None, boto3_client: AppConfigDataClient | None = None, ): """This class fetches JSON schemas from AWS AppConfig Parameters ---------- environment: str Appconfig environment, e.g. 'dev/test' etc. application: str AppConfig application name, e.g. 'powertools' name: str AppConfig configuration name e.g. `my_conf` max_age: int cache expiration time in seconds, or how often to call AppConfig to fetch latest configuration sdk_config: Config | None Botocore Config object to pass during client initialization envelope : str | None JMESPath expression to pluck feature flags data from config jmespath_options : dict | None Alternative JMESPath options to be included when filtering expr logger: A logging object Used to log messages. If None is supplied, one will be created. 
boto_config: botocore.config.Config, optional Botocore configuration to pass during client initialization boto3_session : boto3.Session, optional Boto3 session to use for AWS API communication boto3_client : AppConfigDataClient, optional Boto3 AppConfigDataClient Client to use, boto3_session and boto_config will be ignored if both are provided """ super().__init__() self.logger = logger or logging.getLogger(__name__) self.environment = environment self.application = application self.name = name self.cache_seconds = max_age self.config = sdk_config or boto_config self.envelope = envelope self.jmespath_options = jmespath_options self._conf_store = AppConfigProvider( environment=environment, application=application, config=sdk_config or boto_config, boto3_client=boto3_client, boto3_session=boto3_session, ) @property def get_raw_configuration(self) -> dict[str, Any]: """Fetch feature schema configuration from AWS AppConfig""" try: # parse result conf as JSON, keep in cache for self.max_age seconds self.logger.debug( "Fetching configuration from the store", extra={"param_name": self.name, "max_age": self.cache_seconds}, ) return cast( dict, self._conf_store.get( name=self.name, transform="json", max_age=self.cache_seconds, ), ) except (GetParameterError, TransformParameterError) as exc: err_msg = traceback.format_exc() if "AccessDenied" in err_msg: raise StoreClientError(err_msg) from exc raise ConfigurationStoreError("Unable to get AWS AppConfig configuration file") from exc def get_configuration(self) -> dict[str, Any]: """Fetch feature schema configuration from AWS AppConfig If envelope is set, it'll extract and return feature flags from configuration, otherwise it'll return the entire configuration fetched from AWS AppConfig. 
Raises ------ ConfigurationStoreError Any validation error or AppConfig error that can occur Returns ------- dict[str, Any] parsed JSON dictionary """ config = self.get_raw_configuration if self.envelope: self.logger.debug("Envelope enabled; extracting data from config", extra={"envelope": self.envelope}) config = jmespath_utils.query( data=config, envelope=self.envelope, jmespath_options=self.jmespath_options, ) return configHelper class that provides a standard way to create an ABC using inheritance. This class fetches JSON schemas from AWS AppConfig Parameters- environment:- str
- AppConfig environment, e.g. 'dev' or 'test'.
- application:- str
- AppConfig application name, e.g. 'powertools'
- name:- str
- AppConfig configuration name e.g. my_conf
- max_age:- int
- Cache expiration time in seconds, i.e. how often to call AppConfig to fetch the latest configuration
- sdk_config:- Config | None
- Botocore Config object to pass during client initialization
- envelope:- str | None
- JMESPath expression to pluck feature flags data from config
- jmespath_options:- dict | None
- Alternative JMESPath options to be used when evaluating the envelope expression
- logger:- A logging object
- Used to log messages. If None is supplied, one will be created.
- boto_config:- botocore.config.Config, optional
- Botocore configuration to pass during client initialization
- boto3_session:- boto3.Session, optional
- Boto3 session to use for AWS API communication
- boto3_client:- AppConfigDataClient, optional
- Boto3 AppConfigDataClient client to use; boto3_session and boto_config will be ignored if both are provided
 Ancestors- StoreProvider
- abc.ABC
 Instance variables- prop get_raw_configuration : dict[str, Any]
- 
Expand source code@property def get_raw_configuration(self) -> dict[str, Any]: """Fetch feature schema configuration from AWS AppConfig""" try: # parse result conf as JSON, keep in cache for self.max_age seconds self.logger.debug( "Fetching configuration from the store", extra={"param_name": self.name, "max_age": self.cache_seconds}, ) return cast( dict, self._conf_store.get( name=self.name, transform="json", max_age=self.cache_seconds, ), ) except (GetParameterError, TransformParameterError) as exc: err_msg = traceback.format_exc() if "AccessDenied" in err_msg: raise StoreClientError(err_msg) from exc raise ConfigurationStoreError("Unable to get AWS AppConfig configuration file") from excFetch feature schema configuration from AWS AppConfig 
 Methods- def get_configuration(self) ‑> dict[str, typing.Any]
- 
Expand source codedef get_configuration(self) -> dict[str, Any]: """Fetch feature schema configuration from AWS AppConfig If envelope is set, it'll extract and return feature flags from configuration, otherwise it'll return the entire configuration fetched from AWS AppConfig. Raises ------ ConfigurationStoreError Any validation error or AppConfig error that can occur Returns ------- dict[str, Any] parsed JSON dictionary """ config = self.get_raw_configuration if self.envelope: self.logger.debug("Envelope enabled; extracting data from config", extra={"envelope": self.envelope}) config = jmespath_utils.query( data=config, envelope=self.envelope, jmespath_options=self.jmespath_options, ) return configFetch feature schema configuration from AWS AppConfig If envelope is set, it'll extract and return feature flags from configuration, otherwise it'll return the entire configuration fetched from AWS AppConfig. Raises- ConfigurationStoreError
- Any validation error or AppConfig error that can occur
 Returns- dict[str, Any]
- parsed JSON dictionary
 
 
- class ConfigurationStoreError (*args, **kwargs)
- 
Expand source codeclass ConfigurationStoreError(Exception): """When a configuration store raises an exception on config retrieval or parsing"""When a configuration store raises an exception on config retrieval or parsing Ancestors- builtins.Exception
- builtins.BaseException
 
- class FeatureFlags (store: StoreProvider,
 logger: logging.Logger | Logger | None = None)
- 
Expand source codeclass FeatureFlags: def __init__(self, store: StoreProvider, logger: logging.Logger | Logger | None = None): """Evaluates whether feature flags should be enabled based on a given context. It uses the provided store to fetch feature flag rules before evaluating them. Examples -------- ```python from aws_lambda_powertools.utilities.feature_flags import FeatureFlags, AppConfigStore app_config = AppConfigStore( environment="test", application="powertools", name="test_conf_name", max_age=300, envelope="features" ) feature_flags: FeatureFlags = FeatureFlags(store=app_config) ``` Parameters ---------- store: StoreProvider Store to use to fetch feature flag schema configuration. logger: A logging object Used to log messages. If None is supplied, one will be created. """ self.store = store self.logger = logger or logging.getLogger(__name__) self._exception_handlers: dict[Exception, Callable] = {} def _match_by_action(self, action: str, condition_value: Any, context_value: Any) -> bool: try: func = RULE_ACTION_MAPPING.get(action, lambda a, b: False) return func(context_value, condition_value) except Exception as exc: self.logger.debug(f"caught exception while matching action: action={action}, exception={str(exc)}") handler = self._lookup_exception_handler(exc) if handler: self.logger.debug("Exception handler found! 
Delegating response.") return handler(exc) return False def _evaluate_conditions( self, rule_name: str, feature_name: str, rule: dict[str, Any], context: dict[str, Any], ) -> bool: """Evaluates whether context matches conditions, return False otherwise""" rule_match_value = rule.get(schema.RULE_MATCH_VALUE) conditions = cast(List[dict], rule.get(schema.CONDITIONS_KEY)) if not conditions: self.logger.debug( f"rule did not match, no conditions to match, rule_name={rule_name}, rule_value={rule_match_value}, " f"name={feature_name} ", ) return False for condition in conditions: context_value = context.get(condition.get(schema.CONDITION_KEY, "")) cond_action = condition.get(schema.CONDITION_ACTION, "") cond_value = condition.get(schema.CONDITION_VALUE) # time based rule actions have no user context. the context is the condition key if cond_action in ( schema.RuleAction.SCHEDULE_BETWEEN_TIME_RANGE.value, schema.RuleAction.SCHEDULE_BETWEEN_DATETIME_RANGE.value, schema.RuleAction.SCHEDULE_BETWEEN_DAYS_OF_WEEK.value, ): context_value = condition.get(schema.CONDITION_KEY) # e.g., CURRENT_TIME if not self._match_by_action(action=cond_action, condition_value=cond_value, context_value=context_value): self.logger.debug( f"rule did not match action, rule_name={rule_name}, rule_value={rule_match_value}, " f"name={feature_name}, context_value={str(context_value)} ", ) return False # context doesn't match condition self.logger.debug(f"rule matched, rule_name={rule_name}, rule_value={rule_match_value}, name={feature_name}") return True def _evaluate_rules( self, *, feature_name: str, context: dict[str, Any], feat_default: Any, rules: dict[str, Any], boolean_feature: bool, ) -> bool: """Evaluates whether context matches rules and conditions, otherwise return feature default""" for rule_name, rule in rules.items(): rule_match_value = rule.get(schema.RULE_MATCH_VALUE) # Context might contain PII data; do not log its value self.logger.debug( f"Evaluating rule matching, rule={rule_name}, 
feature={feature_name}, default={str(feat_default)}, boolean_feature={boolean_feature}", # noqa: E501 ) if self._evaluate_conditions(rule_name=rule_name, feature_name=feature_name, rule=rule, context=context): # Maintenance: Revisit before going GA. return bool(rule_match_value) if boolean_feature else rule_match_value # no rule matched, return default value of feature self.logger.debug( f"no rule matched, returning feature default, default={str(feat_default)}, name={feature_name}, boolean_feature={boolean_feature}", # noqa: E501 ) return feat_default def get_configuration(self) -> dict: """Get validated feature flag schema from configured store. Largely used to aid testing, since it's called by `evaluate` and `get_enabled_features` methods. Raises ------ ConfigurationStoreError Any propagated error from store SchemaValidationError When schema doesn't conform with feature flag schema Returns ------ dict[str, dict] parsed JSON dictionary **Example** ```python { "premium_features": { "default": False, "rules": { "customer tier equals premium": { "when_match": True, "conditions": [ { "action": "EQUALS", "key": "tier", "value": "premium", } ], } }, }, "feature_two": { "default": False } } ``` """ # parse result conf as JSON, keep in cache for max age defined in store self.logger.debug(f"Fetching schema from registered store, store={self.store}") config: dict = self.store.get_configuration() validator = schema.SchemaValidator(schema=config, logger=self.logger) validator.validate() return config def evaluate(self, *, name: str, context: dict[str, Any] | None = None, default: JSONType) -> JSONType: """Evaluate whether a feature flag should be enabled according to stored schema and input context **Logic when evaluating a feature flag** 1. Feature exists and a rule matches, returns when_match value 2. Feature exists but has either no rules or no match, return feature default value 3. 
Feature doesn't exist in stored schema, encountered an error when fetching -> return default value provided ┌────────────────────────┐ ┌────────────────────────┐ ┌────────────────────────┐ │ Feature flags │──────▶ Get Configuration ├───────▶ Evaluate rules │ └────────────────────────┘ │ │ │ │ │┌──────────────────────┐│ │┌──────────────────────┐│ ││ Fetch schema ││ ││ Match rule ││ │└───────────┬──────────┘│ │└───────────┬──────────┘│ │ │ │ │ │ │ │┌───────────▼──────────┐│ │┌───────────▼──────────┐│ ││ Cache schema ││ ││ Match condition ││ │└───────────┬──────────┘│ │└───────────┬──────────┘│ │ │ │ │ │ │ │┌───────────▼──────────┐│ │┌───────────▼──────────┐│ ││ Validate schema ││ ││ Match action ││ │└──────────────────────┘│ │└──────────────────────┘│ └────────────────────────┘ └────────────────────────┘ Parameters ---------- name: str feature name to evaluate context: dict[str, Any] | None Attributes that should be evaluated against the stored schema. for example: `{"tenant_id": "X", "username": "Y", "region": "Z"}` default: JSONType default value if feature flag doesn't exist in the schema, or there has been an error when fetching the configuration from the store Can be boolean or any JSON values for non-boolean features. Examples -------- ```python from aws_lambda_powertools.utilities.feature_flags import AppConfigStore, FeatureFlags from aws_lambda_powertools.utilities.typing import LambdaContext app_config = AppConfigStore(environment="dev", application="product-catalogue", name="features") feature_flags = FeatureFlags(store=app_config) def lambda_handler(event: dict, context: LambdaContext): # Get customer's tier from incoming request ctx = {"tier": event.get("tier", "standard")} # Evaluate whether customer's tier has access to premium features # based on `has_premium_features` rules has_premium_features: bool = feature_flags.evaluate(name="premium_features", context=ctx, default=False) if has_premium_features: # enable premium features ... 
``` Returns ------ JSONType whether feature should be enabled (bool flags) or JSON value when non-bool feature matches Raises ------ SchemaValidationError When schema doesn't conform with feature flag schema """ if context is None: context = {} try: features = self.get_configuration() except ConfigurationStoreError as err: self.logger.debug(f"Failed to fetch feature flags from store, returning default provided, reason={err}") return default feature = features.get(name) if feature is None: self.logger.debug(f"Feature not found; returning default provided, name={name}, default={default}") return default rules = feature.get(schema.RULES_KEY) feat_default = feature.get(schema.FEATURE_DEFAULT_VAL_KEY) # Maintenance: Revisit before going GA. We might to simplify customers on-boarding by not requiring it # for non-boolean flags. It'll need minor implementation changes, docs changes, and maybe refactor # get_enabled_features. We can minimize breaking change, despite Beta label, by having a new # method `get_matching_features` returning dict[feature_name, feature_value] boolean_feature = feature.get( schema.FEATURE_DEFAULT_VAL_TYPE_KEY, True, ) # backwards compatibility, assume feature flag if not rules: self.logger.debug( f"no rules found, returning feature default, name={name}, default={str(feat_default)}, boolean_feature={boolean_feature}", # noqa: E501 ) # Maintenance: Revisit before going GA. We might to simplify customers on-boarding by not requiring it # for non-boolean flags. 
return bool(feat_default) if boolean_feature else feat_default self.logger.debug( f"looking for rule match, name={name}, default={str(feat_default)}, boolean_feature={boolean_feature}", # noqa: E501 ) return self._evaluate_rules( feature_name=name, context=context, feat_default=feat_default, rules=rules, boolean_feature=boolean_feature, ) def get_enabled_features(self, *, context: dict[str, Any] | None = None) -> list[str]: """Get all enabled feature flags while also taking into account context (when a feature has defined rules) Parameters ---------- context: dict[str, Any] | None dict of attributes that you would like to match the rules against, can be `{'tenant_id: 'X', 'username':' 'Y', 'region': 'Z'}` etc. Returns ---------- list[str] list of all feature names that either matches context or have True as default **Example** ```python ["premium_features", "my_feature_two", "always_true_feature"] ``` Raises ------ SchemaValidationError When schema doesn't conform with feature flag schema """ if context is None: context = {} features_enabled: list[str] = [] try: features: dict[str, Any] = self.get_configuration() except ConfigurationStoreError as err: self.logger.debug(f"Failed to fetch feature flags from store, returning empty list, reason={err}") return features_enabled self.logger.debug("Evaluating all features") for name, feature in features.items(): rules = feature.get(schema.RULES_KEY, {}) feature_default_value = feature.get(schema.FEATURE_DEFAULT_VAL_KEY) boolean_feature = feature.get( schema.FEATURE_DEFAULT_VAL_TYPE_KEY, True, ) # backwards compatibility, assume feature flag if feature_default_value and not rules: self.logger.debug(f"feature is enabled by default and has no defined rules, name={name}") features_enabled.append(name) elif self._evaluate_rules( feature_name=name, context=context, feat_default=feature_default_value, rules=rules, boolean_feature=boolean_feature, ): self.logger.debug(f"feature's calculated value is True, name={name}") 
features_enabled.append(name) return features_enabled def validation_exception_handler(self, exc_class: Exception | list[Exception]): """Registers function to handle unexpected validation exceptions when evaluating flags. It does not override the function of a default flag value in case of network and IAM permissions. For example, you won't be able to catch ConfigurationStoreError exception. Parameters ---------- exc_class : Exception | list[Exception] One or more exceptions to catch Examples -------- ```python feature_flags = FeatureFlags(store=app_config) @feature_flags.validation_exception_handler(Exception) # any exception def catch_exception(exc): raise TypeError("re-raised") from exc ``` """ def register_exception_handler(func: Callable[P, T]) -> Callable[P, T]: if isinstance(exc_class, list): for exp in exc_class: self._exception_handlers[exp] = func else: self._exception_handlers[exc_class] = func return func return register_exception_handler def _lookup_exception_handler(self, exc: BaseException) -> Callable | None: # Use "Method Resolution Order" to allow for matching against a base class # of an exception for cls in type(exc).__mro__: if cls in self._exception_handlers: return self._exception_handlers[cls] # type: ignore[index] # index is correct return NoneEvaluates whether feature flags should be enabled based on a given context. It uses the provided store to fetch feature flag rules before evaluating them. Examplesfrom aws_lambda_powertools.utilities.feature_flags import FeatureFlags, AppConfigStore app_config = AppConfigStore( environment="test", application="powertools", name="test_conf_name", max_age=300, envelope="features" ) feature_flags: FeatureFlags = FeatureFlags(store=app_config)Parameters- store:- StoreProvider
- Store to use to fetch feature flag schema configuration.
- logger:- A logging object
- Used to log messages. If None is supplied, one will be created.
 Methods- def evaluate(self, *, name: str, context: dict[str, Any] | None = None, default: JSONType) ‑> JSONType
- 
Expand source codedef evaluate(self, *, name: str, context: dict[str, Any] | None = None, default: JSONType) -> JSONType: """Evaluate whether a feature flag should be enabled according to stored schema and input context **Logic when evaluating a feature flag** 1. Feature exists and a rule matches, returns when_match value 2. Feature exists but has either no rules or no match, return feature default value 3. Feature doesn't exist in stored schema, encountered an error when fetching -> return default value provided ┌────────────────────────┐ ┌────────────────────────┐ ┌────────────────────────┐ │ Feature flags │──────▶ Get Configuration ├───────▶ Evaluate rules │ └────────────────────────┘ │ │ │ │ │┌──────────────────────┐│ │┌──────────────────────┐│ ││ Fetch schema ││ ││ Match rule ││ │└───────────┬──────────┘│ │└───────────┬──────────┘│ │ │ │ │ │ │ │┌───────────▼──────────┐│ │┌───────────▼──────────┐│ ││ Cache schema ││ ││ Match condition ││ │└───────────┬──────────┘│ │└───────────┬──────────┘│ │ │ │ │ │ │ │┌───────────▼──────────┐│ │┌───────────▼──────────┐│ ││ Validate schema ││ ││ Match action ││ │└──────────────────────┘│ │└──────────────────────┘│ └────────────────────────┘ └────────────────────────┘ Parameters ---------- name: str feature name to evaluate context: dict[str, Any] | None Attributes that should be evaluated against the stored schema. for example: `{"tenant_id": "X", "username": "Y", "region": "Z"}` default: JSONType default value if feature flag doesn't exist in the schema, or there has been an error when fetching the configuration from the store Can be boolean or any JSON values for non-boolean features. 
Examples -------- ```python from aws_lambda_powertools.utilities.feature_flags import AppConfigStore, FeatureFlags from aws_lambda_powertools.utilities.typing import LambdaContext app_config = AppConfigStore(environment="dev", application="product-catalogue", name="features") feature_flags = FeatureFlags(store=app_config) def lambda_handler(event: dict, context: LambdaContext): # Get customer's tier from incoming request ctx = {"tier": event.get("tier", "standard")} # Evaluate whether customer's tier has access to premium features # based on `has_premium_features` rules has_premium_features: bool = feature_flags.evaluate(name="premium_features", context=ctx, default=False) if has_premium_features: # enable premium features ... ``` Returns ------ JSONType whether feature should be enabled (bool flags) or JSON value when non-bool feature matches Raises ------ SchemaValidationError When schema doesn't conform with feature flag schema """ if context is None: context = {} try: features = self.get_configuration() except ConfigurationStoreError as err: self.logger.debug(f"Failed to fetch feature flags from store, returning default provided, reason={err}") return default feature = features.get(name) if feature is None: self.logger.debug(f"Feature not found; returning default provided, name={name}, default={default}") return default rules = feature.get(schema.RULES_KEY) feat_default = feature.get(schema.FEATURE_DEFAULT_VAL_KEY) # Maintenance: Revisit before going GA. We might to simplify customers on-boarding by not requiring it # for non-boolean flags. It'll need minor implementation changes, docs changes, and maybe refactor # get_enabled_features. 
We can minimize breaking change, despite Beta label, by having a new # method `get_matching_features` returning dict[feature_name, feature_value] boolean_feature = feature.get( schema.FEATURE_DEFAULT_VAL_TYPE_KEY, True, ) # backwards compatibility, assume feature flag if not rules: self.logger.debug( f"no rules found, returning feature default, name={name}, default={str(feat_default)}, boolean_feature={boolean_feature}", # noqa: E501 ) # Maintenance: Revisit before going GA. We might to simplify customers on-boarding by not requiring it # for non-boolean flags. return bool(feat_default) if boolean_feature else feat_default self.logger.debug( f"looking for rule match, name={name}, default={str(feat_default)}, boolean_feature={boolean_feature}", # noqa: E501 ) return self._evaluate_rules( feature_name=name, context=context, feat_default=feat_default, rules=rules, boolean_feature=boolean_feature, )Evaluate whether a feature flag should be enabled according to stored schema and input context Logic when evaluating a feature flag - Feature exists and a rule matches, returns when_match value
- Feature exists but has either no rules or no match, return feature default value
- Feature doesn't exist in stored schema, encountered an error when fetching -> return default value provided
 ┌────────────────────────┐ ┌────────────────────────┐ ┌────────────────────────┐ │ Feature flags │──────▶ Get Configuration ├───────▶ Evaluate rules │ └────────────────────────┘ │ │ │ │ │┌──────────────────────┐│ │┌──────────────────────┐│ ││ Fetch schema ││ ││ Match rule ││ │└───────────┬──────────┘│ │└───────────┬──────────┘│ │ │ │ │ │ │ │┌───────────▼──────────┐│ │┌───────────▼──────────┐│ ││ Cache schema ││ ││ Match condition ││ │└───────────┬──────────┘│ │└───────────┬──────────┘│ │ │ │ │ │ │ │┌───────────▼──────────┐│ │┌───────────▼──────────┐│ ││ Validate schema ││ ││ Match action ││ │└──────────────────────┘│ │└──────────────────────┘│ └────────────────────────┘ └────────────────────────┘ Parameters- name:- str
- feature name to evaluate
- context:- dict[str, Any] | None
- 
Attributes that should be evaluated against the stored schema, for example: {"tenant_id": "X", "username": "Y", "region": "Z"}
- default:- JSONType
- Default value returned if the feature flag doesn't exist in the schema, or if there has been an error when fetching the configuration from the store. Can be a boolean, or any JSON value for non-boolean features.
 Examplesfrom aws_lambda_powertools.utilities.feature_flags import AppConfigStore, FeatureFlags from aws_lambda_powertools.utilities.typing import LambdaContext app_config = AppConfigStore(environment="dev", application="product-catalogue", name="features") feature_flags = FeatureFlags(store=app_config) def lambda_handler(event: dict, context: LambdaContext): # Get customer's tier from incoming request ctx = {"tier": event.get("tier", "standard")} # Evaluate whether customer's tier has access to premium features # based on `has_premium_features` rules has_premium_features: bool = feature_flags.evaluate(name="premium_features", context=ctx, default=False) if has_premium_features: # enable premium features ...Returns- JSONType
- whether feature should be enabled (bool flags) or JSON value when non-bool feature matches
 Raises- SchemaValidationError
- When schema doesn't conform with feature flag schema
 
- def get_configuration(self) ‑> dict
- 
Expand source codedef get_configuration(self) -> dict: """Get validated feature flag schema from configured store. Largely used to aid testing, since it's called by `evaluate` and `get_enabled_features` methods. Raises ------ ConfigurationStoreError Any propagated error from store SchemaValidationError When schema doesn't conform with feature flag schema Returns ------ dict[str, dict] parsed JSON dictionary **Example** ```python { "premium_features": { "default": False, "rules": { "customer tier equals premium": { "when_match": True, "conditions": [ { "action": "EQUALS", "key": "tier", "value": "premium", } ], } }, }, "feature_two": { "default": False } } ``` """ # parse result conf as JSON, keep in cache for max age defined in store self.logger.debug(f"Fetching schema from registered store, store={self.store}") config: dict = self.store.get_configuration() validator = schema.SchemaValidator(schema=config, logger=self.logger) validator.validate() return config Get validated feature flag schema from configured store. Largely used to aid testing, since it's called by the evaluate and get_enabled_features methods. Raises- ConfigurationStoreError
- Any propagated error from store
- SchemaValidationError
- When schema doesn't conform with feature flag schema
 Returns- dict[str, dict]
- 
parsed JSON dictionary Example 
 { "premium_features": { "default": False, "rules": { "customer tier equals premium": { "when_match": True, "conditions": [ { "action": "EQUALS", "key": "tier", "value": "premium", } ], } }, }, "feature_two": { "default": False } }
- def get_enabled_features(self, *, context: dict[str, Any] | None = None) ‑> list[str]
- 
Expand source codedef get_enabled_features(self, *, context: dict[str, Any] | None = None) -> list[str]: """Get all enabled feature flags while also taking into account context (when a feature has defined rules) Parameters ---------- context: dict[str, Any] | None dict of attributes that you would like to match the rules against, can be `{'tenant_id: 'X', 'username':' 'Y', 'region': 'Z'}` etc. Returns ---------- list[str] list of all feature names that either matches context or have True as default **Example** ```python ["premium_features", "my_feature_two", "always_true_feature"] ``` Raises ------ SchemaValidationError When schema doesn't conform with feature flag schema """ if context is None: context = {} features_enabled: list[str] = [] try: features: dict[str, Any] = self.get_configuration() except ConfigurationStoreError as err: self.logger.debug(f"Failed to fetch feature flags from store, returning empty list, reason={err}") return features_enabled self.logger.debug("Evaluating all features") for name, feature in features.items(): rules = feature.get(schema.RULES_KEY, {}) feature_default_value = feature.get(schema.FEATURE_DEFAULT_VAL_KEY) boolean_feature = feature.get( schema.FEATURE_DEFAULT_VAL_TYPE_KEY, True, ) # backwards compatibility, assume feature flag if feature_default_value and not rules: self.logger.debug(f"feature is enabled by default and has no defined rules, name={name}") features_enabled.append(name) elif self._evaluate_rules( feature_name=name, context=context, feat_default=feature_default_value, rules=rules, boolean_feature=boolean_feature, ): self.logger.debug(f"feature's calculated value is True, name={name}") features_enabled.append(name) return features_enabledGet all enabled feature flags while also taking into account context (when a feature has defined rules) Parameters- context:- dict[str, Any] | None
- dict of attributes that you would like to match the rules
against, e.g. {'tenant_id': 'X', 'username': 'Y', 'region': 'Z'}.
 Returns- list[str]
- 
List of all feature names that either match the context or have True as default. Example 
 ["premium_features", "my_feature_two", "always_true_feature"]Raises- SchemaValidationError
- When schema doesn't conform with feature flag schema
 
- def validation_exception_handler(self, exc_class: Exception | list[Exception])
- 
Expand source codedef validation_exception_handler(self, exc_class: Exception | list[Exception]): """Registers function to handle unexpected validation exceptions when evaluating flags. It does not override the function of a default flag value in case of network and IAM permissions. For example, you won't be able to catch ConfigurationStoreError exception. Parameters ---------- exc_class : Exception | list[Exception] One or more exceptions to catch Examples -------- ```python feature_flags = FeatureFlags(store=app_config) @feature_flags.validation_exception_handler(Exception) # any exception def catch_exception(exc): raise TypeError("re-raised") from exc ``` """ def register_exception_handler(func: Callable[P, T]) -> Callable[P, T]: if isinstance(exc_class, list): for exp in exc_class: self._exception_handlers[exp] = func else: self._exception_handlers[exc_class] = func return func return register_exception_handlerRegisters function to handle unexpected validation exceptions when evaluating flags. It does not override the function of a default flag value in case of network and IAM permissions. For example, you won't be able to catch ConfigurationStoreError exception. Parameters- exc_class:- Exception | list[Exception]
- One or more exceptions to catch
 Examples
feature_flags = FeatureFlags(store=app_config)

@feature_flags.validation_exception_handler(Exception)  # any exception
def catch_exception(exc):
    raise TypeError("re-raised") from exc
 
- class RuleAction (*args, **kwds)
- 
Expand source codeclass RuleAction(str, Enum): EQUALS = "EQUALS" NOT_EQUALS = "NOT_EQUALS" KEY_GREATER_THAN_VALUE = "KEY_GREATER_THAN_VALUE" KEY_GREATER_THAN_OR_EQUAL_VALUE = "KEY_GREATER_THAN_OR_EQUAL_VALUE" KEY_LESS_THAN_VALUE = "KEY_LESS_THAN_VALUE" KEY_LESS_THAN_OR_EQUAL_VALUE = "KEY_LESS_THAN_OR_EQUAL_VALUE" STARTSWITH = "STARTSWITH" ENDSWITH = "ENDSWITH" IN = "IN" NOT_IN = "NOT_IN" KEY_IN_VALUE = "KEY_IN_VALUE" KEY_NOT_IN_VALUE = "KEY_NOT_IN_VALUE" VALUE_IN_KEY = "VALUE_IN_KEY" VALUE_NOT_IN_KEY = "VALUE_NOT_IN_KEY" ALL_IN_VALUE = "ALL_IN_VALUE" ANY_IN_VALUE = "ANY_IN_VALUE" NONE_IN_VALUE = "NONE_IN_VALUE" SCHEDULE_BETWEEN_TIME_RANGE = "SCHEDULE_BETWEEN_TIME_RANGE" # hour:min 24 hours clock SCHEDULE_BETWEEN_DATETIME_RANGE = "SCHEDULE_BETWEEN_DATETIME_RANGE" # full datetime format, excluding timezone SCHEDULE_BETWEEN_DAYS_OF_WEEK = "SCHEDULE_BETWEEN_DAYS_OF_WEEK" # MONDAY, TUESDAY, .... see TimeValues enum MODULO_RANGE = "MODULO_RANGE"str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'. Ancestors- builtins.str
- enum.Enum
 Class variables- var ALL_IN_VALUE
- var ANY_IN_VALUE
- var ENDSWITH
- var EQUALS
- var IN
- var KEY_GREATER_THAN_OR_EQUAL_VALUE
- var KEY_GREATER_THAN_VALUE
- var KEY_IN_VALUE
- var KEY_LESS_THAN_OR_EQUAL_VALUE
- var KEY_LESS_THAN_VALUE
- var KEY_NOT_IN_VALUE
- var MODULO_RANGE
- var NONE_IN_VALUE
- var NOT_EQUALS
- var NOT_IN
- var SCHEDULE_BETWEEN_DATETIME_RANGE
- var SCHEDULE_BETWEEN_DAYS_OF_WEEK
- var SCHEDULE_BETWEEN_TIME_RANGE
- var STARTSWITH
- var VALUE_IN_KEY
- var VALUE_NOT_IN_KEY
 
- class SchemaValidator (schema: dict[str, Any], logger: logging.Logger | Logger | None = None)
- 
Expand source codeclass SchemaValidator(BaseValidator): """Validates feature flag schema configuration Raises ------ SchemaValidationError When schema doesn't conform with feature flag schema Schema ------ **Feature object** A dictionary containing default value and rules for matching. The value MUST be an object and MIGHT contain the following members: * **default**: `bool | JSONType`. Defines default feature value. This MUST be present * **boolean_type**: bool. Defines whether feature has non-boolean value (`JSONType`). This MIGHT be present * **rules**: `dict[str, dict]`. Rules object. This MIGHT be present `JSONType` being any JSON primitive value: `str | int | float | bool | None | dict[str, Any] | list[Any]` ```json { "my_feature": { "default": true, "rules": {} }, "my_non_boolean_feature": { "default": {"group": "read-only"}, "boolean_type": false, "rules": {} } } ``` **Rules object** A dictionary with each rule and their conditions that a feature might have. The value MIGHT be present, and when defined it MUST contain the following members: * **when_match**: `bool | JSONType`. Defines value to return when context matches conditions * **conditions**: `list[dict]`. Conditions object. This MUST be present ```json { "my_feature": { "default": true, "rules": { "tenant id equals 345345435": { "when_match": false, "conditions": [] } } }, "my_non_boolean_feature": { "default": {"group": "read-only"}, "boolean_type": false, "rules": { "tenant id equals 345345435": { "when_match": {"group": "admin"}, "conditions": [] } } } } ``` **Conditions object** A list of dictionaries containing conditions for a given rule. The value MUST contain the following members: * **action**: `str`. Operation to perform to match a key and value. The value MUST be either EQUALS, STARTSWITH, ENDSWITH, KEY_IN_VALUE KEY_NOT_IN_VALUE VALUE_IN_KEY VALUE_NOT_IN_KEY * **key**: `str`. Key in given context to perform operation * **value**: `Any`. 
Value in given context that should match action operation. ```json { "my_feature": { "default": true, "rules": { "tenant id equals 345345435": { "when_match": false, "conditions": [ { "action": "EQUALS", "key": "tenant_id", "value": "345345435", } ] } } } } ``` """ def __init__(self, schema: dict[str, Any], logger: logging.Logger | Logger | None = None): self.schema = schema self.logger = logger or LOGGER # Validators are designed for modular testing # therefore we link the custom logger with global LOGGER # so custom validators can use them when necessary SchemaValidator._link_global_logger(self.logger) def validate(self) -> None: self.logger.debug("Validating schema") if not isinstance(self.schema, dict): raise SchemaValidationError(f"Features must be a dictionary, schema={str(self.schema)}") features = FeaturesValidator(schema=self.schema, logger=self.logger) features.validate() @staticmethod def _link_global_logger(logger: logging.Logger | Logger): global LOGGER LOGGER = loggerValidates feature flag schema configuration Raises- SchemaValidationError
- When schema doesn't conform with feature flag schema
 Schema
Feature object A dictionary containing default value and rules for matching. The value MUST be an object and MIGHT contain the following members: - default: bool | JSONType. Defines default feature value. This MUST be present
- boolean_type: bool. Defines whether the feature is a boolean flag; set it to false when the feature has a non-boolean value (JSONType). When omitted, the feature is treated as a boolean flag. This MIGHT be present
- rules: dict[str, dict]. Rules object. This MIGHT be present
 JSONType being any JSON value: str | int | float | bool | None | dict[str, Any] | list[Any]
 { "my_feature": { "default": true, "rules": {} }, "my_non_boolean_feature": { "default": {"group": "read-only"}, "boolean_type": false, "rules": {} } }
 Rules object A dictionary with each rule and its conditions that a feature might have. The value MIGHT be present, and when defined it MUST contain the following members: - when_match: bool | JSONType. Defines value to return when context matches conditions
- conditions: list[dict]. Conditions object. This MUST be present
 { "my_feature": { "default": true, "rules": { "tenant id equals 345345435": { "when_match": false, "conditions": [] } } }, "my_non_boolean_feature": { "default": {"group": "read-only"}, "boolean_type": false, "rules": { "tenant id equals 345345435": { "when_match": {"group": "admin"}, "conditions": [] } } } }Conditions object A list of dictionaries containing conditions for a given rule. The value MUST contain the following members: - 
action: str. Operation to perform to match a key and value. The value MUST be one of EQUALS, STARTSWITH, ENDSWITH, KEY_IN_VALUE, KEY_NOT_IN_VALUE, VALUE_IN_KEY, VALUE_NOT_IN_KEY; the RuleAction enum defines additional actions (e.g. NOT_EQUALS, ANY_IN_VALUE, MODULO_RANGE)
- 
key: str. Key in given context to perform operation
- value: Any. Value in given context that should match action operation.
 { "my_feature": { "default": true, "rules": { "tenant id equals 345345435": { "when_match": false, "conditions": [ { "action": "EQUALS", "key": "tenant_id", "value": "345345435" } ] } } } }
 Ancestors- BaseValidator
- abc.ABC
 Methods- def validate(self) ‑> None
- 
Expand source codedef validate(self) -> None: self.logger.debug("Validating schema") if not isinstance(self.schema, dict): raise SchemaValidationError(f"Features must be a dictionary, schema={str(self.schema)}") features = FeaturesValidator(schema=self.schema, logger=self.logger) features.validate()
 
- class StoreProvider
- 
Expand source codeclass StoreProvider(ABC): @property @abstractmethod def get_raw_configuration(self) -> dict[str, Any]: """Get configuration from any store and return the parsed JSON dictionary""" raise NotImplementedError() # pragma: no cover @abstractmethod def get_configuration(self) -> dict[str, Any]: """Get configuration from any store and return the parsed JSON dictionary If envelope is set, it'll extract and return feature flags from configuration, otherwise it'll return the entire configuration fetched from the store. Raises ------ ConfigurationStoreError Any error that can occur during schema fetch or JSON parse Returns ------- dict[str, Any] parsed JSON dictionary **Example** ```python { "premium_features": { "default": False, "rules": { "customer tier equals premium": { "when_match": True, "conditions": [ { "action": "EQUALS", "key": "tier", "value": "premium", } ], } }, }, "feature_two": { "default": False } } ``` """ raise NotImplementedError() # pragma: no coverHelper class that provides a standard way to create an ABC using inheritance. Ancestors- abc.ABC
 Subclasses- AppConfigStore
 Instance variables- prop get_raw_configuration : dict[str, Any]
- 
Expand source code@property @abstractmethod def get_raw_configuration(self) -> dict[str, Any]: """Get configuration from any store and return the parsed JSON dictionary""" raise NotImplementedError() # pragma: no coverGet configuration from any store and return the parsed JSON dictionary 
 Methods- def get_configuration(self) ‑> dict[str, typing.Any]
- 
Expand source code@abstractmethod def get_configuration(self) -> dict[str, Any]: """Get configuration from any store and return the parsed JSON dictionary If envelope is set, it'll extract and return feature flags from configuration, otherwise it'll return the entire configuration fetched from the store. Raises ------ ConfigurationStoreError Any error that can occur during schema fetch or JSON parse Returns ------- dict[str, Any] parsed JSON dictionary **Example** ```python { "premium_features": { "default": False, "rules": { "customer tier equals premium": { "when_match": True, "conditions": [ { "action": "EQUALS", "key": "tier", "value": "premium", } ], } }, }, "feature_two": { "default": False } } ``` """ raise NotImplementedError() # pragma: no coverGet configuration from any store and return the parsed JSON dictionary If envelope is set, it'll extract and return feature flags from configuration, otherwise it'll return the entire configuration fetched from the store. Raises- ConfigurationStoreError
- Any error that can occur during schema fetch or JSON parse
 Returns- dict[str, Any]
- 
parsed JSON dictionary Example 
 { "premium_features": { "default": False, "rules": { "customer tier equals premium": { "when_match": True, "conditions": [ { "action": "EQUALS", "key": "tier", "value": "premium", } ], } }, }, "feature_two": { "default": False } }