Source code for ansys_optical_automation.post_process.dpf_xmp_viewer

import csv
import os
import sys

import numpy

if "nt" in os.name:
    from comtypes import automation
    from comtypes import pointer
else:
    pass

from ansys_optical_automation.post_process.dpf_base import DataProcessingFramework

# Map type codes accepted by MapStruct (per its docstring: 2 = spectral, 3 = extended).
supported_map_types = [2, 3]
# Value type codes accepted by MapStruct
# (per its docstring: 0 = irradiance, 1 = intensity, 2 = radiance, 20 = refractive power).
supported_value_types = [0, 1, 2, 20]
# Unit type codes accepted by MapStruct
# (per its docstring: 0 = radiometric, 1 = photometric, 9 = diopter).
supported_unit_types = [0, 1, 9]
# Axis unit names. The position of a name in this list is used as the integer
# axis-unit code: read_txt_export looks up the viewer's axis unit name with .index().
# NOTE(review): MapStruct only accepts axis-unit codes 0-12, although 18 names
# are listed here - confirm whether codes 13-17 should be accepted as well.
axis_unit_types = [
    "default",
    "millimetre",
    "degree",
    "radian",
    "feet",
    "micrometer",
    "nanometer",
    "meter",
    "percent",
    "dB",
    "invert millimeter",
    "no unit",
    "wave",
    "centimeter",
    "inch",
    "milliradian",
    "arc minute",
    "arc second",
]


