「‍」 Lingenic

hsv

(⤓.py ◇.py); γ ≜ [2026-01-27T072638.882, 2026-01-27T072638.882] ∧ |γ| = 1

"""
HSV - Hierarchical Separated Values

A text-based file format and streaming protocol using ASCII control characters.
Unlimited nesting (like JSON). No escaping required. Binary data supported.

Copyright 2026 Danslav Slavenskoj, Lingenic LLC
License: CC0 1.0 - Public Domain
https://creativecommons.org/publicdomain/zero/1.0/
You may use this code for any purpose without attribution.

Spec: https://hsvfile.com
Repo: https://github.com/LingenicLLC/HSV

Features:
- SOH headers
- STX/ETX framing
- SO/SI nesting (unlimited depth)
- DLE binary mode
- FS/GS/RS/US structure

Example:
    >>> import hsv
    >>> doc = hsv.parse("\\x02name\\x1fAlice\\x1eage\\x1f30\\x03")
    >>> doc['records']
    [{'name': 'Alice', 'age': '30'}]
"""

from typing import Any, Dict, List, Optional, Tuple, Union

# Control characters
SOH = '\x01'  # Start of Header
STX = '\x02'  # Start of Text (data block)
ETX = '\x03'  # End of Text
EOT = '\x04'  # End of Transmission
ENQ = '\x05'  # Enquiry
ACK = '\x06'  # Acknowledge
SO = '\x0e'   # Shift Out (start nested)
SI = '\x0f'   # Shift In (end nested)
DLE = '\x10'  # Data Link Escape (binary mode)
XON = '\x11'  # Resume transmission
XOFF = '\x13' # Pause transmission
NAK = '\x15'  # Negative acknowledge
SYN = '\x16'  # Sync/keepalive
CAN = '\x18'  # Cancel
FS = '\x1c'   # File/Record Separator
GS = '\x1d'   # Group/Array Separator
RS = '\x1e'   # Record/Property Separator
US = '\x1f'   # Unit/Key-Value Separator

# Type aliases
HsvValue = Union[str, List['HsvValue'], Dict[str, 'HsvValue']]
HsvObject = Dict[str, HsvValue]
HsvDocument = Dict[str, Any]


def _unescape_binary(data: str) -> str:
    """Handle DLE escaping: DLE+DLE -> DLE"""
    result = []
    i = 0
    while i < len(data):
        if data[i] == DLE and i + 1 < len(data):
            if data[i + 1] == DLE:
                result.append(DLE)
                i += 2
                continue
        result.append(data[i])
        i += 1
    return ''.join(result)


def _extract_binary_sections(text: str) -> Tuple[str, Dict[str, str]]:
    """
    Extract DLE+STX...DLE+ETX binary sections and replace with placeholders.
    Returns (text_with_placeholders, {placeholder: binary_data})
    """
    result = []
    binaries = {}
    i = 0
    placeholder_count = 0

    while i < len(text):
        # Check for DLE+STX (binary start)
        if text[i] == DLE and i + 1 < len(text) and text[i + 1] == STX:
            # Find matching DLE+ETX
            binary_start = i + 2
            j = binary_start
            binary_data = []

            while j < len(text):
                if text[j] == DLE and j + 1 < len(text):
                    if text[j + 1] == ETX:
                        # End of binary section
                        placeholder = f'\x00BINARY{placeholder_count}\x00'
                        binaries[placeholder] = _unescape_binary(''.join(binary_data))
                        result.append(placeholder)
                        placeholder_count += 1
                        i = j + 2
                        break
                    elif text[j + 1] == DLE:
                        # Escaped DLE
                        binary_data.append(DLE)
                        j += 2
                        continue
                binary_data.append(text[j])
                j += 1
            else:
                # No closing DLE+ETX found, treat as literal
                result.append(text[i])
                i += 1
        else:
            result.append(text[i])
            i += 1

    return ''.join(result), binaries


def _restore_binaries(value: str, binaries: Dict[str, str]) -> str:
    """Replace binary placeholders with actual binary data"""
    for placeholder, data in binaries.items():
        value = value.replace(placeholder, data)
    return value


