diff --git a/data/deploymentview.dv.xml b/data/deploymentview.dv.xml
new file mode 100755
index 0000000..5f0e890
--- /dev/null
+++ b/data/deploymentview.dv.xml
@@ -0,0 +1,45 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/templateprocessor/dv.py b/templateprocessor/dv.py
new file mode 100644
index 0000000..3e41826
--- /dev/null
+++ b/templateprocessor/dv.py
@@ -0,0 +1,131 @@
+"""
+TASTE Deployment View (DV) data model classes.
+
+This module provides Python classes that reflect the schema/structure of
+TASTE Deployment View XML files, allowing for parsing, manipulation, and
+generation of DV data.
+"""
+
+from dataclasses import dataclass, field
+from typing import List
+
+
+@dataclass
+class DeploymentFunction:
+ """Function deployed to a partition."""
+
+ id: str
+ name: str
+ path: str
+
+
+@dataclass
+class Partition:
+ """
+ Partition within a node.
+
+ A partition represents an execution context that can host one or more
+ functions (software components).
+ """
+
+ id: str
+ name: str
+ functions: List[DeploymentFunction] = field(default_factory=list)
+
+
+@dataclass
+class Device:
+ """
+ Hardware device attached to a node.
+
+ Devices represent connection hardpoints (e.g., UART, TCP/UDP ports) that
+ provide bus access for communication.
+ """
+
+ id: str
+ name: str
+ requires_bus_access: str
+ port: str
+ asn1file: str
+ asn1type: str
+ asn1module: str
+ namespace: str
+ extends: str
+ impl_extends: str
+ bus_namespace: str
+ requirement_ids: List[str] = field(default_factory=list)
+
+
+@dataclass
+class Node:
+ """
+ Deployment node (processor/platform).
+
+ A node represents a physical or virtual hardware platform that can host
+ partitions and devices. Examples include embedded processors, Linux systems,
+ or other execution platforms.
+ """
+
+ id: str
+ name: str
+ type: str
+ node_label: str
+ namespace: str
+ partitions: List[Partition] = field(default_factory=list)
+ devices: List[Device] = field(default_factory=list)
+ requirement_ids: List[str] = field(default_factory=list)
+
+
+@dataclass
+class Message:
+ """
+ Message routed through a connection.
+
+ Represents data flow from one function's interface to another function's
+ interface over a physical connection.
+ """
+
+ id: str
+ name: str
+ from_function: str
+ from_interface: str
+ to_function: str
+ to_interface: str
+
+
+@dataclass
+class Connection:
+ """
+ Physical connection between nodes.
+
+ Represents a communication link between devices on different nodes,
+ potentially through a bus. Contains the messages that are routed
+ through this connection.
+ """
+
+ id: str
+ name: str
+ from_node: str
+ from_port: str
+ to_bus: str
+ to_node: str
+ to_port: str
+ messages: List[Message] = field(default_factory=list)
+
+
+@dataclass
+class DeploymentView:
+ """
+ Root element representing a TASTE Deployment View.
+
+ This is the main data structure that contains all nodes, connections,
+ and other elements that define how a TASTE system is deployed to
+ physical/virtual hardware.
+ """
+
+ version: str = ""
+ ui_file: str = ""
+ creator_hash: str = ""
+ modifier_hash: str = ""
+ nodes: List[Node] = field(default_factory=list)
+ connections: List[Connection] = field(default_factory=list)
diff --git a/templateprocessor/dvreader.py b/templateprocessor/dvreader.py
new file mode 100644
index 0000000..2b07252
--- /dev/null
+++ b/templateprocessor/dvreader.py
@@ -0,0 +1,201 @@
+"""
+TASTE Deployment View XML Reader.
+
+This module provides functionality to parse TASTE Deployment View XML files
+and construct DeploymentView data model instances.
+"""
+
+import xml.etree.ElementTree as ET
+from pathlib import Path
+from typing import Union
+
+from templateprocessor.dv import (
+ DeploymentView,
+ Node,
+ Partition,
+ DeploymentFunction,
+ Device,
+ Connection,
+ Message,
+)
+
+
+class DVReader:
+ """
+ Reader for TASTE Deployment View XML files.
+
+ Parses XML files conforming to the TASTE Deployment View schema and
+ constructs corresponding DeploymentView objects.
+
+ Example:
+ reader = DVReader()
+ deployment_view = reader.read("deploymentview.dv.xml")
+ """
+
+ def read(self, file_path: Union[str, Path]) -> DeploymentView:
+ """
+ Read and parse a TASTE Deployment View XML file.
+
+ Args:
+ file_path: Path to the DV XML file
+
+ Returns:
+ DeploymentView object populated with parsed data
+
+ Raises:
+ FileNotFoundError: If the file does not exist
+ xml.etree.ElementTree.ParseError: If XML is malformed
+ """
+ file_path = Path(file_path)
+ if not file_path.exists():
+ raise FileNotFoundError(f"Deployment View file not found: {file_path}")
+
+ tree = ET.parse(file_path)
+ root = tree.getroot()
+
+ return self._parse_deployment_view(root)
+
+ def read_string(self, xml_content: str) -> DeploymentView:
+ """
+ Read and parse TASTE Deployment View XML from a string.
+
+ Args:
+ xml_content: XML content as string
+
+ Returns:
+ DeploymentView object populated with parsed data
+
+ Raises:
+ xml.etree.ElementTree.ParseError: If XML is malformed
+ """
+ root = ET.fromstring(xml_content)
+ return self._parse_deployment_view(root)
+
+ def _parse_deployment_view(self, root: ET.Element) -> DeploymentView:
+ """Parse the root DeploymentView element."""
+ dv = DeploymentView(
+ version=root.get("version", ""),
+ ui_file=root.get("UiFile", ""),
+ creator_hash=root.get("creatorHash", ""),
+ modifier_hash=root.get("modifierHash", ""),
+ )
+
+ # Parse all Node elements
+ for node_elem in root.findall("Node"):
+ node = self._parse_node(node_elem)
+ dv.nodes.append(node)
+
+ # Parse all Connection elements
+ for conn_elem in root.findall("Connection"):
+ connection = self._parse_connection(conn_elem)
+ dv.connections.append(connection)
+
+ return dv
+
+ def _parse_node(self, elem: ET.Element) -> Node:
+ """Parse a Node element."""
+ # Parse requirement_ids if present
+ requirement_ids = []
+ req_ids_str = elem.get("requirement_ids", "")
+ if req_ids_str:
+ requirement_ids = [
+ rid.strip() for rid in req_ids_str.split(",") if rid.strip()
+ ]
+
+ node = Node(
+ id=elem.get("id", ""),
+ name=elem.get("name", ""),
+ type=elem.get("type", ""),
+ node_label=elem.get("node_label", ""),
+ namespace=elem.get("namespace", ""),
+ requirement_ids=requirement_ids,
+ )
+
+ # Parse partitions
+ for partition_elem in elem.findall("Partition"):
+ partition = self._parse_partition(partition_elem)
+ node.partitions.append(partition)
+
+ # Parse devices
+ for device_elem in elem.findall("Device"):
+ device = self._parse_device(device_elem)
+ node.devices.append(device)
+
+ return node
+
+ def _parse_partition(self, elem: ET.Element) -> Partition:
+ """Parse a Partition element."""
+ partition = Partition(
+ id=elem.get("id", ""),
+ name=elem.get("name", ""),
+ )
+
+ # Parse functions
+ for func_elem in elem.findall("Function"):
+ function = self._parse_deployment_function(func_elem)
+ partition.functions.append(function)
+
+ return partition
+
+ def _parse_deployment_function(self, elem: ET.Element) -> DeploymentFunction:
+ """Parse a Function element within a Partition."""
+ return DeploymentFunction(
+ id=elem.get("id", ""),
+ name=elem.get("name", ""),
+ path=elem.get("path", ""),
+ )
+
+ def _parse_device(self, elem: ET.Element) -> Device:
+ """Parse a Device element."""
+ # Parse requirement_ids if present
+ requirement_ids = []
+ req_ids_str = elem.get("requirement_ids", "")
+ if req_ids_str:
+ requirement_ids = [
+ rid.strip() for rid in req_ids_str.split(",") if rid.strip()
+ ]
+
+ return Device(
+ id=elem.get("id", ""),
+ name=elem.get("name", ""),
+ requires_bus_access=elem.get("requires_bus_access", ""),
+ port=elem.get("port", ""),
+ asn1file=elem.get("asn1file", ""),
+ asn1type=elem.get("asn1type", ""),
+ asn1module=elem.get("asn1module", ""),
+ namespace=elem.get("namespace", ""),
+ extends=elem.get("extends", ""),
+ impl_extends=elem.get("impl_extends", ""),
+ bus_namespace=elem.get("bus_namespace", ""),
+ requirement_ids=requirement_ids,
+ )
+
+ def _parse_connection(self, elem: ET.Element) -> Connection:
+ """Parse a Connection element."""
+ connection = Connection(
+ id=elem.get("id", ""),
+ name=elem.get("name", ""),
+ from_node=elem.get("from_node", ""),
+ from_port=elem.get("from_port", ""),
+ to_bus=elem.get("to_bus", ""),
+ to_node=elem.get("to_node", ""),
+ to_port=elem.get("to_port", ""),
+ )
+
+ # Parse messages
+ for msg_elem in elem.findall("Message"):
+ message = self._parse_message(msg_elem)
+ connection.messages.append(message)
+
+ return connection
+
+ def _parse_message(self, elem: ET.Element) -> Message:
+ """Parse a Message element."""
+ return Message(
+ id=elem.get("id", ""),
+ name=elem.get("name", ""),
+ from_function=elem.get("from_function", ""),
+ from_interface=elem.get("from_interface", ""),
+ to_function=elem.get("to_function", ""),
+ to_interface=elem.get("to_interface", ""),
+ )
diff --git a/tests/Makefile b/tests/Makefile
index f599055..bd4d2ba 100644
--- a/tests/Makefile
+++ b/tests/Makefile
@@ -3,6 +3,7 @@ PYTHON ?= python3
TESTS = \
test_ivreader.py \
+ test_dvreader.py \
test_soreader.py
.PHONY: \
diff --git a/tests/test_dvreader.py b/tests/test_dvreader.py
new file mode 100644
index 0000000..47e0e23
--- /dev/null
+++ b/tests/test_dvreader.py
@@ -0,0 +1,287 @@
+"""
+Tests for DVReader class
+"""
+
+import pytest
+from pathlib import Path
+from templateprocessor.dvreader import DVReader
+from templateprocessor.dv import (
+ DeploymentView,
+)
+
+
+class TestDVReader:
+ """Test cases for DVReader class."""
+
+ # Assuming the data directory is at the workspace root
+ @staticmethod
+ def get_test_data_file(filename: str) -> Path:
+ """Get the path to a test data file."""
+ return Path(__file__).parent.parent / "data" / filename
+
+ def test_read_deploymentview_xml(self):
+ """Test reading the deploymentview.dv.xml file."""
+ # Prepare
+ reader = DVReader()
+ dv_file = self.get_test_data_file("deploymentview.dv.xml")
+ assert dv_file.exists()
+
+ # Read
+ dv = reader.read(dv_file)
+
+ # Verify basic attributes
+ assert isinstance(dv, DeploymentView)
+ assert dv.version == "1.2"
+ assert dv.ui_file == "deploymentview.ui.xml"
+ assert dv.creator_hash == "383508e"
+ assert dv.modifier_hash == "383508e"
+
+ # Verify nodes were parsed
+ assert len(dv.nodes) == 2
+
+ def test_read_nodes(self):
+ """Test that nodes are correctly parsed."""
+ # Prepare
+ reader = DVReader()
+ dv_file = self.get_test_data_file("deploymentview.dv.xml")
+ assert dv_file.exists()
+
+ # Read
+ dv = reader.read(dv_file)
+
+ # Check node names
+ node_names = [n.name for n in dv.nodes]
+ assert "SAM V71 RTEMS N7S_1" in node_names
+ assert "x86 Linux C++_1" in node_names
+
+ # Find the SAM V71 node
+ sam_node = next((n for n in dv.nodes if n.name == "SAM V71 RTEMS N7S_1"), None)
+ assert sam_node is not None
+ assert sam_node.type == "ocarina_processors_arm::samv71.rtems"
+ assert sam_node.node_label == "Node_2"
+ assert sam_node.namespace == "ocarina_processors_arm"
+
+ # Find the x86 Linux node
+ linux_node = next((n for n in dv.nodes if n.name == "x86 Linux C++_1"), None)
+ assert linux_node is not None
+ assert linux_node.type == "ocarina_processors_x86::x86.generic_linux"
+ assert linux_node.node_label == "Node_1"
+ assert linux_node.namespace == "ocarina_processors_x86"
+
+ def test_read_node_requirement_ids(self):
+ """Test parsing node requirement IDs."""
+ # Prepare
+ reader = DVReader()
+ dv_file = self.get_test_data_file("deploymentview.dv.xml")
+ assert dv_file.exists()
+
+ # Read
+ dv = reader.read(dv_file)
+
+ # Find the x86 Linux node with requirement_ids
+ linux_node = next((n for n in dv.nodes if n.name == "x86 Linux C++_1"), None)
+ assert linux_node is not None
+ assert len(linux_node.requirement_ids) == 2
+ assert "r20" in linux_node.requirement_ids
+ assert "r21" in linux_node.requirement_ids
+
+ # SAM node should have no requirement_ids
+ sam_node = next((n for n in dv.nodes if n.name == "SAM V71 RTEMS N7S_1"), None)
+ assert sam_node is not None
+ assert len(sam_node.requirement_ids) == 0
+
+ def test_read_partitions(self):
+ """Test that partitions are correctly parsed."""
+ # Prepare
+ reader = DVReader()
+ dv_file = self.get_test_data_file("deploymentview.dv.xml")
+ assert dv_file.exists()
+
+ # Read
+ dv = reader.read(dv_file)
+
+ # Find the SAM V71 node
+ sam_node = next((n for n in dv.nodes if n.name == "SAM V71 RTEMS N7S_1"), None)
+ assert sam_node is not None
+
+ # Verify partitions
+ assert len(sam_node.partitions) == 1
+ partition = sam_node.partitions[0]
+ assert partition.name == "ASW"
+ assert partition.id == "{6d924f84-5366-47d0-8a89-56a2614f6813}"
+
+ def test_read_partition_functions(self):
+ """Test that partition functions are correctly parsed."""
+ # Prepare
+ reader = DVReader()
+ dv_file = self.get_test_data_file("deploymentview.dv.xml")
+ assert dv_file.exists()
+
+ # Read
+ dv = reader.read(dv_file)
+
+ # Find the SAM V71 node
+ sam_node = next((n for n in dv.nodes if n.name == "SAM V71 RTEMS N7S_1"), None)
+ assert sam_node is not None
+
+ # Get the ASW partition
+ partition = sam_node.partitions[0]
+ assert len(partition.functions) == 2
+
+ # Check function names
+ func_names = [f.name for f in partition.functions]
+ assert "Frontend" in func_names
+ assert "Backend" in func_names
+
+ # Check function details
+ frontend = next((f for f in partition.functions if f.name == "Frontend"), None)
+ assert frontend is not None
+ assert frontend.path == "Frontend"
+ assert frontend.id == "{81aa583e-d1d0-47bb-ae8b-de3323dac654}"
+
+ def test_read_devices(self):
+ """Test that devices are correctly parsed."""
+ # Prepare
+ reader = DVReader()
+ dv_file = self.get_test_data_file("deploymentview.dv.xml")
+ assert dv_file.exists()
+
+ # Read
+ dv = reader.read(dv_file)
+
+ # Find the SAM V71 node
+ sam_node = next((n for n in dv.nodes if n.name == "SAM V71 RTEMS N7S_1"), None)
+ assert sam_node is not None
+
+ # Verify devices
+ assert len(sam_node.devices) == 5
+
+ # Check device names
+ device_names = [d.name for d in sam_node.devices]
+ assert "uart0" in device_names
+ assert "uart1" in device_names
+ assert "uart2" in device_names
+
+ # Check specific device details
+ uart0 = next((d for d in sam_node.devices if d.name == "uart0"), None)
+ assert uart0 is not None
+ assert uart0.port == "uart0"
+ assert uart0.requires_bus_access == "ocarina_buses::serial.ccsds"
+ assert uart0.namespace == "ocarina_drivers"
+ assert uart0.extends == "ocarina_drivers::serial_ccsds"
+ assert uart0.impl_extends == "ocarina_drivers::serial_ccsds.samv71_rtems"
+ assert uart0.asn1type == "Serial-SamV71-Rtems-Conf-T"
+ assert uart0.asn1module == "SAMV71-RTEMS-SERIAL-DRIVER"
+
+ def test_read_device_requirement_ids(self):
+ """Test parsing device requirement IDs."""
+ # Prepare
+ reader = DVReader()
+ dv_file = self.get_test_data_file("deploymentview.dv.xml")
+ assert dv_file.exists()
+
+ # Read
+ dv = reader.read(dv_file)
+
+ # Find the SAM V71 node
+ sam_node = next((n for n in dv.nodes if n.name == "SAM V71 RTEMS N7S_1"), None)
+ assert sam_node is not None
+
+ # Find uart0 device with requirement_ids
+ uart0 = next((d for d in sam_node.devices if d.name == "uart0"), None)
+ assert uart0 is not None
+ assert len(uart0.requirement_ids) == 1
+ assert "r10" in uart0.requirement_ids
+
+ # Other devices should have no requirement_ids
+ uart1 = next((d for d in sam_node.devices if d.name == "uart1"), None)
+ assert uart1 is not None
+ assert len(uart1.requirement_ids) == 0
+
+ def test_read_connections(self):
+ """Test that connections are correctly parsed."""
+ # Prepare
+ reader = DVReader()
+ dv_file = self.get_test_data_file("deploymentview.dv.xml")
+ assert dv_file.exists()
+
+ # Read
+ dv = reader.read(dv_file)
+
+ # Verify connections were parsed
+ assert len(dv.connections) == 1
+
+ # Check connection details
+ connection = dv.connections[0]
+ assert connection.name == "Connection_1"
+ assert connection.from_node == "x86 Linux C++_1"
+ assert connection.from_port == "uart0"
+ assert connection.to_bus == "ocarina_buses::serial.ccsds"
+ assert connection.to_node == "SAM V71 RTEMS N7S_1"
+ assert connection.to_port == "uart0"
+
+ def test_read_messages(self):
+ """Test that messages in connections are correctly parsed."""
+ # Prepare
+ reader = DVReader()
+ dv_file = self.get_test_data_file("deploymentview.dv.xml")
+ assert dv_file.exists()
+
+ # Read
+ dv = reader.read(dv_file)
+
+ # Get the connection
+ connection = dv.connections[0]
+
+ # Verify messages
+ assert len(connection.messages) == 2
+
+ # Check message details
+ msg_names = [m.name for m in connection.messages]
+ assert "Message_1" in msg_names
+ assert "Message_2" in msg_names
+
+ # Check Message_1 details
+ msg1 = next((m for m in connection.messages if m.name == "Message_1"), None)
+ assert msg1 is not None
+ assert msg1.from_function == "EGSE"
+ assert msg1.from_interface == "tc"
+ assert msg1.to_function == "Frontend"
+ assert msg1.to_interface == "tc"
+
+ # Check Message_2 details
+ msg2 = next((m for m in connection.messages if m.name == "Message_2"), None)
+ assert msg2 is not None
+ assert msg2.from_function == "Frontend"
+ assert msg2.from_interface == "tm"
+ assert msg2.to_function == "EGSE"
+ assert msg2.to_interface == "tm"
+
+ def test_read_string(self):
+ """Test reading from XML string."""
+ reader = DVReader()
+
+ xml_content = """
+
+
+
+
+
+
+"""
+
+ dv = reader.read_string(xml_content)
+
+ assert dv.version == "1.0"
+ assert dv.ui_file == "test.ui.xml"
+ assert len(dv.nodes) == 1
+ assert dv.nodes[0].name == "test_node"
+ assert len(dv.nodes[0].partitions) == 1
+ assert dv.nodes[0].partitions[0].name == "test_partition"
+
+ def test_file_not_found(self):
+ """Test that FileNotFoundError is raised for non-existent files."""
+ reader = DVReader()
+
+ with pytest.raises(FileNotFoundError):
+ reader.read("nonexistent_file.dv.xml")