Source code for nem_bidding_dashboard.populate_supabase_db

import math
import os
import time
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import pytz
from supabase import create_client

from nem_bidding_dashboard import fetch_and_preprocess, input_validation

"""This module is used for populating the database used by the dashboard. The functions it contains co-ordinate
 fetching historical AEMO data, pre-processing to limit the work done by the dashboard (to improve responsiveness),
 and loading the processed data into the database. It will contain functions for both populating the production
 version for the hosted version of the dashboard and functions for populating an sqlite database for use by user
 on their local machine."""


[docs]def insert_data_into_supabase(table_name, data, rows_per_chunk=1000): """Insert data into the supabase database. For this function to run the supabase url and key need to be configured as environment variables labeled SUPABASE_BIDDING_DASHBOARD_URL and SUPABASE_BIDDING_DASHBOARD_WRITE_KEY respectively. Arguments: table_name: str which is the name of the table in the supabase database data: pd dataframe of data to be uploaded """ url = os.environ.get("SUPABASE_BIDDING_DASHBOARD_URL") key = os.environ.get("SUPABASE_BIDDING_DASHBOARD_WRITE_KEY") supabase = create_client(url, key) data.columns = data.columns.str.lower() number_of_chunks = math.ceil(data.shape[0] / rows_per_chunk) chunked_data = np.array_split(data, number_of_chunks) original_length = len(chunked_data) chunks_at_once = 1 while chunked_data: chunk = [] for i in range(0, chunks_at_once): if chunked_data: chunk.append(chunked_data.pop(0)) chunk = pd.concat(chunk) trying = True while trying: try: t0 = time.time() supabase.table(table_name).upsert(chunk.to_dict("records")).execute() tt = time.time() - t0 if tt > 2: chunks_at_once -= 1 else: chunks_at_once += 1 if chunks_at_once == 0: chunks_at_once = 1 print(tt) print(chunk.shape) trying = False except Exception as e: print(e) print("Upload of chunk failed waiting 10 min and trying again.") chunks_at_once = 1 time.sleep(60 * 10) supabase = create_client(url, key) finally: print(len(chunked_data) / original_length)
[docs]def region_data(raw_data_cache, start_time, end_time): """ Function to populate database table containing electricity demand and price data by region. For this function to run the supabase url and key need to be configured as environment variables labeled SUPABASE_BIDDING_DASHBOARD_URL and SUPABASE_BIDDING_DASHBOARD_WRITE_KEY respectively. Data is prepped for loading by the function :py:func:`nem_bidding_dashboard.fetch_and_preprocess.region_data`. Examples: >>> region_data( ... "D:/nemosis_cache", ... "2020/01/01 00:00:00", ... "2020/01/02 00:00:00") Arguments: start_time: Initial datetime, formatted "DD/MM/YYYY HH:MM:SS" end_time: Ending datetime, formatted identical to start_time raw_data_cache: Filepath to directory for storing data that is fetched """ input_validation.validate_start_end_and_cache_location( start_time, end_time, raw_data_cache ) regional_data = fetch_and_preprocess.region_data( start_time, end_time, raw_data_cache ) insert_data_into_supabase("demand_data", regional_data)
[docs]def bid_data(raw_data_cache, start_time, end_time): """ Function to populate database table containing bidding data by unit. For this function to run the supabase url and key need to be configured as environment variables labeled SUPABASE_BIDDING_DASHBOARD_URL and SUPABASE_BIDDING_DASHBOARD_WRITE_KEY respectively. Data is prepped for loading by the function :py:func:`nem_bidding_dashboard.fetch_and_preprocess.bid_data`. Examples: >>> bid_data( ... "D:/nemosis_cache", ... "2020/01/01 00:00:00", ... "2020/01/02 00:00:00") Arguments: start_time: Initial datetime, formatted "DD/MM/YYYY HH:MM:SS" end_time: Ending datetime, formatted identical to start_time raw_data_cache: Filepath to directory for storing data that is fetched """ input_validation.validate_start_end_and_cache_location( start_time, end_time, raw_data_cache ) combined_bids = fetch_and_preprocess.bid_data(start_time, end_time, raw_data_cache) insert_data_into_supabase("bidding_data", combined_bids)
[docs]def duid_info(raw_data_cache): """ Function to populate database table containing bidding data by unit. For this function to run the supabase url and key need to be configured as environment variables labeled SUPABASE_BIDDING_DASHBOARD_URL and SUPABASE_BIDDING_DASHBOARD_WRITE_KEY respectively. Data is prepped for loading by the function :py:func:`nem_bidding_dashboard.fetch_and_preprocess.duid_info`. Examples: >>> duid_info( ... "2020/01/01 00:00:00", ... "2020/01/02 00:00:00", ... "D:/nemosis_cache") Arguments: start_time: Initial datetime, formatted "DD/MM/YYYY HH:MM:SS" end_time: Ending datetime, formatted identical to start_time raw_data_cache: Filepath to directory for storing data that is fetched """ input_validation.data_cache_exits(raw_data_cache) duid_info = fetch_and_preprocess.duid_info(raw_data_cache) insert_data_into_supabase("duid_info", duid_info)
[docs]def unit_dispatch(raw_data_cache, start_time, end_time): """ Function to populate database table containing unit time series metrics. For this function to run the supabase url and key need to be configured as environment variables labeled SUPABASE_BIDDING_DASHBOARD_URL and SUPABASE_BIDDING_DASHBOARD_WRITE_KEY respectively. Data is prepped for loading by the function :py:func:`nem_bidding_dashboard.fetch_and_preprocess.unit_dispatch`. Examples: >>> unit_dispatch( ... "D:/nemosis_cache", ... "2020/01/01 00:00:00", ... "2020/01/02 00:00:00") Arguments: start_time: Initial datetime, formatted "DD/MM/YYYY HH:MM:SS" end_time: Ending datetime, formatted identical to start_time raw_data_cache: Filepath to directory for storing data that is fetched """ input_validation.validate_start_end_and_cache_location( start_time, end_time, raw_data_cache ) unit_time_series_metrics = fetch_and_preprocess.unit_dispatch( start_time, end_time, raw_data_cache ) insert_data_into_supabase( "unit_dispatch", unit_time_series_metrics, rows_per_chunk=500 )
[docs]def price_bin_edges_table(): """ Function to populate database table containing bin definitions for aggregating bids. For this function to run the supabase url and key need to be configured as environment variables labeled SUPABASE_BIDDING_DASHBOARD_URL and SUPABASE_BIDDING_DASHBOARD_WRITE_KEY respectively. Data is prepped for loading by the function :py:func:`nem_bidding_dashboard.fetch_and_preprocess.region_data`. Examples: >>> price_bin_edges_table() """ price_bins = fetch_and_preprocess.define_and_return_price_bins() insert_data_into_supabase("price_bins", price_bins)
[docs]def all_tables_two_most_recent_market_days(cache): """ Upload data to supabase for a window starting at 4 am of the current day and going back 48 hrs. Upload is performed for all tables except price_bin_edges. Examples: >>> all_tables_two_most_recent_market_days( ... "D:/nemosis_data_cache") Arguments: cache: str the directory to use for caching data prior to upload. """ current_time = datetime.now(pytz.timezone("Etc/GMT-10")) start_today = datetime( year=current_time.year, month=current_time.month, day=current_time.day, hour=4, ) two_days_before_today = start_today - timedelta(days=2) start_today = start_today.isoformat().replace("T", " ").replace("-", "/") two_days_before_today = ( two_days_before_today.isoformat().replace("T", " ").replace("-", "/") ) region_data( raw_data_cache=cache, start_time=two_days_before_today, end_time=start_today ) bid_data( raw_data_cache=cache, start_time=two_days_before_today, end_time=start_today ) duid_info(raw_data_cache=cache) unit_dispatch( raw_data_cache=cache, start_time=two_days_before_today, end_time=start_today )
if __name__ == "__main__": raw_data_cache = "D:/nemosis_data_cache" start = "2023/01/16 00:00:00" end = "2023/01/16 08:00:00" print(start) print(end) # populate_supabase_duid_info(raw_data_cache) # region_data(start, end, raw_data_cache) # bid_data(start, end, raw_data_cache) # unit_dispatch(start, end, raw_data_cache) price_bin_edges_table() # populate_supabase_duid_info(raw_data_cache) # populate_supabase_all_tables_two_most_recent_market_days(raw_data_cache)