Source code for sem.manager

import collections
import gc
import os
import shutil
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from random import shuffle

from multiprocessing import Pool

import numpy as np
import xarray as xr
from scipy.io import savemat
from tqdm import tqdm

from .database import DatabaseManager
from .lptrunner import LptRunner
from .parallelrunner import ParallelRunner
from .conditionalrunner import ConditionalRunner
from .runner import SimulationRunner
from .utils import DRMAA_AVAILABLE, list_param_combinations
import pandas as pd

if DRMAA_AVAILABLE:
    from .gridrunner import GridRunner

def parse_result(param):
    """
    Parse a single result into DataFrame rows (helper for
    get_results_as_dataframe, also usable through a multiprocessing.Pool).

    param is a list containing, in order: the complete result dictionary, a
    flag indicating whether the parsing function yields multiple rows per
    result, the parsing function itself, and the list of parameter columns
    to keep (or 'all'). Each returned row consists of the kept parameter
    values followed by the parsed outputs.
    """
    (result, function_yields_multiple_results, result_parsing_function,
     param_columns) = param
    data = []
    if function_yields_multiple_results:
        for r in result_parsing_function(result):
            param_values = list(deepcopy(result['params']).values())
            if param_columns != 'all':
                param_keys = list(deepcopy(result['params']).keys())
                param_values_to_keep = [v for k, v in list(zip(param_keys, param_values)) if k in param_columns]
            else:
                param_values_to_keep = param_values
            param_values_to_keep += [r] if not isinstance(r, list) else r
            data += [param_values_to_keep]
    else:
        param_values = list(deepcopy(result['params']).values())
        if param_columns != 'all':
            param_keys = list(deepcopy(result['params']).keys())
            param_values_to_keep = list([v for k, v in list(zip(param_keys, param_values)) if k in param_columns])
        else:
            param_values_to_keep = param_values
        parsed = result_parsing_function(result)
        param_values_to_keep += [parsed] if not isinstance(parsed, list) else parsed
        data += [param_values_to_keep]
    return data
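
# A minimal, hypothetical sketch of what parse_result receives and returns
# (not part of the original module; the parameter names and the parsing
# function below are made up for illustration):
#
#     result = {'params': {'nWifi': 2, 'RngRun': 0},
#               'output': {'stdout': '0.5\n'}}
#
#     def throughput(result):
#         return [float(result['output']['stdout'])]
#
#     parse_result([result, False, throughput, 'all'])
#     # -> [[2, 0, 0.5]]  (one row: parameter values followed by outputs)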


class CampaignManager(object):
    """
    This Simulation Execution Manager class can be used as an interface to
    execute simulations and access the results of simulation campaigns.

    The CampaignManager class wraps up a DatabaseManager and a
    SimulationRunner, which are used internally but can also be accessed as
    public member variables.
    """

    #######################################
    # Campaign initialization and loading #
    #######################################

    def __init__(self, campaign_db, campaign_runner, check_repo=True):
        """
        Initialize the Simulation Execution Manager, using the provided
        DatabaseManager and SimulationRunner instances.

        This method should never be used on its own, but only as a
        constructor from the new and load @classmethods.

        Args:
            campaign_db (DatabaseManager): the DatabaseManager object to
                associate to this campaign.
            campaign_runner (SimulationRunner): the SimulationRunner object
                to associate to this campaign.
            check_repo (bool): whether to verify that the ns-3 git
                repository is clean and on the commit saved in the campaign
                database.
        """
        self.db = campaign_db
        self.runner = campaign_runner
        self.check_repo = check_repo

        # Check that the current repo commit corresponds to the one
        # specified in the campaign
        if self.check_repo:
            self.check_repo_ok()

    @classmethod
    def new(cls, ns_path, script, campaign_dir, runner_type='Auto',
            overwrite=False, optimized=True, check_repo=True,
            skip_configuration=False, max_parallel_processes=None):
        """
        Create a new campaign from an ns-3 installation and a campaign
        directory.

        This method will create a DatabaseManager, which will install a
        database in the specified campaign_dir. If a campaign database is
        already available in the specified campaign_dir and its
        configuration matches the current one, that database is used
        instead. If the overwrite argument is set to True, the specified
        directory is wiped and a new campaign is created in its place.

        Furthermore, this method will initialize a SimulationRunner, of the
        type specified by the runner_type parameter, which will be locked on
        the ns-3 installation at ns_path and set up to run the desired
        script.

        Finally, note that creation of a campaign requires a git repository
        to be initialized at the specified ns_path. This allows SEM to save
        the commit at which the simulations are run, enforce reproducibility
        and avoid mixing results coming from different versions of ns-3 and
        its libraries.

        Args:
            ns_path (str): path to the ns-3 installation to employ in this
                campaign.
            script (str): ns-3 script that will be executed to run
                simulations.
            campaign_dir (str): path to the directory in which to save the
                simulation campaign database.
            runner_type (str): implementation of the SimulationRunner to
                use. Value can be: SimulationRunner (for running sequential
                simulations locally), ParallelRunner (for running parallel
                simulations locally), GridRunner (for running simulations
                using a DRMAA-compatible parallel task scheduler). Use Auto
                to automatically pick the best runner.
            overwrite (bool): whether to overwrite already existing
                campaign_dir folders. This deletes the directory if and only
                if it only contains files that were detected to be created
                by sem.
            optimized (bool): whether to configure the runner to employ an
                optimized ns-3 build.
            skip_configuration (bool): whether to skip the configuration
                step, and only perform compilation. NOTE: if
                skip_configuration=True and optimized=True, the build folder
                should be manually set to --out=build/optimized.
        """
        # Convert paths to be absolute
        ns_path = os.path.abspath(ns_path)
        campaign_dir = os.path.abspath(campaign_dir)

        # Verify if the specified campaign is already available
        if Path(campaign_dir).exists() and not overwrite:
            # Try loading
            manager = CampaignManager.load(
                campaign_dir, ns_path,
                runner_type=runner_type,
                optimized=optimized,
                check_repo=check_repo,
                skip_configuration=skip_configuration,
                max_parallel_processes=max_parallel_processes)

            if manager.db.get_script() == script:
                return manager
            else:
                del manager

        # Initialize runner
        runner = CampaignManager.create_runner(
            ns_path, script, runner_type=runner_type, optimized=optimized,
            skip_configuration=skip_configuration,
            max_parallel_processes=max_parallel_processes)

        # Get list of parameters to save in the DB
        params = runner.get_available_parameters()

        # Get current commit
        commit = ""
        if check_repo:
            from git import Repo, exc
            repo = Repo(ns_path)
            commit = repo.head.commit.hexsha
            if repo.is_dirty(untracked_files=True):
                raise Exception("ns-3 repository is not clean")

        # Create a database manager from the configuration
        db = DatabaseManager.new(script=script,
                                 params=params,
                                 commit=commit,
                                 campaign_dir=campaign_dir,
                                 overwrite=overwrite)

        return cls(db, runner, check_repo)
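
    # Hypothetical usage sketch (not part of the original module): the paths
    # and script name below are placeholders.
    #
    #     import sem
    #     campaign = sem.CampaignManager.new(
    #         ns_path='ns-3-dev',
    #         script='wifi-multi-tos',
    #         campaign_dir='./wifi-campaign',
    #         runner_type='ParallelRunner',
    #         overwrite=False)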

    @classmethod
    def load(cls, campaign_dir, ns_path=None, runner_type='Auto',
             optimized=True, check_repo=True, skip_configuration=False,
             max_parallel_processes=None):
        """
        Load an existing simulation campaign.

        Note that specifying an ns-3 installation is not compulsory when
        using this method: existing results will be available, but in order
        to run additional simulations it will be necessary to specify a
        SimulationRunner object, and assign it to the CampaignManager.

        Args:
            campaign_dir (str): path to the directory containing the
                simulation campaign database.
            ns_path (str): path to the ns-3 installation to employ in this
                campaign.
            runner_type (str): implementation of the SimulationRunner to
                use. Value can be: SimulationRunner (for running sequential
                simulations locally), ParallelRunner (for running parallel
                simulations locally), GridRunner (for running simulations
                using a DRMAA-compatible parallel task scheduler).
            optimized (bool): whether to configure the runner to employ an
                optimized ns-3 build.
            skip_configuration (bool): whether to skip the configuration
                step, and only perform compilation.
        """
        # Convert paths to be absolute
        if ns_path is not None:
            ns_path = os.path.abspath(ns_path)
        campaign_dir = os.path.abspath(campaign_dir)

        # Read the existing configuration into the new DatabaseManager
        db = DatabaseManager.load(campaign_dir)
        script = db.get_script()

        runner = None
        if ns_path is not None:
            runner = CampaignManager.create_runner(
                ns_path, script, runner_type, optimized, skip_configuration,
                max_parallel_processes=max_parallel_processes)

        return cls(db, runner, check_repo)
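
    # Hypothetical usage sketch (not part of the original module): reopening
    # the campaign directory created in the sketch above, without an ns-3
    # path, gives read-only access to the stored results ('./wifi-campaign'
    # is a placeholder path).
    #
    #     campaign = sem.CampaignManager.load('./wifi-campaign')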

    def create_runner(ns_path, script, runner_type='Auto',
                      optimized=True, skip_configuration=False,
                      max_parallel_processes=None):
        """
        Create a SimulationRunner from a string containing the desired class
        implementation, and return it.

        Args:
            ns_path (str): path to the ns-3 installation to employ in this
                SimulationRunner.
            script (str): ns-3 script that will be executed to run
                simulations.
            runner_type (str): implementation of the SimulationRunner to
                use. Value can be: SimulationRunner (for running sequential
                simulations locally), ParallelRunner (for running parallel
                simulations locally), GridRunner (for running simulations
                using a DRMAA-compatible parallel task scheduler). If Auto,
                automatically pick the best available runner (GridRunner if
                DRMAA is available, ParallelRunner otherwise).
            optimized (bool): whether to configure the runner to employ an
                optimized ns-3 build.
            skip_configuration (bool): whether to skip the configuration
                step, and only perform compilation.
        """
        # locals() and globals() map names to objects: we can look up the
        # desired runner class by its name and instantiate it.
        if runner_type == 'Auto' and DRMAA_AVAILABLE:
            runner_type = 'GridRunner'
        elif runner_type == 'Auto':
            runner_type = 'ParallelRunner'
        return locals().get(runner_type,
                            globals().get(runner_type))(
            ns_path, script, optimized=optimized,
            skip_configuration=skip_configuration,
            max_parallel_processes=max_parallel_processes)

    def check_and_fill_parameters(self, param_list, needs_rngrun):
        # Check that all parameter combinations fully specify the desired
        # simulation
        desired_params = list(self.db.get_params().keys())
        for p in param_list:
            # Besides the parameters that were actually passed, we add the
            # ones that are always available in every script
            if isinstance(p, list):
                parameter = p[0]
            else:
                parameter = p
            passed = list(parameter.keys())
            available = (['RngRun'] + desired_params if needs_rngrun
                         else desired_params)
            if set(passed) != set(available):
                not_supported_parameters = set(passed) - set(available)
                if not_supported_parameters:
                    raise ValueError("The following parameters are "
                                     "not supported by the script: %s\n"
                                     % not_supported_parameters)

                # Automatically fill the remaining parameters with defaults.
                # Defaults are added to the parameter dictionary itself, so
                # this also works when items are [params, estimate] lists.
                additional_required_parameters = \
                    set(available) - set(passed)
                for additional_parameter in additional_required_parameters:
                    parameter[additional_parameter] = \
                        self.db.get_params()[additional_parameter]

    ######################
    # Simulation running #
    ######################

    def run_simulations(self, param_list, show_progress=True,
                        stop_on_errors=True):
        """
        Run several simulations specified by a list of parameter
        combinations.

        Note: this function does not verify whether we already have the
        required simulations in the database - it just runs all the
        parameter combinations that are specified in the list.

        Args:
            param_list (list): list of parameter combinations to execute.
                Items of this list are dictionaries, with one key for each
                parameter, and a value specifying the parameter value (which
                can be either a string or a number).
            show_progress (bool): whether or not to show a progress bar with
                percentage and expected remaining time.
        """
        # Make sure we have a runner to run simulations with: the runner can
        # be missing if the simulation campaign was loaded instead of being
        # created from scratch.
        if self.runner is None:
            raise Exception("No runner was ever specified"
                            " for this CampaignManager.")

        # Return if the list is empty
        if param_list == []:
            return

        self.check_and_fill_parameters(param_list, needs_rngrun=True)

        # Check that the current repo commit corresponds to the one
        # specified in the campaign
        if self.check_repo:
            self.check_repo_ok()

        # Build ns-3 before running any simulations.
        # At this point, we can assume the project was already configured.
        self.runner.configure_and_build(skip_configuration=True)

        # Shuffle simulations.
        # This mixes up long and short simulations, and gives better time
        # estimates for the simple ParallelRunner.
        shuffle(param_list)

        # Offload simulation execution to self.runner.
        # Note that this only creates a generator for the results, no
        # computation is performed on this line.
        results = self.runner.run_simulations(param_list,
                                              self.db.get_data_dir(),
                                              stop_on_errors=stop_on_errors)

        # Wrap the result generator in the progress bar generator.
        if show_progress:
            result_generator = tqdm(results, total=len(param_list),
                                    unit='simulation',
                                    desc='Running simulations')
        else:
            result_generator = results

        self.run_and_save_results(result_generator)
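
    # Hypothetical usage sketch (not part of the original module),
    # continuing the sketch above: each item fully specifies one simulation,
    # including its RngRun value (parameter names are placeholders and
    # depend on the ns-3 script).
    #
    #     campaign.run_simulations([
    #         {'nWifi': 1, 'distance': 5, 'RngRun': 0},
    #         {'nWifi': 2, 'distance': 5, 'RngRun': 1},
    #     ])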

    def run_and_save_results(self, result_generator, batch_results=True):
        # Insert result objects in the db. Using the generator here ensures
        # we save results as they are finalized by the SimulationRunner, and
        # that they are kept even if execution is terminated abruptly by
        # crashes or by a KeyboardInterrupt.
        results_batch = []
        last_save_time = datetime.now()
        for result in result_generator:
            results_batch += [result]
            # Save results to disk once every 60 seconds
            if not batch_results:
                self.db.insert_results(results_batch)
                results_batch = []
            elif (batch_results and
                    (datetime.now() - last_save_time).total_seconds() > 60):
                self.db.insert_results(results_batch)
                self.db.write_to_disk()
                results_batch = []
                last_save_time = datetime.now()
        self.db.insert_results(results_batch)
        self.db.write_to_disk()

    def get_missing_simulations(self, param_list, runs=None,
                                with_time_estimate=False):
        """
        Return a list of the simulations among the required ones that are
        not available in the database.

        Args:
            param_list (list): a list of dictionaries containing all the
                parameter combinations.
            runs (int): an integer representing how many repetitions are
                wanted for each parameter combination, None if the
                dictionaries in param_list already feature the desired
                RngRun value.
            with_time_estimate (bool): whether to return, together with each
                missing parameter combination, an estimate of its execution
                time, based on previously available results for the same
                combination (infinity if none are available).
        """
        params_to_simulate = []

        # Fill up a possibly partial parameter specification with defaults
        if runs is None:
            self.check_and_fill_parameters(param_list, needs_rngrun=True)
        else:
            self.check_and_fill_parameters(param_list, needs_rngrun=False)

        if runs is not None:
            # Get next available runs from the database
            next_runs = self.db.get_next_rngruns()
            available_results = [r for r in self.db.get_results()]

            for param_comb in param_list:
                # Count how many results for this parameter combination are
                # already available in the database
                needed_runs = runs
                if with_time_estimate:
                    time_prediction = float("Inf")
                for i, r in enumerate(available_results):
                    if param_comb == {k: r['params'][k]
                                      for k in r['params'].keys()
                                      if k != "RngRun"}:
                        needed_runs -= 1
                        if with_time_estimate:
                            time_prediction = float(
                                r['meta']['elapsed_time'])

                new_param_combs = []
                for needed_run in range(needed_runs):
                    # Here it's important that we make copies of the
                    # dictionaries, so that if we modify one we don't modify
                    # the others. This is necessary because after this step,
                    # typically, we will add the RngRun key which must be
                    # different for each copy.
                    new_param = deepcopy(param_comb)
                    new_param['RngRun'] = next(next_runs)
                    if with_time_estimate:
                        new_param_combs += [[new_param, time_prediction]]
                    else:
                        new_param_combs += [new_param]
                params_to_simulate += new_param_combs
        else:
            for param_comb in param_list:
                previous_results = self.db.get_results(param_comb)
                if not previous_results:
                    if with_time_estimate:
                        # Try and find results with a different RngRun to
                        # provide a time prediction
                        param_comb_no_rngrun = {k: param_comb[k]
                                                for k in param_comb.keys()
                                                if k != "RngRun"}
                        prev_results_different_rngrun = self.db.get_results(
                            param_comb_no_rngrun)
                        if prev_results_different_rngrun:
                            time_prediction = float(
                                prev_results_different_rngrun[0]
                                ['meta']['elapsed_time'])
                        else:
                            time_prediction = float("Inf")
                        params_to_simulate += [[param_comb,
                                                time_prediction]]
                    else:
                        params_to_simulate += [param_comb]

        return params_to_simulate
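
    # Hypothetical usage sketch (not part of the original module): asking
    # for 5 runs of a combination for which 2 results already exist returns
    # 3 new combinations, each with a fresh RngRun value (parameter names
    # are placeholders).
    #
    #     missing = campaign.get_missing_simulations(
    #         [{'nWifi': 1, 'distance': 5}], runs=5)
    #     # -> e.g. [{'nWifi': 1, 'distance': 5, 'RngRun': 2}, ...]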

    def run_missing_simulations(self, param_list, runs=None,
                                condition_checking_function=None,
                                stop_on_errors=True):
        """
        Run the simulations from the parameter list that are not yet
        available in the database. A usage sketch follows the method body.

        This function also makes sure that we have at least runs
        replications for each parameter combination.

        Additionally, param_list can either be a list containing the desired
        parameter combinations or a dictionary containing multiple values
        for each parameter, to be expanded into a list.

        Args:
            param_list (list, dict): either a list of parameter combinations
                or a dictionary to be expanded into a list through the
                list_param_combinations function.
            runs (int): the number of runs to perform for each parameter
                combination. This parameter is only allowed if the
                param_list specification doesn't feature an 'RngRun' key
                already.
        """
        # Expand the parameter specification
        param_list = list_param_combinations(param_list)

        # In this case, we need to run simulations in batches
        if runs is None and condition_checking_function:
            next_runs = self.db.get_next_rngruns()
            # Create a ConditionalRunner
            cr = ConditionalRunner(
                self.runner.path,
                self.runner.script,
                self.runner.optimized,
                max_parallel_processes=self.runner.max_parallel_processes)
            # Set up the runner's stopping condition function
            cr.stopping_function = \
                lambda x: condition_checking_function(self, x)
            # Set up the runner's iterator for next runs
            cr.next_runs = next_runs
            # Fill up a possibly partial parameter specification with
            # defaults
            self.check_and_fill_parameters(param_list, needs_rngrun=False)
            self.run_and_save_results(
                cr.run_simulations(param_list,
                                   self.db.get_data_dir(),
                                   stop_on_errors=stop_on_errors),
                batch_results=False)

        # Otherwise, we just run all required runs for each combination
        if condition_checking_function is None:
            # If we are passed a list already, just run the missing
            # simulations
            if isinstance(self.runner, LptRunner):
                self.run_simulations(
                    self.get_missing_simulations(param_list, runs,
                                                 with_time_estimate=True),
                    stop_on_errors=stop_on_errors)
            else:
                self.run_simulations(
                    self.get_missing_simulations(param_list, runs),
                    stop_on_errors=stop_on_errors)
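
    # Hypothetical usage sketch (not part of the original module): a
    # dictionary parameter specification is expanded through
    # list_param_combinations into the full grid of combinations, and 10
    # runs of each missing combination are executed (parameter names are
    # placeholders).
    #
    #     campaign.run_missing_simulations(
    #         {'nWifi': [1, 2, 4], 'distance': [5, 10]},
    #         runs=10)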

    #####################
    # Result management #
    #####################

    def get_results_as_dataframe(self, result_parsing_function,
                                 columns=None,
                                 params=None,
                                 runs=None,
                                 param_columns='all',
                                 drop_constant_columns=False,
                                 parallel_parsing=False,
                                 verbose=False):
        """
        Return a Pandas DataFrame containing results parsed using a
        user-specified function.

        If function_yields_multiple_results is False, result_parsing_function
        is expected to return a list of outputs for each parsed result, and
        columns should contain an equal number of labels describing the
        contents of the output list. If function_yields_multiple_results is
        True, instead, result_parsing_function is expected to return
        multiple lists of outputs, as described by the labels in columns,
        for each result. In this case, each result in the database will
        yield a number of rows in the output dataframe that is equal to the
        length of the result_parsing_function output computed on that
        result.

        Args:
            result_parsing_function (function): user-defined function,
                taking a result dictionary as input and returning a list of
                outputs or a list of lists of outputs.
        """
        results_list = []
        if params is not None:
            for param_combination in list_param_combinations(params):
                if runs is not None:
                    results_list += list(
                        self.db.get_results(param_combination))[:runs]
                else:
                    results_list += list(
                        self.db.get_results(param_combination))
        else:
            results_list = list(self.db.get_results())

        if result_parsing_function.__dict__.get('files_to_load',
                                                None) is not None:
            files_to_load = result_parsing_function.__dict__['files_to_load']
        else:
            files_to_load = r".*"

        if result_parsing_function.__dict__.get('yields_multiple_results',
                                                None) is not None:
            function_yields_multiple_results = True
        else:
            function_yields_multiple_results = False

        if (columns is None and
                result_parsing_function.__dict__.get('output_labels',
                                                     None) is None):
            raise ValueError("Please either specify a columns parameter or "
                             "decorate your function with the "
                             "@sem.utils.output_labels decorator")
        elif columns is None:
            columns = result_parsing_function.__dict__['output_labels']

        data = []
        if parallel_parsing:
            with Pool(processes=self.runner.max_parallel_processes) as pool:
                parse_args = [[self.db.get_complete_results(
                                   result_id=result['meta']['id'],
                                   files_to_load=files_to_load)[0],
                               function_yields_multiple_results,
                               result_parsing_function,
                               param_columns]
                              for result in results_list]
                for parsed_result in tqdm(
                        pool.imap_unordered(parse_result, parse_args),
                        total=len(results_list), unit='result',
                        desc='Parsing Results', disable=not verbose):
                    data += parsed_result
        else:
            for parsed_result in tqdm(
                    (parse_result([self.db.get_complete_results(
                                       result_id=result['meta']['id'],
                                       files_to_load=files_to_load)[0],
                                   function_yields_multiple_results,
                                   result_parsing_function,
                                   param_columns])
                     for result in results_list),
                    total=len(results_list), unit='result',
                    desc='Parsing Results', disable=not verbose):
                data += parsed_result

        if param_columns == 'all':
            param_columns = list(self.db.get_results()[0]['params'].keys())

        all_columns = ([k for k in
                        list(self.db.get_results()[0]['params'].keys())
                        if k in param_columns] + columns)

        df = pd.DataFrame(data, columns=all_columns)

        if drop_constant_columns:
            nunique = df.apply(pd.Series.nunique)
            cols_to_drop = nunique[nunique == 1].index
            df = df.drop(cols_to_drop, axis=1)
            return df
        else:
            return df
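
    # Hypothetical usage sketch (not part of the original module): a parsing
    # function returning one value per result, labeled through the columns
    # argument (the 'stdout' file name and the metric are placeholders).
    #
    #     def get_throughput(result):
    #         return [float(result['output']['stdout'])]
    #
    #     df = campaign.get_results_as_dataframe(get_throughput,
    #                                            columns=['Throughput'])
    #     # df has one row per result, with one column per script parameter
    #     # (plus RngRun) followed by the 'Throughput' column.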

    def get_results_as_numpy_array(self, parameter_space,
                                   result_parsing_function,
                                   runs=None,
                                   extract_complete_results=True):
        """
        Return the results relative to the desired parameter space in the
        form of a numpy array.

        Args:
            parameter_space (dict): dictionary containing
                parameter/list-of-values pairs.
            result_parsing_function (function): user-defined function,
                taking a result dictionary as argument, that can be used to
                parse the result files and return a list of values.
            runs (int): number of runs to gather for each parameter
                combination.
        """
        data = self.get_space(
            self.db.get_results(), {},
            collections.OrderedDict([(k, v) for k, v in
                                     parameter_space.items()]),
            result_parsing_function, runs, extract_complete_results)
        return np.array(data)
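
    # Hypothetical usage sketch (not part of the original module): assuming
    # 5 results are available for each combination and a parsing function
    # returning a single metric, the array below would have shape
    # (2, 3, 5, 1) -- one axis per parameter, one for the runs and one for
    # the parsed outputs (parameter names are placeholders).
    #
    #     arr = campaign.get_results_as_numpy_array(
    #         {'nWifi': [1, 2], 'distance': [5, 10, 20]},
    #         get_throughput, runs=5)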

    def save_to_mat_file(self, parameter_space, result_parsing_function,
                         filename, runs):
        """
        Save the results relative to the desired parameter space in the form
        of a .mat file.

        Args:
            parameter_space (dict): dictionary containing
                parameter/list-of-values pairs.
            result_parsing_function (function): user-defined function,
                taking a result dictionary as argument, that can be used to
                parse the result files and return a list of values.
            filename (path): name of the output .mat file.
            runs (int): number of runs to gather for each parameter
                combination.
        """
        # Make sure all values are lists
        for key in parameter_space:
            if not isinstance(parameter_space[key], list):
                parameter_space[key] = [parameter_space[key]]

        # Add a dimension label for each dimension of the parameter space,
        # plus one for the runs (np.object was removed in recent NumPy
        # versions; plain object is equivalent)
        dimension_labels = [{key: np.array(parameter_space[key],
                                           dtype=object)}
                            for key in parameter_space.keys()
                            if len(parameter_space[key]) > 0] + \
                           [{'runs': range(runs)}]

        # Save the results array together with the dimension labels
        return savemat(
            filename,
            {'results': self.get_results_as_numpy_array(
                parameter_space,
                result_parsing_function,
                runs=runs).astype(object),
             'dimension_labels': dimension_labels})
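
    # Hypothetical usage sketch (not part of the original module): the
    # parameter names and the 'results.mat' filename are placeholders; the
    # saved file can then be read back with scipy.io.loadmat or from MATLAB.
    #
    #     campaign.save_to_mat_file({'nWifi': [1, 2]}, get_throughput,
    #                               'results.mat', runs=5)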

    def save_to_npy_file(self, parameter_space, result_parsing_function,
                         filename, runs):
        """
        Save results to a numpy array file format.
        """
        np.save(filename,
                self.get_results_as_numpy_array(parameter_space,
                                                result_parsing_function,
                                                runs=runs))

    def save_to_folders(self, parameter_space, folder_name, runs):
        """
        Save results to a folder structure.
        """
        self.space_to_folders(self.db.get_results(), {}, parameter_space,
                              runs, folder_name)

    def space_to_folders(self, current_result_list, current_query,
                         param_space, runs, current_directory):
        """
        Convert a parameter space specification to a directory tree with a
        nested structure.
        """
        # Base case: we iterate over the runs and copy files into the final
        # directory.
        if not param_space:
            for run, r in enumerate(current_result_list[:runs]):
                files = self.db.get_result_files(r)
                new_dir = os.path.join(current_directory, "run=%s" % run)
                os.makedirs(new_dir, exist_ok=True)
                for filename, filepath in files.items():
                    shutil.copyfile(filepath,
                                    os.path.join(new_dir, filename))
            return

        [key, value] = list(param_space.items())[0]

        # Iterate over dictionary values
        for v in value:
            next_query = deepcopy(current_query)
            temp_query = deepcopy(current_query)
            # For each list, recur 'fixing' that dimension.
            next_query[key] = v  # Update query
            # Create folder
            folder_name = ("%s=%s" % (key, v)).replace('/', '_')
            new_dir = os.path.join(current_directory, folder_name)
            os.makedirs(new_dir, exist_ok=True)
            next_param_space = deepcopy(param_space)
            del next_param_space[key]
            temp_query[key] = v
            temp_result_list = [r for r in current_result_list
                                if self.satisfies_query(r, temp_query)]
            self.space_to_folders(temp_result_list, next_query,
                                  next_param_space, runs, new_dir)

    def get_results_as_xarray(self, parameter_space,
                              result_parsing_function,
                              output_labels, runs=None,
                              extract_complete_results=True):
        """
        Return the results relative to the desired parameter space in the
        form of an xarray data structure.

        Args:
            parameter_space (dict): the space of parameters to export.
            result_parsing_function (function): user-defined function,
                taking a result dictionary as argument, that can be used to
                parse the result files and return a list of values.
            output_labels (list): a list of labels to apply to the metrics
                output by the result_parsing_function.
            runs (int): the number of runs to export for each parameter
                combination.
        """
        # Normalize the parameter space so that every value is a list
        clean_parameter_space = collections.OrderedDict(
            [(k, v) if isinstance(v, list) else (k, [v])
             for k, v in parameter_space.items()])

        clean_parameter_space['runs'] = range(runs)

        if isinstance(output_labels, list):
            clean_parameter_space['metrics'] = output_labels

        data = self.get_space(
            self.db.get_results(), {},
            collections.OrderedDict([(k, v) for k, v in
                                     parameter_space.items()]),
            result_parsing_function, runs, extract_complete_results)

        xr_array = xr.DataArray(data, coords=clean_parameter_space,
                                dims=list(clean_parameter_space.keys()))

        return xr_array
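
    # Hypothetical usage sketch (not part of the original module): like the
    # numpy export above, but with labeled dimensions so that results can be
    # selected by coordinate (parameter names and labels are placeholders).
    #
    #     xarr = campaign.get_results_as_xarray(
    #         {'nWifi': [1, 2], 'distance': [5, 10, 20]},
    #         get_throughput, ['Throughput'], runs=5)
    #     xarr.sel(nWifi=1, distance=10)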

    def files_in_dictionary(result):
        """
        Parsing function that returns a dictionary containing one entry for
        each output file. Typically used to perform parsing externally.
        """
        return result['output']

    def get_space(self, current_result_list, current_query, param_space,
                  result_parsing_function, runs=None,
                  extract_complete_results=True):
        """
        Convert a parameter space specification to a nested array structure
        representing the space.

        In other words, if the parameter space is::

            param_space = {
                'a': [1, 2],
                'b': [3, 4]
            }

        the function will return a structure like the following::

            [
                [
                    {'a': 1, 'b': 3},
                    {'a': 1, 'b': 4}
                ],
                [
                    {'a': 2, 'b': 3},
                    {'a': 2, 'b': 4}
                ]
            ]

        where the first dimension represents a, and the second dimension
        represents b. This nested-array structure can then be easily
        converted to a numpy array via np.array().

        Args:
            current_query (dict): the query to apply to the structure.
            param_space (dict): representation of the parameter space.
            result_parsing_function (function): user-defined function to
                call on results, typically used to parse data and output
                metrics.
            runs (int): the number of runs to query for each parameter
                combination.
        """
        if result_parsing_function is None:
            result_parsing_function = CampaignManager.files_in_dictionary

        # Note that this function operates recursively.

        # Base case
        if not param_space:
            results = [r for r in current_result_list
                       if self.satisfies_query(r, current_query)]
            parsed = []
            for r in results[:runs]:
                # Make results complete, by reading the output from file
                # TODO Extract this into a function
                r['output'] = {}
                available_files = self.db.get_result_files(r['meta']['id'])
                for name, filepath in available_files.items():
                    if extract_complete_results:
                        with open(filepath, 'r') as file_contents:
                            r['output'][name] = file_contents.read()
                    else:
                        r['output'][name] = filepath
                parsed.append(result_parsing_function(r))
                del r
            del results
            return parsed

        space = []
        [key, value] = list(param_space.items())[0]

        if not isinstance(value, list):
            value = [value]

        # Iterate over dictionary values
        for v in value:
            next_query = deepcopy(current_query)
            temp_query = deepcopy(current_query)
            # For each list, recur 'fixing' that dimension.
            next_query[key] = v
            next_param_space = deepcopy(param_space)
            del next_param_space[key]
            temp_query[key] = v
            temp_result_list = [r for r in current_result_list
                                if self.satisfies_query(r, temp_query)]
            space.append(self.get_space(temp_result_list, next_query,
                                        next_param_space,
                                        result_parsing_function, runs,
                                        extract_complete_results))
        return space

    def satisfies_query(self, result, query):
        for current_param, current_value in query.items():
            if result['params'][current_param] != current_value:
                return False
        return True

    #############
    # Utilities #
    #############

    def __str__(self):
        """
        Return a human-readable representation of the campaign.
        """
        if self.runner:
            # If the campaign has a runner, print the db and the type of
            # runner
            return ("--- Campaign info ---\n%s\nRunner type: %s\n"
                    "-----------" % (self.db, type(self.runner)))
        else:
            # The campaign has no runner, print only the db
            return "--- Campaign info ---\n%s\n-----------" % self.db

    def check_repo_ok(self):
        """
        Make sure that the ns-3 repository's HEAD commit is the same as the
        one saved in the campaign database, and that the ns-3 repository is
        clean (i.e., no untracked or modified files exist).
        """
        from git import Repo, exc

        # Check that git is at the expected commit and that the repo is not
        # dirty
        if self.runner is not None:
            path = self.runner.path
            try:
                repo = Repo(path)
            except exc.InvalidGitRepositoryError:
                raise Exception("No git repository detected.\nIn order to "
                                "use SEM and its reproducibility enforcing "
                                "features, please create a git repository "
                                "at the root of your ns-3 project.")
            current_commit = repo.head.commit.hexsha
            campaign_commit = self.db.get_commit()

            if repo.is_dirty(untracked_files=True):
                raise Exception("ns-3 repository is not clean")

            if current_commit != campaign_commit:
                raise Exception("ns-3 repository is on a different commit "
                                "from the one specified in the campaign")