Source code for CIM2Matpower

# utf8
"""
This module contains the core functionality of the CIM to Matpower import tool.

.. note::This module is based partially on the started but never finished open source project Cim2BusBranch https://www.versioneye.com/Python/Cim2BusBranch/0.1

.. seealso::  :py:mod:`Topology_NodeBreaker` and :py:mod:`CIM2Matpower`

:Date: 2016-05-10
:Authors: Konstantin Gerasimov
:e-mail: kkgerasimov@gmail.com
:Credits: This function is created for KU-Leuven as part of the GARPUR project http://www.garpur-project.eu
"""


import math
from datetime import datetime
from itertools import count
from sys import stdout

from PyCIM import RDFXMLReader

import Topology_NodeBreaker
import Topology_BusBranch
from PreProcess_CIM_files import fix_RTE_cim_files
from PostProcess_mpc import create_mpc


import logging
logger = logging.getLogger(__name__)


[docs]def cim_to_mpc(cimfiles, boundary_profiles=[], log_filename=''): """ The main function transforming the CIM XML files into Matpower case structure (mpc) in BusBranch topology, together with an additional structure for the NodeBreaker topology data which cannot be fit in the mpc. .. warning:: The *cimfiles* and *boundary_profiles* must not contain Unidoce characters! Otherwise the `PyCIM` module fails! :param cimfiles: the filenames of the CIM files to be imported (preferably with full path) :type cimfiles: list of strings .. warning:: The *cimfiles* usually come in three files: _EQ, _TP and _SV. If so, they need to be provided in the specified order! :param boundary_profiles: (optional) the filenames of the boundary files to be imported (preferably with full path) :type boundary_profiles: list of strings .. warning:: The *boundary_profiles* usually come in two files: _EQ and _TP. If so, they need to be provided in the specified order! :param log_filename: (optional) the filename in which a log should be saved (preferably with full path) :type log_filename: string :return mpc: Matpower case, together with information about the NodeBreaker topology :type mpc: dictionary .. warning:: In order to be used in Matlab, it still needs to be transformed from Python dictionaries and array to Matlab structure and matrices. .. seealso:: :py:mod:`Topology_NodeBreaker` """ # Initialize logging logging.getLogger('').setLevel(logging.INFO) if type(log_filename) == str and log_filename != '': file_logger_handler = logging.FileHandler(filename=log_filename, mode='w') # file_logger_handler.setLevel(logging.DEBUG) file_logger_formatter = logging.Formatter('%(levelname)s : %(name)s : %(message)s') file_logger_handler.setFormatter(file_logger_formatter) logging.getLogger('').addHandler(file_logger_handler) # attaches the file_handler to the root logger else: file_logger_handler = None pass console_logger_handler = logging.StreamHandler(stdout) # console_logger_handler.setLevel(logging.DEBUG) stdout_logger_formatter = logging.Formatter('%(levelname)s: %(message)s') console_logger_handler.setFormatter(stdout_logger_formatter) logging.getLogger('').addHandler(console_logger_handler) # attaches the file_handler to the root logger logger.info('CIM to Matpower import STARTED at %s \n' % str(datetime.now())) # Check the RTE CIM files for compliance with PyCIM package class definitions fix_RTE_cim_files(cimfiles) try: # NB: The ENTSO-E boundary files have to be listed first (before the CIM of the actual network) in order to be imported correctly !!! # NB: The CIM files have to be loaded in the following order: EQ, TP and then SV, in order for the RDFXMLReader to work correctly !!! importer = CIM2Matpower(boundary_profiles + cimfiles) bus_branch_model, node_breaker_topology = importer.import_cim() mpc = create_mpc(bus_branch_model, node_breaker_topology) except: logger.error('Some error occured during the CIM import.\nAre you sure you are importing CIMv14 ENTSO-E profile and that you are providing all the CIM files (_EQ, _TP, _SV) and boundary files (_EQ, _TP). All the files must end with the corresponding suffix and must not contain Unicode characters! \n') logging.getLogger('').removeHandler(console_logger_handler) if file_logger_handler is not None: logging.getLogger('').removeHandler(file_logger_handler) logging.shutdown() raise logger.info('CIM to Matpower import ENDED at %s \n' %str(datetime.now())) # close the logging files after the function is finished logging.getLogger('').removeHandler(console_logger_handler) if file_logger_handler is not None: logging.getLogger('').removeHandler(file_logger_handler) logging.shutdown() return mpc
[docs]class CIM2Matpower(object): """ The *CIM2Matpower* transforms a CIM topology in *cim_file* into a bus/branch topology. *cim_file* is the path to an XML/RDF encoded CIM topology. """ # Create a topological node (bus) for each one-terminal primary equipment _primary_one_t = [ 'TransformerWinding', 'EnergySource', 'EnergyConsumer', 'Connector', # BusbarSection, Junction 'RegulatingCondEq' # ShuntCompensator, StaticVarCompensator, FrequencyConverter, SynchronousMachine ] # Create a branch for each two-terminal primary equipment _primary_two_t = [ 'Conductor', # DC-/ACLineSegment 'SeriesCompensator', 'PowerTransformer' ] # One-terminal secondary equipment can safely be ignored _secondary_one_t = [ 'Ground', ] # Search two-terminal for neighboring connectivity nodes for amalgamation _secondary_two_t = [ 'RectifierInverter', 'Switch', # Jumper, Fuse, Disconnector, GroundDisconnector, Breaker, LoadBreakSwitch ] #################################################################################################################### def __init__(self, cim_files): if type(cim_files) == str: cim_files = [cim_files] self.cim_files = cim_files xmlns = RDFXMLReader.xmlns(cim_files[0]) self._package_map = RDFXMLReader.get_cim_ns(xmlns)[1] self._prim_onet = self._get_equipment_cls(CIM2Matpower._primary_one_t) self._prim_twot = self._get_equipment_cls(CIM2Matpower._primary_two_t) self._sec_onet = self._get_equipment_cls(CIM2Matpower._secondary_one_t) self._sec_twot = self._get_equipment_cls(CIM2Matpower._secondary_two_t) # reinitilize the automatically created ids Topology_BusBranch.Bus.id_generator = count(1) Topology_NodeBreaker.Node.id_generator = count(1) Topology_NodeBreaker.Area.id_generator = count(1) Topology_NodeBreaker.Zone.id_generator = count(1) Topology_NodeBreaker.Branch.id_generator = count(1) Topology_NodeBreaker.Generator.id_generator = count(1) ####################################################################################################################
[docs] def import_cim(self): """ Actually parses the CIM file and transforms its contents. A :class:`Topology_BusBranch.Case` object is returned. """ cim_objects = {} for cim_file in self.cim_files: cim_objects = RDFXMLReader.cimread(cim_file, start_dict=cim_objects) base_power, tnodes = self._iterate_prim_eq(cim_objects) return self._create_bb_topology(base_power, tnodes), self.node_breaker_data
####################################################################################################################
[docs] def _create_bb_topology(self, BasePower_MVA, tnodes): """Creates the :class:`Topology_BusBranch.Case`""" buses, generators, p_transformers, ac_lines = self._create_buses_and_gens(tnodes) branches = self._create_branches(ac_lines, p_transformers, BasePower_MVA, buses) # replace tnode dict by list of buses for i, gen in enumerate(generators): generators[i].bus = buses[gen.bus] for i, branch in enumerate(branches): branches[i].from_bus = buses[branch.from_bus] branches[i].to_bus = buses[branch.to_bus] buses = list(set(buses.values())) return Topology_BusBranch.Case(BasePower_MVA, buses, generators, branches)
####################################################################################################################
[docs] def _find_tnode(self, terminal): try: tnode = terminal.ConnectivityNode.TopologicalNode except AttributeError: # if no ConnectivityNode, try the TopologicalNode directly tnode = terminal.TopologicalNode return tnode
####################################################################################################################
[docs] def _get_acline_mva_lim(self, acline): limit_class = 'CurrentLimit' limits_mva = [0, 0, 0] num_matpower_limits = 3 try: limits_from_mva = [] for lim in acline.Terminals[0].OperationalLimitSet[0].OperationalLimitValue: if lim.__class__.__name__ == limit_class: limits_from_mva.append(lim.value * acline.Terminals[0].TopologicalNode.SvVoltage.v * math.sqrt(3) / 1e3) if limits_from_mva: limits_from_mva.sort() if len(limits_from_mva) < num_matpower_limits: limits_from_mva.extend([0] * (num_matpower_limits - len(limits_from_mva))) if not limits_from_mva: limits_from_mva = [0, 0, 0] except: limits_from_mva = [0, 0, 0] try: limits_to_mva = [] for lim in acline.Terminals[1].OperationalLimitSet[0].OperationalLimitValue: if lim.__class__.__name__ == limit_class: limits_to_mva.append(lim.value * acline.Terminals[1].TopologicalNode.SvVoltage.v * math.sqrt(3) / 1e3) if limits_to_mva: limits_to_mva.sort() if len(limits_to_mva) < num_matpower_limits: limits_to_mva.extend([0] * (num_matpower_limits - len(limits_to_mva))) if not limits_to_mva: limits_to_mva = [0, 0, 0] except: limits_to_mva = [0, 0, 0] for i in range(0,num_matpower_limits): if limits_from_mva[i] < limits_to_mva[i]: limits_mva[i] = limits_from_mva[i] else: limits_mva[i] = limits_to_mva[i] return limits_mva
####################################################################################################################
[docs] def _create_branches(self, lines, p_transformers, BasePower_MVA, buses): """ Creates list of :class:`Topology_BusBranch.Branch` -es Create list of :class:`Topology_NodeBreaker.Branch`, :class:`Topology_NodeBreaker.PhaseTapChanger` and :class:`Topology_NodeBreaker.RatioTapChanger` -s """ logger.info('START creating branches') branches = [] for line in lines: status_connected = True for term in line.Terminals: if not term.connected: status_connected = False from_bus = self._find_tnode(line.Terminals[0]) to_bus = self._find_tnode(line.Terminals[1]) if not from_bus: logging_message = 'Line %s: terminal _T0 is None and because of that the line is not imported!' % line.UUID logger.warn(logging_message) if from_bus not in buses: tisland_name = line.Terminals[0].TopologicalNode.TopologicalIsland.UUID if status_connected: # ToDo: Maybe this should raise an error! logging_message = 'Line %s (switched ON): the line is switched ON but terminal _T1 belongs to another island (%s) and because of that the line is not imported!' %(line.UUID, tisland_name) logger.error(logging_message) else: logger.warn('Line %s (switched off): terminal %s is connected to another island (%s) and because of that the line is not imported!' %(line.UUID, line.Terminals[0].UUID, tisland_name)) continue if not to_bus: logging_message = 'Line %s: terminal _T1 is None and because of that the line is not imported!' % line.UUID logger.warn(logging_message) if to_bus not in buses: tisland_name = line.Terminals[1].TopologicalNode.TopologicalIsland.UUID if status_connected: # ToDo: Maybe this should be an error! logging_message = 'Line %s (switched ON): the line is switched ON but terminal _T1 belongs to another island (%s) and because of that the line is not imported!' %(line.UUID, tisland_name) logger.error(logging_message) else: logging_message = 'Line %s (switched off): terminal %s is connected to another island (%s) and because of that the line is not imported!' %(line.UUID, line.Terminals[1].UUID, tisland_name) logger.warn(logging_message) continue if buses[from_bus].base_kv == buses[to_bus].base_kv: BaseVoltage_kV = buses[from_bus].base_kv else: logging_message = 'Line %s: base voltages different at the line terminals!' % line.UUID logger.critical(logging_message) raise ValueError(logging_message) # Ref. values for p.u. conversion z_base = BaseVoltage_kV ** 2 / BasePower_MVA # Ohm y_base = 1.0 / z_base # Ohm^-1 limits_mva = self._get_acline_mva_lim(line) branches.append(Topology_BusBranch.Branch( from_bus, to_bus, name=line.UUID, r=(line.r / z_base), x=(line.x / z_base), b=(line.bch / y_base), rate_a=limits_mva[0], rate_b=limits_mva[1], rate_c=limits_mva[2], online=status_connected )) self.node_breaker_data.branches.append( Topology_NodeBreaker.Branch(node_from_id=self.cim_nodes[from_bus].id, node_to_id=self.cim_nodes[to_bus].id, id_cim=line.UUID, name=line.name, status_from=line.Terminals[0].connected, status_to=line.Terminals[1].connected)) for pt in p_transformers: status_connected = True if hasattr(pt, 'TransformerWindings'): for tw in pt.TransformerWindings: if not tw.Terminals[0].connected: status_connected = False # CIMv14 if len(pt.TransformerWindings) == 2: # two-winding transformer if pt.TransformerWindings[0].windingType == 'primary': index_primary = 0 index_secondary = 1 else: index_primary = 1 index_secondary = 0 to_bus_primary = self._find_tnode(pt.TransformerWindings[index_primary].Terminals[0]) if not to_bus_primary: logging_message = 'Transformer %s: the TNode of terminal of primary winding is None and because of that the transformer is not imported!' % pt.UUID logger.warn(logging_message) if to_bus_primary not in buses: tisland_name = pt.TransformerWindings[index_primary].Terminals[ 0].TopologicalNode.TopologicalIsland.UUID if status_connected: logging_message = 'Transformer %s (switched ON): the transformer is switched ON but terminal %s belongs to another island (%s) and because of that the line is not imported!' %(pt.UUID,pt.TransformerWindings[index_primary].Terminals[0].UUID, tisland_name) logger.error(logging_message) # NOTE: Maybe we should raise an error for this case else: logging_message = 'Transformer %s (switched off): terminal %s is connected to another island (%s) and because of that the transformer is not imported!' %(pt.UUID, pt.TransformerWindings[index_primary].Terminals[0].UUID, tisland_name) logger.warn(logging_message) continue from_bus_secondary = self._find_tnode(pt.TransformerWindings[index_secondary].Terminals[0]) if not from_bus_secondary: logging_message = 'Transformer %s: the TNode of terminal of secondary winding is None and because of that the transformer is not imported!' % pt.UUID logger.warn(logging_message) if from_bus_secondary not in buses: tisland_name = pt.TransformerWindings[index_secondary].Terminals[ 0].TopologicalNode.TopologicalIsland.UUID if status_connected: logging_message = 'Transformer %s (switched ON): the transformer is switched ON but terminal %s belongs to another island (%s) and because of that the line is not imported!' %(pt.UUID,pt.TransformerWindings[index_secondary].Terminals[0].UUID, tisland_name) logger.error(logging_message) # NOTE: Maybe we should raise an error for this case else: logging_message = 'Transformer %s (switched off): terminal %s is connected to another island (%s) and because of that the transformer is not imported!' %(pt.UUID, pt.TransformerWindings[index_secondary].Terminals[0].UUID, tisland_name) logger.warn(logging_message) continue pt_ratio = (pt.TransformerWindings[index_secondary].ratedU / buses[from_bus_secondary].base_kv) / ( pt.TransformerWindings[index_primary].ratedU / buses[to_bus_primary].base_kv) z_base_primary = buses[to_bus_primary].base_kv ** 2 / BasePower_MVA # Ohm z_base_secondary = buses[from_bus_secondary].base_kv ** 2 / BasePower_MVA # Ohm if pt.TransformerWindings[index_secondary].r or pt.TransformerWindings[index_secondary].x or \ pt.TransformerWindings[index_secondary].b: logging_message = 'The two-windings transformer %s has impedance and/or admittance on its secondary winding!' % pt.UUID logger.warn(logging_message) pt_r_pu = pt.TransformerWindings[index_primary].r / z_base_primary pt_x_pu = pt.TransformerWindings[index_primary].x / z_base_primary pt_b_pu = -pt.TransformerWindings[ index_primary].b * z_base_primary # KKG: ToDo: Shouldn't b be with minus? # PHASE tap changers ptc_ratio = 1 ptc_angle_deg = 0 ptc_x_pu = 0 ptc = pt.TransformerWindings[index_primary].PhaseTapChanger or pt.TransformerWindings[ index_secondary].PhaseTapChanger if pt.TransformerWindings[index_primary].PhaseTapChanger and pt.TransformerWindings[ index_secondary].PhaseTapChanger: logging_message = 'The two-windings transformer %s has PhaseTapChanger on its primary winding!' % pt.UUID logger.warn(logging_message) if ptc: # The implementation of the PhaseTapChanger is based on the CIM1Impowerter.txt from the iTesla project neutral_step = ptc.neutralStep current_step = ptc.SvTapStep.continuousPosition du0 = ptc.neutralU / ptc.TransformerWinding.ratedU if math.fabs(du0) > 0.5: du0 = 0 if ptc.voltageStepIncrementOutOfPhase and ptc.voltageStepIncrementOutOfPhase != 0: # the original Java code is: # du = (config.isInvertVoltageStepIncrementOutOfPhase() ? -1 : 1) * ptc.getVoltageStepIncrementOutOfPhase() / ptc.getTransformerWinding().getRatedU(); du = -1 * ptc.voltageStepIncrementOutOfPhase / ptc.TransformerWinding.ratedU elif ptc.stepVoltageIncrement and ptc.stepVoltageIncrement != 0: du = ptc.stepVoltageIncrement / 100 else: logging_message = 'PhaseTapChanger %s of power transformer %s does not have a valid value for voltageStepIncrementOutOfPhase or stepVoltageIncrement attribute, so by default it is set to 1!' %(ptc.UUID, pt.UUID) logger.warn(logging_message) du = 1 / 100 if ptc.windingConnectionAngle: theta_rad = math.radians(ptc.windingConnectionAngle) else: logging_message = 'PhaseTapChanger %s of power transformer %s does not have windingConnectionAngle attribute, so by default it is set to 1!' %(ptc.UUID, pt.UUID) logger.warn(logging_message) theta_rad = math.pi / 2 if ptc.xStepMin < 0 or ptc.xStepMax <= 0 or ptc.xStepMin > ptc.xStepMax: xStepRangeIsInconsistent = True logging_message = 'xStepMin and xStepMax of PhaseTapChanger %s of PowerTransformer %s are inconsistents!' %(ptc.UUID, pt.UUID) logger.warn(logging_message) else: xStepRangeIsInconsistent = False step_num_list = [] x_pu_list = [] angle_shift_deg_list = [] voltage_ratio_list = [] if ptc.phaseTapChangerType == 'symmetrical': if ptc.stepPhaseShiftIncrement and ptc.stepPhaseShiftIncrement != 0: for step in range(ptc.lowStep, ptc.highStep+1): step_num_list.append(step) n = step - neutral_step voltage_ratio_list.append(1) angle_shift_rad=-1 * n * math.radians(ptc.stepPhaseShiftIncrement) angle_shift_deg_list.append(math.degrees(angle_shift_rad)) # the original Java code is: # float alpha = n * (float) Math.toRadians((config.isInvertVoltageStepIncrementOutOfPhase() ? -1 : 1) * ptc.stepPhaseShiftIncrement); else: for step in range(ptc.lowStep, ptc.highStep+1): step_num_list.append(step) n = step - neutral_step voltage_ratio_list.append(1) dy = (n * du / 2 - du0) * math.sin(theta_rad) angle_shift_rad=2 * math.asin(dy) angle_shift_deg_list.append(math.degrees(angle_shift_rad)) elif ptc.phaseTapChangerType == 'asymmetrical': for step in range(ptc.lowStep, ptc.highStep+1): step_num_list.append(step) n = step - neutral_step dx = (n * du - du0) * math.cos(theta_rad) dy = (n * du - du0) * math.sin(theta_rad) angle_shift_rad = math.atan2(dy, 1 + dx) angle_shift_deg_list.append(math.degrees(angle_shift_rad)) voltage_ratio_list.append(1 / math.hypot(dy, 1 + dx)) else: logger.critical('Unknown phaseTapChangerType!') raise Exception('Unknown phaseTapChangerType!'), ptc.phaseTapChangerType angle_max_rad = math.radians(max(angle_shift_deg_list)) for angle_deg in angle_shift_deg_list: angle_rad = math.radians(angle_deg) if xStepRangeIsInconsistent or angle_max_rad == 0: x_ohms = pt.TransformerWindings[index_primary].x else: if ptc.phaseTapChangerType == 'symmetrical': x_ohms = ptc.xStepMin + (ptc.xStepMax - ptc.xStepMin) * math.pow( math.sin(angle_rad / 2) / math.sin(angle_max_rad / 2), 2) elif ptc.phaseTapChangerType == 'asymmetrical': numer = math.sin(theta_rad) - math.tan(angle_max_rad) * math.cos(theta_rad) denom = math.sin(theta_rad) - math.tan(angle_rad) * math.cos(theta_rad) x_ohms = ptc.xStepMin + (ptc.xStepMax - ptc.xStepMin) * math.pow( math.tan(angle_rad) / math.tan(angle_max_rad) * numer / denom, 2) x_pu_list.append(x_ohms / z_base_secondary) current_step_index = step_num_list.index(current_step) ptc_x_pu = x_pu_list[current_step_index] ptc_ratio = voltage_ratio_list[current_step_index] ptc_angle_deg = angle_shift_deg_list[current_step_index] # Ratio tap changer rtc = None rtc_ratio = 1 if pt.TransformerWindings[index_secondary].RatioTapChanger: rtc = pt.TransformerWindings[index_secondary].RatioTapChanger if rtc: rtc_ratio = 1 + (rtc.SvTapStep.continuousPosition - rtc.neutralStep) * ( rtc.stepVoltageIncrement / 100) if pt.TransformerWindings[index_primary].RatioTapChanger: if pt.TransformerWindings[index_secondary].RatioTapChanger: logging_message = 'The two-windings transformer %s has RatioTapChnager on both windings!' % pt.UUID logger.warn(logging_message) else: rtc = pt.TransformerWindings[index_secondary].RatioTapChanger if rtc: rtc_ratio = 1 / (1 + (rtc.SvTapStep.continuousPosition - rtc.neutralStep) * ( rtc.stepVoltageIncrement / 100)) # Limits rate_a_mva = pt.TransformerWindings[ index_primary].ratedS rate_b_mva = pt.TransformerWindings[index_primary].shortTermS rate_c_mva = pt.TransformerWindings[index_primary].emergencyS if pt.TransformerWindings[index_primary].Terminals[0].OperationalLimitSet: limits_list = [] for lim in pt.TransformerWindings[index_primary].Terminals[0].OperationalLimitSet[ 0].OperationalLimitValue: if lim.__class__.__name__ == 'CurrentLimit': limits_list.append(lim) # limits_list = pt.TransformerWindings[index_primary].Terminals[0].OperationalLimitSet[ # 0].OperationalLimitValue limits_tnode = self._find_tnode(pt.TransformerWindings[index_primary].Terminals[0]) elif pt.TransformerWindings[index_secondary].Terminals[0].OperationalLimitSet: limits_list = [] for lim in pt.TransformerWindings[index_secondary].Terminals[0].OperationalLimitSet[ 0].OperationalLimitValue: if lim.__class__.__name__ == 'CurrentLimit': limits_list.append(lim) # limits_list = pt.TransformerWindings[index_secondary].Terminals[0].OperationalLimitSet[ # 0].OperationalLimitValue limits_tnode = self._find_tnode(pt.TransformerWindings[index_secondary].Terminals[0]) else: limits_list = None if limits_list: if len(limits_list) > 0: rate_a_mva = limits_list[0].value * limits_tnode.SvVoltage.v * math.sqrt(3) / 1e3 # buses[to_bus_primary].base_kv * math.sqrt(3) / 1e3 if len(limits_list) > 1: rate_b_mva = limits_list[1].value * limits_tnode.SvVoltage.v * math.sqrt(3) / 1e3 if len(limits_list) > 2: rate_c_mva = limits_list[2].value * limits_tnode.SvVoltage.v * math.sqrt(3) / 1e3 elif len(pt.TransformerWindings) == 3: logging_message = 'Three-winding transformer (%s) is NOT implemented yet!' % pt.UUID logger.critical(logging_message) raise NameError(logging_message) else: logging_message = 'Transformer (%s) has wrong number (%d) of windings' %(pt.UUID, len(pt.TransformerWindings)) logger.critical(logging_message) raise NameError(logging_message) else: logging_message = 'Transformer %s in CIM v15 and above is NOT implemented yet!' % pt.UUID logger.critical(logging_message) raise NameError(logging_message) if ptc_x_pu != 0: # This represents RTE CIM export logic for transformers!!!! # They always export the PhaseTapChanger as a separate transformer!!! However the CIMv14 standard suggests a more general situatuion is possible. pt_x_pu = ptc_x_pu branches.append(Topology_BusBranch.Branch( from_bus_secondary, to_bus_primary, name=pt.UUID, r=pt_r_pu, #*((rtc_ratio/pt_ratio)**2), # ToDo: correct the resistence accounting for the non-nominal tap setting - it makes a small change but is part of the ENTSO-E PhaseShifter modelling guideline for CIMv16 x=pt_x_pu, #*((rtc_ratio/pt_ratio)**2), # + ptc_x_pu, # ToDo: correct the transformer winding reactance accounting for the non-nominal tap setting - it makes a small change but is part of the ENTSO-E PhaseShifter modelling guideline for CIMv16. However this is not the case for the RTE CIM export logic (because they treat the PhaseShifter as a separate transformer always). b=pt_b_pu, ratio=pt_ratio * rtc_ratio * ptc_ratio, # NB: This should be the correct way to calculate the exact ratio. However, in theory, the different export tools might have impleneted in a different way... angle=ptc_angle_deg, rate_a=rate_a_mva, rate_b=rate_b_mva, rate_c=rate_c_mva, online=status_connected, )) self.node_breaker_data.branches.append( Topology_NodeBreaker.Branch(node_from_id=self.cim_nodes[from_bus_secondary].id, node_to_id=self.cim_nodes[to_bus_primary].id, id_cim=pt.UUID, name=pt.name, status_from=pt.TransformerWindings[index_secondary].Terminals[0].connected, status_to=pt.TransformerWindings[index_primary].Terminals[0].connected)) if ptc: # PhaseTapChanger self.node_breaker_data.phasetapchangers.append( Topology_NodeBreaker.PhaseTapChanger(branch_id=self.node_breaker_data.branches[-1].id, id_cim=ptc.UUID, step_num_list=step_num_list, x_pu_list=x_pu_list, angle_shift_deg_list=angle_shift_deg_list, continuous_position=ptc.SvTapStep.continuousPosition)) if rtc: # RatioTapChanger if rtc.RegulatingControl: self.node_breaker_data.ratiotapchangers.append( Topology_NodeBreaker.RatioTapChanger(branch_id=self.node_breaker_data.branches[-1].id, id_cim=rtc.UUID, lowStep=rtc.lowStep, highStep=rtc.highStep, neutralU_kV=rtc.neutralU, stepVoltageIncrement_kV=rtc.stepVoltageIncrement, continuousPosition=rtc.SvTapStep.continuousPosition, hasRegulatingControl=True, RC_discrete=rtc.RegulatingControl.discrete, RC_mode=rtc.RegulatingControl.mode, RC_targetRange_kV=rtc.RegulatingControl.targetRange, RC_targetValue_kV=rtc.RegulatingControl.targetValue)) else: self.node_breaker_data.ratiotapchangers.append( Topology_NodeBreaker.RatioTapChanger(branch_id=self.node_breaker_data.branches[-1].id, id_cim=rtc.UUID, lowStep=rtc.lowStep, highStep=rtc.highStep, neutralU_kV=rtc.neutralU, stepVoltageIncrement_kV=rtc.stepVoltageIncrement, continuousPosition=rtc.SvTapStep.continuousPosition, hasRegulatingControl=False)) logging_message = 'Created totally %d branches' % len(branches) logger.info(logging_message) logger.info('END creating branches\n') return branches
####################################################################################################################
[docs] def _create_buses_and_gens(self, tnodes): """ Creates dictionary of :class:`Topology_BusBranch.Bus` -es and list of :class:`Topology_BusBranch.Generator` -s Create lists of ``PowerTransformer`` -s and ``ACLineSegment`` -s CIM objects Create lists of :class:`Topology_NodeBreaker.Node`, :class:`Topology_NodeBreaker.Substation`, :class:`Topology_NodeBreaker.Generator`, :class:`Topology_NodeBreaker.Load` and :class:`Topology_NodeBreaker.Shunt` -s """ logger.info('START creating buses') buses = {} self.cim_nodes = {} # To be used to find the node id in the self.node_breaker_data additional data structure cim_substations = {} generators = [] power_transformers = set() ac_lines = set() # First create dictionary for the buses for voltage_level, tnodes_set in self.voltage_levels.items(): # for each VoltageLevel for tnode in tnodes_set: self.cim_nodes[tnode] = Topology_NodeBreaker.Node(id_cim=tnode.UUID, name=tnode.name, desc=tnode.description) if voltage_level.Substation in cim_substations: cim_substations[voltage_level.Substation].node_id_list.extend([self.cim_nodes[key].id for key in tnodes_set]) else: cim_substations[voltage_level.Substation] = Topology_NodeBreaker.Substation(id_cim=voltage_level.Substation.UUID, name=voltage_level.Substation.name, node_id_list=[self.cim_nodes[key].id for key in tnodes_set]) for voltage_level, tnodes_set in self.voltage_levels.items(): # for each VoltageLevel # import switches tnodes_bus_sets_list = [] # list of sets of tnodes at the same bus for equip in voltage_level.Equipments: if isinstance(equip, self._get_cls('Switch')): term_from = equip.Terminals[0] term_to = equip.Terminals[1] tnode_from = self._find_tnode(term_from) tnode_to = self._find_tnode(term_to) add_switch_flag = True if bool(tnode_from.TopologicalIsland is not self.biggest_tnode_island) ^ bool(tnode_to.TopologicalIsland is not self.biggest_tnode_island): add_switch_flag = False logging_message = 'Switch "%s" is switched ON and is connected between two different TopologicalIslands (%s and %s) and thus will not be imported !!!' % ( equip.UUID, tnode_from.TopologicalIsland.UUID, tnode_to.TopologicalIsland.UUID) logger.error(logging_message) # raise AssertionError(logging_message) elif tnode_from.TopologicalIsland is not self.biggest_tnode_island and tnode_to.TopologicalIsland is not self.biggest_tnode_island: add_switch_flag = False if term_from.connected and term_to.connected: switch_status = True if (tnode_from.SvVoltage.v, tnode_from.SvVoltage.angle) != (tnode_to.SvVoltage.v, tnode_to.SvVoltage.angle): add_switch_flag = False logging_message = 'Switch "%s" is switched ON but has different voltage at its terminals!!!' logger.critical(logging_message) raise AssertionError(logging_message) else: switch_status = False if add_switch_flag: self.node_breaker_data.switches.append(Topology_NodeBreaker.Switch(node_from_id=self.cim_nodes[tnode_from].id, node_to_id=self.cim_nodes[tnode_to].id, id_cim=equip.UUID, name=equip.name, status=switch_status)) if switch_status: from_tnodes_bus_set = None for idx, tnodes_bus_set in enumerate(tnodes_bus_sets_list): if tnode_from in tnodes_bus_set: from_tnodes_bus_set = tnodes_bus_set from_idx = idx break to_tnodes_bus_set = None for idx, tnodes_bus_set in enumerate(tnodes_bus_sets_list): if tnode_to in tnodes_bus_set: to_tnodes_bus_set = tnodes_bus_set to_idx = idx break if from_tnodes_bus_set is not None and to_tnodes_bus_set is not None: if from_idx != to_idx: # merge tnode sets in one bus tnodes_bus_sets_list[from_idx] |= tnodes_bus_sets_list[to_idx] tnodes_bus_sets_list.pop(to_idx) elif from_tnodes_bus_set is not None and to_tnodes_bus_set is None: # add to the tnode to the set of tnodes for the corresponding bus from_tnodes_bus_set.add(tnode_to) elif from_tnodes_bus_set is None and to_tnodes_bus_set is not None: # add to the tnode to the set of tnodes for the corresponding bus to_tnodes_bus_set.add(tnode_from) elif from_tnodes_bus_set is None and to_tnodes_bus_set is None: tnodes_bus_sets_list.append({tnode_to, tnode_from}) # create buses for the tnodes connected by switches processed_tnodes = set() for tnodes_bus_set in tnodes_bus_sets_list: new_set = set() for tnode in tnodes_bus_set: if tnode.TopologicalIsland == self.biggest_tnode_island: new_set.add(tnode) if new_set: new_bus = Topology_BusBranch.Bus() for tnode in new_set: buses[tnode] = new_bus self.cim_nodes[tnode].bus_id = new_bus.id processed_tnodes.add(tnode) # fill in some Bus data based on the last imported TNode in the Bus new_bus.base_kv = self._get_base_voltage(tnode) new_bus.vm = tnode.SvVoltage.v / self._get_base_voltage(tnode) new_bus.va = tnode.SvVoltage.angle new_bus.area = self.cim_areas[tnode.ControlArea].id new_bus.zone = self.cim_zones[voltage_level.Substation.Region].id # create buses for the rest of the tnodes in the VoltageLevel for tnode in tnodes_set: if tnode not in processed_tnodes: new_bus = Topology_BusBranch.Bus( base_kv=self._get_base_voltage(tnode), vm=tnode.SvVoltage.v / self._get_base_voltage(tnode), va=tnode.SvVoltage.angle, area=self.cim_areas[tnode.ControlArea].id, zone=self.cim_zones[voltage_level.Substation.Region].id ) buses[tnode] = new_bus self.cim_nodes[tnode].bus_id = new_bus.id self.node_breaker_data.nodes = self.cim_nodes.values() self.node_breaker_data.substations = cim_substations.values() # find the swing bus voltage_levels_keys = self.voltage_levels.keys() swingbus_tnode = voltage_levels_keys[0].TopologicalNode[0].TopologicalIsland.AngleRef_TopologicalNode del voltage_levels_keys if not swingbus_tnode: logger.warn('No swing bus is given in the network!\n') for tnode, bus in buses.iteritems(): # for each TopologicalNode within the bus bus_pv = None # As of Matpower documentation bus_pd = 0.0 # real power demand (MW) bus_qd = 0.0 # reactive power demand (MVAr) bus_gs = 0.0 # shunt conductance (MW demanded at V = 1.0 p.u.) bus_bs = 0.0 # shunt susceptance (MVAr injected at V = 1.0 p.u.) try: prim_equip = tnodes[tnode] # tnodes is actually the tnode_equip from _iterate_prim_eq() except KeyError: # skips the nodes with zero voltage continue base_voltage = self._get_base_voltage(tnode) vmax = 2 vmin = 0 for equip in prim_equip: status = equip.Terminals[0].connected # This is only for the 1 terminal elements. The status of the 2 terminal elements is checked when they are transformed to branches if status: try: v_limits = [] for lim in equip.Terminals[0].OperationalLimitSet[0].OperationalLimitValue: if lim.__class__.__name__ == 'VoltageLimit': v_limits.append(lim.value / base_voltage) if v_limits: if max(v_limits) < vmax: vmax = max(v_limits) if min(v_limits) > vmin: vmin = min(v_limits) except: pass equip_cls = equip.__class__.__name__ ###################################### #### 1 terminal elements if equip_cls == 'SynchronousMachine': gen_name = equip.UUID try: # CIM16 pg = equip.p qg = equip.q except AttributeError: pg = -equip.Terminals[0].SvPowerFlow.p qg = -equip.Terminals[0].SvPowerFlow.q try: pg_max = equip.GeneratingUnit.maxOperatingP pg_min = equip.GeneratingUnit.minOperatingP gen_type = equip.GeneratingUnit.__class__.__name__.replace('GeneratingUnit','') # Some generator belong to the generic GeneratingUnit class, so their type will result in empty string except AttributeError: pg_max = 0 pg_min = 0 gen_type='' if equip.InitialReactiveCapabilityCurve: try: pc1 = equip.InitialReactiveCapabilityCurve.CurveDatas[0].xvalue pc2 = equip.InitialReactiveCapabilityCurve.CurveDatas[1].xvalue qc1_min = equip.InitialReactiveCapabilityCurve.CurveDatas[0].y1value qc2_min = equip.InitialReactiveCapabilityCurve.CurveDatas[1].y1value qc1_max = equip.InitialReactiveCapabilityCurve.CurveDatas[0].y2value qc2_max = equip.InitialReactiveCapabilityCurve.CurveDatas[1].y2value except AttributeError: pc1, pc2, qc1_min, qc2_min, qc1_max, qc2_max = 0, 0, 0, 0, 0, 0 else: pc1, pc2, qc1_min, qc2_min, qc1_max, qc2_max = 0, 0, 0, 0, 0, 0 if equip.minQ: Qmin_MVAr = equip.minQ else: if equip.InitialReactiveCapabilityCurve: Qmin_MVAr = equip.InitialReactiveCapabilityCurve.CurveDatas[0].y1value # ToDo: Does this needs to be recalculated? As far as I understand it is given for a specific P else: Qmin_MVAr = 0 if equip.maxQ: Qmax_MVAr = equip.maxQ else: if equip.InitialReactiveCapabilityCurve: Qmax_MVAr = equip.InitialReactiveCapabilityCurve.CurveDatas[0].y2value # ToDo: Does this needs to be recalculated? As far as I understand it is given for a specific P else: Qmax_MVAr = 0 vg_pu = tnode.SvVoltage.v / base_voltage generators.append(Topology_BusBranch.Generator( # bus is not created yet, fix later bus=tnode, pg=pg, qg=qg, pg_min=pg_min, pg_max=pg_max, qg_min=Qmin_MVAr, qg_max=Qmax_MVAr, vg=vg_pu, name=gen_name, online=status, pc1=pc1, pc2=pc2, qc1_min=qc1_min, qc1_max=qc1_max, qc2_min=qc2_min, qc2_max=qc2_max )) try: fuel_type = equip.GeneratingUnit.FossilFuels[0].fossilFuelType except AttributeError: fuel_type = '' self.node_breaker_data.generators.append(Topology_NodeBreaker.Generator(node_id=self.cim_nodes[tnode].id, id_cim=equip.UUID, name=equip.GeneratingUnit.name or equip.name, type=gen_type, mode=equip.operatingMode, fuel_type=fuel_type)) if equip.operatingMode == 'generator': bus_pv = Topology_BusBranch.bus_type.PV # ToDo: This logic for defining the type of bus might not be the best... Instead maybe look at VoltageControl? ###################################### elif equip_cls == 'EnergyConsumer': if status: try: bus_pd += equip.p bus_qd += equip.q except AttributeError: try: bus_pd += equip.Terminals[0].SvPowerFlow.p bus_qd += equip.Terminals[0].SvPowerFlow.q except AttributeError: logger.error('Load %s was not imported correctly!', equip.UUID) self.node_breaker_data.loads.append( Topology_NodeBreaker.Load(node_id=self.cim_nodes[tnode].id, id_cim=equip.UUID, name=equip.name, p_mw=equip.Terminals[0].SvPowerFlow.p, q_mvar=equip.Terminals[0].SvPowerFlow.q, status=status)) ###################################### elif equip_cls == 'ShuntCompensator': shunt_sections = equip.SvShuntCompensatorSections.continuousSections if status: bus_bs += shunt_sections * equip.bPerSection * base_voltage ** 2 bus_gs += shunt_sections * equip.gPerSection * base_voltage ** 2 self.node_breaker_data.shunts.append( Topology_NodeBreaker.Shunt(node_id=self.cim_nodes[tnode].id, id_cim=equip.UUID, name=equip.name, status=status, gPerSection_MVAr=equip.gPerSection * base_voltage ** 2, bPerSection_MVAr=equip.bPerSection * base_voltage ** 2, maximumSections=equip.maximumSections, numActiveSections=equip.SvShuntCompensatorSections.continuousSections, )) ###################################### #### 2 terminal elements elif equip_cls == 'TransformerWinding': if equip.PowerTransformer: power_transformers.add(equip.PowerTransformer) elif equip_cls == 'PowerTransformer': power_transformers.add(equip) ###################################### elif equip_cls == 'ACLineSegment': ac_lines.add(equip) ###################################### elif equip_cls == 'SeriesCompensator': logging_message = 'SeriesCompensator %s is NOT implemented yet and thus is NOT imported!' % equip.UUID logger.error(logging_message) ###################################### elif equip_cls == 'StaticVarCompensator': logging_message = 'StaticVarCompensator %s is NOT implemented yet and thus is NOT imported!' % equip.UUID logger.error(logging_message) ###################################### elif equip_cls == 'DCLineSegment': logging_message = 'DCLineSegment %s is NOT implemented yet and thus is NOT imported!' % equip.UUID logger.error(logging_message) ###################################### elif equip_cls == 'RectifierInverter': logging_message = 'RectifierInverter %s is NOT implemented yet and thus is NOT imported!' % equip.UUID logger.error(logging_message) if tnode is swingbus_tnode: bus_swing = Topology_BusBranch.bus_type.REF else: bus_swing = None if bus.vm == 0: bus_isolated = Topology_BusBranch.bus_type.ISOLATED if bus.vm != 0: logging_message = 'TNode %s has zero voltage with angle %f degrees !' %(tnode.UUID, bus.va) logger.warn(logging_message) else: if bus.vm < 0.9 or bus.vm > 1.1: logging_message = 'TNode %s has voltage %f !' %(tnode.UUID, bus.vm) logger.warn(logging_message) bus_isolated = None tnode_bus_type = (bus_isolated or bus_swing or bus_pv or Topology_BusBranch.bus_type.PQ) if vmax < 1.9: # 1.2: bus.vm_max = vmax if vmin > 0.1: # 0.8 bus.vm_min = vmin if bus.btype < tnode_bus_type: bus.btype = tnode_bus_type bus.pd += bus_pd bus.qd += bus_qd bus.bs += bus_bs bus.gs += bus_gs # if bus.cim_classes is not None: # bus.cim_classes.append(cim_classes) # bus.cim_classes.sort() # else: # bus.cim_classes = cim_classes # if bus.pos is not None: # bus.pos.union(position_points) # else: # bus.pos = position_points logging_message = 'Created totally %d buses' % len(set(buses.values())) logger.info(logging_message) logger.info('END creating buses\n') return buses, generators, power_transformers, ac_lines
####################################################################################################################
[docs] def _get_position_points(self, equipment): """ Returns the set of position points (:class:`Topology_BusBranch.Point`) for *equipment*. """ loc = equipment.Location if not loc: return set() pos = set([Topology_BusBranch.Point(float(p.xPosition), float(p.yPosition)) for p in loc.PositionPoints]) return pos
####################################################################################################################
[docs] def _get_base_voltage(self, tnode): """ Tries to get a base voltage from *tnode* (TopologicalNode) to from its connectivity nodes. Raises a :class:`ValueError` if no base voltage can be found or if the base voltages of the connectivity nodes differ from each other. """ if tnode.BaseVoltage: return tnode.BaseVoltage.nominalVoltage if (hasattr(tnode.ConnectivityNodeContainer, 'BaseVoltage') and tnode.ConnectivityNodeContainer.BaseVoltage): return tnode.ConnectivityNodeContainer.BaseVoltage.nominalVoltage base_voltage = None for cn in tnode.ConnectivityNodes: if not cn.ConnectivityNodeContainer or not hasattr( cn.ConnectivityNodeContainer, 'BaseVoltage'): continue bv = cn.ConnectivityNodeContainer.BaseVoltage.nominalVoltage if not bv: continue if not base_voltage: base_voltage = bv elif bv != base_voltage: raise ValueError('Base voltage %s of %s deffers from %s' % (bv, cn.mRID, base_voltage)) if base_voltage: return base_voltage return base_voltage
####################################################################################################################
[docs] def _iterate_prim_eq(self, cim_objects): """ Iterates over all *cim_objects* and creates ``TopologicalNode`-s (if they were not present). Find the self.biggest_tnode_island and imports ``TopologicalNode`-s only from it. Create a list of the control Areas and the load Zones. """ self.node_breaker_data = Topology_NodeBreaker.Topology_NodeBreaker(CIM_filenames=self.cim_files) # will store all data from the CIM files which cannot be directly mapped to the Matpower structure self.cim_areas = {} self.cim_zones = {} processed = set() # Contain all processed connectivity nodes tnode_equipment = {} # Top. nodes and adjacent equipment lines = set() # Contains all lines base_power = None BasePower = self._get_cls('BasePower') TopologicalNode = self._get_cls('TopologicalNode') tnode_islands = {} # First iteration over all CIM objects - in order to create the TopologicalNodes (if not present) for mrid, obj in cim_objects.items(): if isinstance(obj, self._prim_onet) or isinstance(obj, self._prim_twot): if hasattr(obj, 'Terminals'): for term in obj.Terminals: cnode = term.ConnectivityNode if cnode: if cnode in processed: tnode = cnode.TopologicalNode else: tnode = TopologicalNode() self._process_cnode(tnode, term, processed) # Append obj to the tnodes equipment tnode_equipment.setdefault(tnode, []).append(obj) if tnode.TopologicalIsland not in tnode_islands: tnode_islands[tnode.TopologicalIsland] = len(tnode.TopologicalIsland.TopologicalNodes) else: tnode = term.TopologicalNode tnode_equipment.setdefault(tnode, []).append(obj) if tnode.TopologicalIsland not in tnode_islands: tnode_islands[tnode.TopologicalIsland] = len(tnode.TopologicalIsland.TopologicalNodes) elif isinstance(obj, BasePower): assert not base_power, ('There should be only one BasePower instance.') base_power = float(obj.basePower) elif isinstance(obj, self._get_cls('ControlArea')): self.cim_areas[obj] = Topology_NodeBreaker.Area(id_cim=obj.UUID, name=obj.name, netInterchange=obj.netInterchange, pTolerance=obj.pTolerance) elif isinstance(obj, self._get_cls('SubGeographicalRegion')): self.cim_zones[obj] = Topology_NodeBreaker.Zone(id_cim=obj.UUID, name=obj.name) self.node_breaker_data.areas = self.cim_areas.values() self.node_breaker_data.zones = self.cim_zones.values() # find biggest_tnode_island max_num_tnodes = max(tnode_islands.values()) for tnode_island in tnode_islands: if tnode_islands[tnode_island] == max_num_tnodes: self.biggest_tnode_island = tnode_island logging_message = 'The biggest topological island is %s. Only elements in it will be imported!!!\n' % self.biggest_tnode_island.UUID logger.info(logging_message) if base_power is None: base_power = 100.0 # default basepower # Second iteration over all CIM objects - in order to import objects only from the biggest TopologicalIsland self.voltage_levels = {} for mrid, obj in cim_objects.items(): if isinstance(obj, self._get_cls('Terminal')): try: tnode = obj.ConnectivityNode.TopologicalNode except AttributeError: try: tnode = obj.TopologicalNode except AttributeError: print( 'Skipped %s of class %s .' % (obj.UUID, obj.__class__.__name__)) # this was just for testing continue if tnode.ConnectivityNodeContainer: if tnode.TopologicalIsland == self.biggest_tnode_island: self.voltage_levels.setdefault(tnode.ConnectivityNodeContainer, set()).add(tnode) else: print('TNode %s does not have ConnectivityNodeContainer.' % (tnode.name or tnode.UUID)) return base_power, tnode_equipment
####################################################################################################################
[docs] def _process_cnode(self, tnode, src_terminal, processed): """ Recursively processes the connectivity node connected to *src_terminal* and adds it to terminals. You need to pass the terminal and not the connectivity node, because we need the terminal to keep track where we are coming from and to not to visit the same terminal again. """ cnode = src_terminal.ConnectivityNode if cnode in processed: return tnode.addConnectivityNodes(cnode) processed.add(cnode) for terminal in cnode.Terminals: if terminal is src_terminal: continue ce = terminal.ConductingEquipment # Recursivly process other nodes if the terminal connects to a # rectifier inverter or a closed switch. if (isinstance(ce, self._get_cls('RectifierInverter')) or (isinstance(ce, self._get_cls('Switch')) and (not ce.normalOpen or ce.retain))): assert len(ce.Terminals) == 2 other_term = ce.Terminals[1] if terminal is ce.Terminals[0] \ else ce.Terminals[0] self._process_cnode(tnode, other_term, processed)
####################################################################################################################
[docs] def _get_equipment_cls(self, names): """Returns a touple containing the class objects for *names*.""" classes = [] for name in names: # Not all classes in each version of CIM try: classes.append(self._get_cls(name)) except KeyError: pass return tuple(classes)
####################################################################################################################
[docs] def _get_cls(self, name): """Imports and returns the class object for *name*.""" module = __import__(self._package_map[name], globals(), locals(), [name]) return getattr(module, name)