Source code for autofit.non_linear.search.nest.nautilus.search

import numpy as np
import logging
import os
import sys
from pathlib import Path
from typing import Dict, Optional, Tuple

from autofit.database.sqlalchemy_ import sa

from autofit.mapper.prior_model.abstract import AbstractPriorModel
from autofit.mapper.prior.vectorized import PriorVectorized
from autofit.non_linear.fitness import Fitness
from autofit.non_linear.paths.null import NullPaths
from autofit.non_linear.search.nest import abstract_nest
from autofit.non_linear.samples.sample import Sample
from autofit.non_linear.samples.nest import SamplesNest
from autofit.non_linear.test_mode import is_test_mode


logger = logging.getLogger(__name__)

[docs] class Nautilus(abstract_nest.AbstractNest): __identifier_fields__ = ( "n_live", "n_update", "enlarge_per_dim", "n_points_min", "split_threshold", "n_networks", "n_like_new_bound", "seed", "n_shell", "n_eff", ) def __init__( self, name: Optional[str] = None, path_prefix: Optional[str] = None, unique_tag: Optional[str] = None, n_live: int = 3000, n_update: Optional[int] = None, enlarge_per_dim: float = 1.1, n_points_min: Optional[int] = None, split_threshold: int = 100, n_networks: int = 4, n_batch: int = 100, n_like_new_bound: Optional[int] = None, vectorized: bool = False, seed: Optional[int] = None, f_live: float = 0.01, n_shell: int = 1, n_eff: int = 500, n_like_max: float = float("inf"), discard_exploration: bool = False, verbose: bool = True, iterations_per_quick_update: Optional[int] = None, iterations_per_full_update: int = None, number_of_cores: int = 1, silence: bool = False, force_x1_cpu: bool = False, session: Optional[sa.orm.Session] = None, use_jax_vmap: bool = True, **kwargs, ): """ A Nautilus non-linear search. Nautilus is an optional requirement and must be installed manually via the command `pip install nautilus-sampler`. It is optional as it has certain dependencies which are generally straight forward to install. For a full description of Nautilus checkout its Github and documentation webpages: https://github.com/johannesulf/nautilus https://nautilus-sampler.readthedocs.io/en/stable/index.html Parameters ---------- name The name of the search, controlling the last folder results are output. path_prefix The path of folders prefixing the name folder where results are output. unique_tag The name of a unique tag for this model-fit, which will be given a unique entry in the sqlite database and also acts as the folder after the path prefix and before the search name. n_live Number of live points used for sampling. n_batch Number of likelihood evaluations performed at each step. n_like_max Maximum number of likelihood evaluations before stopping. f_live Maximum fraction of evidence in the live set before terminating. n_eff Minimum effective sample size before stopping. iterations_per_full_update The number of iterations performed between update (e.g. output latest model to hard-disk, visualization). number_of_cores The number of cores sampling is performed using a Python multiprocessing Pool instance. silence If True, the default print output of the non-linear search is silenced. force_x1_cpu If True, force single-CPU mode even when number_of_cores > 1. session An SQLalchemy session instance so the results of the model-fit are written to an SQLite database. """ super().__init__( name=name, path_prefix=path_prefix, unique_tag=unique_tag, iterations_per_full_update=iterations_per_full_update, iterations_per_quick_update=iterations_per_quick_update, number_of_cores=number_of_cores, silence=silence, session=session, **kwargs, ) self.n_live = n_live self.n_update = n_update self.enlarge_per_dim = enlarge_per_dim self.n_points_min = n_points_min self.split_threshold = split_threshold self.n_networks = n_networks self.n_batch = n_batch self.n_like_new_bound = n_like_new_bound self.vectorized = vectorized self.seed = seed self.f_live = f_live self.n_shell = n_shell self.n_eff = n_eff self.n_like_max = n_like_max self.discard_exploration = discard_exploration self.verbose = verbose self.force_x1_cpu = force_x1_cpu self.use_jax_vmap = use_jax_vmap if is_test_mode(): self.apply_test_mode() self.logger.debug("Creating Nautilus Search")
[docs] def apply_test_mode(self): logger.warning( "TEST MODE 1 (reduced iterations): Sampler will run with " "minimal iterations for faster completion." ) self.n_like_max = 1
def _fit(self, model: AbstractPriorModel, analysis): """ Fit a model using the search and the Analysis class which contains the data and returns the log likelihood from instances of the model, which the `NonLinearSearch` seeks to maximize. Parameters ---------- model : ModelMapper The model which generates instances for different points in parameter space. analysis : Analysis Contains the data and the log likelihood function which fits an instance of the model to the data, returning the log likelihood the `NonLinearSearch` maximizes. Returns ------- A result object comprising the Samples object that includes the maximum log likelihood instance and full set of accepted ssamples of the fit. """ if not isinstance(self.paths, NullPaths): checkpoint_exists = Path(self.checkpoint_file).exists() else: checkpoint_exists = False if checkpoint_exists: self.logger.info( "Resuming Nautilus non-linear search (previous samples found)." ) else: self.logger.info( "Starting new Nautilus non-linear search (no previous samples found)." ) if self.force_x1_cpu or analysis._use_jax: fitness = Fitness( model=model, analysis=analysis, paths=self.paths, fom_is_log_likelihood=True, resample_figure_of_merit=-1.0e99, iterations_per_quick_update=self.iterations_per_quick_update, background_quick_update=self.quick_update_background, live_visual_update=self.live_visual_update, use_jax_vmap=self.use_jax_vmap, batch_size=self.n_batch, ) search_internal = self.fit_x1_cpu( fitness=fitness, model=model, analysis=analysis, ) else: fitness = Fitness( model=model, analysis=analysis, paths=self.paths, fom_is_log_likelihood=True, resample_figure_of_merit=-1.0e99, iterations_per_quick_update=self.iterations_per_quick_update, background_quick_update=self.quick_update_background, live_visual_update=self.live_visual_update, ) search_internal = self.fit_multiprocessing( fitness=fitness, model=model, analysis=analysis, ) return search_internal, fitness @property def sampler_cls(self): try: from nautilus import Sampler return Sampler except ModuleNotFoundError: raise ModuleNotFoundError( "\n--------------------\n" "You are attempting to perform a model-fit using Nautilus. \n\n" "However, the optional library Nautilus (https://nautilus-sampler.readthedocs.io/en/stable/index.html) is " "not installed.\n\n" "Install it via the command `pip install nautilus-sampler==1.0.5`.\n\n" "----------------------" ) @property def checkpoint_file(self): """ The path to the file used for checkpointing. If autofit is not outputting results to hard-disk (e.g. paths is `NullPaths`), this function is bypassed. """ try: return self.paths.search_internal_path / "checkpoint.hdf5" except TypeError: pass
[docs] def fit_x1_cpu(self, fitness, model, analysis): """ Perform the non-linear search, using one CPU core. This is used if the likelihood function calls external libraries that cannot be parallelized or use threading in a way that conflicts with the parallelization of the non-linear search. Parameters ---------- fitness The function which takes a model instance and returns its log likelihood via the Analysis class model The model which maps parameters chosen via the non-linear search (e.g. via the priors or sampling) to instances of the model, which are passed to the fitness function. analysis Contains the data and the log likelihood function which fits an instance of the model to the data, returning the log likelihood the search maximizes. """ if analysis._use_jax: self.logger.info( "Running search with JAX vectorization (parallelization handled by JAX)." ) else: self.logger.info( "Running search where parallelization is disabled." ) search_internal = self.sampler_cls( prior=PriorVectorized(model=model), likelihood=fitness.call_wrap, n_dim=model.prior_count, filepath=self.checkpoint_file, pool=None, vectorized=fitness.use_jax_vmap, n_live=self.n_live, n_update=self.n_update, enlarge_per_dim=self.enlarge_per_dim, n_points_min=self.n_points_min, split_threshold=self.split_threshold, n_networks=self.n_networks, n_batch=self.n_batch, n_like_new_bound=self.n_like_new_bound, seed=self.seed, ) return self.call_search(search_internal=search_internal, model=model, analysis=analysis, fitness=fitness)
[docs] def fit_multiprocessing(self, fitness, model, analysis): """ Perform the non-linear search, using multiple CPU cores parallelized via Python's multiprocessing module. This uses PyAutoFit's sneaky pool class, which allows us to use the multiprocessing module in a way that plays nicely with the non-linear search (e.g. exception handling, keyboard interupts, etc.). Multiprocessing parallelization can only parallelize across multiple cores on a single device, it cannot be distributed across multiple devices or computing nodes. For that, use the `fit_mpi` method. Parameters ---------- fitness The function which takes a model instance and returns its log likelihood via the Analysis class model The model which maps parameters chosen via the non-linear search (e.g. via the priors or sampling) to instances of the model, which are passed to the fitness function. analysis Contains the data and the log likelihood function which fits an instance of the model to the data, returning the log likelihood the search maximizes. """ search_internal = self.sampler_cls( prior=PriorVectorized(model=model), likelihood=fitness.call_wrap, n_dim=model.prior_count, filepath=self.checkpoint_file, pool=self.number_of_cores, n_live=self.n_live, n_update=self.n_update, enlarge_per_dim=self.enlarge_per_dim, n_points_min=self.n_points_min, split_threshold=self.split_threshold, n_networks=self.n_networks, n_batch=self.n_batch, n_like_new_bound=self.n_like_new_bound, vectorized=self.vectorized, seed=self.seed, ) search_internal = self.call_search( search_internal=search_internal, model=model, analysis=analysis, fitness=fitness ) # Nautilus creates its own multiprocessing.Pool internally when pool=N. # Close them here so their finalizers don't fire at interpreter shutdown # (after pickle has been torn down, causing AttributeError on Pool.__del__). for pool_attr in ("pool_l", "pool_s"): pool = getattr(search_internal, pool_attr, None) if pool is not None: try: pool.close() pool.join() except Exception: pass setattr(search_internal, pool_attr, None) return search_internal
[docs] def iterations_from( self, search_internal ) -> Tuple[int, int]: """ Returns the next number of iterations that a dynesty call will use and the total number of iterations that have been performed so far. This is used so that the `iterations_per_full_update` input leads to on-the-fly output of dynesty results. It also ensures dynesty does not perform more samples than the `n_like_max` input variable. Parameters ---------- search_internal The Dynesty sampler (static or dynamic) which is run and performs nested sampling. Returns ------- The next number of iterations that a dynesty run sampling will perform and the total number of iterations it has performed so far. """ if isinstance(self.paths, NullPaths): if self.n_like_max is not None and self.n_like_max != float("inf"): return int(self.n_like_max), int(self.n_like_max) return int(1e99), int(1e99) try: total_iterations = len(search_internal.posterior()[1]) except ValueError: total_iterations = 0 iterations = total_iterations + self.iterations_per_full_update if self.n_like_max is not None and self.n_like_max != float("inf"): if iterations > self.n_like_max: iterations = int(self.n_like_max) return iterations, total_iterations
[docs] def output_search_internal(self, search_internal): """ Output the sampler results to hard-disk in their internal format. The multiprocessing `Pool` object cannot be pickled and thus the sampler cannot be saved to hard-disk. This function therefore extracts the necessary information from the sampler and saves it to hard-disk. Parameters ---------- sampler The nautilus sampler object containing the results of the model-fit. """ pool_l = search_internal.pool_l pool_s = search_internal.pool_s search_internal.pool_l = None search_internal.pool_s = None self.paths.save_search_internal( obj=search_internal, ) search_internal.pool_l = pool_l search_internal.pool_s = pool_s try: os.remove(self.checkpoint_file) except (TypeError, FileNotFoundError): pass
[docs] def samples_info_from(self, search_internal=None): return { "log_evidence": search_internal.evidence(), "total_samples": int(search_internal.n_like), "total_accepted_samples": int(search_internal.n_like), "time": self.timer.time if self.timer else None, "number_live_points": int(search_internal.n_live), }
[docs] def samples_via_internal_from( self, model: AbstractPriorModel, search_internal=None ): """ Returns a `Samples` object from the nautilus internal results. The samples contain all information on the parameter space sampling (e.g. the parameters, log likelihoods, etc.). The internal search results are converted from the native format used by the search to lists of values (e.g. `parameter_lists`, `log_likelihood_list`). Parameters ---------- model Maps input vectors of unit parameter values to physical values and model instances via priors. """ if search_internal is None: search_internal = self.paths.load_search_internal() parameters, log_weights, log_likelihoods = search_internal.posterior() parameter_lists = parameters.tolist() log_likelihood_list = log_likelihoods.tolist() weight_list = np.exp(log_weights).tolist() log_prior_list = [ sum(model.log_prior_list_from_vector(vector=vector)) for vector in parameter_lists ] sample_list = Sample.from_lists( model=model, parameter_lists=parameter_lists, log_likelihood_list=log_likelihood_list, log_prior_list=log_prior_list, weight_list=weight_list, ) return SamplesNest( model=model, sample_list=sample_list, samples_info=self.samples_info_from(search_internal=search_internal), )
@property def batch_size(self): return self.n_batch