def _split_respecting_nesting(text: str, sep: str) -> List[str]:
    """Split by separator, but respect SO/SI nesting depth"""
    parts = []
    current = []
    depth = 0
    i = 0

    while i < len(text):
        if text[i] == SO:
            depth += 1
            current.append(text[i])
        elif text[i] == SI:
            depth -= 1
            current.append(text[i])
        elif text[i] == sep and depth == 0:
            parts.append(''.join(current))
            current = []
        else:
            current.append(text[i])
        i += 1

    if current:
        parts.append(''.join(current))

    return parts


def _parse_value(value: str, binaries: Dict[str, str]) -> HsvValue:
    """Parse a value, handling arrays (GS) and nested structures (SO/SI)"""
    value = _restore_binaries(value, binaries)

    # Check for nested structure (SO at start, SI at end)
    if value.startswith(SO) and value.endswith(SI):
        inner = value[1:-1]
        return _parse_object(inner, binaries)

    # Check for array (respect nesting)
    if GS in value:
        parts = _split_respecting_nesting(value, GS)
        # Recursively parse each array element
        return [_parse_value(p, binaries) for p in parts]

    return value


def _parse_object(content: str, binaries: Dict[str, str]) -> HsvObject:
    """Parse an object from RS-separated properties"""
    obj = {}

    # Split by RS, respecting SO/SI nesting
    props = _split_respecting_nesting(content, RS)

    for prop in props:
        # Split by US, respecting SO/SI nesting
        parts = _split_respecting_nesting(prop, US)
        if len(parts) >= 2:
            k = parts[0]
            v = US.join(parts[1:])  # Handle multiple US in value
            obj[k] = _parse_value(v, binaries)

    return obj


def parse(text: str) -> HsvDocument:
    """
    Parse HSV text into structured data.

    Args:
        text: The HSV-encoded text to parse

    Returns:
        A dictionary with:
            - 'header': dict or None (content between SOH and STX)
            - 'records': list of dicts (content from data blocks)

    Example:
        >>> doc = parse("\\x02name\\x1fAlice\\x1eage\\x1f30\\x03")
        >>> doc['records']
        [{'name': 'Alice', 'age': '30'}]
    """
    # First, extract binary sections
    text, binaries = _extract_binary_sections(text)

    result: HsvDocument = {
        'header': None,
        'records': []
    }

    # Find all message blocks (SOH...ETX or STX...ETX)
    i = 0
    while i < len(text):
        # Check for SOH (header start)
        if text[i] == SOH:
            # Find STX (header end, data start)
            stx_pos = text.find(STX, i + 1)
            if stx_pos == -1:
                i += 1
                continue

            # Parse header
            header_content = text[i + 1:stx_pos]
            result['header'] = _parse_object(header_content, binaries)

            # Find ETX (data end)
            etx_pos = text.find(ETX, stx_pos + 1)
            if etx_pos == -1:
                i = stx_pos + 1
                continue

            # Parse data block
            data_content = text[stx_pos + 1:etx_pos]
            # Split by FS respecting nesting
            for record in _split_respecting_nesting(data_content, FS):
                obj = _parse_object(record, binaries)
                if obj:
                    result['records'].append(obj)

            i = etx_pos + 1

        # Check for STX (data block without header)
        elif text[i] == STX:
            etx_pos = text.find(ETX, i + 1)
            if etx_pos == -1:
                i += 1
                continue

            data_content = text[i + 1:etx_pos]
            # Split by FS respecting nesting
            for record in _split_respecting_nesting(data_content, FS):
                obj = _parse_object(record, binaries)
                if obj:
                    result['records'].append(obj)

            i = etx_pos + 1
        else:
            i += 1

    return result


# Convenience functions

def parse_records(text: str) -> List[HsvObject]:
    """Parse HSV text and return only the records (ignoring header)."""
    return parse(text)['records']


def parse_header(text: str) -> Optional[HsvObject]:
    """Parse HSV text and return only the header (ignoring records)."""
    return parse(text)['header']


