Source code for eemeter.segmentation

#!/usr/bin/env python
# -*- coding: utf-8 -*-

   Copyright 2014-2023 OpenEEmeter contributors

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   See the License for the specific language governing permissions and
   limitations under the License.

from collections import namedtuple

import numpy as np
import pandas as pd
from patsy import dmatrix

__all__ = (

HourlyModelPrediction = namedtuple("HourlyModelPrediction", ["result"])

[docs]class CalTRACKSegmentModel(object): """An object that captures the model fit for one segment. Attributes ---------- segment_name : :any:`str` The name of the segment of data this model was fit to. model : :any:`object` The fitted model object. formula : :any:`str` The formula of the model regression. model_param : :any:`dict` A dictionary of parameters warnings : :any:`list` A list of eemeter warnings. """ def __init__(self, segment_name, model, formula, model_params, warnings=None): self.segment_name = segment_name self.model = model self.formula = formula self.model_params = model_params if warnings is None: warnings = [] self.warnings = warnings
[docs] def predict(self, data): """A function which takes input data and predicts for this segment model.""" if self.formula is None: var_str = "" else: var_str = self.formula.split("~", 1)[1] design_matrix_granular = dmatrix(var_str, data, return_type="dataframe") parameters = pd.Series(self.model_params) # Step 1, slice col_type = "C(hour_of_week)" hour_of_week_cols = [ c for c in design_matrix_granular.columns if col_type in c and c in parameters.keys() ] # Step 2, cut out all 0s design_matrix_granular = design_matrix_granular[ (design_matrix_granular[hour_of_week_cols] != 0).any(axis=1) ] cols_to_predict = list( set(parameters.keys()).intersection(set(design_matrix_granular.keys())) ) design_matrix_granular = design_matrix_granular[cols_to_predict] parameters = parameters[cols_to_predict] # Step 3, predict prediction ="predicted_usage") # Step 4, put nans back in prediction = prediction.reindex(data.index) return prediction
[docs] def json(self): """Return a JSON-serializable representation of this result. The output of this function can be converted to a serialized string with :any:`json.dumps`. """ data = { "segment_name": self.segment_name, "formula": self.formula, "warnings": [w.json() for w in self.warnings], "model_params": self.model_params, } return data
[docs] @classmethod def from_json(cls, data): """Loads a JSON-serializable representation into the model state. The input of this function is a dict which can be the result of :any:`json.loads`. """ c = cls( data.get("segment_name"), None, data.get("formula"), data.get("model_params"), warnings=data.get("warnings"), ) return c
[docs]class SegmentedModel(object): """Represent a model which has been broken into multiple model segments (for CalTRACK Hourly, these are month-by-month segments, each of which is associated with a different model. Parameters ---------- segment_models : :any:`dict` of :any:`eemeter.CalTRACKSegmentModel` Dictionary of segment models, keyed by segment name. prediction_segment_type : :any:`str` Any segment_type that can be passed to :any:`eemeter.segment_time_series`, currently "single", "one_month", "three_month", or "three_month_weighted". prediction_segment_name_mapping : :any:`dict` of :any:`str` A dictionary mapping the segment names for the segment type used for predicting to the segment names for the segment type used for fitting, e.g., `{"<predict_segment_name>": "<fit_segment_name>"}`. prediction_feature_processor : :any:`function` A function that transforms raw inputs (temperatures) into features for each segment. prediction_feature_processor_kwargs : :any:`dict` A dict of keyword arguments to be passed as `**kwargs` to the `prediction_feature_processor` function. """ def __init__( self, segment_models, prediction_segment_type, prediction_segment_name_mapping=None, prediction_feature_processor=None, prediction_feature_processor_kwargs=None, ): self.segment_models = segment_models fitted_model_lookup = { segment_model.segment_name: segment_model for segment_model in segment_models } if prediction_segment_name_mapping is None: self.model_lookup = fitted_model_lookup else: self.model_lookup = { pred_name: fitted_model_lookup.get(fit_name) for pred_name, fit_name in prediction_segment_name_mapping.items() } self.prediction_segment_type = prediction_segment_type self.prediction_segment_name_mapping = prediction_segment_name_mapping self.prediction_feature_processor = prediction_feature_processor self.prediction_feature_processor_kwargs = prediction_feature_processor_kwargs
[docs] def predict( self, prediction_index, temperature, **kwargs ): # ignore extra args with kwargs """Predict over a prediction index by combining results from all models. Parameters ---------- prediction_index : :any:`pandas.DatetimeIndex` The index over which to predict. temperature : :any:`pandas.Series` Hourly temperatures. **kwargs Extra argmuents will be ignored """ prediction_segmentation = segment_time_series( temperature.index, self.prediction_segment_type, drop_zero_weight_segments=True, ) iterator = iterate_segmented_dataset( temperature.to_frame("temperature_mean"), segmentation=prediction_segmentation, feature_processor=self.prediction_feature_processor, feature_processor_kwargs=self.prediction_feature_processor_kwargs, feature_processor_segment_name_mapping=self.prediction_segment_name_mapping, ) predictions = {} for segment_name, segmented_data in iterator: segment_model = self.model_lookup.get(segment_name) if segment_model is None: continue prediction = segment_model.predict(segmented_data) * segmented_data.weight # NaN the zero weights and reindex prediction = prediction[segmented_data.weight > 0].reindex(prediction_index) predictions[segment_name] = prediction predictions = pd.DataFrame(predictions) result = pd.DataFrame({"predicted_usage": predictions.sum(axis=1, min_count=1)}) return HourlyModelPrediction(result=result)
[docs] def json(self): """Return a JSON-serializable representation of this result. The output of this function can be converted to a serialized string with :any:`json.dumps`. """ def _json_or_none(obj): return None if obj is None else obj.json() data = { "segment_models": [_json_or_none(m) for m in self.segment_models], "model_lookup": { key: _json_or_none(val) for key, val in self.model_lookup.items() }, "prediction_segment_type": self.prediction_segment_type, "prediction_segment_name_mapping": self.prediction_segment_name_mapping, "prediction_feature_processor": self.prediction_feature_processor.__name__, } return data
def filter_zero_weights_feature_processor(segment_name, segment_data): """A default segment processor to use if none is provided.""" return segment_data[segment_data.weight > 0]
[docs]def iterate_segmented_dataset( data, segmentation=None, feature_processor=None, feature_processor_kwargs=None, feature_processor_segment_name_mapping=None, ): """A utility for iterating over segments which allows providing a function for processing outputs into features. Parameters ---------- data : :any:`pandas.DataFrame`, required Data to segment, segmentation : :any:`pandas.DataFrame`, default None A segmentation of the input dataframe expressed as a dataframe which shares the timeseries index of the data and has named columns of weights, which are iterated over to create the outputs (or inputs to the feature processor, which then creates the actual outputs). feature_processor : :any:`function`, default None A function that transforms raw inputs (temperatures) into features for each segment. feature_processor_kwargs : :any:`dict`, default None A dict of keyword arguments to be passed as `**kwargs` to the `feature_processor` function. feature_processor_segment_name_mapping : :any:`dict`, default None A mapping from the default segmentation segment names to alternate names. This is useful when prediction uses a different segment type than fitting. """ if feature_processor is None: feature_processor = filter_zero_weights_feature_processor if feature_processor_kwargs is None: feature_processor_kwargs = {} if feature_processor_segment_name_mapping is None: feature_processor_segment_name_mapping = {} def _apply_feature_processor(segment_name, segment_data): feature_processor_segment_name = feature_processor_segment_name_mapping.get( segment_name, segment_name ) if feature_processor is not None: segment_data = feature_processor( feature_processor_segment_name, segment_data, **feature_processor_kwargs ) return segment_data def _add_weights(data, weights): return pd.merge(data, weights, left_index=True, right_index=True) if segmentation is None: # spoof segment name and weights column segment_name = None weights = pd.DataFrame({"weight": 1}, index=data.index) segment_data = _add_weights(data, weights) segment_data = _apply_feature_processor(segment_name, segment_data) yield segment_name, segment_data else: for segment_name, segment_weights in segmentation.iteritems(): weights = segment_weights.to_frame("weight") segment_data = _add_weights(data, weights) segment_data = _apply_feature_processor(segment_name, segment_data) yield segment_name, segment_data
def _get_calendar_year_coverage_warning(index): pass def _get_hourly_coverage_warning(index, min_fraction_daily_coverage=0.9): pass def _segment_weights_single(index): return pd.DataFrame({"all": 1.0}, index=index) def _segment_weights_one_month(index): return pd.DataFrame( { month_name: (index.month == month_number).astype(float) for month_name, month_number in [ ("jan", 1), ("feb", 2), ("mar", 3), ("apr", 4), ("may", 5), ("jun", 6), ("jul", 7), ("aug", 8), ("sep", 9), ("oct", 10), ("nov", 11), ("dec", 12), ] }, index=index, columns=[ "jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec", ], # guarantee order ) def _segment_weights_three_month(index): return pd.DataFrame( { month_names: ( i: i in month_numbers)).astype(float) for month_names, month_numbers in [ ("dec-jan-feb", (12, 1, 2)), ("jan-feb-mar", (1, 2, 3)), ("feb-mar-apr", (2, 3, 4)), ("mar-apr-may", (3, 4, 5)), ("apr-may-jun", (4, 5, 6)), ("may-jun-jul", (5, 6, 7)), ("jun-jul-aug", (6, 7, 8)), ("jul-aug-sep", (7, 8, 9)), ("aug-sep-oct", (8, 9, 10)), ("sep-oct-nov", (9, 10, 11)), ("oct-nov-dec", (10, 11, 12)), ("nov-dec-jan", (11, 12, 1)), ] }, index=index, columns=[ "dec-jan-feb", "jan-feb-mar", "feb-mar-apr", "mar-apr-may", "apr-may-jun", "may-jun-jul", "jun-jul-aug", "jul-aug-sep", "aug-sep-oct", "sep-oct-nov", "oct-nov-dec", "nov-dec-jan", ], # guarantee order ) def _segment_weights_three_month_weighted(index): return pd.DataFrame( { month_names: lambda i: month_weights.get(str(i), 0.0) ).astype(float) for month_names, month_weights in [ ("dec-jan-feb-weighted", {"12": 0.5, "1": 1, "2": 0.5}), ("jan-feb-mar-weighted", {"1": 0.5, "2": 1, "3": 0.5}), ("feb-mar-apr-weighted", {"2": 0.5, "3": 1, "4": 0.5}), ("mar-apr-may-weighted", {"3": 0.5, "4": 1, "5": 0.5}), ("apr-may-jun-weighted", {"4": 0.5, "5": 1, "6": 0.5}), ("may-jun-jul-weighted", {"5": 0.5, "6": 1, "7": 0.5}), ("jun-jul-aug-weighted", {"6": 0.5, "7": 1, "8": 0.5}), ("jul-aug-sep-weighted", {"7": 0.5, "8": 1, "9": 0.5}), ("aug-sep-oct-weighted", {"8": 0.5, "9": 1, "10": 0.5}), ("sep-oct-nov-weighted", {"9": 0.5, "10": 1, "11": 0.5}), ("oct-nov-dec-weighted", {"10": 0.5, "11": 1, "12": 0.5}), ("nov-dec-jan-weighted", {"11": 0.5, "12": 1, "1": 0.5}), ] }, index=index, columns=[ "dec-jan-feb-weighted", "jan-feb-mar-weighted", "feb-mar-apr-weighted", "mar-apr-may-weighted", "apr-may-jun-weighted", "may-jun-jul-weighted", "jun-jul-aug-weighted", "jul-aug-sep-weighted", "aug-sep-oct-weighted", "sep-oct-nov-weighted", "oct-nov-dec-weighted", "nov-dec-jan-weighted", ], # guarantee order )
[docs]def segment_time_series(index, segment_type="single", drop_zero_weight_segments=False): """Split a time series index into segments by applying weights. Parameters ---------- index : :any:`pandas.DatetimeIndex` A time series index which gets split into segments. segment_type : :any:`str` The method to use when creating segments. - "single": creates one big segment with the name "all". - "one_month": creates up to twelve segments, each of which contains a single month. Segment names are "jan", "feb", ... "dec". - "three_month": creates up to twelve overlapping segments, each of which contains three calendar months of data. Segment names are "dec-jan-feb", "jan-feb-mar", ... "nov-dec-jan" - "three_month_weighted": creates up to twelve overlapping segments, each of contains three calendar months of data with first and third month in each segment having weights of one half. Segment names are "dec-jan-feb-weighted", "jan-feb-mar-weighted", ... "nov-dec-jan-weighted". Returns ------- segmentation : `pandas.DataFrame` A segmentation of the input index expressed as a dataframe which shares the input index and has named columns of weights. """ segment_weight_func = { "single": _segment_weights_single, "one_month": _segment_weights_one_month, "three_month": _segment_weights_three_month, "three_month_weighted": _segment_weights_three_month_weighted, }.get(segment_type, None) if segment_weight_func is None: raise ValueError("Invalid segment type: %s" % (segment_type)) segment_weights = segment_weight_func(index) if drop_zero_weight_segments: # keep only columns with non-zero weights total_weights = segment_weights.sum() columns_to_keep = total_weights[total_weights > 0].index.tolist() segment_weights = segment_weights[columns_to_keep] # TODO: Do something with these _get_hourly_coverage_warning(segment_weights) # each model _get_calendar_year_coverage_warning(index) # whole index return segment_weights
def fit_model_segments(segmented_dataset_dict, fit_segment): """A function which fits a model to each item in a dataset. Parameters ---------- segmented_dataset_dict : :any:`dict` of :any:`pandas.DataFrame` A dict with keys as segment names and values as dataframes of model input. fit_segment : :any:`function` A function which fits a model to a dataset in the `segmented_dataset_dict`. Returns ------- segment_models : :any:`list` of :any:`object` List of fitted model objects - the return values of the fit_segment function. """ segment_models = [ fit_segment(segment_name, segment_data) for segment_name, segment_data in segmented_dataset_dict.items() ] return segment_models