Source code for eemeter.transform

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""

   Copyright 2014-2023 OpenEEmeter contributors

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.

"""
from datetime import datetime, timedelta
from functools import partial

import numpy as np
import pandas as pd
import pytz

from .exceptions import NoBaselineDataError, NoReportingDataError
from .warnings import EEMeterWarning


# Names exported by ``from eemeter.transform import *``.
__all__ = (
    "Term",
    "as_freq",
    "day_counts",
    "get_baseline_data",
    "get_reporting_data",
    "get_terms",
    "remove_duplicates",
    "overwrite_partial_rows_with_nan",
    "clean_caltrack_billing_data",
    "clean_caltrack_billing_daily_data",
)


def overwrite_partial_rows_with_nan(df):
    """Blank out every row that contains at least one null value.

    Parameters
    ----------
    df : :any:`pandas.DataFrame`
        Frame whose partially populated rows should be nulled.

    Returns
    -------
    nulled : :any:`pandas.DataFrame`
        A frame with the same index as `df`. Rows that had any null value
        come back entirely null; fully populated rows are unchanged.
    """
    # Dropping incomplete rows and re-expanding to the original index leaves
    # NaN in every column of the rows that were dropped.
    complete_rows = df.dropna()
    return complete_rows.reindex(df.index)
def remove_duplicates(df_or_series):
    """Drop entries whose index value repeats, keeping the first occurrence.

    Parameters
    ----------
    df_or_series : :any:`pandas.DataFrame` or :any:`pandas.Series`
        Pandas object from which to drop duplicate index values.

    Returns
    -------
    deduplicated : :any:`pandas.DataFrame` or :any:`pandas.Series`
        The deduplicated pandas object.
    """
    # CalTrack 2.3.2.2
    is_repeat = df_or_series.index.duplicated(keep="first")
    return df_or_series[~is_repeat]
def as_freq(
    data_series,
    freq,
    atomic_freq="1 Min",
    series_type="cumulative",
    include_coverage=False,
):
    """Resample data to a different frequency.

    This method can be used to upsample or downsample meter data. The
    assumption it makes to do so is that meter data is constant and averaged
    over the given periods. For instance, to convert billing-period data to
    daily data, this method first upsamples to the atomic frequency
    (1 minute frequency, by default), "spreading" usage evenly across all
    minutes in each period. Then it downsamples to daily frequency and
    returns that result. With instantaneous series, the data is copied to all
    contiguous time intervals and the mean over `freq` is returned.

    **Caveats**:

     - This method gives a fair amount of flexibility in
       resampling as long as you are OK with the assumption that usage is
       constant over the period (this assumption is generally broken in
       observed data at large enough frequencies, so this caveat should not be
       taken lightly).

    Parameters
    ----------
    data_series : :any:`pandas.Series`
        Data to resample. Should have a :any:`pandas.DatetimeIndex`.
    freq : :any:`str`
        The frequency to resample to. This should be given in a form recognized
        by the :any:`pandas.Series.resample` method.
    atomic_freq : :any:`str`, optional
        The "atomic" frequency of the intermediate data form. This can be
        adjusted to a higher atomic frequency to increase speed or memory
        performance.
    series_type : :any:`str`, {'cumulative', 'instantaneous'},
        default 'cumulative'
        Type of data sampling. 'cumulative' data can be spread over smaller
        time intervals and is aggregated using addition (e.g. meter data).
        'instantaneous' data is copied (not spread) over smaller time intervals
        and is aggregated by averaging (e.g. weather data).
    include_coverage : :any:`bool`, default `False`
        Option of whether to return a series with just the resampled values
        or a dataframe with a column that includes percent coverage of source
        data used for each sample.

    Returns
    -------
    resampled_data : :any:`pandas.Series` or :any:`pandas.DataFrame`
        Data resampled to the given frequency (optionally as a dataframe with
        a ``coverage`` column if `include_coverage` is used). An empty input
        series is returned as-is.

    Raises
    ------
    ValueError
        If `data_series` is not a :any:`pandas.Series`, or if a non-empty
        series is given together with an unsupported `series_type`.
    """
    # TODO(philngo): make sure this complies with CalTRACK 2.2.2.1
    if not isinstance(data_series, pd.Series):
        raise ValueError(
            "expected series, got object with class {}".format(data_series.__class__)
        )
    if data_series.empty:
        return data_series
    if series_type not in ("cumulative", "instantaneous"):
        # Fail with a clear message instead of an UnboundLocalError further down.
        raise ValueError(
            "series_type {} not supported - use either 'cumulative' or"
            " 'instantaneous'".format(series_type)
        )

    series = remove_duplicates(data_series)

    if series_type == "cumulative":
        target_freq = pd.Timedelta(atomic_freq)
        # Length of each source period; the last period has no known end.
        timedeltas = (series.index[1:] - series.index[:-1]).append(
            pd.TimedeltaIndex([pd.NaT])
        )
        # Share of each period's total that belongs to one atomic interval.
        spread_factor = target_freq.total_seconds() / timedeltas.total_seconds()
        series_spread = series * spread_factor
        atomic_series = series_spread.asfreq(atomic_freq, method="ffill")
        resampled = atomic_series.resample(freq).sum()
        # sum() reports 0 for all-null bins; restore NaN where the bin starts null.
        resampled_with_nans = atomic_series.resample(freq).first()
        resampled = resampled[resampled_with_nans.notnull()].reindex(resampled.index)
    else:
        atomic_series = series.asfreq(atomic_freq, method="ffill")
        resampled = atomic_series.resample(freq).mean()

    # Non-null atomic samples per target bin. Computed for both series types so
    # that include_coverage also works for instantaneous data.
    n_coverage = atomic_series.resample(freq).count()

    if resampled.index[-1] < series.index[-1]:
        # this adds a null at the end using the target frequency
        last_index = pd.date_range(resampled.index[-1], freq=freq, periods=2)[1:]
        resampled = (
            pd.concat([resampled, pd.Series(np.nan, index=last_index)])
            .resample(freq)
            .mean()
        )

    if include_coverage:
        # Number of atomic intervals that fit in each target bin.
        n_total = resampled.resample(atomic_freq).count().resample(freq).count()
        resampled = resampled.to_frame("value")
        resampled["coverage"] = n_coverage / n_total
        return resampled
    else:
        return resampled
def day_counts(index):
    """Days between DatetimeIndex values as a :any:`pandas.Series`.

    Parameters
    ----------
    index : :any:`pandas.DatetimeIndex`
        The index for which to get day counts.

    Returns
    -------
    day_counts : :any:`pandas.Series`
        A :any:`pandas.Series` with counts of days between periods. Counts are
        given on start dates of periods, so the final entry is NaN. An empty
        index yields an empty float series.
    """
    # dont affect the original data
    index = index.copy()

    if len(index) == 0:
        # An explicit dtype keeps the empty result float, like the non-empty
        # case, and avoids the version-dependent default dtype of an empty Series.
        return pd.Series([], index=index, dtype=float)

    # The last period has no following timestamp, hence the trailing NaT.
    timedeltas = (index[1:] - index[:-1]).append(pd.TimedeltaIndex([pd.NaT]))
    timedelta_days = timedeltas.total_seconds() / (60 * 60 * 24)

    return pd.Series(timedelta_days, index=index)
def _make_baseline_warnings(
    end_inf, start_inf, data_start, data_end, start_limit, end_limit
):
    """Collect warnings about data gaps at the edges of a baseline period.

    A limit flagged as infinite (`end_inf` / `start_inf`) is never checked.
    Returns a list of :any:`eemeter.EEMeterWarning`, end gap first.
    """
    found = []

    # warn if there is a gap at end
    end_gap = False if end_inf else data_end < end_limit
    if end_gap:
        end_details = {
            "requested_end": end_limit.isoformat(),
            "data_end": data_end.isoformat(),
        }
        found.append(
            EEMeterWarning(
                qualified_name="eemeter.get_baseline_data.gap_at_baseline_end",
                description=(
                    "Data does not have coverage at requested baseline end date."
                ),
                data=end_details,
            )
        )

    # warn if there is a gap at start
    start_gap = False if start_inf else start_limit < data_start
    if start_gap:
        start_details = {
            "requested_start": start_limit.isoformat(),
            "data_start": data_start.isoformat(),
        }
        found.append(
            EEMeterWarning(
                qualified_name="eemeter.get_baseline_data.gap_at_baseline_start",
                description=(
                    "Data does not have coverage at requested baseline start date."
                ),
                data=start_details,
            )
        )

    return found
def get_baseline_data(
    data,
    start=None,
    end=None,
    max_days=365,
    allow_billing_period_overshoot=False,
    n_days_billing_period_overshoot=None,
    ignore_billing_period_gap_for_day_count=False,
):
    """Filter down to baseline period data.

    .. note::

        For compliance with CalTRACK, set ``max_days=365`` (section 2.2.1.1).

    Parameters
    ----------
    data : :any:`pandas.DataFrame` or :any:`pandas.Series`
        The data to filter to baseline data. This data will be filtered down
        to an acceptable baseline period according to the dates passed as
        `start` and `end`, or the maximum period specified with `max_days`.
    start : :any:`datetime.datetime`
        A timezone-aware datetime that represents the earliest allowable start
        date for the baseline data. Cannot be combined with `max_days`; pass
        ``max_days=None`` to use an explicit `start`.
    end : :any:`datetime.datetime`
        A timezone-aware datetime that represents the latest allowable end
        date for the baseline data, i.e., the latest date for which data is
        available before the intervention begins.
    max_days : :any:`int`, default 365
        The maximum length of the period. Ignored if `end` is not set.
    allow_billing_period_overshoot : :any:`bool`, default False
        If True, count `max_days` from the end of the last billing data period
        that ends before the `end` date, rather than from the exact `end` date.
        Otherwise use the exact `end` date as the cutoff.
    n_days_billing_period_overshoot : :any:`int`, default None
        If `allow_billing_period_overshoot` is set to True, this determines
        the number of days of overshoot that will be tolerated. A value of
        None implies that any number of days is allowed.
    ignore_billing_period_gap_for_day_count : :any:`bool`, default False
        If True, instead of going back `max_days` from either the
        `end` date or end of the last billing period before that date (depending
        on the value of the `allow_billing_period_overshoot` setting) and
        excluding the last period that began before that date, first check to
        see if excluding or including that period gets closer to a total of
        `max_days` of data.

        For example, with `max_days=365`, if an exact 365 period would target
        Feb 15, but the billing period went from Jan 20 to Feb 20, exclude that
        period for a total of ~360 days of data, because that's closer to 365
        than ~390 days, which would be the total if that period was included.
        If, on the other hand, that period started Feb 10 and went to Mar 10,
        include the period, because ~370 days of data is closer to 365 than
        ~340.

    Returns
    -------
    baseline_data, warnings : :any:`tuple` of (:any:`pandas.DataFrame` or :any:`pandas.Series`, :any:`list` of :any:`eemeter.EEMeterWarning`)
        Data for only the specified baseline period and any associated
        warnings. The last row of the returned data is overwritten with NaN.

    Raises
    ------
    ValueError
        If both `max_days` and `start` are given.
    NoBaselineDataError
        If the selected period contains no non-null data.

    Notes
    -----
    As implemented here:

    - `n_days_billing_period_overshoot` is only consulted when
      `ignore_billing_period_gap_for_day_count` is True. In that case the end
      limit is moved back to the last index value at or before `end`, provided
      that value lies within the given number of days of `end` (or
      unconditionally when it is None).
    - When `allow_billing_period_overshoot` is True the selection starts at
      the index value nearest the target start; otherwise the target start is
      a hard cutoff.
    """
    if max_days is not None:
        if start is not None:
            raise ValueError(  # pragma: no cover
                "If max_days is set, start cannot be set: start={}, max_days={}.".format(
                    start, max_days
                )
            )

    start_inf = False
    if start is None:
        # py datetime min/max are out of range of pd.Timestamp min/max
        start_target = pytz.UTC.localize(pd.Timestamp.min) + timedelta(days=1)
        start_inf = True
    else:
        start_target = start

    end_inf = False
    if end is None:
        end_limit = pytz.UTC.localize(pd.Timestamp.max) - timedelta(days=1)
        end_inf = True
    else:
        end_limit = end

    # copying prevents setting on slice warnings
    data_before_end_limit = data[:end_limit].copy()

    data_end = data_before_end_limit.index.max()

    if ignore_billing_period_gap_for_day_count and (
        n_days_billing_period_overshoot is None
        or end_limit - timedelta(days=n_days_billing_period_overshoot) < data_end
    ):
        end_limit = data_end

    if not end_inf and max_days is not None:
        start_target = end_limit - timedelta(days=max_days)

    if allow_billing_period_overshoot:
        # adjust start limit to get a selection closest to max_days
        # also consider ffill for get_loc method - always picks previous
        #
        # Index.get_loc(..., method=...) was deprecated in pandas 1.4 and removed
        # in 2.0; get_indexer is the supported equivalent and reports "no match"
        # as -1 instead of raising KeyError.
        try:
            loc = data_before_end_limit.index.get_indexer(
                [start_target], method="nearest"
            )[0]
            if loc == -1:
                raise KeyError(start_target)
        except (KeyError, IndexError):  # pragma: no cover
            baseline_data = data_before_end_limit
            start_limit = start_target
        else:
            start_limit = data_before_end_limit.index[loc]
            baseline_data = data_before_end_limit[start_limit:].copy()

    else:
        # use hard limit for baseline start
        start_limit = start_target
        baseline_data = data_before_end_limit[start_limit:].copy()

    if baseline_data.dropna().empty:
        raise NoBaselineDataError()

    # The final row only marks the end of the last period.
    baseline_data.iloc[-1] = np.nan

    # Warnings compare the requested limits against the full input range.
    data_end = data.index.max()
    data_start = data.index.min()
    return (
        baseline_data,
        _make_baseline_warnings(
            end_inf, start_inf, data_start, data_end, start_limit, end_limit
        ),
    )
def _make_reporting_warnings(
    end_inf, start_inf, data_start, data_end, start_limit, end_limit
):
    """Collect warnings about data gaps at the edges of a reporting period.

    A limit flagged as infinite (`end_inf` / `start_inf`) is never checked.
    Returns a list of :any:`eemeter.EEMeterWarning`, end gap first.
    """
    found = []

    # warn if there is a gap at end
    end_gap = False if end_inf else data_end < end_limit
    if end_gap:
        end_details = {
            "requested_end": end_limit.isoformat(),
            "data_end": data_end.isoformat(),
        }
        found.append(
            EEMeterWarning(
                qualified_name="eemeter.get_reporting_data.gap_at_reporting_end",
                description=(
                    "Data does not have coverage at requested reporting end date."
                ),
                data=end_details,
            )
        )

    # warn if there is a gap at start
    start_gap = False if start_inf else start_limit < data_start
    if start_gap:
        start_details = {
            "requested_start": start_limit.isoformat(),
            "data_start": data_start.isoformat(),
        }
        found.append(
            EEMeterWarning(
                qualified_name="eemeter.get_reporting_data.gap_at_reporting_start",
                description=(
                    "Data does not have coverage at requested reporting start date."
                ),
                data=start_details,
            )
        )

    return found
def get_reporting_data(
    data,
    start=None,
    end=None,
    max_days=365,
    allow_billing_period_overshoot=False,
    ignore_billing_period_gap_for_day_count=False,
):
    """Filter down to reporting period data.

    Parameters
    ----------
    data : :any:`pandas.DataFrame` or :any:`pandas.Series`
        The data to filter to reporting data. This data will be filtered down
        to an acceptable reporting period according to the dates passed as
        `start` and `end`, or the maximum period specified with `max_days`.
    start : :any:`datetime.datetime`
        A timezone-aware datetime that represents the earliest allowable start
        date for the reporting data, i.e., the earliest date for which data is
        available after the intervention begins.
    end : :any:`datetime.datetime`
        A timezone-aware datetime that represents the latest allowable end
        date for the reporting data. Cannot be combined with `max_days`; pass
        ``max_days=None`` to use an explicit `end`.
    max_days : :any:`int`, default 365
        The maximum length of the period. Ignored if `start` is not set.
    allow_billing_period_overshoot : :any:`bool`, default False
        If True, count `max_days` from the start of the first billing data period
        that starts after the `start` date, rather than from the exact `start` date.
        Otherwise use the exact `start` date as the cutoff.
    ignore_billing_period_gap_for_day_count : :any:`bool`, default False
        If True, instead of going forward `max_days` from either the
        `start` date or the `start` of the first billing period after that date
        (depending on the value of the `allow_billing_period_overshoot` setting)
        and excluding the first period that ended after that date, first check
        to see if excluding or including that period gets closer to a total of
        `max_days` of data.

        For example, with `max_days=365`, if an exact 365 period would target
        Feb 15, but the billing period went from Jan 20 to Feb 20, include that
        period for a total of ~370 days of data, because that's closer to 365
        than ~340 days, which would be the total if that period was excluded.
        If, on the other hand, that period started Feb 10 and went to Mar 10,
        exclude the period, because ~360 days of data is closer to 365 than
        ~390.

    Returns
    -------
    reporting_data, warnings : :any:`tuple` of (:any:`pandas.DataFrame` or :any:`pandas.Series`, :any:`list` of :any:`eemeter.EEMeterWarning`)
        Data for only the specified reporting period and any associated
        warnings. The last row of the returned data is overwritten with NaN.

    Raises
    ------
    ValueError
        If both `max_days` and `end` are given.
    NoReportingDataError
        If the selected period contains no non-null data.

    Notes
    -----
    As implemented here:

    - When `ignore_billing_period_gap_for_day_count` is True, the start limit
      is moved forward to the first index value at or after `start` before
      `max_days` is counted.
    - When `allow_billing_period_overshoot` is True the selection ends at the
      index value nearest the target end; otherwise the target end is a hard
      cutoff.
    """
    if max_days is not None:
        if end is not None:
            raise ValueError(  # pragma: no cover
                "If max_days is set, end cannot be set: end={}, max_days={}.".format(
                    end, max_days
                )
            )

    start_inf = False
    if start is None:
        # py datetime min/max are out of range of pd.Timestamp min/max
        start_limit = pytz.UTC.localize(pd.Timestamp.min) + timedelta(days=1)
        start_inf = True
    else:
        start_limit = start

    end_inf = False
    if end is None:
        end_target = pytz.UTC.localize(pd.Timestamp.max) - timedelta(days=1)
        end_inf = True
    else:
        end_target = end

    # copying prevents setting on slice warnings
    data_after_start_limit = data[start_limit:].copy()

    if ignore_billing_period_gap_for_day_count:
        start_limit = data_after_start_limit.index.min()

    if not start_inf and max_days is not None:
        end_target = start_limit + timedelta(days=max_days)

    if allow_billing_period_overshoot:
        # adjust end limit to get a selection closest to max_days
        # also consider bfill for get_loc method - always picks next
        #
        # Index.get_loc(..., method=...) was deprecated in pandas 1.4 and removed
        # in 2.0; get_indexer is the supported equivalent and reports "no match"
        # as -1 instead of raising KeyError.
        try:
            loc = data_after_start_limit.index.get_indexer(
                [end_target], method="nearest"
            )[0]
            if loc == -1:
                raise KeyError(end_target)
        except (KeyError, IndexError):  # pragma: no cover
            reporting_data = data_after_start_limit
            end_limit = end_target
        else:
            end_limit = data_after_start_limit.index[loc]
            reporting_data = data_after_start_limit[:end_limit].copy()

    else:
        # use hard limit for reporting end
        end_limit = end_target
        reporting_data = data_after_start_limit[:end_limit].copy()

    if reporting_data.dropna().empty:
        raise NoReportingDataError()

    # The final row only marks the end of the last period.
    reporting_data.iloc[-1] = np.nan

    # Warnings compare the requested limits against the full input range.
    data_end = data.index.max()
    data_start = data.index.min()
    return (
        reporting_data,
        _make_reporting_warnings(
            end_inf, start_inf, data_start, data_end, start_limit, end_limit
        ),
    )
class Term(object):
    """A labelled subset of an index, with its targeted and actual bounds.

    Attributes
    ----------
    index : :any:`pandas.DatetimeIndex`
        The index of the term. Includes a period at the end meant to be NaN-value.
    label : :any:`str`
        The label for the term.
    target_start_date : :any:`pandas.Timestamp` or :any:`datetime.datetime`
        The start date inferred for this term from the start date and target term
        lengths.
    target_end_date : :any:`pandas.Timestamp` or :any:`datetime.datetime`
        The end date inferred for this term from the start date and target term
        lengths.
    target_term_length_days : :any:`int`
        The number of days targeted for this term.
    actual_start_date : :any:`pandas.Timestamp`
        The first date in the index.
    actual_end_date : :any:`pandas.Timestamp`
        The last date in the index.
    actual_term_length_days : :any:`int`
        The number of days between the actual start date and actual end date.
    complete : :any:`bool`
        True if this term is conclusively complete, such that additional data
        added to the series would not add more data to this term.
    """

    def __init__(
        self,
        index,
        label,
        target_start_date,
        target_end_date,
        target_term_length_days,
        actual_start_date,
        actual_end_date,
        actual_term_length_days,
        complete,
    ):
        # Identity of the term.
        self.label = label
        self.index = index
        self.complete = complete

        # What was requested.
        self.target_start_date = target_start_date
        self.target_end_date = target_end_date
        self.target_term_length_days = target_term_length_days

        # What the index actually provides.
        self.actual_start_date = actual_start_date
        self.actual_end_date = actual_end_date
        self.actual_term_length_days = actual_term_length_days

    def __repr__(self):
        template = (
            "Term(label={}, target_term_length_days={}, actual_term_length_days={},"
            " complete={})"
        )
        shown = (
            self.label,
            self.target_term_length_days,
            self.actual_term_length_days,
            self.complete,
        )
        return template.format(*shown)
def get_terms(index, term_lengths, term_labels=None, start=None, method="strict"):
    """Breaks a :any:`pandas.DatetimeIndex` into consecutive terms of specified
    lengths.

    Parameters
    ----------
    index : :any:`pandas.DatetimeIndex`
        The index to split into terms, generally `meter_data.index`
        or `temperature_data.index`. Must be sorted in increasing order.
    term_lengths : :any:`list` of :any:`int`
        The lengths (in days) of the terms into which to split the data.
    term_labels : :any:`list` of :any:`str`, default None
        Labels to use for each term. List must be the same length as the
        `term_lengths` list. Defaults to ``term_001``, ``term_002``, ...
    start : :any:`datetime.datetime`, default None
        A timezone-aware datetime that represents the earliest allowable start
        date for the terms. If None, use the first element of the index.
    method : one of ['strict', 'nearest'], default 'strict'
        The method to use to get terms.

        - "strict": each term ends at the last index value on or before its
          target end date.
        - "nearest": each term ends at the index value closest to its target
          end date.

    Returns
    -------
    terms : :any:`list` of :any:`eemeter.Term`
        One :any:`eemeter.Term` per term that could be formed, in order. Each
        term's index shares its last point with the start of the next term.
        Fewer terms than requested are returned when the index runs out.

    Raises
    ------
    ValueError
        If `method` is not supported, the index is not sorted, or
        `term_labels` and `term_lengths` differ in length.
    KeyError
        If no index value satisfies `method` for a term's target end date.
    """
    if method == "strict":
        get_loc_method = "pad"
    elif method == "nearest":
        get_loc_method = "nearest"
    else:
        raise ValueError(
            "method {} not supported - use either 'strict' or 'nearest'".format(method)
        )

    if not index.is_monotonic_increasing:
        raise ValueError("get_terms requires a sorted index")

    if term_labels is None:
        term_labels = ["term_{:03d}".format(i + 1) for i in range(len(term_lengths))]
    elif len(term_labels) != len(term_lengths):
        raise ValueError(
            "term_labels (len {}) must be the same length as term_length (len {})".format(
                len(term_labels), len(term_lengths)
            )
        )

    if start is None:
        prev_start = index.min()
    else:
        prev_start = start

    # Target end dates are cumulative offsets from the overall start, not from
    # the (possibly shifted) actual end of the previous term.
    term_end_targets = []
    elapsed_days = 0
    for term_length in term_lengths:
        elapsed_days += term_length
        term_end_targets.append(prev_start + timedelta(days=elapsed_days))

    terms = []
    remaining_index = index[index >= prev_start]

    for label, target_term_length, end_target in zip(
        term_labels, term_lengths, term_end_targets
    ):
        if len(remaining_index) <= 1:
            break

        # Index.get_loc(..., method=...) was deprecated in pandas 1.4 and removed
        # in 2.0; get_indexer is the supported equivalent and reports "no match"
        # as -1, which is turned back into the KeyError get_loc used to raise.
        next_index = int(
            remaining_index.get_indexer([end_target], method=get_loc_method)[0]
        )
        if next_index == -1:
            raise KeyError(end_target)

        # keep one extra index point for the end NaN - this could be confusing, but
        # helps identify the full range of the last data point
        term_index = remaining_index[: next_index + 1]

        # find the next start
        next_start = remaining_index[next_index]

        # reset the remaining index
        remaining_index = remaining_index[next_index:]

        # There may be a better way to tell if the term is conclusively complete,
        # but the logic here is that if there's more than one remaining point then
        # the term must be complete - since that final point was a worse candidate
        # than the one before it which was chosen.
        complete = len(remaining_index) > 1

        terms.append(
            Term(
                index=term_index,
                label=label,
                target_start_date=prev_start,
                target_end_date=end_target,
                target_term_length_days=target_term_length,
                actual_start_date=term_index[0],
                actual_end_date=term_index[-1],
                actual_term_length_days=(term_index[-1] - term_index[0]).days,
                complete=complete,
            )
        )

        # reset the previous start
        prev_start = next_start

    return terms
def clean_caltrack_billing_data(data, source_interval):
    """Apply CalTRACK billing-data cleaning rules to meter data.

    For ``billing_monthly`` data, periods shorter than 25 or longer than 35
    days are overwritten with null; for ``billing_bimonthly`` data the allowed
    range is 25 to 70 days (both inclusive). If an ``estimated`` column is
    present, an estimated read is added to the read that follows it, provided
    there is not more than one estimate in a row, and the estimated row is
    then removed.

    Example of the estimated-read handling

    Input::

        index   value   estimated
        1       2       False
        2       3       False
        3       5       True
        4       4       False
        5       6       True
        6       3       True
        7       4       False
        8       NaN     NaN

    Output::

        index   value
        1       2
        2       3
        4       9
        5       NaN
        7       7
        8       NaN

    Parameters
    ----------
    data : :any:`pandas.DataFrame`
        Meter data with a ``value`` column and, optionally, an ``estimated``
        column. The final row is treated as the end marker of the last period.
        The frame passed in is not modified.
    source_interval : :any:`str`
        Interval of the source data, e.g. ``billing_monthly`` or
        ``billing_bimonthly``.

    Returns
    -------
    cleaned : :any:`pandas.DataFrame`
        The cleaned data, or a zero-row frame if no non-null data remains.
    """
    # check for empty data
    if data["value"].dropna().empty:
        return data[:0]

    if source_interval.startswith("billing"):
        # Length in days of each period; the last row has no following read.
        diff = list((data.index[1:] - data.index[:-1]).days)
        filter_ = pd.Series(diff + [np.nan], index=data.index)

        # CalTRACK 2.2.3.4, 2.2.3.5
        if source_interval == "billing_monthly":
            data = data[
                (filter_ <= 35) & (filter_ >= 25)  # keep these, inclusive
            ].reindex(data.index)

        # CalTRACK 2.2.3.4, 2.2.3.5
        if source_interval == "billing_bimonthly":
            data = data[
                (filter_ <= 70) & (filter_ >= 25)  # keep these, inclusive
            ].reindex(data.index)

        # CalTRACK 2.2.3.1
        if "estimated" in data.columns:
            # Work on a private copy: helper columns are added and "value" is
            # overwritten below, which must not leak into the caller's frame.
            data = data.copy()
            add_estimated = []
            remove_estimated_fixed_rows = []

            # NOTE(review): rows nulled by the period-length filter above carry
            # NaN in "estimated"; confirm the boolean mask below tolerates that.
            data["unestimated_value"] = (
                data[:-1].value[(data[:-1].estimated == False)].reindex(data.index)
            )
            data["estimated_value"] = (
                data[:-1].value[(data[:-1].estimated)].reindex(data.index)
            )

            prev_row = None
            prev_index = None
            for i, (index, row) in enumerate(data[:-1].iterrows()):
                # ensures there is a prev_row and previous row value is null
                if i > 0 and pd.isnull(prev_row["unestimated_value"]):
                    # carry the previous estimated read onto this row
                    add_estimated.append(prev_row["estimated_value"])
                    # current row value is not null
                    if not pd.isnull(row["unestimated_value"]):
                        # get all rows that had only estimated reads that will be
                        # added to the subsequent row meaning this row
                        # needs to be removed
                        remove_estimated_fixed_rows.append(prev_index)
                else:
                    add_estimated.append(0)
                prev_row = row
                prev_index = index
            # the final (end marker) row never receives a value
            add_estimated.append(np.nan)

            data["value"] = data["unestimated_value"] + add_estimated
            data = data[~data.index.isin(remove_estimated_fixed_rows)]
            data = data[["value"]]  # remove the estimated column

    # check again for empty data
    if data.dropna().empty:
        return data[:0]

    return data


def downsample_and_clean_caltrack_daily_data(data):
    """Downsample sub-daily meter data to daily values per CalTRACK 2.2.2.1.

    Parameters
    ----------
    data : :any:`pandas.DataFrame`
        Meter data with a ``value`` column.

    Returns
    -------
    daily : :any:`pandas.DataFrame`
        Daily data with a single ``value`` column. Days with more than 50%
        coverage are scaled up by their coverage; all other days are null.
    """
    daily = as_freq(data.value, "D", include_coverage=True)
    sufficient = daily.coverage > 0.5

    # CalTRACK 2.2.2.1 - interpolate with average of non-null values
    # (.loc is used because chained assignment through ``daily.value[...]``
    # does not write back under pandas Copy-on-Write)
    daily.loc[sufficient, "value"] = (
        daily.loc[sufficient, "value"] / daily.loc[sufficient, "coverage"]
    )

    # CalTRACK 2.2.2.1 - discard days with less than 50% coverage
    return daily[sufficient].reindex(daily.index)[["value"]]


def clean_caltrack_billing_daily_data(data, source_interval):
    """Clean billing or daily-method meter data according to its interval.

    Parameters
    ----------
    data : :any:`pandas.DataFrame`
        Meter data with a ``value`` column.
    source_interval : :any:`str`
        Interval of the source data. Values starting with ``billing`` are
        cleaned with :any:`clean_caltrack_billing_data`, ``daily`` is returned
        unchanged, and anything else is downsampled to daily.

    Returns
    -------
    cleaned : :any:`pandas.DataFrame`
        The cleaned (and possibly downsampled) data.
    """
    # billing data is cleaned but not resampled
    if source_interval.startswith("billing"):
        # CalTRACK 2.2.3.4, 2.2.3.5
        return clean_caltrack_billing_data(data, source_interval)

    # higher intervals like daily, hourly, 30min, 15min are
    # resampled (daily) or downsampled (hourly, 30min, 15min)
    elif source_interval == "daily":
        return data
    else:
        return downsample_and_clean_caltrack_daily_data(data)