Source code for pysgems.io.sgio

#  Copyright (c) 2020. Robin Thibaut, Ghent University
import os
import struct
from os.path import join as jp

import numpy as np
import pandas as pd
from loguru import logger

from pysgems.base.packbase import Package


[docs]def datread(file: str = None, start: int = 0, end: int = None): """Reads space separated dat file :param file: path to the file to be read :param start: start index :param end: end index :return: numpy array of the data """ try: with open(file, "r") as fr: lines = np.copy(fr.readlines())[start:end] try: op = np.array([list(map(float, line.split())) for line in lines]) except ValueError: op = [line.split() for line in lines] except FileNotFoundError: logger.error(f"File {file} not found.") op = None return op
[docs]def write_point_set(file_name: str, sub_dataframe: pd.DataFrame, nodata: int = -999): # TODO: build similar method to save grid files. """ Function to write sgems binary point set files. The Simulacre_input_filter class is a filter that can read the default file format of GsTLAppli. The format is a binary format, with big endian byte order. Following are a description of the file formats for the pointset and the cartesian grid objects. All file formats begin with magic number 0xB211175D, a string indicating the type of object stored in the file, the name of the object, and a version number (Q_INT32). The rest is specific to the object stored: - point-set: a Q_UINT32 indicating the number of points in the object. a Q_UINT32 indicating the number of properties in the object strings containing the names of the properties the x,y,z coordinates of each point, as floats all the property values, one property at a time, in the order specified by the strings of names, as floats. For each property there are as many values as points in the point-set. - cartesian grid: 3 Q_UINT32 indicating the number of cells in the x,y,z directions 3 floats for the dimensions of a single cell 3 floats for the origin of the grid a Q_UINT32 indicating the number of properties all the property values, one property at a time, in the order specified by the strings of names, as floats. For each property, there are nx*ny*nz values (nx,ny,nz are the number of cells in the x,y,z directions). :param nodata: nodata value, rows containing this value are omitted. :param file_name: name of the file to be written. :param sub_dataframe: Sub-frame of the feature to be exported [x, y, feature value] """ # First, rows with no data occurrence are popped sub_dataframe = sub_dataframe[(sub_dataframe != nodata).all(axis=1)] xyz = sub_dataframe[["x", "y", "z"]].to_numpy() pp = sub_dataframe.columns[-1] # Get name of the property grid_name = f"{pp}_grid" ext = ".sgems" if ext not in file_name: file_name += ext with open(file_name, "wb") as wb: wb.write(struct.pack("i", int(1.561792946e9))) # Magic number wb.write(struct.pack(">i", 10)) # Length of 'Point_set' + 1 with open(file_name, "a") as wb: wb.write("Point_set") # Type of file with open(file_name, "ab") as wb: wb.write(struct.pack(">b", 0)) # Signed character 0 after str with open(file_name, "ab") as wb: wb.write(struct.pack(">i", len(grid_name) + 1)) # Length of 'grid' + 1 with open(file_name, "a") as wb: wb.write(grid_name) # Name of the grid on which points are saved with open(file_name, "ab") as wb: wb.write(struct.pack(">b", 0)) # Signed character 0 after str with open(file_name, "ab") as wb: wb.write(struct.pack(">i", 100)) # version number wb.write(struct.pack(">i", len(xyz))) # n data points wb.write(struct.pack(">i", 1)) # n property with open(file_name, "ab") as wb: wb.write(struct.pack(">i", len(pp) + 1)) # Length of property name + 1 with open(file_name, "a") as wb: wb.write(pp) # Property name with open(file_name, "ab") as wb: wb.write(struct.pack(">b", 0)) # Signed character 0 after str with open(file_name, "ab") as wb: for c in xyz: ttt = struct.pack(">fff", c[0], c[1], c[2]) # Coordinates x, y, z wb.write(ttt) with open(file_name, "ab") as wb: for v in sub_dataframe[pp]: wb.write(struct.pack(">f", v)) # Values
[docs]def export_eas(dataframe: pd.DataFrame, filename: str = "dataset"): """Exports a Pandas DataFrame to geo-eas format :param dataframe: Pandas DataFrame to be exported :param filename: name of the file to be written """ columns = list(dataframe.columns.values) name = os.path.basename(filename).split(".")[0] header = [name, str(len(columns))] + columns ri = dataframe.iterrows() rows = [" ".join(list(map(str, r[1]))) for r in ri] lines = header + rows with open(filename + ".eas", "w") as eas: eas.writelines("\n".join(lines))
[docs]class PointSet(Package):
[docs] def __init__(self, project, pointset_path: str = None, force_2d: bool = False): """ Handles the creation of a PointSet object. :param project: Project instance :param pointset_path: Path to the pointset file :param force_2d: If True, the pointset is forced to be 2D """ # Initiate from parent Package.__init__(self, project) self.object_file_names = [] self.project_name = self.parent.project_name self.file_path = pointset_path self.res_dir = self.parent.res_dir self.dimension = 2 # Load raw data self.raw_data, self.project_name, self.columns = self.loader() # Put into panda DataFrame self.dataframe = pd.DataFrame(data=self.raw_data, columns=self.columns) # Assess the dimensionality try: # If Z (depth) information exists, sets the dimensionality to 3 self.xyz = self.dataframe[["x", "y", "z"]].to_numpy() self.dimension = 3 if self.parent.verbose: logger.info("3D dataset detected.") if force_2d: if self.parent.verbose: logger.info("3D dataset detected, but 2D will be enforced.") self.dataframe["z"] = np.zeros(self.dataframe.shape[0]) except KeyError: # Assumes 2D dataset if self.parent.verbose: logger.info("2D dataset detected.") # Replaces Z dimension by 0's self.dataframe.insert(2, "z", np.zeros(self.dataframe.shape[0])) self.columns = list(self.dataframe.columns.values) self.xyz = self.dataframe[["x", "y", "z"]].to_numpy() self.dimension = 2 # Set attribute to parent setattr(self.parent, "point_set", self)
# Load sgems dataset
[docs] def loader(self): """Parse dataset in GSLIB format""" project_info = datread(self.file_path, end=2) # Name, n features project_name = project_info[0][0].lower() # Project name (lowered) # Number of features len([x, y, f1, f2... fn]) n_features = int(project_info[1][0]) # Name of features (x, y, z, f1...) head = datread(self.file_path, start=2, end=2 + n_features) columns_name = [h[0].lower() for h in head] # Column names (lowered) data = datread(self.file_path, start=2 + n_features) # Raw data if self.parent.verbose: logger.info("Data loaded.") return data, project_name, columns_name
[docs] def export_01(self, features: list = None): """ Gives a list of point set names to be saved in sgems binary format and saves them to the result directory :param features: Names of features to export """ if (not isinstance(features, list)) and (features is not None): features = [features] if features is None: features = self.dataframe.columns.values[3:] for pp in features: # Extract x, y, z, values subframe = self.dataframe[["x", "y", "z", pp]] ps_name = jp(self.res_dir, pp) # Path of binary file write_point_set(ps_name, subframe) # Write binary file if ( pp not in self.parent.object_file_names ): # Adding features name to load them within sgems self.parent.object_file_names.append(pp) if self.parent.verbose: logger.info(f"Feature {pp} exported to binary file")