#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Nov 13 12:04:45 2020
@author: maurer
"""
import requests
import urllib
import time
from datetime import date, timedelta
import pandas as pd
from tqdm import tqdm
import sqlite3
from contextlib import closing
from sqlalchemy import create_engine
from contextlib import contextmanager
import logging
logging.basicConfig()
log = logging.getLogger('entsog')
log.setLevel(logging.INFO)
api_endpoint = 'https://transparency.entsog.eu/api/v1/'
fr = date(2020, 5, 18)
to = date.today()
'''
data = pd.read_csv(
f'{api_endpoint}operationaldata.csv?limit=1000&indicator=Allocation&from={fr}&to={to}')
response = requests.get(api_endpoint+'operationaldatas')
data = pd.read_csv(api_endpoint+'AggregatedData.csv?limit=1000')
response = requests.get(api_endpoint+'AggregatedData?limit=1000')
data = pd.DataFrame(response.json()['AggregatedData'])
'''
[docs]
def getDataFrame(name, params=['limit=10000'], useJson=False):
params_str = ''
if len(params) > 0:
params_str = '?'
for param in params[:-1]:
params_str = params_str+param+'&'
params_str += params[-1]
i = 0
data = pd.DataFrame()
success = False
while i < 10 and not success:
try:
i += 1
if useJson:
url = f'{api_endpoint}{name}.json{params_str}'
response = requests.get(url)
data = pd.DataFrame(response.json()[name])
# replace empty string with None
data = data.replace([''], [None])
else:
url = f'{api_endpoint}{name}.csv{params_str}'
data = pd.read_csv(url, index_col=False)
success = True
except requests.exceptions.InvalidURL as e:
raise e
except requests.exceptions.HTTPError as e:
log.error('Error getting Dataframe')
if e.response.status_code >= 500 :
log.info(f'{e.response.reason} - waiting 30 seconds..')
time.sleep(30)
except urllib.error.HTTPError as e:
log.error(f'Error getting Dataframe')
if e.code >= 500 :
log.info(f'{e.msg} - waiting 30 seconds..')
time.sleep(30)
if data.empty:
raise Exception('could not get any data for params:', params_str)
data.columns = [x.lower() for x in data.columns]
return data
[docs]
class EntsogCrawler:
def __init__(self, database):
if database.startswith('postgresql'):
self.engine = create_engine(database)
@contextmanager
def access_db():
with self.engine.connect() as conn, conn.begin():
yield conn
self.db_accessor = access_db
else:
self.db_accessor = lambda: closing(sqlite3.connect(database))
[docs]
def pullData(self, names):
pbar = tqdm(names)
for name in pbar:
try:
pbar.set_description(name)
# use Json as connectionpoints have weird csv
# TODO Json somehow has different data
# connectionpoints count differ
# and tpTSO column are named tSO in connpointdirections
data = getDataFrame(name, useJson=True)
with self.db_accessor() as conn:
tbl_name = name.lower().replace(' ','_')
data.to_sql(tbl_name, conn, if_exists='replace')
except Exception:
log.exception('error pulling data')
if 'operatorpointdirections' in names:
with self.db_accessor() as conn:
query = (
'CREATE INDEX IF NOT EXISTS "idx_opd" ON operatorpointdirections (operatorKey, pointKey,directionkey);')
conn.execute(query)
[docs]
def findNewBegin(self, table_name):
try:
with self.db_accessor() as conn:
query = f'select max(periodfrom) from {table_name}'
d = conn.execute(query).fetchone()[0]
begin = pd.to_datetime(d).date()
except Exception as e:
begin = date(2017, 7, 10)
log.error(f'table does not exist yet - using default start {begin} ({e})')
return begin
[docs]
def pullOperationalData(self, indicators, initial_begin=None, end=None):
log.info('getting values from operationaldata')
if not end:
end = date.today()
for indicator in indicators:
tbl_name = indicator.lower().replace(' ','_')
if initial_begin:
begin = initial_begin
else:
begin = self.findNewBegin(tbl_name)
bulks = (end-begin).days
log.info(f'start: {begin}, end: {end}, days: {bulks}, indicator: {indicator}')
if bulks < 1:
return
delta = timedelta(days=1)
pbar = tqdm(range(int(bulks)))
for i in pbar:
beg1 = begin + i*delta
end1 = begin + (i+1)*delta
pbar.set_description(f'op {beg1} to {end1}')
params = ['limit=-1', 'indicator='+urllib.parse.quote(indicator), 'from=' +
str(beg1), 'to='+str(end1), 'periodType=hour']
time.sleep(5)
# impact of sleeping here is quite small in comparison to 50s query length
# rate limiting Gateway Timeouts
df = getDataFrame('operationaldata', params)
df['periodfrom'] = pd.to_datetime(df['periodfrom'], infer_datetime_format=True)
df['periodto'] = pd.to_datetime(df['periodto'], infer_datetime_format=True)
try:
with self.db_accessor() as conn:
df.to_sql(tbl_name, conn, if_exists='append')
except Exception as e:
# allow adding a new column or converting type
with self.db_accessor() as conn:
log.info(f'handling {repr(e)} by concat')
# merge old data with new data
prev = pd.read_sql_query(
f'select * from {tbl_name}', conn)
dat = pd.concat([prev, df])
# convert type as pandas needs it
dat.to_sql(tbl_name, conn, if_exists='replace')
log.info(f'replaced table {tbl_name}')
try:
with self.db_accessor() as conn:
query_create_hypertable = f"SELECT create_hypertable('{tbl_name}', 'periodfrom', if_not_exists => TRUE, migrate_data => TRUE);"
conn.execute(query_create_hypertable)
log.info(f'created hypertable {tbl_name}')
except Exception as e:
log.error(f'could not create hypertable {tbl_name}: {e}')
# sqlite will only use one index. EXPLAIN QUERY PLAIN shows if index is used
# ref: https://www.sqlite.org/optoverview.html#or_optimizations
# reference https://stackoverflow.com/questions/31031561/sqlite-query-to-get-the-closest-datetime
if 'Allocation' in indicators:
with self.db_accessor() as conn:
query = (
'CREATE INDEX IF NOT EXISTS "idx_opdata" ON Allocation (operatorKey,periodfrom);')
conn.execute(query)
query = (
'CREATE INDEX IF NOT EXISTS "idx_pointKey" ON Allocation (pointKey,periodfrom);')
conn.execute(query)
if 'Physical Flow' in indicators:
with self.db_accessor() as conn:
query = (
'CREATE INDEX IF NOT EXISTS "idx_phys_operator" ON Physical_Flow (operatorKey,periodfrom);')
conn.execute(query)
query = (
'CREATE INDEX IF NOT EXISTS "idx_phys_point" ON Physical_Flow (pointKey,periodfrom);')
conn.execute(query)
if 'Firm Technical' in indicators:
with self.db_accessor() as conn:
query = (
'CREATE INDEX IF NOT EXISTS "idx_ft_opdata" ON Firm_Technical (operatorKey,periodfrom);')
conn.execute(query)
query = (
'CREATE INDEX IF NOT EXISTS "idx_ft_pointKey" ON Firm_Technical (pointKey,periodfrom);')
conn.execute(query)
if __name__ == "__main__":
database = 'data/entsog.db'
import os
craw = EntsogCrawler(database)
names = ['cmpUnsuccessfulRequests',
# 'operationaldata',
# 'cmpUnavailables',
# 'cmpAuctions',
# 'AggregatedData', # operationaldata aggregated for each zone
# 'tariffssimulations',
# 'tariffsfulls',
# 'urgentmarketmessages',
'connectionpoints',
'operators',
'balancingzones',
'operatorpointdirections',
'Interconnections',
'aggregateInterconnections']
craw.pullData(names)
indicators = ['Physical Flow', 'Allocation', 'Firm Technical']
craw.pullOperationalData(indicators)