class MapStruct:
    """Provides a DPF to represent the data stored in an Ansys Speos XMP file"""

    def __init__(
        self,
        map_type,
        value_type,
        intensity_type,
        unit_type,
        axis_unit,
        size,
        resolution,
        layers=1,
        layer_name=None,
        wl_res=None,
    ):
        """
        Initialize the XMP MapStructure to create and edit XMP data.

        Currently only limited data is supported.

        Parameters
        ----------
        map_type : int
            2 for spectral, 3 for extended
        value_type : int
            0 for Irradiance, 1 for Intensity, 2 for Radiance, 20 for refractive Power
        intensity_type : int
            Intensity type code; it is stored unchanged and written next to the
            value type in the text export.
        unit_type : int
            0 for Radiometric, 1 for Photometric, 9 for Diopter
        axis_unit : int
            0 = Default, 1 = Millimeter, 2 = Degree, 3 = Radian, 4 = Feet, 5 = Micrometer,
            6 = Nanometer, 7 = Meter, 8 = Percent, 9 = dB, 10 = Invert Millimeter,
            11 = No Unit, 12 = Wave
        size : list of floats
            [XMin, XMax, YMin, YMax] map dimensions
        resolution : list of int
            resolution in [x, y]
        layers : int
            number of layers; please note that the power values need to be defined later
        layer_name : list
            list of str for the layer names. When it is missing or shorter than
            ``layers``, generic names are used instead.
        wl_res : list of int
            [Wstart, Wend, wNb] start and end wavelength and the number of wavelength samples

        Raises
        ------
        TypeError
            If the map, value or unit type is not supported.
        ValueError
            If size, resolution, axis unit, layers or wavelength range are invalid.
        """
        if map_type not in supported_map_types:
            raise TypeError("Map type (value: " + str(map_type) + ") not supported")
        self.map_type = map_type

        if value_type not in supported_value_types:
            raise TypeError("Value type (value: " + str(value_type) + ") not supported")
        self.value_type = value_type

        if unit_type not in supported_unit_types:
            raise TypeError("Unit type (value: " + str(unit_type) + ") not supported")
        self.unit_type = unit_type

        if len(size) != 4:
            raise ValueError("Please provide correct input required")
        if size[1] - size[0] <= 0:
            raise ValueError("xMin (" + str(size[0]) + ") must be smaller then xMax (" + str(size[1]) + ")")
        if size[3] - size[2] <= 0:
            # Report the y limits (indices 2 and 3), not the x limits.
            raise ValueError("yMin (" + str(size[2]) + ") must be smaller then yMax (" + str(size[3]) + ")")

        # Both an x and a y resolution are read below, so require two entries.
        if len(resolution) < 2 or not all(item > 0 for item in resolution):
            raise ValueError("resolution must be a positive integer")

        if axis_unit not in range(13):
            raise ValueError("Please provide a valid axis unit (value: " + str(axis_unit) + ")")
        self.axis_unit = axis_unit

        self.xMin = size[0]
        self.xMax = size[1]
        self.xNb = int(abs(resolution[0]))
        self.yMin = size[2]
        self.yMax = size[3]
        self.yNb = int(abs(resolution[1]))
        self.width = size[1] - size[0]
        self.height = size[3] - size[2]
        self.wl_res = wl_res
        self.comment = ""
        self.intensity_type = intensity_type

        if layers <= 0 or layers != int(layers):
            raise ValueError("Please provide a positive layer integer")
        print(
            "Warning power values for layers are set to 1 and need to be manually "
            "adjusted before export only 1 unit type is supported both will have the same value"
        )
        # Store a real int so that integral floats (e.g. 2.0) also work as array sizes.
        self.layers = int(layers)
        self.layer_powers = numpy.ones(self.layers)

        if isinstance(layer_name, list):
            if len(layer_name) < self.layers:
                self.layer_name = range(self.layers)
                print("layername list to short is overwritten")
            else:
                self.layer_name = layer_name
        else:
            self.layer_name = range(self.layers)
            print("Layernames not provided were overwritten")

        if wl_res is None:
            # Without a wavelength range an extended map (type 3) stores one value
            # per pixel and a spectral map (type 2) stores four values per pixel.
            depth = 1 if self.map_type == 3 else 4
        elif len(wl_res) != 3 or wl_res[1] - wl_res[0] <= 0:
            raise ValueError("Please provide a valid wavelength range: \n" + str(wl_res) + "\n is not valid")
        else:
            self.wStart = wl_res[0]
            self.wEnd = wl_res[1]
            self.wNb = abs(int(wl_res[2]))
            depth = self.wNb
        # Data is indexed as [layer, x, y, wavelength-or-channel].
        self.data = numpy.zeros((self.layers, self.xNb, self.yNb, depth))
        self.export_name = "export_mapstruct"

    def valid_dir(self, str_path):
        """
        Check if a folder is present and, if not, create it.

        Parameters
        ----------
        str_path : str
            Path for the folder to validate or create. For example, ``r"C:\\temp"``.

        Returns
        -------
        None
        """
        # exist_ok avoids a failure when the folder appears between check and creation.
        os.makedirs(str_path, exist_ok=True)

    def __layer_label(self, index):
        """Return the stored name of layer ``index`` if it is a string, else ``layer<index>``."""
        label = self.layer_name[index]
        return label if isinstance(label, str) else "layer" + str(index)

    def __export_to_text(self, export_path):
        """
        Export the current map struct to an XMP TXT export.

        The file ``<export_name>.txt`` is written into ``export_path``; the folder
        is created when missing.

        Parameters
        ----------
        export_path : str
            export directory

        Returns
        -------
        None
        """
        self.valid_dir(export_path)
        file_name = os.path.join(export_path, self.export_name + ".txt")
        has_wavelengths = self.wl_res is not None
        powers = [str(self.layer_powers[i]) for i in range(self.layers)]

        # The context manager closes the file even when a write fails.
        with open(file_name, "w") as file_export:
            file_export.write(str(self.map_type) + "\n")
            file_export.write(str(self.value_type) + "\t" + str(self.intensity_type) + "\n")
            file_export.write(str(self.unit_type) + "\n")
            file_export.write(str(self.axis_unit) + "\n")
            file_export.write(
                str(self.xMin) + "\t" + str(self.xMax) + "\t" + str(self.yMin) + "\t" + str(self.yMax) + "\n"
            )
            file_export.write(str(self.xNb) + "\t" + str(self.yNb) + "\n")

            if self.map_type not in (2, 3):
                # Only the common header is defined for other map types.
                return

            if has_wavelengths:
                file_export.write(str(self.wStart) + "\t" + str(self.wEnd) + "\t" + str(self.wNb) + "\n")

            # Layer header line: layer count followed by the layer powers.
            if self.map_type == 2:
                layer_fields = ["SeparatedByLayer", str(self.layers)]
                if not has_wavelengths:
                    layer_fields.insert(0, "-1")
            else:
                layer_fields = [str(self.layers)]
            for power in powers:
                # With a wavelength range each power is written twice (one value
                # is used for both entries, see the warning printed in __init__).
                layer_fields.extend([power, power] if has_wavelengths else [power])
            # NOTE(review): previously this line was built but not written for
            # extended maps (type 3) with a wavelength range, although the text
            # reader in this module skips eight header lines for that case.
            # Confirm the exact layout against the Speos text format.
            file_export.write("\t".join(layer_fields) + "\n")

            if self.map_type == 2 and not has_wavelengths:
                file_export.write("".join(power + "\t" for power in powers) + "\n")

            for i in range(self.layers):
                file_export.write(self.__layer_label(i) + "\n")
                if has_wavelengths:
                    for wl in range(self.wNb):
                        if wl != 0:
                            # Blank line between consecutive wavelength blocks.
                            file_export.write("\n")
                        for y in range(self.yNb):
                            row = "".join(str(self.data[i, x, y, wl]) + "\t" for x in range(self.xNb))
                            file_export.write(row + "\n")
                else:
                    channels = 4 if self.map_type == 2 else 1
                    for y in range(self.yNb):
                        row = "".join(
                            str(self.data[i, x, y, channel]) + "\t"
                            for x in range(self.xNb)
                            for channel in range(channels)
                        )
                        file_export.write(row + "\n")

    def export_to_xmp(self, export_path=r"C:\temp"):
        """
        Export the current map struct as ``<export_name>.xmp`` into the given directory.

        The map is first written as a text file, imported into a new XMP viewer
        instance, and the temporary text file is removed before the XMP is saved.

        Parameters
        ----------
        export_path : str
            directory to export to. Default is ``C:\\temp``.

        Returns
        -------
        DpfXmpViewer object
        """
        xmp = DpfXmpViewer()
        txt_file = os.path.join(export_path, self.export_name + ".txt")
        xmp_file = os.path.join(export_path, self.export_name + ".xmp")
        self.__export_to_text(export_path)
        xmp.read_txt_export(txt_file)
        os.remove(txt_file)
        xmp.dpf_instance.SaveFile(xmp_file)
        return xmp
