
Core Engineering Components

The core module contains the foundational engineering infrastructure that powers PVGIS. While the algorithms implement the solar science, and the core API and [CLI](cli.md) provide the user-facing interfaces, the core module handles essential cross-cutting concerns such as data model generation, caching, hashing, and context management.

What's in Core?

The core module includes:

  • Data Model Factory: Dynamic generation of Pydantic models from YAML definitions
  • Context Builder: Structured output generation and verbosity management
  • Caching System: Performance optimization through memoization
  • Hashing Utilities: Data fingerprinting for reproducibility and cache keys
  • Type Definitions: Shared type hints and validation schemas

These components operate behind the scenes, ensuring type safety, performance, and maintainability across the entire PVGIS codebase.

Design Philosophy

Core components follow these principles:

  • Separation of concerns: Scientific domain logic (algorithms) stays independent of the infrastructure (core) and the interfaces (API, CLI, Web API)
  • Type safety: Pydantic validation catches errors before calculations run
  • Performance: Caching and efficient data structures minimize redundant computation
  • Extensibility: Factory patterns enable adding new models without code duplication

Key Components

Data Model Factory

Transforms YAML definitions into runtime Pydantic classes, enabling:

  • Centralized model definitions maintained by domain experts
  • Automatic validation of calculation inputs and outputs
  • Consistent structure across API, CLI, and Web API interfaces

See Data Model for detailed documentation.
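
The exact YAML schema and factory interface are documented in the Data Model pages. As a rough illustration of the underlying technique only, the sketch below builds a Pydantic model dynamically from a YAML snippet with pydantic.create_model; the YAML layout, field names, and type map are hypothetical and do not reflect PVGIS' actual definition files or factory API.

```
# Hypothetical sketch: dynamic Pydantic model creation from a YAML definition.
# The YAML layout, field names, and type map below are illustrative only.
import yaml
from pydantic import create_model

definition = yaml.safe_load("""
name: SolarAltitude
fields:
  value: float
  unit: str
""")

# Map declared field types to the (type, default) pairs expected by create_model;
# `...` marks a field as required.
type_map = {"float": (float, ...), "str": (str, ...)}
fields = {name: type_map[kind] for name, kind in definition["fields"].items()}

SolarAltitude = create_model(definition["name"], **fields)
print(SolarAltitude(value=42.0, unit="degrees"))  # validated instance
```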

Context Builder

Manages output generation based on verbosity levels and user requirements (see the sketch after this list):

  • Reads output structure definitions from YAML
  • Evaluates conditional sections (e.g., metadata only at high verbosity)
  • Constructs nested dictionaries ready for JSON/CSV/terminal output
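
A minimal, hypothetical sketch of the idea follows; the structure layout and the min_verbosity key are invented for illustration and are not the actual ContextBuilder API.

```
# Hypothetical sketch of verbosity-gated output assembly. The structure layout
# and the `min_verbosity` key are illustrative, not PVGIS' actual YAML schema.
def build_context(structure: dict, values: dict, verbose: int = 0) -> dict:
    """Keep only the sections whose verbosity condition is satisfied."""
    context = {}
    for section, spec in structure.items():
        if verbose >= spec.get("min_verbosity", 0):
            context[section] = {field: values.get(field) for field in spec["fields"]}
    return context

structure = {
    "results": {"fields": ["solar altitude"], "min_verbosity": 0},
    "metadata": {"fields": ["fingerprint"], "min_verbosity": 2},
}
values = {"solar altitude": 0.71, "fingerprint": "a1b2c3"}
print(build_context(structure, values, verbose=1))  # metadata omitted at low verbosity
```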

Caching and Hashing

Optimizes repeated calculations through the mechanisms below (a brief sketch follows the list):

  • Content-based caching: Hash inputs to detect identical calculations
  • Memory management: Configurable cache sizes and eviction policies
  • Reproducibility: Fingerprints enable tracking data provenance
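
The sketch below illustrates the content-based caching idea with a hand-rolled key; the helper is illustrative only. PVGIS' own implementations live in the caching and hashing modules documented further down.

```
# Illustrative only: identical inputs hash to the same key, so a repeated
# calculation can be answered from the cache instead of being recomputed.
import hashlib
import json

def cache_key(**inputs) -> str:
    """Build a deterministic fingerprint of the keyword arguments."""
    payload = json.dumps(inputs, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()

cache: dict[str, float] = {}
key = cache_key(lat=45.0, lon=8.0, timestamp="2025-01-01")
if key not in cache:
    cache[key] = 0.71  # stand-in for an expensive calculation
print(cache[key])
```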

Usage Pattern

Core components are typically not imported directly by users. Instead, they're used internally by API functions:

The user calls an API function:

```
from pvgisprototype.api import calculate_solar_position

result = calculate_solar_position(lat=45.0, lon=8.0, timestamp="2025-01-01")
```

Behind the scenes:

  1. DataModelFactory creates SolarPosition model
  2. Calculation runs and validates output
  3. ContextBuilder generates structured result
  4. Caching stores result for future identical requests

Source Code Reference

core

Modules:

Name Description
array_methods
arrays
caching
data_model
factory
hashing

array_methods

Functions:

Name Description
create_array_method

Helper function to create an instance with an empty array

fill_array_method

Helper function to fill an existing instance's array in place

create_array_method

create_array_method(
    self,
    shape,
    dtype: str = DATA_TYPE_DEFAULT,
    init_method: bool | int | float | str = "zeros",
    backend: str = "numpy",
    use_gpu: bool = False,
) -> Any

Helper function to create an instance with an empty array

Source code in pvgisprototype/core/array_methods.py
def create_array_method(
    self,
    shape,
    dtype: str = DATA_TYPE_DEFAULT,
    init_method: bool | int | float | str = "zeros",
    backend: str = "numpy",
    use_gpu: bool = False,
) -> Any:
    """Helper function to create an instance with an empty array"""
    return self(
        value=create_array(
            shape=shape,
            dtype=dtype,
            init_method=init_method,
            backend=backend,
            use_gpu=use_gpu,
        )
    )

fill_array_method

fill_array_method(
    self,
    shape,
    dtype: str = DATA_TYPE_DEFAULT,
    init_method: bool | int | float | str = "zeros",
    backend: str = "numpy",
    use_gpu: bool = False,
) -> Any

Helper function to fill an existing instance's array in place

Source code in pvgisprototype/core/array_methods.py
def fill_array_method(
    # cl_ss,
    self,
    shape,
    dtype: str = DATA_TYPE_DEFAULT,
    init_method: bool | int | float | str = "zeros",
    backend: str = "numpy",
    use_gpu: bool = False,
) -> Any:
    """Helper function to create an instance with an empty array"""
    # return cl_ss(
    #     value=create_array(
    #         shape=shape,
    #         dtype=dtype,
    #         init_method=init_method,
    #         backend=backend,
    #         use_gpu=use_gpu,
    #     )
    # )
    self.value = create_array(
        shape=shape,
        dtype=dtype,
        init_method=init_method,
        backend=backend,
        use_gpu=use_gpu,
    )

arrays

Classes:

Name Description
ArrayDType
NDArrayBackend

Supported dense array backends.

Functions:

Name Description
create_array

Create an array with given shape, data type, initialization method, backend, and optional GPU usage.

ArrayDType

Bases: Enum

Methods:

Name Description
from_string

Return the corresponding dtype object from a string.

from_string classmethod

from_string(dtype_str)

Return the corresponding dtype object from a string.

Source code in pvgisprototype/core/arrays.py
@classmethod
def from_string(cls, dtype_str):
    """Return the corresponding dtype object from a string."""
    try:
        return cls[dtype_str.upper()].value
    except KeyError:
        raise ValueError(
            f"Invalid dtype. Supported types are: {list(cls.__members__.keys())}"
        )
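
A usage sketch, assuming the enum defines a FLOAT32 member (consistent with the 'float32' default used elsewhere in this module):

```
from pvgisprototype.core.arrays import ArrayDType

# Assumes a FLOAT32 member exists; an unknown name raises ValueError
# listing the supported types.
dtype = ArrayDType.from_string("float32")
```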

NDArrayBackend

Bases: Enum

Supported dense array backends.

Methods:

Name Description
default

Return the default array backend.

from_gpu_flag

Select array backend based on whether GPU is used.

from_object

Determine the array backend associated with obj.

module

Return the Python module associated with an array backend.

type

Return the array type associated with the backend.

default classmethod

default() -> NDArrayBackend

Return the default array backend.

Source code in pvgisprototype/core/arrays.py
@classmethod
def default(cls) -> "NDArrayBackend":
    """Return the default array backend."""
    return cls.NUMPY

from_gpu_flag classmethod

from_gpu_flag(use_gpu: bool) -> NDArrayBackend

Select array backend based on whether GPU is used.

Source code in pvgisprototype/core/arrays.py
@classmethod
def from_gpu_flag(cls, use_gpu: bool) -> "NDArrayBackend":
    """Select array backend based on whether GPU is used."""
    return cls.CUPY if use_gpu and CUPY_ENABLED else cls.NUMPY

from_object classmethod

from_object(obj) -> NDArrayBackend

Determine the array backend associated with obj.

Source code in pvgisprototype/core/arrays.py
@classmethod
def from_object(cls, obj) -> "NDArrayBackend":
    """Determine the array backend associated with `obj`."""
    if obj is not None:
        for backend in cls:
            if isinstance(obj, backend.type()):
                return backend
    raise ValueError(f"No known array type to match {obj}.")

module

module(linear_algebra: bool = False) -> ModuleType

Return the Python module associated with an array backend.

Parameters:

  • linear_algebra (bool): If True, return the linear algebra submodule. Default: False.
Source code in pvgisprototype/core/arrays.py
def module(self, linear_algebra: bool = False) -> types.ModuleType:
    """
    Return the Python module associated with an array backend.

    Parameters
    ----------
    linear_algebra: bool
        If True, return the linear algebra submodule.
    """
    if self == NDArrayBackend.NUMPY:
        module = numpy
        linalg_module = module.linalg
    elif self == NDArrayBackend.DASK:
        module = dask.array
        linalg_module = module.linalg
    elif self == NDArrayBackend.CUPY and CUPY_ENABLED:
        module = cupy
        linalg_module = module.linalg if module is not None else None
    else:
        raise ValueError(f"No known module for {self.name}.")
    return linalg_module if linear_algebra else module

type

type() -> type

Return the array type associated with the backend.

Source code in pvgisprototype/core/arrays.py
def type(self) -> type:
    """Return the array type associated with the backend."""
    if self == NDArrayBackend.NUMPY:
        return numpy.ndarray
    elif self == NDArrayBackend.DASK:
        import dask.array
        return dask.array.core.Array
    elif self == NDArrayBackend.CUPY and CUPY_ENABLED:
        return cupy.ndarray
    else:
        raise ValueError(f"No known array type for {self.name}.")

create_array

create_array(
    shape,
    dtype: str = DATA_TYPE_DEFAULT,
    init_method: bool | int | float | str = "zeros",
    backend: str = "numpy",
    use_gpu: bool = False,
)

Create an array with given shape, data type, initialization method, backend, and optional GPU usage.

Parameters:

  • shape (tuple): Shape of the array. Required.
  • dtype (str): Desired data-type for the array as a string. Default: DATA_TYPE_DEFAULT ('float32').
  • init_method (bool | int | float | str): Method to initialize the array. Options are 'zeros', 'ones', 'empty', and 'unset', or a specific numeric or boolean fill value. Default: 'zeros'.
  • backend (str): The array backend to use. Options are 'numpy', 'cupy', and 'dask'. Default: 'numpy'.
  • use_gpu (bool): If True, use GPU-accelerated arrays (CuPy) if available, overriding the backend choice. Default: False.

Returns:

  • ndarray: An array initialized as specified.
Source code in pvgisprototype/core/arrays.py
def create_array(
    shape,
    dtype: str = DATA_TYPE_DEFAULT,
    init_method: bool | int | float | str = "zeros",
    backend: str = "numpy",
    use_gpu: bool = False,
):
    """
    Create an array with given shape, data type, initialization method, backend, and optional GPU usage.

    Parameters
    ----------
    shape : tuple
        Shape of the array.
    dtype : str, optional
        Desired data-type for the array as a string. Default is 'float32'.
    init_method : str, optional
        Method to initialize the array. Options are 'zeros', 'ones', 'empty', and 'unset'. Default is 'zeros'.
    backend : str, optional
        The array backend to use. Options are 'numpy', 'cupy', and 'dask'. Default is 'numpy'.
    use_gpu : bool, optional
        If True, use GPU-accelerated arrays (CuPy) if available, overriding the backend choice. Default is False.

    Returns
    -------
        ndarray: An array initialized as specified.

    """
    backend = backend.upper()

    # Get the actual dtype object from the string
    dtype_obj = ArrayDType.from_string(dtype)

    # Override backend if GPU is requested and CuPy is available
    if use_gpu and CUPY_ENABLED:
        array_backend = NDArrayBackend.CUPY
    # Handle backend selection
    else:
        if backend not in NDArrayBackend.__members__:
            raise ValueError(
                f"Invalid backend. Choose among {list(NDArrayBackend.__members__.keys())}."
            )
        array_backend = NDArrayBackend[backend.upper()]

    array_module = array_backend.module()

    # Select the initialization method
    if isinstance(init_method, bool):  # check bool before int/float: bool is a subclass of int
        array = array_module.full(shape, init_method, dtype=bool)
    elif isinstance(init_method, (int, float)):  # User-requested value !
        array = array_module.full(shape, init_method, dtype=dtype_obj)
    elif init_method == "unset":
        array = array_module.full(shape, init_method, dtype='U5')
    elif init_method == "zeros":
        array = array_module.zeros(shape, dtype=dtype_obj)
    elif init_method == "ones":
        array = array_module.ones(shape, dtype=dtype_obj)
    elif init_method == "empty":
        array = array_module.empty(shape, dtype=dtype_obj)
    # elif isinstance(init_method, str):  # Handle arbitrary string initialization
    #     if dtype_obj != numpy.str_ and dtype_obj != numpy.object_:
    #         raise ValueError("String initialization requires dtype to be 'str' or 'object'.")
    #     array = array_module.full(shape, init_method, dtype=dtype_obj)
    else:
        raise ValueError(
            "Invalid initialization method. Choose from 'zeros', 'ones', 'empty', 'unset' or provide a specific numeric or boolean value."
        )

    return array
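
A brief usage sketch based on the signature and docstring above:

```
from pvgisprototype.core.arrays import create_array

# A 3x4 array of zeros on the default NumPy backend
zeros = create_array(shape=(3, 4), dtype="float32", init_method="zeros")

# A constant-valued array: a numeric init_method fills the array with that value
filled = create_array(shape=(2, 2), init_method=273.15)
```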

caching

Functions:

Name Description
clear_request_caches

Clear all caches for the current request

custom_cached

Backwards compatible per-request thread-safe LRU cache with TTL expiration.

generate_custom_hashkey

Generate a custom hash key for the given arguments and keyword arguments.

get_request_cache_registry

Get or create the current request's cache registry

inspect_cache_registry

Inspect the content of all cache memories in a cache registry

make_object_hashable

Convert unhashable objects to hashable representations.

register_cache

Register a cache memory in the thread-local cache registry

clear_request_caches

clear_request_caches()

Clear all caches for the current request

Source code in pvgisprototype/core/caching.py
def clear_request_caches():
    """Clear all caches for the current request"""
    if hasattr(_thread_local_storage, 'cache_registry'):
        registry = _thread_local_storage.cache_registry
        request_id = get_request_id()
        total_hits = 0
        total_misses = 0
        total_caches = len(registry)

        for cache in registry:
            try:
                if hasattr(cache, 'cache_info'):
                    info = cache.cache_info()
                    total_hits += info['lru_info'].hits
                    total_misses += info['lru_info'].misses

                if hasattr(cache, 'cache_clear'):
                    cache.cache_clear()
            except Exception as e:
                logger.warning(f"Error clearing cache: {e}")

        # Log performance summary
        total_requests = total_hits + total_misses
        hit_rate = (total_hits / total_requests * 100) if total_requests > 0 else 0

        logger.info(
            f"Request {request_id} cache summary: "
            f"{total_caches} caches, {total_hits} hits, {total_misses} misses, "
            f"{hit_rate:.1f}% hit rate"
        )

        # Clear the registry
        _thread_local_storage.cache_registry = []
        _thread_local_storage.request_id = 'unknown'

custom_cached

custom_cached(func)

Backwards compatible per-request thread-safe LRU cache with TTL expiration. Usage is exactly the same as your existing decorator. TTL is internally configurable via 'PVGIS_CACHE_TTL_SECONDS' env variable (default 300s).

Source code in pvgisprototype/core/caching.py
def custom_cached(func):
    """
    Backwards compatible per-request thread-safe LRU cache with TTL expiration.
    Usage is exactly the same as your existing decorator.
    TTL is internally configurable via 'PVGIS_CACHE_TTL_SECONDS' env variable (default 300s).
    """
    ttl = DEFAULT_TTL_SECONDS
    ttl_hash_gen = _ttl_hash_gen(ttl)

    def get_or_create_cache():
        cache_attr = f"_cache_{func.__name__}_{id(func)}"

        if not hasattr(_thread_local_storage, cache_attr):
            # LRUCache as backing cache store
            cache_memory = LRUCache(maxsize=CACHE_MAXSIZE)
            setattr(_thread_local_storage, cache_attr, cache_memory)
            # Register cache for per-request cleanup
            registry = getattr(_thread_local_storage, 'cache_registry', None)
            if registry is None:
                registry = []
                setattr(_thread_local_storage, 'cache_registry', registry)
            if cache_memory not in registry:
                registry.append(cache_memory)

            request_id = getattr(_thread_local_storage, 'request_id', 'unknown')
            logger.debug(f"Created cache for {func.__name__} in request {request_id}, TTL={ttl}s, maxsize={CACHE_MAXSIZE}")

        return getattr(_thread_local_storage, cache_attr)

    @wraps(func)
    def wrapper(*args, **kwargs):
        cache_memory = get_or_create_cache()

        # Compute TTL hash to invalidate cache every ttl seconds
        ttl_hash = next(ttl_hash_gen)

        # Generate composite key: (ttl_hash, your original key)
        # Use your existing generate_custom_hashkey to maintain compatible key hashing
        from pvgisprototype.core.caching import generate_custom_hashkey  # adjust import as needed
        key_inner = generate_custom_hashkey(*args, **kwargs)
        key = (ttl_hash, key_inner)

        if key in cache_memory:
            request_id = getattr(_thread_local_storage, 'request_id', 'unknown')
            logger.debug(f"Cache HIT for {func.__name__} in request {request_id} (ttl_hash={ttl_hash})")
            return cache_memory[key]

        # Cache miss: call function and store result
        result = func(*args, **kwargs)
        cache_memory[key] = result

        request_id = getattr(_thread_local_storage, 'request_id', 'unknown')
        logger.debug(f"Cache MISS for {func.__name__} in request {request_id} (ttl_hash={ttl_hash})")

        return result

    return wrapper
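
A minimal usage sketch; the decorated function below is illustrative:

```
from pvgisprototype.core.caching import custom_cached

@custom_cached
def slow_square(x: int) -> int:
    print("computing ...")  # printed only on a cache miss
    return x * x

slow_square(4)  # miss: the function body runs and the result is stored
slow_square(4)  # hit: served from the per-request cache until the TTL expires
```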

generate_custom_hashkey

generate_custom_hashkey(*args, **kwargs)

Generate a custom hash key for the given arguments and keyword arguments.

Returns:

  • hashkey: The hash key for the given arguments and keyword arguments.
Source code in pvgisprototype/core/caching.py
def generate_custom_hashkey(*args, **kwargs):
    """
    Generate a custom hash key for the given arguments and keyword arguments.

    Returns
    -------
    hashkey: The hash key for the given arguments and keyword arguments.
    """
    args_hashed = tuple(make_object_hashable(argument) for argument in args)

    kwargs_hashed = {key: make_object_hashable(value) for key, value in kwargs.items()}

    return hashkey(*args_hashed, **kwargs_hashed)

get_request_cache_registry

get_request_cache_registry()

Get or create the current request's cache registry

Source code in pvgisprototype/core/caching.py
def get_request_cache_registry():
    """Get or create the current request's cache registry"""
    if not hasattr(_thread_local_storage, 'cache_registry'):
        _thread_local_storage.cache_registry = []
        _thread_local_storage.request_id = generate_request_id()
        logger.debug(f"Created new request cache registry for {_thread_local_storage.request_id}")
    return _thread_local_storage.cache_registry

inspect_cache_registry

inspect_cache_registry(registry=None)

Inspect the content of all cache memories in a cache registry

Source code in pvgisprototype/core/caching.py
def inspect_cache_registry(registry=None):
    """Inspect the content of all cache memories in a cache registry"""
    if registry is None:
        registry = get_request_cache_registry()

    cache_states = {}
    for index, cache_func in enumerate(registry):
        try:
            if hasattr(cache_func, 'cache_info'):
                info = cache_func.cache_info()
                cache_states[f"cache_{index}"] = {
                    "function": info.get('function', 'unknown'),
                    "hits": info['lru_info'].hits,
                    "misses": info['lru_info'].misses,
                    "currsize": info['lru_info'].currsize,
                    "maxsize": info.get('maxsize', 'unknown'),
                    "ttl_seconds": info.get('ttl_seconds', 'unknown')
                }
            else:
                cache_states[f"cache_{index}"] = "Cache info not available"
        except Exception as e:
            cache_states[f"cache_{index}"] = f"Error getting cache info: {e}"

    return cache_states

make_object_hashable

make_object_hashable(object)

Convert unhashable objects to hashable representations. Uses generate_hash() for complex objects that can't be hashed directly.

Source code in pvgisprototype/core/caching.py
def make_object_hashable(object):
    """
    Convert unhashable objects to hashable representations.
    Uses generate_hash() for complex objects that can't be hashed directly.
    """
    try:
        # Try to hash the object directly first
        hash(object)
        logger.debug(f"Object {object} is hashable.")
        return object
    except TypeError:
        # If it's unhashable, use our custom generate_hash function
        logger.debug(f"Object {object} is unhashable.")
        return generate_hash(object)
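
For illustration of the behaviour described above:

```
from pvgisprototype.core.caching import make_object_hashable

make_object_hashable(42)             # already hashable, returned unchanged
make_object_hashable({"lat": 45.0})  # unhashable dict, replaced by its generate_hash() digest
```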

register_cache

register_cache(cache, registry=None)

Register a cache memory in the thread-local cache registry

Source code in pvgisprototype/core/caching.py
def register_cache(cache, registry=None):
    """Register a cache memory in the thread-local cache registry"""
    if registry is None:
        registry = get_request_cache_registry()

    if cache not in registry:
        registry.append(cache)
        request_id = get_request_id()
        logger.debug(f"Cache registered for request {request_id} (registry size: {len(registry)})")
    return cache

data_model

Modules:

Name Description
generate
graph
inspect_data_model
yaml_definition_files

Important Note !

generate

Functions:

Name Description
callback_reset_python_data_model_definitions
main

Build and write Python data models from YAML definitions.

callback_reset_python_data_model_definitions

callback_reset_python_data_model_definitions(
    ctx: Context, reset_definitions: bool
)
Source code in pvgisprototype/core/data_model/generate.py
def callback_reset_python_data_model_definitions(
    ctx: Context,
    reset_definitions: bool,
):
    """
    """
    # print(ctx.params)
    if not reset_definitions:
        return
    else:
        output_file = ctx.params.get('output_file')
        verbose = ctx.params.get('verbose')
        print(f"Reset the Python definition dictionary in {output_file} an empty one !")
        reset_python_data_model_definitions(
            output_file=output_file,
            verbose=verbose,
        )
        raise typer.Exit()

main

main(
    source_path: Annotated[
        Path,
        Option(
            help="Source directory with YAML data model descriptions"
        ),
    ] = Path("definitions.yaml"),
    definitions: Annotated[
        List[str], typer_list_of_yaml_files
    ] = PVGIS_DATA_MODEL_YAML_DEFINITION_FILES,
    output_file: Annotated[
        Path, Option(help="Output file", is_eager=True)
    ] = Path("definitions.py"),
    verbose: Annotated[bool, Option(help="Verbose")] = False,
    log_file: Annotated[
        str | None,
        Option("--log-file", "-l", help="Log file"),
    ] = LOG_FILE,
    log_level: str = LOG_LEVEL,
    rich_handler: Annotated[
        bool,
        Option("--rich", "--no-rich", help="Rich handler"),
    ] = RICH_HANDLER,
    reset_definitions: Annotated[
        bool, typer_option_reset_definitions
    ] = False,
)

Build and write Python data models from YAML definitions.

Source code in pvgisprototype/core/data_model/generate.py
@app.command()
def main(
    source_path: Annotated[Path, typer.Option(help="Source directory with YAML data model descriptions")] = Path("definitions.yaml"),
    definitions: Annotated[List[str], typer_list_of_yaml_files] = PVGIS_DATA_MODEL_YAML_DEFINITION_FILES,
    output_file: Annotated[Path, typer.Option(help='Output file', is_eager=True)] = Path("definitions.py"),
    verbose: Annotated[bool, typer.Option(help="Verbose")] = False,
    log_file: Annotated[str | None, typer.Option("--log-file", "-l",help="Log file")] = LOG_FILE,
    log_level: str = LOG_LEVEL,
    rich_handler: Annotated[bool, typer.Option("--rich", "--no-rich", help="Rich handler")] = RICH_HANDLER,
    reset_definitions: Annotated[bool, typer_option_reset_definitions] = False, # I am a callback  function !
):
    """
    Build and write Python data models from YAML definitions.
    """
    #  Initialize logging
    setup_factory_logger(
            verbose=verbose,
            level=log_level,
            file=log_file,
            rich_handler=rich_handler,
    )

    try:
        reset_python_data_model_definitions(output_file=output_file, verbose=verbose)
        pvgis_data_models = build_python_data_models(
            source_path=source_path,
            yaml_files=definitions,
            verbose=verbose,
        )
        write_to_python_module(models=pvgis_data_models, output_file=output_file, verbose=verbose)
    except Exception as e:
        logger.exception(f"An error occurred: {e}")
    else:
        if verbose:
            logger.success("Data models successfully generated !")

graph

Modules:

Name Description
build
circular_tree
colors
generate
graphviz_
gravis_
sort

build

Functions:

Name Description
build_dependency_graph

Build a recursive dependency graph from YAML files.

process_model

Process a YAML file and its dependencies

resolve_require_path

Resolve require path (e.g., 'sun/position') to a YAML file (e.g., 'sun/position.yaml').

build_dependency_graph
build_dependency_graph(
    source_path: Path,
    verbose: bool = False,
    log_level: str = "WARNING",
    log_file: Path | None = None,
    rich_handler: bool = False,
) -> DiGraph

Build a recursive dependency graph from YAML files.

Source code in pvgisprototype/core/data_model/graph/build.py
def build_dependency_graph(
    source_path: Path,
    verbose: bool = False,
    log_level: str = "WARNING",
    log_file: Path | None = None,
    rich_handler: bool = False,
) -> nx.DiGraph:
    """
    Build a recursive dependency graph from YAML files.
    """
    # # Only set up logging if not already configured
    # if not any(
    #     handler.levelno <= getattr(logger, log_level.upper())
    #     for handler in logger._core.handlers.values()
    # ):
    #  Initialize logging
    # setup_factory_logger(
    #     verbose=verbose,
    #     level=log_level,
    #     file=log_file,
    #     rich_handler=rich_handler,
    # )

    base_dir = Path(source_path.parts[0])
    logger.debug(f"Base directory : {base_dir=}")
    # base_dir = Path(source_path.parts[0]) if source_path.is_dir() else source_path.parents[1]

    graph = nx.DiGraph()
    visited = {}  # Maps require path -> model name
    queue = deque()

    if source_path.is_file() and source_path.suffix == '.yaml':
        queue.append((source_path.name, source_path))
        logger.debug(f"Data appended to queue which now is {queue=}")

    elif source_path.is_dir():
        yaml_files = source_path.rglob("*.yaml")
        for yaml_file in track(yaml_files, description="Queueing data models for processing\n" ):
            queue.append((yaml_file.name, yaml_file))

    while queue:
        # logger.debug(f"[underline]The queue is now[/underline]\n\n   {queue=}\n")
        req_name, yaml_path = queue.popleft()
        # logger.debug(f"[dim]After poping\n\n  {queue=}[/dim]\n")
        logger.debug(f"In the queue : {req_name=} {yaml_path=}")
        if req_name in visited:
            logger.debug(f"[red dim]{req_name=} already processed ![/red dim]")
            continue  # Already processed

        process_model(
            graph=graph,
            base_dir=base_dir,
            require_path=req_name,
            yaml_path=yaml_path,
            queue=queue,
            visited=visited,
        )

    # logger.debug("Return dependency graph G\n\n{graph}", G=graph)
    # logger.debug(f"Return {graph.nodes()=}")
    return graph
process_model
process_model(
    graph: DiGraph,
    base_dir: Path,
    require_path: str,
    yaml_path: Path,
    queue: deque,
    visited: dict,
)

Process a YAML file and its dependencies

Source code in pvgisprototype/core/data_model/graph/build.py
def process_model(
    graph: nx.DiGraph,
    base_dir: Path,
    require_path: str,
    yaml_path: Path,
    queue: deque,
    visited: dict,
):
    """
    Process a YAML file and its dependencies
    """
    logger.debug(f"Input graph\n   {graph.nodes=}\n   {graph.edges=}\n")
    # logger.debug(
    #     f"Processing\n\n   {graph.nodes()=}\n\n   YAML file {yaml_path=}",
    # )
    logger.debug(
        "Processing YAML file {yaml_path}",
        yaml_path=yaml_path,
        alt=f"Processing YAML file {yaml_path=}"
    )
    with open(yaml_path, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)
        logger.debug(f"Loaded data\n   {data=}")

    model_name = data.get('name')  # required
    visited[require_path] = model_name

    # Merge parent attributes if inheriting
    if 'require' in data:
        for parent_require in track(data['require'], description="Resolving requirements"):
            logger.debug(f"Processing require directive {base_dir=} / {require_path=} = {parent_require=}")
            parent_path = resolve_require_path(base_dir=base_dir, require_path=parent_require)
            logger.debug(f"Path to parent node {parent_path=}")
            if parent_path.exists():
                logger.debug(f"Loading {parent_path=}")
                with open(parent_path, 'r') as pf:
                    parent_data = yaml.safe_load(pf)
                    # Recursive attribute merging
                logger.debug(f"Merging\n\n   {parent_data}\n\nand\n\n   {data=}\n")
                data = deep_merge(
                    base=parent_data,
                    override=data,
                )
                logger.debug(f"Merged\n\n   {data=}\n")

    # Get meaningful attributes
    model_symbol = data.get('symbol', '')
    model_label = data.get('label', '')
    model_label += f" {model_symbol}"
    model_description = data.get('description')  # required
    model_attributes = data.get('sections', '')
    model_color = data.get('color', 'white')

    # Add node with merged attributes
    logger.debug(
            f"Adding {model_name=} to graph"
            )

    # ---------------------------------------------------------------------
    # graph.add_node(
    #     node_for_adding=model_name,
    #     **{key: value for key, value in data.items() if key != "require"},
    #     _source_path=str(yaml_path),
    # )
    # ---------------------------------------------------------------------
    graph.add_node(
        node_for_adding=model_name,
        label=model_label,
        description=model_description,
        symbol=model_symbol,
        attributes=model_attributes,
        color=model_color,
        border_color='lightgray',
        border_size=1,
        hover=model_description,
        click=yaml.dump(data=model_attributes, allow_unicode=True),#, encoding='utf-8'),
    )

    logger.debug(
        f"   [bold dim]Updated graph nodes\n      {graph.nodes[model_name]=}\n"
    )

    # Process dependencies
    requires = data.get('require', [])
    logger.debug(f"[bold blue]Require directives to process[/bold blue]\n\n   {requires=}\n")

    for parent_node in requires:

        logger.debug(f"[bold blue]Processing parent node[/bold blue] {parent_node=}")
        parent_node_path = resolve_require_path(base_dir, parent_node)
        parent_node_label = parent_node_path.parts[-2]

        # logger.debug(f"{parent_node_path.exists()=}")
        if not parent_node_path.exists():
            logger.debug(f"Continue ?")
            continue

        if parent_node not in visited:
            logger.debug(f"[blue dim]Appending to processing queue[/blue dim] {parent_node=}")
            queue.append((parent_node, parent_node_path))
            with open(parent_node_path, 'r', encoding='utf-8') as parent_node_file:
                parent_node = yaml.safe_load(parent_node_file)
            parent_node_name = parent_node.get('name', '<Unnamed node>')

        else:
            logger.debug(f"[blue]Already visited[/blue] {parent_node=}, see\n\n   {visited=}")
            parent_node_name = visited[parent_node]


        # Add edge even if parent not processed yet
        graph.add_edge(
            u_of_edge=model_name,
            v_of_edge=parent_node_name or parent_node,
            label=parent_node_label,
            color='lightgray',
        )
        # logger.debug(f"Updated edges\n      {graph.edges=}\n")
        # logger.debug(f"Nodes after new edge addition\n      {graph.nodes=}\n")

    logger.debug(f"Updated graph\n   {graph.nodes=}\n   {graph.edges=}\n")
resolve_require_path
resolve_require_path(
    base_dir: Path, require_path: str
) -> Path

Resolve require path (e.g., 'sun/position') to a YAML file (e.g., 'sun/position.yaml').

Source code in pvgisprototype/core/data_model/graph/build.py
def resolve_require_path(base_dir: Path, require_path: str) -> Path:
    """
    Resolve require path (e.g., 'sun/position') to a YAML file (e.g., 'sun/position.yaml').
    """
    path = (base_dir / require_path).with_suffix('.yaml')
    # logger.debug(f"[blue]Resolved path to parent node[/blue] {path=}")
    return path
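
Resolution is plain path arithmetic; for example (the base directory name is illustrative, and the import path follows the source location shown above):

```
from pathlib import Path

from pvgisprototype.core.data_model.graph.build import resolve_require_path

resolve_require_path(base_dir=Path("models"), require_path="sun/position")
# -> PosixPath('models/sun/position.yaml')
```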

circular_tree

Functions:

Name Description
visualise_circular_tree
visualise_circular_tree
visualise_circular_tree(
    graph: DiGraph,
    output_file: str = "data_model_dependency_graph",
    node_size: int = 20,
) -> None
Source code in pvgisprototype/core/data_model/graph/circular_tree.py
def visualise_circular_tree(
    graph: nx.DiGraph,
    output_file: str = "data_model_dependency_graph",
    node_size: int = 20,
) -> None:
    """ """
    pos = nx.nx_agraph.graphviz_layout(
        G=graph,
        prog="twopi",
        args="",
    )

    # plt.figure(figsize=(11.69, 11.69))
    plt.figure(figsize=(10, 10))
    nx.draw(
        G=graph,
        pos=pos,
        node_size=node_size,
        alpha=0.5,
        node_color="blue",
        with_labels=False,
    )

    leaf_nodes = [n for n in graph.nodes if graph.out_degree(n) == 0]

    # leaf_nodes = set()
    # parent_nodes = set()
    # parent_paths = {}

    # for u, v, data in graph.edges(data=True):
    #     leaf_nodes.add(u)
    #     parent_nodes.add(v)
    #     parent_paths[v] = data.get('label', 'unknown')

    logger.info(
        "Leaf nodes\n\n{leaf_nodes}",
        # "Leaf nodes\n\n{leaf_nodes}\n\nParent nodes\n\n{parent_nodes}",
        leaf_nodes=leaf_nodes,
        # parent_nodes=parent_nodes,
    )

    # Create labels only for leaf nodes
    # labels = {n: str(n) for n in leaf_nodes}
    labels = {
        # n: n + f"\n{data.get('symbol', '')}"
        node: f"{data.get('symbol', 'Symbol')}"
        for node, data in graph.nodes(data=True)
        if node in leaf_nodes
    }

    # Draw labels for leaf nodes
    nx.draw_networkx_labels(
        G=graph,
        pos=pos,
        labels=labels,
        font_size=7,
        font_color="red",
        # font_weight="bold",
        horizontalalignment='center'
    )

    # Titles
    plt.suptitle(
        t="Data Model Dependency Graph ⎄",
        fontsize=12,
        fontweight='bold',
        color='#2F3131',  #darkgray',
    )

    # plt.title(
    #     label="Lighter Colors = Path Hierarchy | Salmon = Leaf Node",
    #     fontsize=9,
    #     color='#2F4F4F',  #'darkgray',
    #     # pad=10
    # )

    # plt.axis("off")
    plt.axis("equal")
    # plt.tight_layout(rect=[0, 0, 1, 0.98])

    # Save
    output_file += ".png"
    plt.savefig(output_file, dpi=300, bbox_inches='tight')
    plt.close()
    logger.info(f"Graph saved to {output_file}")

colors

Functions:

Name Description
generate_color_from_path

Generate a visually distinct color for a given path.

generate_color_from_path
generate_color_from_path(path: str, max_levels: int = 10)

Generate a visually distinct color for a given path. - Root component determines the base hue. - Each subpath level modifies saturation and lightness.

Source code in pvgisprototype/core/data_model/graph/colors.py
def generate_color_from_path(path: str, max_levels: int = 10):
    """
    Generate a visually distinct color for a given path.
    - Root component determines the base hue.
    - Each subpath level modifies saturation and lightness.
    """
    components = path.split("/")
    root = components[0]

    # Generate a consistent base hue from root
    hash_obj = hashlib.md5(root.encode())
    hue = int(hash_obj.hexdigest(), 16) / (16**32)  # Normalize to [0,1]

    # Lighter base lightness and subtle contrast with depth
    base_lightness = 0.85
    lightness = base_lightness - 0.02 * min(len(components), max_levels)

    # Slightly increase saturation with depth for contrast
    saturation = 0.4 + 0.03 * min(len(components), max_levels)

    # Convert to RGB
    r, g, b = colorsys.hls_to_rgb(hue, lightness, saturation)
    return (r, g, b)
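
For example (illustrative paths; the import path follows the source location shown above):

```
from pvgisprototype.core.data_model.graph.colors import generate_color_from_path

# Paths sharing the same root component get the same base hue; deeper paths
# become slightly darker and more saturated. Both calls return (r, g, b)
# floats in the range [0, 1].
generate_color_from_path("sun/position")
generate_color_from_path("sun/position/altitude")
```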

generate

Functions:

Name Description
generate_circular_tree
generate_graph
generate_gravis_d3
generate_hierarchical_graph
generate_circular_tree
generate_circular_tree(
    source_path: Path, node_size: int = 20
) -> None
Source code in pvgisprototype/core/data_model/graph/generate.py
def generate_circular_tree(
    source_path: Path,
    # yaml_file: Path,
    node_size: int = 20,
) -> None:
    """
    """
    # Build graph
    graph = build_dependency_graph(
        source_path=source_path,
    )

    # # Topological sort
    # import networkx as nx
    # def topological_sort(G: nx.DiGraph) -> list:
    #     try:
    #         return list(nx.topological_sort(G))
    #     except nx.NetworkXUnfeasible:
    #         raise ValueError("Graph contains a cycle")

    # order = topological_sort(graph)
    # logger.debug("Topological Order:", order)

    # Visualize
    visualise_circular_tree(
        graph=graph,
        node_size=node_size,
    )
generate_graph
generate_graph(
    source_path: Path,
    node_size: int = 2400,
    parent_node_size: int = 1200,
) -> None
Source code in pvgisprototype/core/data_model/graph/generate.py
def generate_graph(
    source_path: Path,
    # yaml_file: Path,
    node_size: int = 2400,
    parent_node_size: int = 1200,
) -> None:
    """
    """
    # Build graph
    graph = build_dependency_graph(
        source_path=source_path,
    )

    # # Topological sort
    # import networkx as nx
    # def topological_sort(G: nx.DiGraph) -> list:
    #     try:
    #         return list(nx.topological_sort(G))
    #     except nx.NetworkXUnfeasible:
    #         raise ValueError("Graph contains a cycle")

    # order = topological_sort(graph)
    # logger.debug("Topological Order:", order)

    # Visualize
    visualise_graph(
        graph=graph,
        node_size=node_size,
        parent_node_size=parent_node_size,
    )
generate_gravis_d3
generate_gravis_d3(
    yaml_file: Path,
    output_file: Path,
    verbose: bool = False,
    log_level: str = "WARNING",
    log_file: Path | None = None,
    rich_handler: bool = False,
) -> None
Source code in pvgisprototype/core/data_model/graph/generate.py
def generate_gravis_d3(
    yaml_file: Path,
    # yaml_file: Path,
    output_file: Path,
    # node_size: int = 2400,
    # parent_node_size: int = 1200,
    verbose: bool = False,
    log_level: str = "WARNING",
    log_file: Path | None = None,
    rich_handler: bool = False,
) -> None:
    """
    """
    graph = build_dependency_graph(
        source_path=yaml_file,
        verbose=verbose,
        log_level=log_level,
        log_file=log_file,
        rich_handler=rich_handler,
    )
    logger.debug(f"{graph.nodes()=}\n{graph.edges()=}")
    if not output_file:
        if yaml_file.is_file():
            output_file = Path(yaml_file.name).with_suffix('.html')
        if yaml_file.is_dir():
            output_file = yaml_file.with_suffix('.html')
    visualise_gravis_d3(
        graph=graph,
        output_file=output_file,
        # node_size=node_size,
        # parent_node_size=parent_node_size,
        # log_level=log_level,
        # log_file=log_file,
        # rich_handler=rich_handler,
    )
generate_hierarchical_graph
generate_hierarchical_graph(source_path: Path) -> None
Source code in pvgisprototype/core/data_model/graph/generate.py
def generate_hierarchical_graph(
    source_path: Path,
) -> None:
    """
    """
    # Build graph
    graph = build_dependency_graph(source_path=source_path)

    # Topological sort
    order = topological_sort(graph)
    logger.debug("Topological Order:", order)

    visualise_hierarchical_graph(graph)

graphviz_

Functions:

Name Description
visualise_graph

Visualize a dependency graph with hierarchical layout

visualise_hierarchical_graph

Visualize with hierarchical layout using graphviz

visualise_graph
visualise_graph(
    graph: DiGraph,
    output_file: str = "data_model_dependency_graph",
    node_size: int = 2400,
    parent_node_size: int = 1200,
) -> None

Visualize a dependency graph with hierarchical layout

Source code in pvgisprototype/core/data_model/graph/graphviz_.py
def visualise_graph(
    graph: nx.DiGraph,
    output_file: str = "data_model_dependency_graph",
    node_size: int = 2400,
    parent_node_size: int = 1200,
) -> None:
    """
    Visualize a dependency graph with hierarchical layout
    """
    leaf_nodes = set()
    parent_nodes = set()
    parent_paths = {}

    for u, v, data in graph.edges(data=True):
        leaf_nodes.add(u)
        parent_nodes.add(v)
        parent_paths[v] = data.get('label', 'unknown')

    logger.info(
        "Leaf nodes\n\n{leaf_nodes}\n\nParent nodes\n\n{parent_nodes}",
        leaf_nodes=leaf_nodes,
        parent_nodes=parent_nodes,
    )

    # Assign node colors
    node_colors = {}

    # Color leaf nodes
    for node in leaf_nodes:
        node_colors[node] = "salmon"

    # Color parent nodes based on path
    for name, path in parent_paths.items():
        node_colors[name] = generate_color_from_path(path)

    # Ensure all nodes have a color
    for node in graph.nodes:
        if node in leaf_nodes:
            node_colors[node] = "salmon"
        elif node in parent_nodes:
            path = parent_paths.get(node, "unknown")
            node_colors[node] = generate_color_from_path(path)
        else:
            # Isolated node (no incoming or outgoing edges)
            node_colors[node] = "salmon"  # Default to leaf

    # Node sizes
    node_sizes = [
        parent_node_size if node in parent_nodes else node_size for node in graph.nodes
    ]

    # Use graphviz_layout for hierarchical layout
    plt.figure(figsize=(32, 11.69))
    # plt.figure(figsize=(11.69,8.27))  # landscape
    position = graphviz_layout(
        G=graph,
        # prog='sfdp',
        # args="-Goverlap=prism100 -Gsep=0.1",
        # prog="dot",  # Use `dot` for top-down hierarchy
        # args="-Granksep=1.5 -Gnodesep=1.0",
        prog='twopi',
        args="-Groot=node_name -Gsize=20 -Gsep=1",
    )
    # position = nx.spring_layout(
    #     G=graph,
    #     seed=42,
    #     k=.7,  # k controls spacing
    #     # weight='weight',
    # )

    # Draw nodes
    nx.draw(
        G=graph,
        pos=position,
        with_labels=False,
        node_size=node_sizes,
        node_color=[node_colors[n] for n in graph.nodes],
        edge_color='lightgray',
        width=0.5,
        alpha=0.7,
        linewidths=0.5,
        # edgecolors='black',
        ax=plt.gca()
    )

    # Draw labels
    # Parent labels excluding nodes that are also leaf nodes
    parent_labels = {
        n: n + f"\n{data.get('symbol', '')}"
        for n in parent_nodes
        if n not in leaf_nodes
    }
    leaf_labels = {
        n: n + f"\n{data.get('symbol', '')}"
        for n, data in graph.nodes(data=True)
        if n in leaf_nodes
    }

    label_pos = {k: (v[0], v[1] + 0.02) for k, v in position.items()}
    # node_symbols = {n: data.get("symbol", "") for n, data in graph.nodes(data=True)}

    nx.draw_networkx_labels(
        G=graph,
        pos=label_pos,
        labels=parent_labels,
        font_size=9,
        font_color="#4D4D4D",  # Gray 30
        font_weight="normal",
        horizontalalignment='center',
        # verticalalignment='bottom',
        ax=plt.gca()
    )
    nx.draw_networkx_labels(
        G=graph,
        pos=position,
        # labels=leaf_labels,
        labels=leaf_labels,
        font_size=10,
        font_color="#666666",
        # font_color='darkgray',
        font_weight='bold',
        horizontalalignment='center',
        # verticalalignment='top',
        ax=plt.gca()
    )

    # Draw edge labels
    edge_labels = {(u, v): data["label"] for u, v, data in graph.edges(data=True)}
    nx.draw_networkx_edge_labels(
        G=graph, 
        pos=position,
        edge_labels=edge_labels,
        font_size=8,
        font_color='blue',
        alpha=0.66,
        bbox=dict(facecolor='white', edgecolor='none', alpha=0.7, pad=1.5),
        label_pos=0.66,
        ax=plt.gca()
    )

    # Titles
    plt.suptitle(
        t="Data ⎄ Model Dependency Graph ⎄",
        fontsize=12,
        fontweight='bold',
        color='#2F3131',  #darkgray',
    )
    plt.title(
        label="Lighter Colors = Path Hierarchy | Salmon = Leaf Node",
        fontsize=9,
        color='#2F4F4F',  #'darkgray',
        # pad=10
    )
    plt.axis("off")
    plt.tight_layout(rect=[0, 0, 1, 0.98])

    # Save
    output_file += ".png"
    plt.savefig(output_file, dpi=300, bbox_inches='tight')
    plt.close()
    logger.info(f"Graph saved to {output_file}")
visualise_hierarchical_graph
visualise_hierarchical_graph(
    graph: dict, output_file: str = "hierarchical_graph"
)

Visualize with hierarchical layout using graphviz

Source code in pvgisprototype/core/data_model/graph/graphviz_.py
def visualise_hierarchical_graph(graph: dict, output_file: str = "hierarchical_graph"):
    """Visualize with hierarchical layout using graphviz"""
    # graphviz.Digraph (not nx.DiGraph) provides the node/edge/render calls used below
    dot = graphviz.Digraph(
        comment="Hierarchical Data Model Dependencies",
        graph_attr={"rankdir": "LR", "splines": "ortho"},
    )

    # Add nodes with color coding
    parent_nodes = set()
    for deps in graph.values():
        parent_nodes.update(deps)

    for node in graph:
        dot.node(
            node,
            label=node,
            _attributes={
                "style": "filled",
                "fillcolor": "lightgreen" if node in parent_nodes else "salmon",
                "shape": "box",
                "width": "1.5" if node in parent_nodes else "1.2",
            },
        )

    # Add edges
    for node, deps in graph.items():
        for dep in deps:
            dot.edge(node, dep)

    dot.render(output_file, format="png", cleanup=True)
    print(f"Hierarchical graph saved to {output_file}.png")

gravis_

Functions:

Name Description
assign_properties
visualise_gravis_d3
assign_properties
assign_properties(g, node_size: int = 15)

Source of this function : https://robert-haas.github.io/gravis-docs/code/examples/external_tools/networkx.html#Example-2

Source code in pvgisprototype/core/data_model/graph/gravis_.py
def assign_properties(
    g,
    node_size: int = 15,
):
    """
    Source of this function : https://robert-haas.github.io/gravis-docs/code/examples/external_tools/networkx.html#Example-2
    """
    logger.debug(f"Post-processing")
    # Centrality calculation
    # node_centralities = nx.eigenvector_centrality(g)
    node_centralities = nx.out_degree_centrality(g)
    logger.debug(f"{node_centralities=}")
    # edge_centralities = nx.edge_betweenness_centrality(g)

    # Community detection
    # communities = nx.algorithms.community.greedy_modularity_communities(g)

    # # Graph properties
    # g.graph["node_border_size"] = 1.5
    # g.graph["node_border_color"] = "white"
    # g.graph["edge_opacity"] = 0.9

    # Node properties: Size by centrality, shape by size, color by community
    # colors = [
    #     "red",
    #     "blue",
    #     "green",
    #     "orange",
    #     "pink",
    #     "brown",
    #     "yellow",
    #     "cyan",
    #     "magenta",
    #     "violet",
    # ]
    for node_id in g.nodes:
        node = g.nodes[node_id]
        node["size"] = node_size + node_centralities[node_id] * 33
visualise_gravis_d3
visualise_gravis_d3(
    graph: DiGraph,
    output_file: Path = Path("data_model_graph.html"),
)
Source code in pvgisprototype/core/data_model/graph/gravis_.py
def visualise_gravis_d3(
    graph: nx.DiGraph,
    output_file: Path = Path("data_model_graph.html"),
    # node_size: int = 2400,
    # parent_node_size: int = 1200,
    # verbose: bool = False,
    # log_level: str = "WARNING",
    # log_file: Path | None = None,
    # rich_handler: bool = False,
):
    """
    """
    assign_properties(graph)
    fig = gv.d3(
        data=graph,
        graph_height=800,
        details_height=200,
        show_details=True,
        show_details_toggle_button=True,
        show_menu=True,
        show_menu_toggle_button=True,
        show_node=True,
        node_size_factor=1.2,
        node_size_data_source="size",
        use_node_size_normalization=False,
        node_size_normalization_min=10.0,
        node_size_normalization_max=50.0,
        node_drag_fix=True,
        node_hover_neighborhood=True,
        node_hover_tooltip=True,
        show_node_image=True,
        node_image_size_factor=1.0,
        show_node_label=True,
        show_node_label_border=True,
        node_label_data_source="label",
        node_label_size_factor=1.5,
        node_label_rotation=33.0,
        node_label_font="Arial",
        show_edge=True,
        edge_size_factor=1.0,
        edge_size_data_source="size",
        use_edge_size_normalization=True,
        edge_size_normalization_min=0.2,
        edge_size_normalization_max=5.0,
        edge_curvature=0.0,
        edge_hover_tooltip=True,
        show_edge_label=True,
        show_edge_label_border=True,
        edge_label_data_source="label",
        edge_label_size_factor=1.7,
        edge_label_rotation=33.0,
        edge_label_font="Arial",
        zoom_factor=0.9,
        large_graph_threshold=500,
        layout_algorithm_active=True,
        # specific for d3
        use_many_body_force=True,
        many_body_force_strength=-500.0,
        many_body_force_theta=0.9,
        use_many_body_force_min_distance=False,
        many_body_force_min_distance=10.0,
        use_many_body_force_max_distance=False,
        many_body_force_max_distance=1000.0,
        use_links_force=True,
        links_force_distance=150.0,
        links_force_strength=0.5,
        use_collision_force=True,
        collision_force_radius=90.0,
        collision_force_strength=0.9,
        use_x_positioning_force=False,
        x_positioning_force_strength=0.2,
        use_y_positioning_force=True,
        y_positioning_force_strength=0.5,
        use_centering_force=True,
    )

    # Export to HTML with embedded data
    fig.export_html(output_file)
    output_file.is_file()

sort

Functions:

Name Description
topological_sort

Perform topological sort using Kahn's algorithm

topological_sort
topological_sort(graph: dict) -> list

Perform topological sort using Kahn's algorithm

Source code in pvgisprototype/core/data_model/graph/sort.py
def topological_sort(graph: dict) -> list:
    """Perform topological sort using Kahn's algorithm"""
    in_degree = defaultdict(int)
    for node in graph:
        for neighbor in graph.get(node, []):
            in_degree[neighbor] += 1

    queue = deque([node for node in graph if in_degree[node] == 0])
    result = []

    while queue:
        node = queue.popleft()
        result.append(node)

        for neighbor in graph.get(node, []):
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    # Check for cycles
    if len(result) < len(graph):
        raise ValueError("Graph has a cycle")

    return result
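
A usage sketch with an illustrative adjacency dictionary, where each model maps to the models it requires:

```
from pvgisprototype.core.data_model.graph.sort import topological_sort

graph = {
    "irradiance": ["sun_position"],
    "sun_position": ["location"],
    "location": [],
}
topological_sort(graph)
# -> ['irradiance', 'sun_position', 'location']
```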

inspect_data_model

Functions:

Name Description
main

Inspect data model definitions including YAML files, Python dictionaries

main

main(
    verbose: Annotated[bool, Option(help="Verbose")] = False,
    log_file: Annotated[
        str | None,
        Option("--log-file", "-l", help="Log file"),
    ] = LOG_FILE,
    log_level: str = LOG_LEVEL,
    rich_handler: Annotated[
        bool,
        Option("--rich", "--no-rich", help="Rich handler"),
    ] = RICH_HANDLER,
)

Inspect data model definitions including YAML files, Python dictionaries and native PVGIS data models.

Source code in pvgisprototype/core/data_model/inspect_data_model.py
@app.callback()
def main(
    verbose: Annotated[bool, typer.Option(help="Verbose")] = False,
    log_file: Annotated[str | None, typer.Option("--log-file", "-l",help="Log file")] = LOG_FILE,
    log_level: str = LOG_LEVEL,
    rich_handler: Annotated[bool, typer.Option("--rich", "--no-rich", help="Rich handler")] = RICH_HANDLER,
):
    """
    Inspect data model definitions including YAML files, Python dictionaries
    and native PVGIS data models.
    """
    if verbose:
        log_level = "DEBUG"
    setup_factory_logger(level=log_level, file=log_file, rich_handler=rich_handler)

yaml_definition_files

Important Note !

The following is an ordered list of files defining PVGIS' native data models in YAML syntax.

Complex models depend on simpler ones, therefore the latter must exist before the former.

If you need to reorder the generation of PVGIS' native data models, handle this list with care: an incorrect order can break the generation of models that depend on ones not yet generated.

Hint

When developing new or refactoring existing YAML data model definitions, things may get messy. To start from scratch, assign an empty dictionary to the PVGIS_DATA_MODEL_YAML_DEFINITION_FILES "constant", like so

```
PVGIS_DATA_MODEL_YAML_DEFINITION_FILES = {}
```

and rerun the script that generates the Python data model definitions.

Happy pv-Hacking !

factory

Modules:

Name Description
context
data_model

This module defines a factory to generate custom data classes (or else models)

definition
log

Set FACTORY_LOG_FILE before running a script that uses this logger to log out

property_functions

Attention

context

Functions:

Name Description
parse_fields

Notes

populate_context

Populate the context of an existing object

parse_fields

parse_fields(
    data_model,
    model_definition,
    fields: list,
    angle_output_units: str = RADIANS,
) -> dict
Notes

The YAML-based definition of a data model includes (expectedly) important attributes used to construct the output, namely :

  • shortname
  • title
  • symbol

These attributes are functionally required for solar position relevant data models :

  • the combination of shortname + symbol or the title are used to check membership in SolarPositionParameterColumnName().

For the output (column names) :

  • the output field title (or name) is composed by the shortname and the symbol
Source code in pvgisprototype/core/factory/context.py
def parse_fields(
    data_model,
    model_definition,
    fields: list,
    angle_output_units: str = RADIANS,
) -> dict:
    """
    Notes
    -----
    The YAML-based definition of a data model includes (expectedly) important
    attributes used to construct the output, namely :

    - shortname
    - title
    - symbol

    These attributes are functionally required for solar position relevant data
    models :

    - the combination of `shortname` + `symbol` or the `title` are used to
      check membership in `SolarPositionParameterColumnName()`.

    For the output (column names) :

    - the output field title (or name) is composed by the `shortname` and the
    `symbol`

    """
    # Get all solar position parameter field names
    solar_position_parameters = set(
        SolarPositionParameterColumnName.__members__.values()
    )

    data_container = OrderedDict()
    data_model_shortname_and_symbol = f"{data_model.shortname} {data_model.symbol}"

    # First, in case the data model is a simple one (i.e. not a nested one)
    if hasattr(data_model, 'value'):
        if data_model_shortname_and_symbol in solar_position_parameters:
            # angular value : convert using the requested `angle_output_units` method
            data_model.value = getattr(data_model, angle_output_units)

    field_value = None
    for field in fields:

        try:
            field_object = getattr(data_model, field)
            field_definition = model_definition.get(field, {})
            # Check if this field is a solar position parameter
            is_solar_position_parameter = (
                field_definition.get("title", None) in solar_position_parameters
            )

            # for all fields, use .value if available
            if hasattr(field_object, "value"):
                field_value = field_object.value

                if is_solar_position_parameter:
                    # if the _object_ has .radians or .degrees implied is an angular quantity
                    attribute = getattr(field_object, angle_output_units)

                    # angular value : convert using the requested `angle_output_units` method
                    if callable(attribute):
                        field_value = (
                            attribute()
                        )  # Actually call .radians() or .degrees()

                    else:
                        field_value = attribute

                else:
                    field_value = field_object.value

            else:
                field_value = field_object

        except AttributeError:
            field_value = None

        field_title = str()
        if field == "value":
            # If shortname + symbol exist, use them !
            if hasattr(data_model, "shortname") and hasattr(data_model, "symbol"):
                field_title = f"{data_model.shortname} {data_model.symbol}"
                # field_title = f"{data_model.shortname}"

        elif field == "fingerprint":
            field_name = model_definition.get(field, {}).get("title", field)
            field_symbol = model_definition.get(field, {}).get("symbol", field)
            field_title = f"{field_name} {field_symbol}"
            field_value = generate_hash(data_model.value)

        else:
            # Get the title for the field from the model definition
            field_title = model_definition.get(field, {}).get("title", field)

        # Add to component content with title as key
        data_container[field_title] = field_value

    return data_container

populate_context

populate_context(
    self,
    verbose=0,
    fingerprint: bool = True,
    angle_output_units: str = RADIANS,
    locals: dict = {},
)

Populate the context of an existing object

Parameters:

  • self (required) : This is a PVGIS Data Model, initially defined and described in YAML syntax, then transformed to a Pydantic Model.
  • verbose (default 0) : Verbosity level from the function's local scope. This is required to compare against the verbose condition set in the data model definition.
  • fingerprint (bool, default True) : True will retrieve the fingerprint from the data model.
  • angle_output_units (str, default RADIANS) : Angular unit for the output data, either 'radians' (default) or 'degrees'.

Notes

See also: data model definitions in YAML syntax under definitions.yaml.

An example of an input self data model is SolarAltitude.
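
A hedged usage sketch follows: solar_altitude stands for an already created instance of a factory-generated data model (for example SolarAltitude); how such an instance is constructed is not shown here.

```python
# `solar_altitude` is assumed to be a factory-generated data model instance
# that carries the populate_context() method and a model_definition attribute.
solar_altitude.populate_context(
    verbose=2,                     # compared against `verbose` conditions in the YAML definition
    fingerprint=True,              # retrieve the fingerprint from the data model
    angle_output_units="degrees",  # report angular values in degrees instead of radians
)

# The structured, ordered result is stored on the instance itself
print(solar_altitude.output)
```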

Source code in pvgisprototype/core/factory/context.py
def populate_context(
    self,
    verbose=0,  
    fingerprint: bool = True,
    angle_output_units: str = RADIANS,
    locals: dict = {},
):
    """Populate the context of an existing object

    Parameters
    ----------
    self: [DataModel]
        This is a PVGIS Data Model. Initially defined and described in YAML
        syntax, then transformed to a Pydantic Model.

    verbose: int
        Verbosity level from the function's local scope. This is required to
        compare against the `verbose` condition set in the data model
        definition.

    fingerprint: bool
        True will retrieve the fingerprint from the data model.

    angle_output_units: str
        Angular unit for the output data can be either 'radians' (default) or
        'degrees'.

    Notes
    -----
    See also: data model definitions in YAML syntax under `definitions.yaml`.

    An example for the input `self` data model is the `SolarAltitude`.

    """
    # Get the definition of the data model, originally defined in YAML syntax
    model_definition = self.model_definition

    # Ensure order of data model fields as they appear in a YAML definition
    output = OrderedDict()

    # Check if there is an 'output' definition in the YAML for that Model
    if "output" in model_definition:

        # Read the structure definitions
        structure = model_definition['output'].get("structure")

        # Iterate the sections, if they exist
        if structure:

            for section_definition in structure:
                section = section_definition.get("section")
                condition = section_definition.get("condition")
                output[section] = {}

                if "subsections" in section_definition:
                    subsection = ""
                    subsections = section_definition.get("subsections")
                    subsection_content = {}

                    for subsection_definition in subsections:
                        subsection = subsection_definition.get("subsection")
                        subsection_condition = subsection_definition.get("condition")

                        # Build a `names` dictionary dynamically 
                        names = {
                            'verbose': verbose,
                            'reflectivity_factor': getattr(self, 'reflectivity_factor', numpy_array([])),
                            # other attributes ?
                        }

                        if subsection_condition is None or simple_eval(
                            subsection_condition,
                            names=names,
                        ):
                            subsection_content = {}

                            fields = subsection_definition.get("fields")
                            if fields:
                                subsection_content = parse_fields(
                                    data_model=self,
                                    model_definition=model_definition,
                                    fields=fields,
                                    angle_output_units=angle_output_units,
                                )
                            output[section][subsection] = subsection_content

                else:

                    # Build names dictionary - include self so conditions can access attributes
                    names = {
                        "verbose": verbose,
                        "fingerprint": fingerprint,
                        "out_of_range": getattr(self, "out_of_range", numpy_array([])),
                    }

                    # Does the condition evaluate to true ?
                    if condition is None or simple_eval(
                        condition,
                        names=names,
                    ):
                        section_content = {}  # Dictionary for that component

                        fields = section_definition.get("fields")
                        if fields:
                            section_content = parse_fields(
                                data_model=self,
                                model_definition=model_definition,
                                fields=fields,
                                angle_output_units=angle_output_units,
                            )
                        output[section] = section_content


    # Feed output to .output
    self.output = output

data_model

This module defines a factory to generate custom data classes (or else models) dynamically using Pydantic's BaseModel. It includes utilities for unit conversions (e.g., radians, degrees, timestamps), custom attribute handling, and validation of model fields. The DataModelFactory enables efficient creation of models with properties like solar incidence angles, coordinates, and time series data, allowing for flexible data representation and manipulation.

Key Features

  • Dynamic generation of data models with custom attributes.
  • Unit conversion utilities (e.g., degrees to radians, timestamps to hours).
  • Integration with NumPy for handling array-based fields.
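
To illustrate the general pattern of dynamic model generation, here is a generic sketch built on pydantic.create_model; it is not the actual DataModelFactory API, and the model name and fields are hypothetical:

```python
from pydantic import create_model

# A dictionary of the kind that could be read from a YAML definition (hypothetical)
definition = {
    "name": "SolarAltitude",
    "fields": {
        "value": (float, 0.0),    # field name -> (type, default)
        "unit": (str, "radians"),
    },
}

# Build the Pydantic model class at runtime, then instantiate and validate
SolarAltitude = create_model(definition["name"], **definition["fields"])
instance = SolarAltitude(value=0.52)
print(instance.value, instance.unit)  # 0.52 radians
```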

Classes:

  • DataModelFactory

DataModelFactory

definition

Modules:

  • build
  • consolidate
  • helpers
  • inheritance
  • lists
  • load
  • merge
  • write

build

Functions:

  • build_python_data_models : Aggregate multiple PVGIS-native data models into a single dictionary.

build_python_data_models
build_python_data_models(
    source_path: Path,
    yaml_files: List[str],
    verbose: bool = False,
) -> Dict[str, Any]

Aggregate multiple PVGIS-native data models into a single dictionary.

Source code in pvgisprototype/core/factory/definition/build.py
def build_python_data_models(source_path: Path, yaml_files: List[str], verbose: bool = False) -> Dict[str, Any]:
    """Aggregate multiple PVGIS-native data models into a single dictionary."""
    logger.info(
            "Building PVGIS-native Python data models",
            alt=f"Building PVGIS-native Python data models"
    )
    data_models = {}
    logger.debug(
            "PVGIS bases upon a series of native data models. "
            + "These are defined in YAML files located in the directory {source_path}."
            + "\n",
            source_path=source_path,
        alt=(
            f"[bold]PVGIS bases upon a series of native data models. [/bold]"
            + f"These are defined in YAML files located in the directory [code]{source_path}[/code]."
            + f"\n"
        )
    )
    logger.info(
        f"Reading YAML definitions in {source_path}",
        alt=f"[bold]Reading[/bold] YAML definitions in [code]{source_path}[/code]"
    )

    table = Table(box=None, show_header=False, show_edge=False, pad_edge=False)

    for yaml_file in track(yaml_files, description="Building data models...\n"):
        full_yaml_path = source_path / yaml_file
        data_model = load_data_model(source_path, full_yaml_path)
        table.add_row(f"- {next(iter(data_model))}")
        data_models.update(data_model)

    if verbose:
        Console().print(table)
        Console().print()

    return data_models
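
A hedged usage sketch; the directory and YAML file names below are hypothetical placeholders for real definition files:

```python
from pathlib import Path

from pvgisprototype.core.factory.definition.build import build_python_data_models

# Hypothetical location of the YAML data model definitions
source_path = Path("pvgisprototype/core/data_model/definitions")

data_models = build_python_data_models(
    source_path=source_path,
    yaml_files=["solar_altitude.yaml", "solar_azimuth.yaml"],  # hypothetical files
    verbose=True,
)
print(list(data_models))  # one key per loaded data model definition
```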

consolidate

Functions:

  • finalize_output : Ensure the output dict has the required keys
  • load_data_model : Load and consolidate a YAML data model definition and return a nested dictionary compatible with DataModelFactory

finalize_output
finalize_output(data: dict) -> dict

Ensure output dict has required keys

Source code in pvgisprototype/core/factory/definition/consolidate.py
def finalize_output(data: dict) -> dict:
    """Ensure output dict has required keys"""
    output = data.get('sections', {}).get('output', {})
    if isinstance(output, dict):
        output.setdefault('type', 'dict')
        output.setdefault('initial', {})
    return data
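
For example, an output section that defines only a structure is completed in place with the default type and initial keys:

```python
from pvgisprototype.core.factory.definition.consolidate import finalize_output

data = {"sections": {"output": {"structure": []}}}
finalize_output(data)
print(data["sections"]["output"])
# {'structure': [], 'type': 'dict', 'initial': {}}
```
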
load_data_model
load_data_model(
    source_path: Path,
    data_model_yaml: Path,
    require: bool = False,
) -> Dict[str, Any]

Load and consolidate a YAML data model definition and return a nested dictionary compatible with DataModelFactory.

Source code in pvgisprototype/core/factory/definition/consolidate.py
def load_data_model(
    source_path: Path,
    data_model_yaml: Path,
    require: bool = False,
) -> Dict[str, Any]:
    """
    Load and consolidate a YAML data model definition and return a nested
    dictionary compatible with `DataModelFactory`.
    """
    # Load data model description
    data_model = load_yaml_file(data_model_yaml)
    data_model_name = data_model.get('name', '<unnamed data model>')
    if data_model_name == '<unnamed data model>':
        logger.warning(f"The data model {data_model} lacks a 'name' key!")

    log_data_model_loading(
        data_model=data_model,
        data_model_name=data_model_name,
        require=require,
    )

    # Resolve requirements
    data_model = resolve_requires(
        data=data_model,
        source_path=source_path,
    )
    finalize_output(data=data_model)

    # del(data_model['sections']['_file_path'])  # sane post-processing ?

    # logger.info(
    #     "Return consolidated data model :\n" + yaml.dump(data={data_model_name: data_model['sections']}, default_flow_style=False, sort_keys=False),
    #     alt="[dim]Return consolidated data model :[/dim]\n" + yaml.dump(data={data_model_name: data_model['sections']}, default_flow_style=False, sort_keys=False),
    # )

    # Return the consolidated data model
    return {data_model_name: data_model.get('sections', {})}

helpers

Functions:

  • extract_structure_from_required : Extract the output structure list from the required file at sections.output.structure
  • find_structure_in_path : Navigate a nested dict to find the structure at a specified path
  • get_structure : Retrieve the structure from the nested dictionary

extract_structure_from_required
extract_structure_from_required(
    required_data: dict,
) -> List[dict]

Extract the output structure list from the required file at sections.output.structure.

Source code in pvgisprototype/core/factory/definition/helpers.py
def extract_structure_from_required(
    required_data: dict,
) -> List[dict]:
    """
    Extract the output structure list from the required file at
    `sections.output.structure`.
    """
    structure = []
    if 'sections' in required_data:
        output = required_data['sections'].get('output', {})
        if output:
            logger.debug(
                    f"   ! Identified an `output` structure !\n",
                    alt=f"   ! [blue bold]Identified an `output` structure ![/blue bold]\n",
                    )
        if 'structure' in output:
            structure = output['structure']
            required_data_model_name = required_data['name']
            yaml_dump_of_structure = yaml.dump(data=structure, sort_keys=False)
            logger.debug(
                "   Base output structure"
                + " in {required_data_model_name} :"
                + "\n\n   {yaml_dump_of_structure}\n",
                required_data_model_name=required_data_model_name,
                yaml_dump_of_structure=yaml_dump_of_structure,
                alt=f"   [dim][bold]Base[/bold] output structure[/dim]"
                + f" in {required_data_model_name} :"
                + f"\n\n   [dim]{yaml_dump_of_structure}[/dim]\n"
            )

    return structure
find_structure_in_path
find_structure_in_path(
    data: Dict, path: List[str]
) -> Union[List, None]

Navigate nested dict to find structure at specified path.

Source code in pvgisprototype/core/factory/definition/helpers.py
def find_structure_in_path(
    data: Dict,
    path: List[str],
) -> Union[List, None]:
    """
    Navigate nested dict to find structure at specified path.
    """
    data_model_name = data.get("name", "<unnamed data model>")
    if data_model_name == "<unnamed data model>":
        logger.warning(f"The data structure {data} lacks a 'name' key!")

    for part in path:
        if isinstance(data, dict) and part in data:
            data = data[part]
            logger.debug(
                "   Override output structure in {data_model_name} [Child]\n\n   {data}\n",
                data_model_name=data_model_name,
                data=data,
                alt=f"   [dim bold]Override output structure[/dim bold] in {data_model_name} [Child]\n\n   [dim]{data}[/dim]\n",
            )

        else:
            return None

    return data if isinstance(data, list) else None
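
For instance, with a small, hypothetical definition dictionary, the function returns the list found at the given path, or None when any path element is missing:

```python
from pvgisprototype.core.factory.definition.helpers import find_structure_in_path

data = {
    "name": "SolarAltitude",  # hypothetical data model name
    "sections": {"output": {"structure": [{"section": "results"}]}},
}

print(find_structure_in_path(data, ["sections", "output", "structure"]))
# [{'section': 'results'}]
print(find_structure_in_path(data, ["sections", "metadata"]))
# None
```
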
get_structure
get_structure(data: Dict) -> List

Retrieve the structure from the nested dictionary. If it doesn't exist, return an empty list.

Source code in pvgisprototype/core/factory/definition/helpers.py
def get_structure(data: Dict) -> List:
    """
    Retrieve the structure from the nested dictionary.
    If it doesn't exist, return an empty list.
    """
    output_structure = data.get("sections", {}).get("output", {}).get("structure", [])
    data_model_name = data.get('name', '<unnamed data structure>')
    if output_structure:
        yaml_dump_of_structure = yaml.dump(data=output_structure, sort_keys=False)
        logger.debug(
            "   Child node output structure"
            + " in {data_model_name} :"
            + "\n\n   {yaml_dump_of_structure}\n",
            data_model_name=data_model_name,
            yaml_dump_of_structure=yaml_dump_of_structure,
            alt=f"   [dim][bold]Child node[/bold] output structure[/dim]"
            + f" in {data_model_name} :"
            + f"\n\n   [dim]{yaml_dump_of_structure}[/dim]\n"
        )
    else:
        logger.debug(
            "   Child node"
            + " in `{data_model_name}` has no output structure !",
            data_model_name=data_model_name,
            alt=f"   [dim][bold]Child node[/bold][/dim]"
            + f" in {data_model_name} has no output structure !"
        )

    logger.debug(
        f"Returning child node\n\n{data=}\n\noutput structure is\n\n{output_structure=}"
    )

    return output_structure

inheritance

Functions:

  • resolve_requires : Process a dictionary data structure and recursively resolve its require directives
  • set_nested_value : Set a 'value' at a nested dictionary key, creating intermediate dictionaries as needed

resolve_requires
resolve_requires(
    data: Dict,
    source_path: Path,
    resolved_files: Set | None = None,
    cache: Dict[str, Dict] | None = None,
) -> Union[Dict, List, Any]

Process a dictionary data structure and resolve recursively its require directives by merging the current data model (also referred to as the child node or override) into the required data (also referred to as the parent node or base).

The output is a new grand-child node which combines data attributes from a child node after inheriting data attributes from the parent node.

Notes

child node : the input data structure
parent node : any required directive defined in the input data structure
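
As a conceptual sketch of the inheritance semantics (hypothetical, simplified definitions expressed as plain dictionaries; the real function resolves each require item as a YAML file path relative to source_path):

```python
# Parent template (base) and child data model (override), simplified to plain dicts
parent = {"name": "angle", "sections": {"unit": "radians"}}
child = {"name": "solar_altitude", "require": ["angle"], "sections": {"symbol": "alpha"}}

# After resolution, the child inherits what it does not define itself,
# and its own keys override the inherited ones:
resolved = {"name": "solar_altitude", "sections": {"unit": "radians", "symbol": "alpha"}}
```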

Source code in pvgisprototype/core/factory/definition/inheritance.py
def resolve_requires(
    data: Dict,
    # data: Union[Dict, List, Any],
    source_path: Path,
    resolved_files: Set | None = None,
    cache: Dict[str, Dict] | None = None,
) -> Union[Dict, List, Any]:
    """
    Process a dictionary `data` structure and resolve recursively its `require`
    directives by merging the _current_ data model (also referred to as the
    `child node` or `override`) into the _required_ data (also referred to as
    the `parent node` or `base`).

    The output is a new grand-child node which combines data attributes from a
    child node after inheriting data attributes from the parent node.

    Notes
    -----
    child node : the input `data` structure
    parent node : any `required` directive defined in the input `data` structure

    """
    # logger.debug(
    #     f"Input data for which to resolve require directives is :\n\n {data=}\n"
    # )
    # A cache set to track resolved files and avoid circular dependencies
    if resolved_files is None:
        resolved_files = set()

    # A cache dictionary to store resolved data models (files) by file path
    if cache is None:
        cache = {}

    # Sort of a "base" case : unstructured data need no processing !
    if not isinstance(data, (dict, list)):
        logger.debug(
            "[Unstructured data, needs no processing]",
            alt=f"[dim]\\[Unstructured data, needs no processing][/dim]",
        )
        return data 

    if isinstance(data, dict):

        # Detect an already resolved path to avoid circular dependencies
        current_file_path = data.get('_file_path', None)
        if current_file_path:
            if current_file_path in resolved_files:
                logger.warning(
                    "Detected a circular dependency : file path {current_file_path} already resolved ! Skipping.-",
                    current_file_path=current_file_path,
                    alt=f"Detected a circular dependency : file path {current_file_path} already resolved ! Skipping.-",
                )
                return data  # Skip circular dependencies

            # Tracking the file path
            logger.info(
                "Tracking the file path {current_file_path}",
                current_file_path=current_file_path,
                alt=f"[yellow]Tracking the file path[/yellow] {current_file_path}",
            )
            resolved_files.add(current_file_path)
            cache[current_file_path] = data  # Cache current state

        # Deep copy to avoid mutation during iteration
        data = deepcopy(data)

        # Process top-level `require` directive
        if 'require' in data:

            # Handle missing 'name' key
            data_model_name = data.get('name', '<unnamed data model>')
            logger.debug(
                f"Identified require directives in `{data_model_name}`"
            )

            requires = data.pop("require")
            # The `require` directive may or may not list multiple items ?
            # If a single "item" (string?), make it a list
            requires_list = [requires] if not isinstance(requires, list) else requires
            require_directives = "- " + "\n   - ".join(requires_list)

            logger.debug(
                "   Parents for {data_model_name}\n\n   {require_directives}\n",
                data_model_name=data_model_name,
                require_directives=require_directives,
                alt=f"[dim]   Parents for [/dim][bold]{data_model_name}[/bold]\n\n   [yellow]{require_directives}[/yellow]\n",
            )
            if len(require_directives) > 0:
                logger.debug(
                    "   >>> Integrating required items >>> >>> >>>\n",
                    alt=f"   [dim]>>> Integrating required items >>> >>> >>>[/dim]\n",
                )

            merged_structure = []

            # Resolve recursively, merge sequentially via `reverse()` :
            # respect order so a later require can override an earlier one
            # for required_item in reversed(requires):
            for required_item in requires:  # Don't touch me ! Unless you really know what you are doing !

                #
                # Load and cache data model
                #

                # First, build the path to the require YAML file
                required_path = (source_path / required_item).with_suffix('.yaml')
                # required_path.is_file()

                # Next check if the file is already processed, thus cached
                if str(required_path) in cache:
                    logger.debug(
                        "Using cached parent data model definition\n\n{required_path}",
                        required_path=required_path,
                        alt=f"Using cached parent data model definition\n\n{required_path}"
                    )
                    required_data = cache[str(required_path)]

                else:
                    required_data = load_yaml_file(required_path)
                    logger.info(
                            f"Required data model\n\n{required_data=}\n"
                            )

                    if isinstance(required_data, dict):
                        required_data['_file_path'] = str(required_path)
                        logger.debug(
                            "Caching {required_path}",
                            required_path=required_path,
                            alt=f"[magenta]Caching {required_path}[/magenta]",
                        )
                        cache[str(required_path)] = required_data
                    else:
                        logger.warning(
                            "Cannot track required parent data model node\n\n{required_data}\n\nis a list !\n",
                            required_data=required_data,
                            alt=f"[bold]Cannot track[/bold] required parent data model node\n\n{required_data}\n\nis a list !\n",
                        )

                #
                # Resolve require directives
                #

                try:
                    # Recursively resolve base model
                    required_data = resolve_requires(
                        data=required_data, 
                        source_path=source_path,
                        resolved_files=resolved_files.copy(),
                        cache=cache,
                    )
                except Exception as exception:
                    logger.error(
                        "Failed to resolve required `parent` YAML data model definition :\n\n File path : {required_path}\n\nData : {required_data}\n\nSource path : {source_path}\n\nResolved files : {resolved_files}\n\nCache : {cache}\n\nError : {exception}",
                        required_path=required_path,
                        required_data=required_data,
                        source_path=source_path,
                        resolved_files=resolved_files,
                        cache=cache,
                        exception=exception,
                        alt=f"Failed to resolve required `parent` YAML file : {required_path}\n\n{required_data}\n\n{source_path}\n\n{resolved_files}\n\n{cache}\n\nError : {exception}",
                    )
                    # continue
                    raise ValueError(f"Error resolving YAML file {required_data}")

                #
                # Process output-structure from required_data if present
                #

                ## Extract the base output-structure (list) : parent node output structure

                base_structure = extract_structure_from_required(
                    required_data=required_data
                )

                if base_structure:

                    merged_structure = merge_structure_list(
                        # base_structure=merged_structure,
                        # override_structure=base_structure,
                        base_structure=base_structure,
                        override_structure=merged_structure,
                    )

                    ## Then get the child node output structure
                    structure_list = get_structure(data=data)
                    if structure_list:
                        merged_structure = merge_structure_list(
                            base_structure=merged_structure,
                            override_structure=structure_list,
                        )
                    logger.debug(f"Before inheriting from parent output structure, data is\n\n   {data=}")
                    # --------------------------------------------------------
                    if len(base_structure) == 1:
                        logger.debug(f"Base structure lists a single item\n\n   {data=}")
                        base_node = base_structure[0]
                        base_node.update(data)
                        data = base_node

                    # else:
                    #     set_nested_value(
                    #         data,
                    #         ["sections", "output", "structure"],
                    #         merged_structure,
                    #     )

                    # --------------------------------------------------------
                    # for parent_node in base_structure:
                    #     parent_node.update(data)
                    # --------------------------------------------------------

                    yaml_dump_of_structure = yaml.dump(data=data, sort_keys=False)
                    logger.debug(
                        "{data_model_name} after inheriting is"
                        + "\n\n{yaml_dump_of_structure}\n",
                        data_model_name=data_model_name,
                        yaml_dump_of_structure=yaml_dump_of_structure,
                        alt=f"   [dim][code]{data_model_name}[/code] after inheriting is[/dim]"
                        + f"\n\n   [dim]{yaml_dump_of_structure}[/dim]\n"
                    )

                else:
                    # Merge (non-output-structure templates) required_data (base) into current data (override)
                    # or else said : the "current" data['name']  overrides  the "base" required_data['name']
                    logger.debug(
                        f"No parent output structure found!"
                        +" Deep-merge non-output-structure dictionaries/lists templates!"
                    )
                    data = deep_merge(
                        base=required_data,
                        override=data,
                    )
                    yaml_dump_of_structure = yaml.dump(data=data, sort_keys=False)
                    logger.debug(
                        "{data_model_name} after deep-merging is"
                        + "\n\n{yaml_dump_of_structure}\n",
                        data_model_name=data_model_name,
                        yaml_dump_of_structure=yaml_dump_of_structure,
                        alt=f"   [dim][code]{data_model_name}[/code] after deep-merging is[/dim]"
                        + f"\n\n   [dim]{yaml_dump_of_structure}[/dim]\n"
                    )

            # Apply accumulated structure
            if merged_structure:
                logger.debug(
                    "More merging... !"
                )
                existing_structure = get_structure(data)
                final_structure = merge_structure_list(
                    existing_structure,
                    merged_structure, 
                )
                structure_list = get_structure(data=data)
                set_nested_value(data, ["sections", "output", "structure"], final_structure)

        # Recurse into nested keys
        # logger.info(
        #     "The data structure\n\n{data}\n\ndoes not contain any `require` directives. Recurse into nested keys.",
        #     data=data,
        #     alt=f"The data structure\n\n{data}\n\ndoes not contain any `require` directives. Recurse into nested keys.",
        # )
        logger.info(
            "Recurse into nested data keys\n\n{data_keys}\n",
            data_keys=data.keys(),
            alt=f"Recurse into nested data keys\n\n{data.keys()=}\n",
        )
        for key, value in data.items():
            logger.info(
                "Resolve `{key}`",
                key=key,
                alt=f"Resolve {key=}",
            )
            data[key] = resolve_requires(
                data=value,
                source_path=source_path,
                resolved_files=resolved_files.copy(),
                cache=cache,
            )
        yaml_dump_of_structure = yaml.dump(data=data, sort_keys=False)
        logger.debug(
            "Resolved data is"
            + "\n\n{yaml_dump_of_structure}\n",
            yaml_dump_of_structure=yaml_dump_of_structure,
            alt=f"[dim][code]Resolved data is[/code][/dim]"
            + f"\n\n{yaml_dump_of_structure}\n"
        )

        return data

    elif isinstance(data, list):
        # Recurse into each item in the list
        # for i, item in enumerate(data):
        #     data[i] = resolve_requires(item, source_path)
        return [
            resolve_requires(
                data=item,
                source_path=source_path,
                resolved_files=resolved_files.copy(),
                cache=cache,
            )
            for item in reversed(data)  # Don't touch me ! Unless you really know what you are doing !
        ]
set_nested_value
set_nested_value(data: dict, path: list, value: Any)

Set a 'value' at a nested dictionary key, creating intermediate dictionaries as needed. Ensures 'output' dicts always have 'type' and 'initial' keys.

Source code in pvgisprototype/core/factory/definition/inheritance.py
def set_nested_value(
    data: dict,
    path: list,
    value: Any,
    # set_type: bool = False,
    # set_initial: bool = False,
):
    """
    Set a 'value' at a nested dictionary key, creating intermediate dictionaries as needed.
    Ensures 'output' dicts always have 'type' and 'initial' keys.
    """
    if value == [data]:
        logger.debug(
            "Value == [Data] : Safety against self-nesting ! Skipping setting.-"
        )
        return data

    logger.debug(
        f"My job is to set the\n\n{value=}\n\nto\n\n{data=}"
    )
    current = data

    # Traverse to the parent of the final key
    for part in path[:-1]:
        if part not in current or not isinstance(current[part], dict):
            current[part] = {}
        current = current[part]

    final_key = path[-1]

    if isinstance(current.get(final_key), dict) and isinstance(value, dict):
        logger.debug(
            "Deep merging dictionaries\n\nvalue={value}\n\nand\n\ncurrent[final_key]={current_at_final_key}\n",
            value=value,
            current_at_final_key=current[final_key],
            alt=f"[green]Deep merging dictionaries[/green]\n\n{value=}\n\nand\n\n{current[final_key]=}\n"
        )
        current[final_key] = deep_merge(current[final_key], value)

    else:
        logger.debug(
            f"Setting\n\n{value=}\n\nto\n\n{data=} @ {final_key=}"
        )
        current[final_key] = value

    yaml_dump_of_structure = yaml.dump(data=data, sort_keys=False)
    logger.debug(
        "Updated `data` is"
        + "\n\n{yaml_dump_of_structure}\n",
        yaml_dump_of_structure=yaml_dump_of_structure,
        alt=f"[dim][code]Update data is[/code][/dim]"
        + f"\n\n{yaml_dump_of_structure}\n"
    )
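
For example, setting an output structure deep inside a hypothetical data model dictionary creates the intermediate keys along the way:

```python
from pvgisprototype.core.factory.definition.inheritance import set_nested_value

data = {"name": "SolarAzimuth"}  # hypothetical data model
set_nested_value(data, ["sections", "output", "structure"], [{"section": "results"}])
print(data)
# {'name': 'SolarAzimuth',
#  'sections': {'output': {'structure': [{'section': 'results'}]}}}
```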

lists

Functions:

  • merge_structure_list : Merge output structures, including nested lists/dicts

merge_structure_list
merge_structure_list(base_structure, override_structure)

Merge output structures, including nested lists/dicts.

Source code in pvgisprototype/core/factory/definition/lists.py
def merge_structure_list(
    base_structure,
    override_structure,
):
    """
    Merge output structures, including nested lists/dicts.
    """
    if not override_structure:
        # Case 2: Child is placeholder - inherit full parent structure
        logger.debug(
            f"No output structure in the child node -- inheriting parent node entirely!\n\n{base_structure=}\n"
        )
        return deepcopy(base_structure)

    # Case 1: Merge structures and assign back
    logger.debug(
        "/ Merging child output structure into the parent output structure",
        alt=f"/ Merging child output structure into the parent output structure",
    )

    # Create a map of parent sections for quick lookup
    parent_sections = {
        # item["section"]: item for item in base_structure if "section" in item
        item.get("section"): item
        for item in base_structure
    }
    logger.debug(
        f"A map of `sections` in the parent node\n\n{parent_sections=}\n",
        # alt=f"A map of `sections` in the parent node\n\n{parent_sections}",
    )
    merged = []

    # First process all child items
    for child_item in override_structure:
        section_name = child_item.get("section")

        if section_name in parent_sections:

            # Retrieve parent item
            parent_item = parent_sections.pop(section_name)
            logger.debug(
                f"Matching child section\n\n{section_name=}\n\n   {child_item=}\n\nin parent node is\n\n   {parent_item=}\n"
            )

            # Remove require directive if present
            if "require" in child_item:
                logger.debug(f"Popping the require directive")
                child_item.pop("require", None)

            # Create merged item with parent as base
            logger.debug(f"Inheriting from parent node")
            merged_item = deep_merge(parent_item, child_item)  # deepcopy

            # Remove template metadata from the final structure item
            for key in TEMPLATE_METADATA_KEYS:
                merged_item.pop(key, None)

            logger.debug(f"Updated child node\n\n{merged_item=}\n")
            merged.append(merged_item)
            # logger.debug(f"Updated output structure\n\n{merged=}\n")

        else:
            logger.debug(
                f"No matching child section `{section_name=}` in parent node."
                + f"Preserve existing child item\n\n   {child_item=}\n"
            )
            # merged.append(deepcopy(child_item))
            # Clean child item before adding
            clean_child = {k: v for k, v in child_item.items() 
                          if k not in TEMPLATE_METADATA_KEYS}
            merged.append(clean_child)

    # Add remaining parent items not overridden by child
    # merged.extend(parent_sections.values())
    # merged.extend(deepcopy(item) for item in parent_sections.values())

    # Add remaining parent items (also cleaned)
    for parent_item in parent_sections.values():
        clean_parent = {k: v for k, v in parent_item.items() 
                       if k not in TEMPLATE_METADATA_KEYS}
        merged.append(clean_parent)

    logger.debug(f"Updated output structure\n\n{merged=}\n")

    return merged

load

Functions:

  • load_yaml_file : Load a data model definition from a properly structured YAML file into a Python dictionary

load_yaml_file
load_yaml_file(file_path: Path) -> Dict[str, Any]

Load a data model definition from a properly structured YAML file into a Python dictionary.

Source code in pvgisprototype/core/factory/definition/load.py
def load_yaml_file(file_path: Path) -> Dict[str, Any]:
    """
    Load a data model definition from a properly structured YAML file into a
    Python dictionary.
    """
    try:
        logger.debug(
            "Loading {file_path}...",
            file_path=file_path,
            alt=f"Loading [bold]{file_path.as_posix()}[/bold]...",
        )
        with open(file_path, "r") as yaml_file:
            return yaml.safe_load(yaml_file)

    except FileNotFoundError:
        raise FileNotFoundError(f"File not found: {file_path}")

    except yaml.YAMLError as e:
        raise ValueError(f"Error parsing YAML file {file_path}: {e}")

merge

Functions:

  • deep_merge : Recursively merge two dictionaries or lists without overwriting
  • merge_dictionaries : Recursively merge two dictionaries
  • merge_lists : Merge two lists, ensuring no duplicates

deep_merge
deep_merge(base, override)

Recursively merge two dictionaries or lists without overwriting.

  • Dicts: merged recursively.
  • Lists of dicts: merged by first common key (section, subsection, id, name), else append unique.
  • Lists of other types: append unique items.

Source code in pvgisprototype/core/factory/definition/merge.py
def deep_merge(base, override):
    """
    Recursively merge two dictionaries or lists without overwriting.
    - Dicts: merged recursively.
    - Lists of dicts: merged by first common key (section, subsection, id, name), else append unique.
    - Lists of other types: append unique items.
    """
    if isinstance(base, dict) and isinstance(override, dict):
        merged = base.copy()
        for key, value in override.items():
            if key in merged:
                merged[key] = deep_merge(merged[key], value)
            else:
                merged[key] = value

        return merged


    elif isinstance(base, list) and isinstance(override, list):
        #
        # return base + [item for item in override if item not in base]
        #
        # Merge lists of dicts by identifier key
        if all(isinstance(item, dict) for item in base + override):
            id_keys = ['subsection', 'section', 'id', 'name']
            identifier = next((k for k in id_keys if k in (base[0] if base else {})), None)
            if identifier:
                base_map = {item[identifier]: item for item in base if identifier in item}
                override_map = {item[identifier]: item for item in override if identifier in item}
                merged = []
                seen_ids = set()
                for item in base:
                    item_id = item.get(identifier)
                    if item_id in override_map:
                        merged.append(deep_merge(item, override_map[item_id]))
                        seen_ids.add(item_id)
                    else:
                        merged.append(item.copy())
                for item in override:
                    item_id = item.get(identifier)
                    if item_id not in seen_ids:
                        merged.append(item.copy())
                return merged
        # Fallback: append unique items
        merged = base.copy()
        for item in override:
            if item not in merged:
                merged.append(item)
        return merged
    else:
        return override
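
For example (the import path follows the source location given above):

```python
from pvgisprototype.core.factory.definition.merge import deep_merge

base = {"title": "Angle", "unit": "radians", "fields": {"value": {"type": "float"}}}
override = {"unit": "degrees", "fields": {"symbol": {"type": "str"}}}

print(deep_merge(base, override))
# {'title': 'Angle', 'unit': 'degrees',
#  'fields': {'value': {'type': 'float'}, 'symbol': {'type': 'str'}}}
```
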
merge_dictionaries
merge_dictionaries(
    base: Dict[str, Any] | List[Any] | Any,
    override: Dict[str, Any] | None,
) -> Dict[str, Any]

Recursively merge two dictionaries. Values in override will overwrite those in base if keys match.

Source code in pvgisprototype/core/factory/definition/merge.py
def merge_dictionaries(
    base: Dict[str, Any] | List[Any] | Any,
    override: Dict[str, Any] | None,
) -> Dict[str, Any]:
    """
    Recursively merge two dictionaries.
    Values in `override` will overwrite those in `base` if keys match.
    """
    logger.info(
        "Input data is\n\n{base}\n\nand\n\n{override}\n",
        base=base,
        override=override,
        alt=f"[code]Input data is[/code]\n\n{base=}\n\nand\n\n{override=}\n",
    )

    # if "name" in base and not isinstance(base["name"], dict):
    #     base_data_model_name = base["name"]
    #     # logger.debug(
    #     #     "/ Processing {base_data_model_name} [Parent]",
    #     #     base_data_model_name=base_data_model_name,
    #     #     alt=f"[dim]/ Processing [bold]{base_data_model_name}[/bold] [Parent][/dim]",
    #     # )
    #     log_action(
    #         action="/ Processing",
    #         action_style="",
    #         object_name=base_data_model_name,
    #         details="[Parent data model]",
    #     )

    if override is None:
        logger.info(
                "Override is None, returning the base dictionary!",
                alt="[orange]Override is None, returning the base dictionary![/orange]"
                )
        return base if base else {}

    if isinstance(base, dict):
        logger.debug(
            "Merging dictionaries",
            alt=f"[dim]Merging[/dim] {base.get('name', '<unnamed base>')}{override.get('name', '<unnamed override>')}"
        )

    merged = deepcopy(base) if isinstance(base, (dict, list)) else base
    # merged = base.copy() if isinstance(base, (dict, list)) else base

    for override_key, override_value in override.items():
        log_node(
                node_type='Child',
                key=override_key,
                value=override_value,
        )

        # ----------------------------------------------
        if (
            override_value
            and isinstance(override_value, dict)
            and "name" in override_value
            and not isinstance(override_value['name'], dict)
        ):
            override_value_name = override_value["name"]
        else:
            override_value_name = ''
        # ----------------------------------------------

        if override_key in merged:
            base_value = merged[override_key]
            log_node(
                node_type='Child',  # the child key actually
                key=override_key,
                value=base_value,
                state_message="exists in Parent [will inherit]",
            )

            # ----------------------------------------------
            if (
                base_value
                and isinstance(base_value, dict)
                and "name" in base_value
                and not isinstance(base_value['name'], dict)
            ):
                base_value_name = base_value["name"]
            else:
                base_value_name = '<unnamed base value>'
            # ----------------------------------------------

            if isinstance(override_value, dict) and isinstance(base_value, dict):

                yaml_dump_of_merged = yaml.dump(data=merged, sort_keys=False)
                log_action(
                    action="Before merging dictionaries",
                    action_style="dim yellow",
                    object_name=f"{base_value_name}, {override_value_name}",
                    details=yaml_dump_of_merged,
                )

                try:
                    # Recursively merge nested dictionaries
                    merged[override_key] = merge_dictionaries(
                        base=base_value,
                        override=override_value,
                    )
                except Exception as exception:
                    logger.error(
                        "Error merging dictionaries for key `{override_key}` : {exception}",
                        override_key=override_key,
                        exception=exception,
                        alt=f"Error merging dictionaries for key [bold]{override_key}[/bold] : {exception}",
                    )
                    raise

                yaml_dump_of_merged = yaml.dump(data=merged, sort_keys=False)
                log_action(
                    action="After merging dictionaries",
                    action_style="yellow",
                    object_name='',  # if 'name' in merged else ''  ?
                    details=yaml_dump_of_merged,
                )

            elif isinstance(override_value, list) and isinstance(base_value, list):
                # logger.debug("", alt=f"[blue]Before list :[/blue]\n{yaml.dump(merged)}")
                yaml_dump_of_merged = yaml.dump(data=merged, sort_keys=False)
                log_action(
                    action="Before merging lists",
                    action_style="dim blue",
                    object_name=f"{base_value_name} into {override_value_name}",
                    details=yaml_dump_of_merged,
                )
                base_list = base_value
                try:
                    merged[override_key] = merge_lists(
                        base_list=base_list,
                        override_list=override_value,
                    )
                except Exception as e:
                    logger.error(f"Error merging lists for key {override_key} : {e}")
                    raise

                yaml_dump_of_merged = yaml.dump(data=merged, sort_keys=False)
                log_action(
                    action="After merging lists",
                    action_style="bold blue",
                    object_name='',  # if 'name' in merged else ''  ?
                    details=yaml_dump_of_merged,
                )

            else:
                log_action(
                        action="No dictionaries or lists, hence overwriting", # alt=f"[red]No dictionaries or lists[/red], hence [underline]overwriting[/underline] :
                        action_style='bold red',
                        object_name=override_key,
                        details=f"{override_key} = {override_value}",  # [code]{override_key}[/code] = [bold]{override_value}[/bold]\n",
                        )
                merged[override_key] = override_value
                yaml_dump_of_merged = yaml.dump(data=merged, sort_keys=False)
                log_action(
                    action="After direct assignment",
                    action_style="bold red",
                    object_name=f"{override_key} = see: `override_value`",
                    details=yaml_dump_of_merged,
                )

        else:
            log_node(
                node_type='Child',  # the child key actually
                key=override_key,
                state_message="does not exist in Parent !",
                message_style='red'
            )
            log_action(
                    action=f"Adding",
                    action_style='green',
                    object_name=override_key,
                    details=f'{override_key} = {override_value}',
                    )
            merged[override_key] = deepcopy(override_value)
            # merged[override_key] = override_value
            yaml_dump_of_merged = yaml.dump(merged, sort_keys=False)
            log_action(
                action="After adding",
                action_style="magenta",
                object_name=f"{override_key}",
                details=yaml_dump_of_merged,
            )

    # Special handling for nested sections
    if 'sections' in merged and 'sections' in override:
        for section_key, section_value in override['sections'].items():
            if section_key in merged['sections']:
                merged['sections'][section_key] = merge_dictionaries(
                    base=merged['sections'][section_key],
                    override=section_value,
                )
            else:
                merged['sections'][section_key] = deepcopy(section_value)

    yaml_dump_of_merged = yaml.dump(data=merged, sort_keys=False)

    if 'name' in merged and not isinstance(merged['name'], dict):
        action = "consolidated"
        action_style = 'bold green'
        merged_data_model_name = merged['name']
    else:
        action = "partially consolidated"
        action_style = ''
        merged_data_model_name = ''

    log_action(
            action=f"Return {action}",
            action_style=action_style,
            object_name=merged_data_model_name,
            details=yaml_dump_of_merged,
            )

    return merged
merge_lists
merge_lists(base_list: List, override_list: List) -> List

Merges two lists, ensuring no duplicates. If items are dicts, uses section, name, or id for deduplication.

Source code in pvgisprototype/core/factory/definition/merge.py
def merge_lists(
    base_list: List,
    override_list: List,
) -> List:
    """
    Merges two lists, ensuring no duplicates.
    If items are dicts, uses `section`, `name`, or `id` for deduplication.
    """
    log_action(
        action="/ Merging `override` list into `base`",
        action_style='',
        object_name='a pair of Lists',
        details="[Parent data model]",
    )

    # if base_list is None:
    #     return deepcopy(override_list) if override_list else []

    # if override_list is None:
    #     return base_list

    # merged = deepcopy(base_list)
    merged = base_list.copy()

    for item in reversed(override_list):

        if isinstance(item, dict):
            match_key = next((identifier for identifier in ("section", "name", "id") if identifier in item), None)

            if match_key:
                match = next((key for key in merged if isinstance(key, dict) and key.get(match_key) == item.get(match_key)), None)

                if match:
                    merged[merged.index(match)] = merge_dictionaries(base=match, override=item)

                else:
                    merged.append(item)

            elif item not in merged:
                merged.append(item)

        elif item not in merged:
            merged.append(item)

    yaml_dump_of_merged = yaml.dump(data=merged, sort_keys=False)
    log_action(
            action=f"Return merge list",
            action_style='underline',
            object_name='',
            details=yaml_dump_of_merged,
            )

    return merged

write

Functions:

  • reset_python_data_model_definitions : Reset to an empty dictionary
  • write_to_python_module : Write aggregated models to a Python module as a dictionary

reset_python_data_model_definitions
reset_python_data_model_definitions(
    output_file: Path, verbose: bool = False
) -> None

Reset to empty dictionary !

Source code in pvgisprototype/core/factory/definition/write.py
def reset_python_data_model_definitions(
    output_file: Path,
    verbose: bool = False,
) -> None:
    """Reset to empty dictionary !"""
    if verbose:
        logger.info("", alt=f"[bold]Resetting[/bold] [code]{output_file}[/code] to an empty dictionary")
    write_to_python_module(models={}, output_file=output_file)
write_to_python_module
write_to_python_module(
    models: Dict[str, Any],
    output_file: Path,
    verbose: bool = False,
) -> None

Write aggregated models to a Python module as a dictionary.

Source code in pvgisprototype/core/factory/definition/write.py
def write_to_python_module(
    models: Dict[str, Any],
    output_file: Path,
    verbose: bool = False,
) -> None:
    """Write aggregated models to a Python module as a dictionary."""
    try:
        content = (
            f"# Custom data model definitions\n\n"
            f"PVGIS_DATA_MODEL_DEFINITIONS = {models}\n"
        )
        if verbose and models:
            logger.info("", alt=f"[bold]Writing[/bold] to [code]{output_file}[/code]")

        with open(output_file, "w") as python_module:
            python_module.write(content)

    except IOError as e:
        print(f"Error writing to file '{output_file}' : {e}")

    logger.debug("", alt=f"Python data model definitions written to [code]{output_file}[/code]")
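
A hedged usage sketch combining both helpers; the target module path and the models dictionary below are hypothetical:

```python
from pathlib import Path

from pvgisprototype.core.factory.definition.write import (
    reset_python_data_model_definitions,
    write_to_python_module,
)

output_file = Path("pvgisprototype/core/data_model/definitions.py")  # assumed target module

# Start from a clean, empty definitions module ...
reset_python_data_model_definitions(output_file=output_file, verbose=True)

# ... then write the aggregated definitions into it
models = {"SolarAltitude": {"unit": "radians"}}  # hypothetical aggregated definitions
write_to_python_module(models=models, output_file=output_file, verbose=True)
```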

log

Set FACTORY_LOG_FILE before running a script that uses this logger to log out everything to a file !

Functions:

  • log_action
  • log_data_model_loading
  • log_node
  • setup_factory_logger : Set up a clean logger for the definition factory

log_action

log_action(
    action: str,
    action_style: str,
    object_name: str,
    details: str | None = None,
)
Source code in pvgisprototype/core/factory/log.py
def log_action(
    action: str,
    action_style: str,
    object_name: str,
    details: str | None = None,
):
    """Log an action on an object, with optional details, as a plain and a rich-styled message."""
    action_style = 'dim' + f' {action_style}'
    details = f"\n\n{details=}\n"
    logger.info(
        "{action} {object_name} {details}",
        action=action,
        object_name=object_name,
        details=details,
        # extra={'object_name': data_model_name, 'details': details},
        alt=f"[{action_style}]{action}[/{action_style}] [bold]{object_name=}[/bold]{details}"
    )

log_data_model_loading

log_data_model_loading(
    data_model, data_model_name, require: bool = False
)
Source code in pvgisprototype/core/factory/log.py
def log_data_model_loading(
        data_model,
        data_model_name,
        require: bool = False,
        ):
    """Log the loading of a data model definition."""
    if not require:
        logger.debug(
            "Processing data model {data_model_name}",
            data_model_name=data_model_name,
            alt=f"[dim]Processing data model [code]{data_model_name}[/code] :[/dim]\n\n{yaml.dump(data_model, sort_keys=False)}",
        )
    else:
        logger.debug(
             "Require data model :\n{yaml.dump(data_model, default_flow_style=False, sort_keys=False,)}",
             data_model=data_model,
             alt=f"Require data model :\n[bold]{yaml.dump(data_model, default_flow_style=False, sort_keys=False,)}[/bold]"
        )

log_node

log_node(
    node_type: str,
    key: str | int,
    value: Dict | List | None = None,
    state_message: str | None = "",
    message_style: str | None = "",
)
Source code in pvgisprototype/core/factory/log.py
def log_node(
    node_type: str,
    key: str | int,
    value: Dict | List | None = None,
    state_message: str | None = '',
    message_style: str | None = '',
    ):
    """Log a node's key and (optional) value, with a state message, during data model merging."""
    message_style_open = f"[{message_style}]" if message_style else ''
    message_style_close = f"[/{message_style}]" if message_style else ''
    value_type = '| ' + str(type(value)) if value else ''
    value = f"\n\n   {value}\n" if value else ''

    logger.debug(
        "{node_type} key {key} {state_message} {value_type} {value}",
        node_type=node_type,
        key=key,
        state_message=state_message,
        value_type=value_type,
        value=value,
        alt=f"[dim]{node_type} key[/dim] [bold]{key}[/bold] {message_style_open}{state_message}{message_style_close} [bold]{value_type}[/bold] {value}",
    )

setup_factory_logger

setup_factory_logger(
    verbose: bool = False,
    level: str = "WARNING",
    format: str = LOG_FORMAT,
    file: str | Path | None = None,
    rich_handler: bool = False,
)

Set up a clean logger for the definition factory.

Parameters:

  • verbose (bool, default False) : If True, sets the logging level to DEBUG and shows logs in the console.
  • level (str, default 'WARNING') : Log level
  • file (str | Path | None, default None) : The file path to log to. If None, logging will be to console.
  • rich_handler (bool, default False) : If True, enables rich formatting for console output.

Source code in pvgisprototype/core/factory/log.py
def setup_factory_logger(
    verbose: bool = False,
    level: str = "WARNING",
    format: str = LOG_FORMAT,
    file: str | Path | None = None,
    rich_handler: bool = False,
):
    """
    Set up a clean logger for the definition factory.

    Parameters
    ----------
    verbose : (bool)
        If True, sets the logging level to DEBUG and shows logs in the console.

    level: str
        Log level

    file : str | Path | None
        The file path to log to. If None, logging will be to console.

    rich_handler : bool
        If True, enables rich formatting for console output.
    """
    logger.remove()  # Remove any existing handlers

    if rich_handler:
        import richuru
        richuru.install(level=level, rich_traceback=True)
        logger.debug(f"Installed richuru")

    level = 'DEBUG' if verbose else level
    if verbose:
        logger.debug(
            "Logging directed to `sys.stderr`",
            alt=f"Logging directed to `sys.stderr`",
        )
        logger.add(
            sink=sys.stderr,
            level=level,
            format=format,
            backtrace=False,
            diagnose=False,
        )

    if file:
        logger.info(
            "Logging to file {file}",
            file=file,
            alt=f"Logging to file {file}",
        )
        logger.add(
                sink=file,
                level=level,
                format=format,
                backtrace=True,
                diagnose=True,
        )

    logger.info("Factory logger initialized")

property_functions

Attention

The module

from pvgisprototype.core.data_model.definitions import PVGIS_DATA_MODEL_DEFINITIONS

needs to pre-exist in the current setup, even for its "own" generation via the script

pvgisprototype/core/data_model/generate_definitions.py

If it does not exist yet, create it with an empty dictionary named PVGIS_DATA_MODEL_DEFINITIONS, then run the generation script:

```
python generate_definitions.py
```
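
A minimal placeholder for pvgisprototype/core/data_model/definitions.py could then be as small as:

```python
# Placeholder so the import above resolves; the generation script is
# expected to regenerate this module with the real definitions.
PVGIS_DATA_MODEL_DEFINITIONS: dict = {}
```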

Functions:

Name Description
as_hours_property

Instance property to convert to hours

as_minutes_property

Instance property to convert to minutes

datetime_property

Instance property to convert to datetime

degrees_property

Instance property to convert to degrees.

get_model_definition

Retrieve the definition of a model from the global definitions.

radians_property

Instance property to convert to radians

timedelta_property

Instance property to convert to timedelta

timestamp_property

Instance property to convert to time (timestamp)

as_hours_property

as_hours_property(self) -> float | NpNDArray | None

Instance property to convert to hours

Source code in pvgisprototype/core/factory/property_functions.py
def as_hours_property(self) -> float | NpNDArray | None:
    """Instance property to convert to hours"""
    if self.unit == "hours":
        return self.value
    elif self.unit == "timestamp":
        return _timestamp_to_hours(self.value)
    else:
        return None

as_minutes_property

as_minutes_property(self) -> float | NpNDArray | None

Instance property to convert to minutes

Source code in pvgisprototype/core/factory/property_functions.py
def as_minutes_property(self) -> float | NpNDArray | None:
    """Instance property to convert to minutes"""
    if self.unit == "minutes":
        value = self.value
    elif self.unit == "datetime":
        value = (
            self.value.hour * 3600 + self.value.minute * 60 + self.value.second
        ) / 60
    elif self.unit == "timestamp":
        value = _timestamp_to_minutes(self.value)
    elif self.unit == "timedelta":
        value = self.value.total_seconds() / 60
    elif self.unit == RADIANS:
        value = _radians_to_minutes(self.value)
    elif self.unit == DEGREES:
        value = _degrees_to_minutes(self.value)
    else:
        value = None
    return value
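
Since these functions are bound as properties on the generated models, one way to see the conversion logic in isolation is to call them with a hypothetical stand-in object that only carries the `unit` and `value` attributes (SimpleNamespace is used purely for illustration):

```python
from datetime import timedelta
from types import SimpleNamespace

from pvgisprototype.core.factory.property_functions import as_minutes_property

# Stand-in object exposing the attributes the property expects
duration = SimpleNamespace(unit="timedelta", value=timedelta(hours=1, minutes=30))
print(as_minutes_property(duration))  # 90.0
```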

datetime_property

datetime_property(self)

Instance property to convert to datetime

Source code in pvgisprototype/core/factory/property_functions.py
def datetime_property(self):
    """Instance property to convert to datetime"""
    if self.unit == "datetime":
        return self.value
    else:
        return None

degrees_property

degrees_property(self) -> float | NpNDArray | None

Instance property to convert to degrees.

Notes

How to restrict to angular data models?

Source code in pvgisprototype/core/factory/property_functions.py
def degrees_property(self) -> float | NpNDArray | None:
    """
    Instance property to convert to degrees.

    Notes
    -----

    How to restrict to angular data models?
    """
    # # Only proceed if this model is actually an angle
    # if self.unit not in [DEGREES, RADIANS]:
    #     return None

    if self.value is None:
        return None

    if self.unit == DEGREES:
        return self.value

    elif self.unit == RADIANS:
        if isinstance(self.value, (int, float)):
            from math import degrees
            return degrees(self.value)

        if isinstance(self.value, numpy.ndarray):
            return numpy.degrees(self.value)

    return None  # Keep Me !

get_model_definition

get_model_definition(self) -> Dict

Retrieve the definition of a model from the global definitions.

Source code in pvgisprototype/core/factory/property_functions.py
def get_model_definition(self) -> Dict:
    """Retrieve the definition of a model from the global definitions."""
    if self.data_model_name not in PVGIS_DATA_MODEL_DEFINITIONS:
        raise ValueError(f"No definition found for model: {self.data_model_name}")
    return PVGIS_DATA_MODEL_DEFINITIONS[self.data_model_name]

radians_property

radians_property(self) -> float | NpNDArray | None

Instance property to convert to radians

Source code in pvgisprototype/core/factory/property_functions.py
def radians_property(self) -> float | NpNDArray | None:
    """Instance property to convert to radians"""
    # # Only proceed if this model is actually an angle
    # if self.unit not in [DEGREES, RADIANS]:
    #     return None

    if self.value is None:
        return None

    if self.unit == RADIANS:
        return self.value

    elif self.unit == DEGREES:
        if isinstance(self.value, (int, float)):
            from math import radians
            return radians(self.value)

        if isinstance(self.value, numpy.ndarray):
            return numpy.radians(self.value)

    return None  # Keep Me !
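
The same stand-in approach illustrates the angular conversions, assuming the DEGREES and RADIANS constants hold the strings "degrees" and "radians":

```python
import numpy
from types import SimpleNamespace

from pvgisprototype.core.factory.property_functions import radians_property

angle = SimpleNamespace(unit="degrees", value=numpy.array([0.0, 90.0, 180.0]))
print(radians_property(angle))  # [0.         1.57079633 3.14159265]
```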

timedelta_property

timedelta_property(
    self,
) -> Timedelta | TimedeltaIndex | None

Instance property to convert to timedelta

Source code in pvgisprototype/core/factory/property_functions.py
def timedelta_property(self) -> Timedelta | TimedeltaIndex | None:
    """Instance property to convert to timedelta"""
    if self.unit == RADIANS:
        return _radians_to_timedelta(self.value)
    elif self.unit == DEGREES:
        return _degrees_to_timedelta(self.value)
    elif self.unit == "timedelta":
        return self.value
    else:
        return None

timestamp_property

timestamp_property(self) -> Timestamp | None

Instance property to convert to time (timestamp)

Source code in pvgisprototype/core/factory/property_functions.py
def timestamp_property(self) -> Timestamp | None:
    """Instance property to convert to time (timestamp)"""
    if self.unit == "timestamp":
        return self.value
    else:
        return None

hashing

Functions:

Name Description
convert_numpy_to_json_serializable

Convert numpy arrays and other non-serializable objects to JSON-compatible types.

convert_numpy_to_json_serializable

convert_numpy_to_json_serializable(obj: Any) -> Any

Convert numpy arrays and other non-serializable objects to JSON-compatible types.

Source code in pvgisprototype/core/hashing.py
def convert_numpy_to_json_serializable(obj: Any) -> Any:
    """
    Convert numpy arrays and other non-serializable objects to JSON-compatible types.
    """
    if isinstance(obj, Enum):
        return str(obj.name)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, (np.integer, np.int64, np.int32, np.int16, np.int8)):
        return int(obj)
    elif isinstance(obj, (np.floating, np.float64, np.float32, np.float16)):
        return float(obj)
    elif isinstance(obj, np.bool_):
        return bool(obj)
    elif isinstance(obj, set):
        return [convert_numpy_to_json_serializable(item) for item in obj]  # Convert set to list while recursively converting its items
    elif isinstance(obj, dict):
        return {k: convert_numpy_to_json_serializable(v) for k, v in obj.items()}
    elif isinstance(obj, (list, tuple)):
        return [convert_numpy_to_json_serializable(item) for item in obj]
    else:
        return obj
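
A short sketch of how this helper can prepare a mixed NumPy/Python payload for hashing, e.g. as a content-based cache key; the json/hashlib combination is illustrative and not necessarily what hashing.py itself uses:

```python
import hashlib
import json

import numpy as np

from pvgisprototype.core.hashing import convert_numpy_to_json_serializable

payload = {
    "latitude": np.float64(45.0),
    "azimuths": np.array([0.0, 90.0, 180.0]),
    "leap_year": np.bool_(False),
}

# Make the payload JSON-serializable, then fingerprint it deterministically
serializable = convert_numpy_to_json_serializable(payload)
cache_key = hashlib.sha256(
    json.dumps(serializable, sort_keys=True).encode()
).hexdigest()
print(cache_key)
```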