Source code for nem_bidding_dashboard.postgres_helpers

import math

import numpy as np
import pandas as pd
import psycopg
from psycopg.rows import dict_row


def _quote_conninfo_value(value):
    """Render one value for a libpq keyword/value connection string.

    Plain values are returned unchanged. Values that are empty or contain
    whitespace, a single quote or a backslash are wrapped in single quotes,
    with embedded single quotes and backslashes backslash-escaped.
    """
    text = str(value)
    needs_quoting = text == "" or any(
        char.isspace() or char in "'\\" for char in text
    )
    if not needs_quoting:
        return text
    escaped = text.replace("\\", "\\\\").replace("'", "\\'")
    return "'" + escaped + "'"


def build_connection_string(
    hostname, dbname, username, password, port, timeout_seconds=None
):
    """
    Creates a properly formatted connection string for connecting to a PostgresSQL database.

    Each value is quoted and escaped when needed (for example a password
    containing a space or a single quote), so the resulting string is always a
    well-formed keyword/value connection string. Values without special
    characters are written as given.

    Examples:

    >>> from nem_bidding_dashboard import postgres_helpers

    >>> con_string = postgres_helpers.build_connection_string(
    ... hostname='localhost',
    ... dbname='bidding_dashboard_db',
    ... username='bidding_dashboard_maintainer',
    ... password='1234abcd',
    ... port=5433)

    Args:
        hostname: str where the database is hosted
        dbname: str name of database to connect to
        username: str name of user that has read and write permission on database
        password: str password of user
        port: int or str port that you can connect to the database on
        timeout_seconds: int number of seconds before timeout, None (the default)
            means no statement timeout option is added

    Returns:
        str in the format required for connecting to a postgres database.
    """
    connection_string = (
        "host={hostname} dbname={dbname} user={username} password={password} port={port}"
    )
    connection_string = connection_string.format(
        hostname=_quote_conninfo_value(hostname),
        dbname=_quote_conninfo_value(dbname),
        username=_quote_conninfo_value(username),
        password=_quote_conninfo_value(password),
        port=_quote_conninfo_value(port),
    )
    if timeout_seconds is not None:
        # statement_timeout is given in whole milliseconds, so a fractional
        # number of seconds is rounded rather than rendered as e.g. "1500.0".
        timeout_setting = " options='-c statement_timeout={time_milliseconds}'".format(
            time_milliseconds=int(round(timeout_seconds * 1000))
        )
        connection_string += timeout_setting
    return connection_string
# Names of the database tables removed by drop_tables_and_functions.
_list_of_tables = [
    "bidding_data",
    "demand_data",
    "duid_info",
    "price_bins",
    "unit_dispatch",
]

# Names of the database functions removed by drop_tables_and_functions.
_list_of_functions = [
    "distinct_unit_types",
    "aggregate_bids_v2",
    "aggregate_dispatch_data",
    "aggregate_dispatch_data_duids",
    "get_bids_by_unit",
    "get_duids_for_stations",
    "get_duids_and_stations",
    "aggregate_prices",
]


def drop_tables_and_functions(connection_string):
    """
    Remove every table and function named in the module-level lists
    _list_of_tables and _list_of_functions (those created by
    build_postgres.create_db_tables_and_functions). Intended as a development
    aid when testing the creation of tables and functions.

    Examples:

    >>> con_string = build_connection_string(
    ... hostname='localhost',
    ... dbname='bidding_dashboard_db',
    ... username='bidding_dashboard_maintainer',
    ... password='1234abcd',
    ... port=5433)

    >>> drop_tables_and_functions(con_string)

    Args:
        connection_string: str for connecting to PostgresSQL database, the function
            :py:func:`nem_bidding_dashboard.postgres_helpers.build_connection_string`
            can be used to build a properly formatted connection string, or
            alternatively any string that matches the format allowed by `PostgresSQL
            <https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING>`_
            can be used
    """
    # Tables are dropped first, then functions, all within one transaction.
    drop_statements = [
        "DROP TABLE IF EXISTS {}".format(table_name)
        for table_name in _list_of_tables
    ]
    drop_statements.extend(
        "DROP FUNCTION IF EXISTS {}".format(function_name)
        for function_name in _list_of_functions
    )

    connection = psycopg.connect(connection_string)
    with connection, connection.cursor() as cursor:
        for statement in drop_statements:
            cursor.execute(statement)
        connection.commit()