if __name__ == '__main__':
    # Run tests when executed directly
    import sys

    def test_basic():
        test = f"{STX}name{US}Alice{RS}age{US}30{ETX}"
        result = parse(test)
        assert result['records'] == [{'name': 'Alice', 'age': '30'}]
        print("✓ Basic parsing")

    def test_multiple_records():
        test = f"{STX}name{US}Alice{FS}name{US}Bob{ETX}"
        result = parse(test)
        assert result['records'] == [{'name': 'Alice'}, {'name': 'Bob'}]
        print("✓ Multiple records")

    def test_arrays():
        test = f"{STX}tags{US}a{GS}b{GS}c{ETX}"
        result = parse(test)
        assert result['records'] == [{'tags': ['a', 'b', 'c']}]
        print("✓ Array values")

    def test_header():
        test = f"{SOH}hsv{US}1.0{RS}type{US}users{STX}name{US}Alice{ETX}"
        result = parse(test)
        assert result['header'] == {'hsv': '1.0', 'type': 'users'}
        assert result['records'] == [{'name': 'Alice'}]
        print("✓ SOH header")

    def test_nesting():
        test = f"{STX}user{US}{SO}name{US}Alice{RS}email{US}a@b.com{SI}{ETX}"
        result = parse(test)
        assert result['records'] == [{'user': {'name': 'Alice', 'email': 'a@b.com'}}]
        print("✓ SO/SI nesting")

    def test_deep_nesting():
        test = f"{STX}data{US}{SO}level1{US}{SO}level2{US}deep{SI}{SI}{ETX}"
        result = parse(test)
        assert result['records'] == [{'data': {'level1': {'level2': 'deep'}}}]
        print("✓ Deep nesting")

    def test_binary_mode():
        binary_data = f"raw{STX}data{ETX}here"
        test = f"{STX}type{US}image{RS}data{US}{DLE}{STX}{binary_data}{DLE}{ETX}{ETX}"
        result = parse(test)
        assert result['records'][0]['type'] == 'image'
        assert result['records'][0]['data'] == binary_data
        print("✓ DLE binary mode")

    def test_binary_with_dle():
        binary_data = f"has{DLE}dle"
        escaped = binary_data.replace(DLE, DLE + DLE)
        test = f"{STX}data{US}{DLE}{STX}{escaped}{DLE}{ETX}{ETX}"
        result = parse(test)
        assert result['records'][0]['data'] == binary_data
        print("✓ DLE escaping")

    def test_newlines():
        test = f"{STX}text{US}line1\nline2\nline3{ETX}"
        result = parse(test)
        assert result['records'] == [{'text': 'line1\nline2\nline3'}]
        print("✓ Newlines in values")

    def test_quotes():
        test = f'{STX}msg{US}He said "hello"{ETX}'
        result = parse(test)
        assert result['records'] == [{'msg': 'He said "hello"'}]
        print("✓ Quotes (no escaping)")

    def test_mixed_content():
        test = f"ignored{STX}name{US}Alice{ETX}also ignored"
        result = parse(test)
        assert result['records'] == [{'name': 'Alice'}]
        print("✓ Mixed content (ignores outside STX...ETX)")

    def test_multiple_blocks():
        test = f"{STX}a{US}1{ETX}junk{STX}b{US}2{ETX}"
        result = parse(test)
        assert result['records'] == [{'a': '1'}, {'b': '2'}]
        print("✓ Multiple blocks")

    def test_nested_array():
        test = f"{STX}user{US}{SO}name{US}Alice{RS}tags{US}admin{GS}user{SI}{ETX}"
        result = parse(test)
        assert result['records'] == [{'user': {'name': 'Alice', 'tags': ['admin', 'user']}}]
        print("✓ Nested structure with array")

    def test_complex():
        test = (
            f"{SOH}hsv{US}1.0{RS}type{US}complex{STX}"
            f"user{US}{SO}name{US}Alice{RS}tags{US}admin{GS}active{SI}{FS}"
            f"user{US}{SO}name{US}Bob{RS}tags{US}user{SI}"
            f"{ETX}"
        )
        result = parse(test)
        assert result['header'] == {'hsv': '1.0', 'type': 'complex'}
        assert len(result['records']) == 2
        assert result['records'][0]['user']['name'] == 'Alice'
        assert result['records'][0]['user']['tags'] == ['admin', 'active']
        assert result['records'][1]['user']['name'] == 'Bob'
        print("✓ Complex combination")

    print("=" * 50)
    print("HSV Parser Tests (Python)")
    print("=" * 50)

    test_basic()
    test_multiple_records()
    test_arrays()
    test_header()
    test_nesting()
    test_deep_nesting()
    test_binary_mode()
    test_binary_with_dle()
    test_newlines()
    test_quotes()
    test_mixed_content()
    test_multiple_blocks()
    test_nested_array()
    test_complex()

    print("=" * 50)
    print("All tests passed!")
    print("=" * 50)