#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Nov 29 18:53:14 2020
@author: maurer
"""
from tqdm import tqdm
from contextlib import closing
import sqlite3
import pandas as pd
from datetime import timedelta
import time
from sqlalchemy import create_engine
from contextlib import contextmanager
from entsoe import EntsoePandasClient
from entsoe.mappings import PSRTYPE_MAPPINGS, NEIGHBOURS, Area
from entsoe.exceptions import NoMatchingDataError, InvalidBusinessParameterError
from requests.exceptions import HTTPError
import logging
logging.basicConfig()
log = logging.getLogger('entsoe')
log.setLevel(logging.INFO)
from_n = []
to_n = []
for n1 in NEIGHBOURS:
for n2 in NEIGHBOURS[n1]:
from_n.append(n1)
to_n.append(n2)
neighbours = pd.DataFrame({'from': from_n, 'to': to_n})
all_countries = [e.name for e in Area]
def sanitize_series(seriesname):
"""
replaces illegal values from a series name
for insertion into database
Parameters
----------
seriesname : str
name of the series
Returns
-------
st : str
"""
    st = str(seriesname)
    for char in (')', '(', ',', "'"):
        st = st.replace(char, '')
    st = st.strip()
    st = st.replace(' ', '_')
return st
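
# Illustrative behaviour of sanitize_series (the input values are assumptions,
# not taken from a real ENTSO-E response); tuple-like column names become plain
# identifiers that are safe to use as SQL column names:
#
#   sanitize_series("('Fossil Gas', 'Actual Aggregated')")
#   # -> 'Fossil_Gas_Actual_Aggregated'
#   sanitize_series('Hydro Water Reservoir')
#   # -> 'Hydro_Water_Reservoir'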
def calculate_nett_generation(df):
"""
Calculates the difference between columns ending with _actual_aggregated and _actual_consumption.
Parameters
----------
df : pd.DataFrame
DataFrame with columns ending with _actual_aggregated or _actual_consumption
Returns
-------
dat: pd.DataFrame
"""
dat = df.copy()
    for c in filter(lambda x: x.endswith('_actual_aggregated'), dat.columns):
        new = c.replace('_actual_aggregated', '')
        dif = list(filter(lambda x: x.endswith('_actual_consumption')
                          and x.startswith(new), dat.columns))
        if len(dif) > 0:
            # calculate the difference if both columns exist
            dat[new] = dat[c] - dat[dif[0]]
            del dat[c]  # drop the handled series from our copy
            del dat[dif[0]]
        else:
            # if no consumption column exists, keep the aggregated value
            dat[new] = dat[c]
            del dat[c]
    for c in filter(lambda x: x.endswith('_actual_consumption'), dat.columns):
        # if only consumption exists, use the negative value
        new = c.replace('_actual_consumption', '')
        dat[new] = -dat[c]
        del dat[c]
return dat
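
# A minimal sketch of calculate_nett_generation on made-up data (column names
# follow the sanitized entsoe-py pattern, the numbers are purely illustrative):
#
#   df = pd.DataFrame({
#       'hydro_pumped_storage_actual_aggregated':  [100.0, 120.0],
#       'hydro_pumped_storage_actual_consumption': [30.0, 50.0],
#       'solar_actual_aggregated':                 [10.0, 15.0],
#   })
#   calculate_nett_generation(df)
#   # -> columns 'hydro_pumped_storage' ([70.0, 70.0]) and 'solar' ([10.0, 15.0])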
class EntsoeCrawler:
"""
class to allow easier crawling of ENTSO-E timeseries data
Parameters
----------
database: str
database connection string or path to sqlite db
"""
def __init__(self, database):
# choice between pg and sqlite
if database.startswith('postgresql'):
self.engine = create_engine(database)
@contextmanager
def access_db():
"""contextmanager to handle opening of db, similar to closing for sqlite3"""
with self.engine.connect() as conn, conn.begin():
yield conn
self.db_accessor = access_db
else:
self.db_accessor = lambda: closing(sqlite3.connect(database))
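
        # Illustrative construction (connection strings are examples only, not
        # required locations):
        #   crawler = EntsoeCrawler('data/entsoe.db')                                # SQLite file
        #   crawler = EntsoeCrawler('postgresql://user:pass@localhost:5432/entsoe')  # PostgreSQL
        # Both variants expose the same accessor, e.g. after init_base_sql() ran:
        #   with crawler.db_accessor() as conn:
        #       pd.read_sql_query('select * from areas', conn)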
def init_base_sql(self):
"""
write static data to database once
"""
psrtype = pd.DataFrame.from_dict(
PSRTYPE_MAPPINGS, orient='index', columns=['prod_type'])
areas = pd.DataFrame([[e.name, e.value, e.tz, e.meaning]
for e in Area], columns=['name', 'value', 'tz', 'meaning'])
with self.db_accessor() as conn:
areas.columns = [x.lower() for x in areas.columns]
psrtype.columns = [x.lower() for x in psrtype.columns]
areas.to_sql('areas', conn, if_exists='replace')
psrtype.to_sql('psrtype', conn, if_exists='replace')
def fetch_and_write_entsoe_df_to_db(self, country, proc, start, end):
"""
Crawl data from ENTSO-E transparency platform and write it to the database
Parameters
----------
country : str
2-letter country code
proc :
procedure of entsoe-py client
start : pd.Timestamp
start time
end : pd.Timestamp
end time
Returns
-------
"""
try:
try:
data = pd.DataFrame(proc(country, start=start, end=end))
except NoMatchingDataError as e:
raise e
except HTTPError as e:
log.error(f'{e.response.status_code} - {e.response.reason}')
if e.response.status_code == 400:
raise e
else:
log.info(f'retrying: {repr(e)}, {start}, {end}')
time.sleep(10)
data = pd.DataFrame(proc(country, start=start, end=end))
except Exception as e:
log.info(f'retrying: {repr(e)}, {start}, {end}')
time.sleep(10)
data = pd.DataFrame(proc(country, start=start, end=end))
# replace spaces and invalid chars in column names
data.columns = [sanitize_series(x).lower() for x in data.columns]
data = data.fillna(0)
# XXX could have used nett=True in entsoe-py client
            # calculate the difference between aggregated generation and consumption
data = calculate_nett_generation(data)
# add country column
data['country'] = country
try:
with self.db_accessor() as conn:
data.to_sql(proc.__name__, conn, if_exists='append')
except Exception as e:
with self.db_accessor() as conn:
log.info(f'handling {repr(e)} by concat')
# merge old data with new data
prev = pd.read_sql_query(
f'select * from {proc.__name__}', conn, index_col='index')
dat = pd.concat([prev, data])
# convert type as pandas needs it
dat.index = dat.index.astype('datetime64[ns]')
dat.to_sql(proc.__name__, conn, if_exists='replace')
log.info(f'replaced table {proc.__name__}')
except NoMatchingDataError:
log.error(f'no data found for {proc.__name__}, {country}, {start}, {end}')
except Exception as e:
            log.error(f'error downloading {proc.__name__}, {country}, {start}, {end}: {e}')
def get_latest_crawled_timestamp(self, start, delta, tablename, tz='Europe/Berlin'):
"""
Find the best Start for the given procedurename by finding the last timestemp where data was collected for.
Also calculates the best delta to update until today.
Parameters
----------
start : pd.Timestamp
delta : pd.Timedelta
to check if a delta has already been set
tablename : str
name of the table
tz : str
(Default value = 'Europe/Berlin')
Returns
-------
type
start : pd.Timestamp
best start
delta : pd.Timedelta
best delta
"""
if start and delta:
return start, delta
else:
try:
with self.db_accessor() as conn:
query = f'select max("index") from {tablename}'
d = conn.execute(query).fetchone()[0]
start = pd.to_datetime(d)
try:
                        start = start.tz_localize(tz)
except TypeError:
# if already localized
pass
except Exception as e:
start = pd.Timestamp('20150101', tz=tz)
log.info(f'using default {start} timestamp ({e})')
end = pd.Timestamp.now(tz=tz)
delta = end-start
return start, delta
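
        # Sketch of the resume logic (the table name is just an example): if
        # query_load was last filled up to 2023-05-01, this returns that timestamp
        # together with the delta up to "now", so a following crawl only fetches
        # the missing range.
        #   start, delta = crawler.get_latest_crawled_timestamp(None, None, 'query_load')
        #   crawler.download_entsoe(['DE'], client.query_load, start, delta, times=1)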
def download_entsoe(self, countries, proc, start, delta, times):
"""
Downloads data with a procedure from a EntsoePandasClient
and stores it in the configured database
Parameters
----------
countries : list[str]
list of country codes
proc :
procedure of entsoe-py
start : pd.Timestamp
delta : pd.Timedelta
times : int
Returns
-------
"""
log.info(f'****** {proc.__name__} *******')
if (times*delta).days < 2:
log.info('nothing to do')
return
for i in range(times):
start_ = start + i * delta
end_ = start + (i+1)*delta
            # download data for each country
pbar = tqdm(countries)
for country in pbar:
pbar.set_description(
f"{country} {start_:%Y-%m-%d} to {end_:%Y-%m-%d}")
self.fetch_and_write_entsoe_df_to_db(country, proc, start_, end_)
        # create indexes for fast lookups
try:
with self.db_accessor() as conn:
log.info(f'creating index country_idx_{proc.__name__}')
query = (
f'CREATE INDEX IF NOT EXISTS "country_idx_{proc.__name__}" ON "{proc.__name__}" ("country", "index");')
conn.execute(query)
#query = (f'CREATE INDEX IF NOT EXISTS "country_{proc.__name__}" ON "{proc.__name__}" ("country");')
# conn.execute(query)
log.info(f'created indexes country_idx_{proc.__name__}')
except Exception as e:
log.error(f'could not create index if needed: {e}')
        # if the database is a TimescaleDB, create a hypertable
try:
with self.db_accessor() as conn:
query_create_hypertable = f"SELECT create_hypertable('{proc.__name__}', 'index', if_not_exists => TRUE, migrate_data => TRUE);"
conn.execute(query_create_hypertable)
log.info(f'created hypertable {proc.__name__}')
except Exception as e:
log.error(f'could not create hypertable: {e}')
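
        # Hedged usage sketch (country list and time range are assumptions):
        #   start = pd.Timestamp('20230101', tz='Europe/Berlin')
        #   delta = timedelta(days=30)
        #   # downloads three consecutive 30-day chunks of day-ahead prices for DE and FR
        #   crawler.download_entsoe(['DE', 'FR'], client.query_day_ahead_prices,
        #                           start, delta, times=3)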
def pull_crossborders(self, start, delta, times, proc, allZones=True):
"""
Pulls transmissions across borders from entsoe
Parameters
----------
start :
param delta:
times :
param proc:
allZones :
Default value = True)
delta :
param proc:
proc :
Returns
-------
"""
start, delta = self.get_latest_crawled_timestamp(start, delta, proc.__name__)
log.info(f'****** {proc.__name__} *******')
if (times*delta).days < 2:
log.info('nothing to do')
return
for i in range(times):
data = pd.DataFrame()
start_ = start + i * delta
end_ = start + (i+1)*delta
log.info(start_)
for n1 in tqdm(NEIGHBOURS):
for n2 in NEIGHBOURS[n1]:
try:
if (len(n1) == 2 and len(n2) == 2) or allZones:
dataN = proc(n1, n2, start=start_, end=end_)
data[n1+'-'+n2] = dataN
except (NoMatchingDataError, InvalidBusinessParameterError):
#log.info('no data found for ',n1,n2)
pass
except Exception as e:
                        log.error(f'Error crawling cross-border flows: {e}')
data = data.copy()
data.columns = [x.lower() for x in data.columns]
try:
with self.db_accessor() as conn:
data.to_sql(proc.__name__, conn, if_exists='append')
except Exception as e:
                log.error(f'error saving cross-border flows: {e}')
with self.db_accessor() as conn:
prev = pd.read_sql_query(
f'select * from {proc.__name__}', conn, index_col='index')
ges = pd.concat([prev, data])
ges.index = ges.index.astype('datetime64[ns]')
ges.to_sql(proc.__name__, conn, if_exists='replace')
log.info(f'fixed error by adding new columns to crossborders')
try:
with self.db_accessor() as conn:
query_create_hypertable = f"SELECT create_hypertable('{proc.__name__}', 'index', if_not_exists => TRUE, migrate_data => TRUE);"
conn.execute(query_create_hypertable)
except Exception as e:
log.error(f'could not create hypertable: {e}')
def save_power_system_data(self):
"""
pulls static data from opsd and reads it into the database
- used for mapping existing power plants from entsoe to a location on a map
Parameters
----------
Returns
-------
"""
df = pd.read_csv(
'https://data.open-power-system-data.org/conventional_power_plants/latest/conventional_power_plants_EU.csv')
        # drop plants without a location or eic_code
        df = df.dropna(axis=0, subset=['lon', 'lat', 'eic_code'])
        df = df[['eic_code', 'name', 'company', 'country',
                 'capacity', 'energy_source', 'lon', 'lat']]
with self.db_accessor() as conn:
df.to_sql('powersystemdata', conn, if_exists='replace')
return df
def download_entsoe_plant_data(self, countries, client, start, delta, times):
"""
Allows to download the generation per power plant from entsoe.
Uses download_entsoe to write the data into the DB.
Parameters
----------
countries : list[str]
list of 2-letter countrycodes
client : entsoe.EntsoePandasClient
DataFrameClient of entsoe-py package
start : pd.Timestamp
timestamp aware pd.Timestamp
delta : pd.Timedelta
Timedelta to fetch data for per bulk
times : int
number of bulks with size delta to fetch
Returns
-------
"""
# new proxy function
def query_per_plant(country, start, end):
"""
wrapper function around query_generation_per_plant to convert multiindex
Parameters
----------
country : str
country to fetch
start : pd.DateTime
param end:
end :
Returns
-------
"""
ppp = client.query_generation_per_plant(
country, start=start, end=end)
# convert multiindex into second column
pp = ppp.melt(var_name=['name', 'type'],
value_name='value', ignore_index=False)
return pp
log.info(f'****** {query_per_plant.__name__} *******')
start_, delta_ = self.get_latest_crawled_timestamp(start, delta, query_per_plant.__name__)
self.download_entsoe(countries, query_per_plant, start_, delta=delta_, times=times)
try:
with self.db_accessor() as conn:
query = 'CREATE INDEX IF NOT EXISTS "idx_name_query_per_plant" ON "query_per_plant" ("name", "index", "country");'
conn.execute(query)
except Exception as e:
log.error(f'could not create index: {e}')
try:
with self.db_accessor() as conn:
query = 'select distinct name, country,type from query_per_plant'
names = pd.read_sql_query(query, conn)
names.to_sql('plant_names', conn, if_exists='replace')
except Exception as e:
log.error(f'could not create plant_names: {e}')
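
        # Usage sketch (assumes `client` and `crawler` are set up as in __main__):
        #   plant_countries = crawler.countries_with_plant_data(client)
        #   crawler.download_entsoe_plant_data(plant_countries, client,
        #                                      start=None, delta=None, times=1)
        # With start and delta left as None the crawl resumes from the last stored
        # timestamp of the query_per_plant table.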
def countries_with_plant_data(self, client, countries=all_countries, st=pd.Timestamp('20180101', tz='Europe/Berlin')):
"""
checks for all countries if the have available data at date.
Returns list of countries with existing generation data per plant at given timestamp
Parameters
----------
client : entsoe.EntsoePandasClient
countries : list[str], default all_countries
Returns
-------
plant_countries : list[str]
list of country_codes with existing data for generation per plant
"""
plant_countries = []
log.info(f'****** find countries with plant_data *******')
for country in countries:
try:
_ = client.query_generation_per_plant(
country, start=st, end=st+timedelta(days=1))
plant_countries.append(country)
log.info(f'found data for {country}')
except Exception:
continue
return plant_countries
def update_database(self, client, start=None, delta=None, countries=all_countries):
"""
Runs everything which is needed to update the database and pull the data since the last successful pull.
Parameters
----------
client : entsoe.EntsoePandasClient
entsoe-py client
delta : pd.Timedelta
countries : list[str], default all_countries
start : pd.Timestamp
Returns
-------
"""
proc_cap = client.query_installed_generation_capacity
start_, delta_ = self.get_latest_crawled_timestamp(start, delta, proc_cap.__name__)
if delta_.days > 365:
self.download_entsoe(countries, proc_cap,
start_, delta=delta_, times=1)
# timeseries
ts_procs = [client.query_day_ahead_prices,
client.query_load,
client.query_load_forecast,
client.query_generation_forecast,
client.query_wind_and_solar_forecast,
client.query_generation]
# Download load and generation
        # this could be parallelized
for proc in ts_procs:
start_, delta_ = self.get_latest_crawled_timestamp(start, delta, proc.__name__)
self.download_entsoe(countries, proc, start_, delta_, times=1)
self.pull_crossborders(start, delta, 1, client.query_crossborder_flows)
plant_countries = self.countries_with_plant_data(client)
self.download_entsoe_plant_data(
plant_countries[:], client, start, delta, times=1)
log.info(f'****** finished updating ENTSO-E *******')
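
        # Minimal end-to-end update sketch (API token placeholder and SQLite path
        # are assumptions):
        #   client = EntsoePandasClient(api_key='<your-entsoe-token>')
        #   crawler = EntsoeCrawler('data/entsoe.db')
        #   crawler.init_base_sql()
        #   crawler.update_database(client)  # resumes from the last crawled timestamps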
def create_database(self, client, start, delta, countries=[]):
"""
Parameters
----------
client : entsoe.EntsoePandasClient
param start:
delta :
param countries: (Default value = [])
start :
param countries: (Default value = [])
countries :
(Default value = [])
Returns
-------
"""
self.init_base_sql()
self.save_power_system_data()
self.download_entsoe(countries, client.query_installed_generation_capacity_per_unit,
start, delta=delta, times=1)
if __name__ == "__main__":
log.info('ENTSOE')
client = EntsoePandasClient(api_key='ae2ed060-c25c-4eea-8ae4-007712f95375')
start = pd.Timestamp('20150101', tz='Europe/Berlin')
delta = timedelta(days=30)
end = start+delta
    times = 7*12  # until 2022
db = 'postgresql://entso:entso@localhost:5432/entsoe'
#db = 'data/entsoe.db'
crawler = EntsoeCrawler(database=db)
procs = [client.query_day_ahead_prices,
client.query_net_position,
client.query_load,
client.query_load_forecast,
client.query_generation_forecast,
client.query_wind_and_solar_forecast,
client.query_generation]
times = 1
# Download load and generation
for proc in procs:
        # this could be parallelized
        crawler.download_entsoe(all_countries, proc, start, delta, times)
# Capacities
procs = [client.query_installed_generation_capacity,
client.query_installed_generation_capacity_per_unit]
# crawler.pull_crossborders(start,delta,1,client.query_crossborder_flows)
# per plant generation
    plant_countries = crawler.countries_with_plant_data(client, all_countries)
#db = 'data/entsoe.db'
crawler = EntsoeCrawler(database=db)
    # download 2017-12-16 to 2018-03-15
crawler.download_entsoe_plant_data(
plant_countries[:], client, start, delta, times)
# create indices if not existing