# Source code for entsog_crawler

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Nov 13 12:04:45 2020

@author: maurer
"""

import logging
import sqlite3
import time
import urllib
import urllib.error
import urllib.parse
from contextlib import closing
from contextlib import contextmanager
from datetime import date, timedelta

import pandas as pd
import requests
from sqlalchemy import create_engine, text
from tqdm import tqdm

log = logging.getLogger('entsog')

# Base URL of the ENTSOG transparency platform REST API. Resource names such
# as 'operationaldata' are appended directly, followed by '.json' or '.csv'
# (see getDataFrame), so the trailing slash is required.
api_endpoint = 'https://transparency.entsog.eu/api/v1/'

# Example date range; not referenced by the code visible in this module.
fr = date(2020, 5, 18)
to = date.today()

# Ad-hoc exploration snippets, kept for reference only. They are deliberately
# not executed: running them at module level would fire HTTP requests every
# time this module is imported.
# response = requests.get(api_endpoint+'operationaldatas')
# data = pd.read_csv(api_endpoint+'AggregatedData.csv?limit=1000')
# response = requests.get(api_endpoint+'AggregatedData?limit=1000')
# data = pd.DataFrame(response.json()['AggregatedData'])

def getDataFrame(name, params=None, useJson=False, max_tries=10, retry_wait=30):
    """Download one resource from the ENTSOG API and return it as a DataFrame.

    The requested URL is ``{api_endpoint}{name}.json?<params>`` when
    ``useJson`` is true and ``{api_endpoint}{name}.csv?<params>`` otherwise.
    Server errors (HTTP 5xx) and rate limiting (HTTP 429) are retried, waiting
    ``retry_wait`` seconds between attempts. Other HTTP errors are not retried,
    because asking again will not change the answer.

    Parameters
    ----------
    name : str
        API resource name, e.g. ``'operationaldata'``. For JSON requests the
        records are read from the payload key of the same name.
    params : list of str, optional
        Query parameters already formatted as ``'key=value'``. Defaults to
        ``['limit=10000']``. Pass an empty list for no query string.
    useJson : bool
        Fetch the JSON representation instead of CSV. Empty strings in the
        JSON records are replaced by ``None``.
    max_tries : int
        Maximum number of download attempts.
    retry_wait : int or float
        Seconds to sleep before retrying a retryable HTTP error.

    Returns
    -------
    pandas.DataFrame
        The downloaded records with all column names lower-cased.

    Raises
    ------
    Exception
        If no data could be retrieved (every attempt failed, a non-retryable
        HTTP error occurred, or the response was empty).
    Other exceptions (e.g. invalid URLs, connection errors, malformed
    payloads) propagate unchanged.
    """
    if params is None:
        params = ['limit=10000']
    params_str = '?' + '&'.join(params) if params else ''

    data = pd.DataFrame()
    for attempt in range(1, max_tries + 1):
        try:
            if useJson:
                url = f'{api_endpoint}{name}.json{params_str}'
                response = requests.get(url)
                # requests does not raise on HTTP error status by itself;
                # without this a 5xx error page would surface as a JSON
                # decoding error and never reach the retry logic below.
                response.raise_for_status()
                data = pd.DataFrame(response.json()[name])
                # replace empty string with None
                data = data.replace([''], [None])
            else:
                url = f'{api_endpoint}{name}.csv{params_str}'
                data = pd.read_csv(url, index_col=False)
            break
        except requests.exceptions.HTTPError as e:
            status, reason = e.response.status_code, e.response.reason
        except urllib.error.HTTPError as e:
            # raised by pandas' urllib-based download of the CSV variant
            status, reason = e.code, e.msg

        log.error('Error getting Dataframe')
        retryable = status >= 500 or status == 429
        if not retryable or attempt == max_tries:
            # no point in waiting: either the error is permanent or this
            # was the last attempt
            break
        log.info(f'{reason} - waiting {retry_wait} seconds..')
        time.sleep(retry_wait)

    if data.empty:
        raise Exception('could not get any data for params:', params_str)
    data.columns = [x.lower() for x in data.columns]
    return data
class EntsogCrawler:
    """Crawl the ENTSOG transparency API into a SQL database.

    ``database`` is either a SQLAlchemy URL starting with ``postgresql`` or a
    path that is handed to :func:`sqlite3.connect`.
    """

    # Start date used when a table cannot be queried or holds no data yet.
    DEFAULT_BEGIN = date(2017, 7, 10)

    # Indexes created after pulling operational data:
    # indicator -> (table, [(index name, indexed columns), ...]).
    # sqlite will only use one index; EXPLAIN QUERY PLAN shows whether an
    # index is used.
    _OPERATIONAL_INDEXES = {
        'Allocation': ('Allocation', [
            ('idx_opdata', 'operatorKey,periodfrom'),
            ('idx_pointKey', 'pointKey,periodfrom'),
        ]),
        'Physical Flow': ('Physical_Flow', [
            ('idx_phys_operator', 'operatorKey,periodfrom'),
            ('idx_phys_point', 'pointKey,periodfrom'),
        ]),
        'Firm Technical': ('Firm_Technical', [
            ('idx_ft_opdata', 'operatorKey,periodfrom'),
            ('idx_ft_pointKey', 'pointKey,periodfrom'),
        ]),
    }

    def __init__(self, database):
        if database.startswith('postgresql'):
            self.engine = create_engine(database)

            @contextmanager
            def access_db():
                # one connection per use, wrapped in conn.begin()
                with self.engine.connect() as conn, conn.begin():
                    yield conn

            self.db_accessor = access_db
        else:
            # a fresh sqlite3 connection per use; closing() only closes it,
            # it does not commit
            self.db_accessor = lambda: closing(sqlite3.connect(database))

    @staticmethod
    def _execute(conn, query):
        """Execute a raw SQL string on a sqlite3 or a SQLAlchemy connection.

        sqlite3 connections take the plain string; SQLAlchemy connections
        need it wrapped in ``text()``.
        """
        if isinstance(conn, sqlite3.Connection):
            return conn.execute(query)
        return conn.execute(text(query))

    def _run_ddl(self, queries):
        """Execute schema statements (e.g. CREATE INDEX) on one connection."""
        with self.db_accessor() as conn:
            for query in queries:
                self._execute(conn, query)
            if isinstance(conn, sqlite3.Connection):
                conn.commit()  # closing() would not commit for us

    def pullData(self, names):
        """Download each named API resource and replace its table with it.

        The table name is the resource name lower-cased with spaces replaced
        by underscores. A failure for one resource is logged and the remaining
        resources are still pulled.
        """
        pbar = tqdm(names)
        for name in pbar:
            try:
                pbar.set_description(name)
                # use Json as connectionpoints have weird csv
                # TODO Json somehow has different data
                # connectionpoints count differ
                # and tpTSO column are named tSO in connpointdirections
                data = getDataFrame(name, useJson=True)
                with self.db_accessor() as conn:
                    tbl_name = name.lower().replace(' ', '_')
                    data.to_sql(tbl_name, conn, if_exists='replace')
            except Exception:
                log.exception('error pulling data')
        if 'operatorpointdirections' in names:
            self._run_ddl([
                'CREATE INDEX IF NOT EXISTS "idx_opd" ON operatorpointdirections (operatorKey, pointKey,directionkey);'
            ])

    def findNewBegin(self, table_name):
        """Return the date from which pulling ``table_name`` should resume.

        This is the date part of the latest ``periodfrom`` stored in the
        table. Falls back to ``DEFAULT_BEGIN`` if the table cannot be queried
        (e.g. it does not exist yet) or holds no rows.
        """
        # NOTE(review): the day of the latest row is pulled again and appended,
        # which may store that day twice - confirm whether dedup is needed.
        try:
            with self.db_accessor() as conn:
                query = f'select max(periodfrom) from {table_name}'
                d = self._execute(conn, query).fetchone()[0]
            begin = pd.to_datetime(d).date()
        except Exception as e:
            begin = self.DEFAULT_BEGIN
            log.error(f'table does not exist yet - using default start {begin} ({e})')
        return begin

    def pullOperationalData(self, indicators, initial_begin=None, end=None):
        """Pull hourly operational data for each indicator into its own table.

        Data is requested from ``operationaldata`` in one-day chunks and
        appended to the table named after the indicator (lower-cased, spaces
        replaced by underscores). Afterwards indexes are created for the
        indicators listed in ``_OPERATIONAL_INDEXES``.

        Parameters
        ----------
        indicators : list of str
            Indicator names as used by the API, e.g. ``'Physical Flow'``.
        initial_begin : datetime.date, optional
            First day to pull for every indicator. If not given, each table
            resumes at the date returned by :meth:`findNewBegin`.
        end : datetime.date, optional
            The last chunk ends at this date; defaults to today.
        """
        log.info('getting values from operationaldata')
        if not end:
            end = date.today()
        for indicator in indicators:
            tbl_name = indicator.lower().replace(' ', '_')
            begin = initial_begin if initial_begin else self.findNewBegin(tbl_name)
            bulks = (end - begin).days
            log.info(f'start: {begin}, end: {end}, days: {bulks}, indicator: {indicator}')
            if bulks < 1:
                # this indicator is up to date; the others may not be, so
                # continue instead of returning from the whole method
                continue
            self._pull_indicator(indicator, tbl_name, begin, bulks)
            self._create_hypertable(tbl_name)

        for indicator, (table, indexes) in self._OPERATIONAL_INDEXES.items():
            if indicator in indicators:
                self._run_ddl([
                    f'CREATE INDEX IF NOT EXISTS "{idx_name}" ON {table} ({columns});'
                    for idx_name, columns in indexes
                ])

    def _pull_indicator(self, indicator, tbl_name, begin, days):
        """Download ``days`` daily chunks of hourly data into ``tbl_name``."""
        delta = timedelta(days=1)
        pbar = tqdm(range(int(days)))
        for i in pbar:
            beg1 = begin + i * delta
            end1 = begin + (i + 1) * delta
            pbar.set_description(f'op {beg1} to {end1}')
            params = ['limit=-1', 'indicator=' + urllib.parse.quote(indicator),
                      'from=' + str(beg1), 'to=' + str(end1), 'periodType=hour']
            # impact of sleeping here is quite small in comparison to 50s query length
            # rate limiting Gateway Timeouts
            time.sleep(5)
            df = getDataFrame('operationaldata', params)
            df['periodfrom'] = pd.to_datetime(df['periodfrom'])
            df['periodto'] = pd.to_datetime(df['periodto'])
            self._store_chunk(tbl_name, df)

    def _store_chunk(self, tbl_name, df):
        """Append ``df`` to ``tbl_name``, rewriting the table if appending fails."""
        try:
            with self.db_accessor() as conn:
                df.to_sql(tbl_name, conn, if_exists='append')
        except Exception as e:
            # allow adding a new column or converting type
            with self.db_accessor() as conn:
                log.info(f'handling {repr(e)} by concat')
                # merge old data with new data
                prev = pd.read_sql_query(f'select * from {tbl_name}', conn)
                # The DataFrame index written by to_sql comes back as an
                # ordinary "index" column. Drop it so to_sql can write a fresh
                # index under that name; otherwise pandas adds a "level_0"
                # column and fails on the next fallback.
                prev = prev.drop(columns='index', errors='ignore')
                dat = pd.concat([prev, df], ignore_index=True)
                # convert type as pandas needs it
                dat.to_sql(tbl_name, conn, if_exists='replace')
            log.info(f'replaced table {tbl_name}')

    def _create_hypertable(self, tbl_name):
        """Best effort: make ``tbl_name`` a TimescaleDB hypertable on periodfrom."""
        try:
            with self.db_accessor() as conn:
                if isinstance(conn, sqlite3.Connection):
                    return  # create_hypertable is not available in sqlite
                query_create_hypertable = f"SELECT public.create_hypertable('{tbl_name}', 'periodfrom', if_not_exists => TRUE, migrate_data => TRUE);"
                conn.execute(text(query_create_hypertable))
            log.info(f'created hypertable {tbl_name}')
        except Exception as e:
            log.error(f'could not create hypertable {tbl_name}: {e}')
if __name__ == "__main__":
    import os

    # make the crawler's log.info progress messages visible
    logging.basicConfig(level=logging.INFO)

    database = 'data/entsog.db'
    if not database.startswith('postgresql'):
        # sqlite3 cannot create the parent directory of the database file
        db_dir = os.path.dirname(database)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)
    craw = EntsogCrawler(database)

    names = ['cmpUnsuccessfulRequests',
             # 'operationaldata',
             # 'cmpUnavailables',
             # 'cmpAuctions',
             # 'AggregatedData',  # operationaldata aggregated for each zone
             # 'tariffssimulations',
             # 'tariffsfulls',
             # 'urgentmarketmessages',
             'connectionpoints',
             'operators',
             'balancingzones',
             'operatorpointdirections',
             'Interconnections',
             'aggregateInterconnections']
    craw.pullData(names)

    indicators = ['Physical Flow', 'Allocation', 'Firm Technical']
    craw.pullOperationalData(indicators)