class DpfXmpViewer(DataProcessingFramework):
    """
    Provides for launching Speos postprocessing software, XMP Viewer.

    This framework is used to interact with the software and automatically perform
    analysis and postprocessing on the simulation results
    """

    def __init__(self):
        """Initialize DPF as XMPViewer."""
        DataProcessingFramework.__init__(self, extension=".xmp", application="XMPViewer.Application")
        self.source_list = []

    def open_file(self, str_path):
        """Open a file in DPF.

        Parameters
        ----------
        str_path : str
            Path for the file to open. For example, ``r"C:\\temp\\Test.xmp"``.

        Returns
        -------
        None

        Raises
        ------
        FileNotFoundError
            If the file does not exist.
        TypeError
            If the file extension is not one of the accepted extensions.
        ImportError
            If the viewer fails to open the file.
        """
        if not os.path.isfile(str_path):
            raise FileNotFoundError("File is not found.")
        if not str_path.lower().endswith(tuple(self.accepted_extensions)):
            extension = str_path.lower().split(".")[-1]
            raise TypeError(extension + " is not an accepted extension")
        self.file_path = str_path
        if not self.dpf_instance.OpenFile(str_path):
            raise ImportError("Opening the file failed.")
        self.source_list = []
        if self.dpf_instance.MapType == 2 or self.dpf_instance.MapType == 3:
            self.__get_source_list()

    def export(self, format="txt"):
        """
        Export the XMP map to another format.

        The export is written next to the opened file, using the opened file path
        followed by ``export.<format>`` as file name.

        Parameters
        ----------
        format : str
            export format
            allowed values ["txt", "png", "bmp", "jpg", "tiff", "pf", "ies", "ldt", "extended.txt"]

        Returns
        -------
        str
            Path of the exported file.

        Raises
        ------
        ValueError
            If the format is not one of the allowed values.
        Exception
            If the viewer reports that the export failed.
        """
        allowed_exports = ["txt", "png", "bmp", "jpg", "tiff", "pf", "ies", "ldt", "extended.txt"]
        if format not in allowed_exports:
            raise ValueError("cannot export data to: " + format)

        export_path = self.file_path + "export." + format
        if format == "txt":
            succeeded = self.dpf_instance.ExportTXT(export_path)
        elif format == "pf":
            succeeded = self.dpf_instance.ExportPF(export_path)
        elif format in ("ies", "ldt"):
            succeeded = self.dpf_instance.ExportXMPtoIntensity(export_path)
        elif format in ("png", "bmp", "jpg", "tiff"):
            succeeded = self.dpf_instance.ExportXMPImage(export_path)
        else:
            # Only "extended.txt" is left at this point.
            succeeded = self.dpf_instance.ExtendedExportTXT(export_path, 1)
        if not succeeded:
            raise Exception(format + " export failed")
        return export_path

    @staticmethod
    def _skip_txt_header(rows, line_count):
        """
        Consume ``line_count`` header rows from the csv row iterator ``rows``.

        When the seventh header row consists of the single field ``-1``, one
        additional row is consumed.
        """
        for index in range(line_count):
            line = next(rows)
            # Length is checked first so that an empty row cannot raise IndexError.
            if index == 6 and len(line) == 1 and line[0] == "-1":
                next(rows)

    def __read_txt_export(self, xmp_map_struct, txt_path):
        """
        Load the content of a text export into a MapStruct.

        Parameters
        ----------
        xmp_map_struct : MapStruct
            MapStruct whose ``data`` array is filled in place
        txt_path : str
            path of the tab separated text file

        Returns
        -------
        MapStruct
            The MapStruct passed in, with values filled from the text file.

        Raises
        ------
        ImportError
            If the map type of the viewer is neither 2 nor 3.
        """
        map_type = self.dpf_instance.Maptype
        if map_type not in (2, 3):
            raise ImportError("Currently not supported")

        has_wavelengths = xmp_map_struct.wl_res is not None
        data = xmp_map_struct.data
        # newline="" is the mode recommended by the csv module for reading.
        with open(txt_path, "r", newline="") as file:
            my_data = csv.reader(file, delimiter="\t")
            # Files with a wavelength range carry one more header row.
            self._skip_txt_header(my_data, 8 if has_wavelengths else 7)
            for layer_idx in range(xmp_map_struct.layers):
                next(my_data)  # row holding the layer name
                if has_wavelengths:
                    for wavelength_idx in range(xmp_map_struct.wl_res[2]):
                        if wavelength_idx != 0:
                            next(my_data)  # separator row between wavelength blocks
                        for xmp_value_idy in range(xmp_map_struct.yNb):
                            for xmp_value_idx, xmp_value in enumerate(next(my_data)):
                                if xmp_value != "":
                                    data[layer_idx, xmp_value_idx, xmp_value_idy, wavelength_idx] = xmp_value
                elif map_type == 3:
                    for xmp_value_idy in range(xmp_map_struct.yNb):
                        for xmp_value_idx, xmp_value in enumerate(next(my_data)):
                            if xmp_value != "":
                                data[layer_idx, xmp_value_idx, xmp_value_idy, 0] = xmp_value
                else:
                    # Map type 2 without wavelength range: four consecutive
                    # fields belong to one pixel.
                    for xmp_value_idy in range(xmp_map_struct.yNb):
                        for xmp_value_idx, xmp_value in enumerate(next(my_data)):
                            if xmp_value != "":
                                data[layer_idx, xmp_value_idx // 4, xmp_value_idy, xmp_value_idx % 4] = xmp_value
        return xmp_map_struct

    def read_txt_export(self, txt_path, inc_data=False):
        """
        Import a text export into the viewer and optionally return it as MapStruct.

        Parameters
        ----------
        txt_path : str
            string pointing to the textfile
        inc_data : bool
            Boolean to determine if to include data matrix as list
            currently not working correctly

        Returns
        -------
        MapStruct or None
            MapStruct created according to the text file when ``inc_data`` is True,
            otherwise None.

        Raises
        ------
        ImportError
            If the text file cannot be imported or the map type is not supported.
        """
        import_response = self.dpf_instance.ImportTXT(txt_path)
        self.source_list = []
        # Check the import result before querying the viewer for its sources.
        if not import_response:
            raise ImportError("Provided text file cannot be imported, Please check your text file content")
        self.__get_source_list()
        if not inc_data:
            return None

        variant = automation.VARIANT(5)
        xmp_maptype = self.dpf_instance.Maptype
        xmp_value_type = self.dpf_instance.ValueType
        xmp_intensity_type = self.dpf_instance.GetIntensityType
        xmp_unit_type = self.dpf_instance.UnitType
        xmp_axis_unit = axis_unit_types.index(self.dpf_instance.GetAxisUnitName)
        xmp_size = [self.dpf_instance.XMin, self.dpf_instance.XMax, self.dpf_instance.YMin, self.dpf_instance.YMax]
        xmp_resolution = [self.dpf_instance.XNb, self.dpf_instance.YNb]
        xmp_layer = len(self.source_list)
        if xmp_maptype not in supported_map_types:
            raise ImportError("type of map are not supported currently")

        if self.dpf_instance.GetSampleCRI(0, 0, 2, pointer(variant)):
            # this is a spectral map
            xmp_wl_res = [self.dpf_instance.WMin, self.dpf_instance.WMax, self.dpf_instance.WNb]
        else:
            xmp_wl_res = None
        xmp_map_struct = MapStruct(
            xmp_maptype,
            xmp_value_type,
            xmp_intensity_type,
            xmp_unit_type,
            xmp_axis_unit,
            xmp_size,
            xmp_resolution,
            xmp_layer,
            self.source_list,
            xmp_wl_res,
        )
        return self.__read_txt_export(xmp_map_struct, txt_path)

    def __get_source_list(self):
        """
        Get the source list stored in the simulation result.

        Returns
        -------
        list
            List of sources available in the postprocessing file.

        Raises
        ------
        Exception
            If run under IronPython.
        TypeError
            If the map type is neither 2 nor 3.
        """
        if "Iron" in sys.version:
            raise Exception("IronPython not supported")
        map_type = self.dpf_instance.MapType
        if map_type != 2 and map_type != 3:
            # str() is required: the map type is numeric and cannot be concatenated directly.
            raise TypeError("MapType=" + str(map_type) + " not support.")
        self.source_list = []
        total_sources = self.dpf_instance.ExtendedGetNbSource
        for layer in range(total_sources):
            name = automation.VARIANT()
            self.dpf_instance.ExtendedGetSourceName(layer, pointer(name))
            self.source_list.append(name.value[0])
        return self.source_list

    def export_template_measures(self, template_path, export_path):
        """
        Import a measurement template and export the measures as text.

        Parameters
        ----------
        template_path : str
            path to XML template file
        export_path : str
            path to measurement text export

        Returns
        -------
        None

        Raises
        ------
        ImportError
            If the template import fails.
        ValueError
            If the measurement export fails.
        """
        if not self.dpf_instance.ImportTemplate(template_path, 1, 1, 1):
            raise ImportError("Template import failed")
        if not self.dpf_instance.MeasuresExportTXT(export_path):
            raise ValueError("Measurement export failed")

    def rect_export_spectrum(self, x_pos, y_pos, width, height):
        """
        Evaluate a rectangle on every wavelength sample of a spectral map.

        Parameters
        ----------
        x_pos : float
            x position of rectangle
        y_pos : float
            y position of rectangle
        width : float
            width of rectangle
        height : float
            height of rectangle

        Returns
        -------
        list
            ``[wavelength, energy]``: list of wavelengths and list of energy values,
            one entry per wavelength sample of the map.

        Raises
        ------
        Exception
            If the map is not of map type 2.
        """
        if self.dpf_instance.MapType != 2:
            raise Exception("NonSupportedMap")

        w_nb = int(self.dpf_instance.SpectralGetNbWavelength)
        w_min = self.dpf_instance.WMin
        w_max = self.dpf_instance.WMax
        # Iterate over the sample index so that the last wavelength (WMax) is
        # included and fractional wavelength steps are not truncated; a single
        # sample no longer divides by zero.
        step = (w_max - w_min) / (w_nb - 1) if w_nb > 1 else 0
        wavelength = []
        energy = []
        for index in range(w_nb):
            wavelength.append(w_min + index * step)
            self.dpf_instance.SpectralSetActiveWavelength(index)
            # Entry 6 of the rectangle calculation result is used as the energy value.
            energy.append(self.dpf_instance.SurfaceRectangleCalculation(x_pos, y_pos, width, height)[6])
        return [wavelength, energy]