refactor: excel parse
This commit is contained in:
@@ -0,0 +1,431 @@
|
||||
# Copyright (c) 2022-2024, Manfred Moitzi
|
||||
# License: MIT License
|
||||
from __future__ import annotations
|
||||
from typing import (
|
||||
Any,
|
||||
Sequence,
|
||||
Iterator,
|
||||
Union,
|
||||
List,
|
||||
TYPE_CHECKING,
|
||||
Optional,
|
||||
)
|
||||
from typing_extensions import TypeAlias
|
||||
import math
|
||||
from datetime import datetime
|
||||
|
||||
from . import const
|
||||
from .const import ParsingError, InvalidLinkStructure
|
||||
from .hdr import AcisHeader
|
||||
from .abstract import (
|
||||
AbstractEntity,
|
||||
AbstractBuilder,
|
||||
DataLoader,
|
||||
DataExporter,
|
||||
EntityExporter,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .entities import AcisEntity
|
||||
from ezdxf.math import Vec3
|
||||
|
||||
SatRecord: TypeAlias = List[str]
|
||||
|
||||
|
||||
class SatEntity(AbstractEntity):
|
||||
"""Low level representation of an ACIS entity (node)."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
attr_ptr: str = "$-1",
|
||||
id: int = -1,
|
||||
data: Optional[list[Any]] = None,
|
||||
):
|
||||
self.name = name
|
||||
self.attr_ptr = attr_ptr
|
||||
self.id = id
|
||||
self.data: list[Any] = data if data is not None else []
|
||||
self.attributes: "SatEntity" = None # type: ignore
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}({self.id})"
|
||||
|
||||
|
||||
NULL_PTR = SatEntity("null-ptr", "$-1", -1, tuple()) # type: ignore
|
||||
|
||||
|
||||
def new_entity(
|
||||
name: str,
|
||||
attributes=NULL_PTR,
|
||||
id=-1,
|
||||
data: Optional[list[Any]] = None,
|
||||
) -> SatEntity:
|
||||
"""Factory to create new ACIS entities.
|
||||
|
||||
Args:
|
||||
name: entity type
|
||||
attributes: reference to the entity attributes or :attr:`NULL_PTR`.
|
||||
id: unique entity id as integer or -1
|
||||
data: generic data container as list
|
||||
|
||||
"""
|
||||
e = SatEntity(name, "$-1", id, data)
|
||||
e.attributes = attributes
|
||||
return e
|
||||
|
||||
|
||||
def is_ptr(s: str) -> bool:
|
||||
"""Returns ``True`` if the string `s` represents an entity pointer."""
|
||||
return len(s) > 0 and s[0] == "$"
|
||||
|
||||
|
||||
def resolve_str_pointers(entities: list[SatEntity]) -> list[SatEntity]:
|
||||
def ptr(s: str) -> SatEntity:
|
||||
num = int(s[1:])
|
||||
if num == -1:
|
||||
return NULL_PTR
|
||||
return entities[num]
|
||||
|
||||
for entity in entities:
|
||||
entity.attributes = ptr(entity.attr_ptr)
|
||||
entity.attr_ptr = "$-1"
|
||||
data = []
|
||||
for token in entity.data:
|
||||
if is_ptr(token):
|
||||
data.append(ptr(token))
|
||||
else:
|
||||
data.append(token)
|
||||
entity.data = data
|
||||
return entities
|
||||
|
||||
|
||||
class SatDataLoader(DataLoader):
|
||||
def __init__(self, data: list[Any], version: int):
|
||||
self.version = version
|
||||
self.data = data
|
||||
self.index = 0
|
||||
|
||||
def has_data(self) -> bool:
|
||||
return self.index <= len(self.data)
|
||||
|
||||
def read_int(self, skip_sat: Optional[int] = None) -> int:
|
||||
if skip_sat is not None:
|
||||
return skip_sat
|
||||
|
||||
entry = self.data[self.index]
|
||||
try:
|
||||
value = int(entry)
|
||||
except ValueError:
|
||||
raise ParsingError(f"expected integer, got {entry}")
|
||||
self.index += 1
|
||||
return value
|
||||
|
||||
def read_double(self) -> float:
|
||||
entry = self.data[self.index]
|
||||
try:
|
||||
value = float(entry)
|
||||
except ValueError:
|
||||
raise ParsingError(f"expected double, got {entry}")
|
||||
self.index += 1
|
||||
return value
|
||||
|
||||
def read_interval(self) -> float:
|
||||
finite = self.read_bool("F", "I")
|
||||
if finite:
|
||||
return self.read_double()
|
||||
return math.inf
|
||||
|
||||
def read_vec3(self) -> tuple[float, float, float]:
|
||||
x = self.read_double()
|
||||
y = self.read_double()
|
||||
z = self.read_double()
|
||||
return x, y, z
|
||||
|
||||
def read_bool(self, true: str, false: str) -> bool:
|
||||
value = self.data[self.index]
|
||||
if value == true:
|
||||
self.index += 1
|
||||
return True
|
||||
elif value == false:
|
||||
self.index += 1
|
||||
return False
|
||||
raise ParsingError(
|
||||
f"expected bool string '{true}' or '{false}', got {value}"
|
||||
)
|
||||
|
||||
def read_str(self) -> str:
|
||||
value = self.data[self.index]
|
||||
if self.version < const.Features.AT or value.startswith("@"):
|
||||
self.index += 2
|
||||
return self.data[self.index - 1]
|
||||
raise ParsingError(f"expected string, got {value}")
|
||||
|
||||
def read_ptr(self) -> AbstractEntity:
|
||||
entity = self.data[self.index]
|
||||
if isinstance(entity, AbstractEntity):
|
||||
self.index += 1
|
||||
return entity
|
||||
raise ParsingError(f"expected pointer, got {type(entity)}")
|
||||
|
||||
def read_transform(self) -> list[float]:
|
||||
# 4th column is not stored
|
||||
# Read only the matrix values which contain all information needed,
|
||||
# the additional data are only hints for the kernel how to process
|
||||
# the data (rotation, reflection, scaling, shearing).
|
||||
return [self.read_double() for _ in range(12)]
|
||||
|
||||
|
||||
class SatBuilder(AbstractBuilder):
|
||||
"""Low level data structure to manage ACIS SAT data files."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.header = AcisHeader()
|
||||
self.bodies: list[SatEntity] = []
|
||||
self.entities: list[SatEntity] = []
|
||||
self._export_mapping: dict[int, SatEntity] = {}
|
||||
|
||||
def dump_sat(self) -> list[str]:
|
||||
"""Returns the text representation of the ACIS file as list of strings
|
||||
without line endings.
|
||||
|
||||
Raise:
|
||||
InvalidLinkStructure: referenced ACIS entity is not stored in
|
||||
the :attr:`entities` storage
|
||||
|
||||
"""
|
||||
self.reorder_records()
|
||||
self.header.n_entities = len(self.bodies) + int(
|
||||
self.header.has_asm_header
|
||||
)
|
||||
if self.header.version == 700:
|
||||
self.header.n_records = 0 # ignored for old versions
|
||||
else:
|
||||
self.header.n_records = len(self.entities)
|
||||
data = self.header.dumps()
|
||||
data.extend(build_str_records(self.entities, self.header.version))
|
||||
data.append(self.header.sat_end_marker())
|
||||
return data
|
||||
|
||||
def set_entities(self, entities: list[SatEntity]) -> None:
|
||||
"""Reset entities and bodies list. (internal API)"""
|
||||
self.bodies = [e for e in entities if e.name == "body"]
|
||||
self.entities = entities
|
||||
|
||||
|
||||
class SatExporter(EntityExporter[SatEntity]):
|
||||
def make_record(self, entity: AcisEntity) -> SatEntity:
|
||||
record = SatEntity(entity.type, id=entity.id)
|
||||
record.attributes = NULL_PTR
|
||||
return record
|
||||
|
||||
def make_data_exporter(self, record: SatEntity) -> DataExporter:
|
||||
return SatDataExporter(self, record.data)
|
||||
|
||||
def dump_sat(self) -> list[str]:
|
||||
builder = SatBuilder()
|
||||
builder.header = self.header
|
||||
builder.set_entities(self.export_records())
|
||||
return builder.dump_sat()
|
||||
|
||||
|
||||
def build_str_records(entities: list[SatEntity], version: int) -> Iterator[str]:
|
||||
def ptr_str(e: SatEntity) -> str:
|
||||
if e is NULL_PTR:
|
||||
return "$-1"
|
||||
try:
|
||||
return f"${entities.index(e)}"
|
||||
except ValueError:
|
||||
raise InvalidLinkStructure(f"entity {str(e)} not in record storage")
|
||||
|
||||
for entity in entities:
|
||||
tokens = [entity.name]
|
||||
tokens.append(ptr_str(entity.attributes))
|
||||
if version >= 700:
|
||||
tokens.append(str(entity.id))
|
||||
for data in entity.data:
|
||||
if isinstance(data, SatEntity):
|
||||
tokens.append(ptr_str(data))
|
||||
else:
|
||||
tokens.append(str(data))
|
||||
tokens.append("#")
|
||||
yield " ".join(tokens)
|
||||
|
||||
|
||||
def parse_header_str(s: str) -> Iterator[str]:
|
||||
num = ""
|
||||
collect = 0
|
||||
token = ""
|
||||
for c in s.rstrip():
|
||||
if collect > 0:
|
||||
token += c
|
||||
collect -= 1
|
||||
if collect == 0:
|
||||
yield token
|
||||
token = ""
|
||||
elif c == "@":
|
||||
continue
|
||||
elif c in "0123456789":
|
||||
num += c
|
||||
elif c == " " and num:
|
||||
collect = int(num)
|
||||
num = ""
|
||||
|
||||
|
||||
def parse_header(data: Sequence[str]) -> tuple[AcisHeader, Sequence[str]]:
|
||||
header = AcisHeader()
|
||||
tokens = data[0].split()
|
||||
header.version = int(tokens[0])
|
||||
try:
|
||||
header.n_records = int(tokens[1])
|
||||
header.n_entities = int(tokens[2])
|
||||
header.flags = int(tokens[3])
|
||||
except (IndexError, ValueError):
|
||||
pass
|
||||
tokens = list(parse_header_str(data[1]))
|
||||
try:
|
||||
header.product_id = tokens[0]
|
||||
header.acis_version = tokens[1]
|
||||
except IndexError:
|
||||
pass
|
||||
|
||||
if len(tokens) > 2:
|
||||
try: # Sat Jan 1 10:00:00 2022
|
||||
header.creation_date = datetime.strptime(tokens[2], const.DATE_FMT)
|
||||
except ValueError:
|
||||
pass
|
||||
tokens = data[2].split()
|
||||
try:
|
||||
header.units_in_mm = float(tokens[0])
|
||||
except (IndexError, ValueError):
|
||||
pass
|
||||
return header, data[3:]
|
||||
|
||||
|
||||
def _filter_records(data: Sequence[str]) -> Iterator[str]:
|
||||
for line in data:
|
||||
if line.startswith(const.END_OF_ACIS_DATA_SAT) or line.startswith(
|
||||
const.BEGIN_OF_ACIS_HISTORY_DATA
|
||||
):
|
||||
return
|
||||
yield line
|
||||
|
||||
|
||||
def merge_record_strings(data: Sequence[str]) -> Iterator[str]:
|
||||
merged_data = " ".join(_filter_records(data))
|
||||
for record in merged_data.split("#"):
|
||||
record = record.strip()
|
||||
if record:
|
||||
yield record
|
||||
|
||||
|
||||
def parse_records(data: Sequence[str]) -> list[SatRecord]:
|
||||
expected_seq_num = 0
|
||||
records: list[SatRecord] = []
|
||||
for line in merge_record_strings(data):
|
||||
tokens: SatRecord = line.split()
|
||||
first_token = tokens[0].strip()
|
||||
if first_token.startswith("-"):
|
||||
num = -int(first_token)
|
||||
if num != expected_seq_num:
|
||||
raise ParsingError(
|
||||
"non-continuous sequence numbers not supported"
|
||||
)
|
||||
tokens.pop(0)
|
||||
records.append(tokens)
|
||||
expected_seq_num += 1
|
||||
return records
|
||||
|
||||
|
||||
def build_entities(
|
||||
records: Sequence[SatRecord], version: int
|
||||
) -> list[SatEntity]:
|
||||
entities: list[SatEntity] = []
|
||||
for record in records:
|
||||
name = record[0]
|
||||
attr = record[1]
|
||||
id_ = -1
|
||||
if version >= 700:
|
||||
id_ = int(record[2])
|
||||
data = record[3:]
|
||||
else:
|
||||
data = record[2:]
|
||||
entities.append(SatEntity(name, attr, id_, data))
|
||||
return entities
|
||||
|
||||
|
||||
def parse_sat(s: Union[str, Sequence[str]]) -> SatBuilder:
|
||||
"""Returns the :class:`SatBuilder` for the ACIS :term:`SAT` file content
|
||||
given as string or list of strings.
|
||||
|
||||
Raises:
|
||||
ParsingError: invalid or unsupported ACIS data structure
|
||||
|
||||
"""
|
||||
data: Sequence[str]
|
||||
if isinstance(s, str):
|
||||
data = s.splitlines()
|
||||
else:
|
||||
data = s
|
||||
if not isinstance(data, Sequence):
|
||||
raise TypeError("expected as string or a sequence of strings")
|
||||
builder = SatBuilder()
|
||||
header, data = parse_header(data)
|
||||
builder.header = header
|
||||
records = parse_records(data)
|
||||
entities = build_entities(records, header.version)
|
||||
builder.set_entities(resolve_str_pointers(entities))
|
||||
return builder
|
||||
|
||||
|
||||
class SatDataExporter(DataExporter):
|
||||
def __init__(self, exporter: SatExporter, data: list[Any]):
|
||||
self.version = exporter.version
|
||||
self.exporter = exporter
|
||||
self.data = data
|
||||
|
||||
def write_int(self, value: int, skip_sat=False) -> None:
|
||||
"""There are sometimes additional int values in SAB files which are
|
||||
not present in SAT files, maybe reference counters e.g. vertex, coedge.
|
||||
"""
|
||||
if not skip_sat:
|
||||
self.data.append(str(value))
|
||||
|
||||
def write_double(self, value: float) -> None:
|
||||
self.data.append(f"{value:g}")
|
||||
|
||||
def write_interval(self, value: float) -> None:
|
||||
if math.isinf(value):
|
||||
self.data.append("I") # infinite
|
||||
else:
|
||||
self.data.append("F") # finite
|
||||
self.write_double(value)
|
||||
|
||||
def write_loc_vec3(self, value: Vec3) -> None:
|
||||
self.write_double(value.x)
|
||||
self.write_double(value.y)
|
||||
self.write_double(value.z)
|
||||
|
||||
def write_dir_vec3(self, value: Vec3) -> None:
|
||||
self.write_double(value.x)
|
||||
self.write_double(value.y)
|
||||
self.write_double(value.z)
|
||||
|
||||
def write_bool(self, value: bool, true: str, false: str) -> None:
|
||||
self.data.append(true if value else false)
|
||||
|
||||
def write_str(self, value: str) -> None:
|
||||
self.data.append(f"@{len(value)}")
|
||||
self.data.append(str(value))
|
||||
|
||||
def write_literal_str(self, value: str) -> None:
|
||||
self.write_str(value) # just for SAB files important
|
||||
|
||||
def write_ptr(self, entity: AcisEntity) -> None:
|
||||
record = NULL_PTR
|
||||
if not entity.is_none:
|
||||
record = self.exporter.get_record(entity)
|
||||
self.data.append(record)
|
||||
|
||||
def write_transform(self, data: list[str]) -> None:
|
||||
self.data.extend(data)
|
||||
Reference in New Issue
Block a user