Core Engineering Components¶
The core module contains the foundational engineering infrastructure that powers PVGIS. While the algorithms implement the solar science and the API and [CLI](cli.md) provide the user-facing interfaces, the core module handles essential cross-cutting concerns such as data model generation, caching, hashing, and context management.
What's in Core?¶
The core module includes:
- Data Model Factory: Dynamic generation of Pydantic models from YAML definitions
- Context Builder: Structured output generation and verbosity management
- Caching System: Performance optimization through memoization
- Hashing Utilities: Data fingerprinting for reproducibility and cache keys
- Type Definitions: Shared type hints and validation schemas
These components operate behind the scenes, ensuring type safety, performance, and maintainability across the entire PVGIS codebase.
Design Philosophy¶
Core components follow these principles:
- Separation of concerns: Scientific domain logic (algorithms) stays independent of the infrastructure (core) and the interfaces (API, CLI, Web API)
- Type safety: Pydantic validation catches errors before calculations run
- Performance: Caching and efficient data structures minimize redundant computation
- Extensibility: Factory patterns enable adding new models without code duplication
Key Components¶
Data Model Factory¶
Transforms YAML definitions into runtime Pydantic classes, enabling:
- Centralized model definitions maintained by domain experts
- Automatic validation of calculation inputs and outputs
- Consistent structure across API, CLI, and Web API interfaces
See Data Model for detailed documentation.
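As a rough, hedged illustration of the underlying pattern (not the actual PVGIS factory API), a YAML definition can be turned into a runtime Pydantic class with pydantic.create_model; the YAML snippet, field names, and type mapping below are made up for the example:

```python
# Minimal sketch of the YAML-to-Pydantic pattern; the YAML snippet, field
# names, and type mapping are illustrative, not the actual PVGIS definitions.
import yaml
from pydantic import create_model

yaml_definition = """
name: SolarAltitude
fields:
  value: float
  unit: str
"""

definition = yaml.safe_load(yaml_definition)
type_map = {"float": float, "str": str}

# Dynamically build a Pydantic model class from the parsed definition
SolarAltitude = create_model(
    definition["name"],
    **{field: (type_map[kind], ...) for field, kind in definition["fields"].items()},
)

print(SolarAltitude(value=42.0, unit="degrees"))
```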
Context Builder¶
Manages output generation based on verbosity levels and user requirements:
- Reads output structure definitions from YAML
- Evaluates conditional sections (e.g., metadata only at high verbosity)
- Constructs nested dictionaries ready for JSON/CSV/terminal output
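A minimal sketch of how such a conditional section might be evaluated, using the simpleeval library that the context-building code relies on; the structure and condition strings below are illustrative, not taken from a real YAML definition:

```python
# Sketch of verbosity-conditional output sections; section names and
# condition strings are illustrative placeholders.
from simpleeval import simple_eval

structure = [
    {"section": "result"},                                  # always included
    {"section": "metadata", "condition": "verbose >= 2"},   # high verbosity only
]

def build_output(verbose: int) -> dict:
    output = {}
    for section_definition in structure:
        condition = section_definition.get("condition")
        if condition is None or simple_eval(condition, names={"verbose": verbose}):
            output[section_definition["section"]] = {}
    return output

print(build_output(verbose=0))  # {'result': {}}
print(build_output(verbose=3))  # {'result': {}, 'metadata': {}}
```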
Caching and Hashing¶
Optimizes repeated calculations through:
- Content-based caching: Hash inputs to detect identical calculations
- Memory management: Configurable cache sizes and eviction policies
- Reproducibility: Fingerprints enable tracking data provenance
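The content-based caching idea can be sketched with the standard library alone; PVGIS' own generate_hash() and per-request cache registry are richer than this, so treat the following as an assumption-laden illustration:

```python
# Illustrative content-based cache keying; PVGIS' generate_hash() and
# per-request cache registry differ from this standard-library sketch.
import hashlib
import json

def fingerprint(*args, **kwargs) -> str:
    """Derive a stable, reproducible hash from the call arguments."""
    payload = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()

cache: dict = {}

def cached_calculation(latitude: float, longitude: float) -> float:
    key = fingerprint(latitude, longitude)
    if key not in cache:                   # miss: compute and store
        cache[key] = latitude + longitude  # placeholder for a real calculation
    return cache[key]                      # hit: identical inputs, same key
```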
Usage Pattern¶
Core components are typically not imported directly by users. Instead, they're used internally by API functions:
A user calls an API function. Behind the scenes:
- DataModelFactory creates the SolarPosition model
- The calculation runs and validates its output
- ContextBuilder generates the structured result
- Caching stores the result for future identical requests
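A hedged sketch of that flow from the caller's side; the import path, function name, and parameters below are hypothetical placeholders, not the documented PVGIS API:

```python
# Hypothetical user-facing call; the import path, function name, and
# parameters are illustrative placeholders, not the actual PVGIS API.
from pvgisprototype.api import calculate_solar_position  # hypothetical

result = calculate_solar_position(
    latitude=45.0,
    longitude=8.0,
    timestamp="2024-06-21T12:00:00Z",
    verbose=1,
)
# Internally: DataModelFactory builds the SolarPosition model, the calculation
# validates its output, ContextBuilder shapes the structured result, and the
# cache keeps it for identical follow-up requests.
print(result)
```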
Source Code Reference¶
core ¶
Modules:
| Name | Description |
|---|---|
array_methods | |
arrays | |
caching | |
data_model | |
factory | |
hashing | |
array_methods ¶
Functions:
| Name | Description |
|---|---|
create_array_method | Helper function to create an instance with an empty array |
fill_array_method | Helper function to fill an existing instance's value with a newly created array |
create_array_method ¶
create_array_method(
self,
shape,
dtype: str = DATA_TYPE_DEFAULT,
init_method: bool | int | float | str = "zeros",
backend: str = "numpy",
use_gpu: bool = False,
) -> Any
Helper function to create an instance with an empty array
Source code in pvgisprototype/core/array_methods.py
def create_array_method(
self,
shape,
dtype: str = DATA_TYPE_DEFAULT,
init_method: bool | int | float | str = "zeros",
backend: str = "numpy",
use_gpu: bool = False,
) -> Any:
"""Helper function to create an instance with an empty array"""
return self(
value=create_array(
shape=shape,
dtype=dtype,
init_method=init_method,
backend=backend,
use_gpu=use_gpu,
)
)
fill_array_method ¶
fill_array_method(
self,
shape,
dtype: str = DATA_TYPE_DEFAULT,
init_method: bool | int | float | str = "zeros",
backend: str = "numpy",
use_gpu: bool = False,
) -> Any
Helper function to fill an existing instance's value with a newly created array
Source code in pvgisprototype/core/array_methods.py
def fill_array_method(
# cl_ss,
self,
shape,
dtype: str = DATA_TYPE_DEFAULT,
init_method: bool | int | float | str = "zeros",
backend: str = "numpy",
use_gpu: bool = False,
) -> Any:
"""Helper function to create an instance with an empty array"""
# return cl_ss(
# value=create_array(
# shape=shape,
# dtype=dtype,
# init_method=init_method,
# backend=backend,
# use_gpu=use_gpu,
# )
# )
self.value = create_array(
shape=shape,
dtype=dtype,
init_method=init_method,
backend=backend,
use_gpu=use_gpu,
)
arrays ¶
Classes:
| Name | Description |
|---|---|
ArrayDType | |
NDArrayBackend | Supported dense array backends. |
Functions:
| Name | Description |
|---|---|
create_array | Create an array with given shape, data type, initialization method, backend, and optional GPU usage. |
ArrayDType ¶
Bases: Enum
Methods:
| Name | Description |
|---|---|
from_string | Return the corresponding dtype object from a string. |
from_string classmethod ¶
Return the corresponding dtype object from a string.
NDArrayBackend ¶
Bases: Enum
Supported dense array backends.
Methods:
| Name | Description |
|---|---|
default | Return the default array backend. |
from_gpu_flag | Select array backend based on whether GPU is used. |
from_object | Determine the array backend associated with obj. |
module | Return the Python module associated with an array backend. |
type | Return the array type associated with the backend. |
default classmethod ¶
default() -> NDArrayBackend
from_gpu_flag classmethod ¶
from_gpu_flag(use_gpu: bool) -> NDArrayBackend
from_object classmethod ¶
from_object(obj) -> NDArrayBackend
Determine the array backend associated with obj.
Source code in pvgisprototype/core/arrays.py
module ¶
Return the Python module associated with an array backend.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
linear_algebra | bool | If True, return the linear algebra submodule. | False |
Source code in pvgisprototype/core/arrays.py
def module(self, linear_algebra: bool = False) -> types.ModuleType:
"""
Return the Python module associated with an array backend.
Parameters
----------
linear_algebra: bool
If True, return the linear algebra submodule.
"""
if self == NDArrayBackend.NUMPY:
module = numpy
linalg_module = module.linalg
elif self == NDArrayBackend.DASK:
module = dask.array
linalg_module = module.linalg
elif self == NDArrayBackend.CUPY and CUPY_ENABLED:
module = cupy
linalg_module = module.linalg if module is not None else None
else:
raise ValueError(f"No known module for {self.name}.")
return linalg_module if linear_algebra else module
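Based on the listing above, and assuming the enum is importable from pvgisprototype.core.arrays as its source location suggests, usage looks like:

```python
# Usage sketch based on the listing above; the import path is inferred from
# the "Source code in pvgisprototype/core/arrays.py" location.
from pvgisprototype.core.arrays import NDArrayBackend

assert NDArrayBackend.NUMPY.module().__name__ == "numpy"
assert NDArrayBackend.NUMPY.module(linear_algebra=True).__name__ == "numpy.linalg"
```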
type ¶
type() -> type
Return the array type associated with the backend.
Source code in pvgisprototype/core/arrays.py
def type(self) -> type:
"""Return the array type associated with the backend."""
if self == NDArrayBackend.NUMPY:
return numpy.ndarray
elif self == NDArrayBackend.DASK:
import dask.array
return dask.array.core.Array
elif self == NDArrayBackend.CUPY and CUPY_ENABLED:
return cupy.ndarray
else:
raise ValueError(f"No known array type for {self.name}.")
create_array ¶
create_array(
shape,
dtype: str = DATA_TYPE_DEFAULT,
init_method: bool | int | float | str = "zeros",
backend: str = "numpy",
use_gpu: bool = False,
)
Create an array with given shape, data type, initialization method, backend, and optional GPU usage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
shape | tuple | Shape of the array. | required |
dtype | str | Desired data-type for the array as a string. Default is 'float32'. | DATA_TYPE_DEFAULT |
init_method | str | Method to initialize the array. Options are 'zeros', 'ones', 'empty', and 'unset'. Default is 'zeros'. | 'zeros' |
backend | str | The array backend to use. Options are 'numpy', 'cupy', and 'dask'. Default is 'numpy'. | 'numpy' |
use_gpu | bool | If True, use GPU-accelerated arrays (CuPy) if available, overriding the backend choice. Default is False. | False |
Returns:
| Type | Description |
|---|---|
ndarray | An array initialized as specified. |
Source code in pvgisprototype/core/arrays.py
def create_array(
shape,
dtype: str = DATA_TYPE_DEFAULT,
init_method: bool | int | float | str = "zeros",
backend: str = "numpy",
use_gpu: bool = False,
):
"""
Create an array with given shape, data type, initialization method, backend, and optional GPU usage.
Parameters
----------
shape : tuple
Shape of the array.
dtype : str, optional
Desired data-type for the array as a string. Default is 'float32'.
init_method : str, optional
Method to initialize the array. Options are 'zeros', 'ones', 'empty', and 'unset'. Default is 'zeros'.
backend : str, optional
The array backend to use. Options are 'numpy', 'cupy', and 'dask'. Default is 'numpy'.
use_gpu : bool, optional
If True, use GPU-accelerated arrays (CuPy) if available, overriding the backend choice. Default is False.
Returns
-------
ndarray: An array initialized as specified.
"""
backend = backend.upper()
# Get the actual dtype object from the string
dtype_obj = ArrayDType.from_string(dtype)
# Override backend if GPU is requested and CuPy is available
if use_gpu and CUPY_ENABLED:
array_backend = NDArrayBackend.CUPY
# Handle backend selection
else:
if backend not in NDArrayBackend.__members__:
raise ValueError(
f"Invalid backend. Choose among {list(NDArrayBackend.__members__.keys())}."
)
array_backend = NDArrayBackend[backend.upper()]
array_module = array_backend.module()
# Select the initialization method
if isinstance(init_method, (int, float)): # User-requested value !
array = array_module.full(shape, init_method, dtype=dtype_obj)
elif isinstance(init_method, bool):
array = array_module.full(shape, init_method, dtype=bool)
elif init_method == "unset":
array = array_module.full(shape, init_method, dtype='U5')
elif init_method == "zeros":
array = array_module.zeros(shape, dtype=dtype_obj)
elif init_method == "ones":
array = array_module.ones(shape, dtype=dtype_obj)
elif init_method == "empty":
array = array_module.empty(shape, dtype=dtype_obj)
# elif isinstance(init_method, str): # Handle arbitrary string initialization
# if dtype_obj != numpy.str_ and dtype_obj != numpy.object_:
# raise ValueError("String initialization requires dtype to be 'str' or 'object'.")
# array = array_module.full(shape, init_method, dtype=dtype_obj)
else:
raise ValueError(
"Invalid initialization method. Choose from 'zeros', 'ones', 'empty', 'unset' or provide a specific numeric or boolean value."
)
return array
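A short usage sketch based on the signature above; shapes and initialization values are arbitrary examples:

```python
# Usage sketch for create_array; shapes and initialization values are
# arbitrary examples, and the import path is inferred from the source location.
from pvgisprototype.core.arrays import create_array

zeros = create_array(shape=(24, 2), init_method="zeros", backend="numpy")
ones = create_array(shape=(24,), init_method="ones")
constant = create_array(shape=(3, 3), init_method=1.5)  # filled with 1.5

print(zeros.shape, ones.sum(), constant[0, 0])
```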
caching ¶
Functions:
| Name | Description |
|---|---|
clear_request_caches | Clear all caches for the current request |
custom_cached | Backwards compatible per-request thread-safe LRU cache with TTL expiration. |
generate_custom_hashkey | Generate a custom hash key for the given arguments and keyword arguments. |
get_request_cache_registry | Get or create the current request's cache registry |
inspect_cache_registry | Inspect the content of all cache memories in a cache registry |
make_object_hashable | Convert unhashable objects to hashable representations. |
register_cache | Register a cache memory in the thread-local cache registry |
clear_request_caches ¶
Clear all caches for the current request
Source code in pvgisprototype/core/caching.py
def clear_request_caches():
"""Clear all caches for the current request"""
if hasattr(_thread_local_storage, 'cache_registry'):
registry = _thread_local_storage.cache_registry
request_id = get_request_id()
total_hits = 0
total_misses = 0
total_caches = len(registry)
for cache in registry:
try:
if hasattr(cache, 'cache_info'):
info = cache.cache_info()
total_hits += info['lru_info'].hits
total_misses += info['lru_info'].misses
if hasattr(cache, 'cache_clear'):
cache.cache_clear()
except Exception as e:
logger.warning(f"Error clearing cache: {e}")
# Log performance summary
total_requests = total_hits + total_misses
hit_rate = (total_hits / total_requests * 100) if total_requests > 0 else 0
logger.info(
f"Request {request_id} cache summary: "
f"{total_caches} caches, {total_hits} hits, {total_misses} misses, "
f"{hit_rate:.1f}% hit rate"
)
# Clear the registry
_thread_local_storage.cache_registry = []
_thread_local_storage.request_id = 'unknown'
custom_cached ¶
Backwards compatible per-request thread-safe LRU cache with TTL expiration. Usage is exactly the same as your existing decorator. TTL is internally configurable via 'PVGIS_CACHE_TTL_SECONDS' env variable (default 300s).
Source code in pvgisprototype/core/caching.py
def custom_cached(func):
"""
Backwards compatible per-request thread-safe LRU cache with TTL expiration.
Usage is exactly the same as your existing decorator.
TTL is internally configurable via 'PVGIS_CACHE_TTL_SECONDS' env variable (default 300s).
"""
ttl = DEFAULT_TTL_SECONDS
ttl_hash_gen = _ttl_hash_gen(ttl)
def get_or_create_cache():
cache_attr = f"_cache_{func.__name__}_{id(func)}"
if not hasattr(_thread_local_storage, cache_attr):
# LRUCache as backing cache store
cache_memory = LRUCache(maxsize=CACHE_MAXSIZE)
setattr(_thread_local_storage, cache_attr, cache_memory)
# Register cache for per-request cleanup
registry = getattr(_thread_local_storage, 'cache_registry', None)
if registry is None:
registry = []
setattr(_thread_local_storage, 'cache_registry', registry)
if cache_memory not in registry:
registry.append(cache_memory)
request_id = getattr(_thread_local_storage, 'request_id', 'unknown')
logger.debug(f"Created cache for {func.__name__} in request {request_id}, TTL={ttl}s, maxsize={CACHE_MAXSIZE}")
return getattr(_thread_local_storage, cache_attr)
@wraps(func)
def wrapper(*args, **kwargs):
cache_memory = get_or_create_cache()
# Compute TTL hash to invalidate cache every ttl seconds
ttl_hash = next(ttl_hash_gen)
# Generate composite key: (ttl_hash, your original key)
# Use your existing generate_custom_hashkey to maintain compatible key hashing
from pvgisprototype.core.caching import generate_custom_hashkey # adjust import as needed
key_inner = generate_custom_hashkey(*args, **kwargs)
key = (ttl_hash, key_inner)
if key in cache_memory:
request_id = getattr(_thread_local_storage, 'request_id', 'unknown')
logger.debug(f"Cache HIT for {func.__name__} in request {request_id} (ttl_hash={ttl_hash})")
return cache_memory[key]
# Cache miss: call function and store result
result = func(*args, **kwargs)
cache_memory[key] = result
request_id = getattr(_thread_local_storage, 'request_id', 'unknown')
logger.debug(f"Cache MISS for {func.__name__} in request {request_id} (ttl_hash={ttl_hash})")
return result
return wrapper
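Since the decorator keeps the plain-decorator usage described in its docstring, a usage sketch might look as follows; the decorated function is an illustrative placeholder:

```python
# Usage sketch; the decorated function is an illustrative placeholder and the
# import path is inferred from the source location.
from pvgisprototype.core.caching import custom_cached

@custom_cached
def expensive_sum(a: float, b: float) -> float:
    return a + b

expensive_sum(1.0, 2.0)  # cache miss: computed and stored
expensive_sum(1.0, 2.0)  # cache hit within the same thread, request, and TTL window
```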
generate_custom_hashkey ¶
Generate a custom hash key for the given arguments and keyword arguments.
Returns:
| Name | Type | Description |
|---|---|---|
hashkey | | The hash key for the given arguments and keyword arguments. |
Source code in pvgisprototype/core/caching.py
def generate_custom_hashkey(*args, **kwargs):
"""
Generate a custom hash key for the given arguments and keyword arguments.
Returns
-------
hashkey: The hash key for the given arguments and keyword arguments.
"""
args_hashed = tuple(make_object_hashable(argument) for argument in args)
kwargs_hashed = {key: make_object_hashable(value) for key, value in kwargs.items()}
return hashkey(*args_hashed, **kwargs_hashed)
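A usage sketch; the argument values are arbitrary and include an unhashable dictionary to show why the helper is needed:

```python
# Usage sketch; argument values are arbitrary and the import path is inferred
# from the source location.
from pvgisprototype.core.caching import generate_custom_hashkey

key = generate_custom_hashkey(45.0, longitude=8.0, options={"unit": "degrees"})
print(key)  # a hashable composite key, safe to use as a cache dictionary key
```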
get_request_cache_registry ¶
Get or create the current request's cache registry
Source code in pvgisprototype/core/caching.py
def get_request_cache_registry():
"""Get or create the current request's cache registry"""
if not hasattr(_thread_local_storage, 'cache_registry'):
_thread_local_storage.cache_registry = []
_thread_local_storage.request_id = generate_request_id()
logger.debug(f"Created new request cache registry for {_thread_local_storage.request_id}")
return _thread_local_storage.cache_registry
inspect_cache_registry ¶
Inspect the content of all cache memories in a cache registry
Source code in pvgisprototype/core/caching.py
def inspect_cache_registry(registry=None):
"""Inspect the content of all cache memories in a cache registry"""
if registry is None:
registry = get_request_cache_registry()
cache_states = {}
for index, cache_func in enumerate(registry):
try:
if hasattr(cache_func, 'cache_info'):
info = cache_func.cache_info()
cache_states[f"cache_{index}"] = {
"function": info.get('function', 'unknown'),
"hits": info['lru_info'].hits,
"misses": info['lru_info'].misses,
"currsize": info['lru_info'].currsize,
"maxsize": info.get('maxsize', 'unknown'),
"ttl_seconds": info.get('ttl_seconds', 'unknown')
}
else:
cache_states[f"cache_{index}"] = "Cache info not available"
except Exception as e:
cache_states[f"cache_{index}"] = f"Error getting cache info: {e}"
return cache_states
make_object_hashable ¶
Convert unhashable objects to hashable representations. Uses generate_hash() for complex objects that can't be hashed directly.
Source code in pvgisprototype/core/caching.py
def make_object_hashable(object):
"""
Convert unhashable objects to hashable representations.
Uses generate_hash() for complex objects that can't be hashed directly.
"""
try:
# Try to hash the object directly first
hash(object)
logger.debug(f"Object {object} is hashable.")
return object
except TypeError:
# If it's unhashable, use our custom generate_hash function
logger.debug(f"Object {object} is unhashable.")
return generate_hash(object)
register_cache ¶
Register a cache memory in the thread-local cache registry
Source code in pvgisprototype/core/caching.py
def register_cache(cache, registry=None):
"""Register a cache memory in the thread-local cache registry"""
if registry is None:
registry = get_request_cache_registry()
if cache not in registry:
registry.append(cache)
request_id = get_request_id()
logger.debug(f"Cache registered for request {request_id} (registry size: {len(registry)})")
return cache
data_model ¶
Modules:
| Name | Description |
|---|---|
generate | |
graph | |
inspect_data_model | |
yaml_definition_files | Important Note ! |
generate ¶
Functions:
| Name | Description |
|---|---|
callback_reset_python_data_model_definitions | |
main | Build and write Python data models from YAML definitions. |
callback_reset_python_data_model_definitions ¶
Source code in pvgisprototype/core/data_model/generate.py
def callback_reset_python_data_model_definitions(
ctx: Context,
reset_definitions: bool,
):
"""
"""
# print(ctx.params)
if not reset_definitions:
return
else:
output_file = ctx.params.get('output_file')
verbose = ctx.params.get('verbose')
print(f"Reset the Python definition dictionary in {output_file} an empty one !")
reset_python_data_model_definitions(
output_file=output_file,
verbose=verbose,
)
raise typer.Exit()
main ¶
main(
source_path: Annotated[
Path,
Option(
help="Source directory with YAML data model descriptions"
),
] = Path("definitions.yaml"),
definitions: Annotated[
List[str], typer_list_of_yaml_files
] = PVGIS_DATA_MODEL_YAML_DEFINITION_FILES,
output_file: Annotated[
Path, Option(help="Output file", is_eager=True)
] = Path("definitions.py"),
verbose: Annotated[bool, Option(help="Verbose")] = False,
log_file: Annotated[
str | None,
Option("--log-file", "-l", help="Log file"),
] = LOG_FILE,
log_level: str = LOG_LEVEL,
rich_handler: Annotated[
bool,
Option("--rich", "--no-rich", help="Rich handler"),
] = RICH_HANDLER,
reset_definitions: Annotated[
bool, typer_option_reset_definitions
] = False,
)
Build and write Python data models from YAML definitions.
Source code in pvgisprototype/core/data_model/generate.py
@app.command()
def main(
source_path: Annotated[Path, typer.Option(help="Source directory with YAML data model descriptions")] = Path("definitions.yaml"),
definitions: Annotated[List[str], typer_list_of_yaml_files] = PVGIS_DATA_MODEL_YAML_DEFINITION_FILES,
output_file: Annotated[Path, typer.Option(help='Output file', is_eager=True)] = Path("definitions.py"),
verbose: Annotated[bool, typer.Option(help="Verbose")] = False,
log_file: Annotated[str | None, typer.Option("--log-file", "-l",help="Log file")] = LOG_FILE,
log_level: str = LOG_LEVEL,
rich_handler: Annotated[bool, typer.Option("--rich", "--no-rich", help="Rich handler")] = RICH_HANDLER,
reset_definitions: Annotated[bool, typer_option_reset_definitions] = False, # I am a callback function !
):
"""
Build and write Python data models from YAML definitions.
"""
# Initialize logging
setup_factory_logger(
verbose=verbose,
level=log_level,
file=log_file,
rich_handler=rich_handler,
)
try:
reset_python_data_model_definitions(output_file=output_file, verbose=verbose)
pvgis_data_models = build_python_data_models(
source_path=source_path,
yaml_files=definitions,
verbose=verbose,
)
write_to_python_module(models=pvgis_data_models, output_file=output_file, verbose=verbose)
except Exception as e:
logger.exception(f"An error occurred: {e}")
else:
if verbose:
logger.success("Data models successfully generated !")
graph ¶
Modules:
| Name | Description |
|---|---|
build | |
circular_tree | |
colors | |
generate | |
graphviz_ | |
gravis_ | |
sort | |
build ¶
Functions:
| Name | Description |
|---|---|
build_dependency_graph | Build a recursive dependency graph from YAML files. |
process_model | Process a YAML file and its dependencies |
resolve_require_path | Resolve require path (e.g., 'sun/position') to a YAML file (e.g., 'sun/position.yaml'). |
build_dependency_graph ¶
build_dependency_graph(
source_path: Path,
verbose: bool = False,
log_level: str = "WARNING",
log_file: Path | None = None,
rich_handler: bool = False,
) -> DiGraph
Build a recursive dependency graph from YAML files.
Source code in pvgisprototype/core/data_model/graph/build.py
def build_dependency_graph(
source_path: Path,
verbose: bool = False,
log_level: str = "WARNING",
log_file: Path | None = None,
rich_handler: bool = False,
) -> nx.DiGraph:
"""
Build a recursive dependency graph from YAML files.
"""
# # Only set up logging if not already configured
# if not any(
# handler.levelno <= getattr(logger, log_level.upper())
# for handler in logger._core.handlers.values()
# ):
# Initialize logging
# setup_factory_logger(
# verbose=verbose,
# level=log_level,
# file=log_file,
# rich_handler=rich_handler,
# )
base_dir = Path(source_path.parts[0])
logger.debug(f"Base directory : {base_dir=}")
# base_dir = Path(source_path.parts[0]) if source_path.is_dir() else source_path.parents[1]
graph = nx.DiGraph()
visited = {} # Maps require path -> model name
queue = deque()
if source_path.is_file() and source_path.suffix == '.yaml':
queue.append((source_path.name, source_path))
logger.debug(f"Data appended to queue which now is {queue=}")
elif source_path.is_dir():
yaml_files = source_path.rglob("*.yaml")
for yaml_file in track(yaml_files, description="Queueing data models for processing\n" ):
queue.append((yaml_file.name, yaml_file))
while queue:
# logger.debug(f"[underline]The queue is now[/underline]\n\n {queue=}\n")
req_name, yaml_path = queue.popleft()
# logger.debug(f"[dim]After poping\n\n {queue=}[/dim]\n")
logger.debug(f"In the queue : {req_name=} {yaml_path=}")
if req_name in visited:
logger.debug(f"[red dim]{req_name=} already processed ![/red dim]")
continue # Already processed
process_model(
graph=graph,
base_dir=base_dir,
require_path=req_name,
yaml_path=yaml_path,
queue=queue,
visited=visited,
)
# logger.debug("Return dependency graph G\n\n{graph}", G=graph)
# logger.debug(f"Return {graph.nodes()=}")
return graph
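A usage sketch, assuming a directory of YAML model definitions; the directory path is hypothetical and the import path is inferred from the source location:

```python
# Usage sketch; the directory path is hypothetical.
from pathlib import Path
from pvgisprototype.core.data_model.graph.build import build_dependency_graph

graph = build_dependency_graph(source_path=Path("data_model/definitions"), verbose=True)
print(graph.number_of_nodes(), "models,", graph.number_of_edges(), "require edges")
```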
process_model ¶
process_model(
graph: DiGraph,
base_dir: Path,
require_path: str,
yaml_path: Path,
queue: deque,
visited: dict,
)
Process a YAML file and its dependencies
Source code in pvgisprototype/core/data_model/graph/build.py
def process_model(
graph: nx.DiGraph,
base_dir: Path,
require_path: str,
yaml_path: Path,
queue: deque,
visited: dict,
):
"""
Process a YAML file and its dependencies
"""
logger.debug(f"Input graph\n {graph.nodes=}\n {graph.edges=}\n")
# logger.debug(
# f"Processing\n\n {graph.nodes()=}\n\n YAML file {yaml_path=}",
# )
logger.debug(
"Processing YAML file {yaml_path}",
yaml_path=yaml_path,
alt=f"Processing YAML file {yaml_path=}"
)
with open(yaml_path, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
logger.debug(f"Loaded data\n {data=}")
model_name = data.get('name') # required
visited[require_path] = model_name
# Merge parent attributes if inheriting
if 'require' in data:
for parent_require in track(data['require'], description="Resolving requirements"):
logger.debug(f"Processing require directive {base_dir=} / {require_path=} = {parent_require=}")
parent_path = resolve_require_path(base_dir=base_dir, require_path=parent_require)
logger.debug(f"Path to parent node {parent_path=}")
if parent_path.exists():
logger.debug(f"Loading {parent_path=}")
with open(parent_path, 'r') as pf:
parent_data = yaml.safe_load(pf)
# Recursive attribute merging
logger.debug(f"Merging\n\n {parent_data}\n\nand\n\n {data=}\n")
data = deep_merge(
base=parent_data,
override=data,
)
logger.debug(f"Merged\n\n {data=}\n")
# Get meaningful attributes
model_symbol = data.get('symbol', '')
model_label = data.get('label', '')
model_label += f" {model_symbol}"
model_description = data.get('description') # required
model_attributes = data.get('sections', '')
model_color = data.get('color', 'white')
# Add node with merged attributes
logger.debug(
f"Adding {model_name=} to graph"
)
# ---------------------------------------------------------------------
# graph.add_node(
# node_for_adding=model_name,
# **{key: value for key, value in data.items() if key != "require"},
# _source_path=str(yaml_path),
# )
# ---------------------------------------------------------------------
graph.add_node(
node_for_adding=model_name,
label=model_label,
description=model_description,
symbol=model_symbol,
attributes=model_attributes,
color=model_color,
border_color='lightgray',
border_size=1,
hover=model_description,
click=yaml.dump(data=model_attributes, allow_unicode=True),#, encoding='utf-8'),
)
logger.debug(
f" [bold dim]Updated graph nodes\n {graph.nodes[model_name]=}\n"
)
# Process dependencies
requires = data.get('require', [])
logger.debug(f"[bold blue]Require directives to process[/bold blue]\n\n {requires=}\n")
for parent_node in requires:
logger.debug(f"[bold blue]Processing parent node[/bold blue] {parent_node=}")
parent_node_path = resolve_require_path(base_dir, parent_node)
parent_node_label = parent_node_path.parts[-2]
# logger.debug(f"{parent_node_path.exists()=}")
if not parent_node_path.exists():
logger.debug(f"Continue ?")
continue
if parent_node not in visited:
logger.debug(f"[blue dim]Appending to processing queue[/blue dim] {parent_node=}")
queue.append((parent_node, parent_node_path))
with open(parent_node_path, 'r', encoding='utf-8') as parent_node_file:
parent_node = yaml.safe_load(parent_node_file)
parent_node_name = parent_node.get('name', '<Unnamed node>')
else:
logger.debug(f"[blue]Already visited[/blue] {parent_node=}, see\n\n {visited=}")
parent_node_name = visited[parent_node]
# Add edge even if parent not processed yet
graph.add_edge(
u_of_edge=model_name,
v_of_edge=parent_node_name or parent_node,
label=parent_node_label,
color='lightgray',
)
# logger.debug(f"Updated edges\n {graph.edges=}\n")
# logger.debug(f"Nodes after new edge addition\n {graph.nodes=}\n")
logger.debug(f"Updated graph\n {graph.nodes=}\n {graph.edges=}\n")
resolve_require_path ¶
Resolve require path (e.g., 'sun/position') to a YAML file (e.g., 'sun/position.yaml').
Source code in pvgisprototype/core/data_model/graph/build.py
circular_tree ¶
Functions:
| Name | Description |
|---|---|
visualise_circular_tree | |
visualise_circular_tree ¶
visualise_circular_tree(
graph: DiGraph,
output_file: str = "data_model_dependency_graph",
node_size: int = 20,
) -> None
Source code in pvgisprototype/core/data_model/graph/circular_tree.py
def visualise_circular_tree(
graph: nx.DiGraph,
output_file: str = "data_model_dependency_graph",
node_size: int = 20,
) -> None:
""" """
pos = nx.nx_agraph.graphviz_layout(
G=graph,
prog="twopi",
args="",
)
# plt.figure(figsize=(11.69, 11.69))
plt.figure(figsize=(10, 10))
nx.draw(
G=graph,
pos=pos,
node_size=node_size,
alpha=0.5,
node_color="blue",
with_labels=False,
)
leaf_nodes = [n for n in graph.nodes if graph.out_degree(n) == 0]
# leaf_nodes = set()
# parent_nodes = set()
# parent_paths = {}
# for u, v, data in graph.edges(data=True):
# leaf_nodes.add(u)
# parent_nodes.add(v)
# parent_paths[v] = data.get('label', 'unknown')
logger.info(
"Leaf nodes\n\n{leaf_nodes}",
# "Leaf nodes\n\n{leaf_nodes}\n\nParent nodes\n\n{parent_nodes}",
leaf_nodes=leaf_nodes,
# parent_nodes=parent_nodes,
)
# Create labels only for leaf nodes
# labels = {n: str(n) for n in leaf_nodes}
labels = {
# n: n + f"\n{data.get('symbol', '')}"
node: f"{data.get('symbol', 'Symbol')}"
for node, data in graph.nodes(data=True)
if node in leaf_nodes
}
# Draw labels for leaf nodes
nx.draw_networkx_labels(
G=graph,
pos=pos,
labels=labels,
font_size=7,
font_color="red",
# font_weight="bold",
horizontalalignment='center'
)
# Titles
plt.suptitle(
t="Data Model Dependency Graph ⎄",
fontsize=12,
fontweight='bold',
color='#2F3131', #darkgray',
)
# plt.title(
# label="Lighter Colors = Path Hierarchy | Salmon = Leaf Node",
# fontsize=9,
# color='#2F4F4F', #'darkgray',
# # pad=10
# )
# plt.axis("off")
plt.axis("equal")
# plt.tight_layout(rect=[0, 0, 1, 0.98])
# Save
output_file += ".png"
plt.savefig(output_file, dpi=300, bbox_inches='tight')
plt.close()
logger.info(f"Graph saved to {output_file}")
colors ¶
Functions:
| Name | Description |
|---|---|
generate_color_from_path | Generate a visually distinct color for a given path. |
generate_color_from_path ¶
Generate a visually distinct color for a given path. - Root component determines the base hue. - Each subpath level modifies saturation and lightness.
Source code in pvgisprototype/core/data_model/graph/colors.py
def generate_color_from_path(path: str, max_levels: int = 10):
"""
Generate a visually distinct color for a given path.
- Root component determines the base hue.
- Each subpath level modifies saturation and lightness.
"""
components = path.split("/")
root = components[0]
# Generate a consistent base hue from root
hash_obj = hashlib.md5(root.encode())
hue = int(hash_obj.hexdigest(), 16) / (16**32) # Normalize to [0,1]
# Lighter base lightness and subtle contrast with depth
base_lightness = 0.85
lightness = base_lightness - 0.02 * min(len(components), max_levels)
# Slightly increase saturation with depth for contrast
saturation = 0.4 + 0.03 * min(len(components), max_levels)
# Convert to RGB
r, g, b = colorsys.hls_to_rgb(hue, lightness, saturation)
return (r, g, b)
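A usage sketch; the require-style path string is an arbitrary example:

```python
# Usage sketch; the path string is an arbitrary example and the import path is
# inferred from the source location.
from pvgisprototype.core.data_model.graph.colors import generate_color_from_path

r, g, b = generate_color_from_path("sun/position")
print(round(r, 2), round(g, 2), round(b, 2))  # RGB components in [0, 1]
```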
generate ¶
Functions:
| Name | Description |
|---|---|
generate_circular_tree | |
generate_graph | |
generate_gravis_d3 | |
generate_hierarchical_graph | |
generate_circular_tree ¶
Source code in pvgisprototype/core/data_model/graph/generate.py
def generate_circular_tree(
source_path: Path,
# yaml_file: Path,
node_size: int = 20,
) -> None:
"""
"""
# Build graph
graph = build_dependency_graph(
source_path=source_path,
)
# # Topological sort
# import networkx as nx
# def topological_sort(G: nx.DiGraph) -> list:
# try:
# return list(nx.topological_sort(G))
# except nx.NetworkXUnfeasible:
# raise ValueError("Graph contains a cycle")
# order = topological_sort(graph)
# logger.debug("Topological Order:", order)
# Visualize
visualise_circular_tree(
graph=graph,
node_size=node_size,
)
generate_graph ¶
Source code in pvgisprototype/core/data_model/graph/generate.py
def generate_graph(
source_path: Path,
# yaml_file: Path,
node_size: int = 2400,
parent_node_size: int = 1200,
) -> None:
"""
"""
# Build graph
graph = build_dependency_graph(
source_path=source_path,
)
# # Topological sort
# import networkx as nx
# def topological_sort(G: nx.DiGraph) -> list:
# try:
# return list(nx.topological_sort(G))
# except nx.NetworkXUnfeasible:
# raise ValueError("Graph contains a cycle")
# order = topological_sort(graph)
# logger.debug("Topological Order:", order)
# Visualize
visualise_graph(
graph=graph,
node_size=node_size,
parent_node_size=parent_node_size,
)
generate_gravis_d3 ¶
generate_gravis_d3(
yaml_file: Path,
output_file: Path,
verbose: bool = False,
log_level: str = "WARNING",
log_file: Path | None = None,
rich_handler: bool = False,
) -> None
Source code in pvgisprototype/core/data_model/graph/generate.py
def generate_gravis_d3(
yaml_file: Path,
# yaml_file: Path,
output_file: Path,
# node_size: int = 2400,
# parent_node_size: int = 1200,
verbose: bool = False,
log_level: str = "WARNING",
log_file: Path | None = None,
rich_handler: bool = False,
) -> None:
"""
"""
graph = build_dependency_graph(
source_path=yaml_file,
verbose=verbose,
log_level=log_level,
log_file=log_file,
rich_handler=rich_handler,
)
logger.debug(f"{graph.nodes()=}\n{graph.edges()=}")
if not output_file:
if yaml_file.is_file():
output_file = Path(yaml_file.name).with_suffix('.html')
if yaml_file.is_dir():
output_file = yaml_file.with_suffix('.html')
visualise_gravis_d3(
graph=graph,
output_file=output_file,
# node_size=node_size,
# parent_node_size=parent_node_size,
# log_level=log_level,
# log_file=log_file,
# rich_handler=rich_handler,
)
generate_hierarchical_graph ¶
Source code in pvgisprototype/core/data_model/graph/generate.py
graphviz_ ¶
Functions:
| Name | Description |
|---|---|
visualise_graph | Visualize a dependency graph with hierarchical layout |
visualise_hierarchical_graph | Visualize with hierarchical layout using graphviz |
visualise_graph ¶
visualise_graph(
graph: DiGraph,
output_file: str = "data_model_dependency_graph",
node_size: int = 2400,
parent_node_size: int = 1200,
) -> None
Visualize a dependency graph with hierarchical layout
Source code in pvgisprototype/core/data_model/graph/graphviz_.py
def visualise_graph(
graph: nx.DiGraph,
output_file: str = "data_model_dependency_graph",
node_size: int = 2400,
parent_node_size: int = 1200,
) -> None:
"""
Visualize a dependency graph with hierarchical layout
"""
leaf_nodes = set()
parent_nodes = set()
parent_paths = {}
for u, v, data in graph.edges(data=True):
leaf_nodes.add(u)
parent_nodes.add(v)
parent_paths[v] = data.get('label', 'unknown')
logger.info(
"Leaf nodes\n\n{leaf_nodes}\n\nParent nodes\n\n{parent_nodes}",
leaf_nodes=leaf_nodes,
parent_nodes=parent_nodes,
)
# Assign node colors
node_colors = {}
# Color leaf nodes
for node in leaf_nodes:
node_colors[node] = "salmon"
# Color parent nodes based on path
for name, path in parent_paths.items():
node_colors[name] = generate_color_from_path(path)
# Ensure all nodes have a color
for node in graph.nodes:
if node in leaf_nodes:
node_colors[node] = "salmon"
elif node in parent_nodes:
path = parent_paths.get(node, "unknown")
node_colors[node] = generate_color_from_path(path)
else:
# Isolated node (no incoming or outgoing edges)
node_colors[node] = "salmon" # Default to leaf
# Node sizes
node_sizes = [
parent_node_size if node in parent_nodes else node_size for node in graph.nodes
]
# Use graphviz_layout for hierarchical layout
plt.figure(figsize=(32, 11.69))
# plt.figure(figsize=(11.69,8.27)) # landscape
position = graphviz_layout(
G=graph,
# prog='sfdp',
# args="-Goverlap=prism100 -Gsep=0.1",
# prog="dot", # Use `dot` for top-down hierarchy
# args="-Granksep=1.5 -Gnodesep=1.0",
prog='twopi',
args="-Groot=node_name -Gsize=20 -Gsep=1",
)
# position = nx.spring_layout(
# G=graph,
# seed=42,
# k=.7, # k controls spacing
# # weight='weight',
# )
# Draw nodes
nx.draw(
G=graph,
pos=position,
with_labels=False,
node_size=node_sizes,
node_color=[node_colors[n] for n in graph.nodes],
edge_color='lightgray',
width=0.5,
alpha=0.7,
linewidths=0.5,
# edgecolors='black',
ax=plt.gca()
)
# Draw labels
# Parent labels excluding nodes that are also leaf nodes
parent_labels = {
n: n + f"\n{data.get('symbol', '')}"
for n in parent_nodes
if n not in leaf_nodes
}
leaf_labels = {
n: n + f"\n{data.get('symbol', '')}"
for n, data in graph.nodes(data=True)
if n in leaf_nodes
}
label_pos = {k: (v[0], v[1] + 0.02) for k, v in position.items()}
# node_symbols = {n: data.get("symbol", "") for n, data in graph.nodes(data=True)}
nx.draw_networkx_labels(
G=graph,
pos=label_pos,
labels=parent_labels,
font_size=9,
font_color="#4D4D4D", # Gray 30
font_weight="normal",
horizontalalignment='center',
# verticalalignment='bottom',
ax=plt.gca()
)
nx.draw_networkx_labels(
G=graph,
pos=position,
# labels=leaf_labels,
labels=leaf_labels,
font_size=10,
font_color="#666666",
# font_color='darkgray',
font_weight='bold',
horizontalalignment='center',
# verticalalignment='top',
ax=plt.gca()
)
# Draw edge labels
edge_labels = {(u, v): data["label"] for u, v, data in graph.edges(data=True)}
nx.draw_networkx_edge_labels(
G=graph,
pos=position,
edge_labels=edge_labels,
font_size=8,
font_color='blue',
alpha=0.66,
bbox=dict(facecolor='white', edgecolor='none', alpha=0.7, pad=1.5),
label_pos=0.66,
ax=plt.gca()
)
# Titles
plt.suptitle(
t="Data ⎄ Model Dependency Graph ⎄",
fontsize=12,
fontweight='bold',
color='#2F3131', #darkgray',
)
plt.title(
label="Lighter Colors = Path Hierarchy | Salmon = Leaf Node",
fontsize=9,
color='#2F4F4F', #'darkgray',
# pad=10
)
plt.axis("off")
plt.tight_layout(rect=[0, 0, 1, 0.98])
# Save
output_file += ".png"
plt.savefig(output_file, dpi=300, bbox_inches='tight')
plt.close()
logger.info(f"Graph saved to {output_file}")
visualise_hierarchical_graph ¶
Visualize with hierarchical layout using graphviz
Source code in pvgisprototype/core/data_model/graph/graphviz_.py
def visualise_hierarchical_graph(graph: dict, output_file: str = "hierarchical_graph"):
"""Visualize with hierarchical layout using graphviz"""
dot = nx.DiGraph(
comment="Hierarchical Data Model Dependencies",
graph_attr={"rankdir": "LR", "splines": "ortho"},
)
# Add nodes with color coding
parent_nodes = set()
for deps in graph.values():
parent_nodes.update(deps)
for node in graph:
dot.node(
node,
label=node,
_attributes={
"style": "filled",
"fillcolor": "lightgreen" if node in parent_nodes else "salmon",
"shape": "box",
"width": "1.5" if node in parent_nodes else "1.2",
},
)
# Add edges
for node, deps in graph.items():
for dep in deps:
dot.edge(node, dep)
dot.render(output_file, format="png", cleanup=True)
print(f"Hierarchical graph saved to {output_file}.png")
gravis_ ¶
Functions:
| Name | Description |
|---|---|
assign_properties | Source of this function : https://robert-haas.github.io/gravis-docs/code/examples/external_tools/networkx.html#Example-2 |
visualise_gravis_d3 | |
assign_properties ¶
Source of this function : https://robert-haas.github.io/gravis-docs/code/examples/external_tools/networkx.html#Example-2
Source code in pvgisprototype/core/data_model/graph/gravis_.py
def assign_properties(
g,
node_size: int = 15,
):
"""
Source of this function : https://robert-haas.github.io/gravis-docs/code/examples/external_tools/networkx.html#Example-2
"""
logger.debug(f"Post-processing")
# Centrality calculation
# node_centralities = nx.eigenvector_centrality(g)
node_centralities = nx.out_degree_centrality(g)
logger.debug(f"{node_centralities=}")
# edge_centralities = nx.edge_betweenness_centrality(g)
# Community detection
# communities = nx.algorithms.community.greedy_modularity_communities(g)
# # Graph properties
# g.graph["node_border_size"] = 1.5
# g.graph["node_border_color"] = "white"
# g.graph["edge_opacity"] = 0.9
# Node properties: Size by centrality, shape by size, color by community
# colors = [
# "red",
# "blue",
# "green",
# "orange",
# "pink",
# "brown",
# "yellow",
# "cyan",
# "magenta",
# "violet",
# ]
for node_id in g.nodes:
node = g.nodes[node_id]
node["size"] = node_size + node_centralities[node_id] * 33
visualise_gravis_d3 ¶
Source code in pvgisprototype/core/data_model/graph/gravis_.py
def visualise_gravis_d3(
graph: nx.DiGraph,
output_file: Path = Path("data_model_graph.html"),
# node_size: int = 2400,
# parent_node_size: int = 1200,
# verbose: bool = False,
# log_level: str = "WARNING",
# log_file: Path | None = None,
# rich_handler: bool = False,
):
"""
"""
assign_properties(graph)
fig = gv.d3(
data=graph,
graph_height=800,
details_height=200,
show_details=True,
show_details_toggle_button=True,
show_menu=True,
show_menu_toggle_button=True,
show_node=True,
node_size_factor=1.2,
node_size_data_source="size",
use_node_size_normalization=False,
node_size_normalization_min=10.0,
node_size_normalization_max=50.0,
node_drag_fix=True,
node_hover_neighborhood=True,
node_hover_tooltip=True,
show_node_image=True,
node_image_size_factor=1.0,
show_node_label=True,
show_node_label_border=True,
node_label_data_source="label",
node_label_size_factor=1.5,
node_label_rotation=33.0,
node_label_font="Arial",
show_edge=True,
edge_size_factor=1.0,
edge_size_data_source="size",
use_edge_size_normalization=True,
edge_size_normalization_min=0.2,
edge_size_normalization_max=5.0,
edge_curvature=0.0,
edge_hover_tooltip=True,
show_edge_label=True,
show_edge_label_border=True,
edge_label_data_source="label",
edge_label_size_factor=1.7,
edge_label_rotation=33.0,
edge_label_font="Arial",
zoom_factor=0.9,
large_graph_threshold=500,
layout_algorithm_active=True,
# specific for d3
use_many_body_force=True,
many_body_force_strength=-500.0,
many_body_force_theta=0.9,
use_many_body_force_min_distance=False,
many_body_force_min_distance=10.0,
use_many_body_force_max_distance=False,
many_body_force_max_distance=1000.0,
use_links_force=True,
links_force_distance=150.0,
links_force_strength=0.5,
use_collision_force=True,
collision_force_radius=90.0,
collision_force_strength=0.9,
use_x_positioning_force=False,
x_positioning_force_strength=0.2,
use_y_positioning_force=True,
y_positioning_force_strength=0.5,
use_centering_force=True,
)
# Export to HTML with embedded data
fig.export_html(output_file)
output_file.is_file()
sort ¶
Functions:
| Name | Description |
|---|---|
topological_sort | Perform topological sort using Kahn's algorithm |
topological_sort ¶
Perform topological sort using Kahn's algorithm
Source code in pvgisprototype/core/data_model/graph/sort.py
def topological_sort(graph: dict) -> list:
"""Perform topological sort using Kahn's algorithm"""
in_degree = defaultdict(int)
for node in graph:
for neighbor in graph.get(node, []):
in_degree[neighbor] += 1
queue = deque([node for node in graph if in_degree[node] == 0])
result = []
while queue:
node = queue.popleft()
result.append(node)
for neighbor in graph.get(node, []):
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
# Check for cycles
if len(result) < len(graph):
raise ValueError("Graph has a cycle")
return result
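A usage sketch with a small adjacency dictionary; node names are arbitrary, and the import path is inferred from the source location:

```python
# Usage sketch; node names are arbitrary. The dictionary maps each node to
# the nodes it points to.
from pvgisprototype.core.data_model.graph.sort import topological_sort

dependencies = {
    "solar_position": ["time", "location"],
    "time": [],
    "location": [],
}

print(topological_sort(dependencies))  # e.g. ['solar_position', 'time', 'location']
```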
inspect_data_model ¶
Functions:
| Name | Description |
|---|---|
main | Inspect data model definitions including YAML files, Python dictionaries and native PVGIS data models |
main ¶
main(
verbose: Annotated[bool, Option(help="Verbose")] = False,
log_file: Annotated[
str | None,
Option("--log-file", "-l", help="Log file"),
] = LOG_FILE,
log_level: str = LOG_LEVEL,
rich_handler: Annotated[
bool,
Option("--rich", "--no-rich", help="Rich handler"),
] = RICH_HANDLER,
)
Inspect data model definitions including YAML files, Python dictionaries and native PVGIS data models.
Source code in pvgisprototype/core/data_model/inspect_data_model.py
@app.callback()
def main(
verbose: Annotated[bool, typer.Option(help="Verbose")] = False,
log_file: Annotated[str | None, typer.Option("--log-file", "-l",help="Log file")] = LOG_FILE,
log_level: str = LOG_LEVEL,
rich_handler: Annotated[bool, typer.Option("--rich", "--no-rich", help="Rich handler")] = RICH_HANDLER,
):
"""
Inspect data model definitions including YAML files, Python dictionaries
and native PVGIS data models.
"""
if verbose:
log_level = "DEBUG"
setup_factory_logger(level=log_level, file=log_file, rich_handler=rich_handler)
yaml_definition_files ¶
Important Note !
The following is an ordered list of files defining PVGIS' native data models in YAML syntax.
Complex models depend on simpler ones, therefore the latter must exist before the former.
If you need to reorder the generation of PVGIS' native data models, handle the list with care: a model must always appear after the models it depends on.
Hint¶
When developing new or refactoring existing YAML data model definitions, things may get messy. To start from scratch, assign an empty dictionary to the PVGIS_DATA_MODEL_YAML_DEFINITION_FILES "constant", like so
```
PVGIS_DATA_MODEL_YAML_DEFINITION_FILES = {}
```
and rerun the script that generates the Python data model definitions.
Happy pv-Hacking !
factory ¶
Modules:
| Name | Description |
|---|---|
context | |
data_model | This module defines a factory to generate custom data classes (also called models) |
definition | |
log | Set FACTORY_LOG_FILE before running a script that uses this logger to log out |
property_functions | Attention |
context ¶
Functions:
| Name | Description |
|---|---|
parse_fields | Notes |
populate_context | Populate the context of an existing object |
parse_fields ¶
parse_fields(
data_model,
model_definition,
fields: list,
angle_output_units: str = RADIANS,
) -> dict
Notes
The YAML-based definition of a data model includes (expectedly) important attributes used to construct the output, namely :
- shortname
- title
- symbol
These attributes are functionally required for solar position relevant data models :
- the combination of shortname + symbol or the title is used to check membership in SolarPositionParameterColumnName().
For the output (column names) :
- the output field title (or name) is composed of the shortname and the symbol
Source code in pvgisprototype/core/factory/context.py
def parse_fields(
data_model,
model_definition,
fields: list,
angle_output_units: str = RADIANS,
) -> dict:
"""
Notes
-----
The YAML-based definition of a data model includes (expectedly) important
attributes used to construct the output, namely :
- shortname
- title
- symbol
These attributes are functionally required for solar position relevant data
models :
- the combination of `shortname` + `symbol` or the `title` are used to
check membership in `SolarPositionParameterColumnName()`.
For the output (column names) :
- the output field title (or name) is composed by the `shortname` and the
`symbol`
"""
# Get all solar position parameter field names
solar_position_parameters = set(
SolarPositionParameterColumnName.__members__.values()
)
data_container = OrderedDict()
data_model_shortname_and_symbol = f"{data_model.shortname} {data_model.symbol}"
# First, in case the data model is a simple one (i.e. not a nested one)
if hasattr(data_model, 'value'):
if data_model_shortname_and_symbol in solar_position_parameters:
# angular value : convert using the requested `angle_output_units` method
data_model.value = getattr(data_model, angle_output_units)
field_value = None
for field in fields:
try:
field_object = getattr(data_model, field)
field_definition = model_definition.get(field, {})
# Check if this field is a solar position parameter
is_solar_position_parameter = (
field_definition.get("title", None) in solar_position_parameters
)
# for all fields, use .value if available
if hasattr(field_object, "value"):
field_value = field_object.value
if is_solar_position_parameter:
# if the _object_ has .radians or .degrees implied is an angular quantity
attribute = getattr(field_object, angle_output_units)
# angular value : convert using the requested `angle_output_units` method
if callable(attribute):
field_value = (
attribute()
) # Actually call .radians() or .degrees()
else:
field_value = attribute
else:
field_value = field_object.value
else:
field_value = field_object
except AttributeError:
field_value = None
field_title = str()
if field == "value":
# If shortname + symbol exist, use'm !
if hasattr(data_model, "shortname") and hasattr(data_model, "symbol"):
field_title = f"{data_model.shortname} {data_model.symbol}"
# field_title = f"{data_model.shortname}"
elif field == "fingerprint":
field_name = model_definition.get(field, {}).get("title", field)
field_symbol = model_definition.get(field, {}).get("symbol", field)
field_title = f"{field_name} {field_symbol}"
field_value = generate_hash(data_model.value)
else:
# Get the title for the field from the model definition
field_title = model_definition.get(field, {}).get("title", field)
# Add to component content with title as key
data_container[field_title] = field_value
return data_container
populate_context ¶
populate_context(
self,
verbose=0,
fingerprint: bool = True,
angle_output_units: str = RADIANS,
locals: dict = {},
)
Populate the context of an existing object
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
self | DataModel | This is a PVGIS Data Model, initially defined and described in YAML syntax and then transformed into a Pydantic model. | required |
verbose | int | Verbosity level from the function's local scope, required to compare against the verbose condition set in the data model definition. | 0 |
fingerprint | bool | True will retrieve the fingerprint from the data model. | True |
angle_output_units | str | Angular unit for the output data can be either 'radians' (default) or 'degrees'. | RADIANS |
Notes
See also: data model definitions in YAML syntax under definitions.yaml.
An example for the input self data model is the SolarAltitude.
Source code in pvgisprototype/core/factory/context.py
def populate_context(
self,
verbose=0,
fingerprint: bool = True,
angle_output_units: str = RADIANS,
locals: dict = {},
):
"""Populate the context of an existing object
Parameters
----------
self: [DataModel]
This is a PVGIS Data Model. Initially defined and described in YAML
syntax, then transformed to a Pydantic Model.
verbose: int
Verbosity level from the function's local scope. This is required
compare against the `verbose` condition set in the data model
definition.
fingerprint: bool
True will retrieve the fingerprint from the data model.
angle_output_units: str
Angular unit for the output data can be either 'radians' (default) or
'degrees'.
Notes
-----
See also: data model definitions in YAML syntax under `definitions.yaml`.
An example for the input `self` data model is the `SolarAltitude`.
"""
# Get the definition of the data model, originally defined in YAML syntax
model_definition = self.model_definition
# Ensure order of data model fields as they appear in a YAML definition
output = OrderedDict()
# Check if there is an 'output' definition in the YAML for that Model
if "output" in model_definition:
# Read the structure definitions
structure = model_definition['output'].get("structure")
# Iterate the sections, if they exist
if structure:
for section_definition in structure:
section = section_definition.get("section")
condition = section_definition.get("condition")
output[section] = {}
if "subsections" in section_definition:
subsection = ""
subsections = section_definition.get("subsections")
subsection_content = {}
for subsection_definition in subsections:
subsection = subsection_definition.get("subsection")
subsection_condition = subsection_definition.get("condition")
# Build a `names` dictionary dynamically
names = {
'verbose': verbose,
'reflectivity_factor': getattr(self, 'reflectivity_factor', numpy_array([])),
# other attributes ?
}
if subsection_condition is None or simple_eval(
subsection_condition,
names=names,
):
subsection_content = {}
fields = subsection_definition.get("fields")
if fields:
subsection_content = parse_fields(
data_model=self,
model_definition=model_definition,
fields=fields,
angle_output_units=angle_output_units,
)
output[section][subsection] = subsection_content
else:
# Build names dictionary - include self so conditions can access attributes
names = {
"verbose": verbose,
"fingerprint": fingerprint,
"out_of_range": getattr(self, "out_of_range", numpy_array([])),
}
# Does the condition evaluate to true ?
if condition is None or simple_eval(
condition,
names=names,
):
section_content = {} # Dictionary for that component
fields = section_definition.get("fields")
if fields:
section_content = parse_fields(
data_model=self,
model_definition=model_definition,
fields=fields,
angle_output_units=angle_output_units,
)
output[section] = section_content
# Feed output to .output
self.output = output
data_model ¶
This module defines a factory to generate custom data classes (also called models) dynamically using Pydantic's BaseModel. It includes utilities for unit conversions (e.g., radians, degrees, timestamps), custom attribute handling, and validation of model fields. The DataModelFactory enables efficient creation of models with properties like solar incidence angles, coordinates, and time series data, allowing for flexible data representation and manipulation.
Key Features
- Dynamic generation of data models with custom attributes.
- Unit conversion utilities (e.g., degrees to radians, timestamps to hours).
- Integration with NumPy for handling array-based fields.
Classes:
| Name | Description |
|---|---|
DataModelFactory | |
DataModelFactory ¶
definition ¶
Modules:
| Name | Description |
|---|---|
build | |
consolidate | |
helpers | |
inheritance | |
lists | |
load | |
merge | |
write | |
build ¶
Functions:
| Name | Description |
|---|---|
build_python_data_models | Aggregate multiple PVGIS-native data models into a single dictionary. |
build_python_data_models ¶
build_python_data_models(
source_path: Path,
yaml_files: List[str],
verbose: bool = False,
) -> Dict[str, Any]
Aggregate multiple PVGIS-native data models into a single dictionary.
Source code in pvgisprototype/core/factory/definition/build.py
def build_python_data_models(source_path: Path, yaml_files: List[str], verbose: bool = False) -> Dict[str, Any]:
"""Aggregate multiple PVGIS-native data models into a single dictionary."""
logger.info(
"Building PVGIS-native Python data models",
alt=f"Building PVGIS-native Python data models"
)
data_models = {}
logger.debug(
"PVGIS bases upon a series of native data models. "
+ "These are defined in YAML files located in the directory {source_path}."
+ "\n",
source_path=source_path,
alt=(
f"[bold]PVGIS bases upon a series of native data models. [/bold]"
+ f"These are defined in YAML files located in the directory [code]{source_path}[/code]."
+ f"\n"
)
)
logger.info(
f"Reading YAML definitions in {source_path}",
alt=f"[bold]Reading[/bold] YAML definitions in [code]{source_path}[/code]"
)
table = Table(box=None, show_header=False, show_edge=False, pad_edge=False)
for yaml_file in track(yaml_files, description="Building data models...\n"):
full_yaml_path = source_path / yaml_file
data_model = load_data_model(source_path, full_yaml_path)
table.add_row(f"- {next(iter(data_model))}")
data_models.update(data_model)
if verbose:
Console().print(table)
Console().print()
return data_models
consolidate ¶
Functions:
| Name | Description |
|---|---|
finalize_output | Ensure output dict has required keys |
load_data_model | Load and consolidate a YAML data model definition and return a nested dictionary compatible with DataModelFactory |
finalize_output ¶
Ensure output dict has required keys
Source code in pvgisprototype/core/factory/definition/consolidate.py
load_data_model ¶
load_data_model(
source_path: Path,
data_model_yaml: Path,
require: bool = False,
) -> Dict[str, Any]
Load and consolidate a YAML data model definition and return a nested dictionary compatible with DataModelFactory.
Source code in pvgisprototype/core/factory/definition/consolidate.py
def load_data_model(
source_path: Path,
data_model_yaml: Path,
require: bool = False,
) -> Dict[str, Any]:
"""
Load and consolidate a YAML data model definition and return a nested
dictionary compatible with `DataModelFactory`.
"""
# Load data model description
data_model = load_yaml_file(data_model_yaml)
data_model_name = data_model.get('name', '<unnamed data model>')
if data_model_name == '<unnamed data model>':
logger.warning(f"The data model {data_model} lacks a 'name' key!")
log_data_model_loading(
data_model=data_model,
data_model_name=data_model_name,
require=require,
)
# Resolve requirements
data_model = resolve_requires(
data=data_model,
source_path=source_path,
)
finalize_output(data=data_model)
# del(data_model['sections']['_file_path']) # sane post-processing ?
# logger.info(
# "Return consolidated data model :\n" + yaml.dump(data={data_model_name: data_model['sections']}, default_flow_style=False, sort_keys=False),
# alt="[dim]Return consolidated data model :[/dim]\n" + yaml.dump(data={data_model_name: data_model['sections']}, default_flow_style=False, sort_keys=False),
# )
# Return the consolidated data model
return {data_model_name: data_model.get('sections', {})}
helpers ¶
Functions:
| Name | Description |
|---|---|
extract_structure_from_required | Extract the output structure list from the required file at sections.output.structure |
find_structure_in_path | Navigate nested dict to find structure at specified path. |
get_structure | Retrieve the structure from the nested dictionary. |
extract_structure_from_required ¶
Extract the output structure list from the required file at sections.output.structure.
Source code in pvgisprototype/core/factory/definition/helpers.py
def extract_structure_from_required(
required_data: dict,
) -> List[dict]:
"""
Extract the output structure list from the required file at
`sections.output.structure`.
"""
structure = []
if 'sections' in required_data:
output = required_data['sections'].get('output', {})
if output:
logger.debug(
f" ! Identified an `output` structure !\n",
alt=f" ! [blue bold]Identified an `output` structure ![/blue bold]\n",
)
if 'structure' in output:
structure = output['structure']
required_data_model_name = required_data['name']
yaml_dump_of_structure = yaml.dump(data=structure, sort_keys=False)
logger.debug(
" Base output structure"
+ " in {required_data_model_name} :"
+ "\n\n {yaml_dump_of_structure}\n",
required_data_model_name=required_data_model_name,
yaml_dump_of_structure=yaml_dump_of_structure,
alt=f" [dim][bold]Base[/bold] output structure[/dim]"
+ f" in {required_data_model_name} :"
+ f"\n\n [dim]{yaml_dump_of_structure}[/dim]\n"
)
return structure
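To illustrate the expected input shape, a hand-written dictionary standing in for a loaded parent definition; every name and key below is purely illustrative.
```python
from pvgisprototype.core.factory.definition.helpers import extract_structure_from_required

# Illustrative stand-in for the output of load_yaml_file()
required_data = {
    "name": "base_output",               # hypothetical model name
    "sections": {
        "output": {
            "structure": [
                {"section": "results"},  # hypothetical section entries
                {"section": "metadata"},
            ]
        }
    },
}

structure = extract_structure_from_required(required_data=required_data)
# structure == [{"section": "results"}, {"section": "metadata"}]
```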
find_structure_in_path ¶
Navigate nested dict to find structure at specified path.
Source code in pvgisprototype/core/factory/definition/helpers.py
def find_structure_in_path(
data: Dict,
path: List[str],
) -> Union[List, None]:
"""
Navigate nested dict to find structure at specified path.
"""
data_model_name = data.get("name", "<unnamed data model>")
if data_model_name == "<unnamed data model>":
logger.warning(f"The data structure {data} lacks a 'name' key!")
for part in path:
if isinstance(data, dict) and part in data:
data = data[part]
logger.debug(
" Override output structure in {data_model_name} [Child]\n\n {data}\n",
data_model_name=data_model_name,
data=data,
alt=f" [dim bold]Override output structure[/dim bold] in {data_model_name} [Child]\n\n [dim]{data}[/dim]\n",
)
else:
return None
return data if isinstance(data, list) else None
get_structure ¶
Retrieve the structure from the nested dictionary. If it doesn't exist, return an empty list.
Source code in pvgisprototype/core/factory/definition/helpers.py
def get_structure(data: Dict) -> List:
"""
Retrieve the structure from the nested dictionary.
If it doesn't exist, return an empty list.
"""
output_structure = data.get("sections", {}).get("output", {}).get("structure", [])
data_model_name = data.get('name', '<unnamed data structure>')
if output_structure:
yaml_dump_of_structure = yaml.dump(data=output_structure, sort_keys=False)
logger.debug(
" Child node output structure"
+ " in {data_model_name} :"
+ "\n\n {yaml_dump_of_structure}\n",
data_model_name=data_model_name,
yaml_dump_of_structure=yaml_dump_of_structure,
alt=f" [dim][bold]Child node[/bold] output structure[/dim]"
+ f" in {data_model_name} :"
+ f"\n\n [dim]{yaml_dump_of_structure}[/dim]\n"
)
else:
logger.debug(
" Child node"
+ " in `{data_model_name}` has no output structure !",
data_model_name=data_model_name,
alt=f" [dim][bold]Child node[/bold][/dim]"
+ f" in {data_model_name} has no output structure !"
)
logger.debug(
f"Returning child node\n\n{data=}\n\noutput structure is\n\n{output_structure=}"
)
return output_structure
inheritance ¶
Functions:
| Name | Description |
|---|---|
resolve_requires | Process a dictionary data structure and resolve its require directives recursively
set_nested_value | Set a 'value' at a nested dictionary key, creating intermediate dictionaries as needed. |
resolve_requires ¶
resolve_requires(
data: Dict,
source_path: Path,
resolved_files: Set | None = None,
cache: Dict[str, Dict] | None = None,
) -> Union[Dict, List, Any]
Process a dictionary data structure and resolve recursively its require directives by merging the current data model (also referred to as the child node or override) into the required data (also referred to as the parent node or base).
The output is a new grand-child node which combines data attributes from a child node after inheriting data attributes from the parent node.
Notes
child node : the input data structure
parent node : any required directive defined in the input data structure
Source code in pvgisprototype/core/factory/definition/inheritance.py
def resolve_requires(
data: Dict,
# data: Union[Dict, List, Any],
source_path: Path,
resolved_files: Set | None = None,
cache: Dict[str, Dict] | None = None,
) -> Union[Dict, List, Any]:
"""
Process a dictionary `data` structure and resolve recursively its `require`
directives by merging the _current_ data model (also referred to as the
`child node` or `override`) into the _required_ data (also referred to as
the `parent node` or `base`).
The output is a new grand-child node which combines data attributes from a
child node after inheriting data attributes from the parent node.
Notes
-----
child node : the input `data` structure
parent node : any `required` directive defined in the input `data` structure
"""
# logger.debug(
# f"Input data for which to resolve require directives is :\n\n {data=}\n"
# )
    # A cache set to track resolved files and avoid circular dependencies
if resolved_files is None:
resolved_files = set()
# A cache dictionary to store resolved data models (files) by file path
if cache is None:
cache = {}
# Sort of a "base" case : unstructured data need no processing !
if not isinstance(data, (dict, list)):
logger.debug(
"[Unstructured data, needs no processing]",
alt=f"[dim]\\[Unstructured data, needs no processing][/dim]",
)
return data
if isinstance(data, dict):
# Detect an already resolved path to avoid circular dependencies
current_file_path = data.get('_file_path', None)
if current_file_path:
if current_file_path in resolved_files:
logger.warning(
"Detected a circular dependency : file path {current_file_path} already resolved ! Skipping.-",
current_file_path=current_file_path,
alt=f"Detected a circular dependency : file path {current_file_path} already resolved ! Skipping.-",
)
return data # Skip circular dependencies
# Tracking the file path
logger.info(
"Tracking the file path {current_file_path}",
current_file_path=current_file_path,
alt=f"[yellow]Tracking the file path[/yellow] {current_file_path}",
)
resolved_files.add(current_file_path)
cache[current_file_path] = data # Cache current state
# Deep copy to avoid mutation during iteration
data = deepcopy(data)
# Process top-level `require` directive
if 'require' in data:
# Handle missing 'name' key
data_model_name = data.get('name', '<unnamed data model>')
logger.debug(
f"Identified require directives in `{data_model_name}`"
)
requires = data.pop("require")
# The `require` directive may or may not list multiple items ?
# If a single "item" (string?), make it a list
requires_list = [requires] if not isinstance(requires, list) else requires
require_directives = "- " + "\n - ".join(requires_list)
logger.debug(
" Parents for {data_model_name}\n\n {require_directives}\n",
data_model_name=data_model_name,
require_directives=require_directives,
alt=f"[dim] Parents for [/dim][bold]{data_model_name}[/bold]\n\n [yellow]{require_directives}[/yellow]\n",
)
if len(require_directives) > 0:
logger.debug(
" >>> Integrating required items >>> >>> >>>\n",
alt=f" [dim]>>> Integrating required items >>> >>> >>>[/dim]\n",
)
merged_structure = []
# Resolve recursively, merge sequentially via `reverse()` :
# respect order so a later require can override an earlier one
# for required_item in reversed(requires):
for required_item in requires: # Don't touch me ! Unless you really know what you are doing !
#
# Load and cache data model
#
# First, build the path to the require YAML file
required_path = (source_path / required_item).with_suffix('.yaml')
# required_path.is_file()
# Next check if the file is already processed, thus cached
if str(required_path) in cache:
logger.debug(
"Using cached parent data model definition\n\n{required_path}",
required_path=required_path,
alt=f"Using cached parent data model definition\n\n{required_path}"
)
required_data = cache[str(required_path)]
else:
required_data = load_yaml_file(required_path)
logger.info(
f"Required data model\n\n{required_data=}\n"
)
if isinstance(required_data, dict):
required_data['_file_path'] = str(required_path)
logger.debug(
"Caching {required_path}",
required_path=required_path,
alt=f"[magenta]Caching {required_path}[/magenta]",
)
cache[str(required_path)] = required_data
else:
logger.warning(
"Cannot track required parent data model node\n\n{required_data}\n\nis a list !\n",
required_data=required_data,
alt=f"[bold]Cannot track[/bold] required parent data model node\n\n{required_data}\n\nis a list !\n",
)
#
# Resolve require directives
#
try:
# Recursively resolve base model
required_data = resolve_requires(
data=required_data,
source_path=source_path,
resolved_files=resolved_files.copy(),
cache=cache,
)
except Exception as exception:
logger.error(
"Failed to resolve required `parent` YAML data model definition :\n\n File path : {required_path}\n\nData : {required_data}\n\nSource path : {source_path}\n\nResolved files : {resolved_files}\n\nCache : {cache}\n\nError : {exception}",
required_path=required_path,
required_data=required_data,
source_path=source_path,
resolved_files=resolved_files,
cache=cache,
exception=exception,
alt=f"Failed to resolve required `parent` YAML file : {required_path}\n\n{required_data}\n\n{source_path}\n\n{resolved_files}\n\n{cache}\n\nError : {exception}",
)
# continue
raise ValueError(f"Error resolving YAML file {required_data}")
#
# Process output-structure from required_data if present
#
## Extract the base output-structure (list) : parent node output structure
base_structure = extract_structure_from_required(
required_data=required_data
)
if base_structure:
merged_structure = merge_structure_list(
# base_structure=merged_structure,
# override_structure=base_structure,
base_structure=base_structure,
override_structure=merged_structure,
)
## Then get the child node output structure
structure_list = get_structure(data=data)
if structure_list:
merged_structure = merge_structure_list(
base_structure=merged_structure,
override_structure=structure_list,
)
logger.debug(f"Before inheriting from parent output structure, data is\n\n {data=}")
# --------------------------------------------------------
if len(base_structure) == 1:
logger.debug(f"Base structure lists a single item\n\n {data=}")
base_node = base_structure[0]
base_node.update(data)
data = base_node
# else:
# set_nested_value(
# data,
# ["sections", "output", "structure"],
# merged_structure,
# )
# --------------------------------------------------------
# for parent_node in base_structure:
# parent_node.update(data)
# --------------------------------------------------------
yaml_dump_of_structure = yaml.dump(data=data, sort_keys=False)
logger.debug(
"{data_model_name} after inheriting is"
+ "\n\n{yaml_dump_of_structure}\n",
data_model_name=data_model_name,
yaml_dump_of_structure=yaml_dump_of_structure,
alt=f" [dim][code]{data_model_name}[/code] after inheriting is[/dim]"
+ f"\n\n [dim]{yaml_dump_of_structure}[/dim]\n"
)
else:
# Merge (non-output-structure templates) required_data (base) into current data (override)
# or else said : the "current" data['name'] overrides the "base" required_data['name']
logger.debug(
f"No parent output structure found!"
+" Deep-merge non-output-structure dictionaries/lists templates!"
)
data = deep_merge(
base=required_data,
override=data,
)
yaml_dump_of_structure = yaml.dump(data=data, sort_keys=False)
logger.debug(
"{data_model_name} after deep-merging is"
+ "\n\n{yaml_dump_of_structure}\n",
data_model_name=data_model_name,
yaml_dump_of_structure=yaml_dump_of_structure,
alt=f" [dim][code]{data_model_name}[/code] after deep-merging is[/dim]"
+ f"\n\n [dim]{yaml_dump_of_structure}[/dim]\n"
)
# Apply accumulated structure
if merged_structure:
logger.debug(
"More merging... !"
)
existing_structure = get_structure(data)
final_structure = merge_structure_list(
existing_structure,
merged_structure,
)
structure_list = get_structure(data=data)
set_nested_value(data, ["sections", "output", "structure"], final_structure)
# Recurse into nested keys
# logger.info(
# "The data structure\n\n{data}\n\ndoes not contain any `require` directives. Recurse into nested keys.",
# data=data,
# alt=f"The data structure\n\n{data}\n\ndoes not contain any `require` directives. Recurse into nested keys.",
# )
logger.info(
"Recurse into nested data keys\n\n{data_keys}\n",
data_keys=data.keys(),
alt=f"Recurse into nested data keys\n\n{data.keys()=}\n",
)
for key, value in data.items():
logger.info(
"Resolve `{key}`",
key=key,
alt=f"Resolve {key=}",
)
data[key] = resolve_requires(
data=value,
source_path=source_path,
resolved_files=resolved_files.copy(),
cache=cache,
)
yaml_dump_of_structure = yaml.dump(data=data, sort_keys=False)
logger.debug(
"Resolved data is"
+ "\n\n{yaml_dump_of_structure}\n",
yaml_dump_of_structure=yaml_dump_of_structure,
alt=f"[dim][code]Resolved data is[/code][/dim]"
+ f"\n\n{yaml_dump_of_structure}\n"
)
return data
elif isinstance(data, list):
# Recurse into each item in the list
# for i, item in enumerate(data):
# data[i] = resolve_requires(item, source_path)
return [
resolve_requires(
data=item,
source_path=source_path,
resolved_files=resolved_files.copy(),
cache=cache,
)
for item in reversed(data) # Don't touch me ! Unless you really know what you are doing !
]
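A minimal end-to-end sketch of the inheritance mechanism, assuming a temporary `source_path` that holds a parent definition `base.yaml`; every model name and field below is hypothetical, and the import path follows the source location shown above.
```python
import tempfile
from pathlib import Path

import yaml

from pvgisprototype.core.factory.definition.inheritance import resolve_requires

with tempfile.TemporaryDirectory() as tmp:
    source_path = Path(tmp)
    # Parent node ("base") : defines an output structure
    (source_path / "base.yaml").write_text(yaml.dump({
        "name": "base",
        "sections": {"output": {"structure": [{"section": "results"}]}},
    }))
    # Child node ("override") : requires the parent and adds its own keys
    child = {
        "name": "child",
        "require": ["base"],
        "description": "adds to the inherited base",
    }
    resolved = resolve_requires(data=child, source_path=source_path)
    # `resolved` now carries the child's own keys plus the output structure
    # inherited from `base` under sections.output.structure
```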
set_nested_value ¶
Set a 'value' at a nested dictionary key, creating intermediate dictionaries as needed. Ensures 'output' dicts always have 'type' and 'initial' keys.
Source code in pvgisprototype/core/factory/definition/inheritance.py
def set_nested_value(
data: dict,
path: list,
value: Any,
# set_type: bool = False,
# set_initial: bool = False,
):
"""
Set a 'value' at a nested dictionary key, creating intermediate dictionaries as needed.
Ensures 'output' dicts always have 'type' and 'initial' keys.
"""
if value == [data]:
logger.debug(
"Value == [Data] : Safety against self-nesting ! Skipping setting.-"
)
return data
logger.debug(
f"My job is to set the\n\n{value=}\n\nto\n\n{data=}"
)
current = data
# Traverse to the parent of the final key
for part in path[:-1]:
if part not in current or not isinstance(current[part], dict):
current[part] = {}
current = current[part]
final_key = path[-1]
if isinstance(current.get(final_key), dict) and isinstance(value, dict):
logger.debug(
"Deep merging dictionaries\n\nvalue={value}\n\nand\n\ncurrent[final_key]={current_at_final_key}\n",
value=value,
current_at_final_key=current[final_key],
alt=f"[green]Deep merging dictionaries[/green]\n\n{value=}\n\nand\n\n{current[final_key]=}\n"
)
current[final_key] = deep_merge(current[final_key], value)
else:
logger.debug(
f"Setting\n\n{value=}\n\nto\n\n{data=} @ {final_key=}"
)
current[final_key] = value
yaml_dump_of_structure = yaml.dump(data=data, sort_keys=False)
logger.debug(
"Updated `data` is"
+ "\n\n{yaml_dump_of_structure}\n",
yaml_dump_of_structure=yaml_dump_of_structure,
alt=f"[dim][code]Update data is[/code][/dim]"
+ f"\n\n{yaml_dump_of_structure}\n"
)
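A minimal usage sketch: the path is created in place, and the keys shown are illustrative.
```python
from pvgisprototype.core.factory.definition.inheritance import set_nested_value

data = {"name": "example"}
set_nested_value(data, ["sections", "output", "structure"], [{"section": "results"}])
# data == {"name": "example",
#          "sections": {"output": {"structure": [{"section": "results"}]}}}
```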
lists ¶
Functions:
| Name | Description |
|---|---|
merge_structure_list | Merge output structures, including nested lists/dicts. |
merge_structure_list ¶
Merge output structures, including nested lists/dicts.
Source code in pvgisprototype/core/factory/definition/lists.py
def merge_structure_list(
base_structure,
override_structure,
):
"""
Merge output structures, including nested lists/dicts.
"""
if not override_structure:
# Case 2: Child is placeholder - inherit full parent structure
logger.debug(
f"No output structure in the child node -- inheriting parent node entirely!\n\n{base_structure=}\n"
)
return deepcopy(base_structure)
# Case 1: Merge structures and assign back
logger.debug(
"/ Merging child output structure into the parent output structure",
alt=f"/ Merging child output structure into the parent output structure",
)
# Create a map of parent sections for quick lookup
parent_sections = {
# item["section"]: item for item in base_structure if "section" in item
item.get("section"): item
for item in base_structure
}
logger.debug(
f"A map of `sections` in the parent node\n\n{parent_sections=}\n",
# alt=f"A map of `sections` in the parent node\n\n{parent_sections}",
)
merged = []
# First process all child items
for child_item in override_structure:
section_name = child_item.get("section")
if section_name in parent_sections:
# Retrieve parent item
parent_item = parent_sections.pop(section_name)
logger.debug(
f"Matching child section\n\n{section_name=}\n\n {child_item=}\n\nin parent node is\n\n {parent_item=}\n"
)
# Remove require directive if present
if "require" in child_item:
logger.debug(f"Poping the require directive")
child_item.pop("require", None)
# Create merged item with parent as base
logger.debug(f"Inheriting from parent node")
merged_item = deep_merge(parent_item, child_item) # deepcopy
# Remove template metadata from the final structure item
for key in TEMPLATE_METADATA_KEYS:
merged_item.pop(key, None)
logger.debug(f"Updated child node\n\n{merged_item=}\n")
merged.append(merged_item)
# logger.debug(f"Updated output structure\n\n{merged=}\n")
else:
logger.debug(
f"No matching child section `{section_name=}` in parent node."
+ f"Preserve existing child item\n\n {child_item=}\n"
)
# merged.append(deepcopy(child_item))
# Clean child item before adding
clean_child = {k: v for k, v in child_item.items()
if k not in TEMPLATE_METADATA_KEYS}
merged.append(clean_child)
# Add remaining parent items not overridden by child
# merged.extend(parent_sections.values())
# merged.extend(deepcopy(item) for item in parent_sections.values())
# Add remaining parent items (also cleaned)
for parent_item in parent_sections.values():
clean_parent = {k: v for k, v in parent_item.items()
if k not in TEMPLATE_METADATA_KEYS}
merged.append(clean_parent)
logger.debug(f"Updated output structure\n\n{merged=}\n")
return merged
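A small sketch of the merge behaviour, using hypothetical section names and keys; matching sections are merged, while unmatched sections from either side are preserved.
```python
from pvgisprototype.core.factory.definition.lists import merge_structure_list

base_structure = [
    {"section": "results", "show": True},
    {"section": "metadata"},
]
override_structure = [
    {"section": "results", "units": "kWh"},  # extends the matching parent section
]
merged = merge_structure_list(base_structure, override_structure)
# merged keeps both sections ; "results" now carries show=True and units="kWh"
```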
load ¶
Functions:
| Name | Description |
|---|---|
load_yaml_file | Load a data model definition from a properly structured YAML file into a Python dictionary
load_yaml_file ¶
Load a data model definition from a properly structured YAML file into a Python dictionary.
Source code in pvgisprototype/core/factory/definition/load.py
def load_yaml_file(file_path: Path) -> Dict[str, Any]:
"""
Load a data model definition from a properly structured YAML file into a
Python dictionary.
"""
try:
logger.debug(
"Loading {file_path}...",
file_path=file_path,
alt=f"Loading [bold]{file_path.as_posix()}[/bold]...",
)
with open(file_path, "r") as yaml_file:
return yaml.safe_load(yaml_file)
except FileNotFoundError:
raise FileNotFoundError(f"File not found: {file_path}")
except yaml.YAMLError as e:
raise ValueError(f"Error parsing YAML file {file_path}: {e}")
merge ¶
Functions:
| Name | Description |
|---|---|
deep_merge | Recursively merge two dictionaries or lists without overwriting. |
merge_dictionaries | Recursively merge two dictionaries. |
merge_lists | Merges two lists, ensuring no duplicates. |
deep_merge ¶
Recursively merge two dictionaries or lists without overwriting.
- Dicts: merged recursively.
- Lists of dicts: merged by first common key (section, subsection, id, name), else append unique.
- Lists of other types: append unique items.
Source code in pvgisprototype/core/factory/definition/merge.py
def deep_merge(base, override):
"""
Recursively merge two dictionaries or lists without overwriting.
- Dicts: merged recursively.
- Lists of dicts: merged by first common key (section, subsection, id, name), else append unique.
- Lists of other types: append unique items.
"""
if isinstance(base, dict) and isinstance(override, dict):
merged = base.copy()
for key, value in override.items():
if key in merged:
merged[key] = deep_merge(merged[key], value)
else:
merged[key] = value
return merged
elif isinstance(base, list) and isinstance(override, list):
#
# return base + [item for item in override if item not in base]
#
# Merge lists of dicts by identifier key
if all(isinstance(item, dict) for item in base + override):
id_keys = ['subsection', 'section', 'id', 'name']
identifier = next((k for k in id_keys if k in (base[0] if base else {})), None)
if identifier:
base_map = {item[identifier]: item for item in base if identifier in item}
override_map = {item[identifier]: item for item in override if identifier in item}
merged = []
seen_ids = set()
for item in base:
item_id = item.get(identifier)
if item_id in override_map:
merged.append(deep_merge(item, override_map[item_id]))
seen_ids.add(item_id)
else:
merged.append(item.copy())
for item in override:
item_id = item.get(identifier)
if item_id not in seen_ids:
merged.append(item.copy())
return merged
# Fallback: append unique items
merged = base.copy()
for item in override:
if item not in merged:
merged.append(item)
return merged
else:
return override
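A small sketch of these merge semantics with illustrative keys: nested dictionaries merge recursively, and lists of dictionaries merge by their first common identifier key.
```python
from pvgisprototype.core.factory.definition.merge import deep_merge

base = {
    "name": "parent",
    "fields": [{"name": "latitude", "unit": "degrees"}],
}
override = {
    "fields": [
        {"name": "latitude", "description": "site latitude"},
        {"name": "longitude"},
    ],
}
merged = deep_merge(base, override)
# merged["name"] == "parent"
# merged["fields"] merges "latitude" by its "name" key and appends "longitude"
```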
merge_dictionaries ¶
merge_dictionaries(
base: Dict[str, Any] | List[Any] | Any,
override: Dict[str, Any] | None,
) -> Dict[str, Any]
Recursively merge two dictionaries. Values in override will overwrite those in base if keys match.
Source code in pvgisprototype/core/factory/definition/merge.py
def merge_dictionaries(
base: Dict[str, Any] | List[Any] | Any,
override: Dict[str, Any] | None,
) -> Dict[str, Any]:
"""
Recursively merge two dictionaries.
Values in `override` will overwrite those in `base` if keys match.
"""
logger.info(
"Input data is\n\n{base}\n\nand\n\n{override}\n",
base=base,
override=override,
alt=f"[code]Input data is[/code]\n\n{base=}\n\nand\n\n{override=}\n",
)
# if "name" in base and not isinstance(base["name"], dict):
# base_data_model_name = base["name"]
# # logger.debug(
# # "/ Processing {base_data_model_name} [Parent]",
# # base_data_model_name=base_data_model_name,
# # alt=f"[dim]/ Processing [bold]{base_data_model_name}[/bold] [Parent][/dim]",
# # )
# log_action(
# action="/ Processing",
# action_style="",
# object_name=base_data_model_name,
# details="[Parent data model]",
# )
if override is None:
logger.info(
"Override is None, returning the base dictionary!",
alt="[orange]Override is None, returning the base dictionary![/orange]"
)
return base if base else {}
if isinstance(base, dict):
logger.debug(
"Merging dictionaries",
alt=f"[dim]Merging[/dim] {base.get('name', '<unnamed base>')} → {override.get('name', '<unnamed override>')}"
)
merged = deepcopy(base) if isinstance(base, (dict, list)) else base
# merged = base.copy() if isinstance(base, (dict, list)) else base
for override_key, override_value in override.items():
log_node(
node_type='Child',
key=override_key,
value=override_value,
)
# ----------------------------------------------
if (
override_value
and isinstance(override_value, dict)
and "name" in override_value
and not isinstance(override_value['name'], dict)
):
override_value_name = override_value["name"]
else:
override_value_name = ''
# ----------------------------------------------
if override_key in merged:
base_value = merged[override_key]
log_node(
node_type='Child', # the child key actually
key=override_key,
value=base_value,
state_message="exists in Parent [will inherit]",
)
# ----------------------------------------------
if (
base_value
and isinstance(base_value, dict)
and "name" in base_value
and not isinstance(base_value['name'], dict)
):
base_value_name = base_value["name"]
else:
base_value_name = '<unnamed base value>'
# ----------------------------------------------
if isinstance(override_value, dict) and isinstance(base_value, dict):
yaml_dump_of_merged = yaml.dump(data=merged, sort_keys=False)
log_action(
action="Before merging dictionaries",
action_style="dim yellow",
object_name=f"{base_value_name}, {override_value_name}",
details=yaml_dump_of_merged,
)
try:
# Recursively merge nested dictionaries
merged[override_key] = merge_dictionaries(
base=base_value,
override=override_value,
)
except Exception as exception:
logger.error(
"Error merging dictionaries for key `{override_key}` : {exception}",
override_key=override_key,
exception=exception,
alt=f"Error merging dictionaries for key [bold]{override_key}[/bold] : {exception}",
)
raise
yaml_dump_of_merged = yaml.dump(data=merged, sort_keys=False)
log_action(
action="After merging dictionaries",
action_style="yellow",
object_name='', # if 'name' in merged else '' ?
details=yaml_dump_of_merged,
)
elif isinstance(override_value, list) and isinstance(base_value, list):
# logger.debug("", alt=f"[blue]Before list :[/blue]\n{yaml.dump(merged)}")
yaml_dump_of_merged = yaml.dump(data=merged, sort_keys=False)
log_action(
action="Before merging lists",
action_style="dim blue",
object_name=f"{base_value_name} into {override_value_name}",
details=yaml_dump_of_merged,
)
base_list = base_value
try:
merged[override_key] = merge_lists(
base_list=base_list,
override_list=override_value,
)
except Exception as e:
logger.error(f"Error merging lists for key {override_key} : {e}")
raise
yaml_dump_of_merged = yaml.dump(data=merged, sort_keys=False)
log_action(
action="After merging lists",
action_style="bold blue",
object_name='', # if 'name' in merged else '' ?
details=yaml_dump_of_merged,
)
else:
log_action(
action="No dictionaries or lists, hence overwriting", # alt=f"[red]No dictionaries or lists[/red], hence [underline]overwriting[/underline] :
action_style='bold red',
object_name=override_key,
details=f"{override_key} = {override_value}", # [code]{override_key}[/code] = [bold]{override_value}[/bold]\n",
)
merged[override_key] = override_value
yaml_dump_of_merged = yaml.dump(data=merged, sort_keys=False)
log_action(
action="After direct assignment",
action_style="bold red",
object_name=f"{override_key} = see: `override_value`",
details=yaml_dump_of_merged,
)
else:
log_node(
node_type='Child', # the child key actually
key=override_key,
state_message="does not exist in Parent !",
message_style='red'
)
log_action(
action=f"Adding",
action_style='green',
object_name=override_key,
details=f'{override_key} = {override_value}',
)
merged[override_key] = deepcopy(override_value)
# merged[override_key] = override_value
yaml_dump_of_merged = yaml.dump(merged, sort_keys=False)
log_action(
action="After adding",
action_style="magenta",
object_name=f"{override_key}",
details=yaml_dump_of_merged,
)
# Special handling for nested sections
if 'sections' in merged and 'sections' in override:
for section_key, section_value in override['sections'].items():
if section_key in merged['sections']:
merged['sections'][section_key] = merge_dictionaries(
base=merged['sections'][section_key],
override=section_value,
)
else:
merged['sections'][section_key] = deepcopy(section_value)
yaml_dump_of_merged = yaml.dump(data=merged, sort_keys=False)
if 'name' in merged and not isinstance(merged['name'], dict):
action = "consolidated"
action_style = 'bold green'
merged_data_model_name = merged['name']
else:
action = "partially consolidated"
action_style = ''
merged_data_model_name = ''
log_action(
action=f"Return {action}",
action_style=action_style,
object_name=merged_data_model_name,
details=yaml_dump_of_merged,
)
return merged
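In contrast to deep_merge, values from the override replace matching scalar values in the base while nested dictionaries still merge recursively; a sketch with illustrative keys.
```python
from pvgisprototype.core.factory.definition.merge import merge_dictionaries

base = {"name": "parent", "sections": {"output": {"format": "json"}}}
override = {"sections": {"output": {"format": "csv", "precision": 2}}}
merged = merge_dictionaries(base=base, override=override)
# merged["name"] == "parent"
# merged["sections"]["output"] == {"format": "csv", "precision": 2}
```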
merge_lists ¶
Merges two lists, ensuring no duplicates. If items are dicts, uses section, name, or id for deduplication.
Source code in pvgisprototype/core/factory/definition/merge.py
def merge_lists(
base_list: List,
override_list: List,
) -> List:
"""
Merges two lists, ensuring no duplicates.
If items are dicts, uses `section`, `name`, or `id` for deduplication.
"""
log_action(
action="/ Merging `override` list into `base`",
action_style='',
object_name='a pair of Lists',
details="[Parent data model]",
)
# if base_list is None:
# return deepcopy(override_list) if override_list else []
# if override_list is None:
# return base_list
# merged = deepcopy(base_list)
merged = base_list.copy()
for item in reversed(override_list):
if isinstance(item, dict):
match_key = next((identifier for identifier in ("section", "name", "id") if identifier in item), None)
if match_key:
match = next((key for key in merged if isinstance(key, dict) and key.get(match_key) == item.get(match_key)), None)
if match:
merged[merged.index(match)] = merge_dictionaries(base=match, override=item)
else:
merged.append(item)
elif item not in merged:
merged.append(item)
elif item not in merged:
merged.append(item)
yaml_dump_of_merged = yaml.dump(data=merged, sort_keys=False)
log_action(
action=f"Return merge list",
action_style='underline',
object_name='',
details=yaml_dump_of_merged,
)
return merged
write ¶
Functions:
| Name | Description |
|---|---|
reset_python_data_model_definitions | Reset to an empty dictionary !
write_to_python_module | Write aggregated models to a Python module as a dictionary. |
reset_python_data_model_definitions ¶
Reset to an empty dictionary !
Source code in pvgisprototype/core/factory/definition/write.py
write_to_python_module ¶
Write aggregated models to a Python module as a dictionary.
Source code in pvgisprototype/core/factory/definition/write.py
def write_to_python_module(
models: Dict[str, Any],
output_file: Path,
verbose: bool = False,
) -> None:
"""Write aggregated models to a Python module as a dictionary."""
try:
content = (
f"# Custom data model definitions\n\n"
f"PVGIS_DATA_MODEL_DEFINITIONS = {models}\n"
)
if verbose and models:
logger.info("", alt=f"[bold]Writing[/bold] to [code]{output_file}[/code]")
with open(output_file, "w") as python_module:
python_module.write(content)
except IOError as e:
print(f"Error writing to file '{output_file}' : {e}")
logger.debug("", alt=f"Python data model definitions written to [code]{output_file}[/code]")
log ¶
Set FACTORY_LOG_FILE before running a script that uses this logger to log everything to a file !
Functions:
| Name | Description |
|---|---|
log_action | |
log_data_model_loading | |
log_node | |
setup_factory_logger | Set up a clean logger for the definition factory. |
log_action ¶
Source code in pvgisprototype/core/factory/log.py
def log_action(
action: str,
action_style: str,
object_name: str,
details: str | None = None,
):
"""
"""
action_style = 'dim' + f' {action_style}'
details = f"\n\n{details=}\n"
logger.info(
"{action} {object_name} {details}",
action=action,
object_name=object_name,
details=details,
# extra={'object_name': data_model_name, 'details': details},
alt=f"[{action_style}]{action}[/{action_style}] [bold]{object_name=}[/bold]{details}"
)
log_data_model_loading ¶
Source code in pvgisprototype/core/factory/log.py
def log_data_model_loading(
data_model,
data_model_name,
require: bool = False,
):
"""
"""
if not require:
logger.debug(
"Processing data model {data_model_name}",
data_model_name=data_model_name,
alt=f"[dim]Processing data model [code]{data_model_name}[/code] :[/dim]\n\n{yaml.dump(data_model, sort_keys=False)}",
)
else:
        yaml_dump_of_data_model = yaml.dump(data_model, default_flow_style=False, sort_keys=False)
        logger.debug(
            "Require data model :\n{yaml_dump_of_data_model}",
            yaml_dump_of_data_model=yaml_dump_of_data_model,
            alt=f"Require data model :\n[bold]{yaml_dump_of_data_model}[/bold]"
        )
log_node ¶
log_node(
node_type: str,
key: str | int,
value: Dict | List | None = None,
state_message: str | None = "",
message_style: str | None = "",
)
Source code in pvgisprototype/core/factory/log.py
def log_node(
node_type: str,
key: str | int,
value: Dict | List | None = None,
state_message: str | None = '',
message_style: str | None = '',
):
"""
"""
message_style_open = f"[{message_style}]" if message_style else ''
message_style_close = f"[/{message_style}]" if message_style else ''
value_type = '| ' + str(type(value)) if value else ''
value = f"\n\n {value}\n" if value else ''
logger.debug(
"{node_type} key {key} {state_message} {value_type} {value}",
node_type=node_type,
key=key,
state_message=state_message,
value_type=value_type,
value=value,
alt=f"[dim]{node_type} key[/dim] [bold]{key}[/bold] {message_style_open}{state_message}{message_style_close} [bold]{value_type}[/bold] {value}",
)
setup_factory_logger ¶
setup_factory_logger(
verbose: bool = False,
level: str = "WARNING",
format: str = LOG_FORMAT,
file: str | Path | None = None,
rich_handler: bool = False,
)
Set up a clean logger for the definition factory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
verbose | bool | If True, sets the logging level to DEBUG and shows logs in the console. | False |
level | str | Log level | 'WARNING' |
file | str | Path | None | The file path to log to. If None, logging will be to console. | None |
rich_handler | bool | If True, enables rich formatting for console output. | False |
Source code in pvgisprototype/core/factory/log.py
def setup_factory_logger(
verbose: bool = False,
level: str = "WARNING",
format: str = LOG_FORMAT,
file: str | Path | None = None,
rich_handler: bool = False,
):
"""
Set up a clean logger for the definition factory.
Parameters
----------
verbose : (bool)
If True, sets the logging level to DEBUG and shows logs in the console.
level: str
Log level
file : str | Path | None
The file path to log to. If None, logging will be to console.
rich_handler : bool
If True, enables rich formatting for console output.
"""
logger.remove() # Remove any existing handlers
if rich_handler:
import richuru
richuru.install(level=level, rich_traceback=True)
logger.debug(f"Installed richuru")
level = 'DEBUG' if verbose else level
if verbose:
logger.debug(
"Logging directed to `sys.stderr`",
alt=f"Logging directed to `sys.stderr`",
)
logger.add(
sink=sys.stderr,
level=level,
format=format,
backtrace=False,
diagnose=False,
)
if file:
logger.info(
"Logging to file {file}",
file=file,
alt=f"Logging to file {file}",
)
logger.add(
sink=file,
level=level,
format=format,
backtrace=True,
diagnose=True,
)
logger.info("Factory logger initialized")
property_functions ¶
Attention
The module
from pvgisprototype.core.data_model.definitions import PVGIS_DATA_MODEL_DEFINITIONS
needs to pre-exist, in the current setup, even for its "own" generation via the script
pvgisprototype/core/data_model/generate_definitions.py !
If it is missing, create one that holds an empty dictionary named PVGIS_DATA_MODEL_DEFINITIONS (see the sketch below), then run the generation script :
```
python generate_definitions.py
```
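A placeholder along the following lines is enough to bootstrap the generation (a sketch; the file location follows the import path above):
```python
# pvgisprototype/core/data_model/definitions.py -- bootstrap placeholder
PVGIS_DATA_MODEL_DEFINITIONS: dict = {}
```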
Functions:
| Name | Description |
|---|---|
as_hours_property | Instance property to convert to hours |
as_minutes_property | Instance property to convert to minutes |
datetime_property | Instance property to convert to datetime |
degrees_property | Instance property to convert to degrees. |
get_model_definition | Retrieve the definition of a model from the global definitions. |
radians_property | Instance property to convert to radians |
timedelta_property | Instance property to convert to timedelta |
timestamp_property | Instance property to convert to time (timestamp) |
as_hours_property ¶
Instance property to convert to hours
as_minutes_property ¶
Instance property to convert to minutes
Source code in pvgisprototype/core/factory/property_functions.py
def as_minutes_property(self) -> float | NpNDArray | None:
"""Instance property to convert to minutes"""
if self.unit == "minutes":
value = self.value
elif self.unit == "datetime":
value = (
self.value.hour * 3600 + self.value.minute * 60 + self.value.second
) / 60
elif self.unit == "timestamp":
value = _timestamp_to_minutes(self.value)
elif self.unit == "timedelta":
value = self.value.total_seconds() / 60
elif self.unit == RADIANS:
value = _radians_to_minutes(self.value)
elif self.unit == DEGREES:
value = _degrees_to_minutes(self.value)
else:
value = None
return value
datetime_property ¶
degrees_property ¶
Instance property to convert to degrees.
Notes
How to restrict to angular data models ?
Source code in pvgisprototype/core/factory/property_functions.py
def degrees_property(self) -> float | NpNDArray | None:
"""
Instance property to convert to degrees.
Notes
-----
How to restrict to angular data models ?
"""
# # Only proceed if this model is actually an angle
# if self.unit not in [DEGREES, RADIANS]:
# return None
if self.value is None:
return None
if self.unit == DEGREES:
return self.value
elif self.unit == RADIANS:
if isinstance(self.value, (int, float)):
from math import degrees
return degrees(self.value)
if isinstance(self.value, numpy.ndarray):
return numpy.degrees(self.value)
return None # Keep Me !
get_model_definition ¶
Retrieve the definition of a model from the global definitions.
Source code in pvgisprototype/core/factory/property_functions.py
def get_model_definition(self) -> Dict:
"""Retrieve the definition of a model from the global definitions."""
if self.data_model_name not in PVGIS_DATA_MODEL_DEFINITIONS:
raise ValueError(f"No definition found for model: {self.data_model_name}")
return PVGIS_DATA_MODEL_DEFINITIONS[self.data_model_name]
radians_property ¶
Instance property to convert to radians
Source code in pvgisprototype/core/factory/property_functions.py
def radians_property(self) -> float | NpNDArray | None:
"""Instance property to convert to radians"""
# # Only proceed if this model is actually an angle
# if self.unit not in [DEGREES, RADIANS]:
# return None
if self.value is None:
return None
if self.unit == RADIANS:
return self.value
elif self.unit == DEGREES:
if isinstance(self.value, (int, float)):
from math import radians
return radians(self.value)
if isinstance(self.value, numpy.ndarray):
return numpy.radians(self.value)
return None # Keep Me !
timedelta_property ¶
Instance property to convert to timedelta
Source code in pvgisprototype/core/factory/property_functions.py
def timedelta_property(self) -> Timedelta | TimedeltaIndex | None:
"""Instance property to convert to timedelta"""
if self.unit == RADIANS:
return _radians_to_timedelta(self.value)
elif self.unit == DEGREES:
return _degrees_to_timedelta(self.value)
elif self.unit == "timedelta":
return self.value
else:
return None
timestamp_property ¶
hashing ¶
Functions:
| Name | Description |
|---|---|
convert_numpy_to_json_serializable | Convert numpy arrays and other non-serializable objects to JSON-compatible types. |
convert_numpy_to_json_serializable ¶
Convert numpy arrays and other non-serializable objects to JSON-compatible types.
Source code in pvgisprototype/core/hashing.py
def convert_numpy_to_json_serializable(obj: Any) -> Any:
"""
Convert numpy arrays and other non-serializable objects to JSON-compatible types.
"""
if isinstance(obj, Enum):
return str(obj.name)
if isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, (np.integer, np.int64, np.int32, np.int16, np.int8)):
return int(obj)
elif isinstance(obj, (np.floating, np.float64, np.float32, np.float16)):
return float(obj)
elif isinstance(obj, np.bool_):
return bool(obj)
elif isinstance(obj, set):
        return [convert_numpy_to_json_serializable(item) for item in obj]  # Convert set to list while recursively converting its items
elif isinstance(obj, dict):
return {k: convert_numpy_to_json_serializable(v) for k, v in obj.items()}
elif isinstance(obj, (list, tuple)):
return [convert_numpy_to_json_serializable(item) for item in obj]
else:
return obj
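For context, a sketch of how such a converter can feed a content-based cache key; the `sha256`/`json.dumps` combination is an illustration, not necessarily the helper the hashing module itself uses.
```python
import hashlib
import json

import numpy as np

from pvgisprototype.core.hashing import convert_numpy_to_json_serializable

inputs = {"latitude": np.float64(45.0), "timestamps": np.arange(3)}
serializable = convert_numpy_to_json_serializable(inputs)

# Content-based fingerprint of the calculation inputs (illustrative)
fingerprint = hashlib.sha256(
    json.dumps(serializable, sort_keys=True).encode()
).hexdigest()
```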