# Source code for sem.database

import os
from functools import reduce
import itertools
from operator import and_, or_
from pathlib import Path
from copy import deepcopy
import shutil
import collections
import glob
from pprint import pformat
from tinydb import TinyDB, where
from tinydb.storages import JSONStorage
from tinydb.middlewares import CachingMiddleware

# Controls how DatabaseManager.get_next_values picks new values: when True,
# it yields the lowest non-negative integers that are not in the list it is
# given (so gaps are filled); when False, it starts counting from the
# manager's maxrngrun attribute instead of from zero.
REUSE_RNGRUN_VALUES = True

class DatabaseManager(object):
    """
    This serves as an interface with the simulation campaign database.

    A database can either be created from scratch or loaded, via the new and
    load @classmethods.
    """

    ##################
    # Initialization #
    ##################

    def __init__(self, db, campaign_dir):
        """
        Initialize the DatabaseManager with a TinyDB instance.

        This function assumes that the DB is already complete with a config
        entry, as created by the new and load classmethods, and should not be
        called directly. Use the CampaignManager.new() and
        CampaignManager.load() facilities instead.

        Args:
            db: the TinyDB instance backing this campaign.
            campaign_dir (str): path of the campaign directory.
        """
        self.campaign_dir = campaign_dir
        self.db = db
        # Query the results only once: highest stored RngRun, or 0 when the
        # campaign has no results yet.
        rngruns = [result['params']['RngRun']
                   for result in self.get_results()]
        self.maxrngrun = max(rngruns) if rngruns else 0

    @classmethod
    def new(cls, script, commit, params, campaign_dir, overwrite=False):
        """
        Initialize a new class instance with a set configuration and filename.

        The created database has the same name of the campaign directory.

        Args:
            script (str): the ns-3 name of the script that will be used in
                this campaign;
            commit (str): the commit of the ns-3 installation that is used to
                run the simulations.
            params (list): a list of the parameters that can be used on the
                script.
            campaign_dir (str): The path of the file where to save the DB.
            overwrite (bool): Whether or not existing directories should be
                overwritten.

        Raises:
            ValueError: if campaign_dir is not absolute, or if overwriting
                would delete files that do not belong to a campaign.
            FileExistsError: if campaign_dir exists and overwrite is False.
        """
        # We only accept absolute paths
        if not Path(campaign_dir).is_absolute():
            raise ValueError("Path is not absolute")

        # Make sure the directory does not exist already
        if Path(campaign_dir).exists():
            if not overwrite:
                raise FileExistsError(
                    "The specified directory already exists")

            # Verify we are not deleting files belonging to the user
            campaign_dir_name = os.path.basename(campaign_dir)
            folder_contents = set(os.listdir(campaign_dir))
            allowed_files = set(
                ['data', '%s.json' % campaign_dir_name] +
                # Allow hidden files (like .DS_STORE in macos)
                [os.path.basename(os.path.normpath(f)) for f in
                 glob.glob(os.path.join(campaign_dir, ".*"))])

            if not folder_contents.issubset(allowed_files):
                raise ValueError("The specified directory cannot be overwritten"
                                 " because it contains user files.")

            # This operation destroys data.
            shutil.rmtree(campaign_dir)

        # Create the directory and database file in it
        os.makedirs(campaign_dir)
        tinydb = TinyDB(os.path.join(campaign_dir,
                                     "%s.json" %
                                     os.path.basename(campaign_dir)),
                        storage=CachingMiddleware(JSONStorage))

        # Save the configuration in the database
        config = {
            'script': script,
            'commit': commit,
            'params': sorted(params)
        }
        tinydb.table('config').insert(config)
        tinydb.storage.flush()

        return cls(tinydb, campaign_dir)

    @classmethod
    def load(cls, campaign_dir):
        """
        Initialize from an existing database.

        It is assumed that the database json file has the same name as its
        containing folder. An existing database file is never deleted by this
        method, even when it turns out to be unreadable or malformed.

        Args:
            campaign_dir (str): The path to the campaign directory.

        Raises:
            ValueError: if the path is not absolute, the directory does not
                exist, or the database file is missing or malformed.
        """
        # We only accept absolute paths
        if not Path(campaign_dir).is_absolute():
            raise ValueError("Path is not absolute")

        # Verify file exists
        if not Path(campaign_dir).exists():
            raise ValueError("Directory does not exist")

        # Extract filename from campaign dir
        filename = "%s.json" % os.path.split(campaign_dir)[1]
        filepath = os.path.join(campaign_dir, filename)

        # Refuse a missing database up front, so that opening it below can
        # never leave a freshly created empty file behind and no cleanup
        # (and therefore no deletion) is ever needed.
        if not os.path.isfile(filepath):
            raise ValueError("Specified campaign directory seems corrupt")

        try:
            # Read TinyDB instance from file
            tinydb = TinyDB(filepath, storage=CachingMiddleware(JSONStorage))
            config_keys = set(tinydb.table('config').all()[0].keys())
        except Exception as err:
            raise ValueError(
                "Specified campaign directory seems corrupt") from err

        # Make sure the configuration is a valid dictionary. An explicit
        # check is used because assert statements are stripped under -O.
        if config_keys != {'script', 'params', 'commit'}:
            raise ValueError("Specified campaign directory seems corrupt")

        return cls(tinydb, campaign_dir)

    ###################
    # Database access #
    ###################

    def write_to_disk(self):
        """
        Flush the database storage, so that cached changes reach the file.
        """
        self.db.storage.flush()

    def get_config(self):
        """
        Return the configuration dictionary of this DatabaseManager's campaign.

        This is a dictionary containing the following keys:

        * script: the name of the script that is executed in the campaign.
        * params: a list of the command line parameters that can be used on
          the script.
        * commit: the commit at which the campaign is operating.
        """
        # Read from self.db and return the config entry of the database
        return self.db.table('config').all()[0]

    def get_data_dir(self):
        """
        Return the data directory, which is simply campaign_directory/data.
        """
        return os.path.join(self.campaign_dir, 'data')

    def get_commit(self):
        """
        Return the commit at which the campaign is operating.
        """
        return self.get_config()['commit']

    def get_script(self):
        """
        Return the ns-3 script that is run in the campaign.
        """
        return self.get_config()['script']

    def get_params(self):
        """
        Return a list containing the parameters that can be toggled.
        """
        return self.get_config()['params']

    def get_next_rngruns(self):
        """
        Yield the next RngRun values that can be used in this campaign.
        """
        available_runs = [result['params']['RngRun']
                          for result in self.get_results()]
        yield from self.get_next_values(available_runs)

    def _result_template(self):
        """
        Build the dictionary used as a model for the key structure that every
        inserted result must have.
        """
        return {
            'params': {k: ['...'] for k in self.get_params() + ['RngRun']},
            'meta': {k: ['...'] for k in ['elapsed_time', 'id']},
        }

    @staticmethod
    def _validate_result(result, template):
        """
        Raise ValueError if result does not share template's key structure.
        """
        if not DatabaseManager.have_same_structure(result, template):
            raise ValueError(
                '%s:\nExpected: %s\nGot: %s' % (
                    "Result dictionary does not correspond to database format",
                    pformat(template, depth=1),
                    pformat(result, depth=1)))

    def insert_results(self, results):
        """
        Insert multiple results in the database at once.

        Every result must have the structure described in insert_result; if
        any of them does not, ValueError is raised and nothing is inserted.

        Args:
            results (iterable): the result dictionaries to insert.
        """
        template = self._result_template()

        # Copy the results before touching them: this keeps generators from
        # being exhausted by the validation pass, and keeps the database
        # from sharing dictionaries with the caller (as in insert_result).
        results = [deepcopy(result) for result in results]

        # Verify result format is correct
        for result in results:
            self._validate_result(result, template)

        # Insert results
        self.db.table('results').insert_multiple(results)

    def insert_result(self, result):
        """
        Insert a new result in the database.

        This function also verifies that the result dictionaries saved in the
        database have the following structure (with {'a': 1} representing a
        dictionary, 'a' a key and 1 its value)::

            {
                'params': {
                    'param1': value1,
                    'param2': value2,
                    ...
                    'RngRun': value3
                },
                'meta': {
                    'elapsed_time': value4,
                    'id': value5
                }
            }

        Where elapsed time is a float representing the seconds the simulation
        execution took, and id is a UUID uniquely identifying the result, and
        which is used to locate the output files in the campaign_dir/data
        folder.

        Raises:
            ValueError: if result does not have the structure above.
        """
        # Verify result format is correct
        self._validate_result(result, self._result_template())

        # Insert result
        self.db.table('results').insert(deepcopy(result))

    def get_results(self, params=None, result_id=None):
        """
        Return all the results available from the database that fulfill some
        parameter combinations.

        If params is None (or not specified), return all results.

        If params is specified, it must be a dictionary specifying the result
        values we are interested in, with multiple values specified as lists.

        For example, if the following params value is used::

            params = {
                'param1': 'value1',
                'param2': ['value2', 'value3']
            }

        the database will be queried for results having param1 equal to value1,
        and param2 equal to value2 or value3.

        Not specifying a value for all the available parameters is allowed:
        unspecified parameters are assumed to be 'free', and can take any
        value. A parameter given an empty list of values cannot be satisfied,
        so an empty list of results is returned in that case.

        If result_id is specified, params is ignored and only results with
        that id are returned.

        Returns:
            A list of results matching the query. Returned results have the
            same structure as results inserted with the insert_result method.

        Raises:
            ValueError: if params contains keys that are not campaign
                parameters.
        """
        # A cast to dict is necessary, since self.db.table() contains TinyDB's
        # Document object (which is simply a wrapper for a dictionary, thus the
        # simple cast).
        if result_id is not None:
            return [dict(i) for i in self.db.table('results').all()
                    if i['meta']['id'] == result_id]

        # In this case, return all results
        if params is None:
            return [dict(i) for i in self.db.table('results').all()]

        # Verify parameter format is correct
        all_params = set(['RngRun'] + self.get_params())
        param_subset = set(params.keys())
        if not all_params.issuperset(param_subset):
            raise ValueError(
                '%s:\nParameters: %s\nQuery: %s' % (
                    'Specified parameter keys do not match database format',
                    all_params,
                    param_subset))

        # Convert values that are not lists into lists to later perform
        # iteration over values more naturally. Perform this on a new
        # dictionary not to modify the original copy.
        query_params = {
            key: value if isinstance(value, list) else [value]
            for key, value in params.items()
        }

        # Handle case where query params has no keys
        if not query_params:
            return [dict(i) for i in self.db.table('results').all()]

        # A parameter with no admissible values matches nothing; bail out
        # here because reduce() below cannot fold an empty sequence.
        if any(not values for values in query_params.values()):
            return []

        # Create the TinyDB query
        # In the docstring example above, this is equivalent to:
        # AND(OR(param1 == value1), OR(param2 == value2, param2 == value3))
        query = reduce(and_, [reduce(or_, [
            where('params')[key] == v for v in value]) for key, value in
                              query_params.items()])

        return [dict(i) for i in self.db.table('results').search(query)]

    def get_result_files(self, result):
        """
        Return a dictionary containing filename: filepath values for each
        output file associated with an id.

        Result can be either a result dictionary (e.g., obtained with the
        get_results() method) or a result id.
        """
        if isinstance(result, dict):
            result_id = result['meta']['id']
        else:  # Should already be a string containing the id
            result_id = result

        result_data_dir = os.path.join(self.get_data_dir(), result_id)

        # Only the files directly inside the result folder are listed
        filenames = next(os.walk(result_data_dir))[2]

        return {f: os.path.join(result_data_dir, f) for f in filenames}

    def get_complete_results(self, params=None, result_id=None):
        """
        Return available results, analogously to what get_results does, but
        also read the corresponding output files for each result, and
        incorporate them in the result dictionary under the output key, as a
        dictionary of filename: file_contents.

        Args:
            params (dict): parameter specification of the desired parameter
                values, as described in the get_results documentation.
            result_id: if specified, only results with this id are returned
                and params is ignored.

        In other words, results returned by this function will be in the
        form::

            {
                'params': {
                    'param1': value1,
                    'param2': value2,
                    ...
                    'RngRun': value3
                },
                'meta': {
                    'elapsed_time': value4,
                    'id': value5
                }
                'output': {
                    'stdout': stdout_as_string,
                    'stderr': stderr_as_string,
                    'file1': file1_as_string,
                    ...
                }
            }

        Note that the stdout and stderr entries are always included, even if
        they are empty.
        """
        # get_results gives result_id precedence over params by itself
        results = deepcopy(self.get_results(params=params,
                                            result_id=result_id))

        for r in results:
            r['output'] = {}
            available_files = self.get_result_files(r['meta']['id'])
            for name, filepath in available_files.items():
                with open(filepath, 'r') as file_contents:
                    r['output'][name] = file_contents.read()

        return results

    def wipe_results(self):
        """
        Remove all results from the database.

        This also removes all output files, and cannot be undone.
        """
        # Clean results table
        self.db.purge_table('results')
        self.write_to_disk()

        # Get rid of contents of data dir. An explicit loop is required:
        # map() is lazy in Python 3 and would never perform the deletions.
        # The '*' pattern is used because result folders are named after
        # result ids, which need not contain a dot.
        for path in glob.glob(os.path.join(self.get_data_dir(), '*')):
            if os.path.isdir(path) and not os.path.islink(path):
                shutil.rmtree(path)
            else:
                os.remove(path)

    def delete_result(self, result):
        """
        Remove the specified result from the database, based on its id.
        """
        self.db.table('results').remove(
            where('meta')['id'] == result['meta']['id'])

    #############
    # Utilities #
    #############

    def __str__(self):
        """
        Represent the database object as a human-readable string.
        """
        configuration = self.get_config()
        return "script: %s\nparams: %s\nHEAD: %s" % (
            configuration['script'],
            configuration['params'],
            configuration['commit'])

    def get_next_values(self, values_list):
        """
        Given a list of integers, this generator yields the lowest integers
        that do not appear in the list.

        For example, when REUSE_RNGRUN_VALUES is True and values_list is
        [0, 1, 3, 4], the yielded values are 2, 5, 6, ...

        When REUSE_RNGRUN_VALUES is False, counting starts from
        self.maxrngrun instead of 0, and self.maxrngrun is incremented for
        every value that is yielded.
        """
        # Set membership keeps each candidate check O(1)
        taken = set(values_list)

        if REUSE_RNGRUN_VALUES:
            yield from (v for v in itertools.count() if v not in taken)
        else:
            # NOTE(review): maxrngrun grows by one per yielded value, even
            # when candidates were skipped - confirm this is intended.
            for next_value in (v for v in itertools.count(self.maxrngrun)
                               if v not in taken):
                self.maxrngrun += 1
                yield next_value

    @staticmethod
    def have_same_structure(d1, d2):
        """
        Given two dictionaries (possibly with other nested dictionaries as
        values), this function checks whether they have the same key
        structure.

        >>> from sem import DatabaseManager
        >>> d1 = {'a': 1, 'b': 2}
        >>> d2 = {'a': [], 'b': 3}
        >>> d3 = {'a': 4, 'c': 5}
        >>> DatabaseManager.have_same_structure(d1, d2)
        True
        >>> DatabaseManager.have_same_structure(d1, d3)
        False

        >>> d4 = {'a': {'c': 1}, 'b': 2}
        >>> d5 = {'a': {'c': 3}, 'b': 4}
        >>> d6 = {'a': {'c': 5, 'd': 6}, 'b': 7}
        >>> DatabaseManager.have_same_structure(d1, d4)
        False
        >>> DatabaseManager.have_same_structure(d4, d5)
        True
        >>> DatabaseManager.have_same_structure(d4, d6)
        False
        """
        # Keys of this level are the same
        if set(d1.keys()) != set(d2.keys()):
            return False

        # Check nested dictionaries. Both dictionaries have the same keys at
        # this point, so walking one of them is enough (and no ordering of
        # the keys is required).
        for key in d1:
            first, second = d1[key], d2[key]
            # If one of the values is a dictionary and the other is not
            if isinstance(first, dict) != isinstance(second, dict):
                return False
            # If both are dictionaries, recur
            if isinstance(first, dict) and \
                    not DatabaseManager.have_same_structure(first, second):
                return False

        return True

    def get_all_values_of_all_params(self):
        """
        Return a dictionary containing all values that are taken by all
        available parameters.

        Always returns the parameter list in alphabetical order. Each
        parameter maps to the sorted list of its distinct values, or to None
        when no result provides a value for it.
        """
        # Read the parameter list once, rather than once per result
        param_names = sorted(self.get_params())

        values = collections.OrderedDict((p, set()) for p in param_names)
        for result in self.get_results():
            for param in param_names:
                values[param].add(result['params'][param])

        return collections.OrderedDict(
            (param, sorted(found) if found else None)
            for param, found in values.items())