def insert_data_into_postgres(connection_string, table_name, data):
    """Insert data into the postgres database.

    Rows are upserted: when a row conflicts with the table's primary key
    constraint (``<table_name>_pkey``) the existing row is updated with the new
    values. Column names are matched case-insensitively by lowercasing them; the
    DataFrame passed in is not modified. Rows are sent in chunks of 5000. An
    empty DataFrame is a no-op and no database connection is opened.

    Note that table_name and the column names are interpolated directly into the
    SQL statement, so they must come from trusted code rather than user input.

    Arguments:
        connection_string: str for connecting to PostgresSQL database, the function
            :py:func:`nem_bidding_dashboard.postgres_helpers.build_connection_string`
            can be used to build a properly formatted connection string, or
            alternatively any string that matches the format allowed by `PostgresSQL
            <https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING>`_
            can be used
        table_name: str which is the name of the table in the postgres database
        data: pd dataframe of data to be uploaded
    """
    rows_per_chunk = 5000
    number_of_rows = data.shape[0]
    if number_of_rows == 0:
        # Nothing to upload; splitting zero rows into zero chunks would fail.
        return

    # Work on a lowercased copy of the names so the caller's frame is untouched.
    lowercase_names = list(data.columns.str.lower())
    # Names containing spaces must be double-quoted to be valid identifiers.
    column_list = [c if " " not in c else '"' + c + '"' for c in lowercase_names]
    columns = ", ".join(column_list)
    place_holders = ",".join(["%s"] * len(column_list))
    sets = ", ".join(["{c} = excluded.{c}".format(c=c) for c in column_list])
    # The statement is identical for every chunk, so build it once.
    query = (
        "INSERT INTO {table_name}({columns}) VALUES({place_holders}) ON CONFLICT ON CONSTRAINT "
        + "{table_name}_pkey DO UPDATE SET {sets};"
    )
    query = query.format(
        table_name=table_name,
        columns=columns,
        place_holders=place_holders,
        sets=sets,
    )

    with psycopg.connect(connection_string) as conn:
        with conn.cursor() as cur:
            for start in range(0, number_of_rows, rows_per_chunk):
                chunk = data.iloc[start : start + rows_per_chunk]
                rows = list(chunk.itertuples(index=False, name=None))
                cur.executemany(query, rows)
            conn.commit()


def run_query_return_dataframe(connection_string, query):
    """
    Sends an arbitrary query to the specified postgres database and returns the
    result as a pd.DataFrame. Should only be used for queries that return a result
    as a table. Handles opening and closing connection to database.

    Column names in the returned DataFrame are upper case. A query that matches no
    rows returns an empty DataFrame that still carries the query's column names.

    Examples:

    >>> from nem_bidding_dashboard import postgres_helpers

    >>> con_string = postgres_helpers.build_connection_string(
    ... hostname='localhost',
    ... dbname='bidding_dashboard_db',
    ... username='bidding_dashboard_maintainer',
    ... password='1234abcd',
    ... port=5433)

    >>> run_query_return_dataframe(con_string, "select * from duid_info limit 10;")
           DUID REGION  ...          UNIT TYPE                  STATION NAME
    0   ADPBA1G     SA  ...  Battery Discharge  Adelaide Desalination Plant
    1   ADPBA1L     SA  ...     Battery Charge  Adelaide Desalination Plant
    2    ADPMH1     SA  ...  Run of River Hydro  Adelaide Desalination Plant
    3    ADPPV3     SA  ...              Solar  Adelaide Desalination Plant
    4    ADPPV2     SA  ...              Solar  Adelaide Desalination Plant
    5    ADPPV1     SA  ...              Solar  Adelaide Desalination Plant
    6  AGLSITA1    NSW  ...             Engine              Agl Kemps Creek
    7   ANGAST1     SA  ...             Engine       Angaston Power Station
    8     APPIN    NSW  ...             Engine            Appin Power Plant
    9     ARWF1    VIC  ...               Wind             Ararat Wind Farm
    <BLANKLINE>
    [10 rows x 7 columns]

    Args:
        connection_string: str for connecting to PostgresSQL database, the function
            :py:func:`nem_bidding_dashboard.postgres_helpers.build_connection_string`
            can be used to build a properly formatted connection string, or
            alternatively any string that matches the format allowed by `PostgresSQL
            <https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING>`_
            can be used
        query: str which is postgres select query

    Returns:
        pd.DataFrame column as per the query provided
    """
    with psycopg.connect(connection_string) as conn:
        with conn.cursor(row_factory=dict_row) as cur:
            cur.execute(query)
            rows = cur.fetchall()
            # Keep the result's column names so an empty result still has headers.
            column_names = [column.name for column in cur.description]

    if not rows:
        # A frame built from an empty row list has no column labels at all, so
        # construct the empty frame from the query's column names instead.
        return pd.DataFrame(columns=[name.upper() for name in column_names])

    data = pd.DataFrame(rows)
    data.columns = data.columns.str.upper()
    return data


def run_query(connection_string, query, autocommit=False):
    """
    Execute a generic statement that neither inserts nor retrieves data, for
    example creating or dropping tables, functions and indexes.

    Examples:

    >>> import os

    >>> from nem_bidding_dashboard import postgres_helpers

    >>> con_string = postgres_helpers.build_connection_string(
    ... hostname=os.environ.get("SUPABASEADDRESS"),
    ... dbname='postgres',
    ... username='postgres',
    ... password=os.environ.get("SUPABASEPASSWORD"),
    ... port=5432,
    ... timeout_seconds=6000)

    >>> run_query(con_string, "CREATE INDEX unit_dispatch_time_hour_duid_index ON unit_dispatch (interval_datetime DESC, onhour, duid);")

    >>> run_query(con_string, "CREATE INDEX unit_dispatch_time_duid_index ON unit_dispatch (interval_datetime DESC, duid);")

    Args:
        connection_string: str for connecting to PostgresSQL database, the function
            :py:func:`nem_bidding_dashboard.postgres_helpers.build_connection_string`
            can be used to build a properly formatted connection string, or
            alternatively any string that matches the format allowed by `PostgresSQL
            <https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING>`_
            can be used
        query: str query to run in the database
        autocommit: boolean, set to True to run statements that must be run outside
            a transaction, such as vacuum
    """
    connection = psycopg.connect(connection_string, autocommit=autocommit)
    with connection, connection.cursor() as cursor:
        cursor.execute(query)
        connection.commit()